Skip to content

Commit

Permalink
[catpowder] Enhancement: Improve XDP LibOS
Browse files Browse the repository at this point in the history
  • Loading branch information
ppenna committed Jul 8, 2024
1 parent 6c2094e commit 5f7ff0f
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 53 deletions.
1 change: 0 additions & 1 deletion src/rust/catpowder/win/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
//======================================================================================================================

mod api;
mod buffer;
mod params;
mod program;
mod ring;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use ::std::{

/// A structure that represents a buffer in the UMEM region.
pub struct XdpBuffer {
/// A pointer to the buffer descriptor.
b: *mut xdp_rs::XSK_BUFFER_DESCRIPTOR,
/// UMEM region that contains the buffer.
umemreg: Rc<RefCell<UmemReg>>,
}

Expand All @@ -31,35 +33,41 @@ pub struct XdpBuffer {
//======================================================================================================================

impl XdpBuffer {
pub fn new(b: *mut xdp_rs::XSK_BUFFER_DESCRIPTOR, umemreg: Rc<RefCell<UmemReg>>) -> Self {
/// Instantiates a buffer.
pub(super) fn new(b: *mut xdp_rs::XSK_BUFFER_DESCRIPTOR, umemreg: Rc<RefCell<UmemReg>>) -> Self {
Self { b, umemreg }
}

pub fn len(&self) -> usize {
unsafe { (*self.b).Length as usize }
}

pub fn set_len(&mut self, len: usize) {
/// Sets the length of the target buffer.
pub(super) fn set_len(&mut self, len: usize) {
unsafe {
(*self.b).Length = len as u32;
}
}

unsafe fn base_address(&self) -> u64 {
/// Gets the length of the target buffer.
fn len(&self) -> usize {
unsafe { (*self.b).Length as usize }
}

/// Gets the relative base address of the target buffer.
unsafe fn relative_base_address(&self) -> u64 {
(*self.b).Address.__bindgen_anon_1.BaseAddress()
}

unsafe fn offset(&self) -> u64 {
(*self.b).Address.__bindgen_anon_1.Offset()
}

unsafe fn addr(&self) -> *mut core::ffi::c_void {
/// Computes the address of the target buffer.
unsafe fn compute_address(&self) -> *mut core::ffi::c_void {
let mut ptr: *mut u8 = self.umemreg.borrow_mut().address() as *mut u8;
ptr = ptr.add(self.base_address() as usize);
ptr = ptr.add(self.relative_base_address() as usize);
ptr = ptr.add(self.offset() as usize);
ptr as *mut core::ffi::c_void
}

/// Creates a vector with the contents of the target buffer.
fn to_vector(&self) -> Vec<u8> {
let mut out: Vec<u8> = Vec::with_capacity(self.len());
self[..].clone_into(&mut out);
Expand All @@ -81,12 +89,12 @@ impl Deref for XdpBuffer {
type Target = [u8];

fn deref(&self) -> &[u8] {
unsafe { std::slice::from_raw_parts(self.addr() as *const u8, self.len()) }
unsafe { std::slice::from_raw_parts(self.compute_address() as *const u8, self.len()) }
}
}

impl DerefMut for XdpBuffer {
fn deref_mut(&mut self) -> &mut [u8] {
unsafe { std::slice::from_raw_parts_mut(self.addr() as *mut u8, self.len()) }
unsafe { std::slice::from_raw_parts_mut(self.compute_address() as *mut u8, self.len()) }
}
}
2 changes: 2 additions & 0 deletions src/rust/catpowder/win/ring/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Modules
//======================================================================================================================

mod buffer;
mod generic;
mod rx_ring;
mod tx_ring;
Expand All @@ -13,5 +14,6 @@ mod tx_ring;
// Exports
//======================================================================================================================

pub use buffer::XdpBuffer;
pub use rx_ring::RxRing;
pub use tx_ring::TxRing;
21 changes: 14 additions & 7 deletions src/rust/catpowder/win/ring/rx_ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
use crate::{
catpowder::win::{
api::XdpApi,
buffer::XdpBuffer,
program::XdpProgram,
ring::generic::XdpRing,
ring::{
buffer::XdpBuffer,
generic::XdpRing,
},
rule::XdpRule,
socket::XdpSocket,
umemreg::UmemReg,
Expand Down Expand Up @@ -137,23 +139,28 @@ impl RxRing {
})
}

pub fn consumer_reserve(&mut self, count: u32, idx: &mut u32) -> u32 {
/// Reserves a consumer slot in the rx ring.
pub fn reserve_rx(&mut self, count: u32, idx: &mut u32) -> u32 {
self.rx_ring.consumer_reserve(count, idx)
}

pub fn consumer_release(&mut self, count: u32) {
/// Releases a consumer slot in the rx ring.
pub fn release_rx(&mut self, count: u32) {
self.rx_ring.consumer_release(count);
}

pub fn producer_reserve(&mut self, count: u32, idx: &mut u32) -> u32 {
/// Reserves a producer slot in the rx fill ring.
pub fn reserve_rx_fill(&mut self, count: u32, idx: &mut u32) -> u32 {
self.rx_fill_ring.producer_reserve(count, idx)
}

pub fn producer_submit(&mut self, count: u32) {
/// Submits a producer slot in the rx fill ring.
pub fn submit_rx_fill(&mut self, count: u32) {
self.rx_fill_ring.producer_submit(count);
}

pub fn get_element(&self, idx: u32) -> XdpBuffer {
/// Gets the buffer at the target index.
pub fn get_buffer(&self, idx: u32) -> XdpBuffer {
XdpBuffer::new(
self.rx_ring.get_element(idx) as *mut xdp_rs::XSK_BUFFER_DESCRIPTOR,
self.mem.clone(),
Expand Down
27 changes: 18 additions & 9 deletions src/rust/catpowder/win/ring/tx_ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
use crate::{
catpowder::win::{
api::XdpApi,
buffer::XdpBuffer,
ring::generic::XdpRing,
ring::{
buffer::XdpBuffer,
generic::XdpRing,
},
socket::XdpSocket,
umemreg::UmemReg,
},
Expand Down Expand Up @@ -119,26 +121,33 @@ impl TxRing {
self.socket.notify(api, flags, count, outflags)
}

pub fn producer_reserve(&mut self, count: u32, idx: &mut u32) -> u32 {
/// Reserves a producer slot in the tx ring.
pub fn reserve_tx(&mut self, count: u32, idx: &mut u32) -> u32 {
self.tx_ring.producer_reserve(count, idx)
}

pub fn producer_submit(&mut self, count: u32) {
/// Submits a producer slot in the tx ring.
pub fn submit_tx(&mut self, count: u32) {
self.tx_ring.producer_submit(count);
}

pub fn consumer_reserve(&mut self, count: u32, idx: &mut u32) -> u32 {
/// Reserves a consumer slot in the tx completion ring.
pub fn reserve_tx_completion(&mut self, count: u32, idx: &mut u32) -> u32 {
self.tx_completion_ring.consumer_reserve(count, idx)
}

pub fn consumer_release(&mut self, count: u32) {
/// Releases a consumer slot in the tx completion ring.
pub fn release_tx_completion(&mut self, count: u32) {
self.tx_completion_ring.consumer_release(count);
}

pub fn get_element(&self, idx: u32) -> XdpBuffer {
XdpBuffer::new(
/// Gets the buffer at the target index and set its length.
pub fn get_buffer(&self, idx: u32, len: usize) -> XdpBuffer {
let mut buf = XdpBuffer::new(
self.tx_ring.get_element(idx) as *mut xdp_rs::XSK_BUFFER_DESCRIPTOR,
self.mem.clone(),
)
);
buf.set_len(len);
buf
}
}
56 changes: 31 additions & 25 deletions src/rust/catpowder/win/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
use crate::{
catpowder::win::{
api::XdpApi,
buffer::XdpBuffer,
ring::{
RxRing,
TxRing,
XdpBuffer,
},
},
demikernel::config::Config,
Expand Down Expand Up @@ -41,28 +41,19 @@ use ::std::borrow::{
// Structures
//======================================================================================================================

/// A LibOS built on top of Windows XDP.
#[derive(Clone)]
pub struct CatpowderRuntime(SharedObject<CatpowderRuntimeInner>);

/// The inner state of the Catpowder runtime.
struct CatpowderRuntimeInner {
api: XdpApi,
tx: TxRing,
rx: RxRing,
}

/// A LibOS built on top of Windows XDP.
#[derive(Clone)]
pub struct CatpowderRuntime(SharedObject<CatpowderRuntimeInner>);

//======================================================================================================================
// Implementations
//======================================================================================================================

impl CatpowderRuntimeInner {
/// Notifies the socket that there are packets to be transmitted.
fn notify_socket(&mut self, flags: i32, timeout: u32, outflags: &mut i32) -> Result<(), Fail> {
self.tx.notify_socket(&mut self.api, flags, timeout, outflags)
}
}

impl CatpowderRuntime {
///
/// Number of buffers in the rings.
Expand All @@ -74,6 +65,7 @@ impl CatpowderRuntime {
}

impl NetworkRuntime for CatpowderRuntime {
/// Instantiates a new XDP runtime.
fn new(config: &Config) -> Result<Self, Fail> {
let ifindex: u32 = config.local_interface_index()?;
const QUEUEID: u32 = 0; // We do no use RSS, thus queue id is always 0.
Expand All @@ -88,6 +80,7 @@ impl NetworkRuntime for CatpowderRuntime {
Ok(Self(SharedObject::new(CatpowderRuntimeInner { api, rx, tx })))
}

/// Transmits a packet.
fn transmit(&mut self, pkt: Box<dyn PacketBuf>) {
let header_size: usize = pkt.header_size();
let body_size: usize = pkt.body_size();
Expand All @@ -100,20 +93,19 @@ impl NetworkRuntime for CatpowderRuntime {

let mut idx: u32 = 0;

if self.0.borrow_mut().tx.producer_reserve(Self::RING_LENGTH, &mut idx) != Self::RING_LENGTH {
if self.0.borrow_mut().tx.reserve_tx(Self::RING_LENGTH, &mut idx) != Self::RING_LENGTH {
warn!("failed to reserve producer space for packet");
return;
}

let mut buf: XdpBuffer = self.0.borrow_mut().tx.get_element(idx);
buf.set_len(header_size + body_size);
let mut buf: XdpBuffer = self.0.borrow_mut().tx.get_buffer(idx, header_size + body_size);

pkt.write_header(&mut buf[..header_size]);
if let Some(body) = pkt.take_body() {
buf[header_size..].copy_from_slice(&body[..]);
}

self.0.borrow_mut().tx.producer_submit(Self::RING_LENGTH);
self.0.borrow_mut().tx.submit_tx(Self::RING_LENGTH);

// Notify socket.
let mut outflags: i32 = xdp_rs::XSK_NOTIFY_RESULT_FLAGS::default();
Expand All @@ -125,37 +117,51 @@ impl NetworkRuntime for CatpowderRuntime {
return;
}

if self.0.borrow_mut().tx.consumer_reserve(Self::RING_LENGTH, &mut idx) != Self::RING_LENGTH {
if self
.0
.borrow_mut()
.tx
.reserve_tx_completion(Self::RING_LENGTH, &mut idx)
!= Self::RING_LENGTH
{
warn!("failed to send packet");
return;
}

self.0.borrow_mut().tx.consumer_release(Self::RING_LENGTH);
self.0.borrow_mut().tx.release_tx_completion(Self::RING_LENGTH);
}

/// Polls for received packets.
fn receive(&mut self) -> ArrayVec<DemiBuffer, RECEIVE_BATCH_SIZE> {
let mut ret: ArrayVec<DemiBuffer, RECEIVE_BATCH_SIZE> = ArrayVec::new();
let mut idx: u32 = 0;

if self.0.borrow_mut().rx.consumer_reserve(Self::RING_LENGTH, &mut idx) == Self::RING_LENGTH {
let xdp_buffer: XdpBuffer = self.0.borrow().rx.get_element(idx);
if self.0.borrow_mut().rx.reserve_rx(Self::RING_LENGTH, &mut idx) == Self::RING_LENGTH {
let xdp_buffer: XdpBuffer = self.0.borrow().rx.get_buffer(idx);
let out: Vec<u8> = xdp_buffer.into();

let dbuf: DemiBuffer = expect_ok!(DemiBuffer::from_slice(&out), "'bytes' should fit");

ret.push(dbuf);

self.0.borrow_mut().rx.consumer_release(Self::RING_LENGTH);
self.0.borrow_mut().rx.release_rx(Self::RING_LENGTH);

self.0.borrow_mut().rx.producer_reserve(Self::RING_LENGTH, &mut idx);
self.0.borrow_mut().rx.reserve_rx_fill(Self::RING_LENGTH, &mut idx);

self.0.borrow_mut().rx.producer_submit(Self::RING_LENGTH);
self.0.borrow_mut().rx.submit_rx_fill(Self::RING_LENGTH);
}

ret
}
}

impl CatpowderRuntimeInner {
/// Notifies the socket that there are packets to be transmitted.
fn notify_socket(&mut self, flags: i32, timeout: u32, outflags: &mut i32) -> Result<(), Fail> {
self.tx.notify_socket(&mut self.api, flags, timeout, outflags)
}
}

//======================================================================================================================
// Trait Implementations
//======================================================================================================================
Expand Down

0 comments on commit 5f7ff0f

Please sign in to comment.