Skip to content

Commit

Permalink
minor: factored duplicated assertion, refined doc, fixed indentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Francois committed Jan 20, 2020
1 parent 876451e commit bd07134
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 31 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ Kinsky provides the following:
**JSON**, **EDN** and a **keyword** serializer for keys.
- Documentation

The `core.async` facade that used to be part of this project is now
[a separate project](https://github.com/fmjrey/kinsky-async).

## Usage

```clojure
Expand Down Expand Up @@ -50,6 +53,7 @@ Thanks a lot to these awesome contributors
### 0.1.25

- Removal of the asynchronous façade, transducers should suffice
- Minor typo and code refactoring

### 0.1.24

Expand Down
56 changes: 25 additions & 31 deletions src/kinsky/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@
"Subscribe to a topic or list of topics.
The topics argument can be:
- A simple string when subscribing to a single topic
- A simple string or keyword when subscribing to a single topic
- A regex pattern to subscribe to matching topics
- A sequence of strings
- A collection of strings or keywords
The optional listener argument is either a callback
function or an implementation of
Expand Down Expand Up @@ -404,9 +404,17 @@
:by-partition (group-by (juxt :topic :partition) edc)}))

(defn ->topics
"Yield a valid object for subscription"
"Yield a valid topic object for subscription given a string, keyword,
regex pattern or collection of strings or keywords."
^Collection
[topics]
(assert (or (string? topics)
(keyword? topics)
(instance? Pattern topics)
(and (instance? Collection topics)
(every? (some-fn string? keyword?) topics)))
(str "topics argument must be a string, keyword, regex pattern or "
"collection of strings or keywords, received " topics))
(cond
(keyword? topics) [(name topics)]
(string? topics) [topics]
Expand All @@ -415,8 +423,8 @@
:else (throw (ex-info "topics argument is invalid" {:topics topics}))))

(defn consumer->driver
"Given a consumer-driver and an optional callback to callback
to call when stopping, yield a consumer driver.
"Given a KafkaConsumer and an optional callback to call when stopping,
yield a consumer driver.
The consumer driver implements the following protocols:
Expand Down Expand Up @@ -453,22 +461,8 @@
(.resume consumer
(map ->topic-partition topic-partitions)))
(subscribe! [this topics]
(assert (or (string? topics)
(keyword? topics)
(instance? Pattern topics)
(and (instance? Collection topics)
(every? (some-fn string? keyword?) topics)))
(str "topic argument must be a string, keyword, regex pattern or "
"collection of strings or keywords."))
(.subscribe consumer (->topics topics)))
(subscribe! [this topics listener]
(assert (or (string? topics)
(keyword? topics)
(instance? Pattern topics)
(and (instance? Collection topics)
(every? (some-fn string? keyword?) topics)))
(str "topic argument must be a string, keyword, regex pattern or "
"collection of strings or keywords."))
(.subscribe consumer (->topics topics) (rebalance-listener listener)))
(unsubscribe! [this]
(.unsubscribe consumer))
Expand All @@ -481,17 +475,17 @@
(map (juxt ->topic-partition ->offset-metadata))
(reduce merge {}))]
(.commitSync consumer ^Map offsets)))
(seek! [this topic-partition offset]
(.seek consumer
(->topic-partition topic-partition)
(long offset)))
(position! [this topic-partition]
(.position consumer (->topic-partition topic-partition)))
(seek! [this topic-partition offset]
(.seek consumer
(->topic-partition topic-partition)
(long offset)))
(position! [this topic-partition]
(.position consumer (->topic-partition topic-partition)))
(subscription [this]
(.subscription consumer))
GenericDriver
(close! [this]
(.close consumer))
GenericDriver
(close! [this]
(.close consumer))
MetadataDriver
(partitions-for [this topic]
(mapv partition-info->data (.partitionsFor consumer topic)))
Expand Down Expand Up @@ -524,8 +518,9 @@
(map ->header headers))

(defn ->record
"Build a producer record from a clojure map. Leave ProducerRecord instances
untouched."
"Build a ProducerRecord from a clojure map.
Leave ProducerRecord instances untouched."
^ProducerRecord
[payload]
(if (instance? ProducerRecord payload)
payload
Expand Down Expand Up @@ -567,7 +562,6 @@
(send! [this topic k v]
(.send producer (->record {:key k :value v :topic topic})))
(send! [this topic k v headers]
"Defaults partition and timestamp to 0, if you need to set either, use the single arity version"
(.send producer (->record {:key k :value v :topic topic :headers headers})))
(flush! [this]
(.flush producer))
Expand Down

0 comments on commit bd07134

Please sign in to comment.