Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: accidental panic if predicate selects no files #12575

Merged
merged 3 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 18 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 18 additions & 18 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ default-members = [
# ]

[workspace.package]
version = "0.35.2"
version = "0.35.4"
authors = ["Ritchie Vink <ritchie46@gmail.com>"]
edition = "2021"
homepage = "https://www.pola.rs/"
Expand Down Expand Up @@ -78,26 +78,26 @@ version_check = "0.9.4"
xxhash-rust = { version = "0.8.6", features = ["xxh3"] }
zstd = "0.13"

polars = { version = "0.35.2", path = "crates/polars", default-features = false }
polars-algo = { version = "0.35.2", path = "crates/polars-algo", default-features = false }
polars-core = { version = "0.35.2", path = "crates/polars-core", default-features = false }
polars-error = { version = "0.35.2", path = "crates/polars-error", default-features = false }
polars-ffi = { version = "0.35.2", path = "crates/polars-ffi", default-features = false }
polars-io = { version = "0.35.2", path = "crates/polars-io", default-features = false }
polars-json = { version = "0.35.2", path = "crates/polars-json", default-features = false }
polars-lazy = { version = "0.35.2", path = "crates/polars-lazy", default-features = false }
polars-ops = { version = "0.35.2", path = "crates/polars-ops", default-features = false }
polars-parquet = { version = "0.35.2", path = "crates/polars-parquet", default-features = false }
polars-pipe = { version = "0.35.2", path = "crates/polars-pipe", default-features = false }
polars-plan = { version = "0.35.2", path = "crates/polars-plan", default-features = false }
polars-row = { version = "0.35.2", path = "crates/polars-row", default-features = false }
polars-sql = { version = "0.35.2", path = "crates/polars-sql", default-features = false }
polars-time = { version = "0.35.2", path = "crates/polars-time", default-features = false }
polars-utils = { version = "0.35.2", path = "crates/polars-utils", default-features = false }
polars = { version = "0.35.4", path = "crates/polars", default-features = false }
polars-algo = { version = "0.35.4", path = "crates/polars-algo", default-features = false }
polars-core = { version = "0.35.4", path = "crates/polars-core", default-features = false }
polars-error = { version = "0.35.4", path = "crates/polars-error", default-features = false }
polars-ffi = { version = "0.35.4", path = "crates/polars-ffi", default-features = false }
polars-io = { version = "0.35.4", path = "crates/polars-io", default-features = false }
polars-json = { version = "0.35.4", path = "crates/polars-json", default-features = false }
polars-lazy = { version = "0.35.4", path = "crates/polars-lazy", default-features = false }
polars-ops = { version = "0.35.4", path = "crates/polars-ops", default-features = false }
polars-parquet = { version = "0.35.4", path = "crates/polars-parquet", default-features = false }
polars-pipe = { version = "0.35.4", path = "crates/polars-pipe", default-features = false }
polars-plan = { version = "0.35.4", path = "crates/polars-plan", default-features = false }
polars-row = { version = "0.35.4", path = "crates/polars-row", default-features = false }
polars-sql = { version = "0.35.4", path = "crates/polars-sql", default-features = false }
polars-time = { version = "0.35.4", path = "crates/polars-time", default-features = false }
polars-utils = { version = "0.35.4", path = "crates/polars-utils", default-features = false }

[workspace.dependencies.arrow]
package = "polars-arrow"
version = "0.35.2"
version = "0.35.4"
path = "crates/polars-arrow"
default-features = false
features = [
Expand Down
28 changes: 28 additions & 0 deletions crates/polars-io/src/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,40 @@ mod read;
mod read_impl;
mod write;

use std::borrow::Cow;

pub use polars_parquet::write::FileMetaData;
pub use read::*;
pub use write::{BrotliLevel, GzipLevel, ZstdLevel, *};

use crate::parquet::read_impl::materialize_hive_partitions;
use crate::utils::apply_projection;

pub type FileMetaDataRef = Arc<FileMetaData>;

/// Build an empty `DataFrame` whose layout matches what a real parquet read
/// would have produced: optional projection applied to the reader schema, an
/// optional (empty) row-count column at position 0, and hive partition
/// columns materialized with zero rows.
pub fn materialize_empty_df(
    projection: Option<&[usize]>,
    reader_schema: &ArrowSchema,
    hive_partition_columns: Option<&[Series]>,
    row_count: Option<&RowCount>,
) -> DataFrame {
    // Resolve the effective schema: narrow to the projected columns when a
    // projection was given, otherwise borrow the reader schema as-is.
    let schema = match projection {
        Some(proj) => Cow::Owned(apply_projection(reader_schema, proj)),
        None => Cow::Borrowed(reader_schema),
    };
    let mut df = DataFrame::from(schema.as_ref());

    // Prepend an empty row-count column when requested; inserting at index 0
    // of a freshly-built empty frame is infallible here.
    if let Some(rc) = row_count {
        df.insert_column(0, Series::new_empty(&rc.name, &IDX_DTYPE))
            .unwrap();
    }

    // Zero rows (`0`) of the hive partition columns, keeping column order
    // consistent with a non-empty read.
    materialize_hive_partitions(&mut df, hive_partition_columns, 0);

    df
}

use super::*;

#[cfg(test)]
Expand Down
5 changes: 1 addition & 4 deletions crates/polars-io/src/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,14 @@ use polars_parquet::write::FileMetaData;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

use super::read_impl::FetchRowGroupsFromMmapReader;
use super::read_impl::{read_parquet, FetchRowGroupsFromMmapReader};
#[cfg(feature = "cloud")]
use crate::cloud::CloudOptions;
use crate::mmap::MmapBytesReader;
#[cfg(feature = "cloud")]
use crate::parquet::async_impl::FetchRowGroupsFromObjectStore;
#[cfg(feature = "cloud")]
use crate::parquet::async_impl::ParquetObjectStore;
#[cfg(feature = "cloud")]
use crate::parquet::read_impl::materialize_empty_df;
use crate::parquet::read_impl::read_parquet;
pub use crate::parquet::read_impl::BatchedParquetReader;
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
Expand Down
26 changes: 2 additions & 24 deletions crates/polars-io/src/parquet/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use polars_parquet::read;
use polars_parquet::read::{ArrayIter, FileMetaData, RowGroupMetaData};
use rayon::prelude::*;

use super::materialize_empty_df;
use super::mmap::ColumnStore;
use crate::mmap::{MmapBytesReader, ReaderBytes};
#[cfg(feature = "cloud")]
Expand All @@ -21,7 +22,7 @@ use crate::parquet::mmap::mmap_columns;
use crate::parquet::predicates::read_this_row_group;
use crate::parquet::{mmap, FileMetaDataRef, ParallelStrategy};
use crate::predicates::{apply_predicate, PhysicalIoExpr};
use crate::utils::{apply_projection, get_reader_bytes};
use crate::utils::get_reader_bytes;
use crate::RowCount;

fn enlarge_data_type(mut data_type: ArrowDataType) -> ArrowDataType {
Expand Down Expand Up @@ -328,29 +329,6 @@ fn rg_to_dfs_par_over_rg(
Ok(dfs.into_iter().flatten().collect())
}

/// Build an empty `DataFrame` shaped like a real parquet read result:
/// the (optionally projected) reader schema, an optional empty row-count
/// column at index 0, and zero-row hive partition columns.
pub(super) fn materialize_empty_df(
projection: Option<&[usize]>,
reader_schema: &ArrowSchema,
hive_partition_columns: Option<&[Series]>,
row_count: Option<&RowCount>,
) -> DataFrame {
// Apply the projection to the schema if one was given; otherwise borrow
// the reader schema unchanged (Cow avoids a copy in the common case).
let schema = if let Some(projection) = projection {
Cow::Owned(apply_projection(reader_schema, projection))
} else {
Cow::Borrowed(reader_schema)
};
let mut df = DataFrame::from(schema.as_ref());

// Insert an empty row-count column at position 0 when requested; this
// insert on a freshly-built empty frame is expected to be infallible.
if let Some(row_count) = row_count {
df.insert_column(0, Series::new_empty(&row_count.name, &IDX_DTYPE))
.unwrap();
}

// Append hive partition columns with zero (`0`) rows so the column set
// matches a non-empty read.
materialize_hive_partitions(&mut df, hive_partition_columns, 0);

df
}

#[allow(clippy::too_many_arguments)]
pub fn read_parquet<R: MmapBytesReader>(
mut reader: R,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ impl IpcExec {
fn read(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
let file = std::fs::File::open(&self.path)?;
let (projection, predicate) = prepare_scan_args(
&self.predicate,
self.predicate.clone(),
&mut self.file_options.with_columns,
&mut self.schema,
self.file_options.row_count.is_some(),
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/physical_plan/executors/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Predicate = Option<Arc<dyn PhysicalIoExpr>>;

#[cfg(any(feature = "ipc", feature = "parquet"))]
fn prepare_scan_args(
predicate: &Option<Arc<dyn PhysicalExpr>>,
predicate: Option<Arc<dyn PhysicalExpr>>,
with_columns: &mut Option<Arc<Vec<String>>>,
schema: &mut SchemaRef,
has_row_count: bool,
Expand All @@ -53,7 +53,7 @@ fn prepare_scan_args(
has_row_count,
);

let predicate = predicate.clone().map(phys_expr_to_io_expr);
let predicate = predicate.map(phys_expr_to_io_expr);

(projection, predicate)
}
Expand Down
28 changes: 25 additions & 3 deletions crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl ParquetExec {

let file = std::fs::File::open(path)?;
let (projection, predicate) = prepare_scan_args(
&self.predicate,
self.predicate.clone(),
&mut self.file_options.with_columns.clone(),
&mut self.file_info.schema.clone(),
base_row_count.is_some(),
Expand Down Expand Up @@ -275,7 +275,7 @@ impl ParquetExec {
.map(|hive| hive.materialize_partition_columns());

let (projection, predicate) = prepare_scan_args(
predicate,
predicate.clone(),
&mut file_options.with_columns.clone(),
&mut file_info.schema.clone(),
row_count.is_some(),
Expand Down Expand Up @@ -312,7 +312,29 @@ impl ParquetExec {
}

fn read(&mut self) -> PolarsResult<DataFrame> {
let is_cloud = is_cloud_url(self.paths[0].as_path());
let is_cloud = match self.paths.first() {
Some(p) => is_cloud_url(p.as_path()),
None => {
let hive_partitions = self
.file_info
.hive_parts
.as_ref()
.map(|hive| hive.materialize_partition_columns());
let (projection, _) = prepare_scan_args(
None,
&mut self.file_options.with_columns,
&mut self.file_info.schema,
self.file_options.row_count.is_some(),
hive_partitions.as_deref(),
);
return Ok(materialize_empty_df(
projection.as_deref(),
self.file_info.reader_schema.as_ref().unwrap(),
hive_partitions.as_deref(),
self.file_options.row_count.as_ref(),
));
},
};
let force_async = std::env::var("POLARS_FORCE_ASYNC").as_deref().unwrap_or("") == "1";

let out = if is_cloud || force_async {
Expand Down
Loading