Skip to content

Commit

Permalink
congestion: create congestion modelling framework (#10695)
Browse files Browse the repository at this point in the history
This tool let's us model different workloads and congestion control
strategies. See the added READEME.md for how it works.

This is part of the first milestone in
[near-one-project-tracking/issues/48](near/near-one-project-tracking#48)
  • Loading branch information
jakmeier committed Mar 5, 2024
1 parent ac5cba2 commit 00d51bd
Show file tree
Hide file tree
Showing 25 changed files with 1,610 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ members = [
"test-utils/testlib",
"tools/database",
"tools/chainsync-loadtest",
"tools/congestion-model",
"tools/fork-network",
"tools/indexer/example",
"tools/mirror",
Expand Down
14 changes: 14 additions & 0 deletions tools/congestion-model/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "congestion-model"
version.workspace = true
authors.workspace = true
edition.workspace = true
rust-version.workspace = true
repository.workspace = true
license.workspace = true
publish = false

[dependencies]

[lints]
workspace = true
123 changes: 123 additions & 0 deletions tools/congestion-model/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Congestion Model

A model for simulating cross-shard congestion behavior.
We use it to compare different design proposals against each other.

## Running the model

Simply run it with

```bash
cargo run
```

and you should see a summary table of model execution results.

## Architecture

A model execution takes a workload and a design proposal as inputs and then it
produces some evaluation output.

```txt
WORKLOAD DESIGN PROPOSAL
[trait Producer] [trait CongestionStrategy]
\ /
\ /
\ /
\ /
\ /
CONGESTION MODEL
[struct Model]
|
|
|
|
|
EVALUATION
```

Each component of the four components above has its own directors.
- ./src/workload
- ./src/strategy
- ./src/model
- ./src/evaluation

## Add more strategies

To add a new congestion strategy, create a new module in [./src/strategy] and
create a `struct` in it. Implement the `trait CongestionStrategy` for your struct.

```rust
pub trait CongestionStrategy {
/// Initial state and register all necessary queues.
fn init(&mut self, id: ShardId, other_shards: &[ShardId], queue_factory: &mut dyn QueueFactory);

/// Decide which receipts to execute, which to delay, and which to forward.
fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext);
}
```

In the `init` function, you may create receipt queues.

In the `compute_chunk` function, consume incoming receipts and transactions.
Then either execute them, or put them in a queue.

When executing receipts or transactions, it potentially produces outgoing
receipts. These can be forwarded or kept in a queue for later forwarding. It all
depends on your strategy for applying backpressure.

To communicate between shards, store congestion information in the current
block. It is accessible by all shards one round later, so they can make
decisions to throttle their own rate if necessary.

```rust
fn compute_chunk(&mut self, ctx: &mut ChunkExecutionContext) {
// store own data in the block
let shared_info = MyInfo::new();
ctx.current_block_info().insert(shared_info);

// read data from previous block
for shard_info in ctx.prev_block_info().values() {
let info = shard_info.get::<MyInfo>().unwrap();
}
}
```

Internally, this uses a `HashMap<TypeId, Box<dyn Any>>` allowing to store any
retrieve any data in a type-safe way without data serialization.

In the end, go to [src/main.rs] and add your strategy to the list of tested
strategies.

## Add more workloads

To add more workloads, create a module in [./src/workload] and create a struct.
Then implement the `trait Producer` for it.

```rust
/// Produces workload in the form of transactions.
///
/// Describes how many messages by which producer should be created in a model
/// execution.
pub trait Producer {
/// Set up initial state of the producer if necessary.
fn init(&mut self, shards: &[ShardId]);

/// Create transactions for a round.
fn produce_transactions(
&mut self,
round: Round,
shards: &[ShardId],
tx_factory: &mut dyn FnMut(ShardId) -> TransactionBuilder,
) -> Vec<TransactionBuilder>;
}
```

In the `init` function, you have the option to initialize some state that
depends on shard ids.

In the `produce_transactions` function, create as many transactions as you want
by calling `let tx = tx_factory()` followed by calls on the transaction builder.
Start with `let receipt_id = tx.add_first_receipt(receipt_definition,
conversion_gas)` and add more receipts to it by calling `let receipt_id_1 =
tx.new_outgoing_receipt(receipt_id, receipt_definition)`.
75 changes: 75 additions & 0 deletions tools/congestion-model/src/evaluation/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
pub use transaction_progress::TransactionStatus;

use crate::{GGas, Model, ShardId};
use std::collections::HashMap;

pub mod summary_table;
mod transaction_progress;

#[derive(Debug, Clone)]
pub struct ShardQueueLengths {
pub unprocessed_incoming_transactions: u64,
pub incoming_receipts: u64,
pub queued_receipts: u64,
}

#[derive(Debug, Clone)]
pub struct GasThroughput {
pub total: GGas,
}

#[derive(Debug, Clone)]
pub struct Progress {
pub finished_transactions: usize,
pub pending_transactions: usize,
pub waiting_transactions: usize,
pub failed_transactions: usize,
}

impl Model {
pub fn queue_lengths(&self) -> HashMap<ShardId, ShardQueueLengths> {
let mut out = HashMap::new();
for shard in self.shard_ids.clone() {
let unprocessed_incoming_transactions =
self.queues.incoming_transactions(shard).len() as u64;
let incoming_receipts = self.queues.incoming_receipts(shard).len() as u64;
let total_shard_receipts: u64 =
self.queues.shard_queues(shard).map(|q| q.len() as u64).sum();

let shard_stats = ShardQueueLengths {
unprocessed_incoming_transactions,
incoming_receipts,
queued_receipts: total_shard_receipts - incoming_receipts,
};
out.insert(shard, shard_stats);
}
out
}

pub fn gas_throughput(&self) -> GasThroughput {
GasThroughput { total: self.transactions.all_transactions().map(|tx| tx.gas_burnt()).sum() }
}

pub fn progress(&self) -> Progress {
let mut finished_transactions = 0;
let mut pending_transactions = 0;
let mut waiting_transactions = 0;
let mut failed_transactions = 0;

for tx in self.transactions.all_transactions() {
match tx.status() {
TransactionStatus::Init => waiting_transactions += 1,
TransactionStatus::Pending => pending_transactions += 1,
TransactionStatus::Failed => failed_transactions += 1,
TransactionStatus::FinishedSuccess => finished_transactions += 1,
}
}

Progress {
finished_transactions,
pending_transactions,
waiting_transactions,
failed_transactions,
}
}
}
27 changes: 27 additions & 0 deletions tools/congestion-model/src/evaluation/summary_table.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::{Model, PGAS};

pub fn print_summary_header() {
println!(
"{:<25}{:<25}{:>25}{:>25}{:>25}",
"WORKLOAD", "STRATEGY", "BURNT GAS", "TRANSACTIONS FINISHED", "MAX QUEUE LEN",
);
}

pub fn print_summary_row(model: &Model, workload: &str, strategy: &str) {
let queues = model.queue_lengths();
let throughput = model.gas_throughput();
let progress = model.progress();

let mut max_queue_len = 0;
for q in queues.values() {
let len = q.incoming_receipts + q.queued_receipts;
max_queue_len = len.max(max_queue_len);
}

println!(
"{workload:<25}{strategy:<25}{:>20} PGas{:>25}{:>25}",
throughput.total / PGAS,
progress.finished_transactions,
max_queue_len
);
}
36 changes: 36 additions & 0 deletions tools/congestion-model/src/evaluation/transaction_progress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use crate::{GGas, Transaction};

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum TransactionStatus {
Init,
Pending,
Failed,
FinishedSuccess,
}

impl Transaction {
pub(crate) fn status(&self) -> TransactionStatus {
if !self.dropped_receipts.is_empty() {
return TransactionStatus::Failed;
}

if !self.pending_receipts.is_empty() {
return TransactionStatus::Pending;
}

if self.executed_receipts.is_empty() {
return TransactionStatus::Init;
}

return TransactionStatus::FinishedSuccess;
}

pub(crate) fn gas_burnt(&self) -> GGas {
if self.future_receipts.contains_key(&self.initial_receipt) {
return 0;
}
let receipts_gas: GGas =
self.executed_receipts.values().map(|receipt| receipt.gas_burnt()).sum();
receipts_gas + self.tx_conversion_cost
}
}
36 changes: 36 additions & 0 deletions tools/congestion-model/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
mod evaluation;
mod model;
pub mod strategy;
pub mod workload;

pub use evaluation::{summary_table, TransactionStatus};
pub use model::{Model, Queue, QueueId, Receipt, ShardId, TransactionId};
pub use strategy::CongestionStrategy;
pub use workload::{ReceiptDefinition, ReceiptId, TransactionBuilder};

pub(crate) use model::Transaction;

/// Gas is measured in Giga Gas as the smallest unit. This way, it fits in a u64
/// even for long simulations.
type GGas = u64;
type Round = u64;

pub const GGAS: GGas = 10u64.pow(0);
pub const TGAS: GGas = 10u64.pow(3);
pub const PGAS: GGas = 10u64.pow(6);

/// How much gas can be executed in a chunk. (Soft limit, one additional receipt is allowed.)
///
/// We assume chunk.gas_limit() is constant. In fact, validators are allowed to
/// dynamically adjust it in small steps per chunk. But in genesis it was set to
/// 1000 TGas and no known validator client ever changes it.
pub const GAS_LIMIT: GGas = 1000 * TGAS;

/// How much gas can be spent for converting transactions, in the original
/// design of Near Protocol.
///
/// The TX gas limit has been hard-coded to gas_limit / 2 for years.
/// https://github.com/near/nearcore/blob/ac5cba2e7a7507aecce09cbd0152641e986ea381/chain/chain/src/runtime/mod.rs#L709
///
/// Changing this could be part of a congestion strategy.
pub const TX_GAS_LIMIT: GGas = 500 * TGAS;
Loading

0 comments on commit 00d51bd

Please sign in to comment.