-
Notifications
You must be signed in to change notification settings - Fork 7
/
sim.clj
315 lines (280 loc) · 12.7 KB
/
sim.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
(ns com.yetanalytics.datasim.sim
"Given input, compose a simulation model"
(:require [clojure.spec.alpha :as s]
[clojure.core.async :as a]
[java-time.api :as t]
[xapi-schema.spec :as xs]
[com.yetanalytics.datasim :as-alias datasim]
[com.yetanalytics.datasim.model :as model]
[com.yetanalytics.datasim.xapi.actor :as actor]
[com.yetanalytics.datasim.xapi.profile :as p]
[com.yetanalytics.datasim.xapi.statement :as statement]
[com.yetanalytics.datasim.util.random :as random]
[com.yetanalytics.datasim.util.sequence :as su]
[com.yetanalytics.datasim.util.async :as au]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Specs
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; A statement sequence: a collection whose every element conforms to the
;; xAPI statement spec `::xs/statement`.
(s/def ::statement-seq
(s/every ::xs/statement))
;; A simulation skeleton: a map from an actor IFI to that actor's statement
;; sequence.
(s/def ::skeleton
(s/map-of ::actor/actor-ifi ::statement-seq))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Statement Sequence
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Function spec for `statement-seq`. The `:args` entries mirror the function's
;; parameter vector one-for-one:
;; `[input seed alignments start-time ?end-time ?from-time zone-region max-restarts]`
(s/fdef statement-seq
  :args (s/cat :inputs (s/keys :req-un [::statement/type-iri-map
                                        ::statement/activity-map
                                        ::statement/pattern-map
                                        ::statement/statement-base-map
                                        ::statement/parsed-rules-map
                                        ::statement/actor
                                        ::statement/alignments]
                               :opt-un [::statement/object-overrides])
               ;; `statement-seq` hands this value to `random/seed-rng`, so it
               ;; is a seed rather than an RNG instance.
               ;; NOTE(review): assumes seeds are integers — confirm against
               ;; the `random` namespace.
               :seed         int?
               :alignments   ::model/alignments
               :start-time   t/local-date-time?
               ;; `build-skeleton` derives both of these with `some->`, so
               ;; either may be nil.
               :?end-time    (s/nilable t/local-date-time?)
               :?from-time   (s/nilable t/local-date-time?)
               ;; `build-skeleton` passes the result of `t/zone-id`; plain
               ;; strings stay accepted for any existing callers.
               :zone-region  (s/or :string  string?
                                   :zone-id #(instance? java.time.ZoneId %))
               ;; NOTE(review): assumed to be a non-negative integer that may
               ;; be absent — confirm against the input parameters spec.
               :max-restarts (s/nilable nat-int?))
  :ret ::statement-seq)
;; Maximum number of consecutive empty pattern walks that
;; `temp-statement-seq` will retry before ending the sequence.
(def empty-seq-limit 10000)
(defn- init-statement-seq
  "Return an unbounded lazy sequence of registration IDs, each produced by
   `random/rand-uuid` from a single RNG seeded with `seed`."
  [seed]
  (let [reg-rng (random/seed-rng seed)]
    (repeatedly #(random/rand-uuid reg-rng))))
(defn- temp-statement-seq
  "Generate a lazy sequence of maps of `:template`, `:timestamp`,
   `:time-since-last`, and `:registration` values.

   Each step walks the profile patterns once, starting at the current
   timestamp, and tags every resulting map with the next ID taken from
   `registration-seq`. The timestamp to resume from is read from the
   `:timestamp` metadata of the walk result. A run of consecutive empty
   walks is retried at most `empty-seq-limit` times before the sequence
   ends."
  [inputs alignments seed max-restarts timestamp registration-seq]
  (let [profile-rng (random/seed-rng seed)]
    (letfn [(walk-from [current-ts [reg-id & more-reg-ids] empty-count]
              (lazy-seq
               (let [walk-seed (random/rand-unbound-int profile-rng)
                     walked    (p/walk-profile-patterns inputs
                                                        alignments
                                                        walk-seed
                                                        max-restarts
                                                        current-ts)
                     ?next-ts  (:timestamp (meta walked))
                     tagged    (not-empty
                                (map (fn [template-map]
                                       (assoc template-map :registration reg-id))
                                     walked))]
                 (if ?next-ts
                   (cond
                     ;; Usual case: emit this walk, then continue from the
                     ;; next timestamp with a fresh empty-walk counter
                     tagged
                     (concat tagged (walk-from ?next-ts more-reg-ids 0))
                     ;; Empty walk: retry, bounded by `empty-seq-limit`
                     (< empty-count empty-seq-limit)
                     (walk-from ?next-ts more-reg-ids (inc empty-count))
                     ;; Too many empty walks in a row: stop
                     :else
                     (list))
                   ;; No next timestamp: emit whatever was generated and stop
                   (or tagged (list))))))]
      (walk-from timestamp registration-seq 0))))
(defn- drop-statement-seq
  "Keep the leading entries of `simulation-seq` whose `:timestamp` satisfies
   `t/before?` relative to `?end-time`, cutting the sequence off at the first
   entry that does not. When `?end-time` is `nil`, nothing is cut off."
  [?end-time simulation-seq]
  (take-while (fn [entry]
                (or (nil? ?end-time)
                    (t/before? (:timestamp entry) ?end-time)))
              simulation-seq))
(defn- seed-statement-seq
  "Attach a `:seed`, drawn from `rng`, to every entry of `simulation-seq`.
   Seeds are assigned before `from-statement-seq` drops entries so that the
   dropped entries never need to be turned into full statements."
  [rng simulation-seq]
  (letfn [(attach-seed [entry]
            (assoc entry :seed (random/rand-unbound-int rng)))]
    (map attach-seed simulation-seq)))
(defn- from-statement-seq
  "Skip the leading seeded entries of `simulation-seq` whose `:timestamp` is
   not after `?from-time`. When `?from-time` is `nil`, nothing is skipped."
  [?from-time simulation-seq]
  (drop-while (fn [entry]
                (and (some? ?from-time)
                     ;; Entries whose timestamp equals `?from-time` are
                     ;; skipped as well
                     (not (t/after? (:timestamp entry) ?from-time))))
              simulation-seq))
(defn- gens-statement-seq
  "Lazily turn each entry of `simulation-seq` into a statement by merging the
   entry over `input` and passing the result to
   `statement/generate-statement`."
  [input simulation-seq]
  (map (fn [entry]
         (statement/generate-statement (merge input entry)))
       simulation-seq))
(defn statement-seq
  "Generate a lazy sequence of xAPI Statements occurring as a Poisson
   process. The sequence is cut off at `?end-time`; if `?end-time` is `nil`
   no end-time cutoff is applied. Entries at or before `?from-time` (when
   given) are skipped without generating their statements.

   Three seeds are drawn from an RNG seeded with `seed`, in a fixed order:
   one for registration IDs, one for the template walk, and one for the RNG
   that assigns per-statement seeds."
  [input seed alignments start-time ?end-time ?from-time zone-region max-restarts]
  (let [sim-rng       (random/seed-rng seed)
        reg-seed      (random/rand-unbound-int sim-rng)
        temp-seed     (random/rand-unbound-int sim-rng)
        stmt-seed     (random/rand-unbound-int sim-rng)
        stmt-rng      (random/seed-rng stmt-seed)
        ;; Statement generation additionally receives the timezone
        gen-input     (assoc input :timezone zone-region)
        registrations (init-statement-seq reg-seed)
        entries       (temp-statement-seq input
                                          alignments
                                          temp-seed
                                          max-restarts
                                          start-time
                                          registrations)
        bounded       (drop-statement-seq ?end-time entries)
        seeded        (seed-statement-seq stmt-rng bounded)
        resumed       (from-statement-seq ?from-time seeded)]
    (gens-statement-seq gen-input resumed)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Skeleton
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Data structure helpers
;; Function spec: `build-skeleton` takes the full simulation input map and
;; returns a skeleton (actor IFI -> statement sequence).
(s/fdef build-skeleton
:args (s/cat :input ::datasim/input)
:ret ::skeleton)
(defn build-skeleton
  "Given simulation input, return a skeleton: a map from each actor's IFI to
   that actor's statement sequence, generated from the `start` of the sim.
   Should be run once (in a single thread).

   All seeds are drawn from one RNG seeded with the `seed` parameter: first
   the seed used to build the profile map, then one seed per actor, with
   actors visited in order of their IFI."
  [{:keys [profiles personae-array models parameters]}]
  (let [{:keys [start end from timezone seed maxRestarts]} parameters
        ;; RNG from which every other seed is derived
        sim-rng         (random/seed-rng seed)
        ;; Timezone region and the simulation's time bounds
        zone-region     (t/zone-id timezone)
        start-time      (-> start t/instant (t/local-date-time zone-region))
        ?end-time       (some-> end t/instant (t/local-date-time zone-region))
        ?from-time      (some-> from t/instant (t/local-date-time zone-region))
        ;; Every member of every personae group, plus the IFI -> group lookup
        actor-seq       (mapcat :member personae-array)
        actor-group-map (actor/groups->agent-group-ifi-map personae-array)
        ;; Profile-derived inputs shared by all actors
        activity-seed   (random/rand-unbound-int sim-rng)
        profiles-map    (p/profiles->profile-map profiles parameters activity-seed)
        ;; Model alignments + object overrides
        models-map      (model/models->map models)
        ;; Reducer: add one actor's statement sequence to the skeleton
        add-actor
        (fn [skeleton actor]
          (let [actor-id    (actor/actor-ifi actor)
                role        (:role actor)
                group-id    (get actor-group-map actor-id)
                model-map   (model/get-actor-model models-map
                                                   actor-id
                                                   group-id
                                                   role)
                alignment   (dissoc model-map :personae)
                ;; Per-actor seed for all further generation
                actor-seed  (random/rand-unbound-int sim-rng)
                ;; `:role` is removed since it is not an xAPI property
                actor-input (merge profiles-map
                                   model-map
                                   {:actor (dissoc actor :role)})
                stmt-seq    (statement-seq actor-input
                                           actor-seed
                                           alignment
                                           start-time
                                           ?end-time
                                           ?from-time
                                           zone-region
                                           maxRestarts)]
            (assoc skeleton actor-id stmt-seq)))]
    (reduce add-actor
            {}
            (sort-by actor/actor-ifi actor-seq))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Statement Sequence Simulation (Sync)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Optional collection of actor IFIs; when supplied, the simulation functions
;; below keep only those agents' entries of the skeleton.
(s/def ::select-agents
(s/every ::actor/actor-ifi))
;; Function spec: `sim-seq` takes the simulation input followed by optional
;; keyword arguments (`:select-agents`) and returns a statement sequence.
(s/fdef sim-seq
:args (s/cat :input :com.yetanalytics.datasim/input
:options (s/keys*
:opt-un [::select-agents]))
:ret ::statement-seq)
(defn sim-seq
  "Given input, build a skeleton and produce a single seq of statements.
   The per-actor sequences are combined with `su/seq-sort`, keyed on each
   statement's `:time-ms` metadata. If `select-agents` is given, only those
   actors' sequences are used; if the `:max` parameter is set, at most that
   many statements are returned."
  [{{?max-statements :max} :parameters :as input}
   & {:keys [select-agents]}]
  (let [full-skeleton (build-skeleton input)
        skeleton      (if select-agents
                        (select-keys full-skeleton select-agents)
                        full-skeleton)
        all-stmts     (su/seq-sort (comp :time-ms meta) (vals skeleton))]
    (if ?max-statements
      (take ?max-statements all-stmts)
      all-stmts)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Statement Sequence Simulation (Async)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; simulate multiple channels
;; Number of extra statements added to each agent channel's share of `:max`
;; (see `sim-chans`).
(s/def ::pad-chan-max
pos-int?)
;; Function spec: `sim-chans` takes the simulation input followed by optional
;; keyword arguments and returns a map from actor IFI to a channel.
(s/fdef sim-chans
:args (s/cat :input :com.yetanalytics.datasim/input
:options (s/keys*
:opt-un [::select-agents
::pad-chan-max]))
:ret (s/map-of ::actor/actor-ifi au/chan?))
(defn sim-chans
  "Given input, build a skeleton and produce a map from agent IFIs to
   agent statement simulation channels.

   Uses the `core.async` thread pool for concurrency.

   If the `:max` parameter is set, it is divided (integer quotient) by the
   number of agents in the simulation and each channel is limited to that
   share plus `pad-chan-max`. Thus `pad-chan-max` is provided as a kwarg so
   we can add that amount to the length of each channel's statement seq -
   either a little bit to get over `:max`, or a lot to account for an
   imbalance in activity at the tail end of the simulation. Because of this
   division, `:max` gives only an approximate total; the `:end` parameter
   is preferable.

   If there are no agents (an empty skeleton, e.g. when `select-agents`
   matches nobody), an empty map is returned and no division takes place."
  [{{?max-statements :max} :parameters :as input}
   & {:keys [select-agents
             pad-chan-max]
      :or   {pad-chan-max 1}}]
  (let [skeleton    (cond-> (build-skeleton input)
                      select-agents
                      (select-keys select-agents))
        agent-count (count skeleton)
        ;; Guard against dividing by zero agents. With an empty skeleton
        ;; there are no channels to limit, so no take count is needed.
        ?take-n     (when (and ?max-statements (pos? agent-count))
                      (+ pad-chan-max
                         (quot ?max-statements agent-count)))
        seq->chan   (fn [agent-seq]
                      (cond->> (a/to-chan! agent-seq)
                        ?take-n
                        (a/take ?take-n)))]
    (update-vals skeleton seq->chan)))
;; simulate single channel
(defn- compare-time-ms-meta
  "Comparator over two statements that orders them by the `:time-ms` value
   stored in each statement's metadata."
  [stmt-1 stmt-2]
  (let [time-ms-1 (:time-ms (meta stmt-1))
        time-ms-2 (:time-ms (meta stmt-2))]
    (compare time-ms-1 time-ms-2)))
;; Whether `sim-chan` sequences the merged statements with
;; `compare-time-ms-meta` (true) or simply merges the agent channels (false).
(s/def ::sort boolean?)
;; Buffer size `sim-chan` uses for its output channel.
(s/def ::buffer-size pos-int?)
;; Function spec: `sim-chan` takes the simulation input followed by optional
;; keyword arguments and returns a single channel.
(s/fdef sim-chan
:args (s/cat :input :com.yetanalytics.datasim/input
:options (s/keys*
:opt-un [::select-agents
::pad-chan-max
::sort
::buffer-size]))
:ret au/chan?)
(defn sim-chan
  "Merged output of `sim-chans` for parallel generation. All keyword
   arguments are forwarded to `sim-chans`. When `sort` is truthy (the
   default) the agent channels are combined with `au/sequence-messages`
   using `compare-time-ms-meta` onto a channel buffered to `buffer-size`
   (default 100); otherwise they are combined with `a/merge` using the same
   buffer size."
  [input
   & {:keys [sort buffer-size]
      :or   {sort        true
             buffer-size 100}
      :as   kwargs}]
  (let [agent-chans (vals (sim-chans input kwargs))]
    (if-not sort
      (a/merge agent-chans buffer-size)
      (au/sequence-messages (a/chan buffer-size)
                            compare-time-ms-meta
                            agent-chans))))