Skip to content

Commit 673b05e

Browse files
committed
Add an actor model implementation example
1 parent 12af9b2 commit 673b05e

File tree

5 files changed

+219
-0
lines changed

5 files changed

+219
-0
lines changed

lib/picos/bootstrap/picos_bootstrap.ml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,10 @@ module Fiber = struct
499499
r.fls <- fls;
500500
Array.unsafe_set fls key
501501
(Sys.opaque_identity (Obj.magic value : non_float))
502+
503+
let remove (type a) (Fiber r : fiber) (key : a t) =
504+
let fls = r.fls in
505+
if key < Array.length fls then Array.unsafe_set fls key unique
502506
end
503507
end
504508

lib/picos/picos.mli

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -994,6 +994,9 @@ module Fiber : sig
994994
995995
⚠️ It is only safe to call [set] from the fiber itself or when the fiber
996996
is known not to be running. *)
997+
998+
val remove : fiber -> 'a t -> unit
999+
(** *)
9971000
end
9981001

9991002
(** {2 Interface for spawning} *)

test/lib/hoot/dune

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
(library
2+
(name hoot)
3+
(libraries backoff multicore-magic picos))

test/lib/hoot/hoot.ml

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
open Picos
2+
3+
let[@inline never] impossible () = failwith "impossible"
4+
let[@inline never] not_in_a_hoot () = raise (Sys_error "Not in a hoot")
5+
let[@inline never] already_in_a_hoot () = raise (Sys_error "Already in a hoot")
6+
7+
module Message = struct
8+
type t = ..
9+
end
10+
11+
type _ tdt =
12+
| Nil : [> `Nil ] tdt
13+
| Message : {
14+
message : Message.t;
15+
next : [ `Nil | `Message ] tdt;
16+
}
17+
-> [> `Message ] tdt
18+
| Wait : Trigger.t -> [> `Wait ] tdt
19+
20+
type incoming = In : [< `Nil | `Message | `Wait ] tdt -> incoming [@@unboxed]
21+
22+
type pid = {
23+
computation : unit Computation.t;
24+
terminated : unit Computation.t;
25+
incoming : incoming Atomic.t;
26+
mutable received : [ `Nil | `Message ] tdt;
27+
}
28+
29+
let pid_key = Fiber.FLS.create ()
30+
31+
let self () =
32+
let fiber = Fiber.current () in
33+
try Fiber.FLS.get_exn fiber pid_key
34+
with Fiber.FLS.Not_set -> not_in_a_hoot ()
35+
36+
module Pid = struct
37+
type t = pid
38+
end
39+
40+
let run main =
41+
let fiber = Fiber.current () in
42+
match Fiber.FLS.get_exn fiber pid_key with
43+
| _ -> already_in_a_hoot ()
44+
| exception Fiber.FLS.Not_set ->
45+
let computation = Computation.create ~mode:`LIFO () in
46+
let inner = Computation.Packed computation in
47+
let t =
48+
{
49+
computation;
50+
terminated = Computation.create ~mode:`LIFO ();
51+
incoming = Atomic.make (In Nil) |> Multicore_magic.copy_as_padded;
52+
received = Nil;
53+
}
54+
in
55+
Fiber.FLS.set fiber pid_key t;
56+
let (Packed parent as outer) = Fiber.get_computation fiber in
57+
let canceler =
58+
Computation.attach_canceler ~from:parent ~into:computation
59+
in
60+
Fiber.set_computation fiber inner;
61+
Computation.capture t.terminated main ();
62+
Computation.finish t.computation;
63+
Fiber.set_computation fiber outer;
64+
Computation.detach parent canceler;
65+
Fiber.FLS.remove fiber pid_key
66+
67+
let wait t = Computation.await t.terminated
68+
69+
let spawn main =
70+
let t =
71+
{
72+
computation = Computation.create ~mode:`LIFO ();
73+
terminated = Computation.create ~mode:`LIFO ();
74+
incoming = Atomic.make (In Nil);
75+
received = Nil;
76+
}
77+
in
78+
let fiber = Fiber.create ~forbid:false t.computation in
79+
Fiber.FLS.set fiber pid_key t;
80+
begin
81+
Fiber.spawn fiber @@ fun fiber ->
82+
let t = Fiber.FLS.get_exn fiber pid_key in
83+
Computation.capture t.terminated main ();
84+
Computation.finish t.computation
85+
end;
86+
t
87+
88+
let rec rev_to (Message _ as ms : [ `Message ] tdt) :
89+
[ `Nil | `Message ] tdt -> _ = function
90+
| Nil -> ms
91+
| Message r -> rev_to (Message { message = r.message; next = ms }) r.next
92+
93+
let rev (Message r : [ `Message ] tdt) =
94+
rev_to (Message { message = r.message; next = Nil }) r.next
95+
96+
let rec receive t =
97+
match Atomic.get t.incoming with
98+
| In Nil as before ->
99+
let trigger = Trigger.create () in
100+
let after = In (Wait trigger) in
101+
if Atomic.compare_and_set t.incoming before after then begin
102+
match Trigger.await trigger with
103+
| None -> ()
104+
| Some (exn, bt) -> Printexc.raise_with_backtrace exn bt
105+
end;
106+
receive t
107+
| _ -> begin
108+
match Atomic.exchange t.incoming (In Nil) with
109+
| In (Wait _ | Nil) -> impossible ()
110+
| In (Message _ as ms) ->
111+
let (Message r : [ `Message ] tdt) = rev ms in
112+
t.received <- r.next;
113+
r.message
114+
end
115+
116+
let receive () =
117+
let fiber = Fiber.current () in
118+
match Fiber.FLS.get_exn fiber pid_key with
119+
| t -> begin
120+
match t.received with
121+
| Message r ->
122+
t.received <- r.next;
123+
r.message
124+
| Nil -> receive t
125+
end
126+
| exception Fiber.FLS.Not_set -> not_in_a_hoot ()
127+
128+
let rec send t message backoff =
129+
match Atomic.get t.incoming with
130+
| In ((Nil | Message _) as before) ->
131+
let after = Message { message; next = before } in
132+
if not (Atomic.compare_and_set t.incoming (In before) (In after)) then
133+
send t message (Backoff.once backoff)
134+
| In (Wait trigger as before) ->
135+
let after = Message { message; next = Nil } in
136+
if Atomic.compare_and_set t.incoming (In before) (In after) then
137+
Trigger.signal trigger
138+
else send t message (Backoff.once backoff)
139+
140+
let send t message = send t message Backoff.default
141+
142+
type Message.t += Terminated of Pid.t
143+
144+
let monitor ~at ~the =
145+
let[@alert "-handler"] trigger =
146+
Trigger.from_action at the @@ fun _ at the -> send at (Terminated the)
147+
in
148+
if not (Computation.try_attach the.terminated trigger) then
149+
send at (Terminated the)
150+
151+
exception Terminate
152+
153+
let empty_bt = Printexc.get_callstack 0
154+
155+
let link t1 t2 =
156+
let[@alert "-handler"] trigger =
157+
Trigger.from_action t1 t2 @@ fun _ t1 t2 ->
158+
Computation.cancel t1.computation Terminate empty_bt;
159+
Computation.cancel t2.computation Terminate empty_bt
160+
in
161+
if
162+
(not (Computation.try_attach t1.terminated trigger))
163+
|| not (Computation.try_attach t2.terminated trigger)
164+
then begin
165+
Computation.cancel t1.computation Terminate empty_bt;
166+
Computation.cancel t2.computation Terminate empty_bt
167+
end

test/lib/hoot/hoot.mli

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
val run : (unit -> unit) -> unit
2+
(** *)
3+
4+
module Pid : sig
5+
(** *)
6+
7+
type t
8+
(** *)
9+
end
10+
11+
val spawn : (unit -> unit) -> Pid.t
12+
(** *)
13+
14+
val self : unit -> Pid.t
15+
(** *)
16+
17+
val wait : Pid.t -> unit
18+
(** *)
19+
20+
module Message : sig
21+
(** *)
22+
23+
type t = ..
24+
(** *)
25+
end
26+
27+
val receive : unit -> Message.t
28+
(** *)
29+
30+
val send : Pid.t -> Message.t -> unit
31+
(** *)
32+
33+
type Message.t += Terminated of Pid.t (** *)
34+
35+
val monitor : at:Pid.t -> the:Pid.t -> unit
36+
(** *)
37+
38+
exception Terminate
39+
(** *)
40+
41+
val link : Pid.t -> Pid.t -> unit
42+
(** *)

0 commit comments

Comments
 (0)