Skip to content

Commit

Permalink
Merge pull request #424 from nervosnetwork/testnet-hotfix-2019-04-09
Browse files Browse the repository at this point in the history
fix: apply hotfix for bugs found in test
  • Loading branch information
doitian committed Apr 11, 2019
2 parents e1ac356 + 0aefcc9 commit b23e7d3
Show file tree
Hide file tree
Showing 17 changed files with 187 additions and 85 deletions.
80 changes: 58 additions & 22 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions network/Cargo.toml
Expand Up @@ -23,11 +23,11 @@ tokio = "0.1.18"
futures = "0.1"
snap = "0.2"
crossbeam-channel = "0.3"
p2p = { git = "https://github.com/nervosnetwork/p2p", rev="ab661f065dc8667a04f12122250f5fb759872dec", package="tentacle" }
secio = { git = "https://github.com/nervosnetwork/p2p", rev="ab661f065dc8667a04f12122250f5fb759872dec", package="tentacle-secio" }
p2p-ping = { git = "https://github.com/nervosnetwork/p2p", rev="ab661f065dc8667a04f12122250f5fb759872dec", package="tentacle-ping" }
p2p-discovery = { git = "https://github.com/nervosnetwork/p2p", rev="ab661f065dc8667a04f12122250f5fb759872dec", package="tentacle-discovery" }
p2p-identify = { git = "https://github.com/nervosnetwork/p2p", rev="ab661f065dc8667a04f12122250f5fb759872dec", package="tentacle-identify" }
p2p = { git = "https://github.com/nervosnetwork/p2p", rev="53cb765b94041543a9c8582aa4d0d34fb2ac6d95", package="tentacle" }
secio = { git = "https://github.com/nervosnetwork/p2p", rev="53cb765b94041543a9c8582aa4d0d34fb2ac6d95", package="tentacle-secio" }
p2p-ping = { git = "https://github.com/nervosnetwork/p2p", rev="53cb765b94041543a9c8582aa4d0d34fb2ac6d95", package="tentacle-ping" }
p2p-discovery = { git = "https://github.com/nervosnetwork/p2p", rev="53cb765b94041543a9c8582aa4d0d34fb2ac6d95", package="tentacle-discovery" }
p2p-identify = { git = "https://github.com/nervosnetwork/p2p", rev="53cb765b94041543a9c8582aa4d0d34fb2ac6d95", package="tentacle-identify" }
faketime = "0.2.0"
rusqlite = {version = "0.16.0", features = ["bundled"]}
lazy_static = "1.3.0"
Expand Down
62 changes: 38 additions & 24 deletions network/src/network.rs
Expand Up @@ -338,33 +338,47 @@ pub struct EventHandler {
}

impl ServiceHandle for EventHandler {
fn handle_error(&mut self, _context: &mut ServiceContext, error: ServiceError) {
fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
warn!(target: "network", "p2p service error: {:?}", error);
if let ServiceError::DialerError {
ref address,
ref error,
} = error
{
debug!(target: "network", "add self address: {:?}", address);
if error == &P2pError::ConnectSelf {
let addr = address
.iter()
.filter(|proto| match proto {
multiaddr::Protocol::P2p(_) => false,
_ => true,
})
.collect();
self.network_state
.listened_addresses
.write()
.insert(addr, std::u8::MAX);
match error {
ServiceError::DialerError {
ref address,
ref error,
} => {
debug!(target: "network", "add self address: {:?}", address);
if error == &P2pError::ConnectSelf {
let addr = address
.iter()
.filter(|proto| match proto {
multiaddr::Protocol::P2p(_) => false,
_ => true,
})
.collect();
self.network_state
.listened_addresses
.write()
.insert(addr, std::u8::MAX);
}
if let Some(peer_id) = extract_peer_id(address) {
self.network_state
.failed_dials
.write()
.insert(peer_id, Instant::now());
}
}
if let Some(peer_id) = extract_peer_id(address) {
self.network_state
.failed_dials
.write()
.insert(peer_id, Instant::now());
ServiceError::ProtocolError { id, .. } => {
if let Err(err) = context.control().disconnect(id) {
warn!(target: "network", "send disconnect task(session_id={}) failed, error={:?}", id, err);
}
}
ServiceError::MuxerError {
session_context, ..
} => {
if let Err(err) = context.control().disconnect(session_context.id) {
warn!(target: "network", "send disconnect task(session_id={}) failed, error={:?}", session_context.id, err);
}
}
_ => {}
}
}

Expand Down
1 change: 1 addition & 0 deletions network/src/protocols/discovery.rs
Expand Up @@ -126,6 +126,7 @@ impl ServiceProtocol for DiscoveryProtocol {
} else {
warn!(target: "network", "other channel error: {:?}", err);
}
self.discovery_senders.remove(&session.id);
}
}
}
Expand Down
16 changes: 14 additions & 2 deletions network/src/protocols/mod.rs
Expand Up @@ -11,7 +11,7 @@ use crate::{
NetworkState, PeerIndex, ProtocolContext, ProtocolContextMutRef, ServiceControl, SessionInfo,
};
use bytes::Bytes;
use log::{debug, error, info, warn};
use log::{debug, error, info, trace, warn};
use p2p::{
builder::MetaBuilder,
service::{ProtocolHandle, ProtocolMeta},
Expand All @@ -20,6 +20,10 @@ use p2p::{
};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::codec::length_delimited;

// Max message frame length: 20MB
const MAX_FRAME_LENGTH: usize = 20 * 1024 * 1024;

pub type ProtocolVersion = u32;

Expand Down Expand Up @@ -76,6 +80,13 @@ impl CKBProtocol {
MetaBuilder::default()
.id(self.id)
.name(move |_| protocol_name.clone())
.codec(|| {
Box::new(
length_delimited::Builder::new()
.max_frame_length(MAX_FRAME_LENGTH)
.new_codec(),
)
})
.support_versions(supported_versions)
.service_handle(move || {
let handler = CKBHandler::new(self.id, self.network_state, self.handler);
Expand Down Expand Up @@ -231,7 +242,7 @@ impl ServiceProtocol for CKBHandler {
.map(|peer_index| (peer_id, peer_index))
})
{
debug!(
trace!(
target: "network",
"ckb protocol received, addr: {}, protocol: {}, peer_id: {:?}",
session.address,
Expand Down Expand Up @@ -337,6 +348,7 @@ impl CKBProtocolContext for DefaultCKBProtocolContext {
.network_state
.get_peer_id(peer_index)
.ok_or_else(|| PeerError::IndexNotFound(peer_index))?;

let session_id = self
.network_state
.peers_registry
Expand Down
5 changes: 4 additions & 1 deletion rpc/src/module/pool.rs
Expand Up @@ -47,7 +47,10 @@ impl<CI: ChainIndex + 'static> PoolRpc for PoolRpcImpl<CI> {
Ok(cycles) => Some(cycles),
};
let entry = PoolEntry::new(tx.clone(), 0, cycles);
chain_state.mut_tx_pool().enqueue_tx(entry);
if !chain_state.mut_tx_pool().enqueue_tx(entry) {
// Duplicate tx
return Ok(tx_hash);
}
cycles
};
match cycles {
Expand Down
15 changes: 12 additions & 3 deletions shared/src/chain_state.rs
Expand Up @@ -12,7 +12,7 @@ use ckb_core::transaction::{OutPoint, ProposalShortId, Transaction};
use ckb_core::Cycle;
use ckb_verification::{TransactionError, TransactionVerifier};
use fnv::FnvHashSet;
use log::error;
use log::{error, trace};
use lru_cache::LruCache;
use numext_fixed_hash::H256;
use numext_fixed_uint::U256;
Expand Down Expand Up @@ -104,7 +104,12 @@ impl<CI: ChainIndex> ChainState<CI> {
let short_id = tx.proposal_short_id();
let rtx = self.resolve_tx_from_pool(&tx, &tx_pool);
let verify_result = self.verify_rtx(&rtx, max_cycles);
let tx_hash = tx.hash();
if self.contains_proposal_id(&short_id) {
if !tx_pool.filter.insert(tx_hash.clone()) {
trace!(target: "tx_pool", "discarding already known transaction {:#x}", tx_hash);
return Err(PoolError::Duplicate);
}
let entry = PoolEntry::new(tx, 0, verify_result.map(Some).unwrap_or(None));
self.staging_tx(&mut tx_pool, entry, max_cycles)?;
Ok(verify_result.map_err(PoolError::InvalidTx)?)
Expand All @@ -113,12 +118,16 @@ impl<CI: ChainIndex> ChainState<CI> {
Ok(cycles) => {
// enqueue tx with cycles
let entry = PoolEntry::new(tx, 0, Some(cycles));
tx_pool.enqueue_tx(entry);
if !tx_pool.enqueue_tx(entry) {
return Err(PoolError::Duplicate);
}
Ok(cycles)
}
Err(TransactionError::UnknownInput) => {
let entry = PoolEntry::new(tx, 0, None);
tx_pool.enqueue_tx(entry);
if !tx_pool.enqueue_tx(entry) {
return Err(PoolError::Duplicate);
}
Err(PoolError::InvalidTx(TransactionError::UnknownInput))
}
Err(err) => Err(PoolError::InvalidTx(err)),
Expand Down
2 changes: 2 additions & 0 deletions shared/src/tx_pool/types.rs
Expand Up @@ -79,6 +79,8 @@ pub enum PoolError {
TimeOut,
/// BlockNumber is not right
InvalidBlockNumber,
/// Duplicate tx
Duplicate,
}

impl fmt::Display for PoolError {
Expand Down
38 changes: 38 additions & 0 deletions src/helper.rs
@@ -0,0 +1,38 @@
use ckb_util::{parking_lot::deadlock, Condvar, Mutex};
use log::warn;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

pub fn wait_for_exit() {
let exit = Arc::new((Mutex::new(()), Condvar::new()));

// Handle possible exits
let e = Arc::<(Mutex<()>, Condvar)>::clone(&exit);
let _ = ctrlc::set_handler(move || {
e.1.notify_all();
});

// Wait for signal
let mut l = exit.0.lock();
exit.1.wait(&mut l);
}

/// Spawns a detached background thread that polls parking_lot's deadlock
/// detector every 10 seconds and logs every deadlocked thread's id and
/// backtrace via `warn!`.
pub fn deadlock_detection() {
    thread::spawn(|| loop {
        thread::sleep(Duration::from_secs(10));

        let deadlocks = deadlock::check_deadlock();
        if !deadlocks.is_empty() {
            warn!("{} deadlocks detected", deadlocks.len());
            for (i, threads) in deadlocks.iter().enumerate() {
                warn!("Deadlock #{}", i);
                for t in threads {
                    warn!("Thread Id {:#?}", t.thread_id());
                    warn!("{:#?}", t.backtrace());
                }
            }
        }
    });
}
2 changes: 1 addition & 1 deletion src/main.rs
@@ -1,6 +1,6 @@
mod helper;
mod setup;
mod subcommand;
mod system;

use setup::{cli, ExitCode, Setup};

Expand Down
4 changes: 3 additions & 1 deletion src/subcommand/run.rs
@@ -1,5 +1,5 @@
use crate::helper::{deadlock_detection, wait_for_exit};
use crate::setup::{ExitCode, RunArgs};
use crate::system::wait_for_exit;
use ckb_chain::chain::{ChainBuilder, ChainController};
use ckb_db::diskdb::RocksDB;
use ckb_miner::BlockAssembler;
Expand All @@ -15,6 +15,8 @@ use log::info;
use std::sync::Arc;

pub fn run(args: RunArgs) -> Result<(), ExitCode> {
deadlock_detection();

let shared = SharedBuilder::<CacheDB<RocksDB>>::default()
.consensus(args.consensus)
.db(&args.config.db)
Expand Down
16 changes: 0 additions & 16 deletions src/system.rs

This file was deleted.

3 changes: 2 additions & 1 deletion sync/src/relayer/transaction_process.rs
Expand Up @@ -73,7 +73,8 @@ where
}
}
Err(PoolError::InvalidTx(TransactionError::UnknownInput))
| Err(PoolError::InvalidTx(TransactionError::Conflict)) => {
| Err(PoolError::InvalidTx(TransactionError::Conflict))
| Err(PoolError::Duplicate) => {
// this error may occur when the peer's tip is different from ours,
// we can't prove the peer is bad so just ignore this
debug!(target: "relay", "peer {} relay a conflict or missing input tx: {:?}", self.peer, tx);
Expand Down
8 changes: 4 additions & 4 deletions sync/src/synchronizer/block_fetcher.rs
Expand Up @@ -10,7 +10,7 @@ use ckb_shared::index::ChainIndex;
use ckb_traits::ChainProvider;
use ckb_util::try_option;
use faketime::unix_time_as_millis;
use log::debug;
use log::{debug, trace};
use numext_fixed_hash::H256;
use numext_fixed_uint::U256;
use std::cmp;
Expand Down Expand Up @@ -48,7 +48,7 @@ where
.or_insert_with(Default::default);

if inflight.timestamp < unix_time_as_millis().saturating_sub(BLOCK_DOWNLOAD_TIMEOUT) {
debug!(target: "sync", "[block downloader] inflight block download timeout");
trace!(target: "sync", "[block downloader] inflight block download timeout");
inflight.clear();
}

Expand Down Expand Up @@ -123,7 +123,7 @@ where
}

pub fn fetch(self) -> Option<Vec<H256>> {
debug!(target: "sync", "[block downloader] BlockFetcher process");
trace!(target: "sync", "[block downloader] BlockFetcher process");

if self.initial_and_check_inflight() {
debug!(target: "sync", "[block downloader] inflight count reach limit");
Expand All @@ -133,7 +133,7 @@ where
let best_known_header = match self.peer_best_known_header() {
Some(best_known_header) => best_known_header,
_ => {
debug!(target: "sync", "[block downloader] peer_best_known_header not found peer={}", self.peer);
trace!(target: "sync", "[block downloader] peer_best_known_header not found peer={}", self.peer);
return None;
}
};
Expand Down
4 changes: 1 addition & 3 deletions sync/src/synchronizer/mod.rs
Expand Up @@ -628,9 +628,7 @@ impl<CI: ChainIndex> Synchronizer<CI> {
}
}
for peer in eviction {
warn!(target: "sync", "timeout eviction peer={}", peer);
// Do not connect this peer in 3 minutes
nc.ban_peer(peer, Duration::from_secs(180));
info!(target: "sync", "timeout eviction peer={}", peer);
nc.disconnect(peer);
}
}
Expand Down
2 changes: 1 addition & 1 deletion util/Cargo.toml
Expand Up @@ -6,4 +6,4 @@ authors = ["Nervos Core Dev <dev@nervos.org>"]
edition = "2018"

[dependencies]
parking_lot = "0.7"
parking_lot = {version = "0.7", features = ["deadlock_detection"]}
4 changes: 3 additions & 1 deletion util/src/lib.rs
@@ -1,7 +1,9 @@
mod unstable;

pub use crate::unstable::{TryFrom, TryInto};
pub use parking_lot::{Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
pub use parking_lot::{
self, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard,
};

/// Helper macro for reducing boilerplate code for matching `Option` together
/// with early return.
Expand Down

0 comments on commit b23e7d3

Please sign in to comment.