diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index a3f63448..07bca65d 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -50,7 +50,7 @@ pub enum Quorum { /// One peer must be successfully contacted. One, - /// `N` peer must be successfully contacted. + /// `N` peers must be successfully contacted. N(NonZeroUsize), } @@ -103,6 +103,9 @@ pub enum KademliaCommand { /// Record. record: Record, + /// [`Quorum`] for the query. + quorum: Quorum, + /// Query ID for the query. query_id: QueryId, }, @@ -114,6 +117,9 @@ pub enum KademliaCommand { /// Record. record: Record, + /// [`Quorum`] for the query. + quorum: Quorum, + /// Query ID for the query. query_id: QueryId, @@ -226,8 +232,6 @@ pub enum KademliaEvent { }, /// `PUT_VALUE` query succeeded. - // TODO: https://github.com/paritytech/litep2p/issues/336 - // this is never emitted. Implement + add `AddProviderSuccess`. PutRecordSuccess { /// Query ID. query_id: QueryId, @@ -309,9 +313,16 @@ impl KademliaHandle { } /// Store record to DHT. - pub async fn put_record(&mut self, record: Record) -> QueryId { + pub async fn put_record(&mut self, record: Record, quorum: Quorum) -> QueryId { let query_id = self.next_query_id(); - let _ = self.cmd_tx.send(KademliaCommand::PutRecord { record, query_id }).await; + let _ = self + .cmd_tx + .send(KademliaCommand::PutRecord { + record, + quorum, + query_id, + }) + .await; query_id } @@ -324,6 +335,7 @@ impl KademliaHandle { record: Record, peers: Vec, update_local_store: bool, + quorum: Quorum, ) -> QueryId { let query_id = self.next_query_id(); let _ = self @@ -333,6 +345,7 @@ impl KademliaHandle { query_id, peers, update_local_store, + quorum, }) .await; @@ -409,10 +422,14 @@ impl KademliaHandle { } /// Try to initiate `PUT_VALUE` query and if the channel is clogged, return an error. - pub fn try_put_record(&mut self, record: Record) -> Result { + pub fn try_put_record(&mut self, record: Record, quorum: Quorum) -> Result { let query_id = self.next_query_id(); self.cmd_tx - .try_send(KademliaCommand::PutRecord { record, query_id }) + .try_send(KademliaCommand::PutRecord { + record, + query_id, + quorum, + }) .map(|_| query_id) .map_err(|_| ()) } @@ -424,6 +441,7 @@ impl KademliaHandle { record: Record, peers: Vec, update_local_store: bool, + quorum: Quorum, ) -> Result { let query_id = self.next_query_id(); self.cmd_tx @@ -432,6 +450,7 @@ impl KademliaHandle { query_id, peers, update_local_store, + quorum, }) .map(|_| query_id) .map_err(|_| ()) diff --git a/src/protocol/libp2p/kademlia/message.rs b/src/protocol/libp2p/kademlia/message.rs index de8665f5..ff2b8f34 100644 --- a/src/protocol/libp2p/kademlia/message.rs +++ b/src/protocol/libp2p/kademlia/message.rs @@ -151,6 +151,26 @@ impl KademliaMessage { } /// Create `PUT_VALUE` response. + pub fn put_value_response(key: RecordKey, value: Vec) -> Bytes { + let message = schema::kademlia::Message { + key: key.to_vec(), + cluster_level_raw: 10, + r#type: schema::kademlia::MessageType::PutValue.into(), + record: Some(schema::kademlia::Record { + key: key.to_vec(), + value, + ..Default::default() + }), + ..Default::default() + }; + + let mut buf = BytesMut::with_capacity(message.encoded_len()); + message.encode(&mut buf).expect("BytesMut to provide needed capacity"); + + buf.freeze() + } + + /// Create `GET_VALUE` response. pub fn get_value_response( key: RecordKey, peers: Vec, diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index fcfb6a88..15df1bec 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -93,7 +93,7 @@ enum PeerAction { SendFindNode(QueryId), /// Send `PUT_VALUE` message to peer. - SendPutValue(Bytes), + SendPutValue(QueryId, Bytes), /// Send `ADD_PROVIDER` message to peer. SendAddProvider(Bytes), @@ -368,10 +368,10 @@ impl Kademlia { } } } - Some(PeerAction::SendPutValue(message)) => { + Some(PeerAction::SendPutValue(query, message)) => { tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` message"); - self.executor.send_message(peer, message, substream); + self.executor.send_request_read_response(peer, Some(query), message, substream); } Some(PeerAction::SendAddProvider(message)) => { tracing::trace!(target: LOG_TARGET, ?peer, "send `ADD_PROVIDER` message"); @@ -472,20 +472,45 @@ impl Kademlia { } } } - KademliaMessage::PutValue { record } => { - tracing::trace!( - target: LOG_TARGET, - ?peer, - record_key = ?record.key, - "handle `PUT_VALUE` message", - ); + KademliaMessage::PutValue { record } => match query_id { + Some(query_id) => { + tracing::trace!( + target: LOG_TARGET, + ?peer, + query = ?query_id, + record_key = ?record.key, + "handle `PUT_VALUE` response", + ); - if let IncomingRecordValidationMode::Automatic = self.validation_mode { - self.store.put(record.clone()); + self.engine.register_response( + query_id, + peer, + KademliaMessage::PutValue { record }, + ); } + None => { + tracing::trace!( + target: LOG_TARGET, + ?peer, + record_key = ?record.key, + "handle `PUT_VALUE` request", + ); - let _ = self.event_tx.send(KademliaEvent::IncomingRecord { record }).await; - } + if let IncomingRecordValidationMode::Automatic = self.validation_mode { + self.store.put(record.clone()); + } + + // Send ACK even if the record was/will be filtered out to not reveal any + // internal state. + let message = KademliaMessage::put_value_response( + record.key.clone(), + record.value.clone(), + ); + self.executor.send_message(peer, message, substream); + + let _ = self.event_tx.send(KademliaEvent::IncomingRecord { record }).await; + } + }, KademliaMessage::GetRecord { key, record, peers } => { match (query_id, key) { (Some(query_id), key) => { @@ -749,7 +774,7 @@ impl Kademlia { Ok(()) } Err(err) => { - tracing::trace!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream a second time"); + tracing::debug!(target: LOG_TARGET, ?query, ?peer, ?err, "Failed to open substream a second time"); Err(err.into()) } } @@ -803,20 +828,27 @@ impl Kademlia { .await; Ok(()) } - QueryAction::PutRecordToFoundNodes { record, peers } => { + QueryAction::PutRecordToFoundNodes { + query, + record, + peers, + quorum, + } => { tracing::trace!( target: LOG_TARGET, + ?query, record_key = ?record.key, num_peers = ?peers.len(), "store record to found peers", ); let key = record.key.clone(); - let message = KademliaMessage::put_value(record); + let message: Bytes = KademliaMessage::put_value(record); - for peer in peers { + for peer in &peers { if let Err(error) = self.open_substream_or_dial( peer.peer, - PeerAction::SendPutValue(message.clone()), + // `message` is cheaply clonable because of `Bytes` reference counting. + PeerAction::SendPutValue(query, message.clone()), None, ) { tracing::debug!( @@ -829,6 +861,25 @@ impl Kademlia { } } + self.engine.start_put_record_to_found_nodes_requests_tracking( + query, + key, + peers.into_iter().map(|peer| peer.peer).collect(), + quorum, + ); + + Ok(()) + } + QueryAction::PutRecordQuerySucceeded { query, key } => { + tracing::debug!(target: LOG_TARGET, ?query, "`PUT_VALUE` query succeeded"); + + let _ = self + .event_tx + .send(KademliaEvent::PutRecordSuccess { + query_id: query, + key, + }) + .await; Ok(()) } QueryAction::AddProviderToFoundNodes { @@ -1015,7 +1066,7 @@ impl Kademlia { .into() ); } - Some(KademliaCommand::PutRecord { mut record, query_id }) => { + Some(KademliaCommand::PutRecord { mut record, quorum, query_id }) => { tracing::debug!( target: LOG_TARGET, query = ?query_id, @@ -1040,6 +1091,7 @@ impl Kademlia { query_id, record, self.routing_table.closest(&key, self.replication_factor).into(), + quorum, ); } Some(KademliaCommand::PutRecordToPeers { @@ -1047,6 +1099,7 @@ impl Kademlia { query_id, peers, update_local_store, + quorum, }) => { tracing::debug!( target: LOG_TARGET, @@ -1082,6 +1135,7 @@ impl Kademlia { query_id, record, peers, + quorum, ); } Some(KademliaCommand::StartProviding { diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index c2679024..3fcd4650 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -37,12 +37,13 @@ use bytes::Bytes; use std::collections::{HashMap, VecDeque}; -use self::find_many_nodes::FindManyNodesContext; +use self::{find_many_nodes::FindManyNodesContext, put_record::PutRecordToFoundNodesContext}; mod find_many_nodes; mod find_node; mod get_providers; mod get_record; +mod put_record; /// Logging target for the file. const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query"; @@ -66,6 +67,9 @@ enum QueryType { /// Record that needs to be stored. record: Record, + /// [`Quorum`] that needs to be reached for the query to succeed. + quorum: Quorum, + /// Context for the `FIND_NODE` query. context: FindNodeContext, }, @@ -75,10 +79,19 @@ enum QueryType { /// Record that needs to be stored. record: Record, + /// [`Quorum`] that needs to be reached for the query to succeed. + quorum: Quorum, + /// Context for finding peers. context: FindManyNodesContext, }, + /// `PUT_VALUE` message sending phase. + PutRecordToFoundNodes { + /// Context for tracking `PUT_VALUE` responses. + context: PutRecordToFoundNodesContext, + }, + /// `GET_VALUE` query. GetRecord { /// Context for the `GET_VALUE` query. @@ -133,11 +146,26 @@ pub enum QueryAction { /// Store the record to nodes closest to target key. PutRecordToFoundNodes { - /// Target peer. + /// Query ID of the original PUT_RECORD request. + query: QueryId, + + /// Record to store. record: Record, /// Peers for whom the `PUT_VALUE` must be sent to. peers: Vec, + + /// [`Quorum`] that needs to be reached for the query to succeed. + quorum: Quorum, + }, + + /// `PUT_VALUE` query succeeded. + PutRecordQuerySucceeded { + /// ID of the query that succeeded. + query: QueryId, + + /// Record key of the stored record. + key: RecordKey, }, /// Add the provider record to nodes closest to the target key. @@ -264,6 +292,7 @@ impl QueryEngine { query_id: QueryId, record: Record, candidates: VecDeque, + quorum: Quorum, ) -> QueryId { tracing::debug!( target: LOG_TARGET, @@ -286,6 +315,7 @@ impl QueryEngine { query_id, QueryType::PutRecord { record, + quorum, context: FindNodeContext::new(config, candidates), }, ); @@ -299,6 +329,7 @@ impl QueryEngine { query_id: QueryId, record: Record, peers_to_report: Vec, + quorum: Quorum, ) -> QueryId { tracing::debug!( target: LOG_TARGET, @@ -312,6 +343,7 @@ impl QueryEngine { query_id, QueryType::PutRecordToPeers { record, + quorum, context: FindManyNodesContext::new(query_id, peers_to_report), }, ); @@ -428,6 +460,29 @@ impl QueryEngine { query_id } + /// Start `PUT_VALUE` requests tracking. + pub fn start_put_record_to_found_nodes_requests_tracking( + &mut self, + query_id: QueryId, + key: RecordKey, + peers: Vec, + quorum: Quorum, + ) { + tracing::debug!( + target: LOG_TARGET, + ?query_id, + num_peers = ?peers.len(), + "start `PUT_VALUE` responses tracking" + ); + + self.queries.insert( + query_id, + QueryType::PutRecordToFoundNodes { + context: PutRecordToFoundNodesContext::new(query_id, key, peers, quorum), + }, + ); + } + /// Register response failure from a queried peer. pub fn register_response_failure(&mut self, query: QueryId, peer: PeerId) { tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response failure"); @@ -445,6 +500,9 @@ impl QueryEngine { Some(QueryType::PutRecordToPeers { context, .. }) => { context.register_response_failure(peer); } + Some(QueryType::PutRecordToFoundNodes { context, .. }) => { + context.register_response_failure(peer); + } Some(QueryType::GetRecord { context }) => { context.register_response_failure(peer); } @@ -488,6 +546,12 @@ impl QueryEngine { } _ => unreachable!(), }, + Some(QueryType::PutRecordToFoundNodes { context, .. }) => match message { + KademliaMessage::PutValue { .. } => { + context.register_response(peer); + } + _ => unreachable!(), + }, Some(QueryType::GetRecord { context }) => match message { KademliaMessage::GetRecord { record, peers, .. } => context.register_response(peer, record, peers), @@ -529,6 +593,10 @@ impl QueryEngine { Some(QueryType::GetRecord { context }) => context.next_peer_action(peer), Some(QueryType::AddProvider { context, .. }) => context.next_peer_action(peer), Some(QueryType::GetProviders { context }) => context.next_peer_action(peer), + Some(QueryType::PutRecordToFoundNodes { .. }) => { + // All `PUT_VALUE` requests were sent when initiating this query type. + None + } } } @@ -541,13 +609,29 @@ impl QueryEngine { target: context.config.target.into_preimage(), peers: context.responses.into_values().collect::>(), }, - QueryType::PutRecord { record, context } => QueryAction::PutRecordToFoundNodes { + QueryType::PutRecord { + record, + quorum, + context, + } => QueryAction::PutRecordToFoundNodes { + query: context.config.query, record, peers: context.responses.into_values().collect::>(), + quorum, }, - QueryType::PutRecordToPeers { record, context } => QueryAction::PutRecordToFoundNodes { + QueryType::PutRecordToPeers { + record, + quorum, + context, + } => QueryAction::PutRecordToFoundNodes { + query: context.query, record, peers: context.peers_to_report, + quorum, + }, + QueryType::PutRecordToFoundNodes { context } => QueryAction::PutRecordQuerySucceeded { + query: context.query, + key: context.key, }, QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone { query_id: context.config.query, @@ -587,6 +671,7 @@ impl QueryEngine { QueryType::GetRecord { context } => context.next_action(), QueryType::AddProvider { context, .. } => context.next_action(), QueryType::GetProviders { context } => context.next_action(), + QueryType::PutRecordToFoundNodes { context, .. } => context.next_action(), }; match action { @@ -809,8 +894,9 @@ mod tests { let mut iter = distances.iter(); // start find node with one known peer + let original_query_id = QueryId(1340); let _query = engine.start_put_record( - QueryId(1340), + original_query_id, original_record.clone(), vec![KademliaPeer::new( *iter.next().unwrap().1, @@ -818,6 +904,7 @@ mod tests { ConnectionType::NotConnected, )] .into(), + Quorum::All, ); let action = engine.next_action(); @@ -873,15 +960,49 @@ mod tests { } let peers = match engine.next_action() { - Some(QueryAction::PutRecordToFoundNodes { peers, record }) => { + Some(QueryAction::PutRecordToFoundNodes { + query, + peers, + record, + quorum, + }) => { + assert_eq!(query, original_query_id); assert_eq!(peers.len(), 4); assert_eq!(record.key, original_record.key); assert_eq!(record.value, original_record.value); + assert!(matches!(quorum, Quorum::All)); + peers } _ => panic!("invalid event received"), }; + engine.start_put_record_to_found_nodes_requests_tracking( + original_query_id, + record_key.clone(), + peers.iter().map(|p| p.peer).collect(), + Quorum::All, + ); + + // Receive ACKs for PUT_VALUE requests. + for peer in &peers { + engine.register_response( + original_query_id, + peer.peer, + KademliaMessage::PutValue { + record: original_record.clone(), + }, + ); + } + + match engine.next_action() { + Some(QueryAction::PutRecordQuerySucceeded { query, key }) => { + assert_eq!(query, original_query_id); + assert_eq!(key, record_key); + } + _ => panic!("invalid event received"), + } + assert!(engine.next_action().is_none()); // get records from those peers. diff --git a/src/protocol/libp2p/kademlia/query/put_record.rs b/src/protocol/libp2p/kademlia/query/put_record.rs new file mode 100644 index 00000000..1c9c9e06 --- /dev/null +++ b/src/protocol/libp2p/kademlia/query/put_record.rs @@ -0,0 +1,130 @@ +// Copyright 2025 litep2p developers +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. +use crate::{ + protocol::libp2p::kademlia::{handle::Quorum, query::QueryAction, QueryId, RecordKey}, + PeerId, +}; + +use std::{cmp, collections::HashSet}; + +/// Logging target for this file. +const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::put_record"; + +/// Context for tracking `PUT_VALUE` responses from peers. +#[derive(Debug)] +pub struct PutRecordToFoundNodesContext { + /// Query ID. + pub query: QueryId, + + /// Record key. + pub key: RecordKey, + + /// Quorum that needs to be reached for the query to succeed. + peers_to_succeed: usize, + + /// Peers we're waiting for responses from. + pending_peers: HashSet, + + /// Number of successfully responded peers. + n_succeeded: usize, +} + +impl PutRecordToFoundNodesContext { + /// Create new [`PutRecordToFoundNodesContext`]. + pub fn new(query: QueryId, key: RecordKey, peers: Vec, quorum: Quorum) -> Self { + Self { + query, + key, + peers_to_succeed: match quorum { + Quorum::One => 1, + // Clamp by the number of discovered peers. This should ever be relevant on + // small networks with fewer peers than the replication factor. Without such + // clamping the query would always fail in small testnets. + Quorum::N(n) => cmp::min(n.get(), cmp::max(peers.len(), 1)), + Quorum::All => cmp::max(peers.len(), 1), + }, + pending_peers: peers.into_iter().collect(), + n_succeeded: 0, + } + } + + /// Register successful response from peer. + pub fn register_response(&mut self, peer: PeerId) { + if self.pending_peers.remove(&peer) { + self.n_succeeded += 1; + + tracing::trace!( + target: LOG_TARGET, + query = ?self.query, + ?peer, + "successful `PUT_VALUE` to peer", + ); + } else { + tracing::debug!( + target: LOG_TARGET, + query = ?self.query, + ?peer, + "`PutRecordToFoundNodesContext::register_response`: pending peer does not exist", + ); + } + } + + /// Register failed response from peer. + pub fn register_response_failure(&mut self, peer: PeerId) { + if self.pending_peers.remove(&peer) { + tracing::trace!( + target: LOG_TARGET, + query = ?self.query, + ?peer, + "failed `PUT_VALUE` to peer", + ); + } else { + tracing::debug!( + target: LOG_TARGET, + query = ?self.query, + ?peer, + "`PutRecordToFoundNodesContext::register_response_failure`: pending peer does not exist", + ); + } + } + + /// Check if all responses have been received. + pub fn is_finished(&self) -> bool { + self.pending_peers.is_empty() + } + + /// Check if all requests were successful. + pub fn is_succeded(&self) -> bool { + self.n_succeeded >= self.peers_to_succeed + } + + /// Get next action if the context is finished. + pub fn next_action(&self) -> Option { + if self.is_finished() { + if self.is_succeded() { + Some(QueryAction::QuerySucceeded { query: self.query }) + } else { + Some(QueryAction::QueryFailed { query: self.query }) + } + } else { + None + } + } +} diff --git a/tests/conformance/rust/kademlia.rs b/tests/conformance/rust/kademlia.rs index 84761914..2b01a44d 100644 --- a/tests/conformance/rust/kademlia.rs +++ b/tests/conformance/rust/kademlia.rs @@ -284,7 +284,7 @@ async fn put_record() { let record_key = RecordKey::new(&vec![1, 2, 3, 4]); let record = Record::new(record_key, vec![1, 3, 3, 7, 1, 3, 3, 8]); - let _ = kad_handle.put_record(record).await; + let _ = kad_handle.put_record(record, Quorum::All).await; loop { tokio::time::sleep(std::time::Duration::from_secs(1)).await; diff --git a/tests/protocol/kademlia.rs b/tests/protocol/kademlia.rs index b90cd31d..242ce670 100644 --- a/tests/protocol/kademlia.rs +++ b/tests/protocol/kademlia.rs @@ -164,7 +164,7 @@ async fn records_are_stored_automatically() { // Publish the record. let record = Record::new(vec![1, 2, 3], vec![0x01]); - kad_handle1.put_record(record.clone()).await; + let query_id = kad_handle1.put_record(record.clone(), Quorum::All).await; let mut records = Vec::new(); loop { @@ -174,7 +174,19 @@ async fn records_are_stored_automatically() { } _ = litep2p1.next_event() => {} _ = litep2p2.next_event() => {} - _ = kad_handle1.next() => {} + event = kad_handle1.next() => { + match event { + Some(KademliaEvent::PutRecordSuccess { query_id: got_query_id, key }) => { + assert_eq!(got_query_id, query_id); + assert_eq!(key, record.key); + + // Check if the record was stored. + let _ = kad_handle2 + .get_record(RecordKey::from(vec![1, 2, 3]), Quorum::One).await; + } + _ => {} + } + } event = kad_handle2.next() => { match event { Some(KademliaEvent::IncomingRecord { record: got_record }) => { @@ -182,10 +194,6 @@ async fn records_are_stored_automatically() { assert_eq!(got_record.value, record.value); assert_eq!(got_record.publisher.unwrap(), *litep2p1.local_peer_id()); assert!(got_record.expires.is_some()); - - // Check if the record was stored. - let _ = kad_handle2 - .get_record(RecordKey::from(vec![1, 2, 3]), Quorum::One).await; } Some(KademliaEvent::GetRecordPartialResult { query_id: _, record }) => { records.push(record); @@ -245,8 +253,10 @@ async fn records_are_stored_manually() { // Publish the record. let mut record = Record::new(vec![1, 2, 3], vec![0x01]); - kad_handle1.put_record(record.clone()).await; + let query_id = kad_handle1.put_record(record.clone(), Quorum::All).await; let mut records = Vec::new(); + let mut put_record_success = false; + let mut get_record_success = false; loop { tokio::select! { @@ -255,7 +265,23 @@ async fn records_are_stored_manually() { } _ = litep2p1.next_event() => {} _ = litep2p2.next_event() => {} - _ = kad_handle1.next() => {} + event = kad_handle1.next() => { + match event { + Some(KademliaEvent::PutRecordSuccess { query_id: got_query_id, key }) => { + assert_eq!(got_query_id, query_id); + assert_eq!(key, record.key); + + // Due to manual validation, the record will be stored later, so we request + // it in `kad_handle2` after receiving the incoming record + put_record_success = true; + + if get_record_success { + break; + } + } + _ => {} + } + } event = kad_handle2.next() => { match event { Some(KademliaEvent::IncomingRecord { record: got_record }) => { @@ -282,7 +308,12 @@ async fn records_are_stored_manually() { assert_eq!(got_record.record.value, record.value); assert_eq!(got_record.record.publisher.unwrap(), *litep2p1.local_peer_id()); assert!(got_record.record.expires.is_some()); - break + + get_record_success = true; + + if put_record_success { + break; + } } _ => {} } @@ -328,9 +359,12 @@ async fn not_validated_records_are_not_stored() { // Publish the record. let record = Record::new(vec![1, 2, 3], vec![0x01]); - kad_handle1.put_record(record.clone()).await; + let query_id = kad_handle1.put_record(record.clone(), Quorum::All).await; let mut records = Vec::new(); let mut get_record_query_id = None; + let mut put_record_success = false; + let mut get_record_success = false; + let mut query_failed = false; loop { tokio::select! { @@ -339,7 +373,21 @@ async fn not_validated_records_are_not_stored() { } event = litep2p1.next_event() => {} event = litep2p2.next_event() => {} - event = kad_handle1.next() => {} + event = kad_handle1.next() => { + match event { + Some(KademliaEvent::PutRecordSuccess { query_id: got_query_id, key }) => { + assert_eq!(got_query_id, query_id); + assert_eq!(key, record.key); + + put_record_success = true; + + if get_record_success || query_failed { + break; + } + } + _ => {} + } + } event = kad_handle2.next() => { match event { Some(KademliaEvent::IncomingRecord { record: got_record }) => { @@ -354,19 +402,30 @@ async fn not_validated_records_are_not_stored() { .get_record(RecordKey::from(vec![1, 2, 3]), Quorum::One).await; get_record_query_id = Some(query_id); } - Some(KademliaEvent::GetRecordPartialResult { query_id: _, record }) => { + Some(KademliaEvent::GetRecordPartialResult { query_id, record }) => { + assert_eq!(query_id, get_record_query_id.unwrap()); records.push(record); } Some(KademliaEvent::GetRecordSuccess { query_id: _ }) => { assert_eq!(records.len(), 1); let got_record = records.first().unwrap(); - // The record was not stored. - assert_ne!(got_record.peer, *litep2p1.local_peer_id()); - break + // The record was not stored at litep2p2. + assert_eq!(got_record.peer, *litep2p1.local_peer_id()); + + get_record_success = true; + + if put_record_success { + break + } } Some(KademliaEvent::QueryFailed { query_id }) => { assert_eq!(query_id, get_record_query_id.unwrap()); - break + + query_failed = true; + + if put_record_success { + break + } } _ => {} } @@ -405,7 +464,7 @@ async fn get_record_retrieves_remote_records() { // Store the record on `litep2p1``. let original_record = Record::new(vec![1, 2, 3], vec![0x01]); - let query1 = kad_handle1.put_record(original_record.clone()).await; + let query1 = kad_handle1.put_record(original_record.clone(), Quorum::All).await; let mut records = Vec::new(); let mut query2 = None; @@ -501,11 +560,12 @@ async fn get_record_retrieves_local_and_remote_records() { // Store the record on `litep2p1``. let original_record = Record::new(vec![1, 2, 3], vec![0x01]); - let query1 = kad_handle1.put_record(original_record.clone()).await; + let query1 = kad_handle1.put_record(original_record.clone(), Quorum::All).await; let (mut peer1_stored, mut peer2_stored) = (false, false); let mut query3 = None; let mut records = Vec::new(); + let mut put_record_success = false; loop { tokio::select! { @@ -514,7 +574,19 @@ async fn get_record_retrieves_local_and_remote_records() { } event = litep2p1.next_event() => {} event = litep2p2.next_event() => {} - event = kad_handle1.next() => {} + event = kad_handle1.next() => { + match event { + Some(KademliaEvent::PutRecordSuccess { query_id: got_query_id, key }) => { + assert_eq!(got_query_id, query1); + assert_eq!(key, original_record.key); + + // Due to manual validation, the record will be stored later, so we request + // it in `kad_handle2` after receiving the incoming record + put_record_success = true; + } + _ => {} + } + } event = kad_handle2.next() => { match event { Some(KademliaEvent::IncomingRecord { record: got_record }) => { @@ -558,6 +630,11 @@ async fn get_record_retrieves_local_and_remote_records() { } } } + + assert!( + put_record_success, + "Publisher was not notified that the record was received", + ); } #[tokio::test]