Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defer work until polled #165

Closed
wants to merge 20 commits into from
2 changes: 1 addition & 1 deletion examples/cat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ fn main() {
}

// Include a new line
println!("");
ollie-etl marked this conversation as resolved.
Show resolved Hide resolved
println!();
});
}
2 changes: 1 addition & 1 deletion examples/tcp_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ fn main() {
}

let (res, slice) = stream.write_all(buf.slice(..read)).await;
let _ = res.unwrap();
res.unwrap();
ollie-etl marked this conversation as resolved.
Show resolved Hide resolved
buf = slice.into_inner();
println!("{} all {} bytes ping-ponged", socket_addr, read);
n += read;
Expand Down
2 changes: 1 addition & 1 deletion src/driver/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub(crate) struct Accept {
}

impl Op<Accept> {
pub(crate) fn accept(fd: &SharedFd) -> io::Result<Op<Accept>> {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Op creation is infallible

pub(crate) fn accept(fd: &SharedFd) -> Op<Accept> {
use io_uring::{opcode, types};

let socketaddr = Box::new((
Expand Down
17 changes: 11 additions & 6 deletions src/driver/close.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,34 @@
use crate::driver::Op;

use crate::driver::op::{self, Completable};
use std::io;
use std::os::unix::io::RawFd;
use std::{io, os::unix::io::RawFd};

pub(crate) struct Close {
fd: RawFd,
}

impl Op<Close> {
/// Close a file descriptor
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments unrelated to the PR, but helped me understand the whole Close shutdown saga whilst implementing

///
/// Close is special, in that it does not wait until it is Polled
/// before placing the Op on the submission queue. This is to ensure
/// that if the Driver is Dropped with the Operation incomplete,
/// the drop logic of the driver will ensure we do not leak resources
pub(crate) fn close(fd: RawFd) -> io::Result<Op<Close>> {
use io_uring::{opcode, types};

Op::try_submit_with(Close { fd }, |close| {
let op = Op::submit_with(Close { fd }, |close| {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create a Close Op: infallible

opcode::Close::new(types::Fd(close.fd)).build()
})
});

op.enqueue()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

immediately enqueue the Op: fallible

}
}

impl Completable for Close {
type Output = io::Result<()>;

fn complete(self, cqe: op::CqeResult) -> Self::Output {
let _ = cqe.result?;

Ok(())
}
}
2 changes: 1 addition & 1 deletion src/driver/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub(crate) struct Connect {

impl Op<Connect> {
/// Submit a request to connect.
pub(crate) fn connect(fd: &SharedFd, socket_addr: SockAddr) -> io::Result<Op<Connect>> {
pub(crate) fn connect(fd: &SharedFd, socket_addr: SockAddr) -> Op<Connect> {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Op creation is infallible

use io_uring::{opcode, types};

Op::submit_with(
Expand Down
4 changes: 2 additions & 2 deletions src/driver/fsync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ pub(crate) struct Fsync {
}

impl Op<Fsync> {
pub(crate) fn fsync(fd: &SharedFd) -> io::Result<Op<Fsync>> {
pub(crate) fn fsync(fd: &SharedFd) -> Op<Fsync> {
Op::submit_with(Fsync { fd: fd.clone() }, |fsync| {
opcode::Fsync::new(types::Fd(fsync.fd.raw_fd())).build()
})
}

pub(crate) fn datasync(fd: &SharedFd) -> io::Result<Op<Fsync>> {
pub(crate) fn datasync(fd: &SharedFd) -> Op<Fsync> {
Op::submit_with(Fsync { fd: fd.clone() }, |fsync| {
opcode::Fsync::new(types::Fd(fsync.fd.raw_fd()))
.flags(types::FsyncFlags::DATASYNC)
Expand Down
16 changes: 9 additions & 7 deletions src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ mod writev;
use crate::driver::op::Lifecycle;
use io_uring::opcode::AsyncCancel;
use io_uring::IoUring;
use slab::Slab;
use slab::{Slab, VacantEntry};
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};

Expand Down Expand Up @@ -239,8 +239,8 @@ impl Ops {
}

// Insert a new operation
fn insert(&mut self) -> usize {
self.lifecycle.insert(op::Lifecycle::Submitted)
fn insert(&mut self) -> VacantEntry<'_, op::Lifecycle> {
self.lifecycle.vacant_entry()
}

// Remove an operation
Expand All @@ -258,9 +258,11 @@ impl Ops {

impl Drop for Ops {
fn drop(&mut self) {
assert!(self
.lifecycle
.iter()
.all(|(_, cycle)| matches!(cycle, Lifecycle::Completed(_))))
for (_, cycle) in self.lifecycle.iter() {
match cycle {
Lifecycle::Completed(_) | Lifecycle::Pending(_) => {}
c => unreachable!("Lifecycle {:?} found during driver drop", c),
}
}
Comment on lines +261 to +266
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stricvly unrelated, but gives better panic messages when something goes wrong during debug. can revert or PR elsewhere if preferred.

}
}
4 changes: 2 additions & 2 deletions src/driver/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::io;
pub struct NoOp {}

impl Op<NoOp> {
pub fn no_op() -> io::Result<Op<NoOp>> {
pub fn no_op() -> Op<NoOp> {
use io_uring::opcode;

Op::submit_with(NoOp {}, |_| opcode::Nop::new().build())
Expand All @@ -30,7 +30,7 @@ mod test {
use crate as tokio_uring;

#[test]
fn perform_no_op() -> () {
fn perform_no_op() {
tokio_uring::start(async {
tokio_uring::no_op().await.unwrap();
})
Expand Down
141 changes: 102 additions & 39 deletions src/driver/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};

use io_uring::{cqueue, squeue};
use io_uring::{
cqueue,
squeue::{self, Entry},
};

mod slab_list;

use slab::Slab;
use slab_list::{SlabListEntry, SlabListIndices};

use crate::driver;
use crate::runtime::CONTEXT;
use crate::util::PhantomUnsendUnsync;

Expand Down Expand Up @@ -46,7 +48,10 @@ pub(crate) trait Completable {
}

pub(crate) enum Lifecycle {
/// The operation has been submitted to uring and is currently in-flight
/// The operation has been created, and will be submitted when polled
Pending(Entry),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The meat of the PR


/// The operation has been manually submitted
Submitted,

/// The submitter is waiting for the completion of the operation
Expand All @@ -64,6 +69,19 @@ pub(crate) enum Lifecycle {
CompletionList(SlabListIndices),
}

impl std::fmt::Debug for Lifecycle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Pending(_) => f.debug_tuple("Pending").finish(),
Self::Submitted => f.debug_tuple("Submitted").finish(),
Self::Waiting(_) => f.debug_tuple("Waiting").finish(),
Self::Ignored(_) => f.debug_tuple("Ignored").finish(),
Self::Completed(_) => f.debug_tuple("Completed").finish(),
Self::CompletionList(_) => f.debug_tuple("CompletionList").finish(),
}
}
}
Comment on lines +72 to +83
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Debug only. Again, can revert or PR elsewhere if preferred.


/// A single CQE entry
pub(crate) struct CqeResult {
pub(crate) result: io::Result<u32>,
Expand All @@ -83,57 +101,72 @@ impl From<cqueue::Entry> for CqeResult {
}
}

impl<T, CqeType> Op<T, CqeType>
where
T: Completable,
{
impl<T, CqeType> Op<T, CqeType> {
/// Create a new operation
fn new(data: T, inner: &mut driver::Driver) -> Self {
fn new(data: T, index: usize) -> Self {
Op {
index: inner.ops.insert(),
index,
data: Some(data),
_cqe_type: PhantomData,
_phantom: PhantomData,
}
}

/// Submit an operation to uring.
/// Submit an operation to the driver.
///
/// `state` is stored during the operation tracking any state submitted to
/// `data` is stored during the operation tracking any state for submission to
/// the kernel.
pub(super) fn submit_with<F>(data: T, f: F) -> io::Result<Self>
pub(super) fn submit_with<F>(mut data: T, f: F) -> Self
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Submission to the driver is infallible. This now places the seq for submission into the Pending state

where
F: FnOnce(&mut T) -> squeue::Entry,
{
CONTEXT.with(|cx| {
cx.with_driver_mut(|driver| {
// Create the operation
let mut op = Op::new(data, driver);
// Get an vacent entry in the Lifecycle slab
let entry = driver.ops.insert();

// Configure the SQE
let sqe = f(op.data.as_mut().unwrap()).user_data(op.index as _);
// Configure the Sqe for submission
let sqe = f(&mut data).user_data(entry.key() as _);

// Push the new operation
while unsafe { driver.uring.submission().push(&sqe).is_err() } {
// If the submission queue is full, flush it to the kernel
driver.submit()?;
}
// Create a pending Lifecycle entry for the Op
let op = Op::new(data, entry.key());
entry.insert(Lifecycle::Pending(sqe));

Ok(op)
op
})
})
}

/// Try submitting an operation to uring
pub(super) fn try_submit_with<F>(data: T, f: F) -> io::Result<Self>
where
F: FnOnce(&mut T) -> squeue::Entry,
{
if CONTEXT.with(|cx| cx.is_set()) {
Op::submit_with(data, f)
} else {
Err(io::ErrorKind::Other.into())
}
/// Enqueue an operation on the submission ring, without `await`ing the future
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could end the sentence with a period, to be consistent with other changes to this file.

///
/// This is useful when any failure in io_uring submission needs
/// to be handled at the point of submission.
pub(super) fn enqueue(self) -> io::Result<Self> {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does nothing if called on an already submitted operation. TODO: improve docs

CONTEXT
.with(|runtime_context| {
runtime_context.with_driver_mut(|driver| {
let (lifecycle, _) = driver
.ops
.get_mut(self.index)
.expect("invalid internal state");

match std::mem::replace(lifecycle, Lifecycle::Submitted) {
Lifecycle::Pending(sqe) => {
// Try to push the new operation
while unsafe { driver.uring.submission().push(&sqe).is_err() } {
// Fail with an IoError if encountered
driver.submit()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lifecycle entry will stay in the Submitted state even if this fails?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, yes. Bug. It should be completed

}
}
lc => {
let lifecycle = driver.ops.get_mut(self.index).unwrap().0;
*lifecycle = lc;
}
}
Ok(())
})
})
.map(|_| self)
}
}

Expand All @@ -155,7 +188,26 @@ where
.get_mut(me.index)
.expect("invalid internal state");

match mem::replace(lifecycle, Lifecycle::Submitted) {
match mem::replace(lifecycle, Lifecycle::Ignored(Box::new(()))) {
Lifecycle::Pending(sqe) => {
// Try to push the new operation
while unsafe { driver.uring.submission().push(&sqe).is_err() } {
// If the sqe is full, flush to kernel
if let Err(e) = driver.submit() {
// Fail if an IoError in encountered
let cqe = CqeResult {
result: Err(e),
flags: 0,
};
driver.ops.remove(me.index);
me.index = usize::MAX;
return Poll::Ready(me.data.take().unwrap().complete(cqe));
}
}
let lifecycle = driver.ops.get_mut(me.index).unwrap().0;
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
Poll::Pending
}
Lifecycle::Submitted => {
*lifecycle = Lifecycle::Waiting(cx.waker().clone());
Poll::Pending
Expand Down Expand Up @@ -202,11 +254,11 @@ impl<T, CqeType> Drop for Op<T, CqeType> {
}
};

match mem::replace(lifecycle, Lifecycle::Submitted) {
match mem::replace(lifecycle, Lifecycle::Ignored(Box::new(()))) {
Lifecycle::Submitted | Lifecycle::Waiting(_) => {
*lifecycle = Lifecycle::Ignored(Box::new(self.data.take()));
}
Lifecycle::Completed(..) => {
Lifecycle::Completed(..) | Lifecycle::Pending(_) => {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the op is dropped before it is submitted to the ring, no work on the ring is scheduled.

driver.ops.remove(self.index);
}
Lifecycle::CompletionList(indices) => {
Expand Down Expand Up @@ -234,7 +286,13 @@ impl Lifecycle {
pub(super) fn complete(&mut self, completions: &mut Slab<Completion>, cqe: CqeResult) -> bool {
use std::mem;

match mem::replace(self, Lifecycle::Submitted) {
match mem::replace(self, Lifecycle::Ignored(Box::new(()))) {
Lifecycle::Pending(..) => {
// Pending Operations have not submitted to the ring
// They should not be receiving completions
unreachable!("Completion for pending Op")
}

x @ Lifecycle::Submitted | x @ Lifecycle::Waiting(..) => {
if io_uring::cqueue::more(cqe.flags) {
let mut list = SlabListIndices::new().into_list(completions);
Expand Down Expand Up @@ -433,6 +491,8 @@ mod test {
release();
}

/// Create a data, and an Op containing that data,
/// which is mocked as submitted on the ring
fn init() -> (Op<Rc<()>>, Rc<()>) {
use crate::driver::Driver;

Expand All @@ -441,10 +501,13 @@ mod test {

let op = CONTEXT.with(|cx| {
cx.set_driver(driver);

cx.with_driver_mut(|driver| Op::new(data.clone(), driver))
cx.with_driver_mut(|driver| {
let entry = driver.ops.insert();
let op = Op::new(data.clone(), entry.key());
entry.insert(Lifecycle::Submitted);
op
})
});

(op, data)
}

Expand Down
4 changes: 2 additions & 2 deletions src/driver/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl Op<Open> {
| options.creation_mode()?
| (options.custom_flags & !libc::O_ACCMODE);

Op::submit_with(Open { path, flags }, |open| {
Ok(Op::submit_with(Open { path, flags }, |open| {
// Get a reference to the memory. The string will be held by the
// operation state and will not be accessed again until the operation
// completes.
Expand All @@ -33,7 +33,7 @@ impl Op<Open> {
.flags(flags)
.mode(options.mode)
.build()
})
}))
}
}

Expand Down
Loading