Skip to content

Commit

Permalink
fix join global
Browse files Browse the repository at this point in the history
  • Loading branch information
yijiezhen committed Apr 7, 2021
1 parent ff13973 commit 6281510
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 18 deletions.
6 changes: 4 additions & 2 deletions src/jackdaw/streams.clj
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,10 @@
(p/transform-values kstream value-transformer-supplier-fn state-store-names)))

(defn join-global
[kstream global-ktable kv-mapper joiner]
(p/join-global kstream global-ktable kv-mapper joiner))
([kstream global-ktable kv-mapper joiner]
(p/join-global kstream global-ktable kv-mapper joiner))
([kstream global-ktable kv-mapper joiner join-name]
(p/join-global kstream global-ktable kv-mapper joiner join-name)))

(defn left-join-global
([kstream global-ktable kv-mapper joiner]
Expand Down
32 changes: 19 additions & 13 deletions src/jackdaw/streams/configured.clj
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
(global-ktable
[_ topic-config]
(configured-global-ktable
config
(global-ktable streams-builder topic-config)))
config
(global-ktable streams-builder topic-config)))

(source-topics
[_]
Expand Down Expand Up @@ -81,8 +81,8 @@
(left-join kstream ktable value-joiner-fn)))

(left-join
[_ ktable value-joiner-fn topic-config other-topic-config]
(configured-kstream
[_ ktable value-joiner-fn topic-config other-topic-config]
(configured-kstream
config
(left-join kstream ktable value-joiner-fn topic-config other-topic-config)))

Expand Down Expand Up @@ -250,9 +250,9 @@
(merge
[_ other-kstream]
(configured-kstream
config
(merge kstream
other-kstream)))
config
(merge kstream
other-kstream)))

(outer-join-windowed
[_ other-kstream value-joiner-fn windows]
Expand Down Expand Up @@ -297,7 +297,7 @@
(select-key kstream key-value-mapper-fn)))

(transform
[this transformer-supplier-fn]
[this transformer-supplier-fn]
(transform this transformer-supplier-fn []))

(transform
Expand All @@ -307,7 +307,7 @@
(transform kstream transformer-supplier-fn state-store-names)))

(transform-values
[this value-transformer-supplier-fn]
[this value-transformer-supplier-fn]
(transform-values this value-transformer-supplier-fn []))

(transform-values
Expand All @@ -319,8 +319,8 @@
(left-join-global
[_ global-ktable kv-mapper joiner]
(configured-kstream
config
(left-join-global kstream global-ktable kv-mapper joiner)))
config
(left-join-global kstream global-ktable kv-mapper joiner)))

(left-join-global
[_ global-ktable kv-mapper joiner join-name]
Expand All @@ -331,8 +331,14 @@
(join-global
[_ global-ktable kv-mapper joiner]
(configured-kstream
config
(join-global kstream global-ktable kv-mapper joiner)))
config
(join-global kstream global-ktable kv-mapper joiner)))

(join-global
[_ global-ktable kv-mapper joiner]
(configured-kstream
config
(join-global kstream global-ktable kv-mapper joiner join-name)))

(kstream* [_]
(kstream* kstream))
Expand Down
11 changes: 10 additions & 1 deletion src/jackdaw/streams/interop.clj
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,15 @@
^KeyValueMapper (select-key-value-mapper key-value-mapper-fn)
^ValueJoiner (value-joiner joiner-fn))))

(join-global
[_ global-ktable key-value-mapper-fn joiner-fn join-name]
(clj-kstream
(.join kstream
^GlobalKTable (global-ktable* global-ktable)
^KeyValueMapper (select-key-value-mapper key-value-mapper-fn)
^ValueJoiner (value-joiner joiner-fn)
(Named/as join-name))))

(left-join-global
[_ global-ktable key-value-mapper-fn joiner-fn]
(clj-kstream
Expand Down Expand Up @@ -486,7 +495,7 @@
(suppress
[_ suppress-config]
(clj-ktable
(.suppress ^KTable ktable (suppress-config->suppressed suppress-config))))
(.suppress ^KTable ktable (suppress-config->suppressed suppress-config))))

(to-kstream
[_]
Expand Down
3 changes: 2 additions & 1 deletion src/jackdaw/streams/protocols.clj
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@
to each value in the input stream.")

(join-global
[kstream global-ktable kv-mapper joiner])
[kstream global-ktable kv-mapper joiner]
[kstream global-ktable kv-mapper joiner join-name])

(left-join-global
[kstream global-ktable kv-mapper joiner]
Expand Down
3 changes: 2 additions & 1 deletion src/jackdaw/streams/specs.clj
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@
:args (s/cat :kstream kstream?
:global-ktable global-ktable?
:kv-mapper ifn?
:joiner ifn?)
:joiner ifn?
:join-name (s/? string?))
:ret kstream?)

(s/fdef k/left-join-global
Expand Down

0 comments on commit 6281510

Please sign in to comment.