forked from xapi-project/xen-api
-
Notifications
You must be signed in to change notification settings - Fork 0
/
updates.ml
272 lines (230 loc) · 5.84 KB
/
updates.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
(******************************************************************************)
(* Object update tracking *)
open Fun
open Pervasiveext
module type INTERFACE = sig
val service_name : string
module Dynamic : sig
type id
val rpc_of_id : id -> Rpc.t
val id_of_rpc : Rpc.t -> id
end
end
module Updates = functor(Interface : INTERFACE) -> struct
module D = Debug.Debugger(struct let name = Interface.service_name end)
open D
module Int64Map = Map.Make(struct type t = int64 let compare = compare end)
module Scheduler = struct
open Threadext
type item = {
id: int;
name: string;
fn: unit -> unit
}
let schedule = ref Int64Map.empty
let delay = Delay.make ()
let next_id = ref 0
let m = Mutex.create ()
type time =
| Absolute of int64
| Delta of int with rpc
type t = int64 * int with rpc
let now () = Unix.gettimeofday () |> ceil |> Int64.of_float
module Dump = struct
type u = {
time: int64;
thing: string;
} with rpc
type t = u list with rpc
let make () =
let now = now () in
Mutex.execute m
(fun () ->
Int64Map.fold (fun time xs acc -> List.map (fun i -> { time = Int64.sub time now; thing = i.name }) xs @ acc) !schedule []
)
end
let one_shot time (name: string) f =
let time = match time with
| Absolute x -> x
| Delta x -> Int64.(add (of_int x) (now ())) in
let id = Mutex.execute m
(fun () ->
let existing =
if Int64Map.mem time !schedule
then Int64Map.find time !schedule
else [] in
let id = !next_id in
incr next_id;
let item = {
id = id;
name = name;
fn = f
} in
schedule := Int64Map.add time (item :: existing) !schedule;
Delay.signal delay;
id
) in
(time, id)
let cancel (time, id) =
Mutex.execute m
(fun () ->
let existing =
if Int64Map.mem time !schedule
then Int64Map.find time !schedule
else [] in
schedule := Int64Map.add time (List.filter (fun i -> i.id <> id) existing) !schedule
)
let process_expired () =
let t = now () in
let expired =
Mutex.execute m
(fun () ->
let expired, unexpired = Int64Map.partition (fun t' _ -> t' <= t) !schedule in
schedule := unexpired;
Int64Map.fold (fun _ stuff acc -> acc @ stuff) expired [] |> List.rev) in
(* This might take a while *)
List.iter
(fun i ->
try
i.fn ()
with e ->
debug "Scheduler ignoring exception: %s" (Printexc.to_string e)
) expired;
expired <> [] (* true if work was done *)
let rec main_loop () =
while process_expired () do () done;
let sleep_until =
Mutex.execute m
(fun () ->
try
Int64Map.min_binding !schedule |> fst
with Not_found ->
Int64.add 3600L (now ())
) in
let seconds = Int64.sub sleep_until (now ()) in
debug "Scheduler sleep until %Ld (another %Ld seconds)" sleep_until seconds;
let (_: bool) = Delay.wait delay (Int64.to_float seconds) in
main_loop ()
let start () =
let (_: Thread.t) = Thread.create main_loop () in
()
end
module UpdateRecorder = functor(Ord: Map.OrderedType) -> struct
(* Map of thing -> last update counter *)
module M = Map.Make(struct
type t = Ord.t
let compare = compare
end)
type id = int
type t = {
map: int M.t;
next: id
}
let initial = 0
let empty = {
map = M.empty;
next = initial + 1;
}
let add x t = {
map = M.add x t.next t.map;
next = t.next + 1
}, t.next + 1
let remove x t = {
map = M.remove x t.map;
next = t.next + 1
}, t.next + 1
let get from t =
(* [from] is the id of the most recent event already seen *)
let before, after = M.partition (fun _ time -> time <= from) t.map in
let xs, last = M.fold (fun key v (acc, m) -> key :: acc, max m v) after ([], from) in
(* NB 'xs' must be in order so 'Barrier' requests don't permute *)
List.rev xs, last
let last_id t = t.next - 1
let fold f t init = M.fold f t.map init
end
open Threadext
module U = UpdateRecorder(struct type t = Interface.Dynamic.id let compare = compare end)
type id = U.id
type t = {
mutable u: U.t;
c: Condition.t;
m: Mutex.t;
}
let empty () = {
u = U.empty;
c = Condition.create ();
m = Mutex.create ();
}
type rpcable_t = {
u' : (Interface.Dynamic.id * int) list;
next : int;
} with rpc
let rpc_of_t t =
let u' = U.fold (fun x y acc -> (x,y)::acc) t.u [] in
rpc_of_rpcable_t { u'; next = t.u.U.next }
let t_of_rpc rpc =
let u' = rpcable_t_of_rpc rpc in
let map = U.M.empty in
let map = List.fold_left (fun map (x,y) -> U.M.add x y map) map u'.u' in
{ u = { U.map = map; next=u'.next };
c = Condition.create ();
m = Mutex.create ();
}
let get dbg from timeout t =
let from = Opt.default U.initial from in
let cancel = ref false in
let id = Opt.map (fun timeout ->
Scheduler.one_shot (Scheduler.Delta timeout) dbg
(fun () ->
debug "Cancelling: Update.get after %d" timeout;
Mutex.execute t.m
(fun () ->
cancel := true;
Condition.broadcast t.c
)
)
) timeout in
finally
(fun () ->
Mutex.execute t.m
(fun () ->
let current = ref ([], from) in
while fst !current = [] && not(!cancel) do
current := U.get from t.u;
if fst !current = [] && not(!cancel) then Condition.wait t.c t.m;
done;
!current
)
) (fun () -> Opt.iter Scheduler.cancel id)
let last_id dbg t =
Mutex.execute t.m
(fun () ->
U.last_id t.u
)
let add x t =
Mutex.execute t.m
(fun () ->
let result, id = U.add x t.u in
t.u <- result;
Condition.broadcast t.c
)
let remove x t =
Mutex.execute t.m
(fun () ->
let result, id = U.remove x t.u in
t.u <- result;
Condition.signal t.c
)
module Dump = struct
type u = {
id: int;
v: string;
} with rpc
type t = u list with rpc
let make t =
Mutex.execute t.m
(fun () ->
U.fold (fun key v acc -> { id = v; v = (key |> Interface.Dynamic.rpc_of_id |> Jsonrpc.to_string) } :: acc) t.u []
)
end
end