diff --git a/Cargo.toml b/Cargo.toml
index 7794d25..834d68f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,7 @@ flate2 = "0.2"
 thrift = "0.0.4"
 x86intrin = "0.4.3"
 chrono = "0.4"
+lz4 = "1.22"
 
 [dev-dependencies]
 lazy_static = "1"
diff --git a/benches/codec.rs b/benches/codec.rs
index b2eba30..40aaf7b 100644
--- a/benches/codec.rs
+++ b/benches/codec.rs
@@ -137,6 +137,15 @@ compress!(compress_snappy_double, Compression::SNAPPY, 5);
 compress!(compress_snappy_fixed, Compression::SNAPPY, 6);
 compress!(compress_snappy_int96, Compression::SNAPPY, 7);
 
+compress!(compress_lz4_binary, Compression::LZ4, 0);
+compress!(compress_lz4_int32, Compression::LZ4, 1);
+compress!(compress_lz4_int64, Compression::LZ4, 2);
+compress!(compress_lz4_boolean, Compression::LZ4, 3);
+compress!(compress_lz4_float, Compression::LZ4, 4);
+compress!(compress_lz4_double, Compression::LZ4, 5);
+compress!(compress_lz4_fixed, Compression::LZ4, 6);
+compress!(compress_lz4_int96, Compression::LZ4, 7);
+
 decompress!(decompress_brotli_binary, Compression::BROTLI, 0);
 decompress!(decompress_brotli_int32, Compression::BROTLI, 1);
 decompress!(decompress_brotli_int64, Compression::BROTLI, 2);
@@ -163,3 +172,12 @@ decompress!(decompress_snappy_float, Compression::SNAPPY, 4);
 decompress!(decompress_snappy_double, Compression::SNAPPY, 5);
 decompress!(decompress_snappy_fixed, Compression::SNAPPY, 6);
 decompress!(decompress_snappy_int96, Compression::SNAPPY, 7);
+
+decompress!(decompress_lz4_binary, Compression::LZ4, 0);
+decompress!(decompress_lz4_int32, Compression::LZ4, 1);
+decompress!(decompress_lz4_int64, Compression::LZ4, 2);
+decompress!(decompress_lz4_boolean, Compression::LZ4, 3);
+decompress!(decompress_lz4_float, Compression::LZ4, 4);
+decompress!(decompress_lz4_double, Compression::LZ4, 5);
+decompress!(decompress_lz4_fixed, Compression::LZ4, 6);
+decompress!(decompress_lz4_int96, Compression::LZ4, 7);
diff --git a/src/compression.rs b/src/compression.rs
index e5c3326..94ba794 100644
--- a/src/compression.rs
+++ b/src/compression.rs
@@ -49,6 +49,7 @@ use flate2::Compression;
 use flate2::read::GzDecoder;
 use flate2::write::GzEncoder;
 use snap::{decompress_len, Decoder, Encoder};
+use lz4;
 
 /// Parquet compression codec interface.
 pub trait Codec {
@@ -71,6 +72,7 @@ pub fn create_codec(codec: CodecType) -> Result<Option<Box<Codec>>> {
     CodecType::BROTLI => Ok(Some(Box::new(BrotliCodec::new()))),
     CodecType::GZIP => Ok(Some(Box::new(GZipCodec::new()))),
     CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))),
+    CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))),
     CodecType::UNCOMPRESSED => Ok(None),
     _ => Err(nyi_err!("The codec type {} is not supported yet", codec))
   }
@@ -168,6 +170,46 @@ impl Codec for BrotliCodec {
 }
 
 
+/// Codec for LZ4 compression algorithm.
+///
+/// Compression goes through `lz4::EncoderBuilder` and decompression through
+/// `lz4::Decoder`.
+pub struct LZ4Codec {}
+
+impl LZ4Codec {
+  /// Creates new LZ4 compression codec.
+  fn new() -> Self {
+    Self {}
+  }
+}
+
+impl Codec for LZ4Codec {
+  /// Decompresses `input_buf` and appends the decoded bytes to `output_buf`.
+  /// Returns the number of bytes appended.
+  fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<usize> {
+    let mut decoder = lz4::Decoder::new(input_buf)?;
+    // `read_to_end` appends straight into `output_buf`, so no intermediate
+    // buffer or hand-written read loop is needed.
+    let total_len = decoder.read_to_end(output_buf)?;
+    Ok(total_len)
+  }
+
+  /// Compresses `input_buf` and returns a new vector with the compressed data.
+  fn compress(&mut self, input_buf: &[u8]) -> Result<Vec<u8>> {
+    let mut encoder = lz4::EncoderBuilder::new().build(Vec::new())?;
+    // `write_all` loops until the whole slice is consumed, so the input does
+    // not have to be fed in fixed-size chunks.
+    encoder.write_all(input_buf)?;
+    // `finish` returns the output vector together with the final status.
+    let (compressed, result) = encoder.finish();
+    match result {
+      Ok(_) => Ok(compressed),
+      Err(e) => Err(general_err!("Error when finishing compressing with LZ4: {}", e))
+    }
+  }
+}
+
+
 #[cfg(test)]
 mod tests {
   use super::*;
@@ -223,4 +265,9 @@ mod tests {
   fn test_codec_brotli() {
     test_codec(CodecType::BROTLI);
   }
+
+  #[test]
+  fn test_codec_lz4() {
+    test_codec(CodecType::LZ4);
+  }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 714f6ae..d692820 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -130,6 +130,7 @@ extern crate rand;
 extern crate x86intrin;
 extern crate parquet_format;
 extern crate chrono;
+extern crate lz4;
 
 #[macro_use]
 pub mod errors;