Skip to content

Commit

Permalink
Merge pull request #1257 from microsoft/enhancement-inetstack-sockopts
Browse files Browse the repository at this point in the history
[libos] Add socket options
  • Loading branch information
iyzhang authored May 8, 2024
2 parents fc96ac0 + aac92af commit b7ee223
Show file tree
Hide file tree
Showing 28 changed files with 793 additions and 98 deletions.
13 changes: 10 additions & 3 deletions examples/tcp-close/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@

use crate::{
helper_functions,
DEFAULT_LINGER,
DEFAULT_TIMEOUT,
};
use anyhow::Result;
use demikernel::{
demi_sgarray_t,
runtime::types::{
demi_opcode_t,
demi_qresult_t,
runtime::{
network::socket::option::SocketOption,
types::{
demi_opcode_t,
demi_qresult_t,
},
},
LibOS,
QDesc,
Expand Down Expand Up @@ -292,6 +296,9 @@ impl TcpClient {
/// Issues an open socket() operation and registers the queue descriptor for cleanup.
fn issue_socket(&mut self) -> Result<QDesc> {
let qd: QDesc = self.libos.socket(AF_INET, SOCK_STREAM, 0)?;
// Set default linger to a short period, otherwise, this test will take a long time to complete.
self.libos
.set_socket_option(qd, SocketOption::SO_LINGER(Some(DEFAULT_LINGER)))?;
self.qds.insert(qd);
Ok(qd)
}
Expand Down
1 change: 1 addition & 0 deletions examples/tcp-close/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use demikernel::{
//======================================================================================================================

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
const DEFAULT_LINGER: Duration = Duration::from_secs(10);

//======================================================================================================================
// main
Expand Down
12 changes: 9 additions & 3 deletions examples/tcp-close/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@

use crate::{
helper_functions,
DEFAULT_LINGER,
DEFAULT_TIMEOUT,
};
use anyhow::Result;
use demikernel::{
demi_sgarray_t,
runtime::types::{
demi_opcode_t,
demi_qresult_t,
runtime::{
network::socket::option::SocketOption,
types::{
demi_opcode_t,
demi_qresult_t,
},
},
LibOS,
QDesc,
Expand Down Expand Up @@ -77,6 +81,8 @@ impl TcpServer {
pub fn new(mut libos: LibOS, local: SocketAddr) -> Result<Self> {
// Create TCP socket.
let sockqd: QDesc = libos.socket(AF_INET, SOCK_STREAM, 0)?;
// Set default linger to a short period, otherwise, this test will take a long time to complete.
libos.set_socket_option(sockqd, SocketOption::SO_LINGER(Some(DEFAULT_LINGER)))?;

// Bind to local address.
libos.bind(sockqd, local)?;
Expand Down
14 changes: 13 additions & 1 deletion examples/tcp-echo/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
// Imports
//======================================================================================================================

use crate::DEFAULT_TIMEOUT;
use crate::{
DEFAULT_LINGER,
DEFAULT_TIMEOUT,
};
use anyhow::Result;
use demikernel::{
demi_sgarray_t,
Expand All @@ -16,6 +19,7 @@ use demikernel::{
LibOS,
QDesc,
QToken,
SocketOption,
};
use histogram::Histogram;
use std::{
Expand Down Expand Up @@ -104,6 +108,10 @@ impl TcpEchoClient {
// Open all connections.
for _ in 0..nclients {
let sockqd: QDesc = self.libos.socket(AF_INET, SOCK_STREAM, 0)?;
// Set default linger to a short period, otherwise, this test will take a long time to complete.
self.libos
.set_socket_option(sockqd, SocketOption::SO_LINGER(Some(DEFAULT_LINGER)))?;

self.clients.insert(sockqd, (vec![0; self.bufsize], 0));
let qt: QToken = self.libos.connect(sockqd, self.remote)?;
let qr: demi_qresult_t = self.libos.wait(qt, Some(DEFAULT_TIMEOUT))?;
Expand Down Expand Up @@ -191,6 +199,10 @@ impl TcpEchoClient {
// Open several connections.
for i in 0..nclients {
let qd: QDesc = self.libos.socket(AF_INET, SOCK_STREAM, 0)?;
// Set default linger to a short period, otherwise, this test will take a long time to complete.
self.libos
.set_socket_option(qd, SocketOption::SO_LINGER(Some(DEFAULT_LINGER)))?;

let qt: QToken = self.libos.connect(qd, self.remote)?;
self.register_operation(qd, qt);

Expand Down
1 change: 1 addition & 0 deletions examples/tcp-echo/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ mod server;
//======================================================================================================================

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
const DEFAULT_LINGER: Duration = Duration::from_secs(10);

//======================================================================================================================
// Program Arguments
Expand Down
7 changes: 6 additions & 1 deletion examples/tcp-echo/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
// Imports
//======================================================================================================================

use crate::DEFAULT_TIMEOUT;
use crate::{
DEFAULT_LINGER,
DEFAULT_TIMEOUT,
};
use anyhow::Result;
use demikernel::{
demi_sgarray_t,
Expand All @@ -16,6 +19,7 @@ use demikernel::{
LibOS,
QDesc,
QToken,
SocketOption,
};
use std::{
collections::{
Expand Down Expand Up @@ -68,6 +72,7 @@ impl TcpEchoServer {
pub fn new(mut libos: LibOS, local: SocketAddr) -> Result<Self> {
// Create a TCP socket.
let sockqd: QDesc = libos.socket(AF_INET, SOCK_STREAM, 0)?;
libos.set_socket_option(sockqd, SocketOption::SO_LINGER(Some(DEFAULT_LINGER)))?;

// Bind the socket to a local address.
if let Err(e) = libos.bind(sockqd, local) {
Expand Down
10 changes: 9 additions & 1 deletion examples/tcp-wait/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
// Imports
//======================================================================================================================

use crate::DEFAULT_TIMEOUT;
use crate::{
DEFAULT_LINGER,
DEFAULT_TIMEOUT,
};
use ::anyhow::Result;
use ::demikernel::{
demi_sgarray_t,
Expand All @@ -16,6 +19,7 @@ use ::demikernel::{
LibOS,
QDesc,
QToken,
SocketOption,
};
use ::std::{
net::SocketAddr,
Expand Down Expand Up @@ -296,6 +300,10 @@ impl TcpClient {

fn connect_to_server(&mut self, num_clients: usize) -> Result<()> {
self.sockqd = Some(self.libos.socket(AF_INET, SOCK_STREAM, 0)?);
// Set default linger to a short period, otherwise, this test will take a long time to complete.
self.libos
.set_socket_option(self.sockqd.unwrap(), SocketOption::SO_LINGER(Some(DEFAULT_LINGER)))?;

let qt: QToken = self
.libos
.connect(self.sockqd.expect("should be a valid socket"), self.remote)?;
Expand Down
1 change: 1 addition & 0 deletions examples/tcp-wait/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use ::std::time::Duration;
//======================================================================================================================

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
const DEFAULT_LINGER: Duration = Duration::from_secs(10);

//======================================================================================================================
// main
Expand Down
4 changes: 4 additions & 0 deletions examples/tcp-wait/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Imports
//======================================================================================================================

use crate::DEFAULT_LINGER;
use ::anyhow::Result;
use ::demikernel::{
demi_sgarray_t,
Expand All @@ -15,6 +16,7 @@ use ::demikernel::{
LibOS,
QDesc,
QToken,
SocketOption,
};
use ::std::{
collections::{
Expand Down Expand Up @@ -71,6 +73,8 @@ impl TcpServer {
pub fn new(mut libos: LibOS, local: SocketAddr, nclients: usize) -> Result<Self> {
// Create TCP socket.
let sockqd: QDesc = libos.socket(AF_INET, SOCK_STREAM, 0)?;
// Set default linger to a short period, otherwise, this test will take a long time to complete.
libos.set_socket_option(sockqd, SocketOption::SO_LINGER(Some(DEFAULT_LINGER)))?;

// Bind to local address.
libos.bind(sockqd, local)?;
Expand Down
44 changes: 41 additions & 3 deletions src/rust/catloop/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@ use crate::{
conditional_yield_with_timeout,
fail::Fail,
memory::DemiBuffer,
network::unwrap_socketaddr,
network::{
socket::option::{
SocketOption,
TcpSocketOptions,
},
unwrap_socketaddr,
},
queue::QDesc,
OperationResult,
SharedObject,
Expand Down Expand Up @@ -68,6 +74,8 @@ pub struct MemorySocket {
pending_request_ids: HashSet<RequestId>,
/// Random number generator for request ids.
rng: SmallRng,
/// SO_LINGER option, which dictates how long to wait for the connection to close.
options: TcpSocketOptions,
}

pub struct SharedMemorySocket(SharedObject<MemorySocket>);
Expand All @@ -93,6 +101,7 @@ impl SharedMemorySocket {
rng: SmallRng::seed_from_u64(REQUEST_ID_SEED),
#[cfg(not(debug_assertions))]
rng: SmallRng::from_entropy(),
options: TcpSocketOptions::default(),
}))
}

Expand All @@ -108,9 +117,26 @@ impl SharedMemorySocket {
rng: SmallRng::seed_from_u64(REQUEST_ID_SEED),
#[cfg(not(debug_assertions))]
rng: SmallRng::from_entropy(),
options: TcpSocketOptions::default(),
}))
}

/// Set an SO_* option on the socket.
pub fn set_socket_option(&mut self, option: SocketOption) -> Result<(), Fail> {
match option {
SocketOption::SO_LINGER(linger) => self.options.set_linger(linger),
}
Ok(())
}

/// Gets an SO_* option on the socket. The option should be passed in as [option] and the value is returned in
/// [option].
pub fn get_socket_option(&mut self, option: SocketOption) -> Result<SocketOption, Fail> {
match option {
SocketOption::SO_LINGER(_) => Ok(SocketOption::SO_LINGER(self.options.get_linger())),
}
}

/// Binds the target socket to `local` address.
/// TODO: Should probably move the create of the duplex pipe to listen.
pub fn bind(&mut self, local: SocketAddrV4, catmem: &mut SharedCatmemLibOS) -> Result<(), Fail> {
Expand Down Expand Up @@ -230,11 +256,23 @@ impl SharedMemorySocket {
/// Closes `socket`.
pub async fn close(&mut self, catmem: SharedCatmemLibOS) -> Result<(), Fail> {
if let Some(qd) = self.catmem_qd {
match catmem.close_coroutine(qd).await {
let result = if let Some(linger) = self.options.get_linger() {
match conditional_yield_with_timeout(catmem.close_coroutine(qd), linger).await {
Err(e) if e.errno == libc::ETIMEDOUT => {
// This case is actually ok because we have waited out the linger time out.
return Ok(());
},
Err(e) => return Err(e),
Ok(result) => result,
}
} else {
catmem.close_coroutine(qd).await
};
match result {
(_, OperationResult::Close) => (),
(_, OperationResult::Failed(e)) => return Err(e),
_ => panic!("Should not return anything other than close or fail"),
}
};
};
Ok(())
}
Expand Down
16 changes: 16 additions & 0 deletions src/rust/catloop/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
MemoryRuntime,
},
network::{
socket::option::SocketOption,
transport::NetworkTransport,
unwrap_socketaddr,
},
Expand Down Expand Up @@ -80,6 +81,21 @@ impl NetworkTransport for SharedCatloopTransport {
Ok(SharedMemorySocket::new())
}

/// Set an SO_* option on the socket.
fn set_socket_option(&mut self, sd: &mut Self::SocketDescriptor, option: SocketOption) -> Result<(), Fail> {
sd.set_socket_option(option)
}

/// Gets an SO_* option on the socket. The option should be passed in as [option] and the value is returned in
/// [option].
fn get_socket_option(
&mut self,
sd: &mut Self::SocketDescriptor,
option: SocketOption,
) -> Result<SocketOption, Fail> {
sd.get_socket_option(option)
}

/// Binds a socket to a local endpoint. This function contains the libOS-level functionality needed to bind a
/// SharedCatloopQueue to a local address.
fn bind(&mut self, sd: &mut Self::SocketDescriptor, local: SocketAddr) -> Result<(), Fail> {
Expand Down
45 changes: 44 additions & 1 deletion src/rust/catnap/linux/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ use crate::{
DemiBuffer,
MemoryRuntime,
},
network::transport::NetworkTransport,
network::{
socket::option::SocketOption,
transport::NetworkTransport,
},
poll_yield,
DemiRuntime,
SharedDemiRuntime,
Expand Down Expand Up @@ -349,6 +352,46 @@ impl NetworkTransport for SharedCatnapTransport {
Ok(sd)
}

/// Set an SO_* option on the socket.
fn set_socket_option(&mut self, sd: &mut Self::SocketDescriptor, option: SocketOption) -> Result<(), Fail> {
trace!("Set socket option to {:?}", option);
let socket: &mut Socket = self.socket_from_sd(sd);
match option {
SocketOption::SO_LINGER(linger) => {
if let Err(e) = socket.set_linger(linger) {
let errno: i32 = get_libc_err(e);
let cause: String = format!("SO_LINGER failed: {:?}", errno);
error!("set_socket_option(): {}", cause);
Err(Fail::new(errno, &cause))
} else {
Ok(())
}
},
}
}

/// Gets an SO_* option on the socket. The option should be passed in as [option] and the value returned is either
/// an error or must match [option] with a value.
fn get_socket_option(
&mut self,
sd: &mut Self::SocketDescriptor,
option: SocketOption,
) -> Result<SocketOption, Fail> {
trace!("Set socket option to {:?}", option);
let socket: &mut Socket = self.socket_from_sd(sd);
match option {
SocketOption::SO_LINGER(_) => match socket.linger() {
Ok(linger) => Ok(SocketOption::SO_LINGER(linger)),
Err(e) => {
let errno: i32 = get_libc_err(e);
let cause: String = format!("SO_LINGER failed: {:?}", errno);
error!("set_socket_option(): {}", cause);
Err(Fail::new(errno, &cause))
},
},
}
}

/// Binds a socket to [local] on the underlying network transport.
fn bind(&mut self, sd: &mut Self::SocketDescriptor, local: SocketAddr) -> Result<(), Fail> {
trace!("Bind to {:?}", local);
Expand Down
Loading

0 comments on commit b7ee223

Please sign in to comment.