-
Notifications
You must be signed in to change notification settings - Fork 0
/
enriched-data-from-another-stream.sql
102 lines (88 loc) · 2.28 KB
/
enriched-data-from-another-stream.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
-- Read every topic from its earliest offset so that queries started in this
-- session also process records written before the query was created.
SET 'auto.offset.reset' = 'earliest';
-- Input stream A: two string fields, deserialized as JSON from topic_a.
-- PARTITIONS is supplied so ksqlDB can create topic_a if it does not exist yet.
CREATE STREAM stream_a (
    value1 STRING,
    value2 STRING
) WITH (
    KAFKA_TOPIC = 'topic_a',
    VALUE_FORMAT = 'json',
    PARTITIONS = 1
);
-- Input stream B: four string fields, deserialized as JSON from topic_b.
-- value1/value2 are later combined into the join key; value3/value4 are the
-- columns used to enrich stream A.
CREATE STREAM stream_b (
    value1 STRING,
    value2 STRING,
    value3 STRING,
    value4 STRING
) WITH (
    KAFKA_TOPIC = 'topic_b',
    VALUE_FORMAT = 'json',
    PARTITIONS = 1
);
-- Derive a composite key for stream A by concatenating value1 and value2 with
-- a '-' separator. The original fields are carried along unchanged.
CREATE STREAM stream_a_concat AS
    SELECT
        value1 + '-' + value2 AS a_key,
        value1,
        value2
    FROM stream_a;
-- Derive the composite key for stream B (value1 + '-' + value2) and keep only
-- value3 and value4 as payload columns.
CREATE STREAM stream_b_concat AS
    SELECT
        value1 + '-' + value2 AS b_key,
        value3,
        value4
    FROM stream_b;
-- Repartition stream A on the derived key so that records are keyed by a_key,
-- the column used in the join condition further down.
-- Columns are listed explicitly (instead of SELECT *) so the output schema is
-- visible here and does not silently follow changes to stream_a_concat.
CREATE STREAM stream_a_key AS
    SELECT
        a_key,
        value1,
        value2
    FROM stream_a_concat
    PARTITION BY a_key;
-- Repartition stream B on the derived key so that records are keyed by b_key,
-- the column used in the join condition further down.
-- Columns are listed explicitly (instead of SELECT *) so the output schema is
-- visible here and does not silently follow changes to stream_b_concat.
CREATE STREAM stream_b_key AS
    SELECT
        b_key,
        value3,
        value4
    FROM stream_b_concat
    PARTITION BY b_key;
-- Enrich each stream A record with the stream B payload that has the same key
-- and a timestamp within 30 days of it. Because this is a LEFT JOIN, A records
-- with no matching B record are still emitted, with value3/value4 set to NULL.
CREATE STREAM stream_final_null AS
    SELECT
        ak.a_key,
        ak.value1,
        ak.value2,
        bk.value3,
        bk.value4
    FROM stream_a_key AS ak
    LEFT JOIN stream_b_key AS bk
        WITHIN 30 DAYS
        ON ak.a_key = bk.b_key;
-- Same left join of stream_a_key to stream_b_key, but the WHERE clause drops
-- every row whose B-side value3 is NULL. This removes unmatched A records (and
-- any matched row whose value3 is itself NULL) from the output.
-- NOTE(review): a grace period (`WITHIN 30 DAYS GRACE PERIOD 1 DAYS`) was tried
-- here and left disabled; confirm whether it should be enabled on ksqlDB
-- versions that support it.
CREATE STREAM stream_final_not_null AS
    SELECT
        ak.a_key,
        ak.value1,
        ak.value2,
        bk.value3,
        bk.value4
    FROM stream_a_key AS ak
    LEFT JOIN stream_b_key AS bk
        WITHIN 30 DAYS
        ON ak.a_key = bk.b_key
    WHERE bk.value3 IS NOT NULL;
-- Run in a separate ksqlDB CLI session: continuously print every row of
-- stream_final_null, starting from the earliest offset.
SET 'auto.offset.reset' = 'earliest';
SELECT *
FROM stream_final_null
EMIT CHANGES;
-- Run in a separate ksqlDB CLI session: continuously print every row of
-- stream_final_not_null, starting from the earliest offset.
SET 'auto.offset.reset' = 'earliest';
SELECT *
FROM stream_final_not_null
EMIT CHANGES;
-- Sample input data. Column lists are explicit so the inserts do not depend on
-- the column order of the stream definitions.
-- The statement order is deliberate: some keys arrive on A first, some on B
-- first. Keys present on both sides: v1, v2, v5, v6. Only on A: v3, v7.
-- Only on B: v4, v8.
-- NOTE(review): depending on the ksqlDB version, an A record inserted before
-- its B match may first show up in the left join with NULL B columns - verify
-- against the target version.
INSERT INTO stream_a (value1, value2) VALUES ('v1', 'v1');
INSERT INTO stream_b (value1, value2, value3, value4) VALUES ('v1', 'v1', 'v1-c', 'v1-d');
INSERT INTO stream_b (value1, value2, value3, value4) VALUES ('v2', 'v2', 'v2-c', 'v2-d');
INSERT INTO stream_a (value1, value2) VALUES ('v2', 'v2');
INSERT INTO stream_a (value1, value2) VALUES ('v3', 'v3');
INSERT INTO stream_b (value1, value2, value3, value4) VALUES ('v4', 'v4', 'v4-c', 'v4-d');
INSERT INTO stream_b (value1, value2, value3, value4) VALUES ('v5', 'v5', 'v5-c', 'v5-d');
INSERT INTO stream_a (value1, value2) VALUES ('v5', 'v5');
INSERT INTO stream_a (value1, value2) VALUES ('v6', 'v6');
INSERT INTO stream_a (value1, value2) VALUES ('v7', 'v7');
INSERT INTO stream_b (value1, value2, value3, value4) VALUES ('v6', 'v6', 'v6-c', 'v6-d');
INSERT INTO stream_b (value1, value2, value3, value4) VALUES ('v8', 'v8', 'v8-c', 'v8-d');