Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Revert "Transaction Queue integration" #602

Merged
merged 1 commit into from
Mar 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 0 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 6 additions & 15 deletions ethcore/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,6 @@ pub trait BlockChainClient : Sync + Send {
/// Get block total difficulty.
fn block_total_difficulty(&self, id: BlockId) -> Option<U256>;

/// Get address nonce.
fn nonce(&self, address: &Address) -> U256;

/// Get block hash.
fn block_hash(&self, id: BlockId) -> Option<H256>;

Expand Down Expand Up @@ -368,14 +365,18 @@ impl<V> Client<V> where V: Verifier {
bad_blocks.insert(header.hash());
continue;
}

let closed_block = self.check_and_close_block(&block);
if let Err(_) = closed_block {
bad_blocks.insert(header.hash());
break;
}

// Insert block
let closed_block = closed_block.unwrap();
self.chain.write().unwrap().insert_block(&block.bytes, closed_block.block().receipts().clone());
good_blocks.push(header.hash());

// Are we committing an era?
let ancient = if header.number() >= HISTORY {
let n = header.number() - HISTORY;
let chain = self.chain.read().unwrap();
Expand All @@ -385,16 +386,10 @@ impl<V> Client<V> where V: Verifier {
};

// Commit results
let closed_block = closed_block.unwrap();
let receipts = closed_block.block().receipts().clone();
closed_block.drain()
.commit(header.number(), &header.hash(), ancient)
.expect("State DB commit failed.");

// And update the chain
self.chain.write().unwrap()
.insert_block(&block.bytes, receipts);

self.report.write().unwrap().accrue_block(&block);
trace!(target: "client", "Imported #{} ({})", header.number(), header.hash());
}
Expand All @@ -413,7 +408,7 @@ impl<V> Client<V> where V: Verifier {
if !good_blocks.is_empty() && block_queue.queue_info().is_empty() {
io.send(NetworkIoMessage::User(SyncMessage::NewChainBlocks {
good: good_blocks,
retracted: bad_blocks,
bad: bad_blocks,
})).unwrap();
}
}
Expand Down Expand Up @@ -586,10 +581,6 @@ impl<V> BlockChainClient for Client<V> where V: Verifier {
Self::block_hash(&chain, id).and_then(|hash| chain.block_details(&hash)).map(|d| d.total_difficulty)
}

fn nonce(&self, address: &Address) -> U256 {
self.state().nonce(address)
}

fn block_hash(&self, id: BlockId) -> Option<H256> {
let chain = self.chain.read().unwrap();
Self::block_hash(&chain, id)
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub enum SyncMessage {
/// Hashes of blocks imported to blockchain
good: Vec<H256>,
/// Hashes of blocks not imported to blockchain
retracted: Vec<H256>,
bad: Vec<H256>,
},
/// A block is ready
BlockVerified,
Expand Down
1 change: 0 additions & 1 deletion sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ time = "0.1.34"
rand = "0.3.13"
heapsize = "0.3"
rustc-serialize = "0.3"
rayon = "0.3.1"

[features]
default = []
Expand Down
107 changes: 17 additions & 90 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,14 @@
///

use util::*;
use rayon::prelude::*;
use std::mem::{replace};
use ethcore::views::{HeaderView, BlockView};
use ethcore::views::{HeaderView};
use ethcore::header::{BlockNumber, Header as BlockHeader};
use ethcore::client::{BlockChainClient, BlockStatus, BlockId, BlockChainInfo};
use range_collection::{RangeCollection, ToUsize, FromUsize};
use ethcore::error::*;
use ethcore::block::Block;
use ethcore::transaction::SignedTransaction;
use io::SyncIo;
use transaction_queue::TransactionQueue;
use time;
use super::SyncConfig;

Expand Down Expand Up @@ -212,8 +209,6 @@ pub struct ChainSync {
max_download_ahead_blocks: usize,
/// Network ID
network_id: U256,
/// Transactions Queue
transaction_queue: Mutex<TransactionQueue>,
}

type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
Expand All @@ -239,7 +234,6 @@ impl ChainSync {
last_send_block_number: 0,
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id,
transaction_queue: Mutex::new(TransactionQueue::new()),
}
}

Expand Down Expand Up @@ -298,7 +292,6 @@ impl ChainSync {
self.starting_block = 0;
self.highest_block = None;
self.have_common_block = false;
self.transaction_queue.lock().unwrap().clear();
self.starting_block = io.chain().chain_info().best_block_number;
self.state = SyncState::NotSynced;
}
Expand Down Expand Up @@ -491,7 +484,7 @@ impl ChainSync {
trace!(target: "sync", "New block already queued {:?}", h);
},
Ok(_) => {
if self.current_base_block() < header.number {
if self.current_base_block() < header.number {
self.last_imported_block = Some(header.number);
self.remove_downloaded_blocks(header.number);
}
Expand Down Expand Up @@ -928,16 +921,8 @@ impl ChainSync {
}
}
/// Called when peer sends us new transactions
fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
let chain = io.chain();
let item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);
let fetch_latest_nonce = |a : &Address| chain.nonce(a);
for i in 0..item_count {
let tx: SignedTransaction = try!(r.val_at(i));
self.transaction_queue.lock().unwrap().add(tx, &fetch_latest_nonce);
}
Ok(())
fn on_peer_transactions(&mut self, _io: &mut SyncIo, _peer_id: PeerId, _r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
Ok(())
}

/// Send Status message
Expand Down Expand Up @@ -1263,37 +1248,6 @@ impl ChainSync {
}
self.last_send_block_number = chain.best_block_number;
}

/// called when block is imported to chain, updates transactions queue
pub fn chain_new_blocks(&mut self, io: &SyncIo, good: &[H256], retracted: &[H256]) {
fn fetch_transactions(chain: &BlockChainClient, hash: &H256) -> Vec<SignedTransaction> {
let block = chain
.block(BlockId::Hash(hash.clone()))
// Client should send message after commit to db and inserting to chain.
.expect("Expected in-chain blocks.");
let block = BlockView::new(&block);
block.transactions()
}


let chain = io.chain();
let good = good.par_iter().map(|h| fetch_transactions(chain, h));
let retracted = retracted.par_iter().map(|h| fetch_transactions(chain, h));

good.for_each(|txs| {
let mut transaction_queue = self.transaction_queue.lock().unwrap();
let hashes = txs.iter().map(|tx| tx.hash()).collect::<Vec<H256>>();
transaction_queue.remove_all(&hashes, |a| chain.nonce(a));
});
retracted.for_each(|txs| {
// populate sender
for tx in &txs {
let _sender = tx.sender();
}
let mut transaction_queue = self.transaction_queue.lock().unwrap();
transaction_queue.add_all(txs, |a| chain.nonce(a));
});
}
}

#[cfg(test)]
Expand Down Expand Up @@ -1434,7 +1388,7 @@ mod tests {
#[test]
fn finds_lagging_peers() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(10));
let chain_info = client.chain_info();
Expand All @@ -1448,7 +1402,7 @@ mod tests {
#[test]
fn calculates_tree_for_lagging_peer() {
let mut client = TestBlockChainClient::new();
client.add_blocks(15, EachBlockWith::Uncle);
client.add_blocks(15, false);

let start = client.block_hash_delta_minus(4);
let end = client.block_hash_delta_minus(2);
Expand All @@ -1465,7 +1419,7 @@ mod tests {
#[test]
fn sends_new_hashes_to_lagging_peer() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info();
Expand All @@ -1484,7 +1438,7 @@ mod tests {
#[test]
fn sends_latest_block_to_lagging_peer() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info();
Expand All @@ -1502,7 +1456,7 @@ mod tests {
#[test]
fn handles_peer_new_block_mallformed() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, EachBlockWith::Uncle);
client.add_blocks(10, false);

let block_data = get_dummy_block(11, client.chain_info().best_block_hash);

Expand All @@ -1520,7 +1474,7 @@ mod tests {
#[test]
fn handles_peer_new_block() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, EachBlockWith::Uncle);
client.add_blocks(10, false);

let block_data = get_dummy_blocks(11, client.chain_info().best_block_hash);

Expand All @@ -1538,7 +1492,7 @@ mod tests {
#[test]
fn handles_peer_new_block_empty() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, EachBlockWith::Uncle);
client.add_blocks(10, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None);
Expand All @@ -1554,7 +1508,7 @@ mod tests {
#[test]
fn handles_peer_new_hashes() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, EachBlockWith::Uncle);
client.add_blocks(10, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None);
Expand All @@ -1570,7 +1524,7 @@ mod tests {
#[test]
fn handles_peer_new_hashes_empty() {
let mut client = TestBlockChainClient::new();
client.add_blocks(10, EachBlockWith::Uncle);
client.add_blocks(10, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let mut io = TestIo::new(&mut client, &mut queue, None);
Expand All @@ -1588,7 +1542,7 @@ mod tests {
#[test]
fn hashes_rlp_mutually_acceptable() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info();
Expand All @@ -1606,7 +1560,7 @@ mod tests {
#[test]
fn block_rlp_mutually_acceptable() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));
let chain_info = client.chain_info();
Expand All @@ -1619,37 +1573,10 @@ mod tests {
assert!(result.is_ok());
}

#[test]
fn should_add_transactions_to_queue() {
// given
let mut client = TestBlockChainClient::new();
client.add_blocks(98, EachBlockWith::Uncle);
client.add_blocks(1, EachBlockWith::UncleAndTransaction);
client.add_blocks(1, EachBlockWith::Transaction);
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));

let good_blocks = vec![client.block_hash_delta_minus(2)];
let retracted_blocks = vec![client.block_hash_delta_minus(1)];

let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None);

// when
sync.chain_new_blocks(&io, &[], &good_blocks);
assert_eq!(sync.transaction_queue.lock().unwrap().status().future, 0);
assert_eq!(sync.transaction_queue.lock().unwrap().status().pending, 1);
sync.chain_new_blocks(&io, &good_blocks, &retracted_blocks);

// then
let status = sync.transaction_queue.lock().unwrap().status();
assert_eq!(status.pending, 1);
assert_eq!(status.future, 0);
}

#[test]
fn returns_requested_block_headers() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None);

Expand All @@ -1673,7 +1600,7 @@ mod tests {
#[test]
fn returns_requested_block_headers_reverse() {
let mut client = TestBlockChainClient::new();
client.add_blocks(100, EachBlockWith::Uncle);
client.add_blocks(100, false);
let mut queue = VecDeque::new();
let io = TestIo::new(&mut client, &mut queue, None);

Expand Down