Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Add a feedback when response is successfully sent (#8510)
Browse files Browse the repository at this point in the history
* Add a feedback when response is successfully sent

* Fix gp warp sync
  • Loading branch information
tomaka committed Apr 1, 2021
1 parent 9f5a73f commit b597818
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 2 deletions.
1 change: 1 addition & 0 deletions client/finality-grandpa-warp-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ impl<TBlock: BlockT, TBackend: Backend<TBlock>> GrandpaWarpSyncRequestHandler<TB
pending_response.send(OutgoingResponse {
result: Ok(proof.encode()),
reputation_changes: Vec::new(),
sent_feedback: None,
}).map_err(|_| HandleRequestError::SendResponse)
}

Expand Down
1 change: 1 addition & 0 deletions client/network/src/block_request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ impl<B: BlockT> BlockRequestHandler<B> {
pending_response.send(OutgoingResponse {
result,
reputation_changes,
sent_feedback: None,
}).map_err(|_| HandleRequestError::SendResponse)
}

Expand Down
14 changes: 12 additions & 2 deletions client/network/src/light_client_requests/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ impl<B: Block> LightClientRequestHandler<B> {

match self.handle_request(peer, payload) {
Ok(response_data) => {
let response = OutgoingResponse { result: Ok(response_data), reputation_changes: Vec::new() };
let response = OutgoingResponse {
result: Ok(response_data),
reputation_changes: Vec::new(),
sent_feedback: None
};

match pending_response.send(response) {
Ok(()) => debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -110,7 +115,12 @@ impl<B: Block> LightClientRequestHandler<B> {
_ => Vec::new(),
};

let response = OutgoingResponse { result: Err(()), reputation_changes };
let response = OutgoingResponse {
result: Err(()),
reputation_changes,
sent_feedback: None
};

if pending_response.send(response).is_err() {
debug!(
target: LOG_TARGET,
Expand Down
38 changes: 38 additions & 0 deletions client/network/src/request_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,20 @@ pub struct OutgoingResponse {
///
/// `Err(())` if none is available e.g. due an error while handling the request.
pub result: Result<Vec<u8>, ()>,

/// Reputation changes accrued while handling the request. To be applied to the reputation of
/// the peer sending the request.
pub reputation_changes: Vec<ReputationChange>,

/// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
/// peer.
///
/// > **Note**: Operating systems typically maintain a buffer of a few dozen kilobytes of
/// > outgoing data for each TCP socket, and it is not possible for a user
/// > application to inspect this buffer. This channel here is not actually notified
/// > when the response has been fully sent out, but rather when it has fully been
/// > written to the buffer managed by the operating system.
pub sent_feedback: Option<oneshot::Sender<()>>,
}

/// Event generated by the [`RequestResponsesBehaviour`].
Expand Down Expand Up @@ -240,6 +251,10 @@ pub struct RequestResponsesBehaviour {

/// Whenever an incoming request arrives, the arrival [`Instant`] is recorded here.
pending_responses_arrival_time: HashMap<ProtocolRequestId, Instant>,

/// Whenever a response is received on `pending_responses`, insert a channel to be notified
/// when the request has been sent out.
send_feedback: HashMap<ProtocolRequestId, oneshot::Sender<()>>,
}

/// Generated by the response builder and waiting to be processed.
Expand Down Expand Up @@ -284,6 +299,7 @@ impl RequestResponsesBehaviour {
pending_requests: Default::default(),
pending_responses: Default::default(),
pending_responses_arrival_time: Default::default(),
send_feedback: Default::default(),
})
}

Expand Down Expand Up @@ -463,6 +479,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
response: OutgoingResponse {
result,
reputation_changes,
sent_feedback,
},
} = match outcome {
Some(outcome) => outcome,
Expand All @@ -483,6 +500,13 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
Dropping response",
request_id, protocol_name,
);
} else {
if let Some(sent_feedback) = sent_feedback {
self.send_feedback.insert(
(protocol_name, request_id).into(),
sent_feedback
);
}
}
}
}
Expand Down Expand Up @@ -668,6 +692,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
self.pending_responses_arrival_time.remove(
&(protocol.clone(), request_id).into(),
);
self.send_feedback.remove(&(protocol.clone(), request_id).into());
let out = Event::InboundRequest {
peer,
protocol: protocol.clone(),
Expand All @@ -690,11 +715,18 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
failed; qed.",
);

if let Some(send_feedback) = self.send_feedback.remove(
&(protocol.clone(), request_id).into()
) {
let _ = send_feedback.send(());
}

let out = Event::InboundRequest {
peer,
protocol: protocol.clone(),
result: Ok(arrival_time),
};

return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));

}
Expand Down Expand Up @@ -914,11 +946,14 @@ mod tests {

pool.spawner().spawn_obj(async move {
while let Some(rq) = rx.next().await {
let (fb_tx, fb_rx) = oneshot::channel();
assert_eq!(rq.payload, b"this is a request");
let _ = rq.pending_response.send(super::OutgoingResponse {
result: Ok(b"this is a response".to_vec()),
reputation_changes: Vec::new(),
sent_feedback: Some(fb_tx),
});
fb_rx.await.unwrap();
}
}.boxed().into()).unwrap();

Expand Down Expand Up @@ -1005,6 +1040,7 @@ mod tests {
let _ = rq.pending_response.send(super::OutgoingResponse {
result: Ok(b"this response exceeds the limit".to_vec()),
reputation_changes: Vec::new(),
sent_feedback: None,
});
}
}.boxed().into()).unwrap();
Expand Down Expand Up @@ -1175,13 +1211,15 @@ mod tests {
.send(OutgoingResponse {
result: Ok(b"this is a response".to_vec()),
reputation_changes: Vec::new(),
sent_feedback: None,
})
.unwrap();
protocol_2_request.unwrap()
.pending_response
.send(OutgoingResponse {
result: Ok(b"this is a response".to_vec()),
reputation_changes: Vec::new(),
sent_feedback: None,
})
.unwrap();
}.boxed().into()).unwrap();
Expand Down

0 comments on commit b597818

Please sign in to comment.