Skip to content

Commit

Permalink
feat: Make hive_partitioning parameter default to None, which is automatically enabled for single directory inputs, and disabled otherwise (#17106)

Browse files Browse the repository at this point in the history

Co-authored-by: Ritchie Vink <ritchie46@gmail.com>
  • Loading branch information
nameexhaustion and ritchie46 authored Jun 22, 2024
1 parent a99b234 commit 7f5412a
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 53 deletions.
7 changes: 5 additions & 2 deletions crates/polars-io/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@ pub struct RowIndex {
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct HiveOptions {
pub enabled: bool,
/// This can be `None` to automatically enable for single directory scans
/// and disable otherwise. However it should be initialized if it is inside
/// a DSL / IR plan.
pub enabled: Option<bool>,
pub hive_start_idx: usize,
pub schema: Option<SchemaRef>,
}

impl Default for HiveOptions {
fn default() -> Self {
Self {
enabled: true,
enabled: Some(true),
hive_start_idx: 0,
schema: None,
}
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl LazyCsvReader {
where
F: Fn(Schema) -> PolarsResult<Schema>,
{
let paths = self.expand_paths()?.0;
let paths = self.expand_paths(false)?.0;
let Some(path) = paths.first() else {
polars_bail!(ComputeError: "no paths specified for this reader");
};
Expand Down Expand Up @@ -262,7 +262,7 @@ impl LazyFileListReader for LazyCsvReader {
/// Get the final [LazyFrame].
fn finish(self) -> PolarsResult<LazyFrame> {
// `expand_paths` respects globs
let paths = self.expand_paths()?.0;
let paths = self.expand_paths(false)?.0;

let mut lf: LazyFrame =
DslBuilder::scan_csv(paths, self.read_options, self.cache, self.cloud_options)?
Expand Down
66 changes: 47 additions & 19 deletions crates/polars-lazy/src/scan/file_list_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,45 @@ use crate::prelude::*;

pub type PathIterator = Box<dyn Iterator<Item = PolarsResult<PathBuf>>>;

pub(super) fn get_glob_start_idx(path: &[u8]) -> Option<usize> {
memchr::memchr3(b'*', b'?', b'[', path)
}

/// Recursively traverses directories and expands globs if `glob` is `true`.
/// Returns the expanded paths and the index at which to start parsing hive
/// partitions from the path.
fn expand_paths(
paths: &[PathBuf],
#[allow(unused_variables)] cloud_options: Option<&CloudOptions>,
glob: bool,
check_directory_level: bool,
) -> PolarsResult<(Arc<[PathBuf]>, usize)> {
let Some(first_path) = paths.first() else {
return Ok((vec![].into(), 0));
};

let is_cloud = is_cloud_url(first_path);
let mut expand_start_idx = usize::MAX;
let mut out_paths = vec![];

let expand_start_idx = &mut usize::MAX.clone();
let mut update_expand_start_idx = |i, path_idx: usize| {
if check_directory_level
&& ![usize::MAX, i].contains(expand_start_idx)
// They could still be the same directory level, just with different name length
&& (paths[path_idx].parent() != paths[path_idx - 1].parent())
{
polars_bail!(
InvalidOperation:
"attempted to read from different directory levels with hive partitioning enabled: first path: {}, second path: {}",
paths[path_idx - 1].to_str().unwrap(),
paths[path_idx].to_str().unwrap(),
)
} else {
*expand_start_idx = std::cmp::min(*expand_start_idx, i);
Ok(())
}
};

if is_cloud {
#[cfg(feature = "async")]
{
Expand All @@ -45,25 +68,25 @@ fn expand_paths(
})
}

for path in paths {
let glob_start_idx =
memchr::memchr3(b'*', b'?', b'[', path.to_str().unwrap().as_bytes());
for (path_idx, path) in paths.iter().enumerate() {
let glob_start_idx = get_glob_start_idx(path.to_str().unwrap().as_bytes());

let (path, glob_start_idx) = if let Some(glob_start_idx) = glob_start_idx {
(path.clone(), glob_start_idx)
let path = if glob_start_idx.is_some() {
path.clone()
} else if !path.ends_with("/")
&& is_file_cloud(path.to_str().unwrap(), cloud_options)?
{
expand_start_idx = 0;
update_expand_start_idx(0, path_idx)?;
out_paths.push(path.clone());
continue;
} else if !glob {
polars_bail!(ComputeError: "not implemented: did not find cloud file at path = {} and `glob` was set to false", path.to_str().unwrap());
} else {
(path.join("**/*"), path.to_str().unwrap().len())
// FIXME: This will fail! See https://github.com/pola-rs/polars/issues/17105
path.join("**/*")
};

expand_start_idx = std::cmp::min(expand_start_idx, glob_start_idx);
update_expand_start_idx(0, path_idx)?;

out_paths.extend(
polars_io::async_glob(path.to_str().unwrap(), cloud_options)?
Expand All @@ -77,13 +100,14 @@ fn expand_paths(
} else {
let mut stack = VecDeque::new();

for path in paths {
for path_idx in 0..paths.len() {
let path = &paths[path_idx];
stack.clear();

if path.is_dir() {
let i = path.to_str().unwrap().len();

expand_start_idx = std::cmp::min(expand_start_idx, i);
update_expand_start_idx(i, path_idx)?;

stack.push_back(path.clone());

Expand All @@ -107,11 +131,10 @@ fn expand_paths(
continue;
}

let i = memchr::memchr3(b'*', b'?', b'[', path.to_str().unwrap().as_bytes());
let i = get_glob_start_idx(path.to_str().unwrap().as_bytes());

if glob && i.is_some() {
let i = i.unwrap();
expand_start_idx = std::cmp::min(expand_start_idx, i);
update_expand_start_idx(0, path_idx)?;

let Ok(paths) = glob::glob(path.to_str().unwrap()) else {
polars_bail!(ComputeError: "invalid glob pattern given")
Expand All @@ -121,15 +144,15 @@ fn expand_paths(
out_paths.push(path.map_err(to_compute_err)?);
}
} else {
expand_start_idx = 0;
update_expand_start_idx(0, path_idx)?;
out_paths.push(path.clone());
}
}
}

Ok((
out_paths.into_iter().collect::<Arc<[_]>>(),
expand_start_idx,
*expand_start_idx,
))
}

Expand All @@ -144,7 +167,7 @@ pub trait LazyFileListReader: Clone {
return self.finish_no_glob();
}

let paths = self.expand_paths()?.0;
let paths = self.expand_paths(false)?.0;

let lfs = paths
.iter()
Expand Down Expand Up @@ -239,7 +262,12 @@ pub trait LazyFileListReader: Clone {

/// Returns a list of paths after resolving globs and directories, as well as
/// the string index at which to start parsing hive partitions.
fn expand_paths(&self) -> PolarsResult<(Arc<[PathBuf]>, usize)> {
expand_paths(self.paths(), self.cloud_options(), self.glob())
fn expand_paths(&self, check_directory_level: bool) -> PolarsResult<(Arc<[PathBuf]>, usize)> {
expand_paths(
self.paths(),
self.cloud_options(),
self.glob(),
check_directory_level,
)
}
}
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl LazyIpcReader {

impl LazyFileListReader for LazyIpcReader {
fn finish(self) -> PolarsResult<LazyFrame> {
let paths = self.expand_paths()?.0;
let paths = self.expand_paths(false)?.0;
let args = self.args;

let options = IpcScanOptions {
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/scan/ndjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl LazyFileListReader for LazyJsonLineReader {
return self.finish_no_glob();
}

let paths = self.expand_paths()?.0;
let paths = self.expand_paths(false)?.0;

let file_options = FileScanOptions {
n_rows: self.n_rows,
Expand Down
11 changes: 10 additions & 1 deletion crates/polars-lazy/src/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use polars_io::cloud::CloudOptions;
use polars_io::parquet::read::ParallelStrategy;
use polars_io::{HiveOptions, RowIndex};

use super::get_glob_start_idx;
use crate::prelude::*;

#[derive(Clone)]
Expand Down Expand Up @@ -57,7 +58,15 @@ impl LazyParquetReader {
impl LazyFileListReader for LazyParquetReader {
/// Get the final [LazyFrame].
fn finish(mut self) -> PolarsResult<LazyFrame> {
let (paths, hive_start_idx) = self.expand_paths()?;
let (paths, hive_start_idx) =
self.expand_paths(self.args.hive_options.enabled.unwrap_or(false))?;
self.args.hive_options.enabled =
Some(self.args.hive_options.enabled.unwrap_or_else(|| {
self.paths.len() == 1
&& get_glob_start_idx(self.paths[0].to_str().unwrap().as_bytes()).is_none()
&& !paths.is_empty()
&& paths[0] != self.paths[0]
}));
self.args.hive_options.hive_start_idx = hive_start_idx;

let row_index = self.args.row_index;
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-plan/src/plans/builder_dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl DslBuilder {
file_counter: Default::default(),
// TODO: Support Hive partitioning.
hive_options: HiveOptions {
enabled: false,
enabled: Some(false),
..Default::default()
},
};
Expand Down Expand Up @@ -139,7 +139,7 @@ impl DslBuilder {
file_counter: Default::default(),
// TODO: Support Hive partitioning.
hive_options: HiveOptions {
enabled: false,
enabled: Some(false),
..Default::default()
},
},
Expand Down Expand Up @@ -175,7 +175,7 @@ impl DslBuilder {
file_counter: Default::default(),
// TODO: Support Hive partitioning.
hive_options: HiveOptions {
enabled: false,
enabled: Some(false),
..Default::default()
},
};
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ pub fn to_alp_impl(

let hive_parts = if hive_parts.is_some() {
hive_parts
} else if file_options.hive_options.enabled {
} else if file_options.hive_options.enabled.unwrap() {
hive_partitions_from_paths(
paths.as_ref(),
file_options.hive_options.hive_start_idx,
Expand Down
10 changes: 6 additions & 4 deletions py-polars/polars/io/parquet/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def read_parquet(
row_index_offset: int = 0,
parallel: ParallelStrategy = "auto",
use_statistics: bool = True,
hive_partitioning: bool = True,
hive_partitioning: bool | None = None,
glob: bool = True,
hive_schema: SchemaDict | None = None,
rechunk: bool = False,
Expand Down Expand Up @@ -82,7 +82,9 @@ def read_parquet(
can be skipped from reading.
hive_partitioning
Infer statistics and schema from Hive partitioned URL and use them
to prune reads.
to prune reads. This is unset by default (i.e. `None`), meaning it is
automatically enabled when a single directory is passed, and otherwise
disabled.
glob
Expand path given via globbing rules.
hive_schema
Expand Down Expand Up @@ -289,7 +291,7 @@ def scan_parquet(
row_index_offset: int = 0,
parallel: ParallelStrategy = "auto",
use_statistics: bool = True,
hive_partitioning: bool = True,
hive_partitioning: bool | None = None,
glob: bool = True,
hive_schema: SchemaDict | None = None,
rechunk: bool = False,
Expand Down Expand Up @@ -419,7 +421,7 @@ def _scan_parquet_impl(
storage_options: dict[str, object] | None = None,
low_memory: bool = False,
use_statistics: bool = True,
hive_partitioning: bool = True,
hive_partitioning: bool | None = None,
glob: bool = True,
hive_schema: SchemaDict | None = None,
retries: int = 0,
Expand Down
2 changes: 1 addition & 1 deletion py-polars/src/lazyframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ impl PyLazyFrame {
low_memory: bool,
cloud_options: Option<Vec<(String, String)>>,
use_statistics: bool,
hive_partitioning: bool,
hive_partitioning: Option<bool>,
hive_schema: Option<Wrap<Schema>>,
retries: usize,
glob: bool,
Expand Down
Loading

0 comments on commit 7f5412a

Please sign in to comment.