Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pieces verification to farmers. #725

Merged
merged 19 commits into from Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
926372a
subspace-rpc: Introduce `records_root` endpoint.
shamil-gadelshin Jul 27, 2022
1ea616b
farmer: Introduce pieces_verification module.
shamil-gadelshin Jul 27, 2022
7743d25
farmer: Implement rpc_client method.
shamil-gadelshin Jul 27, 2022
673fb47
farmer: Add pieces verification to DSN sync.
shamil-gadelshin Jul 27, 2022
3d901b5
farmer: Fix review comments.
shamil-gadelshin Jul 27, 2022
ef4f9cf
farmer: Refactor `verify_pieces_at_blockchain` method.
shamil-gadelshin Jul 27, 2022
22011d8
subspace-rpc: Add records_roots request batching.
shamil-gadelshin Jul 28, 2022
79b59b2
Merge branch 'main' into verify-pieces
shamil-gadelshin Jul 28, 2022
8ec51dc
farmer: Fix farmer side of the `records_root` call.
shamil-gadelshin Jul 28, 2022
fb1687f
farmer: Fix review comments (tracing errors).
shamil-gadelshin Jul 28, 2022
a34268a
farmer: Fix minor review comments.
shamil-gadelshin Jul 28, 2022
7ead01d
Modify FarmerProtocolInfo structure.
shamil-gadelshin Jul 29, 2022
25982e1
Add a limit for a RPC records_roots call.
shamil-gadelshin Jul 29, 2022
e4ae0ce
farmer: Fix pieces verifcation formula.
shamil-gadelshin Jul 29, 2022
7d1cc50
farmer: Move rpc_client_* files to a new folder.
shamil-gadelshin Jul 29, 2022
c866ea7
Merge branch 'main' into verify-pieces
shamil-gadelshin Jul 29, 2022
790f371
Fix review comments.
shamil-gadelshin Aug 1, 2022
8538361
Merge branch 'main' into verify-pieces
shamil-gadelshin Aug 1, 2022
7d54abb
Merge branch 'main' into verify-pieces
shamil-gadelshin Aug 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 27 additions & 1 deletion crates/sc-consensus-subspace-rpc/src/lib.rs
Expand Up @@ -49,7 +49,7 @@ use std::sync::Arc;
use std::time::Duration;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::{
Solution, MERKLE_NUM_LEAVES, PIECE_SIZE, RECORDED_HISTORY_SEGMENT_SIZE, RECORD_SIZE,
Sha256Hash, Solution, MERKLE_NUM_LEAVES, PIECE_SIZE, RECORDED_HISTORY_SEGMENT_SIZE, RECORD_SIZE,
};
use subspace_rpc_primitives::{
FarmerProtocolInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
Expand Down Expand Up @@ -97,6 +97,9 @@ pub trait SubspaceRpcApi {

#[method(name = "subspace_acknowledgeArchivedSegment")]
async fn acknowledge_archived_segment(&self, segment_index: u64) -> RpcResult<()>;

#[method(name = "subspace_recordsRoots")]
async fn records_roots(&self, segment_indexes: Vec<u64>) -> RpcResult<Vec<Option<Sha256Hash>>>;
}

#[derive(Default)]
Expand Down Expand Up @@ -518,4 +521,27 @@ where

Ok(())
}

async fn records_roots(&self, segment_indexes: Vec<u64>) -> RpcResult<Vec<Option<Sha256Hash>>> {
let runtime_api = self.client.runtime_api();
let best_block_id = BlockId::Hash(self.client.info().best_hash);

let records_root_result: Result<Vec<_>, JsonRpseeError> = segment_indexes
.into_iter()
.map(|idx| {
runtime_api.records_root(&best_block_id, idx).map_err(|_| {
JsonRpseeError::Custom("Internal error during `records_root` call".to_string())
})
})
.collect();

if let Err(ref err) = records_root_result {
error!(
"Failed to get data from runtime API (records_root): {}",
err
i1i1 marked this conversation as resolved.
Show resolved Hide resolved
);
}

records_root_result
}
}
5 changes: 5 additions & 0 deletions crates/subspace-farmer/src/bench_rpc_client.rs
Expand Up @@ -6,6 +6,7 @@ use futures::{stream, SinkExt, Stream, StreamExt};
use std::pin::Pin;
use std::sync::Arc;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::Sha256Hash;
use subspace_rpc_primitives::{
FarmerProtocolInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
Expand Down Expand Up @@ -137,4 +138,8 @@ impl RpcClient for BenchRpcClient {
.await?;
Ok(())
}

async fn records_roots(&self, _: Vec<u64>) -> Result<Vec<Option<Sha256Hash>>, Error> {
Ok(Default::default())
}
}
42 changes: 24 additions & 18 deletions crates/subspace-farmer/src/dsn.rs
@@ -1,3 +1,4 @@
use async_trait::async_trait;
use futures::{SinkExt, Stream, StreamExt};
use num_traits::{WrappingAdd, WrappingSub};
use std::ops::Range;
Expand All @@ -6,7 +7,7 @@ use subspace_core_primitives::{
};
use subspace_networking::libp2p::core::multihash::{Code, MultihashDigest};
use subspace_networking::{PiecesByRangeRequest, PiecesByRangeResponse, PiecesToPlot};
use tracing::{debug, trace, warn};
use tracing::{debug, error, trace, warn};

#[cfg(test)]
mod pieces_by_range_tests;
Expand Down Expand Up @@ -162,16 +163,23 @@ impl DSNSync for subspace_networking::Node {
}
}

/// Defines actions on receiving the pieces during the sync process.
#[async_trait]
pub trait OnSync {
/// Defines a callback on receiving pieces.
async fn on_pieces(
&self,
pieces: FlatPieces,
piece_indices: Vec<PieceIndex>,
) -> anyhow::Result<()>;
}

/// Syncs the closest pieces to the public key from the provided DSN.
pub async fn sync<DSN, OP>(
mut dsn: DSN,
options: SyncOptions,
mut on_pieces: OP,
) -> anyhow::Result<()>
pub async fn sync<DSN, OP>(mut dsn: DSN, options: SyncOptions, on_sync: OP) -> anyhow::Result<()>
where
DSN: DSNSync + Send + Sized,
DSN::Stream: Unpin + Send,
OP: FnMut(FlatPieces, Vec<PieceIndex>) -> anyhow::Result<()> + Send + 'static,
OP: OnSync,
{
let SyncOptions {
max_plot_size,
Expand Down Expand Up @@ -232,11 +240,12 @@ where
for (start, end) in sync_ranges {
let mut stream = dsn.get_pieces(start.into()..end.into()).await?;

while let Some(PiecesToPlot {
piece_indexes,
pieces,
}) = stream.next().await
{
while let Some(pieces_to_plot) = stream.next().await {
let PiecesToPlot {
piece_indexes,
pieces,
} = pieces_to_plot;

// Filter out pieces which are not in our range
let (piece_indexes, pieces) = piece_indexes
.into_iter()
Expand All @@ -256,12 +265,9 @@ where
})
.unzip();

// Writing pieces is usually synchronous, therefore might take some time
on_pieces = tokio::task::spawn_blocking(move || {
on_pieces(pieces, piece_indexes).map(|()| on_pieces)
})
.await
.expect("`on_pieces` must never panic")?;
if let Err(err) = on_sync.on_pieces(pieces, piece_indexes).await {
error!("DSN sync process returned an error: {}", err);
shamil-gadelshin marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

Expand Down
70 changes: 41 additions & 29 deletions crates/subspace-farmer/src/dsn/tests.rs
@@ -1,5 +1,6 @@
use super::{sync, DSNSync, NoSync, PieceIndexHashNumber, PiecesToPlot, SyncOptions};
use crate::bench_rpc_client::{BenchRpcClient, BENCH_FARMER_PROTOCOL_INFO};
use crate::dsn::OnSync;
use crate::legacy_multi_plots_farm::{LegacyMultiPlotsFarm, Options as MultiFarmingOptions};
use crate::single_plot_farm::PlotFactoryOptions;
use crate::{LegacyObjectMappings, Plot};
Expand All @@ -15,11 +16,36 @@ use std::time::Duration;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::{
bidirectional_distance, ArchivedBlockProgress, FlatPieces, LastArchivedBlock, Piece,
PieceIndex, PieceIndexHash, RootBlock, Sha256Hash, PIECE_SIZE, U256,
PieceIndex, PieceIndexHash, RootBlock, Sha256Hash, PIECE_SIZE, RECORDED_HISTORY_SEGMENT_SIZE,
RECORD_SIZE, U256,
};
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_rpc_primitives::FarmerProtocolInfo;
use tempfile::TempDir;

struct TestPlotter {
pub(crate) result: Arc<Mutex<BTreeMap<PieceIndexHash, (Piece, PieceIndex)>>>,
}

#[async_trait::async_trait]
impl OnSync for TestPlotter {
async fn on_pieces(
&self,
pieces: FlatPieces,
piece_indices: Vec<PieceIndex>,
) -> anyhow::Result<()> {
let mut result = self.result.lock();
result.extend(pieces.as_pieces().zip(piece_indices).map(|(piece, index)| {
(
PieceIndexHash::from_index(index),
(piece.try_into().unwrap(), index),
)
}));

Ok(())
}
}

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct TestDSN(BTreeMap<PieceIndexHash, (Piece, PieceIndex)>);

Expand Down Expand Up @@ -80,19 +106,8 @@ async fn simple_test() {
max_plot_size: 100 * 1024 * 1024 * 1024,
total_pieces: 256,
},
{
let result = Arc::clone(&result);
move |pieces, piece_indexes| {
let mut result = result.lock();
result.extend(pieces.as_pieces().zip(piece_indexes).map(|(piece, index)| {
(
PieceIndexHash::from_index(index),
(piece.try_into().unwrap(), index),
)
}));

Ok(())
}
TestPlotter {
result: Arc::clone(&result),
},
)
.await
Expand All @@ -117,18 +132,8 @@ async fn no_sync_test() {
max_plot_size: 100 * 1024 * 1024 * 1024,
total_pieces: 0,
},
{
let result = Arc::clone(&result);
move |pieces, piece_indexes| {
let mut result = result.lock();
result.extend(pieces.as_pieces().zip(piece_indexes).map(|(piece, index)| {
(
PieceIndexHash::from_index(index),
(piece.try_into().unwrap(), index),
)
}));
Ok(())
}
TestPlotter {
result: Arc::clone(&result),
},
)
.await
Expand Down Expand Up @@ -359,10 +364,17 @@ async fn test_dsn_sync() {

let range_size = PieceIndexHashNumber::MAX / seeder_max_piece_count * pieces_per_request;
let plot = syncer_multi_farming.single_plot_farms()[0].plot().clone();
let dsn_sync = syncer_multi_farming.single_plot_farms()[0].dsn_sync(
syncer_max_plot_size,
seeder_max_piece_count,
let dsn_sync = syncer_multi_farming.single_plot_farms()[0].dsn_sync::<BenchRpcClient>(
rpc_client.clone(),
FarmerProtocolInfo {
max_plot_size: syncer_max_plot_size,
total_pieces: seeder_max_piece_count,
genesis_hash: Default::default(), // We don't use this field.
record_size: RECORD_SIZE,
recorded_history_segment_size: RECORDED_HISTORY_SEGMENT_SIZE,
},
shamil-gadelshin marked this conversation as resolved.
Show resolved Hide resolved
range_size,
false, // don't verify pieces
);
let public_key =
U256::from_be_bytes((*syncer_multi_farming.single_plot_farms()[0].public_key()).into());
Expand Down
2 changes: 2 additions & 0 deletions crates/subspace-farmer/src/legacy_multi_plots_farm.rs
Expand Up @@ -78,6 +78,7 @@ impl LegacyMultiPlotsFarm {
let single_disk_semaphore =
SingleDiskSemaphore::new(NonZeroU16::try_from(16).expect("Non zero; qed"));

let verification_client = archiving_client.clone();
let single_plot_farms = tokio::task::spawn_blocking(move || {
let handle = Handle::current();
plot_sizes
Expand Down Expand Up @@ -112,6 +113,7 @@ impl LegacyMultiPlotsFarm {
enable_dsn_archiving,
enable_dsn_sync,
relay_server_node: relay_server_node.clone(),
verification_client: verification_client.clone(),
})
})
.collect::<anyhow::Result<Vec<_>>>()
Expand Down
5 changes: 5 additions & 0 deletions crates/subspace-farmer/src/mock_rpc_client.rs
Expand Up @@ -5,6 +5,7 @@ use futures::{SinkExt, Stream, StreamExt};
use std::pin::Pin;
use std::sync::Arc;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::Sha256Hash;
use subspace_rpc_primitives::{
FarmerProtocolInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
Expand Down Expand Up @@ -232,4 +233,8 @@ impl RpcClient for MockRpcClient {
.unwrap();
Ok(())
}

async fn records_roots(&self, _: Vec<u64>) -> Result<Vec<Option<Sha256Hash>>, MockError> {
Ok(Default::default())
}
}
11 changes: 11 additions & 0 deletions crates/subspace-farmer/src/node_rpc_client.rs
Expand Up @@ -8,6 +8,7 @@ use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use std::pin::Pin;
use std::sync::Arc;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::Sha256Hash;
use subspace_rpc_primitives::{
FarmerProtocolInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
Expand Down Expand Up @@ -122,4 +123,14 @@ impl RpcClient for NodeRpcClient {
)
.await?)
}

async fn records_roots(
&self,
segment_indexes: Vec<u64>,
) -> Result<Vec<Option<Sha256Hash>>, RpcError> {
Ok(self
.client
.request("subspace_recordsRoots", rpc_params![&segment_indexes])
.await?)
}
}
7 changes: 7 additions & 0 deletions crates/subspace-farmer/src/rpc_client.rs
Expand Up @@ -2,6 +2,7 @@ use async_trait::async_trait;
use futures::Stream;
use std::pin::Pin;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::Sha256Hash;
use subspace_rpc_primitives::{
FarmerProtocolInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
Expand Down Expand Up @@ -44,4 +45,10 @@ pub trait RpcClient: Clone + Send + Sync + 'static {

/// Acknowledge receiving of archived segments
async fn acknowledge_archived_segment(&self, segment_index: u64) -> Result<(), Error>;

/// Get records roots for the segments
async fn records_roots(
&self,
segment_indexes: Vec<u64>,
) -> Result<Vec<Option<Sha256Hash>>, Error>;
}
3 changes: 2 additions & 1 deletion crates/subspace-farmer/src/single_disk_farm.rs
Expand Up @@ -313,7 +313,7 @@ impl SingleDiskFarm {
};

let single_disk_semaphore = SingleDiskSemaphore::new(disk_concurrency);

let verification_client = archiving_client.clone();
let single_plot_farms = tokio::task::spawn_blocking(move || {
let handle = Handle::current();
single_disk_farm_info
Expand Down Expand Up @@ -350,6 +350,7 @@ impl SingleDiskFarm {
enable_dsn_archiving,
enable_dsn_sync,
relay_server_node: relay_server_node.clone(),
verification_client: verification_client.clone(),
})
})
.collect::<anyhow::Result<Vec<_>>>()
Expand Down