Permalink
Browse files

remove slacker cluster from core codebase

  • Loading branch information...
1 parent 3f0c07a commit 73d596c1f8e2b4fa1719b60aede44461361c5ce0 @sunng87 committed Apr 17, 2012
@@ -1,24 +0,0 @@
-(ns slacker.example.cluster-client
- (:use [slacker.common])
- (:use [slacker.client.cluster])
- (:use [slacker.client :only [close-slackerc]]))
-
-(def sc (clustered-slackerc "example-cluster" "127.0.0.1:2181"))
-
-(use-remote 'sc 'slacker.example.api)
-(defn-remote sc async-timestamp
- :remote-name "timestamp"
- :remote-ns "slacker.example.api"
- :async? true)
-
-(defn -main [& args]
- (binding [*debug* true]
- (println (timestamp))
- (println (rand-ints 10))
- (println @(async-timestamp)))
-
- (dotimes [_ 100] (timestamp))
-
- (close-slackerc sc)
- (System/exit 0))
-
@@ -1,17 +0,0 @@
-(ns slacker.example.cluster-server
- (:use [slacker server interceptor])
- (:require [slacker.example.api]))
-
-(definterceptor log-function-calls
- :before (fn [req]
- (println (str "calling: " (:fname req)))
- req))
-
-(defn -main [& args]
- (start-slacker-server (the-ns 'slacker.example.api)
- (Integer/valueOf (first args))
- :cluster {:zk "127.0.0.1:2181"
- :name "example-cluster"}
- :interceptors (interceptors [log-function-calls])))
-(println "Slacker example server (cluster enabled) started.")
-
View
@@ -6,15 +6,12 @@
[cheshire "3.0.0"]
[slingshot "0.10.1"]
[org.clojure/java.jmx "0.1"]
- [zookeeper-clj "0.9.2"]
[org.clojure/tools.logging "0.2.3"]]
:dev-dependencies [[codox "0.5.0"]
[lein-exec "0.1"]]
:extra-classpath-dirs ["examples"]
:run-aliases {:server "slacker.example.server"
- :client "slacker.example.client"
- :cluster-server "slacker.example.cluster-server"
- :cluster-client "slacker.example.cluster-client"}
+ :client "slacker.example.client"}
:warn-on-reflection false)
@@ -1,154 +0,0 @@
-(ns slacker.client.cluster
- (:require [zookeeper :as zk])
- (:require [slacker.client])
- (:require [slacker.utils :as utils])
- (:use [slacker.client.common])
- (:use [slacker.serialization])
- (:use [clojure.string :only [split]])
- (:require [clojure.tools.logging :as logging])
- (:use [slingshot.slingshot :only [throw+]]))
-
-(defprotocol CoordinatorAwareClient
- (refresh-associated-servers [this ns])
- (refresh-all-servers [this])
- (get-connected-servers [this])
- (get-ns-mappings [this])
- (delete-ns-mapping [this fname]))
-
-(defmacro defn-remote
- "cluster enabled defn-remote"
- [sc fname & options]
- (let [fname-str (str fname)
- remote-ns-declared (> (.indexOf fname-str "/") 0)
- {:keys [remote-ns] :or {remote-ns (ns-name *ns*)}} options
- remote-ns (if remote-ns-declared
- (first (split fname-str #"/" 2))
- remote-ns)]
- `(do
- (when (nil? ((get-ns-mappings ~sc) ~remote-ns))
- (refresh-associated-servers ~sc ~remote-ns))
- (slacker.client/defn-remote ~sc ~fname ~@options))))
-
-(defn use-remote
- "cluster enabled use-remote"
- ([sc-sym] (use-remote sc-sym (ns-name *ns*)))
- ([sc-sym rns & options]
- (let [sc @(resolve sc-sym)]
- (do
- (when (nil? ((get-ns-mappings sc) (str rns)))
- (refresh-associated-servers sc (str rns)))
- (apply slacker.client/use-remote sc-sym rns options)))))
-
-(defn- create-slackerc [connection-info & options]
- (apply slacker.client/slackerc connection-info options))
-
-(defn- find-server [slacker-ns-servers ns-name]
- (if-let [servers (@slacker-ns-servers ns-name)]
- (rand-nth servers)
- (throw+ {:code :not-found})))
-
-(defn- ns-callback [e sc nsname]
- (case (:event-type e)
- :NodeDeleted (delete-ns-mapping sc nsname)
- :NodeChildrenChanged (refresh-associated-servers sc nsname)
- nil))
-
-(defn- clients-callback [e sc]
- (case (:event-type e)
- :NodeChildrenChanged (refresh-all-servers sc) ;;TODO
- nil))
-
-(defn- meta-data-from-zk [zk-conn cluster-name fname]
- (let [fnode (utils/zk-path cluster-name "functions" fname)]
- (if-let [node-data (zk/data zk-conn fnode)]
- (deserialize :clj (:data node-data) :bytes))))
-
-(deftype ClusterEnabledSlackerClient
- [cluster-name zk-conn
- slacker-clients slacker-ns-servers
- options]
- CoordinatorAwareClient
- (refresh-associated-servers [this nsname]
- (let [node-path (utils/zk-path cluster-name "namespaces" nsname)
- servers (zk/children zk-conn node-path
- :watch? true)]
- ;; update servers for this namespace
- (swap! slacker-ns-servers assoc nsname servers)
- ;; establish connection if the server is not connected
- (doseq [s servers]
- (if-not (contains? slacker-clients s)
- (let [sc (apply create-slackerc s options)]
- (logging/info (str "establishing connection to " s))
- (swap! slacker-clients assoc s sc))))
- servers))
- (refresh-all-servers [this]
- (let [node-path (utils/zk-path cluster-name "servers")
- servers (into #{} (zk/children zk-conn node-path :watch? true))]
- ;; close connection to offline servers, remove from slacker-clients
- (doseq [s (keys @slacker-clients)]
- (when-not (contains? servers s)
- (logging/info (str "closing connection of " s))
- (close (@slacker-clients s))
- (swap! slacker-clients dissoc s)))))
- (get-connected-servers [this]
- (keys @slacker-clients))
- (get-ns-mappings [this]
- @slacker-ns-servers)
- (delete-ns-mapping [this ns]
- (swap! slacker-ns-servers dissoc ns))
-
- SlackerClientProtocol
- (sync-call-remote [this ns-name func-name params]
- (let [fname (str ns-name "/" func-name)
- target-server (find-server slacker-ns-servers ns-name)
- target-conn (@slacker-clients target-server)]
- (logging/debug (str "calling " ns-name "/"
- func-name " on " target-server))
- (sync-call-remote target-conn ns-name func-name params)))
- (async-call-remote [this ns-name func-name params cb]
- (let [fname (str ns-name "/" func-name)
- target-server (find-server slacker-ns-servers ns-name)
- target-conn (@slacker-clients target-server)]
- (logging/debug (str "calling " ns-name "/"
- func-name " on " target-server))
- (async-call-remote target-conn ns-name func-name params cb)))
- (close [this]
- (zk/close zk-conn)
- (doseq [s (vals @slacker-clients)] (close s))
- (reset! slacker-clients {})
- (reset! slacker-ns-servers {}))
- (inspect [this cmd args]
- (case cmd
- :functions
- (let [nsname (or args "")
- ns-root (utils/zk-path cluster-name "functions" nsname)
- fnames (or (zk/children zk-conn ns-root) [])]
- (map #(str nsname "/" %) fnames))
- :meta (meta-data-from-zk zk-conn cluster-name args))))
-
-(defn- on-zk-events [e sc]
- (if (.endsWith ^String (:path e) "servers")
- ;; event on `servers` node
- (clients-callback e sc)
- ;; event on `namespaces` nodes
- (let [matcher (re-matches #"/.+/namespaces/?(.*)" (:path e))]
- (if-not (nil? matcher)
- (ns-callback e sc (second matcher))))))
-
-(defn clustered-slackerc
- "create a cluster enalbed slacker client"
- [cluster-name zk-server & options]
- (let [zk-conn (zk/connect zk-server)
- slacker-clients (atom {})
- slacker-ns-servers (atom {})
- sc (ClusterEnabledSlackerClient.
- cluster-name zk-conn
- slacker-clients slacker-ns-servers
- options)]
- (zk/register-watcher zk-conn (fn [e] (on-zk-events e sc)))
- ;; watch 'servers' node
- (zk/children zk-conn
- (utils/zk-path cluster-name "servers") :watch? true)
- sc))
-
-
@@ -1,11 +1,10 @@
(ns slacker.server
(:refer-clojure :exclude [send])
(:use [slacker common serialization protocol])
- (:use [slacker.server http cluster])
+ (:use [slacker.server http])
(:use [slacker.acl.core])
(:use [link core tcp http])
(:use [slingshot.slingshot :only [try+]])
- (:require [zookeeper :as zk])
(:require [clojure.tools.logging :as log])
(:import [java.util.concurrent Executors]))
@@ -170,10 +169,9 @@
* cluster publish server information to zookeeper
* acl the acl rules defined by defrules"
[exposed-ns port
- & {:keys [http interceptors cluster acl]
+ & {:keys [http interceptors acl]
:or {http nil
interceptors {:before identity :after identity}
- cluster nil
acl nil}}]
(let [exposed-ns (if (coll? exposed-ns) exposed-ns [exposed-ns])
funcs (apply merge (map ns-funcs exposed-ns))
@@ -189,10 +187,6 @@
(when-not (nil? http)
(http-server http (wrap-http-server-handler
(build-server-pipeline funcs interceptors))
- :debug *debug*))
- (when-not (nil? cluster)
- (with-zk (zk/connect (:zk cluster))
- (publish-cluster cluster port
- (map ns-name exposed-ns) funcs)))))
+ :debug *debug*))))
@@ -1,70 +0,0 @@
-(ns slacker.server.cluster
- (:require [zookeeper :as zk])
- (:use [slacker common serialization])
- (:use [clojure.string :only [split]])
- (:require [slacker.utils :as utils])
- (:import java.net.Socket))
-
-(declare ^{:dynamic true} *zk-conn* )
-
-(defn- auto-detect-ip
- "check IP address contains?
- if not connect to zookeeper and getLocalAddress"
- [zk-addr]
- (let [zk-address (split zk-addr #":")
- zk-ip (first zk-address)
- zk-port (Integer/parseInt (second zk-address))
- socket (Socket. ^String ^Integer zk-ip zk-port)
- local-ip (.getHostAddress (.getLocalAddress socket))]
- (.close socket)
- local-ip))
-
-(defn- create-node
- "get zk connector & node :persistent?
- check whether exist already
- if not ,create & set node data with func metadata
- "
- [zk-conn node-name
- & {:keys [data persistent?]
- :or {data nil
- persistent? false}}]
- (if-not (zk/exists zk-conn node-name )
- (zk/create-all zk-conn node-name :persistent? persistent?))
- (if-not (nil? data)
- (zk/set-data zk-conn node-name data
- (:version (zk/exists zk-conn node-name)))))
-
-(defn publish-cluster
- "publish server information to zookeeper as cluster for client"
- [cluster port ns-names funcs-map]
- (let [cluster-name (cluster :name)
- server-node (str (or (cluster :node)
- (auto-detect-ip (:zk cluster)))
- ":" port)
- funcs (keys funcs-map)]
- (create-node *zk-conn* (utils/zk-path cluster-name "servers")
- :persistent? true)
- (create-node *zk-conn*
- (utils/zk-path cluster-name "servers" server-node ))
- (doseq [nn ns-names]
- (create-node *zk-conn* (utils/zk-path cluster-name "namespaces" nn)
- :persistent? true)
- (create-node *zk-conn* (utils/zk-path cluster-name "namespaces"
- nn server-node)))
- (doseq [fname funcs]
- (create-node *zk-conn*
- (utils/zk-path cluster-name "functions" fname )
- :persistent? true
- :data (serialize
- :clj
- (select-keys
- (meta (funcs-map fname))
- [:name :doc :arglists])
- :bytes)))))
-
-(defmacro with-zk
- "publish server information to specifized zookeeper for client"
- [zk-conn & body]
- `(binding [*zk-conn* ~zk-conn]
- ~@body))
-
@@ -1,47 +0,0 @@
-(ns slacker.test.client.cluster
- (:use [clojure.test])
- (:use [slacker.client common cluster])
- (:use [slacker.serialization])
- (:use [slacker.utils :only [zk-path]])
- (:require [zookeeper :as zk]))
-
-(deftest test-clustered-client
- (let [cluster-name "test-cluster"
- test-server "127.0.0.1:2104"
- test-server2 "127.0.0.1:2105"
- zk-server "127.0.0.1:2181"
- zk-verify-conn (zk/connect zk-server)
- test-ns "test-ns"
- sc (clustered-slackerc cluster-name zk-server)]
- (zk/create-all zk-verify-conn (zk-path cluster-name "servers" test-server))
- (zk/create-all zk-verify-conn
- (zk-path cluster-name "namespaces" test-ns test-server))
- (doseq [f (map #(str test-ns "/" %) ["hello" "world"])]
- (zk/create-all zk-verify-conn (zk-path cluster-name "functions" f)
- :persistent? true)
- (zk/set-data zk-verify-conn
- (zk-path cluster-name "functions" f)
- (serialize :clj {:name f :doc "test function"} :bytes)
- (:version (zk/exists
- zk-verify-conn
- (zk-path cluster-name "functions" f)))))
-
- (is (= ["127.0.0.1:2104"] (refresh-associated-servers sc test-ns)))
-
- (is (= {:name (str test-ns "/world") :doc "test function"}
- (inspect sc :meta (str test-ns "/world"))))
-
- (zk/create zk-verify-conn (zk-path cluster-name "servers" test-server2))
- (zk/create zk-verify-conn
- (zk-path cluster-name "namespaces" test-ns test-server2))
-
- (Thread/sleep 1000) ;; wait for watchers
- (is (= [test-server test-server2] ((get-ns-mappings sc) test-ns)))
- (is (= 2 (count (get-connected-servers sc))))
- (is (= 2 (count (inspect sc :functions test-ns))))
-
- (close sc)
- (zk/delete-all zk-verify-conn (zk-path cluster-name))
- (zk/close zk-verify-conn)))
-
-
Oops, something went wrong.

0 comments on commit 73d596c

Please sign in to comment.