Skip to content

Commit

Permalink
payingper
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuef committed Aug 16, 2023
1 parent e92f6e9 commit 082f18e
Show file tree
Hide file tree
Showing 14 changed files with 120 additions and 156 deletions.
5 changes: 4 additions & 1 deletion sn_cli/src/subcommands/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

use sn_client::{Client, Files, WalletClient};
use sn_dbc::Token;
use sn_protocol::storage::ChunkAddress;
use sn_transfers::wallet::{parse_public_address, LocalWallet, PaymentTransactionsMap};

use bytes::Bytes;
Expand Down Expand Up @@ -307,7 +308,9 @@ pub(super) async fn chunk_and_pay_for_storage(
chunked_files
.values()
.flat_map(|chunked_file| &chunked_file.chunks)
.map(|(name, _)| name),
.map(|(name, _)| {
sn_protocol::NetworkAddress::ChunkAddress(ChunkAddress::new(*name))
}),
verify_store,
)
.await?;
Expand Down
32 changes: 7 additions & 25 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use sn_dbc::{DbcId, SignedSpend, Token};
use sn_networking::{multiaddr_is_global, NetworkEvent, SwarmDriver, CLOSE_GROUP_SIZE};
use sn_protocol::{
error::Error as ProtocolError,
messages::PaymentTransactions,
storage::{
try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, ChunkWithPayment,
DbcAddress, RecordHeader, RecordKind, RegisterAddress,
Expand Down Expand Up @@ -59,13 +58,12 @@ impl Client {
info!("Client constructed network and swarm_driver");
let events_channel = ClientEventsChannel::default();

let mut client = Self {
let client = Self {
network: network.clone(),
events_channel,
signer,
peers_added: 0,
progress: Some(Self::setup_connection_progress()),
network_store_cost: 0,
};

// subscribe to our events channel first, so we don't have intermittent
Expand Down Expand Up @@ -139,7 +137,7 @@ impl Client {

// ignore errors here, as we're just trying to fill up the RT
let _ = client
.get_store_cost_at_address(NetworkAddress::ChunkAddress(random_target), true)
.get_store_cost_at_address(&NetworkAddress::ChunkAddress(random_target))
.await;
}

Expand Down Expand Up @@ -292,7 +290,7 @@ impl Client {
pub(super) async fn store_chunk(
&self,
chunk: Chunk,
payment: PaymentTransactions,
payment: Vec<DbcId>,
verify_store: bool,
) -> Result<()> {
info!("Store chunk: {:?}", chunk.address());
Expand Down Expand Up @@ -420,32 +418,16 @@ impl Client {
}

/// Get the store cost at a given address
/// Replaces current network_store_cost with the new one, unless average is set to true
pub async fn get_store_cost_at_address(
&mut self,
address: NetworkAddress,
only_update_cost_if_higher: bool,
) -> Result<Token> {
trace!("Getting store cost at {address:?}, will update only if higher cost?: {only_update_cost_if_higher:?}");

// if we're averaging over many samples across the network, any cost will do
let any_cost_will_do = only_update_cost_if_higher;
pub async fn get_store_cost_at_address(&self, address: &NetworkAddress) -> Result<Token> {
trace!("Getting store cost at {address:?}");

let cost = self
.network
.get_store_cost_from_network(address.clone(), any_cost_will_do)
.get_store_cost_from_network(address.clone())
.await?
.as_nano();

if cost > self.network_store_cost {
self.network_store_cost = cost;
}

if !only_update_cost_if_higher {
self.network_store_cost = cost;
}

trace!("Set store cost: {}", self.network_store_cost);
trace!("Store cost at address {address:?} is: {cost:?}");

Ok(Token::from_nano(cost))
}
Expand Down
4 changes: 2 additions & 2 deletions sn_client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub(crate) type Result<T> = std::result::Result<T, Error>;

use super::ClientEvent;

use sn_protocol::storage::ChunkAddress;
use sn_protocol::NetworkAddress;
use sn_registers::{Entry, EntryHash};

use std::collections::BTreeSet;
Expand Down Expand Up @@ -58,7 +58,7 @@ pub enum Error {
ContentBranchDetected(BTreeSet<(EntryHash, Entry)>),

#[error("Missing a payment proof for address {0:?}")]
MissingPaymentProof(ChunkAddress),
MissingPaymentProof(NetworkAddress),

/// A general error when a transfer fails.
#[error("Failed to send tokens due to {0}")]
Expand Down
27 changes: 16 additions & 11 deletions sn_client/src/file_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use super::{
Client,
};

use sn_protocol::storage::{Chunk, ChunkAddress};
use sn_protocol::{
storage::{Chunk, ChunkAddress},
NetworkAddress,
};
use sn_transfers::wallet::PaymentTransactionsMap;

use bincode::deserialize;
Expand Down Expand Up @@ -98,7 +101,7 @@ impl Files {
bytes: Bytes,
payment_proofs: &PaymentTransactionsMap,
verify_store: bool,
) -> Result<ChunkAddress> {
) -> Result<NetworkAddress> {
self.upload_bytes(bytes, payment_proofs, verify_store).await
}

Expand All @@ -111,7 +114,7 @@ impl Files {
&self,
bytes: Bytes,
payment_proofs: &PaymentTransactionsMap,
) -> Result<ChunkAddress> {
) -> Result<NetworkAddress> {
self.upload_bytes(bytes, payment_proofs, true).await
}

Expand Down Expand Up @@ -150,9 +153,9 @@ impl Files {
for chunk in chunks {
next_batch_size += 1;
let client = self.client.clone();
let chunk_addr = *chunk.address();
let chunk_addr = chunk.network_address();
let payment = payment_proofs
.get(chunk_addr.xorname())
.get(&chunk_addr)
.cloned()
.ok_or(super::Error::MissingPaymentProof(chunk_addr))?;

Expand Down Expand Up @@ -188,15 +191,17 @@ impl Files {
bytes: Bytes,
payment_proofs: &PaymentTransactionsMap,
verify: bool,
) -> Result<ChunkAddress> {
) -> Result<NetworkAddress> {
if bytes.len() < MIN_ENCRYPTABLE_BYTES {
let file = SmallFile::new(bytes)?;
self.upload_small(file, payment_proofs, verify).await
} else {
let (head_address, chunks) = encrypt_large(bytes)?;
self.upload_chunks_in_batches(chunks.into_iter(), payment_proofs, verify)
.await?;
Ok(ChunkAddress::new(head_address))
Ok(NetworkAddress::ChunkAddress(ChunkAddress::new(
head_address,
)))
}
}

Expand All @@ -208,13 +213,13 @@ impl Files {
small: SmallFile,
payment_proofs: &PaymentTransactionsMap,
verify_store: bool,
) -> Result<ChunkAddress> {
) -> Result<NetworkAddress> {
let chunk = package_small(small)?;
let address = *chunk.address();
let address = chunk.network_address();
let payment = payment_proofs
.get(address.xorname())
.get(&address)
.cloned()
.ok_or(super::Error::MissingPaymentProof(address))?;
.ok_or(super::Error::MissingPaymentProof(address.clone()))?;

self.client
.store_chunk(chunk, payment, verify_store)
Expand Down
1 change: 0 additions & 1 deletion sn_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,4 @@ pub struct Client {
signer: bls::SecretKey,
peers_added: usize,
progress: Option<ProgressBar>,
network_store_cost: u64,
}
86 changes: 38 additions & 48 deletions sn_client/src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@

use super::Client;

use rand::rngs::OsRng;
use sn_dbc::{Dbc, PublicAddress, Token};
use sn_protocol::{messages::PaymentTransactions, storage::ChunkAddress, NetworkAddress};
use sn_dbc::{Dbc, DbcId, PublicAddress, Token};
use sn_protocol::NetworkAddress;
use sn_transfers::{
client_transfers::TransferOutputs,
wallet::{Error, LocalWallet, PaymentTransactionsMap, Result},
Expand All @@ -19,7 +18,6 @@ use sn_transfers::{
use futures::future::join_all;
use std::{iter::Iterator, time::Duration};
use tokio::time::sleep;
use xor_name::XorName;

/// A wallet client can be used to send and
/// receive tokens to/from other wallets.
Expand All @@ -44,11 +42,6 @@ impl WalletClient {
}
}

/// Get any known store cost estimate
pub fn store_cost(&self) -> Token {
Token::from_nano(self.client.network_store_cost)
}

/// Do we have any unconfirmed transactions?
pub fn unconfirmed_txs_exist(&self) -> bool {
!self.unconfirmed_txs.is_empty()
Expand Down Expand Up @@ -87,11 +80,9 @@ impl WalletClient {

/// Get storecost from the network
/// Stores this value as the new baseline at the client
pub async fn set_store_cost_from_random_address(&mut self) -> Result<Token> {
let random_target = ChunkAddress::new(XorName::random(&mut OsRng));

pub async fn get_store_cost_at_address(&mut self, address: &NetworkAddress) -> Result<Token> {
self.client
.get_store_cost_at_address(NetworkAddress::ChunkAddress(random_target), false)
.get_store_cost_at_address(address)
.await
.map_err(|error| Error::CouldNotSendTokens(error.to_string()))
}
Expand All @@ -103,51 +94,67 @@ impl WalletClient {
/// This can optionally verify the store has been successful (this will attempt to GET the dbc from the network)
pub async fn pay_for_storage(
&mut self,
content_addrs: impl Iterator<Item = &XorName>,
content_addrs: impl Iterator<Item = NetworkAddress>,
verify_store: bool,
) -> Result<(PaymentTransactionsMap, Option<Token>)> {
// Let's filter the content addresses we hold payment proofs for, i.e. avoid
// paying for those chunks we've already paid for with this wallet.
let mut proofs = PaymentTransactionsMap::default();
let addrs_to_pay: Vec<&XorName> = content_addrs
let addrs_to_pay: Vec<NetworkAddress> = content_addrs
.filter(|name| {
if let Some(proof) = self.wallet.get_payment_proof(name) {
proofs.insert(**name, proof.clone());
proofs.insert(name.clone(), proof.clone());
false
} else {
true
}
})
.collect();

let number_of_records_to_pay = addrs_to_pay.len() as u64;
let _number_of_records_to_pay = addrs_to_pay.len() as u64;
// If no addresses need to be paid for, we don't have to go further
if addrs_to_pay.is_empty() {
trace!("We already hold payment proofs for all the Chunks.");
return Ok((proofs, None));
}

// Let's build the payment proofs for list of content addresses
// let (root_hash, audit_trail_info) = build_payment_proofs(addrs_to_pay.into_iter())?;
// let num_of_addrs = audit_trail_info.len() as u64;
let mut total_cost = Token::zero();

let num_of_addrs = addrs_to_pay.len();
for content_addr in addrs_to_pay {
// TODO: parallelise this

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
let (payments, cost) = self
.pay_for_storage_at_address(&content_addr, verify_store)
.await?;

// Always check storage cost, and overpay to allow margin when validation.
self.set_store_cost_from_random_address().await?;
let storage_cost = Token::from_nano(
self.store_cost().as_nano() * (2.0f64.powf((num_of_addrs / 100 + 1) as f64)) as u64,
);
if let Some(cost) = total_cost.checked_add(cost) {
total_cost = cost;
}

info!("Storage cost per record: {}", storage_cost);
// store the payment proof
proofs.insert(content_addr.clone(), payments);
}

let amount_to_pay = number_of_records_to_pay * storage_cost.as_nano();
trace!("Making payment for {num_of_addrs} addresses of {amount_to_pay:?} nano tokens.");
Ok((proofs, Some(total_cost)))
}

/// Send tokens to nodes closest to the data we want to make storage payment for.
///
/// Returns DbcId created for the payment, storage cost is for that record
///
/// This can optionally verify the store has been successful (this will attempt to GET the dbc from the network)
pub async fn pay_for_storage_at_address(
&mut self,
content_addr: &NetworkAddress,
verify_store: bool,
) -> Result<(Vec<DbcId>, Token)> {
let amount_to_pay = self.get_store_cost_at_address(content_addr).await?;

trace!("Making payment for of {amount_to_pay:?} nano tokens to each node in close group at {content_addr:?}.");

// TODO: This needs to go out to each CLOSEGROUP of addresses

Check notice

Code scanning / devskim

A "TODO" or similar was left in source code, possibly indicating incomplete functionality Note

Suspicious comment
let transfer = self
.wallet
.local_send_storage_payment(Token::from_nano(amount_to_pay), None)
.local_send_storage_payment(amount_to_pay, None)
.await?;

// send to network
Expand All @@ -160,19 +167,7 @@ impl WalletClient {

let spent_ids: Vec<_> = transfer.tx.inputs.iter().map(|i| i.dbc_id()).collect();

for addr in addrs_to_pay.into_iter() {
proofs.insert(
*addr,
PaymentTransactions {
spent_ids: spent_ids.clone(),
},
);
}

// cache the new set of payment proofs
self.wallet.add_payment_proofs(proofs.clone());

Ok((proofs, Some(storage_cost)))
Ok((spent_ids, amount_to_pay))
}

/// Resend failed txs
Expand Down Expand Up @@ -204,11 +199,6 @@ impl WalletClient {
}

impl Client {
/// Get any known store cost estimate
pub fn store_cost(&self) -> u64 {
self.network_store_cost
}

/// Send a spend request to the network.
/// This can optionally verify the spend has been correctly stored before returning
pub async fn send(&self, transfer: TransferOutputs, verify_store: bool) -> Result<()> {
Expand Down

0 comments on commit 082f18e

Please sign in to comment.