Add pieces verification to farmers. #725

Merged 19 commits on Aug 4, 2022. (Changes shown below are as of 4 of the 19 commits.)

Commits
926372a
subspace-rpc: Introduce `records_root` endpoint.
shamil-gadelshin Jul 27, 2022
1ea616b
farmer: Introduce pieces_verification module.
shamil-gadelshin Jul 27, 2022
7743d25
farmer: Implement rpc_client method.
shamil-gadelshin Jul 27, 2022
673fb47
farmer: Add pieces verification to DSN sync.
shamil-gadelshin Jul 27, 2022
3d901b5
farmer: Fix review comments.
shamil-gadelshin Jul 27, 2022
ef4f9cf
farmer: Refactor `verify_pieces_at_blockchain` method.
shamil-gadelshin Jul 27, 2022
22011d8
subspace-rpc: Add records_roots request batching.
shamil-gadelshin Jul 28, 2022
79b59b2
Merge branch 'main' into verify-pieces
shamil-gadelshin Jul 28, 2022
8ec51dc
farmer: Fix farmer side of the `records_root` call.
shamil-gadelshin Jul 28, 2022
fb1687f
farmer: Fix review comments (tracing errors).
shamil-gadelshin Jul 28, 2022
a34268a
farmer: Fix minor review comments.
shamil-gadelshin Jul 28, 2022
7ead01d
Modify FarmerProtocolInfo structure.
shamil-gadelshin Jul 29, 2022
25982e1
Add a limit for a RPC records_roots call.
shamil-gadelshin Jul 29, 2022
e4ae0ce
farmer: Fix pieces verification formula.
shamil-gadelshin Jul 29, 2022
7d1cc50
farmer: Move rpc_client_* files to a new folder.
shamil-gadelshin Jul 29, 2022
c866ea7
Merge branch 'main' into verify-pieces
shamil-gadelshin Jul 29, 2022
790f371
Fix review comments.
shamil-gadelshin Aug 1, 2022
8538361
Merge branch 'main' into verify-pieces
shamil-gadelshin Aug 1, 2022
7d54abb
Merge branch 'main' into verify-pieces
shamil-gadelshin Aug 4, 2022
22 changes: 21 additions & 1 deletion crates/sc-consensus-subspace-rpc/src/lib.rs
@@ -48,7 +48,9 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::{Solution, PIECE_SIZE, RECORDED_HISTORY_SEGMENT_SIZE, RECORD_SIZE};
use subspace_core_primitives::{
Sha256Hash, Solution, PIECE_SIZE, RECORDED_HISTORY_SEGMENT_SIZE, RECORD_SIZE,
};
use subspace_rpc_primitives::{
FarmerProtocolInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
@@ -95,6 +97,9 @@ pub trait SubspaceRpcApi {

#[method(name = "subspace_acknowledgeArchivedSegment")]
async fn acknowledge_archived_segment(&self, segment_index: u64) -> RpcResult<()>;

#[method(name = "subspace_recordsRoot")]
async fn records_root(&self, segment_index: u64) -> RpcResult<Option<Sha256Hash>>;
Member:

I think this should take a vector of segment indexes and return a vector of roots to improve efficiency, otherwise there might be too many requests necessary.

Member Author:

Yes, sure. However, that seems vulnerable to an overly large vector of indexes. It makes sense to introduce a MAX_SEGMENT_INDEXES_PER_REQUEST constant to limit both the farmer and node sides.

Member:

Certainly. The requester needs to limit the number of roots requested, or the response will not fit into the maximum size configured in the RPC server and the call will fail. For public RPC endpoints we may also want to limit this, so yes, it makes sense to have limits on both sides.
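The per-request limit discussed in this thread can be sketched as a guard on a batched lookup. This is a std-only sketch: the constant name follows the review suggestion, while its value (300) and the function shape are illustrative, not the merged implementation.

```rust
// Proposed guard for a batched `records_roots` request. The constant name
// follows the review suggestion; the value 300 is illustrative only.
const MAX_SEGMENT_INDEXES_PER_REQUEST: usize = 300;

type Sha256Hash = [u8; 32];

// Hypothetical batched lookup: reject oversized batches up front so the
// response is guaranteed to fit within the RPC server's configured max
// response size.
fn records_roots(
    segment_indexes: &[u64],
    lookup: impl Fn(u64) -> Option<Sha256Hash>,
) -> Result<Vec<Option<Sha256Hash>>, String> {
    if segment_indexes.len() > MAX_SEGMENT_INDEXES_PER_REQUEST {
        return Err(format!(
            "too many segment indexes: {} > {}",
            segment_indexes.len(),
            MAX_SEGMENT_INDEXES_PER_REQUEST
        ));
    }
    Ok(segment_indexes.iter().map(|&i| lookup(i)).collect())
}
```

Enforcing the same constant on the farmer side keeps a well-behaved client from ever triggering the server-side rejection.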

}

#[derive(Default)]
@@ -513,4 +518,19 @@ where

Ok(())
}

async fn records_root(&self, segment_index: u64) -> RpcResult<Option<Sha256Hash>> {
let runtime_api = self.client.runtime_api();
let best_block_id = BlockId::Hash(self.client.info().best_hash);

let records_root = runtime_api.records_root(&best_block_id, segment_index);

records_root.map_err(|error| {
error!(
"Failed to get data from runtime API (records_root): {}",
error
);
JsonRpseeError::Custom("Internal error during `records_root` call".to_string())
})
}
}
5 changes: 5 additions & 0 deletions crates/subspace-farmer/src/bench_rpc_client.rs
@@ -6,6 +6,7 @@ use futures::{stream, SinkExt, Stream, StreamExt};
use std::pin::Pin;
use std::sync::Arc;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::Sha256Hash;
use subspace_rpc_primitives::{
FarmerProtocolInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
@@ -137,4 +138,8 @@ impl RpcClient for BenchRpcClient {
.await?;
Ok(())
}

async fn records_root(&self, _: u64) -> Result<Option<Sha256Hash>, Error> {
Ok(None)
}
}
41 changes: 24 additions & 17 deletions crates/subspace-farmer/src/dsn.rs
@@ -1,3 +1,4 @@
use async_trait::async_trait;
use futures::{SinkExt, Stream, StreamExt};
use num_traits::{WrappingAdd, WrappingSub};
use std::ops::Range;
@@ -162,16 +163,23 @@ impl DSNSync for subspace_networking::Node {
}
}

/// Defines actions on receiving the pieces during the sync process.
#[async_trait]
pub trait OnSync {
/// Defines a callback on receiving pieces.
async fn on_pieces(
&self,
pieces: FlatPieces,
piece_indices: Vec<PieceIndex>,
) -> anyhow::Result<()>;
}

/// Syncs the closest pieces to the public key from the provided DSN.
pub async fn sync<DSN, OP>(
mut dsn: DSN,
options: SyncOptions,
mut on_pieces: OP,
) -> anyhow::Result<()>
pub async fn sync<DSN, OP>(mut dsn: DSN, options: SyncOptions, on_sync: OP) -> anyhow::Result<()>
where
DSN: DSNSync + Send + Sized,
DSN::Stream: Unpin + Send,
OP: FnMut(FlatPieces, Vec<PieceIndex>) -> anyhow::Result<()> + Send + 'static,
OP: OnSync,
{
let SyncOptions {
max_plot_size,
@@ -232,11 +240,12 @@ where
for (start, end) in sync_ranges {
let mut stream = dsn.get_pieces(start.into()..end.into()).await?;

while let Some(PiecesToPlot {
piece_indexes,
pieces,
}) = stream.next().await
{
while let Some(pieces_to_plot) = stream.next().await {
let PiecesToPlot {
piece_indexes,
pieces,
} = pieces_to_plot;

// Filter out pieces which are not in our range
let (piece_indexes, pieces) = piece_indexes
.into_iter()
@@ -256,12 +265,10 @@
})
.unzip();

// Writing pieces is usually synchronous, therefore might take some time
on_pieces = tokio::task::spawn_blocking(move || {
on_pieces(pieces, piece_indexes).map(|()| on_pieces)
})
.await
.expect("`on_pieces` must never panic")?;
on_sync
.on_pieces(pieces, piece_indexes)
.await
.expect("`on_pieces` must never panic");
}
}

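The diff above replaces the `FnMut(FlatPieces, Vec<PieceIndex>)` closure parameter with the `OnSync` trait, so the callback becomes a named, stateful type. Below is a minimal synchronous, std-only sketch of the same pattern — the real trait is async via `async_trait`, and `FlatPieces`/`PieceIndex` are simplified stand-ins here.

```rust
use std::sync::{Arc, Mutex};

// Simplified stand-ins for the farmer's FlatPieces / PieceIndex types.
type FlatPieces = Vec<u8>;
type PieceIndex = u64;

// Synchronous counterpart of the farmer's `OnSync` trait.
trait OnSync {
    fn on_pieces(&self, pieces: FlatPieces, piece_indices: Vec<PieceIndex>) -> Result<(), String>;
}

// A stateful handler, analogous to `TestPlotter` in the tests:
// it records every piece index it has been handed.
struct CollectingPlotter {
    seen: Arc<Mutex<Vec<PieceIndex>>>,
}

impl OnSync for CollectingPlotter {
    fn on_pieces(&self, _pieces: FlatPieces, piece_indices: Vec<PieceIndex>) -> Result<(), String> {
        self.seen.lock().unwrap().extend(piece_indices);
        Ok(())
    }
}

// The sync loop only needs `OP: OnSync`, mirroring the new `sync` signature;
// no `spawn_blocking` hand-off of a moved closure is required.
fn run_sync<OP: OnSync>(
    on_sync: &OP,
    batches: Vec<(FlatPieces, Vec<PieceIndex>)>,
) -> Result<(), String> {
    for (pieces, indices) in batches {
        on_sync.on_pieces(pieces, indices)?;
    }
    Ok(())
}
```

A trait object or generic bound also lets the same handler be reused across multiple sync ranges, which the one-shot moved closure made awkward.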
56 changes: 30 additions & 26 deletions crates/subspace-farmer/src/dsn/tests.rs
@@ -1,5 +1,6 @@
use super::{sync, DSNSync, NoSync, PieceIndexHashNumber, PiecesToPlot, SyncOptions};
use crate::bench_rpc_client::{BenchRpcClient, BENCH_FARMER_PROTOCOL_INFO};
use crate::dsn::OnSync;
use crate::legacy_multi_plots_farm::{LegacyMultiPlotsFarm, Options as MultiFarmingOptions};
use crate::single_plot_farm::PlotFactoryOptions;
use crate::{LegacyObjectMappings, Plot};
@@ -20,6 +21,29 @@ use subspace_core_primitives::{
use subspace_networking::libp2p::multiaddr::Protocol;
use tempfile::TempDir;

struct TestPlotter {
pub(crate) result: Arc<Mutex<BTreeMap<PieceIndexHash, (Piece, PieceIndex)>>>,
}

#[async_trait::async_trait]
impl OnSync for TestPlotter {
async fn on_pieces(
&self,
pieces: FlatPieces,
piece_indices: Vec<PieceIndex>,
) -> anyhow::Result<()> {
let mut result = self.result.lock();
result.extend(pieces.as_pieces().zip(piece_indices).map(|(piece, index)| {
(
PieceIndexHash::from_index(index),
(piece.try_into().unwrap(), index),
)
}));

Ok(())
}
}

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct TestDSN(BTreeMap<PieceIndexHash, (Piece, PieceIndex)>);

@@ -80,19 +104,8 @@ async fn simple_test() {
max_plot_size: 100 * 1024 * 1024 * 1024,
total_pieces: 256,
},
{
let result = Arc::clone(&result);
move |pieces, piece_indexes| {
let mut result = result.lock();
result.extend(pieces.as_pieces().zip(piece_indexes).map(|(piece, index)| {
(
PieceIndexHash::from_index(index),
(piece.try_into().unwrap(), index),
)
}));

Ok(())
}
TestPlotter {
result: Arc::clone(&result),
},
)
.await
@@ -117,18 +130,8 @@ async fn no_sync_test() {
max_plot_size: 100 * 1024 * 1024 * 1024,
total_pieces: 0,
},
{
let result = Arc::clone(&result);
move |pieces, piece_indexes| {
let mut result = result.lock();
result.extend(pieces.as_pieces().zip(piece_indexes).map(|(piece, index)| {
(
PieceIndexHash::from_index(index),
(piece.try_into().unwrap(), index),
)
}));
Ok(())
}
TestPlotter {
result: Arc::clone(&result),
},
)
.await
@@ -359,7 +362,8 @@ async fn test_dsn_sync() {

let range_size = PieceIndexHashNumber::MAX / seeder_max_piece_count * pieces_per_request;
let plot = syncer_multi_farming.single_plot_farms()[0].plot().clone();
let dsn_sync = syncer_multi_farming.single_plot_farms()[0].dsn_sync(
let dsn_sync = syncer_multi_farming.single_plot_farms()[0].dsn_sync::<BenchRpcClient>(
None,
syncer_max_plot_size,
seeder_max_piece_count,
range_size,
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/lib.rs
@@ -40,6 +40,7 @@ pub mod legacy_multi_plots_farm;
mod mock_rpc_client;
pub(crate) mod node_rpc_client;
pub(crate) mod object_mappings;
mod pieces_verification;
pub(crate) mod plot;
pub(crate) mod rpc_client;
pub mod single_disk_farm;
5 changes: 5 additions & 0 deletions crates/subspace-farmer/src/mock_rpc_client.rs
@@ -5,6 +5,7 @@ use futures::{SinkExt, Stream, StreamExt};
use std::pin::Pin;
use std::sync::Arc;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::Sha256Hash;
use subspace_rpc_primitives::{
FarmerProtocolInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
@@ -232,4 +233,8 @@ impl RpcClient for MockRpcClient {
.unwrap();
Ok(())
}

async fn records_root(&self, _: u64) -> Result<Option<Sha256Hash>, MockError> {
Ok(None)
}
}
8 changes: 8 additions & 0 deletions crates/subspace-farmer/src/node_rpc_client.rs
@@ -8,6 +8,7 @@ use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use std::pin::Pin;
use std::sync::Arc;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::Sha256Hash;
use subspace_rpc_primitives::{
FarmerProtocolInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
@@ -122,4 +123,11 @@ impl RpcClient for NodeRpcClient {
)
.await?)
}

async fn records_root(&self, segment_index: u64) -> Result<Option<Sha256Hash>, RpcError> {
Ok(self
.client
.request("subspace_recordsRoot", rpc_params![&segment_index])
.await?)
}
}
53 changes: 53 additions & 0 deletions crates/subspace-farmer/src/pieces_verification.rs
@@ -0,0 +1,53 @@
use crate::RpcClient;
use subspace_archiving::archiver::is_piece_valid;
use subspace_core_primitives::{
FlatPieces, PieceIndex, MERKLE_NUM_LEAVES, PIECE_SIZE, RECORD_SIZE,
};
use thiserror::Error;

/// Pieces verification errors.
#[derive(Error, Debug)]
pub enum PiecesVerificationError {
/// Invalid pieces data provided
#[error("Invalid pieces data provided")]
InvalidRawData,
/// Pieces verification failed.
#[error("Pieces verification failed.")]
InvalidPieces,
/// RPC client failed.
#[error("RPC client failed. jsonrpsee error: {0}")]
RpcError(Box<dyn std::error::Error + Send + Sync>),
/// RPC client returned empty records_root.
#[error("RPC client returned empty records_root.")]
NoRecordsRootFound,
}

//TODO: Optimize: parallel execution, batch execution.
/// Verifies pieces against the blockchain.
pub async fn verify_pieces_at_blockchain<RC: RpcClient>(
verification_client: &RC,
piece_indexes: &[PieceIndex],
pieces: &FlatPieces,
) -> Result<(), PiecesVerificationError> {
if piece_indexes.len() != (pieces.len() / PIECE_SIZE) {
return Err(PiecesVerificationError::InvalidRawData);
}
for (index, piece) in pieces.as_pieces().enumerate() {
let piece_index = piece_indexes[index];

let segment_index: u64 = piece_index / MERKLE_NUM_LEAVES as u64;
let position: u64 = piece_index % MERKLE_NUM_LEAVES as u64;
Member:

MERKLE_NUM_LEAVES and RECORD_SIZE should be derived from FarmerProtocolInfo instead; that allows us to have different values in tests and generally makes the implementation more flexible at the library level. This is the first time these constants are used on the farmer specifically.

Member Author:

Done. Please verify the new algorithm.


let root = verification_client
.records_root(segment_index)
.await
.map_err(|err| PiecesVerificationError::RpcError(err))?
.ok_or(PiecesVerificationError::NoRecordsRootFound)?;

if !is_piece_valid(piece, root, position as usize, RECORD_SIZE as usize) {
return Err(PiecesVerificationError::InvalidPieces);
}
}

Ok(())
}
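The arithmetic in `verify_pieces_at_blockchain` maps a global piece index to the segment containing it and to the piece's position within that segment's Merkle tree. A self-contained sketch follows; the `MERKLE_NUM_LEAVES` value of 256 is illustrative (the real constant comes from `subspace-core-primitives`, and per the review above should be taken from `FarmerProtocolInfo`).

```rust
// Illustrative value; the real constant lives in subspace-core-primitives
// (and, after this review, is derived from FarmerProtocolInfo).
const MERKLE_NUM_LEAVES: u64 = 256;

/// Returns (segment_index, position-within-segment) for a global piece
/// index, using the same div/mod split as `verify_pieces_at_blockchain`.
fn piece_location(piece_index: u64) -> (u64, u64) {
    (
        piece_index / MERKLE_NUM_LEAVES,
        piece_index % MERKLE_NUM_LEAVES,
    )
}
```

With 256 leaves per segment, piece 300 lives in segment 1 at position 44; the farmer then fetches segment 1's records root over RPC and checks the piece's Merkle witness against it via `is_piece_valid`.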
4 changes: 4 additions & 0 deletions crates/subspace-farmer/src/rpc_client.rs
@@ -2,6 +2,7 @@ use async_trait::async_trait;
use futures::Stream;
use std::pin::Pin;
use subspace_archiving::archiver::ArchivedSegment;
use subspace_core_primitives::Sha256Hash;
use subspace_rpc_primitives::{
FarmerProtocolInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
@@ -44,4 +45,7 @@ pub trait RpcClient: Clone + Send + Sync + 'static {

/// Acknowledge receiving of archived segments
async fn acknowledge_archived_segment(&self, segment_index: u64) -> Result<(), Error>;

/// Get records root for the segment
async fn records_root(&self, segment_index: u64) -> Result<Option<Sha256Hash>, Error>;
}
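Because `records_root` returns `Result<Option<Sha256Hash>, Error>`, callers such as `verify_pieces_at_blockchain` must distinguish an RPC failure from a missing root. Here is a simplified, synchronous sketch of that mapping; the type aliases and enum mirror, but are not, the farmer's own definitions.

```rust
type Sha256Hash = [u8; 32];

// Simplified stand-in for the farmer's PiecesVerificationError.
#[derive(Debug, PartialEq)]
enum PiecesVerificationError {
    RpcError(String),
    NoRecordsRootFound,
}

// Mirrors how the verification code consumes `records_root`: a transport
// error and an absent root are surfaced as distinct error variants.
fn require_records_root(
    response: Result<Option<Sha256Hash>, String>,
) -> Result<Sha256Hash, PiecesVerificationError> {
    response
        .map_err(PiecesVerificationError::RpcError)?
        .ok_or(PiecesVerificationError::NoRecordsRootFound)
}
```

Keeping the two failure modes separate lets the farmer retry transport errors while treating a genuinely missing root as a verification failure.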