From 8357db02b1e0a3c2d18be39798db2b3b28e819b2 Mon Sep 17 00:00:00 2001 From: Oleg Noga Date: Wed, 20 Jul 2022 18:21:52 +0300 Subject: [PATCH] coerce config values with config definition --- src/clj_kafka_x/consumers/simple.clj | 7 +- src/clj_kafka_x/impl/helpers.clj | 102 +++++++-------------------- src/clj_kafka_x/producer.clj | 13 ++-- 3 files changed, 34 insertions(+), 88 deletions(-) diff --git a/src/clj_kafka_x/consumers/simple.clj b/src/clj_kafka_x/consumers/simple.clj index 854345a..879b418 100644 --- a/src/clj_kafka_x/consumers/simple.clj +++ b/src/clj_kafka_x/consumers/simple.clj @@ -5,12 +5,13 @@ (:require [clj-kafka-x.data :refer :all] [clj-kafka-x.impl.helpers :refer :all]) (:import java.util.regex.Pattern - [org.apache.kafka.clients.consumer ConsumerRebalanceListener Consumer KafkaConsumer OffsetCommitCallback] + [org.apache.kafka.clients.consumer ConsumerRebalanceListener Consumer KafkaConsumer OffsetCommitCallback ConsumerConfig] [org.apache.kafka.common.serialization ByteArrayDeserializer Deserializer StringDeserializer] org.apache.kafka.common.TopicPartition (java.util Map Collection) (java.time Duration))) +(def ^:private config-def (ConsumerConfig/configDef)) (defn string-deserializer [] (StringDeserializer.)) (defn byte-array-deserializer [] (ByteArrayDeserializer.)) @@ -48,9 +49,9 @@ " ([^Map config] - (KafkaConsumer. (safe-config config))) + (KafkaConsumer. ^Map (coerce-config config-def config))) ([^Map config ^Deserializer key-deserializer ^Deserializer value-deserializer] - (KafkaConsumer. (safe-config config) key-deserializer value-deserializer))) + (KafkaConsumer. ^Map (coerce-config config-def config) key-deserializer value-deserializer))) (defn subscribe diff --git a/src/clj_kafka_x/impl/helpers.clj b/src/clj_kafka_x/impl/helpers.clj index ec2c2ad..aea1ad6 100644 --- a/src/clj_kafka_x/impl/helpers.clj +++ b/src/clj_kafka_x/impl/helpers.clj @@ -1,82 +1,30 @@ (ns clj-kafka-x.impl.helpers - (:import (clojure.lang MapEntry IRecord IMapEntry) - (java.util Map))) + (:import (java.util Arrays) + (org.apache.kafka.common.config ConfigDef ConfigDef$ConfigKey))) -(defn- walk - [inner outer form] - (cond - (list? form) (outer (apply list (map inner form))) - (instance? IMapEntry form) - (outer (MapEntry/create (inner (key form)) (inner (val form)))) - (seq? form) (outer (doall (map inner form))) - (instance? IRecord form) - (outer (reduce (fn [r x] (conj r (inner x))) form form)) - (coll? form) (outer (into (empty form) (map inner form))) - :else (outer form))) -(defn- postwalk - [f form] - (walk (partial postwalk f) f form)) +(defn- coerce-value [t v] + (case t + :SHORT (when (number? v) (short v)) + :INT (when (number? v) (int v)) + :LONG (when (number? v) (long v)) + :DOUBLE (when (number? v) (double v)) + :LIST (when (coll? v) + (->> v + (into-array String) + Arrays/asList)) + nil)) -(defn- stringify-keys - [m] - (let [f (fn [[k v]] (if (keyword? k) [(name k) v] [k v]))] - (postwalk (fn [x] (if (map? x) (into {} (map f x)) x)) m))) +(defn- coerce-config-entry [config-def [k v]] + (let [k (cond-> k keyword? name)] + (or + (when-let [^ConfigDef$ConfigKey key (get config-def k)] + (let [t (-> (.type key) str keyword)] + (when-let [v (coerce-value t v)] + [k v]))) + [k v]))) -(defn update-in-when - [m k f & args] - (if (not= ::not-found (get-in m k ::not-found)) - (apply update-in m k f args) - m)) - -(defn- short! [v] - (if (number? v) - (short v) - (condp instance? v - String (try - (-> v str Short/parseShort) - (catch NumberFormatException _ nil)) - nil))) - -(defn- int! [v] - (if (number? v) - (int v) - (condp instance? v - String (try - (-> v str Integer/parseInt) - (catch NumberFormatException _ nil)) - nil))) - -(defn safe-config ^Map [^Map config] - (if (map? config) - (-> config - stringify-keys - ;; common - (update-in-when ["request.timeout.ms"] int!) - (update-in-when ["receive.buffer.bytes"] int!) - (update-in-when ["metrics.num.samples"] int!) - ;; producer - (update-in-when ["send.buffer.bytes"] int!) - (update-in-when ["retries"] int!) - (update-in-when ["batch.size"] int!) - (update-in-when ["delivery.timeout.ms"] int!) - (update-in-when ["max.request.size"] int!) - (update-in-when ["max.in.flight.requests.per.connection"] int!) - (update-in-when ["sasl.login.connect.timeout.ms"] int!) - (update-in-when ["sasl.login.read.timeout.ms"] int!) - (update-in-when ["sasl.login.refresh.buffer.seconds"] short!) - (update-in-when ["sasl.login.refresh.min.period.seconds"] short!) - (update-in-when ["sasl.oauthbearer.clock.skew.seconds"] int!) - (update-in-when ["transaction.timeout.ms"] int!) - ;; consumer - (update-in-when ["fetch.min.bytes"] int!) - (update-in-when ["fetch.max.bytes"] int!) - (update-in-when ["fetch.max.wait.ms"] int!) - (update-in-when ["heartbeat.interval.ms"] int!) - (update-in-when ["max.partition.fetch.bytes"] int!) - (update-in-when ["session.timeout.ms"] int!) - (update-in-when ["max.poll.interval.ms"] int!) - (update-in-when ["max.poll.records"] int!) - (update-in-when ["auto.commit.interval.ms"] int!) - (update-in-when ["fetch.max.wait.ms"] int!)) - {})) +(defn coerce-config [^ConfigDef def conf] + (->> conf + (map (partial coerce-config-entry (into {} (.configKeys def)))) + (into {}))) diff --git a/src/clj_kafka_x/producer.clj b/src/clj_kafka_x/producer.clj index 78979cb..bba5e33 100644 --- a/src/clj_kafka_x/producer.clj +++ b/src/clj_kafka_x/producer.clj @@ -5,13 +5,13 @@ (:refer-clojure :exclude [send flush]) (:require [clj-kafka-x.data :refer :all] [clj-kafka-x.impl.helpers :refer :all]) - (:import [java.util.concurrent Future TimeUnit] - [org.apache.kafka.clients.producer Callback Producer KafkaProducer ProducerRecord] + (:import [java.util.concurrent Future] + [org.apache.kafka.clients.producer Callback Producer KafkaProducer ProducerRecord ProducerConfig] (org.apache.kafka.common.serialization Serializer ByteArraySerializer StringSerializer) (java.util Map) (java.time Duration))) - +(def ^:private config-def (ProducerConfig/configDef)) (defn- map-future-val [^Future fut f] @@ -23,8 +23,6 @@ (isCancelled [_] (.isCancelled fut)) (isDone [_] (.isDone fut)))) - - (defn string-serializer [] (StringSerializer.)) (defn byte-array-serializer [] (ByteArraySerializer.)) @@ -56,11 +54,10 @@ (-> (send p (record \"topic-a\" \"Hello World\")) (.get))) " - ([^Map config] - (KafkaProducer. (safe-config config))) + (KafkaProducer. ^Map (coerce-config config-def config))) ([^Map config ^Serializer key-serializer ^Serializer value-serializer] - (KafkaProducer. (safe-config config) key-serializer value-serializer))) + (KafkaProducer. ^Map (coerce-config config-def config) key-serializer value-serializer))) (defn record "Return a record that can be published to Kafka using [[send]]."