Commit
fix bug in agg projections and init tpch schema tests (#3771)
ritchie46 committed Jun 22, 2022
1 parent 3e797f5 commit cd710c4
Showing 17 changed files with 154 additions and 41 deletions.
Binary file added examples/datasets/tpc_heads/customer.feather
Binary file added examples/datasets/tpc_heads/lineitem.feather
Binary file added examples/datasets/tpc_heads/nation.feather
Binary file added examples/datasets/tpc_heads/orders.feather
Binary file added examples/datasets/tpc_heads/part.feather
Binary file added examples/datasets/tpc_heads/partsupp.feather
Binary file added examples/datasets/tpc_heads/region.feather
Binary file added examples/datasets/tpc_heads/supplier.feather
3 changes: 3 additions & 0 deletions polars/polars-core/src/chunked_array/random.rs
@@ -67,6 +67,9 @@ impl Series {
"n is larger than the number of elements in this array".into(),
));
}
if n == 0 {
return Ok(self.slice(0, 0));
}
let len = self.len();

match with_replacement {
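The guard above makes sampling zero elements a no-op that returns an empty slice instead of reaching the random sampler. A minimal sketch of the resulting behavior, assuming a `sample_n(n, with_replacement, seed)` signature from this era of the crate (the exact parameter list may differ by version):

```rust
use polars::prelude::*;

fn main() -> Result<()> {
    let s = Series::new("a", &[1i32, 2, 3]);
    // With the n == 0 guard, this short-circuits to `self.slice(0, 0)`
    // and returns an empty Series of the same dtype.
    let empty = s.sample_n(0, false, None)?;
    assert_eq!(empty.len(), 0);
    assert_eq!(empty.dtype(), &DataType::Int32);
    Ok(())
}
```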
2 changes: 2 additions & 0 deletions polars/polars-core/src/datatypes.rs
@@ -1046,6 +1046,8 @@ pub type PlHashMap<K, V> = hashbrown::HashMap<K, V, RandomState>;
pub type PlHashSet<V> = hashbrown::HashSet<V, RandomState>;
#[cfg(feature = "private")]
pub type PlIndexMap<K, V> = indexmap::IndexMap<K, V, RandomState>;
#[cfg(feature = "private")]
pub type PlIndexSet<K> = indexmap::IndexSet<K, RandomState>;

#[cfg(not(feature = "bigidx"))]
pub type IdxCa = UInt32Chunked;
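`PlIndexSet` mirrors the existing `PlIndexMap`: an `indexmap::IndexSet` using polars' `RandomState`. Unlike a `PlHashSet`, it iterates in insertion order, which the agg-scan-projection rule further down relies on. A small sketch using `indexmap` directly:

```rust
use indexmap::IndexSet;

fn main() {
    let mut cols: IndexSet<String> = IndexSet::new();
    for c in ["b", "a", "b", "c"] {
        cols.insert(c.to_string());
    }
    // Duplicates are ignored and iteration yields first-seen order,
    // unlike a HashSet, whose iteration order is unspecified.
    let ordered: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
    assert_eq!(ordered, ["b", "a", "c"]);
}
```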
12 changes: 8 additions & 4 deletions polars/polars-lazy/src/dsl/string.rs
@@ -9,15 +9,17 @@ pub struct StringNameSpace(pub(crate) Expr);

impl StringNameSpace {
/// Check if a string value contains a literal substring.
pub fn contains_literal(self, pat: String) -> Expr {
pub fn contains_literal<S: AsRef<str>>(self, pat: S) -> Expr {
let pat = pat.as_ref().into();
self.0.map_private(
FunctionExpr::StringContains { pat, literal: true },
"str.contains_literal",
)
}

/// Check if a string value contains a Regex substring.
pub fn contains(self, pat: String) -> Expr {
pub fn contains<S: AsRef<str>>(self, pat: S) -> Expr {
let pat = pat.as_ref().into();
self.0.map_private(
FunctionExpr::StringContains {
pat,
@@ -28,13 +30,15 @@ impl StringNameSpace {
}

/// Check if a string value ends with the `sub` string.
pub fn ends_with(self, sub: String) -> Expr {
pub fn ends_with<S: AsRef<str>>(self, sub: S) -> Expr {
let sub = sub.as_ref().into();
self.0
.map_private(FunctionExpr::StringEndsWith(sub), "str.ends_with")
}

/// Check if a string value starts with the `sub` string.
pub fn starts_with(self, sub: String) -> Expr {
pub fn starts_with<S: AsRef<str>>(self, sub: S) -> Expr {
let sub = sub.as_ref().into();
self.0
.map_private(FunctionExpr::StringStartsWith(sub), "str.starts_with")
}
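With the `S: AsRef<str>` bound, these methods accept `&str`, `String`, and anything else convertible to a string slice, so call sites no longer need an owned `String`. A hedged sketch (assuming the `strings` feature is enabled for the expression namespace):

```rust
use polars::prelude::*;

fn main() {
    // String literals now work directly; before this change the methods
    // took `pat: String` and forced an allocation at every call site.
    let a = col("p_type").str().ends_with("BRASS");
    let b = col("p_type").str().contains_literal("BRASS");
    // Owned strings still work, since String: AsRef<str>.
    let c = col("p_type").str().contains(String::from("^ECONOMY"));
    println!("{:?} {:?} {:?}", a, b, c);
}
```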
11 changes: 8 additions & 3 deletions polars/polars-lazy/src/frame/mod.rs
@@ -286,8 +286,13 @@ impl LazyFrame {
/// .sort_by_exprs(vec![col("sepal.width")], vec![false])
/// }
/// ```
pub fn sort_by_exprs<E: AsRef<[Expr]>>(self, by_exprs: E, reverse: Vec<bool>) -> Self {
pub fn sort_by_exprs<E: AsRef<[Expr]>, B: AsRef<[bool]>>(
self,
by_exprs: E,
reverse: B,
) -> Self {
let by_exprs = by_exprs.as_ref().to_vec();
let reverse = reverse.as_ref().to_vec();
if by_exprs.is_empty() {
self
} else {
@@ -603,14 +608,14 @@ impl LazyFrame {
rules.push(Box::new(AggregatePushdown::new()))
}

#[cfg(any(feature = "parquet", feature = "csv-file"))]
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
if agg_scan_projection {
// scan the LP to aggregate all the columns used in scans
// these columns will be added to the state of the AggScanProjection rule
let mut columns = PlHashMap::with_capacity(32);
agg_projection(lp_top, &mut columns, lp_arena);

let opt = AggScanProjection { columns };
let opt = AggScanProjection::new(columns);
rules.push(Box::new(opt));
}

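Since both parameters of `sort_by_exprs` are now generic over slice-like inputs, fixed-size arrays work where `vec![...]` was required before (this is the pattern the tpch test below uses). A minimal sketch:

```rust
use polars::prelude::*;

fn sort_example(lf: LazyFrame) -> LazyFrame {
    // `[Expr; 1]: AsRef<[Expr]>` and `[bool; 1]: AsRef<[bool]>`, so no
    // Vec allocations are needed at the call site anymore.
    lf.sort_by_exprs([col("sepal.width")], [false])
}
```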
1 change: 1 addition & 0 deletions polars/polars-lazy/src/logical_plan/builder.rs
@@ -395,6 +395,7 @@ impl LogicalPlanBuilder {
}

pub fn sort(self, by_column: Vec<Expr>, reverse: Vec<bool>, null_last: bool) -> Self {
let by_column = rewrite_projections(by_column, self.0.schema(), &[]);
LogicalPlan::Sort {
input: Box::new(self.0),
by_column,
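Running `rewrite_projections` on the sort keys expands multi-column expressions against the input schema before the Sort node is built. A plausible illustration (inferred from the diff, and modeled on the `sort_by_exprs(cols([...]), ...)` call in the tpch test added below):

```rust
use polars::prelude::*;

fn main() -> Result<()> {
    let df = df!("a" => &[3, 1, 2], "b" => &[1.0, 2.0, 3.0])?;
    // `cols(["a", "b"])` is a single expression that expands to one entry
    // per column; expanding it in the builder lets the reverse flags line
    // up with the expanded sort keys.
    let out = df
        .lazy()
        .sort_by_exprs([cols(["a", "b"])], [false, true])
        .collect()?;
    println!("{:?}", out);
    Ok(())
}
```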
@@ -1,43 +1,36 @@
use crate::logical_plan::optimizer::stack_opt::OptimizationRule;
use crate::logical_plan::ALogicalPlanBuilder;
use crate::prelude::*;
use polars_core::datatypes::{PlHashMap, PlHashSet};
use polars_core::prelude::Schema;
use polars_core::datatypes::PlHashMap;
use polars_core::prelude::{PlIndexSet, Schema};
use std::path::{Path, PathBuf};
use std::sync::Arc;

fn process_with_columns(
path: &Path,
with_columns: &Option<Arc<Vec<String>>>,
columns: &mut PlHashMap<PathBuf, PlHashSet<(usize, String)>>,
columns: &mut PlHashMap<PathBuf, PlIndexSet<String>>,
schema: &Schema,
) {
let cols = columns
.entry(path.to_owned())
.or_insert_with(PlHashSet::new);
.or_insert_with(|| PlIndexSet::with_capacity_and_hasher(32, Default::default()));

match with_columns {
// add only the projected columns
Some(with_columns) => {
cols.extend(with_columns.iter().enumerate().map(|t| (t.0, t.1.clone())));
}
Some(with_columns) => cols.extend(with_columns.iter().cloned()),
// no projection, so we must take all columns
None => {
cols.extend(
schema
.iter_names()
.enumerate()
.map(|t| (t.0, t.1.to_string())),
);
cols.extend(schema.iter_names().map(|t| t.to_string()));
}
}
}

/// Aggregate all the projections in an LP
pub(crate) fn agg_projection(
root: Node,
// The hashmap maps files to a hashset over column names. (There is a usize to be able to sort them later)
columns: &mut PlHashMap<PathBuf, PlHashSet<(usize, String)>>,
// The hashmap maps files to a hashset over column names.
columns: &mut PlHashMap<PathBuf, PlIndexSet<String>>,
lp_arena: &Arena<ALogicalPlan>,
) {
use ALogicalPlan::*;
@@ -82,10 +75,23 @@ pub(crate) fn agg_projection(
/// Due to self joins there can be multiple scans of the same file in an LP. We already cache the scans
/// in the PhysicalPlan, but we need to make sure that the first scan has all the columns needed.
pub struct AggScanProjection {
pub columns: PlHashMap<PathBuf, PlHashSet<(usize, String)>>,
columns: PlHashMap<PathBuf, Arc<Vec<String>>>,
}

impl AggScanProjection {
pub(crate) fn new(columns: PlHashMap<PathBuf, PlIndexSet<String>>) -> Self {
let new_columns_mapping = columns
.into_iter()
.map(|(k, agg)| {
let columns = agg.iter().cloned().collect::<Vec<_>>();
(k, Arc::new(columns))
})
.collect();
Self {
columns: new_columns_mapping,
}
}

fn finish_rewrite(
&self,
mut lp: ALogicalPlan,
@@ -112,6 +118,10 @@ impl AggScanProjection {
}
lp
}

fn extract_columns(&mut self, path: &PathBuf) -> Option<Arc<Vec<String>>> {
self.columns.get(path).cloned()
}
}

impl OptimizationRule for AggScanProjection {
Expand All @@ -135,12 +145,7 @@ impl OptimizationRule for AggScanProjection {
mut options,
} = lp
{
let with_columns = self.columns.get(&path).map(|agg| {
let mut columns = agg.iter().cloned().collect::<Vec<_>>();
// make sure that the columns are sorted because they come from a hashmap
columns.sort_unstable_by_key(|k| k.0);
Arc::new(columns.into_iter().map(|k| k.1).collect())
});
let with_columns = self.extract_columns(&path);
// prevent infinite loop
if options.with_columns == with_columns {
let lp = ALogicalPlan::IpcScan {
@@ -181,12 +186,7 @@ impl OptimizationRule for AggScanProjection {
mut options,
} = lp
{
let mut with_columns = self.columns.get(&path).map(|agg| {
let mut columns = agg.iter().cloned().collect::<Vec<_>>();
// make sure that the columns are sorted because they come from a hashmap
columns.sort_unstable_by_key(|k| k.0);
Arc::new(columns.into_iter().map(|k| k.1).collect())
});
let mut with_columns = self.extract_columns(&path);
// prevent infinite loop
if options.with_columns == with_columns {
let lp = ALogicalPlan::ParquetScan {
@@ -227,12 +227,7 @@ impl OptimizationRule for AggScanProjection {
aggregate,
} = lp
{
let with_columns = self.columns.get(&path).map(|agg| {
let mut columns = agg.iter().cloned().collect::<Vec<_>>();
// make sure that the columns are sorted because they come from a hashmap
columns.sort_unstable_by_key(|k| k.0);
Arc::new(columns.into_iter().map(|k| k.1).collect())
});
let with_columns = self.extract_columns(&path);
if options.with_columns == with_columns {
let lp = ALogicalPlan::CsvScan {
path,
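A plausible reconstruction of the projection bug this file fixes (inferred from the diff, not stated in the commit): with `(usize, String)` pairs, the same column name could enter the per-file set twice under different positions when one file is scanned with different projections, and sorting by the local index then yields duplicate column names. Keying an insertion-ordered set by name alone dedups across scans and removes the post-hoc sort:

```rust
use indexmap::IndexSet;
use std::collections::HashSet;

fn main() {
    // Two scans of the same file, projecting overlapping columns.
    let scan1 = ["a", "b"];
    let scan2 = ["b", "c"];

    // Old scheme: (position, name) pairs. "b" enters twice because its
    // position differs per scan, so the aggregated projection is wrong.
    let mut old: HashSet<(usize, String)> = HashSet::new();
    old.extend(scan1.iter().enumerate().map(|(i, c)| (i, c.to_string())));
    old.extend(scan2.iter().enumerate().map(|(i, c)| (i, c.to_string())));
    let mut sorted: Vec<_> = old.into_iter().collect();
    sorted.sort_unstable_by_key(|k| k.0);
    let names: Vec<String> = sorted.into_iter().map(|k| k.1).collect();
    assert_eq!(names.len(), 4); // "b" is duplicated

    // New scheme: a name-keyed IndexSet dedups and keeps first-seen order.
    let mut fixed: IndexSet<String> = IndexSet::new();
    fixed.extend(scan1.iter().map(|c| c.to_string()));
    fixed.extend(scan2.iter().map(|c| c.to_string()));
    assert_eq!(fixed.len(), 3); // a, b, c
}
```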
@@ -93,6 +93,7 @@ fn run_partitions(
}

fn estimate_unique_count(keys: &[Series], mut sample_size: usize) -> usize {
dbg!(keys);
// https://stats.stackexchange.com/a/19090/147321
// estimated unique size
// u + ui / m (s - m)
1 change: 1 addition & 0 deletions polars/polars-lazy/src/tests/mod.rs
@@ -14,6 +14,7 @@ mod predicate_queries;
mod projection_queries;
#[cfg(feature = "test")]
mod queries;
mod tpch;

fn load_df() -> DataFrame {
df!("a" => &[1, 2, 3, 4, 5],
101 changes: 101 additions & 0 deletions polars/polars-lazy/src/tests/tpch.rs
@@ -0,0 +1,101 @@
//! The tpch files only have ten rows, so after all the joins and filters there is no data.
//! Still, we can use this to test the schema, operation correctness on empty data, and the
//! optimizations taken.
use super::*;

const fn base_path() -> &'static str {
"../../examples/datasets/tpc_heads"
}

fn region() -> LazyFrame {
let base_path = base_path();
LazyFrame::scan_ipc(
format!("{base_path}/region.feather"),
ScanArgsIpc::default(),
)
.unwrap()
}
fn nation() -> LazyFrame {
let base_path = base_path();
LazyFrame::scan_ipc(
format!("{base_path}/nation.feather"),
ScanArgsIpc::default(),
)
.unwrap()
}

fn supplier() -> LazyFrame {
let base_path = base_path();
LazyFrame::scan_ipc(
format!("{base_path}/supplier.feather"),
ScanArgsIpc::default(),
)
.unwrap()
}

fn part() -> LazyFrame {
let base_path = base_path();
LazyFrame::scan_ipc(format!("{base_path}/part.feather"), ScanArgsIpc::default()).unwrap()
}

fn partsupp() -> LazyFrame {
let base_path = base_path();
LazyFrame::scan_ipc(
format!("{base_path}/partsupp.feather"),
ScanArgsIpc::default(),
)
.unwrap()
}

#[test]
fn test_q2() -> Result<()> {
let q1 = part()
.inner_join(partsupp(), "p_partkey", "ps_partkey")
.inner_join(supplier(), "ps_suppkey", "s_suppkey")
.inner_join(nation(), "s_nationkey", "n_nationkey")
.inner_join(region(), "n_regionkey", "r_regionkey")
.filter(col("p_size").eq(15))
.filter(col("p_type").str().ends_with("BRASS")));

let q = q1
.clone()
.groupby([col("p_partkey")])
.agg([col("ps_supplycost").min()])
.join(
q1,
[col("p_partkey"), col("ps_supplycost")],
[col("p_partkey"), col("ps_supplycost")],
JoinType::Inner,
)
.select([cols([
"s_acctbal",
"s_name",
"n_name",
"p_partkey",
"p_mfgr",
"s_address",
"s_phone",
"s_comment",
])])
.sort_by_exprs(
[cols(["s_acctbal", "n_name", "s_name", "p_partkey"])],
[true, false, false, false],
)
.limit(100);

let out = q.collect()?;
let schema = Schema::from([
Field::new("s_acctbal", DataType::Float64),
Field::new("s_name", DataType::Utf8),
Field::new("n_name", DataType::Utf8),
Field::new("p_partkey", DataType::Int64),
Field::new("p_mfgr", DataType::Utf8),
Field::new("s_address", DataType::Utf8),
Field::new("s_phone", DataType::Utf8),
Field::new("s_comment", DataType::Utf8),
]);
assert_eq!(&out.schema(), &schema);
dbg!(out);

Ok(())
}
