Skip to content

Commit

Permalink
[catpowder] Enhancement: Improve XDP LibOS
Browse files Browse the repository at this point in the history
  • Loading branch information
ppenna committed Jul 2, 2024
1 parent 5074b9e commit 5021349
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 79 deletions.
28 changes: 16 additions & 12 deletions src/rust/catpowder/win/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,42 @@
//======================================================================================================================

use crate::runtime::fail::Fail;
use ::windows::core::HRESULT;
use ::std::ptr;
use ::windows::core::{
Error,
HRESULT,
};
use ::xdp_rs;

//======================================================================================================================
// Structures
//======================================================================================================================

#[derive(Clone)]
pub struct XdpApi {
pub endpoint: *const xdp_rs::XDP_API_TABLE,
}
/// A wrapper structure for an XDP API.
#[repr(C)]
pub struct XdpApi(*const xdp_rs::XDP_API_TABLE);

//======================================================================================================================
// Implementations
//======================================================================================================================

impl XdpApi {
/// Opens a new XDP API endpoint.
pub fn new() -> Result<Self, Fail> {
let mut api: *const xdp_rs::XDP_API_TABLE = std::ptr::null_mut();
let mut api: *const xdp_rs::XDP_API_TABLE = ptr::null_mut();

let result: HRESULT = unsafe { xdp_rs::XdpOpenApi(xdp_rs::XDP_API_VERSION_1, &mut api) };

let error: windows::core::Error = windows::core::Error::from_hresult(result);
let error: windows::core::Error = Error::from_hresult(result);
match error.code().is_ok() {
true => Ok(Self { endpoint: api }),
true => Ok(Self(api)),
false => Err(Fail::from(&error)),
}
}

pub fn endpoint(&self) -> xdp_rs::XDP_API_TABLE {
pub fn get(&self) -> xdp_rs::XDP_API_TABLE {
unsafe {
let api: *const xdp_rs::XDP_API_TABLE = self.endpoint;
let api: *const xdp_rs::XDP_API_TABLE = self.0;
*api
}
}
Expand All @@ -46,12 +50,12 @@ impl XdpApi {
impl Drop for XdpApi {
fn drop(&mut self) {
let api: xdp_rs::XDP_API_TABLE = unsafe {
let api: *const xdp_rs::XDP_API_TABLE = self.endpoint;
let api: *const xdp_rs::XDP_API_TABLE = self.0;
*api
};

if let Some(close) = api.XdpCloseApi {
unsafe { close(self.endpoint) };
unsafe { close(self.0) };
}
}
}
11 changes: 10 additions & 1 deletion src/rust/catpowder/win/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

//======================================================================================================================
// Modules
//======================================================================================================================

mod api;
mod buffer;
mod params;
mod program;
mod rule;
pub mod runtime;
mod rx_ring;
mod socket;
mod tx_ring;
mod umemreg;

//======================================================================================================================
// Exports
//======================================================================================================================

pub mod runtime;
3 changes: 2 additions & 1 deletion src/rust/catpowder/win/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use ::std::mem;
// Structures
//======================================================================================================================

#[repr(C)]
pub struct XdpRedirectParams {
redirect: xdp_rs::XDP_REDIRECT_PARAMS,
}
Expand All @@ -25,7 +26,7 @@ impl XdpRedirectParams {
let redirect: xdp_rs::XDP_REDIRECT_PARAMS = {
let mut redirect: xdp_rs::_XDP_REDIRECT_PARAMS = unsafe { mem::zeroed() };
redirect.TargetType = xdp_rs::_XDP_REDIRECT_TARGET_TYPE_XDP_REDIRECT_TARGET_TYPE_XSK;
redirect.Target = socket.socket;
redirect.Target = socket.get_socket();
redirect
};
Self { redirect }
Expand Down
25 changes: 14 additions & 11 deletions src/rust/catpowder/win/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use ::xdp_rs;
//======================================================================================================================

#[repr(C)]
#[derive(Default)]
pub struct XdpProgram(HANDLE);

//======================================================================================================================
Expand All @@ -45,15 +44,24 @@ impl XdpProgram {
) -> Result<XdpProgram, Fail> {
let rule: *const xdp_rs::XDP_RULE = rules.as_ptr() as *const xdp_rs::XDP_RULE;
let rule_count: u32 = rules.len() as u32;
let mut program: XdpProgram = XdpProgram::default();
let mut handle: HANDLE = HANDLE::default();

// Attempt to create the XDP program.
if let Some(create_program) = api.endpoint().XdpCreateProgram {
let result: HRESULT =
unsafe { create_program(ifindex, hookid, queueid, flags, rule, rule_count, program.as_ptr()) };
if let Some(create_program) = api.get().XdpCreateProgram {
let result: HRESULT = unsafe {
create_program(
ifindex,
hookid,
queueid,
flags,
rule,
rule_count,
&mut handle as *mut HANDLE,
)
};
let error: windows::core::Error = windows::core::Error::from_hresult(result);
match error.code().is_ok() {
true => Ok(program),
true => Ok(Self(handle)),
false => Err(Fail::from(&error)),
}
} else {
Expand All @@ -62,11 +70,6 @@ impl XdpProgram {
Err(Fail::new(libc::ENOSYS, &cause))
}
}

/// Casts the target XDP program to a raw pointer.
pub fn as_ptr(&mut self) -> *mut HANDLE {
&mut self.0 as *mut HANDLE
}
}

//======================================================================================================================
Expand Down
5 changes: 0 additions & 5 deletions src/rust/catpowder/win/rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ impl XdpRule {
let mut rule: xdp_rs::XDP_RULE = std::mem::zeroed();
rule.Match = xdp_rs::_XDP_MATCH_TYPE_XDP_MATCH_ALL;
rule.Action = xdp_rs::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_REDIRECT;
// TODO: Set pattern
// Perform bitwise copy from redirect to rule.
rule.__bindgen_anon_1 =
mem::transmute_copy::<xdp_rs::XDP_REDIRECT_PARAMS, xdp_rs::_XDP_RULE__bindgen_ty_1>(redirect.as_ptr());
Expand All @@ -38,8 +37,4 @@ impl XdpRule {
};
Self(rule)
}

pub fn as_ptr(&self) -> *const xdp_rs::XDP_RULE {
&self.0
}
}
62 changes: 36 additions & 26 deletions src/rust/catpowder/win/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,50 +40,63 @@ use ::std::borrow::{
//======================================================================================================================

struct CatpowderRuntimeInner {
api: XdpApi,
tx: TxRing,
rx: RxRing,
}

/// A LibOS built on top of Windows XDP.
#[derive(Clone)]
pub struct CatpowderRuntime {
api: XdpApi,
inner: SharedObject<CatpowderRuntimeInner>,
}

//======================================================================================================================
// Associated Functions
//======================================================================================================================

impl CatpowderRuntimeInner {
fn notify_socket(&mut self, flags: i32, timeout: u32, outflags: &mut i32) -> Result<(), Fail> {
self.tx.notify_socket(&mut self.api, flags, timeout, outflags)
}
}

impl CatpowderRuntime {}

impl NetworkRuntime for CatpowderRuntime {
fn new(config: &Config) -> Result<Self, Fail> {
// TODO: read the following from the config file.
let index: u32 = config.local_interface_index()?;
let queueid: u32 = 0;
let ifindex: u32 = config.local_interface_index()?;
const QUEUEID: u32 = 0; // We do no use RSS, thus `queueid` is always 0.

trace!("Creating XDP runtime.");
let mut api: XdpApi = XdpApi::new()?;

let rx: RxRing = RxRing::new(&mut api, index, queueid)?;
let tx: TxRing = TxRing::new(&mut api, index, queueid)?;
// Open TX and RX rings
let rx: RxRing = RxRing::new(&mut api, ifindex, QUEUEID)?;
let tx: TxRing = TxRing::new(&mut api, ifindex, QUEUEID)?;

Ok(Self {
api,
inner: SharedObject::new(CatpowderRuntimeInner { rx, tx }),
inner: SharedObject::new(CatpowderRuntimeInner { api, rx, tx }),
})
}

fn transmit(&mut self, pkt: Box<dyn PacketBuf>) {
let header_size: usize = pkt.header_size();
let body_size: usize = pkt.body_size();
assert!(header_size + body_size < u16::MAX as usize);
trace!("header_size={:?}, body_size={:?}", header_size, body_size);
trace!("transmit(): header_size={:?}, body_size={:?}", header_size, body_size);

if header_size + body_size >= u16::MAX as usize {
warn!("packet is too large: {:?}", header_size + body_size);
return;
}

const COUNT: u32 = 1;
let mut idx: u32 = 0;

assert!(self.inner.borrow_mut().tx.producer_reserve(COUNT, &mut idx) == COUNT);
if self.inner.borrow_mut().tx.producer_reserve(COUNT, &mut idx) != COUNT {
warn!("failed to reserve producer space for packet");
return;
}

let mut buf: XdpBuffer = self.inner.borrow_mut().tx.get_element(idx);
buf.set_len(header_size + body_size);
Expand All @@ -96,24 +109,21 @@ impl NetworkRuntime for CatpowderRuntime {
self.inner.borrow_mut().tx.producer_submit(COUNT);

// Notify socket.
let mut outflags = xdp_rs::XSK_NOTIFY_RESULT_FLAGS::default();
self.inner
.borrow()
.tx
.notify_socket(
&mut self.api,
xdp_rs::_XSK_NOTIFY_FLAGS_XSK_NOTIFY_FLAG_POKE_TX | xdp_rs::_XSK_NOTIFY_FLAGS_XSK_NOTIFY_FLAG_WAIT_TX,
u32::MAX,
&mut outflags,
)
.unwrap();

if self.inner.borrow_mut().tx.consumer_reserve(COUNT, &mut idx) == COUNT {
self.inner.borrow_mut().tx.consumer_release(COUNT);
let mut outflags: i32 = xdp_rs::XSK_NOTIFY_RESULT_FLAGS::default();
let flags: i32 =
xdp_rs::_XSK_NOTIFY_FLAGS_XSK_NOTIFY_FLAG_POKE_TX | xdp_rs::_XSK_NOTIFY_FLAGS_XSK_NOTIFY_FLAG_WAIT_TX;

if let Err(e) = self.inner.borrow_mut().notify_socket(flags, u32::MAX, &mut outflags) {
warn!("failed to notify socket: {:?}", e);
return;
}

if self.inner.borrow_mut().tx.consumer_reserve(COUNT, &mut idx) != COUNT {
warn!("failed to send packet");
return;
}

warn!("failed to send packet");
self.inner.borrow_mut().tx.consumer_release(COUNT);
}

fn receive(&mut self) -> ArrayVec<DemiBuffer, RECEIVE_BATCH_SIZE> {
Expand Down
19 changes: 10 additions & 9 deletions src/rust/catpowder/win/rx_ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@ use crate::{
//======================================================================================================================

pub struct RxRing {
program: XdpProgram,
mem: UmemReg,
socket: XdpSocket,
rx_ring: XdpRing,
rx_fill_ring: XdpRing,
_program: XdpProgram,
_socket: XdpSocket,
}

//======================================================================================================================
// Implementations
//======================================================================================================================

impl RxRing {
pub fn new(api: &mut XdpApi, ifindex: u32, queueid: u32) -> Result<Self, Fail> {
trace!("Creating XDP socket.");
Expand Down Expand Up @@ -90,7 +94,7 @@ impl RxRing {
let mut ring_index: u32 = 0;
rx_fill_ring.ring_producer_reserve(RING_SIZE, &mut ring_index);

let b = rx_fill_ring.ring_get_element(ring_index) as *mut u64;
let b: *mut u64 = rx_fill_ring.ring_get_element(ring_index) as *mut u64;
unsafe { *b = 0 };

trace!("Submitting RX ring buffer.");
Expand All @@ -99,22 +103,19 @@ impl RxRing {
trace!("Setting RX Fill ring.");

// Create XDP program.
trace!("Creating XDP program...");
const XDP_INSPECT_RX: xdp_rs::XDP_HOOK_ID = xdp_rs::XDP_HOOK_ID {
Layer: xdp_rs::_XDP_HOOK_LAYER_XDP_HOOK_L2,
Direction: xdp_rs::_XDP_HOOK_DATAPATH_DIRECTION_XDP_HOOK_RX,
SubLayer: xdp_rs::_XDP_HOOK_SUBLAYER_XDP_HOOK_INSPECT,
};

let rules: Vec<XdpRule> = vec![XdpRule::new(&socket)];

trace!("Creating XDP program.");
let program: XdpProgram = XdpProgram::new(api, &rules, ifindex, &XDP_INSPECT_RX, queueid, 0)?;

trace!("XDP program created.");
Ok(Self {
program,
_program: program,
mem,
socket,
_socket: socket,
rx_ring,
rx_fill_ring,
})
Expand Down
Loading

0 comments on commit 5021349

Please sign in to comment.