Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

initial commit

  • Loading branch information...
commit 9255833e1a10a68de7b1dc632bb0090ffd20aa25 0 parents
Sun Ning authored
10 .gitignore
@@ -0,0 +1,10 @@
+/target
+/lib
+/classes
+/checkouts
+pom.xml
+*.jar
+*.class
+.lein-deps-sum
+.lein-failures
+.lein-plugins
13 README.md
@@ -0,0 +1,13 @@
+# metaq
+
+A Clojure library designed to ... well, that part is up to you.
+
+## Usage
+
+FIXME
+
+## License
+
+Copyright © 2012 FIXME
+
+Distributed under the Eclipse Public License, the same as Clojure.
3  doc/intro.md
@@ -0,0 +1,3 @@
+# Introduction to metaq
+
+TODO: write [great documentation](http://jacobian.org/writing/great-documentation/what-to-write/)
12 project.clj
@@ -0,0 +1,12 @@
+(defproject metaq "0.1.0"
+ :description "a set of metaq client APIs in favour of clojure"
+ :url "http://example.com/FIXME"
+ :license {:name "Eclipse Public License"
+ :url "http://www.eclipse.org/legal/epl-v10.html"}
+ :dependencies [[org.clojure/clojure "1.4.0"]
+ [org.clojure/tools.logging "0.2.4"]
+ [com.taobao.metamorphosis/metamorphosis-client "1.4.2"
+ :exclusions [jline/jline
+ junit/junit
+ javax.servlet/servlet-api]]])
+
74 src/metaq/core.clj
@@ -0,0 +1,74 @@
+(ns metaq.core
+ (:require [clojure.tools.logging :as logging])
+
+ (:import [java.util.concurrent TimeUnit])
+ (:import [com.taobao.metamorphosis Message])
+ (:import [com.taobao.metamorphosis.client
+ MetaClientConfig
+ MetaMessageSessionFactory])
+ (:import [com.taobao.metamorphosis.client.producer
+ MessageProducer])
+ (:import [com.taobao.metamorphosis.client.consumer
+ ConsumerConfig
+ MessageListener])
+ (:import [com.taobao.metamorphosis.exception MetaClientException])
+ (:import [com.taobao.metamorphosis.utils ZkUtils$ZKConfig]))
+
+(defn zookeeper-based-metaq-config [zkaddr zkroot]
+ (let [meta-config (MetaClientConfig.)
+ zk-config (ZkUtils$ZKConfig.)]
+ (set! (. zk-config zkConnect) zkaddr)
+ (set! (. zk-config zkRoot) zkroot)
+ (.setZkConfig meta-config zk-config)
+ meta-config))
+
+(defn metaq-session-factory [^MetaClientConfig config]
+ (MetaMessageSessionFactory. config))
+
+(defmacro defproducer [name zkaddr zkroot]
+ `(defonce ~name
+ (let [config# (zookeeper-based-metaq-config ~zkaddr ~zkroot)
+ factory# (metaq-session-factory config#)
+ producer# (.createProducer factory#)]
+ producer#)))
+
+(defn publish [^MessageProducer producer topic]
+ (.publish producer topic))
+
+(defn produce [^MessageProducer producer topic data]
+ (try
+ (let [r (.sendMessage producer (Message. topic data)
+ (long 10) TimeUnit/SECONDS)]
+ (when-not (.isSuccess r)
+ (logging/warn "Error publishing to MQ: " (.getErrorMessage r))))
+ (catch MetaClientException e
+ (do
+ (logging/warn e "Error publishing to MQ.")))))
+
+(defmacro defhandler [name executor arg-vec & handler-body]
+ `(defonce ~name
+ (let [executor# ~executor]
+ (reify MessageListener
+ (^void recieveMessages [this# ^Message msg#]
+ (try
+ ((fn ~arg-vec ~@handler-body) (.getData msg#))
+ (catch Exception e#
+ (logging/warn e# "Error processing message: "
+ (String. (.getData msg#) "UTF-8")))))
+ (getExecutor [this]
+ executor#)))))
+
+(defmacro defconsumer [name zkaddr zkroot group]
+ `(defonce ~name
+ (let [config# (zookeeper-based-metaq-config ~zkaddr ~zkroot)
+ factory# (metaq-session-factory config#)
+ consumer# (.createConsumer
+ ^MetaMessageSessionFactory factory#
+ (ConsumerConfig. ~group))]
+ consumer#)))
+
+(defn subscribe [consumer topic handler]
+ (.subscribe consumer topic (* 1024 1024) handler))
+
+(defn subscribe-done [consumer]
+ (.completeSubscribe consumer))
Please sign in to comment.
Something went wrong with that request. Please try again.