Skip to content

Commit

Permalink
adds socket address for repair service over QUIC (solana-labs#32834)
Browse files Browse the repository at this point in the history
Working towards migrating repair to QUIC.
  • Loading branch information
behzadnouri committed Aug 15, 2023
1 parent e700dde commit 0de8ccf
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 32 deletions.
14 changes: 7 additions & 7 deletions core/src/repair/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ mod test {
},
solana_gossip::{
cluster_info::{ClusterInfo, Node},
contact_info::ContactInfo,
contact_info::{ContactInfo, Protocol},
},
solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path, shred::Nonce},
solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks},
Expand Down Expand Up @@ -1400,7 +1400,7 @@ mod test {
nonce,
);
if let Ok(request_bytes) = request_bytes {
let socket = responder_info.serve_repair().unwrap();
let socket = responder_info.serve_repair(Protocol::UDP).unwrap();
let _ = ancestor_hashes_request_socket.send_to(&request_bytes, socket);
}
}
Expand Down Expand Up @@ -1470,7 +1470,7 @@ mod test {
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair().unwrap());
.set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap());
let decision = AncestorHashesService::verify_and_process_ancestor_response(
packet,
&ancestor_hashes_request_statuses,
Expand Down Expand Up @@ -1511,7 +1511,7 @@ mod test {
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair().unwrap());
.set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap());
let AncestorRequestDecision {
slot,
request_type,
Expand Down Expand Up @@ -1571,7 +1571,7 @@ mod test {
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair().unwrap());
.set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap());
let AncestorRequestDecision {
slot,
request_type,
Expand Down Expand Up @@ -1947,7 +1947,7 @@ mod test {
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair().unwrap());
.set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap());
let decision = AncestorHashesService::verify_and_process_ancestor_response(
packet,
&ancestor_hashes_request_statuses,
Expand Down Expand Up @@ -2010,7 +2010,7 @@ mod test {
let packet = &mut response_packet[0];
packet
.meta_mut()
.set_socket_addr(&responder_info.serve_repair().unwrap());
.set_socket_addr(&responder_info.serve_repair(Protocol::UDP).unwrap());
let AncestorRequestDecision {
slot,
request_type,
Expand Down
17 changes: 10 additions & 7 deletions core/src/repair/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use {
},
solana_gossip::{
cluster_info::{ClusterInfo, ClusterInfoError},
legacy_contact_info::{LegacyContactInfo as ContactInfo, LegacyContactInfo},
contact_info::{LegacyContactInfo as ContactInfo, LegacyContactInfo, Protocol},
ping_pong::{self, PingCache, Pong},
weighted_shuffle::WeightedShuffle,
},
Expand Down Expand Up @@ -214,7 +214,7 @@ pub(crate) type Ping = ping_pong::Ping<[u8; REPAIR_PING_TOKEN_SIZE]>;

/// Window protocol messages
#[derive(Debug, AbiEnumVisitor, AbiExample, Deserialize, Serialize, strum_macros::Display)]
#[frozen_abi(digest = "7vZyACjc13qQYWUsqWbdidLXR3uNXpmqUZaKeV3gKuY2")]
#[frozen_abi(digest = "3VzVe3kMrG6ijkVPyCGeJVA9hQjWcFEZbAQPc5Zizrjm")]
pub enum RepairProtocol {
LegacyWindowIndex(LegacyContactInfo, Slot, u64),
LegacyHighestWindowIndex(LegacyContactInfo, Slot, u64),
Expand Down Expand Up @@ -350,7 +350,7 @@ impl RepairPeers {
.iter()
.zip(weights)
.filter_map(|(peer, &weight)| {
let addr = peer.serve_repair().ok()?;
let addr = peer.serve_repair(Protocol::UDP).ok()?;
Some(((*peer.pubkey(), addr), weight))
})
.unzip();
Expand Down Expand Up @@ -1078,7 +1078,7 @@ impl ServeRepair {
.shuffle(&mut rand::thread_rng())
.map(|i| index[i])
.filter_map(|i| {
let addr = repair_peers[i].serve_repair().ok()?;
let addr = repair_peers[i].serve_repair(Protocol::UDP).ok()?;
Some((*repair_peers[i].pubkey(), addr))
})
.take(get_ancestor_hash_repair_sample_size())
Expand All @@ -1102,7 +1102,10 @@ impl ServeRepair {
.unzip();
let k = WeightedIndex::new(weights)?.sample(&mut rand::thread_rng());
let n = index[k];
Ok((*repair_peers[n].pubkey(), repair_peers[n].serve_repair()?))
Ok((
*repair_peers[n].pubkey(),
repair_peers[n].serve_repair(Protocol::UDP)?,
))
}

pub(crate) fn map_repair_request(
Expand Down Expand Up @@ -1930,8 +1933,8 @@ mod tests {
&identity_keypair,
)
.unwrap();
assert_eq!(nxt.serve_repair().unwrap(), serve_repair_addr);
assert_eq!(rv.0, nxt.serve_repair().unwrap());
assert_eq!(nxt.serve_repair(Protocol::UDP).unwrap(), serve_repair_addr);
assert_eq!(rv.0, nxt.serve_repair(Protocol::UDP).unwrap());

let serve_repair_addr2 = socketaddr!([127, 0, 0, 2], 1243);
let mut nxt = ContactInfo::new(
Expand Down
4 changes: 3 additions & 1 deletion dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,9 @@ fn get_target(
Some((*node.pubkey(), node.tpu_forwards(protocol).unwrap()))
}
Mode::Repair => todo!("repair socket is not gossiped anymore!"),
Mode::ServeRepair => Some((*node.pubkey(), node.serve_repair().unwrap())),
Mode::ServeRepair => {
Some((*node.pubkey(), node.serve_repair(Protocol::UDP).unwrap()))
}
Mode::Rpc => None,
};
break;
Expand Down
24 changes: 21 additions & 3 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;

// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "4jtxvWyeFwfDQTTGh4yJLyukALzRNVJ9WNnCbFeJUmaS")]
#[frozen_abi(digest = "6T2sn92PMrTijsgncH3bBZL4K5GUowb442cCw4y4DuwV")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
Expand Down Expand Up @@ -847,7 +847,7 @@ impl ClusterInfo {
self.addr_to_string(&ip_addr, &node.tpu_forwards(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tvu(contact_info::Protocol::UDP).ok()),
self.addr_to_string(&ip_addr, &node.tvu(contact_info::Protocol::QUIC).ok()),
self.addr_to_string(&ip_addr, &node.serve_repair().ok()),
self.addr_to_string(&ip_addr, &node.serve_repair(contact_info::Protocol::UDP).ok()),
node.shred_version(),
))
}
Expand Down Expand Up @@ -1345,7 +1345,7 @@ impl ClusterInfo {
node.pubkey() != &self_pubkey
&& node.shred_version() == self_shred_version
&& self.check_socket_addr_space(&node.tvu(contact_info::Protocol::UDP))
&& self.check_socket_addr_space(&node.serve_repair())
&& self.check_socket_addr_space(&node.serve_repair(contact_info::Protocol::UDP))
&& match gossip_crds.get::<&LowestSlot>(*node.pubkey()) {
None => true, // fallback to legacy behavior
Some(lowest_slot) => lowest_slot.lowest <= slot,
Expand Down Expand Up @@ -2799,6 +2799,7 @@ pub struct Sockets {
pub repair: UdpSocket,
pub retransmit_sockets: Vec<UdpSocket>,
pub serve_repair: UdpSocket,
pub serve_repair_quic: UdpSocket,
pub ancestor_hashes_requests: UdpSocket,
pub tpu_quic: UdpSocket,
pub tpu_forwards_quic: UdpSocket,
Expand Down Expand Up @@ -2839,6 +2840,7 @@ impl Node {
let broadcast = vec![UdpSocket::bind(&unspecified_bind_addr).unwrap()];
let retransmit_socket = UdpSocket::bind(&unspecified_bind_addr).unwrap();
let serve_repair = UdpSocket::bind(&localhost_bind_addr).unwrap();
let serve_repair_quic = UdpSocket::bind(&localhost_bind_addr).unwrap();
let ancestor_hashes_requests = UdpSocket::bind(&unspecified_bind_addr).unwrap();

let mut info = ContactInfo::new(
Expand Down Expand Up @@ -2871,6 +2873,11 @@ impl Node {
serve_repair.local_addr().unwrap(),
"serve-repair"
);
set_socket!(
set_serve_repair_quic,
serve_repair_quic.local_addr().unwrap(),
"serve-repair QUIC"
);
Node {
info,
sockets: Sockets {
Expand All @@ -2885,6 +2892,7 @@ impl Node {
repair,
retransmit_sockets: vec![retransmit_socket],
serve_repair,
serve_repair_quic,
ancestor_hashes_requests,
tpu_quic,
tpu_forwards_quic,
Expand Down Expand Up @@ -2930,6 +2938,7 @@ impl Node {
let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range);
let (_, repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range);
let (_, broadcast) = Self::bind(bind_ip_addr, port_range);
let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range);

Expand Down Expand Up @@ -2959,6 +2968,11 @@ impl Node {
set_socket!(set_rpc, rpc_port, "RPC");
set_socket!(set_rpc_pubsub, rpc_pubsub_port, "RPC-pubsub");
set_socket!(set_serve_repair, serve_repair_port, "serve-repair");
set_socket!(
set_serve_repair_quic,
serve_repair_quic_port,
"serve-repair QUIC"
);
trace!("new ContactInfo: {:?}", info);

Node {
Expand All @@ -2975,6 +2989,7 @@ impl Node {
repair,
retransmit_sockets: vec![retransmit_socket],
serve_repair,
serve_repair_quic,
ancestor_hashes_requests,
tpu_quic,
tpu_forwards_quic,
Expand Down Expand Up @@ -3023,6 +3038,7 @@ impl Node {

let (_, repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range);
let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range);

let (_, broadcast) =
multi_bind_in_range(bind_ip_addr, port_range, 4).expect("broadcast multi_bind");
Expand All @@ -3044,6 +3060,7 @@ impl Node {
);
let _ = info.set_tpu_vote((addr, tpu_vote_port));
let _ = info.set_serve_repair((addr, serve_repair_port));
let _ = info.set_serve_repair((addr, serve_repair_quic_port));
trace!("new ContactInfo: {:?}", info);

Node {
Expand All @@ -3059,6 +3076,7 @@ impl Node {
repair,
retransmit_sockets,
serve_repair,
serve_repair_quic,
ip_echo: Some(ip_echo),
ancestor_hashes_requests,
tpu_quic,
Expand Down
32 changes: 28 additions & 4 deletions gossip/src/contact_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const SOCKET_TAG_GOSSIP: u8 = 0;
const SOCKET_TAG_RPC: u8 = 2;
const SOCKET_TAG_RPC_PUBSUB: u8 = 3;
const SOCKET_TAG_SERVE_REPAIR: u8 = 4;
const SOCKET_TAG_SERVE_REPAIR_QUIC: u8 = 1;
const SOCKET_TAG_TPU: u8 = 5;
const SOCKET_TAG_TPU_FORWARDS: u8 = 6;
const SOCKET_TAG_TPU_FORWARDS_QUIC: u8 = 7;
Expand Down Expand Up @@ -224,7 +225,11 @@ impl ContactInfo {
get_socket!(gossip, SOCKET_TAG_GOSSIP);
get_socket!(rpc, SOCKET_TAG_RPC);
get_socket!(rpc_pubsub, SOCKET_TAG_RPC_PUBSUB);
get_socket!(serve_repair, SOCKET_TAG_SERVE_REPAIR);
get_socket!(
serve_repair,
SOCKET_TAG_SERVE_REPAIR,
SOCKET_TAG_SERVE_REPAIR_QUIC
);
get_socket!(tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC);
get_socket!(
tpu_forwards,
Expand All @@ -238,6 +243,7 @@ impl ContactInfo {
set_socket!(set_rpc, SOCKET_TAG_RPC);
set_socket!(set_rpc_pubsub, SOCKET_TAG_RPC_PUBSUB);
set_socket!(set_serve_repair, SOCKET_TAG_SERVE_REPAIR);
set_socket!(set_serve_repair_quic, SOCKET_TAG_SERVE_REPAIR_QUIC);
set_socket!(set_tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC);
set_socket!(
set_tpu_forwards,
Expand All @@ -248,7 +254,11 @@ impl ContactInfo {
set_socket!(set_tvu, SOCKET_TAG_TVU);
set_socket!(set_tvu_quic, SOCKET_TAG_TVU_QUIC);

remove_socket!(remove_serve_repair, SOCKET_TAG_SERVE_REPAIR);
remove_socket!(
remove_serve_repair,
SOCKET_TAG_SERVE_REPAIR,
SOCKET_TAG_SERVE_REPAIR_QUIC
);
remove_socket!(remove_tpu, SOCKET_TAG_TPU, SOCKET_TAG_TPU_QUIC);
remove_socket!(
remove_tpu_forwards,
Expand Down Expand Up @@ -370,6 +380,8 @@ impl ContactInfo {
node.set_rpc_pubsub((Ipv4Addr::LOCALHOST, DEFAULT_RPC_PUBSUB_PORT))
.unwrap();
node.set_serve_repair((Ipv4Addr::LOCALHOST, 8008)).unwrap();
node.set_serve_repair_quic((Ipv4Addr::LOCALHOST, 8006))
.unwrap();
node
}

Expand All @@ -392,6 +404,7 @@ impl ContactInfo {
node.set_rpc_pubsub((addr, DEFAULT_RPC_PUBSUB_PORT))
.unwrap();
node.set_serve_repair((addr, port + 8)).unwrap();
node.set_serve_repair_quic((addr, port + 4)).unwrap();
node
}
}
Expand Down Expand Up @@ -733,9 +746,13 @@ mod tests {
sockets.get(&SOCKET_TAG_RPC_PUBSUB)
);
assert_eq!(
node.serve_repair().ok().as_ref(),
node.serve_repair(Protocol::UDP).ok().as_ref(),
sockets.get(&SOCKET_TAG_SERVE_REPAIR)
);
assert_eq!(
node.serve_repair(Protocol::QUIC).ok().as_ref(),
sockets.get(&SOCKET_TAG_SERVE_REPAIR_QUIC)
);
assert_eq!(
node.tpu(Protocol::UDP).ok().as_ref(),
sockets.get(&SOCKET_TAG_TPU)
Expand Down Expand Up @@ -813,7 +830,14 @@ mod tests {
assert_eq!(old.gossip().unwrap(), node.gossip().unwrap());
assert_eq!(old.rpc().unwrap(), node.rpc().unwrap());
assert_eq!(old.rpc_pubsub().unwrap(), node.rpc_pubsub().unwrap());
assert_eq!(old.serve_repair().unwrap(), node.serve_repair().unwrap());
assert_eq!(
old.serve_repair(Protocol::QUIC).unwrap(),
node.serve_repair(Protocol::QUIC).unwrap()
);
assert_eq!(
old.serve_repair(Protocol::UDP).unwrap(),
node.serve_repair(Protocol::UDP).unwrap()
);
assert_eq!(
old.tpu(Protocol::QUIC).unwrap(),
node.tpu(Protocol::QUIC).unwrap()
Expand Down
13 changes: 7 additions & 6 deletions gossip/src/legacy_contact_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ pub struct LegacyContactInfo {
tvu: SocketAddr,
/// TVU over QUIC protocol.
tvu_quic: SocketAddr,
unused: SocketAddr,
/// repair service over QUIC protocol.
serve_repair_quic: SocketAddr,
/// transactions address
tpu: SocketAddr,
/// address to forward unprocessed transactions to
Expand Down Expand Up @@ -123,7 +124,7 @@ impl Default for LegacyContactInfo {
gossip: socketaddr_any!(),
tvu: socketaddr_any!(),
tvu_quic: socketaddr_any!(),
unused: socketaddr_any!(),
serve_repair_quic: socketaddr_any!(),
tpu: socketaddr_any!(),
tpu_forwards: socketaddr_any!(),
tpu_vote: socketaddr_any!(),
Expand All @@ -143,7 +144,7 @@ impl LegacyContactInfo {
gossip: socketaddr!(Ipv4Addr::LOCALHOST, 1234),
tvu: socketaddr!(Ipv4Addr::LOCALHOST, 1235),
tvu_quic: socketaddr!(Ipv4Addr::LOCALHOST, 1236),
unused: socketaddr!(Ipv4Addr::LOCALHOST, 1237),
serve_repair_quic: socketaddr!(Ipv4Addr::LOCALHOST, 1237),
tpu: socketaddr!(Ipv4Addr::LOCALHOST, 1238),
tpu_forwards: socketaddr!(Ipv4Addr::LOCALHOST, 1239),
tpu_vote: socketaddr!(Ipv4Addr::LOCALHOST, 1240),
Expand Down Expand Up @@ -210,7 +211,7 @@ impl LegacyContactInfo {
get_socket!(tpu_vote);
get_socket!(rpc);
get_socket!(rpc_pubsub);
get_socket!(serve_repair);
get_socket!(serve_repair, serve_repair_quic);

set_socket!(set_gossip, gossip);
set_socket!(set_rpc, rpc);
Expand Down Expand Up @@ -272,13 +273,13 @@ impl TryFrom<&ContactInfo> for LegacyContactInfo {
gossip: unwrap_socket!(gossip),
tvu: unwrap_socket!(tvu, Protocol::UDP),
tvu_quic: unwrap_socket!(tvu, Protocol::QUIC),
unused: SOCKET_ADDR_UNSPECIFIED,
serve_repair_quic: unwrap_socket!(serve_repair, Protocol::QUIC),
tpu: unwrap_socket!(tpu, Protocol::UDP),
tpu_forwards: unwrap_socket!(tpu_forwards, Protocol::UDP),
tpu_vote: unwrap_socket!(tpu_vote),
rpc: unwrap_socket!(rpc),
rpc_pubsub: unwrap_socket!(rpc_pubsub),
serve_repair: unwrap_socket!(serve_repair),
serve_repair: unwrap_socket!(serve_repair, Protocol::UDP),
wallclock: node.wallclock(),
shred_version: node.shred_version(),
})
Expand Down
Loading

0 comments on commit 0de8ccf

Please sign in to comment.