Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
zanmato1984 committed May 10, 2024
1 parent e3b0bd1 commit ea3b24c
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 0 deletions.
16 changes: 16 additions & 0 deletions cpp/src/arrow/acero/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,12 @@ class AsofJoinNode : public ExecNode {
// Advance each of the RHS as far as possible to be up to date for the LHS timestamp
ARROW_ASSIGN_OR_RAISE(bool any_rhs_advanced, UpdateRhs());

// UpdateRhs saw empty RHS at time 200 and didn't do any Advance.
// Then wait until 400 by when right batch will be already pushed (at time 300).
std::this_thread::sleep_for(std::chrono::milliseconds(200));
// Now is time 400, IsUpToDateWithLhsRow will see non-empty RHS up to date for LHS
// and emit wrong results, without RHS properly advanced.

// If we have received enough inputs to produce the next output batch
// (decided by IsUpToDateWithLhsRow), we will perform the join and
// materialize the output batch. The join is done by advancing through
Expand Down Expand Up @@ -1032,6 +1038,9 @@ class AsofJoinNode : public ExecNode {
}

bool Process() {
// Process starts at time 200, by when LHS is not empty and RHS is empty.
std::this_thread::sleep_for(std::chrono::milliseconds(200));

std::lock_guard<std::mutex> guard(gate_);
if (!CheckEnded()) {
return false;
Expand Down Expand Up @@ -1388,6 +1397,13 @@ class AsofJoinNode : public ExecNode {
// Get the input
ARROW_DCHECK(std_has(inputs_, input));
size_t k = std_find(inputs_, input) - inputs_.begin();
if (k == 0) {
// At time 100 the left batch is pushed.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
} else {
// At time 300 the right batch is pushed.
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}

// Put into the queue
auto rb = *batch.ToRecordBatch(input->output_schema());
Expand Down
50 changes: 50 additions & 0 deletions cpp/src/arrow/acero/asof_join_node_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1678,5 +1678,55 @@ TEST(AsofJoinTest, BackpressureWithBatchesGen) {
/*slow_r0=*/false);
}

TEST(AsofJoinTest, GH40675) {
auto left_batch = ExecBatchFromJSON(
{int64(), utf8()}, R"([[1, "a"], [1, "b"], [5, "a"], [6, "b"], [7, "f"]])");
auto right_batch = ExecBatchFromJSON(
{int64(), utf8(), float64()}, R"([[2, "a", 1.0], [9, "b", 3.0], [15, "g", 5.0]])");

Declaration left{
"exec_batch_source",
ExecBatchSourceNodeOptions(schema({field("colA", int64()), field("col2", utf8())}),
{std::move(left_batch)})};
Declaration right{
"exec_batch_source",
ExecBatchSourceNodeOptions(schema({field("colB", int64()), field("col3", utf8()),
field("colC", float64())}),
{std::move(right_batch)})};
AsofJoinNodeOptions asof_join_opts({{{"colA"}, {{"col2"}}}, {{"colB"}, {{"col3"}}}}, 1);
Declaration asof_join{
"asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)};

ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join)));

auto exp_batch = ExecBatchFromJSON(
{int64(), utf8(), float64()},
R"([[1, "a", 1.0], [1, "b", null], [5, "a", null], [6, "b", null], [7, "f", null]])");
AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches);
}

TEST(AsofJoinTest, GH41149) {
auto left_batch = ExecBatchFromJSON({int64()}, R"([[1], [2], [3]])");
auto right_batch =
ExecBatchFromJSON({utf8(), int64()}, R"([["Z", 2], ["B", 3], ["A", 4]])");

Declaration left{"exec_batch_source",
ExecBatchSourceNodeOptions(schema({field("on", int64())}),
{std::move(left_batch)})};
Declaration right{
"exec_batch_source",
ExecBatchSourceNodeOptions(schema({field("colVals", utf8()), field("on", int64())}),
{std::move(right_batch)})};
AsofJoinNodeOptions asof_join_opts({{{"on"}, {}}, {{"on"}, {}}}, 1);
Declaration asof_join{
"asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)};

ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join)));

auto exp_batch =
ExecBatchFromJSON({int64(), utf8()}, R"([[1, "Z"], [2, "Z"], [3, "B"]])");
AssertExecBatchesEqualIgnoringOrder(result.schema, {exp_batch}, result.batches);
}

} // namespace acero
} // namespace arrow

0 comments on commit ea3b24c

Please sign in to comment.