feat(cli): add download batch-size option
Sets the default download batch size to 1/4 of the upload batch size.
joshuef committed Nov 22, 2023
1 parent d28d898 commit 899bc64
Showing 2 changed files with 46 additions and 10 deletions.
45 changes: 38 additions & 7 deletions sn_cli/src/subcommands/files/mod.rs
@@ -46,7 +46,7 @@ pub enum FilesCmds {
/// Can be a file or a directory.
#[clap(name = "path", value_name = "PATH")]
path: PathBuf,
- /// The batch_size to split chunks into parallely handling batches
+ /// The batch_size to split chunks into parallel handling batches
/// during payment and upload processing.
#[clap(long, default_value_t = BATCH_SIZE, short='b')]
batch_size: usize,
@@ -74,6 +74,9 @@ pub enum FilesCmds {
/// Default to be not showing.
#[clap(long, name = "show_holders", default_value = "false")]
show_holders: bool,
+ /// The batch_size for parallel downloading
+ #[clap(long, default_value_t = BATCH_SIZE / 4, short='b')]
+ batch_size: usize,
},
}
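
For reference, the new flag can be reproduced in a minimal standalone program. The sketch below assumes an upload default of BATCH_SIZE = 64 (the real constant lives in sn_client; the value here is illustrative) and shows that clap's default_value_t accepts a const expression, so the download default resolves to a quarter of the upload batch size while remaining overridable with -b/--batch-size:

    use clap::Parser;

    // Illustrative stand-in for the upload default imported in the real code.
    const BATCH_SIZE: usize = 64;

    #[derive(Parser)]
    struct DownloadArgs {
        /// The batch_size for parallel downloading
        #[clap(long, default_value_t = BATCH_SIZE / 4, short = 'b')]
        batch_size: usize,
    }

    fn main() {
        // With BATCH_SIZE = 64, a bare invocation yields 16; `-b 32` overrides it.
        let args = DownloadArgs::parse();
        println!("download batch size: {}", args.batch_size);
    }
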

@@ -103,6 +106,7 @@ pub(crate) async fn files_cmds(
file_name,
file_addr,
show_holders,
+ batch_size,
} => {
if (file_name.is_some() && file_addr.is_none())
|| (file_addr.is_some() && file_name.is_none())
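
The guard above (its body is truncated in this view) rejects a lone name or a lone address: the pairing requirement reads as an exclusive-or. A hypothetical restatement, assuming color-eyre for the error type as sn_cli uses elsewhere:

    use color_eyre::eyre::{eyre, Result};

    // Hypothetical equivalent of the check above: a single-file download
    // needs both the name and the address; neither means "download all".
    fn validate_target(file_name: &Option<String>, file_addr: &Option<String>) -> Result<()> {
        if file_name.is_some() != file_addr.is_some() {
            return Err(eyre!("both name and address are required to download a single file"));
        }
        Ok(())
    }
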
@@ -125,11 +129,19 @@ pub(crate) async fn files_cmds(
.try_into()
.expect("Failed to parse XorName from hex string"),
);
- download_file(&file_api, &xor_name, &name, root_dir, show_holders).await
+ download_file(
+     &file_api,
+     &xor_name,
+     &name,
+     root_dir,
+     show_holders,
+     batch_size,
+ )
+ .await
}
_ => {
println!("Attempting to download all files uploaded by the current user...");
- download_files(&file_api, root_dir, show_holders).await?
+ download_files(&file_api, root_dir, show_holders, batch_size).await?
}
}
}
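
The single-file branch above parses a hex string into a 32-byte XorName before any download starts. A minimal sketch of that conversion, assuming the hex and xor_name crates (the function name is illustrative):

    use xor_name::XorName;

    // XorName wraps a fixed [u8; 32], so the hex address must decode to
    // exactly 32 bytes; anything else is rejected before any network work.
    fn parse_address(hex_addr: &str) -> Result<XorName, String> {
        let bytes = hex::decode(hex_addr).map_err(|e| format!("invalid hex: {e}"))?;
        let arr: [u8; 32] = bytes
            .try_into()
            .map_err(|_| "expected exactly 32 bytes".to_string())?;
        Ok(XorName(arr))
    }
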
@@ -148,7 +160,7 @@ async fn upload_files(
show_holders: bool,
) -> Result<()> {
debug!(
"Uploading file(s) from {:?}, will verify?: {verify_store}",
"Uploading file(s) from {:?}, batch size {batch_size:?} will verify?: {verify_store}",
files_path
);

@@ -523,7 +535,13 @@ async fn verify_and_repay_if_needed_once(
Ok(())
}

- async fn download_files(file_api: &Files, root_dir: &Path, show_holders: bool) -> Result<()> {
+ async fn download_files(
+     file_api: &Files,
+     root_dir: &Path,
+     show_holders: bool,
+     batch_size: usize,
+ ) -> Result<()> {
+     info!("Downloading with batch size of {}", batch_size);
let uploaded_files_path = root_dir.join("uploaded_files");
let download_path = root_dir.join("downloaded_files");
std::fs::create_dir_all(download_path.as_path())?;
@@ -552,7 +570,15 @@ async fn download_files(file_api: &Files, root_dir: &Path, show_holders: bool) -
}

for (xorname, file_name) in uploaded_files.iter() {
- download_file(file_api, xorname, file_name, &download_path, show_holders).await;
+ download_file(
+     file_api,
+     xorname,
+     file_name,
+     &download_path,
+     show_holders,
+     batch_size,
+ )
+ .await;
}

Ok(())
@@ -575,15 +601,20 @@ async fn download_file(
file_name: &String,
download_path: &Path,
show_holders: bool,
+ batch_size: usize,
) {
- println!("Downloading {file_name} from {:64x}", xorname);
+ println!(
+     "Downloading {file_name} from {:64x} with batch-size {batch_size}",
+     xorname
+ );
debug!("Downloading {file_name} from {:64x}", xorname);
let downloaded_file_path = download_path.join(file_name);
match file_api
.read_bytes(
ChunkAddress::new(*xorname),
Some(downloaded_file_path.clone()),
show_holders,
+ batch_size,
)
.await
{
11 changes: 8 additions & 3 deletions sn_client/src/file_apis.rs
@@ -70,6 +70,7 @@ impl Files {
address: ChunkAddress,
downloaded_file_path: Option<PathBuf>,
show_holders: bool,
+ batch_size: usize,
) -> Result<Option<Bytes>> {
let chunk = match self.client.get_chunk(address, show_holders).await {
Ok(chunk) => chunk,
@@ -81,7 +82,7 @@

// first try to deserialize a LargeFile, if it works, we go and seek it
if let Ok(data_map) = self.unpack_chunk(chunk.clone()).await {
- self.read_all(data_map, downloaded_file_path, show_holders)
+ self.read_all(data_map, downloaded_file_path, show_holders, batch_size)
.await
} else {
// if an error occurs, we assume it's a SmallFile
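
The fallback above is the large/small split in the file API: a fetched chunk either deserializes into a data map (a LargeFile, whose real content must be fetched chunk by chunk) or is the file's bytes themselves (a SmallFile). An illustrative restatement, assuming self_encryption's DataMap and bincode; the real path goes through unpack_chunk, which also handles nested data maps:

    use self_encryption::DataMap;

    // Illustrative classification mirroring the branch above.
    fn describe(serialized_chunk: &[u8]) -> &'static str {
        match bincode::deserialize::<DataMap>(serialized_chunk) {
            Ok(_) => "LargeFile: walk the data map, fetching chunks batch_size at a time",
            Err(_) => "SmallFile: the chunk already holds the file contents",
        }
    }
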
@@ -361,6 +362,7 @@ impl Files {
data_map: DataMap,
decrypted_file_path: Option<PathBuf>,
show_holders: bool,
+ batch_size: usize,
) -> Result<Option<Bytes>> {
let mut decryptor = if let Some(path) = decrypted_file_path {
StreamSelfDecryptor::decrypt_to_file(Box::new(path), &data_map)?
@@ -391,7 +393,7 @@
)
});

- if ordered_read_futures.len() >= BATCH_SIZE || index + BATCH_SIZE > expected_count {
+ if ordered_read_futures.len() >= batch_size || index + batch_size > expected_count {
while let Some((dst_hash, result)) = ordered_read_futures.next().await {
let chunk = result.map_err(|error| {
error!("Chunk missing {dst_hash:?} with {error:?}");
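
This hunk is the heart of the change: the hardcoded BATCH_SIZE becomes the caller-supplied batch_size. The surrounding loop pushes chunk fetches into a FuturesOrdered so results come back in submission order for the stream decryptor, then drains the buffer once a full batch is in flight or the remaining chunks can no longer fill one. A self-contained sketch of that pattern, assuming the futures crate (names are illustrative):

    use futures::{stream::FuturesOrdered, StreamExt};

    async fn fetch_in_batches<T, F, Fut>(inputs: Vec<T>, batch_size: usize, fetch: F)
    where
        F: Fn(T) -> Fut,
        Fut: std::future::Future<Output = T>,
    {
        let expected_count = inputs.len();
        let mut ordered_futures = FuturesOrdered::new();
        for (index, input) in inputs.into_iter().enumerate() {
            ordered_futures.push_back(fetch(input));
            // Drain once a full batch is queued, or when the tail is too short
            // to ever fill one; FuturesOrdered yields in push order.
            if ordered_futures.len() >= batch_size || index + batch_size > expected_count {
                while let Some(_chunk) = ordered_futures.next().await {
                    // in the real code each result feeds the StreamSelfDecryptor
                }
            }
        }
    }

A smaller batch_size caps how many chunk requests a download keeps in flight at once, which is the lever the new CLI option exposes.
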
@@ -426,7 +428,10 @@
return Ok(data_map);
}
DataMapLevel::Additional(data_map) => {
- let serialized_chunk = self.read_all(data_map, None, false).await?.unwrap();
+ let serialized_chunk = self
+     .read_all(data_map, None, false, BATCH_SIZE)
+     .await?
+     .unwrap();
chunk = bincode::deserialize(&serialized_chunk)
.map_err(ChunksError::Serialisation)?;
}
