Paying to each holder in each group #649

Status: Closed. Wants to merge 29 commits.

Commits (29):
a0b000c feat!(protocol): remove chunk merkletree to simplify payment (joshuef, Aug 16, 2023)
913c81d feat!(protocol): get price and pay for each chunk individually (joshuef, Aug 16, 2023)
deaa859 feat!(protocol): gets keys with GetStoreCost (joshuef, Aug 16, 2023)
3b781a3 feat!: pay each chunk holder direct (joshuef, Aug 16, 2023)
cb7c81e feat(node): store data if the majority of CLOSE_GROUP will (joshuef, Aug 18, 2023)
163e96f test(node): reenable payment fail check (joshuef, Aug 18, 2023)
5037390 chore(node): clarify payment errors (joshuef, Aug 18, 2023)
07a622c chore(node): only store paid for data, ignore maj (joshuef, Aug 18, 2023)
9db87b3 chore(networking): return all GetStoreCost prices and use them (joshuef, Aug 18, 2023)
8a94203 chore(client): error out early for invalid transfers (joshuef, Aug 20, 2023)
90b91d9 feat: one transfer per data set, mapped dbcs to content addrs (joshuef, Aug 20, 2023)
797d8f3 test(node): data verification test refactors for readability (joshuef, Aug 23, 2023)
b65ecd9 chore(node): data verification log tweaks (joshuef, Aug 23, 2023)
ab3ae31 fix: do not check payment for relocated holder (maqi, Aug 23, 2023)
ce3dcce fix: correct replicated spend validation (maqi, Aug 23, 2023)
a2edbd7 fix(node): handling events should wait until connected to the network (RolandSherwin, Jul 25, 2023)
0eac0f5 fix(network): trigger bootstrap until we have enough peers (RolandSherwin, Aug 23, 2023)
959aab1 ci: add missing package specification to dbc spend tests (joshuef, Aug 25, 2023)
1ebd1a2 ci: increase node mem limit with increased dbc processing (joshuef, Aug 25, 2023)
1e1ab71 fix(protocol): avoid panics (joshuef, Aug 25, 2023)
f40e769 chore: increase concurrent fetches for replication data (joshuef, Aug 25, 2023)
a973fba test: parallelise churn data final query (joshuef, Aug 25, 2023)
1b32744 chore(networking): ensure we're always driving forward replication if… (joshuef, Aug 25, 2023)
9f59689 chore(client): reduce transferoutputs cloning (joshuef, Aug 25, 2023)
8eab1f5 chore(client): pass around content payments map mut ref (joshuef, Aug 25, 2023)
ffcd239 chore: logs (joshuef, Aug 25, 2023)
6e5af52 ci: temp mem increase (joshuef, Aug 26, 2023)
4ccdac2 chore: mem_check test update (maqi, Aug 28, 2023)
5526d1e chore(deps): bump tokio to 1.32.0 (RolandSherwin, Aug 29, 2023)
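Taken together, these commits replace the per-dataset chunk merkletree proof with direct payments: the client asks each chunk's close group for store costs, makes one transfer per data set that pays every quoted holder, and maps the resulting DBCs to the content addresses they pay for. The sketch below illustrates that flow; the helper names are hypothetical stand-ins, not the real sn_client API (the actual entry points appear in the diffs that follow).

```rust
// Hypothetical sketch of the payment flow after this PR. The helpers
// (get_store_costs, pay_holders, pay_for_chunks) are illustrative stand-ins.
use std::collections::BTreeMap;

type ChunkAddr = [u8; 32]; // stand-in for NetworkAddress::ChunkAddress
type HolderKey = [u8; 32]; // stand-in for sn_dbc::PublicAddress
type NanoTokens = u64;
type Dbc = Vec<u8>; // stand-in for sn_dbc::Dbc

// Each node in a chunk's close group quotes its own price
// (commit 9db87b3: "return all GetStoreCost prices and use them").
fn get_store_costs(_chunk: &ChunkAddr) -> Vec<(HolderKey, NanoTokens)> {
    vec![([0u8; 32], 1), ([1u8; 32], 2)]
}

// One payment output per quoted holder; the real client batches all chunks
// of a data set into a single transfer (commit 90b91d9).
fn pay_holders(quotes: &[(HolderKey, NanoTokens)]) -> Vec<Dbc> {
    quotes.iter().map(|_| Dbc::new()).collect()
}

// Map the payment DBCs to the content address they pay for, so the upload
// step can attach them to each chunk's store request.
fn pay_for_chunks(chunks: &[ChunkAddr]) -> BTreeMap<ChunkAddr, Vec<Dbc>> {
    let mut content_payments = BTreeMap::new();
    for chunk in chunks {
        let quotes = get_store_costs(chunk);
        content_payments.insert(*chunk, pay_holders(&quotes));
    }
    content_payments
}

fn main() {
    let chunks = [[7u8; 32]];
    let payments = pay_for_chunks(&chunks);
    assert_eq!(payments[&chunks[0]].len(), 2); // two quoted holders paid
}
```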
4 changes: 2 additions & 2 deletions .github/workflows/benchmark-prs.yml
@@ -88,8 +88,8 @@ jobs:
- name: Check client memory usage
shell: bash
run: |
client_peak_mem_limit_mb="2000" # mb
client_avg_mem_limit_mb="700" # mb
client_peak_mem_limit_mb="2500" # mb
client_avg_mem_limit_mb="1500" # mb

peak_mem_usage=$(
rg '"memory_used_mb":[^,]*' $CLIENT_DATA_PATH/logs/safe.* -o --no-line-number --no-filename |
12 changes: 6 additions & 6 deletions .github/workflows/memcheck.yml
@@ -104,12 +104,12 @@ jobs:
timeout-minutes: 25

- name: Chunks data integrity during nodes churn
run: cargo test --release -p sn_node --test data_with_churn -- --nocapture
env:
TEST_DURATION_MINS: 15
TEST_TOTAL_CHURN_CYCLES: 15
SN_LOG: "all"
- timeout-minutes: 20
+ timeout-minutes: 30

- name: Verify restart of nodes using rg
shell: bash
@@ -151,7 +151,7 @@ jobs:
received_list_count=$(rg "Replicate list received from" $NODE_DATA_PATH -c --stats | \
rg "(\d+) matches" | rg "\d+" -o)
echo "Received $received_list_count replication lists"
fetching_attempt_count=$(rg "Fetching replication" $NODE_DATA_PATH -c --stats | \
fetching_attempt_count=$(rg "FetchingKeysForReplication" $NODE_DATA_PATH -c --stats | \
rg "(\d+) matches" | rg "\d+" -o)
echo "Carried out $fetching_attempt_count fetching attempts"
replication_attempt_count=$(rg "Replicating chunk" $NODE_DATA_PATH -c --stats | \
@@ -206,7 +206,7 @@ jobs:
# The memory usage will be significantly higher here than in the benchmark test,
# where we don't have a bootstrap node.
run: |
node_peak_mem_limit_mb="160" # mb
node_peak_mem_limit_mb="200" # mb

peak_mem_usage=$(
rg '"memory_used_mb":[^,]*' $NODE_DATA_PATH/*/logs/* -o --no-line-number --no-filename |
@@ -225,8 +225,8 @@
- name: Check client memory usage
shell: bash
run: |
client_peak_mem_limit_mb="2000" # mb
client_avg_mem_limit_mb="700" # mb
client_peak_mem_limit_mb="2500" # mb
client_avg_mem_limit_mb="1500" # mb

peak_mem_usage=$(
rg '"memory_used_mb":[^,]*' $CLIENT_DATA_PATH/logs/safe.* -o --no-line-number --no-filename |
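Both the client and node memory checks follow the same pattern: rg pulls every `"memory_used_mb":<value>` entry out of the logs and the shell keeps the maximum, which is then compared against the limit. A rough Rust rendering of that extraction, assuming the log format shown in the workflow rather than anything from sn_logging itself:

```rust
// Scan log text for `"memory_used_mb":<value>` entries and report the peak,
// mirroring the workflow's rg pipeline. Log format is an assumption.
fn peak_memory_mb(logs: &str) -> Option<f64> {
    let key = "\"memory_used_mb\":";
    logs.lines()
        .filter_map(|line| {
            let start = line.find(key)? + key.len();
            let rest = &line[start..];
            let end = rest
                .find(|c: char| !c.is_ascii_digit() && c != '.')
                .unwrap_or(rest.len());
            rest[..end].parse::<f64>().ok()
        })
        .fold(None, |peak: Option<f64>, v| {
            Some(peak.map_or(v, |p| p.max(v)))
        })
}

fn main() {
    let logs = "{\"memory_used_mb\":120.5,\"x\":1}\n{\"memory_used_mb\":180.0}";
    assert_eq!(peak_memory_mb(logs), Some(180.0));
}
```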
4 changes: 2 additions & 2 deletions .github/workflows/merge.yml
@@ -262,14 +262,14 @@ jobs:
fi

- name: execute the dbc spend tests
run: cargo test --release --features="local-discovery" --test sequential_transfers -- --nocapture
run: cargo test --release -p sn_node --features="local-discovery" --test sequential_transfers -- --nocapture
env:
SN_LOG: "all"
CARGO_TARGET_DIR: "./transfer-target"
timeout-minutes: 25

- name: execute the storage payment tests
run: cargo test --release --features="local-discovery" --test storage_payments -- --nocapture --test-threads=1
run: cargo test --release -p sn_node --features="local-discovery" --test storage_payments -- --nocapture --test-threads=1
env:
SN_LOG: "all"
CARGO_TARGET_DIR: "./transfer-target"
44 changes: 3 additions & 41 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion sn_cli/Cargo.toml
@@ -41,7 +41,7 @@ sn_transfers = { path = "../sn_transfers", version = "0.10.28" }
sn_logging = { path = "../sn_logging", version = "0.2.4" }
sn_peers_acquisition= { path="../sn_peers_acquisition", version = "0.1.4" }
sn_protocol = { path = "../sn_protocol", version = "0.5.3" }
tokio = { version = "1.17.0", features = ["fs", "io-util", "macros", "parking_lot", "rt", "sync", "time"] }
tokio = { version = "1.32.0", features = ["fs", "io-util", "macros", "parking_lot", "rt", "sync", "time"] }
tracing = { version = "~0.1.26" }
tracing-core = "0.1.30"
url = "2.4.0"
18 changes: 12 additions & 6 deletions sn_cli/src/subcommands/files.rs
@@ -13,7 +13,7 @@ use clap::Parser;
use color_eyre::Result;
use sn_client::{Client, Files};
use sn_protocol::storage::{Chunk, ChunkAddress};
- use sn_transfers::wallet::PaymentProofsMap;
+ use sn_transfers::client_transfers::ContentPaymentsMap;

use std::{
fs,
@@ -96,7 +96,7 @@ async fn upload_files(
let file_names_path = root_dir.join("uploaded_files");

// Payment shall always be verified.
- let (chunks_to_upload, payment_proofs) =
+ let (chunks_to_upload, mut content_payments_map) =
chunk_and_pay_for_storage(&client, root_dir, &files_path, true).await?;

let mut chunks_to_fetch = Vec::new();
@@ -114,8 +114,14 @@
chunks.len()
);

- if let Err(error) =
- upload_chunks(&file_api, &file_name, chunks, &payment_proofs, verify_store).await
+ if let Err(error) = upload_chunks(
+ &file_api,
+ &file_name,
+ chunks,
+ &mut content_payments_map,
+ verify_store,
+ )
+ .await
{
println!("Failed to store all chunks of file '{file_name}' to all nodes in the close group: {error}")
} else {
@@ -140,7 +146,7 @@ async fn upload_chunks(
file_api: &Files,
file_name: &str,
chunks_paths: Vec<(XorName, PathBuf)>,
- payment_proofs: &PaymentProofsMap,
+ content_payments_map: &mut ContentPaymentsMap,
verify_store: bool,
) -> Result<()> {
let chunks_reader = chunks_paths
Expand All @@ -159,7 +165,7 @@ async fn upload_chunks(
});

file_api
- .upload_chunks_in_batches(chunks_reader, payment_proofs, verify_store)
+ .upload_chunks_in_batches(chunks_reader, content_payments_map, verify_store)
.await?;
Ok(())
}
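The CLI now threads a mutable ContentPaymentsMap from chunk_and_pay_for_storage into upload_chunks, replacing the read-only PaymentProofsMap. The type's definition is not part of this diff; one plausible reading, sketched with stand-in types, is a map from content address to the DBCs that pay for it:

```rust
// Assumed shape of ContentPaymentsMap, for illustration only; the real type
// lives in sn_transfers::client_transfers and is not shown in this diff.
use std::collections::BTreeMap;

type XorName = [u8; 32]; // stand-in for xor_name::XorName
type Dbc = Vec<u8>; // stand-in for sn_dbc::Dbc

type ContentPaymentsMap = BTreeMap<XorName, Vec<Dbc>>;

// A `&mut` borrow lets the uploader hand each chunk's DBCs over without
// cloning the whole map (cf. commits 9f59689 and 8eab1f5).
fn take_payment(map: &mut ContentPaymentsMap, name: &XorName) -> Option<Vec<Dbc>> {
    map.remove(name)
}

fn main() {
    let mut map = ContentPaymentsMap::new();
    map.insert([0u8; 32], vec![vec![1, 2, 3]]);
    assert!(take_payment(&mut map, &[0u8; 32]).is_some());
    assert!(map.is_empty()); // payment consumed, not cloned
}
```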
32 changes: 18 additions & 14 deletions sn_cli/src/subcommands/wallet.rs
@@ -8,7 +8,11 @@

use sn_client::{Client, Files, WalletClient};
use sn_dbc::Token;
- use sn_transfers::wallet::{parse_public_address, LocalWallet, PaymentProofsMap};
+ use sn_protocol::storage::ChunkAddress;
+ use sn_transfers::{
+ client_transfers::ContentPaymentsMap,
+ wallet::{parse_public_address, LocalWallet},
+ };

use bytes::Bytes;
use clap::Parser;
@@ -236,13 +240,15 @@ pub(super) async fn chunk_and_pay_for_storage(
root_dir: &Path,
files_path: &Path,
verify_store: bool,
- ) -> Result<(BTreeMap<XorName, ChunkedFile>, PaymentProofsMap)> {
+ ) -> Result<(BTreeMap<XorName, ChunkedFile>, ContentPaymentsMap)> {
trace!("Starting to chunk_and_pay_for_storage");
let wallet = LocalWallet::load_from(root_dir)
.await
.wrap_err("Unable to read wallet file in {path:?}")
.suggestion(
"If you have an old wallet file, it may no longer be compatible. Try removing it",
)?;

let mut wallet_client = WalletClient::new(client.clone(), wallet);
let file_api: Files = Files::new(client.clone());

@@ -302,27 +308,25 @@
chunked_files.len()
);

- let (proofs, cost) = wallet_client
+ let (content_payments_map, cost) = wallet_client
.pay_for_storage(
chunked_files
.values()
.flat_map(|chunked_file| &chunked_file.chunks)
- .map(|(name, _)| name),
+ .map(|(name, _)| {
+ sn_protocol::NetworkAddress::ChunkAddress(ChunkAddress::new(*name))
+ }),
verify_store,
)
.await?;

- if let Some(cost) = cost {
- let total_cost = proofs.len() as u64 * cost.as_nano();
- println!(
- "Successfully made payment of {total_cost} for {} records. (At a cost per record of {cost:?}.)",
- proofs.len(),
- );
- } else {
- println!("No payment needed for {} records.", proofs.len(),);
- }
+ println!(
+ "Successfully made payment of {cost} for {} records. (At a cost per record of {cost:?}.)",
+ content_payments_map.len(),
+ );

let wallet = wallet_client.into_wallet();

if let Err(err) = wallet.store().await {
println!("Failed to store wallet: {err:?}");
} else {
@@ -333,5 +337,5 @@
}

println!("Successfully paid for storage and generated the proofs. They can now be sent to the storage nodes when uploading paid chunks.");
- Ok((chunked_files, proofs))
+ Ok((chunked_files, content_payments_map))
}
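Two details of this hunk are worth spelling out: pay_for_storage now takes NetworkAddress values, so each bare XorName is wrapped in a ChunkAddress, and the removed branch derived the total payment from a per-record cost. A minimal sketch of both, using stand-in types rather than the real xor_name/sn_protocol definitions:

```rust
// Stand-ins for xor_name::XorName, sn_protocol::storage::ChunkAddress and
// sn_protocol::NetworkAddress, to mirror the wrapping in the diff above.
type XorName = [u8; 32];

struct ChunkAddress(XorName);

enum NetworkAddress {
    ChunkAddress(ChunkAddress),
}

fn to_network_addresses(names: &[XorName]) -> Vec<NetworkAddress> {
    names
        .iter()
        .map(|name| NetworkAddress::ChunkAddress(ChunkAddress(*name)))
        .collect()
}

// The removed branch computed total = records * cost-per-record, e.g. 100
// records at 2 nanos each is 200 nanos in total.
fn total_cost_nanos(record_count: u64, cost_per_record_nanos: u64) -> u64 {
    record_count * cost_per_record_nanos
}

fn main() {
    assert_eq!(total_cost_nanos(100, 2), 200);
    let addrs = to_network_addresses(&[[0u8; 32]]);
    assert_eq!(addrs.len(), 1);
}
```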
2 changes: 1 addition & 1 deletion sn_client/Cargo.toml
@@ -34,6 +34,6 @@ sn_registers = { path = "../sn_registers", version = "0.2.3" }
sn_transfers = { path = "../sn_transfers", version = "0.10.28" }
thiserror = "1.0.23"
tiny-keccak = "~2.0.2"
tokio = { version = "1.17.0", features = ["fs", "io-util", "macros", "parking_lot", "rt", "sync", "time"] }
tokio = { version = "1.32.0", features = ["fs", "io-util", "macros", "parking_lot", "rt", "sync", "time"] }
tracing = { version = "~0.1.26" }
xor_name = "5.0.0"
44 changes: 12 additions & 32 deletions sn_client/src/api.rs
@@ -14,11 +14,10 @@ use super::{
use bls::{PublicKey, SecretKey, Signature};
use indicatif::ProgressBar;
use libp2p::{kad::Record, Multiaddr};
- use sn_dbc::{DbcId, SignedSpend, Token};
+ use sn_dbc::{Dbc, DbcId, PublicAddress, SignedSpend, Token};
use sn_networking::{multiaddr_is_global, NetworkEvent, SwarmDriver, CLOSE_GROUP_SIZE};
use sn_protocol::{
error::Error as ProtocolError,
- messages::PaymentProof,
storage::{
try_deserialize_record, try_serialize_record, Chunk, ChunkAddress, ChunkWithPayment,
DbcAddress, RecordHeader, RecordKind, RegisterAddress,
@@ -62,7 +61,6 @@ impl Client {
signer,
peers_added: 0,
progress: Some(Self::setup_connection_progress()),
- network_store_cost: 0,
};

// subscribe to our events channel first, so we don't have intermittent
@@ -145,6 +143,7 @@
}
}
}

// The above loop breaks if `ConnectedToNetwork` is received, but we might need the
// receiver to still be active for us to not get any error if any other event is sent
let mut client_events_rx = client.events_channel();
@@ -273,7 +272,7 @@ impl Client {
pub(super) async fn store_chunk(
&self,
chunk: Chunk,
- payment: PaymentProof,
+ payment: Vec<Dbc>,
verify_store: bool,
) -> Result<()> {
info!("Store chunk: {:?}", chunk.address());
@@ -316,7 +315,6 @@
let dbc_addr = DbcAddress::from_dbc_id(&dbc_id);

trace!("Sending spend {dbc_id:?} to the network via put_record, with addr of {dbc_addr:?}");

let key = NetworkAddress::from_dbc_address(dbc_addr).to_record_key();
let record = Record {
key,
@@ -401,33 +399,15 @@
}

/// Get the store cost at a given address
- /// Replaces current network_store_cost with the new one, unless average is set to true
- pub async fn get_store_cost_at_address(
- &mut self,
- address: NetworkAddress,
- only_update_cost_if_higher: bool,
- ) -> Result<Token> {
- trace!("Getting store cost at {address:?}, will update only if higher cost?: {only_update_cost_if_higher:?}");
-
- // if we're averaging over many samples across the network, any cost will do
- let any_cost_will_do = only_update_cost_if_higher;
-
- let cost = self
- .network
- .get_store_cost_from_network(address.clone(), any_cost_will_do)
- .await?
- .as_nano();
-
- if cost > self.network_store_cost {
- self.network_store_cost = cost;
- }
-
- if !only_update_cost_if_higher {
- self.network_store_cost = cost;
- }
-
- trace!("Set store cost: {}", self.network_store_cost);
-
- Ok(Token::from_nano(cost))
+ pub async fn get_store_costs_at_address(
+ &self,
+ address: &NetworkAddress,
+ ) -> Result<Vec<(PublicAddress, Token)>> {
+ trace!("Getting store cost at {address:?}");
+
+ Ok(self
+ .network
+ .get_store_costs_from_network(address.clone())
+ .await?)
}
}
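The replacement API returns one (PublicAddress, Token) quote per close-group holder instead of updating a single cached cost on &mut self, so callers decide how to aggregate. A sketch of one way to consume the quotes, with stand-in types for sn_dbc's PublicAddress and Token:

```rust
// Stand-ins for sn_dbc::PublicAddress and sn_dbc::Token, purely to show how
// the new per-holder quotes might be consumed by a caller.
type PublicAddress = [u8; 32];

#[derive(Clone, Copy)]
struct Token(u64); // nanos

impl Token {
    fn as_nano(&self) -> u64 {
        self.0
    }
}

// Total price for one chunk is the sum of every holder's quote, since the
// PR pays each holder in the close group directly.
fn chunk_price(quotes: &[(PublicAddress, Token)]) -> Token {
    Token(quotes.iter().map(|(_, t)| t.as_nano()).sum())
}

fn main() {
    let quotes = vec![([0u8; 32], Token(1)), ([1u8; 32], Token(2))];
    assert_eq!(chunk_price(&quotes).as_nano(), 3);
}
```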