Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub enum Quorum {
/// One peer must be successfully contacted.
One,

/// `N` peer must be successfully contacted.
/// `N` peers must be successfully contacted.
N(NonZeroUsize),
}

Expand Down Expand Up @@ -103,6 +103,9 @@ pub enum KademliaCommand {
/// Record.
record: Record,

/// [`Quorum`] for the query.
quorum: Quorum,

/// Query ID for the query.
query_id: QueryId,
},
Expand All @@ -114,6 +117,9 @@ pub enum KademliaCommand {
/// Record.
record: Record,

/// [`Quorum`] for the query.
quorum: Quorum,

/// Query ID for the query.
query_id: QueryId,

Expand Down Expand Up @@ -226,8 +232,6 @@ pub enum KademliaEvent {
},

/// `PUT_VALUE` query succeeded.
// TODO: https://github.com/paritytech/litep2p/issues/336
// this is never emitted. Implement + add `AddProviderSuccess`.
PutRecordSuccess {
/// Query ID.
query_id: QueryId,
Expand Down Expand Up @@ -309,9 +313,16 @@ impl KademliaHandle {
}

/// Store record to DHT.
pub async fn put_record(&mut self, record: Record) -> QueryId {
pub async fn put_record(&mut self, record: Record, quorum: Quorum) -> QueryId {
let query_id = self.next_query_id();
let _ = self.cmd_tx.send(KademliaCommand::PutRecord { record, query_id }).await;
let _ = self
.cmd_tx
.send(KademliaCommand::PutRecord {
record,
quorum,
query_id,
})
.await;

query_id
}
Expand All @@ -324,6 +335,7 @@ impl KademliaHandle {
record: Record,
peers: Vec<PeerId>,
update_local_store: bool,
quorum: Quorum,
) -> QueryId {
let query_id = self.next_query_id();
let _ = self
Expand All @@ -333,6 +345,7 @@ impl KademliaHandle {
query_id,
peers,
update_local_store,
quorum,
})
.await;

Expand Down Expand Up @@ -409,10 +422,14 @@ impl KademliaHandle {
}

/// Try to initiate `PUT_VALUE` query and if the channel is clogged, return an error.
pub fn try_put_record(&mut self, record: Record) -> Result<QueryId, ()> {
pub fn try_put_record(&mut self, record: Record, quorum: Quorum) -> Result<QueryId, ()> {
let query_id = self.next_query_id();
self.cmd_tx
.try_send(KademliaCommand::PutRecord { record, query_id })
.try_send(KademliaCommand::PutRecord {
record,
query_id,
quorum,
})
.map(|_| query_id)
.map_err(|_| ())
}
Expand All @@ -424,6 +441,7 @@ impl KademliaHandle {
record: Record,
peers: Vec<PeerId>,
update_local_store: bool,
quorum: Quorum,
) -> Result<QueryId, ()> {
let query_id = self.next_query_id();
self.cmd_tx
Expand All @@ -432,6 +450,7 @@ impl KademliaHandle {
query_id,
peers,
update_local_store,
quorum,
})
.map(|_| query_id)
.map_err(|_| ())
Expand Down
20 changes: 20 additions & 0 deletions src/protocol/libp2p/kademlia/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,26 @@ impl KademliaMessage {
}

/// Create `PUT_VALUE` response.
pub fn put_value_response(key: RecordKey, value: Vec<u8>) -> Bytes {
let message = schema::kademlia::Message {
key: key.to_vec(),
cluster_level_raw: 10,
r#type: schema::kademlia::MessageType::PutValue.into(),
record: Some(schema::kademlia::Record {
key: key.to_vec(),
value,
..Default::default()
}),
..Default::default()
};

let mut buf = BytesMut::with_capacity(message.encoded_len());
message.encode(&mut buf).expect("BytesMut to provide needed capacity");

buf.freeze()
}

/// Create `GET_VALUE` response.
pub fn get_value_response(
key: RecordKey,
peers: Vec<KademliaPeer>,
Expand Down
94 changes: 74 additions & 20 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ enum PeerAction {
SendFindNode(QueryId),

/// Send `PUT_VALUE` message to peer.
SendPutValue(Bytes),
SendPutValue(QueryId, Bytes),

/// Send `ADD_PROVIDER` message to peer.
SendAddProvider(Bytes),
Expand Down Expand Up @@ -368,10 +368,10 @@ impl Kademlia {
}
}
}
Some(PeerAction::SendPutValue(message)) => {
Some(PeerAction::SendPutValue(query, message)) => {
tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` message");

self.executor.send_message(peer, message, substream);
self.executor.send_request_read_response(peer, Some(query), message, substream);
}
Some(PeerAction::SendAddProvider(message)) => {
tracing::trace!(target: LOG_TARGET, ?peer, "send `ADD_PROVIDER` message");
Expand Down Expand Up @@ -472,20 +472,45 @@ impl Kademlia {
}
}
}
KademliaMessage::PutValue { record } => {
tracing::trace!(
target: LOG_TARGET,
?peer,
record_key = ?record.key,
"handle `PUT_VALUE` message",
);
KademliaMessage::PutValue { record } => match query_id {
Some(query_id) => {
tracing::trace!(
target: LOG_TARGET,
?peer,
query = ?query_id,
record_key = ?record.key,
"handle `PUT_VALUE` response",
);

if let IncomingRecordValidationMode::Automatic = self.validation_mode {
self.store.put(record.clone());
self.engine.register_response(
query_id,
peer,
KademliaMessage::PutValue { record },
);
}
None => {
tracing::trace!(
target: LOG_TARGET,
?peer,
record_key = ?record.key,
"handle `PUT_VALUE` request",
);

let _ = self.event_tx.send(KademliaEvent::IncomingRecord { record }).await;
}
if let IncomingRecordValidationMode::Automatic = self.validation_mode {
self.store.put(record.clone());
}

// Send ACK even if the record was/will be filtered out to not reveal any
// internal state.
let message = KademliaMessage::put_value_response(
record.key.clone(),
record.value.clone(),
);
self.executor.send_message(peer, message, substream);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dq: Is libp2p sending the ACK as well? Im wondering if this is really needed, since we clone the record here and use some bandwidth?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is what libp2p does. I don't understand either, why the whole message is sent back instead of a small ACK message.

Due to these ACKs not currentty being sent by litep2p we will likely need a two-stage upgrade process: first start sending out ACKs and use some alternative mechanism for determining if the record was received (not relying on ACKs being received), and after most of the network upgrades switch to using received ACKs. I am going to implement "stage 1" in a follow-up PR.


let _ = self.event_tx.send(KademliaEvent::IncomingRecord { record }).await;
}
},
KademliaMessage::GetRecord { key, record, peers } => {
match (query_id, key) {
(Some(query_id), key) => {
Expand Down Expand Up @@ -749,7 +774,7 @@ impl Kademlia {
Ok(())
}
Err(err) => {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream a second time");
tracing::debug!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream a second time");
Err(err.into())
}
}
Expand Down Expand Up @@ -803,20 +828,27 @@ impl Kademlia {
.await;
Ok(())
}
QueryAction::PutRecordToFoundNodes { record, peers } => {
QueryAction::PutRecordToFoundNodes {
query,
record,
peers,
quorum,
} => {
tracing::trace!(
target: LOG_TARGET,
?query,
record_key = ?record.key,
num_peers = ?peers.len(),
"store record to found peers",
);
let key = record.key.clone();
let message = KademliaMessage::put_value(record);
let message: Bytes = KademliaMessage::put_value(record);

for peer in peers {
for peer in &peers {
if let Err(error) = self.open_substream_or_dial(
peer.peer,
PeerAction::SendPutValue(message.clone()),
// `message` is cheaply clonable because of `Bytes` reference counting.
PeerAction::SendPutValue(query, message.clone()),
None,
) {
tracing::debug!(
Expand All @@ -829,6 +861,25 @@ impl Kademlia {
}
}

self.engine.start_put_record_to_found_nodes_requests_tracking(
query,
key,
peers.into_iter().map(|peer| peer.peer).collect(),
quorum,
);

Ok(())
}
QueryAction::PutRecordQuerySucceeded { query, key } => {
tracing::debug!(target: LOG_TARGET, ?query, "`PUT_VALUE` query succeeded");

let _ = self
.event_tx
.send(KademliaEvent::PutRecordSuccess {
query_id: query,
key,
})
.await;
Ok(())
}
QueryAction::AddProviderToFoundNodes {
Expand Down Expand Up @@ -1015,7 +1066,7 @@ impl Kademlia {
.into()
);
}
Some(KademliaCommand::PutRecord { mut record, query_id }) => {
Some(KademliaCommand::PutRecord { mut record, quorum, query_id }) => {
tracing::debug!(
target: LOG_TARGET,
query = ?query_id,
Expand All @@ -1040,13 +1091,15 @@ impl Kademlia {
query_id,
record,
self.routing_table.closest(&key, self.replication_factor).into(),
quorum,
);
}
Some(KademliaCommand::PutRecordToPeers {
mut record,
query_id,
peers,
update_local_store,
quorum,
}) => {
tracing::debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -1082,6 +1135,7 @@ impl Kademlia {
query_id,
record,
peers,
quorum,
);
}
Some(KademliaCommand::StartProviding {
Expand Down
Loading
Loading