diff --git a/Cargo.toml b/Cargo.toml index 80928ed..bbce919 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,6 @@ parking_lot = "0.10.2" once_cell = "1.3.1" libc = "0.2.71" uring-sys = "0.6.1" -iou = "0.0.0-ringbahn.1" [dev-dependencies] tempfile = "3.1.0" diff --git a/examples/read-event.rs b/examples/read-event.rs index 2bacc31..2b5434c 100644 --- a/examples/read-event.rs +++ b/examples/read-event.rs @@ -10,7 +10,7 @@ fn main() -> io::Result<()> { let submission = Submission::new(event, driver); let content = futures::executor::block_on(async move { let (event, result) = submission.await; - let bytes_read = result?; + let bytes_read = result? as usize; let s = String::from_utf8_lossy(&event.buf[0..bytes_read]).to_string(); io::Result::Ok(s) })?; diff --git a/src/completion.rs b/src/completion.rs index f8d29fb..244a009 100644 --- a/src/completion.rs +++ b/src/completion.rs @@ -6,6 +6,7 @@ use std::task::Waker; use parking_lot::Mutex; use crate::Cancellation; +use crate::kernel::CQE; use State::*; @@ -26,7 +27,7 @@ pub struct Completion { enum State { Submitted(Waker), - Completed(io::Result), + Completed(io::Result), Cancelled(Cancellation), Empty, } @@ -49,7 +50,7 @@ impl Completion { /// Check if the completion has completed. If it has, the result of the completion will be /// returned and the completion will be deallocated. If it has not been completed, the waker /// field will be updated to the new waker if the old waker would not wake the same task. - pub fn check(self, waker: &Waker) -> Result, Completion> { + pub fn check(self, waker: &Waker) -> Result, Completion> { let mut state = self.state.lock(); match mem::replace(&mut *state, State::Empty) { Submitted(old_waker) => { @@ -85,25 +86,12 @@ impl Completion { } } -/// Complete an `[iou::CompletionQueueEvent]` which was constructed from a [`Completion`]. -/// -/// This function should be used in combination with a driver that implements [`Drive`] to process -/// events on an io-uring instance. This function takes a CQE and processes it. -/// -/// ## Safety -/// -/// This function is only valid if the user_data in the CQE is null, the liburing timeout -/// signifier, or a pointer to a Completion constructed using ringbahn. If you have scheduled any -/// events on the io-uring instance using a library other than ringbahn, this method is not safe to -/// call unless you have filtered those events out in some manner. -pub unsafe fn complete(cqe: iou::CompletionQueueEvent) { - if cqe.is_timeout() { return; } - +pub(crate) fn complete(cqe: CQE) { let result = cqe.result(); let completion = cqe.user_data() as *mut Mutex; if completion != ptr::null_mut() { - let state: &Mutex = &*completion; + let state: &Mutex = unsafe { &*completion }; let mut state = state.lock(); match mem::replace(&mut *state, State::Empty) { Submitted(waker) => { @@ -111,9 +99,11 @@ pub unsafe fn complete(cqe: iou::CompletionQueueEvent) { waker.wake(); } Cancelled(callback) => { - drop(callback); - drop(state); - drop(Box::from_raw(completion)); + unsafe { + drop(callback); + drop(state); + drop(Box::from_raw(completion)); + } } _ => unreachable!() } diff --git a/src/drive/demo.rs b/src/drive/demo.rs index 57d4623..69980e3 100644 --- a/src/drive/demo.rs +++ b/src/drive/demo.rs @@ -17,11 +17,13 @@ use access_queue::*; use super::{Drive, Completion}; -static SQ: Lazy>>> = Lazy::new(init_sq); +use crate::kernel::*; + +static SQ: Lazy>> = Lazy::new(init_sq); /// The driver handle pub struct DemoDriver<'a> { - sq: Access<'a, Mutex>>, + sq: Access<'a, Mutex>, } impl Default for DemoDriver<'_> { @@ -40,7 +42,7 @@ impl Drive for DemoDriver<'_> { fn poll_prepare<'cx>( mut self: Pin<&mut Self>, ctx: &mut Context<'cx>, - prepare: impl FnOnce(iou::SubmissionQueueEvent<'_>, &mut Context<'cx>) -> Completion<'cx>, + prepare: impl FnOnce(&mut SQE, &mut Context<'cx>) -> Completion<'cx>, ) -> Poll> { // Wait for access to prepare. When ready, create a new Access future to wait next time we // want to prepare with this driver, and lock the SQ. @@ -51,8 +53,8 @@ impl Drive for DemoDriver<'_> { self.sq = access; let mut sq = sq.lock(); loop { - match sq.next_sqe() { - Some(sqe) => return Poll::Ready(prepare(sqe, ctx)), + match sq.prepare(1) { + Some(sqs) => return Poll::Ready(prepare(sqs.singular(), ctx)), None => { let _ = sq.submit(); } } } @@ -62,7 +64,7 @@ impl Drive for DemoDriver<'_> { self: Pin<&mut Self>, _: &mut Context<'_>, eager: bool, - ) -> Poll> { + ) -> Poll> { let result = if eager { self.sq.skip_queue().lock().submit() } else { @@ -77,22 +79,19 @@ pub fn driver() -> DemoDriver<'static> { DemoDriver { sq: SQ.access() } } -fn init_sq() -> AccessQueue>> { - unsafe { - static mut RING: Option = None; - RING = Some(iou::IoUring::new(SQ_ENTRIES).expect("TODO handle io_uring_init failure")); - let (sq, cq, _) = RING.as_mut().unwrap().queues(); - thread::spawn(move || complete(cq)); - AccessQueue::new(Mutex::new(sq), CQ_ENTRIES) - } +fn init_sq() -> AccessQueue> { + let ring = IoUring::new(SQ_ENTRIES).expect("TODO handle io_uring_init failure"); + let (sq, cq) = ring.queues(); + thread::spawn(move || complete(cq)); + AccessQueue::new(Mutex::new(sq), CQ_ENTRIES) } -unsafe fn complete(mut cq: iou::CompletionQueue<'static>) { +fn complete(mut cq: CompletionQueue) { while let Ok(cqe) = cq.wait_for_cqe() { let mut ready = cq.ready() as usize + 1; SQ.release(ready); - super::complete(cqe); + super::complete(cqe.into()); ready -= 1; while let Some(cqe) = cq.peek_for_cqe() { @@ -101,7 +100,7 @@ unsafe fn complete(mut cq: iou::CompletionQueue<'static>) { SQ.release(ready); } - super::complete(cqe); + super::complete(cqe.into()); ready -= 1; } diff --git a/src/drive/mod.rs b/src/drive/mod.rs index 60494e3..7c34ee2 100644 --- a/src/drive/mod.rs +++ b/src/drive/mod.rs @@ -8,8 +8,9 @@ use std::pin::Pin; use std::task::{Context, Poll}; use crate::completion; +use crate::completion::complete; +use crate::kernel::SQE; -pub use crate::completion::complete; /// A completion which will be used to wake the task waiting on this event. /// @@ -34,10 +35,16 @@ impl<'cx> Completion<'cx> { pub trait Drive { /// Prepare an event on the submission queue. /// +<<<<<<< HEAD /// The implementer is responsible for provisioning an [`iou::SubmissionQueueEvent`] from the /// submission queue. Once an SQE is available, the implementer should pass it to the /// `prepare` callback, which constructs a [`Completion`], and return that `Completion` to the /// caller. +======= + /// The implementer is responsible for provisioning an [`SQE`] from the submission queue. + /// Once an SQE is available, the implementer should pass it to the `prepare` callback, which + /// constructs a [`Completion`], and return that `Completion` to the caller. +>>>>>>> 21943ce... New low-level API not based on iou. /// /// If the driver is not ready to receive more events, it can return `Poll::Pending`. If it /// does, it must register a waker to wake the task when more events can be prepared, otherwise @@ -48,7 +55,7 @@ pub trait Drive { fn poll_prepare<'cx>( self: Pin<&mut Self>, ctx: &mut Context<'cx>, - prepare: impl FnOnce(iou::SubmissionQueueEvent<'_>, &mut Context<'cx>) -> Completion<'cx>, + prepare: impl FnOnce(&mut SQE, &mut Context<'cx>) -> Completion<'cx>, ) -> Poll>; /// Submit all of the events on the submission queue. @@ -69,5 +76,5 @@ pub trait Drive { self: Pin<&mut Self>, ctx: &mut Context<'_>, eager: bool, - ) -> Poll>; + ) -> Poll>; } diff --git a/src/event/close.rs b/src/event/close.rs index 0e4eca6..a5d0d89 100644 --- a/src/event/close.rs +++ b/src/event/close.rs @@ -1,7 +1,7 @@ use std::mem::ManuallyDrop; use std::os::unix::io::RawFd; -use super::{Event, Cancellation}; +use super::{Event, SQE, Cancellation}; pub struct Close { fd: RawFd, @@ -14,8 +14,8 @@ impl Close { } impl Event for Close { - unsafe fn prepare(&mut self, sqe: &mut iou::SubmissionQueueEvent<'_>) { - uring_sys::io_uring_prep_close(sqe.raw_mut(), self.fd) + unsafe fn prepare(&mut self, sqe: &mut SQE) { + sqe.prep_close(self.fd) } unsafe fn cancel(_: &mut ManuallyDrop) -> Cancellation { diff --git a/src/event/connect.rs b/src/event/connect.rs index 909504e..5bb14ad 100644 --- a/src/event/connect.rs +++ b/src/event/connect.rs @@ -3,7 +3,7 @@ use std::os::unix::io::RawFd; use std::mem::ManuallyDrop; use std::net::SocketAddr; -use super::{Event, Cancellation}; +use super::{Event, SQE, Cancellation}; pub struct Connect { pub fd: RawFd, @@ -19,9 +19,8 @@ impl Connect { } impl Event for Connect { - unsafe fn prepare(&mut self, sqe: &mut iou::SubmissionQueueEvent<'_>) { - let addr = &mut *self.addr as *mut libc::sockaddr_storage as *mut libc::sockaddr; - uring_sys::io_uring_prep_connect(sqe.raw_mut(), self.fd, addr, self.addrlen); + unsafe fn prepare(&mut self, sqe: &mut SQE) { + sqe.prep_connect(self.fd, &mut *self.addr, self.addrlen); } unsafe fn cancel(this: &mut ManuallyDrop) -> Cancellation { diff --git a/src/event/mod.rs b/src/event/mod.rs index 5b938a6..3b75469 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -11,6 +11,7 @@ mod writev; use std::mem::ManuallyDrop; use crate::cancellation::Cancellation; +use crate::kernel::SQE; pub use connect::Connect; pub use close::Close; @@ -51,7 +52,7 @@ pub trait Event { /// In essence implementing prepare, users can write code ass if any heap addresses passed to /// the kernel have passed ownership of that data to the kernel for the time that the event is /// completed. - unsafe fn prepare(&mut self, sqe: &mut iou::SubmissionQueueEvent<'_>); + unsafe fn prepare(&mut self, sqe: &mut SQE); /// Return the cancellation callback for this event. /// diff --git a/src/event/openat.rs b/src/event/openat.rs index 8f952ff..45dab82 100644 --- a/src/event/openat.rs +++ b/src/event/openat.rs @@ -4,7 +4,7 @@ use std::os::unix::io::RawFd; use std::os::unix::ffi::OsStrExt; use std::path::Path; -use super::{Event, Cancellation}; +use super::{Event, SQE, Cancellation}; pub struct OpenAt { path: CString, @@ -21,9 +21,8 @@ impl OpenAt { } impl Event for OpenAt { - unsafe fn prepare(&mut self, sqe: &mut iou::SubmissionQueueEvent<'_>) { - let path = self.path.as_ptr(); - uring_sys::io_uring_prep_openat(sqe.raw_mut(), self.dfd, path, self.flags, self.mode); + unsafe fn prepare(&mut self, sqe: &mut SQE) { + sqe.prep_openat(self.dfd, self.path.as_ptr(), self.flags, self.mode) } unsafe fn cancel(this: &mut ManuallyDrop) -> Cancellation { diff --git a/src/event/read.rs b/src/event/read.rs index 850dbf0..8fb4007 100644 --- a/src/event/read.rs +++ b/src/event/read.rs @@ -2,23 +2,23 @@ use std::os::unix::io::AsRawFd; use std::mem::ManuallyDrop; use std::marker::Unpin; -use super::{Event, Cancellation}; +use super::{Event, SQE, Cancellation}; /// A basic read event. pub struct Read<'a, T> { pub io: &'a T, pub buf: Vec, - pub offset: usize + pub offset: u64 } impl<'a, T: AsRawFd + Unpin> Read<'a, T> { - pub fn new(io: &'a T, buf: Vec, offset: usize) -> Read { + pub fn new(io: &'a T, buf: Vec, offset: u64) -> Read { Read { io, buf, offset } } } impl<'a, T: AsRawFd + Unpin> Event for Read<'a, T> { - unsafe fn prepare(&mut self, sqe: &mut iou::SubmissionQueueEvent<'_>) { + unsafe fn prepare(&mut self, sqe: &mut SQE) { sqe.prep_read(self.io.as_raw_fd(), &mut self.buf[..], self.offset); } diff --git a/src/event/write.rs b/src/event/write.rs index 3dc7bfa..9466693 100644 --- a/src/event/write.rs +++ b/src/event/write.rs @@ -2,23 +2,23 @@ use std::os::unix::io::AsRawFd; use std::mem::ManuallyDrop; use std::marker::Unpin; -use super::{Event, Cancellation}; +use super::{Event, SQE, Cancellation}; /// A basic write event. pub struct Write<'a, T> { pub io: &'a T, pub buf: Vec, - pub offset: usize + pub offset: u64 } impl<'a, T: AsRawFd + Unpin> Write<'a, T> { - pub fn new(io: &'a T, buf: Vec, offset: usize) -> Write { + pub fn new(io: &'a T, buf: Vec, offset: u64) -> Write { Write { io, buf, offset } } } impl<'a, T: AsRawFd + Unpin> Event for Write<'a, T> { - unsafe fn prepare(&mut self, sqe: &mut iou::SubmissionQueueEvent<'_>) { + unsafe fn prepare(&mut self, sqe: &mut SQE) { sqe.prep_write(self.io.as_raw_fd(), self.buf.as_ref(), self.offset); } diff --git a/src/fs.rs b/src/fs.rs index b25d180..0ddca07 100644 --- a/src/fs.rs +++ b/src/fs.rs @@ -25,7 +25,7 @@ pub struct File> { fd: RawFd, active: Op, buf: Buffer, - pos: usize, + pos: u64, } #[derive(Copy, Clone, Debug, Eq, PartialEq)] @@ -106,7 +106,7 @@ impl File { self.ring.cancel(self.buf.cancellation()); } - fn poll_file_size(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + fn poll_file_size(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { static EMPTY: libc::c_char = 0; self.as_mut().guard_op(Op::Statx); @@ -116,16 +116,13 @@ impl File { let flags = libc::AT_EMPTY_PATH; let mask = libc::STATX_SIZE; unsafe { - ready!(ring.poll(ctx, true, |sqe| { - uring_sys::io_uring_prep_statx(sqe.raw_mut(), fd, &EMPTY, flags, mask, statx); - }))?; - - Poll::Ready(Ok((*statx).stx_size as usize)) + ready!(ring.poll(ctx, true, |sqe| sqe.prep_statx(fd, &EMPTY, flags, mask, statx)))?; + Poll::Ready(Ok((*statx).stx_size)) } } #[inline(always)] - fn split(self: Pin<&mut Self>) -> (Pin<&mut Ring>, &mut Buffer, &mut usize) { + fn split(self: Pin<&mut Self>) -> (Pin<&mut Ring>, &mut Buffer, &mut u64) { unsafe { let this = Pin::get_unchecked_mut(self); (Pin::new_unchecked(&mut this.ring), &mut this.buf, &mut this.pos) @@ -143,7 +140,7 @@ impl File { } #[inline(always)] - fn pos(self: Pin<&mut Self>) -> Pin<&mut usize> { + fn pos(self: Pin<&mut Self>) -> Pin<&mut u64> { unsafe { Pin::map_unchecked_mut(self, |this| &mut this.pos) } } } @@ -166,7 +163,7 @@ impl AsyncBufRead for File { let (ring, buf, pos) = self.split(); buf.fill_buf(|buf| { let n = ready!(ring.poll(ctx, true, |sqe| unsafe { sqe.prep_read(fd, buf, *pos) }))?; - *pos += n; + *pos += n as u64; Poll::Ready(Ok(n as u32)) }) } @@ -185,9 +182,9 @@ impl AsyncWrite for File { Poll::Ready(Ok(io::Write::write(&mut buf, slice)? as u32)) }))?; let n = ready!(ring.poll(ctx, true, |sqe| unsafe { sqe.prep_write(fd, data, *pos) }))?; - *pos += n; + *pos += n as u64; buf.clear(); - Poll::Ready(Ok(n)) + Poll::Ready(Ok(n as usize)) } fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { @@ -198,9 +195,7 @@ impl AsyncWrite for File { fn poll_close(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { self.as_mut().guard_op(Op::Close); let fd = self.fd; - ready!(self.ring().poll(ctx, true, |sqe| unsafe { - uring_sys::io_uring_prep_close(sqe.raw_mut(), fd) - }))?; + ready!(self.ring().poll(ctx, true, |sqe| sqe.prep_close(fd)))?; Poll::Ready(Ok(())) } } @@ -211,8 +206,8 @@ impl AsyncSeek for File { { let (whence, offset) = match pos { io::SeekFrom::Start(n) => { - *self.as_mut().pos() = n as usize; - return Poll::Ready(Ok(self.pos as u64)); + *self.as_mut().pos() = n; + return Poll::Ready(Ok(self.pos)); } io::SeekFrom::Current(n) => (self.pos, n), io::SeekFrom::End(n) => { @@ -220,7 +215,7 @@ impl AsyncSeek for File { } }; let valid_seek = if offset.is_negative() { - match whence.checked_sub(offset.abs() as usize) { + match whence.checked_sub(offset.abs() as u64) { Some(valid_seek) => valid_seek, None => { let invalid = io::Error::from(io::ErrorKind::InvalidInput); @@ -228,7 +223,7 @@ impl AsyncSeek for File { } } } else { - match whence.checked_add(offset as usize) { + match whence.checked_add(offset as u64) { Some(valid_seek) => valid_seek, None => { let overflow = io::Error::from_raw_os_error(libc::EOVERFLOW); @@ -237,7 +232,7 @@ impl AsyncSeek for File { } }; *self.as_mut().pos() = valid_seek; - Poll::Ready(Ok(self.pos as u64)) + Poll::Ready(Ok(self.pos)) } } diff --git a/src/kernel/completion_queue.rs b/src/kernel/completion_queue.rs new file mode 100644 index 0000000..94bee93 --- /dev/null +++ b/src/kernel/completion_queue.rs @@ -0,0 +1,60 @@ +use std::io; +use std::mem::MaybeUninit; +use std::ptr; +use std::sync::Arc; + +use super::{IoUring, CQE}; + +pub struct CompletionQueue { + ring: Arc, +} + +impl CompletionQueue { + pub(crate) fn new(ring: Arc) -> CompletionQueue { + CompletionQueue { ring } + } + + pub fn wait_for_cqe(&mut self) -> io::Result { + unsafe { + let mut cqe = MaybeUninit::uninit(); + + match uring_sys::io_uring_wait_cqes( + self.ring.ring(), + cqe.as_mut_ptr(), + 1, + ptr::null(), + ptr::null(), + ) { + n if n >= 0 => { + let cqe_slot = cqe.assume_init(); + let cqe = CQE::from_raw(ptr::read(cqe_slot)); + uring_sys::io_uring_cqe_seen(self.ring.ring(), cqe_slot); + Ok(cqe) + } + n => Err(io::Error::from_raw_os_error(-n)), + } + } + } + + pub fn peek_for_cqe(&mut self) -> Option { + unsafe { + let mut cqe = MaybeUninit::uninit(); + let count = uring_sys::io_uring_peek_batch_cqe(self.ring.ring(), cqe.as_mut_ptr(), 1); + if count > 0 { + let cqe_slot = cqe.assume_init(); + let cqe = CQE::from_raw(ptr::read(cqe_slot)); + uring_sys::io_uring_cqe_seen(self.ring.ring(), cqe_slot); + Some(cqe) + } else { + None + } + } + } + + pub fn ready(&self) -> u32 { + unsafe { uring_sys::io_uring_cq_ready(self.ring.ring()) } + } +} + +unsafe impl Send for CompletionQueue { } +unsafe impl Sync for CompletionQueue { } diff --git a/src/kernel/cqe.rs b/src/kernel/cqe.rs new file mode 100644 index 0000000..7e0f642 --- /dev/null +++ b/src/kernel/cqe.rs @@ -0,0 +1,32 @@ +use std::io; + +pub struct CQE { + user_data: u64, + res: i32, + flags: u32, +} + +impl CQE { + pub(crate) fn from_raw(cqe: uring_sys::io_uring_cqe) -> CQE { + CQE { user_data: cqe.user_data, res: cqe.res, flags: cqe.flags } + } + + pub fn complete(self) { + crate::completion::complete(self) + } + + pub fn result(&self) -> io::Result { + match self.res { + n if n >= 0 => Ok(n as u32), + err => Err(io::Error::from_raw_os_error(err)), + } + } + + pub fn flags(&self) -> u32 { + self.flags + } + + pub fn user_data(&self) -> u64 { + self.user_data + } +} diff --git a/src/kernel/mod.rs b/src/kernel/mod.rs new file mode 100644 index 0000000..b218f36 --- /dev/null +++ b/src/kernel/mod.rs @@ -0,0 +1,11 @@ +mod cqe; +mod sqe; +mod completion_queue; +mod submission_queue; +mod ring; + +pub use sqe::SQE; +pub use cqe::CQE; +pub use completion_queue::CompletionQueue; +pub use submission_queue::SubmissionQueue; +pub use ring::IoUring; diff --git a/src/kernel/ring.rs b/src/kernel/ring.rs new file mode 100644 index 0000000..0bb4379 --- /dev/null +++ b/src/kernel/ring.rs @@ -0,0 +1,40 @@ +use std::io; +use std::mem::MaybeUninit; +use std::ptr::NonNull; +use std::sync::Arc; + +use super::{SubmissionQueue, CompletionQueue}; + +pub struct IoUring { + ring: uring_sys::io_uring, +} + +impl IoUring { + pub fn new(entries: u32) -> io::Result { + unsafe { + let mut ring = MaybeUninit::uninit(); + match uring_sys::io_uring_queue_init(entries as _, ring.as_mut_ptr(), 0) { + n if n >= 0 => Ok(IoUring { ring: ring.assume_init() }), + n => Err(io::Error::from_raw_os_error(-n)), + } + } + } + + pub fn queues(self) -> (SubmissionQueue, CompletionQueue) { + let ring = Arc::new(self); + (SubmissionQueue::new(ring.clone()), CompletionQueue::new(ring)) + } + + pub(super) fn ring(&self) -> *mut uring_sys::io_uring { + NonNull::from(&self.ring).as_ptr() + } +} + +unsafe impl Send for IoUring { } +unsafe impl Sync for IoUring { } + +impl Drop for IoUring { + fn drop(&mut self) { + unsafe { uring_sys::io_uring_queue_exit(&mut self.ring) }; + } +} diff --git a/src/kernel/sqe.rs b/src/kernel/sqe.rs new file mode 100644 index 0000000..8ded3db --- /dev/null +++ b/src/kernel/sqe.rs @@ -0,0 +1,60 @@ +use std::os::unix::io::RawFd; +use std::ptr::NonNull; + +use crate::completion::Completion; + +#[repr(transparent)] +pub struct SQE { + inner: uring_sys::io_uring_sqe, +} + +impl SQE { + pub(crate) fn from_raw(sqe: &mut uring_sys::io_uring_sqe) -> &mut SQE { + sqe.user_data = 0; + unsafe { &mut *NonNull::from(sqe).cast().as_ptr() } + } + + pub(crate) fn set_completion(&mut self, completion: &Completion) { + self.inner.user_data = completion.addr(); + } + + pub(crate) fn unset_completion(&mut self) { + self.inner.user_data = 0; + } + + pub fn prep_nop(&mut self) { + unsafe { uring_sys::io_uring_prep_nop(&mut self.inner) }; + } + + pub unsafe fn prep_write(&mut self, fd: RawFd, buf: &[u8], off: u64) { + let len = buf.len(); + let addr = buf.as_ptr(); + uring_sys::io_uring_prep_write(&mut self.inner, fd, addr as _, len as _, off as _); + } + + pub unsafe fn prep_read(&mut self, fd: RawFd, buf: &mut [u8], off: u64) { + let len = buf.len(); + let addr = buf.as_mut_ptr(); + uring_sys::io_uring_prep_read(&mut self.inner, fd, addr as _, len as _, off as _); + } + + pub unsafe fn prep_openat(&mut self, dfd: RawFd, path: *const libc::c_char, flags: i32, mode: u32) { + uring_sys::io_uring_prep_openat(&mut self.inner, dfd, path, flags, mode); + } + + pub unsafe fn prep_connect(&mut self, fd: RawFd, addr: *mut libc::sockaddr_storage, len: libc::socklen_t) { + uring_sys::io_uring_prep_connect(&mut self.inner, fd, addr as *mut libc::sockaddr, len) + } + + pub unsafe fn prep_accept(&mut self, fd: RawFd, addr: *mut libc::sockaddr_storage, len: *mut libc::socklen_t, flags: i32) { + uring_sys::io_uring_prep_accept(&mut self.inner, fd, addr as *mut libc::sockaddr, len, flags) + } + + pub unsafe fn prep_statx(&mut self, fd: RawFd, path: *const libc::c_char, flags: i32, mask: u32, statx: *mut libc::statx) { + uring_sys::io_uring_prep_statx(&mut self.inner, fd, path, flags, mask, statx); + } + + pub fn prep_close(&mut self, fd: RawFd) { + unsafe { uring_sys::io_uring_prep_close(&mut self.inner, fd); } + } +} diff --git a/src/kernel/submission_queue.rs b/src/kernel/submission_queue.rs new file mode 100644 index 0000000..f4da1f0 --- /dev/null +++ b/src/kernel/submission_queue.rs @@ -0,0 +1,100 @@ +use std::io; +use std::sync::Arc; + +use super::{IoUring, SQE}; + +pub struct SubmissionQueue { + ring: Arc, +} + +impl SubmissionQueue { + pub(crate) fn new(ring: Arc) -> SubmissionQueue { + SubmissionQueue { ring } + } + + pub fn prepare(&mut self, n: u32) -> Option> { + unsafe { + let ring = self.ring.ring(); + let sq = &mut (*ring).sq; + if ((*ring).flags & uring_sys::IORING_SETUP_SQPOLL) == 0 { + use std::sync::atomic::*; + fence(Ordering::Acquire); + }; + let head = *sq.khead; + + let next = sq.sqe_tail + n; + if next - head <= *sq.kring_entries { + let offset = sq.sqe_tail & *sq.kring_mask; + sq.sqe_tail = next; + let head = Some(&mut *sq.sqes.offset(offset as isize)); + Some(SubmissionSegment { head, remaining: n }) + } else { + None + } + } + } + + pub fn submit(&mut self) -> io::Result { + match unsafe { uring_sys::io_uring_submit(self.ring.ring()) } { + n if n >= 0 => { + println!("submitted {}", n); + Ok(n as u32) + } + n => Err(io::Error::from_raw_os_error(-n)), + } + } +} + +unsafe impl Send for SubmissionQueue { } +unsafe impl Sync for SubmissionQueue { } + +pub struct SubmissionSegment<'a> { + head: Option<&'a mut uring_sys::io_uring_sqe>, + remaining: u32, +} + +impl<'a> SubmissionSegment<'a> { + pub fn singular(mut self) -> &'a mut SQE { + while self.remaining > 1 { + self.consume().unwrap().prep_nop(); + } + + self.consume().unwrap() + } + + pub fn hard_linked(self) -> HardLinked<'a> { + HardLinked { segment: self } + } + + fn consume(&mut self) -> Option<&'a mut SQE> { + let next = self.head.take()?; + + self.remaining -= 1; + if self.remaining > 0 { + unsafe { + let sqe = (next as *mut uring_sys::io_uring_sqe).offset(1); + self.head = Some(&mut *sqe); + } + } + + Some(SQE::from_raw(next)) + } +} + +pub struct HardLinked<'a> { + segment: SubmissionSegment<'a>, +} + +impl<'a> Iterator for HardLinked<'a> { + type Item = &'a mut SQE; + + fn next(&mut self) -> Option { + // TODO after the user is done with it, if there are remaining SQEs, I want to set + // IORING_SQE_HARDLINK + // + // Make sure that all of the unused are filled with noops + // + // Make it so the completion is only set on the final SQE somehow + self.segment.consume() + } +} diff --git a/src/lib.rs b/src/lib.rs index d0a284c..b097933 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,3 +19,5 @@ pub use drive::Drive; pub use event::Event; #[doc(inline)] pub use fs::File; + +pub mod kernel; diff --git a/src/net/listener.rs b/src/net/listener.rs index 4a21e1f..36c5c6e 100644 --- a/src/net/listener.rs +++ b/src/net/listener.rs @@ -100,7 +100,7 @@ impl TcpListener { self.ring.cancel(cancellation); } - fn addr(self: Pin<&mut Self>) -> (*mut libc::sockaddr, *mut libc::socklen_t) { + fn addr(self: Pin<&mut Self>) -> (*mut libc::sockaddr_storage, *mut libc::socklen_t) { unsafe { let this = Pin::get_unchecked_mut(self); if this.addr == ptr::null_mut() { @@ -109,7 +109,7 @@ impl TcpListener { *this.len = mem::size_of::() as _; } - (this.addr as *mut libc::sockaddr, this.len) + (this.addr, this.len) } } @@ -149,10 +149,10 @@ impl TcpListener { let flags = 0; let (addr, addrlen) = self.as_mut().addr(); let fd = ready!(self.as_mut().ring().poll(ctx, true, |sqe| unsafe { - uring_sys::io_uring_prep_accept(sqe.raw_mut(), fd, addr, addrlen, flags); + sqe.prep_accept(fd, addr, addrlen, flags); }))? as RawFd; let addr = unsafe { - let result = addr_from_c(&*addr, *addrlen as usize); + let result = addr_from_c(&*(addr as *mut libc::sockaddr), *addrlen as usize); self.as_mut().drop_addr(); result? }; @@ -215,9 +215,7 @@ impl<'a, D: Drive> Future for Close<'a, D> { fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { self.socket.as_mut().guard_op(Op::Close); let fd = self.socket.fd; - ready!(self.socket.as_mut().ring().poll(ctx, true, |sqe| unsafe { - uring_sys::io_uring_prep_close(sqe.raw_mut(), fd); - }))?; + ready!(self.socket.as_mut().ring().poll(ctx, true, |sqe| sqe.prep_close(fd)))?; Poll::Ready(Ok(())) } } diff --git a/src/net/stream.rs b/src/net/stream.rs index 9e51e11..93c9e5d 100644 --- a/src/net/stream.rs +++ b/src/net/stream.rs @@ -151,7 +151,7 @@ impl AsyncWrite for TcpStream { }))?; let n = ready!(ring.poll(ctx, true, |sqe| unsafe { sqe.prep_write(fd, data, 0) }))?; buf.clear(); - Poll::Ready(Ok(n)) + Poll::Ready(Ok(n as usize)) } fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { @@ -162,9 +162,7 @@ impl AsyncWrite for TcpStream { fn poll_close(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { self.as_mut().guard_op(Op::Close); let fd = self.fd; - ready!(self.ring().poll(ctx, true, |sqe| unsafe { - uring_sys::io_uring_prep_close(sqe.raw_mut(), fd) - }))?; + ready!(self.ring().poll(ctx, true, |sqe| sqe.prep_close(fd)))?; Poll::Ready(Ok(())) } } diff --git a/src/ring.rs b/src/ring.rs index f50a1e6..66d5b98 100644 --- a/src/ring.rs +++ b/src/ring.rs @@ -9,6 +9,7 @@ use crate::completion::Completion; use crate::drive::Completion as ExternalCompletion; use crate::drive::Drive; use crate::Cancellation; +use crate::kernel::SQE; use State::*; @@ -77,8 +78,8 @@ impl Ring { mut self: Pin<&mut Self>, ctx: &mut Context<'_>, is_eager: bool, - prepare: impl FnOnce(&mut iou::SubmissionQueueEvent<'_>), - ) -> Poll> { + prepare: impl FnOnce(&mut SQE), + ) -> Poll> { match self.state { Inert => { ready!(self.as_mut().poll_prepare(ctx, prepare)); @@ -103,18 +104,16 @@ impl Ring { fn poll_prepare( self: Pin<&mut Self>, ctx: &mut Context<'_>, - prepare: impl FnOnce(&mut iou::SubmissionQueueEvent<'_>), + prepare: impl FnOnce(&mut SQE), ) -> Poll<()> { let (driver, state, completion_slot) = self.split(); let completion = ready!(driver.poll_prepare(ctx, |sqe, ctx| { - struct SubmissionCleaner<'a>(iou::SubmissionQueueEvent<'a>); + struct SubmissionCleaner<'a>(&'a mut SQE); impl Drop for SubmissionCleaner<'_> { fn drop(&mut self) { - unsafe { - self.0.prep_nop(); - self.0.set_user_data(0); - } + self.0.prep_nop(); + self.0.unset_completion(); } } @@ -122,7 +121,7 @@ impl Ring { *state = Lost; prepare(&mut sqe.0); let completion = Completion::new(ctx.waker().clone()); - sqe.0.set_user_data(completion.addr()); + sqe.0.set_completion(&completion); mem::forget(sqe); ExternalCompletion::new(completion, ctx) })); @@ -141,7 +140,7 @@ impl Ring { } #[inline(always)] - fn poll_complete(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + fn poll_complete(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { let (_, state, completion_slot) = self.split(); match completion_slot.take().unwrap().check(ctx.waker()) { Ok(result) => { diff --git a/src/submission.rs b/src/submission.rs index 5b907df..2e96b14 100644 --- a/src/submission.rs +++ b/src/submission.rs @@ -40,7 +40,7 @@ impl Future for Submission where E: Event, D: Drive, { - type Output = (E, io::Result); + type Output = (E, io::Result); fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { let (ring, event) = self.split(); diff --git a/tests/basic-write.rs b/tests/basic-write.rs index bce8e5f..66df380 100644 --- a/tests/basic-write.rs +++ b/tests/basic-write.rs @@ -12,7 +12,7 @@ fn write_file() { let mut file = tempfile::tempfile().unwrap(); let write: Write<'_, File> = Write::new(&file, Vec::from(ASSERT), 0); let (_, result) = futures::executor::block_on(Submission::new(write, demo::driver())); - assert_eq!(result.unwrap(), ASSERT.len()); + assert_eq!(result.unwrap() as usize, ASSERT.len()); let mut buf = vec![]; assert_eq!(file.read_to_end(&mut buf).unwrap(), ASSERT.len());