Skip to content

Commit

Permalink
feat!: pay each chunk holder direct
Browse files Browse the repository at this point in the history
Sets us up to pay every node that will be storing the data.
We GetStoreCost and pay to their key, and put the chunk with the DBCs

This is not yet validated at the node side
  • Loading branch information
joshuef committed Aug 17, 2023
1 parent adfef6d commit ab1b712
Show file tree
Hide file tree
Showing 21 changed files with 314 additions and 210 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 19 additions & 7 deletions sn_cli/src/subcommands/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,15 @@ use bytes::Bytes;
use clap::Parser;
use color_eyre::Result;
use sn_client::{Client, Files};
use sn_protocol::storage::{Chunk, ChunkAddress};
use sn_transfers::wallet::PaymentTransactionsMap;
use sn_dbc::DbcId;
use sn_protocol::{
storage::{Chunk, ChunkAddress},
NetworkAddress,
};
use sn_transfers::{client_transfers::TransferOutputs, wallet::PaymentTransactionsMap};

use std::{
collections::BTreeMap,
fs,
path::{Path, PathBuf},
};
Expand Down Expand Up @@ -95,9 +100,10 @@ async fn upload_files(
// The input files_path has to be a dir
let file_names_path = root_dir.join("uploaded_files");

let (chunks_to_upload, payment_proofs) =
let (chunks_to_upload, transfer_output_map) =
chunk_and_pay_for_storage(&client, root_dir, &files_path, verify_store).await?;

println!("payment proofs: {:?}", transfer_output_map);
let mut chunks_to_fetch = Vec::new();
for (
file_addr,
Expand All @@ -113,8 +119,14 @@ async fn upload_files(
chunks.len()
);

if let Err(error) =
upload_chunks(&file_api, &file_name, chunks, &payment_proofs, verify_store).await
if let Err(error) = upload_chunks(
&file_api,
&file_name,
chunks,
&transfer_output_map,
verify_store,
)
.await
{
println!("Failed to store all chunks of file '{file_name}' to all nodes in the close group: {error}")
} else {
Expand All @@ -139,7 +151,7 @@ async fn upload_chunks(
file_api: &Files,
file_name: &str,
chunks_paths: Vec<(XorName, PathBuf)>,
payment_proofs: &PaymentTransactionsMap,
transfer_output_map: &BTreeMap<NetworkAddress, TransferOutputs>,
verify_store: bool,
) -> Result<()> {
let chunks_reader = chunks_paths
Expand All @@ -158,7 +170,7 @@ async fn upload_chunks(
});

file_api
.upload_chunks_in_batches(chunks_reader, payment_proofs, verify_store)
.upload_chunks_in_batches(chunks_reader, transfer_output_map, verify_store)
.await?;
Ok(())
}
Expand Down
25 changes: 15 additions & 10 deletions sn_cli/src/subcommands/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
// permissions and limitations relating to use of the SAFE Network Software.

use sn_client::{Client, Files, WalletClient};
use sn_dbc::Token;
use sn_protocol::storage::ChunkAddress;
use sn_transfers::wallet::{parse_public_address, LocalWallet, PaymentTransactionsMap};
use sn_dbc::{DbcId, Token};
use sn_protocol::{storage::ChunkAddress, NetworkAddress};
use sn_transfers::{
client_transfers::TransferOutputs,
wallet::{parse_public_address, LocalWallet, PaymentTransactionsMap},
};

use bytes::Bytes;
use clap::Parser;
Expand Down Expand Up @@ -237,7 +240,10 @@ pub(super) async fn chunk_and_pay_for_storage(
root_dir: &Path,
files_path: &Path,
verify_store: bool,
) -> Result<(BTreeMap<XorName, ChunkedFile>, PaymentTransactionsMap)> {
) -> Result<(
BTreeMap<XorName, ChunkedFile>,
BTreeMap<NetworkAddress, TransferOutputs>,
)> {
let wallet = LocalWallet::load_from(root_dir)
.await
.wrap_err("Unable to read wallet file in {path:?}")
Expand Down Expand Up @@ -303,7 +309,7 @@ pub(super) async fn chunk_and_pay_for_storage(
chunked_files.len()
);

let (proofs, cost) = wallet_client
let (transfer_output_map, cost) = wallet_client
.pay_for_storage(
chunked_files
.values()
Expand All @@ -315,11 +321,10 @@ pub(super) async fn chunk_and_pay_for_storage(
)
.await?;

let total_cost = proofs.len() as u64 * cost.as_nano();
println!(
"Successfully made payment of {total_cost} for {} records. (At a cost per record of {cost:?}.)",
proofs.len(),
);
"Successfully made payment of {cost} for {} records. (At a cost per record of {cost:?}.)",
transfer_output_map.len(),
);

let wallet = wallet_client.into_wallet();
if let Err(err) = wallet.store().await {
Expand All @@ -332,5 +337,5 @@ pub(super) async fn chunk_and_pay_for_storage(
}

println!("Successfully paid for storage and generated the proofs. They can now be sent to the storage nodes when uploading paid chunks.");
Ok((chunked_files, proofs))
Ok((chunked_files, transfer_output_map))
}
25 changes: 10 additions & 15 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use super::{
use bls::{PublicKey, SecretKey, Signature};
use indicatif::ProgressBar;
use libp2p::{kad::Record, Multiaddr};
use sn_dbc::{DbcId, SignedSpend, Token};
use sn_dbc::{Dbc, DbcId, PublicAddress, SignedSpend, Token};
use sn_networking::{multiaddr_is_global, NetworkEvent, SwarmDriver, CLOSE_GROUP_SIZE};
use sn_protocol::{
error::Error as ProtocolError,
Expand All @@ -25,7 +25,7 @@ use sn_protocol::{
NetworkAddress, PrettyPrintRecordKey,
};
use sn_registers::SignedRegister;
use sn_transfers::client_transfers::SpendRequest;
use sn_transfers::client_transfers::{SpendRequest, TransferOutputs};
use std::time::Duration;
use tokio::task::spawn;
use tracing::trace;
Expand Down Expand Up @@ -283,7 +283,7 @@ impl Client {
pub(super) async fn store_chunk(
&self,
chunk: Chunk,
payment: Vec<DbcId>,
payment: Vec<Dbc>,
verify_store: bool,
) -> Result<()> {
info!("Store chunk: {:?}", chunk.address());
Expand Down Expand Up @@ -325,7 +325,7 @@ impl Client {
let dbc_id = *spend.signed_spend.dbc_id();
let dbc_addr = DbcAddress::from_dbc_id(&dbc_id);

trace!("Sending spend {dbc_id:?} to the network via put_record, with addr of {dbc_addr:?}");
println!("Sending spend {dbc_id:?} to the network via put_record, with addr of {dbc_addr:?}");

let key = NetworkAddress::from_dbc_address(dbc_addr).to_record_key();
let record = Record {
Expand Down Expand Up @@ -411,20 +411,15 @@ impl Client {
}

/// Get the store cost at a given address
pub async fn get_store_cost_at_address(&self, address: &NetworkAddress) -> Result<Token> {
pub async fn get_store_costs_at_address(
&self,
address: &NetworkAddress,
) -> Result<Vec<(PublicAddress, Token)>> {
trace!("Getting store cost at {address:?}");

let costs = self
Ok(self
.network
.get_store_costs_from_network(address.clone())
.await?;



.as_nano();

trace!("Store cost at address {address:?} is: {cost:?}");

Ok(Token::from_nano(cost))
.await?)
}
}
33 changes: 20 additions & 13 deletions sn_client/src/file_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,20 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use std::collections::BTreeMap;

use super::{
chunks::{to_chunk, DataMapLevel, Error, SmallFile},
error::Result,
Client,
};

use sn_dbc::DbcId;
use sn_protocol::{
storage::{Chunk, ChunkAddress},
NetworkAddress,
};
use sn_transfers::wallet::PaymentTransactionsMap;
use sn_transfers::{client_transfers::TransferOutputs, wallet::PaymentTransactionsMap};

use bincode::deserialize;
use bytes::Bytes;
Expand Down Expand Up @@ -99,10 +102,11 @@ impl Files {
pub async fn upload_with_proof(
&self,
bytes: Bytes,
payment_proofs: &PaymentTransactionsMap,
transfer_output_map: &BTreeMap<NetworkAddress, TransferOutputs>,
verify_store: bool,
) -> Result<NetworkAddress> {
self.upload_bytes(bytes, payment_proofs, verify_store).await
self.upload_bytes(bytes, transfer_output_map, verify_store)
.await
}

/// Directly writes [`Bytes`] to the network in the
Expand All @@ -113,9 +117,9 @@ impl Files {
pub async fn upload_and_verify(
&self,
bytes: Bytes,
payment_proofs: &PaymentTransactionsMap,
transfer_output_map: &BTreeMap<NetworkAddress, TransferOutputs>,
) -> Result<NetworkAddress> {
self.upload_bytes(bytes, payment_proofs, true).await
self.upload_bytes(bytes, transfer_output_map, true).await
}

/// Calculates a LargeFile's/SmallFile's address from self encrypted chunks,
Expand Down Expand Up @@ -144,7 +148,7 @@ impl Files {
pub async fn upload_chunks_in_batches(
&self,
chunks: impl Iterator<Item = Chunk>,
payment_proofs: &PaymentTransactionsMap,
transfer_outputs_map: &BTreeMap<NetworkAddress, TransferOutputs>,
verify_store: bool,
) -> Result<()> {
trace!("Client upload in batches started");
Expand All @@ -154,13 +158,16 @@ impl Files {
next_batch_size += 1;
let client = self.client.clone();
let chunk_addr = chunk.network_address();
let payment = payment_proofs
let payment = transfer_outputs_map
.get(&chunk_addr)
.cloned()
.ok_or(super::Error::MissingPaymentProof(format!("{chunk_addr}")))?;

println!("Payment for {chunk:?}: {:?}", payment);
tasks.push(task::spawn(async move {
client.store_chunk(chunk, payment, verify_store).await?;
client
.store_chunk(chunk, payment.created_dbcs.clone(), verify_store)
.await?;

Ok::<(), super::error::Error>(())
}));
Expand Down Expand Up @@ -189,15 +196,15 @@ impl Files {
async fn upload_bytes(
&self,
bytes: Bytes,
payment_proofs: &PaymentTransactionsMap,
payment_dbcs: &BTreeMap<NetworkAddress, TransferOutputs>,
verify: bool,
) -> Result<NetworkAddress> {
if bytes.len() < MIN_ENCRYPTABLE_BYTES {
let file = SmallFile::new(bytes)?;
self.upload_small(file, payment_proofs, verify).await
self.upload_small(file, payment_dbcs, verify).await
} else {
let (head_address, chunks) = encrypt_large(bytes)?;
self.upload_chunks_in_batches(chunks.into_iter(), payment_proofs, verify)
self.upload_chunks_in_batches(chunks.into_iter(), payment_dbcs, verify)
.await?;
Ok(NetworkAddress::ChunkAddress(ChunkAddress::new(
head_address,
Expand All @@ -211,7 +218,7 @@ impl Files {
async fn upload_small(
&self,
small: SmallFile,
payment_proofs: &PaymentTransactionsMap,
payment_proofs: &BTreeMap<NetworkAddress, TransferOutputs>,
verify_store: bool,
) -> Result<NetworkAddress> {
let chunk = package_small(small)?;
Expand All @@ -222,7 +229,7 @@ impl Files {
.ok_or(super::Error::MissingPaymentProof(format!("{address}")))?;

self.client
.store_chunk(chunk, payment, verify_store)
.store_chunk(chunk, payment.created_dbcs.clone(), verify_store)
.await?;

Ok(address)
Expand Down

0 comments on commit ab1b712

Please sign in to comment.