Skip to content

Commit

Permalink
Add JSON producers/consumers using Snappy and LZF compression, includ…
Browse files Browse the repository at this point in the history
…ing unit tests.
  • Loading branch information
ruedigergad committed Jun 11, 2018
1 parent 559c6e1 commit 2e5e0a6
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 2 deletions.
5 changes: 3 additions & 2 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
[org.eclipse.jetty.websocket/websocket-server "9.4.11.v20180605"]
[org.clojure/clojure "1.9.0"]
[org.clojure/tools.cli "0.3.7"]
[rgad/stompjms-client "1.20-SNAPSHOT"]
[org.eclipse.paho/org.eclipse.paho.client.mqttv3 "1.2.0"]
[org.iq80.snappy/snappy "0.4"]
[org.springframework/spring-messaging "5.0.6.RELEASE"]
[org.springframework/spring-websocket "5.0.6.RELEASE"]
[org.slf4j/slf4j-simple "1.7.25"]]
[org.slf4j/slf4j-simple "1.7.25"]
[rgad/stompjms-client "1.20-SNAPSHOT"]]
:license {:name "Eclipse Public License (EPL) - v 1.0"
:url "http://www.eclipse.org/legal/epl-v10.html"
:distribution :repo
Expand Down
47 changes: 47 additions & 0 deletions src/bowerick/jms.clj
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
(org.eclipse.paho.client.mqttv3 MqttCallback MqttClient MqttConnectOptions MqttMessage)
(org.eclipse.paho.client.mqttv3.persist MemoryPersistence)
(org.fusesource.stomp.jms StompJmsConnectionFactory)
(org.iq80.snappy Snappy)
(org.springframework.messaging.converter ByteArrayMessageConverter SmartMessageConverter StringMessageConverter)
(org.springframework.messaging.simp.stomp DefaultStompSession StompFrameHandler StompHeaders StompSession StompSessionHandler StompSessionHandlerAdapter)
(org.springframework.scheduling.concurrent DefaultManagedTaskScheduler ThreadPoolTaskScheduler)
Expand Down Expand Up @@ -1043,6 +1044,52 @@
(fn [^bytes ba]
(cheshire/parse-string (String. ^bytes (pre-process-fn ba) ^Charset *default-charset*))))))

(defn create-json-lzf-producer
([broker-url destination-description]
(create-json-lzf-producer broker-url destination-description 1))
([broker-url destination-description pool-size]
(create-json-producer
broker-url
destination-description
pool-size
(fn [^bytes ba]
(LZFEncoder/encode ba)))))

(defn create-json-lzf-consumer
([broker-url destination-description cb]
(create-json-lzf-consumer broker-url destination-description cb 1))
([broker-url destination-description cb pool-size]
(create-json-consumer
broker-url
destination-description
cb
pool-size
(fn [^bytes ba]
(LZFDecoder/decode ba)))))

(defn create-json-snappy-producer
([broker-url destination-description]
(create-json-snappy-producer broker-url destination-description 1))
([broker-url destination-description pool-size]
(create-json-producer
broker-url
destination-description
pool-size
(fn [^bytes ba]
(Snappy/compress ba)))))

(defn create-json-snappy-consumer
([broker-url destination-description cb]
(create-json-snappy-consumer broker-url destination-description cb 1))
([broker-url destination-description cb pool-size]
(create-json-consumer
broker-url
destination-description
cb
pool-size
(fn [^bytes ba]
(Snappy/uncompress ba 0 (alength ba))))))

(defn create-failsafe-json-consumer
([broker-url destination-description cb]
(create-failsafe-json-consumer broker-url destination-description cb 1))
Expand Down
26 changes: 26 additions & 0 deletions test/bowerick/test/jms.clj
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,32 @@
(close producer)
(close consumer)))

(deftest json-producer-consumer-lzf
(let [producer (create-json-lzf-producer local-jms-server test-topic)
was-run (prepare-flag)
received (atom nil)
consume-fn (fn [obj] (reset! received obj) (set-flag was-run))
consumer (create-json-lzf-consumer local-jms-server test-topic consume-fn)]
(producer {:a "b"})
(await-flag was-run)
(is (flag-set? was-run))
(is (= {"a" "b"} @received))
(close producer)
(close consumer)))

(deftest json-producer-consumer-snappy
(let [producer (create-json-snappy-producer local-jms-server test-topic)
was-run (prepare-flag)
received (atom nil)
consume-fn (fn [obj] (reset! received obj) (set-flag was-run))
consumer (create-json-snappy-consumer local-jms-server test-topic consume-fn)]
(producer {:a "b"})
(await-flag was-run)
(is (flag-set? was-run))
(is (= {"a" "b"} @received))
(close producer)
(close consumer)))

(deftest json-producer-failsafe-json-consumer
(let [producer (create-json-producer local-jms-server test-topic)
was-run (prepare-flag)
Expand Down

0 comments on commit 2e5e0a6

Please sign in to comment.