Skip to content

Commit

Permalink
encode/decode and dmap
Browse files Browse the repository at this point in the history
  • Loading branch information
niquola committed Jun 30, 2016
1 parent 90c3e4e commit dcbc562
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 75 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ out
node_modules
.#*
.cljs_node_repl
ankusha.log
212 changes: 137 additions & 75 deletions src/ankusha/consensus.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
(ns ankusha.consensus
(:require [clojure.tools.logging :as log]
[taoensso.nippy :as nippy])
(:import (io.atomix Atomix AtomixClient AtomixReplica)
(io.atomix.catalyst.transport Address NettyTransport)
(io.atomix.copycat.server.storage Storage StorageLevel)
Expand All @@ -8,7 +10,12 @@
(io.atomix.collections DistributedMap)
(io.atomix.variables DistributedValue)))

(defn storage [cfg]
(defonce state (atom {}))

(defn get-replica [name]
(get @state name))

(defn- storage [cfg]
(-> (Storage/builder)
(.withDirectory (str (:data-dir cfg) "/.atomix"))
(.build)))
Expand All @@ -17,140 +24,195 @@

(defn- addrs ^Collection [ns] (map addr ns))

(defn localhost []
(defn- localhost []
(-> (InetAddress/getLocalHost)
(.getHostName)))

(defn replica ^AtomixReplica
[port cfg]
(defn- replica ^AtomixReplica
[cfg]
(-> (AtomixReplica/builder
(addr {:host (localhost) :port port}))
(addr {:host (localhost) :port (:atomix-port cfg)}))
(.withTransport (NettyTransport.))
(.withStorage (storage cfg))
(.build)))

(defn bootstrap [port cfg]
(-> (replica port cfg)
.bootstrap
.get))

(defn methods [obj]
(defn- methods [obj]
(sort
(map #(.getName %)
(.getMethods (type obj)))))

(defn get-private-field [instance field-name]
(defn- get-private-field [instance field-name]
(. (doto (first (filter (fn [x] (.. x getName (equals field-name)))
(.. instance getClass getDeclaredFields)))
(.setAccessible true))
(get instance)))

(defn cluster [rep]
(defn- cluster [rep]
(-> (get-private-field rep "server")
.server
.cluster))

(defn members [rep]
(defn- members [rep]
(.members (cluster rep)))

(defn on-change [mem]
(defn- on-change [mem]
(.onStatusChange
mem
(reify java.util.function.Consumer
(accept [this status]
(println "Status of " mem " Changed to " status)))))
(log/info "ATOMIX: member status " (.address mem) " changed to " status))))
(.onTypeChange
mem
(reify java.util.function.Consumer
(accept [this tp]
(log/info "ATOMIX: member type " (.address mem) " changed to " tp)))))


(comment "rep1"
(def rep-1 (bootstrap 4444 {:name "node-1" :data-dir "/tmp/node-1"}))
(defn- subscribe [rep]
(doseq [mem (members rep)]
(on-change mem)))

(.shutdown rep-1)
(defn- clear-listeneres [repl]
(-> repl
cluster
(get-private-field "leaveListeners")
(get-private-field "listeners")
.clear))

(.type rep-1)
(defn- on-leave [rep]
(.onLeave (cluster rep)
(reify java.util.function.Consumer
(accept [this mem]
(log/info "ATOMIX: member leaved " (.address mem))))))

(members rep-1)
(defn- on-join [rep]
(.onJoin (cluster rep)
(reify java.util.function.Consumer
(accept [this mem]
(log/info "ATOMIX: member joined " (.address mem))
(on-change mem)))))

(def lock-1 (get-lock rep-1 "master"))
(.thenRun (.lock lock-1)
(reify Runnable
(run [this] (println "Locked 1"))))
)
(defn get-lock [repl name]
(.thenRun (.lock (get-lock repl name))
(reify Runnable
(run [this] (log/info "Locked 1")))))

(comment "rep2"
(def rep-2 (replica 4445 {:name "node-2" :data-dir "/tmp/node-2"}))

(.join rep-2
(addrs [{:host "localhost" :port 4444}]))
(defn- bootstrap [nm]
(when-let [repl (get-replica nm)]
(-> repl .bootstrap .get)))

(.type rep-2)
(defn start [cfg]
(let [repl (replica cfg)]
(subscribe repl)
(on-join repl)
(on-leave repl)
(swap! state assoc (:name cfg) repl)))

rep-2
(defn join [nm as]
(if-let [repl (get-replica nm)]
(.join repl (addrs as))
(log/info "No replica for " nm)))

(methods rep-2)
(defn shutdown [nm]
(if-let [repl (get-replica nm)]
(log/info "SHUTDOWN" (.shutdown repl))
(log/info "No replica for " nm)))

(defn leader [nm]
(when-let [repl (get-replica nm)]
(.leader (cluster repl))))

(map (fn [m] (println (str (.address m)) " " (str (.status m))))
(members rep-2))
(defn status [nm]
(when-let [repl (get-replica nm)]
(->> (members repl)
(map (fn [m] [(str (.address m)) (str (.status m))])))))

(cluster rep-2)
(defn encode [v]
(String. (nippy/freeze v)))

(defn subscribe [rep]
(doseq [mem (members rep)]
(println "MEMBER" mem)
(on-change mem)))
(defn decode [s]
(nippy/thaw (.getBytes s)))

(-> rep-2
cluster
(get-private-field "leaveListeners")
(get-private-field "listeners")
.clear)
(decode (encode {:a 1}))

(-> rep-2
cluster
(get-private-field "joinListeners")
(get-private-field "listeners")
.clear)

(.onLeave (cluster rep-2)
(reify java.util.function.Consumer
(accept [this mem]
(println "LEAVED " mem))))
(defn dmap [nm map-name]
(when-let [repl (get-replica nm)]
(.join (.getMap repl map-name))))

(.onJoin (cluster rep-2)
(reify java.util.function.Consumer
(accept [this mem]
(println "JOINED " mem)
(on-change mem))))
(defn dmap! [nm map-name]
(let [m (dmap nm map-name)]
(reduce (fn [acc k]
(assoc acc (keyword k) (decode (.join (.get m k)))))
{} (.join (.keySet m)))))

(methods (cluster rep-2))
(defn dmap-put [nm map-name key value]
(when-let [m (dmap nm map-name)]
(.join (.put m (name key) (encode value)))))

(.leader (cluster rep-2))
(defn dmap-get [nm map-name key]
(let [m (dmap nm map-name)]
(decode (.join (.get m (name key))))))


(subscribe rep-2)
(comment "rep1"
(start {:atomix-port 4444
:name "node-1"
:data-dir "/tmp/node-1"})

(bootstrap "node-1")

(start {:atomix-port 4445
:name "node-2"
:data-dir "/tmp/node-2"})

(bootstrap "node-2")


(shutdown "node-1")

(join "node-2" [{:host "localhost" :port 4444}])

(start {:atomix-port 4446
:name "node-3"
:data-dir "/tmp/node-3"})

(bootstrap "node-3")

(join "node-3" [{:host "localhost" :port 4444}
{:host "localhost" :port 4445}])



(members (get-replica "node-1"))

(status "node-1")
(status "node-2")
(status "node-3")

(shutdown "node-1")
(shutdown "node-2")
(shutdown "node-3")

(leader "node-1")

(.shutdown rep-2)
)

(comment "rep3"
(methods (get-replica "node-1"))

(def rep-3 (replica 4446 {:name "node-3" :data-dir "/tmp/node-3"}))

(.join rep-3
(addrs [{:host "localhost" :port 4444}
{:host "localhost" :port 4445}]))

rep-3
(members rep-3)
(dmap-put "node-2" "pg-clusters" "node-1" "newone")
(dmap-put "node-2" "pg-clusters" "node-2" "newone")

(map (fn [m] (println (str (.address m)) " " (str (.status m))))
(members rep-3))
(.join (.keySet (dmap "node-1" "pg-clusters")))

(.type rep-3)
(dmap-get "node-1" "pg-clusters" "node-1")
(dmap-get "node-3" "pg-clusters" "node-1")

(.leave rep-3)
(dmap! "node-3" "pg-clusters")

(.shutdown rep-3)

)

Expand Down
24 changes: 24 additions & 0 deletions src/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<configuration debug="false">
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yy-MM-dd HH:mm:ss.SSS} %level [%thread] %contextName %logger{40} - %msg%n</pattern>
</encoder>
</appender>

<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>ankusha.log</file>
<append>true</append>
<encoder>
<pattern>%d{yy-MM-dd HH:mm:ss.SSS} %level [%thread] %contextName %logger{40} || %msg%n</pattern>
</encoder>
</appender>

<logger name="ankusha" level="DEBUG"/>
<logger name="com.zaxxer.hikari" level="INFO" />
<logger name="com.zaxxer.hikari" level="INFO" />

<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="FILE"/>
</root>
</configuration>

0 comments on commit dcbc562

Please sign in to comment.