forked from onyx-platform/onyx
-
Notifications
You must be signed in to change notification settings - Fork 0
/
liveness.clj
26 lines (22 loc) · 1.06 KB
/
liveness.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
(ns onyx.peer.liveness
(:require [onyx.protocol.task-state :as t :refer [evict-peer! get-messenger]]
[onyx.messaging.protocols.status-publisher :as status-pub]
[onyx.messaging.protocols.subscriber :as sub]
[onyx.messaging.protocols.publisher :as pub]))
(defn upstream-timed-out-peers [subscriber liveness-timeout-ns]
(let [curr-time (System/nanoTime)]
(->> subscriber
(sub/status-pubs)
(sequence (comp (filter (fn [[peer-id spub]]
(< (+ (status-pub/get-heartbeat spub)
liveness-timeout-ns)
curr-time)))
(map key))))))
(defn downstream-timed-out-peers [publishers timeout-ms]
(let [curr-time (System/nanoTime)]
(sequence (comp (mapcat pub/statuses)
(filter (fn [[peer-id status]]
(< (+ (:heartbeat status) timeout-ms)
curr-time)))
(map key))
publishers)))