
Commit 6bb9fe2 (parent 2d6ee11)

Added basics of ORC

jorgecarleitao committed on Jul 26, 2022
Showing 11 changed files with 294 additions and 7 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/coverage.yml
@@ -19,8 +19,9 @@ jobs:
python3 -m venv venv
source venv/bin/activate
pip install pip --upgrade
pip install pyarrow==6
pip install pyarrow==6 pyorc
python parquet_integration/write_parquet.py
python tests/it/io/orc/write.py
deactivate
- uses: Swatinem/rust-cache@v1
- name: Generate code coverage
5 changes: 3 additions & 2 deletions .github/workflows/test.yml
@@ -13,16 +13,17 @@ jobs:
submodules: true # needed to test IPC files, which are located in a submodule
- name: Install Rust
run: rustup update stable
- uses: Swatinem/rust-cache@v1
- name: Setup parquet files
run: |
apt update && apt install python3-pip python3-venv -y -q
python3 -m venv venv
source venv/bin/activate
pip install pip --upgrade
pip install pyarrow==6
pip install pyarrow==6 pyorc
python parquet_integration/write_parquet.py
python tests/it/io/orc/write.py
deactivate
- uses: Swatinem/rust-cache@v1
- name: Run
run: cargo test --features full

17 changes: 14 additions & 3 deletions Cargo.toml
@@ -72,16 +72,21 @@ parquet2 = { version = "0.14.0", optional = true, default_features = false }

# avro support
avro-schema = { version = "0.2", optional = true }
serde = { version = "^1.0", features = ["rc"], optional = true }
serde_derive = { version = "^1.0", optional = true }
serde_json = { version = "^1.0", features = ["preserve_order"], optional = true }
# compression of avro
libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }
crc = { version = "2", optional = true }
# async avro
async-stream = { version = "0.3.2", optional = true }

# ORC support
orc-format = { git = "https://github.com/DataEngineeringLabs/orc-format.git", optional = true }

# Arrow integration tests support
serde = { version = "^1.0", features = ["rc"], optional = true }
serde_derive = { version = "^1.0", optional = true }
serde_json = { version = "^1.0", features = ["preserve_order"], optional = true }

# for division/remainder optimization at runtime
strength_reduce = { version = "0.2", optional = true }

@@ -126,6 +131,7 @@ full = [
"io_parquet",
"io_parquet_compression",
"io_avro",
"io_orc",
"io_avro_compression",
"io_avro_async",
"regex",
@@ -145,6 +151,7 @@ io_ipc_write_async = ["io_ipc", "futures"]
io_ipc_read_async = ["io_ipc", "futures", "async-stream"]
io_ipc_compression = ["lz4", "zstd"]
io_flight = ["io_ipc", "arrow-format/flight-data"]

# base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
io_parquet = ["parquet2", "io_ipc", "base64", "futures", "streaming-iterator", "fallible-streaming-iterator"]
io_parquet_compression = [
@@ -154,13 +161,17 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]

io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "serde_json"]
io_avro_compression = [
"libflate",
"snap",
"crc",
]
io_avro_async = ["io_avro", "futures", "async-stream"]

io_orc = [ "orc-format" ]

# serde+serde_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["hex", "serde", "serde_derive", "serde_json", "io_ipc"]
4 changes: 3 additions & 1 deletion DEVELOPMENT.md
@@ -42,10 +42,12 @@ source venv/bin/activate
pip install pip --upgrade

# Install pyarrow (version 6) and pyorc
pip install pyarrow==6
pip install pyarrow==6 pyorc

# Generate the parquet files (this might take some time, depending on your computer setup)
python parquet_integration/write_parquet.py
# Generate the ORC files
python tests/it/io/orc/write.py

# Get out of venv, back to normal terminal
deactivate
3 changes: 3 additions & 0 deletions src/io/mod.rs
@@ -5,6 +5,9 @@
#[cfg(feature = "io_odbc")]
pub mod odbc;

#[cfg(feature = "io_orc")]
pub mod orc;

#[cfg(any(
feature = "io_csv_read",
feature = "io_csv_read_async",
12 changes: 12 additions & 0 deletions src/io/orc/mod.rs
@@ -0,0 +1,12 @@
//! APIs to read from [ORC format](https://orc.apache.org).
pub mod read;

pub use orc_format as format;

use crate::error::Error;

impl From<format::Error> for Error {
fn from(error: format::Error) -> Self {
Error::ExternalFormat(format!("{:?}", error))
}
}
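The `From` impl above is what lets `?` convert errors from the re-exported `orc-format` crate into arrow2's `Error`. A minimal sketch of that conversion in use (the helper function is hypothetical, not part of this commit; the `read_metadata` call and return shape are taken from the test further down):

```rust
use arrow2::error::Error;
use arrow2::io::orc::format;

// Hypothetical helper: the `?` operator converts `format::Error` into
// arrow2's `Error` through the `From` impl defined in this module.
fn read_footer(reader: &mut std::fs::File) -> Result<format::proto::Footer, Error> {
    let (_postscript, footer, _metadata) = format::read::read_metadata(reader)?;
    Ok(footer)
}
```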
178 changes: 178 additions & 0 deletions src/io/orc/read/mod.rs
@@ -0,0 +1,178 @@
//! APIs to read from [ORC format](https://orc.apache.org).
use std::io::{Read, Seek, SeekFrom};

use crate::array::{BooleanArray, Float32Array};
use crate::bitmap::{Bitmap, MutableBitmap};
use crate::datatypes::{DataType, Field, Schema};
use crate::error::Error;

use orc_format::fallible_streaming_iterator::FallibleStreamingIterator;
use orc_format::proto::stream::Kind;
use orc_format::proto::{CompressionKind, Footer, StripeInformation, Type};
use orc_format::read::decode;
use orc_format::read::Stripe;

/// Infers a [`Schema`] from the file's [`Footer`].
/// # Errors
/// This function errors if the type is not yet supported.
pub fn infer_schema(footer: &Footer) -> Result<Schema, Error> {
let types = &footer.types;

let dt = infer_dt(&footer.types[0], types)?;
if let DataType::Struct(fields) = dt {
Ok(fields.into())
} else {
Err(Error::ExternalFormat(
"ORC root type must be a struct".to_string(),
))
}
}

fn infer_dt(type_: &Type, types: &[Type]) -> Result<DataType, Error> {
use orc_format::proto::r#type::Kind::*;
let dt = match type_.kind() {
Boolean => DataType::Boolean,
Byte => DataType::Int8,
Short => DataType::Int16,
Int => DataType::Int32,
Long => DataType::Int64,
Float => DataType::Float32,
Double => DataType::Float64,
String => DataType::Utf8,
Binary => DataType::Binary,
Struct => {
let sub_types = type_
.subtypes
.iter()
.cloned()
.zip(type_.field_names.iter())
.map(|(i, name)| {
infer_dt(
types.get(i as usize).ok_or_else(|| {
Error::ExternalFormat(format!("ORC field {i} not found"))
})?,
types,
)
.map(|dt| Field::new(name, dt, true))
})
.collect::<Result<Vec<_>, Error>>()?;
DataType::Struct(sub_types)
}
kind => return Err(Error::nyi(format!("Reading {kind:?} from ORC"))),
};
Ok(dt)
}

/// Reads the stripe described by [`StripeInformation`] into memory.
pub fn read_stripe<R: Read + Seek>(
reader: &mut R,
stripe_info: StripeInformation,
compression: CompressionKind,
) -> Result<Stripe, Error> {
let offset = stripe_info.offset();
reader.seek(SeekFrom::Start(offset))?;

let len = stripe_info.index_length() + stripe_info.data_length() + stripe_info.footer_length();
let mut stripe = vec![0; len as usize];
reader.read_exact(&mut stripe)?;

Ok(Stripe::try_new(stripe, stripe_info, compression)?)
}

fn deserialize_validity(
stripe: &Stripe,
column: usize,
scratch: &mut Vec<u8>,
) -> Result<Option<Bitmap>, Error> {
let mut chunks = stripe.get_bytes(column, Kind::Present, std::mem::take(scratch))?;

let mut validity = MutableBitmap::with_capacity(stripe.number_of_rows());
let mut remaining = stripe.number_of_rows();
while let Some(chunk) = chunks.next()? {
// todo: this can be faster by iterating in bytes instead of single bits via `BooleanRun`
let iter = decode::BooleanIter::new(chunk, remaining);
for item in iter {
remaining -= 1;
validity.push(item?)
}
}
*scratch = chunks.into_inner();

Ok(validity.into())
}

/// Deserializes column `column` from `stripe`, assumed to represent an f32 column
pub fn deserialize_f32(
data_type: DataType,
stripe: &Stripe,
column: usize,
) -> Result<Float32Array, Error> {
let mut scratch = vec![];
let num_rows = stripe.number_of_rows();

let validity = deserialize_validity(stripe, column, &mut scratch)?;

let mut chunks = stripe.get_bytes(column, Kind::Data, scratch)?;

let mut values = Vec::with_capacity(num_rows);
if let Some(validity) = &validity {
let mut validity_iter = validity.iter();
while let Some(chunk) = chunks.next()? {
// a chunk contains only the valid (non-null) values: consume one validity
// bit per row and advance the value iterator only on valid rows, so the
// pairing stays correct across chunk boundaries
let mut valid_iter = decode::deserialize_f32(chunk).peekable();
while valid_iter.peek().is_some() {
let is_valid = validity_iter.next().unwrap();
values.push(if is_valid {
valid_iter.next().unwrap()
} else {
0.0f32
});
}
}
// any validity bits left after the last value are trailing nulls
values.extend(validity_iter.map(|_| 0.0f32));
} else {
while let Some(chunk) = chunks.next()? {
values.extend(decode::deserialize_f32(chunk));
}
}

Float32Array::try_new(data_type, values.into(), validity)
}

/// Deserializes column `column` from `stripe`, assumed to represent a boolean array
pub fn deserialize_bool(
data_type: DataType,
stripe: &Stripe,
column: usize,
) -> Result<BooleanArray, Error> {
let num_rows = stripe.number_of_rows();
let mut scratch = vec![];

let validity = deserialize_validity(stripe, column, &mut scratch)?;

let mut chunks = stripe.get_bytes(column, Kind::Data, std::mem::take(&mut scratch))?;

let mut values = MutableBitmap::with_capacity(num_rows);
if let Some(validity) = &validity {
let mut validity_iter = validity.iter();

while let Some(chunk) = chunks.next()? {
let mut valid_iter = decode::BooleanIter::new(chunk, chunk.len() * 8);
validity_iter.by_ref().try_for_each(|is_valid| {
values.push(if is_valid {
valid_iter.next().unwrap()?
} else {
false
});
Result::<(), Error>::Ok(())
})?;
}
} else {
// cap the iterator at `remaining` so that the padding bits of the last
// byte are not pushed as values
let mut remaining = num_rows;
while let Some(chunk) = chunks.next()? {
let valid_iter = decode::BooleanIter::new(chunk, remaining.min(chunk.len() * 8));
for v in valid_iter {
values.push(v?);
remaining -= 1;
}
}
}

BooleanArray::try_new(data_type, values.into(), validity)
}
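Taken together, a minimal end-to-end sketch of the new read path (a sketch only; the file path and the choice of column 1 as a float column are assumptions that match the test fixture below):

```rust
use arrow2::datatypes::DataType;
use arrow2::error::Error;
use arrow2::io::orc::{format, read};

fn main() -> Result<(), Error> {
    let mut reader = std::fs::File::open("fixtures/pyorc/test.orc")?;

    // the footer carries the schema and the location of each stripe
    let (postscript, footer, _metadata) = format::read::read_metadata(&mut reader)?;
    let schema = read::infer_schema(&footer)?;
    println!("inferred schema: {:?}", schema.fields);

    // each stripe is read into memory and deserialized independently
    for info in footer.stripes.iter().cloned() {
        let stripe = read::read_stripe(&mut reader, info, postscript.compression())?;
        let array = read::deserialize_f32(DataType::Float32, &stripe, 1)?;
        println!("column 1: {array:?}");
    }
    Ok(())
}
```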
3 changes: 3 additions & 0 deletions tests/it/io/mod.rs
@@ -16,6 +16,9 @@ mod parquet;
#[cfg(feature = "io_avro")]
mod avro;

#[cfg(feature = "io_orc")]
mod orc;

#[cfg(any(
feature = "io_csv_read",
feature = "io_csv_write",
1 change: 1 addition & 0 deletions tests/it/io/orc/mod.rs
@@ -0,0 +1 @@
mod read;
28 changes: 28 additions & 0 deletions tests/it/io/orc/read.rs
@@ -0,0 +1,28 @@
use arrow2::array::*;
use arrow2::datatypes::DataType;
use arrow2::error::Error;
use arrow2::io::orc::{format, read};

#[test]
fn infer() -> Result<(), Error> {
let mut reader = std::fs::File::open("fixtures/pyorc/test.orc").unwrap();
let (ps, footer, _) = format::read::read_metadata(&mut reader)?;
let schema = read::infer_schema(&footer)?;

assert_eq!(schema.fields.len(), 12);

let stripe = read::read_stripe(&mut reader, footer.stripes[0].clone(), ps.compression())?;

let array = read::deserialize_f32(DataType::Float32, &stripe, 1)?;
assert_eq!(
array,
Float32Array::from([Some(1.0), Some(2.0), None, Some(4.0), Some(5.0)])
);

let array = read::deserialize_bool(DataType::Boolean, &stripe, 2)?;
assert_eq!(
array,
BooleanArray::from([Some(true), Some(false), None, Some(true), Some(false)])
);
Ok(())
}
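A natural extension of this test (a sketch, not part of the commit) would also pin the inferred field names against the pyorc schema. This assumes arrow2's `Field` exposes its name as a `String`, and would sit inside `infer` before `Ok(())`:

```rust
// field order must match the `struct<...>` schema in tests/it/io/orc/write.py
let names: Vec<&str> = schema.fields.iter().map(|f| f.name.as_str()).collect();
assert_eq!(
    names,
    [
        "a", "b", "str_direct", "d", "e", "f",
        "int_short_repeated", "int_neg_short_repeated",
        "int_delta", "int_neg_delta", "int_direct", "int_neg_direct",
    ]
);
```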
47 changes: 47 additions & 0 deletions tests/it/io/orc/write.py
@@ -0,0 +1,47 @@
import os

import pyorc


data = {
"a": [1.0, 2.0, None, 4.0, 5.0],
"b": [True, False, None, True, False],
"str_direct": ["a", "cccccc", None, "ddd", "ee"],
"d": ["a", "bb", None, "ccc", "ddd"],
"e": ["ddd", "cc", None, "bb", "a"],
"f": ["aaaaa", "bbbbb", None, "ccccc", "ddddd"],
"int_short_repeated": [5, 5, None, 5, 5],
"int_neg_short_repeated": [-5, -5, None, -5, -5],
"int_delta": [1, 2, None, 4, 5],
"int_neg_delta": [5, 4, None, 2, 1],
"int_direct": [1, 6, None, 3, 2],
"int_neg_direct": [-1, -6, None, -3, -2],
}


def _write(
schema: str,
data,
file_name: str,
compression=pyorc.CompressionKind.NONE,
dict_key_size_threshold=0.0,
):
output = open(file_name, "wb")
writer = pyorc.Writer(
output,
schema,
dict_key_size_threshold=dict_key_size_threshold,
compression=compression,
)
num_rows = len(list(data.values())[0])
for x in range(num_rows):
row = tuple(values[x] for values in data.values())
writer.write(row)
writer.close()

os.makedirs("fixtures/pyorc", exist_ok=True)
_write(
"struct<a:float,b:boolean,str_direct:string,d:string,e:string,f:string,int_short_repeated:int,int_neg_short_repeated:int,int_delta:int,int_neg_delta:int,int_direct:int,int_neg_direct:int>",
data,
"fixtures/pyorc/test.orc",
)
