Skip to content

Commit

Permalink
feat: replicate Spend/Register with same key but different content
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Oct 25, 2023
1 parent 735fac1 commit c90b054
Show file tree
Hide file tree
Showing 13 changed files with 222 additions and 97 deletions.
44 changes: 37 additions & 7 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ use libp2p::{
};
use sn_protocol::{
messages::{Request, Response},
storage::{RecordHeader, RecordKind, RecordType},
NetworkAddress, PrettyPrintRecordKey,
};
use sn_transfers::NanoTokens;
use std::{collections::HashSet, fmt::Debug};
use std::{
collections::{HashMap, HashSet},
fmt::Debug,
};
use tokio::sync::oneshot;
use xor_name::XorName;

/// Commands to send to the Swarm
#[allow(clippy::large_enum_variant)]
Expand Down Expand Up @@ -86,7 +91,7 @@ pub enum SwarmCmd {
},
/// Get the Addresses of all the Records held locally
GetAllLocalRecordAddresses {
sender: oneshot::Sender<HashSet<NetworkAddress>>,
sender: oneshot::Sender<HashMap<NetworkAddress, RecordType>>,
},
/// Get Record from the Kad network
/// Passing a non empty expected_holders list means to
Expand Down Expand Up @@ -123,7 +128,7 @@ pub enum SwarmCmd {
/// The keys added to the replication fetcher are later used to fetch the Record from network
AddKeysToReplicationFetcher {
holder: PeerId,
keys: Vec<NetworkAddress>,
keys: Vec<(NetworkAddress, RecordType)>,
},
/// Subscribe to a given Gossipsub topic
GossipsubSubscribe(String),
Expand Down Expand Up @@ -272,7 +277,7 @@ impl SwarmDriver {
let all_peers = self.get_all_local_peers();
let keys_to_store = keys
.iter()
.filter(|key| self.is_in_close_range(key, &all_peers))
.filter(|(key, _)| self.is_in_close_range(key, &all_peers))
.cloned()
.collect();
#[allow(clippy::mutable_key_type)]
Expand Down Expand Up @@ -365,15 +370,39 @@ impl SwarmDriver {
}
SwarmCmd::PutLocalRecord { record } => {
let key = record.key.clone();
let record_key = PrettyPrintRecordKey::from(&key);

let record_type = match RecordHeader::from_record(&record) {
Ok(record_header) => {
match record_header.kind {
RecordKind::Chunk => RecordType::Chunk,
RecordKind::Spend | RecordKind::Register => {
let content_hash = XorName::from_content(&record.value);
RecordType::NonChunk(content_hash)
}
RecordKind::ChunkWithPayment | RecordKind::RegisterWithPayment => {
error!("Record {record_key:?} with payment shall not be stored locally.");
return Err(Error::InCorrectRecordHeader);
}
}
}
Err(err) => {
error!("For record {record_key:?}, failed to parse record_header {err:?}");
return Err(Error::InCorrectRecordHeader);
}
};

match self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.put_verified(record)
.put_verified(record, record_type.clone())
{
Ok(_) => {
let new_keys_to_fetch = self.replication_fetcher.notify_about_new_put(&key);
let new_keys_to_fetch = self
.replication_fetcher
.notify_about_new_put(key, record_type);
if !new_keys_to_fetch.is_empty() {
self.send_event(NetworkEvent::KeysForReplication(new_keys_to_fetch));
}
Expand All @@ -390,7 +419,8 @@ impl SwarmDriver {
.behaviour_mut()
.kademlia
.store_mut()
.contains(&key);
.contains(&key)
.is_some();
let _ = sender.send(has_key);
}
SwarmCmd::GetAllLocalRecordAddresses { sender } => {
Expand Down
3 changes: 3 additions & 0 deletions sn_networking/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ pub enum Error {

#[error("Split Record: {0:?}")]
SplitRecord(HashMap<XorName, (Record, HashSet<PeerId>)>),

#[error("Record header is incorrect")]
InCorrectRecordHeader,
}

#[cfg(test)]
Expand Down
10 changes: 6 additions & 4 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ use libp2p::{
};
use sn_protocol::{
messages::{Query, QueryResponse, Request, Response},
storage::{RecordHeader, RecordKind},
storage::{RecordHeader, RecordKind, RecordType},
NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey,
};
use sn_transfers::MainPubkey;
use sn_transfers::NanoTokens;
use std::{collections::HashSet, path::PathBuf};
use std::{collections::HashMap, path::PathBuf};
use tokio::sync::{mpsc, oneshot};
use tracing::warn;

Expand Down Expand Up @@ -526,7 +526,9 @@ impl Network {
}

/// Returns the Addresses of all the locally stored Records
pub async fn get_all_local_record_addresses(&self) -> Result<HashSet<NetworkAddress>> {
pub async fn get_all_local_record_addresses(
&self,
) -> Result<HashMap<NetworkAddress, RecordType>> {
let (sender, receiver) = oneshot::channel();
self.send_swarm_cmd(SwarmCmd::GetAllLocalRecordAddresses { sender })?;

Expand All @@ -539,7 +541,7 @@ impl Network {
pub fn add_keys_to_replication_fetcher(
&self,
holder: PeerId,
keys: Vec<NetworkAddress>,
keys: Vec<(NetworkAddress, RecordType)>,
) -> Result<()> {
self.send_swarm_cmd(SwarmCmd::AddKeysToReplicationFetcher { holder, keys })
}
Expand Down

0 comments on commit c90b054

Please sign in to comment.