Skip to content

Commit

Permalink
Add support for loading a collection of parquet files (#3894)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrei-ionescu committed Jul 13, 2022
1 parent c563a72 commit 4d541a0
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 12 deletions.
47 changes: 35 additions & 12 deletions polars/polars-lazy/src/frame/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,40 @@ impl LazyFrame {
Ok(lf)
}

/// Concatenate a set of already-created lazy scans into one `LazyFrame`,
/// then apply the global `n_rows` limit and optional row-count column
/// from `args` to the combined result.
fn concat_impl(lfs: Vec<LazyFrame>, args: ScanArgsParquet) -> Result<LazyFrame> {
    // Any concat failure is surfaced to the caller as a "no files" error.
    let mut lf = concat(&lfs, args.rechunk)
        .map_err(|_| PolarsError::ComputeError("no matching files found".into()))?;
    // Limit the total number of rows across all concatenated files.
    if let Some(n_rows) = args.n_rows {
        lf = lf.slice(0, n_rows as IdxSize);
    }
    // Attach a row-count column after concatenation so numbering is global,
    // not per-file.
    if let Some(rc) = args.row_count {
        lf = lf.with_row_count(&rc.name, Some(rc.offset));
    }
    Ok(lf)
}

/// Create a `LazyFrame` by lazily scanning a collection of parquet files
/// and concatenating them into a single frame.
///
/// `args.n_rows` and `args.row_count` are applied to the concatenated
/// result; `args.cache`, `args.parallel` and `args.rechunk` apply to the
/// individual file scans.
#[cfg_attr(docsrs, doc(cfg(feature = "parquet")))]
pub fn scan_parquet_files(paths: Vec<String>, args: ScanArgsParquet) -> Result<Self> {
    // Consume `paths` with `into_iter` — the original `iter().map(|p| p.to_string())`
    // cloned every String even though the Vec is owned.
    let lfs = paths
        .into_iter()
        .map(|path| {
            Self::scan_parquet_impl(
                path,
                args.n_rows,
                args.cache,
                args.parallel,
                // Row count is attached once after concatenation (in
                // concat_impl) so numbering spans all files; per-file
                // scans therefore get `None`.
                None,
                args.rechunk,
            )
        })
        .collect::<Result<Vec<_>>>()?;

    Self::concat_impl(lfs, args)
}

/// Create a LazyFrame directly from a parquet scan.
#[cfg_attr(docsrs, doc(cfg(feature = "parquet")))]
pub fn scan_parquet(path: String, args: ScanArgsParquet) -> Result<Self> {
Expand All @@ -62,18 +96,7 @@ impl LazyFrame {
})
.collect::<Result<Vec<_>>>()?;

concat(&lfs, args.rechunk)
.map_err(|_| PolarsError::ComputeError("no matching files found".into()))
.map(|mut lf| {
if let Some(n_rows) = args.n_rows {
lf = lf.slice(0, n_rows as IdxSize)
};

if let Some(rc) = args.row_count {
lf = lf.with_row_count(&rc.name, Some(rc.offset))
};
lf
})
Self::concat_impl(lfs, args)
} else {
Self::scan_parquet_impl(
path,
Expand Down
66 changes: 66 additions & 0 deletions polars/polars-lazy/src/tests/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,3 +411,69 @@ fn scan_anonymous_fn() -> Result<()> {
assert_eq!(df.shape(), (5, 4));
Ok(())
}

// Integration test for `LazyFrame::scan_parquet_files`: builds parquet
// fixtures from the bundled CSV datasets (if not already present), loads a
// subset of them as one LazyFrame, and checks both the combined shape and
// an aggregation over the concatenated data.
#[test]
fn test_scan_parquet_files() -> Result<()> {
    // Serialize against other I/O tests that touch the same fixture files.
    let _guard = SINGLE_LOCK.lock().unwrap();
    // Create a .parquet twin for each CSV fixture, but only when it does
    // not exist yet, so repeated test runs reuse the files.
    for path in &[
        "../../examples/datasets/foods1.csv",
        "../../examples/datasets/foods2.csv",
        "../../examples/datasets/foods3.csv",
        "../../examples/datasets/foods4.csv",
        "../../examples/datasets/foods5.csv",
    ] {
        let out_path = path.replace(".csv", ".parquet");
        if std::fs::metadata(&out_path).is_err() {
            let mut df = CsvReader::from_path(path).unwrap().finish().unwrap();
            let f = std::fs::File::create(&out_path).unwrap();
            ParquetWriter::new(f)
                .with_statistics(true)
                .finish(&mut df)
                .unwrap();
        }
    }
    // Deliberately load only two of the five files to exercise the
    // multi-file path selection.
    let files_to_load_set = vec![
        "../../examples/datasets/foods3.parquet",
        "../../examples/datasets/foods5.parquet",
    ]
    .into_iter()
    .map(|i| i.to_string())
    .collect();

    // Two foods files of 27 rows each concatenate to 54 rows, 4 columns.
    let df = LazyFrame::scan_parquet_files(files_to_load_set, Default::default())?.collect()?;
    assert_eq!(df.shape(), (54, 4));

    /*
     * This should output:
     *
     * +------------+-------+
     * | category   | count |
     * | ---        | ---   |
     * | str        | u64   |
     * +============+=======+
     * | fruit      | 14    |
     * +------------+-------+
     * | meat       | 10    |
     * +------------+-------+
     * | seafood    | 16    |
     * +------------+-------+
     * | vegetables | 14    |
     * +------------+-------+
     */

    // Aggregate across the concatenated frame to verify that rows from both
    // files were actually combined (counts are doubled vs. a single file).
    let grouped = df
        .groupby(["category"])?
        .agg(&[("calories", &["count"])])?
        .sort(["category"], false)?;
    assert_eq!(grouped.shape(), (4, 2));
    println!("{:?}", grouped);
    assert_eq!(grouped.get(0).unwrap()[0], AnyValue::Utf8("fruit"));
    assert_eq!(grouped.get(0).unwrap()[1], AnyValue::UInt64(14));
    assert_eq!(grouped.get(1).unwrap()[0], AnyValue::Utf8("meat"));
    assert_eq!(grouped.get(1).unwrap()[1], AnyValue::UInt64(10));
    assert_eq!(grouped.get(2).unwrap()[0], AnyValue::Utf8("seafood"));
    assert_eq!(grouped.get(2).unwrap()[1], AnyValue::UInt64(16));
    assert_eq!(grouped.get(3).unwrap()[0], AnyValue::Utf8("vegetables"));
    assert_eq!(grouped.get(3).unwrap()[1], AnyValue::UInt64(14));
    Ok(())
}

0 comments on commit 4d541a0

Please sign in to comment.