Skip to content

Commit 05163e2

Browse files
committed
Add priority based scheduler example
1 parent 03dbf79 commit 05163e2

File tree

3 files changed

+210
-0
lines changed

3 files changed

+210
-0
lines changed

test/lib/prios/dune

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
(library
2+
(name prios)
3+
(libraries backoff multicore-magic picos picos_std.sync psq))

test/lib/prios/prios.ml

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
open Picos
2+
module S = Picos_std_sync
3+
4+
module List_ext = struct
5+
let[@tail_mod_cons] rec drop_first_or_not_found x' = function
6+
| [] -> raise_notrace Not_found
7+
| x :: xs -> if x == x' then xs else x :: drop_first_or_not_found x' xs
8+
end
9+
10+
module Id = struct
11+
type t = T of int [@@unboxed]
12+
13+
let[@inline] compare (T l) (T r) = Int.compare l r
14+
let key = Fiber.FLS.create ()
15+
let next_id = Atomic.make 0
16+
17+
let get_as fiber =
18+
match Fiber.FLS.get_exn fiber key with
19+
| id -> id
20+
| exception Fiber.FLS.Not_set ->
21+
let id = T (Atomic.fetch_and_add next_id 1) in
22+
Fiber.FLS.set fiber key id;
23+
id
24+
25+
let get () = get_as @@ Fiber.current ()
26+
end
27+
28+
module Priority = struct
29+
type t = T of int [@@unboxed]
30+
31+
let[@inline] compare (T l) (T r) = Int.compare l r
32+
let[@inline] max (T l) (T r) = T (Int.max l r)
33+
let default = T 0
34+
let higher (T p) = T (p + 1)
35+
let key = Fiber.FLS.create ()
36+
let get_as fiber = Fiber.FLS.get fiber key ~default
37+
let get () = get_as @@ Fiber.current ()
38+
39+
let set priority =
40+
let fiber = Fiber.current () in
41+
if priority = default then Fiber.FLS.remove fiber key
42+
else Fiber.FLS.set fiber key priority
43+
end
44+
45+
module Priority_inv = struct
46+
type t = Priority.t
47+
48+
let compare l r = Priority.compare r l
49+
end
50+
51+
module Pq_hi = Psq.Make (Id) (Priority_inv)
52+
53+
(*
54+
type _ tdt =
55+
| Nothing : [> `Nothing ] tdt
56+
| Holder :
57+
*)
58+
type mutex = { waiters : Pq_hi.t Atomic.t; mutex : S.Mutex.t }
59+
60+
module Mutex = struct
61+
type t = mutex
62+
63+
let key = Fiber.FLS.create ()
64+
let get_as fiber = Fiber.FLS.get fiber key ~default:[]
65+
66+
let add_mutex_as fiber t =
67+
get_as fiber |> List.cons t |> Fiber.FLS.set fiber key
68+
69+
let remove_mutex_as fiber t =
70+
get_as fiber
71+
|> List_ext.drop_first_or_not_found t
72+
|> Fiber.FLS.set fiber key
73+
74+
let rec add_waiter t id priority backoff =
75+
let before = Atomic.get t.waiters in
76+
let after = Pq_hi.add id priority before in
77+
if not (Atomic.compare_and_set t.waiters before after) then
78+
add_waiter t id priority (Backoff.once backoff)
79+
80+
let rec remove_waiter t id backoff =
81+
let before = Atomic.get t.waiters in
82+
let after = Pq_hi.remove id before in
83+
if not (Atomic.compare_and_set t.waiters before after) then
84+
remove_waiter t id (Backoff.once backoff)
85+
86+
let max_waiter t =
87+
match Pq_hi.min (Atomic.get t.waiters) with
88+
| None -> Priority.default
89+
| Some (_id, priority) -> priority
90+
91+
let create ?padded () =
92+
let waiters = Atomic.make Pq_hi.empty |> Multicore_magic.copy_as ?padded in
93+
let mutex = S.Mutex.create ?padded () in
94+
Multicore_magic.copy_as ?padded { waiters; mutex }
95+
96+
let lock t =
97+
let fiber = Fiber.current () in
98+
let id = Id.get_as fiber in
99+
let priority = Priority.get_as fiber in
100+
add_waiter t id priority Backoff.default;
101+
match S.Mutex.lock t.mutex with
102+
| () ->
103+
remove_waiter t id Backoff.default;
104+
add_mutex_as fiber t
105+
| exception exn ->
106+
remove_waiter t id Backoff.default;
107+
raise exn
108+
109+
let unlock t =
110+
let fiber = Fiber.current () in
111+
remove_mutex_as fiber t;
112+
S.Mutex.unlock t.mutex
113+
end
114+
115+
module Condition = struct
116+
type t = S.Condition.t
117+
118+
let create = S.Condition.create
119+
let wait t m = S.Condition.wait t m.mutex
120+
let broadcast = S.Condition.broadcast
121+
end
122+
123+
let _get_dynamic_priority_as fiber =
124+
Mutex.get_as fiber
125+
|> List.fold_left
126+
(fun p m -> Priority.max p (Mutex.max_waiter m))
127+
(Priority.get_as fiber)
128+
129+
let run_fiber ?fatal_exn_handler:_ _fiber _main = failwith "XXX"
130+
let run ?fatal_exn_handler:_ ?forbid:_ _main = failwith "XXX"

test/lib/prios/prios.mli

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
(** *)
2+
3+
open Picos
4+
5+
module Id : sig
6+
(** *)
7+
8+
type t
9+
(** *)
10+
11+
val compare : t -> t -> int
12+
(** *)
13+
14+
val get : unit -> t
15+
(** *)
16+
end
17+
18+
module Priority : sig
19+
(** *)
20+
21+
type t
22+
(** *)
23+
24+
val compare : t -> t -> int
25+
(** *)
26+
27+
val default : t
28+
(** *)
29+
30+
val higher : t -> t
31+
(** *)
32+
33+
val get : unit -> t
34+
(** *)
35+
36+
val set : t -> unit
37+
(** *)
38+
end
39+
40+
module Mutex : sig
41+
(** *)
42+
43+
type t
44+
(** *)
45+
46+
val create : ?padded:bool -> unit -> t
47+
(** *)
48+
49+
val lock : t -> unit
50+
(** *)
51+
52+
val unlock : t -> unit
53+
(** *)
54+
end
55+
56+
module Condition : sig
57+
(** *)
58+
59+
type t
60+
(** *)
61+
62+
val create : ?padded:bool -> unit -> t
63+
(** *)
64+
65+
val wait : t -> Mutex.t -> unit
66+
(** *)
67+
68+
val broadcast : t -> unit
69+
(** *)
70+
end
71+
72+
val run_fiber :
73+
?fatal_exn_handler:(exn -> unit) -> Fiber.t -> (Fiber.t -> unit) -> unit
74+
(** *)
75+
76+
val run : ?fatal_exn_handler:(exn -> unit) -> ?forbid:bool -> (unit -> 'a) -> 'a
77+
(** *)

0 commit comments

Comments
 (0)