Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 89 additions & 31 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,7 @@ pub fn serve<T: BeaconChainTypes>(
return Ok(api_types::GenericResponse::from(api_types::PeerData {
peer_id: peer_id.to_string(),
enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()),
address,
last_seen_p2p_address: address,
direction: api_types::PeerDirection::from_connection_direction(
&dir,
),
Expand All @@ -1375,47 +1375,104 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path("node"))
.and(warp::path("peers"))
.and(warp::path::end())
.and(warp::query::<api_types::PeersQuery>())
.and(network_globals.clone())
.and_then(
|query: api_types::PeersQuery, network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
blocking_json_task(move || {
let mut peers: Vec<api_types::PeerData> = Vec::new();
network_globals
.peers
.read()
.peers()
.for_each(|(peer_id, peer_info)| {
let address =
if let Some(socket_addr) = peer_info.seen_addresses.iter().next() {
let mut addr = eth2_libp2p::Multiaddr::from(socket_addr.ip());
addr.push(eth2_libp2p::multiaddr::Protocol::Tcp(
socket_addr.port(),
));
addr.to_string()
} else if let Some(addr) = peer_info.listening_addresses.first() {
addr.to_string()
} else {
String::new()
};

// the eth2 API spec implies only peers we have been connected to at some point should be included.
if let Some(dir) = peer_info.connection_direction.as_ref() {
let direction =
api_types::PeerDirection::from_connection_direction(&dir);
let state = api_types::PeerState::from_peer_connection_status(
&peer_info.connection_status(),
);

let state_matches = query.state.as_ref().map_or(true, |states| {
states.0.iter().any(|state_param| *state_param == state)
});
let direction_matches =
query.direction.as_ref().map_or(true, |directions| {
directions.0.iter().any(|dir_param| *dir_param == direction)
});

if state_matches && direction_matches {
peers.push(api_types::PeerData {
peer_id: peer_id.to_string(),
enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()),
last_seen_p2p_address: address,
direction,
state,
});
}
}
});
Ok(api_types::PeersData {
meta: api_types::PeersMetaData {
count: peers.len() as u64,
},
data: peers,
})
})
},
);

// GET node/peer_count
let get_node_peer_count = eth1_v1
.and(warp::path("node"))
.and(warp::path("peer_count"))
.and(warp::path::end())
.and(network_globals.clone())
.and_then(|network_globals: Arc<NetworkGlobals<T::EthSpec>>| {
blocking_json_task(move || {
let mut peers: Vec<api_types::PeerData> = Vec::new();
let mut connected: u64 = 0;
let mut connecting: u64 = 0;
let mut disconnected: u64 = 0;
let mut disconnecting: u64 = 0;

network_globals
.peers
.read()
.peers()
// the eth2 API spec implies only peers we have been connected to at some point should be included.
.filter(|(_, peer_info)| peer_info.connection_direction.is_some())
.for_each(|(peer_id, peer_info)| {
let address = if let Some(socket_addr) =
peer_info.seen_addresses.iter().next()
{
let mut addr = eth2_libp2p::Multiaddr::from(socket_addr.ip());
addr.push(eth2_libp2p::multiaddr::Protocol::Tcp(socket_addr.port()));
addr.to_string()
} else if let Some(addr) = peer_info.listening_addresses.first() {
addr.to_string()
} else {
String::new()
};

if let Some(dir) = peer_info.connection_direction.as_ref() {
peers.push(api_types::PeerData {
peer_id: peer_id.to_string(),
enr: peer_info.enr.as_ref().map(|enr| enr.to_base64()),
address,
direction: api_types::PeerDirection::from_connection_direction(
&dir,
),
state: api_types::PeerState::from_peer_connection_status(
&peer_info.connection_status(),
),
});
.for_each(|(_, peer_info)| {
let state = api_types::PeerState::from_peer_connection_status(
&peer_info.connection_status(),
);
match state {
api_types::PeerState::Connected => connected += 1,
api_types::PeerState::Connecting => connecting += 1,
api_types::PeerState::Disconnected => disconnected += 1,
api_types::PeerState::Disconnecting => disconnecting += 1,
}
});
Ok(api_types::GenericResponse::from(peers))

Ok(api_types::GenericResponse::from(api_types::PeerCount {
disconnecting,
connecting,
connected,
disconnected,
}))
})
});

/*
* validator
*/
Expand Down Expand Up @@ -2076,6 +2133,7 @@ pub fn serve<T: BeaconChainTypes>(
.or(get_node_health.boxed())
.or(get_node_peers_by_id.boxed())
.or(get_node_peers.boxed())
.or(get_node_peer_count.boxed())
.or(get_validator_duties_proposer.boxed())
.or(get_validator_blocks.boxed())
.or(get_validator_attestation_data.boxed())
Expand Down
70 changes: 60 additions & 10 deletions beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1178,7 +1178,7 @@ impl ApiTester {
let expected = PeerData {
peer_id: self.external_peer_id.to_string(),
enr: None,
address: EXTERNAL_ADDR.to_string(),
last_seen_p2p_address: EXTERNAL_ADDR.to_string(),
state: PeerState::Connected,
direction: PeerDirection::Inbound,
};
Expand All @@ -1189,18 +1189,66 @@ impl ApiTester {
}

pub async fn test_get_node_peers(self) -> Self {
let result = self.client.get_node_peers().await.unwrap().data;
let peer_states: Vec<Option<&[PeerState]>> = vec![
Some(&[PeerState::Connected]),
Some(&[PeerState::Connecting]),
Some(&[PeerState::Disconnected]),
Some(&[PeerState::Disconnecting]),
None,
Some(&[PeerState::Connected, PeerState::Connecting]),
];
let peer_dirs: Vec<Option<&[PeerDirection]>> = vec![
Some(&[PeerDirection::Outbound]),
Some(&[PeerDirection::Inbound]),
Some(&[PeerDirection::Inbound, PeerDirection::Outbound]),
None,
];

let expected = PeerData {
peer_id: self.external_peer_id.to_string(),
enr: None,
address: EXTERNAL_ADDR.to_string(),
state: PeerState::Connected,
direction: PeerDirection::Inbound,
};
for states in peer_states {
for dirs in peer_dirs.clone() {
let result = self.client.get_node_peers(states, dirs).await.unwrap();
let expected_peer = PeerData {
peer_id: self.external_peer_id.to_string(),
enr: None,
last_seen_p2p_address: EXTERNAL_ADDR.to_string(),
state: PeerState::Connected,
direction: PeerDirection::Inbound,
};

let state_match =
states.map_or(true, |states| states.contains(&PeerState::Connected));
let dir_match = dirs.map_or(true, |dirs| dirs.contains(&PeerDirection::Inbound));

assert_eq!(result, vec![expected]);
let mut expected_peers = Vec::new();
if state_match && dir_match {
expected_peers.push(expected_peer);
}

assert_eq!(
result,
PeersData {
meta: PeersMetaData {
count: expected_peers.len() as u64
},
data: expected_peers,
}
);
}
}
self
}

pub async fn test_get_node_peer_count(self) -> Self {
let result = self.client.get_node_peer_count().await.unwrap().data;
assert_eq!(
result,
PeerCount {
connected: 1,
connecting: 0,
disconnected: 0,
disconnecting: 0,
}
);
self
}

Expand Down Expand Up @@ -1899,6 +1947,8 @@ async fn node_get() {
.test_get_node_peers_by_id()
.await
.test_get_node_peers()
.await
.test_get_node_peer_count()
.await;
}

Expand Down
36 changes: 35 additions & 1 deletion common/eth2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,14 +722,48 @@ impl BeaconNodeHttpClient {
}

/// `GET node/peers`
pub async fn get_node_peers(&self) -> Result<GenericResponse<Vec<PeerData>>, Error> {
pub async fn get_node_peers(
&self,
states: Option<&[PeerState]>,
directions: Option<&[PeerDirection]>,
) -> Result<PeersData, Error> {
let mut path = self.eth_path()?;

path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("node")
.push("peers");

if let Some(states) = states {
let state_string = states
.iter()
.map(|i| i.to_string())
.collect::<Vec<_>>()
.join(",");
path.query_pairs_mut().append_pair("state", &state_string);
}

if let Some(directions) = directions {
let dir_string = directions
.iter()
.map(|i| i.to_string())
.collect::<Vec<_>>()
.join(",");
path.query_pairs_mut().append_pair("direction", &dir_string);
}

self.get(path).await
}

/// `GET node/peer_count`
pub async fn get_node_peer_count(&self) -> Result<GenericResponse<PeerCount>, Error> {
let mut path = self.eth_path()?;

path.path_segments_mut()
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
.push("node")
.push("peer_count");

self.get(path).await
}

Expand Down
51 changes: 50 additions & 1 deletion common/eth2/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,15 +509,32 @@ pub struct BeaconCommitteeSubscription {
pub is_aggregator: bool,
}

#[derive(Deserialize)]
pub struct PeersQuery {
pub state: Option<QueryVec<PeerState>>,
pub direction: Option<QueryVec<PeerDirection>>,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PeerData {
pub peer_id: String,
pub enr: Option<String>,
pub address: String,
pub last_seen_p2p_address: String,
pub state: PeerState,
pub direction: PeerDirection,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PeersData {
pub data: Vec<PeerData>,
pub meta: PeersMetaData,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PeersMetaData {
pub count: u64,
}

#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum PeerState {
Expand Down Expand Up @@ -554,6 +571,17 @@ impl FromStr for PeerState {
}
}

impl fmt::Display for PeerState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PeerState::Connected => write!(f, "connected"),
PeerState::Connecting => write!(f, "connecting"),
PeerState::Disconnected => write!(f, "disconnected"),
PeerState::Disconnecting => write!(f, "disconnecting"),
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum PeerDirection {
Expand Down Expand Up @@ -582,6 +610,27 @@ impl FromStr for PeerDirection {
}
}

impl fmt::Display for PeerDirection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PeerDirection::Inbound => write!(f, "inbound"),
PeerDirection::Outbound => write!(f, "outbound"),
}
}
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PeerCount {
#[serde(with = "serde_utils::quoted_u64")]
pub connected: u64,
#[serde(with = "serde_utils::quoted_u64")]
pub connecting: u64,
#[serde(with = "serde_utils::quoted_u64")]
pub disconnected: u64,
#[serde(with = "serde_utils::quoted_u64")]
pub disconnecting: u64,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down