Skip to content

Commit

Permalink
Added failing compressor test for small buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
jean-airoldie committed Aug 1, 2020
1 parent 19d3d6d commit 7e10dc1
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ rand = { version = "0.7.3", features = ["small_rng"] }
rayon = "1.3.1"
static_assertions = "1.1.0"
smol = "0.3.3"
futures-test = "0.3.5"

[build-dependencies]
cc = { version = "1.0.58", features = ["parallel"] }
Expand Down
118 changes: 117 additions & 1 deletion tests/lz4f_async_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ use futures::future::join_all;
use lzzzz::{lz4f, lz4f::*};
use rand::{distributions::Standard, rngs::SmallRng, Rng, SeedableRng};
use static_assertions::assert_impl_all;
use std::{
io,
pin::Pin,
task::{Context, Poll},
};

mod common;
use common::lz4f_test_set;
Expand Down Expand Up @@ -145,9 +150,65 @@ mod async_bufread_compressor {
mod async_write_compressor {
use super::*;
use futures::future::join_all;
use futures_lite::AsyncWriteExt;
use futures_lite::{AsyncWrite, AsyncWriteExt};
use futures_test::task::noop_context;
use io::Write;
use lzzzz::lz4f::AsyncWriteCompressor;

struct Byte(Option<u8>);

impl Byte {
fn new() -> Self {
Byte(None)
}

fn clear(&mut self) {
self.0.take();
}

fn get(&self) -> Option<u8> {
self.0
}
}

impl io::Write for Byte {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self.0 {
None => {
if buf.is_empty() {
Ok(0)
} else {
self.0.replace(buf[0]);
Ok(1)
}
}
Some(_) => Ok(0),
}
}

fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}

impl AsyncWrite for Byte {
fn poll_write(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Poll::Ready(Write::write(&mut *self, buf))
}

fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Write::flush(&mut *self))
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.poll_flush(cx)
}
}

#[test]
fn default() {
smol::run(async {
Expand All @@ -168,6 +229,61 @@ mod async_write_compressor {
.await;
})
}

#[test]
fn small_buffer() {
smol::run(async {
join_all(lz4f_test_set().map(|(src, prefs)| async move {
let mut comp_buf = Vec::new();
let mut decomp_buf = Vec::new();
let mut cx = noop_context();
{
let byte = Byte::new();
let mut w = AsyncWriteCompressor::new(byte, prefs).unwrap();
let mut total = 0;

// Painfully write, byte by byte.
while total < src.len() {
let pin = Pin::new(&mut w);
match pin.poll_write(&mut cx, &src[total..]) {
Poll::Ready(Ok(written)) => {
assert_eq!(written, 1, "byte wasn't cleared");
total += 1;
comp_buf.push(w.get_ref().get().unwrap());
w.get_mut().clear();
}
Poll::Pending => {
// Should never be pending here since the `Byte`
// is always ready.
panic!("should never be pending");
}
Poll::Ready(Err(err)) => panic!("{}", err),
}
}

// Painfully close, byte by byte.
loop {
let pin = Pin::new(&mut w);
if let Poll::Ready(res) = pin.poll_close(&mut cx) {
res.unwrap();
if let Some(byte) = w.get_ref().get() {
comp_buf.push(byte);
}
break;
}
comp_buf.push(w.get_ref().get().unwrap());
w.get_mut().clear();
}
}
assert_eq!(
lz4f::decompress_to_vec(&comp_buf, &mut decomp_buf).unwrap(),
decomp_buf.len()
);
assert_eq!(decomp_buf, src);
}))
.await;
})
}
}

mod async_read_decompressor {
Expand Down

0 comments on commit 7e10dc1

Please sign in to comment.