Skip to content


Refactor to avoid global variables and refs, pass around atom to opaq…
Browse files Browse the repository at this point in the history
…ue data
  • Loading branch information
lfranchi committed Oct 14, 2012
1 parent d5bbff4 commit 1dbe8fb
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 62 deletions.
2 changes: 1 addition & 1 deletion src/ono/db.clj
Expand Up @@ -8,7 +8,7 @@

;; TODO properly generate dbid UUID when initalizing a new
;; database
(def dbid "55bd135d-113f-481a-977e-999991111121")
(def dbid "55bd135d-113f-481a-977e-999991111124")

(def testtrack { :title "One",:artist "U2", :album "Joshua Tree" , :year 1992 , :track 3 , :duration 240, :bitrate 256, :mtime 123123123 , :size 0, :url "/test/mp3", :source 0 })
(def dbworker (agent nil))
Expand Down
170 changes: 109 additions & 61 deletions src/ono/net.clj
Expand Up @@ -20,7 +20,6 @@
(def zeroconf-port 55555)
(def dgram-size 16384)
(def udp-listener (agent nil))
(def udp-sock (ref nil))

;; TCP protocol
(def tcp-port 55555)
Expand Down Expand Up @@ -50,14 +49,62 @@
[x y]
(not (zero? (bit-and x y))))

;; Ono<->Tomahawk connections
;; Keyed by dbid
(def control-connections (ref {}))
(def dbsync-connections (ref {}))

;; Bookkeeping: dbid<--> {:host :port :sourceid}
(def known-peers (ref {}))
;; Main data structure. Peerid is the dbid of a peer's database:
;; {
;; :udp-socket udp-socket
;; :control-connections {peerid channel,
;; peerid channel}
;; :dbsync-connections {peerid channel,
;; peerid channel}
;; :known-peers {peerid {:host "host",
;; :port "port",
;; :sourceid id}}
;; }

;; Helper functions for manipulating opaque data structure
(defn set-udp-socket!
"Sets the udp socket for this network connection
to listen on the given port"
[data port]
(swap! data assoc :udp-socket (DatagramSocket. port))

(defn get-udp-socket
"Returns the UDP socket that is in use"
(@data :udp-socket))

(defn add-connection!
"Adds a connection of the desired type for the given peer, returning the new
opaque data structure"
[data connection-type peer channel]
(swap! data assoc-in [connection-type peer] channel)

(defn get-connection
"Returns a connection channel for the desired type and peer"
[data connection-type peer]
(get-in @data [connection-type peer]))

(defn add-peer-data!
"Adds a key/value pair to the peer data structure, returning the
modified opaque atom"
[data peerid key value]
(swap! data assoc-in [:known-peers peerid key] value)

(defn get-peer-data
"Get a piece of metadata associated with the desired peer"
[data peerid key]
(get-in @data [:known-peers peerid key]))

; ;; Ono<->Tomahawk connections
; ;; Keyed by dbid
; (def control-connections (ref {}))
; (def dbsync-connections (ref {}))

; ;; Bookkeeping: dbid<--> {:host :port :sourceid}
; (def known-peers (ref {}))

(def ping-agent (agent nil))

Expand Down Expand Up @@ -105,15 +152,10 @@
[ag excp]
(println "An agent threw an exception:" excp))

(defn is-dbsync-connection?
"Returns true if the given connection is a dbsyncconnection"
(dosync (has-value @dbsync-connections ch)))

(defn source-for-peer
"Returns the sourceid for a given peer"
(dosync (known-peers :sourceid)))
[data peer]
(get-peer-data data peer :sourceid))

(defn generate-json
"Generates a vector to be serialized from a map"
Expand All @@ -124,22 +166,26 @@

(defn get-handshake-msg
"Returns a JSON handshake msg from zeroconf peers"
[foreign-dbid, connection-map, key]
[data foreign-dbid connection-type key]
(let [main-msg {:conntype "accept-offer"
:key key
:port tcp-port}]
(if (dosync (connection-map foreign-dbid))
;; Determine if we need a handshake for a control connection
;; or a secondary connection
; (println "ARE WE SENDING FIRST MSG?" foreign-dbid connection-type (get-connection data connection-type foreign-dbid))
(if (get-connection data connection-type foreign-dbid)
(generate-json (assoc main-msg :controlid db/dbid)) ;; All subsequent (dbsync and stream connections) require controlid
(generate-json (assoc main-msg :nodeid db/dbid))))) ;; ControlConnection (first connection) requires nodeid

(defn ping-peers
"Sends a PING message every 10 minutes to any
active peer connection"
(doseq [ch (dosync (vals @control-connections))]
(lamina/enqueue ch [(flags :PING) ""]))
(. Thread (sleep 5000))
(send-off ping-agent ping-peers))
(fn [_]
(doseq [ch (vals (get @data :control-connections))]
(lamina/enqueue ch [(flags :PING) ""]))
(. Thread (sleep 5000))
(send-off ping-agent (ping-peers data))))

(defn handle-handshake-msg
"Handles the handshake after an initial SETUP message
Expand All @@ -150,40 +196,40 @@

;; Forward-declare add-peer as it is required by handle-json-msg
;; but add-peer requires get-tcp-handler (which require handle-json-message)
(declare add-peer-connection)
(declare add-peer-connection!)

(defn send-ops-from
"Send all ops for the desired source, through the given channel,
that are later than the given op."
[ch source lastop]
(println "Sending ops from" lastop "with source" source "to channel")
(if-let [ops (ono.db/get-ops-since source lastop)]
(let [flags #(bit-or (flags :DBOP) (if (= % (last ops)) 0 (flags :FRAGMENT)))]
(doseq [cmd ops]
(println "SENDING DBOP:" (cmd :guid) (cmd :command) (flags cmd) "body:" (cmd :json))
(lamina/enqueue ch [(bit-or (flags :JSON) (flags :DBOP))
(cmd :json)]))))
(let [myflags #(bit-or (flags :DBOP) (if (= % (last ops)) 0 (flags :FRAGMENT)))]
(doseq [cmd ops]
(println "SENDING DBOP:" (cmd :guid) (cmd :command) (bit-or (flags :JSON) (flags :DBOP) (myflags cmd)) "body:" (cmd :json))
(lamina/enqueue ch [(bit-or (flags :JSON) (flags :DBOP) (myflags cmd))
(cmd :json)]))))
(lamina/enqueue ch [(flags :DBOP) "ok"])) ;; else if there are no new ops, send OK message
; (doseq [cmd ops]
; (println "Sending CMD in fetchops:" (cmd :command)))))

(defn handle-json-msg
"Handles an incoming JSON message from a peer"
[ch peer flag body]
[data ch peer flag body]
(let [msg (json/parse-string body (fn [k] (keyword k)))
cmd (msg :method)
key (msg :key)]
; (print "Handing MSG" cmd key)
(condp = cmd
"dbsync-offer" :>> (fn [_] (let [host (dosync ((known-peers peer) :host))
port (dosync ((known-peers peer) :port))]
(add-peer-connection host port peer key dbsync-connections)))
"dbsync-offer" :>> (fn [_] (let [host (get-peer-data data peer :host)
port (get-peer-data data peer :port)]
(add-peer-connection! data host port peer key :dbsync-connections)))
"fetchops" :>> (fn [_] (send-ops-from ch nil (msg :lastop)))
;; DBop messages only have a "command" field
(when (msg :command)
;; Add the source field to each msg coming from the network
(ono.db/dispatch-db-cmd flag (assoc msg :source (source-for-peer peer))))))
(ono.db/dispatch-db-cmd flag (assoc msg :source (source-for-peer data peer))))))

(defn uncompress
"Uncompresses the tcp request that has been compressed with zlib plus 4-byte big-endian size header.
Expand All @@ -193,7 +239,7 @@

(defn get-tcp-handler
"Handles the TCP message for a specific peer"
[ch peer]
[data ch peer]
(fn handle-tcp-request[[flag body]]
; (info "Connection msg:" peer flag)
(if (test-flag flag (flags :COMPRESSED))
Expand All @@ -202,26 +248,27 @@
(condp test-flag flag
(flags :SETUP) :>> (fn [_] (handle-handshake-msg ch peer flag body))
(flags :PING) :>> (fn [_] (print)) ;; Ignore PING messages for now, TODO if no ping in 10s, disconnect
(flags :JSON) :>> (fn [_] (handle-json-msg ch peer flag body))))))
(flags :JSON) :>> (fn [_] (handle-json-msg data ch peer flag body))))))

(defn add-peer-connection
(defn add-peer-connection!
"Adds a new peer's connection (main ControlConnection or secondary connection)
and starts the TCP communication"
[ip, port, foreign-dbid, key, connection-map]
[data, ip, port, foreign-dbid, key, connection-type]
;; Attempt to connect to the remote tomahawk
; (println "Asked to connect to peer, control or subsequent connection:" ip port foreign-dbid key connection-map)
; (println "Asked to connect to peer, control or subsequent connection:" ip port foreign-dbid key connection-type)
(lamina/on-realized (tcp/tcp-client {:host ip :port (Integer/parseInt port) :frame frame})
(fn [ch]
;; Connection suceeded, here's our channel
(let [handshake-msg (get-handshake-msg foreign-dbid control-connections key)] ;; Get the handshake message before we add the
(let [handshake-msg (get-handshake-msg data foreign-dbid :control-connections key)]
;; Get the handshake message before we add the
;; peer to connection-map, because we need to check
;; if this is our first connection (and thus controlconnection)
;; by testing for existence of this peer in the connection map
(alter connection-map assoc foreign-dbid ch)
(if-not (known-peers foreign-dbid)
(alter known-peers assoc foreign-dbid {:host ip :port port})))
(lamina/receive-all ch (get-tcp-handler ch foreign-dbid))
(add-connection! data connection-type foreign-dbid ch)
(when-not (get-peer-data data foreign-dbid :host)
(add-peer-data! data foreign-dbid :host ip)
(add-peer-data! data foreign-dbid :port port))
(lamina/receive-all ch (get-tcp-handler data ch foreign-dbid))
(lamina/enqueue ch handshake-msg)))
; (if (is-dbsync-connection? ch) ;; HACK for development only, force fetch of all dbops
; (lamina/enqueue ch (generate-json {:method "fetchops" :lastop ""})))))
Expand All @@ -233,10 +280,10 @@
(defn listen
"Listens on our UDP port and watches for Tomahawk broadcasts. When found, calls addPeer
with the newly found dbid, host and port."
[_ data]
(let [buf (byte-array dgram-size)
packet (DatagramPacket. buf dgram-size)
strval (dosync (.receive @udp-sock packet) (String. (.getData packet) 0 (.getLength packet)))]
strval (do (.receive (get-udp-socket data) packet) (String. (.getData packet) 0 (.getLength packet)))]
; (println "Received packet:" strval)
;; We only support v2 broadcasts
(let [parts (clojure.string/split strval #":")]
Expand All @@ -246,26 +293,27 @@
foreign-dbid (nth parts 2)]
;; Initial setup in the control-connection uses a magic "whitelist" key
;; Make sure we are not already connected
(when-not (dosync (control-connections foreign-dbid)) ;; Keep track of each peer by a sourceid. That will be used in the db
(when-not (get-connection data :control-connections foreign-dbid) ;; Keep track of each peer by a sourceid. That will be used in the db
(let [sourceid (ono.db/get-or-insert-source! foreign-dbid ip)]
(dosync (alter known-peers assoc :sourceid sourceid))
(add-peer-connection ip port foreign-dbid "whitelist" control-connections)))))))
(send udp-listener listen))
(add-peer-data! data foreign-dbid :sourceid sourceid)
(add-peer-connection! data ip port foreign-dbid "whitelist" :control-connections)))))))
(send udp-listener listen data))

(defn start-udp
"Starts the UDP listener and periodically sends
UDP broacasts"
(dosync (ref-set udp-sock (DatagramSocket. zeroconf-port)))
(println "Beginning to listen on port " zeroconf-port)
(set-error-handler! udp-listener agent-error-handler)
(set-error-handler! ping-agent agent-error-handler)
(send-off udp-listener listen)
(send-off ping-agent ping-peers))
; (reset! udp-sock (DatagramSocket. zeroconf-port))
(let [opaque-data (set-udp-socket! (atom {}) zeroconf-port)]
; (println "Beginning to listen on port " zeroconf-port "data:" (type opaque-data))
(set-error-handler! udp-listener agent-error-handler)
(set-error-handler! ping-agent agent-error-handler)
(send-off udp-listener listen opaque-data)
(send-off ping-agent (ping-peers opaque-data))

(defn stop-udp
(defn stop-udp!
"Stops all UDP sockets"
(.close @udp-sock)))
(.close (get-udp-socket data)))

0 comments on commit 1dbe8fb

Please sign in to comment.