Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kad: Refactor GetRecord query and add tests #97

Merged
merged 19 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ pub enum KademliaEvent {
/// Query ID.
query_id: QueryId,

/// Found record.
record: PeerRecord,
/// Found records.
records: RecordsType,
},

/// `PUT_VALUE` query succeeded.
Expand All @@ -173,6 +173,18 @@ pub enum KademliaEvent {
},
}

/// The type of the DHT records.
#[derive(Debug, Clone)]
pub enum RecordsType {
/// Record was found in the local store.
///
/// This contains only a single result.
LocalStore(Record),

/// Records found in the network.
Network(Vec<PeerRecord>),
}

/// Handle for communicating with the Kademlia protocol.
pub struct KademliaHandle {
/// TX channel for sending commands to `Kademlia`.
Expand Down
33 changes: 28 additions & 5 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pub use handle::{KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode};
pub use query::QueryId;
pub use record::{Key as RecordKey, PeerRecord, Record};

use self::handle::RecordsType;

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia";

Expand Down Expand Up @@ -636,11 +638,32 @@ impl Kademlia {

Ok(())
}
QueryAction::GetRecordQueryDone { query_id, record } => {
self.store.put(record.record.clone());
QueryAction::GetRecordQueryDone { query_id, records } => {
// Considering this gives a view of all peers and their records, some peers may have
// outdated records. Store only the record which is backed by most
// peers.
let rec = records
.iter()
.map(|peer_record| &peer_record.record)
.fold(HashMap::new(), |mut acc, rec| {
*acc.entry(rec).or_insert(0) += 1;
acc
})
.into_iter()
.max_by_key(|(_, v)| *v)
.map(|(k, _)| k);

if let Some(record) = rec {
self.store.put(record.clone());
}

let _ =
self.event_tx.send(KademliaEvent::GetRecordSuccess { query_id, record }).await;
let _ = self
.event_tx
.send(KademliaEvent::GetRecordSuccess {
query_id,
records: RecordsType::Network(records),
})
.await;
Ok(())
}
QueryAction::QueryFailed { query } => {
Expand Down Expand Up @@ -782,7 +805,7 @@ impl Kademlia {
(Some(record), Quorum::One) => {
let _ = self
.event_tx
.send(KademliaEvent::GetRecordSuccess { query_id, record: PeerRecord { record: record.clone(), peer: None } })
.send(KademliaEvent::GetRecordSuccess { query_id, records: RecordsType::LocalStore(record.clone()) })
.await;
}
(record, _) => {
Expand Down
Loading