Skip to content

Commit

Permalink
refactor: client to upload paid chunks in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
bochaco committed Jul 11, 2023
1 parent 32a6502 commit c58db73
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 77 deletions.
59 changes: 41 additions & 18 deletions sn_cli/src/subcommands/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use super::wallet::chunk_and_pay_for_storage;
use super::wallet::{chunk_and_pay_for_storage, ChunkedFile};

use bytes::Bytes;
use clap::Parser;
use color_eyre::Result;
use sn_client::{Client, Files, PaymentProofsMap};
use sn_protocol::storage::ChunkAddress;
use sn_protocol::storage::{Chunk, ChunkAddress};

use std::{
fs,
Expand Down Expand Up @@ -98,11 +98,26 @@ async fn upload_files(
}

let mut chunks_to_fetch = Vec::new();
for ((src_filename, file_addr, size), chunks_paths) in chunks_to_upload.into_iter() {
println!("Storing file {src_filename} of {size} bytes..");
upload_chunks(&file_api, chunks_paths, &src_filename, &payment_proofs).await?;
chunks_to_fetch.push((file_addr, src_filename.clone()));
println!("Successfully stored {src_filename} to {file_addr:64x}",);
for (
file_addr,
ChunkedFile {
file_name,
size,
chunks,
},
) in chunks_to_upload.into_iter()
{
println!(
"Storing file '{file_name}' of {size} bytes ({} chunk/s)..",
chunks.len()
);

if let Err(error) = upload_chunks(&file_api, &file_name, chunks, &payment_proofs).await {
println!("Did not store all chunks of file '{file_name}' to all nodes in the close group: {error}")
}

println!("Successfully stored '{file_name}' to {file_addr:64x}");
chunks_to_fetch.push((file_addr, file_name));
}

let content = bincode::serialize(&chunks_to_fetch)?;
Expand All @@ -115,23 +130,31 @@ async fn upload_files(
Ok(())
}

/// Upload an individual file to the network.
/// Upload chunks of an individual file to the network.
async fn upload_chunks(
file_api: &Files,
file_name: &str,
chunks_paths: Vec<(XorName, PathBuf)>,
src_filename: &str,
payment_proofs: &PaymentProofsMap,
) -> Result<()> {
for (_, chunk_path) in chunks_paths {
let file = fs::read(chunk_path)?;
let bytes = Bytes::from(file);
if let Err(error) = file_api.upload_with_proof(bytes, payment_proofs).await {
println!(
"Did not store chunk of file {src_filename} to all nodes in the close group: {error}"
)
}
}
let chunks_reader = chunks_paths
.into_iter()
.map(|(name, chunk_path)| (name, fs::read(chunk_path)))
.filter_map(|x| match x {
(name, Ok(file)) => Some(Chunk {
address: ChunkAddress::new(name),
value: Bytes::from(file),
}),
(_, Err(err)) => {
// FIXME: this error won't be seen/reported, thus assumed all chunks were read and stored.

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
println!("Could not upload generated chunk of file '{file_name}': {err}");
None
}
});

file_api
.upload_chunks_in_batches(chunks_reader, payment_proofs, false)
.await?;
Ok(())
}

Expand Down
33 changes: 24 additions & 9 deletions sn_cli/src/subcommands/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ pub(crate) async fn wallet_cmds(cmds: WalletCmds, client: &Client, root_dir: &Pa
WalletCmds::Balance => balance(root_dir).await?,
WalletCmds::Deposit { stdin } => deposit(root_dir, stdin).await?,
WalletCmds::Send { amount, to } => send(amount, to, client, root_dir).await?,
WalletCmds::Pay { path } => chunk_and_pay_for_storage(client, root_dir, &path, true)
.await
.map(|_| ())?,
WalletCmds::Pay { path } => {
chunk_and_pay_for_storage(client, root_dir, &path, true).await?;
}
}
Ok(())
}
Expand Down Expand Up @@ -171,15 +171,18 @@ async fn send(amount: String, to: String, client: &Client, root_dir: &Path) -> R
Ok(())
}

pub(super) struct ChunkedFile {
pub file_name: String,
pub size: usize,
pub chunks: Vec<(XorName, PathBuf)>,
}

pub(super) async fn chunk_and_pay_for_storage(
client: &Client,
root_dir: &Path,
files_path: &Path,
pay: bool, // TODO: to be removed; temporarily payment is optional

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
) -> Result<(
BTreeMap<(String, XorName, usize), Vec<(XorName, PathBuf)>>,
PaymentProofsMap,
)> {
) -> Result<(BTreeMap<XorName, ChunkedFile>, PaymentProofsMap)> {
let wallet = LocalWallet::load_from(root_dir).await?;
let mut wallet_client = WalletClient::new(client.clone(), wallet);
let file_api: Files = Files::new(client.clone());
Expand Down Expand Up @@ -216,7 +219,14 @@ pub(super) async fn chunk_and_pay_for_storage(
chunks_paths.push((xorname, path));
}

files_to_pay.insert((file_name, file_addr, bytes.len()), chunks_paths);
files_to_pay.insert(
file_addr,
ChunkedFile {
file_name,
size: bytes.len(),
chunks: chunks_paths,
},
);
}
}

Expand All @@ -228,7 +238,12 @@ pub(super) async fn chunk_and_pay_for_storage(
);

let proofs = wallet_client
.pay_for_storage(files_to_pay.values().flatten().map(|(name, _)| name))
.pay_for_storage(
files_to_pay
.values()
.flat_map(|chunked_file| &chunked_file.chunks)
.map(|(name, _)| name),
)
.await?;

let wallet = wallet_client.into_wallet();
Expand Down
114 changes: 66 additions & 48 deletions sn_client/src/file_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,68 @@ impl Files {
}
}

/// Directly writes Chunks to the network in the
/// form of immutable self encrypted chunks.
#[instrument(skip_all, level = "trace")]
pub async fn upload_chunks_in_batches(
&self,
chunks: impl Iterator<Item = Chunk>,
payment_proofs: &PaymentProofsMap,
verify: bool,
) -> Result<()> {
trace!("Client upload in batches started");
let mut tasks = vec![];
let mut count = 0;
for chunk in chunks {
count += 1;
let client = self.client.clone();
let chunk_addr = *chunk.address();
let payment = payment_proofs.get(chunk_addr.name()).cloned();
// TODO: re-enable requirement to always provide payment proof

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
//.ok_or(super::Error::MissingPaymentProof(chunk_addr))?;

tasks.push(task::spawn(async move {
client.store_chunk(chunk, payment).await?;
if verify {
let _ = client.get_chunk(chunk_addr).await?;
}
Ok::<(), super::error::Error>(())
}));

if count == CHUNKS_BATCH_MAX_SIZE {
let tasks_to_poll = tasks;
let responses = join_all(tasks_to_poll)
.await
.into_iter()
.flatten() // swallows errors
.collect_vec();

for res in responses {
// fail with any issue here
res?;
}
tasks = vec![];
count = 0;
}
}

if count > 0 {
let responses = join_all(tasks)
.await
.into_iter()
.flatten() // swallows errors
.collect_vec();

for res in responses {
// fail with any issue here
res?;
}
}

trace!("Client upload in batches completed");
Ok(())
}

// --------------------------------------------
// ---------- Private helpers -----------------
// --------------------------------------------
Expand All @@ -147,7 +209,10 @@ impl Files {
let file = SmallFile::new(bytes)?;
self.upload_small(file, payment_proofs, verify).await
} else {
self.upload_large(bytes, payment_proofs, verify).await
let (head_address, chunks) = encrypt_large(bytes)?;
self.upload_chunks_in_batches(chunks.into_iter(), payment_proofs, verify)
.await?;
Ok(ChunkAddress::new(head_address))
}
}

Expand Down Expand Up @@ -175,53 +240,6 @@ impl Files {
Ok(address)
}

/// Directly writes a [`LargeFile`] to the network in the
/// form of immutable self encrypted chunks, without any batching.
#[instrument(skip_all, level = "trace")]
async fn upload_large(
&self,
large: Bytes,
payment_proofs: &PaymentProofsMap,
verify: bool,
) -> Result<ChunkAddress> {
let (head_address, mut all_chunks) = encrypt_large(large)?;
trace!("Client upload started");
while !all_chunks.is_empty() {
let chop_size = std::cmp::min(CHUNKS_BATCH_MAX_SIZE, all_chunks.len());
let next_batch: Vec<Chunk> = all_chunks.drain(..chop_size).collect();

let mut tasks = vec![];
for chunk in next_batch {
let client = self.client.clone();
let chunk_addr = *chunk.address();
let payment = payment_proofs.get(chunk_addr.name()).cloned();
// TODO: re-enable requirement to always provide payment proof
//.ok_or(super::Error::MissingPaymentProof(chunk_addr))?;

tasks.push(task::spawn(async move {
client.store_chunk(chunk, payment).await?;
if verify {
let _ = client.get_chunk(chunk_addr).await?;
}
Ok::<(), super::error::Error>(())
}));
}

let responses = join_all(tasks)
.await
.into_iter()
.flatten() // swallows errors
.collect_vec();

for res in responses {
// fail with any issue here
res?;
}
}
trace!("Client upload completed");
Ok(ChunkAddress::new(head_address))
}

// Verify a chunk is stored at provided address
async fn verify_chunk_is_stored(&self, address: ChunkAddress) -> Result<()> {
let _ = self.client.get_chunk(address).await?;
Expand Down
4 changes: 2 additions & 2 deletions sn_protocol/src/storage/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ use xor_name::XorName;
pub struct Chunk {
/// Network address. Omitted when serialising and
/// calculated from the `value` when deserialising.
address: ChunkAddress,
pub address: ChunkAddress,
/// Contained data.
#[debug(skip)]
value: Bytes,
pub value: Bytes,
}

impl Chunk {
Expand Down

0 comments on commit c58db73

Please sign in to comment.