Skip to content

Commit

Permalink
Implement new type for poll_at
Browse files Browse the repository at this point in the history
Change return-type for `poll_at` for sockets to be a `PollAt` instead of
the former `Option<Instant>`.

Closes: #216
Approved by: whitequark
  • Loading branch information
barskern authored and Homu committed May 16, 2018
1 parent 9a7318d commit 805e11b
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 54 deletions.
10 changes: 7 additions & 3 deletions src/iface/ethernet.rs
Expand Up @@ -31,7 +31,7 @@ use wire::{UdpPacket, UdpRepr};
#[cfg(feature = "socket-tcp")]
use wire::{TcpPacket, TcpRepr, TcpControl};

use socket::{Socket, SocketSet, AnySocket};
use socket::{Socket, SocketSet, AnySocket, PollAt};
#[cfg(feature = "socket-raw")]
use socket::RawSocket;
#[cfg(all(feature = "socket-icmp", any(feature = "proto-ipv4", feature = "proto-ipv6")))]
Expand Down Expand Up @@ -386,8 +386,12 @@ impl<'b, 'c, DeviceT> Interface<'b, 'c, DeviceT>
pub fn poll_at(&self, sockets: &SocketSet, timestamp: Instant) -> Option<Instant> {
sockets.iter().filter_map(|socket| {
let socket_poll_at = socket.poll_at();
socket.meta().poll_at(socket_poll_at, |ip_addr|
self.inner.has_neighbor(&ip_addr, timestamp))
match socket.meta().poll_at(socket_poll_at, |ip_addr|
self.inner.has_neighbor(&ip_addr, timestamp)) {
PollAt::Ingress => None,
PollAt::Time(instant) => Some(instant),
PollAt::Now => Some(Instant::from_millis(0)),
}
}).min()
}

Expand Down
9 changes: 4 additions & 5 deletions src/socket/icmp.rs
Expand Up @@ -2,9 +2,8 @@ use core::cmp;

use {Error, Result};
use phy::{ChecksumCapabilities, DeviceCapabilities};
use socket::{Socket, SocketMeta, SocketHandle};
use socket::{Socket, SocketMeta, SocketHandle, PollAt};
use storage::{PacketBuffer, PacketMetadata};
use time::Instant;
use wire::{IpAddress, IpEndpoint, IpProtocol, IpRepr};

#[cfg(feature = "proto-ipv4")]
Expand Down Expand Up @@ -352,11 +351,11 @@ impl<'a, 'b> IcmpSocket<'a, 'b> {
})
}

pub(crate) fn poll_at(&self) -> Option<Instant> {
pub(crate) fn poll_at(&self) -> PollAt {
if self.tx_buffer.is_empty() {
None
PollAt::Ingress
} else {
Some(Instant::from_millis(0))
PollAt::Now
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/socket/meta.rs
@@ -1,5 +1,5 @@
use wire::IpAddress;
use super::SocketHandle;
use super::{SocketHandle, PollAt};
use time::{Duration, Instant};

/// Neighbor dependency.
Expand Down Expand Up @@ -44,7 +44,7 @@ impl Meta {
/// See also `iface::NeighborCache::SILENT_TIME`.
pub(crate) const DISCOVERY_SILENT_TIME: Duration = Duration { millis: 3_000 };

pub(crate) fn poll_at<F>(&self, socket_poll_at: Option<Instant>, has_neighbor: F) -> Option<Instant>
pub(crate) fn poll_at<F>(&self, socket_poll_at: PollAt, has_neighbor: F) -> PollAt
where F: Fn(IpAddress) -> bool
{
match self.neighbor_state {
Expand All @@ -54,7 +54,7 @@ impl Meta {
if has_neighbor(neighbor) =>
socket_poll_at,
NeighborState::Waiting { silent_until, .. } =>
Some(silent_until)
PollAt::Time(silent_until)
}
}

Expand Down
22 changes: 21 additions & 1 deletion src/socket/mod.rs
Expand Up @@ -55,6 +55,26 @@ pub use self::set::{Iter as SocketSetIter, IterMut as SocketSetIterMut};
pub use self::ref_::Ref as SocketRef;
pub(crate) use self::ref_::Session as SocketSession;

/// Gives an indication on when the socket should be polled
#[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Clone, Copy)]
pub(crate) enum PollAt {
/// Should be polled immidiately
Now,
/// Should be polled at given [Instant][struct.Instant]
Time(Instant),
/// Should be polled on incoming packet
Ingress,
}

impl PollAt {
fn is_ingress(&self) -> bool {
match self {
&PollAt::Ingress => true,
_ => false,
}
}
}

/// A network socket.
///
/// This enumeration abstracts the various types of sockets based on the IP protocol.
Expand Down Expand Up @@ -116,7 +136,7 @@ impl<'a, 'b> Socket<'a, 'b> {
dispatch_socket!(mut self, |socket| &mut socket.meta)
}

pub(crate) fn poll_at(&self) -> Option<Instant> {
pub(crate) fn poll_at(&self) -> PollAt {
dispatch_socket!(self, |socket| socket.poll_at())
}
}
Expand Down
9 changes: 4 additions & 5 deletions src/socket/raw.rs
Expand Up @@ -2,9 +2,8 @@ use core::cmp::min;

use {Error, Result};
use phy::ChecksumCapabilities;
use socket::{Socket, SocketMeta, SocketHandle};
use socket::{Socket, SocketMeta, SocketHandle, PollAt};
use storage::{PacketBuffer, PacketMetadata};
use time::Instant;
use wire::{IpVersion, IpRepr, IpProtocol};
#[cfg(feature = "proto-ipv4")]
use wire::{Ipv4Repr, Ipv4Packet};
Expand Down Expand Up @@ -208,11 +207,11 @@ impl<'a, 'b> RawSocket<'a, 'b> {
})
}

pub(crate) fn poll_at(&self) -> Option<Instant> {
pub(crate) fn poll_at(&self) -> PollAt {
if self.tx_buffer.is_empty() {
None
PollAt::Ingress
} else {
Some(Instant::from_millis(0))
PollAt::Now
}
}
}
Expand Down
65 changes: 33 additions & 32 deletions src/socket/tcp.rs
Expand Up @@ -7,7 +7,7 @@ use core::{cmp, fmt};
use {Error, Result};
use phy::DeviceCapabilities;
use time::{Duration, Instant};
use socket::{Socket, SocketMeta, SocketHandle};
use socket::{Socket, SocketMeta, SocketHandle, PollAt};
use storage::{Assembler, RingBuffer};
use wire::{IpProtocol, IpRepr, IpAddress, IpEndpoint, TcpSeqNumber, TcpRepr, TcpControl};

Expand Down Expand Up @@ -104,11 +104,12 @@ impl Timer {
}
}

fn poll_at(&self) -> Option<Instant> {
fn poll_at(&self) -> PollAt {
match *self {
Timer::Idle { keep_alive_at } => keep_alive_at,
Timer::Retransmit { expires_at, .. } => Some(expires_at),
Timer::Close { expires_at } => Some(expires_at),
Timer::Idle { keep_alive_at: Some(keep_alive_at) } => PollAt::Time(keep_alive_at),
Timer::Idle { keep_alive_at: None } => PollAt::Ingress,
Timer::Retransmit { expires_at, .. } => PollAt::Time(expires_at),
Timer::Close { expires_at } => PollAt::Time(expires_at),
}
}

Expand Down Expand Up @@ -1462,34 +1463,34 @@ impl<'a> TcpSocket<'a> {
Ok(())
}

pub(crate) fn poll_at(&self) -> Option<Instant> {
pub(crate) fn poll_at(&self) -> PollAt {
// The logic here mirrors the beginning of dispatch() closely.
if !self.remote_endpoint.is_specified() {
// No one to talk to, nothing to transmit.
None
PollAt::Ingress
} else if self.remote_last_ts.is_none() {
// Socket stopped being quiet recently, we need to acquire a timestamp.
Some(Instant::from_millis(0))
PollAt::Now
} else if self.state == State::Closed {
// Socket was aborted, we have an RST packet to transmit.
Some(Instant::from_millis(0))
PollAt::Now
} else if self.seq_to_transmit() || self.ack_to_transmit() || self.window_to_update() {
// We have a data or flag packet to transmit.
Some(Instant::from_millis(0))
PollAt::Now
} else {
let timeout_poll_at = match (self.remote_last_ts, self.timeout) {
// If we're transmitting or retransmitting data, we need to poll at the moment
// when the timeout would expire.
(Some(remote_last_ts), Some(timeout)) => Some(remote_last_ts + timeout),
(Some(remote_last_ts), Some(timeout)) => PollAt::Time(remote_last_ts + timeout),
// Otherwise we have no timeout.
(_, _) => None
(_, _) => PollAt::Ingress,
};

// We wait for the earliest of our timers to fire.
[self.timer.poll_at(), timeout_poll_at]
*[self.timer.poll_at(), timeout_poll_at]
.iter()
.filter_map(|x| *x)
.min()
.filter(|x| !x.is_ingress())
.min().unwrap_or(&PollAt::Ingress)
}
}
}
Expand Down Expand Up @@ -3409,7 +3410,7 @@ mod test {
fn test_listen_timeout() {
let mut s = socket_listen();
s.set_timeout(Some(Duration::from_millis(100)));
assert_eq!(s.poll_at(), None);
assert_eq!(s.poll_at(), PollAt::Ingress);
}

#[test]
Expand All @@ -3426,7 +3427,7 @@ mod test {
..RECV_TEMPL
}));
assert_eq!(s.state, State::SynSent);
assert_eq!(s.poll_at(), Some(Instant::from_millis(250)));
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(250)));
recv!(s, time 250, Ok(TcpRepr {
control: TcpControl::Rst,
seq_number: LOCAL_SEQ + 1,
Expand All @@ -3441,23 +3442,23 @@ mod test {
let mut s = socket_established();
s.set_timeout(Some(Duration::from_millis(200)));
recv!(s, time 250, Err(Error::Exhausted));
assert_eq!(s.poll_at(), Some(Instant::from_millis(450)));
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(450)));
s.send_slice(b"abcdef").unwrap();
assert_eq!(s.poll_at(), Some(Instant::from_millis(0)));
assert_eq!(s.poll_at(), PollAt::Now);
recv!(s, time 255, Ok(TcpRepr {
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1),
payload: &b"abcdef"[..],
..RECV_TEMPL
}));
assert_eq!(s.poll_at(), Some(Instant::from_millis(355)));
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(355)));
recv!(s, time 355, Ok(TcpRepr {
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1),
payload: &b"abcdef"[..],
..RECV_TEMPL
}));
assert_eq!(s.poll_at(), Some(Instant::from_millis(455)));
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(455)));
recv!(s, time 500, Ok(TcpRepr {
control: TcpControl::Rst,
seq_number: LOCAL_SEQ + 1 + 6,
Expand All @@ -3479,21 +3480,21 @@ mod test {
..RECV_TEMPL
}));
recv!(s, time 100, Err(Error::Exhausted));
assert_eq!(s.poll_at(), Some(Instant::from_millis(150)));
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(150)));
send!(s, time 105, TcpRepr {
seq_number: REMOTE_SEQ + 1,
ack_number: Some(LOCAL_SEQ + 1),
..SEND_TEMPL
});
assert_eq!(s.poll_at(), Some(Instant::from_millis(155)));
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(155)));
recv!(s, time 155, Ok(TcpRepr {
seq_number: LOCAL_SEQ,
ack_number: Some(REMOTE_SEQ + 1),
payload: &[0],
..RECV_TEMPL
}));
recv!(s, time 155, Err(Error::Exhausted));
assert_eq!(s.poll_at(), Some(Instant::from_millis(205)));
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(205)));
recv!(s, time 200, Err(Error::Exhausted));
recv!(s, time 205, Ok(TcpRepr {
control: TcpControl::Rst,
Expand All @@ -3515,7 +3516,7 @@ mod test {
ack_number: Some(REMOTE_SEQ + 1),
..RECV_TEMPL
}));
assert_eq!(s.poll_at(), Some(Instant::from_millis(200)));
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(200)));
recv!(s, time 400, Ok(TcpRepr {
control: TcpControl::Rst,
seq_number: LOCAL_SEQ + 1 + 1,
Expand All @@ -3535,7 +3536,7 @@ mod test {
ack_number: Some(REMOTE_SEQ + 1 + 1),
..RECV_TEMPL
}));
assert_eq!(s.poll_at(), Some(Instant::from_millis(200)));
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(200)));
recv!(s, time 400, Ok(TcpRepr {
control: TcpControl::Rst,
seq_number: LOCAL_SEQ + 1 + 1,
Expand All @@ -3551,14 +3552,14 @@ mod test {
s.set_timeout(Some(Duration::from_millis(200)));
s.remote_last_ts = Some(Instant::from_millis(100));
s.abort();
assert_eq!(s.poll_at(), Some(Instant::from_millis(0)));
assert_eq!(s.poll_at(), PollAt::Now);
recv!(s, time 100, Ok(TcpRepr {
control: TcpControl::Rst,
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1),
..RECV_TEMPL
}));
assert_eq!(s.poll_at(), None);
assert_eq!(s.poll_at(), PollAt::Ingress);
}

// =========================================================================================//
Expand All @@ -3585,15 +3586,15 @@ mod test {
s.set_keep_alive(Some(Duration::from_millis(100)));

// drain the forced keep-alive packet
assert_eq!(s.poll_at(), Some(Instant::from_millis(0)));
assert_eq!(s.poll_at(), PollAt::Now);
recv!(s, time 0, Ok(TcpRepr {
seq_number: LOCAL_SEQ,
ack_number: Some(REMOTE_SEQ + 1),
payload: &[0],
..RECV_TEMPL
}));

assert_eq!(s.poll_at(), Some(Instant::from_millis(100)));
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(100)));
recv!(s, time 95, Err(Error::Exhausted));
recv!(s, time 100, Ok(TcpRepr {
seq_number: LOCAL_SEQ,
Expand All @@ -3602,7 +3603,7 @@ mod test {
..RECV_TEMPL
}));

assert_eq!(s.poll_at(), Some(Instant::from_millis(200)));
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(200)));
recv!(s, time 195, Err(Error::Exhausted));
recv!(s, time 200, Ok(TcpRepr {
seq_number: LOCAL_SEQ,
Expand All @@ -3616,7 +3617,7 @@ mod test {
ack_number: Some(LOCAL_SEQ + 1),
..SEND_TEMPL
});
assert_eq!(s.poll_at(), Some(Instant::from_millis(350)));
assert_eq!(s.poll_at(), PollAt::Time(Instant::from_millis(350)));
recv!(s, time 345, Err(Error::Exhausted));
recv!(s, time 350, Ok(TcpRepr {
seq_number: LOCAL_SEQ,
Expand Down
9 changes: 4 additions & 5 deletions src/socket/udp.rs
@@ -1,9 +1,8 @@
use core::cmp::min;

use {Error, Result};
use socket::{Socket, SocketMeta, SocketHandle};
use socket::{Socket, SocketMeta, SocketHandle, PollAt};
use storage::{PacketBuffer, PacketMetadata};
use time::Instant;
use wire::{IpProtocol, IpRepr, IpEndpoint, UdpRepr};

/// A UDP packet metadata.
Expand Down Expand Up @@ -212,11 +211,11 @@ impl<'a, 'b> UdpSocket<'a, 'b> {
})
}

pub(crate) fn poll_at(&self) -> Option<Instant> {
pub(crate) fn poll_at(&self) -> PollAt {
if self.tx_buffer.is_empty() {
None
PollAt::Ingress
} else {
Some(Instant::from_millis(0))
PollAt::Now
}
}
}
Expand Down

0 comments on commit 805e11b

Please sign in to comment.