Skip to content

Commit

Permalink
Working on brain data model and various things.
Browse files Browse the repository at this point in the history
  • Loading branch information
rosejn committed Jun 8, 2011
1 parent 7ea7ce2 commit 1819023
Show file tree
Hide file tree
Showing 9 changed files with 280 additions and 92 deletions.
1 change: 1 addition & 0 deletions TODO
@@ -1,4 +1,5 @@
* extend Peer to support the graph API directly without with-peer-graph

* start using transactions

* add a function to peer that returns its url
Expand Down
81 changes: 57 additions & 24 deletions src/benchmark/d2ht.clj
Expand Up @@ -80,23 +80,32 @@
(let [dist (ring-abs-distance ADDR-BITS (peer-id p) (:id props))
ts (current-time)
props (assoc props :timestamp ts :distance dist)]
(construct p
(-> (nodes [kps (q/path [:net :kps])
new-peer props])
(edges [kps new-peer :peer])))))
(if (get-node p (:id props))
(do
(log/to :d2 "exists... adding edge")
(with-peer-graph
p
(apply assoc-node (:id props) (flatten (seq (dissoc props :id))))
(make-edge (:id (first (q/path [:net :kps]))) (:id props) :peer)))
(do
(log/to :d2 "construct...")
(construct p
(-> (nodes [kps (q/path [:net :kps])
new-peer props])
(edges [kps new-peer :peer])))))))

(defn add-kps-peers
[p peers]
(log/to :d2 "[add-kps-peers] " (count peers))
(doseq [new-peer peers]
(add-kps-peer p new-peer)))

(defn oldest-kps-peer
"Select the least-fresh peer."
[p]
(first (query p (with-peer-info
(-> (kps-peers)
(q/order-by 'p :timestamp :asc)
(q/limit 1))))))
[]
(-> (kps-peers)
(q/order-by 'p :timestamp :asc)
(q/limit 1)))

(defn- take-sample
[n rt]
Expand Down Expand Up @@ -124,7 +133,7 @@

(defn remove-peers
[p ptype peers]
(let [type-id (:id (query p (q/path [:net ptype])))]
(let [type-id (:id (first (query p (q/path [:net ptype]))))]
(with-peer-graph p
(doseq [p-id (map :id peers)]
(remove-edge type-id p-id)))))
Expand All @@ -133,36 +142,56 @@
[p]
(let [peers (query p (with-peer-info (kps-peers)))
keepers (harmonic-close-peers peers (- KPS-PEERS KPS-ROUND-DROP))
to-exchange (filter #((set (map :id keepers)) (:id %)) peers)
to-exchange (conj to-exchange {:id (peer-id p) :proxy (:url p)})]
(remove-peers p :kps to-exchange)
(log/to :d2 "exchanging: " (vec (map (comp trim-id :id) to-exchange)))
keeper-set (set (map :id keepers))
to-exchange (filter #(not (keeper-set (:id %))) peers)
to-exchange (map #(select-keys % [:id :proxy]) to-exchange)]
(log/format :d2 "[%s] exchanging: %s"
(trim-id (peer-id p))
(vec (map (comp trim-id :id) to-exchange)))
to-exchange))

(defn update-timestamp
[p id]
(with-peer-graph p (assoc-node id :timestamp (current-time))))

(defn kps-purge
[p exchanged]
(let [num-peers (first (query p (q/count* (kps-peers))))]
(when (> num-peers KPS-PEERS)
(remove-peers p :kps (take (- num-peers KPS-PEERS) exchanged)))))

(defn conj-peer
[elems p]
(conj elems {:id (peer-id p) :proxy (:url p)}))

(defn kps-gossip
"Perform a single round of kps gossiping."
[p]
(when-let [url (:proxy (oldest-kps-peer p))]
(when-let [{url :proxy id :id} (first (query p (with-peer-info (oldest-kps-peer))))]
(log/to :d2 "gossip url: " url)
(let [partner (peer-connection p url)
to-exchange (kps-exchange-peers p)]
(let [res-chan (request partner 'kps-exchange [to-exchange])]
(lamina/receive res-chan
to-exchange (kps-exchange-peers p)
sending (conj-peer to-exchange p)]
(log/to :d2 "sending: " sending)
(let [res-chan (request partner 'kps-exchange [sending])]
(lamina/on-success res-chan
(fn [res]
(log/to :d2 "response: " res)
(add-kps-peers p (:result res)))))
(update-timestamp p (:id partner)))))
(add-kps-peers p res)
(kps-purge p to-exchange)))
(lamina/on-error res-chan
(fn [e]
(log/to :d2 "ERROR: " e))))
(update-timestamp p id))))

(defmethod rpc-handler 'kps-exchange
[p req]
(log/to :d2 "inside handler..." req)
(let [new-peers (first (:params req))
to-exchange (kps-exchange-peers p)]
(add-kps-peers p new-peers)
to-exchange))
(kps-purge p to-exchange)
(conj-peer to-exchange p)))

(defn kps-on
[p period]
Expand Down Expand Up @@ -233,7 +262,7 @@
q-con (peer-connection p (:proxy q-peer))
to-exchange (nps-exchange-peers p q-peer)
res-chan (request q-con 'nps-exchange [to-exchange])]
(lamina/receive res-chan
(lamina/on-success res-chan
#(add-nps-peers p %))
(update-timestamp p (:id q-peer))))

Expand Down Expand Up @@ -270,7 +299,6 @@
[a b]
(add-kps-peer a {:id (peer-id b) :proxy (plasma-url "localhost" (:port b))}))


(defn setup
[]
(log/file :d2 "d2.log")
Expand All @@ -281,13 +309,18 @@
(def p2 (nth peers 1))
(doseq [p (drop 1 peers)] (addp p p1)))

(defn reset
[]
(doseq [p peers] (close p))
(setup))

(defn gossip
[]
(doseq [p peers] (kps-gossip p)))

(defn pc
[]
(println "peer-counts: " (vec (map #(count (query % (all-peers))) peers))))
(println "peer-counts: " (vec (map #(first (query % (q/count* (all-peers)))) peers))))

(defn ids
[]
Expand Down
2 changes: 1 addition & 1 deletion src/plasma/graph.clj
Expand Up @@ -127,7 +127,7 @@ For example:\n\t(with-graph G (find-node id))\n")))
"Create an edge from src to tgt with the associated properties. At minimum
there must be a :label property.
(make-edge alice bob :label :friend)
(make-edge alice bob {:label :friend})
"
[src tgt label-or-props]
(let [src (if (= ROOT-ID src)
Expand Down
2 changes: 1 addition & 1 deletion src/plasma/net/address.clj
Expand Up @@ -46,7 +46,7 @@

(defn public-url
[port]
(plasma-url (:public (addr-info)) port))
(plasma-url (or (:public (addr-info)) "127.0.0.1") port))

(defn set-port-forward
([port service]
Expand Down
8 changes: 4 additions & 4 deletions src/plasma/net/peer.clj
Expand Up @@ -205,7 +205,6 @@
(defmethod rpc-handler 'query
[peer req]
(apply query peer (:params req)))
; (query peer (first (:params req))))

(defn- request-handler
[peer [ch req]]
Expand All @@ -218,16 +217,17 @@
res)
rpc-res (rpc-response req res)]
(lamina/enqueue ch rpc-res))
(catch java.lang.IllegalArgumentException e
#_(catch java.lang.IllegalArgumentException e
(lamina/enqueue
ch
(rpc-error req (format "No handler found for method: %s" (:method req)) e)))
(rpc-error req (format "No handler found for method: %s\n\n%s" (:method req)
(with-out-str (.printStackTrace e))) e)))
(catch Exception e
(log/to :peer "error handling request!\n------------------\n"
(with-out-str (print-cause-trace e)))
(.printStackTrace e)
(lamina/enqueue ch
(rpc-error req "Exception occured while handling request." e))))))
(rpc-error req (str "Exception occured while handling request:\n" (with-out-str (.printStackTrace e))) e))))))

(defmulti stream-handler
"A general purpose stream multimethod."
Expand Down
4 changes: 2 additions & 2 deletions src/plasma/query/core.clj
Expand Up @@ -8,7 +8,7 @@
[logjam.core :as log]
[lamina.core :as lamina]))

(def MAX-QUERY-TIME (* 3 1000)) ; in ms
(def MAX-QUERY-TIME (* 2 1000)) ; in ms
(def PROMISE-WAIT-TIME 100)

(log/channel :query :debug)
Expand Down Expand Up @@ -201,7 +201,7 @@
"Create a path query, which is composed of a set of edge
traversal specifications.
; Return the set of paths traversing from the root across two
; Return the set of paths traversing from the root across two
; edges, :foo and :bar.
(path [:foo :bar])
Expand Down
2 changes: 1 addition & 1 deletion src/plasma/query/exec.clj
Expand Up @@ -56,7 +56,7 @@ input-params: %s"
[param-val])
param-op (get-in tree [:ops param-id])]
(apply lamina/enqueue-and-close (get param-op :in) param-val)))
(schedule (:timeout tree)
#_(schedule (:timeout tree)
(fn []
(doseq [op (:ops tree)]
(try
Expand Down
8 changes: 3 additions & 5 deletions src/plasma/query/operator.clj
Expand Up @@ -163,8 +163,9 @@
(on-drained chan #(do (log/to :flow "remote-channel drained")
(all-closed)))))

(siphon left-out out)
(on-drained left-out all-closed)
(on-closed out #(doseq [sub @sub-chans] (close sub)))
(siphon left-out out)
(flow-log "receive" id out)

{:type :receive
Expand All @@ -184,10 +185,7 @@
(siphon left-out out)
(siphon out dest)
(flow-log "send" id out)
(on-drained left-out
#(do
(log/to :close "[send] closed")
(close dest))))
(on-drained left-out #(close dest)))
{:type :send
:id id
:dest dest})
Expand Down

0 comments on commit 1819023

Please sign in to comment.