chore: iterate_upload code rearranged for better readability
JasonPaulGithub committed Mar 13, 2024
1 parent 2164b3c commit 9e8f498
Showing 1 changed file with 47 additions and 27 deletions.
74 changes: 47 additions & 27 deletions sn_cli/src/subcommands/files/iterative_uploader.rs
@@ -4,7 +4,7 @@ use color_eyre::{eyre::eyre, Result};
use indicatif::ProgressBar;
use sn_client::transfers::{NanoTokens, TransferError, WalletError};
use sn_client::{Error as ClientError, Error, FileUploadEvent, FilesApi, FilesUpload};
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Instant;
@@ -27,45 +27,51 @@ impl IterativeUploader {
}

impl IterativeUploader {
/// Given an iterator over files, upload them. Optionally verify if the data was stored successfully.
/// Given an iterator over files, upload them.
/// Optionally verify if the data was stored successfully.
pub(crate) async fn iterate_upload(
self,
chunks_to_upload: Vec<(XorName, PathBuf)>,
files_path: &Path,
files_path: PathBuf,
options: FilesUploadOptions,
) -> Result<()> {
//
let FilesUploadOptions {
make_data_public,
verify_store,
batch_size,
retry_strategy,
} = options;

msg_init(files_path, &batch_size, &verify_store, make_data_public);

let mut files_upload = FilesUpload::new(self.files_api)
.set_batch_size(batch_size)
.set_verify_store(verify_store)
.set_retry_strategy(retry_strategy);
let progress_bar = files::get_progress_bar(chunks_to_upload.len() as u64)?;
let total_existing_chunks = Arc::new(AtomicU64::new(0));
let process_join_handle = spawn_progress_handler(
let map_join_handle_that_contains_resulting_file_upload_events = spawn_file_upload_events(
self.chunk_manager,
make_data_public,
progress_bar,
files_upload.get_upload_events(),
total_existing_chunks.clone(),
);

msg_uploading_chunks(&chunks_to_upload.len());
let current_instant = Instant::now();
IterativeUploader::upload_result(chunks_to_upload.clone(), &mut files_upload).await?;

process_join_handle
msg_begin_messages(
&files_path,
&batch_size,
&verify_store,
make_data_public,
&chunks_to_upload.len(),
);

IterativeUploader::upload_chunk_vector(chunks_to_upload.clone(), &mut files_upload).await?;

map_join_handle_that_contains_resulting_file_upload_events
.await?
.map_err(|err| eyre!("Failed to write uploaded files with err: {err:?}"))?;

msg_final(
msg_end_messages(
chunks_to_upload.len(),
current_instant,
total_existing_chunks,
@@ -75,7 +81,7 @@ impl IterativeUploader {
Ok(())
}

async fn upload_result(
async fn upload_chunk_vector(
chunks_to_upload: Vec<(XorName, PathBuf)>,
files_upload: &mut FilesUpload,
) -> Result<()> {
@@ -95,7 +101,7 @@ impl IterativeUploader {

///////////////// Associative Functions /////////////////

fn spawn_progress_handler(
fn spawn_file_upload_events(
mut chunk_manager: ChunkManager,
make_data_public: bool,
progress_bar: ProgressBar,
@@ -104,7 +110,8 @@
) -> JoinHandle<Result<(), Error>> {
tokio::spawn(async move {
let mut upload_terminated_with_error = false;
// The loop is guaranteed to end, as the channel will be closed when the upload completes or errors out.
// The loop is guaranteed to end, as the channel will be
// closed when the upload completes or errors out.
while let Some(event) = upload_event_rx.recv().await {
match event {
FileUploadEvent::Uploaded(addr) => {
@@ -179,16 +186,27 @@ fn msg_check_incomplete_files(chunk_manager: &mut ChunkManager) {
}
}

fn msg_init(files_path: &Path, batch_size: &usize, verify_store: &bool, make_data_public: bool) {
debug!("Uploading file(s) from {files_path:?}, batch size {batch_size:?} will verify?: {verify_store}");
fn msg_begin_messages(
files_path: &PathBuf,
batch_size: &usize,
verify_store: &bool,
make_data_public: bool,
chunks_to_upload_len: &usize,
) {
debug!(
"Uploading file(s) from {files_path:?}, \
batch size {batch_size:?} will verify?: {verify_store}"
);

if make_data_public {
info!("{files_path:?} will be made public and linkable");
println!("{files_path:?} will be made public and linkable");
}
println!("Starting to chunk {files_path:?} now."); // check message responsibility

println!("Splitting and uploading {files_path:?} into {chunks_to_upload_len} chunks",);
}

fn msg_final(
fn msg_end_messages(
chunks_to_upload_amount: usize,
time_since_mark: Instant,
total_existing_chunks: Arc<AtomicU64>,
@@ -242,8 +260,11 @@ fn msg_chunks_found_existed_info(
total_existing_chunks: u64,
uploaded_chunks: usize,
) {
info!("Among {chunks_to_upload_len} chunks, found {total_existing_chunks} already existed in network, \
uploaded the leftover {uploaded_chunks} chunks in {elapsed}");
info!(
"Among {chunks_to_upload_len} chunks, found \
{total_existing_chunks} already existed in network, \
uploaded the leftover {uploaded_chunks} chunks in {elapsed}"
);
}

fn msg_chunks_found_existed(
@@ -252,8 +273,11 @@
total_existing_chunks: u64,
uploaded_chunks: usize,
) {
println!("Among {chunks_to_upload_len} chunks, found {total_existing_chunks} already existed in network, \
uploaded the leftover {uploaded_chunks} chunks in {elapsed}");
println!(
"Among {chunks_to_upload_len} chunks, found \
{total_existing_chunks} already existed in network, \
uploaded the leftover {uploaded_chunks} chunks in {elapsed}"
);
}

fn msg_payment_details(
@@ -273,10 +297,6 @@ fn msg_star_line() {
println!("**************************************");
}

fn msg_uploading_chunks(chunks_to_upload_len: &usize) {
println!("Uploading {chunks_to_upload_len} chunks",);
}

fn msg_not_public_by_default_banner() {
println!("* *");
println!("* These are not public by default. *");
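
For reference, below is a minimal, hypothetical sketch of a call site after this change, reflecting that iterate_upload now takes files_path by value as a PathBuf rather than as a &Path. Only the iterate_upload signature and the FilesUploadOptions field names come from the diff above; the IterativeUploader::new constructor, the concrete option values, and the path are assumptions made for illustration.

// Hedged sketch (not part of this commit): calling the refactored uploader.
// Assumes the crate-internal types shown in the diff are in scope.
use std::path::PathBuf;

async fn upload_all(
    files_api: FilesApi,
    chunk_manager: ChunkManager,
    chunks_to_upload: Vec<(XorName, PathBuf)>,
) -> Result<()> {
    let options = FilesUploadOptions {
        make_data_public: false,            // keep uploaded files private
        verify_store: true,                 // confirm chunks were stored
        batch_size: 16,                     // arbitrary example value
        retry_strategy: Default::default(), // assumes a Default impl exists
    };
    // `new` is an assumed constructor; only `iterate_upload` appears in this diff.
    IterativeUploader::new(files_api, chunk_manager)
        .iterate_upload(chunks_to_upload, PathBuf::from("./files-to-upload"), options)
        .await
}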
