Skip to content

Commit

Permalink
Merge pull request nervosnetwork#448 from zhangsoledad/zhangsoledad/r…
Browse files Browse the repository at this point in the history
…elay_filter

feat: relay known filter
  • Loading branch information
doitian committed Apr 12, 2019
2 parents f986b24 + 3cf197c commit 14d8ffc
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 144 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion network/src/peers_registry.rs
Expand Up @@ -321,7 +321,7 @@ impl PeersRegistry {
&self.peers
}

fn peer_indexes_guard(&self) -> &RwLock<FnvHashMap<PeerIndex, PeerId>> {
pub fn peer_indexes_guard(&self) -> &RwLock<FnvHashMap<PeerIndex, PeerId>> {
&self.peer_id_by_index
}

Expand Down
1 change: 1 addition & 0 deletions sync/Cargo.toml
Expand Up @@ -28,6 +28,7 @@ ckb-traits = { path = "../traits" }
failure = "0.1.5"
bytes = "0.4.12"
hash = {path = "../util/hash"}
lru-cache = { git = "https://github.com/nervosnetwork/lru-cache" }

[dev-dependencies]
ckb-notify = { path = "../notify" }
Expand Down
38 changes: 31 additions & 7 deletions sync/src/relayer/mod.rs
Expand Up @@ -36,12 +36,15 @@ use flatbuffers::FlatBufferBuilder;
use fnv::{FnvHashMap, FnvHashSet};
use log::warn;
use log::{debug, info};
use lru_cache::LruCache;
use numext_fixed_hash::H256;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

pub const TX_PROPOSAL_TOKEN: u64 = 0;
pub const MAX_RELAY_PEERS: usize = 128;
pub const TX_FILTER_SIZE: usize = 1000;

pub struct Relayer<CI> {
chain: ChainController,
Expand Down Expand Up @@ -184,17 +187,27 @@ impl<CI: ChainIndex> Relayer<CI> {

pub fn accept_block(&self, nc: &mut CKBProtocolContext, peer: PeerIndex, block: &Arc<Block>) {
let ret = self.chain.process_block(Arc::clone(&block));

if ret.is_ok() {
let block_hash = block.header().hash();
let fbb = &mut FlatBufferBuilder::new();
let message = RelayMessage::build_compact_block(fbb, block, &HashSet::new());
fbb.finish(message, None);

for peer_id in nc.connected_peers() {
if peer_id != peer {
let ret = nc.send(peer_id, fbb.finished_data().to_vec());
if ret.is_err() {
warn!(target: "relay", "relay compact_block error {:?}", ret);
}
let mut known_blocks = self.peers.known_blocks.lock();
let selected_peers: Vec<PeerIndex> = nc
.connected_peers()
.into_iter()
.filter(|peer_index| {
known_blocks.insert(*peer_index, block_hash.clone()) && (*peer_index != peer)
})
.take(MAX_RELAY_PEERS)
.collect();

for peer_id in selected_peers {
let ret = nc.send(peer_id, fbb.finished_data().to_vec());
if ret.is_err() {
warn!(target: "relay", "relay compact_block error {:?}", ret);
}
}
} else {
Expand Down Expand Up @@ -360,9 +373,20 @@ impl<CI: ChainIndex> CKBProtocolHandler for Relayer<CI> {
}
}

#[derive(Default)]
pub struct RelayState {
pub pending_compact_blocks: Mutex<FnvHashMap<H256, CompactBlock>>,
pub inflight_proposals: Mutex<FnvHashSet<ProposalShortId>>,
pub pending_proposals_request: Mutex<FnvHashMap<ProposalShortId, FnvHashSet<PeerIndex>>>,
pub tx_filter: Mutex<LruCache<H256, ()>>,
}

impl Default for RelayState {
fn default() -> Self {
RelayState {
pending_compact_blocks: Mutex::new(FnvHashMap::default()),
inflight_proposals: Mutex::new(FnvHashSet::default()),
pending_proposals_request: Mutex::new(FnvHashMap::default()),
tx_filter: Mutex::new(LruCache::new(TX_FILTER_SIZE)),
}
}
}
42 changes: 28 additions & 14 deletions sync/src/relayer/transaction_process.rs
@@ -1,4 +1,5 @@
use crate::relayer::Relayer;
use crate::relayer::MAX_RELAY_PEERS;
use ckb_core::{transaction::Transaction, Cycle};
use ckb_network::{CKBProtocolContext, PeerIndex};
use ckb_protocol::{RelayMessage, ValidTransaction as FbsValidTransaction};
Expand All @@ -9,6 +10,7 @@ use ckb_verification::TransactionError;
use failure::Error as FailureError;
use flatbuffers::FlatBufferBuilder;
use log::{debug, warn};
use numext_fixed_hash::H256;
use std::convert::TryInto;
use std::time::Duration;

Expand Down Expand Up @@ -38,6 +40,13 @@ impl<'a, CI: ChainIndex> TransactionProcess<'a, CI> {

pub fn execute(self) -> Result<(), FailureError> {
let (tx, relay_cycles): (Transaction, Cycle) = (*self.message).try_into()?;
let tx_hash = tx.hash();

if self.already_known(tx_hash.clone()) {
debug!(target: "relay", "discarding already known transaction {:#x}", tx_hash);
return Ok(());
}

let tx_result = {
let chain_state = self.relayer.shared.chain_state().lock();
let max_block_cycles = self.relayer.shared.consensus().max_block_cycles();
Expand All @@ -51,21 +60,21 @@ impl<'a, CI: ChainIndex> TransactionProcess<'a, CI> {
let message = RelayMessage::build_transaction(fbb, &tx, cycles);
fbb.finish(message, None);

for peer in self.nc.connected_peers() {
if peer != self.peer
&& self
.relayer
.peers()
.transaction_filters
.read()
.get(&peer)
.map_or(true, |filter| filter.contains(&tx))
{
let ret = self.nc.send(peer, fbb.finished_data().to_vec());
let mut known_txs = self.relayer.peers.known_txs.lock();
let selected_peers: Vec<PeerIndex> = self
.nc
.connected_peers()
.into_iter()
.filter(|peer_index| {
known_txs.insert(*peer_index, tx_hash.clone()) && (self.peer != *peer_index)
})
.take(MAX_RELAY_PEERS)
.collect();

if ret.is_err() {
warn!(target: "relay", "relay Transaction error {:?}", ret);
}
for peer in selected_peers {
let ret = self.nc.send(peer, fbb.finished_data().to_vec());
if ret.is_err() {
warn!(target: "relay", "relay Transaction error {:?}", ret);
}
}
}
Expand All @@ -91,4 +100,9 @@ impl<'a, CI: ChainIndex> TransactionProcess<'a, CI> {

Ok(())
}

fn already_known(&self, hash: H256) -> bool {
let mut tx_filter = self.relayer.state.tx_filter.lock();
tx_filter.insert(hash, ()).is_some()
}
}
37 changes: 6 additions & 31 deletions sync/src/synchronizer/get_blocks_process.rs
Expand Up @@ -40,37 +40,12 @@ where
debug!(target: "sync", "get_blocks {:x}", block_hash);
if let Some(block) = self.synchronizer.get_block(&block_hash) {
debug!(target: "sync", "respond_block {} {:x}", block.header().number(), block.header().hash());
if let Some(filter) = self
.synchronizer
.peers
.transaction_filters
.read()
.get(&self.peer)
{
let transactions_index = block
.commit_transactions()
.iter()
.enumerate()
.filter(|(_index, tx)| filter.contains(tx))
.map(|ti| ti.0)
.collect::<Vec<_>>();

let fbb = &mut FlatBufferBuilder::new();
let message =
SyncMessage::build_filtered_block(fbb, &block, &transactions_index);
fbb.finish(message, None);
let ret = self.nc.send(self.peer, fbb.finished_data().to_vec());
if ret.is_err() {
warn!(target: "relay", "response GetBlocks error {:?}", ret);
}
} else {
let fbb = &mut FlatBufferBuilder::new();
let message = SyncMessage::build_block(fbb, &block);
fbb.finish(message, None);
let ret = self.nc.send(self.peer, fbb.finished_data().to_vec());
if ret.is_err() {
warn!(target: "relay", "response GetBlocks error {:?}", ret);
}
let fbb = &mut FlatBufferBuilder::new();
let message = SyncMessage::build_block(fbb, &block);
fbb.finish(message, None);
let ret = self.nc.send(self.peer, fbb.finished_data().to_vec());
if ret.is_err() {
warn!(target: "relay", "response GetBlocks error {:?}", ret);
}
} else {
// TODO response not found
Expand Down
18 changes: 2 additions & 16 deletions sync/src/synchronizer/mod.rs
@@ -1,15 +1,13 @@
mod block_fetcher;
mod block_pool;
mod block_process;
mod filter_process;
mod get_blocks_process;
mod get_headers_process;
mod headers_process;

use self::block_fetcher::BlockFetcher;
use self::block_pool::OrphanBlockPool;
use self::block_process::BlockProcess;
use self::filter_process::{AddFilterProcess, ClearFilterProcess, SetFilterProcess};
use self::get_blocks_process::GetBlocksProcess;
use self::get_headers_process::GetHeadersProcess;
use self::headers_process::HeadersProcess;
Expand Down Expand Up @@ -158,22 +156,10 @@ impl<CI: ChainIndex> Synchronizer<CI> {
SyncPayload::Block => {
BlockProcess::new(&cast!(message.payload_as_block())?, self, peer, nc).execute()?;
}
SyncPayload::SetFilter => {
SetFilterProcess::new(&cast!(message.payload_as_set_filter())?, self, peer)
.execute()?;
}
SyncPayload::AddFilter => {
AddFilterProcess::new(&cast!(message.payload_as_add_filter())?, self, peer)
.execute()?;
}
SyncPayload::ClearFilter => {
ClearFilterProcess::new(self, peer).execute()?;
}
SyncPayload::FilteredBlock => {
// ignore, should not receive FilteredBlock in full node mode
SyncPayload::NONE => {
cast!(None)?;
}
SyncPayload::NONE => {
_ => {
cast!(None)?;
}
}
Expand Down
1 change: 0 additions & 1 deletion sync/src/tests/mod.rs
Expand Up @@ -9,7 +9,6 @@ use std::sync::Arc;
use std::thread;
use std::time::Duration;

mod filter;
#[cfg(not(disable_faketime))]
mod relayer;
#[cfg(not(disable_faketime))]
Expand Down
103 changes: 29 additions & 74 deletions sync/src/types.rs
@@ -1,17 +1,17 @@
use bloom_filters::{
BloomFilter, ClassicBloomFilter, DefaultBuildHashKernels, UpdatableBloomFilter,
};
use ckb_core::block::Block;
use ckb_core::header::{BlockNumber, Header};
use ckb_core::transaction::Transaction;
use ckb_network::PeerIndex;
use ckb_util::Mutex;
use ckb_util::RwLock;
use faketime::unix_time_as_millis;
use fnv::{FnvHashMap, FnvHashSet};
use log::debug;
use lru_cache::LruCache;
use numext_fixed_hash::H256;
use numext_fixed_uint::U256;
use std::hash::{BuildHasher, Hasher};
use std::collections::hash_map::Entry;

const FILTER_SIZE: usize = 500;

// State used to enforce CHAIN_SYNC_TIMEOUT
// Only in effect for outbound, non-manual connections, with
Expand Down Expand Up @@ -57,14 +57,37 @@ pub struct PeerState {
pub chain_sync: ChainSyncState,
}

#[derive(Clone, Default)]
pub struct KnownFilter {
inner: FnvHashMap<PeerIndex, LruCache<H256, ()>>,
}

impl KnownFilter {
/// Adds a value to the filter.
/// If the filter did not have this value present, `true` is returned.
/// If the filter did have this value present, `false` is returned.
pub fn insert(&mut self, index: PeerIndex, hash: H256) -> bool {
match self.inner.entry(index) {
Entry::Occupied(mut o) => o.get_mut().insert(hash, ()).is_none(),
Entry::Vacant(v) => {
let mut lru = LruCache::new(FILTER_SIZE);
lru.insert(hash, ());
v.insert(lru);
true
}
}
}
}

#[derive(Default)]
pub struct Peers {
pub state: RwLock<FnvHashMap<PeerIndex, PeerState>>,
pub misbehavior: RwLock<FnvHashMap<PeerIndex, u32>>,
pub blocks_inflight: RwLock<FnvHashMap<PeerIndex, BlocksInflight>>,
pub best_known_headers: RwLock<FnvHashMap<PeerIndex, HeaderView>>,
pub last_common_headers: RwLock<FnvHashMap<PeerIndex, Header>>,
pub transaction_filters: RwLock<FnvHashMap<PeerIndex, TransactionFilter>>,
pub known_txs: Mutex<KnownFilter>,
pub known_blocks: Mutex<KnownFilter>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -230,71 +253,3 @@ impl HeaderView {
self.inner
}
}

pub struct TransactionFilter {
filter: ClassicBloomFilter<DefaultBuildHashKernels<HighLowBytesBuildHasher>>,
}

impl TransactionFilter {
pub fn new(raw_data: &[u8], k: usize, hash_seed: usize) -> Self {
Self {
filter: ClassicBloomFilter::with_raw_data(
raw_data,
k,
DefaultBuildHashKernels::new(hash_seed, HighLowBytesBuildHasher),
),
}
}

pub fn update(&mut self, raw_data: &[u8]) {
self.filter.update(raw_data)
}

pub fn insert(&mut self, hash: &H256) {
self.filter.insert(hash);
}

pub fn contains(&self, transaction: &Transaction) -> bool {
self.filter.contains(&transaction.hash())
|| transaction
.inputs()
.iter()
.any(|input| self.filter.contains(&input.previous_output.hash))
|| transaction
.outputs()
.iter()
.any(|output| self.filter.contains(&output.lock.hash()))
}
}

struct HighLowBytesBuildHasher;

impl BuildHasher for HighLowBytesBuildHasher {
type Hasher = HighLowBytesHasher;

fn build_hasher(&self) -> Self::Hasher {
HighLowBytesHasher(0)
}
}

/// a hasher which only accepts H256 bytes and use high / low bytes as hash value
struct HighLowBytesHasher(u64);

impl Hasher for HighLowBytesHasher {
fn write(&mut self, bytes: &[u8]) {
if bytes.len() == 32 {
self.0 = (u64::from(bytes[0]) << 56)
+ (u64::from(bytes[1]) << 48)
+ (u64::from(bytes[2]) << 40)
+ (u64::from(bytes[3]) << 32)
+ (u64::from(bytes[28]) << 24)
+ (u64::from(bytes[29]) << 16)
+ (u64::from(bytes[30]) << 8)
+ u64::from(bytes[31]);
}
}

fn finish(&self) -> u64 {
self.0
}
}

0 comments on commit 14d8ffc

Please sign in to comment.