Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

More sync fixes #685

Merged
merged 5 commits into from
Mar 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 25 additions & 27 deletions sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,6 @@ impl ChainSync {
/// Restart sync
pub fn restart(&mut self, io: &mut SyncIo) {
self.reset();
self.last_imported_block = None;
self.last_imported_hash = None;
self.starting_block = 0;
self.highest_block = None;
self.have_common_block = false;
Expand Down Expand Up @@ -366,7 +364,7 @@ impl ChainSync {
for i in 0..item_count {
let info: BlockHeader = try!(r.val_at(i));
let number = BlockNumber::from(info.number);
if number <= self.current_base_block() || self.headers.have_item(&number) {
if (number <= self.current_base_block() && self.have_common_block) || self.headers.have_item(&number) {
trace!(target: "sync", "Skipping existing block header");
continue;
}
Expand All @@ -376,11 +374,17 @@ impl ChainSync {
}
let hash = info.hash();
match io.chain().block_status(BlockId::Hash(hash.clone())) {
BlockStatus::InChain => {
self.have_common_block = true;
self.last_imported_block = Some(number);
self.last_imported_hash = Some(hash.clone());
trace!(target: "sync", "Found common header {} ({})", number, hash);
BlockStatus::InChain | BlockStatus::Queued => {
if !self.have_common_block || self.current_base_block() < number {
self.last_imported_block = Some(number);
self.last_imported_hash = Some(hash.clone());
}
if !self.have_common_block {
self.have_common_block = true;
trace!(target: "sync", "Found common header {} ({})", number, hash);
} else {
trace!(target: "sync", "Header already in chain {} ({})", number, hash);
}
},
_ => {
if self.have_common_block {
Expand Down Expand Up @@ -588,7 +592,7 @@ impl ChainSync {
pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) {
trace!(target: "sync", "== Connected {}", peer);
if let Err(e) = self.send_status(io) {
warn!(target:"sync", "Error sending status request: {:?}", e);
debug!(target:"sync", "Error sending status request: {:?}", e);
io.disable_peer(peer);
}
}
Expand Down Expand Up @@ -656,10 +660,7 @@ impl ChainSync {
let mut needed_numbers: Vec<BlockNumber> = Vec::new();

if self.have_common_block && !self.headers.is_empty() && self.headers.range_iter().next().unwrap().0 == self.current_base_block() + 1 {
for (start, ref items) in self.headers.range_iter() {
if needed_bodies.len() >= MAX_BODIES_TO_REQUEST {
break;
}
if let Some((start, ref items)) = self.headers.range_iter().next() {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Download bodies only for the validated part of the header-chain

let mut index: BlockNumber = 0;
while index != items.len() as BlockNumber && needed_bodies.len() < MAX_BODIES_TO_REQUEST {
let block = start + index;
Expand Down Expand Up @@ -703,7 +704,10 @@ impl ChainSync {
if !self.have_common_block {
// download backwards until common block is found 1 header at a time
let chain_info = io.chain().chain_info();
start = chain_info.best_block_number;
start = match self.last_imported_block {
Some(n) => n,
None => chain_info.best_block_number,
};
if !self.headers.is_empty() {
start = min(start, self.headers.range_iter().next().unwrap().0 - 1);
}
Expand Down Expand Up @@ -844,18 +848,12 @@ impl ChainSync {
/// Remove downloaded bocks/headers starting from specified number.
/// Used to recover from an error and re-download parts of the chain detected as bad.
fn remove_downloaded_blocks(&mut self, start: BlockNumber) {
for n in self.headers.get_tail(&start) {
if let Some(ref header_data) = self.headers.find_item(&n) {
let header_to_delete = HeaderView::new(&header_data.data);
let header_id = HeaderId {
transactions_root: header_to_delete.transactions_root(),
uncles: header_to_delete.uncles_hash()
};
self.header_ids.remove(&header_id);
}
self.downloading_bodies.remove(&n);
self.downloading_headers.remove(&n);
}
let ids = self.header_ids.drain().filter(|&(_, v)| v < start).collect();
self.header_ids = ids;
let hdrs = self.downloading_headers.drain().filter(|v| *v < start).collect();
self.downloading_headers = hdrs;
let bodies = self.downloading_bodies.drain().filter(|v| *v < start).collect();
self.downloading_bodies = bodies;
self.headers.remove_from(&start);
self.bodies.remove_from(&start);
}
Expand Down Expand Up @@ -1095,7 +1093,7 @@ impl ChainSync {
let rlp = UntrustedRlp::new(data);

if packet_id != STATUS_PACKET && !self.peers.contains_key(&peer) {
warn!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer));
debug!(target:"sync", "Unexpected packet from unregistered peer: {}:{}", peer, io.peer_info(peer));
return;
}
let result = match packet_id {
Expand Down
11 changes: 8 additions & 3 deletions sync/src/range_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,17 @@ fn test_range() {
let mut r = ranges.clone();
r.remove_from(&20);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q', 'r'][..])]), Ordering::Equal);
r.remove_from(&17);
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p'][..])]), Ordering::Equal);
r.remove_from(&15);
r.remove_from(&18);
assert!(!r.have_item(&18));
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..]), (16, &['p', 'q'][..])]), Ordering::Equal);
r.remove_from(&16);
assert!(!r.have_item(&16));
assert_eq!(r.range_iter().cmp(vec![(2, &['b', 'c', 'd'][..])]), Ordering::Equal);
r.remove_from(&3);
assert_eq!(r.range_iter().cmp(vec![(2, &['b'][..])]), Ordering::Equal);
r.remove_from(&1);
assert_eq!(r.range_iter().next(), None);
let mut r = ranges.clone();
r.remove_from(&2);
assert_eq!(r.range_iter().next(), None);
}
Expand Down