Skip to content

Commit

Permalink
Improve code around closing.
Browse files Browse the repository at this point in the history
All IO objects will be closed in a blocking fashion if they have not
been closed asynchronously (closes #46). Also, they will panic if
the user attempts to perform IO after they have been closed.

UnixStream is made a thin wrapper around TcpStream instead of
duplicating a lot of code.
  • Loading branch information
withoutboats committed Sep 24, 2020
1 parent d16e4dd commit 6660fa1
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 91 deletions.
12 changes: 11 additions & 1 deletion src/fs.rs
Expand Up @@ -36,6 +36,7 @@ enum Op {
Close,
Nothing,
Statx,
Closed,
}

impl File {
Expand Down Expand Up @@ -94,6 +95,9 @@ impl<D: Drive> File<D> {

fn guard_op(self: Pin<&mut Self>, op: Op) {
let this = unsafe { Pin::get_unchecked_mut(self) };
if this.active == Op::Closed {
panic!("Attempted to perform IO on a closed File");
}
if this.active != Op::Nothing && this.active != op {
this.cancel();
}
Expand Down Expand Up @@ -147,6 +151,10 @@ impl<D: Drive> File<D> {
fn pos(self: Pin<&mut Self>) -> Pin<&mut u64> {
unsafe { Pin::map_unchecked_mut(self, |this| &mut this.pos) }
}

fn confirm_close(self: Pin<&mut Self>) {
unsafe { Pin::get_unchecked_mut(self).active = Op::Closed; }
}
}

impl<D: Drive> AsyncRead for File<D> {
Expand Down Expand Up @@ -207,11 +215,12 @@ impl<D: Drive> AsyncWrite for File<D> {
fn poll_close(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.as_mut().guard_op(Op::Close);
let fd = self.fd;
ready!(self.ring().poll(ctx, true, 1, |sqs| unsafe {
ready!(self.as_mut().ring().poll(ctx, true, 1, |sqs| unsafe {
let mut sqe = sqs.single().unwrap();
sqe.prep_close(fd);
sqe
}))?;
self.confirm_close();
Poll::Ready(Ok(()))
}
}
Expand Down Expand Up @@ -271,6 +280,7 @@ impl<D: Drive> From<File<D>> for fs::File {
impl<D: Drive> Drop for File<D> {
fn drop(&mut self) {
match self.active {
Op::Closed => { }
Op::Nothing => unsafe { libc::close(self.fd); },
_ => self.cancel(),
}
Expand Down
11 changes: 11 additions & 0 deletions src/net/listener.rs
Expand Up @@ -26,6 +26,7 @@ enum Op {
Nothing = 0,
Accept,
Close,
Closed,
}

impl TcpListener {
Expand Down Expand Up @@ -60,6 +61,9 @@ impl<D: Drive> TcpListener<D> {

fn guard_op(self: Pin<&mut Self>, op: Op) {
let this = unsafe { Pin::get_unchecked_mut(self) };
if this.active == Op::Closed {
panic!("Attempted to perform IO on a closed TcpListener");
}
if this.active != Op::Nothing && this.active != op {
this.cancel();
}
Expand All @@ -78,6 +82,7 @@ impl<D: Drive> TcpListener<D> {
}
}
Op::Close => Cancellation::null(),
Op::Closed => return,
Op::Nothing => return,
};
self.active = Op::Nothing;
Expand All @@ -101,6 +106,10 @@ impl<D: Drive> TcpListener<D> {
(Pin::new_unchecked(&mut this.ring), &mut **this.addr.as_mut().unwrap())
}
}

fn confirm_close(self: Pin<&mut Self>) {
unsafe { Pin::get_unchecked_mut(self).active = Op::Closed; }
}
}

impl<D: Drive + Clone> TcpListener<D> {
Expand Down Expand Up @@ -176,6 +185,7 @@ impl<D: Drive + Clone> TcpListener<D> {
impl<D: Drive> Drop for TcpListener<D> {
fn drop(&mut self) {
match self.active {
Op::Closed => { }
Op::Nothing => unsafe { libc::close(self.fd); }
_ => self.cancel(),
}
Expand Down Expand Up @@ -259,6 +269,7 @@ impl<'a, D: Drive> Future for Close<'a, D> {
sqe.prep_close(fd);
sqe
}))?;
self.socket.as_mut().confirm_close();
Poll::Ready(Ok(()))
}
}
23 changes: 21 additions & 2 deletions src/net/stream.rs
Expand Up @@ -31,6 +31,7 @@ enum Op {
Write,
Close,
Nothing,
Closed,
}

impl TcpStream {
Expand All @@ -51,7 +52,7 @@ impl<D: Drive + Clone> TcpStream<D> {
}

impl<D: Drive> TcpStream<D> {
pub(super) fn from_fd(fd: RawFd, ring: Ring<D>) -> TcpStream<D> {
pub(crate) fn from_fd(fd: RawFd, ring: Ring<D>) -> TcpStream<D> {
TcpStream {
buf: Buffer::new(),
active: Op::Nothing,
Expand All @@ -61,6 +62,9 @@ impl<D: Drive> TcpStream<D> {

fn guard_op(self: Pin<&mut Self>, op: Op) {
let this = unsafe { Pin::get_unchecked_mut(self) };
if this.active == Op::Closed {
panic!("Attempted to perform IO on a closed stream");
}
if this.active != Op::Nothing && this.active != op {
this.cancel();
}
Expand Down Expand Up @@ -89,6 +93,10 @@ impl<D: Drive> TcpStream<D> {
(Pin::new_unchecked(&mut this.ring), &mut this.buf)
}
}

fn confirm_close(self: Pin<&mut Self>) {
unsafe { Pin::get_unchecked_mut(self).active = Op::Closed; }
}
}

pub struct Connect<D: Drive = DemoDriver<'static>>(
Expand Down Expand Up @@ -173,11 +181,22 @@ impl<D: Drive> AsyncWrite for TcpStream<D> {
fn poll_close(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.as_mut().guard_op(Op::Close);
let fd = self.fd;
ready!(self.ring().poll(ctx, true, 1, |sqs| unsafe {
ready!(self.as_mut().ring().poll(ctx, true, 1, |sqs| unsafe {
let mut sqe = sqs.single().unwrap();
sqe.prep_close(fd);
sqe
}))?;
self.confirm_close();
Poll::Ready(Ok(()))
}
}

impl<D: Drive> Drop for TcpStream<D> {
fn drop(&mut self) {
match self.active {
Op::Closed => { }
Op::Nothing => unsafe { libc::close(self.fd); },
_ => self.cancel(),
}
}
}
12 changes: 11 additions & 1 deletion src/unix/listener.rs
Expand Up @@ -25,6 +25,7 @@ enum Op {
Nothing = 0,
Accept,
Close,
Closed,
}

impl UnixListener {
Expand Down Expand Up @@ -56,14 +57,17 @@ impl<D: Drive> UnixListener<D> {

fn guard_op(self: Pin<&mut Self>, op: Op) {
let this = unsafe { Pin::get_unchecked_mut(self) };
if this.active == Op::Closed {
panic!("Attempted to perform IO on a closed UnixListener");
}
if this.active != Op::Nothing && this.active != op {
this.cancel();
}
this.active = op;
}

fn cancel(&mut self) {
if self.active != Op::Nothing {
if !matches!(self.active, Op::Nothing | Op::Closed) {
self.active = Op::Nothing;
self.ring.cancel(Cancellation::null());
}
Expand All @@ -72,6 +76,10 @@ impl<D: Drive> UnixListener<D> {
fn ring(self: Pin<&mut Self>) -> Pin<&mut Ring<D>> {
unsafe { Pin::map_unchecked_mut(self, |this| &mut this.ring) }
}

fn confirm_close(self: Pin<&mut Self>) {
unsafe { Pin::get_unchecked_mut(self).active = Op::Closed; }
}
}

impl<D: Drive + Clone> UnixListener<D> {
Expand Down Expand Up @@ -109,6 +117,7 @@ impl<D: Drive + Clone> UnixListener<D> {
impl<D: Drive> Drop for UnixListener<D> {
fn drop(&mut self) {
match self.active {
Op::Closed => { }
Op::Nothing => unsafe { libc::close(self.fd); }
_ => self.cancel(),
}
Expand Down Expand Up @@ -162,6 +171,7 @@ impl<'a, D: Drive> Future for Close<'a, D> {
sqe.prep_close(fd);
sqe
}))?;
self.socket.as_mut().confirm_close();
Poll::Ready(Ok(()))
}
}
103 changes: 16 additions & 87 deletions src/unix/stream.rs
Expand Up @@ -10,27 +10,17 @@ use futures_io::{AsyncRead, AsyncBufRead, AsyncWrite};
use iou::sqe::SockAddr;
use nix::sys::socket::UnixAddr;

use crate::buf::Buffer;
use crate::drive::demo::DemoDriver;
use crate::{Drive, Ring};
use crate::event;
use crate::Submission;

use super::{socket, socketpair};

pub struct UnixStream<D: Drive = DemoDriver<'static>> {
ring: Ring<D>,
buf: Buffer,
active: Op,
fd: RawFd,
}
use crate::net::TcpStream;

#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum Op {
Read,
Write,
Close,
Nothing,
pub struct UnixStream<D: Drive = DemoDriver<'static>> {
inner: TcpStream<D>,
}

impl UnixStream {
Expand Down Expand Up @@ -64,41 +54,13 @@ impl<D: Drive + Clone> UnixStream<D> {
impl<D: Drive> UnixStream<D> {
pub(super) fn from_fd(fd: RawFd, ring: Ring<D>) -> UnixStream<D> {
UnixStream {
buf: Buffer::new(),
active: Op::Nothing,
fd, ring,
}
}

fn guard_op(self: Pin<&mut Self>, op: Op) {
let this = unsafe { Pin::get_unchecked_mut(self) };
if this.active != Op::Nothing && this.active != op {
this.cancel();
inner: TcpStream::from_fd(fd, ring),
}
this.active = op;
}

fn cancel(&mut self) {
self.active = Op::Nothing;
self.ring.cancel(self.buf.cancellation());
}

#[inline(always)]
fn ring(self: Pin<&mut Self>) -> Pin<&mut Ring<D>> {
unsafe { Pin::map_unchecked_mut(self, |this| &mut this.ring) }
}

#[inline(always)]
fn buf(self: Pin<&mut Self>) -> Pin<&mut Buffer> {
unsafe { Pin::map_unchecked_mut(self, |this| &mut this.buf) }
}

#[inline(always)]
fn split(self: Pin<&mut Self>) -> (Pin<&mut Ring<D>>, &mut Buffer) {
unsafe {
let this = Pin::get_unchecked_mut(self);
(Pin::new_unchecked(&mut this.ring), &mut this.buf)
}
fn inner(self: Pin<&mut Self>) -> Pin<&mut TcpStream<D>> {
unsafe { Pin::map_unchecked_mut(self, |this| &mut this.inner) }
}
}

Expand Down Expand Up @@ -129,66 +91,33 @@ impl<D: Drive + Clone> Future for Connect<D> {
}

impl<D: Drive> AsyncRead for UnixStream<D> {
fn poll_read(mut self: Pin<&mut Self>, ctx: &mut Context<'_>, buf: &mut [u8])
fn poll_read(self: Pin<&mut Self>, ctx: &mut Context<'_>, buf: &mut [u8])
-> Poll<io::Result<usize>>
{
let mut inner = ready!(self.as_mut().poll_fill_buf(ctx))?;
let len = io::Read::read(&mut inner, buf)?;
self.consume(len);
Poll::Ready(Ok(len))
self.inner().poll_read(ctx, buf)
}
}

impl<D: Drive> AsyncBufRead for UnixStream<D> {
fn poll_fill_buf(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
self.as_mut().guard_op(Op::Read);
let fd = self.fd;
let (ring, buf) = self.split();
buf.fill_buf(|buf| {
let n = ready!(ring.poll(ctx, true, 1, |sqs| unsafe {
let mut sqe = sqs.single().unwrap();
sqe.prep_read(fd, buf, 0);
sqe
}))?;
Poll::Ready(Ok(n as u32))
})
fn poll_fill_buf(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
self.inner().poll_fill_buf(ctx)
}

fn consume(self: Pin<&mut Self>, amt: usize) {
self.buf().consume(amt);
self.inner().consume(amt)
}
}

impl<D: Drive> AsyncWrite for UnixStream<D> {
fn poll_write(mut self: Pin<&mut Self>, ctx: &mut Context<'_>, slice: &[u8]) -> Poll<io::Result<usize>> {
self.as_mut().guard_op(Op::Write);
let fd = self.fd;
let (ring, buf) = self.split();
let data = ready!(buf.fill_buf(|mut buf| {
Poll::Ready(Ok(io::Write::write(&mut buf, slice)? as u32))
}))?;
let n = ready!(ring.poll(ctx, true, 1, |sqs| unsafe {
let mut sqe = sqs.single().unwrap();
sqe.prep_write(fd, data, 0);
sqe
}))?;
buf.clear();
Poll::Ready(Ok(n as usize))
fn poll_write(self: Pin<&mut Self>, ctx: &mut Context<'_>, slice: &[u8]) -> Poll<io::Result<usize>> {
self.inner().poll_write(ctx, slice)
}

fn poll_flush(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<()>> {
ready!(self.poll_write(ctx, &[]))?;
Poll::Ready(Ok(()))
self.inner().poll_flush(ctx)
}

fn poll_close(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.as_mut().guard_op(Op::Close);
let fd = self.fd;
ready!(self.ring().poll(ctx, true, 1, |sqs| unsafe {
let mut sqe = sqs.single().unwrap();
sqe.prep_close(fd);
sqe
}))?;
Poll::Ready(Ok(()))
fn poll_close(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.inner().poll_close(ctx)
}
}
16 changes: 16 additions & 0 deletions tests/file-close.rs
@@ -0,0 +1,16 @@
use futures::{AsyncReadExt, AsyncWriteExt};

use ringbahn::File;

const ASSERT: &[u8] = b"But this formidable power of death -";

#[test]
fn read_and_close_file() {
futures::executor::block_on(async move {
let mut file = File::open("props.txt").await.unwrap();
let mut buf = vec![0; 4096];
assert!(file.read(&mut buf).await.is_ok());
assert_eq!(&buf[0..ASSERT.len()], ASSERT);
file.close().await.unwrap();
});
}

0 comments on commit 6660fa1

Please sign in to comment.