Skip to content

Commit

Permalink
polars lazy: read globbed parquet paths
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 2, 2022
1 parent 483879d commit 926a57c
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 49 deletions.
3 changes: 2 additions & 1 deletion polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ description = "Lazy query engine for the Polars DataFrame library"
# this dependency gets activated
compile = []
default = ["compile"]
parquet = ["polars-core/parquet", "polars-io/parquet"]
parquet = ["polars-core/parquet", "polars-io/parquet", "glob"]
ipc = ["polars-io/ipc"]
csv-file = ["polars-io/csv-file"]
temporal = ["polars-core/temporal"]
Expand Down Expand Up @@ -79,6 +79,7 @@ test = [

[dependencies]
ahash = "0.7"
glob = { version = "0.3", optional = true }
rayon = "1.5"
regex = { version = "1.4", optional = true }

Expand Down
34 changes: 30 additions & 4 deletions polars/polars-lazy/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use polars_core::prelude::*;
use polars_core::toggle_string_cache;
use std::sync::Arc;

use crate::functions::concat;
use crate::logical_plan::optimizer::aggregate_pushdown::AggregatePushdown;
#[cfg(any(feature = "parquet", feature = "csv-file", feature = "ipc"))]
use crate::logical_plan::optimizer::aggregate_scan_projections::AggScanProjection;
Expand Down Expand Up @@ -308,11 +309,8 @@ impl LazyFrame {
let logical_plan = self.clone().get_plan_builder().build();
logical_plan.schema().clone()
}

/// Create a LazyFrame directly from a parquet scan.
#[cfg(feature = "parquet")]
#[cfg_attr(docsrs, doc(cfg(feature = "parquet")))]
pub fn scan_parquet(
fn scan_parquet_impl(
path: String,
n_rows: Option<usize>,
cache: bool,
Expand All @@ -325,6 +323,34 @@ impl LazyFrame {
Ok(lf)
}

/// Create a LazyFrame directly from a parquet scan.
///
/// If `path` contains a glob pattern (any `'*'`), every matching file is
/// scanned and the resulting lazy frames are concatenated; `rechunk` then
/// controls whether the combined result is rechunked into contiguous memory.
/// A plain path delegates directly to the single-file scan.
///
/// # Errors
/// Returns `PolarsError::ValueError` for an invalid glob pattern and
/// `PolarsError::ComputeError` when the pattern matches no files or a
/// matched path cannot be read.
#[cfg(feature = "parquet")]
#[cfg_attr(docsrs, doc(cfg(feature = "parquet")))]
pub fn scan_parquet(
    path: String,
    n_rows: Option<usize>,
    cache: bool,
    parallel: bool,
    rechunk: bool,
) -> Result<Self> {
    if path.contains('*') {
        let paths = glob::glob(&path)
            .map_err(|_| PolarsError::ValueError("invalid glob pattern given".into()))?;
        let lfs = paths
            .map(|r| {
                let path = r.map_err(|e| PolarsError::ComputeError(format!("{e}").into()))?;
                let path_string = path.to_string_lossy().into_owned();
                // Per-file `parallel` is forced off for globbed scans —
                // presumably to avoid oversubscribing the thread pool when
                // many files are combined; TODO confirm intent.
                Self::scan_parquet_impl(path_string, n_rows, cache, false)
            })
            .collect::<Result<Vec<_>>>()?;

        // Report "no matching files" only when the glob truly matched
        // nothing; previously every `concat` failure (e.g. a schema
        // mismatch between files) was masked by this message.
        if lfs.is_empty() {
            return Err(PolarsError::ComputeError(
                "no matching files found".into(),
            ));
        }
        concat(&lfs, rechunk)
    } else {
        Self::scan_parquet_impl(path, n_rows, cache, parallel)
    }
}

/// Create a LazyFrame directly from an IPC scan.
#[cfg(feature = "ipc")]
#[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
Expand Down
44 changes: 44 additions & 0 deletions polars/polars-lazy/src/tests/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use super::*;

#[test]
fn test_parquet_exec() -> Result<()> {
    // Exercise filter, projection, and their combination with both the
    // parallel and the single-threaded parquet reader.
    for parallel in [true, false] {
        // filter only
        let filtered = scan_foods_parquet(parallel)
            .filter(col("category").eq(lit("seafood")))
            .collect()?;
        assert_eq!(filtered.shape(), (8, 4));

        // projection only
        let projected = scan_foods_parquet(parallel)
            .select([col("category"), col("sugars_g")])
            .collect()?;
        assert_eq!(projected.shape(), (27, 2));

        // projection combined with a filter
        let combined = scan_foods_parquet(parallel)
            .select([col("category"), col("sugars_g")])
            .filter(col("category").eq(lit("seafood")))
            .collect()?;
        assert_eq!(combined.shape(), (8, 2));
    }

    Ok(())
}

#[test]
// NOTE: this was gated with `#[cfg(target_os = "unix")]`, but "unix" is a
// target *family*, never a `target_os` value, so the test silently compiled
// out on every platform. Gate on non-Windows instead (the glob below uses
// forward slashes and a relative path).
#[cfg(not(target_os = "windows"))]
fn test_parquet_globbing() -> Result<()> {
    // Generate the parquet fixtures from the example CSVs so the test does
    // not depend on other parquet tests having run first.
    init_parquet();
    let glob = "../../examples/aggregate_multiple_files_in_chunks/datasets/*.parquet";
    let df = LazyFrame::scan_parquet(glob.into(), None, false, true, false)?.collect()?;
    assert_eq!(df.shape(), (54, 4));
    let cal = df.column("calories")?;
    assert_eq!(cal.get(0), AnyValue::Int64(45));
    assert_eq!(cal.get(53), AnyValue::Int64(194));

    Ok(())
}
30 changes: 21 additions & 9 deletions polars/polars-lazy/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(feature = "parquet")]
mod io;
mod predicate_pushdown;
mod queries;

Expand All @@ -21,17 +23,27 @@ fn scan_foods_csv() -> LazyFrame {
LazyCsvReader::new(path.to_string()).finish().unwrap()
}

#[cfg(feature = "parquet")]
fn scan_foods_parquet(par: bool) -> LazyFrame {
let path = "../../examples/aggregate_multiple_files_in_chunks/datasets/foods1.csv";
let out_path = path.replace(".csv", ".parquet");
fn init_parquet() {
for path in &[
"../../examples/aggregate_multiple_files_in_chunks/datasets/foods1.csv",
"../../examples/aggregate_multiple_files_in_chunks/datasets/foods2.csv",
] {
let out_path = path.replace(".csv", ".parquet");

if std::fs::metadata(&out_path).is_err() {
let df = CsvReader::from_path(path).unwrap().finish().unwrap();
let f = std::fs::File::create(&out_path).unwrap();
ParquetWriter::new(f).finish(&df).unwrap();
if std::fs::metadata(&out_path).is_err() {
let df = CsvReader::from_path(path).unwrap().finish().unwrap();
let f = std::fs::File::create(&out_path).unwrap();
ParquetWriter::new(f).finish(&df).unwrap();
}
}
LazyFrame::scan_parquet(out_path, None, false, par).unwrap()
}

#[cfg(feature = "parquet")]
fn scan_foods_parquet(par: bool) -> LazyFrame {
    // Make sure the parquet fixture has been generated from the CSV source.
    init_parquet();
    let path = String::from(
        "../../examples/aggregate_multiple_files_in_chunks/datasets/foods1.parquet",
    );
    LazyFrame::scan_parquet(path, None, false, par, true).unwrap()
}

pub(crate) fn fruits_cars() -> DataFrame {
Expand Down
31 changes: 0 additions & 31 deletions polars/polars-lazy/src/tests/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2471,37 +2471,6 @@ fn test_agg_unique_first() -> Result<()> {
Ok(())
}

#[test]
#[cfg(feature = "parquet")]
fn test_parquet_exec() -> Result<()> {
// filter
for par in [true, false] {
let out = scan_foods_parquet(par)
.filter(col("category").eq(lit("seafood")))
.collect()?;
assert_eq!(out.shape(), (8, 4));
}

// project
for par in [true, false] {
let out = scan_foods_parquet(par)
.select([col("category"), col("sugars_g")])
.collect()?;
assert_eq!(out.shape(), (27, 2));
}

// project + filter
for par in [true, false] {
let out = scan_foods_parquet(par)
.select([col("category"), col("sugars_g")])
.filter(col("category").eq(lit("seafood")))
.collect()?;
assert_eq!(out.shape(), (8, 2));
}

Ok(())
}

#[test]
#[cfg(feature = "is_in")]
fn test_is_in() -> Result<()> {
Expand Down
7 changes: 7 additions & 0 deletions py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion py-polars/polars/internals/lazy_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,14 @@ def scan_parquet(
n_rows: Optional[int] = None,
cache: bool = True,
parallel: bool = True,
rechunk: bool = True,
) -> "LazyFrame":
"""
See Also: `pl.scan_parquet`
"""

self = LazyFrame.__new__(LazyFrame)
self._ldf = PyLazyFrame.new_from_parquet(file, n_rows, cache, parallel)
self._ldf = PyLazyFrame.new_from_parquet(file, n_rows, cache, parallel, rechunk)
return self

@staticmethod
Expand Down
5 changes: 4 additions & 1 deletion py-polars/polars/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,7 @@ def scan_parquet(
n_rows: Optional[int] = None,
cache: bool = True,
parallel: bool = True,
rechunk: bool = True,
**kwargs: Any,
) -> LazyFrame:
"""
Expand All @@ -600,6 +601,8 @@ def scan_parquet(
Cache the result after reading.
parallel
Read the parquet file in parallel. The single threaded reader consumes less memory.
rechunk
In case of reading multiple files via a glob pattern rechunk the final DataFrame into contiguous memory chunks.
"""

# Map legacy arguments to current ones and remove them from kwargs.
Expand All @@ -609,7 +612,7 @@ def scan_parquet(
file = str(file)

return LazyFrame.scan_parquet(
file=file, n_rows=n_rows, cache=cache, parallel=parallel
file=file, n_rows=n_rows, cache=cache, parallel=parallel, rechunk=rechunk
)


Expand Down
5 changes: 3 additions & 2 deletions py-polars/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,10 @@ impl PyLazyFrame {
n_rows: Option<usize>,
cache: bool,
parallel: bool,
rechunk: bool,
) -> PyResult<Self> {
let lf =
LazyFrame::scan_parquet(path, n_rows, cache, parallel).map_err(PyPolarsEr::from)?;
let lf = LazyFrame::scan_parquet(path, n_rows, cache, parallel, rechunk)
.map_err(PyPolarsEr::from)?;
Ok(lf.into())
}

Expand Down

0 comments on commit 926a57c

Please sign in to comment.