Skip to content

Commit

Permalink
Network altair rpc (#2301)
Browse files Browse the repository at this point in the history
* Add v2 messages to rpc decoder

* Ugly hack

* Pass chainspec and genesis_root to Rpc

* Add context bytes while encoding

* Add a ForkContext struct

* Pass ForkContext to rpc

* crate compiles

* Extract ForkContext into separate file; add a current_fork field

* Fix encoding/decoding

* Fix tests

* Remove fork_schedule from rebase

* Fix ForkContext

* Fix tests

* Remove fork_schedule again

* Add altair empty and full block limits

* Fix panic in snappy decoding

* Fix limits

* Move wrapping of RPCRequests to handler

* RpcRequestContainer only used in OutboundUpgrade

* Add altair blocks in rpc end to end tests

* same rpc limits for V1 and V2

* V2 response decoding happens only for valid protocols

* Add snappy response decoding tests

* Add more snappy tests

* Minor fixes

* Appease clippy

* to_context_bytes returns an Option

* Add padding snappy message test for v2

* Minor fixes; remove accidentally added file

* lint
  • Loading branch information
pawanjay176 committed Jun 18, 2021
1 parent f2b0e39 commit 88e0227
Show file tree
Hide file tree
Showing 17 changed files with 1,070 additions and 297 deletions.
15 changes: 11 additions & 4 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3069,14 +3069,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// therefore use the genesis slot.
let slot = self.slot().unwrap_or(self.spec.genesis_slot);

self.spec.enr_fork_id(slot, self.genesis_validators_root)
self.spec
.enr_fork_id::<T::EthSpec>(slot, self.genesis_validators_root)
}

/// Calculates the `Duration` to the next fork, if one exists.
pub fn duration_to_next_fork(&self) -> Option<Duration> {
let epoch = self.spec.next_fork_epoch()?;
/// Calculates the `Duration` to the next fork if it exists and returns it
/// with it's corresponding `ForkName`.
pub fn duration_to_next_fork(&self) -> Option<(ForkName, Duration)> {
// If we are unable to read the slot clock we assume that it is prior to genesis and
// therefore use the genesis slot.
let slot = self.slot().unwrap_or(self.spec.genesis_slot);

let (fork_name, epoch) = self.spec.next_fork_epoch::<T::EthSpec>(slot)?;
self.slot_clock
.duration_to_slot(epoch.start_slot(T::EthSpec::slots_per_epoch()))
.map(|duration| (fork_name, duration))
}

pub fn dump_as_dot<W: Write>(&self, output: &mut W) {
Expand Down
13 changes: 3 additions & 10 deletions beacon_node/eth2_libp2p/src/behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use types::{ChainSpec, EnrForkId, EthSpec, Hash256, SignedBeaconBlock, Slot, SubnetId};
use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, SignedBeaconBlock, Slot, SubnetId};

mod gossipsub_scoring_parameters;
mod handler;
Expand Down Expand Up @@ -136,11 +136,6 @@ pub struct Behaviour<TSpec: EthSpec> {

score_settings: PeerScoreSettings<TSpec>,

spec: ChainSpec,

/// The genesis root for the eth2 network
genesis_validators_root: Hash256,

/// The interval for updating gossipsub scores
update_gossipsub_scores: tokio::time::Interval,
}
Expand All @@ -152,7 +147,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
net_conf: &NetworkConfig,
network_globals: Arc<NetworkGlobals<TSpec>>,
log: &slog::Logger,
genesis_validators_root: Hash256,
fork_context: Arc<ForkContext>,
chain_spec: &ChainSpec,
) -> error::Result<Self> {
let behaviour_log = log.new(o!());
Expand Down Expand Up @@ -225,7 +220,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
.expect("Valid score params and thresholds");

Ok(Behaviour {
eth2_rpc: RPC::new(log.clone()),
eth2_rpc: RPC::new(fork_context, log.clone()),
gossipsub,
identify,
peer_manager: PeerManager::new(local_key, net_conf, network_globals.clone(), log)
Expand All @@ -238,9 +233,7 @@ impl<TSpec: EthSpec> Behaviour<TSpec> {
network_dir: net_conf.network_dir.clone(),
log: behaviour_log,
score_settings,
spec: chain_spec.clone(),
update_gossipsub_scores,
genesis_validators_root,
})
}

Expand Down
99 changes: 29 additions & 70 deletions beacon_node/eth2_libp2p/src/rpc/codec/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,18 @@ where
mod tests {
use super::super::ssz_snappy::*;
use super::*;
use crate::rpc::methods::StatusMessage;
use crate::rpc::protocol::*;
use snap::write::FrameEncoder;
use ssz::Encode;
use std::io::Write;
use types::{Epoch, Hash256, Slot};

use std::sync::Arc;
use types::{ForkContext, Hash256};
use unsigned_varint::codec::Uvi;

type Spec = types::MainnetEthSpec;

fn fork_context() -> ForkContext {
ForkContext::new(Hash256::zero(), &Spec::default_spec())
}

#[test]
fn test_decode_status_message() {
let message = hex::decode("0054ff060000734e615070590032000006e71e7b54989925efd6c9cbcb8ceb9b5f71216f5137282bf6a1e3b50f64e42d6c7fb347abe07eb0db8200000005029e2800").unwrap();
Expand All @@ -200,8 +202,9 @@ mod tests {
let snappy_protocol_id =
ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy);

let fork_context = Arc::new(fork_context());
let mut snappy_outbound_codec =
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, 1_048_576);
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, 1_048_576, fork_context);

// remove response code
let mut snappy_buf = buf.clone();
Expand Down Expand Up @@ -233,8 +236,10 @@ mod tests {

let snappy_protocol_id =
ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy);

let fork_context = Arc::new(fork_context());
let mut snappy_outbound_codec =
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, 1_048_576);
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, 1_048_576, fork_context);

let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst).unwrap_err();

Expand All @@ -260,80 +265,34 @@ mod tests {
// Response limits
let limit = protocol_id.rpc_response_limits::<Spec>();
let mut max = encode_len(limit.max + 1);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(protocol_id.clone(), 1_048_576);
let fork_context = Arc::new(fork_context());
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol_id.clone(),
1_048_576,
fork_context.clone(),
);
assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData);

let mut min = encode_len(limit.min - 1);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(protocol_id.clone(), 1_048_576);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol_id.clone(),
1_048_576,
fork_context.clone(),
);
assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData);

// Request limits
let limit = protocol_id.rpc_request_limits();
let mut max = encode_len(limit.max + 1);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(protocol_id.clone(), 1_048_576);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(
protocol_id.clone(),
1_048_576,
fork_context.clone(),
);
assert_eq!(codec.decode(&mut max).unwrap_err(), RPCError::InvalidData);

let mut min = encode_len(limit.min - 1);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(protocol_id, 1_048_576);
let mut codec = SSZSnappyOutboundCodec::<Spec>::new(protocol_id, 1_048_576, fork_context);
assert_eq!(codec.decode(&mut min).unwrap_err(), RPCError::InvalidData);
}

#[test]
fn test_decode_malicious_status_message() {
// 10 byte snappy stream identifier
let stream_identifier: &'static [u8] = b"\xFF\x06\x00\x00sNaPpY";

assert_eq!(stream_identifier.len(), 10);

// byte 0(0xFE) is padding chunk type identifier for snappy messages
// byte 1,2,3 are chunk length (little endian)
let malicious_padding: &'static [u8] = b"\xFE\x00\x00\x00";

// Status message is 84 bytes uncompressed. `max_compressed_len` is 32 + 84 + 84/6 = 130.
let status_message_bytes = StatusMessage {
fork_digest: [0; 4],
finalized_root: Hash256::from_low_u64_be(0),
finalized_epoch: Epoch::new(1),
head_root: Hash256::from_low_u64_be(0),
head_slot: Slot::new(1),
}
.as_ssz_bytes();

assert_eq!(status_message_bytes.len(), 84);
assert_eq!(snap::raw::max_compress_len(status_message_bytes.len()), 130);

let mut uvi_codec: Uvi<usize> = Uvi::default();
let mut dst = BytesMut::with_capacity(1024);

// Insert length-prefix
uvi_codec
.encode(status_message_bytes.len(), &mut dst)
.unwrap();

// Insert snappy stream identifier
dst.extend_from_slice(stream_identifier);

// Insert malicious padding of 80 bytes.
for _ in 0..20 {
dst.extend_from_slice(malicious_padding);
}

// Insert payload (42 bytes compressed)
let mut writer = FrameEncoder::new(Vec::new());
writer.write_all(&status_message_bytes).unwrap();
writer.flush().unwrap();
assert_eq!(writer.get_ref().len(), 42);
dst.extend_from_slice(writer.get_ref());

// 10 (for stream identifier) + 80 + 42 = 132 > `max_compressed_len`. Hence, decoding should fail with `InvalidData`.

let snappy_protocol_id =
ProtocolId::new(Protocol::Status, Version::V1, Encoding::SSZSnappy);

let mut snappy_outbound_codec =
SSZSnappyOutboundCodec::<Spec>::new(snappy_protocol_id, 1_048_576);

let snappy_decoded_message = snappy_outbound_codec.decode(&mut dst).unwrap_err();
assert_eq!(snappy_decoded_message, RPCError::InvalidData);
}
}
Loading

0 comments on commit 88e0227

Please sign in to comment.