Add repair breakdown by slot and index (#10717)
* Slot full logging

* Repair stats logging

Co-authored-by: Carl <carl@solana.com>
carllin committed Jun 20, 2020
1 parent cae22ef commit a33fef9
Showing 3 changed files with 75 additions and 30 deletions.
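For orientation before the diffs: the commit makes repair-request stats track, for every slot, how many requests went to each peer and the highest shred index requested. Below is a self-contained sketch of that aggregation, mirroring the new structs in core/src/repair_service.rs; the Slot and Pubkey aliases are stand-ins for the solana_sdk types so the snippet compiles on its own.

    use std::collections::HashMap;

    // Stand-ins for the solana_sdk types, only so this sketch is self-contained.
    type Slot = u64;
    type Pubkey = [u8; 32];

    #[derive(Default, Debug)]
    pub struct SlotRepairs {
        highest_shred_index: u64,
        // map from pubkey to total number of requests
        pubkey_repairs: HashMap<Pubkey, u64>,
    }

    #[derive(Default, Debug)]
    pub struct RepairStatsGroup {
        pub count: u64,
        pub min: u64,
        pub max: u64,
        pub slot_pubkeys: HashMap<Slot, SlotRepairs>,
    }

    impl RepairStatsGroup {
        pub fn update(&mut self, repair_peer_id: &Pubkey, slot: Slot, shred_index: u64) {
            self.count += 1;
            let slot_repairs = self.slot_pubkeys.entry(slot).or_default();
            // One more request of this type sent to this peer for this slot.
            *slot_repairs.pubkey_repairs.entry(*repair_peer_id).or_default() += 1;
            // Track the highest shred index requested so far for this slot.
            slot_repairs.highest_shred_index =
                std::cmp::max(slot_repairs.highest_shred_index, shred_index);
            self.min = std::cmp::min(self.min, slot);
            self.max = std::cmp::max(self.max, slot);
        }
    }

    fn main() {
        let mut shred_stats = RepairStatsGroup::default();
        let peer = [7u8; 32];
        shred_stats.update(&peer, 100, 5);
        shred_stats.update(&peer, 100, 9);
        println!("{:#?}", shred_stats);
    }

Two updates for the same peer and slot leave count = 2 at the group level, a per-peer tally of 2 under slot 100, and highest_shred_index = 9; the new info! line in repair_service.rs logs this same kind of Debug tree for the real structs.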
core/src/repair_service.rs: 52 additions & 24 deletions
@@ -29,22 +29,39 @@ use std::{
 pub type DuplicateSlotsResetSender = CrossbeamSender<Slot>;
 pub type DuplicateSlotsResetReceiver = CrossbeamReceiver<Slot>;
 
-#[derive(Default)]
+#[derive(Default, Debug)]
+pub struct SlotRepairs {
+    highest_shred_index: u64,
+    // map from pubkey to total number of requests
+    pubkey_repairs: HashMap<Pubkey, u64>,
+}
+
+#[derive(Default, Debug)]
 pub struct RepairStatsGroup {
     pub count: u64,
     pub min: u64,
     pub max: u64,
+    pub slot_pubkeys: HashMap<Slot, SlotRepairs>,
 }
 
 impl RepairStatsGroup {
-    pub fn update(&mut self, slot: u64) {
+    pub fn update(&mut self, repair_peer_id: &Pubkey, slot: Slot, shred_index: u64) {
         self.count += 1;
+        let slot_repairs = self.slot_pubkeys.entry(slot).or_default();
+        // Increment total number of repairs of this type for this pubkey by 1
+        *slot_repairs
+            .pubkey_repairs
+            .entry(*repair_peer_id)
+            .or_default() += 1;
+        // Update the max requested shred index for this slot
+        slot_repairs.highest_shred_index =
+            std::cmp::max(slot_repairs.highest_shred_index, shred_index);
         self.min = std::cmp::min(self.min, slot);
         self.max = std::cmp::max(self.max, slot);
     }
 }
 
-#[derive(Default)]
+#[derive(Default, Debug)]
 pub struct RepairStats {
     pub shred: RepairStatsGroup,
     pub highest_shred: RepairStatsGroup,
@@ -81,7 +98,7 @@ impl Default for RepairSlotRange {
 #[derive(Default, Clone)]
 pub struct DuplicateSlotRepairStatus {
     start: u64,
-    repair_addr: Option<SocketAddr>,
+    repair_pubkey_and_addr: Option<(Pubkey, SocketAddr)>,
 }
 
 pub struct RepairService {
@@ -197,6 +214,7 @@ impl RepairService {
             let repair_total = repair_stats.shred.count
                 + repair_stats.highest_shred.count
                 + repair_stats.orphan.count;
+            info!("repair_stats: {:#?}", repair_stats);
             if repair_total > 0 {
                 datapoint_info!(
                     "serve_repair-repair",
@@ -307,20 +325,24 @@ impl RepairService {
     ) {
         duplicate_slot_repair_statuses.retain(|slot, status| {
             Self::update_duplicate_slot_repair_addr(*slot, status, cluster_slots, serve_repair);
-            if let Some(repair_addr) = status.repair_addr {
+            if let Some((repair_pubkey, repair_addr)) = status.repair_pubkey_and_addr {
                 let repairs = Self::generate_duplicate_repairs_for_slot(&blockstore, *slot);
 
                 if let Some(repairs) = repairs {
                     for repair_type in repairs {
                         if let Err(e) = Self::serialize_and_send_request(
                             &repair_type,
                             repair_socket,
+                            &repair_pubkey,
                             &repair_addr,
                             serve_repair,
                             repair_stats,
                             DEFAULT_NONCE,
                         ) {
-                            info!("repair req send_to({}) error {:?}", repair_addr, e);
+                            info!(
+                                "repair req send_to {} ({}) error {:?}",
+                                repair_pubkey, repair_addr, e
+                            );
                         }
                     }
                     true
@@ -336,12 +358,14 @@ impl RepairService {
     fn serialize_and_send_request(
         repair_type: &RepairType,
         repair_socket: &UdpSocket,
+        repair_pubkey: &Pubkey,
         to: &SocketAddr,
         serve_repair: &ServeRepair,
         repair_stats: &mut RepairStats,
         nonce: Nonce,
     ) -> Result<()> {
-        let req = serve_repair.map_repair_request(&repair_type, repair_stats, nonce)?;
+        let req =
+            serve_repair.map_repair_request(&repair_type, repair_pubkey, repair_stats, nonce)?;
         repair_socket.send_to(&req, to)?;
         Ok(())
     }
@@ -353,12 +377,12 @@ impl RepairService {
         serve_repair: &ServeRepair,
     ) {
         let now = timestamp();
-        if status.repair_addr.is_none()
+        if status.repair_pubkey_and_addr.is_none()
             || now.saturating_sub(status.start) >= MAX_DUPLICATE_WAIT_MS as u64
         {
-            let repair_addr =
+            let repair_pubkey_and_addr =
                 serve_repair.repair_request_duplicate_compute_best_peer(slot, cluster_slots);
-            status.repair_addr = repair_addr.ok();
+            status.repair_pubkey_and_addr = repair_pubkey_and_addr.ok();
             status.start = timestamp();
         }
     }
@@ -395,12 +419,12 @@ impl RepairService {
 
             // Mark this slot as special repair, try to download from single
             // validator to avoid corruption
-            let repair_addr = serve_repair
+            let repair_pubkey_and_addr = serve_repair
                 .repair_request_duplicate_compute_best_peer(*slot, cluster_slots)
                 .ok();
             let new_duplicate_slot_repair_status = DuplicateSlotRepairStatus {
                 start: timestamp(),
-                repair_addr,
+                repair_pubkey_and_addr,
             };
             duplicate_slot_repair_statuses.insert(*slot, new_duplicate_slot_repair_status);
         }
@@ -423,7 +447,7 @@ impl RepairService {
                 warn!(
                     "Repaired version of slot {} most recently (but maybe not entirely)
                     from {:?} has failed again",
-                    dead_slot, status.repair_addr
+                    dead_slot, status.repair_pubkey_and_addr
                 );
             }
             cluster_slots
@@ -873,7 +897,7 @@ mod test {
         let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap();
         let duplicate_status = DuplicateSlotRepairStatus {
             start: std::u64::MAX,
-            repair_addr: None,
+            repair_pubkey_and_addr: None,
         };
 
         // Insert some shreds to create a SlotMeta,
@@ -898,15 +922,16 @@
         assert!(duplicate_slot_repair_statuses
             .get(&dead_slot)
             .unwrap()
-            .repair_addr
+            .repair_pubkey_and_addr
             .is_none());
         assert!(duplicate_slot_repair_statuses.get(&dead_slot).is_some());
 
         // Give the slot a repair address
         duplicate_slot_repair_statuses
             .get_mut(&dead_slot)
             .unwrap()
-            .repair_addr = Some(receive_socket.local_addr().unwrap());
+            .repair_pubkey_and_addr =
+            Some((Pubkey::default(), receive_socket.local_addr().unwrap()));
 
         // Slot is not yet full, should not get filtered from `duplicate_slot_repair_statuses`
         RepairService::generate_and_send_duplicate_repairs(
@@ -938,7 +963,10 @@
 
     #[test]
     pub fn test_update_duplicate_slot_repair_addr() {
-        let dummy_addr = Some(UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap());
+        let dummy_addr = Some((
+            Pubkey::default(),
+            UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap(),
+        ));
         let cluster_info = Arc::new(ClusterInfo::new_with_invalid_keypair(
             Node::new_localhost().info,
         ));
@@ -956,41 +984,41 @@
         // address
         let mut duplicate_status = DuplicateSlotRepairStatus {
             start: std::u64::MAX,
-            repair_addr: dummy_addr,
+            repair_pubkey_and_addr: dummy_addr,
         };
         RepairService::update_duplicate_slot_repair_addr(
             dead_slot,
             &mut duplicate_status,
             &cluster_slots,
             &serve_repair,
         );
-        assert_eq!(duplicate_status.repair_addr, dummy_addr);
+        assert_eq!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
 
         // If the repair address is None, should try to update
         let mut duplicate_status = DuplicateSlotRepairStatus {
             start: std::u64::MAX,
-            repair_addr: None,
+            repair_pubkey_and_addr: None,
         };
         RepairService::update_duplicate_slot_repair_addr(
             dead_slot,
             &mut duplicate_status,
             &cluster_slots,
             &serve_repair,
         );
-        assert!(duplicate_status.repair_addr.is_some());
+        assert!(duplicate_status.repair_pubkey_and_addr.is_some());
 
-        // If sufficient time has passssed, should try to update
+        // If sufficient time has passed, should try to update
         let mut duplicate_status = DuplicateSlotRepairStatus {
             start: timestamp() - MAX_DUPLICATE_WAIT_MS as u64,
-            repair_addr: dummy_addr,
+            repair_pubkey_and_addr: dummy_addr,
         };
         RepairService::update_duplicate_slot_repair_addr(
             dead_slot,
             &mut duplicate_status,
             &cluster_slots,
             &serve_repair,
         );
-        assert_ne!(duplicate_status.repair_addr, dummy_addr);
+        assert_ne!(duplicate_status.repair_pubkey_and_addr, dummy_addr);
     }
 
     #[test]
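One behavioral note on the duplicate-slot hunks above: a repair peer for a duplicate slot is (re)chosen whenever no peer is assigned yet, or the current assignment has been outstanding for at least MAX_DUPLICATE_WAIT_MS. A minimal sketch of that predicate, with stubbed types and the wait window left as a parameter rather than assuming the constant's value:

    use std::net::SocketAddr;

    // Stand-in for solana_sdk::pubkey::Pubkey.
    type Pubkey = [u8; 32];

    // Mirrors the condition in update_duplicate_slot_repair_addr above.
    fn should_pick_new_peer(
        repair_pubkey_and_addr: &Option<(Pubkey, SocketAddr)>,
        now_ms: u64,
        start_ms: u64,
        max_duplicate_wait_ms: u64,
    ) -> bool {
        // Re-pick when no peer is assigned, or the assigned peer has had the
        // whole wait window without the slot completing.
        repair_pubkey_and_addr.is_none() || now_ms.saturating_sub(start_ms) >= max_duplicate_wait_ms
    }

    fn main() {
        let peer = Some(([7u8; 32], "127.0.0.1:8000".parse().unwrap()));
        assert!(should_pick_new_peer(&None, 1_000, 1_000, 10_000));
        assert!(!should_pick_new_peer(&peer, 5_000, 1_000, 10_000));
        assert!(should_pick_new_peer(&peer, 11_000, 1_000, 10_000));
    }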
core/src/serve_repair.rs: 17 additions & 6 deletions
@@ -397,41 +397,52 @@ impl ServeRepair {
         let (repair_peers, weights) = cache.get(&slot).unwrap();
         let n = weighted_best(&weights, Pubkey::new_rand().to_bytes());
         let addr = repair_peers[n].serve_repair; // send the request to the peer's serve_repair port
-        let out = self.map_repair_request(&repair_request, repair_stats, DEFAULT_NONCE)?;
+        let repair_peer_id = repair_peers[n].id;
+        let out = self.map_repair_request(
+            &repair_request,
+            &repair_peer_id,
+            repair_stats,
+            DEFAULT_NONCE,
+        )?;
         Ok((addr, out))
     }
 
     pub fn repair_request_duplicate_compute_best_peer(
         &self,
         slot: Slot,
         cluster_slots: &ClusterSlots,
-    ) -> Result<SocketAddr> {
+    ) -> Result<(Pubkey, SocketAddr)> {
         let repair_peers: Vec<_> = self.cluster_info.repair_peers(slot);
         if repair_peers.is_empty() {
             return Err(ClusterInfoError::NoPeers.into());
         }
         let weights = cluster_slots.compute_weights_exclude_noncomplete(slot, &repair_peers);
         let n = weighted_best(&weights, Pubkey::new_rand().to_bytes());
-        Ok(repair_peers[n].serve_repair)
+        Ok((repair_peers[n].id, repair_peers[n].serve_repair))
     }
 
     pub fn map_repair_request(
         &self,
         repair_request: &RepairType,
+        repair_peer_id: &Pubkey,
         repair_stats: &mut RepairStats,
         nonce: Nonce,
     ) -> Result<Vec<u8>> {
         match repair_request {
             RepairType::Shred(slot, shred_index) => {
-                repair_stats.shred.update(*slot);
+                repair_stats
+                    .shred
+                    .update(repair_peer_id, *slot, *shred_index);
                 Ok(self.window_index_request_bytes(*slot, *shred_index, nonce)?)
             }
             RepairType::HighestShred(slot, shred_index) => {
-                repair_stats.highest_shred.update(*slot);
+                repair_stats
+                    .highest_shred
+                    .update(repair_peer_id, *slot, *shred_index);
                 Ok(self.window_highest_index_request_bytes(*slot, *shred_index, nonce)?)
             }
             RepairType::Orphan(slot) => {
-                repair_stats.orphan.update(*slot);
+                repair_stats.orphan.update(repair_peer_id, *slot, 0);
                 Ok(self.orphan_bytes(*slot, nonce)?)
             }
         }
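Two details of the serve_repair.rs change are easy to miss: the peer is now chosen before the request is serialized so its id can be recorded against the stats, and Orphan requests carry no shred index, so the stats record 0 for them. A stub-level sketch of how each RepairType variant maps onto a stats bucket and a recorded index (the helper name is hypothetical, not the crate API):

    type Slot = u64;

    // RepairType variants as they appear in the diff above.
    enum RepairType {
        Shred(Slot, u64),
        HighestShred(Slot, u64),
        Orphan(Slot),
    }

    // Hypothetical helper mirroring the match in map_repair_request: which
    // stats bucket a request lands in, and which shred index gets recorded.
    fn stats_key(request: &RepairType) -> (&'static str, Slot, u64) {
        match request {
            RepairType::Shred(slot, shred_index) => ("shred", *slot, *shred_index),
            RepairType::HighestShred(slot, shred_index) => ("highest_shred", *slot, *shred_index),
            // Orphan requests carry no shred index, so 0 is recorded.
            RepairType::Orphan(slot) => ("orphan", *slot, 0),
        }
    }

    fn main() {
        assert_eq!(stats_key(&RepairType::Shred(42, 7)), ("shred", 42, 7));
        assert_eq!(stats_key(&RepairType::HighestShred(42, 9)), ("highest_shred", 42, 9));
        assert_eq!(stats_key(&RepairType::Orphan(42)), ("orphan", 42, 0));
    }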
ledger/src/blockstore.rs: 6 additions & 0 deletions
@@ -1125,6 +1125,12 @@ impl Blockstore {
             new_consumed,
             shred.reference_tick(),
         );
+        if slot_meta.is_full() {
+            info!(
+                "slot {} is full, last: {}",
+                slot_meta.slot, slot_meta.last_index
+            );
+        }
         data_index.set_present(index, true);
         trace!("inserted shred into slot {:?} and index {:?}", slot, index);
         Ok(())
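The new log line fires at the moment a slot becomes full. is_full() itself is not part of this diff; as a rough mental model (an assumption about SlotMeta, not code from this commit), a slot counts as full once its last shred index is known and every data shred up to and including it has arrived contiguously:

    // Hedged sketch only: the SlotMeta fields and is_full() here are assumed,
    // and may differ from the real ledger/src/blockstore_meta.rs.
    struct SlotMeta {
        slot: u64,
        // Count of contiguous data shreds received, starting from index 0.
        consumed: u64,
        // Index of the last shred in the slot; u64::MAX while still unknown.
        last_index: u64,
    }

    impl SlotMeta {
        fn is_full(&self) -> bool {
            self.last_index != std::u64::MAX && self.consumed == self.last_index + 1
        }
    }

    fn main() {
        let meta = SlotMeta {
            slot: 100,
            consumed: 64,
            last_index: 63,
        };
        if meta.is_full() {
            // Same shape as the new info! line in insert_data_shred above.
            println!("slot {} is full, last: {}", meta.slot, meta.last_index);
        }
    }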
