-
Notifications
You must be signed in to change notification settings - Fork 31
/
search.clj
95 lines (85 loc) · 2.88 KB
/
search.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
(ns knossos.search
"Manages reporting and aborting searches"
(:require [knossos.memory :as memory]
[knossos.util :refer [with-thread-name]]
[clojure.tools.logging :refer [info warn]]))
(defprotocol Search
(abort! [search cause])
(report [search])
(results [search]
[search timeout timeout-val]))
(defn results-from-any
"Tries to deref any of a collection of Searches. If a period is provided,
that's roughly how long in ms it should take to cycle through polling the
collection."
([xs]
(results-from-any xs 100))
([xs period]
(results-from-any (vec xs) (long (/ period (count xs))) 0))
([xs period i]
(let [res (results (nth xs i) period ::next)]
(if (= res ::next)
(recur xs period (mod (inc i) (count xs)))
res))))
(defn competition
"Takes a map of names to searches and creates a composed search which runs
all of them in competition."
[searches]
(reify Search
(abort! [_ cause]
(doseq [s (vals searches)]
(abort! s cause)))
(report [_]
(->> searches
(map (fn [[name search]] [name (report search)]))
(into {})))
(results [this]
(let [r (results-from-any (vals searches))]
(abort! this :competition-ended)
r))
(results [this timeout timeout-val]
(let [p (promise)
_ (future
(deliver p (results-from-any (vals searches))))
r (deref p timeout timeout-val)]
(abort! this :competition-ended)
r))))
(defn reporter
"Spawns a reporting thread that periodically logs the state of the analysis,
if it's different from where we last left off."
[search interval]
(let [running? (atom true)]
(future
(with-thread-name "knossos reporter"
(loop [last-state nil]
(Thread/sleep interval)
(when @running?
(let [state (try (report search)
(catch Throwable t
(warn t "Error reporting")))]
(when (not= last-state state)
(info state))
(recur state))))))
(fn kill! []
(reset! running? false))))
(defn run
"Given a Search, spins up reporting and memory-pressure handling, executes
the search, and returns results.
Can also take an options map:
{:time-limit ms} Duration to wait before returning with result :unknown"
([search]
(run search {}))
([search opts]
(let [mem-watch (memory/on-low-mem!
(fn abort []
(warn "Out of memory; aborting search")
(abort! search :out-of-memory)))
reporter (reporter search 5000)
time-limit (:time-limit opts)]
(try
(if time-limit
(results search time-limit {:valid? :unknown :cause :timeout})
(results search))
(finally
(mem-watch)
(reporter))))))