Skip to content

Commit

Permalink
feat(networking): remove optional_semaphore being passed down from apps
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuef committed Sep 27, 2023
1 parent add0bb5 commit 5388289
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 47 deletions.
2 changes: 1 addition & 1 deletion sn_cli/src/subcommands/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ async fn upload_chunk(
let chunk = Chunk::new(Bytes::from(fs::read(path)?));

file_api
.get_local_payment_and_upload_chunk(chunk, verify_store, None)
.get_local_payment_and_upload_chunk(chunk, verify_store)
.await?;

println!(
Expand Down
25 changes: 3 additions & 22 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use sn_transfers::{MainPubkey, NanoTokens, SignedSpend, UniquePubkey};
use sn_registers::SignedRegister;
use sn_transfers::{transfers::SpendRequest, wallet::Transfer};
use std::time::Duration;
use tokio::{sync::OwnedSemaphorePermit, task::spawn};
use tokio::task::spawn;
use tracing::trace;
use xor_name::XorName;

Expand Down Expand Up @@ -174,18 +174,6 @@ impl Client {
Ok(client)
}

/// Get the client's network concurrency permit
///
/// This allows us to grab a permit early if we're dealing with large data (chunks)
/// and want to hold off on loading more until other operations are complete.
pub async fn get_network_concurrency_permit(&self) -> Result<OwnedSemaphorePermit> {
if let Some(limiter) = self.network.concurrency_limiter() {
Ok(limiter.acquire_owned().await?)
} else {
Err(Error::NoNetworkConcurrencyLimiterFound)
}
}

/// Set up our initial progress bar for network connectivity
fn setup_connection_progress() -> ProgressBar {
// Network connection progress bar
Expand Down Expand Up @@ -312,7 +300,6 @@ impl Client {
chunk: Chunk,
payment: Vec<Transfer>,
verify_store: bool,
optional_permit: Option<OwnedSemaphorePermit>,
) -> Result<()> {
info!("Store chunk: {:?}", chunk.address());
let key = chunk.network_address().to_record_key();
Expand All @@ -330,10 +317,7 @@ impl Client {
None
};

Ok(self
.network
.put_record(record, record_to_verify, optional_permit)
.await?)
Ok(self.network.put_record(record, record_to_verify).await?)
}

/// Retrieve a `Chunk` from the kad network.
Expand Down Expand Up @@ -377,10 +361,7 @@ impl Client {
None
};

Ok(self
.network
.put_record(record, record_to_verify, None)
.await?)
Ok(self.network.put_record(record, record_to_verify).await?)
}

/// Get a cash_note spend from network
Expand Down
7 changes: 3 additions & 4 deletions sn_client/src/file_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::{
time::Instant,
};
use tempfile::tempdir;
use tokio::{sync::OwnedSemaphorePermit, task};
use tokio::task;
use tracing::trace;
use xor_name::XorName;

Expand Down Expand Up @@ -165,7 +165,6 @@ impl Files {
&self,
chunk: Chunk,
verify_store: bool,
optional_permit: Option<OwnedSemaphorePermit>,
) -> Result<()> {
let chunk_addr = chunk.network_address();
trace!("Client upload started for chunk: {chunk_addr:?}");
Expand All @@ -185,7 +184,7 @@ impl Files {
payment.len()
);
self.client
.store_chunk(chunk, payment, verify_store, optional_permit)
.store_chunk(chunk, payment, verify_store)
.await?;

trace!("Client upload completed for chunk: {chunk_addr:?}");
Expand Down Expand Up @@ -238,7 +237,7 @@ impl Files {

for (_chunk_name, chunk_path) in chunks_paths {
let chunk = Chunk::new(Bytes::from(fs::read(chunk_path)?));
self.get_local_payment_and_upload_chunk(chunk, verify, None)
self.get_local_payment_and_upload_chunk(chunk, verify)
.await?;
}

Expand Down
2 changes: 1 addition & 1 deletion sn_client/src/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ impl ClientRegister {
Ok(self
.client
.network
.put_record(record, record_to_verify, None)
.put_record(record, record_to_verify)
.await?)
}

Expand Down
29 changes: 11 additions & 18 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use sn_protocol::{
use sn_transfers::MainPubkey;
use sn_transfers::NanoTokens;
use std::{collections::HashSet, path::PathBuf, sync::Arc};
use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore};
use tokio::sync::{mpsc, oneshot, Semaphore};
use tracing::warn;

/// The maximum number of peers to return in a `GetClosestPeers` response.
Expand Down Expand Up @@ -378,12 +378,7 @@ impl Network {
/// Put `Record` to network
/// Optionally verify the record is stored after putting it to network
/// Retry up to `PUT_RECORD_RETRIES` times if we can't verify the record is stored
pub async fn put_record(
&self,
record: Record,
verify_store: Option<Record>,
mut optional_permit: Option<OwnedSemaphorePermit>,
) -> Result<()> {
pub async fn put_record(&self, record: Record, verify_store: Option<Record>) -> Result<()> {
let mut retries = 0;

// let mut has_permit = optional_permit.is_some();
Expand All @@ -395,27 +390,25 @@ impl Network {
);

let res = self
.put_record_once(record.clone(), verify_store.clone(), optional_permit)
.put_record_once(record.clone(), verify_store.clone())
.await;
if !matches!(res, Err(Error::FailedToVerifyRecordWasStored(_))) {
return res;
}

// the permit will have been consumed above.
optional_permit = None;

retries += 1;
}
Err(Error::FailedToVerifyRecordWasStored(record.key.into()))
}

async fn put_record_once(
&self,
record: Record,
verify_store: Option<Record>,
starting_permit: Option<OwnedSemaphorePermit>,
) -> Result<()> {
let mut _permit = starting_permit;
async fn put_record_once(&self, record: Record, verify_store: Option<Record>) -> Result<()> {
// get permit if semaphore supplied
let _permit = if let Some(semaphore) = self.concurrency_limiter.clone() {
let our_permit = semaphore.acquire_owned().await?;
Some(our_permit)
} else {
None
};

let record_key = record.key.clone();
let pretty_key = PrettyPrintRecordKey::from(record_key.clone());
Expand Down
2 changes: 1 addition & 1 deletion sn_node/tests/data_with_churn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ fn store_chunks_task(
}
};
match file_api
.get_local_payment_and_upload_chunk(chunk, true, None)
.get_local_payment_and_upload_chunk(chunk, true)
.await
{
Ok(()) => content
Expand Down

0 comments on commit 5388289

Please sign in to comment.