Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: chunk put allow retrying un-get-able chunks #1047

Merged
merged 2 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
46 changes: 27 additions & 19 deletions sn_cli/src/subcommands/files/chunk_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ impl ChunkManager {

/// Chunk all the files in the provided `files_path`
/// These are stored to the CHUNK_ARTIFACTS_DIR
pub(crate) fn chunk_path(&mut self, files_path: &Path) -> Result<()> {
trace!("Starting to chunk {files_path:?} now.");
/// if read_cache is true, will take cache from previous runs into account
pub(crate) fn chunk_path(&mut self, files_path: &Path, read_cache: bool) -> Result<()> {
println!("Starting to chunk {files_path:?} now.");
let now = Instant::now();
// clean up
self.files_to_chunk = Default::default();
Expand Down Expand Up @@ -107,7 +108,9 @@ impl ChunkManager {
let total_files = self.files_to_chunk.len();

// resume the chunks from the artifacts dir
self.resume_path();
if read_cache {
self.resume_path();
}

// note the number of chunks that we've resumed
self.resumed_chunk_count = self
Expand Down Expand Up @@ -345,6 +348,16 @@ impl ChunkManager {
&self.verified_files
}

/// Chunk the files at `files_path` from scratch (cache from previous runs is
/// deliberately ignored via `read_cache = false`) and return every produced
/// chunk as a `(XorName, PathBuf)` pair.
///
/// The caller uses this full chunk list to ask the network which of these
/// chunks are already stored, i.e. which files were "already put".
pub(crate) fn already_put_chunks(
&mut self,
files_path: &Path,
) -> Result<Vec<(XorName, PathBuf)>> {
// Re-chunk without reading the artifacts cache so the returned list
// covers *all* chunks of the path, not just the unverified remainder.
self.chunk_path(files_path, false)?;
Ok(self.get_chunks())
}

// Try to read the chunks from `file_chunks_dir`
// Returns the ChunkedFile if the metadata file exists
// file_chunks_dir: artifacts_dir/path_xor
Expand Down Expand Up @@ -432,11 +445,6 @@ impl ChunkManager {
.ok()?;
Some(XorName(decoded_xorname))
}

// Hex-encode a XorName's bytes, e.g. to build a filesystem-safe identifier.
// Currently unused (kept for symmetry with the hex-decoding path), hence the
// dead_code allowance.
#[allow(dead_code)]
fn hex_encode_xorname(xorname: XorName) -> String {
hex::encode(xorname)
}
}

#[cfg(test)]
Expand All @@ -455,7 +463,7 @@ mod tests {
let (_tmp_dir, mut manager, _, random_files_dir) = init_manager()?;
let artifacts_dir = manager.artifacts_dir.clone();
let _ = create_random_files(&random_files_dir, 1, 1)?;
manager.chunk_path(&random_files_dir)?;
manager.chunk_path(&random_files_dir, true)?;

let chunks = manager.get_chunks();
// 1. 1mb file produces 4 chunks
Expand Down Expand Up @@ -523,7 +531,7 @@ mod tests {
let (_tmp_dir, mut manager, _, random_files_dir) = init_manager()?;

let _ = create_random_files(&random_files_dir, 1, 1)?;
manager.chunk_path(&random_files_dir)?;
manager.chunk_path(&random_files_dir, true)?;

let path_xor = manager.chunks.keys().next().unwrap().clone();
let chunked_file = manager.chunks.values().next().unwrap().clone();
Expand Down Expand Up @@ -572,7 +580,7 @@ mod tests {
let (_tmp_dir, mut manager, _, random_files_dir) = init_manager()?;

let _ = create_random_files(&random_files_dir, 5, 5)?;
manager.chunk_path(&random_files_dir)?;
manager.chunk_path(&random_files_dir, true)?;
// cloned after chunking
let manager_clone = manager.clone();

Expand Down Expand Up @@ -614,10 +622,10 @@ mod tests {
let (_tmp_dir, mut manager, root_dir, random_files_dir) = init_manager()?;

let _ = create_random_files(&random_files_dir, 5, 5)?;
manager.chunk_path(&random_files_dir)?;
manager.chunk_path(&random_files_dir, true)?;

let mut new_manager = ChunkManager::new(&root_dir);
new_manager.chunk_path(&random_files_dir)?;
new_manager.chunk_path(&random_files_dir, true)?;

// 1. make sure the chunk counts match
let total_chunk_count = manager
Expand All @@ -641,7 +649,7 @@ mod tests {
let (_tmp_dir, mut manager, root_dir, random_files_dir) = init_manager()?;

let _ = create_random_files(&random_files_dir, 5, 5)?;
manager.chunk_path(&random_files_dir)?;
manager.chunk_path(&random_files_dir, true)?;

let total_chunks_count = manager
.chunks
Expand All @@ -662,7 +670,7 @@ mod tests {
.0;
manager.mark_completed([removed_chunk].into_iter());
let mut new_manager = ChunkManager::new(&root_dir);
new_manager.chunk_path(&random_files_dir)?;
new_manager.chunk_path(&random_files_dir, true)?;

// 1. we should have 1 completed chunk and (total_chunks_count-1) incomplete chunks
assert_eq!(manager.resumed_chunk_count, 0);
Expand All @@ -689,11 +697,11 @@ mod tests {
let (_tmp_dir, mut manager, root_dir, random_files_dir) = init_manager()?;

let _ = create_random_files(&random_files_dir, 5, 5)?;
manager.chunk_path(&random_files_dir)?;
manager.chunk_path(&random_files_dir, true)?;
manager.mark_completed_all();

let mut new_manager = ChunkManager::new(&root_dir);
new_manager.chunk_path(&random_files_dir)?;
new_manager.chunk_path(&random_files_dir, true)?;

// 1. we should have chunk entries, but 0 chunks inside them
assert_eq!(new_manager.chunks.len(), 5);
Expand Down Expand Up @@ -721,7 +729,7 @@ mod tests {

let mut random_files = create_random_files(&random_files_dir, 1, 1)?;
let random_file = random_files.remove(0);
manager.chunk_path(&random_files_dir)?;
manager.chunk_path(&random_files_dir, true)?;

let mut old_chunks_list = BTreeSet::new();
for entry in WalkDir::new(&manager.artifacts_dir).into_iter().flatten() {
Expand All @@ -740,7 +748,7 @@ mod tests {
fs::remove_file(&metadata_path)?;

// use the same manager to chunk the path
manager.chunk_path(&random_files_dir)?;
manager.chunk_path(&random_files_dir, true)?;
// nothing should be resumed
assert_eq!(manager.resumed_chunk_count, 0);
// but it should be re-chunked
Expand Down
49 changes: 34 additions & 15 deletions sn_cli/src/subcommands/files/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,27 +159,46 @@ async fn upload_files(
bail!("The wallet is empty. Cannot upload any files! Please transfer some funds into the wallet");
}
let mut chunk_manager = ChunkManager::new(root_dir);
chunk_manager.chunk_path(&files_path)?;
chunk_manager.chunk_path(&files_path, true)?;

// Return early if we have no chunks to upload
// Return early if we already uploaded them
let chunks_to_upload;
if chunk_manager.is_chunks_empty() {
println!("All files were already uploaded and verified");
println!("**************************************");
println!("* Uploaded Files *");
println!("**************************************");
for (file_name, addr) in chunk_manager.verified_files() {
if let Some(file_name) = file_name.to_str() {
println!("\"{file_name}\" {addr:x}");
info!("Uploaded {file_name} to {addr:x}");
} else {
println!("\"{file_name:?}\" {addr:x}");
info!("Uploaded {file_name:?} to {addr:x}");
// make sure we don't have any failed chunks in those
let chunks = chunk_manager.already_put_chunks(&files_path)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would create chunks in the artifacts dir, and if they're not marked as completed when failed_chunks is not empty (L172), then during the next run we'd resume the chunks at line 162 and try to upload them again (L166 returns false).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So should we make chunk_manager.already_put_chunks(&files_path)?; create the chunks in a temp dir for verification?

That might avoid complication.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch! Marked the non-failed chunks as completed just below. And if they fail, it's reasonable to keep them around for next time!

let failed_chunks = client.verify_uploaded_chunks(&chunks, batch_size).await?;

// mark the non-failed ones as completed
chunk_manager.mark_completed(
chunks
.into_iter()
.filter(|c| !failed_chunks.contains(c))
.map(|(xor, _)| xor),
);

// if none are failed, we can return early
if failed_chunks.is_empty() {
println!("All files were already uploaded and verified");
println!("**************************************");
println!("* Uploaded Files *");
println!("**************************************");
for (file_name, addr) in chunk_manager.verified_files() {
if let Some(file_name) = file_name.to_str() {
println!("\"{file_name}\" {addr:x}");
info!("Uploaded {file_name} to {addr:x}");
} else {
println!("\"{file_name:?}\" {addr:x}");
info!("Uploaded {file_name:?} to {addr:x}");
}
}
return Ok(());
}
return Ok(());
println!("{:?} chunks were uploaded in the past but failed to verify. Will attempt to upload them again...", failed_chunks.len());
chunks_to_upload = failed_chunks;
} else {
chunks_to_upload = chunk_manager.get_chunks();
}

let chunks_to_upload = chunk_manager.get_chunks();
let chunks_to_upload_len = chunks_to_upload.len();

let progress_bar = get_progress_bar(chunks_to_upload.len() as u64)?;
Expand Down
45 changes: 45 additions & 0 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
// permissions and limitations relating to use of the SAFE Network Software.

use super::{
chunks::Error as ChunksError,
error::{Error, Result},
Client, ClientEvent, ClientEventsChannel, ClientEventsReceiver, ClientRegister, WalletClient,
};
use bls::{PublicKey, SecretKey, Signature};
use bytes::Bytes;
use futures::future::join_all;
use indicatif::ProgressBar;
use libp2p::{
identity::Keypair,
Expand All @@ -37,6 +39,7 @@ use sn_transfers::{CashNote, CashNoteRedemption, MainPubkey, NanoTokens, Payment
use std::{
collections::{HashMap, HashSet},
num::NonZeroUsize,
path::PathBuf,
time::Duration,
};
use tokio::task::spawn;
Expand Down Expand Up @@ -676,6 +679,48 @@ impl Client {
.await?;
Ok(cash_notes)
}

/// Verify that chunks were uploaded
///
/// Returns a vec of any chunks that could not be verified
pub async fn verify_uploaded_chunks(
    &self,
    chunks_paths: &[(XorName, PathBuf)],
    batch_size: usize,
) -> Result<Vec<(XorName, PathBuf)>> {
    let mut failed_chunks = Vec::new();

    // Walk the chunk list one batch at a time so we never have more than
    // `batch_size` verification requests in flight at once.
    for batch in chunks_paths.chunks(batch_size) {
        // Spawn one concurrent task per chunk in this batch; each task
        // reports the chunk's identity plus whether verification failed.
        let handles: Vec<_> = batch
            .iter()
            .cloned()
            .map(|(name, path)| {
                let client = self.clone();
                tokio::spawn(async move {
                    // Ask the network whether this chunk is actually stored.
                    let res = client.verify_chunk_stored(ChunkAddress::new(name)).await;
                    Ok::<_, ChunksError>(((name, path), res.is_err()))
                })
            })
            .collect();

        // Wait for the whole batch to finish, then record any failures,
        // preserving the original chunk order within the batch.
        for outcome in join_all(handles).await {
            let ((chunk_addr, path), verification_failed) = outcome??;
            if verification_failed {
                warn!("Failed to fetch a chunk {chunk_addr:?}");
                failed_chunks.push((chunk_addr, path));
            }
        }
    }

    Ok(failed_chunks)
}
}

fn get_register_from_record(record: Record) -> Result<SignedRegister> {
Expand Down
50 changes: 6 additions & 44 deletions sn_client/src/file_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,48 +204,6 @@ impl Files {
Ok((storage_cost, royalties_fees, new_balance))
}

/// Verify that chunks were uploaded
///
/// Returns a vec of any chunks that could not be verified.
/// Verification is done in batches of `batch_size`, each batch running its
/// per-chunk checks concurrently via spawned tasks.
pub async fn verify_uploaded_chunks(
&self,
chunks_paths: Vec<(XorName, PathBuf)>,
batch_size: usize,
) -> Result<Vec<(XorName, PathBuf)>> {
let mut failed_chunks = Vec::new();

for chunks_batch in chunks_paths.chunks(batch_size) {
// now we try and get batched chunks, keep track of any that fail
// Iterate over each uploaded chunk
let mut verify_handles = Vec::new();
for (name, path) in chunks_batch.iter().cloned() {
let client = self.client().clone();
// Spawn a new task to fetch each chunk concurrently
let handle = tokio::spawn(async move {
let chunk_address: ChunkAddress = ChunkAddress::new(name);
// make sure the chunk is stored; the bool flags a failed check
let res = client.verify_chunk_stored(chunk_address).await;

Ok::<_, ChunksError>(((name, path), res.is_err()))
});
verify_handles.push(handle);
}

// Await all fetch tasks and collect the results
let verify_results = join_all(verify_handles).await;

// Check for any errors during fetch
// (the outer `?` surfaces a panicked/cancelled task, the inner one a
// ChunksError; the `true` pattern selects only failed verifications)
for result in verify_results {
if let ((chunk_addr, path), true) = result?? {
warn!("Failed to fetch a chunk {chunk_addr:?}");
failed_chunks.push((chunk_addr, path));
}
}
}

Ok(failed_chunks)
}

// --------------------------------------------
// ---------- Private helpers -----------------
// --------------------------------------------
Expand Down Expand Up @@ -317,7 +275,10 @@ impl Files {
return Ok((net_addr, storage_cost, royalties_fees));
}

let mut failed_chunks = self.verify_uploaded_chunks(chunks, BATCH_SIZE).await?;
let mut failed_chunks = self
.client
.verify_uploaded_chunks(&chunks, BATCH_SIZE)
.await?;
warn!("Failed chunks: {:?}", failed_chunks.len());

while !failed_chunks.is_empty() {
Expand Down Expand Up @@ -357,7 +318,8 @@ impl Files {
trace!("Chunks uploaded again....");

failed_chunks = self
.verify_uploaded_chunks(failed_chunks, BATCH_SIZE)
.client
.verify_uploaded_chunks(&failed_chunks, BATCH_SIZE)
.await?;
}

Expand Down