Skip to content

Commit

Permalink
feat[rust]: allow io functions to take `AsRef<Path>`s instead of `String` for path arguments (#4532)
Browse files Browse the repository at this point in the history
  • Loading branch information
isaacthefallenapple committed Aug 23, 2022
1 parent 55125e9 commit d05597e
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 61 deletions.
2 changes: 1 addition & 1 deletion examples/read_csv/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use polars::prelude::*;

fn main() -> Result<()> {
let mut df = LazyCsvReader::new("../datasets/foods1.csv".into())
let mut df = LazyCsvReader::new("../datasets/foods1.csv")
.finish()?
.select([
// select all columns
Expand Down
19 changes: 8 additions & 11 deletions examples/read_parquet/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
use polars::prelude::*;

fn main() -> Result<()> {
let df = LazyFrame::scan_parquet(
"../datasets/foods1.parquet".into(),
ScanArgsParquet::default(),
)?
.select([
// select all columns
all(),
// and do some aggregations
cols(["fats_g", "sugars_g"]).sum().suffix("_summed"),
])
.collect()?;
let df = LazyFrame::scan_parquet("../datasets/foods1.parquet", ScanArgsParquet::default())?
.select([
// select all columns
all(),
// and do some aggregations
cols(["fats_g", "sugars_g"]).sum().suffix("_summed"),
])
.collect()?;

dbg!(df);
Ok(())
Expand Down
17 changes: 10 additions & 7 deletions polars/polars-lazy/src/frame/csv.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::path::Path;
use std::path::PathBuf;

use polars_core::prelude::*;
use polars_io::csv::utils::get_reader_bytes;
use polars_io::csv::utils::infer_file_schema;
Expand All @@ -9,7 +12,7 @@ use crate::prelude::*;
#[derive(Clone)]
#[cfg(feature = "csv-file")]
pub struct LazyCsvReader<'a> {
path: String,
path: PathBuf,
delimiter: u8,
has_header: bool,
ignore_errors: bool,
Expand All @@ -33,9 +36,9 @@ pub struct LazyCsvReader<'a> {

#[cfg(feature = "csv-file")]
impl<'a> LazyCsvReader<'a> {
pub fn new(path: String) -> Self {
pub fn new(path: impl AsRef<Path>) -> Self {
LazyCsvReader {
path,
path: path.as_ref().to_owned(),
delimiter: b',',
has_header: true,
ignore_errors: false,
Expand Down Expand Up @@ -262,16 +265,16 @@ impl<'a> LazyCsvReader<'a> {
}

pub fn finish(self) -> Result<LazyFrame> {
if self.path.contains('*') {
let paths = glob::glob(&self.path)
let path_str = self.path.to_string_lossy();
if path_str.contains('*') {
let paths = glob::glob(&path_str)
.map_err(|_| PolarsError::ComputeError("invalid glob pattern given".into()))?;

let lfs = paths
.map(|r| {
let path = r.map_err(|e| PolarsError::ComputeError(format!("{}", e).into()))?;
let path_string = path.to_string_lossy().into_owned();
let mut builder = self.clone();
builder.path = path_string;
builder.path = path;
if builder.skip_rows > 0 {
builder.skip_rows = 0;
builder.n_rows = None;
Expand Down
19 changes: 12 additions & 7 deletions polars/polars-lazy/src/frame/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::path::Path;

use polars_core::prelude::*;
use polars_io::RowCount;

Expand Down Expand Up @@ -25,7 +27,7 @@ impl Default for ScanArgsIpc {
}

impl LazyFrame {
fn scan_ipc_impl(path: String, args: ScanArgsIpc) -> Result<Self> {
fn scan_ipc_impl(path: impl AsRef<Path>, args: ScanArgsIpc) -> Result<Self> {
let options = IpcScanOptions {
n_rows: args.n_rows,
cache: args.cache,
Expand All @@ -35,7 +37,9 @@ impl LazyFrame {
memmap: args.memmap,
};
let row_count = args.row_count;
let mut lf: LazyFrame = LogicalPlanBuilder::scan_ipc(path, options)?.build().into();
let mut lf: LazyFrame = LogicalPlanBuilder::scan_ipc(path.as_ref(), options)?
.build()
.into();
lf.opt_state.file_caching = true;

// it is a bit hacky, but this row_count function updates the schema
Expand All @@ -48,17 +52,18 @@ impl LazyFrame {

/// Create a LazyFrame directly from a ipc scan.
#[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
pub fn scan_ipc(path: String, args: ScanArgsIpc) -> Result<Self> {
if path.contains('*') {
let paths = glob::glob(&path)
pub fn scan_ipc(path: impl AsRef<Path>, args: ScanArgsIpc) -> Result<Self> {
let path = path.as_ref();
let path_str = path.to_string_lossy();
if path_str.contains('*') {
let paths = glob::glob(&path_str)
.map_err(|_| PolarsError::ComputeError("invalid glob pattern given".into()))?;
let lfs = paths
.map(|r| {
let path = r.map_err(|e| PolarsError::ComputeError(format!("{}", e).into()))?;
let path_string = path.to_string_lossy().into_owned();
let mut args = args.clone();
args.row_count = None;
Self::scan_ipc_impl(path_string, args)
Self::scan_ipc_impl(path, args)
})
.collect::<Result<Vec<_>>>()?;

Expand Down
30 changes: 21 additions & 9 deletions polars/polars-lazy/src/frame/parquet.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::path::Path;

use polars_core::prelude::*;
use polars_io::parquet::ParallelStrategy;
use polars_io::RowCount;
Expand Down Expand Up @@ -29,7 +31,7 @@ impl Default for ScanArgsParquet {

impl LazyFrame {
fn scan_parquet_impl(
path: String,
path: impl AsRef<Path>,
n_rows: Option<usize>,
cache: bool,
parallel: ParallelStrategy,
Expand All @@ -38,7 +40,13 @@ impl LazyFrame {
low_memory: bool,
) -> Result<Self> {
let mut lf: LazyFrame = LogicalPlanBuilder::scan_parquet(
path, n_rows, cache, parallel, None, rechunk, low_memory,
path.as_ref(),
n_rows,
cache,
parallel,
None,
rechunk,
low_memory,
)?
.build()
.into();
Expand Down Expand Up @@ -67,12 +75,15 @@ impl LazyFrame {
/// Create a LazyFrame directly from a parquet scan.
#[cfg_attr(docsrs, doc(cfg(feature = "parquet")))]
#[deprecated(note = "please use `concat_lf` instead")]
pub fn scan_parquet_files(paths: Vec<String>, args: ScanArgsParquet) -> Result<Self> {
pub fn scan_parquet_files<P: AsRef<Path>>(
paths: Vec<P>,
args: ScanArgsParquet,
) -> Result<Self> {
let lfs = paths
.iter()
.map(|p| {
Self::scan_parquet_impl(
p.to_string(),
p,
args.n_rows,
args.cache,
args.parallel,
Expand All @@ -88,16 +99,17 @@ impl LazyFrame {

/// Create a LazyFrame directly from a parquet scan.
#[cfg_attr(docsrs, doc(cfg(feature = "parquet")))]
pub fn scan_parquet(path: String, args: ScanArgsParquet) -> Result<Self> {
if path.contains('*') {
let paths = glob::glob(&path)
pub fn scan_parquet(path: impl AsRef<Path>, args: ScanArgsParquet) -> Result<Self> {
let path = path.as_ref();
let path_str = path.to_string_lossy();
if path_str.contains('*') {
let paths = glob::glob(&path_str)
.map_err(|_| PolarsError::ComputeError("invalid glob pattern given".into()))?;
let lfs = paths
.map(|r| {
let path = r.map_err(|e| PolarsError::ComputeError(format!("{}", e).into()))?;
let path_string = path.to_string_lossy().into_owned();
Self::scan_parquet_impl(
path_string,
path,
args.n_rows,
args.cache,
ParallelStrategy::None,
Expand Down
33 changes: 15 additions & 18 deletions polars/polars-lazy/src/tests/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ fn test_parquet_globbing() -> Result<()> {
let _guard = SINGLE_LOCK.lock().unwrap();
let glob = "../../examples/datasets/*.parquet";
let df = LazyFrame::scan_parquet(
glob.into(),
glob,
ScanArgsParquet {
n_rows: None,
cache: true,
Expand All @@ -174,7 +174,7 @@ fn test_ipc_globbing() -> Result<()> {
init_files();
let glob = "../../examples/datasets/*.ipc";
let df = LazyFrame::scan_ipc(
glob.into(),
glob,
ScanArgsIpc {
n_rows: None,
cache: true,
Expand Down Expand Up @@ -206,7 +206,7 @@ fn slice_at_union(lp_arena: &Arena<ALogicalPlan>, lp: Node) -> bool {
#[cfg(not(target_os = "windows"))]
fn test_csv_globbing() -> Result<()> {
let glob = "../../examples/datasets/*.csv";
let full_df = LazyCsvReader::new(glob.into()).finish()?.collect()?;
let full_df = LazyCsvReader::new(glob).finish()?.collect()?;

// all 5 files * 27 rows
assert_eq!(full_df.shape(), (135, 4));
Expand All @@ -215,22 +215,19 @@ fn test_csv_globbing() -> Result<()> {
assert_eq!(cal.get(53), AnyValue::Int64(194));

let glob = "../../examples/datasets/*.csv";
let lf = LazyCsvReader::new(glob.into()).finish()?.slice(0, 100);
let lf = LazyCsvReader::new(glob).finish()?.slice(0, 100);

let df = lf.clone().collect()?;
assert_eq!(df.shape(), (100, 4));
let df = LazyCsvReader::new(glob.into())
.finish()?
.slice(20, 60)
.collect()?;
let df = LazyCsvReader::new(glob).finish()?.slice(20, 60).collect()?;
assert!(full_df.slice(20, 60).frame_equal(&df));

let mut expr_arena = Arena::with_capacity(16);
let mut lp_arena = Arena::with_capacity(8);
let node = lf.clone().optimize(&mut lp_arena, &mut expr_arena)?;
assert!(slice_at_union(&mut lp_arena, node));

let lf = LazyCsvReader::new(glob.into())
let lf = LazyCsvReader::new(glob)
.finish()?
.filter(col("sugars_g").lt(lit(1i32)))
.slice(0, 100);
Expand All @@ -254,9 +251,9 @@ fn test_union_and_agg_projections() -> Result<()> {
let _guard = SINGLE_LOCK.lock().unwrap();
// a union vstacks columns and aggscan optimization determines columns to aggregate in a
// hashmap, if that doesn't set them sorted the vstack will panic.
let lf1 = LazyFrame::scan_parquet(GLOB_PARQUET.into(), Default::default())?;
let lf2 = LazyFrame::scan_ipc(GLOB_IPC.into(), Default::default())?;
let lf3 = LazyCsvReader::new(GLOB_CSV.into()).finish()?;
let lf1 = LazyFrame::scan_parquet(GLOB_PARQUET, Default::default())?;
let lf2 = LazyFrame::scan_ipc(GLOB_IPC, Default::default())?;
let lf3 = LazyCsvReader::new(GLOB_CSV).finish()?;

for lf in [lf1, lf2, lf3] {
let lf = lf.filter(col("category").eq(lit("vegetables"))).select([
Expand Down Expand Up @@ -323,7 +320,7 @@ fn test_slice_filter() -> Result<()> {

#[test]
fn skip_rows_and_slice() -> Result<()> {
let out = LazyCsvReader::new(FOODS_CSV.to_string())
let out = LazyCsvReader::new(FOODS_CSV)
.with_skip_rows(4)
.finish()?
.limit(1)
Expand All @@ -337,7 +334,7 @@ fn skip_rows_and_slice() -> Result<()> {
fn test_row_count_on_files() -> Result<()> {
let _guard = SINGLE_LOCK.lock().unwrap();
for offset in [0 as IdxSize, 10] {
let lf = LazyCsvReader::new(FOODS_CSV.to_string())
let lf = LazyCsvReader::new(FOODS_CSV)
.with_row_count(Some(RowCount {
name: "rc".into(),
offset,
Expand All @@ -352,7 +349,7 @@ fn test_row_count_on_files() -> Result<()> {
(offset..27 + offset).collect::<Vec<_>>()
);

let lf = LazyFrame::scan_parquet(FOODS_PARQUET.to_string(), Default::default())?
let lf = LazyFrame::scan_parquet(FOODS_PARQUET, Default::default())?
.with_row_count("rc", Some(offset));
assert!(row_count_at_scan(lf.clone()));
let df = lf.collect()?;
Expand All @@ -362,8 +359,8 @@ fn test_row_count_on_files() -> Result<()> {
(offset..27 + offset).collect::<Vec<_>>()
);

let lf = LazyFrame::scan_ipc(FOODS_IPC.to_string(), Default::default())?
.with_row_count("rc", Some(offset));
let lf =
LazyFrame::scan_ipc(FOODS_IPC, Default::default())?.with_row_count("rc", Some(offset));

assert!(row_count_at_scan(lf.clone()));
let df = lf.clone().collect()?;
Expand All @@ -386,7 +383,7 @@ fn test_row_count_on_files() -> Result<()> {

#[test]
fn scan_predicate_on_set_null_values() -> Result<()> {
let df = LazyCsvReader::new(FOODS_CSV.into())
let df = LazyCsvReader::new(FOODS_CSV)
.with_null_values(Some(NullValues::Named(vec![("fats_g".into(), "0".into())])))
.with_infer_schema_length(Some(0))
.finish()?
Expand Down
6 changes: 3 additions & 3 deletions polars/polars-lazy/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ static FOODS_IPC: &str = "../../examples/datasets/foods1.ipc";
static FOODS_PARQUET: &str = "../../examples/datasets/foods1.parquet";

fn scan_foods_csv() -> LazyFrame {
LazyCsvReader::new(FOODS_CSV.to_string()).finish().unwrap()
LazyCsvReader::new(FOODS_CSV).finish().unwrap()
}

fn scan_foods_ipc() -> LazyFrame {
LazyFrame::scan_ipc(FOODS_IPC.to_string(), Default::default()).unwrap()
LazyFrame::scan_ipc(FOODS_IPC, Default::default()).unwrap()
}

fn init_files() {
Expand Down Expand Up @@ -89,7 +89,7 @@ fn init_files() {
#[cfg(feature = "parquet")]
fn scan_foods_parquet(parallel: bool) -> LazyFrame {
init_files();
let out_path = FOODS_PARQUET.to_string();
let out_path = FOODS_PARQUET;
let parallel = if parallel {
ParallelStrategy::Auto
} else {
Expand Down
6 changes: 3 additions & 3 deletions polars/src/docs/lazy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
//! let lf: LazyFrame = df.lazy();
//!
//! // scan a csv file lazily
//! let lf: LazyFrame = LazyCsvReader::new("some_path".into())
//! let lf: LazyFrame = LazyCsvReader::new("some_path")
//! .has_header(true)
//! .finish()?;
//!
//! // scan a parquet file lazily
//! let lf: LazyFrame = LazyFrame::scan_parquet("some_path".into(), Default::default())?;
//! let lf: LazyFrame = LazyFrame::scan_parquet("some_path", Default::default())?;
//!
//! # Ok(())
//! # }
Expand Down Expand Up @@ -114,7 +114,7 @@
//! use polars::prelude::*;
//! # fn example() -> Result<()> {
//!
//! let df = LazyCsvReader::new("reddit.csv".into())
//! let df = LazyCsvReader::new("reddit.csv")
//! .has_header(true)
//! .with_delimiter(b',')
//! .finish()?
Expand Down
4 changes: 2 additions & 2 deletions polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//! use polars::prelude::*;
//! # fn example() -> Result<()> {
//!
//! let lf1 = LazyFrame::scan_parquet("myfile_1.parquet".into(), Default::default())?
//! let lf1 = LazyFrame::scan_parquet("myfile_1.parquet", Default::default())?
//! .groupby([col("ham")])
//! .agg([
//! // expressions can be combined into powerful aggregations
Expand All @@ -27,7 +27,7 @@
//! col("foo").reverse().list().alias("reverse_group"),
//! ]);
//!
//! let lf2 = LazyFrame::scan_parquet("myfile_2.parquet".into(), Default::default())?
//! let lf2 = LazyFrame::scan_parquet("myfile_2.parquet", Default::default())?
//! .select([col("ham"), col("spam")]);
//!
//! let df = lf1
Expand Down

0 comments on commit d05597e

Please sign in to comment.