This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to read and write Parquet's delta-bitpacked (integer encoding) #1226

Merged 3 commits · Aug 19, 2022
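
For context: Parquet's delta-bitpacked encoding (DELTA_BINARY_PACKED) stores a run of integers as a first value followed by bit-packed deltas, which typically compresses slowly varying integer columns much better than plain encoding. The sketch below is a minimal, hypothetical illustration of how a writer can opt into the new encoding per column; the helper name `encodings_for` and the exact import paths are assumptions, but the `transverse`/`Encoding` usage mirrors the change to arrow-parquet-integration-testing/src/main.rs further down.

use arrow2::datatypes::{DataType, Schema};
use arrow2::io::parquet::write::{transverse, Encoding};

// Sketch (not part of this PR's diff): pick one parquet Encoding per leaf column.
fn encodings_for(schema: &Schema) -> Vec<Vec<Encoding>> {
    schema
        .fields
        .iter()
        .map(|f| {
            transverse(&f.data_type, |dt| match dt {
                // the new capability: delta-bitpacked for 32-bit integers
                DataType::Int32 => Encoding::DeltaBinaryPacked,
                // dictionary columns keep RLE-dictionary encoding
                DataType::Dictionary(..) => Encoding::RleDictionary,
                _ => Encoding::Plain,
            })
        })
        .collect()
}

// The resulting Vec<Vec<Encoding>> is passed to RowGroupIterator together with
// the chunks, schema and WriteOptions when writing the parquet file.
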
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -74,7 +74,7 @@ futures = { version = "0.3", optional = true }
async-stream = { version = "0.3.2", optional = true }

# parquet support
parquet2 = { version = "0.15.0", optional = true, default_features = false, features = ["async"] }
parquet2 = { version = "0.16", optional = true, default_features = false, features = ["async"] }

# avro support
avro-schema = { version = "0.3", optional = true }
26 changes: 18 additions & 8 deletions arrow-parquet-integration-testing/main.py
@@ -10,7 +10,12 @@ def get_file_path(file: str):


def _prepare(
file: str, version: str, compression: str, encoding_utf8: str, projection=None
file: str,
version: str,
compression: str,
encoding_utf8: str,
encoding_int: str,
projection=None,
):
write = f"{file}.parquet"

@@ -26,6 +31,8 @@ def _prepare(
version,
"--encoding-utf8",
encoding_utf8,
"--encoding-int",
encoding_int,
"--compression",
compression,
]
@@ -38,7 +45,7 @@ def _prepare(
return write


def _expected(file: str):
def _expected(file: str) -> pyarrow.Table:
return pyarrow.ipc.RecordBatchFileReader(get_file_path(file)).read_all()


@@ -75,16 +82,19 @@ def variations():
# "generated_custom_metadata",
]:
# pyarrow does not support decoding "delta"-encoded values.
# for encoding in ["plain", "delta"]:
for encoding in ["plain"]:
for encoding_int in ["plain", "delta"]:
if encoding_int == "delta" and file in {"generated_primitive", "generated_null"}:
# see https://issues.apache.org/jira/browse/ARROW-17465
continue

for compression in ["uncompressed", "zstd", "snappy"]:
yield (version, file, compression, encoding)
yield (version, file, compression, "plain", encoding_int)


if __name__ == "__main__":
for (version, file, compression, encoding_utf8) in variations():
for (version, file, compression, encoding_utf8, encoding_int) in variations():
expected = _expected(file)
path = _prepare(file, version, compression, encoding_utf8)
path = _prepare(file, version, compression, encoding_utf8, encoding_int)

table = pq.read_table(path)
os.remove(path)
@@ -95,4 +105,4 @@ def variations():
if str(c1.type) in ["month_interval", "day_time_interval"]:
# pyarrow does not support interval types from parquet
continue
assert c1 == c2
assert c1 == c2, (c1, c2)
56 changes: 34 additions & 22 deletions arrow-parquet-integration-testing/main_spark.py
@@ -7,7 +7,13 @@
from main import _prepare, _expected


def test(file: str, version: str, column, compression: str, encoding: str):
def test(
file: str,
version: str,
column: str,
compression: str,
encoding: str,
):
"""
Tests that pyspark can read a parquet file written by arrow2.

@@ -16,13 +22,13 @@ def test(file: str, version: str, column, compression: str, encoding: str):
In pyspark: read (written) parquet to Python
assert that they are equal
"""
# write parquet
path = _prepare(file, version, compression, encoding, [column[1]])

# read IPC to Python
expected = _expected(file)
expected = next(c for i, c in enumerate(expected) if i == column[1])
expected = expected.combine_chunks().tolist()
column_index = next(i for i, c in enumerate(expected.column_names) if c == column)
expected = expected[column].combine_chunks().tolist()

# write parquet
path = _prepare(file, version, compression, encoding, encoding, [column_index])

# read parquet to Python
spark = pyspark.sql.SparkSession.builder.config(
@@ -31,28 +37,34 @@ def test(
"false",
).getOrCreate()

result = spark.read.parquet(path).select(column[0]).collect()
result = [r[column[0]] for r in result]
result = spark.read.parquet(path).select(column).collect()
result = [r[column] for r in result]
os.remove(path)

# assert equality
assert expected == result


test("generated_primitive", "2", ("utf8_nullable", 24), "uncompressed", "delta")
test("generated_primitive", "2", ("utf8_nullable", 24), "snappy", "delta")
test("generated_null", "2", "f1", "uncompressed", "delta")

test("generated_primitive", "2", "utf8_nullable", "uncompressed", "delta")
test("generated_primitive", "2", "utf8_nullable", "snappy", "delta")
test("generated_primitive", "2", "int32_nullable", "uncompressed", "delta")
test("generated_primitive", "2", "int32_nullable", "snappy", "delta")
test("generated_primitive", "2", "int16_nullable", "uncompressed", "delta")
test("generated_primitive", "2", "int16_nullable", "snappy", "delta")

test("generated_dictionary", "1", ("dict0", 0), "uncompressed", "plain")
test("generated_dictionary", "1", ("dict0", 0), "snappy", "plain")
test("generated_dictionary", "2", ("dict0", 0), "uncompressed", "plain")
test("generated_dictionary", "2", ("dict0", 0), "snappy", "plain")
test("generated_dictionary", "1", "dict0", "uncompressed", "plain")
test("generated_dictionary", "1", "dict0", "snappy", "plain")
test("generated_dictionary", "2", "dict0", "uncompressed", "plain")
test("generated_dictionary", "2", "dict0", "snappy", "plain")

test("generated_dictionary", "1", ("dict1", 1), "uncompressed", "plain")
test("generated_dictionary", "1", ("dict1", 1), "snappy", "plain")
test("generated_dictionary", "2", ("dict1", 1), "uncompressed", "plain")
test("generated_dictionary", "2", ("dict1", 1), "snappy", "plain")
test("generated_dictionary", "1", "dict1", "uncompressed", "plain")
test("generated_dictionary", "1", "dict1", "snappy", "plain")
test("generated_dictionary", "2", "dict1", "uncompressed", "plain")
test("generated_dictionary", "2", "dict1", "snappy", "plain")

test("generated_dictionary", "1", ("dict2", 2), "uncompressed", "plain")
test("generated_dictionary", "1", ("dict2", 2), "snappy", "plain")
test("generated_dictionary", "2", ("dict2", 2), "uncompressed", "plain")
test("generated_dictionary", "2", ("dict2", 2), "snappy", "plain")
test("generated_dictionary", "1", "dict2", "uncompressed", "plain")
test("generated_dictionary", "1", "dict2", "snappy", "plain")
test("generated_dictionary", "2", "dict2", "uncompressed", "plain")
test("generated_dictionary", "2", "dict2", "snappy", "plain")
13 changes: 11 additions & 2 deletions arrow-parquet-integration-testing/src/main.rs
@@ -1,10 +1,9 @@
use std::fs::File;
use std::{io::Read};
use std::io::Read;

use arrow2::array::Array;
use arrow2::io::ipc::IpcField;
use arrow2::{
AHashMap,
chunk::Chunk,
datatypes::{DataType, Schema},
error::Result,
@@ -16,6 +15,7 @@ use arrow2::{
RowGroupIterator, Version as ParquetVersion, WriteOptions,
},
},
AHashMap,
};
use clap::Parser;
use flate2::read::GzDecoder;
@@ -110,6 +110,8 @@ struct Args {
projection: Option<String>,
#[clap(short, long, arg_enum, help = "encoding scheme for utf8", default_value_t = EncodingScheme::Plain)]
encoding_utf8: EncodingScheme,
#[clap(short('i'), long, arg_enum, help = "encoding scheme for int", default_value_t = EncodingScheme::Plain)]
encoding_int: EncodingScheme,
#[clap(short, long, arg_enum)]
compression: Compression,
}
@@ -178,6 +180,13 @@ fn main() -> Result<()> {
.map(|f| {
transverse(&f.data_type, |dt| match dt {
DataType::Dictionary(..) => Encoding::RleDictionary,
DataType::Int32 => {
if args.encoding_int == EncodingScheme::Delta {
Encoding::DeltaBinaryPacked
} else {
Encoding::Plain
}
}
DataType::Utf8 | DataType::LargeUtf8 => {
if args.encoding_utf8 == EncodingScheme::Delta {
Encoding::DeltaLengthByteArray
2 changes: 1 addition & 1 deletion src/io/parquet/mod.rs
@@ -22,6 +22,6 @@ impl From<parquet2::error::Error> for Error {

impl From<Error> for parquet2::error::Error {
fn from(error: Error) -> Self {
parquet2::error::Error::General(error.to_string())
parquet2::error::Error::OutOfSpec(error.to_string())
}
}
37 changes: 24 additions & 13 deletions src/io/parquet/read/deserialize/binary/basic.rs
@@ -13,7 +13,7 @@ use crate::{
bitmap::{Bitmap, MutableBitmap},
buffer::Buffer,
datatypes::DataType,
error::Result,
error::{Error, Result},
};

use super::super::utils::{
Expand Down Expand Up @@ -51,13 +51,13 @@ impl<'a> Delta<'a> {
pub fn try_new(page: &'a DataPage) -> Result<Self> {
let (_, _, values) = split_buffer(page)?;

let mut lengths_iter = delta_length_byte_array::Decoder::new(values);
let mut lengths_iter = delta_length_byte_array::Decoder::try_new(values)?;

#[allow(clippy::needless_collect)] // we need to consume it to get the values
let lengths = lengths_iter
.by_ref()
.map(|x| x as usize)
.collect::<Vec<_>>();
.map(|x| x.map(|x| x as usize).map_err(Error::from))
.collect::<Result<Vec<_>>>()?;

let values = lengths_iter.into_values();
Ok(Self {
@@ -405,20 +405,26 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
}
State::OptionalDictionary(page_validity, page_values) => {
let page_dict = &page_values.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();
utils::extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
&mut page_values.values.by_ref().map(op),
&mut page_values
.values
.by_ref()
.map(|index| page_dict[index.unwrap() as usize].as_ref()),
)
}
State::RequiredDictionary(page) => {
let page_dict = &page.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();

for x in page.values.by_ref().map(op).take(additional) {
for x in page
.values
.by_ref()
.map(|index| page_dict[index.unwrap() as usize].as_ref())
.take(additional)
{
values.push(x)
}
}
@@ -442,21 +448,26 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
}
State::FilteredRequiredDictionary(page) => {
let page_dict = &page.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();

for x in page.values.by_ref().map(op).take(additional) {
for x in page
.values
.by_ref()
.map(|index| page_dict[index.unwrap() as usize].as_ref())
.take(additional)
{
values.push(x)
}
}
State::FilteredOptionalDictionary(page_validity, page_values) => {
let page_dict = &page_values.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();
utils::extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
&mut page_values.values.by_ref().map(op),
&mut page_values
.values
.by_ref()
.map(|index| page_dict[index.unwrap() as usize].as_ref()),
)
}
}
17 changes: 12 additions & 5 deletions src/io/parquet/read/deserialize/binary/nested.rs
@@ -90,7 +90,7 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
)
}

fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) {
fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) -> Result<()> {
let (values, validity) = decoded;
match state {
State::Optional(page) => {
@@ -104,18 +104,25 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
}
State::RequiredDictionary(page) => {
let dict_values = &page.dict;
let op = move |index: u32| dict_values[index as usize].as_ref();
let item = page.values.next().map(op).unwrap_or_default();
let item = page
.values
.next()
.map(|index| dict_values[index.unwrap() as usize].as_ref())
.unwrap_or_default();
values.push(item);
}
State::OptionalDictionary(page) => {
let dict_values = &page.dict;
let op = move |index: u32| dict_values[index as usize].as_ref();
let item = page.values.next().map(op).unwrap_or_default();
let item = page
.values
.next()
.map(|index| dict_values[index.unwrap() as usize].as_ref())
.unwrap_or_default();
values.push(item);
validity.push(true);
}
}
Ok(())
}

fn push_null(&self, decoded: &mut Self::DecodedState) {
3 changes: 2 additions & 1 deletion src/io/parquet/read/deserialize/boolean/nested.rs
@@ -82,7 +82,7 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder {
)
}

fn push_valid(&self, state: &mut State, decoded: &mut Self::DecodedState) {
fn push_valid(&self, state: &mut State, decoded: &mut Self::DecodedState) -> Result<()> {
let (values, validity) = decoded;
match state {
State::Optional(page_values) => {
@@ -95,6 +95,7 @@ impl<'a> NestedDecoder<'a> for BooleanDecoder {
values.push(value);
}
}
Ok(())
}

fn push_null(&self, decoded: &mut Self::DecodedState) {
12 changes: 8 additions & 4 deletions src/io/parquet/read/deserialize/dictionary/mod.rs
@@ -163,7 +163,8 @@ where
Some(remaining),
values,
&mut page.values.by_ref().map(|x| {
let x: usize = x.try_into().unwrap();
// todo: rm unwrap
let x: usize = x.unwrap().try_into().unwrap();
match x.try_into() {
Ok(key) => key,
// todo: convert this to an error.
@@ -176,7 +177,8 @@ where
page.values
.by_ref()
.map(|x| {
let x: usize = x.try_into().unwrap();
// todo: rm unwrap
let x: usize = x.unwrap().try_into().unwrap();
let x: K = match x.try_into() {
Ok(key) => key,
// todo: convert this to an error.
@@ -195,7 +197,8 @@ where
Some(remaining),
values,
&mut page_values.by_ref().map(|x| {
let x: usize = x.try_into().unwrap();
// todo: rm unwrap
let x: usize = x.unwrap().try_into().unwrap();
let x: K = match x.try_into() {
Ok(key) => key,
// todo: convert this to an error.
@@ -211,7 +214,8 @@ where
page.values
.by_ref()
.map(|x| {
let x: usize = x.try_into().unwrap();
// todo: rm unwrap
let x: usize = x.unwrap().try_into().unwrap();
let x: K = match x.try_into() {
Ok(key) => key,
// todo: convert this to an error.