Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Transaction Queue integration #595

Merged
merged 9 commits into from
Mar 5, 2016
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 14 additions & 5 deletions ethcore/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ pub trait BlockChainClient : Sync + Send {
/// Get block total difficulty.
fn block_total_difficulty(&self, id: BlockId) -> Option<U256>;

/// Get address nonce.
fn nonce(&self, address: &Address) -> U256;

/// Get block hash.
fn block_hash(&self, id: BlockId) -> Option<H256>;

Expand Down Expand Up @@ -362,18 +365,14 @@ impl<V> Client<V> where V: Verifier {
bad_blocks.insert(header.hash());
continue;
}

let closed_block = self.check_and_close_block(&block);
if let Err(_) = closed_block {
bad_blocks.insert(header.hash());
break;
}

// Insert block
let closed_block = closed_block.unwrap();
self.chain.write().unwrap().insert_block(&block.bytes, closed_block.block().receipts().clone());
good_blocks.push(header.hash());

// Are we committing an era?
let ancient = if header.number() >= HISTORY {
let n = header.number() - HISTORY;
let chain = self.chain.read().unwrap();
Expand All @@ -383,10 +382,16 @@ impl<V> Client<V> where V: Verifier {
};

// Commit results
let closed_block = closed_block.unwrap();
let receipts = closed_block.block().receipts().clone();
closed_block.drain()
.commit(header.number(), &header.hash(), ancient)
.expect("State DB commit failed.");

// And update the chain
self.chain.write().unwrap()
.insert_block(&block.bytes, receipts);

self.report.write().unwrap().accrue_block(&block);
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
}
Expand Down Expand Up @@ -576,6 +581,10 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty)
}

fn nonce(&self, address: &Address) -> U256 {
self.state().nonce(address)
}

fn block_hash(&self, id: BlockId) -> Option<H256> {
let chain = self.chain.read().unwrap();
Self::block_hash(&chain, id)
Expand Down
1 change: 1 addition & 0 deletions sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ time = "0.1.34"
rand = "0.3.13"
heapsize = "0.3"
rustc-serialize = "0.3"
rayon = "0.3.1"

[features]
default = []
Expand Down
104 changes: 88 additions & 16 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@
///

use util::*;
use rayon::prelude::*;
use std::mem::{replace};
use ethcore::views::{HeaderView};
use ethcore::views::{HeaderView, BlockView};
use ethcore::header::{BlockNumber, Header as BlockHeader};
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo};
use range_collection::{RangeCollection, ToUsize, FromUsize};
use ethcore::error::*;
use ethcore::block::Block;
use ethcore::transaction::SignedTransaction;
use io::SyncIo;
use transaction_queue::TransactionQueue;
use time;
use super::SyncConfig;

Expand Down Expand Up @@ -209,6 +212,8 @@ pub struct ChainSync {
max_download_ahead_blocks: usize,
/// Network ID
network_id: U256,
/// Transactions Queue
transaction_queue: Mutex<TransactionQueue>,
}

type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
Expand All @@ -234,6 +239,7 @@ impl ChainSync {
last_send_block_number: 0,
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id,
transaction_queue: Mutex::new(TransactionQueue::new()),
}
}

Expand Down Expand Up @@ -292,6 +298,7 @@ impl ChainSync {
self.starting_block = 0;
self.highest_block = None;
self.have_common_block = false;
self.transaction_queue.lock().unwrap().clear();
self.starting_block = io.chain().chain_info().best_block_number;
self.state = SyncState::NotSynced;
}
Expand Down Expand Up @@ -921,8 +928,16 @@ impl ChainSync {
}
}
/// Called when peer sends us new transactions
fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
Ok(())
fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
let chain = io.chain();
let item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
let fetch_latest_nonce = |a : &Address| chain.nonce(a);
for i in 0..item_count {
let tx: SignedTransaction = try!(r.val_at(i));
self.transaction_queue.lock().unwrap().add(tx, &fetch_latest_nonce);
}
Ok(())
}

/// Send Status message
Expand Down Expand Up @@ -1248,6 +1263,36 @@ impl ChainSync {
}
self.last_send_block_number = chain.best_block_number;
}

/// called when block is imported to chain, updates transactions queue
pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], bad: &[H256]) {
fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
let block = chain
.block(BlockId::Hash(hash.clone()))
.expect("Expected in-chain blocks.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should include a short comment as to why, by design, this can never fail.

let block = BlockView::new(&block);
block.transactions()
}


let chain = io.chain();
let good = good.par_iter().map(|h| fetch_transactions(chain, h));
let bad = bad.par_iter().map(|h| fetch_transactions(chain, h));

good.for_each(|txs| {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
});
bad.for_each(|txs| {
// populate sender
for tx in &txs {
let _sender = tx.sender();
}
let mut transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.add_all(txs, |a| chain.nonce(a));
});
}
}

#[cfg(test)]
Expand Down Expand Up @@ -1388,7 +1433,7 @@ mod tests {
#[test]
fn finds_lagging_peers() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
client.add_blocks(100, BlocksWith::Uncle);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10));
let chain_info = client.chain_info();
Expand All @@ -1402,7 +1447,7 @@ mod tests {
#[test]
fn calculates_tree_for_lagging_peer() {
let mut client = TestBlockChainClient::new();
client.add_blocks(15, false);
client.add_blocks(15, BlocksWith::Uncle);

let start = client.block_hash_delta_minus(4);
let end = client.block_hash_delta_minus(2);
Expand All @@ -1419,7 +1464,7 @@ mod tests {
#[test]
fn sends_new_hashes_to_lagging_peer() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
client.add_blocks(100, BlocksWith::Uncle);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info();
Expand All @@ -1438,7 +1483,7 @@ mod tests {
#[test]
fn sends_latest_block_to_lagging_peer() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
client.add_blocks(100, BlocksWith::Uncle);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info();
Expand All @@ -1456,7 +1501,7 @@ mod tests {
#[test]
fn handles_peer_new_block_mallformed() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, false);
client.add_blocks(10, BlocksWith::Uncle);

let block_data = get_dummy_block(11, client.chain_info().best_block_hash);

Expand All @@ -1474,7 +1519,7 @@ mod tests {
#[test]
fn handles_peer_new_block() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, false);
client.add_blocks(10, BlocksWith::Uncle);

let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash);

Expand All @@ -1492,7 +1537,7 @@ mod tests {
#[test]
fn handles_peer_new_block_empty() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, false);
client.add_blocks(10, BlocksWith::Uncle);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None);
Expand All @@ -1508,7 +1553,7 @@ mod tests {
#[test]
fn handles_peer_new_hashes() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, false);
client.add_blocks(10, BlocksWith::Uncle);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None);
Expand All @@ -1524,7 +1569,7 @@ mod tests {
#[test]
fn handles_peer_new_hashes_empty() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, false);
client.add_blocks(10, BlocksWith::Uncle);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None);
Expand All @@ -1542,7 +1587,7 @@ mod tests {
#[test]
fn hashes_rlp_mutually_acceptable() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
client.add_blocks(100, BlocksWith::Uncle);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info();
Expand All @@ -1560,7 +1605,7 @@ mod tests {
#[test]
fn block_rlp_mutually_acceptable() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
client.add_blocks(100, BlocksWith::Uncle);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info();
Expand All @@ -1573,10 +1618,37 @@ mod tests {
assert!(result.is_ok());
}

#[test]
fn should_add_transactions_to_queue() {
// given
let mut client = TestBlockChainClient::new();
client.add_blocks(98, BlocksWith::Uncle);
client.add_blocks(1, BlocksWith::UncleAndTransaction);
client.add_blocks(1, BlocksWith::Transaction);
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));

let good_blocks = vec![client.block_hash_delta_minus(2)];
let bad_blocks = vec![client.block_hash_delta_minus(1)];

let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None);

// when
sync.chain_new_blocks(&io, &[], &good_blocks);
assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0);
assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1);
sync.chain_new_blocks(&io, &good_blocks, &bad_blocks);

// then
let status = sync.transaction_queue.lock().unwrap().status();
assert_eq!(status.pending, 1);
assert_eq!(status.future, 0);
}

#[test]
fn returns_requested_block_headers() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
client.add_blocks(100, BlocksWith::Uncle);
let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None);

Expand All @@ -1600,7 +1672,7 @@ mod tests {
#[test]
fn returns_requested_block_headers_reverse() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, false);
client.add_blocks(100, BlocksWith::Uncle);
let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None);

Expand Down
14 changes: 10 additions & 4 deletions sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ extern crate ethcore;
extern crate env_logger;
extern crate time;
extern crate rand;
extern crate rayon;
#[macro_use]
extern crate heapsize;

Expand All @@ -70,8 +71,7 @@ use io::NetSyncIo;
mod chain;
mod io;
mod range_collection;
// TODO [todr] Made public to suppress dead code warnings
pub mod transaction_queue;
mod transaction_queue;

#[cfg(test)]
mod tests;
Expand Down Expand Up @@ -153,8 +153,14 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {
}

fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) {
if let SyncMessage::BlockVerified = *message {
self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref()));
match *message {
SyncMessage::BlockVerified => {
self.sync.write().unwrap().chain_blocks_verified(&mut NetSyncIo::new(io, self.chain.deref()));
},
SyncMessage::NewChainBlocks { ref good, ref bad } => {
let sync_io = NetSyncIo::new(io, self.chain.deref());
self.sync.write().unwrap().chain_new_blocks(&sync_io, good, bad);
}
}
}
}