This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Migrate to latest parquet2 (#923)
jorgecarleitao committed Apr 15, 2022
1 parent fafc70d commit cd5a4c0
Showing 55 changed files with 2,115 additions and 966 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -53,7 +53,7 @@ hex = { version = "^0.4", optional = true }
 
 # for IPC compression
 lz4 = { version = "1.23.1", optional = true }
-zstd = { version = "0.10", optional = true }
+zstd = { version = "0.11", optional = true }
 
 rand = { version = "0.8", optional = true }
 
@@ -68,7 +68,7 @@ futures = { version = "0.3", optional = true }
 ahash = { version = "0.7", optional = true }
 
 # parquet support
-parquet2 = { version = "0.10", optional = true, default_features = false, features = ["stream"] }
+parquet2 = { version = "0.11", optional = true, default_features = false, features = ["stream"] }
 
 # avro support
 avro-schema = { version = "0.2", optional = true }
3 changes: 1 addition & 2 deletions arrow-parquet-integration-testing/src/main.rs
@@ -196,8 +196,7 @@ fn main() -> Result<()> {
 
     writer.start()?;
     for group in row_groups {
-        let (group, len) = group?;
-        writer.write(group, len)?;
+        writer.write(group?)?;
     }
     let _ = writer.end(None)?;
 
3 changes: 1 addition & 2 deletions benches/write_parquet.rs
@@ -34,8 +34,7 @@ fn write(array: &dyn Array, encoding: Encoding) -> Result<()> {
 
     writer.start()?;
     for group in row_groups {
-        let (group, len) = group?;
-        writer.write(group, len)?;
+        writer.write(group?)?;
     }
     let _ = writer.end(None)?;
     Ok(())
3 changes: 1 addition & 2 deletions examples/parquet_write.rs
@@ -30,8 +30,7 @@ fn write_batch(path: &str, schema: Schema, columns: Chunk<Arc<dyn Array>>) -> Re
 
     writer.start()?;
     for group in row_groups {
-        let (group, len) = group?;
-        writer.write(group, len)?;
+        writer.write(group?)?;
     }
     let _size = writer.end(None)?;
     Ok(())
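For context, a hedged sketch of roughly what examples/parquet_write.rs looks like after this change: FileWriter::write now takes only the row group, and the row-group length no longer travels alongside it. The WriteOptions fields, the Compression variant, and the single Encoding::Plain per column follow the arrow2 API of this era and are assumptions here, not the exact file contents.

use std::fs::File;
use std::sync::Arc;

use arrow2::{
    array::{Array, Int32Array},
    chunk::Chunk,
    datatypes::{Field, Schema},
    error::Result,
    io::parquet::write::{Compression, Encoding, FileWriter, RowGroupIterator, Version, WriteOptions},
};

fn write_batch(path: &str, schema: Schema, columns: Chunk<Arc<dyn Array>>) -> Result<()> {
    // Assumed option names for this era of arrow2; later releases renamed some of these.
    let options = WriteOptions {
        write_statistics: true,
        compression: Compression::Uncompressed,
        version: Version::V2,
    };

    // One chunk per row group; the iterator yields `Result` so encoding errors surface here.
    let row_groups = RowGroupIterator::try_new(
        vec![Ok(columns)].into_iter(),
        &schema,
        options,
        vec![Encoding::Plain],
    )?;

    let file = File::create(path)?;
    let mut writer = FileWriter::try_new(file, schema, options)?;

    writer.start()?;
    for group in row_groups {
        // Previously: `let (group, len) = group?; writer.write(group, len)?;`
        writer.write(group?)?;
    }
    let _size = writer.end(None)?;
    Ok(())
}

fn main() -> Result<()> {
    let array = Int32Array::from(&[Some(0), Some(1), None, Some(3)]);
    let field = Field::new("c1", array.data_type().clone(), true);
    let schema = Schema::from(vec![field]);
    let columns = Chunk::new(vec![Arc::new(array) as Arc<dyn Array>]);
    write_batch("example.parquet", schema, columns)
}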
3 changes: 1 addition & 2 deletions examples/parquet_write_parallel/src/main.rs
@@ -99,8 +99,7 @@ fn parallel_write(path: &str, schema: &Schema, batches: &[Chunk]) -> Result<()>
     // Write the file.
     writer.start()?;
     for group in row_groups {
-        let (group, len) = group?;
-        writer.write(group, len)?;
+        writer.write(group?)?;
     }
     let _size = writer.end(None)?;
 
30 changes: 29 additions & 1 deletion src/array/fixed_size_binary/mod.rs
@@ -1,4 +1,9 @@
-use crate::{bitmap::Bitmap, buffer::Buffer, datatypes::DataType, error::ArrowError};
+use crate::{
+    bitmap::{Bitmap, MutableBitmap},
+    buffer::Buffer,
+    datatypes::DataType,
+    error::ArrowError,
+};
 
 use super::Array;
 
@@ -274,6 +279,29 @@ impl FixedSizeBinaryArray {
         .unwrap()
         .into()
     }
+
+    /// Creates a [`FixedSizeBinaryArray`] from a slice of arrays of bytes
+    pub fn from_slice<const N: usize, P: AsRef<[[u8; N]]>>(a: P) -> Self {
+        let values = a.as_ref().iter().flatten().copied().collect::<Vec<_>>();
+        Self::new(DataType::FixedSizeBinary(N), values.into(), None)
+    }
+
+    /// Creates a new [`FixedSizeBinaryArray`] from a slice of optional `[u8]`.
+    // Note: this can't be `impl From` because Rust does not allow double `AsRef` on it.
+    pub fn from<const N: usize, P: AsRef<[Option<[u8; N]>]>>(slice: P) -> Self {
+        let values = slice
+            .as_ref()
+            .iter()
+            .copied()
+            .flat_map(|x| x.unwrap_or([0; N]))
+            .collect::<Vec<_>>();
+        let validity = slice
+            .as_ref()
+            .iter()
+            .map(|x| x.is_some())
+            .collect::<MutableBitmap>();
+        Self::new(DataType::FixedSizeBinary(N), values.into(), validity.into())
+    }
 }
 
 pub trait FixedSizeBinaryValues {
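A short usage sketch of the two constructors added above; the concrete values and assertions are illustrative, not taken from the repository's tests.

use arrow2::array::{Array, FixedSizeBinaryArray};
use arrow2::datatypes::DataType;

fn main() {
    // Two values of width 2; no validity bitmap.
    let a = FixedSizeBinaryArray::from_slice([[1u8, 2], [3, 4]]);
    assert_eq!(a.data_type(), &DataType::FixedSizeBinary(2));
    assert_eq!(a.len(), 2);

    // Same width, but with a null slot; the null is backed by zeroed bytes
    // and marked invalid in the validity bitmap.
    let b = FixedSizeBinaryArray::from([Some([1u8, 2]), None, Some([5, 6])]);
    assert_eq!(b.len(), 3);
    assert!(!b.is_valid(1));
}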
3 changes: 1 addition & 2 deletions src/doc/lib.md
@@ -62,8 +62,7 @@ fn main() -> Result<()> {
     // Write the file.
     writer.start()?;
     for group in row_groups {
-        let (group, len) = group?;
-        writer.write(group, len)?;
+        writer.write(group?)?;
     }
     let _ = writer.end(None)?;
     Ok(())
6 changes: 6 additions & 0 deletions src/error.rs
@@ -52,6 +52,12 @@ impl From<std::str::Utf8Error> for ArrowError {
     }
 }
 
+impl From<std::string::FromUtf8Error> for ArrowError {
+    fn from(error: std::string::FromUtf8Error) -> Self {
+        ArrowError::External("".to_string(), Box::new(error))
+    }
+}
+
 impl From<simdutf8::basic::Utf8Error> for ArrowError {
     fn from(error: simdutf8::basic::Utf8Error) -> Self {
         ArrowError::External("".to_string(), Box::new(error))
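The new From<std::string::FromUtf8Error> impl means ? can lift the error from String::from_utf8 straight into an ArrowError. A minimal sketch, with a hypothetical decode_utf8 helper:

use arrow2::error::Result;

// `String::from_utf8` returns `Result<String, std::string::FromUtf8Error>`;
// with the new `From` impl, `?` converts that error into `ArrowError::External`.
fn decode_utf8(bytes: Vec<u8>) -> Result<String> {
    Ok(String::from_utf8(bytes)?)
}

fn main() -> Result<()> {
    assert_eq!(decode_utf8(b"parquet".to_vec())?, "parquet");
    Ok(())
}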
10 changes: 5 additions & 5 deletions src/io/parquet/mod.rs
@@ -6,10 +6,10 @@ pub mod write;
 
 const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema";
 
-impl From<parquet2::error::ParquetError> for ArrowError {
-    fn from(error: parquet2::error::ParquetError) -> Self {
+impl From<parquet2::error::Error> for ArrowError {
+    fn from(error: parquet2::error::Error) -> Self {
         match error {
-            parquet2::error::ParquetError::FeatureNotActive(_, _) => {
+            parquet2::error::Error::FeatureNotActive(_, _) => {
                 let message = "Failed to read a compressed parquet file. \
                     Use the cargo feature \"io_parquet_compression\" to read compressed parquet files."
                     .to_string();
@@ -20,8 +20,8 @@ impl From<parquet2::error::ParquetError> for ArrowError {
     }
 }
 
-impl From<ArrowError> for parquet2::error::ParquetError {
+impl From<ArrowError> for parquet2::error::Error {
     fn from(error: ArrowError) -> Self {
-        parquet2::error::ParquetError::General(error.to_string())
+        parquet2::error::Error::General(error.to_string())
     }
 }
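Callers that name the parquet2 error type must follow the rename from ParquetError to Error; the From impls above keep ? and .into() conversions working in both directions. A minimal sketch, assuming parquet2 0.11 is also a direct dependency and using illustrative helper names:

use arrow2::error::ArrowError;
use parquet2::error::Error;

// `.into()` now converts between `parquet2::error::Error` (formerly
// `ParquetError`) and `ArrowError` in both directions.
fn to_arrow(err: Error) -> ArrowError {
    err.into()
}

fn to_parquet(err: ArrowError) -> Error {
    err.into()
}

fn main() {
    let arrow_err = to_arrow(Error::General("example".to_string()));
    let parquet_err = to_parquet(arrow_err);
    println!("{}", parquet_err);
}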
