Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use hash based record verification for chunks #1059

Merged
merged 11 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/benchmark-prs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ jobs:
echo "Total swarm_driver long handling duration is: $total_long_handling ms"
echo "Total average swarm_driver long handling duration is: $average_handling_ms ms"
total_num_of_times_limit_hits="10000" # hits
total_long_handling_limit_ms="150000" # ms
total_long_handling_limit_ms="170000" # ms
average_handling_limit_ms="20" # ms
if (( $(echo "$total_num_of_times > $total_num_of_times_limit_hits" | bc -l) )); then
echo "Swarm_driver long handling times exceeded threshold: $total_num_of_times hits"
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/memcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ jobs:
- name: Start a client to upload files
run: |
ls -l
cargo run --bin safe --release -- --log-output-dest=data-dir files upload "./the-test-data.zip" --show-holders -r 0
cargo run --bin safe --release -- --log-output-dest=data-dir files upload "./the-test-data.zip" -r 0
env:
SN_LOG: "all"
timeout-minutes: 25
Expand All @@ -120,7 +120,7 @@ jobs:
cat initial_balance_from_faucet_1.txt | tail -n 1 > transfer_hex
cat transfer_hex
cargo run --bin safe --release -- --log-output-dest=data-dir wallet receive --file transfer_hex
cargo run --bin safe --release -- --log-output-dest=data-dir files upload "./the-test-data_1.zip" --show-holders -r 0 > second_upload.txt
cargo run --bin safe --release -- --log-output-dest=data-dir files upload "./the-test-data_1.zip" -r 0 > second_upload.txt
cat second_upload.txt
rg "New wallet balance: 5000000.000000000" second_upload.txt -c --stats
env:
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 0 additions & 8 deletions sn_cli/src/subcommands/files/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ pub enum FilesCmds {
/// during payment and upload processing.
#[clap(long, default_value_t = BATCH_SIZE, short='b')]
batch_size: usize,
/// Flagging whether to show the holders of the uploaded chunks.
/// Default to be not showing.
#[clap(long, name = "show_holders", default_value = "false")]
show_holders: bool,
/// The retry_count for retrying failed chunks
/// during payment and upload processing.
#[clap(long, default_value_t = MAX_UPLOAD_RETRIES, short = 'r')]
Expand Down Expand Up @@ -92,7 +88,6 @@ pub(crate) async fn files_cmds(
FilesCmds::Upload {
path,
batch_size,
show_holders,
max_retries,
} => {
upload_files(
Expand All @@ -101,7 +96,6 @@ pub(crate) async fn files_cmds(
root_dir.to_path_buf(),
verify_store,
batch_size,
show_holders,
max_retries,
)
.await?
Expand Down Expand Up @@ -162,7 +156,6 @@ async fn upload_files(
root_dir: PathBuf,
verify_store: bool,
batch_size: usize,
show_holders: bool,
max_retries: usize,
) -> Result<()> {
debug!("Uploading file(s) from {files_path:?}, batch size {batch_size:?} will verify?: {verify_store}");
Expand Down Expand Up @@ -227,7 +220,6 @@ async fn upload_files(
let mut files = Files::new(files_api)
.set_batch_size(batch_size)
.set_verify_store(verify_store)
.set_show_holders(show_holders)
.set_max_retries(max_retries);
let mut upload_event_rx = files.get_upload_events();
// keep track of the progress in a separate task
Expand Down
109 changes: 49 additions & 60 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ use libp2p::{
};
#[cfg(feature = "open-metrics")]
use prometheus_client::registry::Registry;
use rand::{thread_rng, Rng};
use sn_networking::{
multiaddr_is_global, Error as NetworkError, GetRecordCfg, GetRecordError, NetworkBuilder,
NetworkEvent, PutRecordCfg, CLOSE_GROUP_SIZE,
NetworkEvent, PutRecordCfg, VerificationKind, CLOSE_GROUP_SIZE,
};
use sn_protocol::{
error::Error as ProtocolError,
messages::ChunkProof,
storage::{
try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, RecordHeader,
RecordKind, RegisterAddress, SpendAddress,
Expand Down Expand Up @@ -381,7 +383,6 @@ impl Client {
chunk: Chunk,
payment: Payment,
verify_store: bool,
show_holders: bool,
) -> Result<()> {
info!("Store chunk: {:?}", chunk.address());
let key = chunk.network_address().to_record_key();
Expand All @@ -394,52 +395,41 @@ impl Client {
expires: None,
};

let expected_holders: HashSet<_> = if show_holders {
self.network
.get_closest_peers(&chunk.network_address(), true)
.await?
.iter()
.cloned()
.collect()
} else {
Default::default()
};

let record_to_verify = if verify_store {
let verification = if verify_store {
let verification_cfg = GetRecordCfg {
get_quorum: Quorum::N(
NonZeroUsize::new(2).ok_or(Error::NonZeroUsizeWasInitialisedAsZero)?,
),
re_attempt: true,
target_record: None, // Not used since we use ChunkProof
expected_holders: Default::default(),
};
// The `ChunkWithPayment` is only used to send out via PutRecord.
// The holders shall only hold the `Chunk` copies.
// Hence the fetched copies shall only be a `Chunk`
Some(Record {
key,
value: try_serialize_record(&chunk, RecordKind::Chunk)?.to_vec(),
publisher: None,
expires: None,
})
} else {
None
};

let verification_cfg = GetRecordCfg {
get_quorum: Quorum::N(
NonZeroUsize::new(2).ok_or(Error::NonZeroUsizeWasInitialisedAsZero)?,
),
re_attempt: true,
target_record: record_to_verify,
expected_holders,
};
let stored_on_node = try_serialize_record(&chunk, RecordKind::Chunk)?.to_vec();
let random_nonce = thread_rng().gen::<u64>();
let expected_proof = ChunkProof::new(&stored_on_node, random_nonce);

let verification = if verify_store {
Some((record_kind, verification_cfg))
Some((
VerificationKind::ChunkProof {
expected_proof,
nonce: random_nonce,
},
verification_cfg,
))
} else {
None
};

let put_cfg = PutRecordCfg {
put_quorum: Quorum::One,
re_attempt: true,
verification,
};
Ok(self.network.put_record(record, &put_cfg).await?)
self.network.put_record(record, &put_cfg).await?;

Ok(())
}

/// Retrieve a `Chunk` from the kad network.
Expand Down Expand Up @@ -477,29 +467,28 @@ impl Client {
}

/// Verify if a `Chunk` is stored by expected nodes on the network.
pub async fn verify_chunk_stored(&self, address: ChunkAddress) -> Result<Chunk> {
pub async fn verify_chunk_stored(&self, chunk: &Chunk) -> Result<()> {
let address = chunk.network_address();
info!("Verifying chunk: {address:?}");
let key = NetworkAddress::from_chunk_address(address).to_record_key();
let random_nonce = thread_rng().gen::<u64>();
let record_value = try_serialize_record(&chunk, RecordKind::Chunk)?;
let expected_proof = ChunkProof::new(record_value.as_ref(), random_nonce);

let verification_cfg = GetRecordCfg {
get_quorum: Quorum::N(
NonZeroUsize::new(2).ok_or(Error::NonZeroUsizeWasInitialisedAsZero)?,
),
re_attempt: false,
target_record: None,
expected_holders: Default::default(),
};
let record = self
if let Err(err) = self
.network
.get_record_from_network(key, &verification_cfg)
.await?;
let header = RecordHeader::from_record(&record)?;
if let RecordKind::Chunk = header.kind {
let chunk: Chunk = try_deserialize_record(&record)?;
Ok(chunk)
} else {
Err(NetworkError::RecordKindMismatch(RecordKind::Chunk).into())
.verify_chunk_existence(
address.clone(),
random_nonce,
expected_proof,
Quorum::N(NonZeroUsize::new(2).ok_or(Error::NonZeroUsizeWasInitialisedAsZero)?),
false,
)
.await
{
error!("Failed to verify the existence of chunk {address:?} with err {err:?}");
}

Ok(())
}

/// Verify if a `Register` is stored by expected nodes on the network.
Expand Down Expand Up @@ -550,7 +539,7 @@ impl Client {
let put_cfg = PutRecordCfg {
put_quorum: Quorum::All,
re_attempt: true,
verification: Some((record_kind, verification_cfg)),
verification: Some((VerificationKind::Network, verification_cfg)),
};
Ok(self.network.put_record(record, &put_cfg).await?)
}
Expand Down Expand Up @@ -698,15 +687,15 @@ impl Client {
// now we try and get batched chunks, keep track of any that fail
// Iterate over each uploaded chunk
let mut verify_handles = Vec::new();
for (name, path) in chunks_batch.iter().cloned() {
for (name, chunk_path) in chunks_batch.iter().cloned() {
let client = self.clone();
// Spawn a new task to fetch each chunk concurrently
let handle = tokio::spawn(async move {
let chunk_address: ChunkAddress = ChunkAddress::new(name);
// make sure the chunk is stored
let res = client.verify_chunk_stored(chunk_address).await;
// make sure the chunk is stored;
let chunk = Chunk::new(Bytes::from(std::fs::read(&chunk_path)?));
let res = client.verify_chunk_stored(&chunk).await;

Ok::<_, ChunksError>(((name, path), res.is_err()))
Ok::<_, ChunksError>(((name, chunk_path), res.is_err()))
});
verify_handles.push(handle);
}
Expand Down
5 changes: 2 additions & 3 deletions sn_client/src/files/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ impl FilesApi {
&self,
chunk: Chunk,
verify_store: bool,
show_holders: bool,
) -> Result<()> {
let chunk_addr = chunk.network_address();
trace!("Client upload started for chunk: {chunk_addr:?}");
Expand All @@ -172,7 +171,7 @@ impl FilesApi {
);

self.client
.store_chunk(chunk, payment, verify_store, show_holders)
.store_chunk(chunk, payment, verify_store)
.await?;

trace!("Client upload completed for chunk: {chunk_addr:?}");
Expand Down Expand Up @@ -219,7 +218,7 @@ impl FilesApi {

for (_chunk_name, chunk_path) in chunks_paths {
let chunk = Chunk::new(Bytes::from(fs::read(chunk_path)?));
self.get_local_payment_and_upload_chunk(chunk, verify, false)
self.get_local_payment_and_upload_chunk(chunk, verify)
.await?;
}

Expand Down
11 changes: 2 additions & 9 deletions sn_client/src/files/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,15 +321,9 @@ impl Files {
for chunk_info in chunks_to_upload.into_iter() {
let files_api = self.api.clone();
let verify_store = self.verify_store;
let show_holders = self.show_holders;

// Spawn a task for each chunk to be uploaded
let handle = tokio::spawn(Self::upload_chunk(
files_api,
chunk_info,
verify_store,
show_holders,
));
let handle = tokio::spawn(Self::upload_chunk(files_api, chunk_info, verify_store));
self.progress_uploading_chunks(false).await?;

self.uploading_chunks.push(handle);
Expand Down Expand Up @@ -386,7 +380,6 @@ impl Files {
files_api: FilesApi,
chunk_info: ChunkInfo,
verify_store: bool,
show_holders: bool,
) -> (ChunkInfo, Result<()>) {
let chunk_address = ChunkAddress::new(chunk_info.name);
let bytes = match tokio::fs::read(chunk_info.path.clone()).await {
Expand All @@ -400,7 +393,7 @@ impl Files {
};
let chunk = Chunk::new(bytes);
match files_api
.get_local_payment_and_upload_chunk(chunk, verify_store, show_holders)
.get_local_payment_and_upload_chunk(chunk, verify_store)
.await
{
Ok(()) => (chunk_info, Ok(())),
Expand Down
4 changes: 2 additions & 2 deletions sn_client/src/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{Client, Error, Result, WalletClient};

use bls::PublicKey;
use libp2p::kad::{Quorum, Record};
use sn_networking::{GetRecordCfg, PutRecordCfg};
use sn_networking::{GetRecordCfg, PutRecordCfg, VerificationKind};
use sn_protocol::{
error::Error as ProtocolError,
messages::RegisterCmd,
Expand Down Expand Up @@ -398,7 +398,7 @@ impl ClientRegister {
let put_cfg = PutRecordCfg {
put_quorum: Quorum::All,
re_attempt: true,
verification: Some((RecordKind::Register, verification_cfg)),
verification: Some((VerificationKind::Network, verification_cfg)),
};

// Register edits might exist so we cannot be sure that just because we get a record back that this should fail
Expand Down
1 change: 0 additions & 1 deletion sn_faucet/src/faucet_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use tracing::{debug, error, trace};
///
/// # balance should be updated
/// ```

pub async fn run_faucet_server(client: &Client) -> Result<()> {
let server =
Server::http("0.0.0.0:8000").map_err(|err| eyre!("Failed to start server: {err}"))?;
Expand Down
19 changes: 15 additions & 4 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ use libp2p::{
#[cfg(feature = "open-metrics")]
use prometheus_client::registry::Registry;
use sn_protocol::{
messages::{Request, Response},
storage::RecordKind,
messages::{ChunkProof, Nonce, Request, Response},
NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey,
};
use std::{
Expand Down Expand Up @@ -159,8 +158,20 @@ pub struct PutRecordCfg {
pub put_quorum: Quorum,
/// If set to true, we retry upto PUT_RETRY_ATTEMPTS times
pub re_attempt: bool,
/// Enables verification after writing. The RecordKind is used to determine the verification delay.
pub verification: Option<(RecordKind, GetRecordCfg)>,
/// Enables verification after writing. The VerificationKind is used to determine the method to use.
pub verification: Option<(VerificationKind, GetRecordCfg)>,
}

/// The methods in which verification on a PUT can be carried out.
#[derive(Debug, Clone)]
pub enum VerificationKind {
    /// Uses the default KAD GET to perform verification.
    Network,
    /// Uses the hash based verification for chunks: instead of fetching the
    /// whole record back, the storing node is asked to prove it holds the
    /// chunk by producing a proof derived from the record value and `nonce`.
    ChunkProof {
        // The proof the storing node's reply must match; built by the client
        // from the serialized record value together with `nonce`.
        expected_proof: ChunkProof,
        // Freshly generated random value per verification request
        // (presumably so a proof cannot be precomputed — TODO confirm).
        nonce: Nonce,
    },
}

/// NodeBehaviour struct
Expand Down
6 changes: 5 additions & 1 deletion sn_networking/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use libp2p::{
swarm::DialError,
PeerId, TransportError,
};
use sn_protocol::{messages::Response, storage::RecordKind, PrettyPrintRecordKey};
use sn_protocol::{messages::Response, storage::RecordKind, NetworkAddress, PrettyPrintRecordKey};
use sn_transfers::{SignedSpend, SpendAddress};
use std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -129,6 +129,10 @@ pub enum Error {
#[error("Transfer is invalid: {0}")]
InvalidTransfer(String),

// ---------- Chunk Errors
#[error("Failed to verify the ChunkProof with the provided quorum")]
FailedToVerifyChunkProof(NetworkAddress),

// ---------- Spend Errors
#[error("Spend not found: {0:?}")]
NoSpendFoundInsideRecord(SpendAddress),
Expand Down