Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
Network tracing improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
arkpar committed Mar 5, 2016
1 parent b14b4cf commit 6bd1ac5
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 36 deletions.
2 changes: 1 addition & 1 deletion ethcore/res/ethereum/tests
Submodule tests updated 31 files
+12,522 −1 BasicTests/difficulty.json
+240 −240 BasicTests/difficultyCustomHomestead.json
+5,520 −5,520 BasicTests/difficultyFrontier.json
+5,520 −5,520 BasicTests/difficultyHomestead.json
+5,520 −5,520 BasicTests/difficultyMorden.json
+5,520 −5,520 BasicTests/difficultyOlimpic.json
+186 −246 StateTests/Homestead/stCallCodes.json
+38 −38 StateTests/Homestead/stCallCreateCallCodeTest.json
+151 −201 StateTests/Homestead/stCallDelegateCodes.json
+153 −203 StateTests/Homestead/stCallDelegateCodesCallCode.json
+34 −34 StateTests/Homestead/stDelegatecallTest.json
+5 −5 StateTests/Homestead/stHomeSteadSpecific.json
+17 −17 StateTests/Homestead/stInitCodeTest.json
+46 −46 StateTests/Homestead/stLogTests.json
+10 −9 StateTests/Homestead/stMemoryStressTest.json
+65 −65 StateTests/Homestead/stMemoryTest.json
+89 −89 StateTests/Homestead/stPreCompiledContracts.json
+15 −15 StateTests/Homestead/stQuadraticComplexityTest.json
+2 −2 StateTests/Homestead/stRecursiveCreate.json
+17 −17 StateTests/Homestead/stRefundTest.json
+9 −9 StateTests/Homestead/stSpecialTest.json
+64 −71 StateTests/Homestead/stSystemOperationsTest.json
+40 −40 StateTests/Homestead/stTransactionTest.json
+85 −85 StateTests/Homestead/stWalletTest.json
+112 −172 StateTests/stCallCodes.json
+4 −3 StateTests/stMemoryStressTest.json
+9 −9 StateTests/stTransitionTest.json
+1 −1 TransactionTests/Homestead/tt10mbDataField.json
+57 −57 TransactionTests/Homestead/ttTransactionTest.json
+69 −69 TransactionTests/Homestead/ttWrongRLPTransaction.json
+3 −3 TransactionTests/tt10mbDataField.json
2 changes: 1 addition & 1 deletion sync/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ impl ChainSync {
pub fn on_peer_connected(&mut self, io: &mut SyncIo, peer: PeerId) {
trace!(target: "sync", "== Connected {}", peer);
if let Err(e) = self.send_status(io) {
warn!(target:"sync", "Error sending status request: {:?}", e);
trace!(target:"sync", "Error sending status request: {:?}", e);
io.disable_peer(peer);
}
}
Expand Down
10 changes: 5 additions & 5 deletions util/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,25 +190,25 @@ impl Connection {

/// Register this connection with the IO event loop.
pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection register; token={:?}", reg);
trace!(target: "network", "connection register; token={:?}", reg);
if let Err(e) = event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()) {
debug!("Failed to register {:?}, {:?}", reg, e);
trace!(target: "network", "Failed to register {:?}, {:?}", reg, e);
}
Ok(())
}

/// Update connection registration. Should be called at the end of the IO handler.
pub fn update_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection reregister; token={:?}", reg);
trace!(target: "network", "connection reregister; token={:?}", reg);
event_loop.reregister( &self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
debug!("Failed to reregister {:?}, {:?}", reg, e);
trace!(target: "network", "Failed to reregister {:?}, {:?}", reg, e);
Ok(())
})
}

/// Delete connection registration. Should be called at the end of the IO handler.
pub fn deregister_socket<Host: Handler>(&self, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection deregister; token={:?}", self.token);
trace!(target: "network", "connection deregister; token={:?}", self.token);
event_loop.deregister(&self.socket).ok(); // ignore errors here
Ok(())
}
Expand Down
14 changes: 7 additions & 7 deletions util/src/network/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl Handshake {

/// Parse, validate and confirm auth message
fn read_auth(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"net", "Received handshake auth from {:?}", self.connection.socket.peer_addr());
trace!(target:"network", "Received handshake auth from {:?}", self.connection.socket.peer_addr());
if data.len() != V4_AUTH_PACKET_SIZE {
debug!(target:"net", "Wrong auth packet size");
return Err(From::from(NetworkError::BadProtocol));
Expand Down Expand Up @@ -253,7 +253,7 @@ impl Handshake {
}

fn read_auth_eip8(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"net", "Received EIP8 handshake auth from {:?}", self.connection.socket.peer_addr());
trace!(target:"network", "Received EIP8 handshake auth from {:?}", self.connection.socket.peer_addr());
self.auth_cipher.extend_from_slice(data);
let auth = try!(ecies::decrypt(secret, &self.auth_cipher[0..2], &self.auth_cipher[2..]));
let rlp = UntrustedRlp::new(&auth);
Expand All @@ -268,7 +268,7 @@ impl Handshake {

/// Parse and validate ack message
fn read_ack(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"net", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
trace!(target:"network", "Received handshake auth to {:?}", self.connection.socket.peer_addr());
if data.len() != V4_ACK_PACKET_SIZE {
debug!(target:"net", "Wrong ack packet size");
return Err(From::from(NetworkError::BadProtocol));
Expand Down Expand Up @@ -296,7 +296,7 @@ impl Handshake {
}

fn read_ack_eip8(&mut self, secret: &Secret, data: &[u8]) -> Result<(), UtilError> {
trace!(target:"net", "Received EIP8 handshake auth from {:?}", self.connection.socket.peer_addr());
trace!(target:"network", "Received EIP8 handshake auth from {:?}", self.connection.socket.peer_addr());
self.ack_cipher.extend_from_slice(data);
let ack = try!(ecies::decrypt(secret, &self.ack_cipher[0..2], &self.ack_cipher[2..]));
let rlp = UntrustedRlp::new(&ack);
Expand All @@ -309,7 +309,7 @@ impl Handshake {

/// Sends auth message
fn write_auth(&mut self, secret: &Secret, public: &Public) -> Result<(), UtilError> {
trace!(target:"net", "Sending handshake auth to {:?}", self.connection.socket.peer_addr());
trace!(target:"network", "Sending handshake auth to {:?}", self.connection.socket.peer_addr());
let mut data = [0u8; /*Signature::SIZE*/ 65 + /*H256::SIZE*/ 32 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32 + 1]; //TODO: use associated constants
let len = data.len();
{
Expand All @@ -336,7 +336,7 @@ impl Handshake {

/// Sends ack message
fn write_ack(&mut self) -> Result<(), UtilError> {
trace!(target:"net", "Sending handshake ack to {:?}", self.connection.socket.peer_addr());
trace!(target:"network", "Sending handshake ack to {:?}", self.connection.socket.peer_addr());
let mut data = [0u8; 1 + /*Public::SIZE*/ 64 + /*H256::SIZE*/ 32]; //TODO: use associated constants
let len = data.len();
{
Expand All @@ -355,7 +355,7 @@ impl Handshake {

/// Sends EIP8 ack message
fn write_ack_eip8(&mut self) -> Result<(), UtilError> {
trace!(target:"net", "Sending EIP8 handshake ack to {:?}", self.connection.socket.peer_addr());
trace!(target:"network", "Sending EIP8 handshake ack to {:?}", self.connection.socket.peer_addr());
let mut rlp = RlpStream::new_list(3);
rlp.append(self.ecdhe.public());
rlp.append(&self.nonce);
Expand Down
43 changes: 24 additions & 19 deletions util/src/network/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,52 +170,55 @@ pub struct NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'sta
io: &'s IoContext<NetworkIoMessage<Message>>,
protocol: ProtocolId,
sessions: Arc<RwLock<Slab<SharedSession>>>,
session: Option<StreamToken>,
session: Option<SharedSession>,
session_id: Option<StreamToken>,
}

impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone + 'static, {
/// Create a new network IO access point. Takes references to all the data that can be updated within the IO handler.
fn new(io: &'s IoContext<NetworkIoMessage<Message>>,
protocol: ProtocolId,
session: Option<StreamToken>, sessions: Arc<RwLock<Slab<SharedSession>>>) -> NetworkContext<'s, Message> {
session: Option<SharedSession>, sessions: Arc<RwLock<Slab<SharedSession>>>) -> NetworkContext<'s, Message> {
let id = session.as_ref().map(|s| s.lock().unwrap().token());
NetworkContext {
io: io,
protocol: protocol,
session_id: id,
session: session,
sessions: sessions,
}
}

fn resolve_session(&self, peer: PeerId) -> Option<SharedSession> {
match self.session_id {
Some(id) if id == peer => self.session.clone(),
_ => self.sessions.read().unwrap().get(peer).cloned(),
}
}

/// Send a packet over the network to another peer.
pub fn send(&self, peer: PeerId, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
let session = { self.sessions.read().unwrap().get(peer).cloned() };
let session = self.resolve_session(peer);
if let Some(session) = session {
session.lock().unwrap().deref_mut().send_packet(self.protocol, packet_id as u8, &data).unwrap_or_else(|e| {
warn!(target: "network", "Send error: {:?}", e);
}); //TODO: don't copy vector data
try!(session.lock().unwrap().deref_mut().send_packet(self.protocol, packet_id as u8, &data));
try!(self.io.update_registration(peer));
} else {
trace!(target: "network", "Send: Peer no longer exist")
}
Ok(())
}

/// Respond to a current network message. Panics if no there is no packet in the context.
/// Respond to a current network message. Panics if no there is no packet in the context. If the session is expired returns nothing.
pub fn respond(&self, packet_id: PacketId, data: Vec<u8>) -> Result<(), UtilError> {
match self.session {
Some(session) => self.send(session, packet_id, data),
None => {
panic!("Respond: Session does not exist")
}
}
assert!(self.session.is_some(), "Respond called without network context");
self.send(self.session_id.unwrap(), packet_id, data)
}

/// Send an IO message
pub fn message(&self, msg: Message) {
self.io.message(NetworkIoMessage::User(msg));
}


/// Disable current protocol capability for given peer. If no capabilities left peer gets disconnected.
pub fn disable_peer(&self, peer: PeerId) {
//TODO: remove capability, disconnect if no capabilities left
Expand All @@ -239,7 +242,7 @@ impl<'s, Message> NetworkContext<'s, Message> where Message: Send + Sync + Clone

/// Returns peer identification string
pub fn peer_info(&self, peer: PeerId) -> String {
let session = { self.sessions.read().unwrap().get(peer).cloned() };
let session = self.resolve_session(peer);
if let Some(session) = session {
return session.lock().unwrap().info.client_version.clone()
}
Expand Down Expand Up @@ -624,7 +627,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let mut packet_data: Option<(ProtocolId, PacketId, Vec<u8>)> = None;
let mut kill = false;
let session = { self.sessions.read().unwrap().get(token).cloned() };
if let Some(session) = session {
if let Some(session) = session.clone() {
let mut s = session.lock().unwrap();
match s.readable(io, &self.info.read().unwrap()) {
Err(e) => {
Expand Down Expand Up @@ -656,11 +659,11 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
for p in ready_data {
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
h.connected(&NetworkContext::new(io, p, Some(token), self.sessions.clone()), &token);
h.connected(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token);
}
if let Some((p, packet_id, data)) = packet_data {
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
h.read(&NetworkContext::new(io, p, Some(token), self.sessions.clone()), &token, packet_id, &data[1..]);
h.read(&NetworkContext::new(io, p, session.clone(), self.sessions.clone()), &token, packet_id, &data[1..]);
}
io.update_registration(token).unwrap_or_else(|e| debug!(target: "network", "Token registration error: {:?}", e));
}
Expand Down Expand Up @@ -718,6 +721,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
let mut to_disconnect: Vec<ProtocolId> = Vec::new();
let mut failure_id = None;
let mut deregister = false;
let mut expired_session = None;
match token {
FIRST_HANDSHAKE ... LAST_HANDSHAKE => {
let handshakes = self.handshakes.write().unwrap();
Expand All @@ -733,6 +737,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
FIRST_SESSION ... LAST_SESSION => {
let sessions = self.sessions.write().unwrap();
if let Some(session) = sessions.get(token).cloned() {
expired_session = Some(session.clone());
let mut s = session.lock().unwrap();
if !s.expired() {
if s.is_ready() {
Expand All @@ -757,7 +762,7 @@ impl<Message> Host<Message> where Message: Send + Sync + Clone {
}
for p in to_disconnect {
let h = self.handlers.read().unwrap().get(p).unwrap().clone();
h.disconnected(&NetworkContext::new(io, p, Some(token), self.sessions.clone()), &token);
h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone()), &token);
}
if deregister {
io.deregister_stream(token).expect("Error deregistering stream");
Expand Down
9 changes: 6 additions & 3 deletions util/src/network/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ impl Session {

/// Send a protocol packet to peer.
pub fn send_packet(&mut self, protocol: &str, packet_id: u8, data: &[u8]) -> Result<(), UtilError> {
if self.expired() {
return Err(From::from(NetworkError::Expired));
}
let mut i = 0usize;
while protocol != self.info.capabilities[i].protocol {
i += 1;
Expand Down Expand Up @@ -351,15 +354,15 @@ impl Session {
offset += caps[i].packet_count;
i += 1;
}
trace!(target: "net", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps);
trace!(target: "network", "Hello: {} v{} {} {:?}", client_version, protocol, id, caps);
self.info.client_version = client_version;
self.info.capabilities = caps;
if self.info.capabilities.is_empty() {
trace!("No common capabilities with peer.");
trace!(target: "network", "No common capabilities with peer.");
return Err(From::from(self.disconnect(DisconnectReason::UselessPeer)));
}
if protocol != host.protocol_version {
trace!("Peer protocol version mismatch: {}", protocol);
trace!(target: "network", "Peer protocol version mismatch: {}", protocol);
return Err(From::from(self.disconnect(DisconnectReason::UselessPeer)));
}
self.had_hello = true;
Expand Down

0 comments on commit 6bd1ac5

Please sign in to comment.