Skip to content

Commit

Permalink
Rework internal rpc protocol handling (sigp#4290)
Browse files Browse the repository at this point in the history
## Issue Addressed

Resolves sigp#3980. Builds on work by @GeemoCandama in sigp#4084 

## Proposed Changes

Extends the `SupportedProtocol` abstraction added in Geemo's PR and attempts to fix internal versioning of requests that are mentioned in this comment sigp#4084 (comment) 

Co-authored-by: geemo <geemo@tutanota.com>
  • Loading branch information
2 people authored and isaac.asimov committed Jul 13, 2023
1 parent 255272a commit b4a5d97
Show file tree
Hide file tree
Showing 17 changed files with 610 additions and 575 deletions.
9 changes: 3 additions & 6 deletions beacon_node/lighthouse_network/src/rpc/codec/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ mod tests {
let mut buf = BytesMut::new();
buf.extend_from_slice(&message);

let snappy_protocol_id =
ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy);
let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy);

let fork_context = Arc::new(fork_context(ForkName::Base));
let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
Expand Down Expand Up @@ -249,8 +248,7 @@ mod tests {
// Insert length-prefix
uvi_codec.encode(len, &mut dst).unwrap();

let snappy_protocol_id =
ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy);
let snappy_protocol_id = ProtocolId::new(SupportedProtocol::StatusV1, Encoding::SSZSnappy);

let fork_context = Arc::new(fork_context(ForkName::Base));
let mut snappy_outbound_codec = SSZSnappyOutboundCodec::<Spec>::new(
Expand All @@ -277,8 +275,7 @@ mod tests {
dst
}

let protocol_id =
ProtocolId::new(Protocol::BlocksByRange, Version::V1, Encoding::SSZSnappy);
let protocol_id = ProtocolId::new(SupportedProtocol::BlocksByRangeV1, Encoding::SSZSnappy);

// Response limits
let fork_context = Arc::new(fork_context(ForkName::Base));
Expand Down
575 changes: 226 additions & 349 deletions beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ where
while let Some((id, req)) = self.dial_queue.pop() {
self.events_out.push(Err(HandlerErr::Outbound {
error: RPCError::Disconnected,
proto: req.protocol(),
proto: req.versioned_protocol().protocol(),
id,
}));
}
Expand All @@ -269,7 +269,7 @@ where
}
_ => self.events_out.push(Err(HandlerErr::Outbound {
error: RPCError::Disconnected,
proto: req.protocol(),
proto: req.versioned_protocol().protocol(),
id,
})),
}
Expand Down Expand Up @@ -334,7 +334,7 @@ where
) {
self.dial_negotiated -= 1;
let (id, request) = request_info;
let proto = request.protocol();
let proto = request.versioned_protocol().protocol();

// accept outbound connections only if the handler is not deactivated
if matches!(self.state, HandlerState::Deactivated) {
Expand Down Expand Up @@ -414,15 +414,15 @@ where
128,
) as usize),
delay_key: Some(delay_key),
protocol: req.protocol(),
protocol: req.versioned_protocol().protocol(),
request_start_time: Instant::now(),
remaining_chunks: expected_responses,
},
);
} else {
self.events_out.push(Err(HandlerErr::Inbound {
id: self.current_inbound_substream_id,
proto: req.protocol(),
proto: req.versioned_protocol().protocol(),
error: RPCError::HandlerRejected,
}));
return self.shutdown(None);
Expand Down Expand Up @@ -498,7 +498,7 @@ where
};
self.events_out.push(Err(HandlerErr::Outbound {
error,
proto: req.protocol(),
proto: req.versioned_protocol().protocol(),
id,
}));
}
Expand Down Expand Up @@ -895,7 +895,7 @@ where
// else we return an error, stream should not have closed early.
let outbound_err = HandlerErr::Outbound {
id: request_id,
proto: request.protocol(),
proto: request.versioned_protocol().protocol(),
error: RPCError::IncompleteStream,
};
return Poll::Ready(ConnectionHandlerEvent::Custom(Err(outbound_err)));
Expand Down
125 changes: 119 additions & 6 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield};
use regex::bytes::Regex;
use serde::Serialize;
use ssz::Encode;
use ssz_derive::{Decode, Encode};
use ssz_types::{
typenum::{U1024, U256},
VariableList,
};
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::Arc;
use strum::IntoStaticStr;
Expand Down Expand Up @@ -85,6 +87,30 @@ pub struct Ping {
pub data: u64,
}

/// The METADATA request structure.
#[superstruct(
variants(V1, V2),
variant_attributes(derive(Clone, Debug, PartialEq, Serialize),)
)]
#[derive(Clone, Debug, PartialEq)]
pub struct MetadataRequest<T: EthSpec> {
_phantom_data: PhantomData<T>,
}

impl<T: EthSpec> MetadataRequest<T> {
pub fn new_v1() -> Self {
Self::V1(MetadataRequestV1 {
_phantom_data: PhantomData,
})
}

pub fn new_v2() -> Self {
Self::V2(MetadataRequestV2 {
_phantom_data: PhantomData,
})
}
}

/// The METADATA response structure.
#[superstruct(
variants(V1, V2),
Expand All @@ -93,9 +119,8 @@ pub struct Ping {
serde(bound = "T: EthSpec", deny_unknown_fields),
)
)]
#[derive(Clone, Debug, PartialEq, Serialize, Encode)]
#[derive(Clone, Debug, PartialEq, Serialize)]
#[serde(bound = "T: EthSpec")]
#[ssz(enum_behaviour = "transparent")]
pub struct MetaData<T: EthSpec> {
/// A sequential counter indicating when data gets modified.
pub seq_number: u64,
Expand All @@ -106,6 +131,38 @@ pub struct MetaData<T: EthSpec> {
pub syncnets: EnrSyncCommitteeBitfield<T>,
}

impl<T: EthSpec> MetaData<T> {
/// Returns a V1 MetaData response from self.
pub fn metadata_v1(&self) -> Self {
match self {
md @ MetaData::V1(_) => md.clone(),
MetaData::V2(metadata) => MetaData::V1(MetaDataV1 {
seq_number: metadata.seq_number,
attnets: metadata.attnets.clone(),
}),
}
}

/// Returns a V2 MetaData response from self by filling unavailable fields with default.
pub fn metadata_v2(&self) -> Self {
match self {
MetaData::V1(metadata) => MetaData::V2(MetaDataV2 {
seq_number: metadata.seq_number,
attnets: metadata.attnets.clone(),
syncnets: Default::default(),
}),
md @ MetaData::V2(_) => md.clone(),
}
}

pub fn as_ssz_bytes(&self) -> Vec<u8> {
match self {
MetaData::V1(md) => md.as_ssz_bytes(),
MetaData::V2(md) => md.as_ssz_bytes(),
}
}
}

/// The reason given for a `Goodbye` message.
///
/// Note: any unknown `u64::into(n)` will resolve to `Goodbye::Unknown` for any unknown `n`,
Expand Down Expand Up @@ -197,7 +254,11 @@ impl ssz::Decode for GoodbyeReason {
}

/// Request a number of beacon block roots from a peer.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
#[superstruct(
variants(V1, V2),
variant_attributes(derive(Encode, Decode, Clone, Debug, PartialEq))
)]
#[derive(Clone, Debug, PartialEq)]
pub struct BlocksByRangeRequest {
/// The starting slot to request blocks.
pub start_slot: u64,
Expand All @@ -206,8 +267,23 @@ pub struct BlocksByRangeRequest {
pub count: u64,
}

impl BlocksByRangeRequest {
/// The default request is V2
pub fn new(start_slot: u64, count: u64) -> Self {
Self::V2(BlocksByRangeRequestV2 { start_slot, count })
}

pub fn new_v1(start_slot: u64, count: u64) -> Self {
Self::V1(BlocksByRangeRequestV1 { start_slot, count })
}
}

/// Request a number of beacon block roots from a peer.
#[derive(Encode, Decode, Clone, Debug, PartialEq)]
#[superstruct(
variants(V1, V2),
variant_attributes(derive(Encode, Decode, Clone, Debug, PartialEq))
)]
#[derive(Clone, Debug, PartialEq)]
pub struct OldBlocksByRangeRequest {
/// The starting slot to request blocks.
pub start_slot: u64,
Expand All @@ -223,13 +299,43 @@ pub struct OldBlocksByRangeRequest {
pub step: u64,
}

impl OldBlocksByRangeRequest {
/// The default request is V2
pub fn new(start_slot: u64, count: u64, step: u64) -> Self {
Self::V2(OldBlocksByRangeRequestV2 {
start_slot,
count,
step,
})
}

pub fn new_v1(start_slot: u64, count: u64, step: u64) -> Self {
Self::V1(OldBlocksByRangeRequestV1 {
start_slot,
count,
step,
})
}
}

/// Request a number of beacon block bodies from a peer.
#[superstruct(variants(V1, V2), variant_attributes(derive(Clone, Debug, PartialEq)))]
#[derive(Clone, Debug, PartialEq)]
pub struct BlocksByRootRequest {
/// The list of beacon block bodies being requested.
pub block_roots: VariableList<Hash256, MaxRequestBlocks>,
}

impl BlocksByRootRequest {
pub fn new(block_roots: VariableList<Hash256, MaxRequestBlocks>) -> Self {
Self::V2(BlocksByRootRequestV2 { block_roots })
}

pub fn new_v1(block_roots: VariableList<Hash256, MaxRequestBlocks>) -> Self {
Self::V1(BlocksByRootRequestV1 { block_roots })
}
}

/* RPC Handling and Grouping */
// Collection of enums and structs used by the Codecs to encode/decode RPC messages

Expand Down Expand Up @@ -438,7 +544,12 @@ impl std::fmt::Display for GoodbyeReason {

impl std::fmt::Display for BlocksByRangeRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Start Slot: {}, Count: {}", self.start_slot, self.count)
write!(
f,
"Start Slot: {}, Count: {}",
self.start_slot(),
self.count()
)
}
}

Expand All @@ -447,7 +558,9 @@ impl std::fmt::Display for OldBlocksByRangeRequest {
write!(
f,
"Start Slot: {}, Count: {}, Step: {}",
self.start_slot, self.count, self.step
self.start_slot(),
self.count(),
self.step()
)
}
}
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ where
}
Err(RateLimitedErr::TooLarge) => {
// we set the batch sizes, so this is a coding/config err for most protocols
let protocol = req.protocol();
let protocol = req.versioned_protocol().protocol();
if matches!(protocol, Protocol::BlocksByRange) {
debug!(self.log, "Blocks by range request will never be processed"; "request" => %req);
} else {
Expand Down Expand Up @@ -335,7 +335,7 @@ where
serializer.emit_arguments("peer_id", &format_args!("{}", self.peer_id))?;
let (msg_kind, protocol) = match &self.event {
Ok(received) => match received {
RPCReceived::Request(_, req) => ("request", req.protocol()),
RPCReceived::Request(_, req) => ("request", req.versioned_protocol().protocol()),
RPCReceived::Response(_, res) => ("response", res.protocol()),
RPCReceived::EndOfStream(_, end) => (
"end_of_stream",
Expand Down

0 comments on commit b4a5d97

Please sign in to comment.