Skip to content

Commit

Permalink
remove layer of indirection in logical plan strings
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 3, 2021
1 parent 8d97a8d commit ce79acf
Show file tree
Hide file tree
Showing 16 changed files with 90 additions and 88 deletions.
4 changes: 2 additions & 2 deletions polars/polars-core/src/frame/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ pub trait AsRefPolars<T: ?Sized> {
fn as_ref_p(&self) -> &T;
}

impl AsRefPolars<str> for std::sync::Arc<String> {
impl AsRefPolars<str> for std::sync::Arc<str> {
fn as_ref_p(&self) -> &str {
&**self
&*self
}
}

Expand Down
12 changes: 6 additions & 6 deletions polars/polars-lazy/src/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ impl From<AggExpr> for Expr {
/// Queries consists of multiple expressions.
#[derive(Clone, PartialEq)]
pub enum Expr {
Alias(Box<Expr>, Arc<String>),
Column(Arc<String>),
Alias(Box<Expr>, Arc<str>),
Column(Arc<str>),
Columns(Vec<String>),
DtypeColumn(Vec<DataType>),
Literal(LiteralValue),
Expand Down Expand Up @@ -328,7 +328,7 @@ pub enum Expr {
output_field: NoEq<Arc<dyn BinaryUdfOutputField>>,
},
/// Can be used in a select statement to exclude a column from selection
Exclude(Box<Expr>, Vec<Arc<String>>),
Exclude(Box<Expr>, Vec<Arc<str>>),
/// Set root name as Alias
KeepName(Box<Expr>),
SufPreFix {
Expand Down Expand Up @@ -641,7 +641,7 @@ impl Expr {

/// Rename Column.
pub fn alias(self, name: &str) -> Expr {
Expr::Alias(Box::new(self), Arc::new(name.into()))
Expr::Alias(Box::new(self), Arc::from(name))
}

/// Run is_null operation on `Expr`.
Expand Down Expand Up @@ -1433,7 +1433,7 @@ impl Expr {
let v = columns
.to_selection_vec()
.iter()
.map(|s| Arc::new(s.to_string()))
.map(|s| Arc::from(*s))
.collect();
Expr::Exclude(Box::new(self), v)
}
Expand Down Expand Up @@ -1668,7 +1668,7 @@ impl Expr {
pub fn col(name: &str) -> Expr {
match name {
"*" => Expr::Wildcard,
_ => Expr::Column(Arc::new(name.to_owned())),
_ => Expr::Column(Arc::from(name)),
}
}

Expand Down
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/logical_plan/aexpr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ pub enum AExpr {
Duplicated(Node),
Reverse(Node),
Explode(Node),
Alias(Node, Arc<String>),
Column(Arc<String>),
Alias(Node, Arc<str>),
Column(Arc<str>),
Literal(LiteralValue),
BinaryExpr {
left: Node,
Expand Down
8 changes: 4 additions & 4 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,15 +676,15 @@ impl<'a> ALogicalPlanBuilder<'a> {
let schema_right = self.lp_arena.get(other).schema(self.lp_arena);

// column names of left table
let mut names: HashSet<&String, RandomState> = HashSet::with_capacity_and_hasher(
let mut names: HashSet<&str, RandomState> = HashSet::with_capacity_and_hasher(
schema_left.len() + schema_right.len(),
Default::default(),
);
// fields of new schema
let mut fields = Vec::with_capacity(schema_left.len() + schema_right.len());

for f in schema_left.fields() {
names.insert(f.name());
names.insert(f.name().as_ref());
fields.push(f.clone());
}

Expand All @@ -699,8 +699,8 @@ impl<'a> ALogicalPlanBuilder<'a> {

for f in schema_right.fields() {
let name = f.name();
if !right_names.contains(name) {
if names.contains(name) {
if !right_names.contains(name.as_str()) {
if names.contains(name.as_str()) {
let new_name = format!("{}_right", name);
let field = Field::new(&new_name, f.data_type().clone());
fields.push(field)
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -978,7 +978,7 @@ impl LogicalPlanBuilder {
.iter()
.map(|e| {
if let Expr::Column(name) = e {
(**name).clone()
(**name).to_owned()
} else {
panic!("expected column expression")
}
Expand Down Expand Up @@ -1048,7 +1048,7 @@ impl LogicalPlanBuilder {
for f in schema_right.fields() {
let name = f.name();

if !right_names.contains(name) {
if !right_names.iter().any(|s| s.as_ref() == name) {
if names.contains(name) {
let new_name = format!("{}_right", name);
let field = Field::new(&new_name, f.data_type().clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl AggScanProjection {

let projections = with_columns
.into_iter()
.map(|s| expr_arena.add(AExpr::Column(Arc::new(s))))
.map(|s| expr_arena.add(AExpr::Column(Arc::from(s))))
.collect();

lp = ALogicalPlanBuilder::new(node, expr_arena, lp_arena)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl PredicatePushDown {
fn finish_at_leaf(
&self,
lp: ALogicalPlan,
acc_predicates: PlHashMap<Arc<String>, Node>,
acc_predicates: PlHashMap<Arc<str>, Node>,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
) -> ALogicalPlan {
Expand All @@ -49,7 +49,7 @@ impl PredicatePushDown {
fn pushdown_and_assign(
&self,
input: Node,
acc_predicates: PlHashMap<Arc<String>, Node>,
acc_predicates: PlHashMap<Arc<str>, Node>,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
) -> Result<()> {
Expand All @@ -73,7 +73,7 @@ impl PredicatePushDown {
fn push_down(
&self,
logical_plan: ALogicalPlan,
mut acc_predicates: PlHashMap<Arc<String>, Node>,
mut acc_predicates: PlHashMap<Arc<str>, Node>,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
) -> Result<ALogicalPlan> {
Expand Down Expand Up @@ -142,9 +142,11 @@ impl PredicatePushDown {
schema,
} => {
// predicates that will be done at this level
let condition = |name: Arc<String>| {
let condition = |name: Arc<str>| {
let name = &*name;
name == "variable" || name == "value" || value_vars.contains(name)
name == "variable"
|| name == "value"
|| value_vars.iter().any(|s| s.as_str() == name)
};
let local_predicates =
transfer_to_local(expr_arena, &mut acc_predicates, condition);
Expand Down Expand Up @@ -223,7 +225,7 @@ impl PredicatePushDown {
Ok(lp)
}
Explode { input, columns } => {
let condition = |name: Arc<String>| columns.contains(&*name);
let condition = |name: Arc<str>| columns.iter().any(|s| s.as_str() == &*name);
let local_predicates =
transfer_to_local(expr_arena, &mut acc_predicates, condition);

Expand Down Expand Up @@ -492,11 +494,11 @@ mod test {
let predicate = to_aexpr(predicate_expr.clone(), &mut expr_arena);
insert_and_combine_predicate(
&mut acc_predicates,
Arc::new("foo".into()),
Arc::from("foo"),
predicate,
&mut expr_arena,
);
let root = *acc_predicates.get(&String::from("foo")).unwrap();
let root = *acc_predicates.get("foo").unwrap();
let expr = node_to_exp(root, &expr_arena);
assert_eq!(
format!("{:?}", &expr),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ impl Dsl for Node {

/// Don't overwrite predicates but combine them.
pub(super) fn insert_and_combine_predicate(
acc_predicates: &mut PlHashMap<Arc<String>, Node>,
name: Arc<String>,
acc_predicates: &mut PlHashMap<Arc<str>, Node>,
name: Arc<str>,
predicate: Node,
arena: &mut Arena<AExpr>,
) {
Expand Down Expand Up @@ -61,7 +61,7 @@ where
}

pub(super) fn predicate_at_scan(
acc_predicates: PlHashMap<Arc<String>, Node>,
acc_predicates: PlHashMap<Arc<str>, Node>,
predicate: Option<Node>,
expr_arena: &mut Arena<AExpr>,
) -> Option<Node> {
Expand All @@ -78,30 +78,30 @@ pub(super) fn predicate_at_scan(
}

/// Determine the hashmap key by combining all the root column names of a predicate
pub(super) fn roots_to_key(roots: &[Arc<String>]) -> Arc<String> {
pub(super) fn roots_to_key(roots: &[Arc<str>]) -> Arc<str> {
if roots.len() == 1 {
roots[0].clone()
} else {
let mut new = String::with_capacity(32 * roots.len());
for name in roots {
new.push_str(name);
}
Arc::new(new)
Arc::from(new)
}
}

pub(super) fn get_insertion_name(
expr_arena: &Arena<AExpr>,
predicate: Node,
schema: &Schema,
) -> Arc<String> {
Arc::new(
) -> Arc<str> {
Arc::from(
expr_arena
.get(predicate)
.to_field(schema, Context::Default, expr_arena)
.unwrap()
.name()
.clone(),
.as_ref(),
)
}

Expand Down Expand Up @@ -139,7 +139,7 @@ pub(super) fn is_pushdown_boundary(node: Node, expr_arena: &Arena<AExpr>) -> boo
pub(super) fn rewrite_projection_node(
expr_arena: &mut Arena<AExpr>,
lp_arena: &mut Arena<ALogicalPlan>,
acc_predicates: &mut PlHashMap<Arc<String>, Node>,
acc_predicates: &mut PlHashMap<Arc<str>, Node>,
expr: Vec<Node>,
input: Node,
) -> (Vec<Node>, Vec<Node>)
Expand Down Expand Up @@ -237,7 +237,7 @@ pub(super) fn no_pushdown_preds<F>(
matches: F,
// predicates that will be filtered at this node in the LP
local_predicates: &mut Vec<Node>,
acc_predicates: &mut PlHashMap<Arc<String>, Node>,
acc_predicates: &mut PlHashMap<Arc<str>, Node>,
) where
F: Fn(&AExpr) -> bool,
{
Expand All @@ -246,7 +246,7 @@ pub(super) fn no_pushdown_preds<F>(
// columns that are projected. We check if we can push down the predicates past this projection
let columns = aexpr_to_root_names(node, arena);

let condition = |name: Arc<String>| columns.contains(&name);
let condition = |name: Arc<str>| columns.contains(&name);
local_predicates.extend(transfer_to_local(arena, acc_predicates, condition));
}
}
Expand All @@ -255,11 +255,11 @@ pub(super) fn no_pushdown_preds<F>(
/// to a local_predicates vec based on a condition.
pub(super) fn transfer_to_local<F>(
expr_arena: &Arena<AExpr>,
acc_predicates: &mut PlHashMap<Arc<String>, Node>,
acc_predicates: &mut PlHashMap<Arc<str>, Node>,
mut condition: F,
) -> Vec<Node>
where
F: FnMut(Arc<String>) -> bool,
F: FnMut(Arc<str>) -> bool,
{
let mut remove_keys = Vec::with_capacity(acc_predicates.len());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use polars_core::{datatypes::PlHashSet, prelude::*};
fn init_vec() -> Vec<Node> {
Vec::with_capacity(100)
}
fn init_set() -> PlHashSet<Arc<String>> {
fn init_set() -> PlHashSet<Arc<str>> {
PlHashSet::with_capacity(32)
}

Expand All @@ -21,7 +21,7 @@ fn get_scan_columns(
let mut columns = Vec::with_capacity(acc_projections.len());
for expr in acc_projections {
for name in aexpr_to_root_names(*expr, expr_arena) {
columns.push((*name).clone())
columns.push((*name).to_owned())
}
}
with_columns = Some(columns);
Expand All @@ -38,7 +38,7 @@ fn split_acc_projections(
acc_projections: Vec<Node>,
down_schema: &Schema,
expr_arena: &mut Arena<AExpr>,
) -> (Vec<Node>, Vec<Node>, PlHashSet<Arc<String>>) {
) -> (Vec<Node>, Vec<Node>, PlHashSet<Arc<str>>) {
// If node above has as many columns as the projection there is nothing to pushdown.
if down_schema.fields().len() == acc_projections.len() {
let local_projections = acc_projections;
Expand All @@ -61,7 +61,7 @@ fn split_acc_projections(
fn add_expr_to_accumulated(
expr: Node,
acc_projections: &mut Vec<Node>,
projected_names: &mut PlHashSet<Arc<String>>,
projected_names: &mut PlHashSet<Arc<str>>,
expr_arena: &mut Arena<AExpr>,
) {
for root_node in aexpr_to_root_nodes(expr, expr_arena) {
Expand All @@ -76,12 +76,12 @@ fn add_expr_to_accumulated(
fn add_str_to_accumulated(
name: &str,
acc_projections: &mut Vec<Node>,
projected_names: &mut PlHashSet<Arc<String>>,
projected_names: &mut PlHashSet<Arc<str>>,
expr_arena: &mut Arena<AExpr>,
) {
// if empty: all columns are already projected.
if !acc_projections.is_empty() {
let node = expr_arena.add(AExpr::Column(Arc::new(name.to_string())));
let node = expr_arena.add(AExpr::Column(Arc::from(name)));
add_expr_to_accumulated(node, acc_projections, projected_names, expr_arena);
}
}
Expand Down Expand Up @@ -124,8 +124,8 @@ impl ProjectionPushDown {
proj: Node,
pushdown_left: &mut Vec<Node>,
pushdown_right: &mut Vec<Node>,
names_left: &mut PlHashSet<Arc<String>>,
names_right: &mut PlHashSet<Arc<String>>,
names_left: &mut PlHashSet<Arc<str>>,
names_right: &mut PlHashSet<Arc<str>>,
expr_arena: &mut Arena<AExpr>,
) -> bool {
let mut pushed_at_least_one = false;
Expand Down Expand Up @@ -155,7 +155,7 @@ impl ProjectionPushDown {
&self,
input: Node,
acc_projections: Vec<Node>,
names: PlHashSet<Arc<String>>,
names: PlHashSet<Arc<str>>,
projections_seen: usize,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
Expand Down Expand Up @@ -188,7 +188,7 @@ impl ProjectionPushDown {
&self,
logical_plan: ALogicalPlan,
mut acc_projections: Vec<Node>,
mut projected_names: PlHashSet<Arc<String>>,
mut projected_names: PlHashSet<Arc<str>>,
projections_seen: usize,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
Expand Down Expand Up @@ -675,16 +675,16 @@ impl ProjectionPushDown {
.split_at(root_column_name.len() - suffix.len());

let downwards_name_column =
expr_arena.add(AExpr::Column(Arc::new(downwards_name.into())));
expr_arena.add(AExpr::Column(Arc::from(downwards_name)));
// project downwards and locally immediately alias to prevent wrong projections
if names_right.insert(Arc::new(downwards_name.to_string())) {
if names_right.insert(Arc::from(downwards_name)) {
pushdown_right.push(downwards_name_column);
}

// locally we project and alias
let projection = expr_arena.add(AExpr::Alias(
downwards_name_column,
Arc::new(format!("{}{}", downwards_name, suffix)),
Arc::from(format!("{}{}", downwards_name, suffix)),
));
local_projection.push(projection);
}
Expand Down

0 comments on commit ce79acf

Please sign in to comment.