Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Networking fixes #480

Merged
merged 4 commits into from
Feb 21, 2016
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 27 additions & 8 deletions util/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,25 @@ impl Connection {
self.socket.peer_addr()
}

pub fn try_clone(&self) -> io::Result<Self> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing docs?

Ok(Connection {
token: self.token,
socket: try!(self.socket.try_clone()),
rec_buf: Vec::new(),
rec_size: 0,
send_queue: self.send_queue.clone(),
interest: EventSet::hup() | EventSet::readable(),
stats: self.stats.clone(),
})
}

/// Register this connection with the IO event loop.
pub fn register_socket<Host: Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> io::Result<()> {
trace!(target: "net", "connection register; token={:?}", reg);
event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()).or_else(|e| {
if let Err(e) = event_loop.register(&self.socket, reg, self.interest, PollOpt::edge() | PollOpt::oneshot()) {
debug!("Failed to register {:?}, {:?}", reg, e);
Ok(())
})
}
Ok(())
}

/// Update connection registration. Should be called at the end of the IO handler.
Expand Down Expand Up @@ -265,7 +277,7 @@ impl EncryptedConnection {
}

/// Create an encrypted connection out of the handshake. Consumes a handshake object.
pub fn new(mut handshake: Handshake) -> Result<EncryptedConnection, UtilError> {
pub fn new(handshake: &mut Handshake) -> Result<EncryptedConnection, UtilError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it consumes the object, why not pass it by move rather than &mut?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not anymore, I'll update the comment

let shared = try!(crypto::ecdh::agree(handshake.ecdhe.secret(), &handshake.remote_public));
let mut nonce_material = H512::new();
if handshake.originated {
Expand Down Expand Up @@ -300,9 +312,8 @@ impl EncryptedConnection {
ingress_mac.update(&mac_material);
ingress_mac.update(if handshake.originated { &handshake.ack_cipher } else { &handshake.auth_cipher });

handshake.connection.expect(ENCRYPTED_HEADER_LEN);
Ok(EncryptedConnection {
connection: handshake.connection,
let mut enc = EncryptedConnection {
connection: try!(handshake.connection.try_clone()),
encoder: encoder,
decoder: decoder,
mac_encoder: mac_encoder,
Expand All @@ -311,7 +322,9 @@ impl EncryptedConnection {
read_state: EncryptedConnectionState::Header,
protocol_id: 0,
payload_len: 0
})
};
enc.connection.expect(ENCRYPTED_HEADER_LEN);
Ok(enc)
}

/// Send a packet
Expand Down Expand Up @@ -440,6 +453,12 @@ impl EncryptedConnection {
Ok(())
}

/// Register socket with the event lpop. This should be called at the end of the event loop.
pub fn register_socket<Host:Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
try!(self.connection.register_socket(reg, event_loop));
Ok(())
}

/// Update connection registration. This should be called at the end of the event loop.
pub fn update_socket<Host:Handler>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
try!(self.connection.update_socket(reg, event_loop));
Expand Down
73 changes: 47 additions & 26 deletions util/src/network/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ pub struct Handshake {
/// A copy of received encryped auth packet
pub auth_cipher: Bytes,
/// A copy of received encryped ack packet
pub ack_cipher: Bytes
pub ack_cipher: Bytes,
/// This Handshake is marked for deleteion flag
pub expired: bool,
}

const AUTH_PACKET_SIZE: usize = 307;
Expand All @@ -84,6 +86,7 @@ impl Handshake {
remote_nonce: H256::new(),
auth_cipher: Bytes::new(),
ack_cipher: Bytes::new(),
expired: false,
})
}

Expand All @@ -97,6 +100,16 @@ impl Handshake {
self.connection.token()
}

/// Mark this handshake as inactive to be deleted lated.
pub fn set_expired(&mut self) {
self.expired = true;
}

/// Check if this handshake is expired.
pub fn expired(&self) -> bool {
self.expired
}

/// Start a handhsake
pub fn start<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo, originated: bool) -> Result<(), UtilError> where Message: Send + Clone{
self.originated = originated;
Expand All @@ -118,47 +131,55 @@ impl Handshake {

/// Readable IO handler. Drives the state change.
pub fn readable<Message>(&mut self, io: &IoContext<Message>, host: &HostInfo) -> Result<(), UtilError> where Message: Send + Clone {
io.clear_timer(self.connection.token).unwrap();
match self.state {
HandshakeState::ReadingAuth => {
if let Some(data) = try!(self.connection.readable()) {
try!(self.read_auth(host, &data));
try!(self.write_ack());
};
},
HandshakeState::ReadingAck => {
if let Some(data) = try!(self.connection.readable()) {
try!(self.read_ack(host, &data));
self.state = HandshakeState::StartSession;
};
},
HandshakeState::StartSession => {},
_ => { panic!("Unexpected state"); }
}
if self.state != HandshakeState::StartSession {
try!(io.update_registration(self.connection.token));
if !self.expired() {
io.clear_timer(self.connection.token).unwrap();
match self.state {
HandshakeState::ReadingAuth => {
if let Some(data) = try!(self.connection.readable()) {
try!(self.read_auth(host, &data));
try!(self.write_ack());
};
},
HandshakeState::ReadingAck => {
if let Some(data) = try!(self.connection.readable()) {
try!(self.read_ack(host, &data));
self.state = HandshakeState::StartSession;
};
},
HandshakeState::StartSession => {},
_ => { panic!("Unexpected state"); }
}
if self.state != HandshakeState::StartSession {
try!(io.update_registration(self.connection.token));
}
}
Ok(())
}

/// Writabe IO handler.
pub fn writable<Message>(&mut self, io: &IoContext<Message>, _host: &HostInfo) -> Result<(), UtilError> where Message: Send + Clone {
io.clear_timer(self.connection.token).unwrap();
try!(self.connection.writable());
if self.state != HandshakeState::StartSession {
io.update_registration(self.connection.token).unwrap();
if !self.expired() {
io.clear_timer(self.connection.token).unwrap();
try!(self.connection.writable());
if self.state != HandshakeState::StartSession {
io.update_registration(self.connection.token).unwrap();
}
}
Ok(())
}

/// Register the socket with the event loop
pub fn register_socket<Host:Handler<Timeout=Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
try!(self.connection.register_socket(reg, event_loop));
if !self.expired() {
try!(self.connection.register_socket(reg, event_loop));
}
Ok(())
}

pub fn update_socket<Host:Handler<Timeout=Token>>(&self, reg: Token, event_loop: &mut EventLoop<Host>) -> Result<(), UtilError> {
try!(self.connection.update_socket(reg, event_loop));
if !self.expired() {
try!(self.connection.update_socket(reg, event_loop));
}
Ok(())
}

Expand Down