feat(join): unify async_execute into N-way streaming MJoin [ENG-301] #101
eywalker merged 2 commits into nauticalab:dev from
Conversation
…ctness fixes for suffix collisions and system tag sorting
Pull request overview
This PR refactors Join.async_execute from a mixed strategy (2-way streaming + N>2 barrier/static_process) into a unified N-way streaming hash join (MJoin), aiming to emit matches as soon as all sides contribute matching keys and to align system-tag behavior with static_process.
Changes:
- Replace the async join execution path with an N-way streaming MJoin implementation.
- Fix/adjust system-tag suffix canonicalization for duplicate pipeline hashes and add commutativity sorting of system-tag values.
- Expand async operator tests to cover 3-way/4-way joins, early emission, empty inputs, and cross-products.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| tests/test_channels/test_native_async_operators.py | Adds/updates async Join tests for N-way streaming behavior and sync/async equivalence. |
| src/orcapod/core/operators/join.py | Implements N-way streaming MJoin, updates system-tag suffixing and merged system-tag sorting for commutativity. |
```python
# All sides represented; compute shared keys as the
# intersection of all sides' tag key sets
all_key_sets = []
for s in range(n):
    if s == side:
        all_key_sets.append(set(tag.keys()))
    else:
        all_key_sets.append(set(buffers[s][0][0].keys()))
shared_keys = tuple(sorted(set.intersection(*all_key_sets)))
```
The async MJoin computes shared_keys as the intersection of tag keys across all inputs. This diverges from static_process, which joins each new stream on the keys it shares with the accumulated table (and otherwise falls back to the cartesian _common key). With partially-overlapping tag schemas (e.g., streams with tags {a}, {b}, {a}), static_process enforces a==a between the two {a} sides, but the current async logic will treat shared_keys as empty and produce a full cartesian product. Consider matching static_process semantics by deriving join constraints per key that appears in 2+ inputs (and treating missing keys as unconstrained), rather than requiring keys to be present in every input.
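One way to express the suggested semantics, sketched with a hypothetical helper (`constrained_keys` is not part of the codebase): a tag key becomes a join constraint when at least two sides carry it, and sides that lack the key are simply unconstrained.

```python
# Hypothetical sketch of the suggested rule: constrain on any key that
# appears in two or more inputs, instead of requiring it in every input.
from collections import Counter

def constrained_keys(tag_key_sets: list[set[str]]) -> tuple[str, ...]:
    counts = Counter(k for keys in tag_key_sets for k in keys)
    # A key constrains the join if at least two sides carry it; sides
    # missing the key are left unconstrained (no filtering applied).
    return tuple(sorted(k for k, n in counts.items() if n >= 2))

# With schemas {a}, {b}, {a}: full-intersection logic yields () and a
# cartesian product, while the 2+-sides rule keeps "a" as a constraint.
print(constrained_keys([{"a"}, {"b"}, {"a"}]))  # → ('a',)
```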
```python
n = len(inputs)
suffixes = (
    self._compute_system_tag_suffixes(input_pipeline_hashes)
    if input_pipeline_hashes is not None
    else [str(i) for i in range(n)]
)
await self._mjoin(inputs, output, suffixes)
```
async_execute accepts input_pipeline_hashes, but there’s no validation that its length matches len(inputs). If it’s shorter/longer, _compute_system_tag_suffixes (and later _merge_rows via suffixes[i]) can raise an IndexError or silently mis-suffix system tags. Consider raising a clear InputValidationError/ValueError when input_pipeline_hashes is not None and len(input_pipeline_hashes) != len(inputs).
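A minimal sketch of the suggested guard (the function name and error wording here are assumptions, not the project's API):

```python
# Hypothetical fail-fast check for async_execute: a mismatched hash list
# should raise a clear error rather than a later IndexError or silently
# mis-suffixed system tags.
def validate_pipeline_hashes(inputs, input_pipeline_hashes) -> None:
    if input_pipeline_hashes is None:
        return  # suffixes fall back to positional indices
    if len(input_pipeline_hashes) != len(inputs):
        raise ValueError(
            f"expected {len(inputs)} pipeline hashes, "
            f"got {len(input_pipeline_hashes)}"
        )
```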
```python
async def _emit_buffered_matches(
    self,
    buffers: list[list[tuple[TagProtocol, PacketProtocol]]],
    indexes: list[dict[tuple, list[int]]],
    shared_keys: tuple[str, ...],
    suffixes: list[str],
    block_sep: str,
    output: WritableChannel[tuple[TagProtocol, PacketProtocol]],
) -> None:
```
_emit_buffered_matches takes shared_keys but doesn’t use it. Dropping the unused parameter (and updating the call site) would reduce cognitive overhead and avoid implying that the function depends on it.
```python
@pytest.mark.asyncio
async def test_matches_sync_three_way(self):
    """Three-way MJoin must produce the same data as sync static_process."""
    t1 = pa.table(
        {"id": pa.array([1, 2, 3], type=pa.int64()), "a": pa.array([10, 20, 30], type=pa.int64())}
    )
    t2 = pa.table(
        {"id": pa.array([1, 2, 3], type=pa.int64()), "b": pa.array([100, 200, 300], type=pa.int64())}
    )
    t3 = pa.table(
        {"id": pa.array([1, 2, 3], type=pa.int64()), "c": pa.array([1000, 2000, 3000], type=pa.int64())}
    )
```
The new async MJoin tests cover cases where all inputs share the same join tag(s) (e.g. id) and the fully-disjoint cartesian case, but there’s no test for partially overlapping tag schemas (some sides share a tag key, others don’t). Join.static_process currently constrains equality on keys shared by a subset of streams (due to the iterative join), so adding an async-vs-sync regression test for a case like tags {a}, {b}, {a} would help ensure async semantics stay aligned.
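To make the expected semantics concrete, here is a minimal model (plain dicts, not the project's API) of `static_process`-style iterative joining, showing how the {a}, {b}, {a} case constrains `a == a` while the {b} side contributes a cross product:

```python
# Toy model of iterative pairwise joining: each new side joins on the keys
# it shares with the accumulated rows, falling back to a cross product
# when no keys are shared. Not the orcapod implementation.
def iterative_join(sides: list[list[dict]]) -> list[dict]:
    acc = [dict(r) for r in sides[0]]
    for side in sides[1:]:
        shared = set(acc[0]) & set(side[0]) if acc and side else set()
        acc = [
            {**l, **r}
            for l in acc
            for r in side
            if all(l[k] == r[k] for k in shared)
        ]
    return acc

# Tags {a}, {b}, {a}: the two {a} sides are constrained to match,
# so only a=1 survives; the {b} side is a cross product.
rows = iterative_join([
    [{"a": 1}, {"a": 2}],
    [{"b": 9}],
    [{"a": 1}],
])
print(rows)  # → [{'a': 1, 'b': 9}]
```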
…for correct partially-overlapping tag semantics

Changes introduced in commit "fix(join): replace N-way MJoin with staggered pairwise binary joins for correct partially-overlapping tag semantics":
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| ) | ||
| await output.send(merged) | ||
| renamed[new_key] = v | ||
| tag = Tag(tag.as_dict(), system_tags=renamed) |
_rename_sys_tags reconstructs Tag via Tag(tag.as_dict(), system_tags=renamed). Because Tag.as_dict() defaults to excluding meta/context, this drops any meta columns (and context key) carried by the incoming Tag. This diverges from the sync join path, which preserves meta columns via table joins, and from other streaming operators that preserve datagram metadata. Consider building the new Tag from tag.as_dict(columns={"meta": True, "context": True}) (excluding system tags) or copying the existing Tag and only rewriting its system-tag dict.
```diff
-tag = Tag(tag.as_dict(), system_tags=renamed)
+tag = Tag(
+    tag.as_dict(columns={"meta": True, "context": True}),
+    system_tags=renamed,
+)
```
Re: _rename_sys_tags drops meta/context columns (line 540)
Investigated — this is a false positive. Tag and Packet objects in the async flow are produced by iter_packets(), which constructs them from Arrow record batches that have already had prefixed columns (meta `__`, source `_source_`, etc.) separated out by prepare_prefixed_columns. So tag.as_dict() doesn't drop meta columns because the Tag never carried them in the first place.
The sync path preserves meta through Arrow table joins because as_table(columns={"meta": True}) re-materializes them from the separated prefix tables — but this happens at the table/stream layer, not at the Tag/Packet dict layer. Both paths are consistent: meta columns are a table-level concern, not a datagram-level one.
```python
# Merge packet dicts (non-overlapping by Join's validation)
merged_pkt_d: dict = {}
merged_si: dict = {}
merged_pkt_d.update(left_pkt.as_dict())
merged_pkt_d.update(right_pkt.as_dict())
merged_si.update(left_pkt.source_info())
merged_si.update(right_pkt.source_info())

merged_pkt = Packet(merged_pkt_d, source_info=merged_si)
return merged_tag, merged_pkt
```
In _merge_pair_rename / _merge_pair_passthrough, the merged Packet is constructed from left_pkt.as_dict() + right_pkt.as_dict(). Since Packet.as_dict() excludes meta columns by default, any meta info present on either input Packet is silently dropped in the async join output. The sync static_process joins Arrow tables that include meta columns, so this breaks parity between sync and async for callers that rely on packet meta. Consider explicitly merging/preserving get_meta_info() (and any other requested info) when constructing the merged Packet (and likewise for Tags if needed), handling key collisions consistently with static_process's column-renaming behavior.
Re: _merge_pair_* drops Packet meta (line 594)
Same finding as the _rename_sys_tags comment — this is a false positive. Packet objects in the async flow never carry meta columns. They're constructed by iter_packets() from Arrow record batches where prepare_prefixed_columns has already separated meta columns into a distinct prefix table. pkt.as_dict() returns exactly the data columns — nothing is lost.
```python
# Output should be empty — side 3 hasn't contributed yet
with pytest.raises(asyncio.TimeoutError):
    await asyncio.wait_for(out.reader.receive(), timeout=0.05)
```
This test uses a very small negative-assertion timeout (0.05s) to prove the join doesn't emit before the third side arrives. On slower CI runners or under load, this can be flaky (false failures if the scheduler delays). Consider increasing the timeout and/or using a more deterministic synchronization mechanism (e.g., wait until the join task has consumed the first two sends before asserting no output).
```diff
-await asyncio.wait_for(out.reader.receive(), timeout=0.05)
+await asyncio.wait_for(out.reader.receive(), timeout=0.5)
```
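For the fully deterministic variant, one option is to wait until the join task has actually consumed the sends before making the negative assertion. A sketch using plain asyncio primitives (not the project's channel API):

```python
# Sketch: Queue.join() blocks until every put() has been matched by a
# task_done(), so the "no output yet" assertion cannot race the scheduler.
import asyncio

async def demo() -> bool:
    inputs: asyncio.Queue = asyncio.Queue()
    out: asyncio.Queue = asyncio.Queue()

    async def join_task():
        while True:
            await inputs.get()
            inputs.task_done()  # consumed, but nothing emitted yet

    task = asyncio.create_task(join_task())
    await inputs.put(("tag1", "packet1"))  # stand-ins for two sides' rows
    await inputs.put(("tag2", "packet2"))
    await inputs.join()        # deterministic: both sends were consumed
    is_empty = out.empty()     # safe negative assertion, no timeout race
    task.cancel()
    return is_empty

print(asyncio.run(demo()))  # → True
```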
Fixed — increased timeout from 0.05s to 0.5s.
```python
await ch1.writer.send((Tag({"id": 2}), Packet({"a": 20})))

# Give the event loop a chance to process
await asyncio.sleep(0.01)
```
await asyncio.sleep(0.01) is used to “give the event loop a chance to process”. Fixed sleeps tend to be flaky and slow tests down unnecessarily. Prefer a deterministic wait (e.g., awaiting a specific output/event, or yielding with await asyncio.sleep(0) if you only need to allow task switching).
```diff
-await asyncio.sleep(0.01)
+await asyncio.sleep(0)
```
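Why `asyncio.sleep(0)` suffices: it yields control to the event loop exactly once, letting any already-scheduled task run, with no wall-clock delay. A self-contained sketch:

```python
# Demonstrates that awaiting asyncio.sleep(0) lets a pending task execute
# before the current coroutine resumes.
import asyncio

async def demo() -> list[str]:
    order: list[str] = []

    async def worker():
        order.append("worker ran")

    asyncio.create_task(worker())  # scheduled but not yet started
    order.append("before yield")
    await asyncio.sleep(0)         # yield once; the worker runs here
    order.append("after yield")
    return order

print(asyncio.run(demo()))  # → ['before yield', 'worker ran', 'after yield']
```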
Fixed — changed to await asyncio.sleep(0) for a deterministic event loop yield.
```diff
 class TestJoinNativeAsync:
-    """Tests for Join.async_execute (symmetric hash join + N>2 barrier)."""
+    """Tests for Join.async_execute (N-way streaming MJoin)."""
```
The module/class docstrings refer to a “streaming N-way MJoin” implementation, but Join.async_execute is documented/implemented as a staggered chain of pairwise binary joins (join(join(x, y), z)). This mismatch makes the behavior/algorithm harder to understand when maintaining the tests. Consider renaming the test/docstring wording to match the implemented semantics (or updating the implementation if true N-way MJoin was intended).
Fixed — updated all MJoin references in module docstring, class docstring, and individual test docstrings to "staggered pairwise streaming join".
```diff
 """Async streaming join with pairwise iterative semantics.

 Single input: streams through directly without any buffering.

-Two inputs: symmetric hash join — each arriving row is
-immediately probed against the opposite side's buffer, emitting
-matches as soon as found. System-tag columns are correctly
-renamed using the ``input_pipeline_hashes``.
+Two inputs: binary symmetric hash join — each arriving row is
+probed against the opposite side's buffer, emitting matches as
+soon as found.

-Three or more inputs: collects all inputs concurrently, then
-delegates to ``static_process`` for the Polars N-way join.
+Three or more inputs: staggered pairwise binary joins in
+canonical order — ``join(join(x, y), z)`` — matching
+``static_process``'s iterative accumulation. Each binary join
+uses the per-pair intersection of tag keys, so partially
+overlapping tag schemas are handled correctly.
```
PR description/title and the test plan describe a unified N-way “MJoin” that indexes/probes across all sides, but the current async implementation is a staggered chain of pairwise joins. If the intent is the staggered iterative semantics (matching static_process), consider adjusting the PR description and/or naming to avoid implying a different algorithm/behavior.
Fixed — the async_execute docstring was already updated to describe staggered pairwise semantics in this commit. The PR title/description will be updated to match before merge.
The Join operator's async_execute was replaced from a split implementation (2-input streaming / 3+ blocking
barrier) with a unified N-way streaming hash join (MJoin algorithm). Each arriving row is immediately indexed
and probed against all other sides, emitting matches as soon as all N sides have a matching key — so downstream
can start work before any input is fully consumed. The merge logic was generalized from pair-wise to N-way, and
three correctness issues were fixed: duplicate pipeline hashes now get distinct canonical positions in system
tag suffixes, system tag values are sorted for commutativity (matching static_process), and a stale docstring
was corrected. Tests cover 2/3/4-way joins, early emission, partial matches, empty inputs, cartesian products,
and multi-row cross-products.
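The symmetric hash join idea the description refers to can be sketched in a few lines. This is an illustrative toy (plain dicts and a generator, not the orcapod implementation): each arriving row is indexed on its side, then probed against the opposite side's buffer, so a match is emitted as soon as both sides have contributed the key.

```python
# Toy 2-way symmetric hash join: index each arriving row, then probe the
# opposite side's buffer and emit matches immediately.
from collections import defaultdict

def symmetric_hash_join(events):
    """events: iterable of (side, key, row) with side in {0, 1}."""
    buffers = [defaultdict(list), defaultdict(list)]
    for side, key, row in events:
        buffers[side][key].append(row)        # index the new row
        for other in buffers[1 - side][key]:  # probe the opposite side
            left, right = (row, other) if side == 0 else (other, row)
            yield key, {**left, **right}

matches = list(symmetric_hash_join([
    (0, 1, {"a": 10}),
    (1, 2, {"b": 200}),
    (1, 1, {"b": 100}),  # emits immediately: side 0 already has key 1
]))
print(matches)  # → [(1, {'a': 10, 'b': 100})]
```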