Skip to content
This repository has been archived by the owner on Jan 11, 2021. It is now read-only.

Commit

Permalink
Add ZSTD compression (#107)
Browse files Browse the repository at this point in the history
This adds ZSTD compression along with benchmark for it. The
implementation is backed by the `zstd-rs` crate.

Fixes #52.
  • Loading branch information
sunchao committed May 2, 2018
1 parent ba5ad84 commit 5143db1
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ x86intrin = "0.4.3"
chrono = "0.4"
lz4 = "1.22"
num-bigint = "0.1"
zstd = "0.4"

[dev-dependencies]
lazy_static = "1"
Expand Down
18 changes: 18 additions & 0 deletions benches/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ compress!(compress_lz4_double, Compression::LZ4, 5);
compress!(compress_lz4_fixed, Compression::LZ4, 6);
compress!(compress_lz4_int96, Compression::LZ4, 7);

compress!(compress_zstd_binary, Compression::ZSTD, 0);
compress!(compress_zstd_int32, Compression::ZSTD, 1);
compress!(compress_zstd_int64, Compression::ZSTD, 2);
compress!(compress_zstd_boolean, Compression::ZSTD, 3);
compress!(compress_zstd_float, Compression::ZSTD, 4);
compress!(compress_zstd_double, Compression::ZSTD, 5);
compress!(compress_zstd_fixed, Compression::ZSTD, 6);
compress!(compress_zstd_int96, Compression::ZSTD, 7);

decompress!(decompress_brotli_binary, Compression::BROTLI, 0);
decompress!(decompress_brotli_int32, Compression::BROTLI, 1);
decompress!(decompress_brotli_int64, Compression::BROTLI, 2);
Expand Down Expand Up @@ -181,3 +190,12 @@ decompress!(decompress_lz4_float, Compression::LZ4, 4);
decompress!(decompress_lz4_double, Compression::LZ4, 5);
decompress!(decompress_lz4_fixed, Compression::LZ4, 6);
decompress!(decompress_lz4_int96, Compression::LZ4, 7);

decompress!(decompress_zstd_binary, Compression::ZSTD, 0);
decompress!(decompress_zstd_int32, Compression::ZSTD, 1);
decompress!(decompress_zstd_int64, Compression::ZSTD, 2);
decompress!(decompress_zstd_boolean, Compression::ZSTD, 3);
decompress!(decompress_zstd_float, Compression::ZSTD, 4);
decompress!(decompress_zstd_double, Compression::ZSTD, 5);
decompress!(decompress_zstd_fixed, Compression::ZSTD, 6);
decompress!(decompress_zstd_int96, Compression::ZSTD, 7);
41 changes: 40 additions & 1 deletion src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
//! assert_eq!(output, data);
//! ```

use std::io::{Read, Write};
use std::io::{self, Read, Write};

use basic::Compression as CodecType;
use errors::{Result, ParquetError};
Expand All @@ -50,6 +50,7 @@ use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use snap::{decompress_len, Decoder, Encoder};
use lz4;
use zstd;

/// Parquet compression codec interface.
pub trait Codec {
Expand All @@ -73,6 +74,7 @@ pub fn create_codec(codec: CodecType) -> Result<Option<Box<Codec>>> {
CodecType::GZIP => Ok(Some(Box::new(GZipCodec::new()))),
CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))),
CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))),
CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))),
CodecType::UNCOMPRESSED => Ok(None),
_ => Err(nyi_err!("The codec type {} is not supported yet", codec))
}
Expand Down Expand Up @@ -217,6 +219,37 @@ impl Codec for LZ4Codec {
}
}

/// Codec for Zstandard compression algorithm.
pub struct ZSTDCodec {
}

impl ZSTDCodec {
/// Creates new Zstandard compression codec.
fn new() -> Self {
Self { }
}
}

/// Compression level (1-21) for ZSTD. Choose 1 here for better compression speed.
const ZSTD_COMPRESSION_LEVEL: i32 = 1;

impl Codec for ZSTDCodec {
fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<usize> {
let mut decoder = zstd::Decoder::new(input_buf)?;
match io::copy(&mut decoder, output_buf) {
Ok(n) => Ok(n as usize),
e => Err(general_err!("Error when decompressing with ZSTD: {:?}", e))
}
}

fn compress(&mut self, input_buf: &[u8]) -> Result<Vec<u8>> {
let output = Vec::new();
let mut encoder = zstd::Encoder::new(output, ZSTD_COMPRESSION_LEVEL)?;
encoder.write_all(&input_buf[..])?;
encoder.finish()
.map_err(|e| general_err!("Error when compressing using ZSTD: {}", e))
}
}

#[cfg(test)]
mod tests {
Expand Down Expand Up @@ -278,4 +311,10 @@ mod tests {
fn test_codec_lz4() {
test_codec(CodecType::LZ4);
}

#[test]
fn test_codec_zstd() {
test_codec(CodecType::ZSTD);
}

}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ extern crate parquet_format;
extern crate chrono;
extern crate lz4;
extern crate num_bigint;
extern crate zstd;

#[macro_use]
pub mod errors;
Expand Down

0 comments on commit 5143db1

Please sign in to comment.