Skip to content

Commit

Permalink
chore(client): pass around content payments map mut ref
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuef committed Aug 25, 2023
1 parent 9f59689 commit 8eab1f5
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 31 deletions.
6 changes: 3 additions & 3 deletions sn_cli/src/subcommands/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ async fn upload_files(
let file_names_path = root_dir.join("uploaded_files");

// Payment shall always be verified.
let (chunks_to_upload, content_payments_map) =
let (chunks_to_upload, mut content_payments_map) =
chunk_and_pay_for_storage(&client, root_dir, &files_path, true).await?;

let mut chunks_to_fetch = Vec::new();
Expand All @@ -118,7 +118,7 @@ async fn upload_files(
&file_api,
&file_name,
chunks,
&content_payments_map,
&mut content_payments_map,
verify_store,
)
.await
Expand Down Expand Up @@ -146,7 +146,7 @@ async fn upload_chunks(
file_api: &Files,
file_name: &str,
chunks_paths: Vec<(XorName, PathBuf)>,
content_payments_map: &ContentPaymentsMap,
content_payments_map: &mut ContentPaymentsMap,
verify_store: bool,
) -> Result<()> {
let chunks_reader = chunks_paths
Expand Down
17 changes: 8 additions & 9 deletions sn_client/src/file_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl Files {
pub async fn upload_with_payments(
&self,
bytes: Bytes,
content_payments_map: &ContentPaymentsMap,
content_payments_map: ContentPaymentsMap,
verify_store: bool,
) -> Result<NetworkAddress> {
self.upload_bytes(bytes, content_payments_map, verify_store)
Expand All @@ -114,9 +114,9 @@ impl Files {
pub async fn upload_and_verify(
&self,
bytes: Bytes,
transfer_outputs_map: &ContentPaymentsMap,
content_payments_map: ContentPaymentsMap,
) -> Result<NetworkAddress> {
self.upload_bytes(bytes, transfer_outputs_map, true).await
self.upload_bytes(bytes, content_payments_map, true).await
}

/// Calculates a LargeFile's/SmallFile's address from self encrypted chunks,
Expand Down Expand Up @@ -145,7 +145,7 @@ impl Files {
pub async fn upload_chunks_in_batches(
&self,
chunks: impl Iterator<Item = Chunk>,
content_payments_map: &ContentPaymentsMap,
content_payments_map: &mut ContentPaymentsMap,
verify_store: bool,
) -> Result<()> {
trace!("Client upload in batches started");
Expand All @@ -156,8 +156,7 @@ impl Files {
let client = self.client.clone();
let chunk_addr = chunk.network_address();
let payment = content_payments_map
.get(&chunk_addr)
.cloned()
.remove(&chunk_addr)
.ok_or(super::Error::MissingPaymentProof(format!("{chunk_addr}")))?;

trace!("Payment for {chunk:?}: {:?}", payment);
Expand Down Expand Up @@ -195,15 +194,15 @@ impl Files {
async fn upload_bytes(
&self,
bytes: Bytes,
content_payments_map: &ContentPaymentsMap,
mut content_payments_map: ContentPaymentsMap,
verify: bool,
) -> Result<NetworkAddress> {
if bytes.len() < MIN_ENCRYPTABLE_BYTES {
let file = SmallFile::new(bytes)?;
self.upload_small(file, content_payments_map, verify).await
} else {
let (head_address, chunks) = encrypt_large(bytes)?;
self.upload_chunks_in_batches(chunks.into_iter(), content_payments_map, verify)
self.upload_chunks_in_batches(chunks.into_iter(), &mut content_payments_map, verify)
.await?;
Ok(NetworkAddress::ChunkAddress(ChunkAddress::new(
head_address,
Expand All @@ -217,7 +216,7 @@ impl Files {
async fn upload_small(
&self,
small: SmallFile,
content_payments_map: &ContentPaymentsMap,
content_payments_map: ContentPaymentsMap,
verify_store: bool,
) -> Result<NetworkAddress> {
let chunk = package_small(small)?;
Expand Down
29 changes: 26 additions & 3 deletions sn_client/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use sn_transfers::client_transfers::{ContentPaymentsMap, SpendRequest};
use sn_transfers::client_transfers::{ContentPaymentsIdMap, ContentPaymentsMap, SpendRequest};

use super::Client;

Expand All @@ -19,7 +19,7 @@ use sn_transfers::{

use futures::future::join_all;
use std::{
collections::BTreeMap,
collections::{BTreeMap, HashMap},
iter::Iterator,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -185,7 +185,30 @@ impl WalletClient {
let elapsed = now.elapsed();
println!("After {elapsed:?}, All transfers made for total payment of {total_cost:?} nano tokens. ");

Ok((address_payment_map, total_cost))
let content_payment_map =
Self::build_content_payments_map(address_payment_map, transfers.created_dbcs);

Ok((content_payment_map, total_cost))
}

/// Build ContentPayment map from ContentPaymentId map and TransferOutputs
pub fn build_content_payments_map(
content_payments_id_map: ContentPaymentsIdMap,
created_dbcs: Vec<Dbc>,
) -> ContentPaymentsMap {
let mut content_payments_map = ContentPaymentsMap::new();
let mut dbc_map: HashMap<_, Dbc> = created_dbcs
.into_iter()
.map(|dbc| (dbc.id().to_bytes(), dbc))
.collect();
for (network_address, dbc_ids) in content_payments_id_map {
let dbcs = dbc_ids
.into_iter()
.filter_map(|dbc_id| dbc_map.remove(&dbc_id.to_bytes()))
.collect();
content_payments_map.insert(network_address, dbcs);
}
content_payments_map
}

/// Resend failed txs
Expand Down
4 changes: 2 additions & 2 deletions sn_node/tests/data_with_churn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ fn store_chunks_task(
);
sleep(delay).await;

let (proofs, cost) = wallet_client
let (mut content_payments_map, cost) = wallet_client
.pay_for_storage(chunks.iter().map(|c| c.network_address()), true)
.await
.expect("Failed to pay for storage for new file at {addr:?}");
Expand All @@ -372,7 +372,7 @@ fn store_chunks_task(
sleep(delay).await;

match file_api
.upload_chunks_in_batches(chunks.into_iter(), &proofs, false)
.upload_chunks_in_batches(chunks.into_iter(), &mut content_payments_map, false)
.await
{
Ok(()) => content
Expand Down
19 changes: 13 additions & 6 deletions sn_node/tests/storage_payments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ async fn storage_payment_fails_with_insufficient_money() -> Result<()> {
// now let's request to upload all addresses, even that we've already paid for a subset of them
let verify_store = false;
let res = files_api
.upload_with_payments(content_bytes, &subset_of_transfer_outputs_map, verify_store)
.upload_with_payments(content_bytes, subset_of_transfer_outputs_map, verify_store)
.await;
assert!(
res.is_err(),
Expand Down Expand Up @@ -210,7 +210,7 @@ async fn storage_payment_chunk_upload_succeeds() -> Result<()> {
.await?;

files_api
.upload_with_payments(content_bytes, &transfer_outputs_map, true)
.upload_with_payments(content_bytes, transfer_outputs_map, true)
.await?;

files_api.read_bytes(content_addr).await?;
Expand Down Expand Up @@ -246,18 +246,25 @@ async fn storage_payment_chunk_upload_fails() -> Result<()> {
);
}

let (bad_transfer_outputs, contents_payment_map) = wallet_client
let (bad_transfer_outputs, contents_payment_id_map) = wallet_client
.into_wallet()
.local_send_storage_payment(no_data_payments, None)
.await?;

// invalid spends
client.send(bad_transfer_outputs.clone(), false).await?;
client
.send(&bad_transfer_outputs.all_spend_requests, true)
.await?;

sleep(Duration::from_secs(5)).await;
// it should fail to store as the amount paid is not enough

let contents_payments_map = WalletClient::build_content_payments_map(
contents_payment_id_map,
bad_transfer_outputs.created_dbcs,
);
// this should fail to store as the amount paid is not enough
files_api
.upload_with_payments(content_bytes.clone(), &contents_payment_map, false)
.upload_with_payments(content_bytes.clone(), contents_payments_map, false)
.await?;

assert!(matches!(
Expand Down
6 changes: 4 additions & 2 deletions sn_node/tests/verify_data_location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ async fn store_chunks(
bytes.len()
);

let (proofs, _) = wallet_client
let (contents_payment_map, _) = wallet_client
.pay_for_storage(chunks.iter().map(|c| c.network_address()), true)
.await
.expect("Failed to pay for storage for new file at {addr:?}");
Expand All @@ -352,7 +352,9 @@ async fn store_chunks(

let addr = ChunkAddress::new(file_api.calculate_address(bytes.clone())?);
let key = PrettyPrintRecordKey::from(RecordKey::new(addr.xorname()));
file_api.upload_with_payments(bytes, &proofs, true).await?;
file_api
.upload_with_payments(bytes, contents_payment_map, true)
.await?;
uploaded_chunks_count += 1;

println!("Stored Chunk with {addr:?} / {key:?}");
Expand Down
5 changes: 4 additions & 1 deletion sn_transfers/src/client_transfers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ use std::collections::BTreeMap;
pub(crate) use self::error::{Error, Result};
pub use self::transfer::create_transfer;

use sn_dbc::{Dbc, DbcTransaction, DerivationIndex, DerivedKey, PublicAddress, SignedSpend, Token};
use sn_dbc::{
Dbc, DbcId, DbcTransaction, DerivationIndex, DerivedKey, PublicAddress, SignedSpend, Token,
};
use sn_protocol::NetworkAddress;

pub type ContentPaymentsIdMap = BTreeMap<NetworkAddress, Vec<DbcId>>;
pub type ContentPaymentsMap = BTreeMap<NetworkAddress, Vec<Dbc>>;

/// The input details necessary to
Expand Down
12 changes: 7 additions & 5 deletions sn_transfers/src/wallet/local_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ use super::{
},
ContentPaymentsMap, Error, KeyLessWallet, Result,
};
use crate::client_transfers::{create_transfer, TransferOutputs};
use crate::client_transfers::{create_transfer, ContentPaymentsIdMap, TransferOutputs};
use itertools::Itertools;
use sn_dbc::{random_derivation_index, Dbc, DerivedKey, Hash, MainKey, PublicAddress, Token};
use sn_dbc::{
random_derivation_index, Dbc, DbcId, DerivedKey, Hash, MainKey, PublicAddress, Token,
};
use sn_protocol::NetworkAddress;

use std::{
Expand Down Expand Up @@ -171,7 +173,7 @@ impl LocalWallet {
&mut self,
all_data_payments: BTreeMap<NetworkAddress, Vec<(PublicAddress, Token)>>,
reason_hash: Option<Hash>,
) -> Result<(TransferOutputs, ContentPaymentsMap)> {
) -> Result<(TransferOutputs, ContentPaymentsIdMap)> {
let mut rng = &mut rand::thread_rng();

// create a unique key for each output
Expand Down Expand Up @@ -210,10 +212,10 @@ impl LocalWallet {
dbc.public_address() == &payee && !used_dbcs.contains(&dbc.id().to_bytes())
}) {
used_dbcs.insert(dbc.id().to_bytes());
let dbcs_for_content: &mut Vec<Dbc> = all_transfers_per_address
let dbcs_for_content: &mut Vec<DbcId> = all_transfers_per_address
.entry(content_addr.clone())
.or_default();
dbcs_for_content.push(dbc.clone());
dbcs_for_content.push(dbc.id());
}
}
}
Expand Down

0 comments on commit 8eab1f5

Please sign in to comment.