Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: all records are Quorum::All once more #776

Merged
merged 2 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion sn_cli/src/subcommands/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ async fn upload_chunk(
let chunk = Chunk::new(Bytes::from(fs::read(path)?));

file_api
.get_local_payment_and_upload_chunk(chunk, verify_store, None)
.get_local_payment_and_upload_chunk(chunk, verify_store)
.await?;

println!(
Expand Down
31 changes: 4 additions & 27 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@ use super::{
};
use bls::{PublicKey, SecretKey, Signature};
use indicatif::ProgressBar;
use libp2p::{
identity::Keypair,
kad::{Quorum, Record},
Multiaddr,
};
use libp2p::{identity::Keypair, kad::Record, Multiaddr};
#[cfg(feature = "open-metrics")]
use prometheus_client::registry::Registry;
use sn_networking::{multiaddr_is_global, NetworkBuilder, NetworkEvent, CLOSE_GROUP_SIZE};
Expand All @@ -33,7 +29,7 @@ use sn_transfers::{MainPubkey, NanoTokens, SignedSpend, UniquePubkey};
use sn_registers::SignedRegister;
use sn_transfers::{transfers::SpendRequest, wallet::Transfer};
use std::time::Duration;
use tokio::{sync::OwnedSemaphorePermit, task::spawn};
use tokio::task::spawn;
use tracing::trace;
use xor_name::XorName;

Expand Down Expand Up @@ -178,18 +174,6 @@ impl Client {
Ok(client)
}

/// Get the client's network concurrency permit
///
/// This allows us to grab a permit early if we're dealing with large data (chunks)
/// and want to hold off on loading more until other operations are complete.
pub async fn get_network_concurrency_permit(&self) -> Result<OwnedSemaphorePermit> {
if let Some(limiter) = self.network.concurrency_limiter() {
Ok(limiter.acquire_owned().await?)
} else {
Err(Error::NoNetworkConcurrencyLimiterFound)
}
}

/// Set up our initial progress bar for network connectivity
fn setup_connection_progress() -> ProgressBar {
// Network connection progress bar
Expand Down Expand Up @@ -316,7 +300,6 @@ impl Client {
chunk: Chunk,
payment: Vec<Transfer>,
verify_store: bool,
optional_permit: Option<OwnedSemaphorePermit>,
) -> Result<()> {
info!("Store chunk: {:?}", chunk.address());
let key = chunk.network_address().to_record_key();
Expand All @@ -334,10 +317,7 @@ impl Client {
None
};

Ok(self
.network
.put_record(record, record_to_verify, optional_permit, Quorum::One)
.await?)
Ok(self.network.put_record(record, record_to_verify).await?)
}

/// Retrieve a `Chunk` from the kad network.
Expand Down Expand Up @@ -381,10 +361,7 @@ impl Client {
None
};

Ok(self
.network
.put_record(record, record_to_verify, None, Quorum::All)
.await?)
Ok(self.network.put_record(record, record_to_verify).await?)
}

/// Get a cash_note spend from network
Expand Down
7 changes: 3 additions & 4 deletions sn_client/src/file_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::{
time::Instant,
};
use tempfile::tempdir;
use tokio::{sync::OwnedSemaphorePermit, task};
use tokio::task;
use tracing::trace;
use xor_name::XorName;

Expand Down Expand Up @@ -165,7 +165,6 @@ impl Files {
&self,
chunk: Chunk,
verify_store: bool,
optional_permit: Option<OwnedSemaphorePermit>,
) -> Result<()> {
let chunk_addr = chunk.network_address();
trace!("Client upload started for chunk: {chunk_addr:?}");
Expand All @@ -185,7 +184,7 @@ impl Files {
payment.len()
);
self.client
.store_chunk(chunk, payment, verify_store, optional_permit)
.store_chunk(chunk, payment, verify_store)
.await?;

trace!("Client upload completed for chunk: {chunk_addr:?}");
Expand Down Expand Up @@ -238,7 +237,7 @@ impl Files {

for (_chunk_name, chunk_path) in chunks_paths {
let chunk = Chunk::new(Bytes::from(fs::read(chunk_path)?));
self.get_local_payment_and_upload_chunk(chunk, verify, None)
self.get_local_payment_and_upload_chunk(chunk, verify)
.await?;
}

Expand Down
4 changes: 2 additions & 2 deletions sn_client/src/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use crate::{Client, Error, Result, WalletClient};

use bls::PublicKey;
use libp2p::kad::{Quorum, Record};
use libp2p::kad::Record;
use sn_protocol::{
error::Error as ProtocolError,
messages::RegisterCmd,
Expand Down Expand Up @@ -349,7 +349,7 @@ impl ClientRegister {
Ok(self
.client
.network
.put_record(record, record_to_verify, None, Quorum::All)
.put_record(record, record_to_verify)
.await?)
}

Expand Down
9 changes: 2 additions & 7 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ pub enum SwarmCmd {
PutRecord {
record: Record,
sender: oneshot::Sender<Result<()>>,
quorum: Quorum,
},
/// Put record to the local RecordStore
PutLocalRecord {
Expand Down Expand Up @@ -189,11 +188,7 @@ impl SwarmDriver {
.map(|rec| rec.into_owned());
let _ = sender.send(record);
}
SwarmCmd::PutRecord {
record,
sender,
quorum,
} => {
SwarmCmd::PutRecord { record, sender } => {
let record_key = PrettyPrintRecordKey::from(record.key.clone());
trace!(
"Putting record sized: {:?} to network {:?}",
Expand All @@ -204,7 +199,7 @@ impl SwarmDriver {
.swarm
.behaviour_mut()
.kademlia
.put_record(record, quorum)
.put_record(record, Quorum::All)
{
Ok(request_id) => {
trace!("Sent record {record_key:?} to network. Request id: {request_id:?} to network");
Expand Down
7 changes: 1 addition & 6 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use libp2p::mdns;
use libp2p::{
autonat,
identity::Keypair,
kad::{Kademlia, KademliaCaching, KademliaConfig, QueryId, Record},
kad::{Kademlia, KademliaConfig, QueryId, Record},
multiaddr::Protocol,
request_response::{self, Config as RequestResponseConfig, ProtocolSupport, RequestId},
swarm::{
Expand Down Expand Up @@ -183,11 +183,6 @@ impl NetworkBuilder {
.disjoint_query_paths(true)
// Records never expire
.set_record_ttl(None)
// Enable caching of records if we use Quorum::One for GetRecord (which should be used for chunks)
// This means the closest 16 nodes would be caching the record
.set_caching(KademliaCaching::Enabled {
max_peers: CLOSE_GROUP_SIZE as u16 * 2,
})
// Emit PUT events for validation prior to insertion into the RecordStore.
// This is no longer needed as the record_storage::put now can carry out validation.
// .set_record_filtering(KademliaStoreInserts::FilterBoth)
Expand Down
39 changes: 12 additions & 27 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use futures::future::select_all;
use itertools::Itertools;
use libp2p::{
identity::Keypair,
kad::{KBucketKey, Quorum, Record, RecordKey},
kad::{KBucketKey, Record, RecordKey},
multiaddr::Protocol,
Multiaddr, PeerId,
};
Expand All @@ -46,7 +46,7 @@ use sn_protocol::{
use sn_transfers::MainPubkey;
use sn_transfers::NanoTokens;
use std::{collections::HashSet, path::PathBuf, sync::Arc};
use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore};
use tokio::sync::{mpsc, oneshot, Semaphore};
use tracing::warn;

/// The maximum number of peers to return in a `GetClosestPeers` response.
Expand Down Expand Up @@ -378,13 +378,7 @@ impl Network {
/// Put `Record` to network
/// Optionally verify the record is stored after putting it to network
/// Retry up to `PUT_RECORD_RETRIES` times if we can't verify the record is stored
pub async fn put_record(
&self,
record: Record,
verify_store: Option<Record>,
mut optional_permit: Option<OwnedSemaphorePermit>,
quorum: Quorum,
) -> Result<()> {
pub async fn put_record(&self, record: Record, verify_store: Option<Record>) -> Result<()> {
let mut retries = 0;

// let mut has_permit = optional_permit.is_some();
Expand All @@ -396,33 +390,25 @@ impl Network {
);

let res = self
.put_record_once(
record.clone(),
verify_store.clone(),
optional_permit,
quorum,
)
.put_record_once(record.clone(), verify_store.clone())
.await;
if !matches!(res, Err(Error::FailedToVerifyRecordWasStored(_))) {
return res;
}

// the permit will have been consumed above.
optional_permit = None;

retries += 1;
}
Err(Error::FailedToVerifyRecordWasStored(record.key.into()))
}

async fn put_record_once(
&self,
record: Record,
verify_store: Option<Record>,
starting_permit: Option<OwnedSemaphorePermit>,
quorum: Quorum,
) -> Result<()> {
let mut _permit = starting_permit;
async fn put_record_once(&self, record: Record, verify_store: Option<Record>) -> Result<()> {
// get permit if semaphore supplied
let _permit = if let Some(semaphore) = self.concurrency_limiter.clone() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to confirm so here we no longer consume the fetched permit, but leave it to be dropped when the whole function call completed ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we hold it until the PUT is done, which is what was happening before with an optionally passed permit.

(which should have just be grabbed above)

let our_permit = semaphore.acquire_owned().await?;
Some(our_permit)
} else {
None
};

let record_key = record.key.clone();
let pretty_key = PrettyPrintRecordKey::from(record_key.clone());
Expand All @@ -437,7 +423,6 @@ impl Network {
self.send_swarm_cmd(SwarmCmd::PutRecord {
record: record.clone(),
sender,
quorum,
})?;
let response = receiver.await?;

Expand Down
2 changes: 1 addition & 1 deletion sn_node/tests/data_with_churn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ fn store_chunks_task(
}
};
match file_api
.get_local_payment_and_upload_chunk(chunk, true, None)
.get_local_payment_and_upload_chunk(chunk, true)
.await
{
Ok(()) => content
Expand Down