Commit

update arrow (#3553)

ritchie46 committed Jun 2, 2022
1 parent 54a0680 commit ff4a318
Showing 13 changed files with 136 additions and 76 deletions.
4 changes: 2 additions & 2 deletions polars/polars-arrow/Cargo.toml
@@ -9,9 +9,9 @@ description = "Arrow interfaces for Polars DataFrame library"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "aafba7b4eb4991e016638cbc1d4df676912e8236", features = ["compute_concatenate"], default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "2fcc522b903f9a3d161daf97c8026fc9066736ee", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", path = "../../../arrow2", features = ["compute_concatenate"], default-features = false }
arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "improve_mutable", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "improve_mutable", features = ["compute_concatenate"], default-features = false }
# arrow = { package = "arrow2", version = "0.11", default-features = false, features = ["compute_concatenate"] }
hashbrown = "0.12"
num = "^0.4"
2 changes: 1 addition & 1 deletion polars/polars-arrow/src/error.rs
@@ -6,7 +6,7 @@ type ErrString = Cow<'static, str>;
#[derive(Debug, ThisError)]
pub enum PolarsError {
#[error(transparent)]
ArrowError(#[from] arrow::error::ArrowError),
ArrowError(#[from] arrow::error::Error),
#[error("{0}")]
ComputeError(ErrString),
#[error("Out of bounds: {0}")]
2 changes: 1 addition & 1 deletion polars/polars-arrow/src/kernels/concatenate.rs
@@ -1,6 +1,6 @@
use arrow::array::growable::make_growable;
use arrow::array::{Array, ArrayRef};
use arrow::error::{ArrowError, Result};
use arrow::error::{Error as ArrowError, Result};
use std::sync::Arc;

/// Concatenate multiple [Array] of the same type into a single [`Array`].
3 changes: 2 additions & 1 deletion polars/polars-arrow/src/kernels/string.rs
@@ -1,3 +1,4 @@
use crate::trusted_len::PushUnchecked;
use arrow::array::{ArrayRef, UInt32Array, Utf8Array};
use arrow::buffer::Buffer;
use arrow::datatypes::DataType;
@@ -6,7 +7,7 @@ use std::sync::Arc;
pub fn string_lengths(array: &Utf8Array<i64>) -> ArrayRef {
let values = array.offsets().windows(2).map(|x| (x[1] - x[0]) as u32);

let values = Buffer::from_trusted_len_iter(values);
let values: Buffer<_> = Vec::from_trusted_len_iter(values).into();

let array = UInt32Array::from_data(DataType::UInt32, values, array.validity().cloned());
Arc::new(array)
10 changes: 4 additions & 6 deletions polars/polars-arrow/src/kernels/take.rs
@@ -1,3 +1,4 @@
use crate::trusted_len::{PushUnchecked, TrustedLen};
use crate::utils::with_match_primitive_type;
use crate::{bit_util::unset_bit_raw, prelude::*, utils::CustomIterTools};
use arrow::array::*;
@@ -99,7 +100,7 @@ pub unsafe fn take_no_null_primitive<T: NativeType>(
*array_values.get_unchecked(*idx as usize)
});

let values = Buffer::from_trusted_len_iter(iter);
let values: Buffer<_> = Vec::from_trusted_len_iter(iter).into();
let validity = indices.validity().cloned();
Arc::new(PrimitiveArray::from_data(
T::PRIMITIVE.into(),
@@ -114,10 +115,7 @@
/// - no bounds checks
/// - iterator must be TrustedLen
#[inline]
pub unsafe fn take_no_null_primitive_iter_unchecked<
T: NativeType,
I: IntoIterator<Item = usize>,
>(
pub unsafe fn take_no_null_primitive_iter_unchecked<T: NativeType, I: TrustedLen<Item = usize>>(
arr: &PrimitiveArray<T>,
indices: I,
) -> Arc<PrimitiveArray<T>> {
@@ -129,7 +127,7 @@ pub unsafe fn take_no_null_primitive_iter_unchecked<
*array_values.get_unchecked(idx)
});

let values = Buffer::from_trusted_len_iter_unchecked(iter);
let values: Buffer<_> = Vec::from_trusted_len_iter(iter).into();
Arc::new(PrimitiveArray::from_data(T::PRIMITIVE.into(), values, None))
}

8 changes: 4 additions & 4 deletions polars/polars-core/Cargo.toml
@@ -174,11 +174,11 @@ thiserror = "^1.0"

[dependencies.arrow]
package = "arrow2"
# git = "https://github.com/jorgecarleitao/arrow2"
git = "https://github.com/ritchie46/arrow2"
# rev = "aafba7b4eb4991e016638cbc1d4df676912e8236"
git = "https://github.com/jorgecarleitao/arrow2"
# git = "https://github.com/ritchie46/arrow2"
rev = "2fcc522b903f9a3d161daf97c8026fc9066736ee"
# path = "../../../arrow2"
branch = "improve_mutable"
# branch = "improve_mutable"
# version = "0.11"
default-features = false
features = [
5 changes: 2 additions & 3 deletions polars/polars-core/src/chunked_array/ops/explode.rs
@@ -239,7 +239,7 @@ impl ChunkExplode for ListChunked {
if offsets[offsets.len() - 1] == 0 {
return Ok((
Series::new_empty(self.name(), &self.inner_dtype()),
Buffer::from_slice(&[]),
vec![].into(),
));
}

@@ -318,8 +318,7 @@ impl ChunkExplode for Utf8Chunked {
.map(|t| t.0 as i64)
.chain(std::iter::once(str_data.len() as i64));

// char_indices is TrustedLen
let offsets = unsafe { Buffer::from_trusted_len_iter_unchecked(chars) };
let offsets = Buffer::from_iter(chars);

// the old bitmap doesn't fit on the exploded array, so we need to create a new one.
let validity = if let Some(validity) = array.validity() {
2 changes: 1 addition & 1 deletion polars/polars-core/src/error.rs
@@ -52,4 +52,4 @@ impl From<regex::Error> for PolarsError {
}

pub type Result<T> = std::result::Result<T, PolarsError>;
pub use arrow::error::ArrowError;
pub use arrow::error::Error as ArrowError;
4 changes: 2 additions & 2 deletions polars/polars-io/Cargo.toml
@@ -35,8 +35,8 @@ private = ["polars-time/private"]
[dependencies]
ahash = "0.7"
anyhow = "1.0"
# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "aafba7b4eb4991e016638cbc1d4df676912e8236", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "improve_mutable", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "2fcc522b903f9a3d161daf97c8026fc9066736ee", default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "improve_mutable", default-features = false }
# arrow = { package = "arrow2", version = "0.11", default-features = false }
# arrow = { package = "arrow2", path = "../../../arrow2", default-features = false }
csv-core = { version = "0.1.10", optional = true }
110 changes: 65 additions & 45 deletions polars/polars-io/src/parquet/write.rs
@@ -1,13 +1,19 @@
use super::ArrowResult;
use arrow::array::Array;
use arrow::chunk::Chunk;
use arrow::datatypes::DataType as ArrowDataType;
use arrow::datatypes::PhysicalType;
use arrow::error::ArrowError;
use arrow::error::Error as ArrowError;
use arrow::io::parquet::read::ParquetError;
use arrow::io::parquet::write::{self, FileWriter, *};
use arrow::io::parquet::write::{array_to_pages, DynIter, DynStreamingIterator, Encoding};
use arrow::io::parquet::write::{DynIter, DynStreamingIterator, Encoding};
use polars_core::prelude::*;
use rayon::prelude::*;
use std::collections::VecDeque;
use std::io::Write;

pub use write::{BrotliLevel, CompressionOptions as ParquetCompression, GzipLevel, ZstdLevel};

struct Bla {
columns: VecDeque<CompressedPage>,
current: Option<CompressedPage>,
@@ -48,8 +54,6 @@ pub struct ParquetWriter<W> {
statistics: bool,
}

pub use write::CompressionOptions as ParquetCompression;

impl<W> ParquetWriter<W>
where
W: Write,
@@ -90,63 +94,79 @@ where
};
let schema = ArrowSchema::from(fields);
let parquet_schema = write::to_parquet_schema(&schema)?;
let encodings = schema
.fields
.iter()
.map(|field| match field.data_type().to_physical_type() {
// delta encoding
// Not yet supported by pyarrow
// PhysicalType::LargeUtf8 => Encoding::DeltaLengthByteArray,
// dictionaries are kept dict-encoded
// declare encodings
let encoding_map = |data_type: &ArrowDataType| {
match data_type.to_physical_type() {
PhysicalType::Dictionary(_) => Encoding::RleDictionary,
// remaining is plain
_ => Encoding::Plain,
})
}
};

let encodings = (&schema.fields)
.iter()
.map(|f| transverse(&f.data_type, encoding_map))
.collect::<Vec<_>>();

let row_group_iter = rb_iter.filter_map(|batch| match batch.len() {
0 => None,
_ => {
let columns = batch
.columns()
.par_iter()
.zip(parquet_schema.fields().par_iter())
.zip(encodings.par_iter())
.map(|((array, tp), encoding)| {
let encoded_pages =
array_to_pages(array.as_ref(), tp.clone(), options, *encoding)?;
encoded_pages
.map(|page| {
compress(page?, vec![], options.compression).map_err(|x| x.into())
})
.collect::<ArrowResult<VecDeque<_>>>()
})
.collect::<ArrowResult<Vec<VecDeque<CompressedPage>>>>()
.map(Some)
.transpose();

columns.map(|columns| {
columns.map(|columns| {
let row_group = DynIter::new(
columns
.into_iter()
.map(|column| Ok(DynStreamingIterator::new(Bla::new(column)))),
);
(row_group, batch.columns()[0].len())
})
})
let row_group = create_serializer(
batch,
parquet_schema.fields().to_vec(),
encodings.clone(),
options,
);

Some(row_group)
}
});

let mut writer = FileWriter::try_new(&mut self.writer, schema, options)?;
// write the headers
writer.start()?;
for group in row_group_iter {
let (group, _len) = group?;
writer.write(group)?;
writer.write(group?)?;
}
let _ = writer.end(None)?;

Ok(())
}
}

fn create_serializer(
batch: Chunk<Arc<dyn Array>>,
fields: Vec<ParquetType>,
encodings: Vec<Vec<Encoding>>,
options: WriteOptions,
) -> std::result::Result<RowGroupIter<'static, ArrowError>, ArrowError> {
let columns = batch
.columns()
.par_iter()
.zip(fields)
.zip(encodings)
.flat_map(move |((array, type_), encoding)| {
let encoded_columns = array_to_columns(array, type_, options, encoding).unwrap();
encoded_columns
.into_iter()
.map(|encoded_pages| {
let encoded_pages = DynIter::new(
encoded_pages
.into_iter()
.map(|x| x.map_err(|e| ParquetError::General(e.to_string()))),
);
encoded_pages
.map(|page| {
compress(page?, vec![], options.compression).map_err(|x| x.into())
})
.collect::<ArrowResult<VecDeque<_>>>()
})
.collect::<Vec<_>>()
})
.collect::<ArrowResult<Vec<VecDeque<CompressedPage>>>>()?;

let row_group = DynIter::new(
columns
.into_iter()
.map(|column| Ok(DynStreamingIterator::new(Bla::new(column)))),
);
Ok(row_group)
}
10 changes: 5 additions & 5 deletions py-polars/Cargo.lock

Some generated files are not rendered by default.

23 changes: 21 additions & 2 deletions py-polars/polars/internals/frame.py
@@ -1351,6 +1351,7 @@ def transpose(
def write_parquet(
self,
file: Union[str, Path, BytesIO],
*,
compression: Optional[
Union[
Literal[
@@ -1359,6 +1360,7 @@ def write_parquet(
str,
]
] = "lz4",
compression_level: Optional[int] = None,
statistics: bool = False,
use_pyarrow: bool = False,
**kwargs: Any,
@@ -1379,6 +1381,17 @@ def write_parquet(
- "brotli"
- "lz4"
- "zstd"
compression_level
Supported by {'gzip', 'brotli', 'zstd'}
- "gzip"
* min-level: 0
* max-level: 10
- "brotli"
* min-level: 0
* max-level: 11
- "zstd"
* min-level: 1
* max-level: 22
statistics
Write statistics to the parquet headers. This requires extra compute.
use_pyarrow
@@ -1420,7 +1433,7 @@ def write_parquet(
**kwargs,
)
else:
self._df.to_parquet(file, compression, statistics)
self._df.to_parquet(file, compression, compression_level, statistics)

def to_parquet(
self,
@@ -1444,7 +1457,13 @@ def to_parquet(
warnings.warn(
"'to_parquet' is deprecated. please use 'write_parquet'", DeprecationWarning
)
return self.write_parquet(file, compression, statistics, use_pyarrow, **kwargs)
return self.write_parquet(
file,
compression=compression,
statistics=statistics,
use_pyarrow=use_pyarrow,
**kwargs,
)

def to_numpy(self) -> np.ndarray:
"""
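As a quick illustrative sketch of the updated Python API (not part of the commit; the sample frame and output path are assumptions), the new options are passed as keywords:

import polars as pl

df = pl.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})

# compression settings are keyword-only after this change; per the docstring
# above, zstd accepts compression levels 1 through 22.
df.write_parquet("out.parquet", compression="zstd", compression_level=10)

The deprecated to_parquet wrapper still works but emits a DeprecationWarning and forwards to write_parquet; note that it does not forward compression_level.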