From d719493483c546d25c2a8ec2b169f9121962eaf1 Mon Sep 17 00:00:00 2001 From: Gary Yu Date: Fri, 31 Aug 2018 07:50:55 +0800 Subject: [PATCH] HeaderSync optimization (#1372) (#1400) * improve: HeaderSync optimization (#1372) * remove get_locator() optimization, which should be an independent pr for security review * refactoring: move 'headers_streaming_body()' from Message to Protocol * move 2 headers utils functions out of Protocol, and remove 'pub' * support reading variable size of BlockHeader, from Cuckoo30 to Cuckoo36 * fix: use global::min_sizeshift() instead of hardcoded 30, because Cuckoo10 will be used for AutomatedTesting chain * fix: should use global::proofsize() instead of hardcoded 42 when calculate serialized_size_of_header * replace another 42 with global::proofsize() --- core/src/core/block.rs | 48 ++++++++++++++ p2p/src/conn.rs | 5 ++ p2p/src/protocol.rs | 140 +++++++++++++++++++++++++++++++++++---- servers/src/grin/sync.rs | 2 +- 4 files changed, 180 insertions(+), 15 deletions(-) diff --git a/core/src/core/block.rs b/core/src/core/block.rs index 2dbf91097a..83e9310a21 100644 --- a/core/src/core/block.rs +++ b/core/src/core/block.rs @@ -19,6 +19,7 @@ use chrono::prelude::{DateTime, NaiveDateTime, Utc}; use std::collections::HashSet; use std::fmt; use std::iter::FromIterator; +use std::mem; use std::sync::{Arc, RwLock}; use consensus::{self, reward, REWARD}; @@ -144,6 +145,39 @@ pub struct BlockHeader { pub pow: Proof, } +/// Serialized size of fixed part of a BlockHeader, i.e. without pow +fn fixed_size_of_serialized_header() -> usize { + let mut size: usize = 0; + size += mem::size_of::(); // version + size += mem::size_of::(); // height + size += mem::size_of::(); // previous + size += mem::size_of::(); // timestamp + size += mem::size_of::(); // total_difficulty + size += mem::size_of::(); // output_root + size += mem::size_of::(); // range_proof_root + size += mem::size_of::(); // kernel_root + size += mem::size_of::(); // total_kernel_offset + size += mem::size_of::(); // total_kernel_sum + size += mem::size_of::(); // output_mmr_size + size += mem::size_of::(); // kernel_mmr_size + size += mem::size_of::(); // nonce + size +} + +/// Serialized size of a BlockHeader +pub fn serialized_size_of_header(cuckoo_sizeshift: u8) -> usize { + let mut size = fixed_size_of_serialized_header(); + + size += mem::size_of::(); // pow.cuckoo_sizeshift + let nonce_bits = cuckoo_sizeshift as usize - 1; + let bitvec_len = global::proofsize() * nonce_bits; + size += bitvec_len / 8; // pow.nonces + if bitvec_len % 8 != 0 { + size += 1; + } + size +} + impl Default for BlockHeader { fn default() -> BlockHeader { let proof_size = global::proofsize(); @@ -267,6 +301,20 @@ impl BlockHeader { pub fn total_kernel_offset(&self) -> BlindingFactor { self.total_kernel_offset } + + /// Serialized size of this header + pub fn serialized_size(&self) -> usize { + let mut size = fixed_size_of_serialized_header(); + + size += mem::size_of::(); // pow.cuckoo_sizeshift + let nonce_bits = self.pow.cuckoo_sizeshift as usize - 1; + let bitvec_len = global::proofsize() * nonce_bits; + size += bitvec_len / 8; // pow.nonces + if bitvec_len % 8 != 0 { + size += 1; + } + size + } } /// A block as expressed in the MimbleWimble protocol. The reward is diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 6bfc1cd5bc..3e0042d570 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -64,6 +64,11 @@ impl<'a> Message<'a> { Message { header, conn } } + /// Get the TcpStream + pub fn get_conn(&mut self) -> TcpStream { + return self.conn.try_clone().unwrap(); + } + /// Read the message body from the underlying connection pub fn body(&mut self) -> Result where diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index aa27334cfc..029f40d04d 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -14,17 +14,17 @@ use std::env; use std::fs::File; -use std::io::BufWriter; -use std::net::SocketAddr; +use std::io::{self, BufWriter}; +use std::net::{SocketAddr, TcpStream}; use std::sync::Arc; use conn::{Message, MessageHandler, Response}; -use core::core; -use core::core::hash::Hash; -use core::core::CompactBlock; +use core::core::{self, hash::Hash, CompactBlock}; +use core::{global, ser}; + use msg::{ - BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, SockAddr, TxHashSetArchive, - TxHashSetRequest, Type, + read_exact, BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, SockAddr, + TxHashSetArchive, TxHashSetRequest, Type, }; use rand::{self, Rng}; use types::{Error, NetAdapter}; @@ -169,10 +169,9 @@ impl MessageHandler for Protocol { let headers = adapter.locate_headers(loc.hashes); // serialize and send all the headers over - Ok(Some(msg.respond( - Type::Headers, - Headers { headers: headers }, - ))) + Ok(Some( + msg.respond(Type::Headers, Headers { headers: headers }), + )) } // "header first" block propagation - if we have not yet seen this block @@ -188,8 +187,23 @@ impl MessageHandler for Protocol { } Type::Headers => { - let headers: Headers = msg.body()?; - adapter.headers_received(headers.headers, self.addr); + let conn = &mut msg.get_conn(); + + let header_size: u64 = headers_header_size(conn, msg.header.msg_len)?; + let mut total_read: u64 = 2; + let mut reserved: Vec = vec![]; + + while total_read < msg.header.msg_len || reserved.len() > 0 { + let headers: Headers = headers_streaming_body( + conn, + msg.header.msg_len, + 8, + &mut total_read, + &mut reserved, + header_size, + )?; + adapter.headers_received(headers.headers, self.addr); + } Ok(None) } @@ -276,7 +290,8 @@ impl MessageHandler for Protocol { ); let tmp_zip = File::open(tmp)?; - let res = self.adapter + let res = self + .adapter .txhashset_write(sm_arch.hash, tmp_zip, self.addr); debug!( @@ -297,3 +312,100 @@ impl MessageHandler for Protocol { } } } + +/// Read the Headers Vec size from the underlying connection, and calculate maximum header_size of one Header +fn headers_header_size(conn: &mut TcpStream, msg_len: u64) -> Result { + let mut size = vec![0u8; 2]; + // read size of Vec + read_exact(conn, &mut size, 20000, true)?; + + let total_headers = size[0] as u64 * 256 + size[1] as u64; + if total_headers == 0 || total_headers > 10_000 { + return Err(Error::Connection(io::Error::new( + io::ErrorKind::InvalidData, + "headers_header_size", + ))); + } + let average_header_size = (msg_len - 2) / total_headers; + + // support size of Cuckoo: from Cuckoo 30 to Cuckoo 36 + let minimum_size = core::serialized_size_of_header(global::min_sizeshift()); + let maximum_size = core::serialized_size_of_header(global::min_sizeshift() + 6); + if average_header_size < minimum_size as u64 || average_header_size > maximum_size as u64 { + debug!( + LOGGER, + "headers_header_size - size of Vec: {}, average_header_size: {}, min: {}, max: {}", + total_headers, + average_header_size, + minimum_size, + maximum_size, + ); + return Err(Error::Connection(io::Error::new( + io::ErrorKind::InvalidData, + "headers_header_size", + ))); + } + return Ok(maximum_size as u64); +} + +/// Read the Headers streaming body from the underlying connection +fn headers_streaming_body( + conn: &mut TcpStream, // (i) underlying connection + msg_len: u64, // (i) length of whole 'Headers' + headers_num: u64, // (i) how many BlockHeader(s) do you want to read + total_read: &mut u64, // (i/o) how many bytes already read on this 'Headers' message + reserved: &mut Vec, // (i/o) reserved part of previous read, which's not a whole header + max_header_size: u64, // (i) maximum possible size of single BlockHeader +) -> Result { + if headers_num == 0 || msg_len < *total_read || *total_read < 2 { + return Err(Error::Connection(io::Error::new( + io::ErrorKind::InvalidInput, + "headers_streaming_body", + ))); + } + + // Note: + // As we allow Cuckoo sizes greater than 30 now, the proof of work part of the header + // could be 30*42 bits, 31*42 bits, 32*42 bits, etc. + // So, for compatibility with variable size of block header, we read max possible size, for + // up to Cuckoo 36. + // + let mut read_size = headers_num * max_header_size - reserved.len() as u64; + if *total_read + read_size > msg_len { + read_size = msg_len - *total_read; + } + + // 1st part + let mut body = vec![0u8; 2]; // for Vec<> size + let mut final_headers_num = (read_size + reserved.len() as u64) / max_header_size; + let remaining = msg_len - *total_read - read_size; + if final_headers_num == 0 && remaining == 0 { + final_headers_num = 1; + } + body[0] = (final_headers_num >> 8) as u8; + body[1] = (final_headers_num & 0x00ff) as u8; + + // 2nd part + body.append(reserved); + + // 3rd part + let mut read_body = vec![0u8; read_size as usize]; + if read_size > 0 { + read_exact(conn, &mut read_body, 20000, true)?; + *total_read += read_size; + } + body.append(&mut read_body); + + // deserialize these assembled 3 parts + let result: Result = ser::deserialize(&mut &body[..]).map_err(From::from); + let headers = result?; + + // remaining data + let mut deserialized_size = 2; // for Vec<> size + for header in &headers.headers { + deserialized_size += header.serialized_size(); + } + *reserved = body[deserialized_size..].to_vec(); + + Ok(headers) +} diff --git a/servers/src/grin/sync.rs b/servers/src/grin/sync.rs index c85c51ee76..971a2e6da8 100644 --- a/servers/src/grin/sync.rs +++ b/servers/src/grin/sync.rs @@ -189,7 +189,7 @@ pub fn run_sync( sync_state.update(SyncStatus::NoSync); } - thread::sleep(time::Duration::from_secs(1)); + thread::sleep(time::Duration::from_millis(10)); if stop.load(Ordering::Relaxed) { break;