[PART 1] Improve TX pool #1591

Closed
wants to merge 10 commits
4 changes: 2 additions & 2 deletions chain/chunks/src/lib.rs
@@ -428,7 +428,7 @@ impl ShardsManager {
self.tx_pools
.entry(shard_id)
.or_insert_with(TransactionPool::default)
.insert_transaction(tx);
.insert_transaction(tx.transaction);
}

pub fn remove_transactions(
@@ -449,7 +449,7 @@ impl ShardsManager {
self.tx_pools
.entry(shard_id)
.or_insert_with(TransactionPool::default)
.reintroduce_transactions(transactions);
.reintroduce_transactions(transactions.clone());
}

pub fn receipts_recipient_filter(
258 changes: 203 additions & 55 deletions chain/pool/src/lib.rs
@@ -1,83 +1,120 @@
use std::collections::btree_map::BTreeMap;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

use near_chain::ValidTransaction;
use near_crypto::PublicKey;
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::{AccountId, Nonce};
use near_primitives::types::AccountId;

pub use crate::types::Error;
use near_primitives::hash::CryptoHash;

pub mod types;

/// Transaction pool: keeps track of transactions that were not yet accepted into the block chain.
#[derive(Default)]
pub struct TransactionPool {
num_transactions: usize,
/// Transactions grouped by account and ordered by nonce.
pub transactions: HashMap<AccountId, BTreeMap<Nonce, SignedTransaction>>,
/// Transactions grouped by a pair of (account ID, signer public key).
/// It's more efficient to keep transactions unsorted and with potentially conflicting nonces
/// than to create a BTreeMap for every transaction on average.
Contributor: The comment seems outdated. It is not a BTreeMap anymore?

Collaborator (Author): We've previously used BTreeMap instead of Vec.
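To make the exchange above concrete, here is a minimal sketch (not part of this diff) of the two layouts being compared; `AccountId`, `PublicKey`, `Nonce`, and `SignedTransaction` below are simplified stand-ins for the real near_primitives types. Deferring the sort to `prepare_transactions` is cheaper on average than keeping a BTreeMap ordered on every insert, which is the point made in the doc comment below.

```rust
use std::collections::{BTreeMap, HashMap};

// Simplified stand-ins for the near_primitives types used by the pool.
type AccountId = String;
type PublicKey = Vec<u8>;
type Nonce = u64;
struct SignedTransaction;

// Old layout: a BTreeMap per signer, so every insert keeps the group ordered by nonce.
type OldPool = HashMap<AccountId, BTreeMap<Nonce, SignedTransaction>>;

// New layout: a flat Vec per (account, access key) pair; ordering by nonce is deferred
// to prepare_transactions(), which sorts each group once when it starts draining it.
type NewPool = HashMap<(AccountId, PublicKey), Vec<SignedTransaction>>;
```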

pub transactions: HashMap<(AccountId, PublicKey), Vec<SignedTransaction>>,
/// Set of all hashes to quickly check if the given transaction is in the pool.
pub unique_transactions: HashSet<CryptoHash>,
}

impl TransactionPool {
/// Insert a valid transaction into the pool that passed validation.
pub fn insert_transaction(&mut self, valid_transaction: ValidTransaction) {
let signer_id = valid_transaction.transaction.transaction.signer_id.clone();
let nonce = valid_transaction.transaction.transaction.nonce;
self.num_transactions += 1;
/// Insert a signed transaction that passed validation into the pool.
pub fn insert_transaction(&mut self, signed_transaction: SignedTransaction) {
if self.unique_transactions.contains(&signed_transaction.get_hash()) {
return;
}
self.unique_transactions.insert(signed_transaction.get_hash());
let signer_id = signed_transaction.transaction.signer_id.clone();
let signer_public_key = signed_transaction.transaction.public_key.clone();
self.transactions
.entry(signer_id)
.or_insert_with(BTreeMap::new)
.insert(nonce, valid_transaction.transaction);
.entry((signer_id, signer_public_key))
.or_insert_with(Vec::new)
.push(signed_transaction);
}

/// Take transactions from the pool, in the appropriate order to be put in a new block.
/// Ensure that on average they will fit into expected weight.
/// Take `min(self.len(), max_number_of_transactions)` transactions from the pool, in the
/// appropriate order. We first take one transaction per key of (AccountId, PublicKey) with
/// the lowest nonce, then we take the next transaction per key with the lowest nonce.
pub fn prepare_transactions(
&mut self,
expected_weight: u32,
max_number_of_transactions: u32,
) -> Result<Vec<SignedTransaction>, Error> {
// TODO: pack transactions better.
let result = self
.transactions
.values()
.flat_map(BTreeMap::values)
.take(expected_weight as usize)
.cloned()
.collect();
let mut sorted = false;
let mut result = vec![];
while result.len() < max_number_of_transactions as usize && !self.transactions.is_empty() {
let mut keys_to_remove = vec![];
for (key, txs) in self.transactions.iter_mut() {
Contributor: Iteration over HashMap and HashSet is non-deterministic: if we fill a HashMap with key-value elements and then iterate over the keys or key-values, the order will differ between executions. This might not be an issue, since there is exactly one chunk producer, but I think we still want the order to be strictly defined by our own random seed, not by whatever random seed was used to initialize the HashSet. Also, this seems to be part of the protocol, and we don't want the order of transactions in our protocol to be defined by the internal bucketing implementation of HashMap.

Collaborator (Author): As discussed offline, we decided this is intended behavior. Adding docs to nomicon.
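For context on the determinism concern above, a minimal standalone sketch (not part of this diff): every std `HashMap` is created with its own randomly seeded `RandomState`, so two maps holding identical entries can yield their keys in different orders, and the order also changes from one program run to the next.

```rust
use std::collections::HashMap;

fn main() {
    // Two maps with identical contents, each with its own randomly seeded hasher.
    let accounts = ["alice.near", "bob.near", "carol.near", "dave.near", "eve.near"];
    let first: HashMap<&str, u64> = accounts.iter().map(|a| (*a, 1)).collect();
    let second: HashMap<&str, u64> = accounts.iter().map(|a| (*a, 1)).collect();

    // The two printed key orders typically differ from each other, and both
    // typically differ across runs of the program.
    println!("{:?}", first.keys().collect::<Vec<_>>());
    println!("{:?}", second.keys().collect::<Vec<_>>());
}
```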

if !sorted {
// Sort by nonce in non-increasing order to pop from the end
txs.sort_by_key(|a| std::cmp::Reverse(a.transaction.nonce));
}
let tx = txs.pop().expect("transaction groups shouldn't be empty");
if txs.is_empty() {
keys_to_remove.push(key.clone());
}
result.push(tx);
if result.len() >= max_number_of_transactions as usize {
break;
}
}
sorted = true;
// Removing empty keys
for key in keys_to_remove {
self.transactions.remove(&key);
}
}
for tx in &result {
self.unique_transactions.remove(&tx.get_hash());
}
Ok(result)
}

/// Quick reconciliation step - evict all transactions that are already in the block
/// or became invalid after it.
pub fn remove_transactions(&mut self, transactions: &Vec<SignedTransaction>) {
for transaction in transactions.iter() {
let signer_id = &transaction.transaction.signer_id;
let nonce = transaction.transaction.nonce;
let mut remove_map = false;
if let Some(map) = self.transactions.get_mut(signer_id) {
map.remove(&nonce);
remove_map = map.is_empty();
pub fn remove_transactions(&mut self, transactions: &[SignedTransaction]) {
let mut grouped_transactions = HashMap::new();
for tx in transactions {
if self.unique_transactions.contains(&tx.get_hash()) {
let signer_id = &tx.transaction.signer_id;
let signer_public_key = &tx.transaction.public_key;
grouped_transactions
.entry((signer_id, signer_public_key))
.or_insert_with(HashSet::new)
.insert(tx.get_hash());
}
if remove_map {
self.num_transactions -= 1;
self.transactions.remove(signer_id);
}
for (key, hashes) in grouped_transactions {
let key = (key.0.clone(), key.1.clone());
let mut remove_entry = false;
if let Some(v) = self.transactions.get_mut(&key) {
v.retain(|tx| !hashes.contains(&tx.get_hash()));
remove_entry = v.is_empty();
}
if remove_entry {
self.transactions.remove(&key);
}
for hash in hashes {
self.unique_transactions.remove(&hash);
}
}
}

/// Reintroduce transactions back during the reorg
pub fn reintroduce_transactions(&mut self, transactions: &Vec<SignedTransaction>) {
for transaction in transactions.iter() {
let transaction = transaction.clone();
self.insert_transaction(ValidTransaction { transaction });
/// Reintroduce transactions back into the pool during a chain reorg
pub fn reintroduce_transactions(&mut self, transactions: Vec<SignedTransaction>) {
for tx in transactions {
self.insert_transaction(tx);
}
}

pub fn len(&self) -> usize {
self.num_transactions
self.unique_transactions.len()
}

pub fn is_empty(&self) -> bool {
self.num_transactions == 0
self.unique_transactions.is_empty()
}
}

@@ -88,40 +125,151 @@ mod tests {
use rand::seq::SliceRandom;
use rand::thread_rng;

use near_chain::ValidTransaction;
use near_crypto::{InMemorySigner, KeyType};
use near_primitives::transaction::SignedTransaction;

use crate::TransactionPool;
use near_primitives::hash::CryptoHash;
use near_primitives::types::Balance;

fn generate_transactions(
signer_id: &str,
signer_seed: &str,
starting_nonce: u64,
end_nonce: u64,
) -> Vec<SignedTransaction> {
let signer =
Arc::new(InMemorySigner::from_seed(signer_seed, KeyType::ED25519, signer_seed));
(starting_nonce..=end_nonce)
.map(|i| {
SignedTransaction::send_money(
i,
signer_id.to_string(),
"bob.near".to_string(),
&*signer,
i as Balance,
CryptoHash::default(),
)
})
.collect()
}

fn process_txs_to_nonces(
mut transactions: Vec<SignedTransaction>,
expected_weight: u32,
) -> (Vec<u64>, TransactionPool) {
let mut pool = TransactionPool::default();
let mut rng = thread_rng();
transactions.shuffle(&mut rng);
for tx in transactions {
pool.insert_transaction(tx);
}
(
pool.prepare_transactions(expected_weight)
.unwrap()
.iter()
.map(|tx| tx.transaction.nonce)
.collect(),
pool,
)
}

/// Add transactions of nonce from 1..10 in random order. Check that mempool
/// orders them correctly.
#[test]
fn test_order_nonce() {
let signer =
Arc::new(InMemorySigner::from_seed("alice.near", KeyType::ED25519, "alice.near"));
let mut transactions: Vec<_> = (1..10)
let transactions = generate_transactions("alice.near", "alice.near", 1, 10);
let (nonces, _) = process_txs_to_nonces(transactions, 10);
assert_eq!(nonces, (1..=10).collect::<Vec<u64>>());
}

/// Add transactions of nonce from 1..10 in random order from 2 signers. Check that mempool
/// orders them correctly.
#[test]
fn test_order_nonce_two_signers() {
let mut transactions = generate_transactions("alice.near", "alice.near", 1, 10);
transactions.extend(generate_transactions("bob.near", "bob.near", 1, 10));

let (nonces, _) = process_txs_to_nonces(transactions, 10);
assert_eq!(nonces, (1..=5).map(|a| vec![a; 2]).flatten().collect::<Vec<u64>>());
}

/// Add transactions of nonce from 1..10 in random order from the same account but with
/// different public keys.
#[test]
fn test_order_nonce_same_account_two_access_keys_variable_nonces() {
let mut transactions = generate_transactions("alice.near", "alice.near", 1, 10);
transactions.extend(generate_transactions("alice.near", "bob.near", 21, 30));

let (nonces, _) = process_txs_to_nonces(transactions, 10);
if nonces[0] == 1 {
assert_eq!(nonces, (1..=5).map(|a| vec![a, a + 20]).flatten().collect::<Vec<u64>>());
} else {
assert_eq!(nonces, (1..=5).map(|a| vec![a + 20, a]).flatten().collect::<Vec<u64>>());
}
}

/// Add transactions of nonce from 1..=3 and transactions with nonce 21..=31. Pull 10.
/// Then try to get another 10.
#[test]
fn test_retain() {
let mut transactions = generate_transactions("alice.near", "alice.near", 1, 3);
transactions.extend(generate_transactions("alice.near", "bob.near", 21, 31));

let (nonces, mut pool) = process_txs_to_nonces(transactions, 10);
if nonces[0] == 1 {
assert_eq!(nonces, vec![1, 21, 2, 22, 3, 23, 24, 25, 26, 27]);
} else {
assert_eq!(nonces, vec![21, 1, 22, 2, 23, 3, 24, 25, 26, 27]);
}
let nonces: Vec<u64> =
pool.prepare_transactions(10).unwrap().iter().map(|tx| tx.transaction.nonce).collect();
assert_eq!(nonces, vec![28, 29, 30, 31]);
}

#[test]
fn test_remove_transactions() {
let n = 100;
let mut transactions = (1..=n)
.map(|i| {
let signer_seed = format!("user_{}", i % 3);
let signer = Arc::new(InMemorySigner::from_seed(
&signer_seed,
KeyType::ED25519,
&signer_seed,
));
let signer_id = format!("user_{}", i % 5);
SignedTransaction::send_money(
i,
"alice.near".to_string(),
signer_id.to_string(),
"bob.near".to_string(),
&*signer,
i as Balance,
CryptoHash::default(),
)
})
.collect();
.collect::<Vec<_>>();

let mut pool = TransactionPool::default();
let mut rng = thread_rng();
transactions.shuffle(&mut rng);
for tx in transactions {
pool.insert_transaction(ValidTransaction { transaction: tx });
for tx in transactions.clone() {
println!("{:?}", tx);
pool.insert_transaction(tx);
}
let transactions = pool.prepare_transactions(10).unwrap();
let nonces: Vec<u64> = transactions.iter().map(|tx| tx.transaction.nonce).collect();
assert_eq!(nonces, (1..10).collect::<Vec<u64>>())
assert_eq!(pool.len(), n as usize);

transactions.shuffle(&mut rng);
let (txs_to_remove, txs_to_check) = transactions.split_at(transactions.len() / 2);
pool.remove_transactions(txs_to_remove);

assert_eq!(pool.len(), txs_to_check.len());

let mut pool_txs = pool.prepare_transactions(txs_to_check.len() as u32).unwrap();
pool_txs.sort_by_key(|tx| tx.transaction.nonce);
let mut expected_txs = txs_to_check.to_vec();
expected_txs.sort_by_key(|tx| tx.transaction.nonce);

assert_eq!(pool_txs, expected_txs);
}
}
16 changes: 16 additions & 0 deletions core/crypto/src/signature.rs
@@ -9,6 +9,7 @@ use rand::SeedableRng;
use serde_derive::{Deserialize, Serialize};

use lazy_static::lazy_static;
use std::hash::{Hash, Hasher};

lazy_static! {
pub static ref SECP256K1: secp256k1::Secp256k1 = secp256k1::Secp256k1::new();
@@ -121,6 +122,21 @@ impl PublicKey {
}
}

impl Hash for PublicKey {
fn hash<H: Hasher>(&self, state: &mut H) {
match self {
PublicKey::ED25519(public_key) => {
state.write_u8(0u8);
state.write(&public_key.0);
}
PublicKey::SECP256K1(public_key) => {
state.write_u8(1u8);
state.write(&public_key.0);
}
}
}
}

impl Display for PublicKey {
fn fmt(&self, f: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
write!(f, "{}", String::from(self))