Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Avoid broadcasting transactions to peers that send them #3796

Merged
merged 9 commits into from
Dec 12, 2016
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions ethcore/light/src/net/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

//! I/O and event context generalizations.

use network::{NetworkContext, PeerId};
use network::{NetworkContext, PeerId, NodeId};

use super::{Announcement, LightProtocol, ReqId};
use super::error::Error;
use request::Request;

/// An I/O context which allows sending and receiving packets as well as
/// An I/O context which allows sending and receiving packets as well as
/// disconnecting peers. This is used as a generalization of the portions
/// of a p2p network which the light protocol structure makes use of.
pub trait IoContext {
Expand All @@ -41,6 +41,9 @@ pub trait IoContext {

/// Get a peer's protocol version.
fn protocol_version(&self, peer: PeerId) -> Option<u8>;

/// Persistent peer id
fn persistent_peer_id(&self, peer: PeerId) -> Option<NodeId>;
}

impl<'a> IoContext for NetworkContext<'a> {
Expand All @@ -67,6 +70,10 @@ impl<'a> IoContext for NetworkContext<'a> {
fn protocol_version(&self, peer: PeerId) -> Option<u8> {
self.protocol_version(self.subprotocol_name(), peer)
}

fn persistent_peer_id(&self, peer: PeerId) -> Option<NodeId> {
self.session_info(peer).and_then(|info| info.id)
}
}

/// Context for a protocol event.
Expand All @@ -75,6 +82,9 @@ pub trait EventContext {
/// disconnected/connected peer.
fn peer(&self) -> PeerId;

/// Returns the relevant's peer persistent Id (aka NodeId).
fn persistent_peer_id(&self) -> Option<NodeId>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would prefer for this to take arbitrary &PeerId ideally.


/// Make a request from a peer.
fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error>;

Expand All @@ -89,19 +99,26 @@ pub trait EventContext {
fn disable_peer(&self, peer: PeerId);
}

/// Concrete implementation of `EventContext` over the light protocol struct and
/// Concrete implementation of `EventContext` over the light protocol struct and
/// an io context.
pub struct Ctx<'a> {
/// Io context to enable immediate response to events.
pub io: &'a IoContext,
/// Protocol implementation.
pub proto: &'a LightProtocol,
/// Relevant peer for event.
pub peer: PeerId,
pub peer: PeerId,
}

impl<'a> EventContext for Ctx<'a> {
fn peer(&self) -> PeerId { self.peer }

fn peer(&self) -> PeerId {
self.peer
}

fn persistent_peer_id(&self) -> Option<NodeId> {
self.io.persistent_peer_id(self.peer)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indentation looks strange around this block. Does the file use spaces?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, this is my mistake from earlier. Fixed in #3801

fn request_from(&self, peer: PeerId, request: Request) -> Result<ReqId, Error> {
self.proto.request_from(self.io, &peer, request)
}
Expand All @@ -117,4 +134,4 @@ impl<'a> EventContext for Ctx<'a> {
fn disable_peer(&self, peer: PeerId) {
self.io.disable_peer(peer);
}
}
}
86 changes: 45 additions & 41 deletions ethcore/light/src/net/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

//! Tests for the `LightProtocol` implementation.
//! These don't test of the higher level logic on top of
//! These don't test of the higher level logic on top of

use ethcore::blockchain_info::BlockChainInfo;
use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient};
use ethcore::ids::BlockId;
use ethcore::transaction::SignedTransaction;
use network::PeerId;
use network::{PeerId, NodeId};

use net::buffer_flow::FlowParams;
use net::context::IoContext;
Expand Down Expand Up @@ -68,6 +68,10 @@ impl IoContext for Expect {
fn protocol_version(&self, _peer: PeerId) -> Option<u8> {
Some(super::MAX_PROTOCOL_VERSION)
}

fn persistent_peer_id(&self, _peer: PeerId) -> Option<NodeId> {
None
}
}

// can't implement directly for Arc due to cross-crate orphan rules.
Expand Down Expand Up @@ -106,7 +110,7 @@ impl Provider for TestProvider {
.map(|x: u64| x.saturating_mul(req.skip + 1))
.take_while(|x| if req.reverse { x < &start_num } else { best_num - start_num >= *x })
.map(|x| if req.reverse { start_num - x } else { start_num + x })
.map(|x| self.0.client.block_header(BlockId::Number(x)))
.map(|x| self.0.client.block_header(BlockId::Number(x)))
.take_while(|x| x.is_some())
.flat_map(|x| x)
.collect()
Expand Down Expand Up @@ -139,12 +143,12 @@ impl Provider for TestProvider {
}
}
})
.collect()
.collect()
}

fn contract_code(&self, req: request::ContractCodes) -> Vec<Bytes> {
req.code_requests.into_iter()
.map(|req| {
.map(|req| {
req.account_key.iter().chain(req.account_key.iter()).cloned().collect()
})
.collect()
Expand Down Expand Up @@ -202,9 +206,9 @@ fn status(chain_info: BlockChainInfo) -> Status {
#[test]
fn handshake_expected() {
let flow_params = make_flow_params();
let capabilities = capabilities();
let capabilities = capabilities();

let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());

let status = status(provider.client.chain_info());

Expand All @@ -217,9 +221,9 @@ fn handshake_expected() {
#[should_panic]
fn genesis_mismatch() {
let flow_params = make_flow_params();
let capabilities = capabilities();
let capabilities = capabilities();

let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());

let mut status = status(provider.client.chain_info());
status.genesis_hash = H256::default();
Expand All @@ -232,15 +236,15 @@ fn genesis_mismatch() {
#[test]
fn buffer_overflow() {
let flow_params = make_flow_params();
let capabilities = capabilities();
let capabilities = capabilities();

let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());

let status = status(provider.client.chain_info());

{
let packet_body = write_handshake(&status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
let packet_body = write_handshake(&status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
}

{
Expand All @@ -266,9 +270,9 @@ fn buffer_overflow() {
#[test]
fn get_block_headers() {
let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into());
let capabilities = capabilities();
let capabilities = capabilities();

let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());

let cur_status = status(provider.client.chain_info());
let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params));
Expand All @@ -278,8 +282,8 @@ fn get_block_headers() {
let cur_status = status(provider.client.chain_info());

{
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status);
}

Expand All @@ -300,7 +304,7 @@ fn get_block_headers() {
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Headers, 10);

let mut response_stream = RlpStream::new_list(12);

response_stream.append(&req_id).append(&new_buf);
for header in headers {
response_stream.append_raw(&header, 1);
Expand All @@ -316,9 +320,9 @@ fn get_block_headers() {
#[test]
fn get_block_bodies() {
let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into());
let capabilities = capabilities();
let capabilities = capabilities();

let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());

let cur_status = status(provider.client.chain_info());
let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params));
Expand All @@ -328,8 +332,8 @@ fn get_block_bodies() {
let cur_status = status(provider.client.chain_info());

{
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status);
}

Expand All @@ -347,7 +351,7 @@ fn get_block_bodies() {
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Bodies, 10);

let mut response_stream = RlpStream::new_list(12);

response_stream.append(&req_id).append(&new_buf);
for body in bodies {
response_stream.append_raw(&body, 1);
Expand All @@ -363,9 +367,9 @@ fn get_block_bodies() {
#[test]
fn get_block_receipts() {
let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into());
let capabilities = capabilities();
let capabilities = capabilities();

let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());

let cur_status = status(provider.client.chain_info());
let my_status = write_handshake(&cur_status, &capabilities, Some(&flow_params));
Expand All @@ -375,8 +379,8 @@ fn get_block_receipts() {
let cur_status = status(provider.client.chain_info());

{
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &my_status);
}

Expand All @@ -400,7 +404,7 @@ fn get_block_receipts() {
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Receipts, receipts.len());

let mut response_stream = RlpStream::new_list(2 + receipts.len());

response_stream.append(&req_id).append(&new_buf);
for block_receipts in receipts {
response_stream.append_raw(&block_receipts, 1);
Expand All @@ -416,23 +420,23 @@ fn get_block_receipts() {
#[test]
fn get_state_proofs() {
let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into());
let capabilities = capabilities();
let capabilities = capabilities();

let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());

let cur_status = status(provider.client.chain_info());

{
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone()));
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone()));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body);
}

let req_id = 112;
let key1 = U256::from(11223344).into();
let key2 = U256::from(99988887).into();

let request = Request::StateProofs (request::StateProofs {
let request = Request::StateProofs (request::StateProofs {
requests: vec![
request::StateProof { block: H256::default(), key1: key1, key2: None, from_level: 0 },
request::StateProof { block: H256::default(), key1: key1, key2: Some(key2), from_level: 0},
Expand All @@ -449,7 +453,7 @@ fn get_state_proofs() {
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::StateProofs, 2);

let mut response_stream = RlpStream::new_list(4);

response_stream.append(&req_id).append(&new_buf);
for proof in proofs {
response_stream.append_raw(&proof, 1);
Expand All @@ -465,23 +469,23 @@ fn get_state_proofs() {
#[test]
fn get_contract_code() {
let flow_params = FlowParams::new(5_000_000.into(), Default::default(), 0.into());
let capabilities = capabilities();
let capabilities = capabilities();

let (provider, proto) = setup(flow_params.clone(), capabilities.clone());
let (provider, proto) = setup(flow_params.clone(), capabilities.clone());

let cur_status = status(provider.client.chain_info());

{
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone()));
let packet_body = write_handshake(&cur_status, &capabilities, Some(&flow_params));
proto.on_connect(&1, &Expect::Send(1, packet::STATUS, packet_body.clone()));
proto.handle_packet(&Expect::Nothing, &1, packet::STATUS, &packet_body);
}

let req_id = 112;
let key1 = U256::from(11223344).into();
let key2 = U256::from(99988887).into();

let request = Request::Codes (request::ContractCodes {
let request = Request::Codes (request::ContractCodes {
code_requests: vec![
request::ContractCode { block_hash: H256::default(), account_key: key1 },
request::ContractCode { block_hash: H256::default(), account_key: key2 },
Expand All @@ -498,7 +502,7 @@ fn get_contract_code() {
let new_buf = *flow_params.limit() - flow_params.compute_cost(request::Kind::Codes, 2);

let mut response_stream = RlpStream::new_list(4);

response_stream.append(&req_id).append(&new_buf);
for code in codes {
response_stream.append(&code);
Expand All @@ -509,4 +513,4 @@ fn get_contract_code() {

let expected = Expect::Respond(packet::CONTRACT_CODES, response);
proto.handle_packet(&expected, &1, packet::GET_CONTRACT_CODES, &request_body);
}
}
11 changes: 10 additions & 1 deletion ethcore/src/client/chain_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Parity. If not, see <http://www.gnu.org/licenses/>.

use ipc::IpcConfig;
use util::H256;
use util::{H256, H512};

/// Represents what has to be handled by actor listening to chain events
#[ipc]
Expand All @@ -40,6 +40,15 @@ pub trait ChainNotify : Send + Sync {
fn stop(&self) {
// does nothing by default
}

/// fires when new transactions are imported
fn transactions_imported(&self,
_hashes: Vec<H256>,
_peer_id: Option<H512>,
_block_num: u64,
) {
// does nothing by default
}
}

impl IpcConfig for ChainNotify { }
Loading