Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Stop adding transactions to queue while not fully synced #751

Merged
merged 8 commits into from
Mar 17, 2016
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ impl<V> Client<V> where V: Verifier {
invalid: invalid_blocks,
enacted: enacted,
retracted: retracted,
is_last: self.queue_info().is_empty()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be better to avoid sending the event instead?

})).unwrap();
}
}
Expand Down
2 changes: 2 additions & 0 deletions ethcore/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub enum SyncMessage {
retracted: Vec<H256>,
/// Hashes of blocks that are now included in cannonical chain
enacted: Vec<H256>,
/// Set when blockqueue is empty
is_last: bool,
},
/// Best Block Hash in chain has been changed
NewChainHead,
Expand Down
59 changes: 53 additions & 6 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ pub struct ChainSync {
network_id: U256,
/// Miner
miner: Arc<Miner>,
/// Fully-synced flag
is_fully_synced: bool,
}

type RlpResponseResult = Result<Option<(PacketId, RlpStream)>, PacketDecodeError>;
Expand All @@ -241,6 +243,7 @@ impl ChainSync {
max_download_ahead_blocks: max(MAX_HEADERS_TO_REQUEST, config.max_download_ahead_blocks),
network_id: config.network_id,
miner: miner,
is_fully_synced: true,
}
}

Expand Down Expand Up @@ -625,6 +628,13 @@ impl ChainSync {
self.state = SyncState::Waiting;
}

fn can_sync(&self) -> bool {
match self.state {
SyncState::Idle | SyncState::NotSynced => true,
_ => false
}
}

/// Find something to do for a peer. Called for a new peer or when a peer is done with it's task.
fn sync_peer(&mut self, io: &mut SyncIo, peer_id: PeerId, force: bool) {
let (peer_latest, peer_difficulty) = {
Expand All @@ -644,7 +654,7 @@ impl ChainSync {
if force || peer_difficulty > syncing_difficulty {
// start sync
self.syncing_difficulty = peer_difficulty;
if self.state == SyncState::Idle || self.state == SyncState::NotSynced {
if self.can_sync() {
self.state = SyncState::Blocks;
}
trace!(target: "sync", "Starting sync with better chain");
Expand Down Expand Up @@ -937,6 +947,11 @@ impl ChainSync {
}
/// Called when peer sends us new transactions
fn on_peer_transactions(&mut self, io: &mut SyncIo, peer_id: PeerId, r: &UntrustedRlp) -> Result<(), PacketDecodeError> {
// accepting transactions once only fully synced
if !self.is_fully_synced {
return Ok(());
}

let item_count = r.item_count();
trace!(target: "sync", "{} -> Transactions ({} entries)", peer_id, item_count);

Expand Down Expand Up @@ -1281,9 +1296,13 @@ impl ChainSync {
}

/// called when block is imported to chain, updates transactions queue and propagates the blocks
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256]) {
// Notify miner
self.miner.chain_new_blocks(io.chain(), imported, invalid, enacted, retracted);
pub fn chain_new_blocks(&mut self, io: &mut SyncIo, imported: &[H256], invalid: &[H256], enacted: &[H256], retracted: &[H256], is_last: bool) {
// Set the state in which it can accept transactions from the net
self.is_fully_synced = is_last;
if self.is_fully_synced {
// Notify miner
self.miner.chain_new_blocks(io.chain(), imported, invalid, enacted, retracted);
}
// Propagate latests blocks
self.propagate_latest_blocks(io);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think propagation should also happen only if we are fully synced.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup

// TODO [todr] propagate transactions?
Expand Down Expand Up @@ -1643,17 +1662,45 @@ mod tests {
let mut io = TestIo::new(&mut client, &mut queue, None);

// when
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks);
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, true);
assert_eq!(sync.miner.status().transactions_in_future_queue, 0);
assert_eq!(sync.miner.status().transactions_in_pending_queue, 1);
sync.chain_new_blocks(&mut io, &good_blocks, &[], &[], &retracted_blocks);
sync.chain_new_blocks(&mut io, &good_blocks, &[], &[], &retracted_blocks, true);


// then
let status = sync.miner.status();
assert_eq!(status.transactions_in_pending_queue, 1);
assert_eq!(status.transactions_in_future_queue, 0);
}

#[test]
fn should_not_add_transactions_to_queue_if_not_synced() {
// given
let mut client = TestBlockChainClient::new();
client.add_blocks(98, EachBlockWith::Uncle);
client.add_blocks(1, EachBlockWith::UncleAndTransaction);
client.add_blocks(1, EachBlockWith::Transaction);
let mut sync = dummy_sync_with_peer(client.block_hash_delta_minus(5));

let good_blocks = vec![client.block_hash_delta_minus(2)];
let retracted_blocks = vec![client.block_hash_delta_minus(1)];

let mut queue = VecDeque::new();
let mut io = TestIo::new(&mut client, &mut queue, None);

// when
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, false);
assert_eq!(sync.miner.status().transactions_in_future_queue, 0);
assert_eq!(sync.miner.status().transactions_in_pending_queue, 0);
sync.chain_new_blocks(&mut io, &good_blocks, &[], &[], &retracted_blocks, false);

// then
let status = sync.miner.status();
assert_eq!(status.transactions_in_pending_queue, 0);
assert_eq!(status.transactions_in_future_queue, 0);
}

#[test]
fn returns_requested_block_headers() {
let mut client = TestBlockChainClient::new();
Expand Down
6 changes: 3 additions & 3 deletions sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,14 @@ impl NetworkProtocolHandler<SyncMessage> for EthSync {

fn message(&self, io: &NetworkContext<SyncMessage>, message: &SyncMessage) {
match *message {
SyncMessage::NewChainBlocks { ref imported, ref invalid, ref enacted, ref retracted } => {
SyncMessage::NewChainBlocks { ref imported, ref invalid, ref enacted, ref retracted, is_last } => {
let mut sync_io = NetSyncIo::new(io, self.chain.deref());
self.sync.write().unwrap().chain_new_blocks(&mut sync_io, imported, invalid, enacted, retracted);
self.sync.write().unwrap().chain_new_blocks(&mut sync_io, imported, invalid, enacted, retracted, is_last);
},
SyncMessage::NewChainHead => {
let mut sync_io = NetSyncIo::new(io, self.chain.deref());
self.sync.write().unwrap().chain_new_head(&mut sync_io);
}
},
_ => {/* Ignore other messages */},
}
}
Expand Down
2 changes: 1 addition & 1 deletion sync/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,6 @@ impl TestNet {

pub fn trigger_chain_new_blocks(&mut self, peer_id: usize) {
let mut peer = self.peer_mut(peer_id);
peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[]);
peer.sync.chain_new_blocks(&mut TestIo::new(&mut peer.chain, &mut peer.queue, None), &[], &[], &[], &[], true);
}
}