Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/clj_kafka_x/consumers/simple.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
(:require [clj-kafka-x.data :refer :all]
[clj-kafka-x.impl.helpers :refer :all])
(:import java.util.regex.Pattern
[org.apache.kafka.clients.consumer ConsumerRebalanceListener Consumer KafkaConsumer OffsetCommitCallback]
[org.apache.kafka.clients.consumer ConsumerRebalanceListener Consumer KafkaConsumer OffsetCommitCallback ConsumerConfig]
[org.apache.kafka.common.serialization ByteArrayDeserializer Deserializer StringDeserializer]
org.apache.kafka.common.TopicPartition
(java.util Map Collection)
(java.time Duration)))

(def ^:private config-def (ConsumerConfig/configDef))

(defn string-deserializer [] (StringDeserializer.))
(defn byte-array-deserializer [] (ByteArrayDeserializer.))
Expand Down Expand Up @@ -48,9 +49,9 @@

"
([^Map config]
(KafkaConsumer. (safe-config config)))
(KafkaConsumer. ^Map (coerce-config config-def config)))
([^Map config ^Deserializer key-deserializer ^Deserializer value-deserializer]
(KafkaConsumer. (safe-config config) key-deserializer value-deserializer)))
(KafkaConsumer. ^Map (coerce-config config-def config) key-deserializer value-deserializer)))


(defn subscribe
Expand Down
102 changes: 25 additions & 77 deletions src/clj_kafka_x/impl/helpers.clj
Original file line number Diff line number Diff line change
@@ -1,82 +1,30 @@
(ns clj-kafka-x.impl.helpers
(:import (clojure.lang MapEntry IRecord IMapEntry)
(java.util Map)))
(:import (java.util Arrays)
(org.apache.kafka.common.config ConfigDef ConfigDef$ConfigKey)))

(defn- walk
[inner outer form]
(cond
(list? form) (outer (apply list (map inner form)))
(instance? IMapEntry form)
(outer (MapEntry/create (inner (key form)) (inner (val form))))
(seq? form) (outer (doall (map inner form)))
(instance? IRecord form)
(outer (reduce (fn [r x] (conj r (inner x))) form form))
(coll? form) (outer (into (empty form) (map inner form)))
:else (outer form)))

(defn- postwalk
[f form]
(walk (partial postwalk f) f form))
(defn- coerce-value [t v]
(case t
:SHORT (when (number? v) (short v))
:INT (when (number? v) (int v))
:LONG (when (number? v) (long v))
:DOUBLE (when (number? v) (double v))
:LIST (when (coll? v)
(->> v
(into-array String)
Arrays/asList))
nil))

(defn- stringify-keys
[m]
(let [f (fn [[k v]] (if (keyword? k) [(name k) v] [k v]))]
(postwalk (fn [x] (if (map? x) (into {} (map f x)) x)) m)))
(defn- coerce-config-entry [config-def [k v]]
(let [k (cond-> k keyword? name)]
(or
(when-let [^ConfigDef$ConfigKey key (get config-def k)]
(let [t (-> (.type key) str keyword)]
(when-let [v (coerce-value t v)]
[k v])))
[k v])))

(defn update-in-when
[m k f & args]
(if (not= ::not-found (get-in m k ::not-found))
(apply update-in m k f args)
m))

(defn- short! [v]
(if (number? v)
(short v)
(condp instance? v
String (try
(-> v str Short/parseShort)
(catch NumberFormatException _ nil))
nil)))

(defn- int! [v]
(if (number? v)
(int v)
(condp instance? v
String (try
(-> v str Integer/parseInt)
(catch NumberFormatException _ nil))
nil)))

(defn safe-config ^Map [^Map config]
(if (map? config)
(-> config
stringify-keys
;; common
(update-in-when ["request.timeout.ms"] int!)
(update-in-when ["receive.buffer.bytes"] int!)
(update-in-when ["metrics.num.samples"] int!)
;; producer
(update-in-when ["send.buffer.bytes"] int!)
(update-in-when ["retries"] int!)
(update-in-when ["batch.size"] int!)
(update-in-when ["delivery.timeout.ms"] int!)
(update-in-when ["max.request.size"] int!)
(update-in-when ["max.in.flight.requests.per.connection"] int!)
(update-in-when ["sasl.login.connect.timeout.ms"] int!)
(update-in-when ["sasl.login.read.timeout.ms"] int!)
(update-in-when ["sasl.login.refresh.buffer.seconds"] short!)
(update-in-when ["sasl.login.refresh.min.period.seconds"] short!)
(update-in-when ["sasl.oauthbearer.clock.skew.seconds"] int!)
(update-in-when ["transaction.timeout.ms"] int!)
;; consumer
(update-in-when ["fetch.min.bytes"] int!)
(update-in-when ["fetch.max.bytes"] int!)
(update-in-when ["fetch.max.wait.ms"] int!)
(update-in-when ["heartbeat.interval.ms"] int!)
(update-in-when ["max.partition.fetch.bytes"] int!)
(update-in-when ["session.timeout.ms"] int!)
(update-in-when ["max.poll.interval.ms"] int!)
(update-in-when ["max.poll.records"] int!)
(update-in-when ["auto.commit.interval.ms"] int!)
(update-in-when ["fetch.max.wait.ms"] int!))
{}))
(defn coerce-config [^ConfigDef def conf]
(->> conf
(map (partial coerce-config-entry (into {} (.configKeys def))))
(into {})))
13 changes: 5 additions & 8 deletions src/clj_kafka_x/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
(:refer-clojure :exclude [send flush])
(:require [clj-kafka-x.data :refer :all]
[clj-kafka-x.impl.helpers :refer :all])
(:import [java.util.concurrent Future TimeUnit]
[org.apache.kafka.clients.producer Callback Producer KafkaProducer ProducerRecord]
(:import [java.util.concurrent Future]
[org.apache.kafka.clients.producer Callback Producer KafkaProducer ProducerRecord ProducerConfig]
(org.apache.kafka.common.serialization Serializer ByteArraySerializer StringSerializer)
(java.util Map)
(java.time Duration)))


(def ^:private config-def (ProducerConfig/configDef))

(defn- map-future-val
[^Future fut f]
Expand All @@ -23,8 +23,6 @@
(isCancelled [_] (.isCancelled fut))
(isDone [_] (.isDone fut))))



(defn string-serializer [] (StringSerializer.))
(defn byte-array-serializer [] (ByteArraySerializer.))

Expand Down Expand Up @@ -56,11 +54,10 @@
(-> (send p (record \"topic-a\" \"Hello World\"))
(.get)))
"

([^Map config]
(KafkaProducer. (safe-config config)))
(KafkaProducer. ^Map (coerce-config config-def config)))
([^Map config ^Serializer key-serializer ^Serializer value-serializer]
(KafkaProducer. (safe-config config) key-serializer value-serializer)))
(KafkaProducer. ^Map (coerce-config config-def config) key-serializer value-serializer)))

(defn record
"Return a record that can be published to Kafka using [[send]]."
Expand Down