update arrow (#3181)
ritchie46 committed Apr 26, 2022
1 parent 929ec8c commit 7ddff81
Showing 10 changed files with 78 additions and 124 deletions.
4 changes: 2 additions & 2 deletions polars/polars-arrow/Cargo.toml
@@ -9,8 +9,8 @@ description = "Arrow interfaces for Polars DataFrame library"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "36b08249dd03c8b8f88f454158fcf3401c647a49", default-features = false }
-arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
+arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "e478619ef1f07c472b4e5daa35bfa08dbd87591d", default-features = false }
+# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
 # arrow = { package = "arrow2", version = "0.10", default-features = false, features = ["compute_concatenate"] }
 hashbrown = "0.12"
 num = "^0.4"
8 changes: 4 additions & 4 deletions polars/polars-core/Cargo.toml
@@ -169,10 +169,10 @@ thiserror = "^1.0"
 
 [dependencies.arrow]
 package = "arrow2"
-# git = "https://github.com/jorgecarleitao/arrow2"
-git = "https://github.com/ritchie46/arrow2"
-# rev = "36b08249dd03c8b8f88f454158fcf3401c647a49"
-branch = "polars"
+git = "https://github.com/jorgecarleitao/arrow2"
+# git = "https://github.com/ritchie46/arrow2"
+rev = "e478619ef1f07c472b4e5daa35bfa08dbd87591d"
+# branch = "polars"
 version = "0.10"
 default-features = false
 features = [
3 changes: 3 additions & 0 deletions polars/polars-core/src/chunked_array/object/mod.rs
@@ -143,6 +143,9 @@ where
         arr.null_bitmap = validity;
         Box::new(arr)
     }
+    fn to_boxed(&self) -> Box<dyn Array> {
+        Box::new(self.clone())
+    }
 }
 
 impl<T> ObjectChunked<T>
4 changes: 2 additions & 2 deletions polars/polars-io/Cargo.toml
@@ -34,8 +34,8 @@ private = ["polars-time/private"]
 [dependencies]
 ahash = "0.7"
 anyhow = "1.0"
-# arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "36b08249dd03c8b8f88f454158fcf3401c647a49", default-features = false }
-arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
+arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "e478619ef1f07c472b4e5daa35bfa08dbd87591d", default-features = false }
+# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "polars", default-features = false }
 # arrow = { package = "arrow2", version = "0.10", default-features = false }
 csv-core = { version = "0.1.10", optional = true }
 dirs = "4.0"
103 changes: 30 additions & 73 deletions polars/polars-io/src/parquet/predicates.rs
@@ -1,8 +1,8 @@
 use crate::ArrowResult;
-use arrow::io::parquet::read::statistics::{
-    deserialize_statistics, PrimitiveStatistics, Statistics, Utf8Statistics,
-};
-use arrow::io::parquet::read::ColumnChunkMetaData;
+use arrow::array::{Array, ArrayRef};
+use arrow::compute::concatenate::concatenate;
+use arrow::io::parquet::read::statistics::{self, deserialize, Statistics};
+use arrow::io::parquet::read::RowGroupMetaData;
 use polars_core::prelude::*;
 
 /// The statistics for a column in a Parquet file
@@ -11,79 +11,37 @@ use polars_core::prelude::*;
 /// - min value
 /// - null_count
 #[cfg_attr(debug_assertions, derive(Debug))]
-pub struct ColumnStats(Box<dyn Statistics>);
+pub struct ColumnStats(Statistics, Field);
 
 impl ColumnStats {
     pub fn dtype(&self) -> DataType {
-        self.0.data_type().into()
+        self.1.data_type().clone()
     }
 
     pub fn null_count(&self) -> Option<usize> {
-        self.0.null_count().map(|v| v as usize)
+        match &self.0.null_count {
+            statistics::Count::Single(arr) => {
+                if arr.is_valid(0) {
+                    Some(arr.value(0) as usize)
+                } else {
+                    None
+                }
+            }
+            _ => None,
+        }
     }
 
     pub fn to_min_max(&self) -> Option<Series> {
-        let name = "";
-        use DataType::*;
-        let s = match self.dtype() {
-            Float64 => {
-                let stats = self
-                    .0
-                    .as_any()
-                    .downcast_ref::<PrimitiveStatistics<f64>>()
-                    .unwrap();
-                Series::new(name, [stats.min_value, stats.max_value])
-            }
-            Float32 => {
-                let stats = self
-                    .0
-                    .as_any()
-                    .downcast_ref::<PrimitiveStatistics<f32>>()
-                    .unwrap();
-                Series::new(name, [stats.min_value, stats.max_value])
-            }
-            Int64 => {
-                let stats = self
-                    .0
-                    .as_any()
-                    .downcast_ref::<PrimitiveStatistics<i64>>()
-                    .unwrap();
-                Series::new(name, [stats.min_value, stats.max_value])
-            }
-            Int32 => {
-                let stats = self
-                    .0
-                    .as_any()
-                    .downcast_ref::<PrimitiveStatistics<i32>>()
-                    .unwrap();
-                Series::new(name, [stats.min_value, stats.max_value])
-            }
-            UInt32 => {
-                let stats = self
-                    .0
-                    .as_any()
-                    .downcast_ref::<PrimitiveStatistics<u32>>()
-                    .unwrap();
-                Series::new(name, [stats.min_value, stats.max_value])
-            }
-            UInt64 => {
-                let stats = self
-                    .0
-                    .as_any()
-                    .downcast_ref::<PrimitiveStatistics<u64>>()
-                    .unwrap();
-                Series::new(name, [stats.min_value, stats.max_value])
-            }
-            Utf8 => {
-                let stats = self.0.as_any().downcast_ref::<Utf8Statistics>().unwrap();
-                Series::new(
-                    name,
-                    [stats.min_value.as_deref(), stats.max_value.as_deref()],
-                )
-            }
-            _ => return None,
-        };
-        Some(s)
+        let max_val = &*self.0.max_value;
+        let min_val = &*self.0.min_value;
+
+        let dtype = DataType::from(min_val.data_type());
+        if dtype.is_numeric() || matches!(dtype, DataType::Utf8) {
+            let arr = concatenate(&[min_val, max_val]).unwrap();
+            Some(Series::try_from(("", Arc::from(arr) as ArrayRef)).unwrap())
+        } else {
+            None
+        }
     }
 }
 
@@ -105,17 +63,16 @@ impl BatchStats {
 
 /// Collect the statistics in a column chunk.
 pub(crate) fn collect_statistics(
-    md: &[ColumnChunkMetaData],
+    md: &[RowGroupMetaData],
     arrow_schema: &ArrowSchema,
 ) -> ArrowResult<Option<BatchStats>> {
     let mut schema = Schema::with_capacity(arrow_schema.fields.len());
     let mut stats = vec![];
 
     for fld in &arrow_schema.fields {
-        for st in deserialize_statistics(fld, md)?.into_iter().flatten() {
-            schema.with_column(fld.name.to_string(), (&fld.data_type).into());
-            stats.push(ColumnStats(st));
-        }
+        let st = deserialize(fld, md)?;
+        schema.with_column(fld.name.to_string(), (&fld.data_type).into());
+        stats.push(ColumnStats(st, Field::from(fld)));
     }
 
     Ok(if stats.is_empty() {
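
For context, the downcast ladder above disappears because arrow2's `deserialize` now returns a single `Statistics` struct whose `min_value` and `max_value` are dynamically typed arrays, so one generic path covers every supported dtype. A minimal sketch of the new pattern, assuming the arrow2 revision pinned above and the workspace's `arrow` crate alias (the helper name is illustrative, not part of this commit):

    use arrow::array::ArrayRef;
    use arrow::compute::concatenate::concatenate;
    use arrow::io::parquet::read::statistics::Statistics;
    use polars_core::prelude::*;
    use std::sync::Arc;

    // Build a two-row Series [min, max] from deserialized row-group statistics.
    fn min_max_series(stats: &Statistics) -> Option<Series> {
        let min = &*stats.min_value;
        let max = &*stats.max_value;
        // Each side is a single-value array, so concatenating yields [min, max].
        let arr = concatenate(&[min, max]).ok()?;
        Series::try_from(("", Arc::from(arr) as ArrayRef)).ok()
    }
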
2 changes: 1 addition & 1 deletion polars/polars-io/src/parquet/read_impl.rs
@@ -62,7 +62,7 @@ pub fn read_parquet<R: MmapBytesReader>(
     let current_row_count = md.num_rows() as u32;
     if let Some(pred) = &predicate {
         if let Some(pred) = pred.as_stats_evaluator() {
-            if let Some(stats) = collect_statistics(md.columns(), schema)? {
+            if let Some(stats) = collect_statistics(&file_metadata.row_groups, schema)? {
                 let should_read = pred.should_read(&stats);
                 // a parquet file may not have statistics of all columns
                 if matches!(should_read, Ok(false)) {
22 changes: 13 additions & 9 deletions polars/polars-io/src/parquet/write.rs
@@ -44,11 +44,11 @@ impl FallibleStreamingIterator for Bla {
 #[must_use]
 pub struct ParquetWriter<W> {
     writer: W,
-    compression: write::Compression,
+    compression: write::CompressionOptions,
     statistics: bool,
 }
 
-pub use write::Compression as ParquetCompression;
+pub use write::CompressionOptions as ParquetCompression;
 
 impl<W> ParquetWriter<W>
 where
@@ -61,13 +61,13 @@
     {
        ParquetWriter {
            writer,
-            compression: write::Compression::Snappy,
+            compression: write::CompressionOptions::Lz4Raw,
            statistics: false,
        }
    }
 
-    /// Set the compression used. Defaults to `Snappy`.
-    pub fn with_compression(mut self, compression: write::Compression) -> Self {
+    /// Set the compression used. Defaults to `Lz4Raw`.
+    pub fn with_compression(mut self, compression: write::CompressionOptions) -> Self {
        self.compression = compression;
        self
    }
@@ -111,8 +111,12 @@
             .zip(parquet_schema.columns().par_iter())
             .zip(encodings.par_iter())
             .map(|((array, descriptor), encoding)| {
-                let encoded_pages =
-                    array_to_pages(array.as_ref(), descriptor.clone(), options, *encoding)?;
+                let encoded_pages = array_to_pages(
+                    array.as_ref(),
+                    descriptor.descriptor.clone(),
+                    options,
+                    *encoding,
+                )?;
                 encoded_pages
                     .map(|page| {
                         compress(page?, vec![], options.compression).map_err(|x| x.into())
@@ -133,8 +137,8 @@
         // write the headers
         writer.start()?;
         for group in row_group_iter {
-            let (group, len) = group?;
-            writer.write(group, len)?;
+            let (group, _len) = group?;
+            writer.write(group)?;
         }
         let _ = writer.end(None)?;
 
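
A hedged usage sketch of the new options type from the caller's side (builder names as in this file; `finish` taking `&DataFrame` matches the polars API of this period, and the `parquet` feature is assumed):

    use polars::prelude::*;
    use std::fs::File;

    // Write a frame with an explicit codec: the default is now Lz4Raw rather
    // than Snappy, and Zstd carries an optional level (None = codec default).
    fn write_df(df: &DataFrame) -> Result<()> {
        let file = File::create("example.parquet")?;
        ParquetWriter::new(file)
            .with_compression(ParquetCompression::Zstd(None))
            .finish(df)?;
        Ok(())
    }
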
13 changes: 10 additions & 3 deletions polars/polars-lazy/src/physical_plan/expressions/binary.rs
@@ -419,9 +419,16 @@ mod stats {
         let fld_l = self.left.to_field(schema)?;
         let fld_r = self.right.to_field(schema)?;
 
-        debug_assert_eq!(fld_l.data_type(), fld_r.data_type(), "implementation error");
-        if fld_l.data_type() != fld_r.data_type() {
-            return Ok(true);
+        #[cfg(debug_assertions)]
+        {
+            match (fld_l.data_type(), fld_r.data_type()) {
+                #[cfg(feature = "dtype-categorical")]
+                (DataType::Utf8, DataType::Categorical(_)) => {}
+                #[cfg(feature = "dtype-categorical")]
+                (DataType::Categorical(_), DataType::Utf8) => {}
+                (l, r) if l != r => panic!("implementation error: {:?}, {:?}", l, r),
+                _ => {}
+            }
         }
 
         let dummy = DataFrame::new_no_checks(vec![]);
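
The relaxed debug check reflects that a pushed-down predicate may legitimately compare a Utf8 literal against a Categorical column. A hypothetical query that exercises this path (assumes the `lazy` and `dtype-categorical` features; column name is illustrative):

    use polars::prelude::*;

    // `col("city")` may resolve to a Categorical field while `lit("Amsterdam")`
    // is Utf8; the stats evaluator now tolerates that pairing instead of
    // panicking in debug builds.
    fn filter_city(lf: LazyFrame) -> LazyFrame {
        lf.filter(col("city").eq(lit("Amsterdam")))
    }
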
39 changes: 11 additions & 28 deletions py-polars/Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions py-polars/src/dataframe.rs
@@ -529,8 +529,8 @@ impl PyDataFrame {
             "gzip" => ParquetCompression::Gzip,
             "lzo" => ParquetCompression::Lzo,
             "brotli" => ParquetCompression::Brotli,
-            "lz4" => ParquetCompression::Lz4,
-            "zstd" => ParquetCompression::Zstd,
+            "lz4" => ParquetCompression::Lz4Raw,
+            "zstd" => ParquetCompression::Zstd(None),
             s => return Err(PyPolarsErr::Other(format!("compression {} not supported", s)).into()),
         };
 
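
Downstream code mapping user-facing compression names needs the same two adjustments; a sketch mirroring the arms above (the `snappy` arm and the `polars::prelude` re-export path are assumptions, the remaining variants are taken from the diff):

    use polars::prelude::ParquetCompression;

    // "lz4" now selects the raw LZ4 block format and "zstd" takes an optional
    // compression level, where None means the codec default.
    fn parse_compression(name: &str) -> Option<ParquetCompression> {
        Some(match name {
            "snappy" => ParquetCompression::Snappy, // assumed; not shown in the hunk
            "gzip" => ParquetCompression::Gzip,
            "lzo" => ParquetCompression::Lzo,
            "brotli" => ParquetCompression::Brotli,
            "lz4" => ParquetCompression::Lz4Raw,
            "zstd" => ParquetCompression::Zstd(None),
            _ => return None,
        })
    }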
