Skip to content

Commit

Permalink
[inetstack] Enhancement: pipe socket option through
Browse files Browse the repository at this point in the history
  • Loading branch information
iyzhang committed May 23, 2024
1 parent db8aa49 commit 99ad83c
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/rust/demikernel/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl Config {
let linger: u64 = if let Some(linger) = Self::get_typed_env_option(tcp_socket_options::LINGER)? {
linger
} else {
let section: &Yaml = Self::get_subsection(self.get_tcp_socket_options()?, tcp_socket_options::KEEP_ALIVE)?;
let section: &Yaml = Self::get_subsection(self.get_tcp_socket_options()?, tcp_socket_options::LINGER)?;
if Self::get_bool_option(section, "enabled")? {
Self::get_int_option(section, "time_seconds")?
} else {
Expand Down
5 changes: 5 additions & 0 deletions src/rust/inetstack/protocols/tcp/active_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::{
memory::DemiBuffer,
network::{
config::TcpConfig,
socket::option::TcpSocketOptions,
types::MacAddress,
NetworkRuntime,
},
Expand Down Expand Up @@ -72,6 +73,7 @@ pub struct ActiveOpenSocket<N: NetworkRuntime> {
ack_queue: SharedAsyncQueue<usize>,
local_link_addr: MacAddress,
tcp_config: TcpConfig,
socket_options: TcpSocketOptions,
arp: SharedArpPeer<N>,
dead_socket_tx: mpsc::UnboundedSender<QDesc>,
}
Expand All @@ -93,6 +95,7 @@ impl<N: NetworkRuntime> SharedActiveOpenSocket<N> {
recv_queue: SharedAsyncQueue<(Ipv4Header, TcpHeader, DemiBuffer)>,
ack_queue: SharedAsyncQueue<usize>,
tcp_config: TcpConfig,
default_socket_options: TcpSocketOptions,
local_link_addr: MacAddress,
arp: SharedArpPeer<N>,
dead_socket_tx: mpsc::UnboundedSender<QDesc>,
Expand All @@ -109,6 +112,7 @@ impl<N: NetworkRuntime> SharedActiveOpenSocket<N> {
ack_queue,
local_link_addr,
tcp_config,
socket_options: default_socket_options,
arp,
dead_socket_tx,
})))
Expand Down Expand Up @@ -226,6 +230,7 @@ impl<N: NetworkRuntime> SharedActiveOpenSocket<N> {
self.ack_queue.clone(),
self.local_link_addr,
self.tcp_config.clone(),
self.socket_options,
self.arp.clone(),
remote_seq_num,
self.tcp_config.get_ack_delay_timeout(),
Expand Down
13 changes: 9 additions & 4 deletions src/rust/inetstack/protocols/tcp/established/ctrlblk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::{
memory::DemiBuffer,
network::{
config::TcpConfig,
socket::option::TcpSocketOptions,
types::MacAddress,
NetworkRuntime,
},
Expand Down Expand Up @@ -182,6 +183,7 @@ pub struct ControlBlock<N: NetworkRuntime> {
runtime: SharedDemiRuntime,
local_link_addr: MacAddress,
tcp_config: TcpConfig,
socket_options: TcpSocketOptions,

// TODO: We shouldn't be keeping anything datalink-layer specific at this level. The IP layer should be holding
// this along with other remote IP information (such as routing, path MTU, etc).
Expand Down Expand Up @@ -250,6 +252,7 @@ impl<N: NetworkRuntime> SharedControlBlock<N> {
transport: N,
local_link_addr: MacAddress,
tcp_config: TcpConfig,
default_socket_options: TcpSocketOptions,
arp: SharedArpPeer<N>,
receiver_seq_no: SeqNumber,
ack_delay_timeout: Duration,
Expand All @@ -272,6 +275,7 @@ impl<N: NetworkRuntime> SharedControlBlock<N> {
transport,
local_link_addr,
tcp_config,
socket_options: default_socket_options,
arp,
sender,
state: State::Established,
Expand Down Expand Up @@ -1127,10 +1131,10 @@ impl<N: NetworkRuntime> SharedControlBlock<N> {
}

// This coroutine runs the close protocol.
pub async fn close(&mut self, linger: Option<Duration>) -> Result<(), Fail> {
pub async fn close(&mut self) -> Result<(), Fail> {
// Assert we are in a valid state and move to new state.
match self.state {
State::Established => self.local_close(linger).await,
State::Established => self.local_close().await,
State::CloseWait => self.remote_already_closed().await,
_ => {
let cause: String = format!("socket is already closing");
Expand All @@ -1140,7 +1144,7 @@ impl<N: NetworkRuntime> SharedControlBlock<N> {
}
}

async fn local_close(&mut self, linger: Option<Duration>) -> Result<(), Fail> {
async fn local_close(&mut self) -> Result<(), Fail> {
// 0. Set state.
self.state = State::FinWait1;
// 1. Send FIN.
Expand Down Expand Up @@ -1182,7 +1186,8 @@ impl<N: NetworkRuntime> SharedControlBlock<N> {
}

// 3. TIMED_WAIT
let timeout: Duration = linger.unwrap_or(MSL * 2);
trace!("socket options: {:?}", self.socket_options.get_linger());
let timeout: Duration = self.socket_options.get_linger().unwrap_or(MSL * 2);
yield_with_timeout(timeout).await;
self.state = State::Closed;
Ok(())
Expand Down
7 changes: 5 additions & 2 deletions src/rust/inetstack/protocols/tcp/established/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::{
memory::DemiBuffer,
network::{
config::TcpConfig,
socket::option::TcpSocketOptions,
NetworkRuntime,
},
QDesc,
Expand Down Expand Up @@ -66,6 +67,7 @@ impl<N: NetworkRuntime> EstablishedSocket<N> {
ack_queue: SharedAsyncQueue<usize>,
local_link_addr: MacAddress,
tcp_config: TcpConfig,
default_socket_options: TcpSocketOptions,
arp: SharedArpPeer<N>,
receiver_seq_no: SeqNumber,
ack_delay_timeout: Duration,
Expand All @@ -87,6 +89,7 @@ impl<N: NetworkRuntime> EstablishedSocket<N> {
transport,
local_link_addr,
tcp_config,
default_socket_options,
arp,
receiver_seq_no,
ack_delay_timeout,
Expand Down Expand Up @@ -129,8 +132,8 @@ impl<N: NetworkRuntime> EstablishedSocket<N> {
self.cb.pop(size).await
}

pub async fn close(&mut self, linger: Option<Duration>) -> Result<(), Fail> {
self.cb.close(linger).await
pub async fn close(&mut self) -> Result<(), Fail> {
self.cb.close().await
}

pub fn remote_mss(&self) -> usize {
Expand Down
6 changes: 6 additions & 0 deletions src/rust/inetstack/protocols/tcp/passive_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::{
network::{
config::TcpConfig,
consts::MAX_WINDOW_SCALE,
socket::option::TcpSocketOptions,
types::MacAddress,
NetworkRuntime,
},
Expand Down Expand Up @@ -85,6 +86,8 @@ pub struct PassiveSocket<N: NetworkRuntime> {
runtime: SharedDemiRuntime,
transport: N,
tcp_config: TcpConfig,
// We do not use these right now, but will in the future.
socket_options: TcpSocketOptions,
local_link_addr: MacAddress,
arp: SharedArpPeer<N>,
dead_socket_tx: mpsc::UnboundedSender<QDesc>,
Expand All @@ -107,6 +110,7 @@ impl<N: NetworkRuntime> SharedPassiveSocket<N> {
recv_queue: SharedAsyncQueue<(Ipv4Header, TcpHeader, DemiBuffer)>,
transport: N,
tcp_config: TcpConfig,
default_socket_options: TcpSocketOptions,
local_link_addr: MacAddress,
arp: SharedArpPeer<N>,
dead_socket_tx: mpsc::UnboundedSender<QDesc>,
Expand All @@ -123,6 +127,7 @@ impl<N: NetworkRuntime> SharedPassiveSocket<N> {
runtime: runtime.clone(),
transport,
tcp_config,
socket_options: default_socket_options,
arp,
dead_socket_tx,
background_task_qt: None,
Expand Down Expand Up @@ -458,6 +463,7 @@ impl<N: NetworkRuntime> SharedPassiveSocket<N> {
ack_queue,
self.local_link_addr,
self.tcp_config.clone(),
self.socket_options,
self.arp.clone(),
remote_isn + SeqNumber::from(1),
self.tcp_config.get_ack_delay_timeout(),
Expand Down
8 changes: 7 additions & 1 deletion src/rust/inetstack/protocols/tcp/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use crate::{
network::{
config::TcpConfig,
socket::{
option::SocketOption,
option::{
SocketOption,
TcpSocketOptions,
},
SocketId,
},
types::MacAddress,
Expand Down Expand Up @@ -65,6 +68,7 @@ pub struct TcpPeer<N: NetworkRuntime> {
local_link_addr: MacAddress,
local_ipv4_addr: Ipv4Addr,
tcp_config: TcpConfig,
default_socket_options: TcpSocketOptions,
arp: SharedArpPeer<N>,
rng: SmallRng,
dead_socket_tx: mpsc::UnboundedSender<QDesc>,
Expand Down Expand Up @@ -96,6 +100,7 @@ impl<N: NetworkRuntime> SharedTcpPeer<N> {
local_link_addr: config.local_link_addr()?,
local_ipv4_addr: config.local_ipv4_addr()?,
tcp_config: TcpConfig::new(config)?,
default_socket_options: TcpSocketOptions::new(config)?,
arp,
rng,
dead_socket_tx: tx,
Expand All @@ -110,6 +115,7 @@ impl<N: NetworkRuntime> SharedTcpPeer<N> {
self.transport.clone(),
self.local_link_addr,
self.tcp_config.clone(),
self.default_socket_options.clone(),
self.arp.clone(),
self.dead_socket_tx.clone(),
))
Expand Down
27 changes: 15 additions & 12 deletions src/rust/inetstack/protocols/tcp/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ pub struct TcpSocket<N: NetworkRuntime> {
network: N,
local_link_addr: MacAddress,
tcp_config: TcpConfig,
tcp_options: TcpSocketOptions,
socket_options: TcpSocketOptions,
arp: SharedArpPeer<N>,
dead_socket_tx: mpsc::UnboundedSender<QDesc>,
}
Expand All @@ -95,6 +95,7 @@ impl<N: NetworkRuntime> SharedTcpSocket<N> {
network: N,
local_link_addr: MacAddress,
tcp_config: TcpConfig,
default_socket_options: TcpSocketOptions,
arp: SharedArpPeer<N>,
dead_socket_tx: mpsc::UnboundedSender<QDesc>,
) -> Self {
Expand All @@ -105,7 +106,7 @@ impl<N: NetworkRuntime> SharedTcpSocket<N> {
network,
local_link_addr,
tcp_config,
tcp_options: TcpSocketOptions::default(),
socket_options: default_socket_options,
arp,
dead_socket_tx,
}))
Expand All @@ -117,6 +118,7 @@ impl<N: NetworkRuntime> SharedTcpSocket<N> {
network: N,
local_link_addr: MacAddress,
tcp_config: TcpConfig,
default_socket_options: TcpSocketOptions,
arp: SharedArpPeer<N>,
dead_socket_tx: mpsc::UnboundedSender<QDesc>,
) -> Self {
Expand All @@ -128,7 +130,7 @@ impl<N: NetworkRuntime> SharedTcpSocket<N> {
network,
local_link_addr,
tcp_config,
tcp_options: TcpSocketOptions::default(),
socket_options: default_socket_options,
arp,
dead_socket_tx,
}))
Expand All @@ -137,9 +139,9 @@ impl<N: NetworkRuntime> SharedTcpSocket<N> {
/// Set an SO_* option on the socket.
pub fn set_socket_option(&mut self, option: SocketOption) -> Result<(), Fail> {
match option {
SocketOption::Linger(linger) => self.tcp_options.set_linger(linger),
SocketOption::KeepAlive(keep_alive) => self.tcp_options.set_keepalive(keep_alive),
SocketOption::NoDelay(no_delay) => self.tcp_options.set_nodelay(no_delay),
SocketOption::Linger(linger) => self.socket_options.set_linger(linger),
SocketOption::KeepAlive(keep_alive) => self.socket_options.set_keepalive(keep_alive),
SocketOption::NoDelay(no_delay) => self.socket_options.set_nodelay(no_delay),
}
Ok(())
}
Expand All @@ -148,9 +150,9 @@ impl<N: NetworkRuntime> SharedTcpSocket<N> {
/// [option].
pub fn get_socket_option(&mut self, option: SocketOption) -> Result<SocketOption, Fail> {
match option {
SocketOption::Linger(_) => Ok(SocketOption::Linger(self.tcp_options.get_linger())),
SocketOption::KeepAlive(_) => Ok(SocketOption::KeepAlive(self.tcp_options.get_keepalive())),
SocketOption::NoDelay(_) => Ok(SocketOption::NoDelay(self.tcp_options.get_nodelay())),
SocketOption::Linger(_) => Ok(SocketOption::Linger(self.socket_options.get_linger())),
SocketOption::KeepAlive(_) => Ok(SocketOption::KeepAlive(self.socket_options.get_keepalive())),
SocketOption::NoDelay(_) => Ok(SocketOption::NoDelay(self.socket_options.get_nodelay())),
}
}

Expand Down Expand Up @@ -189,6 +191,7 @@ impl<N: NetworkRuntime> SharedTcpSocket<N> {
recv_queue.clone(),
self.network.clone(),
self.tcp_config.clone(),
self.socket_options.clone(),
self.local_link_addr,
self.arp.clone(),
self.dead_socket_tx.clone(),
Expand All @@ -212,6 +215,7 @@ impl<N: NetworkRuntime> SharedTcpSocket<N> {
self.network.clone(),
self.local_link_addr,
self.tcp_config.clone(),
self.socket_options.clone(),
self.arp.clone(),
self.dead_socket_tx.clone(),
);
Expand All @@ -237,6 +241,7 @@ impl<N: NetworkRuntime> SharedTcpSocket<N> {
recv_queue.clone(),
ack_queue,
self.tcp_config.clone(),
self.socket_options.clone(),
self.local_link_addr,
self.arp.clone(),
self.dead_socket_tx.clone(),
Expand Down Expand Up @@ -264,12 +269,10 @@ impl<N: NetworkRuntime> SharedTcpSocket<N> {
}

pub async fn close(&mut self) -> Result<Option<SocketId>, Fail> {
let linger: Option<Duration> = self.tcp_options.get_linger();

match self.state {
// Closing an active socket.
SocketState::Established(ref mut socket) => {
socket.close(linger).await?;
socket.close().await?;
Ok(Some(SocketId::Active(socket.endpoints().0, socket.endpoints().1)))
},
// Closing a listening socket.
Expand Down

0 comments on commit 99ad83c

Please sign in to comment.