Skip to content

Commit

Permalink
decouple producers and topics
Browse files Browse the repository at this point in the history
  • Loading branch information
tolitius committed Jul 28, 2020
1 parent 3faaa4e commit d873581
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 14 deletions.
28 changes: 28 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,28 @@
# 0.1.3

- (!) producers are now created without a topic, so topics can be specified on send!

starting from `0.1.3`:

```cloure
=> (def p (g/producer (get-in config [:kafka :producer])))
;; send a couple of messages to topics: "foos" and "bars"
=> (g/send! p "foos" "{:answer 42}")
=> (g/send! p "bars" "{:answer 42}")
=> (g/close p)
```

before:

grete/producer returned a map of `{:producer .. :topic ..}`:

```cloure
=> (def p (g/producer "foos" (get-in config [:kafka :producer])))
=> (g/send! p "{:answer 42}")
=> (g/send! p "{:answer 42}")
=> (g/close p)
```
18 changes: 13 additions & 5 deletions README.md
@@ -1,15 +1,22 @@
## grete
# grete

is [gregor](https://github.com/tolitius/grete/blob/master/src/grete/gregor.clj#L2)'s sister that adds a threadpool and a scheduler

[![<! release](https://img.shields.io/badge/dynamic/json.svg?label=release&url=https%3A%2F%2Fclojars.org%2Ftolitius%2Fgrete%2Flatest-version.json&query=version&colorB=blue)](https://github.com/tolitius/grete/releases)
[![<! clojars](https://img.shields.io/clojars/v/tolitius/grete.svg)](https://clojars.org/tolitius/grete)
[![<! clojars>](https://img.shields.io/clojars/v/tolitius/grete.svg)](https://clojars.org/tolitius/grete)

... and some Java API<br/>
... and the latest kafka (at the moment of writing)

the idea behind `grete` is to be able to start a farm of kafka consumers that listen to (potentially) multiple topics and apply a simple consuming function.

- [spilling the beans](#spilling-the-beans)
- [produce](#produce)
- [consume](#consume)
- [Java API](#java-api)
- [several topics at once](#several-topics-at-once)
- [License](#license)

## spilling the beans

```clojure
Expand Down Expand Up @@ -39,10 +46,11 @@ hence we'll use one config for producing and consuming:
produce a couple of messages (to `foos` topic):

```clojure
=> (def p (g/producer "foos" (get-in config [:kafka :producer])))
=> (def p (g/producer (get-in config [:kafka :producer])))

=> (g/send! p "{:answer 42}")
=> (g/send! p "{:answer 42}")
;; send a couple of messages to topics: "foos" and "bars"
=> (g/send! p "foos" "{:answer 42}")
=> (g/send! p "bars" "{:answer 42}")

=> (g/close p)
```
Expand Down
2 changes: 1 addition & 1 deletion build.boot
@@ -1,4 +1,4 @@
(def +version+ "0.1.2")
(def +version+ "0.1.3")

(set-env!
:source-paths #{"src"}
Expand Down
17 changes: 9 additions & 8 deletions src/grete/core.clj
Expand Up @@ -22,22 +22,23 @@
(when-not (gregor/topic-exists? zoo name)
(gregor/create-topic zoo name topic))))

(defn producer [topic {:keys [bootstrap-servers] :as conf}]
(defn producer [{:keys [bootstrap-servers] :as conf}]
(let [serializers (-> (select-keys conf
[:key-serializer :value-serializer])
to-props)]
; (create-topic topic conf)
{:producer (gregor/producer bootstrap-servers
serializers)
:topic topic}))
(gregor/producer bootstrap-servers
serializers)))

(defn send!
([{:keys [producer topic]} msg]
"dummy gregor send wrap to:
1. give it a '!'
2. avoid requiring another ns to 'send'"
([producer topic msg]
(gregor/send producer topic msg))
([{:keys [producer topic]} key msg]
([producer topic key msg]
(gregor/send producer topic key msg)))

(defn close [{:keys [producer]}]
(defn close [producer]
(gregor/close producer))

(defn- edn-to-consumer [{:keys [bootstrap-servers
Expand Down

0 comments on commit d873581

Please sign in to comment.