Skip to content
Open
212 changes: 190 additions & 22 deletions src/protocol/libp2p/kademlia/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,33 @@ const READ_TIMEOUT: Duration = Duration::from_secs(15);
/// Write timeout for outbound messages.
const WRITE_TIMEOUT: Duration = Duration::from_secs(15);

/// Faulure reason.
#[derive(Debug)]
pub enum FailureReason {
/// Substream was closed while reading/writing message to remote peer.
SubstreamClosed,

/// Timeout while reading/writing to substream.
Timeout,
}

/// Query result.
#[derive(Debug)]
pub enum QueryResult {
/// Message was sent to remote peer successfully.
/// This result is only reported for send-only queries. Queries that include reading a
/// response won't report it and will only yield a [`QueryResult::ReadSuccess`].
SendSuccess {
/// Substream.
substream: Substream,
},

/// Failed to send message to remote peer.
SendFailure {
/// Failure reason.
reason: FailureReason,
},

/// Message was read from the remote peer successfully.
ReadSuccess {
/// Substream.
Expand All @@ -55,11 +73,16 @@ pub enum QueryResult {
message: BytesMut,
},

/// Timeout while reading a response from the substream.
Timeout,
/// Failed to read message from remote peer.
ReadFailure {
/// Failure reason.
reason: FailureReason,
},

/// Substream was closed wile reading/writing message to remote peer.
SubstreamClosed,
/// Result that must be treated as send success. This is needed as a workaround to support
/// older litep2p nodes not sending `PUT_VALUE` ACK messages and not reading them.
// TODO: remove this as part of https://github.com/paritytech/litep2p/issues/429.
AssumeSendSuccess,
}

/// Query result.
Expand Down Expand Up @@ -90,24 +113,69 @@ impl QueryExecutor {
}

/// Send message to remote peer.
pub fn send_message(&mut self, peer: PeerId, message: Bytes, mut substream: Substream) {
pub fn send_message(
&mut self,
peer: PeerId,
query_id: Option<QueryId>,
message: Bytes,
mut substream: Substream,
) {
self.futures.push(Box::pin(async move {
match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
// Timeout error.
Err(_) => QueryContext {
peer,
query_id: None,
result: QueryResult::Timeout,
query_id,
result: QueryResult::SendFailure {
reason: FailureReason::Timeout,
},
},
// Writing message to substream failed.
Ok(Err(_)) => QueryContext {
peer,
query_id: None,
result: QueryResult::SubstreamClosed,
query_id,
result: QueryResult::SendFailure {
reason: FailureReason::SubstreamClosed,
},
},
Ok(Ok(())) => QueryContext {
peer,
query_id: None,
query_id,
result: QueryResult::SendSuccess { substream },
},
}
}));
}

/// Send message and ignore sending errors.
///
/// This is a hackish way of dealing with older litep2p nodes not exppecting receiving
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: typo in exppecting

/// `PUT_VALUE` ACK messages. This should eventually be removed.
// TODO: remove this as part of https://github.com/paritytech/litep2p/issues/429.
pub fn send_message_eat_failure(
&mut self,
peer: PeerId,
query_id: Option<QueryId>,
message: Bytes,
mut substream: Substream,
) {
self.futures.push(Box::pin(async move {
match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
// Timeout error.
Err(_) => QueryContext {
peer,
query_id,
result: QueryResult::AssumeSendSuccess,
},
// Writing message to substream failed.
Ok(Err(_)) => QueryContext {
peer,
query_id,
result: QueryResult::AssumeSendSuccess,
},
Ok(Ok(())) => QueryContext {
peer,
query_id,
result: QueryResult::SendSuccess { substream },
},
}
Expand All @@ -126,7 +194,9 @@ impl QueryExecutor {
Err(_) => QueryContext {
peer,
query_id,
result: QueryResult::Timeout,
result: QueryResult::ReadFailure {
reason: FailureReason::Timeout,
},
},
Ok(Some(Ok(message))) => QueryContext {
peer,
Expand All @@ -136,7 +206,9 @@ impl QueryExecutor {
Ok(None) | Ok(Some(Err(_))) => QueryContext {
peer,
query_id,
result: QueryResult::SubstreamClosed,
result: QueryResult::ReadFailure {
reason: FailureReason::SubstreamClosed,
},
},
}
}));
Expand All @@ -157,25 +229,32 @@ impl QueryExecutor {
return QueryContext {
peer,
query_id,
result: QueryResult::Timeout,
result: QueryResult::SendFailure {
reason: FailureReason::Timeout,
},
},
// Writing message to substream failed.
Ok(Err(_)) => {
let _ = substream.close().await;
return QueryContext {
peer,
query_id,
result: QueryResult::SubstreamClosed,
result: QueryResult::SendFailure {
reason: FailureReason::SubstreamClosed,
},
};
}
// This will result in either `SendAndReadSuccess` or `SendSuccessReadFailure`.
Ok(Ok(())) => (),
};

match tokio::time::timeout(READ_TIMEOUT, substream.next()).await {
Err(_) => QueryContext {
peer,
query_id,
result: QueryResult::Timeout,
result: QueryResult::ReadFailure {
reason: FailureReason::Timeout,
},
},
Ok(Some(Ok(message))) => QueryContext {
peer,
Expand All @@ -185,11 +264,70 @@ impl QueryExecutor {
Ok(None) | Ok(Some(Err(_))) => QueryContext {
peer,
query_id,
result: QueryResult::SubstreamClosed,
result: QueryResult::ReadFailure {
reason: FailureReason::SubstreamClosed,
},
},
}
}));
}

/// Send request to remote peer and read the response, ignoring it and any read errors.
///
/// This is a hackish way of dealing with older litep2p nodes not sending `PUT_VALUE` ACK
/// messages. This should eventually be removed.
// TODO: remove this as part of https://github.com/paritytech/litep2p/issues/429.
pub fn send_request_eat_response_failure(
&mut self,
peer: PeerId,
query_id: Option<QueryId>,
message: Bytes,
mut substream: Substream,
) {
self.futures.push(Box::pin(async move {
match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
// Timeout error.
Err(_) =>
return QueryContext {
peer,
query_id,
result: QueryResult::SendFailure {
reason: FailureReason::Timeout,
},
},
// Writing message to substream failed.
Ok(Err(_)) => {
let _ = substream.close().await;
return QueryContext {
peer,
query_id,
result: QueryResult::SendFailure {
reason: FailureReason::SubstreamClosed,
},
};
}
// This will result in either `SendAndReadSuccess` or `SendSuccessReadFailure`.
Ok(Ok(())) => (),
};

// Ignore the read result (including errors).
if let Ok(Some(Ok(message))) =
tokio::time::timeout(READ_TIMEOUT, substream.next()).await
{
QueryContext {
peer,
query_id,
result: QueryResult::ReadSuccess { substream, message },
}
} else {
QueryContext {
peer,
query_id,
result: QueryResult::AssumeSendSuccess,
}
}
}));
}
}

impl Stream for QueryExecutor {
Expand Down Expand Up @@ -223,7 +361,12 @@ mod tests {
})) => {
assert_eq!(peer, queried_peer);
assert!(query_id.is_none());
assert!(std::matches!(result, QueryResult::Timeout));
assert!(std::matches!(
result,
QueryResult::ReadFailure {
reason: FailureReason::Timeout
}
));
}
result => panic!("invalid result received: {result:?}"),
}
Expand Down Expand Up @@ -252,7 +395,12 @@ mod tests {
})) => {
assert_eq!(peer, queried_peer);
assert_eq!(query_id, Some(QueryId(1338)));
assert!(std::matches!(result, QueryResult::SubstreamClosed));
assert!(std::matches!(
result,
QueryResult::ReadFailure {
reason: FailureReason::SubstreamClosed
}
));
}
result => panic!("invalid result received: {result:?}"),
}
Expand Down Expand Up @@ -287,7 +435,12 @@ mod tests {
})) => {
assert_eq!(peer, queried_peer);
assert_eq!(query_id, Some(QueryId(1337)));
assert!(std::matches!(result, QueryResult::SubstreamClosed));
assert!(std::matches!(
result,
QueryResult::ReadFailure {
reason: FailureReason::SubstreamClosed
}
));
}
result => panic!("invalid result received: {result:?}"),
}
Expand Down Expand Up @@ -321,7 +474,12 @@ mod tests {
})) => {
assert_eq!(peer, queried_peer);
assert_eq!(query_id, Some(QueryId(1337)));
assert!(std::matches!(result, QueryResult::SubstreamClosed));
assert!(std::matches!(
result,
QueryResult::SendFailure {
reason: FailureReason::SubstreamClosed
}
));
}
result => panic!("invalid result received: {result:?}"),
}
Expand Down Expand Up @@ -350,7 +508,12 @@ mod tests {
})) => {
assert_eq!(peer, queried_peer);
assert_eq!(query_id, Some(QueryId(1336)));
assert!(std::matches!(result, QueryResult::Timeout));
assert!(std::matches!(
result,
QueryResult::ReadFailure {
reason: FailureReason::Timeout
}
));
}
result => panic!("invalid result received: {result:?}"),
}
Expand Down Expand Up @@ -382,7 +545,12 @@ mod tests {
})) => {
assert_eq!(peer, queried_peer);
assert_eq!(query_id, Some(QueryId(1335)));
assert!(std::matches!(result, QueryResult::SubstreamClosed));
assert!(std::matches!(
result,
QueryResult::ReadFailure {
reason: FailureReason::SubstreamClosed
}
));
}
result => panic!("invalid result received: {result:?}"),
}
Expand Down
Loading