Skip to content

Commit

Permalink
Begin migrating returned values to map-likes that retain backwards co…
Browse files Browse the repository at this point in the history
…mpatibility

For example, to declare a server-named queue and get the generated name
back, there is no longer a need to use interop (and know anything about
the Java client API).
  • Loading branch information
Michael Klishin committed Feb 26, 2013
1 parent 366fb54 commit 18aa647
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 20 deletions.
41 changes: 21 additions & 20 deletions src/clojure/langohr/queue.clj
Expand Up @@ -10,19 +10,19 @@
(ns langohr.queue
(:refer-clojure :exclude [declare])
(:import [com.rabbitmq.client Channel AMQP$Queue$DeclareOk AMQP$Queue$BindOk AMQP$Queue$UnbindOk AMQP$Queue$DeleteOk AMQP$Queue$PurgeOk]
java.util.Map))
java.util.Map [com.novemberain.langohr.queue DeclareOk BindOk UnbindOk DeleteOk PurgeOk]))

;;
;; API
;;

(defn ^AMQP$Queue$DeclareOk declare
(defn ^com.novemberain.langohr.queue.DeclareOk declare
"Actively declare a server-named or named queue using queue.declare AMQP method.
Usage example:
;; declare server-named, exclusive, autodelete, non-durable queue.
(lhq/declare channel) ;; will yield a name like: 'amq.gen-QtE7OdDDjlHcxNGWuSoUb3'
(lhq/declare channel) ;; will return a map that contains the name: {:queue \"amq.gen-QtE7OdDDjlHcxNGWuSoUb3\"}
;; creates named non-durable, exclusive, autodelete queue
(lhq/declare channel queue-name :durable false :exclusive true :auto-delete true)
Expand All @@ -36,51 +36,52 @@
:arguments: other properties for the Queue.
"
([^Channel channel]
(.queueDeclare channel))
(DeclareOk. (.queueDeclare channel)))
([^Channel channel ^String queue]
(.queueDeclare channel queue false false true nil))
(DeclareOk. (.queueDeclare channel queue false false true nil)))
([^Channel channel ^String queue &{:keys [^Boolean durable ^Boolean exclusive ^Boolean auto-delete arguments] :or {durable false exclusive false auto-delete true}}]
(.queueDeclare channel queue durable exclusive auto-delete arguments)))
(DeclareOk. (.queueDeclare channel queue durable exclusive auto-delete arguments))))


(defn declare-passive
(defn ^com.novemberain.langohr.queue.DeclareOk declare-passive
"Declares a queue passively (checks that it is there) using queue.declare AMQP method"
[^Channel channel ^String queue]
(.queueDeclarePassive channel queue))
(DeclareOk. (.queueDeclarePassive channel queue)))


(defn ^AMQP$Queue$BindOk bind
(defn ^com.novemberain.langohr.queue.BindOk bind
"Binds a queue to an exchange using queue.bind AMQP method"
([^Channel channel ^String queue ^String exchange]
(.queueBind channel queue exchange ""))
(BindOk. (.queueBind channel queue exchange "")))
([^Channel channel ^String queue ^String exchange &{:keys [routing-key arguments] :or {routing-key "" arguments nil}}]
(.queueBind channel queue exchange routing-key arguments)))
(BindOk. (.queueBind channel queue exchange routing-key arguments))))


(defn ^AMQP$Queue$UnbindOk unbind
(defn ^com.novemberain.langohr.queue.UnbindOk unbind
"Unbinds a queue from an exchange using queue.bind AMQP method"
([^Channel channel ^String queue ^String exchange ^String routing-key]
(.queueUnbind channel queue exchange routing-key))
(UnbindOk. (.queueUnbind channel queue exchange routing-key)))
([^Channel channel ^String queue ^String exchange ^String routing-key ^Map arguments]
(.queueUnbind channel queue exchange routing-key arguments)))
(UnbindOk. (.queueUnbind channel queue exchange routing-key arguments))))


(defn ^AMQP$Queue$DeleteOk delete
(defn ^com.novemberain.langohr.queue.DeleteOk delete
"Deletes a queue using queue.delete AMQP method"
([^Channel channel ^String queue]
(.queueDelete channel queue))
(DeleteOk. (.queueDelete channel queue)))
([^Channel channel ^String queue if-unused if-empty]
(.queueDelete channel queue if-unused if-empty)))
(DeleteOk. (.queueDelete channel queue if-unused if-empty))))


(defn ^AMQP$Queue$PurgeOk purge
(defn ^com.novemberain.langohr.queue.PurgeOk purge
"Purges a queue using queue.purge AMQP method"
[^Channel channel ^String queue]
(.queuePurge channel queue))
(PurgeOk. (.queuePurge channel queue)))


(defn status
"Returns a map with two keys: message-count and :consumer-count, for the given queue. Uses queue.declare AMQP method with the :passive attribute set."
"Returns a map with two keys: message-count and :consumer-count, for the given queue.
Uses queue.declare AMQP method with the :passive attribute set."
[^Channel channel ^String queue]
(let [declare-ok ^AMQP$Queue$DeclareOk (.queueDeclarePassive channel queue)]
{:message-count (.getMessageCount declare-ok) :consumer-count (.getConsumerCount declare-ok)}))
61 changes: 61 additions & 0 deletions src/java/com/novemberain/langohr/PersistentMapLike.java
@@ -0,0 +1,61 @@
package com.novemberain.langohr;

import clojure.lang.*;

import java.util.Iterator;

public abstract class PersistentMapLike implements IPersistentMap {
protected IPersistentMap map;

public Object valAt(Object o) {
return this.map.valAt(o);
}

public Object valAt(Object o, Object o2) {
return this.map.valAt(o, o2);
}

public IPersistentMap assoc(Object o, Object o2) {
return map.assoc(o, o2);
}

public IPersistentMap assocEx(Object o, Object o2) {
return map.assocEx(o, o2);
}

public IPersistentMap without(Object o) {
return map.without(o);
}

public Iterator iterator() {
return map.iterator();
}

public boolean containsKey(Object o) {
return map.containsKey(o);
}

public IMapEntry entryAt(Object o) {
return map.entryAt(o);
}

public IPersistentCollection cons(Object o) {
return map.cons(o);
}

public int count() {
return map.count();
}

public IPersistentCollection empty() {
return map.empty();
}

public boolean equiv(Object o) {
return map.equiv(o);
}

public ISeq seq() {
return map.seq();
}
}
34 changes: 34 additions & 0 deletions src/java/com/novemberain/langohr/exchange/DeclareOk.java
@@ -0,0 +1,34 @@
package com.novemberain.langohr.exchange;

import clojure.lang.IPersistentMap;
import clojure.lang.PersistentHashMap;
import com.novemberain.langohr.PersistentMapLike;
import com.rabbitmq.client.AMQP;

import java.util.HashMap;

public class DeclareOk extends PersistentMapLike implements AMQP.Exchange.DeclareOk {
private final AMQP.Exchange.DeclareOk method;

public DeclareOk(AMQP.Exchange.DeclareOk method) {
this.method = method;

this.map = mapFrom(method);
}

public static IPersistentMap mapFrom(AMQP.Exchange.DeclareOk method) {
return PersistentHashMap.create(new HashMap());
}

public int protocolClassId() {
return method.protocolClassId();
}

public int protocolMethodId() {
return method.protocolMethodId();
}

public String protocolMethodName() {
return method.protocolMethodName();
}
}
33 changes: 33 additions & 0 deletions src/java/com/novemberain/langohr/queue/BindOk.java
@@ -0,0 +1,33 @@
package com.novemberain.langohr.queue;

import clojure.lang.IPersistentMap;
import clojure.lang.PersistentHashMap;
import com.novemberain.langohr.PersistentMapLike;
import com.rabbitmq.client.AMQP;

import java.util.HashMap;

public class BindOk extends PersistentMapLike implements AMQP.Queue.BindOk {
private final AMQP.Queue.BindOk method;

public BindOk(AMQP.Queue.BindOk method) {
this.method = method;
this.map = mapFrom(method);
}

public static IPersistentMap mapFrom(AMQP.Queue.BindOk method) {
return PersistentHashMap.create(new HashMap());
}

public int protocolClassId() {
return method.protocolClassId();
}

public int protocolMethodId() {
return method.protocolMethodId();
}

public String protocolMethodName() {
return method.protocolMethodName();
}
}
56 changes: 56 additions & 0 deletions src/java/com/novemberain/langohr/queue/DeclareOk.java
@@ -0,0 +1,56 @@
package com.novemberain.langohr.queue;

import clojure.lang.IPersistentMap;
import clojure.lang.PersistentHashMap;
import clojure.lang.RT;
import com.novemberain.langohr.PersistentMapLike;
import com.rabbitmq.client.AMQP;

import java.util.HashMap;
import java.util.Map;

public class DeclareOk extends PersistentMapLike implements AMQP.Queue.DeclareOk {
private final AMQP.Queue.DeclareOk method;

public DeclareOk(AMQP.Queue.DeclareOk method) {
this.method = method;

this.map = mapFrom(method);

}

public static IPersistentMap mapFrom(AMQP.Queue.DeclareOk method) {
Map m = new HashMap();
m.put(RT.keyword(null, "queue"), method.getQueue());
m.put(RT.keyword(null, "message-count"), method.getMessageCount());
m.put(RT.keyword(null, "consumer-count"), method.getConsumerCount());
m.put(RT.keyword(null, "message_count"), method.getMessageCount());
m.put(RT.keyword(null, "consumer_count"), method.getConsumerCount());

return PersistentHashMap.create(m);
}

public int getConsumerCount() {
return method.getConsumerCount();
}

public int getMessageCount() {
return method.getMessageCount();
}

public String getQueue() {
return method.getQueue();
}

public int protocolClassId() {
return method.protocolClassId();
}

public int protocolMethodId() {
return method.protocolMethodId();
}

public String protocolMethodName() {
return method.protocolMethodName();
}
}
45 changes: 45 additions & 0 deletions src/java/com/novemberain/langohr/queue/DeleteOk.java
@@ -0,0 +1,45 @@
package com.novemberain.langohr.queue;

import clojure.lang.IPersistentMap;
import clojure.lang.PersistentHashMap;
import clojure.lang.RT;
import com.novemberain.langohr.PersistentMapLike;
import com.rabbitmq.client.AMQP;

import java.util.HashMap;
import java.util.Map;

public class DeleteOk extends PersistentMapLike implements AMQP.Queue.DeleteOk {
private final AMQP.Queue.DeleteOk method;

public DeleteOk(AMQP.Queue.DeleteOk method) {
this.method = method;

this.map = mapFrom(method);

}

public static IPersistentMap mapFrom(AMQP.Queue.DeleteOk method) {
Map m = new HashMap();
m.put(RT.keyword(null, "message-count"), method.getMessageCount());
m.put(RT.keyword(null, "message_count"), method.getMessageCount());

return PersistentHashMap.create(m);
}

public int getMessageCount() {
return method.getMessageCount();
}

public int protocolClassId() {
return method.protocolClassId();
}

public int protocolMethodId() {
return method.protocolMethodId();
}

public String protocolMethodName() {
return method.protocolMethodName();
}
}
45 changes: 45 additions & 0 deletions src/java/com/novemberain/langohr/queue/PurgeOk.java
@@ -0,0 +1,45 @@
package com.novemberain.langohr.queue;

import clojure.lang.IPersistentMap;
import clojure.lang.PersistentHashMap;
import clojure.lang.RT;
import com.novemberain.langohr.PersistentMapLike;
import com.rabbitmq.client.AMQP;

import java.util.HashMap;
import java.util.Map;

public class PurgeOk extends PersistentMapLike implements AMQP.Queue.PurgeOk {
private final AMQP.Queue.PurgeOk method;

public PurgeOk(AMQP.Queue.PurgeOk method) {
this.method = method;

this.map = mapFrom(method);

}

public static IPersistentMap mapFrom(AMQP.Queue.PurgeOk method) {
Map m = new HashMap();
m.put(RT.keyword(null, "message-count"), method.getMessageCount());
m.put(RT.keyword(null, "message_count"), method.getMessageCount());

return PersistentHashMap.create(m);
}

public int getMessageCount() {
return method.getMessageCount();
}

public int protocolClassId() {
return method.protocolClassId();
}

public int protocolMethodId() {
return method.protocolMethodId();
}

public String protocolMethodName() {
return method.protocolMethodName();
}
}

0 comments on commit 18aa647

Please sign in to comment.