Skip to content

Commit

Permalink
Merge pull request #5 from twmb/deref_to_u8s
Browse files Browse the repository at this point in the history
0.2.0: remove Box<[u8]> requirement
  • Loading branch information
twmb committed Sep 7, 2017
2 parents fba0cc4 + d9f576d commit 0260247
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 40 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-bufio"
version = "0.1.1"
version = "0.2.0"
authors = ["Travis Bischel <travis.bischel@gmail.com>"]
description = "Buffered IO with futures on top of a threadpool for blocking IO"
documentation = "https://docs.rs/futures-bufio"
Expand Down
37 changes: 21 additions & 16 deletions src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use futures::Future;
use futures::future::{Either, ok};
use futures_cpupool::CpuPool;

use std::ops::DerefMut;

use std::io::{self, Read};
use std::mem;

Expand Down Expand Up @@ -35,7 +37,7 @@ use common::EXP_POOL;
/// let pool = CpuPool::new(1);
/// let reader = BufReader::with_pool_and_capacity(pool, 10, f);
///
/// let buf = vec![0; 10].into_boxed_slice();
/// let buf = vec![0; 10];
/// let (reader, buf, n) = reader.try_read_full(buf).wait().unwrap_or_else(|(_, _, e)| {
/// // in real usage, we have the option to deconstruct our BufReader or reuse buf here
/// panic!("unable to read full: {}", e);
Expand All @@ -53,9 +55,9 @@ pub struct BufReader<R> {
}

/// Wraps `R` with the original buffer being read into and the number of bytes read.
type OkRead<R> = (R, Box<[u8]>, usize);
type OkRead<R, B> = (R, B, usize);
/// Wraps `R` with the original buffer being read into and the error encountered while reading.
type ErrRead<R> = (R, Box<[u8]>, io::Error);
type ErrRead<R, B> = (R, B, io::Error);

impl<R: Read + Send + 'static> BufReader<R> {
/// Creates and returns a new `BufReader` with an internal buffer of size `cap`.
Expand Down Expand Up @@ -169,7 +171,7 @@ impl<R: Read + Send + 'static> BufReader<R> {
/// // unsafely move the reader's position to the beginning of our known text, and read it
/// unsafe { reader.set_pos(buf_len-p.len()); }
/// let (_, b, _) = reader
/// .try_read_full(vec![0; p.len()].into_boxed_slice())
/// .try_read_full(vec![0; p.len()])
/// .wait()
/// .unwrap_or_else(|(_, _, e)| {
/// panic!("unable to read: {}", e);
Expand Down Expand Up @@ -239,7 +241,7 @@ impl<R: Read + Send + 'static> BufReader<R> {
/// let pool = CpuPool::new(1);
/// let reader = BufReader::with_pool_and_capacity(pool, 10, f);
///
/// let buf = vec![0; 10].into_boxed_slice();
/// let buf = vec![0; 10];
/// let (reader, buf, n) = reader.try_read_full(buf).wait().unwrap_or_else(|(_, _, e)| {
/// // in real usage, we have the option to deconstruct our BufReader or reuse buf here
/// panic!("unable to read full: {}", e);
Expand All @@ -249,10 +251,13 @@ impl<R: Read + Send + 'static> BufReader<R> {
/// assert_eq!(&buf[..n], b"foo text");
/// # }
/// ```
pub fn try_read_full(
pub fn try_read_full<B>(
mut self,
mut buf: Box<[u8]>,
) -> impl Future<Item = OkRead<Self>, Error = ErrRead<Self>> {
mut buf: B,
) -> impl Future<Item = OkRead<Self, B>, Error = ErrRead<Self, B>>
where
B: DerefMut<Target = [u8]> + Send + 'static,
{
const U8READ: &str = "&[u8] reads never error";
let mut rem = buf.len();
let mut at = 0;
Expand All @@ -265,7 +270,7 @@ impl<R: Read + Send + 'static> BufReader<R> {
self.pos += at;

if rem == 0 {
return Either::A(ok::<OkRead<Self>, ErrRead<Self>>((self, buf, at)));
return Either::A(ok::<OkRead<Self, B>, ErrRead<Self, B>>((self, buf, at)));
}
}
// self.pos == self.cap
Expand Down Expand Up @@ -364,7 +369,7 @@ fn test_read() {
assert_eq!(f.pos, 10);

// disk read, no blocks
let (f, buf, n) = f.try_read_full(vec![0; 5].into()).wait().unwrap_or_else(
let (f, buf, n) = f.try_read_full(vec![0; 5]).wait().unwrap_or_else(
|(_, _, e)| {
panic!("unable to read: {}", e)
},
Expand All @@ -376,7 +381,7 @@ fn test_read() {
assert_eq!(&*f.buf, b"Strapped d");

// mem read only
let (f, buf, n) = f.try_read_full(vec![0; 2].into()).wait().unwrap_or_else(
let (f, buf, n) = f.try_read_full(vec![0; 2]).wait().unwrap_or_else(
|(_, _, e)| {
panic!("unable to read: {}", e)
},
Expand All @@ -388,7 +393,7 @@ fn test_read() {
assert_eq!(&*f.buf, b"Strapped d");

// mem (3) + disk blocks (20) + more mem (2)
let (f, buf, n) = f.try_read_full(vec![0; 25].into()).wait().unwrap_or_else(
let (f, buf, n) = f.try_read_full(vec![0; 25]).wait().unwrap_or_else(
|(_, _, e)| {
panic!("unable to read: {}", e)
},
Expand All @@ -400,7 +405,7 @@ fn test_read() {
assert_eq!(&*f.buf, b"cold, eyes");

// mem (8) + disk block (10)
let (f, buf, n) = f.try_read_full(vec![0; 18].into()).wait().unwrap_or_else(
let (f, buf, n) = f.try_read_full(vec![0; 18]).wait().unwrap_or_else(
|(_, _, e)| {
panic!("unable to read: {}", e)
},
Expand All @@ -412,7 +417,7 @@ fn test_read() {
assert_eq!(&*f.buf, b"cold, eyes"); // non-reset buf

// disk block (10)
let (f, buf, n) = f.try_read_full(vec![0; 10].into()).wait().unwrap_or_else(
let (f, buf, n) = f.try_read_full(vec![0; 10]).wait().unwrap_or_else(
|(_, _, e)| {
panic!("unable to read: {}", e)
},
Expand All @@ -424,7 +429,7 @@ fn test_read() {
assert_eq!(&*f.buf, b"cold, eyes");

// disk block (20) + mem (9) (over-read by one byte)
let (f, buf, n) = f.try_read_full(vec![0; 29].into()).wait().unwrap_or_else(
let (f, buf, n) = f.try_read_full(vec![0; 29]).wait().unwrap_or_else(
|(_, _, e)| {
panic!("unable to read: {}", e)
},
Expand All @@ -435,7 +440,7 @@ fn test_read() {
assert_eq!(f.cap, 8);
assert_eq!(&*f.buf, b" I dead?es");

let (f, buf, n) = f.try_read_full(vec![0; 2].into()).wait().unwrap_or_else(
let (f, buf, n) = f.try_read_full(vec![0; 2]).wait().unwrap_or_else(
|(_, _, e)| {
panic!("unable to read: {}", e)
},
Expand Down
58 changes: 35 additions & 23 deletions src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures_cpupool::CpuPool;

use std::io::{self, Write};
use std::mem;
use std::ops::Deref;

use common::*;

Expand Down Expand Up @@ -42,7 +43,7 @@ use common::*;
/// let pool = CpuPool::new(1);
/// let writer = BufWriter::with_pool_and_capacity(pool, 4096, f);
///
/// let buf = b"many small writes".to_vec().into_boxed_slice();
/// let buf = b"many small writes".to_vec();
/// let (writer, buf) = writer.write_all(buf).wait().unwrap_or_else(|(_, _, e)| {
/// // in real usage, we have the option to deconstruct our BufWriter or reuse buf here
/// panic!("unable to read full: {}", e);
Expand All @@ -59,9 +60,9 @@ pub struct BufWriter<W> {
}

/// Wraps `W` with the original buffer being written.
type OkWrite<W> = (W, Box<[u8]>);
type OkWrite<W, B> = (W, B);
/// Wraps `W` with the original buffer being written and the error encountered while writing.
type ErrWrite<W> = (W, Box<[u8]>, io::Error);
type ErrWrite<W, B> = (W, B, io::Error);

impl<W: Write + Send + 'static> BufWriter<W> {
/// Creates and returns a new `BufWriter` with an internal buffer of size `cap`.
Expand Down Expand Up @@ -236,34 +237,37 @@ impl<W: Write + Send + 'static> BufWriter<W> {
/// let pool = CpuPool::new(1);
/// let writer = BufWriter::with_pool_and_capacity(pool, 4096, f);
///
/// let buf = b"many small writes".to_vec().into_boxed_slice();
/// let buf = b"many small writes".to_vec();
/// let (writer, buf) = writer.write_all(buf).wait().unwrap_or_else(|(_, _, e)| {
/// // in real usage, we have the option to deconstruct our BufWriter or reuse buf here
/// panic!("unable to read full: {}", e);
/// });
/// assert_eq!(&*buf, b"many small writes"); // we can reuse buf
/// # }
/// ```
pub fn write_all(
pub fn write_all<B>(
mut self,
buf: Box<[u8]>,
) -> impl Future<Item = OkWrite<Self>, Error = ErrWrite<Self>> {
buf: B,
) -> impl Future<Item = OkWrite<Self, B>, Error = ErrWrite<Self, B>>
where
B: Deref<Target = [u8]> + Send + 'static,
{
let mut rem = buf.len();
let mut at = 0;
let mut write_buf = false;

if self.pos == 0 {
if buf.len() < self.buf.len() {
self.pos = copy(&mut self.buf, &*buf);
return Either::A(ok::<OkWrite<Self>, ErrWrite<Self>>((self, buf)));
return Either::A(ok::<OkWrite<Self, B>, ErrWrite<Self, B>>((self, buf)));
}
} else {
at = copy(&mut self.buf[self.pos..], &*buf);
self.pos += at;
rem -= at;

if self.pos != self.buf.len() {
return Either::A(ok::<OkWrite<Self>, ErrWrite<Self>>((self, buf)));
return Either::A(ok::<OkWrite<Self, B>, ErrWrite<Self, B>>((self, buf)));
}
write_buf = true;
}
Expand Down Expand Up @@ -398,9 +402,11 @@ fn test_write() {
);

// memory (5)
let (f, buf) = f.write_all(b"hello".to_vec().into_boxed_slice())
.wait()
.unwrap_or_else(|(_, _, e)| panic!("unable to write to file: {}", e));
let (f, buf) = f.write_all(b"hello".to_vec()).wait().unwrap_or_else(
|(_, _, e)| {
panic!("unable to write to file: {}", e)
},
);
assert_eq!(f.pos, 5);
assert_eq!(f.w_start, 0);
assert_eq!(&*buf, b"hello");
Expand All @@ -421,9 +427,11 @@ fn test_write() {
assert_foo("hello");

// memory (2) with w_start at 5
let (f, buf) = f.write_all(b"tw".to_vec().into_boxed_slice())
.wait()
.unwrap_or_else(|(_, _, e)| panic!("unable to write to file: {}", e));
let (f, buf) = f.write_all(b"tw".to_vec()).wait().unwrap_or_else(
|(_, _, e)| {
panic!("unable to write to file: {}", e)
},
);
assert_eq!(f.pos, 7);
assert_eq!(f.w_start, 5);
assert_eq!(&*buf, b"tw");
Expand All @@ -437,16 +445,18 @@ fn test_write() {
assert_foo("hellotw");

// memory (7)
let (f, buf) = f.write_all(b"goodbye".to_vec().into_boxed_slice())
.wait()
.unwrap_or_else(|(_, _, e)| panic!("unable to write to file: {}", e));
let (f, buf) = f.write_all(b"goodbye".to_vec()).wait().unwrap_or_else(
|(_, _, e)| {
panic!("unable to write to file: {}", e)
},
);
assert_eq!(f.pos, 4);
assert_eq!(f.w_start, 0);
assert_eq!(&*buf, b"goodbye");
assert_foo("hellotwgoo");

// memory (6) + disk (10)
let (f, buf) = f.write_all(b"more++andthenten".to_vec().into_boxed_slice())
let (f, buf) = f.write_all(b"more++andthenten".to_vec())
.wait()
.unwrap_or_else(|(_, _, e)| panic!("unable to write to file: {}", e));
assert_eq!(f.pos, 0);
Expand All @@ -455,16 +465,18 @@ fn test_write() {
assert_foo("hellotwgoodbyemore++andthenten");

// disk (10)
let (f, buf) = f.write_all(b"andtenmore".to_vec().into_boxed_slice())
.wait()
.unwrap_or_else(|(_, _, e)| panic!("unable to write to file: {}", e));
let (f, buf) = f.write_all(b"andtenmore".to_vec()).wait().unwrap_or_else(
|(_, _, e)| {
panic!("unable to write to file: {}", e)
},
);
assert_eq!(f.pos, 0);
assert_eq!(f.w_start, 0);
assert_eq!(&*buf, b"andtenmore");
assert_foo("hellotwgoodbyemore++andthentenandtenmore");

// disk (10) + mem (5)
let (f, buf) = f.write_all(b"this is rly old".to_vec().into_boxed_slice())
let (f, buf) = f.write_all(b"this is rly old".to_vec())
.wait()
.unwrap_or_else(|(_, _, e)| panic!("unable to write to file: {}", e));
assert_eq!(f.pos, 5);
Expand Down

0 comments on commit 0260247

Please sign in to comment.