HeaderSync optimization (#1372) (#1400)
* improve: HeaderSync optimization (#1372)
* remove the get_locator() optimization, which should be an independent PR for security review
* refactoring: move 'headers_streaming_body()' from Message to Protocol
* move two headers utility functions out of Protocol, and remove 'pub'
* support reading a variable-size BlockHeader, from Cuckoo30 to Cuckoo36
* fix: use global::min_sizeshift() instead of a hardcoded 30, because Cuckoo10 will be used for the AutomatedTesting chain
* fix: use global::proofsize() instead of a hardcoded 42 when calculating serialized_size_of_header
* replace another 42 with global::proofsize()
garyyu authored and ignopeverell committed Aug 30, 2018
1 parent 6d992e6 commit d719493
Showing 4 changed files with 180 additions and 15 deletions.
48 changes: 48 additions & 0 deletions core/src/core/block.rs
@@ -19,6 +19,7 @@ use chrono::prelude::{DateTime, NaiveDateTime, Utc};
use std::collections::HashSet;
use std::fmt;
use std::iter::FromIterator;
use std::mem;
use std::sync::{Arc, RwLock};

use consensus::{self, reward, REWARD};
@@ -144,6 +145,39 @@ pub struct BlockHeader {
    pub pow: Proof,
}

/// Serialized size of the fixed part of a BlockHeader, i.e. without the pow
fn fixed_size_of_serialized_header() -> usize {
    let mut size: usize = 0;
    size += mem::size_of::<u16>(); // version
    size += mem::size_of::<u64>(); // height
    size += mem::size_of::<Hash>(); // previous
    size += mem::size_of::<u64>(); // timestamp
    size += mem::size_of::<Difficulty>(); // total_difficulty
    size += mem::size_of::<Hash>(); // output_root
    size += mem::size_of::<Hash>(); // range_proof_root
    size += mem::size_of::<Hash>(); // kernel_root
    size += mem::size_of::<BlindingFactor>(); // total_kernel_offset
    size += mem::size_of::<Commitment>(); // total_kernel_sum
    size += mem::size_of::<u64>(); // output_mmr_size
    size += mem::size_of::<u64>(); // kernel_mmr_size
    size += mem::size_of::<u64>(); // nonce
    size
}

/// Serialized size of a BlockHeader
pub fn serialized_size_of_header(cuckoo_sizeshift: u8) -> usize {
    let mut size = fixed_size_of_serialized_header();

    size += mem::size_of::<u8>(); // pow.cuckoo_sizeshift
    let nonce_bits = cuckoo_sizeshift as usize - 1;
    let bitvec_len = global::proofsize() * nonce_bits;
    size += bitvec_len / 8; // pow.nonces
    if bitvec_len % 8 != 0 {
        size += 1;
    }
    size
}

impl Default for BlockHeader {
    fn default() -> BlockHeader {
        let proof_size = global::proofsize();
@@ -267,6 +301,20 @@ impl BlockHeader {
    pub fn total_kernel_offset(&self) -> BlindingFactor {
        self.total_kernel_offset
    }

    /// Serialized size of this header
    pub fn serialized_size(&self) -> usize {
        let mut size = fixed_size_of_serialized_header();

        size += mem::size_of::<u8>(); // pow.cuckoo_sizeshift
        let nonce_bits = self.pow.cuckoo_sizeshift as usize - 1;
        let bitvec_len = global::proofsize() * nonce_bits;
        size += bitvec_len / 8; // pow.nonces
        if bitvec_len % 8 != 0 {
            size += 1;
        }
        size
    }
}

/// A block as expressed in the MimbleWimble protocol. The reward is
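For a concrete sense of these sizes: the pow section is 1 byte for cuckoo_sizeshift plus proofsize() bit-packed nonces of (sizeshift - 1) bits each, rounded up to whole bytes. A standalone sketch of that arithmetic, assuming the mainnet proof size of 42 (illustrative only, not code from this commit):

fn pow_size_bytes(cuckoo_sizeshift: u8, proofsize: usize) -> usize {
    let nonce_bits = cuckoo_sizeshift as usize - 1;
    let bitvec_len = proofsize * nonce_bits;
    // 1 byte for pow.cuckoo_sizeshift, plus the packed nonces rounded up to bytes
    1 + bitvec_len / 8 + if bitvec_len % 8 != 0 { 1 } else { 0 }
}

fn main() {
    // Cuckoo30: 42 * 29 = 1218 bits -> 153 bytes of nonces, 154 with the sizeshift byte.
    assert_eq!(pow_size_bytes(30, 42), 154);
    // Cuckoo36: 42 * 35 = 1470 bits -> 184 bytes of nonces, 185 with the sizeshift byte.
    assert_eq!(pow_size_bytes(36, 42), 185);
}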
5 changes: 5 additions & 0 deletions p2p/src/conn.rs
@@ -64,6 +64,11 @@ impl<'a> Message<'a> {
        Message { header, conn }
    }

    /// Get a clone of the underlying TcpStream
    pub fn get_conn(&mut self) -> TcpStream {
        return self.conn.try_clone().unwrap();
    }

    /// Read the message body from the underlying connection
    pub fn body<T>(&mut self) -> Result<T, Error>
    where
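get_conn() exposes the raw socket through TcpStream::try_clone(), which returns a second handle to the same underlying OS socket: bytes read via the clone are consumed from the one shared stream. A minimal sketch of that behavior (hypothetical helper, not part of the commit); note that the committed accessor calls unwrap(), so it will panic if the descriptor cannot be duplicated, whereas a fallible variant like this one propagates the error:

use std::io::Read;
use std::net::TcpStream;

// Hypothetical: read through a cloned handle; the clone shares the socket,
// so this advances the same stream the original handle sees.
fn read_two_bytes(stream: &TcpStream) -> std::io::Result<[u8; 2]> {
    let mut dup = stream.try_clone()?; // second handle, same socket
    let mut buf = [0u8; 2];
    dup.read_exact(&mut buf)?;
    Ok(buf)
}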
140 changes: 126 additions & 14 deletions p2p/src/protocol.rs
@@ -14,17 +14,17 @@

use std::env;
use std::fs::File;
-use std::io::BufWriter;
-use std::net::SocketAddr;
+use std::io::{self, BufWriter};
+use std::net::{SocketAddr, TcpStream};
use std::sync::Arc;

use conn::{Message, MessageHandler, Response};
-use core::core;
-use core::core::hash::Hash;
-use core::core::CompactBlock;
+use core::core::{self, hash::Hash, CompactBlock};
use core::{global, ser};

use msg::{
-    BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, SockAddr, TxHashSetArchive,
-    TxHashSetRequest, Type,
+    read_exact, BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, SockAddr,
+    TxHashSetArchive, TxHashSetRequest, Type,
};
use rand::{self, Rng};
use types::{Error, NetAdapter};
@@ -169,10 +169,9 @@ impl MessageHandler for Protocol {
                let headers = adapter.locate_headers(loc.hashes);

                // serialize and send all the headers over
-               Ok(Some(msg.respond(
-                   Type::Headers,
-                   Headers { headers: headers },
-               )))
+               Ok(Some(
+                   msg.respond(Type::Headers, Headers { headers: headers }),
+               ))
            }

            // "header first" block propagation - if we have not yet seen this block
@@ -188,8 +187,23 @@
            }

            Type::Headers => {
-               let headers: Headers = msg.body()?;
-               adapter.headers_received(headers.headers, self.addr);
+               let conn = &mut msg.get_conn();
+
+               let header_size: u64 = headers_header_size(conn, msg.header.msg_len)?;
+               let mut total_read: u64 = 2;
+               let mut reserved: Vec<u8> = vec![];
+
+               while total_read < msg.header.msg_len || reserved.len() > 0 {
+                   let headers: Headers = headers_streaming_body(
+                       conn,
+                       msg.header.msg_len,
+                       8,
+                       &mut total_read,
+                       &mut reserved,
+                       header_size,
+                   )?;
+                   adapter.headers_received(headers.headers, self.addr);
+               }
                Ok(None)
            }

@@ -276,7 +290,8 @@
                );

                let tmp_zip = File::open(tmp)?;
-               let res = self.adapter
+               let res = self
+                   .adapter
                    .txhashset_write(sm_arch.hash, tmp_zip, self.addr);

                debug!(
@@ -297,3 +312,100 @@
        }
    }
}

/// Read the Headers Vec size from the underlying connection, and calculate the maximum header_size of one Header
fn headers_header_size(conn: &mut TcpStream, msg_len: u64) -> Result<u64, Error> {
    let mut size = vec![0u8; 2];
    // read the size of the Vec<BlockHeader>
    read_exact(conn, &mut size, 20000, true)?;

    let total_headers = size[0] as u64 * 256 + size[1] as u64;
    if total_headers == 0 || total_headers > 10_000 {
        return Err(Error::Connection(io::Error::new(
            io::ErrorKind::InvalidData,
            "headers_header_size",
        )));
    }
    let average_header_size = (msg_len - 2) / total_headers;

    // supported Cuckoo sizes: from Cuckoo 30 to Cuckoo 36
    let minimum_size = core::serialized_size_of_header(global::min_sizeshift());
    let maximum_size = core::serialized_size_of_header(global::min_sizeshift() + 6);
    if average_header_size < minimum_size as u64 || average_header_size > maximum_size as u64 {
        debug!(
            LOGGER,
            "headers_header_size - size of Vec: {}, average_header_size: {}, min: {}, max: {}",
            total_headers,
            average_header_size,
            minimum_size,
            maximum_size,
        );
        return Err(Error::Connection(io::Error::new(
            io::ErrorKind::InvalidData,
            "headers_header_size",
        )));
    }
    return Ok(maximum_size as u64);
}
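A worked pass through this guard, with assumed mainnet-style numbers (proofsize = 42 and a 243-byte fixed header part, which is what the 32/33-byte field sizes add up to; all figures illustrative, not from the commit):

fn main() {
    // serialized_size_of_header(30) would be 243 + 154 = 397 bytes,
    // serialized_size_of_header(36) would be 243 + 185 = 428 bytes.
    let (min, max) = (397u64, 428u64);
    // A 3_974-byte 'Headers' message whose 2-byte prefix announces 10 headers:
    let average = (3_974u64 - 2) / 10; // = 397
    assert!(average >= min && average <= max); // accepted; outside this band -> InvalidData
}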

/// Read the Headers streaming body from the underlying connection
fn headers_streaming_body(
    conn: &mut TcpStream,   // (i) underlying connection
    msg_len: u64,           // (i) length of the whole 'Headers' message
    headers_num: u64,       // (i) how many BlockHeader(s) to read
    total_read: &mut u64,   // (i/o) how many bytes of this 'Headers' message were already read
    reserved: &mut Vec<u8>, // (i/o) reserved part of the previous read, which is not a whole header
    max_header_size: u64,   // (i) maximum possible size of a single BlockHeader
) -> Result<Headers, Error> {
    if headers_num == 0 || msg_len < *total_read || *total_read < 2 {
        return Err(Error::Connection(io::Error::new(
            io::ErrorKind::InvalidInput,
            "headers_streaming_body",
        )));
    }

    // Note:
    // Now that we allow Cuckoo sizes greater than 30, the proof-of-work part of the header
    // could be 30*42 bits, 31*42 bits, 32*42 bits, and so on.
    // So, for compatibility with the variable size of the block header, we read the maximum
    // possible size, up to Cuckoo 36.
    //
    let mut read_size = headers_num * max_header_size - reserved.len() as u64;
    if *total_read + read_size > msg_len {
        read_size = msg_len - *total_read;
    }

    // 1st part
    let mut body = vec![0u8; 2]; // for the Vec<> size
    let mut final_headers_num = (read_size + reserved.len() as u64) / max_header_size;
    let remaining = msg_len - *total_read - read_size;
    if final_headers_num == 0 && remaining == 0 {
        final_headers_num = 1;
    }
    body[0] = (final_headers_num >> 8) as u8;
    body[1] = (final_headers_num & 0x00ff) as u8;

    // 2nd part
    body.append(reserved);

    // 3rd part
    let mut read_body = vec![0u8; read_size as usize];
    if read_size > 0 {
        read_exact(conn, &mut read_body, 20000, true)?;
        *total_read += read_size;
    }
    body.append(&mut read_body);

    // deserialize these 3 assembled parts
    let result: Result<Headers, Error> = ser::deserialize(&mut &body[..]).map_err(From::from);
    let headers = result?;

    // remaining data
    let mut deserialized_size = 2; // for the Vec<> size
    for header in &headers.headers {
        deserialized_size += header.serialized_size();
    }
    *reserved = body[deserialized_size..].to_vec();

    Ok(headers)
}
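The Type::Headers handler above drives this function in batches of 8, and each call rebuilds a self-contained 'Headers' body from three parts: a freshly synthesized 2-byte count, the leftover tail from the previous call, and up to headers_num * max_header_size fresh bytes from the wire. Since max_header_size is an upper bound, a batch normally deserializes fewer bytes than it read, and the surplus is carried forward in 'reserved'. One iteration's byte accounting, with hypothetical sizes (not from the commit):

fn main() {
    // Suppose max_header_size = 428 while actual headers serialize to 397 bytes each.
    let (max_header_size, actual) = (428u64, 397u64);
    let headers_num = 8u64;

    // No reserved bytes yet, so the first call reads 8 * 428 = 3424 bytes.
    let read_size = headers_num * max_header_size;
    // The synthesized count is (3424 + 0) / 428 = 8, and deserializing 8 headers
    // consumes 8 * 397 = 3176 of those bytes...
    let consumed = 8 * actual;
    // ...so 3424 - 3176 = 248 bytes trail the last complete header and become
    // 'reserved', to be prepended on the next call.
    assert_eq!(read_size - consumed, 248);
}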
2 changes: 1 addition & 1 deletion servers/src/grin/sync.rs
@@ -189,7 +189,7 @@ pub fn run_sync(
                sync_state.update(SyncStatus::NoSync);
            }

-           thread::sleep(time::Duration::from_secs(1));
+           thread::sleep(time::Duration::from_millis(10));

            if stop.load(Ordering::Relaxed) {
                break;
