Skip to content
This repository has been archived by the owner on Jan 11, 2021. It is now read-only.

Commit

Permalink
Add LZ4 compression
Browse files Browse the repository at this point in the history
This adds the support for LZ4 compression codec.
  • Loading branch information
sunchao committed Apr 22, 2018
1 parent 9cec60e commit 70a7c3c
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ thrift = "0.0.4"
x86intrin = "0.4.3"
chrono = "0.4"
lazy_static = "1"
lz4 = "1.22"
2 changes: 2 additions & 0 deletions benches/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ macro_rules! decompress {
compress!(compress_brotli, Compression::BROTLI);
compress!(compress_gzip, Compression::GZIP);
compress!(compress_snappy, Compression::SNAPPY);
compress!(compress_lz4, Compression::LZ4);

decompress!(decompress_brotli, Compression::BROTLI);
decompress!(decompress_gzip, Compression::GZIP);
decompress!(decompress_snappy, Compression::SNAPPY);
decompress!(decompress_lz4, Compression::LZ4);
54 changes: 54 additions & 0 deletions src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use flate2::Compression;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use snap::{decompress_len, Decoder, Encoder};
use lz4;

/// Parquet compression codec interface.
pub trait Codec {
Expand All @@ -71,6 +72,7 @@ pub fn create_codec(codec: CodecType) -> Result<Option<Box<Codec>>> {
CodecType::BROTLI => Ok(Some(Box::new(BrotliCodec::new()))),
CodecType::GZIP => Ok(Some(Box::new(GZipCodec::new()))),
CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))),
CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))),
CodecType::UNCOMPRESSED => Ok(None),
_ => Err(nyi_err!("The codec type {} is not supported yet", codec))
}
Expand Down Expand Up @@ -168,6 +170,53 @@ impl Codec for BrotliCodec {
}


const LZ4_BUFFER_SIZE: usize = 4096;

pub struct LZ4Codec {}

impl LZ4Codec {
/// Creates new Brotli compression codec.
fn new() -> Self {
Self {}
}
}

impl Codec for LZ4Codec {
fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<usize> {
let mut decoder = lz4::Decoder::new(input_buf)?;
let mut buffer: [u8; LZ4_BUFFER_SIZE] = [0; LZ4_BUFFER_SIZE];
let mut total_len = 0;
loop {
let len = decoder.read(&mut buffer)?;
if len == 0 {
break;
}
total_len += len;
output_buf.write_all(&buffer[0..len])?;
}
Ok(total_len)
}

fn compress(&mut self, input_buf: &[u8]) -> Result<Vec<u8>> {
let mut encoder = lz4::EncoderBuilder::new().build(Vec::new())?;
let mut from = 0;
loop {
let to = ::std::cmp::min(from + LZ4_BUFFER_SIZE, input_buf.len());
encoder.write_all(&input_buf[from..to])?;
from += LZ4_BUFFER_SIZE;
if from >= input_buf.len() {
break;
}
}
let (v, result) = encoder.finish();
match result {
Ok(_) => Ok(v),
e => Err(general_err!("Error when finishing compressing with LZ4: {:?}", e))
}
}
}


#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -223,4 +272,9 @@ mod tests {
fn test_codec_brotli() {
test_codec(CodecType::BROTLI);
}

#[test]
fn test_codec_lz4() {
test_codec(CodecType::LZ4);
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ extern crate x86intrin;
extern crate parquet_format;
extern crate chrono;
extern crate lazy_static;
extern crate lz4;

#[macro_use]
pub mod errors;
Expand Down

0 comments on commit 70a7c3c

Please sign in to comment.