Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ProvideBuffers API for buffer preregistration #34

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ parking_lot = "0.10.2"
once_cell = "1.3.1"
libc = "0.2.71"
uring-sys = "0.6.1"
iou = "0.0.0-ringbahn.1"
iou = { path = "../iou" }

[dev-dependencies]
tempfile = "3.1.0"
Expand Down
2 changes: 1 addition & 1 deletion examples/read-event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fn main() -> io::Result<()> {
let event = event::Read::new(&file, vec![0; meta.len() as usize], 0);
let submission = Submission::new(event, driver);
let content = futures::executor::block_on(async move {
let (event, result) = submission.await;
let (event, result, ..) = submission.await;
let bytes_read = result?;
let s = String::from_utf8_lossy(&event.buf[0..bytes_read]).to_string();
io::Result::Ok(s)
Expand Down
209 changes: 119 additions & 90 deletions src/buf.rs
Original file line number Diff line number Diff line change
@@ -1,80 +1,108 @@
use std::alloc::{alloc, dealloc, handle_alloc_error, Layout};
use std::io;
use std::cmp;
use std::mem;
use std::ptr::NonNull;
use std::marker::Unpin;
use std::mem::{MaybeUninit, ManuallyDrop};
use std::pin::Pin;
use std::task::{Poll, Context};
use std::slice;
use std::task::Poll;

use futures_core::ready;
use crate::Cancellation;
use crate::drive::{Drive, ProvideBufferSealed};
use crate::Ring;

pub struct Buffer {
data: NonNull<()>,
storage: Storage,
capacity: u32,
pub struct Buffer<D: Drive> {
storage: Storage<D>,
pos: u32,
cap: u32,
}

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum Storage {
Nothing = 0,
Buffer,
Statx,
enum Storage<D: Drive> {
Read(ManuallyDrop<D::ReadBuf>),
Write(ManuallyDrop<D::WriteBuf>),
Statx(ManuallyDrop<Box<MaybeUninit<libc::statx>>>),
Empty,
}

impl Buffer {
pub fn new() -> Buffer {
impl<D: Drive> Buffer<D> {
pub fn new() -> Buffer<D> {
Buffer {
data: NonNull::dangling(),
storage: Storage::Nothing,
capacity: 4096 * 2,
storage: Storage::Empty,
pos: 0,
cap: 0,
}
}

#[inline(always)]
pub fn buffered_from_read(&self) -> &[u8] {
if self.storage == Storage::Buffer {
if let Storage::Read(buf) = &self.storage {
unsafe {
let data: *mut u8 = self.data.cast().as_ptr();
let cap = self.cap - self.pos;
slice::from_raw_parts(data.offset(self.pos as isize), cap as usize)
let ptr: *mut u8 = buf.as_slice().as_ptr() as *mut u8;
let cap = (self.cap - self.pos) as usize;
slice::from_raw_parts(ptr.offset(self.pos as isize), cap)
}
} else {
&[]
}
}

// invariant: if fill returns N, it must actually have filled read buf up to n bytes
#[inline]
pub fn fill_buf(&mut self, fill: impl FnOnce(&mut [u8]) -> Poll<io::Result<u32>>)
-> Poll<io::Result<&[u8]>>
{
match self.storage {
Storage::Buffer => {
pub unsafe fn fill_read_buf(
&mut self,
ctx: &mut Context<'_>,
mut ring: Pin<&mut Ring<D>>,
fill: impl FnOnce(Pin<&mut Ring<D>>, &mut Context<'_>, &mut D::ReadBuf) -> Poll<(io::Result<usize>, u32)>,
) -> Poll<io::Result<&[u8]>> {
if matches!(self.storage, Storage::Empty) {
ready!(self.alloc_read_buf(ctx, ring.as_mut()))?;
}
match &mut self.storage {
Storage::Read(buf) => {
if self.pos >= self.cap {
let buf = unsafe {
slice::from_raw_parts_mut(self.data.cast().as_ptr(), self.capacity as usize)
};
self.cap = ready!(fill(buf))?;
buf.return_to_group(ring.as_mut().driver_pinned());

let (result, flags) = ready!(fill(ring.as_mut(), ctx, &mut *buf));

self.cap = result? as u32;
self.pos = 0;
}

if flags & 1 == 1 {
buf.access_from_group(ring.driver_pinned(), (flags >> 16) as u16);
}
}
Poll::Ready(Ok(self.buffered_from_read()))
}
Storage::Nothing => {
self.cap = ready!(fill(self.alloc_buf()))?;
Poll::Ready(Ok(self.buffered_from_read()))
_ => panic!("attempted to fill read buf while holding other buf"),
}
}

#[inline]
pub fn fill_write_buf(
&mut self,
ctx: &mut Context<'_>,
mut ring: Pin<&mut Ring<D>>,
data: &[u8],
) -> Poll<io::Result<&mut D::WriteBuf>> {
if matches!(self.storage, Storage::Empty) {
ready!(self.alloc_write_buf(ctx, ring.as_mut()))?;
}
match &mut self.storage {
Storage::Write(buf) => {
if self.pos == 0 {
unsafe { buf.fill(data) };
self.pos = 1;
}
Poll::Ready(Ok(&mut *buf))
}
_ => panic!("attempted to fill buf while not holding buffer"),
_ => panic!("attempted to fill read buf while holding other buf"),
}
}

#[inline(always)]
pub fn consume(&mut self, amt: usize) {
self.pos = cmp::min(self.pos + amt as u32, self.cap);
pub fn consume(self: Pin<&mut Self>, amt: usize) {
let this = unsafe { Pin::get_unchecked_mut(self) };
this.pos = cmp::min(this.pos + amt as u32, this.cap);
}

#[inline(always)]
Expand All @@ -83,84 +111,85 @@ impl Buffer {
self.cap = 0;
}

pub fn cancellation(&mut self) -> Cancellation {
match self.storage {
Storage::Buffer => {
self.clear();
self.storage = Storage::Nothing;
let data = mem::replace(&mut self.data, NonNull::dangling());
unsafe { Cancellation::buffer(data.cast().as_ptr(), self.capacity as usize) }
pub fn cancellation(&mut self, driver: Pin<&mut D>) -> Cancellation {
let cancellation = match &mut self.storage {
Storage::Read(buf) => unsafe {
ProvideBufferSealed::cleanup(ManuallyDrop::new(ManuallyDrop::take(buf)), driver)
}
Storage::Write(buf) => unsafe {
ProvideBufferSealed::cleanup(ManuallyDrop::new(ManuallyDrop::take(buf)), driver)
}
Storage::Statx => {
unsafe fn callback(statx: *mut (), _: usize) {
Storage::Statx(statx) => {
unsafe fn callback(statx: *mut (), _: usize, _: u32) {
dealloc(statx as *mut u8, Layout::new::<libc::statx>())
}

self.storage = Storage::Nothing;
let data = mem::replace(&mut self.data, NonNull::dangling());
unsafe {
Cancellation::new(data.cast().as_ptr(), 0, callback)
let statx = Box::into_raw(ManuallyDrop::take(statx));
Cancellation::new(statx as *mut (), 0, callback)
}
}
Storage::Nothing => Cancellation::null(),
}
Storage::Empty => Cancellation::null(),
};
self.storage = Storage::Empty;
cancellation
}

pub(crate) fn as_statx(&mut self) -> *mut libc::statx {
match self.storage {
Storage::Statx => self.data.cast().as_ptr(),
Storage::Nothing => self.alloc_statx(),
_ => panic!("accessed buffer as statx when storing something else"),
match &mut self.storage {
Storage::Statx(statx) => statx.as_mut_ptr(),
Storage::Empty => {
self.alloc_statx();
if let Storage::Statx(statx) = &mut self.storage {
statx.as_mut_ptr()
} else { unreachable!() }
}
_ => panic!("accessed buffer as statx when storing something else"),
}
}

fn alloc_buf(&mut self) -> &mut [u8] {
self.storage = Storage::Buffer;
self.alloc();
unsafe {
slice::from_raw_parts_mut(self.data.cast().as_ptr(), self.capacity as usize)
}
fn alloc_read_buf(&mut self, ctx: &mut Context<'_>, ring: Pin<&mut Ring<D>>)
-> Poll<io::Result<()>>
{
let buf = ready!(ring.driver_pinned().poll_provide_read_buf(ctx, 4096 * 2))?;
self.storage = Storage::Read(ManuallyDrop::new(buf));
Poll::Ready(Ok(()))
}

fn alloc_statx(&mut self) -> &mut libc::statx {
self.storage = Storage::Statx;
self.alloc();
unsafe {
&mut *self.data.cast().as_ptr()
}
fn alloc_write_buf(&mut self, ctx: &mut Context<'_>, ring: Pin<&mut Ring<D>>)
-> Poll<io::Result<()>>
{
let buf = ready!(ring.driver_pinned().poll_provide_write_buf(ctx, 4096 * 2))?;
self.storage = Storage::Write(ManuallyDrop::new(buf));
Poll::Ready(Ok(()))
}

fn alloc(&mut self) {
fn alloc_statx(&mut self) {
unsafe {
let layout = self.layout().unwrap();
let ptr = alloc(layout);
if ptr.is_null() {
self.storage = Storage::Nothing;
handle_alloc_error(layout)
let layout = Layout::new::<libc::statx>();
let statx = alloc(layout);
if statx.is_null() {
handle_alloc_error(layout);
}
self.data = NonNull::new_unchecked(ptr).cast();
}
}

#[inline(always)]
fn layout(&self) -> Option<Layout> {
match self.storage {
Storage::Statx => Some(Layout::new::<libc::statx>()),
Storage::Buffer => Some(Layout::array::<u8>(self.capacity as usize).unwrap()),
Storage::Nothing => None,
self.storage = Storage::Statx(ManuallyDrop::new(Box::from_raw(statx as *mut _)));
}
}
}

unsafe impl Send for Buffer { }
unsafe impl Sync for Buffer { }
unsafe impl<D: Drive> Send for Buffer<D> where D::ReadBuf: Send, D::WriteBuf: Send { }
unsafe impl<D: Drive> Sync for Buffer<D> where D::ReadBuf: Sync, D::WriteBuf: Sync { }

impl Drop for Buffer {
impl<D: Drive> Drop for Buffer<D> {
fn drop(&mut self) {
if let Some(layout) = self.layout() {
unsafe {
dealloc(self.data.cast().as_ptr(), layout);
unsafe {
match &mut self.storage {
Storage::Read(buf) => ManuallyDrop::drop(buf),
Storage::Write(buf) => ManuallyDrop::drop(buf),
Storage::Statx(statx) => ManuallyDrop::drop(statx),
Storage::Empty => return,
}
}
}
}

impl<D: Drive> Unpin for Buffer<D> { }
16 changes: 11 additions & 5 deletions src/cancellation.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::mem::ManuallyDrop;
use std::ptr;

/// A cancellation callback to clean up resources when IO gets cancelled.
Expand All @@ -9,7 +10,7 @@ use std::ptr;
pub struct Cancellation {
data: *mut (),
metadata: usize,
drop: unsafe fn(*mut (), usize),
drop: unsafe fn(*mut (), usize, u32),
}

impl Cancellation {
Expand All @@ -25,21 +26,26 @@ impl Cancellation {
/// as well.
///
/// It must be safe to send the Cancellation type and references to it between threads.
pub unsafe fn new(data: *mut (), metadata: usize, drop: unsafe fn(*mut (), usize))
pub unsafe fn new(data: *mut (), metadata: usize, drop: unsafe fn(*mut (), usize, u32))
-> Cancellation
{
Cancellation { data, metadata, drop }
}

/// Construct a null cancellation, which does nothing when it is dropped.
pub fn null() -> Cancellation {
unsafe fn drop(_: *mut (), _: usize) { }
unsafe fn drop(_: *mut (), _: usize, _: u32) { }
Cancellation { data: ptr::null_mut(), metadata: 0, drop }
}

pub fn cancel(self, cqe_flags: u32) {
let this = ManuallyDrop::new(self);
unsafe { (this.drop)(this.data, this.metadata, cqe_flags) }
}


pub(crate) unsafe fn buffer(data: *mut u8, len: usize) -> Cancellation {
unsafe fn drop(data: *mut (), len: usize) {
unsafe fn drop(data: *mut (), len: usize, _: u32) {
std::mem::drop(Vec::from_raw_parts(data as *mut u8, len, len))
}

Expand All @@ -53,7 +59,7 @@ unsafe impl Sync for Cancellation { }
impl Drop for Cancellation {
fn drop(&mut self) {
unsafe {
(self.drop)(self.data, self.metadata)
(self.drop)(self.data, self.metadata, 0)
}
}
}
Loading