Merge pull request #70 from paritytech/bredamatt/sender-parallelism
bredamatt committed Jun 20, 2023
2 parents 9d4480a + 4d7702d commit 3fc1ef4
Showing 6 changed files with 203 additions and 22 deletions.
11 changes: 11 additions & 0 deletions tests/kubernetes/para-single.json
@@ -6,6 +6,17 @@
"default_image": "docker.io/parity/polkadot:latest",
"chain": "rococo-local",
"default_command": "polkadot",
"genesis": {
"runtime": {
"runtime_genesis_config": {
"configuration": {
"config": {
"max_pov_size": 5242880
}
}
}
}
},
"nodes": [
{
"name": "alice",
11 changes: 11 additions & 0 deletions tests/native/single-para-native.json
@@ -6,6 +6,17 @@
"default_image": "docker.io/parity/polkadot:latest",
"chain": "rococo-local",
"default_command": "polkadot",
"genesis": {
"runtime": {
"runtime_genesis_config": {
"configuration": {
"config": {
"max_pov_size": 5242880
}
}
}
}
},
"nodes": [
{
"name": "alice",
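Both test configs add the same genesis override. For reference: 5242880 bytes is exactly 5 MiB, matching the maximum proof-of-validity (PoV) size used on Polkadot-family relay chains at the time. A quick sanity check (hypothetical snippet, not part of the diff):

```rust
// Hypothetical check, not part of this commit: the genesis override
// expresses a 5 MiB PoV limit in bytes.
const MAX_POV_SIZE: u32 = 5 * 1024 * 1024;

fn main() {
    assert_eq!(MAX_POV_SIZE, 5_242_880);
}
```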
3 changes: 2 additions & 1 deletion utils/Cargo.toml
@@ -14,10 +14,11 @@ tokio = { version = "1.28.2", features = ["rt-multi-thread", "macros", "time"] }
parity-scale-codec = { version = "3.5.0", default-features = false, features = ["derive", "full", "bit-vec"] }
subxt = "0.29.0"
log = "0.4.18"
jsonrpsee = { version = "0.16", features = ["jsonrpsee-ws-client"] }

[features]
rococo = []
polkadot-parachain = []
tick = []
versi-tick = []
versi-relay = []
versi-relay = []
148 changes: 136 additions & 12 deletions utils/sender/src/main.rs
@@ -5,30 +5,35 @@ use log::*;
use sp_core::{sr25519::Pair as SrPair, Pair};
use subxt::{
config::extrinsic_params::{BaseExtrinsicParamsBuilder as Params, Era},
tx::PairSigner,
tx::{PairSigner, SubmittableExtrinsic},
PolkadotConfig,
OnlineClient,
};
use utils::{connect, runtime, Api, Error, DERIVATION};

mod pre;

use pre::pre_conditions;
use pre::{parallel_pre_conditions, pre_conditions};

/// Util program to send transactions
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Node URL
/// Node URL. Can be either a collator or a relay-chain node, depending on whether you want to measure parachain TPS or relay-chain TPS.
#[arg(long)]
node_url: String,

/// Sender index.
/// Set to the number of desired threads (default: 1). If set greater than 1, the program spawns multiple threads to send transactions in parallel.
#[arg(long, default_value_t = 1)]
threads: usize,

/// The sender index. Useful if you set threads to 1 and run multiple sender instances (as in the zombienet tests).
#[arg(long)]
sender_index: usize,
sender_index: Option<usize>,

/// Total number of senders
#[arg(long)]
total_senders: usize,
total_senders: Option<usize>,

/// Chunk size for sending the extrinsics.
#[arg(long, default_value_t = 50)]
@@ -46,7 +51,6 @@ async fn send_funds(
n_tx_sender: usize,
) -> Result<(), Error> {
let receivers = generate_receivers(n_tx_sender, sender_index); // one receiver per tx

let ext_deposit_addr = runtime::constants().balances().existential_deposit();
let ext_deposit = api.constants().at(&ext_deposit_addr)?;

@@ -106,13 +110,14 @@
Ok(())
}

/// Generates a signer from a given index.
pub fn generate_signer(i: usize) -> PairSigner<PolkadotConfig, SrPair> {
let pair: SrPair = Pair::from_string(format!("{}{}", DERIVATION, i).as_str(), None).unwrap();
let signer: PairSigner<PolkadotConfig, SrPair> = PairSigner::new(pair);
signer
}

/// Generates a vector of account IDs.
/// Generates a vector of account IDs from a given index.
fn generate_receivers(n: usize, sender_index: usize) -> Vec<sp_core::crypto::AccountId32> {
let shift = sender_index * n;
let mut receivers = Vec::new();
@@ -126,20 +131,139 @@ fn generate_receivers(n: usize, sender_index: usize) -> Vec<sp_core::crypto::AccountId32> {
receivers
}
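The `shift` of `sender_index * n` is what keeps receiver accounts disjoint across senders: sender `i` always derives receiver indices `[i*n, (i+1)*n)`. A standalone sketch of the same indexing scheme (plain integers instead of derived `AccountId32`s, for illustration only):

```rust
// Illustration of the index split used by generate_receivers:
// sender i with n transactions gets receiver indices [i*n, (i+1)*n).
fn receiver_indices(n: usize, sender_index: usize) -> std::ops::Range<usize> {
    let shift = sender_index * n;
    shift..shift + n
}

fn main() {
    // Two senders, three txs each: 0..3 and 3..6 never overlap.
    assert_eq!(receiver_indices(3, 0).collect::<Vec<_>>(), vec![0, 1, 2]);
    assert_eq!(receiver_indices(3, 1).collect::<Vec<_>>(), vec![3, 4, 5]);
}
```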

/// Parallel version of `send_funds`, except that it only signs the transactions and does not submit them.
/// Note that signing is a CPU-bound task, so it cannot run as an async task.
/// As a consequence, we use spawn_blocking here and communicate with the main task over an unbounded channel.
fn parallel_signing(
api: &Api,
threads: &usize,
n_tx_sender: usize,
producer: tokio::sync::mpsc::UnboundedSender<Vec<SubmittableExtrinsic<PolkadotConfig, OnlineClient<PolkadotConfig>>>>
) -> Result<(), Error> {
let ext_deposit_addr = runtime::constants().balances().existential_deposit();
let genesis_hash = api.genesis_hash();
let ext_deposit = api.constants().at(&ext_deposit_addr)?;

for i in 0..*threads {
let api = api.clone();
let producer = producer.clone();
tokio::task::spawn_blocking(move || {
debug!("Thread {}: preparing {} transactions", i, n_tx_sender);
let ext_deposit = ext_deposit.clone();
let genesis_hash = genesis_hash.clone();
let receivers = generate_receivers(n_tx_sender, i);
let mut txs = Vec::new();
for j in 0..n_tx_sender {
debug!("Thread {}: preparing transaction {}", i, j);
let shift = i * n_tx_sender;
let signer = generate_signer(shift + j);
debug!("Thread {}: generated signer {}{}", i, DERIVATION, shift + j);
let tx_params = Params::new().era(Era::Immortal, genesis_hash);
let tx_payload = runtime::tx()
.balances()
.transfer_keep_alive(receivers[j as usize].clone().into(), ext_deposit);
let signed_tx =
match api.tx().create_signed_with_nonce(&tx_payload, &signer, 0, tx_params) {
Ok(signed) => signed,
Err(e) => panic!("Thread {}: failed to sign transaction due to: {}", i, e),
};
txs.push(signed_tx);
}
match producer.send(txs) {
Ok(_) => (),
Err(e) => error!("Thread {}: failed to send transactions to consumer: {}", i, e),
}
info!("Thread {}: prepared and signed {} transactions", i, n_tx_sender);
});
}
Ok(())
}
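A minimal, self-contained sketch of the pattern `parallel_signing` relies on, assuming only tokio: CPU-bound batches are produced on `spawn_blocking` threads and handed to an async consumer over an unbounded mpsc channel. The toy below drops the sender to close the channel; the commit instead has `submit_txs` count incoming vectors until all `threads` batches have arrived.

```rust
// Toy version of the producer/consumer split in this commit: "signing"
// (CPU-bound) happens on spawn_blocking threads, while the consumer
// stays async on the runtime.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<Vec<u64>>();
    for thread_id in 0..4u64 {
        let tx = tx.clone();
        tokio::task::spawn_blocking(move || {
            // Stand-in for signing a batch of transactions.
            let batch: Vec<u64> = (0..1_000).map(|j| thread_id * 1_000 + j).collect();
            tx.send(batch).expect("consumer is alive");
        });
    }
    drop(tx); // close the channel so recv() eventually yields None
    let mut total = 0;
    while let Some(batch) = rx.recv().await {
        total += batch.len();
    }
    assert_eq!(total, 4_000);
}
```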

/// Here the signed extrinsics are submitted.
async fn submit_txs(
consumer: &mut tokio::sync::mpsc::UnboundedReceiver<Vec<SubmittableExtrinsic<PolkadotConfig, OnlineClient<PolkadotConfig>>>>,
chunk_size: usize,
threads: usize,
) -> Result<(), Error> {
let mut submittable_vecs = Vec::new();
while let Some(signed_txs) = consumer.recv().await {
debug!("Consumer: received {} submittable transactions", signed_txs.len());
submittable_vecs.push(signed_txs);
if threads == submittable_vecs.len() {
debug!("Consumer: received all submittable transactions, now starting submission");
for vec in &submittable_vecs {
for chunk in vec.chunks(chunk_size) {
let mut hashes = Vec::new();
for signed_tx in chunk {
let hash = signed_tx.submit();
hashes.push(hash);
}
try_join_all(hashes).await?;
debug!("Sender submitted chunk with size: {}", chunk_size);
}
}
info!("Sender submitted all transactions");
}
}
Ok(())
}
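The chunking in `submit_txs` bounds how many submissions are in flight at once: each chunk is fired concurrently and awaited with `try_join_all` before the next chunk starts. A toy sketch of that control flow (stub futures in place of `SubmittableExtrinsic::submit`, assuming the `futures` crate):

```rust
// Toy version of the chunked submission loop; submit_one stands in
// for SubmittableExtrinsic::submit and would return the tx hash.
use futures::future::try_join_all;

async fn submit_one(i: u32) -> Result<u32, String> {
    Ok(i)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let txs: Vec<u32> = (0..10).collect();
    for chunk in txs.chunks(4) {
        // Fire the whole chunk concurrently, then await every result
        // before starting the next chunk.
        let hashes = try_join_all(chunk.iter().map(|&i| submit_one(i))).await?;
        println!("submitted chunk of {}", hashes.len());
    }
    Ok(())
}
```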

#[tokio::main]
async fn main() -> Result<(), Error> {
env_logger::init_from_env(
env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
);

let args = Args::parse();
let api = connect(&args.node_url).await?;
let n_tx_sender = args.num / args.total_senders;
let node_url = args.node_url;
let threads = args.threads;
let chunk_size = args.chunk_size;

pre_conditions(&api, args.sender_index, n_tx_sender).await?;
// This index is optional and only set when single-threaded mode is used.
// If it is not set, we default to 0.
let sender_index = match args.sender_index {
Some(i) => i,
None => 0,
};

send_funds(&api, args.sender_index, args.chunk_size, n_tx_sender).await?;
// In case the optional total_senders argument is not passed (single-threaded mode),
// we split the work evenly across threads for multi-threaded mode.
let n_tx_sender = match args.total_senders {
Some(tot_s) => args.num / tot_s,
None => args.num / threads,
};

// Create the client here, so that we can use it in the various functions.
let api = connect(&node_url).await?;

match args.threads {
n if n > 1 => {
info!("Starting sender in parallel mode");
let (producer, mut consumer) = tokio::sync::mpsc::unbounded_channel();
// I/O Bound
parallel_pre_conditions(&api, &threads, &n_tx_sender).await?;
// CPU Bound
match parallel_signing(&api, &threads, n_tx_sender, producer) {
Ok(_) => (),
Err(e) => panic!("Error: {:?}", e),
}
// I/O Bound
submit_txs(&mut consumer, chunk_size, threads).await?;
},
// Single-threaded mode
n if n == 1 => {
debug!("Starting sender in single-threaded mode");
match args.sender_index {
Some(i) => {
pre_conditions(&api, &i, &n_tx_sender).await?;
send_funds(&api, sender_index, chunk_size, n_tx_sender).await?;
},
None => panic!("Must set sender index when running in single-threaded mode"),
}
},
// All remaining (nonsensical) cases, i.e. threads == 0
_ => panic!("Number of threads must be 1, or greater!"),
}
Ok(())
}

34 changes: 31 additions & 3 deletions utils/sender/src/pre/mod.rs
@@ -5,7 +5,7 @@ use subxt::{tx::PairSigner, utils::AccountId32, PolkadotConfig};
use utils::{runtime, Api, Error, DERIVATION};

/// Check pre-conditions of accounts attributed to this sender
pub async fn pre_conditions(api: &Api, i: usize, n: usize) -> Result<(), Error> {
pub async fn pre_conditions(api: &Api, i: &usize, n: &usize) -> Result<(), Error> {
info!(
"Sender {}: checking pre-conditions of accounts {}{} through {}{}",
i,
@@ -14,16 +14,44 @@ pub async fn pre_conditions(api: &Api, i: usize, n: usize) -> Result<(), Error>
DERIVATION,
(i + 1) * n - 1
);

for j in i * n..(i + 1) * n {
let pair: SrPair =
Pair::from_string(format!("{}{}", DERIVATION, j).as_str(), None).unwrap();
let signer: PairSigner<PolkadotConfig, SrPair> = PairSigner::new(pair);
let account = signer.account_id();
info!("Checking account: {}", account);
debug!("Sender {}: checking account {}", i, account);
check_account(&api, account).await?;
}
debug!("Sender {}: all pre-conditions checked and succeeded!", i);
Ok(())
}

/// Use a JoinSet to run the prechecks for all threads concurrently.
/// The pre_conditions call is async because it fetches chain state and hence is I/O-bound.
pub async fn parallel_pre_conditions(
api: &Api,
threads: &usize,
n_tx_sender: &usize,
) -> Result<(), Error> {
let mut precheck_set = tokio::task::JoinSet::new();
for i in 0..*threads {
let api = api.clone();
let n_tx_sender = n_tx_sender.clone();
precheck_set.spawn(async move {
match pre_conditions(&api, &i, &n_tx_sender).await {
Ok(_) => Ok(()),
Err(e) => Err(e),
}
});
}
while let Some(result) = precheck_set.join_next().await {
match result {
Ok(_) => (),
Err(e) => {
error!("Error: {:?}", e);
},
}
}
Ok(())
}
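A standalone sketch of the `JoinSet` pattern, assuming only tokio. Note the two layers of `Result` when draining: `join_next` wraps each task's own `Result` in a possible `JoinError` (the commit's version matches only the outer layer):

```rust
// Standalone sketch of the JoinSet precheck pattern.
use std::time::Duration;
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut set: JoinSet<Result<u32, String>> = JoinSet::new();
    for i in 0..4 {
        set.spawn(async move {
            // Stand-in for an I/O-bound precheck against chain state.
            tokio::time::sleep(Duration::from_millis(10)).await;
            Ok(i)
        });
    }
    // join_next yields results in completion order, not spawn order.
    while let Some(joined) = set.join_next().await {
        match joined {
            Ok(Ok(i)) => println!("precheck {} passed", i),
            Ok(Err(e)) => eprintln!("precheck failed: {}", e),
            Err(e) => eprintln!("task panicked or was cancelled: {}", e),
        }
    }
}
```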

18 changes: 12 additions & 6 deletions utils/src/lib.rs
@@ -1,6 +1,8 @@
use log::{error, info, warn};
use log::{error, debug, warn};
use std::time::Duration;
use subxt::{OnlineClient, PolkadotConfig};
use jsonrpsee::ws_client::WsClientBuilder;
use std::sync::Arc;

#[cfg(feature = "polkadot-parachain")]
#[subxt::subxt(runtime_metadata_path = "metadata/polkadot-parachain.scale")]
@@ -37,12 +39,16 @@ pub const DERIVATION: &str = "//Sender/";
/// Tries [`MAX_ATTEMPTS`] times to connect to the given node.
pub async fn connect(url: &str) -> Result<Api, Error> {
for i in 1..=MAX_ATTEMPTS {
info!("Attempt #{}: Connecting to {}", i, url);
let promise = OnlineClient::<PolkadotConfig>::from_url(url);

match promise.await {
debug!("Attempt #{}: Connecting to {}", i, url);
let rpc = WsClientBuilder::default()
.max_request_body_size(u32::MAX)
.max_concurrent_requests(u32::MAX as usize)
.build(url)
.await?;
let api = Api::from_rpc_client(Arc::new(rpc));
match api.await {
Ok(client) => {
info!("Connection established to: {}", url);
debug!("Connection established to: {}", url);
return Ok(client);
},
Err(err) => {
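Stitched together, one attempt of the new `connect` looks roughly like the sketch below (APIs as in subxt 0.29 / jsonrpsee 0.16, exactly the calls this commit uses; `connect_once` itself is a hypothetical helper, not a function in the repo). Raising `max_request_body_size` matters because large batches of signed extrinsics can exceed jsonrpsee's default request-size cap.

```rust
// Hypothetical standalone helper mirroring one connection attempt
// of the rewritten connect() (no retry loop shown).
use jsonrpsee::ws_client::WsClientBuilder;
use std::sync::Arc;
use subxt::{OnlineClient, PolkadotConfig};

async fn connect_once(
    url: &str,
) -> Result<OnlineClient<PolkadotConfig>, Box<dyn std::error::Error>> {
    let rpc = WsClientBuilder::default()
        .max_request_body_size(u32::MAX)            // fit large signed-tx batches
        .max_concurrent_requests(u32::MAX as usize) // effectively unbounded in-flight calls
        .build(url)
        .await?;
    Ok(OnlineClient::<PolkadotConfig>::from_rpc_client(Arc::new(rpc)).await?)
}
```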
