Skip to content

Commit

Permalink
feat!: pay each chunk holder direct
Browse files Browse the repository at this point in the history
Sets us up to pay every node that will be storing the data.
We GetStoreCost and pay to their key, and put the chunk with the DBCs

This is not yet validated at the node side
  • Loading branch information
joshuef committed Aug 17, 2023
1 parent ace78a7 commit 8a7eb19
Show file tree
Hide file tree
Showing 25 changed files with 377 additions and 684 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/memcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ jobs:
cargo run --bin safe --release -- --log-output-dest=data-dir files upload -- "./target/release/testnet"
env:
SN_LOG: "all"
timeout-minutes: 40
timeout-minutes: 25

- name: Chunks data integrity during nodes churn
run: cargo test --release -p sn_node --test data_with_churn -- --nocapture
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 18 additions & 7 deletions sn_cli/src/subcommands/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ use bytes::Bytes;
use clap::Parser;
use color_eyre::Result;
use sn_client::{Client, Files};
use sn_protocol::storage::{Chunk, ChunkAddress};
use sn_transfers::wallet::PaymentTransactionsMap;
use sn_protocol::{
storage::{Chunk, ChunkAddress},
NetworkAddress,
};
use sn_transfers::client_transfers::TransferOutputs;

use std::{
collections::BTreeMap,
fs,
path::{Path, PathBuf},
};
Expand Down Expand Up @@ -95,9 +99,10 @@ async fn upload_files(
// The input files_path has to be a dir
let file_names_path = root_dir.join("uploaded_files");

let (chunks_to_upload, payment_proofs) =
let (chunks_to_upload, transfer_outputs_map) =
chunk_and_pay_for_storage(&client, root_dir, &files_path, verify_store).await?;

println!("payment proofs: {:?}", transfer_outputs_map);
let mut chunks_to_fetch = Vec::new();
for (
file_addr,
Expand All @@ -113,8 +118,14 @@ async fn upload_files(
chunks.len()
);

if let Err(error) =
upload_chunks(&file_api, &file_name, chunks, &payment_proofs, verify_store).await
if let Err(error) = upload_chunks(
&file_api,
&file_name,
chunks,
&transfer_outputs_map,
verify_store,
)
.await
{
println!("Failed to store all chunks of file '{file_name}' to all nodes in the close group: {error}")
} else {
Expand All @@ -139,7 +150,7 @@ async fn upload_chunks(
file_api: &Files,
file_name: &str,
chunks_paths: Vec<(XorName, PathBuf)>,
payment_proofs: &PaymentTransactionsMap,
transfer_outputs_map: &BTreeMap<NetworkAddress, TransferOutputs>,
verify_store: bool,
) -> Result<()> {
let chunks_reader = chunks_paths
Expand All @@ -158,7 +169,7 @@ async fn upload_chunks(
});

file_api
.upload_chunks_in_batches(chunks_reader, payment_proofs, verify_store)
.upload_chunks_in_batches(chunks_reader, transfer_outputs_map, verify_store)
.await?;
Ok(())
}
Expand Down
23 changes: 14 additions & 9 deletions sn_cli/src/subcommands/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@

use sn_client::{Client, Files, WalletClient};
use sn_dbc::Token;
use sn_protocol::storage::ChunkAddress;
use sn_transfers::wallet::{parse_public_address, LocalWallet, PaymentTransactionsMap};
use sn_protocol::{storage::ChunkAddress, NetworkAddress};
use sn_transfers::{
client_transfers::TransferOutputs,
wallet::{parse_public_address, LocalWallet},
};

use bytes::Bytes;
use clap::Parser;
Expand Down Expand Up @@ -237,7 +240,10 @@ pub(super) async fn chunk_and_pay_for_storage(
root_dir: &Path,
files_path: &Path,
verify_store: bool,
) -> Result<(BTreeMap<XorName, ChunkedFile>, PaymentTransactionsMap)> {
) -> Result<(
BTreeMap<XorName, ChunkedFile>,
BTreeMap<NetworkAddress, TransferOutputs>,
)> {
let wallet = LocalWallet::load_from(root_dir)
.await
.wrap_err("Unable to read wallet file in {path:?}")
Expand Down Expand Up @@ -303,7 +309,7 @@ pub(super) async fn chunk_and_pay_for_storage(
chunked_files.len()
);

let (proofs, cost) = wallet_client
let (transfer_outputs_map, cost) = wallet_client
.pay_for_storage(
chunked_files
.values()
Expand All @@ -315,11 +321,10 @@ pub(super) async fn chunk_and_pay_for_storage(
)
.await?;

let total_cost = proofs.len() as u64 * cost.as_nano();
println!(
"Successfully made payment of {total_cost} for {} records. (At a cost per record of {cost:?}.)",
proofs.len(),
);
"Successfully made payment of {cost} for {} records. (At a cost per record of {cost:?}.)",
transfer_outputs_map.len(),
);

let wallet = wallet_client.into_wallet();
if let Err(err) = wallet.store().await {
Expand All @@ -332,5 +337,5 @@ pub(super) async fn chunk_and_pay_for_storage(
}

println!("Successfully paid for storage and generated the proofs. They can now be sent to the storage nodes when uploading paid chunks.");
Ok((chunked_files, proofs))
Ok((chunked_files, transfer_outputs_map))
}
21 changes: 8 additions & 13 deletions sn_client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use super::{
use bls::{PublicKey, SecretKey, Signature};
use indicatif::ProgressBar;
use libp2p::{kad::Record, Multiaddr};
use sn_dbc::{DbcId, SignedSpend, Token};
use sn_dbc::{Dbc, DbcId, PublicAddress, SignedSpend, Token};
use sn_networking::{multiaddr_is_global, NetworkEvent, SwarmDriver, CLOSE_GROUP_SIZE};
use sn_protocol::{
error::Error as ProtocolError,
Expand Down Expand Up @@ -283,7 +283,7 @@ impl Client {
pub(super) async fn store_chunk(
&self,
chunk: Chunk,
payment: Vec<DbcId>,
payment: Vec<Dbc>,
verify_store: bool,
) -> Result<()> {
info!("Store chunk: {:?}", chunk.address());
Expand Down Expand Up @@ -411,20 +411,15 @@ impl Client {
}

/// Get the store cost at a given address
pub async fn get_store_cost_at_address(&self, address: &NetworkAddress) -> Result<Token> {
pub async fn get_store_costs_at_address(
&self,
address: &NetworkAddress,
) -> Result<Vec<(PublicAddress, Token)>> {
trace!("Getting store cost at {address:?}");

let costs = self
Ok(self
.network
.get_store_costs_from_network(address.clone())
.await?;



.as_nano();

trace!("Store cost at address {address:?} is: {cost:?}");

Ok(Token::from_nano(cost))
.await?)
}
}
34 changes: 20 additions & 14 deletions sn_client/src/file_apis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use std::collections::BTreeMap;

use super::{
chunks::{to_chunk, DataMapLevel, Error, SmallFile},
error::Result,
Expand All @@ -16,7 +18,7 @@ use sn_protocol::{
storage::{Chunk, ChunkAddress},
NetworkAddress,
};
use sn_transfers::wallet::PaymentTransactionsMap;
use sn_transfers::client_transfers::{TransferOutputs, TransferOutputsMap};

use bincode::deserialize;
use bytes::Bytes;
Expand Down Expand Up @@ -96,13 +98,14 @@ impl Files {
/// Directly writes [`Bytes`] to the network in the
/// form of immutable chunks, without any batching.
#[instrument(skip(self, bytes), level = "debug")]
pub async fn upload_with_proof(
pub async fn upload_with_transfers(
&self,
bytes: Bytes,
payment_proofs: &PaymentTransactionsMap,
transfer_outputs_map: &TransferOutputsMap,
verify_store: bool,
) -> Result<NetworkAddress> {
self.upload_bytes(bytes, payment_proofs, verify_store).await
self.upload_bytes(bytes, transfer_outputs_map, verify_store)
.await
}

/// Directly writes [`Bytes`] to the network in the
Expand All @@ -113,9 +116,9 @@ impl Files {
pub async fn upload_and_verify(
&self,
bytes: Bytes,
payment_proofs: &PaymentTransactionsMap,
transfer_outputs_map: &TransferOutputsMap,
) -> Result<NetworkAddress> {
self.upload_bytes(bytes, payment_proofs, true).await
self.upload_bytes(bytes, transfer_outputs_map, true).await
}

/// Calculates a LargeFile's/SmallFile's address from self encrypted chunks,
Expand Down Expand Up @@ -144,7 +147,7 @@ impl Files {
pub async fn upload_chunks_in_batches(
&self,
chunks: impl Iterator<Item = Chunk>,
payment_proofs: &PaymentTransactionsMap,
transfer_outputs_map: &TransferOutputsMap,
verify_store: bool,
) -> Result<()> {
trace!("Client upload in batches started");
Expand All @@ -154,13 +157,16 @@ impl Files {
next_batch_size += 1;
let client = self.client.clone();
let chunk_addr = chunk.network_address();
let payment = payment_proofs
let payment = transfer_outputs_map
.get(&chunk_addr)
.cloned()
.ok_or(super::Error::MissingPaymentProof(format!("{chunk_addr}")))?;

trace!("Payment for {chunk:?}: {:?}", payment);
tasks.push(task::spawn(async move {
client.store_chunk(chunk, payment, verify_store).await?;
client
.store_chunk(chunk, payment.created_dbcs.clone(), verify_store)
.await?;

Ok::<(), super::error::Error>(())
}));
Expand Down Expand Up @@ -189,15 +195,15 @@ impl Files {
async fn upload_bytes(
&self,
bytes: Bytes,
payment_proofs: &PaymentTransactionsMap,
payment_dbcs: &BTreeMap<NetworkAddress, TransferOutputs>,
verify: bool,
) -> Result<NetworkAddress> {
if bytes.len() < MIN_ENCRYPTABLE_BYTES {
let file = SmallFile::new(bytes)?;
self.upload_small(file, payment_proofs, verify).await
self.upload_small(file, payment_dbcs, verify).await
} else {
let (head_address, chunks) = encrypt_large(bytes)?;
self.upload_chunks_in_batches(chunks.into_iter(), payment_proofs, verify)
self.upload_chunks_in_batches(chunks.into_iter(), payment_dbcs, verify)
.await?;
Ok(NetworkAddress::ChunkAddress(ChunkAddress::new(
head_address,
Expand All @@ -211,7 +217,7 @@ impl Files {
async fn upload_small(
&self,
small: SmallFile,
payment_proofs: &PaymentTransactionsMap,
payment_proofs: &BTreeMap<NetworkAddress, TransferOutputs>,
verify_store: bool,
) -> Result<NetworkAddress> {
let chunk = package_small(small)?;
Expand All @@ -222,7 +228,7 @@ impl Files {
.ok_or(super::Error::MissingPaymentProof(format!("{address}")))?;

self.client
.store_chunk(chunk, payment, verify_store)
.store_chunk(chunk, payment.created_dbcs.clone(), verify_store)
.await?;

Ok(address)
Expand Down

0 comments on commit 8a7eb19

Please sign in to comment.