Skip to content

Commit

Permalink
dedup Async
Browse files Browse the repository at this point in the history
  • Loading branch information
infinity0 committed Sep 1, 2020
1 parent 158fea0 commit dc00a05
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 69 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Expand Up @@ -7,6 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1"
clap = "2"
futures = "0.3"
futures-lite = "0.1.10"
Expand All @@ -20,7 +21,7 @@ tokio = { version = "0.2", features = ["full"] }
tokio-util = { version = "0.3", features = ["compat"] }

# bwlim-test-async
async-io = "0.2"
async-io = { git = "https://github.com/infinity0/async-io", branch = "async-reuse" }
env_logger = "0.7"
smol = "0.3"

Expand Down
115 changes: 47 additions & 68 deletions src/lib.rs
@@ -1,21 +1,25 @@
#![feature(generic_associated_types)]

pub mod limit;
pub mod reactor;
pub mod stats;
pub mod testing;
pub mod util;
pub mod sys;

use std::borrow::*;
use std::fmt::Debug;
use std::future::Future;
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
use std::pin::Pin;
use std::mem::ManuallyDrop;
use std::net::{Shutdown, TcpStream};
use std::sync::Arc;
use std::sync::{Arc, MutexGuard};
use std::task::{Context, Poll};

use async_io::*;
use async_trait::async_trait;

use futures_lite::io::{AsyncRead, AsyncWrite};
use futures_lite::{future, pin};

use crate::limit::RateLimited;
use crate::util::RorW;
Expand Down Expand Up @@ -43,56 +47,53 @@ impl<T> Drop for RLAsync<T> {
}
}

// copied from async-io, except:
// self.get_mut() replaced with lock() / RateLimited
// TODO: figure out a way to de-duplicate with them
impl<T: Debug> RLAsync<T> {
pub async fn readable(&self) -> io::Result<()> {
/// Wrapper for `MutexGuard` that implements `Borrow`/`BorrowMut`.
pub struct MGW<'a, T: ?Sized>(pub MutexGuard<'a, T>);

impl<T: ?Sized> Borrow<T> for MGW<'_, T> {
fn borrow(&self) -> &T {
&*self.0
}
}

impl<T: ?Sized> BorrowMut<T> for MGW<'_, T> {
fn borrow_mut(&mut self) -> &mut T {
&mut *self.0
}
}

#[async_trait]
impl<T> AnAsync<RateLimited<T>> for RLAsync<T> where T: Send + Sync {
type Borrow<'a> where T: 'a = MGW<'a, RateLimited<T>>;
type BorrowMut<'a> where T: 'a = MGW<'a, RateLimited<T>>;

fn get_ref<'a>(&'a self) -> MGW<RateLimited<T>> {
MGW(self.source.raw.lock().unwrap())
}

fn get_mut<'a>(&'a mut self) -> MGW<RateLimited<T>> {
MGW(self.source.raw.lock().unwrap())
}

async fn readable(&self) -> io::Result<()> {
self.source.readable().await
}
pub async fn writable(&self) -> io::Result<()> {

async fn writable(&self) -> io::Result<()> {
self.source.writable().await
}
pub async fn read_with_mut<R>(
&mut self,
op: impl FnMut(&mut RateLimited<T>) -> io::Result<R>,
) -> io::Result<R> {
let mut op = op;
loop {
// If there are no blocked readers, attempt the read operation.
if !self.source.readers_registered() {
let mut inner = self.source.raw.lock().unwrap();
match op(&mut inner) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
}
// Wait until the I/O handle becomes readable.
optimistic(self.readable()).await?;
}
}
pub async fn write_with_mut<R>(
&mut self,
op: impl FnMut(&mut RateLimited<T>) -> io::Result<R>,
) -> io::Result<R> {
let mut op = op;
loop {
// If there are no blocked readers, attempt the write operation.
if !self.source.writers_registered() {
let mut inner = self.source.raw.lock().unwrap();
match op(&mut inner) {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return res,
}
}
// Wait until the I/O handle becomes writable.
optimistic(self.writable()).await?;
}

fn readers_registered(&self) -> bool {
self.source.readers_registered()
}

fn writers_registered(&self) -> bool {
self.source.writers_registered()
}
}

// copied from async-io
impl<T: Read + Debug> AsyncRead for RLAsync<T> {
impl<T: Read + Send + Sync> AsyncRead for RLAsync<T> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -111,7 +112,7 @@ impl<T: Read + Debug> AsyncRead for RLAsync<T> {
}

// copied from async-io
impl<T: Write + Debug> AsyncWrite for RLAsync<T>
impl<T: Write + Send + Sync> AsyncWrite for RLAsync<T>
where
T: AsRawSource
{
Expand Down Expand Up @@ -141,28 +142,6 @@ where
}
}

// copied from async-io
fn poll_future<T>(cx: &mut Context<'_>, fut: impl Future<Output = T>) -> Poll<T> {
pin!(fut);
fut.poll(cx)
}

// copied from async-io
async fn optimistic(fut: impl Future<Output = io::Result<()>>) -> io::Result<()> {
let mut polled = false;
pin!(fut);

future::poll_fn(|cx| {
if !polled {
polled = true;
fut.as_mut().poll(cx)
} else {
Poll::Ready(Ok(()))
}
})
.await
}

// copied from async-io
fn shutdown_write(raw: RawSource) -> io::Result<()> {
// This may not be a TCP stream, but that's okay. All we do is attempt a `shutdown()` on the
Expand Down

0 comments on commit dc00a05

Please sign in to comment.