scan pyarrow dataset (#3327)

ritchie46 committed May 9, 2022
1 parent 0077f10 commit efc7d58
Showing 34 changed files with 387 additions and 20 deletions.
9 changes: 9 additions & 0 deletions polars/polars-core/src/schema.rs
@@ -42,6 +42,15 @@ where
    }
}

impl<J> FromIterator<J> for Schema
where
    J: Into<Field>,
{
    fn from_iter<I: IntoIterator<Item = J>>(iter: I) -> Self {
        Schema::from(iter)
    }
}

impl Schema {
    // could not implement TryFrom
    pub fn try_from_fallible<I>(flds: I) -> Result<Self>
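
The new `FromIterator` impl means a `Schema` can now be collected directly from an iterator of `Field`s (or anything convertible into one). A minimal usage sketch with illustrative column names and dtypes:

```rust
use polars_core::prelude::*;

fn example_schema() -> Schema {
    // Any iterator whose items implement Into<Field> can be collected into a
    // Schema thanks to the FromIterator impl added above.
    vec![
        Field::new("city", DataType::Utf8),
        Field::new("population", DataType::UInt64),
    ]
    .into_iter()
    .collect()
}
```
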
2 changes: 2 additions & 0 deletions polars/polars-lazy/Cargo.toml
@@ -64,6 +64,7 @@ log = ["polars-core/log"]
list_eval = []
chunked_ids = []
list_to_struct = ["polars-ops/list_to_struct"]
python = ["pyo3"]

# no guarantees whatsoever
private = ["polars-time/private"]
@@ -95,6 +96,7 @@ test = [
ahash = "0.7"
glob = "0.3"
parking_lot = "0.12"
pyo3 = { version = "0.16", optional = true }
rayon = "1.5"
regex = { version = "1.5", optional = true }
serde = { version = "1", features = ["derive"], optional = true }
23 changes: 23 additions & 0 deletions polars/polars-lazy/src/dot.rs
@@ -142,6 +142,29 @@ impl LogicalPlan {
                self.write_dot(acc_str, prev_node, &current_node, id)?;
                input.dot(acc_str, (branch, id + 1), &current_node)
            }
            #[cfg(feature = "python")]
            PythonScan { options } => {
                let schema = &options.schema;
                let total_columns = schema.len();
                let n_columns = if let Some(columns) = &options.with_columns {
                    format!("{}", columns.len())
                } else {
                    "*".to_string()
                };

                let current_node = format!(
                    "PYTHON SCAN;\nπ {}/{};\n[{:?}]",
                    n_columns,
                    total_columns,
                    (branch, id)
                );
                if id == 0 {
                    self.write_dot(acc_str, prev_node, &current_node, id)?;
                    write!(acc_str, "\"{}\"", current_node)
                } else {
                    self.write_dot(acc_str, prev_node, &current_node, id)
                }
            }
            #[cfg(feature = "csv-file")]
            CsvScan {
                path,
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/frame/mod.rs
@@ -5,6 +5,8 @@ mod csv;
mod ipc;
#[cfg(feature = "parquet")]
mod parquet;
#[cfg(feature = "python")]
mod python;

#[cfg(feature = "csv-file")]
pub use csv::*;
15 changes: 15 additions & 0 deletions polars/polars-lazy/src/frame/python.rs
@@ -0,0 +1,15 @@
use crate::prelude::*;
use polars_core::prelude::*;

impl LazyFrame {
    pub fn scan_from_python_function(schema: Schema, scan_fn: Vec<u8>) -> Self {
        LogicalPlan::PythonScan {
            options: PythonOptions {
                scan_fn,
                schema: Arc::new(schema),
                ..Default::default()
            },
        }
        .into()
    }
}
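
A hedged sketch of how this constructor might be driven from the Rust side. The function name and the schema below are illustrative; in practice the Python bindings produce the pickled payload and the matching schema when registering a scan:

```rust
use polars_core::prelude::*;
use polars_lazy::prelude::*;

// `pickled_scan_fn` is assumed to be a pickled zero-argument Python callable
// that returns a polars DataFrame; the lazy engine only stores the bytes and
// defers execution to the physical plan.
fn lazy_scan_from_bytes(pickled_scan_fn: Vec<u8>) -> LazyFrame {
    let schema: Schema = vec![
        Field::new("id", DataType::Int64),
        Field::new("name", DataType::Utf8),
    ]
    .into_iter()
    .collect();
    LazyFrame::scan_from_python_function(schema, pickled_scan_fn)
}
```
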
14 changes: 14 additions & 0 deletions polars/polars-lazy/src/logical_plan/alp.rs
@@ -15,6 +15,10 @@ use std::sync::Arc;
// ALogicalPlan is a representation of LogicalPlan with Nodes which are allocated in an Arena
#[derive(Clone, Debug)]
pub enum ALogicalPlan {
    #[cfg(feature = "python")]
    PythonScan {
        options: PythonOptions,
    },
    Melt {
        input: Node,
        args: Arc<MeltArgs>,
@@ -143,6 +147,8 @@ impl ALogicalPlan {
    pub(crate) fn schema<'a>(&'a self, arena: &'a Arena<ALogicalPlan>) -> &'a SchemaRef {
        use ALogicalPlan::*;
        match self {
            #[cfg(feature = "python")]
            PythonScan { options } => &options.schema,
            Union { inputs, .. } => arena.get(inputs[0]).schema(arena),
            Cache { input } => arena.get(*input).schema(arena),
            Sort { input, .. } => arena.get(*input).schema(arena),
@@ -189,6 +195,10 @@
        use ALogicalPlan::*;

        match self {
            #[cfg(feature = "python")]
            PythonScan { options } => PythonScan {
                options: options.clone(),
            },
            Union { options, .. } => Union {
                inputs,
                options: *options,
@@ -446,6 +456,8 @@ impl ALogicalPlan {
                    container.push(*expr)
                }
            }
            #[cfg(feature = "python")]
            PythonScan { .. } => {}
        }
    }

@@ -499,6 +511,8 @@
#[cfg(feature = "csv-file")]
CsvScan { .. } => return,
DataFrameScan { .. } => return,
#[cfg(feature = "python")]
PythonScan { .. } => return,
};
container.push_node(input)
}
4 changes: 4 additions & 0 deletions polars/polars-lazy/src/logical_plan/conversion.rs
@@ -157,6 +157,8 @@ pub(crate) fn to_alp(
    lp_arena: &mut Arena<ALogicalPlan>,
) -> Result<Node> {
    let v = match lp {
        #[cfg(feature = "python")]
        LogicalPlan::PythonScan { options } => ALogicalPlan::PythonScan { options },
        LogicalPlan::Union { inputs, options } => {
            let inputs = inputs
                .into_iter()
@@ -644,6 +646,8 @@ pub(crate) fn node_to_lp(
    let lp = std::mem::take(lp);

    match lp {
        #[cfg(feature = "python")]
        ALogicalPlan::PythonScan { options } => LogicalPlan::PythonScan { options },
        ALogicalPlan::Union { inputs, options } => {
            let inputs = inputs
                .into_iter()
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/logical_plan/format.rs
@@ -7,6 +7,8 @@ impl fmt::Debug for LogicalPlan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        use LogicalPlan::*;
        match self {
            #[cfg(feature = "python")]
            PythonScan { .. } => write!(f, "PYTHON SCAN"),
            Union { inputs, .. } => write!(f, "UNION {:?}", inputs),
            Cache { input } => write!(f, "CACHE {:?}", input),
            #[cfg(feature = "parquet")]
4 changes: 4 additions & 0 deletions polars/polars-lazy/src/logical_plan/mod.rs
@@ -48,6 +48,8 @@ pub enum Context {
    serde(bound(deserialize = "'de: 'static"))
)]
pub enum LogicalPlan {
    #[cfg(feature = "python")]
    PythonScan { options: PythonOptions },
    /// Filter on a boolean mask
    Selection {
        input: Box<LogicalPlan>,
@@ -208,6 +210,8 @@ impl LogicalPlan {
    pub(crate) fn schema(&self) -> &SchemaRef {
        use LogicalPlan::*;
        match self {
            #[cfg(feature = "python")]
            PythonScan { options } => &options.schema,
            Union { inputs, .. } => inputs[0].schema(),
            Cache { input } => input.schema(),
            Sort { input, .. } => input.schema(),
@@ -8,7 +8,7 @@ use std::sync::Arc;

fn process_with_columns(
    path: &Path,
    with_columns: &Option<Vec<String>>,
    with_columns: &Option<Arc<Vec<String>>>,
    columns: &mut PlHashMap<PathBuf, PlHashSet<(usize, String)>>,
    schema: &Schema,
) {
@@ -92,15 +92,15 @@ impl AggScanProjection {
        expr_arena: &mut Arena<AExpr>,
        lp_arena: &mut Arena<ALogicalPlan>,
        path: &Path,
        with_columns: Option<Vec<String>>,
        with_columns: Option<Arc<Vec<String>>>,
    ) -> ALogicalPlan {
        // if the original projection is less than the new one. Also project locally
        if let Some(with_columns) = with_columns {
        if let Some(mut with_columns) = with_columns {
            let agg = self.columns.get(path).unwrap();
            if with_columns.len() < agg.len() {
                let node = lp_arena.add(lp);

                let projections = with_columns
                let projections = std::mem::take(Arc::make_mut(&mut with_columns))
                    .into_iter()
                    .map(|s| expr_arena.add(AExpr::Column(Arc::from(s))))
                    .collect();
@@ -139,7 +139,7 @@ impl OptimizationRule for AggScanProjection {
                    let mut columns = agg.iter().cloned().collect::<Vec<_>>();
                    // make sure that the columns are sorted because they come from a hashmap
                    columns.sort_unstable_by_key(|k| k.0);
                    columns.into_iter().map(|k| k.1).collect()
                    Arc::new(columns.into_iter().map(|k| k.1).collect())
                });
                // prevent infinite loop
                if options.with_columns == with_columns {
@@ -185,7 +185,7 @@ impl OptimizationRule for AggScanProjection {
                    let mut columns = agg.iter().cloned().collect::<Vec<_>>();
                    // make sure that the columns are sorted because they come from a hashmap
                    columns.sort_unstable_by_key(|k| k.0);
                    columns.into_iter().map(|k| k.1).collect()
                    Arc::new(columns.into_iter().map(|k| k.1).collect())
                });
                // prevent infinite loop
                if options.with_columns == with_columns {
@@ -231,7 +231,7 @@ impl OptimizationRule for AggScanProjection {
                    let mut columns = agg.iter().cloned().collect::<Vec<_>>();
                    // make sure that the columns are sorted because they come from a hashmap
                    columns.sort_unstable_by_key(|k| k.0);
                    columns.into_iter().map(|k| k.1).collect()
                    Arc::new(columns.into_iter().map(|k| k.1).collect())
                });
                if options.with_columns == with_columns {
                    let lp = ALogicalPlan::CsvScan {
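
The `with_columns` lists are now shared behind an `Arc`, so the local projection above moves the `Vec` out with `std::mem::take(Arc::make_mut(..))` instead of consuming it by value. A standalone sketch of that copy-on-write extraction (the helper name is illustrative):

```rust
use std::sync::Arc;

// Arc::make_mut clones the inner Vec only when the Arc is shared elsewhere;
// mem::take then moves the Vec out, leaving an empty Vec behind in this handle.
fn take_columns(mut with_columns: Arc<Vec<String>>) -> Vec<String> {
    std::mem::take(Arc::make_mut(&mut with_columns))
}
```
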
@@ -448,6 +448,11 @@ impl PredicatePushDown {
            | lp @ Aggregate {..} => {
                self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
            }
            #[cfg(feature = "python")]
            // python node does not yet support predicates
            lp @ PythonScan {..} => {
                self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
            }
        }
    }

@@ -19,7 +19,7 @@ fn init_set() -> PlHashSet<Arc<str>> {
fn get_scan_columns(
    acc_projections: &mut Vec<Node>,
    expr_arena: &Arena<AExpr>,
) -> Option<Vec<String>> {
) -> Option<Arc<Vec<String>>> {
    let mut with_columns = None;
    if !acc_projections.is_empty() {
        let mut columns = Vec::with_capacity(acc_projections.len());
@@ -28,7 +28,7 @@
                columns.push((*name).to_owned())
            }
        }
        with_columns = Some(columns);
        with_columns = Some(Arc::new(columns));
    }
    with_columns
}
@@ -427,6 +427,22 @@ impl ProjectionPushDown {
                };
                Ok(lp)
            }
            #[cfg(feature = "python")]
            PythonScan { mut options } => {
                options.with_columns = get_scan_columns(&mut acc_projections, expr_arena);

                options.output_schema = if options.with_columns.is_none() {
                    None
                } else {
                    Some(Arc::new(update_scan_schema(
                        &acc_projections,
                        expr_arena,
                        &*options.schema,
                        true,
                    )))
                };
                Ok(PythonScan { options })
            }
            #[cfg(feature = "csv-file")]
            CsvScan {
                path,
21 changes: 18 additions & 3 deletions polars/polars-lazy/src/logical_plan/options.rs
@@ -14,7 +14,7 @@ pub struct CsvParserOptions {
    pub(crate) has_header: bool,
    pub(crate) skip_rows: usize,
    pub(crate) n_rows: Option<usize>,
    pub(crate) with_columns: Option<Vec<String>>,
    pub(crate) with_columns: Option<Arc<Vec<String>>>,
    pub(crate) low_memory: bool,
    pub(crate) ignore_errors: bool,
    pub(crate) cache: bool,
@@ -29,7 +29,7 @@
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ParquetOptions {
pub(crate) n_rows: Option<usize>,
pub(crate) with_columns: Option<Vec<String>>,
pub(crate) with_columns: Option<Arc<Vec<String>>>,
pub(crate) cache: bool,
pub(crate) parallel: bool,
pub(crate) row_count: Option<RowCount>,
@@ -39,7 +39,7 @@
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct IpcScanOptions {
pub n_rows: Option<usize>,
pub with_columns: Option<Vec<String>>,
pub with_columns: Option<Arc<Vec<String>>>,
pub cache: bool,
pub row_count: Option<RowCount>,
}
@@ -146,3 +146,18 @@ pub struct SortArguments {
    pub(crate) nulls_last: bool,
    pub(crate) slice: Option<(i64, usize)>,
}

#[derive(Clone, PartialEq, Debug, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(
    all(feature = "serde", feature = "object"),
    serde(bound(deserialize = "'de: 'static"))
)]
#[cfg(feature = "python")]
pub struct PythonOptions {
    // Serialized Fn() -> Result<DataFrame>
    pub(crate) scan_fn: Vec<u8>,
    pub(crate) schema: SchemaRef,
    pub(crate) output_schema: Option<SchemaRef>,
    pub(crate) with_columns: Option<Arc<Vec<String>>>,
}
4 changes: 4 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/mod.rs
@@ -9,12 +9,16 @@ mod groupby_rolling;
mod join;
mod melt;
mod projection;
#[cfg(feature = "python")]
mod python_scan;
mod scan;
mod slice;
mod sort;
mod stack;
mod udf;
mod union;
#[cfg(feature = "python")]
pub(super) use self::python_scan::*;

pub(super) use self::{
    cache::*, drop_duplicates::*, explode::*, filter::*, groupby::*, groupby_dynamic::*,
40 changes: 40 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/python_scan.rs
@@ -0,0 +1,40 @@
use super::*;
use crate::prelude::*;
use pyo3::prelude::*;
use pyo3::types::PyBytes;

pub(crate) struct PythonScanExec {
    pub(crate) options: PythonOptions,
}

impl Executor for PythonScanExec {
    fn execute(&mut self, _cache: &ExecutionState) -> Result<DataFrame> {
        let with_columns = self.options.with_columns.take();
        Python::with_gil(|py| {
            let pl = PyModule::import(py, "polars").unwrap();
            let pli = pl.getattr("internals").unwrap();
            let deser_and_exec = pli.getattr("_deser_and_exec").unwrap();

            let bytes = PyBytes::new(py, &self.options.scan_fn);

            let with_columns =
                with_columns.map(|mut cols| std::mem::take(Arc::make_mut(&mut cols)));

            let out = deser_and_exec
                .call1((bytes, with_columns))
                .map_err(|err| PolarsError::ComputeError(format!("{:?}", err).into()))?;
            let pydf = out.getattr("_df").unwrap();
            let raw_parts = pydf.call_method0("into_raw_parts").unwrap();
            let raw_parts = raw_parts.extract::<(usize, usize, usize)>().unwrap();

            let (ptr, len, cap) = raw_parts;
            unsafe {
                Ok(DataFrame::new_no_checks(Vec::from_raw_parts(
                    ptr as *mut Series,
                    len,
                    cap,
                )))
            }
        })
    }
}
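
The unsafe block at the end relies on the Python-side DataFrame handing over ownership of its `Vec<Series>` as raw parts, which is only sound because both sides live in the same compiled extension and share an allocator. A minimal, self-contained sketch of that raw-parts round trip using a plain `Vec<i32>`:

```rust
use std::mem::ManuallyDrop;

fn raw_parts_round_trip() -> Vec<i32> {
    // Decompose a Vec into (ptr, len, capacity) without dropping it...
    let mut v = ManuallyDrop::new(vec![1, 2, 3]);
    let (ptr, len, cap) = (v.as_mut_ptr(), v.len(), v.capacity());
    // ...and rebuild it on the receiving side. This is only sound if the
    // original is never touched again and both sides use the same allocator.
    unsafe { Vec::from_raw_parts(ptr, len, cap) }
}
```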
