Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

import files from slacker

  • Loading branch information...
commit d4dfec64ac6043c13179cda698eab6656e17d0e3 1 parent 8f5962b
@sunng87 authored
View
24 examples/slacker/example/cluster_client.clj
@@ -0,0 +1,24 @@
+(ns slacker.example.cluster-client
+ (:use [slacker.common])
+ (:use [slacker.client.cluster])
+ (:use [slacker.client :only [close-slackerc]]))
+
+(def sc (clustered-slackerc "example-cluster" "127.0.0.1:2181"))
+
+(use-remote 'sc 'slacker.example.api)
+(defn-remote sc async-timestamp
+ :remote-name "timestamp"
+ :remote-ns "slacker.example.api"
+ :async? true)
+
+(defn -main [& args]
+ (binding [*debug* true]
+ (println (timestamp))
+ (println (rand-ints 10))
+ (println @(async-timestamp)))
+
+ (dotimes [_ 100] (timestamp))
+
+ (close-slackerc sc)
+ (System/exit 0))
+
View
17 examples/slacker/example/cluster_server.clj
@@ -0,0 +1,17 @@
+(ns slacker.example.cluster-server
+ (:use [slacker server interceptor])
+ (:require [slacker.example.api]))
+
+(definterceptor log-function-calls
+ :before (fn [req]
+ (println (str "calling: " (:fname req)))
+ req))
+
+(defn -main [& args]
+ (start-slacker-server (the-ns 'slacker.example.api)
+ (Integer/valueOf (first args))
+ :cluster {:zk "127.0.0.1:2181"
+ :name "example-cluster"}
+ :interceptors (interceptors [log-function-calls])))
+(println "Slacker example server (cluster enabled) started.")
+
View
154 src/slacker/client/cluster.clj
@@ -0,0 +1,154 @@
+(ns slacker.client.cluster
+ (:require [zookeeper :as zk])
+ (:require [slacker.client])
+ (:require [slacker.utils :as utils])
+ (:use [slacker.client.common])
+ (:use [slacker.serialization])
+ (:use [clojure.string :only [split]])
+ (:require [clojure.tools.logging :as logging])
+ (:use [slingshot.slingshot :only [throw+]]))
+
+(defprotocol CoordinatorAwareClient
+ (refresh-associated-servers [this ns])
+ (refresh-all-servers [this])
+ (get-connected-servers [this])
+ (get-ns-mappings [this])
+ (delete-ns-mapping [this fname]))
+
+(defmacro defn-remote
+ "cluster enabled defn-remote"
+ [sc fname & options]
+ (let [fname-str (str fname)
+ remote-ns-declared (> (.indexOf fname-str "/") 0)
+ {:keys [remote-ns] :or {remote-ns (ns-name *ns*)}} options
+ remote-ns (if remote-ns-declared
+ (first (split fname-str #"/" 2))
+ remote-ns)]
+ `(do
+ (when (nil? ((get-ns-mappings ~sc) ~remote-ns))
+ (refresh-associated-servers ~sc ~remote-ns))
+ (slacker.client/defn-remote ~sc ~fname ~@options))))
+
+(defn use-remote
+ "cluster enabled use-remote"
+ ([sc-sym] (use-remote sc-sym (ns-name *ns*)))
+ ([sc-sym rns & options]
+ (let [sc @(resolve sc-sym)]
+ (do
+ (when (nil? ((get-ns-mappings sc) (str rns)))
+ (refresh-associated-servers sc (str rns)))
+ (apply slacker.client/use-remote sc-sym rns options)))))
+
+(defn- create-slackerc [connection-info & options]
+ (apply slacker.client/slackerc connection-info options))
+
+(defn- find-server [slacker-ns-servers ns-name]
+ (if-let [servers (@slacker-ns-servers ns-name)]
+ (rand-nth servers)
+ (throw+ {:code :not-found})))
+
+(defn- ns-callback [e sc nsname]
+ (case (:event-type e)
+ :NodeDeleted (delete-ns-mapping sc nsname)
+ :NodeChildrenChanged (refresh-associated-servers sc nsname)
+ nil))
+
+(defn- clients-callback [e sc]
+ (case (:event-type e)
+ :NodeChildrenChanged (refresh-all-servers sc) ;;TODO
+ nil))
+
+(defn- meta-data-from-zk [zk-conn cluster-name fname]
+ (let [fnode (utils/zk-path cluster-name "functions" fname)]
+ (if-let [node-data (zk/data zk-conn fnode)]
+ (deserialize :clj (:data node-data) :bytes))))
+
+(deftype ClusterEnabledSlackerClient
+ [cluster-name zk-conn
+ slacker-clients slacker-ns-servers
+ options]
+ CoordinatorAwareClient
+ (refresh-associated-servers [this nsname]
+ (let [node-path (utils/zk-path cluster-name "namespaces" nsname)
+ servers (zk/children zk-conn node-path
+ :watch? true)]
+ ;; update servers for this namespace
+ (swap! slacker-ns-servers assoc nsname servers)
+ ;; establish connection if the server is not connected
+ (doseq [s servers]
+ (if-not (contains? slacker-clients s)
+ (let [sc (apply create-slackerc s options)]
+ (logging/info (str "establishing connection to " s))
+ (swap! slacker-clients assoc s sc))))
+ servers))
+ (refresh-all-servers [this]
+ (let [node-path (utils/zk-path cluster-name "servers")
+ servers (into #{} (zk/children zk-conn node-path :watch? true))]
+ ;; close connection to offline servers, remove from slacker-clients
+ (doseq [s (keys @slacker-clients)]
+ (when-not (contains? servers s)
+ (logging/info (str "closing connection of " s))
+ (close (@slacker-clients s))
+ (swap! slacker-clients dissoc s)))))
+ (get-connected-servers [this]
+ (keys @slacker-clients))
+ (get-ns-mappings [this]
+ @slacker-ns-servers)
+ (delete-ns-mapping [this ns]
+ (swap! slacker-ns-servers dissoc ns))
+
+ SlackerClientProtocol
+ (sync-call-remote [this ns-name func-name params]
+ (let [fname (str ns-name "/" func-name)
+ target-server (find-server slacker-ns-servers ns-name)
+ target-conn (@slacker-clients target-server)]
+ (logging/debug (str "calling " ns-name "/"
+ func-name " on " target-server))
+ (sync-call-remote target-conn ns-name func-name params)))
+ (async-call-remote [this ns-name func-name params cb]
+ (let [fname (str ns-name "/" func-name)
+ target-server (find-server slacker-ns-servers ns-name)
+ target-conn (@slacker-clients target-server)]
+ (logging/debug (str "calling " ns-name "/"
+ func-name " on " target-server))
+ (async-call-remote target-conn ns-name func-name params cb)))
+ (close [this]
+ (zk/close zk-conn)
+ (doseq [s (vals @slacker-clients)] (close s))
+ (reset! slacker-clients {})
+ (reset! slacker-ns-servers {}))
+ (inspect [this cmd args]
+ (case cmd
+ :functions
+ (let [nsname (or args "")
+ ns-root (utils/zk-path cluster-name "functions" nsname)
+ fnames (or (zk/children zk-conn ns-root) [])]
+ (map #(str nsname "/" %) fnames))
+ :meta (meta-data-from-zk zk-conn cluster-name args))))
+
+(defn- on-zk-events [e sc]
+ (if (.endsWith ^String (:path e) "servers")
+ ;; event on `servers` node
+ (clients-callback e sc)
+ ;; event on `namespaces` nodes
+ (let [matcher (re-matches #"/.+/namespaces/?(.*)" (:path e))]
+ (if-not (nil? matcher)
+ (ns-callback e sc (second matcher))))))
+
+(defn clustered-slackerc
+ "create a cluster enalbed slacker client"
+ [cluster-name zk-server & options]
+ (let [zk-conn (zk/connect zk-server)
+ slacker-clients (atom {})
+ slacker-ns-servers (atom {})
+ sc (ClusterEnabledSlackerClient.
+ cluster-name zk-conn
+ slacker-clients slacker-ns-servers
+ options)]
+ (zk/register-watcher zk-conn (fn [e] (on-zk-events e sc)))
+ ;; watch 'servers' node
+ (zk/children zk-conn
+ (utils/zk-path cluster-name "servers") :watch? true)
+ sc))
+
+
View
70 src/slacker/server/cluster.clj
@@ -0,0 +1,70 @@
+(ns slacker.server.cluster
+ (:require [zookeeper :as zk])
+ (:use [slacker common serialization])
+ (:use [clojure.string :only [split]])
+ (:require [slacker.utils :as utils])
+ (:import java.net.Socket))
+
+(declare ^{:dynamic true} *zk-conn* )
+
+(defn- auto-detect-ip
+ "check IP address contains?
+ if not connect to zookeeper and getLocalAddress"
+ [zk-addr]
+ (let [zk-address (split zk-addr #":")
+ zk-ip (first zk-address)
+ zk-port (Integer/parseInt (second zk-address))
+ socket (Socket. ^String ^Integer zk-ip zk-port)
+ local-ip (.getHostAddress (.getLocalAddress socket))]
+ (.close socket)
+ local-ip))
+
+(defn- create-node
+ "get zk connector & node :persistent?
+ check whether exist already
+ if not ,create & set node data with func metadata
+ "
+ [zk-conn node-name
+ & {:keys [data persistent?]
+ :or {data nil
+ persistent? false}}]
+ (if-not (zk/exists zk-conn node-name )
+ (zk/create-all zk-conn node-name :persistent? persistent?))
+ (if-not (nil? data)
+ (zk/set-data zk-conn node-name data
+ (:version (zk/exists zk-conn node-name)))))
+
+(defn publish-cluster
+ "publish server information to zookeeper as cluster for client"
+ [cluster port ns-names funcs-map]
+ (let [cluster-name (cluster :name)
+ server-node (str (or (cluster :node)
+ (auto-detect-ip (:zk cluster)))
+ ":" port)
+ funcs (keys funcs-map)]
+ (create-node *zk-conn* (utils/zk-path cluster-name "servers")
+ :persistent? true)
+ (create-node *zk-conn*
+ (utils/zk-path cluster-name "servers" server-node ))
+ (doseq [nn ns-names]
+ (create-node *zk-conn* (utils/zk-path cluster-name "namespaces" nn)
+ :persistent? true)
+ (create-node *zk-conn* (utils/zk-path cluster-name "namespaces"
+ nn server-node)))
+ (doseq [fname funcs]
+ (create-node *zk-conn*
+ (utils/zk-path cluster-name "functions" fname )
+ :persistent? true
+ :data (serialize
+ :clj
+ (select-keys
+ (meta (funcs-map fname))
+ [:name :doc :arglists])
+ :bytes)))))
+
+(defmacro with-zk
+ "publish server information to specifized zookeeper for client"
+ [zk-conn & body]
+ `(binding [*zk-conn* ~zk-conn]
+ ~@body))
+
View
5 test/log4j.properties
@@ -0,0 +1,5 @@
+### the default log4j configuration to suppress warning ###
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.SimpleLayout
+log4j.rootLogger=off, stdout
View
47 test/slacker/test/client/cluster.clj
@@ -0,0 +1,47 @@
+(ns slacker.test.client.cluster
+ (:use [clojure.test])
+ (:use [slacker.client common cluster])
+ (:use [slacker.serialization])
+ (:use [slacker.utils :only [zk-path]])
+ (:require [zookeeper :as zk]))
+
+(deftest test-clustered-client
+ (let [cluster-name "test-cluster"
+ test-server "127.0.0.1:2104"
+ test-server2 "127.0.0.1:2105"
+ zk-server "127.0.0.1:2181"
+ zk-verify-conn (zk/connect zk-server)
+ test-ns "test-ns"
+ sc (clustered-slackerc cluster-name zk-server)]
+ (zk/create-all zk-verify-conn (zk-path cluster-name "servers" test-server))
+ (zk/create-all zk-verify-conn
+ (zk-path cluster-name "namespaces" test-ns test-server))
+ (doseq [f (map #(str test-ns "/" %) ["hello" "world"])]
+ (zk/create-all zk-verify-conn (zk-path cluster-name "functions" f)
+ :persistent? true)
+ (zk/set-data zk-verify-conn
+ (zk-path cluster-name "functions" f)
+ (serialize :clj {:name f :doc "test function"} :bytes)
+ (:version (zk/exists
+ zk-verify-conn
+ (zk-path cluster-name "functions" f)))))
+
+ (is (= ["127.0.0.1:2104"] (refresh-associated-servers sc test-ns)))
+
+ (is (= {:name (str test-ns "/world") :doc "test function"}
+ (inspect sc :meta (str test-ns "/world"))))
+
+ (zk/create zk-verify-conn (zk-path cluster-name "servers" test-server2))
+ (zk/create zk-verify-conn
+ (zk-path cluster-name "namespaces" test-ns test-server2))
+
+ (Thread/sleep 1000) ;; wait for watchers
+ (is (= [test-server test-server2] ((get-ns-mappings sc) test-ns)))
+ (is (= 2 (count (get-connected-servers sc))))
+ (is (= 2 (count (inspect sc :functions test-ns))))
+
+ (close sc)
+ (zk/delete-all zk-verify-conn (zk-path cluster-name))
+ (zk/close zk-verify-conn)))
+
+
View
49 test/slacker/test/cluster.clj
@@ -0,0 +1,49 @@
+(ns slacker.test.server.cluster
+ (:use [clojure.test] )
+ (:use [slacker.server cluster])
+ (:use [slacker.utils :only [zk-path]])
+ (:use [zookeeper :as zk]))
+
+(def namespaces ["slacker.test.server.cluster"])
+(def funcs {"plus" + "minus" -})
+
+(defn- create-data [cluster-map]
+ (doall
+ (map #(zk-path %1 "functions" %2)
+ (repeat (cluster-map :name))
+ (keys funcs))))
+
+(deftest test-publish-cluster
+ (let [cluster-map {:name "test-cluster" :zk "127.0.0.1:2181"}
+ node-list (create-data cluster-map)
+ test-conn (zk/connect "127.0.0.1:2181")
+ zk-conn (zk/connect "127.0.0.1:2181")]
+
+ ;; make sure all functions are published
+ (with-zk zk-conn
+ (publish-cluster cluster-map 2104 namespaces funcs))
+ (is (false? (every? (fn[x](false? x))
+ (map zk/children (repeat test-conn) node-list))))
+ (is (not (nil? (zk/exists test-conn
+ (zk-path (:name cluster-map)
+ "servers"
+ (str "127.0.0.1:2104"))))))
+ ;; close the server connection, ephemeral node will be deleted
+ (zk/close zk-conn)
+
+ ;; we want to make that the server is no longer listed in our
+ ;; serevr directory
+ (is (nil? (zk/exists test-conn
+ (zk-path (:name cluster-map)
+ "servers"
+ (str "127.0.0.1:2104")))))
+
+ (is (false? (zk/children test-conn (zk-path (:name cluster-map)
+ "namespaces"
+ (first namespaces)
+ "127.0.0.1:2104"))))
+
+ ;; clean up
+ (zk/delete-all test-conn (zk-path (:name cluster-map)))
+ (zk/close test-conn)))
+
View
49 test/slacker/test/server/cluster.clj
@@ -0,0 +1,49 @@
+(ns slacker.test.server.cluster
+ (:use [clojure.test] )
+ (:use [slacker.server cluster])
+ (:use [slacker.utils :only [zk-path]])
+ (:use [zookeeper :as zk]))
+
+(def namespaces ["slacker.test.server.cluster"])
+(def funcs {"plus" + "minus" -})
+
+(defn- create-data [cluster-map]
+ (doall
+ (map #(zk-path %1 "functions" %2)
+ (repeat (cluster-map :name))
+ (keys funcs))))
+
+(deftest test-publish-cluster
+ (let [cluster-map {:name "test-cluster" :zk "127.0.0.1:2181"}
+ node-list (create-data cluster-map)
+ test-conn (zk/connect "127.0.0.1:2181")
+ zk-conn (zk/connect "127.0.0.1:2181")]
+
+ ;; make sure all functions are published
+ (with-zk zk-conn
+ (publish-cluster cluster-map 2104 namespaces funcs))
+ (is (false? (every? (fn[x](false? x))
+ (map zk/children (repeat test-conn) node-list))))
+ (is (not (nil? (zk/exists test-conn
+ (zk-path (:name cluster-map)
+ "servers"
+ (str "127.0.0.1:2104"))))))
+ ;; close the server connection, ephemeral node will be deleted
+ (zk/close zk-conn)
+
+ ;; we want to make that the server is no longer listed in our
+ ;; serevr directory
+ (is (nil? (zk/exists test-conn
+ (zk-path (:name cluster-map)
+ "servers"
+ (str "127.0.0.1:2104")))))
+
+ (is (false? (zk/children test-conn (zk-path (:name cluster-map)
+ "namespaces"
+ (first namespaces)
+ "127.0.0.1:2104"))))
+
+ ;; clean up
+ (zk/delete-all test-conn (zk-path (:name cluster-map)))
+ (zk/close test-conn)))
+
Please sign in to comment.
Something went wrong with that request. Please try again.