Skip to content

Commit

Permalink
feat!(protocol): get price and pay for each chunk individually
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuef committed Aug 25, 2023
1 parent a0b000c commit 913c81d
Show file tree
Hide file tree
Showing 15 changed files with 165 additions and 196 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/memcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ jobs:
cargo run --bin safe --release -- --log-output-dest=data-dir files upload -- "./target/release/testnet"
env:
SN_LOG: "all"
timeout-minutes: 25
timeout-minutes: 40

- name: Chunks data integrity during nodes churn
run: cargo test --release -p sn_node --test data_with_churn -- --nocapture
Expand Down
13 changes: 6 additions & 7 deletions sn_cli/src/subcommands/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

use sn_client::{Client, Files, WalletClient};
use sn_dbc::Token;
use sn_protocol::storage::ChunkAddress;
use sn_transfers::wallet::{parse_public_address, LocalWallet, PaymentTransactionsMap};

use bytes::Bytes;
Expand Down Expand Up @@ -307,20 +308,18 @@ pub(super) async fn chunk_and_pay_for_storage(
chunked_files
.values()
.flat_map(|chunked_file| &chunked_file.chunks)
.map(|(name, _)| name),
.map(|(name, _)| {
sn_protocol::NetworkAddress::ChunkAddress(ChunkAddress::new(*name))
}),
verify_store,
)
.await?;

if let Some(cost) = cost {
let total_cost = proofs.len() as u64 * cost.as_nano();
println!(
let total_cost = proofs.len() as u64 * cost.as_nano();
println!(
"Successfully made payment of {total_cost} for {} records. (At a cost per record of {cost:?}.)",
proofs.len(),
);
} else {
println!("No payment needed for {} records.", proofs.len(),);
}

let wallet = wallet_client.into_wallet();
if let Err(err) = wallet.store().await {
Expand Down
28 changes: 5 additions & 23 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use sn_dbc::{DbcId, SignedSpend, Token};
use sn_networking::{multiaddr_is_global, NetworkEvent, SwarmDriver, CLOSE_GROUP_SIZE};
use sn_protocol::{
error::Error as ProtocolError,
messages::PaymentTransactions,
storage::{
try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, ChunkWithPayment,
DbcAddress, RecordHeader, RecordKind, RegisterAddress,
Expand Down Expand Up @@ -62,7 +61,6 @@ impl Client {
signer,
peers_added: 0,
progress: Some(Self::setup_connection_progress()),
network_store_cost: 0,
};

// subscribe to our events channel first, so we don't have intermittent
Expand Down Expand Up @@ -273,7 +271,7 @@ impl Client {
pub(super) async fn store_chunk(
&self,
chunk: Chunk,
payment: PaymentTransactions,
payment: Vec<DbcId>,
verify_store: bool,
) -> Result<()> {
info!("Store chunk: {:?}", chunk.address());
Expand Down Expand Up @@ -401,32 +399,16 @@ impl Client {
}

/// Get the store cost at a given address
/// Replaces current network_store_cost with the new one, unless average is set to true
pub async fn get_store_cost_at_address(
&mut self,
address: NetworkAddress,
only_update_cost_if_higher: bool,
) -> Result<Token> {
trace!("Getting store cost at {address:?}, will update only if higher cost?: {only_update_cost_if_higher:?}");

// if we're averaging over many samples across the network, any cost will do
let any_cost_will_do = only_update_cost_if_higher;
pub async fn get_store_cost_at_address(&self, address: &NetworkAddress) -> Result<Token> {
trace!("Getting store cost at {address:?}");

let cost = self
.network
.get_store_cost_from_network(address.clone(), any_cost_will_do)
.get_store_cost_from_network(address.clone())
.await?
.as_nano();

if cost > self.network_store_cost {
self.network_store_cost = cost;
}

if !only_update_cost_if_higher {
self.network_store_cost = cost;
}

trace!("Set store cost: {}", self.network_store_cost);
trace!("Store cost at address {address:?} is: {cost:?}");

Ok(Token::from_nano(cost))
}
Expand Down
3 changes: 1 addition & 2 deletions sn_client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ pub(crate) type Result<T> = std::result::Result<T, Error>;

use super::ClientEvent;

use sn_protocol::storage::ChunkAddress;
use sn_registers::{Entry, EntryHash};

use std::collections::BTreeSet;
Expand Down Expand Up @@ -58,7 +57,7 @@ pub enum Error {
ContentBranchDetected(BTreeSet<(EntryHash, Entry)>),

#[error("Missing a payment proof for address {0:?}")]
MissingPaymentProof(ChunkAddress),
MissingPaymentProof(String),

/// A general error when a transfer fails.
#[error("Failed to send tokens due to {0}")]
Expand Down
29 changes: 17 additions & 12 deletions sn_client/src/file_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use super::{
Client,
};

use sn_protocol::storage::{Chunk, ChunkAddress};
use sn_protocol::{
storage::{Chunk, ChunkAddress},
NetworkAddress,
};
use sn_transfers::wallet::PaymentTransactionsMap;

use bincode::deserialize;
Expand Down Expand Up @@ -98,7 +101,7 @@ impl Files {
bytes: Bytes,
payment_proofs: &PaymentTransactionsMap,
verify_store: bool,
) -> Result<ChunkAddress> {
) -> Result<NetworkAddress> {
self.upload_bytes(bytes, payment_proofs, verify_store).await
}

Expand All @@ -111,7 +114,7 @@ impl Files {
&self,
bytes: Bytes,
payment_proofs: &PaymentTransactionsMap,
) -> Result<ChunkAddress> {
) -> Result<NetworkAddress> {
self.upload_bytes(bytes, payment_proofs, true).await
}

Expand Down Expand Up @@ -150,11 +153,11 @@ impl Files {
for chunk in chunks {
next_batch_size += 1;
let client = self.client.clone();
let chunk_addr = *chunk.address();
let chunk_addr = chunk.network_address();
let payment = payment_proofs
.get(chunk_addr.xorname())
.get(&chunk_addr)
.cloned()
.ok_or(super::Error::MissingPaymentProof(chunk_addr))?;
.ok_or(super::Error::MissingPaymentProof(format!("{chunk_addr}")))?;

tasks.push(task::spawn(async move {
client.store_chunk(chunk, payment, verify_store).await?;
Expand Down Expand Up @@ -192,15 +195,17 @@ impl Files {
bytes: Bytes,
payment_proofs: &PaymentTransactionsMap,
verify: bool,
) -> Result<ChunkAddress> {
) -> Result<NetworkAddress> {
if bytes.len() < MIN_ENCRYPTABLE_BYTES {
let file = SmallFile::new(bytes)?;
self.upload_small(file, payment_proofs, verify).await
} else {
let (head_address, chunks) = encrypt_large(bytes)?;
self.upload_chunks_in_batches(chunks.into_iter(), payment_proofs, verify)
.await?;
Ok(ChunkAddress::new(head_address))
Ok(NetworkAddress::ChunkAddress(ChunkAddress::new(
head_address,
)))
}
}

Expand All @@ -212,13 +217,13 @@ impl Files {
small: SmallFile,
payment_proofs: &PaymentTransactionsMap,
verify_store: bool,
) -> Result<ChunkAddress> {
) -> Result<NetworkAddress> {
let chunk = package_small(small)?;
let address = *chunk.address();
let address = chunk.network_address();
let payment = payment_proofs
.get(address.xorname())
.get(&address)
.cloned()
.ok_or(super::Error::MissingPaymentProof(address))?;
.ok_or(super::Error::MissingPaymentProof(format!("{address}")))?;

self.client
.store_chunk(chunk, payment, verify_store)
Expand Down
1 change: 0 additions & 1 deletion sn_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,4 @@ pub struct Client {
signer: bls::SecretKey,
peers_added: usize,
progress: Option<ProgressBar>,
network_store_cost: u64,
}
123 changes: 54 additions & 69 deletions sn_client/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@

use super::Client;

use rand::rngs::OsRng;
use sn_dbc::{Dbc, PublicAddress, Token};
use sn_protocol::{messages::PaymentTransactions, storage::ChunkAddress, NetworkAddress};
use sn_dbc::{Dbc, DbcId, PublicAddress, Token};
use sn_protocol::NetworkAddress;
use sn_transfers::{
client_transfers::TransferOutputs,
wallet::{Error, LocalWallet, PaymentTransactionsMap, Result},
Expand All @@ -19,7 +18,6 @@ use sn_transfers::{
use futures::future::join_all;
use std::{iter::Iterator, time::Duration};
use tokio::time::sleep;
use xor_name::XorName;

/// A wallet client can be used to send and
/// receive tokens to/from other wallets.
Expand All @@ -44,11 +42,6 @@ impl WalletClient {
}
}

/// Get any known store cost estimate
pub fn store_cost(&self) -> Token {
Token::from_nano(self.client.network_store_cost)
}

/// Do we have any unconfirmed transactions?
pub fn unconfirmed_txs_exist(&self) -> bool {
!self.unconfirmed_txs.is_empty()
Expand Down Expand Up @@ -87,11 +80,9 @@ impl WalletClient {

/// Get storecost from the network
/// Stores this value as the new baseline at the client
pub async fn set_store_cost_from_random_address(&mut self) -> Result<Token> {
let random_target = ChunkAddress::new(XorName::random(&mut OsRng));

pub async fn get_store_cost_at_address(&mut self, address: &NetworkAddress) -> Result<Token> {
self.client
.get_store_cost_at_address(NetworkAddress::ChunkAddress(random_target), false)
.get_store_cost_at_address(address)
.await
.map_err(|error| Error::CouldNotSendTokens(error.to_string()))
}
Expand All @@ -103,55 +94,66 @@ impl WalletClient {
/// This can optionally verify the store has been successful (this will attempt to GET the dbc from the network)
pub async fn pay_for_storage(
&mut self,
content_addrs: impl Iterator<Item = &XorName>,
content_addrs: impl Iterator<Item = NetworkAddress>,
verify_store: bool,
) -> Result<(PaymentTransactionsMap, Option<Token>)> {
) -> Result<(PaymentTransactionsMap, Token)> {
// Let's filter the content addresses we hold payment proofs for, i.e. avoid
// paying for those chunks we've already paid for with this wallet.
let mut proofs = PaymentProofsMap::default();

let addrs_to_pay: Vec<&XorName> = content_addrs.collect();
// TODO: reenable this when we have a way to get the store cost from the network
// per chunk, and can readily check what we've paid here.
// .filter(|name| {
// if let Some(proof) = self.wallet.get_payment_proof(name) {
// proofs.insert(**name, proof.clone());
// false
// } else {
// true
// }
// })
// .collect();

let number_of_records_to_pay = addrs_to_pay.len() as u64;

// If no addresses need to be paid for, we don't have to go further
if addrs_to_pay.is_empty() {
trace!("We already hold payment proofs for all the records.");
return Ok((proofs, None));
}

// Let's build the payment proofs for list of content addresses
// let (root_hash, audit_trail_info) = build_payment_proofs(addrs_to_pay.into_iter())?;
// let num_of_addrs = audit_trail_info.len() as u64;
let mut proofs = PaymentTransactionsMap::default();
// // let addrs_to_pay: Vec<NetworkAddress> = content_addrs
// // .filter(|name| {
// // if let Some(proof) = self.wallet.get_payment_proof(name) {
// // proofs.insert(name.clone(), proof.clone());
// // false
// // } else {
// // true
// // }
// // })
// // .collect();

// // If no addresses need to be paid for, we don't have to go further
// if addrs_to_pay.is_empty() {
// trace!("We already hold payment proofs for all the records.");
// return Ok((proofs, None));
// }

let mut total_cost = Token::zero();

for content_addr in content_addrs {
// TODO: parallelise this
let (payments, cost) = self
.pay_for_storage_at_address(&content_addr, verify_store)
.await?;

if let Some(cost) = total_cost.checked_add(cost) {
total_cost = cost;
}

let num_of_addrs = addrs_to_pay.len();
// store the payment proof
proofs.insert(content_addr.clone(), payments);
}

// Always check storage cost, and overpay to allow margin when validation.
self.set_store_cost_from_random_address().await?;
let storage_cost = Token::from_nano(
self.store_cost().as_nano() * (2.0f64.powf((num_of_addrs / 100 + 1) as f64)) as u64,
);
Ok((proofs, total_cost))
}

info!("Storage cost per record: {}", storage_cost);
/// Send tokens to nodes closest to the data we want to make storage payment for.
///
/// Returns DbcId created for the payment, storage cost is for that record
///
/// This can optionally verify the store has been successful (this will attempt to GET the dbc from the network)
pub async fn pay_for_storage_at_address(
&mut self,
content_addr: &NetworkAddress,
verify_store: bool,
) -> Result<(Vec<DbcId>, Token)> {
let amount_to_pay = self.get_store_cost_at_address(content_addr).await?;

let amount_to_pay = number_of_records_to_pay * storage_cost.as_nano();
trace!("Making payment for {num_of_addrs} addresses of {amount_to_pay:?} nano tokens.");
trace!("Making payment for of {amount_to_pay:?} nano tokens to each node in close group at {content_addr:?}.");

// TODO: This needs to go out to each CLOSEGROUP of addresses
let transfer = self
.wallet
.local_send_storage_payment(Token::from_nano(amount_to_pay), None)
.local_send_storage_payment(amount_to_pay, None)
.await?;

// send to network
Expand All @@ -164,19 +166,7 @@ impl WalletClient {

let spent_ids: Vec<_> = transfer.tx.inputs.iter().map(|i| i.dbc_id()).collect();

for addr in addrs_to_pay.into_iter() {
proofs.insert(
*addr,
PaymentTransactions {
spent_ids: spent_ids.clone(),
},
);
}

// cache the new set of payment proofs
self.wallet.add_payment_proofs(proofs.clone());

Ok((proofs, Some(storage_cost)))
Ok((spent_ids, amount_to_pay))
}

/// Resend failed txs
Expand Down Expand Up @@ -208,11 +198,6 @@ impl WalletClient {
}

impl Client {
/// Get any known store cost estimate
pub fn store_cost(&self) -> u64 {
self.network_store_cost
}

/// Send a spend request to the network.
/// This can optionally verify the spend has been correctly stored before returning
pub async fn send(&self, transfer: TransferOutputs, verify_store: bool) -> Result<()> {
Expand Down

0 comments on commit 913c81d

Please sign in to comment.