Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Light Client transaction queue, initial LightDispatcher #4501

Merged
merged 18 commits into from
Feb 15, 2017
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
83 changes: 65 additions & 18 deletions ethcore/light/src/transaction_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,30 @@ impl CurrentNonce {
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
struct TransactionInfo {
hash: H256,
nonce: U256,
condition: Option<Condition>,
}

impl<'a> From<&'a PendingTransaction> for TransactionInfo {
fn from(tx: &'a PendingTransaction) -> Self {
TransactionInfo {
hash: tx.hash(),
nonce: tx.nonce.clone(),
condition: tx.condition.clone(),
}
}
}

// transactions associated with a specific account.
#[derive(Debug, Clone, PartialEq, Eq)]
struct AccountTransactions {
// believed current nonce (gotten from initial given TX or `cull` calls).
cur_nonce: CurrentNonce,
current: Vec<PendingTransaction>, // ordered "current" transactions (cur_nonce onwards)
future: BTreeMap<U256, PendingTransaction>, // "future" transactions.
current: Vec<TransactionInfo>, // ordered "current" transactions (cur_nonce onwards)
future: BTreeMap<U256, TransactionInfo>, // "future" transactions.
}

impl AccountTransactions {
Expand Down Expand Up @@ -110,12 +127,15 @@ impl TransactionQueue {
let sender = tx.sender();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems right to me. Weirdness in online viewer is just line wrapping.

let hash = tx.hash();
let nonce = tx.nonce;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you need to check if transaction is already in queue (by hash?)

let tx_info = TransactionInfo::from(&tx);

if self.by_hash.contains_key(&hash) { return Err(TransactionError::AlreadyImported) }

let res = match self.by_account.entry(sender) {
Entry::Vacant(entry) => {
entry.insert(AccountTransactions {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we will store a lot of old transactions, cause if all transactions from particular sender are imported we will remove the entry here.
But then if some node propagates it again we will insert it to the queue with Assumed nonce, and will never cull it unless the sender actually sends another transaction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Light nodes don't have the capability to validate transactions, so this queue isn't intended to store anything propagated from the network, only local stuff from RPC. The intended usage is to fetch actual nonces at the best block for all queued_senders and call cull using those.

cur_nonce: CurrentNonce::Assumed(nonce),
current: vec![tx.clone()],
current: vec![tx_info],
future: BTreeMap::new(),
});

Expand All @@ -138,7 +158,8 @@ impl TransactionQueue {
trace!(target: "txqueue", "Replacing existing transaction from {} with nonce {}",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably replacement should happen only if gas_price is higher than the previous one, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the intended use-case (storing transactions only of local origin), it makes sense to replace whenever the same nonce is encountered.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but if you ever propagate transaction with lower gas price it will never be replaced in full nodes transaction queue (at least the ones using default implementations) giving the user false feeling that the transaction was replaced, although this could be left to be displayed in UI.

Copy link
Contributor Author

@rphmeier rphmeier Feb 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the user wants to try with a lower gas price then they're welcome to; the least we can do is allow our implementation to propagate it to the network. What the peers do with the transaction is beyond our control. The fact that it would probably not work should be left to documentation instead of behavior.

sender, nonce);

acct_txs.current[idx] = tx.clone();
let old = ::std::mem::replace(&mut acct_txs.current[idx], tx_info);
self.by_hash.remove(&old.hash);

TransactionImportResult::Current
}
Expand All @@ -149,10 +170,10 @@ impl TransactionQueue {
// current is sorted with one tx per nonce,
// so if a tx with given nonce wasn't found that means it is either
// earlier in nonce than all other "current" transactions or later.
debug_assert!(idx == 0 || idx == cur_len);
assert!(idx == 0 || idx == cur_len);

if idx == 0 && acct_txs.current.first().map_or(false, |f| f.nonce != incr_nonce) {
let old_cur = ::std::mem::replace(&mut acct_txs.current, vec![tx.clone()]);
let old_cur = ::std::mem::replace(&mut acct_txs.current, vec![tx_info]);

trace!(target: "txqueue", "Moving {} transactions with nonce > {} to future",
old_cur.len(), incr_nonce);
Expand All @@ -166,14 +187,14 @@ impl TransactionQueue {
} else if idx == cur_len && acct_txs.current.last().map_or(false, |f| f.nonce + 1.into() != nonce) {
trace!(target: "txqueue", "Queued future transaction for {}, nonce={}", sender, nonce);
let future_nonce = nonce;
acct_txs.future.insert(future_nonce, tx.clone());
acct_txs.future.insert(future_nonce, tx_info);

TransactionImportResult::Future
} else {
trace!(target: "txqueue", "Queued current transaction for {}, nonce={}", sender, nonce);

// insert, then check if we've filled any gaps.
acct_txs.current.insert(idx, tx.clone());
acct_txs.current.insert(idx, tx_info);
acct_txs.adjust_future();

TransactionImportResult::Current
Expand Down Expand Up @@ -202,14 +223,20 @@ impl TransactionQueue {
/// Get all transactions ready to be propagated.
/// `best_block_number` and `best_block_timestamp` are used to filter out conditionally
/// propagated transactions.
///
/// Returned transactions are batched by sender, in order of ascending nonce.
pub fn ready_transactions(&self, best_block_number: u64, best_block_timestamp: u64) -> Vec<PendingTransaction> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should indicate that the transactions are not sorted by any priority (actually it will batch transactions from single sender togethe).
Since we propagate transactions in chunks some senders will always be a bit more priviledged by some node.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't implemented the transaction propagation code for the light node just yet. I'll add docs that it's in "arbitrary" order.

self.by_account.values().flat_map(|acct_txs| {
acct_txs.current.iter().take_while(|tx| match tx.condition {
None => true,
Some(Condition::Number(blk_num)) => blk_num <= best_block_number,
Some(Condition::Timestamp(time)) => time <= best_block_timestamp,
}).cloned()
}).collect()
self.by_account.values()
.flat_map(|acct_txs| {
acct_txs.current.iter().take_while(|tx| match tx.condition {
None => true,
Some(Condition::Number(blk_num)) => blk_num <= best_block_number,
Some(Condition::Timestamp(time)) => time <= best_block_timestamp,
}).map(|info| info.hash)
})
.filter_map(|hash| self.by_hash.get(&hash))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to put .expect here instead to track potential issues (had issues like this in standard transaction pool)

Copy link
Contributor Author

@rphmeier rphmeier Feb 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose to use filter_map specifically to avoid panics in this case. I think it's better for something not to be propagated than for the whole node to go down. Our style is to have panickers be provable: I don't think the invariant that by_hash be fully consistent with by_account is strong enough to assert (although it should really be the case). Adding a warning here "Inconsistency detected between by_account and by_hash" makes sense to me, though.

.cloned()
.collect()
}

/// Addresses for which we store transactions.
Expand All @@ -231,17 +258,17 @@ impl TransactionQueue {
for old in old_future {
let hash = acct_txs.future.remove(&old)
.expect("key extracted from keys iterator; known to exist; qed")
.hash();
.hash;
removed_hashes.push(hash);
}

// then cull from "current".
let valid_pos = acct_txs.current.iter().position(|tx| tx.nonce >= cur_nonce);
match valid_pos {
None =>
removed_hashes.extend(acct_txs.current.drain(..).map(|tx| tx.hash())),
removed_hashes.extend(acct_txs.current.drain(..).map(|tx| tx.hash)),
Some(valid) =>
removed_hashes.extend(acct_txs.current.drain(..valid).map(|tx| tx.hash())),
removed_hashes.extend(acct_txs.current.drain(..valid).map(|tx| tx.hash)),
}

// now try and move stuff out of future into current.
Expand Down Expand Up @@ -418,4 +445,24 @@ mod tests {

assert!(txq.import(tx_b.fake_sign(sender).into()).is_err())
}

#[test]
fn replace_is_removed() {
let sender = Address::default();
let mut txq = TransactionQueue::default();

let tx_b: PendingTransaction = Transaction::default().fake_sign(sender).into();
let tx_a: PendingTransaction = {
let mut tx_a = Transaction::default();
tx_a.gas_price = tx_b.gas_price + 1.into();
tx_a.fake_sign(sender).into()
};

let hash = tx_a.hash();

txq.import(tx_a).unwrap();
txq.import(tx_b).unwrap();

assert!(txq.transaction(&hash).is_none());
}
}