Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ pub struct BlockInfo {
/// Does this block start a new epoch?
pub new_epoch: bool,

/// Which slot was the tip at when we received this block?
#[serde(default)]
pub tip_slot: Option<u64>,

/// UNIX timestamp
#[serde(default)]
pub timestamp: u64,
Expand All @@ -240,6 +244,15 @@ impl PartialOrd for BlockInfo {
}
}

impl BlockInfo {
pub fn is_at_tip(&self) -> bool {
// The slot of a newly-reported block can be later than the slot of the tip.
// This is because the tip is the most recent slot with a _validated_ block,
// and we can receive and propagate blocks which have not yet been validated.
self.tip_slot.is_some_and(|s| s <= self.slot)
}
}

// Individual transaction UTxO deltas
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TxUTxODeltas {
Expand Down
1 change: 1 addition & 0 deletions common/src/upstream_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ mod test {
new_epoch: false,
timestamp: n,
era: Era::default(),
tip_slot: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions modules/block_vrf_validator/src/ouroboros/praos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ mod tests {
epoch: 368,
epoch_slot: 1729,
new_epoch: false,
tip_slot: None,
era: Era::Babbage,
};
let block_header =
Expand Down
6 changes: 6 additions & 0 deletions modules/block_vrf_validator/src/ouroboros/tpraos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ mod tests {
epoch: 208,
epoch_slot: 0,
new_epoch: true,
tip_slot: None,
era: Era::Shelley,
};
let block_header =
Expand Down Expand Up @@ -270,6 +271,7 @@ mod tests {
epoch: 211,
epoch_slot: 36049,
new_epoch: false,
tip_slot: None,
era: Era::Shelley,
};
let block_header =
Expand Down Expand Up @@ -328,6 +330,7 @@ mod tests {
epoch: 211,
epoch_slot: 431949,
new_epoch: false,
tip_slot: None,
era: Era::Shelley,
};
let block_header =
Expand Down Expand Up @@ -386,6 +389,7 @@ mod tests {
epoch: 211,
epoch_slot: 431949,
new_epoch: false,
tip_slot: None,
era: Era::Shelley,
};
let block_header =
Expand Down Expand Up @@ -445,6 +449,7 @@ mod tests {
epoch: 211,
epoch_slot: 431949,
new_epoch: false,
tip_slot: None,
era: Era::Shelley,
};
let block_header =
Expand Down Expand Up @@ -513,6 +518,7 @@ mod tests {
epoch: 211,
epoch_slot: 431949,
new_epoch: false,
tip_slot: None,
era: Era::Shelley,
};
let block_header =
Expand Down
1 change: 1 addition & 0 deletions modules/chain_store/src/stores/fjall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ mod tests {
epoch_slot,
new_epoch: false,
timestamp,
tip_slot: None,
era: acropolis_common::Era::Conway,
}
}
Expand Down
3 changes: 3 additions & 0 deletions modules/epochs_state/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ mod tests {
new_epoch: false,
timestamp: 99999,
era: Era::Shelley,
tip_slot: None,
}
}

Expand All @@ -337,6 +338,7 @@ mod tests {
new_epoch: true,
timestamp: 99999,
era: Era::Shelley,
tip_slot: None,
}
}

Expand All @@ -351,6 +353,7 @@ mod tests {
new_epoch: false,
timestamp: 99999,
era: Era::Conway,
tip_slot: None,
}
}

Expand Down
1 change: 1 addition & 0 deletions modules/genesis_bootstrapper/src/genesis_bootstrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ impl GenesisBootstrapper {
new_epoch: false,
timestamp: byron_genesis.start_time,
era: Era::Byron,
tip_slot: None,
};

let mut utxo_deltas_message = UTXODeltasMessage { deltas: Vec::new() };
Expand Down
1 change: 1 addition & 0 deletions modules/governance_state/src/alonzo_babbage_voting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ mod tests {
era: era.try_into()?,
new_epoch: new_epoch != 0,
timestamp: 0,
tip_slot: None,
hash: BlockHash::default(),
};

Expand Down
1 change: 1 addition & 0 deletions modules/historical_epochs_state/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ mod tests {
era: Era::Shelley,
number: epoch * 10 + 100,
epoch,
tip_slot: None,
timestamp: epoch * 10,
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ mod tests {
epoch_slot: 1,
new_epoch: false,
timestamp: 1,
tip_slot: None,
era: Era::Shelley,
};
let ea = EpochActivityMessage {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ impl MithrilSnapshotFetcher {
epoch_slot,
new_epoch,
timestamp,
tip_slot: None,
era,
};

Expand Down
82 changes: 77 additions & 5 deletions modules/peer_network_interface/src/chain_state.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, VecDeque};
use std::collections::{BTreeMap, HashMap, VecDeque};

use acropolis_common::{BlockHash, hash::Hash, params::SECURITY_PARAMETER_K};
use pallas::network::miniprotocols::Point;
Expand Down Expand Up @@ -106,6 +106,7 @@ pub struct ChainState {
published_blocks: VecDeque<SpecificPoint>,
unpublished_blocks: VecDeque<SpecificPoint>,
rolled_back_to: Option<Header>,
tips: HashMap<PeerId, Point>,
waiting_for_first_message: bool,
}

Expand Down Expand Up @@ -218,6 +219,21 @@ impl ChainState {
self.switch_head_to_peer(id);
}

pub fn handle_tip(&mut self, id: PeerId, tip: Point) {
self.tips.insert(id, tip);
}

pub fn preferred_upstream_tip(&self) -> Option<&Point> {
self.tips.get(&self.preferred_upstream?)
}

pub fn handle_disconnect(&mut self, id: PeerId) {
self.tips.remove(&id);
if self.preferred_upstream == Some(id) {
self.preferred_upstream = None;
}
}

fn switch_head_to_peer(&mut self, id: PeerId) {
self.waiting_for_first_message = false;

Expand Down Expand Up @@ -277,10 +293,6 @@ impl ChainState {
}
}

pub fn clear_preferred_upstream(&mut self) {
self.preferred_upstream = None;
}

pub fn next_unpublished_event(&self) -> Option<ChainEvent<'_>> {
if let Some(header) = &self.rolled_back_to {
return Some(ChainEvent::RollBackward { header });
Expand Down Expand Up @@ -874,4 +886,64 @@ mod tests {
state.handle_event_published();
assert_eq!(state.next_unpublished_event(), None);
}

#[test]
fn should_not_drop_messages_when_switching_to_new_chain() {
let mut state = ChainState::new();
let p1 = PeerId(0);
state.handle_new_preferred_upstream(p1);

// Our initial preferred upstream is broken somehow.
// We're not getting any messages from it, but it's not disconnecting.
let (h1, b1) = make_block(10, "first block");
let (h2, b2) = make_block(11, "second block");
let (h3, b3) = make_block(12, "third block");

// Meanwhile, another upstream is sending us blocks.
let p2 = PeerId(1);

assert_eq!(state.handle_roll_forward(p2, h1.clone()), vec![p2]);
state.handle_body_fetched(h1.slot, h1.hash, b1.clone());
assert_eq!(state.next_unpublished_event(), None);

assert_eq!(state.handle_roll_forward(p2, h2.clone()), vec![p2]);
state.handle_body_fetched(h2.slot, h2.hash, b2.clone());
assert_eq!(state.next_unpublished_event(), None);

// The initial preferred upstream finally gives up completely.
// We switch over to one we know is wokring.
state.handle_new_preferred_upstream(p2);

// Immediately, we publish both blocks which it sent.
assert_eq!(
state.next_unpublished_event(),
Some(ChainEvent::RollForward {
header: &h1,
body: b1.as_slice(),
}),
);
state.handle_event_published();
assert_eq!(
state.next_unpublished_event(),
Some(ChainEvent::RollForward {
header: &h2,
body: b2.as_slice(),
}),
);
state.handle_event_published();
assert_eq!(state.next_unpublished_event(), None);

// And when it sends another, we publish that as well.
assert_eq!(state.handle_roll_forward(p2, h3.clone()), vec![p2]);
state.handle_body_fetched(h3.slot, h3.hash, b3.clone());
assert_eq!(
state.next_unpublished_event(),
Some(ChainEvent::RollForward {
header: &h3,
body: b3.as_slice(),
}),
);
state.handle_event_published();
assert_eq!(state.next_unpublished_event(), None);
}
}
12 changes: 6 additions & 6 deletions modules/peer_network_interface/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ pub enum PeerEvent {

#[derive(Debug)]
pub enum PeerChainSyncEvent {
RollForward(Header),
RollBackward(Point),
RollForward(Header, Point),
RollBackward(Point, Point),
IntersectNotFound(Point),
}

Expand Down Expand Up @@ -184,19 +184,19 @@ impl PeerConnectionWorker {
msg: chainsync::NextResponse<chainsync::HeaderContent>,
) -> Result<Option<ParsedChainsyncMessage>> {
match msg {
chainsync::NextResponse::RollForward(header, _) => {
chainsync::NextResponse::RollForward(header, tip) => {
let Some(parsed) = self.parse_header(header)? else {
return Ok(None);
};
let point = Point::Specific(parsed.slot, parsed.hash.to_vec());
Ok(Some(ParsedChainsyncMessage {
point,
event: PeerChainSyncEvent::RollForward(parsed),
event: PeerChainSyncEvent::RollForward(parsed, tip.0),
}))
}
chainsync::NextResponse::RollBackward(point, _) => Ok(Some(ParsedChainsyncMessage {
chainsync::NextResponse::RollBackward(point, tip) => Ok(Some(ParsedChainsyncMessage {
point: point.clone(),
event: PeerChainSyncEvent::RollBackward(point),
event: PeerChainSyncEvent::RollBackward(point, tip.0),
})),
chainsync::NextResponse::Await => Ok(None),
}
Expand Down
21 changes: 10 additions & 11 deletions modules/peer_network_interface/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ impl NetworkManager {
// is full and this method is blocked on writing to it, the queue can never drain.
fn handle_peer_update(&mut self, peer: PeerId, event: PeerEvent) {
match event {
PeerEvent::ChainSync(PeerChainSyncEvent::RollForward(header)) => {
PeerEvent::ChainSync(PeerChainSyncEvent::RollForward(header, tip)) => {
self.chain.handle_tip(peer, tip);
let slot = header.slot;
let hash = header.hash;
let request_body_from = self.chain.handle_roll_forward(peer, header);
Expand All @@ -181,10 +182,12 @@ impl NetworkManager {
self.request_block(slot, hash, request_body_from);
}
}
PeerEvent::ChainSync(PeerChainSyncEvent::RollBackward(point)) => {
PeerEvent::ChainSync(PeerChainSyncEvent::RollBackward(point, tip)) => {
self.chain.handle_tip(peer, tip);
self.chain.handle_roll_backward(peer, point);
}
PeerEvent::ChainSync(PeerChainSyncEvent::IntersectNotFound(tip)) => {
self.chain.handle_tip(peer, tip.clone());
// We called find_intersect on a peer, and it didn't recognize any of the points we passed.
// That peer must either be behind us or on a different fork; either way, that chain should sync from its own tip
if let Some(peer) = self.peers.get(&peer) {
Expand All @@ -208,13 +211,12 @@ impl NetworkManager {
return;
};
warn!("disconnected from {}", peer.conn.address);
let was_preferred = self.chain.preferred_upstream.is_some_and(|i| i == id);
if was_preferred {
self.chain.handle_disconnect(id);
if self.chain.preferred_upstream.is_none() {
if let Some(new_preferred) = self.peers.keys().next().copied() {
self.set_preferred_upstream(new_preferred);
} else {
warn!("no upstream peers!");
self.clear_preferred_upstream();
}
}
for (requested_hash, requested_slot) in peer.reqs {
Expand Down Expand Up @@ -248,22 +250,19 @@ impl NetworkManager {
self.chain.handle_new_preferred_upstream(id);
}

fn clear_preferred_upstream(&mut self) {
self.chain.clear_preferred_upstream();
}

async fn publish_events(&mut self) -> Result<()> {
while let Some(event) = self.chain.next_unpublished_event() {
let tip = self.chain.preferred_upstream_tip();
match event {
ChainEvent::RollForward { header, body } => {
self.block_sink.announce_roll_forward(header, body).await?;
self.block_sink.announce_roll_forward(header, body, tip).await?;
self.published_blocks += 1;
if self.published_blocks.is_multiple_of(100) {
info!("Published block {}", header.number);
}
}
ChainEvent::RollBackward { header } => {
self.block_sink.announce_roll_backward(header).await?;
self.block_sink.announce_roll_backward(header, tip).await?;
}
}
self.chain.handle_event_published();
Expand Down
Loading