Skip to content

Commit

Permalink
[python] init lazy api
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 8, 2020
1 parent 1f44e9f commit 1fb099d
Show file tree
Hide file tree
Showing 16 changed files with 474 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-test-docs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install maturin pytest black==19.10b
pip install maturin pytest black==20.8b1
- name: Run formatting checks
run: |
cargo fmt -- --check
Expand Down
46 changes: 45 additions & 1 deletion polars/src/lazy/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,51 @@ impl fmt::Debug for Expr {
}
}

pub(crate) fn binary_expr(l: Expr, op: Operator, r: Expr) -> Expr {
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum Operator {
Eq,
NotEq,
Lt,
LtEq,
Gt,
GtEq,
Plus,
Minus,
Multiply,
Divide,
Modulus,
And,
Or,
Not,
Like,
NotLike,
}

impl From<u8> for Operator {
fn from(op: u8) -> Self {
match op {
0 => Operator::Eq,
1 => Operator::NotEq,
2 => Operator::Lt,
3 => Operator::LtEq,
4 => Operator::Gt,
5 => Operator::GtEq,
6 => Operator::Plus,
7 => Operator::Minus,
8 => Operator::Multiply,
9 => Operator::Divide,
10 => Operator::Modulus,
11 => Operator::And,
12 => Operator::Or,
13 => Operator::Not,
14 => Operator::Like,
15 => Operator::NotLike,
_ => panic!("not an operator"),
}
}
}

pub fn binary_expr(l: Expr, op: Operator, r: Expr) -> Expr {
Expr::BinaryExpr {
left: Box::new(l),
op,
Expand Down
11 changes: 11 additions & 0 deletions polars/src/lazy/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@ pub struct LazyFrame {
type_coercion: bool,
}

impl Default for LazyFrame {
fn default() -> Self {
LazyFrame {
logical_plan: LogicalPlan::default(),
projection_pushdown: false,
predicate_pushdown: false,
type_coercion: false,
}
}
}

impl From<LogicalPlan> for LazyFrame {
fn from(plan: LogicalPlan) -> Self {
Self {
Expand Down
32 changes: 12 additions & 20 deletions polars/src/lazy/logical_plan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub(crate) mod optimizer;
use crate::lazy::logical_plan::LogicalPlan::CsvScan;
use crate::{
lazy::{prelude::*, utils},
prelude::*,
Expand Down Expand Up @@ -58,26 +59,6 @@ impl ScalarValue {
}
}

#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub enum Operator {
Eq,
NotEq,
Lt,
LtEq,
Gt,
GtEq,
Plus,
Minus,
Multiply,
Divide,
Modulus,
And,
Or,
Not,
Like,
NotLike,
}

// https://stackoverflow.com/questions/1031076/what-are-projection-and-selection
#[derive(Clone)]
pub enum LogicalPlan {
Expand Down Expand Up @@ -123,6 +104,17 @@ pub enum LogicalPlan {
},
}

impl Default for LogicalPlan {
fn default() -> Self {
CsvScan {
path: "".to_string(),
schema: Schema::new(vec![Field::new("", ArrowDataType::Null, true)]),
has_header: false,
delimiter: None,
}
}
}

impl fmt::Debug for LogicalPlan {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use LogicalPlan::*;
Expand Down
2 changes: 1 addition & 1 deletion polars/src/lazy/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub use crate::lazy::{
predicate::PredicatePushDown, projection::ProjectionPushDown,
type_coercion::TypeCoercion, Optimize,
},
JoinType, LogicalPlan, LogicalPlanBuilder, Operator, ScalarValue,
JoinType, LogicalPlan, LogicalPlanBuilder, ScalarValue,
},
physical_plan::{
executors::{CsvExec, DataFrameExec, FilterExec, GroupByExec, PipeExec, SortExec},
Expand Down
2 changes: 1 addition & 1 deletion py-polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
polars = {path = "../polars", features = ["parquet", "simd", "parallel"]}
polars = {path = "../polars", features = ["parquet", "simd", "parallel", "lazy"]}
pyo3 = {version = "0.11", features = ["extension-module"] }
thiserror = "1.0.20"
numpy = "0.11"
Expand Down
8 changes: 6 additions & 2 deletions py-polars/pypolars/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os

if not os.environ.get("DOC_BUILDING", False):
from .pypolars import PyDataFrame, PySeries
from .pypolars import PyDataFrame, PySeries, PyLazyFrame
from typing import (
Dict,
Sequence,
Expand Down Expand Up @@ -72,7 +72,8 @@ def read_csv(

@staticmethod
def read_parquet(
file: Union[str, BinaryIO], batch_size: int = 250000,
file: Union[str, BinaryIO],
batch_size: int = 250000,
) -> DataFrame:
"""
Read into a DataFrame from a parquet file.
Expand Down Expand Up @@ -634,6 +635,9 @@ def shift(self, periods: int) -> DataFrame:
"""
return wrap_df(self._df.shift(periods))

def lazy(self) -> "LazyFrame":
pass


class GroupBy:
def __init__(self, df: DataFrame, by: List[str]):
Expand Down
89 changes: 89 additions & 0 deletions py-polars/pypolars/lazy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from __future__ import annotations
import os
from typing import Union, List

from pypolars.frame import DataFrame, wrap_df

if not os.environ.get("DOC_BUILDING", False):
from .pypolars import PyLazyFrame, col, lit, binary_expr, PyExpr, PyLazyGroupBy


def lazy(self) -> "LazyFrame":
return wrap_ldf(self._df.lazy())


DataFrame.lazy = lazy


def wrap_ldf(ldf: PyLazyFrame) -> LazyFrame:
return LazyFrame.from_pyldf(ldf)


class LazyGroupBy:
def __init__(self, lgb: PyLazyGroupBy):
self.lgb = lgb

def agg(self, aggs: List[PyExpr]) -> LazyFrame:
return wrap_ldf(self.lgb.agg(aggs))


class LazyFrame:
def __init__(self):
self._ldf = None

@staticmethod
def from_pyldf(ldf: "PyLazyFrame") -> "LazyFrame":
self = LazyFrame.__new__(LazyFrame)
self._ldf = ldf
return self

def describe_plan(self) -> str:
return self._ldf.describe_plan()

def describe_optimized_plan(self) -> str:
return self._ldf.describe_optimized_plan()

def sort(self, by_column: str) -> LazyFrame:
return wrap_ldf(self._ldf.sort(by_column))

def collect(self) -> DataFrame:
return wrap_df(self._ldf.collect())

def filter(self, predicate: PyExpr) -> LazyFrame:
return wrap_ldf(self._ldf.filter(predicate))

def select(self, exprs: PyExpr) -> LazyFrame:
if not isinstance(exprs, list):
exprs = [exprs]
return wrap_ldf(self._ldf.select(exprs))

def groupby(self, by: Union[str, List[str]]) -> LazyGroupBy:
"""
Start a groupby operation
Parameters
----------
by
Column(s) to group by.
"""
if isinstance(by, str):
by = [by]
lgb = self._ldf.groupby(by)
return LazyGroupBy(lgb)

def join(
self,
ldf: LazyFrame,
left_on: str,
right_on: str,
how="inner",
) -> LazyFrame:
if how == "inner":
inner = self._ldf.inner_join(ldf._ldf, left_on, right_on)
elif how == "left":
inner = self._ldf.left_join(ldf._ldf, left_on, right_on)
elif how == "outer":
inner = self._ldf.outer_join(ldf._ldf, left_on, right_on)
else:
return NotImplemented
return wrap_ldf(inner)
11 changes: 11 additions & 0 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use polars::prelude::*;
use pyo3::{exceptions::RuntimeError, prelude::*};

use crate::datatypes::DataType;
use crate::lazy::dataframe::PyLazyFrame;
use crate::{
error::PyPolarsEr,
file::{get_either_file, get_file_like, EitherRustPythonFile},
Expand All @@ -20,6 +21,12 @@ impl PyDataFrame {
}
}

impl From<DataFrame> for PyDataFrame {
fn from(df: DataFrame) -> Self {
PyDataFrame { df }
}
}

#[pymethods]
impl PyDataFrame {
#[new]
Expand Down Expand Up @@ -389,4 +396,8 @@ impl PyDataFrame {
let df = self.df.shift(periods).map_err(PyPolarsEr::from)?;
Ok(PyDataFrame::new(df))
}

pub fn lazy(&self) -> PyLazyFrame {
self.df.clone().lazy().into()
}
}

0 comments on commit 1fb099d

Please sign in to comment.