Permalink
Browse files

Improved overall protocols implementation.

  • Loading branch information...
1 parent 2d90aaa commit 85200df4a465fbd8591668bb74facfba056aaa85 @sbtourist committed Aug 12, 2011
View
@@ -6,13 +6,4 @@
(defn rethrow-on-failure [failure]
(throw (:exception failure))
- )
-
-(defn receiver-seq [request-queue timeout]
- (lazy-seq
- (if-let [m (.poll request-queue timeout java.util.concurrent.TimeUnit/MILLISECONDS)]
- (cons m (receiver-seq request-queue timeout))
- nil
- )
- )
)
@@ -1,4 +1,4 @@
-(ns clamq.macros)
+(ns clamq.internal.macros)
(defmacro non-blocking-listener [listener-class listener-method converter handler-fn failure-fn limit container]
`(let [~'counter (atom 0)]
@@ -0,0 +1,10 @@
+(ns clamq.internal.utils)
+
+(defn receiver-seq [request-queue timeout]
+ (lazy-seq
+ (if-let [m (.poll request-queue timeout java.util.concurrent.TimeUnit/MILLISECONDS)]
+ (cons m (receiver-seq request-queue timeout))
+ nil
+ )
+ )
+ )
View
@@ -1,6 +1,12 @@
(ns clamq.jms
- (:use
- [clamq.helpers] [clamq.macros] [clamq.protocol]
+ (:require
+ [clamq.helpers :as helpers]
+ [clamq.internal.macros :as macros]
+ [clamq.internal.utils :as utils]
+ [clamq.protocol.connection :as connection]
+ [clamq.protocol.consumer :as consumer]
+ [clamq.protocol.seqable :as seqable]
+ [clamq.protocol.producer :as producer]
)
(:import
[java.util.concurrent SynchronousQueue]
@@ -24,22 +30,22 @@
(when (nil? connection) (throw (IllegalArgumentException. "No value specified for connection!")))
(let [template (JmsTemplate. connection)]
(doto template (.setMessageConverter (SimpleMessageConverter.)) (.setPubSubDomain pubSub))
- (reify Producer
- (send-to [self destination message attributes]
+ (reify producer/Producer
+ (publish [self destination message attributes]
(.convertAndSend template destination message (proxy-message-post-processor attributes))
)
- (send-to [self destination message] (send-to self destination message {}))
+ (publish [self destination message] (producer/publish self destination message {}))
+ )
)
)
-)
-(defn- jms-consumer [connection {endpoint :endpoint handler-fn :on-message transacted :transacted pubSub :pubSub limit :limit failure-fn :on-failure :or {pubSub false limit 0 failure-fn rethrow-on-failure}}]
+(defn- jms-consumer [connection {endpoint :endpoint handler-fn :on-message transacted :transacted pubSub :pubSub limit :limit failure-fn :on-failure :or {pubSub false limit 0 failure-fn helpers/rethrow-on-failure}}]
(when (nil? connection) (throw (IllegalArgumentException. "No value specified for connection!")))
(when (nil? endpoint) (throw (IllegalArgumentException. "No value specified for :endpoint!")))
(when (nil? transacted) (throw (IllegalArgumentException. "No value specified for :transacted!")))
(when (nil? handler-fn) (throw (IllegalArgumentException. "No value specified for :on-message!")))
(let [container (DefaultMessageListenerContainer.)
- listener (non-blocking-listener MessageListener onMessage (SimpleMessageConverter.) handler-fn failure-fn limit container)]
+ listener (macros/non-blocking-listener MessageListener onMessage (SimpleMessageConverter.) handler-fn failure-fn limit container)]
(doto container
(.setConnectionFactory connection)
(.setDestinationName endpoint)
@@ -48,9 +54,9 @@
(.setPubSubDomain pubSub)
(.setConcurrentConsumers 1)
)
- (reify Consumer
+ (reify consumer/Consumer
(start [self] (do (doto container (.start) (.initialize)) nil))
- (stop [self] (do (.shutdown container) nil))
+ (close [self] (do (.shutdown container) nil))
)
)
)
@@ -60,7 +66,7 @@
(when (nil? endpoint) (throw (IllegalArgumentException. "No value specified for :endpoint!")))
(let [request-queue (SynchronousQueue.) reply-queue (SynchronousQueue.)
container (DefaultMessageListenerContainer.)
- listener (blocking-listener MessageListener onMessage (SimpleMessageConverter.) request-queue reply-queue container)
+ listener (macros/blocking-listener MessageListener onMessage (SimpleMessageConverter.) request-queue reply-queue container)
]
(doto container
(.setConnectionFactory connection)
@@ -71,14 +77,14 @@
(.start)
(.initialize)
)
- (reify Seqable-Consumer
- (seqable [self]
- (receiver-seq request-queue timeout)
+ (reify seqable/Seqable
+ (seqc [self]
+ (utils/receiver-seq request-queue timeout)
)
(ack [self]
(.offer reply-queue :commit timeout java.util.concurrent.TimeUnit/MILLISECONDS)
)
- (abort [self]
+ (close [self]
(.offer reply-queue :rollback timeout java.util.concurrent.TimeUnit/MILLISECONDS)
(.shutdown container)
)
@@ -88,7 +94,7 @@
(defn jms-connection [connectionFactory]
"Returns a JMS Connection from the given javax.jms.ConnectionFactory object."
- (reify Connection
+ (reify connection/Connection
(producer [self]
(jms-producer connectionFactory {})
)
@@ -98,8 +104,9 @@
(consumer [self conf]
(jms-consumer connectionFactory conf)
)
- (seqable-consumer [self conf]
+ (seqable [self conf]
(jms-seqable-consumer connectionFactory conf)
)
)
)
+
View
@@ -1,12 +1,15 @@
(ns clamq.pipes
- (:use
- [clamq.helpers]
- [clamq.protocol]
+ (:require
+ [clamq.helpers :as helpers]
+ [clamq.protocol.connection :as connection]
+ [clamq.protocol.consumer :as consumer]
+ [clamq.protocol.producer :as producer]
+ [clamq.protocol.pipe :as pipe]
)
)
(defn- make-producer [connection pubSub]
- (producer connection {:pubSub pubSub})
+ (connection/producer connection {:pubSub pubSub})
)
(defn pipe [{{source :endpoint s-connection :connection s-pubSub :pubSub :or {s-pubSub false}} :from
@@ -15,17 +18,17 @@
failure-fn :on-failure
transacted :transacted
limit :limit
- :or {filter-fn identity failure-fn rethrow-on-failure limit 0}
+ :or {filter-fn identity failure-fn helpers/rethrow-on-failure limit 0}
}]
(let [memoized-producer
(memoize make-producer)
filtered-unicast
- #(send-to (memoized-producer d-connection d-pubSub) destination (filter-fn %1) {})
+ #(producer/publish (memoized-producer d-connection d-pubSub) destination (filter-fn %1) {})
head-consumer
- (consumer s-connection {:endpoint source :on-message filtered-unicast :on-failure failure-fn :transacted transacted :pubSub s-pubSub :limit limit})]
- (reify Pipe
- (open [self] (start head-consumer))
- (close [self] (stop head-consumer))
+ (connection/consumer s-connection {:endpoint source :on-message filtered-unicast :on-failure failure-fn :transacted transacted :pubSub s-pubSub :limit limit})]
+ (reify pipe/Pipe
+ (open [self] (consumer/start head-consumer))
+ (close [self] (consumer/close head-consumer))
)
)
)
@@ -35,22 +38,22 @@
failure-fn :on-failure
transacted :transacted
limit :limit
- :or {failure-fn rethrow-on-failure limit 0}
+ :or {failure-fn helpers/rethrow-on-failure limit 0}
}]
(let [memoized-producer
(memoize make-producer)
filtered-multicast
#(doseq [d destinations]
(let [filtered-message ((or (d :filter-by) identity) %1)]
- (when (not (nil? filtered-message)) (send-to (memoized-producer (d :connection) (or (d :pubSub) false)) (d :endpoint) filtered-message {}))
+ (when (not (nil? filtered-message)) (producer/publish (memoized-producer (d :connection) (or (d :pubSub) false)) (d :endpoint) filtered-message {}))
)
)
head-consumer
- (consumer s-connection {:endpoint source :on-message filtered-multicast :on-failure failure-fn :transacted transacted :pubSub s-pubSub :limit limit})
+ (connection/consumer s-connection {:endpoint source :on-message filtered-multicast :on-failure failure-fn :transacted transacted :pubSub s-pubSub :limit limit})
]
- (reify Pipe
- (open [self] (start head-consumer))
- (close [self] (stop head-consumer))
+ (reify pipe/Pipe
+ (open [self] (consumer/start head-consumer))
+ (close [self] (consumer/close head-consumer))
)
)
)
@@ -60,7 +63,7 @@
failure-fn :on-failure
transacted :transacted
limit :limit
- :or {failure-fn rethrow-on-failure limit 0}
+ :or {failure-fn helpers/rethrow-on-failure limit 0}
}]
(when (nil? router-fn) (throw (IllegalArgumentException. "No value specified for :route-with router function!")))
(let [memoized-producer
@@ -73,15 +76,15 @@
message (d :message)
producer (memoized-producer connection pubSub)
]
- (when (not (nil? message)) (send-to producer endpoint message {}))
+ (when (not (nil? message)) (producer/publish producer endpoint message {}))
)
)
head-consumer
- (consumer s-connection {:endpoint source :on-message routed-multicast :on-failure failure-fn :transacted transacted :pubSub s-pubSub :limit limit})
+ (connection/consumer s-connection {:endpoint source :on-message routed-multicast :on-failure failure-fn :transacted transacted :pubSub s-pubSub :limit limit})
]
- (reify Pipe
- (open [self] (start head-consumer))
- (close [self] (stop head-consumer))
+ (reify pipe/Pipe
+ (open [self] (consumer/start head-consumer))
+ (close [self] (consumer/close head-consumer))
)
)
)
View
@@ -1,27 +0,0 @@
-(ns clamq.protocol)
-
-(defprotocol Connection
- (producer [self] [self conf])
- (consumer [self conf])
- (seqable-consumer [self conf])
- )
-
-(defprotocol Producer
- (send-to [self destination message] [self destination message attributes])
- )
-
-(defprotocol Consumer
- (start [self])
- (stop [self])
- )
-
-(defprotocol Seqable-Consumer
- (seqable [self])
- (ack [self])
- (abort [self])
- )
-
-(defprotocol Pipe
- (open [self])
- (close [self])
- )
@@ -0,0 +1,7 @@
+(ns clamq.protocol.connection)
+
+(defprotocol Connection
+ (producer [self] [self conf])
+ (consumer [self conf])
+ (seqable [self conf])
+ )
@@ -0,0 +1,6 @@
+(ns clamq.protocol.consumer)
+
+(defprotocol Consumer
+ (start [self])
+ (close [self])
+ )
@@ -0,0 +1,6 @@
+(ns clamq.protocol.pipe)
+
+(defprotocol Pipe
+ (open [self])
+ (close [self])
+ )
@@ -0,0 +1,5 @@
+(ns clamq.protocol.producer)
+
+(defprotocol Producer
+ (publish [self destination message] [self destination message attributes])
+ )
@@ -0,0 +1,7 @@
+(ns clamq.protocol.seqable)
+
+(defprotocol Seqable
+ (seqc [self])
+ (ack [self])
+ (close [self])
+ )
Oops, something went wrong.

0 comments on commit 85200df

Please sign in to comment.