Skip to content

Commit

Permalink
fix: accidental panic if predicate selects no files (#12575)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Nov 20, 2023
1 parent 4d2027b commit 402e280
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 71 deletions.
36 changes: 18 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 18 additions & 18 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ default-members = [
# ]

[workspace.package]
version = "0.35.2"
version = "0.35.4"
authors = ["Ritchie Vink <ritchie46@gmail.com>"]
edition = "2021"
homepage = "https://www.pola.rs/"
Expand Down Expand Up @@ -78,26 +78,26 @@ version_check = "0.9.4"
xxhash-rust = { version = "0.8.6", features = ["xxh3"] }
zstd = "0.13"

polars = { version = "0.35.2", path = "crates/polars", default-features = false }
polars-algo = { version = "0.35.2", path = "crates/polars-algo", default-features = false }
polars-core = { version = "0.35.2", path = "crates/polars-core", default-features = false }
polars-error = { version = "0.35.2", path = "crates/polars-error", default-features = false }
polars-ffi = { version = "0.35.2", path = "crates/polars-ffi", default-features = false }
polars-io = { version = "0.35.2", path = "crates/polars-io", default-features = false }
polars-json = { version = "0.35.2", path = "crates/polars-json", default-features = false }
polars-lazy = { version = "0.35.2", path = "crates/polars-lazy", default-features = false }
polars-ops = { version = "0.35.2", path = "crates/polars-ops", default-features = false }
polars-parquet = { version = "0.35.2", path = "crates/polars-parquet", default-features = false }
polars-pipe = { version = "0.35.2", path = "crates/polars-pipe", default-features = false }
polars-plan = { version = "0.35.2", path = "crates/polars-plan", default-features = false }
polars-row = { version = "0.35.2", path = "crates/polars-row", default-features = false }
polars-sql = { version = "0.35.2", path = "crates/polars-sql", default-features = false }
polars-time = { version = "0.35.2", path = "crates/polars-time", default-features = false }
polars-utils = { version = "0.35.2", path = "crates/polars-utils", default-features = false }
polars = { version = "0.35.4", path = "crates/polars", default-features = false }
polars-algo = { version = "0.35.4", path = "crates/polars-algo", default-features = false }
polars-core = { version = "0.35.4", path = "crates/polars-core", default-features = false }
polars-error = { version = "0.35.4", path = "crates/polars-error", default-features = false }
polars-ffi = { version = "0.35.4", path = "crates/polars-ffi", default-features = false }
polars-io = { version = "0.35.4", path = "crates/polars-io", default-features = false }
polars-json = { version = "0.35.4", path = "crates/polars-json", default-features = false }
polars-lazy = { version = "0.35.4", path = "crates/polars-lazy", default-features = false }
polars-ops = { version = "0.35.4", path = "crates/polars-ops", default-features = false }
polars-parquet = { version = "0.35.4", path = "crates/polars-parquet", default-features = false }
polars-pipe = { version = "0.35.4", path = "crates/polars-pipe", default-features = false }
polars-plan = { version = "0.35.4", path = "crates/polars-plan", default-features = false }
polars-row = { version = "0.35.4", path = "crates/polars-row", default-features = false }
polars-sql = { version = "0.35.4", path = "crates/polars-sql", default-features = false }
polars-time = { version = "0.35.4", path = "crates/polars-time", default-features = false }
polars-utils = { version = "0.35.4", path = "crates/polars-utils", default-features = false }

[workspace.dependencies.arrow]
package = "polars-arrow"
version = "0.35.2"
version = "0.35.4"
path = "crates/polars-arrow"
default-features = false
features = [
Expand Down
28 changes: 28 additions & 0 deletions crates/polars-io/src/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,40 @@ mod read;
mod read_impl;
mod write;

use std::borrow::Cow;

pub use polars_parquet::write::FileMetaData;
pub use read::*;
pub use write::{BrotliLevel, GzipLevel, ZstdLevel, *};

use crate::parquet::read_impl::materialize_hive_partitions;
use crate::utils::apply_projection;

pub type FileMetaDataRef = Arc<FileMetaData>;

/// Build an empty `DataFrame` with the schema a parquet scan would have
/// produced: the (optionally projected) reader schema, an empty row-count
/// column when requested, and any hive-partition columns.
///
/// Used so that a scan selecting zero files/row-groups returns a typed empty
/// frame instead of panicking.
pub fn materialize_empty_df(
    projection: Option<&[usize]>,
    reader_schema: &ArrowSchema,
    hive_partition_columns: Option<&[Series]>,
    row_count: Option<&RowCount>,
) -> DataFrame {
    // Borrow the schema when no projection is needed; own a projected copy otherwise.
    let schema = match projection {
        Some(cols) => Cow::Owned(apply_projection(reader_schema, cols)),
        None => Cow::Borrowed(reader_schema),
    };
    let mut out = DataFrame::from(schema.as_ref());

    // The row-count column always goes first, mirroring a real scan's output.
    if let Some(rc) = row_count {
        out.insert_column(0, Series::new_empty(&rc.name, &IDX_DTYPE))
            .unwrap();
    }

    // Append (empty) hive-partition columns so the schema matches non-empty reads.
    materialize_hive_partitions(&mut out, hive_partition_columns, 0);

    out
}

use super::*;

#[cfg(test)]
Expand Down
5 changes: 1 addition & 4 deletions crates/polars-io/src/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,14 @@ use polars_parquet::write::FileMetaData;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

use super::read_impl::FetchRowGroupsFromMmapReader;
use super::read_impl::{read_parquet, FetchRowGroupsFromMmapReader};
#[cfg(feature = "cloud")]
use crate::cloud::CloudOptions;
use crate::mmap::MmapBytesReader;
#[cfg(feature = "cloud")]
use crate::parquet::async_impl::FetchRowGroupsFromObjectStore;
#[cfg(feature = "cloud")]
use crate::parquet::async_impl::ParquetObjectStore;
#[cfg(feature = "cloud")]
use crate::parquet::read_impl::materialize_empty_df;
use crate::parquet::read_impl::read_parquet;
pub use crate::parquet::read_impl::BatchedParquetReader;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
Expand Down
26 changes: 2 additions & 24 deletions crates/polars-io/src/parquet/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use polars_parquet::read;
use polars_parquet::read::{ArrayIter, FileMetaData, RowGroupMetaData};
use rayon::prelude::*;

use super::materialize_empty_df;
use super::mmap::ColumnStore;
use crate::mmap::{MmapBytesReader, ReaderBytes};
#[cfg(feature = "cloud")]
Expand All @@ -21,7 +22,7 @@ use crate::parquet::mmap::mmap_columns;
use crate::parquet::predicates::read_this_row_group;
use crate::parquet::{mmap, FileMetaDataRef, ParallelStrategy};
use crate::predicates::{apply_predicate, PhysicalIoExpr};
use crate::utils::{apply_projection, get_reader_bytes};
use crate::utils::get_reader_bytes;
use crate::RowCount;

fn enlarge_data_type(mut data_type: ArrowDataType) -> ArrowDataType {
Expand Down Expand Up @@ -328,29 +329,6 @@ fn rg_to_dfs_par_over_rg(
Ok(dfs.into_iter().flatten().collect())
}

/// Build an empty `DataFrame` whose schema matches what a parquet read would
/// have produced: the (optionally projected) reader schema, plus an empty
/// row-count column and any hive-partition columns.
///
/// NOTE(review): this is the pre-move, crate-private copy (the diff relocates
/// it to `parquet/mod.rs` as `pub` so scan executors can call it when the
/// path list is empty).
pub(super) fn materialize_empty_df(
    projection: Option<&[usize]>,
    reader_schema: &ArrowSchema,
    hive_partition_columns: Option<&[Series]>,
    row_count: Option<&RowCount>,
) -> DataFrame {
    // Project the schema only when a projection is supplied; otherwise borrow.
    let schema = if let Some(projection) = projection {
        Cow::Owned(apply_projection(reader_schema, projection))
    } else {
        Cow::Borrowed(reader_schema)
    };
    let mut df = DataFrame::from(schema.as_ref());

    // Row-count column is inserted first, matching a real scan's column order.
    if let Some(row_count) = row_count {
        df.insert_column(0, Series::new_empty(&row_count.name, &IDX_DTYPE))
            .unwrap();
    }

    // Append (empty) hive-partition columns so the schema matches non-empty reads.
    materialize_hive_partitions(&mut df, hive_partition_columns, 0);

    df
}

#[allow(clippy::too_many_arguments)]
pub fn read_parquet<R: MmapBytesReader>(
mut reader: R,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/executors/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ impl IpcExec {
fn read(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
let file = std::fs::File::open(&self.path)?;
let (projection, predicate) = prepare_scan_args(
&self.predicate,
self.predicate.clone(),
&mut self.file_options.with_columns,
&mut self.schema,
self.file_options.row_count.is_some(),
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/physical_plan/executors/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Predicate = Option<Arc<dyn PhysicalIoExpr>>;

#[cfg(any(feature = "ipc", feature = "parquet"))]
fn prepare_scan_args(
predicate: &Option<Arc<dyn PhysicalExpr>>,
predicate: Option<Arc<dyn PhysicalExpr>>,
with_columns: &mut Option<Arc<Vec<String>>>,
schema: &mut SchemaRef,
has_row_count: bool,
Expand All @@ -53,7 +53,7 @@ fn prepare_scan_args(
has_row_count,
);

let predicate = predicate.clone().map(phys_expr_to_io_expr);
let predicate = predicate.map(phys_expr_to_io_expr);

(projection, predicate)
}
Expand Down
28 changes: 25 additions & 3 deletions crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl ParquetExec {

let file = std::fs::File::open(path)?;
let (projection, predicate) = prepare_scan_args(
&self.predicate,
self.predicate.clone(),
&mut self.file_options.with_columns.clone(),
&mut self.file_info.schema.clone(),
base_row_count.is_some(),
Expand Down Expand Up @@ -275,7 +275,7 @@ impl ParquetExec {
.map(|hive| hive.materialize_partition_columns());

let (projection, predicate) = prepare_scan_args(
predicate,
predicate.clone(),
&mut file_options.with_columns.clone(),
&mut file_info.schema.clone(),
row_count.is_some(),
Expand Down Expand Up @@ -312,7 +312,29 @@ impl ParquetExec {
}

fn read(&mut self) -> PolarsResult<DataFrame> {
let is_cloud = is_cloud_url(self.paths[0].as_path());
let is_cloud = match self.paths.first() {
Some(p) => is_cloud_url(p.as_path()),
None => {
let hive_partitions = self
.file_info
.hive_parts
.as_ref()
.map(|hive| hive.materialize_partition_columns());
let (projection, _) = prepare_scan_args(
None,
&mut self.file_options.with_columns,
&mut self.file_info.schema,
self.file_options.row_count.is_some(),
hive_partitions.as_deref(),
);
return Ok(materialize_empty_df(
projection.as_deref(),
self.file_info.reader_schema.as_ref().unwrap(),
hive_partitions.as_deref(),
self.file_options.row_count.as_ref(),
));
},
};
let force_async = std::env::var("POLARS_FORCE_ASYNC").as_deref().unwrap_or("") == "1";

let out = if is_cloud || force_async {
Expand Down
Loading

0 comments on commit 402e280

Please sign in to comment.