From 8a5f1a2069dfbc7066fa77bc10629c65dfb6500f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20S=C5=82apek?= <28485371+mslapek@users.noreply.github.com> Date: Thu, 16 Feb 2023 18:20:23 +0100 Subject: [PATCH 1/2] refactor(rust): add LazyFileListReader trait --- polars/polars-lazy/src/frame/csv.rs | 65 ++++---- .../polars-lazy/src/frame/filelistreader.rs | 112 +++++++++++++ polars/polars-lazy/src/frame/ipc.rs | 86 ++++++---- polars/polars-lazy/src/frame/mod.rs | 2 + polars/polars-lazy/src/frame/parquet.rs | 156 +++++++----------- 5 files changed, 254 insertions(+), 167 deletions(-) create mode 100644 polars/polars-lazy/src/frame/filelistreader.rs diff --git a/polars/polars-lazy/src/frame/csv.rs b/polars/polars-lazy/src/frame/csv.rs index ea9ccf6c2e36..f1cbae43a695 100644 --- a/polars/polars-lazy/src/frame/csv.rs +++ b/polars/polars-lazy/src/frame/csv.rs @@ -5,6 +5,7 @@ use polars_io::csv::utils::{get_reader_bytes, infer_file_schema}; use polars_io::csv::{CsvEncoding, NullValues}; use polars_io::RowCount; +use crate::frame::LazyFileListReader; use crate::prelude::*; #[derive(Clone)] @@ -183,13 +184,6 @@ impl<'a> LazyCsvReader<'a> { self } - /// Rechunk the memory to contiguous chunks when parsing is done. - #[must_use] - pub fn with_rechunk(mut self, toggle: bool) -> Self { - self.rechunk = toggle; - self - } - /// Set [`CsvEncoding`] #[must_use] pub fn with_encoding(mut self, enc: CsvEncoding) -> Self { @@ -212,15 +206,11 @@ impl<'a> LazyCsvReader<'a> { F: Fn(Schema) -> PolarsResult, { let path; - let path_str = self.path.to_string_lossy(); - - let mut file = if path_str.contains('*') { - let glob_err = || PolarsError::ComputeError("invalid glob pattern given".into()); - let mut paths = glob::glob(&path_str).map_err(|_| glob_err())?; + let mut file = if let Some(mut paths) = self.glob()? { match paths.next() { Some(globresult) => { - path = globresult.map_err(|_| glob_err())?; + path = globresult?; } None => { return Err(PolarsError::ComputeError( @@ -261,8 +251,10 @@ impl<'a> LazyCsvReader<'a> { Ok(self.with_schema(Arc::new(schema))) } +} - pub fn finish_impl(self) -> PolarsResult { +impl LazyFileListReader for LazyCsvReader<'_> { + fn finish_no_glob(self) -> PolarsResult { let mut lf: LazyFrame = LogicalPlanBuilder::scan_csv( self.path, self.delimiter, @@ -291,27 +283,28 @@ impl<'a> LazyCsvReader<'a> { Ok(lf) } - pub fn finish(self) -> PolarsResult { - let path_str = self.path.to_string_lossy(); - if path_str.contains('*') { - let paths = glob::glob(&path_str) - .map_err(|_| PolarsError::ComputeError("invalid glob pattern given".into()))?; - - let lfs = paths - .map(|r| { - let path = r.map_err(|e| PolarsError::ComputeError(format!("{e}").into()))?; - let mut builder = self.clone(); - builder.path = path; - // do no rechunk yet. - builder.rechunk = false; - builder.finish_impl() - }) - .collect::>>()?; - // set to false, as the csv parser has full thread utilization - concat_impl(&lfs, self.rechunk, false, true) - .map_err(|_| PolarsError::ComputeError("no matching files found".into())) - } else { - self.finish_impl() - } + fn path(&self) -> &Path { + &self.path + } + + fn with_path(mut self, path: PathBuf) -> Self { + self.path = path; + self + } + + fn rechunk(&self) -> bool { + self.rechunk + } + + /// Rechunk the memory to contiguous chunks when parsing is done. + #[must_use] + fn with_rechunk(mut self, toggle: bool) -> Self { + self.rechunk = toggle; + self + } + + fn concat_impl(&self, lfs: Vec) -> PolarsResult { + // set to false, as the csv parser has full thread utilization + concat_impl(&lfs, self.rechunk(), false, true) } } diff --git a/polars/polars-lazy/src/frame/filelistreader.rs b/polars/polars-lazy/src/frame/filelistreader.rs new file mode 100644 index 000000000000..0102c6f820c0 --- /dev/null +++ b/polars/polars-lazy/src/frame/filelistreader.rs @@ -0,0 +1,112 @@ +use std::path::{Path, PathBuf}; + +use polars_core::cloud::CloudOptions; +use polars_core::prelude::*; +use polars_io::is_cloud_url; + +use crate::prelude::*; + +pub type GlobIterator = Box>>; + +// cloud_options is used only with async feature +#[allow(unused_variables)] +fn polars_glob(pattern: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult { + if is_cloud_url(pattern) { + #[cfg(feature = "async")] + { + let paths = polars_io::async_glob(pattern, cloud_options)?; + Ok(Box::new(paths.into_iter().map(|a| Ok(PathBuf::from(&a))))) + } + #[cfg(not(feature = "async"))] + panic!("Feature `async` must be enabled to use globbing patterns with cloud urls.") + } else { + let paths = glob::glob(pattern) + .map_err(|_| PolarsError::ComputeError("invalid glob pattern given".into()))?; + + let paths = paths.map(|v| v.map_err(|e| PolarsError::ComputeError(format!("{e}").into()))); + + Ok(Box::new(paths)) + } +} + +/// Reads [LazyFrame] from a filesystem or a cloud storage. +/// Supports glob patterns. +/// +/// Use [LazyFileListReader::finish] to get the final [LazyFrame]. +pub trait LazyFileListReader: Clone { + /// Get the final [LazyFrame]. + fn finish(self) -> PolarsResult { + if let Some(paths) = self.glob()? { + let lfs = paths + .map(|r| { + let path = r?; + self.clone() + .with_path(path.clone()) + .with_rechunk(false) + .finish_no_glob() + .map_err(|e| { + PolarsError::ComputeError( + format!("while reading {} got {e:?}.", path.display()).into(), + ) + }) + }) + .collect::>>()?; + + if lfs.is_empty() { + return PolarsResult::Err(PolarsError::ComputeError( + format!( + "no matching files found in {}", + self.path().to_string_lossy() + ) + .into(), + )); + } + + self.concat_impl(lfs) + } else { + self.finish_no_glob() + } + } + + /// Recommended concatenation of [LazyFrame]s from many input files. + fn concat_impl(&self, lfs: Vec) -> PolarsResult; + + /// Get the final [LazyFrame]. + /// This method assumes, that path is *not* a glob. + /// + /// It is recommended to always use [LazyFileListReader::finish] method. + fn finish_no_glob(self) -> PolarsResult; + + /// Path of the scanned file. + /// It can be potentially a glob pattern. + fn path(&self) -> &Path; + + /// Set path of the scanned file. + /// Support glob patterns. + #[must_use] + fn with_path(self, path: PathBuf) -> Self; + + /// Rechunk the memory to contiguous chunks when parsing is done. + fn rechunk(&self) -> bool; + + /// Rechunk the memory to contiguous chunks when parsing is done. + #[must_use] + fn with_rechunk(self, toggle: bool) -> Self; + + /// [CloudOptions] used to list files. + fn cloud_options(&self) -> Option<&CloudOptions> { + None + } + + /// Get list of files referenced by this reader. + /// + /// Returns [None] if path is not a glob pattern. + fn glob(&self) -> PolarsResult> { + let path_str = self.path().to_string_lossy(); + if path_str.contains('*') { + polars_glob(&path_str, self.cloud_options()).map(Some) + } else { + Ok(None) + } + } +} diff --git a/polars/polars-lazy/src/frame/ipc.rs b/polars/polars-lazy/src/frame/ipc.rs index d1248b14a77f..5cbede7fd053 100644 --- a/polars/polars-lazy/src/frame/ipc.rs +++ b/polars/polars-lazy/src/frame/ipc.rs @@ -1,4 +1,4 @@ -use std::path::Path; +use std::path::{Path, PathBuf}; use polars_core::prelude::*; use polars_io::RowCount; @@ -26,8 +26,22 @@ impl Default for ScanArgsIpc { } } -impl LazyFrame { - fn scan_ipc_impl(path: impl AsRef, args: ScanArgsIpc) -> PolarsResult { +#[derive(Clone)] +struct LazyIpcReader { + args: ScanArgsIpc, + path: PathBuf, +} + +impl LazyIpcReader { + fn new(path: PathBuf, args: ScanArgsIpc) -> Self { + Self { args, path } + } +} + +impl LazyFileListReader for LazyIpcReader { + fn finish_no_glob(self) -> PolarsResult { + let args = self.args; + let path = self.path; let options = IpcScanOptions { n_rows: args.n_rows, cache: args.cache, @@ -37,9 +51,7 @@ impl LazyFrame { memmap: args.memmap, }; let row_count = args.row_count; - let mut lf: LazyFrame = LogicalPlanBuilder::scan_ipc(path.as_ref(), options)? - .build() - .into(); + let mut lf: LazyFrame = LogicalPlanBuilder::scan_ipc(path, options)?.build().into(); lf.opt_state.file_caching = true; // it is a bit hacky, but this row_count function updates the schema @@ -50,39 +62,41 @@ impl LazyFrame { Ok(lf) } - /// Create a LazyFrame directly from a ipc scan. - pub fn scan_ipc(path: impl AsRef, args: ScanArgsIpc) -> PolarsResult { - let path = path.as_ref(); - let path_str = path.to_string_lossy(); - if path_str.contains('*') { - let paths = glob::glob(&path_str) - .map_err(|_| PolarsError::ComputeError("invalid glob pattern given".into()))?; + fn path(&self) -> &Path { + self.path.as_path() + } - let lfs = paths - .map(|r| { - let path = r.map_err(|e| PolarsError::ComputeError(format!("{e}").into()))?; - let mut args = args.clone(); - args.rechunk = false; - args.row_count = None; - Self::scan_ipc_impl(path, args) - }) - .collect::>>()?; + fn with_path(mut self, path: PathBuf) -> Self { + self.path = path; + self + } - concat_impl(&lfs, args.rechunk, true, true) - .map_err(|_| PolarsError::ComputeError("no matching files found".into())) - .map(|mut lf| { - if let Some(n_rows) = args.n_rows { - lf = lf.slice(0, n_rows as IdxSize); - }; + fn rechunk(&self) -> bool { + self.args.rechunk + } - if let Some(rc) = args.row_count { - lf = lf.with_row_count(&rc.name, Some(rc.offset)) - } + fn with_rechunk(mut self, toggle: bool) -> Self { + self.args.rechunk = toggle; + self + } - lf - }) - } else { - Self::scan_ipc_impl(path, args) - } + fn concat_impl(&self, lfs: Vec) -> PolarsResult { + let args = &self.args; + concat_impl(&lfs, args.rechunk, true, true).map(|mut lf| { + if let Some(n_rows) = args.n_rows { + lf = lf.slice(0, n_rows as IdxSize) + }; + if let Some(rc) = args.row_count.clone() { + lf = lf.with_row_count(&rc.name, Some(rc.offset)) + }; + lf + }) + } +} + +impl LazyFrame { + /// Create a LazyFrame directly from a ipc scan. + pub fn scan_ipc(path: impl AsRef, args: ScanArgsIpc) -> PolarsResult { + LazyIpcReader::new(path.as_ref().to_owned(), args).finish() } } diff --git a/polars/polars-lazy/src/frame/mod.rs b/polars/polars-lazy/src/frame/mod.rs index 2af34916e302..419da158b7fc 100644 --- a/polars/polars-lazy/src/frame/mod.rs +++ b/polars/polars-lazy/src/frame/mod.rs @@ -11,6 +11,7 @@ mod parquet; mod python; mod anonymous_scan; +mod filelistreader; #[cfg(feature = "pivot")] pub mod pivot; @@ -22,6 +23,7 @@ use std::sync::Arc; pub use anonymous_scan::*; #[cfg(feature = "csv-file")] pub use csv::*; +pub use filelistreader::*; #[cfg(feature = "ipc")] pub use ipc::*; #[cfg(feature = "json")] diff --git a/polars/polars-lazy/src/frame/parquet.rs b/polars/polars-lazy/src/frame/parquet.rs index a7cd1a4559a6..85ee280e00bc 100644 --- a/polars/polars-lazy/src/frame/parquet.rs +++ b/polars/polars-lazy/src/frame/parquet.rs @@ -2,10 +2,8 @@ use std::path::{Path, PathBuf}; use polars_core::cloud::CloudOptions; use polars_core::prelude::*; -#[cfg(feature = "async")] -use polars_io::async_glob; use polars_io::parquet::ParallelStrategy; -use polars_io::{is_cloud_url, RowCount}; +use polars_io::RowCount; use crate::prelude::*; @@ -34,27 +32,31 @@ impl Default for ScanArgsParquet { } } -impl LazyFrame { - #[allow(clippy::too_many_arguments)] - fn scan_parquet_impl( - path: impl AsRef, - n_rows: Option, - cache: bool, - parallel: ParallelStrategy, - row_count: Option, - rechunk: bool, - low_memory: bool, - cloud_options: Option, - ) -> PolarsResult { +#[derive(Clone)] +struct LazyParquetReader { + args: ScanArgsParquet, + path: PathBuf, +} + +impl LazyParquetReader { + fn new(path: PathBuf, args: ScanArgsParquet) -> Self { + Self { args, path } + } +} + +impl LazyFileListReader for LazyParquetReader { + fn finish_no_glob(self) -> PolarsResult { + let row_count = self.args.row_count; + let path = self.path; let mut lf: LazyFrame = LogicalPlanBuilder::scan_parquet( - path.as_ref(), - n_rows, - cache, - parallel, + path, + self.args.n_rows, + self.args.cache, + self.args.parallel, None, - rechunk, - low_memory, - cloud_options, + self.args.rechunk, + self.args.low_memory, + self.args.cloud_options, )? .build() .into(); @@ -68,104 +70,68 @@ impl LazyFrame { Ok(lf) } - fn concat_impl(lfs: Vec, args: ScanArgsParquet) -> PolarsResult { + fn path(&self) -> &Path { + self.path.as_path() + } + + fn with_path(mut self, path: PathBuf) -> Self { + self.path = path; + self + } + + fn rechunk(&self) -> bool { + self.args.rechunk + } + + fn with_rechunk(mut self, toggle: bool) -> Self { + self.args.rechunk = toggle; + self + } + + fn cloud_options(&self) -> Option<&CloudOptions> { + self.args.cloud_options.as_ref() + } + + fn concat_impl(&self, lfs: Vec) -> PolarsResult { + let args = &self.args; concat_impl(&lfs, args.rechunk, true, true).map(|mut lf| { if let Some(n_rows) = args.n_rows { lf = lf.slice(0, n_rows as IdxSize) }; - if let Some(rc) = args.row_count { + if let Some(rc) = args.row_count.clone() { lf = lf.with_row_count(&rc.name, Some(rc.offset)) }; lf }) } +} +impl LazyFrame { /// Create a LazyFrame directly from a parquet scan. #[deprecated(note = "please use `concat_lf` instead")] pub fn scan_parquet_files>( paths: Vec

, args: ScanArgsParquet, ) -> PolarsResult { + let reader = LazyParquetReader::new( + paths.first().expect("got no files").as_ref().to_owned(), + args, + ); let lfs = paths .iter() .map(|p| { - Self::scan_parquet_impl( - p, - args.n_rows, - args.cache, - args.parallel, - None, - args.rechunk, - args.low_memory, - args.cloud_options.clone(), - ) + reader + .clone() + .with_path(p.as_ref().to_owned()) + .finish_no_glob() }) .collect::>>()?; - Self::concat_impl(lfs, args) + reader.concat_impl(lfs) } /// Create a LazyFrame directly from a parquet scan. pub fn scan_parquet(path: impl AsRef, args: ScanArgsParquet) -> PolarsResult { - let path = path.as_ref(); - let path_str = path.to_string_lossy(); - if path_str.contains('*') { - let paths = if is_cloud_url(path) { - #[cfg(feature = "async")] - { - Box::new( - async_glob(&path_str, args.cloud_options.as_ref())? - .into_iter() - .map(|a| Ok(PathBuf::from(&a))), - ) - } - #[cfg(not(feature = "async"))] - panic!("Feature `async` must be enabled to use globbing patterns with cloud urls.") - } else { - Box::new( - glob::glob(&path_str).map_err(|_| { - PolarsError::ComputeError("invalid glob pattern given".into()) - })?, - ) as Box>> - }; - let lfs = paths - .map(|r| { - let path = r.map_err(|e| PolarsError::ComputeError(format!("{e}").into()))?; - Self::scan_parquet_impl( - path.clone(), - args.n_rows, - args.cache, - ParallelStrategy::None, - None, - false, - args.low_memory, - args.cloud_options.clone(), - ) - .map_err(|e| { - PolarsError::ComputeError( - format!("While reading {} got {e:?}.", path.display()).into(), - ) - }) - }) - .collect::>>()?; - - if lfs.is_empty() { - return PolarsResult::Err(PolarsError::ComputeError( - format!("Could not load any dataframes from {path_str}").into(), - )); - } - Self::concat_impl(lfs, args) - } else { - Self::scan_parquet_impl( - path, - args.n_rows, - args.cache, - args.parallel, - args.row_count, - args.rechunk, - args.low_memory, - args.cloud_options, - ) - } + LazyParquetReader::new(path.as_ref().to_owned(), args).finish() } } From a8710a4773bd2cc90b223c473708401f806ab8dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20S=C5=82apek?= <28485371+mslapek@users.noreply.github.com> Date: Fri, 17 Feb 2023 08:23:24 +0100 Subject: [PATCH 2/2] CR fix --- .../src/frame/{filelistreader.rs => file_list_reader.rs} | 0 polars/polars-lazy/src/frame/mod.rs | 4 ++-- 2 files changed, 2 insertions(+), 2 deletions(-) rename polars/polars-lazy/src/frame/{filelistreader.rs => file_list_reader.rs} (100%) diff --git a/polars/polars-lazy/src/frame/filelistreader.rs b/polars/polars-lazy/src/frame/file_list_reader.rs similarity index 100% rename from polars/polars-lazy/src/frame/filelistreader.rs rename to polars/polars-lazy/src/frame/file_list_reader.rs diff --git a/polars/polars-lazy/src/frame/mod.rs b/polars/polars-lazy/src/frame/mod.rs index 419da158b7fc..d2c23b95a235 100644 --- a/polars/polars-lazy/src/frame/mod.rs +++ b/polars/polars-lazy/src/frame/mod.rs @@ -11,7 +11,7 @@ mod parquet; mod python; mod anonymous_scan; -mod filelistreader; +mod file_list_reader; #[cfg(feature = "pivot")] pub mod pivot; @@ -23,7 +23,7 @@ use std::sync::Arc; pub use anonymous_scan::*; #[cfg(feature = "csv-file")] pub use csv::*; -pub use filelistreader::*; +pub use file_list_reader::*; #[cfg(feature = "ipc")] pub use ipc::*; #[cfg(feature = "json")]