Skip to content

Commit

Permalink
fix join asof tolerance (#3816)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 26, 2022
1 parent 077deab commit e92fc5e
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 6 deletions.
23 changes: 17 additions & 6 deletions polars/polars-core/src/frame/asof_join/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,35 @@ pub(super) unsafe fn join_asof_backward_with_indirection_and_tolerance<
if offsets.is_empty() {
return (None, 0);
}
let mut previous = *offsets.get_unchecked(0);
let first = *right.get_unchecked(previous as usize);
let mut previous_idx = *offsets.get_unchecked(0);
let first = *right.get_unchecked(previous_idx as usize);
if val_l < first {
(None, 0)
} else {
for (idx, &offset) in offsets.iter().enumerate() {
let val_r = *right.get_unchecked(offset as usize);

// the point that is larger is not allowed
if val_r > val_l {
let dist = val_l - val_r;
// compute the distance of previous point, that one was still backwards
let previous_value = *right.get_unchecked(previous_idx as usize);
let dist = val_l - previous_value;
return if dist > tolerance {
(None, idx)
} else {
(Some(previous), idx)
(Some(previous_idx), idx)
};
}
previous = offset
previous_idx = offset
}
// check remaining values that still suffice the distance constraint
let previous_value = *right.get_unchecked(previous_idx as usize);
let dist = val_l - previous_value;
if dist > tolerance {
(None, offsets.len())
} else {
(Some(previous_idx), offsets.len())
}
(None, offsets.len())
}
}

Expand Down
58 changes: 58 additions & 0 deletions py-polars/tests/test_joins.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import datetime

import numpy as np

import polars as pl
Expand Down Expand Up @@ -122,3 +124,59 @@ def test_join_asof_floats() -> None:
"b": ["lrow1", "lrow2", "lrow3"],
"b_right": ["rrow1", "rrow2", "rrow3"],
}


def test_join_asof_tolerance() -> None:
df_trades = pl.DataFrame(
{
"time": [
datetime(2020, 1, 1, 9, 0, 1),
datetime(2020, 1, 1, 9, 0, 1),
datetime(2020, 1, 1, 9, 0, 3),
datetime(2020, 1, 1, 9, 0, 6),
],
"stock": ["A", "B", "B", "C"],
"trade": [101, 299, 301, 500],
}
)

df_quotes = pl.DataFrame(
{
"time": [
datetime(2020, 1, 1, 9, 0, 0),
datetime(2020, 1, 1, 9, 0, 2),
datetime(2020, 1, 1, 9, 0, 4),
datetime(2020, 1, 1, 9, 0, 6),
],
"stock": ["A", "B", "C", "A"],
"quote": [100, 300, 501, 102],
}
)

assert df_trades.join_asof(
df_quotes, on="time", by="stock", tolerance="2s"
).to_dict(False) == {
"time": [
datetime(2020, 1, 1, 9, 0, 1),
datetime(2020, 1, 1, 9, 0, 1),
datetime(2020, 1, 1, 9, 0, 3),
datetime(2020, 1, 1, 9, 0, 6),
],
"stock": ["A", "B", "B", "C"],
"trade": [101, 299, 301, 500],
"quote": [100, None, 300, 501],
}

assert df_trades.join_asof(
df_quotes, on="time", by="stock", tolerance="1s"
).to_dict(False) == {
"time": [
datetime(2020, 1, 1, 9, 0, 1),
datetime(2020, 1, 1, 9, 0, 1),
datetime(2020, 1, 1, 9, 0, 3),
datetime(2020, 1, 1, 9, 0, 6),
],
"stock": ["A", "B", "B", "C"],
"trade": [101, 299, 301, 500],
"quote": [100, None, 300, None],
}

0 comments on commit e92fc5e

Please sign in to comment.