-
Notifications
You must be signed in to change notification settings - Fork 31
/
history.clj
364 lines (326 loc) · 12.9 KB
/
history.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
(ns knossos.history
"Operations on histories"
(:refer-clojure :exclude [indexed?])
(:require [clojure.core.reducers :as r]
[clojure.pprint :refer [pprint]]
[knossos.op :as op])
(:import [clojure.core.protocols CollReduce]
[clojure.lang IMapEntry
IPersistentMap
IPersistentSet
IPersistentVector
IPersistentCollection
ITransientMap
ITransientSet
ITransientVector
ITransientCollection]))
(defn parse-ops
"We're going to construct Op defrecords throughout our analysis, which means
that a map without a value, e.g. {:type :invoke, :f :lock, :process 2} is not
equal to the corresponding Op, which will have an explicit :value nil. We
convert all maps in the history to Ops before doing anything else."
[history]
(mapv op/map->Op history))
(defn unmatched-invokes
"Which invoke ops in a history have no corresponding :ok, :fail, or :info?"
[history]
(loop [history (seq history)
calls (transient {})]
(if-not history
(vals (persistent! calls))
(let [op (first history)
process (:process op)]
(recur (next history)
(cond (op/invoke? op) (assoc! calls process op)
(op/ok? op) (dissoc! calls process)
(op/fail? op) (dissoc! calls process)
(op/info? op) (dissoc! calls process)
true (throw (IllegalArgumentException.
(str "Unknown op type " (:type op) " from " (pr-str op))))))))))
(defn crashed-invokes
"Which invoke ops in a history have no corresponding :ok or :fail?"
[history]
(loop [history (seq history)
calls (transient {})]
(if-not history
(vals (persistent! calls))
(let [op (first history)
process (:process op)]
(recur (next history)
(cond (op/invoke? op) (assoc! calls process op)
(op/ok? op) (dissoc! calls process)
(op/fail? op) (dissoc! calls process)
(op/info? op) calls))))))
(defn processes
"What processes are in a history?"
[history]
(->> history
(r/map :process)
(into #{})))
(defn sort-processes
"Sort a collection of processes. Puts numbers second, keywords first."
[coll]
(sort (fn comparator [a b]
(if (number? a)
(if (number? b)
(compare a b)
1)
(if (number? b)
-1
(compare a b))))
coll))
(defn pairs
"Pairs up ops from each process in a history. Yields a lazy sequence of [info]
or [invoke, ok|fail] pairs."
([history]
(pairs {} history))
([invocations [op & ops]]
(lazy-seq
(when op
(case (:type op)
:info (cons [op] (pairs invocations ops))
:invoke (do (assert (not (contains? invocations (:process op))))
(pairs (assoc invocations (:process op) op) ops))
(:ok :fail) (do (assert (contains? invocations (:process op)))
(cons [(get invocations (:process op)) op]
(pairs (dissoc invocations (:process op))
ops))))))))
(defn pairs+
"Pairs up ops from each process in a history. Yields a lazy sequence of [info]
or [invoke, ok|fail|info] pairs. The difference from `pairs` is that this
variant maps invocations to infos."
([history]
(pairs+ {} history))
([invocations [op & ops]]
(lazy-seq
(when op
(let [p (:process op)]
(case (:type op)
:invoke (do (assert (not (contains? invocations p)))
(pairs+ (assoc invocations p op) ops))
:info (if (contains? invocations p)
(cons [(get invocations p) op]
(pairs+ (dissoc invocations p) ops))
(cons [op] (pairs+ invocations ops)))
(:ok :fail) (do (assert (contains? invocations p))
(cons [(get invocations p) op]
(pairs+ (dissoc invocations p)
ops)))))))))
(defn pair-index
"Given a history, constructs a map from operations to their
counterparts--invocations to their completions or completions to their
invocations. Infos map to nil."
([history]
(pair-index pairs history))
([pair-fn history]
(->> history
pair-fn
(reduce (fn [index [invoke complete]]
; We need these to be unique! Otherwise the index we build
; won't be bijective
(assert (:index invoke))
(assoc! (if complete
(assoc! index complete invoke)
index)
invoke complete))
(transient {}))
persistent!)))
(defn pair-index+
"Like pair-index, but matches invokes to infos as well."
[history]
(pair-index pairs+ history))
(defn invocation
"Returns the invocation for an op, using a pair index. If the op is itself a
completion, returns the op. If the op is an invocation, looks up its
completion in the pair index."
[pair-index op]
(if (op/invoke? op)
op
(pair-index op)))
(defn completion
"Returns the completion for an op, using a pair index. If the op is itself a
completion, returns the op. If the op is an invocation, looks up its
completion in the pair index."
[pair-index op]
(if (op/invoke? op)
(pair-index op)
op))
(defn complete-fold-op
"Folds an operation into a completed history, keeping track of outstanding
invocations.
History is our complete history of operations: a transient vector. Index is a
transient map of processes to the index of their most recent invocation. Note
that we assume processes are singlethreaded; e.g. they do not perform
multiple invocations without receiving responses."
[[history index] op]
(condp = (:type op)
; An invocation; remember where it is
:invoke
(do
; Enforce the singlethreaded constraint.
(when-let [prior (get index (:process op))]
(throw (RuntimeException.
(str "Process " (:process op) " already running "
(pr-str (get history prior))
", yet attempted to invoke "
(pr-str op) " concurrently"))))
[(conj! history op)
(assoc! index (:process op) (dec (count history)))])
; A completion; fill in the completed value.
:ok
(let [i (get index (:process op))
_ (assert i (str "Process completed an operation without a "
"prior invocation: "
(pr-str op)))
invocation (nth history i)
value (:value op)
invocation' (assoc invocation :value value)]
[(-> history
(assoc! i invocation')
(conj! op))
(dissoc! index (:process op))])
; A failure; fill in either value.
:fail
(let [i (get index (:process op))
_ (assert i (str "Process failed an operation without a "
"prior invocation: "
(pr-str op)))
invocation (nth history i)
_ (assert (= (:value op) (:value invocation))
(str "invocation value "
(pr-str (:value invocation))
" and failure value "
(pr-str (:value op))
" don't match"))
invocation' (assoc invocation :value (:value op), :fails? true)]
[(-> history
(assoc! i invocation')
(conj! op))
(dissoc! index (:process op))])
; No change for info messages
:info
[(conj! history op) index]))
(defn complete
"When a request is initiated, we may not know what the result will be--but
find out when it completes. In the history, this might look like
[{:type :invoke
:f :read
:value nil} ; We don't know what we're going to read.
{:type :ok
:f :read
:value 2}] ; We received 2.
This function fills in missing values for invocations, where those requests
complete. It constructs a new history in which we 'already knew' what the
results of successful operations would have been.
For failed operations, complete fills in the value for both invocation
and completion; depending on whichever has a value available. We *also* add a
:fails? key to invocations which will fail, allowing checkers to skip them."
[history]
(->> history
(reduce complete-fold-op
[(transient []) (transient {})])
first
persistent!))
(defn index
"Attaches an :index key to each element of the history, identifying its
position in the history vector."
[history]
(->> history
(mapv (fn [i op] (assoc op :index i)) (range))
vec))
(defn kindex
"Takes a history and returns a new history with internal knossos indices and
a map of knossos indices to external indices. Throws IllegalArgumentException
if given history does not have unique indices"
[history]
(let [eindices (map :index history)]
(if (or (every? #(= -1 %) eindices)
(every? nil? eindices)
(apply distinct? eindices))
(let [history' (index history)
m (->> history'
(map :index)
(#(map vector % eindices))
(into {}))]
[history' m])
(throw (IllegalArgumentException. (str "History starting with "
(pr-str (first history))
" contains ops with non-unique indices"))))))
(defn indexed?
"Is the given history indexed?"
[history]
(or (empty? history)
(integer? (:index (first history)))))
(defn ensure-indexed
"Makes sure a history is indexed when we start our analysis"
[history]
(if-not (indexed? history)
(index history)
history))
(defn convert-op-index
"Maps the index of the op to its corresponding value in the given mapping"
[mapping op]
(when-let [new-i (:index op)]
(assoc op :index (get mapping new-i))))
(defn convert-op-indices
"Maps `convert-op-index` over a collection of ops"
[mapping ops]
(map #(convert-op-index mapping %) ops))
(defn render-op
"Prepares an op to be returned by converting it to a plain old map and reassigning its
external index"
[indices op]
(let [m (convert-op-index indices (op/Op->map op))]
(if (:index m)
m
(dissoc m :index))))
(defn with-synthetic-infos
"Histories may arrive with :invoke operations that never complete. We append
synthetic :info ops to the end of the history for any in-process calls."
[history]
(assert (vector? history))
(let [max-index (reduce max 0 (map :index history))]
(->> (unmatched-invokes history)
(map-indexed
(fn [i invoke] (assoc invoke
:type :info
:index (+ (inc i) max-index))))
(into history))))
(defn without-failures
"Takes a completed history, and returns a copy of the given history without
any failed operations--either invocations or completions."
[history]
(->> history
(remove (fn [op]
(or (op/fail? op)
(:fails? op))))
vec))
(defn without-infos
"Takes a completed history, and returns a copy of that history without any
:info operations."
[history]
(->> history
(remove op/info?)
vec))
(defn preprocess
"We're going to make several assumptions in Knossos about the structure of
histories. This function massages histories to make those assumptions legal.
1. First, we make sure we have an :index field on ops. This is important so we
can uniquely identify operations.
2. We convert all operation maps to our internal Op representation.
3. We fill in invocation :values with the results of completions, because we
may not know exactly what an operation does at invocation time.
5. We strip out failed ops; they can't influence state.
6. We append synthetic :info operations for any incomplete invocations.
7. We re-index the history so that indices go 0, 1, 2, ..., and compute a map
of these knossos :index'es to original :index'es."
[history]
(let [history (-> history
ensure-indexed
parse-ops
complete
without-failures
with-synthetic-infos)
[history kindex-eindex] (kindex history)]
{:history history
:kindex-eindex kindex-eindex}))