Skip to content
This repository has been archived by the owner on Jan 11, 2021. It is now read-only.

Add ZSTD compression #107

Merged
merged 2 commits into from
May 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ x86intrin = "0.4.3"
chrono = "0.4"
lz4 = "1.22"
num-bigint = "0.1"
zstd = "0.4"

[dev-dependencies]
lazy_static = "1"
Expand Down
18 changes: 18 additions & 0 deletions benches/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ compress!(compress_lz4_double, Compression::LZ4, 5);
compress!(compress_lz4_fixed, Compression::LZ4, 6);
compress!(compress_lz4_int96, Compression::LZ4, 7);

compress!(compress_zstd_binary, Compression::ZSTD, 0);
compress!(compress_zstd_int32, Compression::ZSTD, 1);
compress!(compress_zstd_int64, Compression::ZSTD, 2);
compress!(compress_zstd_boolean, Compression::ZSTD, 3);
compress!(compress_zstd_float, Compression::ZSTD, 4);
compress!(compress_zstd_double, Compression::ZSTD, 5);
compress!(compress_zstd_fixed, Compression::ZSTD, 6);
compress!(compress_zstd_int96, Compression::ZSTD, 7);

decompress!(decompress_brotli_binary, Compression::BROTLI, 0);
decompress!(decompress_brotli_int32, Compression::BROTLI, 1);
decompress!(decompress_brotli_int64, Compression::BROTLI, 2);
Expand Down Expand Up @@ -181,3 +190,12 @@ decompress!(decompress_lz4_float, Compression::LZ4, 4);
decompress!(decompress_lz4_double, Compression::LZ4, 5);
decompress!(decompress_lz4_fixed, Compression::LZ4, 6);
decompress!(decompress_lz4_int96, Compression::LZ4, 7);

decompress!(decompress_zstd_binary, Compression::ZSTD, 0);
decompress!(decompress_zstd_int32, Compression::ZSTD, 1);
decompress!(decompress_zstd_int64, Compression::ZSTD, 2);
decompress!(decompress_zstd_boolean, Compression::ZSTD, 3);
decompress!(decompress_zstd_float, Compression::ZSTD, 4);
decompress!(decompress_zstd_double, Compression::ZSTD, 5);
decompress!(decompress_zstd_fixed, Compression::ZSTD, 6);
decompress!(decompress_zstd_int96, Compression::ZSTD, 7);
41 changes: 40 additions & 1 deletion src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
//! assert_eq!(output, data);
//! ```

use std::io::{Read, Write};
use std::io::{self, Read, Write};

use basic::Compression as CodecType;
use errors::{Result, ParquetError};
Expand All @@ -50,6 +50,7 @@ use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use snap::{decompress_len, Decoder, Encoder};
use lz4;
use zstd;

/// Parquet compression codec interface.
pub trait Codec {
Expand All @@ -73,6 +74,7 @@ pub fn create_codec(codec: CodecType) -> Result<Option<Box<Codec>>> {
CodecType::GZIP => Ok(Some(Box::new(GZipCodec::new()))),
CodecType::SNAPPY => Ok(Some(Box::new(SnappyCodec::new()))),
CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))),
CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))),
CodecType::UNCOMPRESSED => Ok(None),
_ => Err(nyi_err!("The codec type {} is not supported yet", codec))
}
Expand Down Expand Up @@ -217,6 +219,37 @@ impl Codec for LZ4Codec {
}
}

/// Codec for Zstandard compression algorithm.
pub struct ZSTDCodec {
}

impl ZSTDCodec {
/// Creates new Zstandard compression codec.
fn new() -> Self {
Self { }
}
}

/// Compression level (1-21) for ZSTD. Choose 1 here for better compression speed.
const ZSTD_COMPRESSION_LEVEL: i32 = 1;

impl Codec for ZSTDCodec {
fn decompress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<usize> {
let mut decoder = zstd::Decoder::new(input_buf)?;
match io::copy(&mut decoder, output_buf) {
Ok(n) => Ok(n as usize),
e => Err(general_err!("Error when decompressing with ZSTD: {:?}", e))
}
}

fn compress(&mut self, input_buf: &[u8]) -> Result<Vec<u8>> {
let output = Vec::new();
let mut encoder = zstd::Encoder::new(output, ZSTD_COMPRESSION_LEVEL)?;
encoder.write_all(&input_buf[..])?;
encoder.finish()
.map_err(|e| general_err!("Error when compressing using ZSTD: {}", e))
}
}

#[cfg(test)]
mod tests {
Expand Down Expand Up @@ -278,4 +311,10 @@ mod tests {
fn test_codec_lz4() {
test_codec(CodecType::LZ4);
}

#[test]
fn test_codec_zstd() {
test_codec(CodecType::ZSTD);
}

}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ extern crate parquet_format;
extern crate chrono;
extern crate lz4;
extern crate num_bigint;
extern crate zstd;

#[macro_use]
pub mod errors;
Expand Down