Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): skip payment and upload for existing chunks #1069

Merged
merged 1 commit into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 18 additions & 0 deletions .github/workflows/memcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ on:
branches: ["*"]

env:
SAFE_DATA_PATH: /home/runner/.local/share/safe
CLIENT_DATA_PATH: /home/runner/.local/share/safe/client
NODE_DATA_PATH: /home/runner/.local/share/safe/node
BOOTSTRAP_NODE_DATA_PATH: /home/runner/.local/share/safe/bootstrap_node
Expand Down Expand Up @@ -101,6 +102,23 @@ jobs:
SN_LOG: "all"
timeout-minutes: 25

# Uploading same file using different client shall not incur any payment neither uploads
# Note rg will throw an error directly in case of failed to find a matching pattern.
- name: Start a different client to upload the same file
run: |
mv $CLIENT_DATA_PATH $SAFE_DATA_PATH/client_first
cargo run --bin faucet --release -- --log-output-dest=data-dir send 5000000 $(cargo run --bin safe --release -- --log-output-dest=data-dir wallet address | tail -n 1) > initial_balance_from_faucet_1.txt
cat initial_balance_from_faucet_1.txt
cat initial_balance_from_faucet_1.txt | tail -n 1 > transfer_hex
cat transfer_hex
cargo run --bin safe --release -- --log-output-dest=data-dir wallet receive --file transfer_hex
cargo run --bin safe --release -- --log-output-dest=data-dir files upload "./the-test-data.zip" --show-holders > second_upload.txt
cat second_upload.txt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make this a full test and check the output here has no spend? (or am I missing that?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added an rg check to confirm the balance of wallet remain unchanged

rg "New wallet balance: 5000000.000000000" second_upload.txt -c --stats
env:
SN_LOG: "all"
timeout-minutes: 25

- name: Chunks data integrity during nodes churn
run: cargo test --release -p sn_node --test data_with_churn -- --nocapture
env:
Expand Down
23 changes: 18 additions & 5 deletions sn_cli/src/subcommands/files/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ async fn upload_files(
if chunk_manager.is_chunks_empty() {
// make sure we don't have any failed chunks in those
let chunks = chunk_manager.already_put_chunks(&files_path)?;
println!(
"Files upload attempted previously, verifying {} chunks",
chunks.len()
);
let failed_chunks = client.verify_uploaded_chunks(&chunks, batch_size).await?;

// mark the non-failed ones as completed
Expand Down Expand Up @@ -214,6 +218,7 @@ async fn upload_files(

// Task set to add and remove chunks from the chunk manager
let mut uploading_chunks = FuturesUnordered::new();
let mut total_exist_chunks = 0;

for chunks_batch in chunks_batches {
// while we dont have a full batch_size of ongoing uploading_chunks
Expand All @@ -236,18 +241,19 @@ async fn upload_files(
}

// pay for and verify payment... if we don't verify here, chunks uploads will surely fail
match file_api
let skipped_chunks = match file_api
.pay_for_chunks(chunks_batch.iter().map(|(name, _)| *name).collect())
.await
{
Ok((storage_cost, royalties_fees, new_balance)) => {
Ok(((storage_cost, royalties_fees, new_balance), skipped_chunks)) => {
final_balance = new_balance;
total_cost = total_cost
.checked_add(storage_cost)
.ok_or_else(|| eyre!("Unable to add cost to total cost"))?;
total_royalties = total_royalties
.checked_add(royalties_fees)
.ok_or_else(|| eyre!("Unable to add cost to total royalties fees"))?;
skipped_chunks
}
Err(ClientError::Transfers(WalletError::Transfer(
TransfersError::NotEnoughBalance(available, required),
Expand All @@ -259,10 +265,17 @@ async fn upload_files(
}
};

let mut chunks_to_upload = chunks_batch.to_vec();
chunks_to_upload.retain(|(name, _)| !skipped_chunks.contains(name));

total_exist_chunks += skipped_chunks.len();
progress_bar.inc(skipped_chunks.len() as u64);
chunk_manager.mark_completed(skipped_chunks.into_iter());

// upload paid chunks
let upload_tasks = upload_chunks_in_parallel(
&file_api,
chunks_batch.to_vec(),
chunks_to_upload,
verify_store,
&progress_bar,
show_holders,
Expand Down Expand Up @@ -341,8 +354,8 @@ async fn upload_files(
file.flush()?;

let elapsed = format_elapsed_time(now.elapsed());
println!("Uploaded {chunks_to_upload_len} chunks in {elapsed}");
info!("Uploaded {chunks_to_upload_len} chunks in {elapsed}");
println!("Uploaded {chunks_to_upload_len} chunks (with {total_exist_chunks} exist chunks) in {elapsed}");
info!("Uploaded {chunks_to_upload_len} chunks (with {total_exist_chunks} exist chunks) in {elapsed}");

println!("**************************************");
println!("* Payment Details *");
Expand Down
10 changes: 5 additions & 5 deletions sn_client/src/file_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,11 @@ impl Files {
pub async fn pay_for_chunks(
&self,
chunks: Vec<XorName>,
) -> Result<(NanoTokens, NanoTokens, NanoTokens)> {
) -> Result<((NanoTokens, NanoTokens, NanoTokens), Vec<XorName>)> {
let mut wallet_client = self.wallet()?;
info!("Paying for and uploading {:?} chunks", chunks.len());

let (storage_cost, royalties_fees) =
let ((storage_cost, royalties_fees), skipped_chunks) =
wallet_client
.pay_for_storage(chunks.iter().map(|name| {
sn_protocol::NetworkAddress::ChunkAddress(ChunkAddress::new(*name))
Expand All @@ -201,7 +201,7 @@ impl Files {

wallet_client.store_local_wallet()?;
let new_balance = wallet_client.balance();
Ok((storage_cost, royalties_fees, new_balance))
Ok(((storage_cost, royalties_fees, new_balance), skipped_chunks))
}

// --------------------------------------------
Expand Down Expand Up @@ -241,7 +241,7 @@ impl Files {
verify: bool,
) -> Result<(NetworkAddress, NanoTokens, NanoTokens)> {
// initial payment
let (mut storage_cost, mut royalties_fees) = self
let ((mut storage_cost, mut royalties_fees), _skipped) = self
.wallet()?
.pay_for_storage(
chunks
Expand Down Expand Up @@ -285,7 +285,7 @@ impl Files {
info!("Repaying for {:?} chunks, so far paid {storage_cost} (royalties fees: {royalties_fees})", failed_chunks.len());

// Now we pay again or top up, depending on the new current store cost is
let (new_storage_cost, new_royalties_fees) = self
let ((new_storage_cost, new_royalties_fees), _skipped) = self
.wallet()?
.pay_for_storage(failed_chunks.iter().map(|(addr, _path)| {
sn_protocol::NetworkAddress::ChunkAddress(ChunkAddress::new(*addr))
Expand Down
2 changes: 1 addition & 1 deletion sn_client/src/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl ClientRegister {
// Let's check if the user has already paid for this address first
let net_addr = sn_protocol::NetworkAddress::RegisterAddress(addr);
// Let's make the storage payment
(storage_cost, royalties_fees) = wallet_client
((storage_cost, royalties_fees), _) = wallet_client
.pay_for_storage(std::iter::once(net_addr.clone()))
.await?;
let cost = storage_cost
Expand Down
22 changes: 17 additions & 5 deletions sn_client/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl WalletClient {
pub async fn pay_for_storage(
&mut self,
content_addrs: impl Iterator<Item = NetworkAddress>,
) -> WalletResult<(NanoTokens, NanoTokens)> {
) -> WalletResult<((NanoTokens, NanoTokens), Vec<XorName>)> {
let verify_store = true;
let c: Vec<_> = content_addrs.collect();
let mut last_err = "No retries".to_string();
Expand All @@ -166,11 +166,14 @@ impl WalletClient {
Err(WalletError::CouldNotSendMoney(last_err))
}

/// Existing chunks will have a store cost to be Zero.
/// The payement procedure shall be skipped, and the chunk upload as well.
/// Hence the list of existing chunks will be returned.
async fn pay_for_storage_once(
&mut self,
content_addrs: impl Iterator<Item = NetworkAddress>,
verify_store: bool,
) -> WalletResult<(NanoTokens, NanoTokens)> {
) -> WalletResult<((NanoTokens, NanoTokens), Vec<XorName>)> {
// get store cost from network in parrallel
let mut tasks = JoinSet::new();
for content_addr in content_addrs {
Expand All @@ -190,12 +193,18 @@ impl WalletClient {

// collect store costs
let mut cost_map = BTreeMap::default();
let mut skipped_chunks = vec![];
while let Some(res) = tasks.join_next().await {
match res {
Ok((content_addr, Ok(cost))) => {
if let Some(xorname) = content_addr.as_xorname() {
let _ = cost_map.insert(xorname, cost);
debug!("Storecost inserted into payment map for {content_addr:?}");
if cost.1.cost == NanoTokens::zero() {
skipped_chunks.push(xorname);
debug!("Skipped existing chunk {content_addr:?}");
} else {
let _ = cost_map.insert(xorname, cost);
debug!("Storecost inserted into payment map for {content_addr:?}");
}
} else {
warn!("Cannot get store cost for a content that is not a data type: {content_addr:?}");
}
Expand All @@ -214,7 +223,10 @@ impl WalletClient {
info!("Storecosts retrieved");

// pay for records
self.pay_for_records(&cost_map, verify_store).await
Ok((
self.pay_for_records(&cost_map, verify_store).await?,
skipped_chunks,
))
}

/// Send tokens to nodes closest to the data we want to make storage payment for.
Expand Down
17 changes: 7 additions & 10 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,16 +453,13 @@ impl Node {
Query::GetStoreCost(address) => {
trace!("Got GetStoreCost request for {address:?}");

let record_exists = {
if let Some(key) = address.as_record_key() {
match network.is_record_key_present_locally(&key).await {
Ok(res) => res,
Err(error) => {
error!("Problem getting record key's existence: {error:?}");
false
}
}
} else {
let record_exists = match network
.is_record_key_present_locally(&address.to_record_key())
.await
{
Ok(res) => res,
Err(error) => {
error!("Problem getting record key's existence: {error:?}");
false
}
};
Expand Down
4 changes: 2 additions & 2 deletions sn_node/tests/storage_payments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ async fn storage_payment_proofs_cached_in_wallet() -> Result<()> {
// let's first pay only for a subset of the addresses
let subset_len = random_content_addrs.len() / 3;
println!("Paying for {subset_len} random addresses...",);
let (storage_cost, royalties_fees) = wallet_client
let ((storage_cost, royalties_fees), _) = wallet_client
.pay_for_storage(random_content_addrs.clone().into_iter().take(subset_len))
.await?;

Expand All @@ -151,7 +151,7 @@ async fn storage_payment_proofs_cached_in_wallet() -> Result<()> {

// now let's request to pay for all addresses, even that we've already paid for a subset of them
let mut wallet_client = WalletClient::new(client.clone(), paying_wallet);
let (storage_cost, royalties_fees) = wallet_client
let ((storage_cost, royalties_fees), _) = wallet_client
.pay_for_storage(random_content_addrs.clone().into_iter())
.await?;
let total_cost = storage_cost
Expand Down