Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: chunk files only once when making payment for their storage #501

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
105 changes: 50 additions & 55 deletions sn_cli/src/subcommands/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use super::wallet::pay_for_storage;
use super::wallet::{chunk_and_pay_for_storage, ChunkedFile};

use bytes::Bytes;
use clap::Parser;
use color_eyre::Result;
use sn_client::{Client, Files, PaymentProofsMap};
use sn_protocol::storage::ChunkAddress;
use walkdir::DirEntry;
use sn_protocol::storage::{Chunk, ChunkAddress};

use std::{
fs,
Expand Down Expand Up @@ -87,29 +86,40 @@

// The input files_path has to be a dir
let file_names_path = root_dir.join("uploaded_files");
let mut chunks_to_fetch = Vec::new();

// We make the payment for Chunks storage only if requested by the user
let mut payment_proofs = if pay {
pay_for_storage(&client, root_dir, &files_path).await?
} else {
PaymentProofsMap::default()
};
let (chunks_to_upload, payment_proofs) =
chunk_and_pay_for_storage(&client, root_dir, &files_path, pay).await?;

let mut at_least_one_file_exists = false;
for entry in WalkDir::new(files_path).into_iter().flatten() {
if entry.file_type().is_file() {
at_least_one_file_exists = true;
upload_file(&file_api, entry, &mut chunks_to_fetch, &mut payment_proofs).await?;
}
}
if !at_least_one_file_exists {
if chunks_to_upload.is_empty() {
println!(
"The provided path does not contain any file. Please check your path!\nExiting..."
);
return Ok(());
}

let mut chunks_to_fetch = Vec::new();
for (
file_addr,
ChunkedFile {
file_name,
size,
chunks,
},
) in chunks_to_upload.into_iter()
{
println!(
"Storing file '{file_name}' of {size} bytes ({} chunk/s)..",
chunks.len()
);

if let Err(error) = upload_chunks(&file_api, &file_name, chunks, &payment_proofs).await {
println!("Did not store all chunks of file '{file_name}' to all nodes in the close group: {error}")
}

println!("Successfully stored '{file_name}' to {file_addr:64x}");
chunks_to_fetch.push((file_addr, file_name));
}

let content = bincode::serialize(&chunks_to_fetch)?;
tokio::fs::create_dir_all(file_names_path.as_path()).await?;
let date_time = chrono::Local::now().format("%Y-%m-%d_%H-%M-%S").to_string();
Expand All @@ -120,46 +130,31 @@
Ok(())
}

/// Upload an individual file to the network.
async fn upload_file(
/// Upload chunks of an individual file to the network.
async fn upload_chunks(
file_api: &Files,
entry: DirEntry,
chunks_to_fetch: &mut Vec<(XorName, String)>,
payment_proofs: &mut PaymentProofsMap,
file_name: &str,
chunks_paths: Vec<(XorName, PathBuf)>,
payment_proofs: &PaymentProofsMap,
) -> Result<()> {
let file_name = if let Some(file_name) = entry.file_name().to_str() {
file_name.to_string()
} else {
println!(
"Skipping file {:?} as it is not valid UTF-8.",
entry.file_name()
);

return Ok(());
};

let file_path = entry.path();

let file = fs::read(file_path)?;
let bytes = Bytes::from(file);

println!("Storing file {file_name:?} of {} bytes..", bytes.len());

match file_api.upload_with_proof(bytes, payment_proofs).await {
Ok(address) => {
// Output address in hex string.
println!(
"Successfully stored file {:?} to {:64x}",
file_name,
address.name()
);
chunks_to_fetch.push((*address.name(), file_name));
}
Err(error) => {
println!("Did not store file {file_name:?} to all nodes in the close group! {error}")
}
};
let chunks_reader = chunks_paths
.into_iter()
.map(|(name, chunk_path)| (name, fs::read(chunk_path)))
.filter_map(|x| match x {
(name, Ok(file)) => Some(Chunk {
address: ChunkAddress::new(name),
value: Bytes::from(file),
}),
(_, Err(err)) => {
// FIXME: this error won't be seen/reported, thus assumed all chunks were read and stored.

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
println!("Could not upload generated chunk of file '{file_name}': {err}");
None
}
});

file_api
.upload_chunks_in_batches(chunks_reader, payment_proofs, false)
.await?;
Ok(())
}

Expand Down
108 changes: 81 additions & 27 deletions sn_cli/src/subcommands/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
use clap::Parser;
use color_eyre::Result;
use std::{
collections::BTreeSet,
collections::BTreeMap,
fs,
io::Read,
path::{Path, PathBuf},
};
use walkdir::WalkDir;
use xor_name::XorName;

// Please do not remove the blank lines in these doc comments.
// They are used for inserting line breaks when the help menu is rendered in the UI.
Expand Down Expand Up @@ -70,7 +71,9 @@
WalletCmds::Balance => balance(root_dir).await?,
WalletCmds::Deposit { stdin } => deposit(root_dir, stdin).await?,
WalletCmds::Send { amount, to } => send(amount, to, client, root_dir).await?,
WalletCmds::Pay { path } => pay_for_storage(client, root_dir, &path).await.map(|_| ())?,
WalletCmds::Pay { path } => {
chunk_and_pay_for_storage(client, root_dir, &path, true).await?;
}
}
Ok(())
}
Expand Down Expand Up @@ -168,48 +171,99 @@
Ok(())
}

pub(super) async fn pay_for_storage(
pub(super) struct ChunkedFile {
pub file_name: String,
pub size: usize,
pub chunks: Vec<(XorName, PathBuf)>,
}

pub(super) async fn chunk_and_pay_for_storage(
client: &Client,
root_dir: &Path,
files_path: &Path,
) -> Result<PaymentProofsMap> {
pay: bool, // TODO: to be removed; temporarily payment is optional
Fixed Show fixed Hide fixed

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
) -> Result<(BTreeMap<XorName, ChunkedFile>, PaymentProofsMap)> {
let wallet = LocalWallet::load_from(root_dir).await?;
let mut wallet_client = WalletClient::new(client.clone(), wallet);
let file_api: Files = Files::new(client.clone());

// Get the list of Chunks addresses from the files found at 'files_path'
let mut chunks_addrs = BTreeSet::new();
let mut num_of_files = 0;
println!(
"Preparing (chunking) files at '{}'...",
files_path.display()
);
let chunks_dir = std::env::temp_dir();
let mut files_to_pay = BTreeMap::new();
let mut num_of_chunks = 0;
for entry in WalkDir::new(files_path).into_iter().flatten() {
if entry.file_type().is_file() {
let file_name = if let Some(file_name) = entry.file_name().to_str() {
file_name.to_string()
} else {
println!(
"Skipping file {:?} as it is not valid UTF-8.",
entry.file_name()
);
continue;
};

let file = fs::read(entry.path())?;
let bytes = Bytes::from(file);
// we need all chunks addresses not just the data-map addr
let (_, chunks) = file_api.chunk_bytes(bytes)?;
num_of_files += 1;
chunks.iter().for_each(|c| {
let _ = chunks_addrs.insert(*c.name());
});
let (file_addr, chunks) = file_api.chunk_bytes(bytes.clone())?;
let mut chunks_paths = vec![];
for c in chunks.iter() {
num_of_chunks += 1;
let xorname = *c.name();
// let's store the chunk on temp file for the user
// to be able to upload it to the network after making the payment,
// without needing to chunk the files again.
let path = chunks_dir.join(hex::encode(xorname));
fs::write(&path, c.value())?;
chunks_paths.push((xorname, path));
}

files_to_pay.insert(
file_addr,
ChunkedFile {
file_name,
size: bytes.len(),
chunks: chunks_paths,
},
);
}
}

println!(
"Making payment for {} Chunks (belonging to {} files)...",
chunks_addrs.len(),
num_of_files
);
let proofs = wallet_client.pay_for_storage(chunks_addrs.iter()).await?;

let wallet = wallet_client.into_wallet();
let new_balance = wallet.balance();
// For now, we make the payment for Chunks storage only if requested by the user
let proofs = if pay {
println!(
"Making payment for {num_of_chunks} Chunks (belonging to {} files)...",
files_to_pay.len()
);

let proofs = wallet_client
.pay_for_storage(
files_to_pay
.values()
.flat_map(|chunked_file| &chunked_file.chunks)
.map(|(name, _)| name),
)
.await?;

let wallet = wallet_client.into_wallet();
let new_balance = wallet.balance();

if let Err(err) = wallet.store().await {
println!("Failed to store wallet: {err:?}");
} else {
println!("Successfully stored wallet with new balance {new_balance}.");
}

if let Err(err) = wallet.store().await {
println!("Failed to store wallet: {err:?}");
println!("Successfully paid for storage and generated the proofs. They can now be sent to the storage nodes when uploading paid chunks.");
proofs
} else {
println!("Successfully stored wallet with new balance {new_balance}.");
}

println!("Successfully paid for storage and generated the proofs. They can now be sent to the storage nodes when uploading paid chunks.");
PaymentProofsMap::default()
};

Ok(proofs)
Ok((files_to_pay, proofs))
}