Skip to content

Commit

Permalink
New low-level API not based on iou.
Browse files Browse the repository at this point in the history
This makes setting the user_data field private to ringbahn, so it
is safe to complete a CQE assuming it contains a Completion. With
some other changes around ring ownership, it makes the demo driver 100%
safe code.

The full iou API has not yet been implemented, just enough to implement
the demo driver.

The transition has begun to try to allow events to request multiple SQEs
from the SQ, allowing them to create linked events in a single go. The
first multi-SQE API will be the `hard_linked` API, which allows multiple
events to be linked together with `IORING_SQE_HARDLINK`. The results of
all but the final event will be ignored; the completion will only awaken
on the final event. This functionality is currently not correctly
implemented.
  • Loading branch information
withoutboats committed Sep 18, 2020
1 parent 3557ab2 commit fdd4ebc
Show file tree
Hide file tree
Showing 24 changed files with 394 additions and 105 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Expand Up @@ -15,7 +15,6 @@ parking_lot = "0.10.2"
once_cell = "1.3.1"
libc = "0.2.71"
uring-sys = "0.6.1"
iou = "0.0.0-ringbahn.1"

[dev-dependencies]
tempfile = "3.1.0"
Expand Down
2 changes: 1 addition & 1 deletion examples/read-event.rs
Expand Up @@ -10,7 +10,7 @@ fn main() -> io::Result<()> {
let submission = Submission::new(event, driver);
let content = futures::executor::block_on(async move {
let (event, result) = submission.await;
let bytes_read = result?;
let bytes_read = result? as usize;
let s = String::from_utf8_lossy(&event.buf[0..bytes_read]).to_string();
io::Result::Ok(s)
})?;
Expand Down
30 changes: 10 additions & 20 deletions src/completion.rs
Expand Up @@ -6,6 +6,7 @@ use std::task::Waker;
use parking_lot::Mutex;

use crate::Cancellation;
use crate::kernel::CQE;

use State::*;

Expand All @@ -26,7 +27,7 @@ pub struct Completion {

enum State {
Submitted(Waker),
Completed(io::Result<usize>),
Completed(io::Result<u32>),
Cancelled(Cancellation),
Empty,
}
Expand All @@ -49,7 +50,7 @@ impl Completion {
/// Check if the completion has completed. If it has, the result of the completion will be
/// returned and the completion will be deallocated. If it has not been completed, the waker
/// field will be updated to the new waker if the old waker would not wake the same task.
pub fn check(self, waker: &Waker) -> Result<io::Result<usize>, Completion> {
pub fn check(self, waker: &Waker) -> Result<io::Result<u32>, Completion> {
let mut state = self.state.lock();
match mem::replace(&mut *state, State::Empty) {
Submitted(old_waker) => {
Expand Down Expand Up @@ -85,35 +86,24 @@ impl Completion {
}
}

/// Complete an [`iou::CompletionQueueEvent`] which was constructed from a [`Completion`].
///
/// This function should be used in combination with a driver that implements [`Drive`] to process
/// events on an io-uring instance. This function takes a CQE and processes it.
///
/// ## Safety
///
/// This function is only valid if the user_data in the CQE is null, the liburing timeout
/// signifier, or a pointer to a Completion constructed using ringbahn. If you have scheduled any
/// events on the io-uring instance using a library other than ringbahn, this method is not safe to
/// call unless you have filtered those events out in some manner.
pub unsafe fn complete(cqe: iou::CompletionQueueEvent) {
if cqe.is_timeout() { return; }

pub(crate) fn complete(cqe: CQE) {
let result = cqe.result();
let completion = cqe.user_data() as *mut Mutex<State>;

if completion != ptr::null_mut() {
let state: &Mutex<State> = &*completion;
let state: &Mutex<State> = unsafe { &*completion };
let mut state = state.lock();
match mem::replace(&mut *state, State::Empty) {
Submitted(waker) => {
*state = Completed(result);
waker.wake();
}
Cancelled(callback) => {
drop(callback);
drop(state);
drop(Box::from_raw(completion));
unsafe {
drop(callback);
drop(state);
drop(Box::from_raw(completion));
}
}
_ => unreachable!()
}
Expand Down
33 changes: 16 additions & 17 deletions src/drive/demo.rs
Expand Up @@ -17,11 +17,13 @@ use access_queue::*;

use super::{Drive, Completion};

static SQ: Lazy<AccessQueue<Mutex<iou::SubmissionQueue<'static>>>> = Lazy::new(init_sq);
use crate::kernel::*;

static SQ: Lazy<AccessQueue<Mutex<SubmissionQueue>>> = Lazy::new(init_sq);

/// The driver handle
pub struct DemoDriver<'a> {
sq: Access<'a, Mutex<iou::SubmissionQueue<'static>>>,
sq: Access<'a, Mutex<SubmissionQueue>>,
}

impl Default for DemoDriver<'_> {
Expand All @@ -40,7 +42,7 @@ impl Drive for DemoDriver<'_> {
fn poll_prepare<'cx>(
mut self: Pin<&mut Self>,
ctx: &mut Context<'cx>,
prepare: impl FnOnce(iou::SubmissionQueueEvent<'_>, &mut Context<'cx>) -> Completion<'cx>,
prepare: impl FnOnce(&mut SQE, &mut Context<'cx>) -> Completion<'cx>,
) -> Poll<Completion<'cx>> {
// Wait for access to prepare. When ready, create a new Access future to wait next time we
// want to prepare with this driver, and lock the SQ.
Expand All @@ -51,8 +53,8 @@ impl Drive for DemoDriver<'_> {
self.sq = access;
let mut sq = sq.lock();
loop {
match sq.next_sqe() {
Some(sqe) => return Poll::Ready(prepare(sqe, ctx)),
match sq.prepare(1) {
Some(sqs) => return Poll::Ready(prepare(sqs.singular(), ctx)),
None => { let _ = sq.submit(); }
}
}
Expand All @@ -62,7 +64,7 @@ impl Drive for DemoDriver<'_> {
self: Pin<&mut Self>,
_: &mut Context<'_>,
eager: bool,
) -> Poll<io::Result<usize>> {
) -> Poll<io::Result<u32>> {
let result = if eager {
self.sq.skip_queue().lock().submit()
} else {
Expand All @@ -77,22 +79,19 @@ pub fn driver() -> DemoDriver<'static> {
DemoDriver { sq: SQ.access() }
}

fn init_sq() -> AccessQueue<Mutex<iou::SubmissionQueue<'static>>> {
unsafe {
static mut RING: Option<iou::IoUring> = None;
RING = Some(iou::IoUring::new(SQ_ENTRIES).expect("TODO handle io_uring_init failure"));
let (sq, cq, _) = RING.as_mut().unwrap().queues();
thread::spawn(move || complete(cq));
AccessQueue::new(Mutex::new(sq), CQ_ENTRIES)
}
fn init_sq() -> AccessQueue<Mutex<SubmissionQueue>> {
let ring = IoUring::new(SQ_ENTRIES).expect("TODO handle io_uring_init failure");
let (sq, cq) = ring.queues();
thread::spawn(move || complete(cq));
AccessQueue::new(Mutex::new(sq), CQ_ENTRIES)
}

unsafe fn complete(mut cq: iou::CompletionQueue<'static>) {
fn complete(mut cq: CompletionQueue) {
while let Ok(cqe) = cq.wait_for_cqe() {
let mut ready = cq.ready() as usize + 1;
SQ.release(ready);

super::complete(cqe);
super::complete(cqe.into());
ready -= 1;

while let Some(cqe) = cq.peek_for_cqe() {
Expand All @@ -101,7 +100,7 @@ unsafe fn complete(mut cq: iou::CompletionQueue<'static>) {
SQ.release(ready);
}

super::complete(cqe);
super::complete(cqe.into());
ready -= 1;
}

Expand Down
13 changes: 10 additions & 3 deletions src/drive/mod.rs
Expand Up @@ -8,8 +8,9 @@ use std::pin::Pin;
use std::task::{Context, Poll};

use crate::completion;
use crate::completion::complete;
use crate::kernel::SQE;

pub use crate::completion::complete;

/// A completion which will be used to wake the task waiting on this event.
///
Expand All @@ -34,10 +35,16 @@ impl<'cx> Completion<'cx> {
pub trait Drive {
/// Prepare an event on the submission queue.
///
/// The implementer is responsible for provisioning an [`SQE`] from the submission queue.
/// Once an SQE is available, the implementer should pass it to the `prepare` callback, which
/// constructs a [`Completion`], and return that `Completion` to the caller.
///
/// If the driver is not ready to receive more events, it can return `Poll::Pending`. If it
/// does, it must register a waker to wake the task when more events can be prepared, otherwise
Expand All @@ -48,7 +55,7 @@ pub trait Drive {
fn poll_prepare<'cx>(
self: Pin<&mut Self>,
ctx: &mut Context<'cx>,
prepare: impl FnOnce(iou::SubmissionQueueEvent<'_>, &mut Context<'cx>) -> Completion<'cx>,
prepare: impl FnOnce(&mut SQE, &mut Context<'cx>) -> Completion<'cx>,
) -> Poll<Completion<'cx>>;

/// Submit all of the events on the submission queue.
Expand All @@ -69,5 +76,5 @@ pub trait Drive {
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
eager: bool,
) -> Poll<io::Result<usize>>;
) -> Poll<io::Result<u32>>;
}
6 changes: 3 additions & 3 deletions src/event/close.rs
@@ -1,7 +1,7 @@
use std::mem::ManuallyDrop;
use std::os::unix::io::RawFd;

use super::{Event, Cancellation};
use super::{Event, SQE, Cancellation};

pub struct Close {
fd: RawFd,
Expand All @@ -14,8 +14,8 @@ impl Close {
}

impl Event for Close {
unsafe fn prepare(&mut self, sqe: &mut iou::SubmissionQueueEvent<'_>) {
uring_sys::io_uring_prep_close(sqe.raw_mut(), self.fd)
unsafe fn prepare(&mut self, sqe: &mut SQE) {
sqe.prep_close(self.fd)
}

unsafe fn cancel(_: &mut ManuallyDrop<Self>) -> Cancellation {
Expand Down
7 changes: 3 additions & 4 deletions src/event/connect.rs
Expand Up @@ -3,7 +3,7 @@ use std::os::unix::io::RawFd;
use std::mem::ManuallyDrop;
use std::net::SocketAddr;

use super::{Event, Cancellation};
use super::{Event, SQE, Cancellation};

pub struct Connect {
pub fd: RawFd,
Expand All @@ -19,9 +19,8 @@ impl Connect {
}

impl Event for Connect {
unsafe fn prepare(&mut self, sqe: &mut iou::SubmissionQueueEvent<'_>) {
let addr = &mut *self.addr as *mut libc::sockaddr_storage as *mut libc::sockaddr;
uring_sys::io_uring_prep_connect(sqe.raw_mut(), self.fd, addr, self.addrlen);
unsafe fn prepare(&mut self, sqe: &mut SQE) {
sqe.prep_connect(self.fd, &mut *self.addr, self.addrlen);
}

unsafe fn cancel(this: &mut ManuallyDrop<Self>) -> Cancellation {
Expand Down
3 changes: 2 additions & 1 deletion src/event/mod.rs
Expand Up @@ -11,6 +11,7 @@ mod writev;
use std::mem::ManuallyDrop;

use crate::cancellation::Cancellation;
use crate::kernel::SQE;

pub use connect::Connect;
pub use close::Close;
Expand Down Expand Up @@ -51,7 +52,7 @@ pub trait Event {
/// In essence, by implementing `prepare`, users can write code as if any heap addresses passed to
/// the kernel have passed ownership of that data to the kernel for the time that the event is
/// completed.
unsafe fn prepare(&mut self, sqe: &mut iou::SubmissionQueueEvent<'_>);
unsafe fn prepare(&mut self, sqe: &mut SQE);

/// Return the cancellation callback for this event.
///
Expand Down
7 changes: 3 additions & 4 deletions src/event/openat.rs
Expand Up @@ -4,7 +4,7 @@ use std::os::unix::io::RawFd;
use std::os::unix::ffi::OsStrExt;
use std::path::Path;

use super::{Event, Cancellation};
use super::{Event, SQE, Cancellation};

pub struct OpenAt {
path: CString,
Expand All @@ -21,9 +21,8 @@ impl OpenAt {
}

impl Event for OpenAt {
unsafe fn prepare(&mut self, sqe: &mut iou::SubmissionQueueEvent<'_>) {
let path = self.path.as_ptr();
uring_sys::io_uring_prep_openat(sqe.raw_mut(), self.dfd, path, self.flags, self.mode);
unsafe fn prepare(&mut self, sqe: &mut SQE) {
sqe.prep_openat(self.dfd, self.path.as_ptr(), self.flags, self.mode)
}

unsafe fn cancel(this: &mut ManuallyDrop<Self>) -> Cancellation {
Expand Down
8 changes: 4 additions & 4 deletions src/event/read.rs
Expand Up @@ -2,23 +2,23 @@ use std::os::unix::io::AsRawFd;
use std::mem::ManuallyDrop;
use std::marker::Unpin;

use super::{Event, Cancellation};
use super::{Event, SQE, Cancellation};

/// A basic read event.
pub struct Read<'a, T> {
pub io: &'a T,
pub buf: Vec<u8>,
pub offset: usize
pub offset: u64
}

impl<'a, T: AsRawFd + Unpin> Read<'a, T> {
pub fn new(io: &'a T, buf: Vec<u8>, offset: usize) -> Read<T> {
pub fn new(io: &'a T, buf: Vec<u8>, offset: u64) -> Read<T> {
Read { io, buf, offset }
}
}

impl<'a, T: AsRawFd + Unpin> Event for Read<'a, T> {
unsafe fn prepare(&mut self, sqe: &mut iou::SubmissionQueueEvent<'_>) {
unsafe fn prepare(&mut self, sqe: &mut SQE) {
sqe.prep_read(self.io.as_raw_fd(), &mut self.buf[..], self.offset);
}

Expand Down
8 changes: 4 additions & 4 deletions src/event/write.rs
Expand Up @@ -2,23 +2,23 @@ use std::os::unix::io::AsRawFd;
use std::mem::ManuallyDrop;
use std::marker::Unpin;

use super::{Event, Cancellation};
use super::{Event, SQE, Cancellation};

/// A basic write event.
pub struct Write<'a, T> {
pub io: &'a T,
pub buf: Vec<u8>,
pub offset: usize
pub offset: u64
}

impl<'a, T: AsRawFd + Unpin> Write<'a, T> {
pub fn new(io: &'a T, buf: Vec<u8>, offset: usize) -> Write<T> {
pub fn new(io: &'a T, buf: Vec<u8>, offset: u64) -> Write<T> {
Write { io, buf, offset }
}
}

impl<'a, T: AsRawFd + Unpin> Event for Write<'a, T> {
unsafe fn prepare(&mut self, sqe: &mut iou::SubmissionQueueEvent<'_>) {
unsafe fn prepare(&mut self, sqe: &mut SQE) {
sqe.prep_write(self.io.as_raw_fd(), self.buf.as_ref(), self.offset);
}

Expand Down

0 comments on commit fdd4ebc

Please sign in to comment.