Skip to content

Commit

Permalink
Merge pull request #83 from macielti/add-dlq-capabilities-to-rebbitmq…
Browse files Browse the repository at this point in the history
…-consumer

Add DQL capabilities to RabbitMQ consumer component
  • Loading branch information
macielti committed Aug 13, 2023
2 parents 0f8042d + 4cadc33 commit e1fbe98
Show file tree
Hide file tree
Showing 40 changed files with 181 additions and 51 deletions.
12 changes: 11 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ of [keepachangelog.com](http://keepachangelog.com/).

## [Unreleased]

## [20.30.36] - 2023-08-12

### Added

- Implemented integration of RabbitMQ consumer component with Dead Letter Queue
service [Wraith King](https://github.com/macielti/wraith-king).
- Added schema validation for message payloads for RabbitMQ consumer component.

## [19.30.36] - 2023-08-01

### Added
Expand Down Expand Up @@ -470,7 +478,9 @@ of [keepachangelog.com](http://keepachangelog.com/).

- Add `loose-schema` function.

[Unreleased]: https://github.com/macielti/common-clj/compare/v19.30.36...HEAD
[Unreleased]: https://github.com/macielti/common-clj/compare/v20.30.36...HEAD

[19.30.36]: https://github.com/macielti/common-clj/compare/v19.30.36...v20.30.36

[19.30.36]: https://github.com/macielti/common-clj/compare/v19.29.36...v19.30.36

Expand Down
2 changes: 1 addition & 1 deletion docs/common-clj.auth.core.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.config.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.datalevin.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.datomic.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.helper.core.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.http-client.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.http.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.kafka.adapters.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.kafka.consumer.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.kafka.models.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.kafka.producer.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.rabbitmq.consumer.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.rabbitmq.producer.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.routes.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.service.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.telegram.adapters.update.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.telegram.consumer.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.component.telegram.models.consumer.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.error.core.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.io.interceptors.auth.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.io.interceptors.datalevin.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.io.interceptors.datomic.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.io.interceptors.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.keyword.core.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.money.converter.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.schema.core.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.test.helper.auth.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.test.helper.core.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.test.helper.time.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.time.core.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.time.parser.core.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/common-clj.traceability.core.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/index.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/intro.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject net.clojars.macielti/common-clj "19.30.36"
(defproject net.clojars.macielti/common-clj "20.30.36"
:description "Just common Clojure code that I use across projects"
:url "https://github.com/macielti/common-clj"
:license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0"
Expand Down
18 changes: 10 additions & 8 deletions resources/config_test.edn
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
:service-authentication {:auth-server-base-url "https://example.com"
:username "service-name"
:password "random-password"}}
:test {:bootstrap-server "http://localhost:9092"
:service {:host "0.0.0.0"
:port 8000}
:topics ["test.example"]
:rabbitmq-uri "amqp://guest:guest@localhost:5672"
:service-authentication {:auth-server-base-url "https://example.com"
:username "service-name"
:password "random-password"}}}
:test {:bootstrap-server "http://localhost:9092"
:service-name "test-service-name"
:service {:host "0.0.0.0"
:port 8000}
:topics ["test.example"]
:rabbitmq-uri "amqp://guest:guest@localhost:5672"
:dead-letter-queue-service-integration-enabled true
:service-authentication {:auth-server-base-url "https://example.com"
:username "service-name"
:password "random-password"}}}
14 changes: 14 additions & 0 deletions resources/config_test_dead_letter_disabled.edn
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{:prod {:bootstrap-server "http://localhost:9092"
:service-authentication {:auth-server-base-url "https://example.com"
:username "service-name"
:password "random-password"}}
:test {:bootstrap-server "http://localhost:9092"
:service-name "test-service-name"
:service {:host "0.0.0.0"
:port 8000}
:topics ["test.example"]
:rabbitmq-uri "amqp://guest:guest@localhost:5672"
:dead-letter-queue-service-integration-enabled false
:service-authentication {:auth-server-base-url "https://example.com"
:username "service-name"
:password "random-password"}}}
29 changes: 23 additions & 6 deletions src/common_clj/component/rabbitmq/consumer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
[langohr.queue :as lq]
[langohr.consumers :as lc]
[plumbing.core :as plumbing]
[schema.core :as s])
[schema.core :as s]
[taoensso.timbre :as log]
[common-clj.component.rabbitmq.producer :as component.rabbitmq.producer])
(:import (clojure.lang IFn)))

(s/defschema Consumers
{s/Keyword {:handler-fn IFn}})
{s/Keyword {:schema s/Any
:handler-fn IFn}})

(defrecord Consumer [config datomic datalevin http-client rabbitmq-producer consumers]
component/Lifecycle
Expand All @@ -24,18 +27,32 @@
:rabbitmq-producer (:rabbitmq-producer rabbitmq-producer)
:datomic (:datomic datomic)
:datalevin (:datalevin datalevin)
:http-client (:http-client http-client))]
:http-client (:http-client http-client))
service-name (:service-name config-content)]

(s/validate Consumers consumers)

(doseq [raw-topic topics
:let [topic (keyword raw-topic)
consumer (topic consumers)
handler-fn (:handler-fn consumer)]]
handler-fn (:handler-fn consumer)
schema (:schema consumer)]]
(lq/declare channel raw-topic {:exclusive false :auto-delete false})
(lc/subscribe channel raw-topic
(fn [_channel _meta payload] (handler-fn {:components components
:payload (edn/read-string (String. payload "UTF-8"))}))
(fn [_channel _meta payload]
(try
(s/validate schema (edn/read-string (String. payload "UTF-8")))
(handler-fn {:components components
:payload (edn/read-string (String. payload "UTF-8"))})
(catch Exception e
(do (log/error e)
(when (-> components :config :dead-letter-queue-service-integration-enabled)
(component.rabbitmq.producer/produce! {:topic :create-dead-letter
:payload {:service service-name
:topic topic
:exception-info (str e)
:payload (edn/read-string (String. payload "UTF-8"))}}
(:rabbitmq-producer components)))))))
{:auto-ack true}))

(merge component {:rabbitmq-consumer {:connection connection
Expand Down
89 changes: 88 additions & 1 deletion test/integration/integration/rabbitmq_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@
[common-clj.component.helper.core :as component.helper]
[common-clj.component.rabbitmq.consumer :as component.rabbitmq.consumer]
[common-clj.component.rabbitmq.producer :as component.rabbitmq.producer]
[schema.core :as schema]
[schema.test :as s]))

(def test-state (atom nil))

(def consumers {:test.example {:handler-fn (fn [{:keys [payload]}]
(schema/defschema TestSchema
{:test schema/Keyword})

(def consumers {:test.example {:schema TestSchema
:handler-fn (fn [{:keys [payload]}]
(reset! test-state payload))}})

(def ^:private system-test
Expand Down Expand Up @@ -41,3 +46,85 @@
@(:produced-messages producer))))

(component/stop system)))

(s/deftest rabbitmq-consumer-and-producer-component-test-should-produce-dead-letter-on-wrong-message-schema
(let [system (component/start system-test)
producer (component.helper/get-component-content :rabbitmq-producer system)]

(testing "that we can use kafka consumer to consumer messages"
(component.rabbitmq.producer/produce! {:topic :test.example
:payload {:test 1}}
producer)
(Thread/sleep 5000))

;TODO: Create a helper function in order fo fetch produced messages
(testing "that we can retrieve produced messages"
(is (= [{:payload {:test 1}
:topic :test.example}
{:payload {:exception-info "clojure.lang.ExceptionInfo: Value does not match schema: {:test (not (keyword? 1))} {:type :schema.core/error, :schema {:test Keyword}, :value {:test 1}, :error {:test (not (keyword? 1))}}"
:payload {:test 1}
:service "test-service-name"
:topic :test.example}
:topic :create-dead-letter}]
@(:produced-messages producer))))

(component/stop system)))

(def problematic-consumers {:test.example {:schema TestSchema
:handler-fn (fn [_]
(throw (Exception. "my exception message")))}})

(def ^:private system-test-problematic-consumer-with-dead-letter-enabled
(component/system-map
:config (component.config/new-config "resources/config_test.edn" :test :edn)
:rabbitmq-producer (component/using (component.rabbitmq.producer/new-producer) [:config])
:rabbitmq-consumer (component/using (component.rabbitmq.consumer/new-consumer problematic-consumers) [:config :rabbitmq-producer])))

(s/deftest rabbitmq-consumer-and-producer-component-test-should-produce-dead-letter
(let [system (component/start system-test-problematic-consumer-with-dead-letter-enabled)
producer (component.helper/get-component-content :rabbitmq-producer system)]

(testing "that we can use kafka consumer to consumer messages"
(component.rabbitmq.producer/produce! {:topic :test.example
:payload {:test :ok}}
producer)

(Thread/sleep 5000))

;TODO: Create a helper function in order fo fetch produced messages
(testing "that we can retrieve produced messages"
(is (= [{:payload {:test :ok}
:topic :test.example}
{:payload {:exception-info "java.lang.Exception: my exception message"
:payload {:test :ok}
:service "test-service-name"
:topic :test.example}
:topic :create-dead-letter}]
@(:produced-messages producer))))

(component/stop system)))

(def ^:private system-test-problematic-consumer-with-dead-letter-disabled
(component/system-map
:config (component.config/new-config "resources/config_test_dead_letter_disabled.edn" :test :edn)
:rabbitmq-producer (component/using (component.rabbitmq.producer/new-producer) [:config])
:rabbitmq-consumer (component/using (component.rabbitmq.consumer/new-consumer problematic-consumers) [:config :rabbitmq-producer])))

(s/deftest rabbitmq-consumer-and-producer-component-test-should-not-produce-dead-letter
(let [system (component/start system-test-problematic-consumer-with-dead-letter-disabled)
producer (component.helper/get-component-content :rabbitmq-producer system)]

(testing "that we can use kafka consumer to consumer messages"
(component.rabbitmq.producer/produce! {:topic :test.example
:payload {:test :ok}}
producer)

(Thread/sleep 5000))

;TODO: Create a helper function in order fo fetch produced messages
(testing "that we can retrieve produced messages"
(is (= [{:payload {:test :ok}
:topic :test.example}]
@(:produced-messages producer))))

(component/stop system)))

0 comments on commit e1fbe98

Please sign in to comment.