Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rust): add LazyFileListReader trait #6937

Merged
merged 2 commits into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 29 additions & 36 deletions polars/polars-lazy/src/frame/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use polars_io::csv::utils::{get_reader_bytes, infer_file_schema};
use polars_io::csv::{CsvEncoding, NullValues};
use polars_io::RowCount;

use crate::frame::LazyFileListReader;
use crate::prelude::*;

#[derive(Clone)]
Expand Down Expand Up @@ -183,13 +184,6 @@ impl<'a> LazyCsvReader<'a> {
self
}

/// Choose whether the memory is rechunked into contiguous chunks once parsing is done.
#[must_use]
pub fn with_rechunk(self, toggle: bool) -> Self {
    let mut reader = self;
    reader.rechunk = toggle;
    reader
}

/// Set [`CsvEncoding`]
#[must_use]
pub fn with_encoding(mut self, enc: CsvEncoding) -> Self {
Expand All @@ -212,15 +206,11 @@ impl<'a> LazyCsvReader<'a> {
F: Fn(Schema) -> PolarsResult<Schema>,
{
let path;
let path_str = self.path.to_string_lossy();

let mut file = if path_str.contains('*') {
let glob_err = || PolarsError::ComputeError("invalid glob pattern given".into());
let mut paths = glob::glob(&path_str).map_err(|_| glob_err())?;

let mut file = if let Some(mut paths) = self.glob()? {
match paths.next() {
Some(globresult) => {
path = globresult.map_err(|_| glob_err())?;
path = globresult?;
}
None => {
return Err(PolarsError::ComputeError(
Expand Down Expand Up @@ -261,8 +251,10 @@ impl<'a> LazyCsvReader<'a> {

Ok(self.with_schema(Arc::new(schema)))
}
}

pub fn finish_impl(self) -> PolarsResult<LazyFrame> {
impl LazyFileListReader for LazyCsvReader<'_> {
fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
let mut lf: LazyFrame = LogicalPlanBuilder::scan_csv(
self.path,
self.delimiter,
Expand Down Expand Up @@ -291,27 +283,28 @@ impl<'a> LazyCsvReader<'a> {
Ok(lf)
}

pub fn finish(self) -> PolarsResult<LazyFrame> {
let path_str = self.path.to_string_lossy();
if path_str.contains('*') {
let paths = glob::glob(&path_str)
.map_err(|_| PolarsError::ComputeError("invalid glob pattern given".into()))?;

let lfs = paths
.map(|r| {
let path = r.map_err(|e| PolarsError::ComputeError(format!("{e}").into()))?;
let mut builder = self.clone();
builder.path = path;
// do no rechunk yet.
builder.rechunk = false;
builder.finish_impl()
})
.collect::<PolarsResult<Vec<_>>>()?;
// set to false, as the csv parser has full thread utilization
concat_impl(&lfs, self.rechunk, false, true)
.map_err(|_| PolarsError::ComputeError("no matching files found".into()))
} else {
self.finish_impl()
}
/// Borrow the path stored on this reader.
fn path(&self) -> &Path {
&self.path
}

/// Replace the path stored on this reader and hand the reader back.
fn with_path(self, path: PathBuf) -> Self {
    let mut reader = self;
    reader.path = path;
    reader
}

/// Current value of the reader's `rechunk` flag.
fn rechunk(&self) -> bool {
self.rechunk
}

/// Choose whether the memory is rechunked into contiguous chunks once parsing is done.
#[must_use]
fn with_rechunk(self, toggle: bool) -> Self {
    let mut reader = self;
    reader.rechunk = toggle;
    reader
}

/// Combine the frames scanned from the individual files into one [LazyFrame],
/// honouring this reader's `rechunk` setting.
fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
    let rechunk = self.rechunk();
    // the third argument is set to false, as the csv parser has full thread utilization
    concat_impl(&lfs, rechunk, false, true)
}
}
112 changes: 112 additions & 0 deletions polars/polars-lazy/src/frame/file_list_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use std::path::{Path, PathBuf};

use polars_core::cloud::CloudOptions;
use polars_core::prelude::*;
use polars_io::is_cloud_url;

use crate::prelude::*;

/// Boxed iterator over glob matches; each item is either a matched path or the
/// error raised while producing it.
pub type GlobIterator = Box<dyn Iterator<Item = PolarsResult<PathBuf>>>;

/// Expand a glob `pattern` into an iterator over the matching paths.
///
/// Cloud urls (as decided by `is_cloud_url`) are listed through
/// `polars_io::async_glob`, which is only compiled in with the `async` feature;
/// every other pattern is expanded with `glob::glob`.
///
/// # Errors
///
/// Returns a `ComputeError` when
/// * the pattern is a cloud url but the `async` feature is disabled,
/// * the pattern is rejected by `glob::glob`.
///
/// Listing a cloud url may fail with whatever error `async_glob` reports.
/// Failures on individual local entries are not reported here: they surface as
/// `Err` items of the returned iterator.
// `cloud_options` is only consumed when the `async` feature is enabled.
#[allow(unused_variables)]
fn polars_glob(pattern: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<GlobIterator> {
    if is_cloud_url(pattern) {
        #[cfg(feature = "async")]
        {
            let paths = polars_io::async_glob(pattern, cloud_options)?;
            Ok(Box::new(paths.into_iter().map(|a| Ok(PathBuf::from(&a)))))
        }
        // A missing feature is reported as an error instead of a panic: the caller
        // already handles a `PolarsResult`, so there is no need to abort the process.
        #[cfg(not(feature = "async"))]
        {
            Err(PolarsError::ComputeError(
                "Feature `async` must be enabled to use globbing patterns with cloud urls.".into(),
            ))
        }
    } else {
        let paths = glob::glob(pattern)
            .map_err(|_| PolarsError::ComputeError("invalid glob pattern given".into()))?;

        // Convert per-entry glob errors into `PolarsError` lazily, as the iterator is consumed.
        let paths = paths.map(|v| v.map_err(|e| PolarsError::ComputeError(e.to_string().into())));

        Ok(Box::new(paths))
    }
}

/// Reader that produces a [LazyFrame] from a path on a filesystem or in cloud storage.
/// Paths containing `*` are treated as glob patterns.
///
/// Call [LazyFileListReader::finish] to obtain the final [LazyFrame].
pub trait LazyFileListReader: Clone {
    /// Get the final [LazyFrame].
    ///
    /// If [LazyFileListReader::glob] yields paths, every match is scanned on a clone
    /// of this reader with rechunking switched off, and the per-file frames are
    /// combined through [LazyFileListReader::concat_impl]. Otherwise the stored path
    /// is scanned directly with [LazyFileListReader::finish_no_glob].
    ///
    /// # Errors
    ///
    /// Fails when the glob cannot be listed, when an entry of the listing is an
    /// error, when scanning one of the files fails (reported as a `ComputeError`
    /// naming the file), or when the pattern matches no file at all.
    fn finish(self) -> PolarsResult<LazyFrame> {
        let paths = match self.glob()? {
            Some(paths) => paths,
            // Not a glob pattern: scan the single path as-is.
            None => return self.finish_no_glob(),
        };

        let mut lfs = Vec::new();
        for entry in paths {
            let path = entry?;
            let scanned = self
                .clone()
                .with_path(path.clone())
                .with_rechunk(false)
                .finish_no_glob();
            match scanned {
                Ok(lf) => lfs.push(lf),
                Err(e) => {
                    let msg = format!("while reading {} got {e:?}.", path.display());
                    return Err(PolarsError::ComputeError(msg.into()));
                }
            }
        }

        if lfs.is_empty() {
            let msg = format!(
                "no matching files found in {}",
                self.path().to_string_lossy()
            );
            return Err(PolarsError::ComputeError(msg.into()));
        }

        self.concat_impl(lfs)
    }

    /// Recommended way to concatenate the [LazyFrame]s scanned from many input files.
    fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame>;

    /// Get the final [LazyFrame], assuming that the path is *not* a glob pattern.
    ///
    /// Prefer [LazyFileListReader::finish], which handles both cases.
    fn finish_no_glob(self) -> PolarsResult<LazyFrame>;

    /// Path of the scanned file; it may be a glob pattern.
    fn path(&self) -> &Path;

    /// Set the path of the scanned file. Glob patterns are supported.
    #[must_use]
    fn with_path(self, path: PathBuf) -> Self;

    /// Whether the memory is rechunked to contiguous chunks when parsing is done.
    fn rechunk(&self) -> bool;

    /// Set whether the memory is rechunked to contiguous chunks when parsing is done.
    #[must_use]
    fn with_rechunk(self, toggle: bool) -> Self;

    /// [CloudOptions] used to list files. Defaults to [None].
    fn cloud_options(&self) -> Option<&CloudOptions> {
        None
    }

    /// Get the list of files referenced by this reader.
    ///
    /// Returns [None] if the path does not contain a `*`, i.e. it is not treated
    /// as a glob pattern.
    fn glob(&self) -> PolarsResult<Option<GlobIterator>> {
        let pattern = self.path().to_string_lossy();
        if !pattern.contains('*') {
            return Ok(None);
        }
        let matches = polars_glob(&pattern, self.cloud_options())?;
        Ok(Some(matches))
    }
}
86 changes: 50 additions & 36 deletions polars/polars-lazy/src/frame/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::path::Path;
use std::path::{Path, PathBuf};

use polars_core::prelude::*;
use polars_io::RowCount;
Expand Down Expand Up @@ -26,8 +26,22 @@ impl Default for ScanArgsIpc {
}
}

impl LazyFrame {
fn scan_ipc_impl(path: impl AsRef<Path>, args: ScanArgsIpc) -> PolarsResult<Self> {
/// Lazy reader for IPC files: bundles the path to scan with the scan arguments.
#[derive(Clone)]
struct LazyIpcReader {
// Options controlling the scan (row limit, rechunking, row count, ...).
args: ScanArgsIpc,
// Location to scan, stored exactly as passed to `new`.
path: PathBuf,
}

impl LazyIpcReader {
fn new(path: PathBuf, args: ScanArgsIpc) -> Self {
Self { args, path }
}
}

impl LazyFileListReader for LazyIpcReader {
fn finish_no_glob(self) -> PolarsResult<LazyFrame> {
let args = self.args;
let path = self.path;
let options = IpcScanOptions {
n_rows: args.n_rows,
cache: args.cache,
Expand All @@ -37,9 +51,7 @@ impl LazyFrame {
memmap: args.memmap,
};
let row_count = args.row_count;
let mut lf: LazyFrame = LogicalPlanBuilder::scan_ipc(path.as_ref(), options)?
.build()
.into();
let mut lf: LazyFrame = LogicalPlanBuilder::scan_ipc(path, options)?.build().into();
lf.opt_state.file_caching = true;

// it is a bit hacky, but this row_count function updates the schema
Expand All @@ -50,39 +62,41 @@ impl LazyFrame {
Ok(lf)
}

/// Create a LazyFrame directly from a ipc scan.
pub fn scan_ipc(path: impl AsRef<Path>, args: ScanArgsIpc) -> PolarsResult<Self> {
let path = path.as_ref();
let path_str = path.to_string_lossy();
if path_str.contains('*') {
let paths = glob::glob(&path_str)
.map_err(|_| PolarsError::ComputeError("invalid glob pattern given".into()))?;
/// Borrow the path stored on this reader.
fn path(&self) -> &Path {
self.path.as_path()
}

let lfs = paths
.map(|r| {
let path = r.map_err(|e| PolarsError::ComputeError(format!("{e}").into()))?;
let mut args = args.clone();
args.rechunk = false;
args.row_count = None;
Self::scan_ipc_impl(path, args)
})
.collect::<PolarsResult<Vec<_>>>()?;
/// Replace the path stored on this reader, keeping the scan arguments untouched.
fn with_path(self, path: PathBuf) -> Self {
    let mut reader = self;
    reader.path = path;
    reader
}

concat_impl(&lfs, args.rechunk, true, true)
.map_err(|_| PolarsError::ComputeError("no matching files found".into()))
.map(|mut lf| {
if let Some(n_rows) = args.n_rows {
lf = lf.slice(0, n_rows as IdxSize);
};
/// Current value of the `rechunk` flag held in the scan arguments.
fn rechunk(&self) -> bool {
self.args.rechunk
}

if let Some(rc) = args.row_count {
lf = lf.with_row_count(&rc.name, Some(rc.offset))
}
/// Overwrite the `rechunk` flag held in the scan arguments and hand the reader back.
fn with_rechunk(self, toggle: bool) -> Self {
    let mut reader = self;
    reader.args.rechunk = toggle;
    reader
}

lf
})
} else {
Self::scan_ipc_impl(path, args)
}
/// Concatenate the per-file frames, then apply the `n_rows` limit and the row
/// count column from the scan arguments to the combined frame.
fn concat_impl(&self, lfs: Vec<LazyFrame>) -> PolarsResult<LazyFrame> {
    let args = &self.args;
    let mut combined = concat_impl(&lfs, args.rechunk, true, true)?;

    // The row limit applies to the concatenated result as a whole.
    if let Some(limit) = args.n_rows {
        combined = combined.slice(0, limit as IdxSize);
    }

    // NOTE(review): the pre-refactor glob path cleared `row_count` on every per-file
    // scan before adding it here; `LazyFileListReader::finish` only resets
    // rechunking — confirm the row count column is not added twice.
    if let Some(rc) = args.row_count.clone() {
        combined = combined.with_row_count(&rc.name, Some(rc.offset));
    }

    Ok(combined)
}
}

impl LazyFrame {
    /// Create a LazyFrame directly from an IPC scan.
    ///
    /// The path is handed to a `LazyIpcReader`, whose `finish` builds the frame.
    pub fn scan_ipc(path: impl AsRef<Path>, args: ScanArgsIpc) -> PolarsResult<Self> {
        let path = path.as_ref().to_path_buf();
        let reader = LazyIpcReader::new(path, args);
        reader.finish()
    }
}
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod parquet;
mod python;

mod anonymous_scan;
mod file_list_reader;
#[cfg(feature = "pivot")]
pub mod pivot;

Expand All @@ -22,6 +23,7 @@ use std::sync::Arc;
pub use anonymous_scan::*;
#[cfg(feature = "csv-file")]
pub use csv::*;
pub use file_list_reader::*;
#[cfg(feature = "ipc")]
pub use ipc::*;
#[cfg(feature = "json")]
Expand Down
Loading