Skip to content
This repository has been archived by the owner on Jan 11, 2021. It is now read-only.

Add LZ4 compression #98

Merged
merged 1 commit into from
Apr 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ flate2 = "0.2"
thrift = "0.0.4"
x86intrin = "0.4.3"
chrono = "0.4"
lz4 = "1.22"

[dev-dependencies]
lazy_static = "1"
Expand Down
18 changes: 18 additions & 0 deletions benches/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,15 @@ compress!(compress_snappy_double, Compression::SNAPPY, 5);
compress!(compress_snappy_fixed, Compression::SNAPPY, 6);
compress!(compress_snappy_int96, Compression::SNAPPY, 7);

// LZ4 compression benchmarks: one `compress!` invocation per index 0-7.
// NOTE(review): judging by the benchmark names, the index selects the physical
// type of the input data (binary, int32, ...) - confirm against `compress!`.
compress!(compress_lz4_binary, Compression::LZ4, 0);
compress!(compress_lz4_int32, Compression::LZ4, 1);
compress!(compress_lz4_int64, Compression::LZ4, 2);
compress!(compress_lz4_boolean, Compression::LZ4, 3);
compress!(compress_lz4_float, Compression::LZ4, 4);
compress!(compress_lz4_double, Compression::LZ4, 5);
compress!(compress_lz4_fixed, Compression::LZ4, 6);
compress!(compress_lz4_int96, Compression::LZ4, 7);

decompress!(decompress_brotli_binary, Compression::BROTLI, 0);
decompress!(decompress_brotli_int32, Compression::BROTLI, 1);
decompress!(decompress_brotli_int64, Compression::BROTLI, 2);
Expand All @@ -163,3 +172,12 @@ decompress!(decompress_snappy_float, Compression::SNAPPY, 4);
decompress!(decompress_snappy_double, Compression::SNAPPY, 5);
decompress!(decompress_snappy_fixed, Compression::SNAPPY, 6);
decompress!(decompress_snappy_int96, Compression::SNAPPY, 7);

// LZ4 decompression benchmarks: one `decompress!` invocation per index 0-7.
// NOTE(review): judging by the benchmark names, the index selects the physical
// type of the input data (binary, int32, ...) - confirm against `decompress!`.
decompress!(decompress_lz4_binary, Compression::LZ4, 0);
decompress!(decompress_lz4_int32, Compression::LZ4, 1);
decompress!(decompress_lz4_int64, Compression::LZ4, 2);
decompress!(decompress_lz4_boolean, Compression::LZ4, 3);
decompress!(decompress_lz4_float, Compression::LZ4, 4);
decompress!(decompress_lz4_double, Compression::LZ4, 5);
decompress!(decompress_lz4_fixed, Compression::LZ4, 6);
decompress!(decompress_lz4_int96, Compression::LZ4, 7);
55 changes: 55 additions & 0 deletions src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use snap::{decompress_len, Decoder, Encoder};
use lz4;

/// Parquet compression codec interface.
pub trait Codec {
Expand All @@ -71,6 +72,7 @@ pub fn create_codec(codec: CodecType) -> Result<Option<Box<Codec>>> {
CodecType::BROTLI => Ok(Some(Box::new(BrotliCodec::new()))),
CodecType::GZIP => Ok(Some(Box::new(GZipCodec::new()))),
CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))),
CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))),
CodecType::UNCOMPRESSED => Ok(None),
_ => Err(nyi_err!("The codec type {} is not supported yet", codec))
}
Expand Down Expand Up @@ -168,6 +170,54 @@ impl Codec for BrotliCodec {
}


/// Size, in bytes, of the chunks in which data is written to the LZ4 encoder
/// and of the scratch buffer used when reading from the LZ4 decoder.
const LZ4_BUFFER_SIZE: usize = 4096;

/// LZ4 flavour of the Parquet compression codec.
///
/// The struct has no fields; every call builds the encoder or decoder it needs.
pub struct LZ4Codec {}

impl LZ4Codec {
    /// Builds a fresh LZ4 codec instance.
    fn new() -> Self {
        LZ4Codec {}
    }
}

impl Codec for LZ4Codec {
fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<usize> {
let mut decoder = lz4::Decoder::new(input_buf)?;
let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
let mut total_len = 0;
loop {
let len = decoder.read(&mut buffer)?;
if len == 0 {
break;
}
total_len += len;
output_buf.write_all(&buffer[0..len])?;
}
Ok(total_len)
}

fn compress(&mut self, input_buf: &[u8]) -> Result<Vec<u8>> {
let mut encoder = lz4::EncoderBuilder::new().build(Vec::new())?;
let mut from = 0;
loop {
let to = ::std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len());
encoder.write_all(&input_buf[from..to])?;
from += LZ4_BUFFER_SIZE;
if from >= input_buf.len() {
break;
}
}
let (v, result) = encoder.finish();
match result {
Ok(_) => Ok(v),
e => Err(general_err!("Error when finishing compressing with LZ4: {:?}", e))
}
}
}


#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -223,4 +273,9 @@ mod tests {
fn test_codec_brotli() {
test_codec(CodecType::BROTLI);
}

// Runs the `test_codec` helper (defined outside this view) against the LZ4
// codec type, mirroring the tests for the other codecs.
#[test]
fn test_codec_lz4() {
test_codec(CodecType::LZ4);
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ extern crate rand;
extern crate x86intrin;
extern crate parquet_format;
extern crate chrono;
extern crate lz4;

#[macro_use]
pub mod errors;
Expand Down