Skip to content

Commit 0a92b41

Browse files
committed
Create base Join tests
1 parent 77d4136 commit 0a92b41

File tree

3 files changed

+39
-65
lines changed

3 files changed

+39
-65
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import pytest
2+
3+
from quixstreams.dataframe.joins.base import Join
4+
from quixstreams.models.topics.exceptions import TopicPartitionsMismatch
5+
6+
7+
def test_how_invalid_value():
8+
with pytest.raises(ValueError, match='Invalid "how" value'):
9+
Join(how="invalid", on_merge="raise", grace_ms=1)
10+
11+
12+
def test_on_merge_invalid_value():
13+
with pytest.raises(ValueError, match='Invalid "on_merge"'):
14+
Join(how="inner", on_merge="invalid", grace_ms=1)
15+
16+
17+
def test_mismatching_partitions_fails(topic_manager_topic_factory, create_sdf):
18+
left_topic = topic_manager_topic_factory()
19+
right_topic = topic_manager_topic_factory(partitions=2)
20+
left_sdf, right_sdf = create_sdf(left_topic), create_sdf(right_topic)
21+
join = Join(how="inner", on_merge="raise", grace_ms=1)
22+
23+
with pytest.raises(TopicPartitionsMismatch):
24+
join.join(left_sdf, right_sdf)
25+
26+
27+
def test_self_join_not_supported(topic_manager_topic_factory, create_sdf):
28+
match = "Joining dataframes originating from the same topic is not yet supported."
29+
join = Join(how="inner", on_merge="raise", grace_ms=1)
30+
31+
# The very same sdf object
32+
sdf = create_sdf(topic_manager_topic_factory())
33+
with pytest.raises(ValueError, match=match):
34+
join.join(sdf, sdf)
35+
36+
# Same topic, different branch
37+
sdf2 = sdf.apply(lambda _: _)
38+
with pytest.raises(ValueError, match=match):
39+
join.join(sdf, sdf2)

tests/test_quixstreams/test_dataframe/test_joins/test_join_asof.py

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import pytest
44

5-
from quixstreams.models.topics.exceptions import TopicPartitionsMismatch
65
from quixstreams.state.exceptions import StoreAlreadyRegisteredError
76

87

@@ -71,25 +70,6 @@ def test_how(
7170
)
7271
assert joined_value == expected
7372

74-
def test_how_invalid_value(self, topic_manager_topic_factory, create_sdf):
75-
left_topic = topic_manager_topic_factory()
76-
right_topic = topic_manager_topic_factory()
77-
left_sdf, right_sdf = create_sdf(left_topic), create_sdf(right_topic)
78-
79-
match = 'Invalid "how" value'
80-
with pytest.raises(ValueError, match=match):
81-
left_sdf.join_asof(right_sdf, how="invalid")
82-
83-
def test_mismatching_partitions_fails(
84-
self, topic_manager_topic_factory, create_sdf
85-
):
86-
left_topic = topic_manager_topic_factory()
87-
right_topic = topic_manager_topic_factory(partitions=2)
88-
left_sdf, right_sdf = create_sdf(left_topic), create_sdf(right_topic)
89-
90-
with pytest.raises(TopicPartitionsMismatch):
91-
left_sdf.join_asof(right_sdf)
92-
9373
@pytest.mark.parametrize(
9474
"on_merge, right, left, expected",
9575
[
@@ -165,15 +145,6 @@ def test_on_merge(
165145
)
166146
assert joined_value == [(expected, b"key", 2, None)]
167147

168-
def test_on_merge_invalid_value(self, topic_manager_topic_factory, create_sdf):
169-
left_topic = topic_manager_topic_factory()
170-
right_topic = topic_manager_topic_factory()
171-
left_sdf, right_sdf = create_sdf(left_topic), create_sdf(right_topic)
172-
173-
match = 'Invalid "on_merge"'
174-
with pytest.raises(ValueError, match=match):
175-
left_sdf.join_asof(right_sdf, on_merge="invalid")
176-
177148
def test_on_merge_callback(
178149
self, topic_manager_topic_factory, create_sdf, assign_partition, publish
179150
):
@@ -223,22 +194,6 @@ def test_grace_ms(
223194
assert publish_left(timestamp=4) == []
224195
assert publish_left(timestamp=5) == [({"left": 4, "right": 2}, b"key", 5, None)]
225196

226-
def test_self_join_not_supported(self, topic_manager_topic_factory, create_sdf):
227-
topic = topic_manager_topic_factory()
228-
match = (
229-
"Joining dataframes originating from the same topic is not yet supported."
230-
)
231-
232-
# The very same sdf object
233-
sdf = create_sdf(topic)
234-
with pytest.raises(ValueError, match=match):
235-
sdf.join_asof(sdf)
236-
237-
# Same topic, different branch
238-
sdf2 = sdf.apply(lambda v: v)
239-
with pytest.raises(ValueError, match=match):
240-
sdf.join_asof(sdf2)
241-
242197
def test_join_same_topic_multiple_times_fails(
243198
self, topic_manager_topic_factory, create_sdf
244199
):

tests/test_quixstreams/test_dataframe/test_joins/test_join_interval.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import pytest
66

77
from quixstreams.dataframe.joins.base import OnOverlap
8-
from quixstreams.models.topics.exceptions import TopicPartitionsMismatch
98

109

1110
@dataclass
@@ -274,22 +273,3 @@ def test_backward_ms_greater_than_grace_ms(
274273
match="The backward_ms must not be greater than the grace_ms to avoid losing data.",
275274
):
276275
left_sdf.join_interval(right_sdf, grace_ms=1, backward_ms=2)
277-
278-
def test_how_invalid_value(self, topic_manager_topic_factory, create_sdf):
279-
left_topic = topic_manager_topic_factory()
280-
right_topic = topic_manager_topic_factory()
281-
left_sdf, right_sdf = create_sdf(left_topic), create_sdf(right_topic)
282-
283-
match = 'Invalid "how" value'
284-
with pytest.raises(ValueError, match=match):
285-
left_sdf.join_interval(right_sdf, how="invalid")
286-
287-
def test_mismatching_partitions_fails(
288-
self, topic_manager_topic_factory, create_sdf
289-
):
290-
left_topic = topic_manager_topic_factory()
291-
right_topic = topic_manager_topic_factory(partitions=2)
292-
left_sdf, right_sdf = create_sdf(left_topic), create_sdf(right_topic)
293-
294-
with pytest.raises(TopicPartitionsMismatch):
295-
left_sdf.join_interval(right_sdf)

0 commit comments

Comments
 (0)