Skip to content

Commit

Permalink
Merge pull request #725 from subspace/verify-pieces
Browse files Browse the repository at this point in the history
Add pieces verification to farmers.
  • Loading branch information
shamil-gadelshin committed Aug 4, 2022
2 parents 0d3d2fa + 7d54abb commit dd02054
Show file tree
Hide file tree
Showing 16 changed files with 321 additions and 90 deletions.
47 changes: 45 additions & 2 deletions crates/sc-consensus-subspace-rpc/src/lib.rs
Expand Up @@ -44,15 +44,17 @@ use sp_runtime::traits::Block as BlockT;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::num::NonZeroU32;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::{
Solution, MERKLE_NUM_LEAVES, PIECE_SIZE, RECORDED_HISTORY_SEGMENT_SIZE, RECORD_SIZE,
Sha256Hash, Solution, MERKLE_NUM_LEAVES, PIECE_SIZE, RECORDED_HISTORY_SEGMENT_SIZE, RECORD_SIZE,
};
use subspace_rpc_primitives::{
FarmerProtocolInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
MAX_SEGMENT_INDEXES_PER_REQUEST,
};

const SOLUTION_TIMEOUT: Duration = Duration::from_secs(2);
Expand Down Expand Up @@ -97,6 +99,9 @@ pub trait SubspaceRpcApi {

#[method(name = "subspace_acknowledgeArchivedSegment")]
async fn acknowledge_archived_segment(&self, segment_index: u64) -> RpcResult<()>;

#[method(name = "subspace_recordsRoots")]
async fn records_roots(&self, segment_indexes: Vec<u64>) -> RpcResult<Vec<Option<Sha256Hash>>>;
}

#[derive(Default)]
Expand Down Expand Up @@ -204,7 +209,10 @@ where
let farmer_protocol_info: Result<FarmerProtocolInfo, ApiError> = try {
FarmerProtocolInfo {
genesis_hash,
record_size: RECORD_SIZE,
record_size: NonZeroU32::new(RECORD_SIZE).ok_or_else(|| {
error!("Incorrect record_size constant provided.");
ApiError::Application("Incorrect record_size set".to_string().into())
})?,
recorded_history_segment_size: RECORDED_HISTORY_SEGMENT_SIZE,
// TODO: `max_plot_size` in the protocol must change to bytes as well
max_plot_size: runtime_api.max_plot_size(&best_block_id)? * PIECE_SIZE as u64,
Expand Down Expand Up @@ -518,4 +526,39 @@ where

Ok(())
}

async fn records_roots(&self, segment_indexes: Vec<u64>) -> RpcResult<Vec<Option<Sha256Hash>>> {
if segment_indexes.len() > MAX_SEGMENT_INDEXES_PER_REQUEST {
error!(
"segment_indexes length exceed the limit: {} ",
segment_indexes.len()
);

return Err(JsonRpseeError::Custom(format!(
"segment_indexes length exceed the limit {}",
MAX_SEGMENT_INDEXES_PER_REQUEST
)));
};

let runtime_api = self.client.runtime_api();
let best_block_id = BlockId::Hash(self.client.info().best_hash);

let records_root_result: Result<Vec<_>, JsonRpseeError> = segment_indexes
.into_iter()
.map(|idx| {
runtime_api.records_root(&best_block_id, idx).map_err(|_| {
JsonRpseeError::Custom("Internal error during `records_root` call".to_string())
})
})
.collect();

if let Err(ref err) = records_root_result {
error!(
"Failed to get data from runtime API (records_root): {}",
err
);
}

records_root_result
}
}
Expand Up @@ -13,10 +13,10 @@ use subspace_core_primitives::{
ArchivedBlockProgress, FlatPieces, LastArchivedBlock, PublicKey, RootBlock, Sha256Hash,
PIECE_SIZE,
};
use subspace_farmer::bench_rpc_client::{BenchRpcClient, BENCH_FARMER_PROTOCOL_INFO};
use subspace_farmer::legacy_multi_plots_farm::{
LegacyMultiPlotsFarm, Options as MultiFarmingOptions,
};
use subspace_farmer::rpc_client::bench_rpc_client::{BenchRpcClient, BENCH_FARMER_PROTOCOL_INFO};
use subspace_farmer::single_plot_farm::PlotFactoryOptions;
use subspace_farmer::{LegacyObjectMappings, PieceOffset, Plot, PlotFile, RpcClient};
use subspace_networking::Config;
Expand Down
Expand Up @@ -164,7 +164,7 @@ pub(crate) async fn farm_multi_disk(
};
let ws_server_addr = ws_server.local_addr()?;
let rpc_server = RpcServerImpl::new(
record_size,
record_size.get(),
recorded_history_segment_size,
Arc::new(
single_disk_farms
Expand Down Expand Up @@ -343,7 +343,7 @@ pub(crate) async fn farm_legacy(
};
let ws_server_addr = ws_server.local_addr()?;
let rpc_server = RpcServerImpl::new(
record_size,
record_size.get(),
recorded_history_segment_size,
Arc::new(multi_plots_farm.piece_getter()),
Arc::new(vec![]),
Expand Down
42 changes: 24 additions & 18 deletions crates/subspace-farmer/src/dsn.rs
@@ -1,3 +1,4 @@
use async_trait::async_trait;
use futures::{SinkExt, Stream, StreamExt};
use num_traits::{WrappingAdd, WrappingSub};
use std::ops::Range;
Expand All @@ -6,7 +7,7 @@ use subspace_core_primitives::{
};
use subspace_networking::libp2p::core::multihash::{Code, MultihashDigest};
use subspace_networking::{PiecesByRangeRequest, PiecesByRangeResponse, PiecesToPlot};
use tracing::{debug, trace, warn};
use tracing::{debug, error, trace, warn};

#[cfg(test)]
mod pieces_by_range_tests;
Expand Down Expand Up @@ -162,16 +163,23 @@ impl DSNSync for subspace_networking::Node {
}
}

/// Defines actions on receiving the pieces during the sync process.
#[async_trait]
pub trait OnSync {
/// Defines a callback on receiving pieces.
async fn on_pieces(
&self,
pieces: FlatPieces,
piece_indices: Vec<PieceIndex>,
) -> anyhow::Result<()>;
}

/// Syncs the closest pieces to the public key from the provided DSN.
pub async fn sync<DSN, OP>(
mut dsn: DSN,
options: SyncOptions,
mut on_pieces: OP,
) -> anyhow::Result<()>
pub async fn sync<DSN, OP>(mut dsn: DSN, options: SyncOptions, on_sync: OP) -> anyhow::Result<()>
where
DSN: DSNSync + Send + Sized,
DSN::Stream: Unpin + Send,
OP: FnMut(FlatPieces, Vec<PieceIndex>) -> anyhow::Result<()> + Send + 'static,
OP: OnSync,
{
let SyncOptions {
max_plot_size,
Expand Down Expand Up @@ -232,11 +240,12 @@ where
for (start, end) in sync_ranges {
let mut stream = dsn.get_pieces(start.into()..end.into()).await?;

while let Some(PiecesToPlot {
piece_indexes,
pieces,
}) = stream.next().await
{
while let Some(pieces_to_plot) = stream.next().await {
let PiecesToPlot {
piece_indexes,
pieces,
} = pieces_to_plot;

// Filter out pieces which are not in our range
let (piece_indexes, pieces) = piece_indexes
.into_iter()
Expand All @@ -256,12 +265,9 @@ where
})
.unzip();

// Writing pieces is usually synchronous, therefore might take some time
on_pieces = tokio::task::spawn_blocking(move || {
on_pieces(pieces, piece_indexes).map(|()| on_pieces)
})
.await
.expect("`on_pieces` must never panic")?;
if let Err(err) = on_sync.on_pieces(pieces, piece_indexes).await {
error!(%err, "DSN sync process returned an error");
}
}
}

Expand Down
67 changes: 38 additions & 29 deletions crates/subspace-farmer/src/dsn/tests.rs
@@ -1,6 +1,7 @@
use super::{sync, DSNSync, NoSync, PieceIndexHashNumber, PiecesToPlot, SyncOptions};
use crate::bench_rpc_client::{BenchRpcClient, BENCH_FARMER_PROTOCOL_INFO};
use crate::dsn::OnSync;
use crate::legacy_multi_plots_farm::{LegacyMultiPlotsFarm, Options as MultiFarmingOptions};
use crate::rpc_client::bench_rpc_client::{BenchRpcClient, BENCH_FARMER_PROTOCOL_INFO};
use crate::single_plot_farm::PlotFactoryOptions;
use crate::{LegacyObjectMappings, Plot};
use futures::channel::{mpsc, oneshot};
Expand All @@ -18,8 +19,32 @@ use subspace_core_primitives::{
PieceIndex, PieceIndexHash, RootBlock, Sha256Hash, PIECE_SIZE, U256,
};
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_rpc_primitives::FarmerProtocolInfo;
use tempfile::TempDir;

struct TestPlotter {
pub(crate) result: Arc<Mutex<BTreeMap<PieceIndexHash, (Piece, PieceIndex)>>>,
}

#[async_trait::async_trait]
impl OnSync for TestPlotter {
async fn on_pieces(
&self,
pieces: FlatPieces,
piece_indices: Vec<PieceIndex>,
) -> anyhow::Result<()> {
let mut result = self.result.lock();
result.extend(pieces.as_pieces().zip(piece_indices).map(|(piece, index)| {
(
PieceIndexHash::from_index(index),
(piece.try_into().unwrap(), index),
)
}));

Ok(())
}
}

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct TestDSN(BTreeMap<PieceIndexHash, (Piece, PieceIndex)>);

Expand Down Expand Up @@ -80,19 +105,8 @@ async fn simple_test() {
max_plot_size: 100 * 1024 * 1024 * 1024,
total_pieces: 256,
},
{
let result = Arc::clone(&result);
move |pieces, piece_indexes| {
let mut result = result.lock();
result.extend(pieces.as_pieces().zip(piece_indexes).map(|(piece, index)| {
(
PieceIndexHash::from_index(index),
(piece.try_into().unwrap(), index),
)
}));

Ok(())
}
TestPlotter {
result: Arc::clone(&result),
},
)
.await
Expand All @@ -117,18 +131,8 @@ async fn no_sync_test() {
max_plot_size: 100 * 1024 * 1024 * 1024,
total_pieces: 0,
},
{
let result = Arc::clone(&result);
move |pieces, piece_indexes| {
let mut result = result.lock();
result.extend(pieces.as_pieces().zip(piece_indexes).map(|(piece, index)| {
(
PieceIndexHash::from_index(index),
(piece.try_into().unwrap(), index),
)
}));
Ok(())
}
TestPlotter {
result: Arc::clone(&result),
},
)
.await
Expand Down Expand Up @@ -359,10 +363,15 @@ async fn test_dsn_sync() {

let range_size = PieceIndexHashNumber::MAX / seeder_max_piece_count * pieces_per_request;
let plot = syncer_multi_farming.single_plot_farms()[0].plot().clone();
let dsn_sync = syncer_multi_farming.single_plot_farms()[0].dsn_sync(
syncer_max_plot_size,
seeder_max_piece_count,
let dsn_sync = syncer_multi_farming.single_plot_farms()[0].dsn_sync::<BenchRpcClient>(
rpc_client.clone(),
FarmerProtocolInfo {
max_plot_size: syncer_max_plot_size,
total_pieces: seeder_max_piece_count,
..farmer_protocol_info
},
range_size,
false, // don't verify pieces
);
let public_key =
U256::from_be_bytes((*syncer_multi_farming.single_plot_farms()[0].public_key()).into());
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/farming/tests.rs
@@ -1,8 +1,8 @@
use crate::commitments::{CommitmentStatusChange, Commitments};
use crate::farming::Farming;
use crate::identity::Identity;
use crate::mock_rpc_client::MockRpcClient;
use crate::plot::Plot;
use crate::rpc_client::mock_rpc_client::MockRpcClient;
use crate::single_disk_farm::SingleDiskSemaphore;
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
Expand Down
2 changes: 2 additions & 0 deletions crates/subspace-farmer/src/legacy_multi_plots_farm.rs
Expand Up @@ -78,6 +78,7 @@ impl LegacyMultiPlotsFarm {
let single_disk_semaphore =
SingleDiskSemaphore::new(NonZeroU16::try_from(16).expect("Non zero; qed"));

let verification_client = archiving_client.clone();
let single_plot_farms = tokio::task::spawn_blocking(move || {
let handle = Handle::current();
plot_sizes
Expand Down Expand Up @@ -112,6 +113,7 @@ impl LegacyMultiPlotsFarm {
enable_dsn_archiving,
enable_dsn_sync,
relay_server_node: relay_server_node.clone(),
verification_client: verification_client.clone(),
})
})
.collect::<anyhow::Result<Vec<_>>>()
Expand Down
9 changes: 3 additions & 6 deletions crates/subspace-farmer/src/lib.rs
@@ -1,4 +1,5 @@
#![feature(
const_option,
hash_drain_filter,
int_log,
io_error_other,
Expand Down Expand Up @@ -29,19 +30,15 @@
//! 64-bit unsigned integers.

pub(crate) mod archiving;
pub mod bench_rpc_client;
pub(crate) mod commitments;
pub(crate) mod dsn;
pub(crate) mod farming;
mod file_ext;
pub(crate) mod identity;
pub mod legacy_multi_plots_farm;
#[cfg(test)]
mod mock_rpc_client;
pub(crate) mod node_rpc_client;
pub(crate) mod object_mappings;
pub(crate) mod plot;
pub(crate) mod rpc_client;
pub mod rpc_client;
pub mod single_disk_farm;
pub mod single_plot_farm;
mod utils;
Expand All @@ -52,9 +49,9 @@ pub use commitments::{CommitmentError, Commitments};
pub use farming::{Farming, FarmingError};
pub use identity::Identity;
pub use jsonrpsee;
pub use node_rpc_client::NodeRpcClient;
pub use object_mappings::{
LegacyObjectMappingError, LegacyObjectMappings, ObjectMappingError, ObjectMappings,
};
pub use plot::{PieceOffset, Plot, PlotError, PlotFile};
pub use rpc_client::node_rpc_client::NodeRpcClient;
pub use rpc_client::{Error as RpcClientError, RpcClient};
12 changes: 12 additions & 0 deletions crates/subspace-farmer/src/rpc_client.rs
@@ -1,7 +1,13 @@
pub mod bench_rpc_client;
#[cfg(test)]
pub mod mock_rpc_client;
pub(crate) mod node_rpc_client;

use async_trait::async_trait;
use futures::Stream;
use std::pin::Pin;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::Sha256Hash;
use subspace_rpc_primitives::{
FarmerProtocolInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
Expand Down Expand Up @@ -44,4 +50,10 @@ pub trait RpcClient: Clone + Send + Sync + 'static {

/// Acknowledge receiving of archived segments
async fn acknowledge_archived_segment(&self, segment_index: u64) -> Result<(), Error>;

/// Get records roots for the segments
async fn records_roots(
&self,
segment_indexes: Vec<u64>,
) -> Result<Vec<Option<Sha256Hash>>, Error>;
}

0 comments on commit dd02054

Please sign in to comment.