Skip to content

Commit

Permalink
asof dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 21, 2021
1 parent 3bcd0b1 commit db08156
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 5 deletions.
2 changes: 1 addition & 1 deletion polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ checked_arithmetic = ["polars-core/checked_arithmetic"]
repeat_by = ["polars-core/repeat_by", "polars-lazy/repeat_by"]
is_first = ["polars-core/is_first", "polars-lazy/is_first"]
is_last = ["polars-core/is_last"]
asof_join = ["polars-core/asof_join"]
asof_join = ["polars-core/asof_join", "polars-lazy/asof_join"]
cross_join = ["polars-core/cross_join", "polars-lazy/cross_join"]
dot_product = ["polars-core/dot_product", "polars-lazy/dot_product"]
concat_str = ["polars-core/concat_str", "polars-lazy/concat_str"]
Expand Down
34 changes: 34 additions & 0 deletions polars/polars-core/src/frame/asof_join/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,38 @@ mod test {
);
Ok(())
}

#[test]
fn test_asof_by2() -> Result<()> {
let trades = df![
"time" => [1464183000023i64, 1464183000038, 1464183000048, 1464183000048, 1464183000048],
"ticker" => ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],
"bid" => [51.95, 51.95, 720.77, 720.92, 98.0]
]?;

let quotes = df![
"time" => [1464183000023i64,
1464183000023,
1464183000030,
1464183000041,
1464183000048,
1464183000049,
1464183000072,
1464183000075],
"ticker" => ["GOOG", "MSFT", "MSFT", "MSFT", "GOOG", "AAPL", "GOOG", "MSFT"],
"bid" => [720.5, 51.95, 51.97, 51.99, 720.5, 97.99, 720.5, 52.01]

]?;

let out = trades.join_asof_by(&quotes, "time", "time", "ticker", "ticker")?;
let a = out.column("bid_right").unwrap();
let a = a.f64().unwrap();

assert_eq!(
Vec::from(a),
&[Some(51.95), Some(51.97), Some(720.5), Some(720.5), None]
);

Ok(())
}
}
1 change: 1 addition & 0 deletions polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ repeat_by = ["polars-core/repeat_by"]
round_series = ["polars-core/round_series"]
is_first = ["polars-core/is_first"]
cross_join = ["polars-core/cross_join"]
asof_join = ["polars-core/asof_join"]
dot_product = ["polars-core/dot_product"]
concat_str = ["polars-core/concat_str"]
arange = []
Expand Down
17 changes: 17 additions & 0 deletions polars/polars-lazy/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ pub struct JoinOptions {
pub force_parallel: bool,
pub how: JoinType,
pub suffix: Option<String>,
pub asof_by_left: Vec<String>,
pub asof_by_right: Vec<String>,
}

impl Default for JoinOptions {
Expand All @@ -175,6 +177,8 @@ impl Default for JoinOptions {
force_parallel: false,
how: JoinType::Left,
suffix: None,
asof_by_left: vec![],
asof_by_right: vec![],
}
}
}
Expand Down Expand Up @@ -1133,6 +1137,8 @@ pub struct JoinBuilder {
allow_parallel: bool,
force_parallel: bool,
suffix: Option<String>,
asof_by_left: Vec<String>,
asof_by_right: Vec<String>,
}
impl JoinBuilder {
fn new(lf: LazyFrame) -> Self {
Expand All @@ -1145,6 +1151,8 @@ impl JoinBuilder {
allow_parallel: true,
force_parallel: false,
suffix: None,
asof_by_left: vec![],
asof_by_right: vec![],
}
}

Expand Down Expand Up @@ -1190,6 +1198,13 @@ impl JoinBuilder {
self
}

/// Set the `by` subgrouper of an asof join.
pub fn asof_by(mut self, left_by: Vec<String>, right_by: Vec<String>) -> Self {
self.asof_by_left = left_by;
self.asof_by_right = right_by;
self
}

/// Finish builder
pub fn finish(self) -> LazyFrame {
let opt_state = self.lf.opt_state;
Expand All @@ -1206,6 +1221,8 @@ impl JoinBuilder {
force_parallel: self.force_parallel,
how: self.how,
suffix: self.suffix,
asof_by_left: self.asof_by_left,
asof_by_right: self.asof_by_right,
},
)
.build();
Expand Down
37 changes: 37 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ pub struct JoinExec {
right_on: Vec<Arc<dyn PhysicalExpr>>,
parallel: bool,
suffix: Option<String>,
asof_by_left: Vec<String>,
asof_by_right: Vec<String>,
}

impl JoinExec {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
input_left: Box<dyn Executor>,
input_right: Box<dyn Executor>,
Expand All @@ -23,6 +26,8 @@ impl JoinExec {
right_on: Vec<Arc<dyn PhysicalExpr>>,
parallel: bool,
suffix: Option<String>,
asof_by_left: Vec<String>,
asof_by_right: Vec<String>,
) -> Self {
JoinExec {
input_left: Some(input_left),
Expand All @@ -32,6 +37,8 @@ impl JoinExec {
right_on,
parallel,
suffix,
asof_by_left,
asof_by_right,
}
}
}
Expand Down Expand Up @@ -76,13 +83,43 @@ impl Executor for JoinExec {
.map(|e| e.evaluate(&df_right, state).map(|s| s.name().to_string()))
.collect::<Result<Vec<_>>>()?;

#[cfg(feature = "asof_join")]
let df = if let (JoinType::AsOf, true, true) = (
self.how,
!self.asof_by_right.is_empty(),
!self.asof_by_left.is_empty(),
) {
if left_names.len() > 1 || right_names.len() > 1 {
return Err(PolarsError::ValueError(
"only one column allowed in asof join".into(),
));
}
df_left.join_asof_by(
&df_right,
&left_names[0],
&right_names[0],
&self.asof_by_left,
&self.asof_by_right,
)
} else {
df_left.join(
&df_right,
&left_names,
&right_names,
self.how,
self.suffix.clone(),
)
};

#[cfg(not(feature = "asof_join"))]
let df = df_left.join(
&df_right,
&left_names,
&right_names,
self.how,
self.suffix.clone(),
);

if state.verbose {
eprintln!("{:?} join dataframes finished", self.how);
};
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,8 @@ impl DefaultPlanner {
right_on,
parallel,
options.suffix,
options.asof_by_left,
options.asof_by_right,
)))
}
HStack { input, exprs, .. } => {
Expand Down
27 changes: 24 additions & 3 deletions py-polars/polars/eager/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -1816,6 +1816,9 @@ def join(
on: Optional[Union[str, tp.List[str]]] = None,
how: str = "inner",
suffix: str = "_right",
asof_by: Optional[Union[str, tp.List[str]]] = None,
asof_by_left: Optional[Union[str, tp.List[str]]] = None,
asof_by_right: Optional[Union[str, tp.List[str]]] = None,
) -> Union["DataFrame", "pl.LazyFrame"]:
"""
SQL like joins.
Expand Down Expand Up @@ -1913,9 +1916,27 @@ def join(
if left_on_ is None or right_on_ is None:
raise ValueError("You should pass the column to join on as an argument.")

if isinstance(left_on_[0], pl.Expr) or isinstance(right_on_[0], pl.Expr):
return self.lazy().join(
df.lazy(), left_on, right_on, how=how, suffix=suffix
if (
isinstance(left_on_[0], pl.Expr)
or isinstance(right_on_[0], pl.Expr)
or asof_by_left is not None
or asof_by_right is not None
or asof_by is not None
):
return (
self.lazy()
.join(
df.lazy(),
left_on,
right_on,
on=on,
how=how,
suffix=suffix,
asof_by_right=asof_by_right,
asof_by_left=asof_by_left,
asof_by=asof_by,
)
.collect(no_optimization=True)
)
else:
return wrap_df(self._df.join(df._df, left_on_, right_on_, how, suffix))
Expand Down
41 changes: 40 additions & 1 deletion py-polars/polars/lazy/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,9 @@ def join(
suffix: str = "_right",
allow_parallel: bool = True,
force_parallel: bool = False,
asof_by: Optional[Union[str, tp.List[str]]] = None,
asof_by_left: Optional[Union[str, tp.List[str]]] = None,
asof_by_right: Optional[Union[str, tp.List[str]]] = None,
) -> "LazyFrame":
"""
Add a join operation to the Logical Plan.
Expand Down Expand Up @@ -468,7 +471,15 @@ def join(
if how == "cross":
return wrap_ldf(
self._ldf.join(
ldf._ldf, [], [], allow_parallel, force_parallel, how, suffix
ldf._ldf,
[],
[],
allow_parallel,
force_parallel,
how,
suffix,
[],
[],
)
)

Expand Down Expand Up @@ -505,6 +516,32 @@ def join(
column = col(column)
new_right_on.append(column._pyexpr)

# set asof_by

left_asof_by_: Union[tp.List[str], None]
if isinstance(asof_by_left, str):
left_asof_by_ = [asof_by_left] # type: ignore[assignment]
else:
left_asof_by_ = asof_by_left

right_asof_by_: Union[tp.List[str], None]
if isinstance(asof_by_right, (str, Expr)):
right_asof_by_ = [asof_by_right] # type: ignore[assignment]
else:
right_asof_by_ = asof_by_right

if isinstance(asof_by, str):
left_asof_by_ = [asof_by]
right_asof_by_ = [asof_by]
elif isinstance(on, list):
left_asof_by_ = asof_by
right_asof_by_ = asof_by

if left_asof_by_ is None:
left_asof_by_ = []
if right_asof_by_ is None:
right_asof_by_ = []

return wrap_ldf(
self._ldf.join(
ldf._ldf,
Expand All @@ -514,6 +551,8 @@ def join(
force_parallel,
how,
suffix,
left_asof_by_,
right_asof_by_,
)
)

Expand Down
3 changes: 3 additions & 0 deletions py-polars/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ impl PyLazyFrame {
force_parallel: bool,
how: &str,
suffix: String,
asof_by_left: Vec<String>,
asof_by_right: Vec<String>,
) -> PyLazyFrame {
let how = match how {
"left" => JoinType::Left,
Expand All @@ -261,6 +263,7 @@ impl PyLazyFrame {
.force_parallel(force_parallel)
.how(how)
.suffix(suffix)
.asof_by(asof_by_left, asof_by_right)
.finish()
.into()
}
Expand Down
14 changes: 14 additions & 0 deletions py-polars/tests/test_df.py
Original file line number Diff line number Diff line change
Expand Up @@ -1127,3 +1127,17 @@ def test_asof_join():
1464183000048,
1464183000048,
]
out = trades.join(quotes, on="dates", how="asof", asof_by="ticker")
assert out["bid_right"].to_list() == [51.95, 51.97, 720.5, 720.5, None]

out = quotes.join(trades, on="dates", asof_by="ticker", how="asof")
assert out["bid_right"].to_list() == [
None,
51.95,
51.95,
51.95,
720.92,
98.0,
720.92,
51.95,
]

0 comments on commit db08156

Please sign in to comment.