diff --git a/src/rust/catpowder/mod.rs b/src/rust/catpowder/mod.rs index 3943d70eb..d0247dcd9 100644 --- a/src/rust/catpowder/mod.rs +++ b/src/rust/catpowder/mod.rs @@ -6,3 +6,9 @@ mod linux; #[cfg(target_os = "linux")] pub use linux::LinuxRuntime as CatpowderRuntime; + +#[cfg(target_os = "windows")] +mod win; + +#[cfg(target_os = "windows")] +pub use win::runtime::CatpowderRuntime; diff --git a/src/rust/catpowder/win/buffer.rs b/src/rust/catpowder/win/buffer.rs new file mode 100644 index 000000000..262fe55e5 --- /dev/null +++ b/src/rust/catpowder/win/buffer.rs @@ -0,0 +1,70 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +//====================================================================================================================== +// Imports +//====================================================================================================================== + +use crate::catpowder::win::umemreg::UmemReg; +use ::std::ops::{ + Deref, + DerefMut, +}; + +//====================================================================================================================== +// Structures +//====================================================================================================================== + +pub struct XdpBuffer { + b: *mut xdp_rs::XSK_BUFFER_DESCRIPTOR, + umemreg: UmemReg, +} + +//====================================================================================================================== +// Implementations +//====================================================================================================================== + +impl XdpBuffer { + pub fn new(b: *mut xdp_rs::XSK_BUFFER_DESCRIPTOR, umemreg: UmemReg) -> Self { + Self { b, umemreg } + } + + pub fn len(&self) -> usize { + unsafe { (*self.b).Length as usize } + } + + pub fn set_len(&mut self, len: usize) { + unsafe { + (*self.b).Length = len as u32; + } + } + + unsafe fn base_address(&self) -> u64 { + (*self.b).Address.__bindgen_anon_1.BaseAddress() + } + + unsafe fn offset(&self) -> u64 { + (*self.b).Address.__bindgen_anon_1.Offset() + } + + unsafe fn addr(&self) -> *mut core::ffi::c_void { + let mut ptr: *mut u8 = self.umemreg.get_address() as *mut u8; + ptr = ptr.add(self.base_address() as usize); + ptr = ptr.add(self.offset() as usize); + ptr as *mut core::ffi::c_void + } +} + +impl Deref for XdpBuffer { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + unsafe { std::slice::from_raw_parts(self.addr() as *const u8, self.len()) } + } +} + +impl DerefMut for XdpBuffer { + fn deref_mut(&mut self) -> &mut [u8] { + unsafe { std::slice::from_raw_parts_mut(self.addr() as *mut u8, self.len()) } + } +} diff --git a/src/rust/catpowder/win/mod.rs b/src/rust/catpowder/win/mod.rs new file mode 100644 index 000000000..db970e520 --- /dev/null +++ b/src/rust/catpowder/win/mod.rs @@ -0,0 +1,12 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +mod buffer; +mod params; +mod program; +mod rule; +pub mod runtime; +mod rx_ring; +mod socket; +mod tx_ring; +mod umemreg; diff --git a/src/rust/catpowder/win/params.rs b/src/rust/catpowder/win/params.rs new file mode 100644 index 000000000..c73ed2509 --- /dev/null +++ b/src/rust/catpowder/win/params.rs @@ -0,0 +1,37 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +//====================================================================================================================== +// Imports +//====================================================================================================================== + +use crate::catpowder::win::socket::XdpSocket; +use ::std::mem; + +//====================================================================================================================== +// Structures +//====================================================================================================================== + +pub struct XdpRedirectParams { + redirect: xdp_rs::XDP_REDIRECT_PARAMS, +} + +//====================================================================================================================== +// Implementations +//====================================================================================================================== + +impl XdpRedirectParams { + pub fn new(socket: &XdpSocket) -> Self { + let redirect: xdp_rs::XDP_REDIRECT_PARAMS = { + let mut redirect: xdp_rs::_XDP_REDIRECT_PARAMS = unsafe { mem::zeroed() }; + redirect.TargetType = xdp_rs::_XDP_REDIRECT_TARGET_TYPE_XDP_REDIRECT_TARGET_TYPE_XSK; + redirect.Target = socket.socket; + redirect + }; + Self { redirect } + } + + pub fn as_ptr(&self) -> &xdp_rs::XDP_REDIRECT_PARAMS { + &self.redirect + } +} diff --git a/src/rust/catpowder/win/program.rs b/src/rust/catpowder/win/program.rs new file mode 100644 index 000000000..6f1bcedac --- /dev/null +++ b/src/rust/catpowder/win/program.rs @@ -0,0 +1,27 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +//====================================================================================================================== +// Imports +//====================================================================================================================== + +use ::windows::Win32::Foundation::HANDLE; + +//====================================================================================================================== +// Structures +//====================================================================================================================== + +#[derive(Default)] +pub struct XdpProgram { + program: HANDLE, +} + +//====================================================================================================================== +// Implementations +//====================================================================================================================== + +impl XdpProgram { + pub fn as_ptr(&mut self) -> *mut HANDLE { + &mut self.program as *mut HANDLE + } +} diff --git a/src/rust/catpowder/win/rule.rs b/src/rust/catpowder/win/rule.rs new file mode 100644 index 000000000..0388fdd6e --- /dev/null +++ b/src/rust/catpowder/win/rule.rs @@ -0,0 +1,46 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +//====================================================================================================================== +// Imports +//====================================================================================================================== + +use crate::catpowder::win::{ + params::XdpRedirectParams, + socket::XdpSocket, +}; +use ::std::mem; + +//====================================================================================================================== +// Structures +//====================================================================================================================== + +pub struct XdpRule { + rule: xdp_rs::XDP_RULE, +} + +//====================================================================================================================== +// Implementations +//====================================================================================================================== + +impl XdpRule { + pub fn new(socket: &XdpSocket) -> Self { + let redirect: XdpRedirectParams = XdpRedirectParams::new(socket); + let rule: xdp_rs::XDP_RULE = unsafe { + let mut rule: xdp_rs::XDP_RULE = std::mem::zeroed(); + rule.Match = xdp_rs::_XDP_MATCH_TYPE_XDP_MATCH_ALL; + rule.Action = xdp_rs::_XDP_RULE_ACTION_XDP_PROGRAM_ACTION_REDIRECT; + // TODO: Set pattern + // Perform bitwise copy from redirect to rule. + rule.__bindgen_anon_1 = + mem::transmute_copy::(redirect.as_ptr()); + + rule + }; + Self { rule } + } + + pub fn as_ptr(&self) -> *const xdp_rs::XDP_RULE { + &self.rule + } +} diff --git a/src/rust/catpowder/win/runtime.rs b/src/rust/catpowder/win/runtime.rs new file mode 100644 index 000000000..fb06e6392 --- /dev/null +++ b/src/rust/catpowder/win/runtime.rs @@ -0,0 +1,156 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +//====================================================================================================================== +// Imports +//====================================================================================================================== + +use crate::{ + catpowder::win::{ + buffer::XdpBuffer, + rx_ring::RxRing, + socket::XdpApi, + tx_ring::TxRing, + }, + demikernel::config::Config, + expect_ok, + runtime::{ + fail::Fail, + memory::{ + DemiBuffer, + MemoryRuntime, + }, + network::{ + consts::RECEIVE_BATCH_SIZE, + NetworkRuntime, + PacketBuf, + }, + Runtime, + SharedObject, + }, +}; +use ::arrayvec::ArrayVec; +use ::std::borrow::{ + Borrow, + BorrowMut, +}; + +//====================================================================================================================== +// Structures +//====================================================================================================================== + +struct CatpowderRuntimeInner { + tx: TxRing, + rx: RxRing, +} +/// Underlying network transport. +#[derive(Clone)] +pub struct CatpowderRuntime { + api: XdpApi, + inner: SharedObject, +} + +/// A network transport built on top of Windows XDP. +#[derive(Clone)] +pub struct SharedXdpTransport(SharedObject); + +//====================================================================================================================== +// Associated Functions +//====================================================================================================================== +impl CatpowderRuntime {} + +impl NetworkRuntime for CatpowderRuntime { + fn new(igconfig: &Config) -> Result { + trace!("Creating XDP runtime."); + let mut api: XdpApi = XdpApi::new()?; + + // TODO: read the following from the config file. + let index: u32 = 5; + let queueid: u32 = 0; + + let rx: RxRing = RxRing::new(&mut api, index, queueid)?; + let tx: TxRing = TxRing::new(&mut api, index, queueid)?; + + Ok(Self { + api, + inner: SharedObject::new(CatpowderRuntimeInner { rx, tx }), + }) + } + + fn transmit(&mut self, pkt: Box) { + let header_size: usize = pkt.header_size(); + let body_size: usize = pkt.body_size(); + assert!(header_size + body_size < u16::MAX as usize); + trace!("header_size={:?}, body_size={:?}", header_size, body_size); + + const COUNT: u32 = 1; + let mut idx: u32 = 0; + + assert!(self.inner.borrow_mut().tx.producer_reserve(COUNT, &mut idx) == COUNT); + + let mut buf: XdpBuffer = self.inner.borrow_mut().tx.get_element(idx); + buf.set_len(header_size + body_size); + + pkt.write_header(&mut buf[..header_size]); + if let Some(body) = pkt.take_body() { + buf[header_size..].copy_from_slice(&body[..]); + } + + self.inner.borrow_mut().tx.producer_submit(COUNT); + + // Notify socket. + let mut outflags = xdp_rs::XSK_NOTIFY_RESULT_FLAGS::default(); + self.inner + .borrow() + .tx + .notify_socket( + &mut self.api, + xdp_rs::_XSK_NOTIFY_FLAGS_XSK_NOTIFY_FLAG_POKE_TX | xdp_rs::_XSK_NOTIFY_FLAGS_XSK_NOTIFY_FLAG_WAIT_TX, + u32::MAX, + &mut outflags, + ) + .unwrap(); + + if self.inner.borrow_mut().tx.consumer_reserve(COUNT, &mut idx) == COUNT { + self.inner.borrow_mut().tx.consumer_release(COUNT); + return; + } + + warn!("failed to send packet"); + } + + fn receive(&mut self) -> ArrayVec { + let mut ret: ArrayVec = ArrayVec::new(); + const COUNT: u32 = 1; + let mut idx: u32 = 0; + + if self.inner.borrow_mut().rx.consumer_reserve(COUNT, &mut idx) == COUNT { + let xdpbuf: XdpBuffer = self.inner.borrow().rx.get_element(idx); + let mut out: Vec = Vec::with_capacity(xdpbuf.len()); + + xdpbuf[..].clone_into(&mut out); + + let dbuf: DemiBuffer = expect_ok!(DemiBuffer::from_slice(&out), "'bytes' should fit"); + + ret.push(dbuf); + + self.inner.borrow_mut().rx.consumer_release(COUNT); + + self.inner.borrow_mut().rx.producer_reserve(COUNT, &mut idx); + + self.inner.borrow_mut().rx.producer_submit(COUNT); + } + + ret + } +} + +//====================================================================================================================== +// Trait Implementations +//====================================================================================================================== + +/// Memory runtime trait implementation for XDP Runtime. +impl MemoryRuntime for CatpowderRuntime {} + +/// Runtime trait implementation for XDP Runtime. +impl Runtime for CatpowderRuntime {} diff --git a/src/rust/catpowder/win/rx_ring.rs b/src/rust/catpowder/win/rx_ring.rs new file mode 100644 index 000000000..aaabc2b8e --- /dev/null +++ b/src/rust/catpowder/win/rx_ring.rs @@ -0,0 +1,145 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +//====================================================================================================================== +// Imports +//====================================================================================================================== + +use crate::{ + catpowder::win::{ + buffer::XdpBuffer, + program::XdpProgram, + rule::XdpRule, + socket::{ + XdpApi, + XdpRing, + XdpSocket, + }, + umemreg::UmemReg, + }, + runtime::{ + fail::Fail, + limits, + }, +}; + +//====================================================================================================================== +// Structures +//====================================================================================================================== + +pub struct RxRing { + program: XdpProgram, + mem: UmemReg, + socket: XdpSocket, + rx_ring: XdpRing, + rx_fill_ring: XdpRing, +} + +impl RxRing { + pub fn new(api: &mut XdpApi, ifindex: u32, queueid: u32) -> Result { + trace!("Creating XDP socket."); + let mut socket: XdpSocket = XdpSocket::create(api)?; + + const RING_SIZE: u32 = 1; + let mem: UmemReg = UmemReg::new(RING_SIZE, limits::RECVBUF_SIZE_MAX as u32); + + trace!("Registering UMEM."); + socket.setsockopt( + api, + xdp_rs::XSK_SOCKOPT_UMEM_REG, + mem.as_ptr() as *const core::ffi::c_void, + std::mem::size_of::() as u32, + )?; + + trace!("Setting RX ring size."); + socket.setsockopt( + api, + xdp_rs::XSK_SOCKOPT_RX_RING_SIZE, + &RING_SIZE as *const u32 as *const core::ffi::c_void, + std::mem::size_of::() as u32, + )?; + + trace!("Setting RX Fill ring size."); + socket.setsockopt( + api, + xdp_rs::XSK_SOCKOPT_RX_FILL_RING_SIZE, + &RING_SIZE as *const u32 as *const core::ffi::c_void, + std::mem::size_of::() as u32, + )?; + + trace!("Binding RX queue."); + socket.bind(api, ifindex, queueid, xdp_rs::_XSK_BIND_FLAGS_XSK_BIND_FLAG_RX)?; + + trace!("Activating XDP socket."); + socket.activate(api, xdp_rs::_XSK_ACTIVATE_FLAGS_XSK_ACTIVATE_FLAG_NONE)?; + + trace!("Getting RX ring info."); + let mut ring_info: xdp_rs::XSK_RING_INFO_SET = unsafe { std::mem::zeroed() }; + let mut option_length: u32 = std::mem::size_of::() as u32; + socket.getsockopt( + api, + xdp_rs::XSK_SOCKOPT_RING_INFO, + &mut ring_info as *mut xdp_rs::XSK_RING_INFO_SET as *mut core::ffi::c_void, + &mut option_length as *mut u32, + )?; + + let mut rx_fill_ring: XdpRing = XdpRing::ring_initialize(&ring_info.Fill); + let rx_ring: XdpRing = XdpRing::ring_initialize(&ring_info.Rx); + + trace!("Reserving RX ring buffer."); + let mut ring_index: u32 = 0; + rx_fill_ring.ring_producer_reserve(RING_SIZE, &mut ring_index); + + let b = rx_fill_ring.ring_get_element(ring_index) as *mut u64; + unsafe { *b = 0 }; + + trace!("Submitting RX ring buffer."); + rx_fill_ring.ring_producer_submit(RING_SIZE); + + trace!("Setting RX Fill ring."); + + // Create XDP program. + const XDP_INSPECT_RX: xdp_rs::XDP_HOOK_ID = xdp_rs::XDP_HOOK_ID { + Layer: xdp_rs::_XDP_HOOK_LAYER_XDP_HOOK_L2, + Direction: xdp_rs::_XDP_HOOK_DATAPATH_DIRECTION_XDP_HOOK_RX, + SubLayer: xdp_rs::_XDP_HOOK_SUBLAYER_XDP_HOOK_INSPECT, + }; + + let rule: XdpRule = XdpRule::new(&socket); + + trace!("Creating XDP program."); + let program: XdpProgram = socket.create_program(api, &rule, ifindex, &XDP_INSPECT_RX, queueid, 0)?; + + trace!("XDP program created."); + Ok(Self { + program, + mem, + socket, + rx_ring, + rx_fill_ring, + }) + } + + pub fn consumer_reserve(&mut self, count: u32, idx: &mut u32) -> u32 { + self.rx_ring.ring_consumer_reserve(count, idx) + } + + pub fn get_element(&self, idx: u32) -> XdpBuffer { + XdpBuffer::new( + self.rx_ring.ring_get_element(idx) as *mut xdp_rs::XSK_BUFFER_DESCRIPTOR, + self.mem.clone(), + ) + } + + pub fn consumer_release(&mut self, count: u32) { + self.rx_ring.ring_consumer_release(count); + } + + pub fn producer_reserve(&mut self, count: u32, idx: &mut u32) -> u32 { + self.rx_fill_ring.ring_producer_reserve(count, idx) + } + + pub fn producer_submit(&mut self, count: u32) { + self.rx_fill_ring.ring_producer_submit(count); + } +} diff --git a/src/rust/catpowder/win/socket.rs b/src/rust/catpowder/win/socket.rs new file mode 100644 index 000000000..7f7caf10b --- /dev/null +++ b/src/rust/catpowder/win/socket.rs @@ -0,0 +1,259 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +//====================================================================================================================== +// Imports +//====================================================================================================================== + +use crate::{ + catpowder::win::{ + program::XdpProgram, + rule::XdpRule, + }, + runtime::fail::Fail, +}; +use ::windows::{ + core::HRESULT, + Win32::Foundation::HANDLE, +}; +use ::xdp_rs; + +//====================================================================================================================== +// Structures +//====================================================================================================================== + +#[derive(Clone)] +pub struct XdpApi { + pub endpoint: *const xdp_rs::XDP_API_TABLE, +} + +impl XdpApi { + pub fn new() -> Result { + let mut api: *const xdp_rs::XDP_API_TABLE = std::ptr::null_mut(); + + let result: HRESULT = unsafe { xdp_rs::XdpOpenApi(xdp_rs::XDP_API_VERSION_1, &mut api) }; + + let error: windows::core::Error = windows::core::Error::from_hresult(result); + match error.code().is_ok() { + true => Ok(Self { endpoint: api }), + false => Err(Fail::from(&error)), + } + } + + pub fn endpoint(&self) -> xdp_rs::XDP_API_TABLE { + unsafe { + let api: *const xdp_rs::XDP_API_TABLE = self.endpoint; + *api + } + } +} + +impl Drop for XdpApi { + fn drop(&mut self) { + let api: xdp_rs::XDP_API_TABLE = unsafe { + let api: *const xdp_rs::XDP_API_TABLE = self.endpoint; + *api + }; + + if let Some(close) = api.XdpCloseApi { + unsafe { close(self.endpoint) }; + } + } +} + +/// A XDP socket. +pub struct XdpSocket { + pub socket: HANDLE, +} + +/// Associated functions for XDP sockets. +impl XdpSocket { + pub fn create(api: &mut XdpApi) -> Result { + let api: xdp_rs::XDP_API_TABLE = api.endpoint(); + + let mut socket: HANDLE = HANDLE::default(); + if let Some(create) = api.XskCreate { + let result: HRESULT = unsafe { create(&mut socket) }; + let error: windows::core::Error = windows::core::Error::from_hresult(result); + match error.code().is_ok() { + true => Ok(Self { socket }), + false => Err(Fail::from(&error)), + } + } else { + let cause: String = format!("XskCreate is not implemented"); + error!("create(): {:?}", &cause); + Err(Fail::new(libc::ENOSYS, &cause)) + } + } + + pub fn bind(&self, api: &mut XdpApi, ifindex: u32, queueid: u32, flags: i32) -> Result<(), Fail> { + let api: xdp_rs::XDP_API_TABLE = api.endpoint(); + + if let Some(bind) = api.XskBind { + let result: HRESULT = unsafe { bind(self.socket, ifindex, queueid, flags) }; + let error: windows::core::Error = windows::core::Error::from_hresult(result); + match error.code().is_ok() { + true => Ok(()), + false => { + error!("bind(): {:?}", &error); + Err(Fail::from(&error)) + }, + } + } else { + let cause: String = format!("XskBind is not implemented"); + error!("bind(): {:?}", &cause); + Err(Fail::new(libc::ENOSYS, &cause)) + } + } + + pub fn setsockopt( + &mut self, + api: &mut XdpApi, + opt: u32, + val: *const std::ffi::c_void, + len: u32, + ) -> Result<(), Fail> { + let api: xdp_rs::XDP_API_TABLE = api.endpoint(); + + if let Some(setsocket) = api.XskSetSockopt { + let result: HRESULT = unsafe { setsocket(self.socket, opt, val, len) }; + let error: windows::core::Error = windows::core::Error::from_hresult(result); + match error.code().is_ok() { + true => Ok(()), + false => return Err(Fail::from(&error)), + } + } else { + let cause: String = format!("XskSetSockopt is not implemented"); + error!("setsockopt(): {:?}", &cause); + return Err(Fail::new(libc::ENOSYS, &cause)); + } + } + + pub fn getsockopt( + &self, + api: &mut XdpApi, + opt: u32, + val: *mut std::ffi::c_void, + len: *mut u32, + ) -> Result<(), Fail> { + let api: xdp_rs::XDP_API_TABLE = api.endpoint(); + + if let Some(getsockopt) = api.XskGetSockopt { + let result: HRESULT = unsafe { getsockopt(self.socket, opt, val, len) }; + let error: windows::core::Error = windows::core::Error::from_hresult(result); + match error.code().is_ok() { + true => Ok(()), + false => return Err(Fail::from(&error)), + } + } else { + let cause: String = format!("XskGetSockopt is not implemented"); + error!("getsockopt(): {:?}", &cause); + return Err(Fail::new(libc::ENOSYS, &cause)); + } + } + + pub fn activate(&self, api: &mut XdpApi, flags: i32) -> Result<(), Fail> { + let api: xdp_rs::XDP_API_TABLE = api.endpoint(); + + if let Some(activate) = api.XskActivate { + let result: HRESULT = unsafe { activate(self.socket, flags) }; + let error: windows::core::Error = windows::core::Error::from_hresult(result); + match error.code().is_ok() { + true => Ok(()), + false => Err(Fail::from(&error)), + } + } else { + let cause: String = format!("XskActivate is not implemented"); + error!("activate(): {:?}", &cause); + Err(Fail::new(libc::ENOSYS, &cause)) + } + } + + pub fn create_program( + &self, + api: &mut XdpApi, + rule: &XdpRule, + ifindex: u32, + hookid: *const xdp_rs::XDP_HOOK_ID, + queueid: u32, + flags: xdp_rs::XDP_CREATE_PROGRAM_FLAGS, + ) -> Result { + let api: xdp_rs::XDP_API_TABLE = api.endpoint(); + + let rule = rule.as_ptr(); + let rule_count: u32 = 1; + let mut program: XdpProgram = XdpProgram::default(); + + if let Some(create_program) = api.XdpCreateProgram { + let result: HRESULT = + unsafe { create_program(ifindex, hookid, queueid, flags, rule, rule_count, program.as_ptr()) }; + let error: windows::core::Error = windows::core::Error::from_hresult(result); + match error.code().is_ok() { + true => Ok(program), + false => Err(Fail::from(&error)), + } + } else { + let cause: String = format!("XdpCreateProgram is not implemented"); + error!("create_program(): {:?}", &cause); + Err(Fail::new(libc::ENOSYS, &cause)) + } + } + + pub fn notify_socket( + &self, + api: &mut XdpApi, + flags: xdp_rs::XSK_NOTIFY_FLAGS, + timeout: u32, + result: *mut xdp_rs::XSK_NOTIFY_RESULT_FLAGS, + ) -> Result<(), Fail> { + let api: xdp_rs::XDP_API_TABLE = api.endpoint(); + + if let Some(notify) = api.XskNotifySocket { + let result: HRESULT = unsafe { notify(self.socket, flags, timeout, result) }; + let error: windows::core::Error = windows::core::Error::from_hresult(result); + match error.code().is_ok() { + true => Ok(()), + false => Err(Fail::from(&error)), + } + } else { + let cause: String = format!("XskNotifySocket is not implemented"); + error!("notify_socket(): {:?}", &cause); + Err(Fail::new(libc::ENOSYS, &cause)) + } + } +} + +pub struct XdpRing { + ring: xdp_rs::XSK_RING, +} + +impl XdpRing { + pub fn ring_initialize(info: &xdp_rs::XSK_RING_INFO) -> Self { + let ring = unsafe { + let mut ring: xdp_rs::XSK_RING = std::mem::zeroed(); + xdp_rs::_XskRingInitialize(&mut ring, info); + ring + }; + Self { ring } + } + + pub fn ring_consumer_reserve(&mut self, count: u32, idx: *mut u32) -> u32 { + unsafe { xdp_rs::_XskRingConsumerReserve(&mut self.ring, count, idx) } + } + + pub fn ring_consumer_release(&mut self, count: u32) { + unsafe { xdp_rs::_XskRingConsumerRelease(&mut self.ring, count) } + } + + pub fn ring_producer_reserve(&mut self, count: u32, idx: *mut u32) -> u32 { + unsafe { xdp_rs::_XskRingProducerReserve(&mut self.ring, count, idx) } + } + + pub fn ring_producer_submit(&mut self, count: u32) { + unsafe { xdp_rs::_XskRingProducerSubmit(&mut self.ring, count) } + } + + pub fn ring_get_element(&self, idx: u32) -> *mut std::ffi::c_void { + unsafe { xdp_rs::_XskRingGetElement(&self.ring, idx) } + } +} diff --git a/src/rust/catpowder/win/tx_ring.rs b/src/rust/catpowder/win/tx_ring.rs new file mode 100644 index 000000000..1567f91e5 --- /dev/null +++ b/src/rust/catpowder/win/tx_ring.rs @@ -0,0 +1,128 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +//====================================================================================================================== +// Imports +//====================================================================================================================== + +use crate::{ + catpowder::win::{ + buffer::XdpBuffer, + socket::{ + XdpApi, + XdpRing, + XdpSocket, + }, + umemreg::UmemReg, + }, + runtime::{ + fail::Fail, + limits, + }, +}; + +//====================================================================================================================== +// Structures +//====================================================================================================================== + +#[allow(dead_code)] +pub struct TxRing { + mem: UmemReg, + socket: XdpSocket, + tx_ring: XdpRing, + pub tx_completion_ring: XdpRing, +} + +impl TxRing { + pub fn new(api: &mut XdpApi, index: u32, queueid: u32) -> Result { + trace!("Creating XDP socket."); + let mut socket: XdpSocket = XdpSocket::create(api)?; + + let mem: UmemReg = UmemReg::new(1, limits::RECVBUF_SIZE_MAX as u32); + + trace!("Registering UMEM."); + socket.setsockopt( + api, + xdp_rs::XSK_SOCKOPT_UMEM_REG, + mem.as_ptr() as *const core::ffi::c_void, + std::mem::size_of::() as u32, + )?; + const RING_SIZE: u32 = 1; + + trace!("Setting TX ring size."); + socket.setsockopt( + api, + xdp_rs::XSK_SOCKOPT_TX_RING_SIZE, + &RING_SIZE as *const u32 as *const core::ffi::c_void, + std::mem::size_of::() as u32, + )?; + + trace!("Setting TX completion ring size."); + socket.setsockopt( + api, + xdp_rs::XSK_SOCKOPT_TX_COMPLETION_RING_SIZE, + &RING_SIZE as *const u32 as *const core::ffi::c_void, + std::mem::size_of::() as u32, + )?; + + trace!("Binding TX queue."); + socket.bind(api, index, queueid, xdp_rs::_XSK_BIND_FLAGS_XSK_BIND_FLAG_TX)?; + + trace!("Activating XDP socket."); + socket.activate(api, xdp_rs::_XSK_ACTIVATE_FLAGS_XSK_ACTIVATE_FLAG_NONE)?; + + trace!("Getting TX ring info."); + let mut ring_info: xdp_rs::XSK_RING_INFO_SET = unsafe { std::mem::zeroed() }; + let mut option_length: u32 = std::mem::size_of::() as u32; + socket.getsockopt( + api, + xdp_rs::XSK_SOCKOPT_RING_INFO, + &mut ring_info as *mut xdp_rs::XSK_RING_INFO_SET as *mut core::ffi::c_void, + &mut option_length as *mut u32, + )?; + + let tx_ring: XdpRing = XdpRing::ring_initialize(&ring_info.Tx); + let tx_completion_ring: XdpRing = XdpRing::ring_initialize(&ring_info.Completion); + + trace!("XDP program created."); + Ok(Self { + mem, + socket, + tx_ring, + tx_completion_ring, + }) + } + + pub fn notify_socket( + &self, + api: &mut XdpApi, + flags: i32, + count: u32, + outflags: &mut xdp_rs::XSK_NOTIFY_RESULT_FLAGS, + ) -> Result<(), Fail> { + self.socket.notify_socket(api, flags, count, outflags) + } + + pub fn consumer_reserve(&mut self, count: u32, idx: &mut u32) -> u32 { + self.tx_completion_ring.ring_consumer_reserve(count, idx) + } + + pub fn consumer_release(&mut self, count: u32) { + self.tx_completion_ring.ring_consumer_release(count); + } + + pub fn producer_reserve(&mut self, count: u32, idx: &mut u32) -> u32 { + self.tx_ring.ring_producer_reserve(count, idx) + } + + pub fn producer_submit(&mut self, count: u32) { + self.tx_ring.ring_producer_submit(count); + } + + pub fn get_element(&self, idx: u32) -> XdpBuffer { + XdpBuffer::new( + self.tx_ring.ring_get_element(idx) as *mut xdp_rs::XSK_BUFFER_DESCRIPTOR, + self.mem.clone(), + ) + } +} diff --git a/src/rust/catpowder/win/umemreg.rs b/src/rust/catpowder/win/umemreg.rs new file mode 100644 index 000000000..a3e5945cc --- /dev/null +++ b/src/rust/catpowder/win/umemreg.rs @@ -0,0 +1,48 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +//====================================================================================================================== +// Imports +//====================================================================================================================== + +use ::std::{ + cell::RefCell, + rc::Rc, +}; + +//====================================================================================================================== +// Structures +//====================================================================================================================== + +#[derive(Clone)] +pub struct UmemReg { + mem: Rc>, +} + +//====================================================================================================================== +// Implementations +//====================================================================================================================== + +impl UmemReg { + pub fn new(count: u32, chunk_size: u32) -> Self { + let total_size: u64 = count as u64 * chunk_size as u64; + let mut buffer: Vec = Vec::::with_capacity(total_size as usize); + + let mem: Rc> = Rc::new(RefCell::new(xdp_rs::XSK_UMEM_REG { + TotalSize: total_size, + ChunkSize: chunk_size, + Headroom: 0, + Address: buffer.as_mut_ptr() as *mut core::ffi::c_void, + })); + + Self { mem } + } + + pub fn as_ptr(&self) -> *mut xdp_rs::XSK_UMEM_REG { + self.mem.as_ptr() + } + + pub fn get_address(&self) -> *mut core::ffi::c_void { + self.mem.borrow().Address + } +}