Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/optimism/flashblocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ alloy-rpc-types-engine = { workspace = true, features = ["serde"] }
alloy-consensus.workspace = true

# op-alloy
op-alloy-consensus.workspace = true
op-alloy-rpc-types-engine = { workspace = true, features = ["k256"] }

# io
tokio.workspace = true
tokio-tungstenite = { workspace = true, features = ["rustls-tls-native-roots"] }
serde.workspace = true
serde_json.workspace = true
url.workspace = true
futures-util.workspace = true
Expand All @@ -57,4 +59,3 @@ derive_more.workspace = true
[dev-dependencies]
test-case.workspace = true
alloy-consensus.workspace = true
op-alloy-consensus.workspace = true
102 changes: 57 additions & 45 deletions crates/optimism/flashblocks/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,28 @@

use crate::{
sequence::{FlashBlockPendingSequence, SequenceExecutionOutcome},
traits::{FlashblockDiff, FlashblockPayload, FlashblockPayloadBase},
worker::BuildArgs,
FlashBlock, FlashBlockCompleteSequence, PendingFlashBlock,
FlashBlockCompleteSequence, PendingFlashBlock,
};
use alloy_eips::eip2718::WithEncoded;
use alloy_primitives::B256;
use reth_primitives_traits::{NodePrimitives, Recovered, SignedTransaction};
use reth_primitives_traits::{NodePrimitives, Recovered};
use reth_revm::cached::CachedReads;
use ringbuffer::{AllocRingBuffer, RingBuffer};
use tokio::sync::broadcast;
use tracing::*;

type CachedSequenceEntry<P> = (
FlashBlockCompleteSequence<P>,
Vec<WithEncoded<Recovered<<P as FlashblockPayload>::SignedTx>>>,
);

type SequenceBuildArgs<P> = BuildArgs<
Vec<WithEncoded<Recovered<<P as FlashblockPayload>::SignedTx>>>,
<P as FlashblockPayload>::Base,
>;

/// Maximum number of cached sequences in the ring buffer.
const CACHE_SIZE: usize = 3;
/// 200 ms flashblock time.
Expand All @@ -29,21 +40,21 @@ pub(crate) const FLASHBLOCK_BLOCK_TIME: u64 = 200;
/// - Finding the best sequence to build based on local chain tip
/// - Broadcasting completed sequences to subscribers
#[derive(Debug)]
pub(crate) struct SequenceManager<T: SignedTransaction> {
pub(crate) struct SequenceManager<P: FlashblockPayload> {
/// Current pending sequence being built up from incoming flashblocks
pending: FlashBlockPendingSequence,
pending: FlashBlockPendingSequence<P>,
/// Cached recovered transactions for the pending sequence
pending_transactions: Vec<WithEncoded<Recovered<T>>>,
pending_transactions: Vec<WithEncoded<Recovered<P::SignedTx>>>,
/// Ring buffer of recently completed sequences bundled with their decoded transactions (FIFO,
/// size 3)
completed_cache: AllocRingBuffer<(FlashBlockCompleteSequence, Vec<WithEncoded<Recovered<T>>>)>,
completed_cache: AllocRingBuffer<CachedSequenceEntry<P>>,
/// Broadcast channel for completed sequences
block_broadcaster: broadcast::Sender<FlashBlockCompleteSequence>,
block_broadcaster: broadcast::Sender<FlashBlockCompleteSequence<P>>,
/// Whether to compute state roots when building blocks
compute_state_root: bool,
}

impl<T: SignedTransaction> SequenceManager<T> {
impl<P: FlashblockPayload> SequenceManager<P> {
/// Creates a new sequence manager.
pub(crate) fn new(compute_state_root: bool) -> Self {
let (block_broadcaster, _) = broadcast::channel(128);
Expand All @@ -59,12 +70,14 @@ impl<T: SignedTransaction> SequenceManager<T> {
/// Returns the sender half of the flashblock sequence broadcast channel.
pub(crate) const fn block_sequence_broadcaster(
&self,
) -> &broadcast::Sender<FlashBlockCompleteSequence> {
) -> &broadcast::Sender<FlashBlockCompleteSequence<P>> {
&self.block_broadcaster
}

/// Gets a subscriber to the flashblock sequences produced.
pub(crate) fn subscribe_block_sequence(&self) -> crate::FlashBlockCompleteSequenceRx {
pub(crate) fn subscribe_block_sequence(
&self,
) -> broadcast::Receiver<FlashBlockCompleteSequence<P>> {
self.block_broadcaster.subscribe()
}

Expand All @@ -76,12 +89,12 @@ impl<T: SignedTransaction> SequenceManager<T> {
/// with computed `state_root`.
///
/// Transactions are recovered once and cached for reuse during block building.
pub(crate) fn insert_flashblock(&mut self, flashblock: FlashBlock) -> eyre::Result<()> {
pub(crate) fn insert_flashblock(&mut self, flashblock: P) -> eyre::Result<()> {
// If this starts a new block, finalize and cache the previous sequence BEFORE inserting
if flashblock.index == 0 && self.pending.count() > 0 {
if flashblock.index() == 0 && self.pending.count() > 0 {
let completed = self.pending.finalize()?;
let block_number = completed.block_number();
let parent_hash = completed.payload_base().parent_hash;
let parent_hash = completed.payload_base().parent_hash();

trace!(
target: "flashblocks",
Expand Down Expand Up @@ -114,7 +127,7 @@ impl<T: SignedTransaction> SequenceManager<T> {
}

/// Returns the current pending sequence for inspection.
pub(crate) const fn pending(&self) -> &FlashBlockPendingSequence {
pub(crate) const fn pending(&self) -> &FlashBlockPendingSequence<P> {
&self.pending
}

Expand All @@ -129,21 +142,21 @@ impl<T: SignedTransaction> SequenceManager<T> {
&mut self,
local_tip_hash: B256,
local_tip_timestamp: u64,
) -> Option<BuildArgs<Vec<WithEncoded<Recovered<T>>>>> {
) -> Option<SequenceBuildArgs<P>> {
// Try to find a buildable sequence: (base, last_fb, transactions, cached_state,
// source_name)
let (base, last_flashblock, transactions, cached_state, source_name) =
// Priority 1: Try current pending sequence
if let Some(base) = self.pending.payload_base().filter(|b| b.parent_hash == local_tip_hash) {
let cached_state = self.pending.take_cached_reads().map(|r| (base.parent_hash, r));
let last_fb = self.pending.last_flashblock()?;
if let Some(base) = self.pending.payload_base().filter(|b| b.parent_hash() == local_tip_hash) {
let cached_state = self.pending.take_cached_reads().map(|r| (base.parent_hash(), r));
let last_fb = self.pending.last_flashblock()?.clone();
let transactions = self.pending_transactions.clone();
(base, last_fb, transactions, cached_state, "pending")
}
// Priority 2: Try cached sequence with exact parent match
else if let Some((cached, txs)) = self.completed_cache.iter().find(|(c, _)| c.payload_base().parent_hash == local_tip_hash) {
let base = cached.payload_base().clone();
let last_fb = cached.last();
else if let Some((cached, txs)) = self.completed_cache.iter().find(|(c, _)| c.payload_base().parent_hash() == local_tip_hash) {
let base = cached.payload_base();
let last_fb = cached.last().clone();
let transactions = txs.clone();
let cached_state = None;
(base, last_fb, transactions, cached_state, "cached")
Expand Down Expand Up @@ -179,20 +192,20 @@ impl<T: SignedTransaction> SequenceManager<T> {
// compute the state root, causing FlashblockConsensusClient to lack precomputed state for
// engine_newPayload. This is safe: we still have op-node as backstop to maintain
// chain progression.
let block_time_ms = (base.timestamp - local_tip_timestamp) * 1000;
let block_time_ms = (base.timestamp() - local_tip_timestamp) * 1000;
let expected_final_flashblock = block_time_ms / FLASHBLOCK_BLOCK_TIME;
let compute_state_root = self.compute_state_root &&
last_flashblock.diff.state_root.is_zero() &&
last_flashblock.index >= expected_final_flashblock.saturating_sub(1);
last_flashblock.diff().state_root().is_zero() &&
last_flashblock.index() >= expected_final_flashblock.saturating_sub(1);

trace!(
target: "flashblocks",
block_number = base.block_number,
block_number = base.block_number(),
source = source_name,
flashblock_index = last_flashblock.index,
flashblock_index = last_flashblock.index(),
expected_final_flashblock,
compute_state_root_enabled = self.compute_state_root,
state_root_is_zero = last_flashblock.diff.state_root.is_zero(),
state_root_is_zero = last_flashblock.diff().state_root().is_zero(),
will_compute_state_root = compute_state_root,
"Building from flashblock sequence"
);
Expand All @@ -201,8 +214,8 @@ impl<T: SignedTransaction> SequenceManager<T> {
base,
transactions,
cached_state,
last_flashblock_index: last_flashblock.index,
last_flashblock_hash: last_flashblock.diff.block_hash,
last_flashblock_index: last_flashblock.index(),
last_flashblock_hash: last_flashblock.diff().block_hash(),
compute_state_root,
})
}
Expand All @@ -227,7 +240,7 @@ impl<T: SignedTransaction> SequenceManager<T> {
});

// Update pending sequence with execution results
if self.pending.payload_base().is_some_and(|base| base.parent_hash == parent_hash) {
if self.pending.payload_base().is_some_and(|base| base.parent_hash() == parent_hash) {
self.pending.set_execution_outcome(execution_outcome);
self.pending.set_cached_reads(cached_reads);
trace!(
Expand All @@ -241,7 +254,7 @@ impl<T: SignedTransaction> SequenceManager<T> {
else if let Some((cached, _)) = self
.completed_cache
.iter_mut()
.find(|(c, _)| c.payload_base().parent_hash == parent_hash)
.find(|(c, _)| c.payload_base().parent_hash() == parent_hash)
{
// Only re-broadcast if we computed new information (state_root was missing).
// If sequencer already provided state_root, we already broadcast in insert_flashblock,
Expand All @@ -266,19 +279,18 @@ impl<T: SignedTransaction> SequenceManager<T> {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::TestFlashBlockFactory;
use crate::{test_utils::TestFlashBlockFactory, FlashBlock};
use alloy_primitives::B256;
use op_alloy_consensus::OpTxEnvelope;

#[test]
fn test_sequence_manager_new() {
let manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
let manager: SequenceManager<FlashBlock> = SequenceManager::new(true);
assert_eq!(manager.pending().count(), 0);
}

#[test]
fn test_insert_flashblock_creates_pending_sequence() {
let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
let mut manager: SequenceManager<FlashBlock> = SequenceManager::new(true);
let factory = TestFlashBlockFactory::new();

let fb0 = factory.flashblock_at(0).build();
Expand All @@ -290,7 +302,7 @@ mod tests {

#[test]
fn test_insert_flashblock_caches_completed_sequence() {
let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
let mut manager: SequenceManager<FlashBlock> = SequenceManager::new(true);
let factory = TestFlashBlockFactory::new();

// Build first sequence
Expand All @@ -314,7 +326,7 @@ mod tests {

#[test]
fn test_next_buildable_args_returns_none_when_empty() {
let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
let mut manager: SequenceManager<FlashBlock> = SequenceManager::new(true);
let local_tip_hash = B256::random();
let local_tip_timestamp = 1000;

Expand All @@ -324,7 +336,7 @@ mod tests {

#[test]
fn test_next_buildable_args_matches_pending_parent() {
let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
let mut manager: SequenceManager<FlashBlock> = SequenceManager::new(true);
let factory = TestFlashBlockFactory::new();

let fb0 = factory.flashblock_at(0).build();
Expand All @@ -340,7 +352,7 @@ mod tests {

#[test]
fn test_next_buildable_args_returns_none_when_parent_mismatch() {
let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
let mut manager: SequenceManager<FlashBlock> = SequenceManager::new(true);
let factory = TestFlashBlockFactory::new();

let fb0 = factory.flashblock_at(0).build();
Expand All @@ -354,7 +366,7 @@ mod tests {

#[test]
fn test_next_buildable_args_prefers_pending_over_cached() {
let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
let mut manager: SequenceManager<FlashBlock> = SequenceManager::new(true);
let factory = TestFlashBlockFactory::new();

// Create and finalize first sequence
Expand All @@ -373,7 +385,7 @@ mod tests {

#[test]
fn test_next_buildable_args_finds_cached_sequence() {
let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
let mut manager: SequenceManager<FlashBlock> = SequenceManager::new(true);
let factory = TestFlashBlockFactory::new();

// Build and cache first sequence
Expand All @@ -396,7 +408,7 @@ mod tests {

#[test]
fn test_compute_state_root_logic_near_expected_final() {
let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
let mut manager: SequenceManager<FlashBlock> = SequenceManager::new(true);
let block_time = 2u64;
let factory = TestFlashBlockFactory::new().with_block_time(block_time);

Expand All @@ -420,7 +432,7 @@ mod tests {

#[test]
fn test_no_compute_state_root_when_provided_by_sequencer() {
let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
let mut manager: SequenceManager<FlashBlock> = SequenceManager::new(true);
let block_time = 2u64;
let factory = TestFlashBlockFactory::new().with_block_time(block_time);

Expand All @@ -437,7 +449,7 @@ mod tests {

#[test]
fn test_no_compute_state_root_when_disabled() {
let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(false);
let mut manager: SequenceManager<FlashBlock> = SequenceManager::new(false);
let block_time = 2u64;
let factory = TestFlashBlockFactory::new().with_block_time(block_time);

Expand All @@ -461,7 +473,7 @@ mod tests {

#[test]
fn test_cache_ring_buffer_evicts_oldest() {
let mut manager: SequenceManager<OpTxEnvelope> = SequenceManager::new(true);
let mut manager: SequenceManager<FlashBlock> = SequenceManager::new(true);
let factory = TestFlashBlockFactory::new();

// Fill cache with 4 sequences (cache size is 3, so oldest should be evicted)
Expand Down
Loading
Loading