Skip to content

Commit

Permalink
[catpowder] Enhancement: Improve XDP LibOS
Browse files Browse the repository at this point in the history
  • Loading branch information
ppenna committed Jul 8, 2024
1 parent c86c304 commit 6c2094e
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 82 deletions.
14 changes: 14 additions & 0 deletions src/rust/catpowder/win/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ use ::std::{
DerefMut,
},
rc::Rc,
vec::Vec,
};

//======================================================================================================================
// Structures
//======================================================================================================================

/// A structure that represents a buffer in the UMEM region.
pub struct XdpBuffer {
b: *mut xdp_rs::XSK_BUFFER_DESCRIPTOR,
umemreg: Rc<RefCell<UmemReg>>,
Expand Down Expand Up @@ -57,12 +59,24 @@ impl XdpBuffer {
ptr = ptr.add(self.offset() as usize);
ptr as *mut core::ffi::c_void
}

fn to_vector(&self) -> Vec<u8> {
let mut out: Vec<u8> = Vec::with_capacity(self.len());
self[..].clone_into(&mut out);
out
}
}

//======================================================================================================================
// Trait Implementations
//======================================================================================================================

impl From<XdpBuffer> for Vec<u8> {
fn from(buffer: XdpBuffer) -> Vec<u8> {
buffer.to_vector()
}
}

impl Deref for XdpBuffer {
type Target = [u8];

Expand Down
2 changes: 1 addition & 1 deletion src/rust/catpowder/win/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl XdpRedirectParams {
let redirect: xdp_rs::XDP_REDIRECT_PARAMS = {
let mut redirect: xdp_rs::_XDP_REDIRECT_PARAMS = unsafe { mem::zeroed() };
redirect.TargetType = xdp_rs::_XDP_REDIRECT_TARGET_TYPE_XDP_REDIRECT_TARGET_TYPE_XSK;
redirect.Target = socket.get_socket();
redirect.Target = socket.into_raw();
redirect
};
Self(redirect)
Expand Down
10 changes: 9 additions & 1 deletion src/rust/catpowder/win/ring/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,45 @@
// Structures
//======================================================================================================================

/// A wrapper structure for a XDP ring.
#[repr(C)]
pub(super) struct XdpRing(xdp_rs::XSK_RING);

//======================================================================================================================
// Implementations
//======================================================================================================================

impl XdpRing {
pub(super) fn initialize(info: &xdp_rs::XSK_RING_INFO) -> Self {
/// Initializes a XDP ring.
pub(super) fn new(info: &xdp_rs::XSK_RING_INFO) -> Self {
Self(unsafe {
let mut ring: xdp_rs::XSK_RING = std::mem::zeroed();
xdp_rs::_XskRingInitialize(&mut ring, info);
ring
})
}

/// Reserves a consumer slot in the target ring.
pub(super) fn consumer_reserve(&mut self, count: u32, idx: *mut u32) -> u32 {
unsafe { xdp_rs::_XskRingConsumerReserve(&mut self.0, count, idx) }
}

/// Releases a consumer slot in the target ring.
pub(super) fn consumer_release(&mut self, count: u32) {
unsafe { xdp_rs::_XskRingConsumerRelease(&mut self.0, count) }
}

/// Reserves a producer slot in the target ring.
pub(super) fn producer_reserve(&mut self, count: u32, idx: *mut u32) -> u32 {
unsafe { xdp_rs::_XskRingProducerReserve(&mut self.0, count, idx) }
}

/// Submits a producer slot in the target ring.
pub(super) fn producer_submit(&mut self, count: u32) {
unsafe { xdp_rs::_XskRingProducerSubmit(&mut self.0, count) }
}

/// Gets the element at the target index.
pub(super) fn get_element(&self, idx: u32) -> *mut std::ffi::c_void {
unsafe { xdp_rs::_XskRingGetElement(&self.0, idx) }
}
Expand Down
82 changes: 48 additions & 34 deletions src/rust/catpowder/win/ring/rx_ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,57 +29,72 @@ use ::std::{
// Structures
//======================================================================================================================

/// A ring for receiving packets.
pub struct RxRing {
/// A user memory region where receive buffers are stored.
mem: Rc<RefCell<UmemReg>>,
/// A ring for receiving packets.
rx_ring: XdpRing,
/// A ring for returning receive buffers to the kernel.
rx_fill_ring: XdpRing,
_program: XdpProgram,
_socket: XdpSocket,
/// Underlying XDP socket.
_socket: XdpSocket, // NOTE: we keep this here to prevent the socket from being dropped.
/// Underlying XDP program.
_program: XdpProgram, // NOTE: we keep this here to prevent the program from being dropped.
}

//======================================================================================================================
// Implementations
//======================================================================================================================

impl RxRing {
pub fn new(api: &mut XdpApi, ifindex: u32, queueid: u32) -> Result<Self, Fail> {
trace!("Creating XDP socket.");
/// Creates a new ring for receiving packets.
pub fn new(api: &mut XdpApi, length: u32, ifindex: u32, queueid: u32) -> Result<Self, Fail> {
// Create an XDP socket.
trace!("creating xdp socket");
let mut socket: XdpSocket = XdpSocket::create(api)?;

const RING_SIZE: u32 = 1;
let mem: Rc<RefCell<UmemReg>> = Rc::new(RefCell::new(UmemReg::new(RING_SIZE, limits::RECVBUF_SIZE_MAX as u32)));
// Create a UMEM region.
trace!("creating umem region");
let mem: Rc<RefCell<UmemReg>> = Rc::new(RefCell::new(UmemReg::new(length, limits::RECVBUF_SIZE_MAX as u32)));

trace!("Registering UMEM.");
// Register the UMEM region.
trace!("registering umem region");
socket.setsockopt(
api,
xdp_rs::XSK_SOCKOPT_UMEM_REG,
mem.borrow().as_ref() as *const xdp_rs::XSK_UMEM_REG as *const core::ffi::c_void,
std::mem::size_of::<xdp_rs::XSK_UMEM_REG>() as u32,
)?;

trace!("Setting RX ring size.");
// Set rx ring size.
trace!("setting rx ring size");
socket.setsockopt(
api,
xdp_rs::XSK_SOCKOPT_RX_RING_SIZE,
&RING_SIZE as *const u32 as *const core::ffi::c_void,
&length as *const u32 as *const core::ffi::c_void,
std::mem::size_of::<u32>() as u32,
)?;

trace!("Setting RX Fill ring size.");
// Set rx fill ring size.
trace!("setting rx fill ring size");
socket.setsockopt(
api,
xdp_rs::XSK_SOCKOPT_RX_FILL_RING_SIZE,
&RING_SIZE as *const u32 as *const core::ffi::c_void,
&length as *const u32 as *const core::ffi::c_void,
std::mem::size_of::<u32>() as u32,
)?;

trace!("Binding RX queue.");
// Bind the rx queue.
trace!("binding rx queue");
socket.bind(api, ifindex, queueid, xdp_rs::_XSK_BIND_FLAGS_XSK_BIND_FLAG_RX)?;

trace!("Activating XDP socket.");
// Activate socket to enable packet reception.
trace!("activating xdp socket");
socket.activate(api, xdp_rs::_XSK_ACTIVATE_FLAGS_XSK_ACTIVATE_FLAG_NONE)?;

trace!("Getting RX ring info.");
// Retrieve rx ring info.
trace!("retrieving rx ring info");
let mut ring_info: xdp_rs::XSK_RING_INFO_SET = unsafe { std::mem::zeroed() };
let mut option_length: u32 = std::mem::size_of::<xdp_rs::XSK_RING_INFO_SET>() as u32;
socket.getsockopt(
Expand All @@ -89,23 +104,20 @@ impl RxRing {
&mut option_length as *mut u32,
)?;

let mut rx_fill_ring: XdpRing = XdpRing::initialize(&ring_info.Fill);
let rx_ring: XdpRing = XdpRing::initialize(&ring_info.Rx);
// Initialize rx and rx fill rings.
let mut rx_fill_ring: XdpRing = XdpRing::new(&ring_info.Fill);
let rx_ring: XdpRing = XdpRing::new(&ring_info.Rx);

trace!("Reserving RX ring buffer.");
// Submit rx buffer to the kernel.
trace!("submitting rx ring buffer");
let mut ring_index: u32 = 0;
rx_fill_ring.producer_reserve(RING_SIZE, &mut ring_index);

rx_fill_ring.producer_reserve(length, &mut ring_index);
let b: *mut u64 = rx_fill_ring.get_element(ring_index) as *mut u64;
unsafe { *b = 0 };

trace!("Submitting RX ring buffer.");
rx_fill_ring.producer_submit(RING_SIZE);

trace!("Setting RX Fill ring.");
rx_fill_ring.producer_submit(length);

// Create XDP program.
trace!("Creating XDP program...");
trace!("creating xdp program");
const XDP_INSPECT_RX: xdp_rs::XDP_HOOK_ID = xdp_rs::XDP_HOOK_ID {
Layer: xdp_rs::_XDP_HOOK_LAYER_XDP_HOOK_L2,
Direction: xdp_rs::_XDP_HOOK_DATAPATH_DIRECTION_XDP_HOOK_RX,
Expand All @@ -114,26 +126,21 @@ impl RxRing {
let rules: Vec<XdpRule> = vec![XdpRule::new(&socket)];
let program: XdpProgram = XdpProgram::new(api, &rules, ifindex, &XDP_INSPECT_RX, queueid, 0)?;

trace!("xdp program created");

Ok(Self {
_program: program,
mem,
_socket: socket,
rx_ring,
rx_fill_ring,
_socket: socket,
_program: program,
})
}

pub fn consumer_reserve(&mut self, count: u32, idx: &mut u32) -> u32 {
self.rx_ring.consumer_reserve(count, idx)
}

pub fn get_element(&self, idx: u32) -> XdpBuffer {
XdpBuffer::new(
self.rx_ring.get_element(idx) as *mut xdp_rs::XSK_BUFFER_DESCRIPTOR,
self.mem.clone(),
)
}

pub fn consumer_release(&mut self, count: u32) {
self.rx_ring.consumer_release(count);
}
Expand All @@ -145,4 +152,11 @@ impl RxRing {
pub fn producer_submit(&mut self, count: u32) {
self.rx_fill_ring.producer_submit(count);
}

pub fn get_element(&self, idx: u32) -> XdpBuffer {
XdpBuffer::new(
self.rx_ring.get_element(idx) as *mut xdp_rs::XSK_BUFFER_DESCRIPTOR,
self.mem.clone(),
)
}
}
65 changes: 40 additions & 25 deletions src/rust/catpowder/win/ring/tx_ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,52 +27,66 @@ use ::std::{
// Structures
//======================================================================================================================

/// A ring for transmitting packets.
pub struct TxRing {
/// A user memory region where transmit buffers are stored.
mem: Rc<RefCell<UmemReg>>,
socket: XdpSocket,
/// A ring for transmitting packets.
tx_ring: XdpRing,
/// A ring for returning transmit buffers to the kernel.
tx_completion_ring: XdpRing,
/// Underlying XDP socket.
socket: XdpSocket,
}

impl TxRing {
pub fn new(api: &mut XdpApi, ifindex: u32, queueid: u32) -> Result<Self, Fail> {
trace!("Creating XDP socket.");
/// Creates a new ring for transmitting packets.
pub fn new(api: &mut XdpApi, length: u32, ifindex: u32, queueid: u32) -> Result<Self, Fail> {
// Create an XDP socket.
trace!("creating xdp socket");
let mut socket: XdpSocket = XdpSocket::create(api)?;

// Create a UMEM region.
trace!("creating umem region");
let mem: Rc<RefCell<UmemReg>> = Rc::new(RefCell::new(UmemReg::new(1, limits::RECVBUF_SIZE_MAX as u32)));

trace!("Registering UMEM.");
// Register the UMEM region.
trace!("registering umem region");
socket.setsockopt(
api,
xdp_rs::XSK_SOCKOPT_UMEM_REG,
mem.borrow().as_ref() as *const xdp_rs::XSK_UMEM_REG as *const core::ffi::c_void,
std::mem::size_of::<xdp_rs::XSK_UMEM_REG>() as u32,
)?;
const RING_SIZE: u32 = 1;

trace!("Setting TX ring size.");
// Set tx ring size.
trace!("setting tx ring size");
socket.setsockopt(
api,
xdp_rs::XSK_SOCKOPT_TX_RING_SIZE,
&RING_SIZE as *const u32 as *const core::ffi::c_void,
&length as *const u32 as *const core::ffi::c_void,
std::mem::size_of::<u32>() as u32,
)?;

trace!("Setting TX completion ring size.");
// Set tx completion ring size.
trace!("setting tx completion ring size");
socket.setsockopt(
api,
xdp_rs::XSK_SOCKOPT_TX_COMPLETION_RING_SIZE,
&RING_SIZE as *const u32 as *const core::ffi::c_void,
&length as *const u32 as *const core::ffi::c_void,
std::mem::size_of::<u32>() as u32,
)?;

trace!("Binding TX queue.");
// Bind tx queue.
trace!("binding tx queue");
socket.bind(api, ifindex, queueid, xdp_rs::_XSK_BIND_FLAGS_XSK_BIND_FLAG_TX)?;

trace!("Activating XDP socket.");
// Activate socket to enable packet transmission.
trace!("activating xdp socket");
socket.activate(api, xdp_rs::_XSK_ACTIVATE_FLAGS_XSK_ACTIVATE_FLAG_NONE)?;

trace!("Getting TX ring info.");
// Retrieve tx ring info.
trace!("retrieving tx ring info");
let mut ring_info: xdp_rs::XSK_RING_INFO_SET = unsafe { std::mem::zeroed() };
let mut option_length: u32 = std::mem::size_of::<xdp_rs::XSK_RING_INFO_SET>() as u32;
socket.getsockopt(
Expand All @@ -82,34 +96,27 @@ impl TxRing {
&mut option_length as *mut u32,
)?;

let tx_ring: XdpRing = XdpRing::initialize(&ring_info.Tx);
let tx_completion_ring: XdpRing = XdpRing::initialize(&ring_info.Completion);
// Initialize tx and tx completion rings.
let tx_ring: XdpRing = XdpRing::new(&ring_info.Tx);
let tx_completion_ring: XdpRing = XdpRing::new(&ring_info.Completion);

trace!("XDP program created.");
Ok(Self {
mem,
socket,
tx_ring,
tx_completion_ring,
socket,
})
}

/// Notifies the socket that there are packets to be transmitted.
pub fn notify_socket(
&self,
api: &mut XdpApi,
flags: i32,
count: u32,
outflags: &mut xdp_rs::XSK_NOTIFY_RESULT_FLAGS,
) -> Result<(), Fail> {
self.socket.notify_socket(api, flags, count, outflags)
}

pub fn consumer_reserve(&mut self, count: u32, idx: &mut u32) -> u32 {
self.tx_completion_ring.consumer_reserve(count, idx)
}

pub fn consumer_release(&mut self, count: u32) {
self.tx_completion_ring.consumer_release(count);
self.socket.notify(api, flags, count, outflags)
}

pub fn producer_reserve(&mut self, count: u32, idx: &mut u32) -> u32 {
Expand All @@ -120,6 +127,14 @@ impl TxRing {
self.tx_ring.producer_submit(count);
}

pub fn consumer_reserve(&mut self, count: u32, idx: &mut u32) -> u32 {
self.tx_completion_ring.consumer_reserve(count, idx)
}

pub fn consumer_release(&mut self, count: u32) {
self.tx_completion_ring.consumer_release(count);
}

pub fn get_element(&self, idx: u32) -> XdpBuffer {
XdpBuffer::new(
self.tx_ring.get_element(idx) as *mut xdp_rs::XSK_BUFFER_DESCRIPTOR,
Expand Down
Loading

0 comments on commit 6c2094e

Please sign in to comment.