Skip to content

Commit

Permalink
Cache file reads (tpch 2/7) ~5% faster (#3774)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 23, 2022
1 parent bdb1dd0 commit ac8b273
Show file tree
Hide file tree
Showing 35 changed files with 1,012 additions and 674 deletions.
63 changes: 43 additions & 20 deletions polars/polars-lazy/src/dsl/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::prelude::*;
use polars_core::prelude::*;
use polars_core::utils::get_supertype;
use std::fmt::{Debug, Formatter};
use std::hash::{Hash, Hasher};
use std::ops::Deref;

use crate::dsl::function_expr::FunctionExpr;
Expand Down Expand Up @@ -65,28 +66,35 @@ impl Debug for dyn RenameAliasFn {
}

#[derive(Clone)]
/// Wrapper type that indicates that the inner type is not equal to anything
pub struct NoEq<T>(T);
/// Wrapper type that has special equality properties
/// depending on the inner type specialization
pub struct SpecialEq<T>(T);

impl<T> NoEq<T> {
impl<T> SpecialEq<T> {
pub fn new(val: T) -> Self {
NoEq(val)
SpecialEq(val)
}
}

impl<T> PartialEq for NoEq<T> {
fn eq(&self, _other: &Self) -> bool {
false
impl<T: ?Sized> PartialEq for SpecialEq<Arc<T>> {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.0, &other.0)
}
}

impl<T> Debug for NoEq<T> {
impl PartialEq for SpecialEq<Series> {
fn eq(&self, other: &Self) -> bool {
self.0 == other.0
}
}

impl<T> Debug for SpecialEq<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "no_eq")
}
}

impl<T> Deref for NoEq<T> {
impl<T> Deref for SpecialEq<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
Expand Down Expand Up @@ -123,11 +131,11 @@ pub trait FunctionOutputField: Send + Sync {
fn get_field(&self, input_schema: &Schema, cntxt: Context, fields: &[Field]) -> Field;
}

pub type GetOutput = NoEq<Arc<dyn FunctionOutputField>>;
pub type GetOutput = SpecialEq<Arc<dyn FunctionOutputField>>;

impl Default for GetOutput {
fn default() -> Self {
NoEq::new(Arc::new(
SpecialEq::new(Arc::new(
|_input_schema: &Schema, _cntxt: Context, fields: &[Field]| fields[0].clone(),
))
}
Expand All @@ -139,25 +147,25 @@ impl GetOutput {
}

pub fn from_type(dt: DataType) -> Self {
NoEq::new(Arc::new(move |_: &Schema, _: Context, flds: &[Field]| {
SpecialEq::new(Arc::new(move |_: &Schema, _: Context, flds: &[Field]| {
Field::new(flds[0].name(), dt.clone())
}))
}

pub fn map_field<F: 'static + Fn(&Field) -> Field + Send + Sync>(f: F) -> Self {
NoEq::new(Arc::new(move |_: &Schema, _: Context, flds: &[Field]| {
SpecialEq::new(Arc::new(move |_: &Schema, _: Context, flds: &[Field]| {
f(&flds[0])
}))
}

pub fn map_fields<F: 'static + Fn(&[Field]) -> Field + Send + Sync>(f: F) -> Self {
NoEq::new(Arc::new(move |_: &Schema, _: Context, flds: &[Field]| {
SpecialEq::new(Arc::new(move |_: &Schema, _: Context, flds: &[Field]| {
f(flds)
}))
}

pub fn map_dtype<F: 'static + Fn(&DataType) -> DataType + Send + Sync>(f: F) -> Self {
NoEq::new(Arc::new(move |_: &Schema, _: Context, flds: &[Field]| {
SpecialEq::new(Arc::new(move |_: &Schema, _: Context, flds: &[Field]| {
let mut fld = flds[0].clone();
let new_type = f(fld.data_type());
fld.coerce(new_type);
Expand All @@ -179,7 +187,7 @@ impl GetOutput {
where
F: 'static + Fn(&[&DataType]) -> DataType + Send + Sync,
{
NoEq::new(Arc::new(move |_: &Schema, _: Context, flds: &[Field]| {
SpecialEq::new(Arc::new(move |_: &Schema, _: Context, flds: &[Field]| {
let mut fld = flds[0].clone();
let dtypes = flds.iter().map(|fld| fld.data_type()).collect::<Vec<_>>();
let new_type = f(&dtypes);
Expand Down Expand Up @@ -244,7 +252,8 @@ impl AsRef<Expr> for AggExpr {
}
}

/// Queries consists of multiple expressions.
/// Queries consists of multiple ex
/// pressions.
#[derive(Clone, PartialEq)]
#[must_use]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
Expand Down Expand Up @@ -297,7 +306,7 @@ pub enum Expr {
/// function arguments
input: Vec<Expr>,
/// function to apply
function: NoEq<Arc<dyn SeriesUdf>>,
function: SpecialEq<Arc<dyn SeriesUdf>>,
/// output dtype of the function
output_type: GetOutput,
options: FunctionOptions,
Expand Down Expand Up @@ -343,7 +352,7 @@ pub enum Expr {
KeepName(Box<Expr>),
#[cfg_attr(feature = "serde", serde(skip))]
RenameAlias {
function: NoEq<Arc<dyn RenameAliasFn>>,
function: SpecialEq<Arc<dyn RenameAliasFn>>,
expr: Box<Expr>,
},
/// Special case that does not need columns
Expand All @@ -352,13 +361,27 @@ pub enum Expr {
Nth(i64),
}

// TODO! derive. This is only a temporary fix
// Because PartialEq will have a lot of `false`, e.g. on Function
// Types, this may lead to many file reads, as we use predicate comparison
// to check if we can cache a file
#[allow(clippy::derive_hash_xor_eq)]
impl Hash for Expr {
fn hash<H: Hasher>(&self, state: &mut H) {
let s = format!("{:?}", self);
s.hash(state)
}
}

impl Eq for Expr {}

impl Default for Expr {
fn default() -> Self {
Expr::Literal(LiteralValue::Null)
}
}

#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Excluded {
Name(Arc<str>),
Dtype(DataType),
Expand Down
8 changes: 4 additions & 4 deletions polars/polars-lazy/src/dsl/function_expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use polars_core::prelude::*;
use serde::{Deserialize, Serialize};

#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[derive(Clone, PartialEq, Debug)]
#[derive(Clone, PartialEq, Debug, Eq, Hash)]
pub enum FunctionExpr {
NullCount,
Pow,
Expand Down Expand Up @@ -73,7 +73,7 @@ impl FunctionExpr {

macro_rules! wrap {
($e:expr) => {
NoEq::new(Arc::new($e))
SpecialEq::new(Arc::new($e))
};
}

Expand All @@ -84,11 +84,11 @@ macro_rules! map_with_args {
$func(s, $($args),*)
};

NoEq::new(Arc::new(f))
SpecialEq::new(Arc::new(f))
}};
}

impl From<FunctionExpr> for NoEq<Arc<dyn SeriesUdf>> {
impl From<FunctionExpr> for SpecialEq<Arc<dyn SeriesUdf>> {
fn from(func: FunctionExpr) -> Self {
use FunctionExpr::*;
match func {
Expand Down
12 changes: 6 additions & 6 deletions polars/polars-lazy/src/dsl/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ pub fn spearman_rank_corr(a: Expr, b: Expr) -> Expr {
/// be used and so on.
pub fn argsort_by<E: AsRef<[Expr]>>(by: E, reverse: &[bool]) -> Expr {
let reverse = reverse.to_vec();
let function = NoEq::new(Arc::new(move |by: &mut [Series]| {
let function = SpecialEq::new(Arc::new(move |by: &mut [Series]| {
polars_core::functions::argsort_by(by, &reverse).map(|ca| ca.into_series())
}) as Arc<dyn SeriesUdf>);

Expand All @@ -189,7 +189,7 @@ pub fn argsort_by<E: AsRef<[Expr]>>(by: E, reverse: &[bool]) -> Expr {
pub fn concat_str<E: AsRef<[Expr]>>(s: E, sep: &str) -> Expr {
let s = s.as_ref().to_vec();
let sep = sep.to_string();
let function = NoEq::new(Arc::new(move |s: &mut [Series]| {
let function = SpecialEq::new(Arc::new(move |s: &mut [Series]| {
polars_core::functions::concat_str(s, &sep).map(|ca| ca.into_series())
}) as Arc<dyn SeriesUdf>);
Expr::AnonymousFunction {
Expand All @@ -211,7 +211,7 @@ pub fn concat_str<E: AsRef<[Expr]>>(s: E, sep: &str) -> Expr {
pub fn concat_lst<E: AsRef<[IE]>, IE: Into<Expr> + Clone>(s: E) -> Expr {
let s = s.as_ref().iter().map(|e| e.clone().into()).collect();

let function = NoEq::new(Arc::new(move |s: &mut [Series]| {
let function = SpecialEq::new(Arc::new(move |s: &mut [Series]| {
let mut first = std::mem::take(&mut s[0]);
let other = &s[1..];

Expand Down Expand Up @@ -352,7 +352,7 @@ pub fn datetime(args: DatetimeArgs) -> Expr {
let second = args.second;
let millisecond = args.millisecond;

let function = NoEq::new(Arc::new(move |s: &mut [Series]| {
let function = SpecialEq::new(Arc::new(move |s: &mut [Series]| {
assert_eq!(s.len(), 7);
let max_len = s.iter().map(|s| s.len()).max().unwrap();
let mut year = s[0].cast(&DataType::Int32)?;
Expand Down Expand Up @@ -454,7 +454,7 @@ pub struct DurationArgs {

#[cfg(feature = "temporal")]
pub fn duration(args: DurationArgs) -> Expr {
let function = NoEq::new(Arc::new(move |s: &mut [Series]| {
let function = SpecialEq::new(Arc::new(move |s: &mut [Series]| {
assert_eq!(s.len(), 7);
let days = s[0].cast(&DataType::Int64).unwrap();
let seconds = s[1].cast(&DataType::Int64).unwrap();
Expand Down Expand Up @@ -691,7 +691,7 @@ where
if exprs.iter().any(has_wildcard) {
exprs.push(acc);

let function = NoEq::new(Arc::new(move |series: &mut [Series]| {
let function = SpecialEq::new(Arc::new(move |series: &mut [Series]| {
let mut series = series.to_vec();
let mut acc = series.pop().unwrap();

Expand Down
20 changes: 10 additions & 10 deletions polars/polars-lazy/src/dsl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ impl Expr {

Expr::AnonymousFunction {
input: vec![self],
function: NoEq::new(Arc::new(f)),
function: SpecialEq::new(Arc::new(f)),
output_type,
options: FunctionOptions {
collect_groups: ApplyOptions::ApplyFlat,
Expand Down Expand Up @@ -642,7 +642,7 @@ impl Expr {

Expr::AnonymousFunction {
input,
function: NoEq::new(Arc::new(function)),
function: SpecialEq::new(Arc::new(function)),
output_type,
options: FunctionOptions {
collect_groups: ApplyOptions::ApplyFlat,
Expand All @@ -668,7 +668,7 @@ impl Expr {

Expr::AnonymousFunction {
input: vec![self],
function: NoEq::new(Arc::new(f)),
function: SpecialEq::new(Arc::new(f)),
output_type,
options: FunctionOptions {
collect_groups: ApplyOptions::ApplyList,
Expand All @@ -693,7 +693,7 @@ impl Expr {

Expr::AnonymousFunction {
input: vec![self],
function: NoEq::new(Arc::new(f)),
function: SpecialEq::new(Arc::new(f)),
output_type,
options,
}
Expand All @@ -716,7 +716,7 @@ impl Expr {

Expr::AnonymousFunction {
input: vec![self],
function: NoEq::new(Arc::new(f)),
function: SpecialEq::new(Arc::new(f)),
output_type,
options: FunctionOptions {
collect_groups: ApplyOptions::ApplyGroups,
Expand Down Expand Up @@ -752,7 +752,7 @@ impl Expr {

Expr::AnonymousFunction {
input,
function: NoEq::new(Arc::new(function)),
function: SpecialEq::new(Arc::new(function)),
output_type,
options: FunctionOptions {
collect_groups: ApplyOptions::ApplyGroups,
Expand Down Expand Up @@ -1336,7 +1336,7 @@ impl Expr {
where
F: Fn(&str) -> String + 'static + Send + Sync,
{
let function = NoEq::new(Arc::new(function) as Arc<dyn RenameAliasFn>);
let function = SpecialEq::new(Arc::new(function) as Arc<dyn RenameAliasFn>);
Expr::RenameAlias {
expr: Box::new(self),
function,
Expand Down Expand Up @@ -2128,7 +2128,7 @@ where

Expr::AnonymousFunction {
input,
function: NoEq::new(Arc::new(function)),
function: SpecialEq::new(Arc::new(function)),
output_type,
options: FunctionOptions {
collect_groups: ApplyOptions::ApplyFlat,
Expand All @@ -2155,7 +2155,7 @@ where

Expr::AnonymousFunction {
input,
function: NoEq::new(Arc::new(function)),
function: SpecialEq::new(Arc::new(function)),
output_type,
options: FunctionOptions {
collect_groups: ApplyOptions::ApplyList,
Expand Down Expand Up @@ -2184,7 +2184,7 @@ where

Expr::AnonymousFunction {
input,
function: NoEq::new(Arc::new(function)),
function: SpecialEq::new(Arc::new(function)),
output_type,
options: FunctionOptions {
collect_groups: ApplyOptions::ApplyGroups,
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/dsl/string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl StringNameSpace {
/// * `delimiter` - A string that will act as delimiter between values.
pub fn concat(self, delimiter: &str) -> Expr {
let delimiter = delimiter.to_owned();
let function = NoEq::new(Arc::new(move |s: &mut [Series]| {
let function = SpecialEq::new(Arc::new(move |s: &mut [Series]| {
Ok(s[0].str_concat(&delimiter).into_series())
}) as Arc<dyn SeriesUdf>);
Expr::AnonymousFunction {
Expand Down
8 changes: 4 additions & 4 deletions polars/polars-lazy/src/dummies.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use crate::dsl::{BinaryUdfOutputField, NoEq, SeriesBinaryUdf};
use crate::dsl::{BinaryUdfOutputField, SeriesBinaryUdf, SpecialEq};
use crate::logical_plan::Context;
use polars_core::prelude::*;
use std::sync::Arc;

impl Default for NoEq<Arc<dyn SeriesBinaryUdf>> {
impl Default for SpecialEq<Arc<dyn SeriesBinaryUdf>> {
fn default() -> Self {
panic!("implementation error");
}
}

impl Default for NoEq<Arc<dyn BinaryUdfOutputField>> {
impl Default for SpecialEq<Arc<dyn BinaryUdfOutputField>> {
fn default() -> Self {
let output_field = move |_: &Schema, _: Context, _: &Field, _: &Field| None;
NoEq::new(Arc::new(output_field))
SpecialEq::new(Arc::new(output_field))
}
}
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/frame/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl<'a> LazyCsvReader<'a> {
)?
.build()
.into();
lf.opt_state.agg_scan_projection = true;
lf.opt_state.file_caching = true;
Ok(lf)
}

Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/frame/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl LazyFrame {
rechunk: args.rechunk,
};
let mut lf: LazyFrame = LogicalPlanBuilder::scan_ipc(path, options)?.build().into();
lf.opt_state.agg_scan_projection = true;
lf.opt_state.file_caching = true;
Ok(lf)
}

Expand Down

0 comments on commit ac8b273

Please sign in to comment.