Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor Ref to Symbol #37

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ _build
*.beam
*.trace
test/generated/*
_opam/*
8 changes: 4 additions & 4 deletions riot/lib/gen_server.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ open Global
type 'res req = ..

type Message.t +=
| Call : Pid.t * 'res Ref.t * 'res req -> Message.t
| Reply : 'res Ref.t * 'res -> Message.t
| Call : Pid.t * 'res Symbol.t * 'res req -> Message.t
| Reply : 'res Symbol.t * 'res -> Message.t

type 'state init_result = Ok of 'state | Error | Ignore

Expand All @@ -22,11 +22,11 @@ type ('args, 'state) impl =

let call : type res. Pid.t -> res req -> res =
fun pid req ->
let ref = Ref.make () in
let ref = Symbol.make () in
send pid (Call (self (), ref, req));
match receive () with
| Reply (ref', res) -> (
match Ref.type_equal ref ref' with
match Symbol.type_equal ref ref' with
| Some Type.Equal -> res
| None -> failwith "bad message")
| _ -> failwith "unexpected message"
Expand Down
2 changes: 1 addition & 1 deletion riot/lib/lib.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Pid = Pid
module Process = Process
module Queue = Queue
module Hashmap = Hashmap
module Ref = Ref
module Symbol = Symbol
module SSL = Ssl
module Supervisor = Supervisor
module Task = Task
Expand Down
1 change: 0 additions & 1 deletion riot/lib/ref.ml

This file was deleted.

8 changes: 4 additions & 4 deletions riot/lib/supervisor.ml
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ let restart_child pid state =
`continue { state with children })

type Message.t +=
| List_children_req : { reply : Pid.t; ref : unit Ref.t } -> Message.t
| List_children_res : { children : Pid.t list; ref : unit Ref.t } -> Message.t
| List_children_req : { reply : Pid.t; ref : unit Symbol.t } -> Message.t
| List_children_res : { children : Pid.t list; ref : unit Symbol.t } -> Message.t

let rec loop state =
Log.debug (fun f -> f "supervisor loop");
Expand Down Expand Up @@ -105,11 +105,11 @@ let start_link ?(strategy = One_for_one) ?(restart_limit = 1)
Ok sup_pid

let children pid =
let ref = Ref.make () in
let ref = Symbol.make () in
send pid (List_children_req { reply = self (); ref });
let rec wait_response () =
match receive ~ref () with
| List_children_res { children; ref = ref' } when Ref.equal ref ref' ->
| List_children_res { children; ref = ref' } when Symbol.equal ref ref' ->
children
| _ -> wait_response ()
in
Expand Down
1 change: 1 addition & 0 deletions riot/lib/symbol.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include Runtime.Core.Symbol
10 changes: 5 additions & 5 deletions riot/lib/task.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ module Logger = Logger.Make (struct
let namespace = [ "riot"; "task" ]
end)

type 'a t = { pid : Pid.t; ref : 'a Ref.t }
type Message.t += Reply : 'a Ref.t * 'a -> Message.t
type 'a t = { pid : Pid.t; ref : 'a Symbol.t }
type Message.t += Reply : 'a Symbol.t * 'a -> Message.t

let async fn =
let ref = Ref.make () in
let ref = Symbol.make () in
let this = self () in
let pid =
spawn (fun () ->
Expand Down Expand Up @@ -36,9 +36,9 @@ let rec await :
| exception Receive_timeout ->
Logger.trace (fun f -> f "task %a timeout" Pid.pp t.pid);
Error `Timeout
| Reply (ref', res) when Ref.equal t.ref ref' -> (
| Reply (ref', res) when Symbol.equal t.ref ref' -> (
Process.demonitor t.pid;
match Ref.type_equal t.ref ref' with
match Symbol.type_equal t.ref ref' with
| Some Type.Equal -> Ok res
| None -> failwith "bad message")
| Process.Messages.Monitor (Process_down pid) when Pid.equal pid t.pid ->
Expand Down
12 changes: 6 additions & 6 deletions riot/riot.mli
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ module Timeout : sig
type t = [ `infinity | `after of int64 ]
end

module Ref : sig
module Symbol : sig
type 'a t
(** A unique reference.

A value of `'a t` won't be created twice (but can be shared/copied),
which makes it ideal for coordination between processes.

Normally, you'd use a `'a Ref.t` to identify outgoing/incoming message
Normally, you'd use a `'a Symbol.t` to identify outgoing/incoming message
pairs, but they can also be used for type-equalities. If two refs of type
`'a Ref.t` and `'b Ref.t` are equal, then you can use `Ref.type_equal`
`'a Symbol.t` and `'b Symbol.t` are equal, then you can use `Symbol.type_equal`
to obtain a type-level witness that proves that `'a` and `'b` are equal.
*)

Expand Down Expand Up @@ -245,7 +245,7 @@ val wait_pids : Pid.t list -> unit

exception Receive_timeout

val receive : ?after:int64 -> ?ref:unit Ref.t -> unit -> Message.t
val receive : ?after:int64 -> ?ref:unit Symbol.t -> unit -> Message.t
(** [receive ()] will return the first message in the process mailbox.

This function will suspend a process that has an empty mailbox, and the
Expand All @@ -261,7 +261,7 @@ val receive : ?after:int64 -> ?ref:unit Ref.t -> unit -> Message.t
### Selective Receive

If a `ref` was passed, then `[receive ~ref ()]` will skip all messages
created before the creation of this `Ref.t` value, and will only return
created before the creation of this `Symbol.t` value, and will only return
newer messages.

This is useful to skip the queue, but not remove any of the messages before
Expand Down Expand Up @@ -301,7 +301,7 @@ module Gen_server : sig

When defining a new generic server you want to extend this with the your
custom request types, including the response type in its type variable.
Like this:
Like this:

{@ocaml[
open Riot
Expand Down
4 changes: 2 additions & 2 deletions riot/runtime/core/message.ml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
type t = ..
type select_marker = Take | Skip | Drop
type envelope = { msg : t; uid : unit Ref.t }
type envelope = { msg : t; uid : unit Symbol.t }

let envelope msg = { uid = Ref.make (); msg }
let envelope msg = { uid = Symbol.make (); msg }
2 changes: 1 addition & 1 deletion riot/runtime/core/message.mli
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
type t = ..
type select_marker = Take | Skip | Drop
type envelope = { msg : t; uid : unit Ref.t }
type envelope = { msg : t; uid : unit Symbol.t }

val envelope : t -> envelope
2 changes: 1 addition & 1 deletion riot/runtime/core/proc_effect.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ open Util

type _ Effect.t +=
| Receive : {
ref : 'a Ref.t option;
ref : 'a Symbol.t option;
wonbyte marked this conversation as resolved.
Show resolved Hide resolved
timeout : Timeout.t;
}
-> Message.t Effect.t
Expand Down
2 changes: 1 addition & 1 deletion riot/runtime/core/process.ml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type t = {
links : Pid.t list Atomic.t;
monitors : unit Pid.Map.t;
ready_fds : Fd.t list Atomic.t;
recv_timeout : unit Ref.t option Atomic.t;
recv_timeout : unit Symbol.t option Atomic.t;
}
(** The process descriptor. *)

Expand Down
6 changes: 3 additions & 3 deletions riot/runtime/core/process.mli
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type t = {
links : Pid.t list Atomic.t;
monitors : unit Pid.Map.t;
ready_fds : Fd.t list Atomic.t;
recv_timeout : unit Ref.t option Atomic.t;
recv_timeout : unit Symbol.t option Atomic.t;
}

exception Process_reviving_is_forbidden of t
Expand Down Expand Up @@ -86,13 +86,13 @@ val pp_flags : Format.formatter -> process_flags -> unit
val pp_reason : Format.formatter -> exit_reason -> unit
val pp_state : Format.formatter -> state -> unit
val read_save_queue : t -> unit
val receive_timeout : t -> unit Ref.t option
val receive_timeout : t -> unit Symbol.t option
val remove_monitor : t -> Pid.t -> unit
val send_message : t -> Message.t -> unit
val set_cont : t -> exit_reason Proc_state.t -> unit
val set_flag : t -> process_flag -> unit
val set_ready_fds : t -> Fd.t list -> unit
val set_receive_timeout : t -> unit Ref.t -> unit
val set_receive_timeout : t -> unit Symbol.t -> unit
val should_awake : t -> bool
val sid : t -> Scheduler_uid.t
val state : t -> state
27 changes: 0 additions & 27 deletions riot/runtime/core/ref.ml

This file was deleted.

27 changes: 27 additions & 0 deletions riot/runtime/core/symbol.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
type 'a t = Symbol : int64 -> 'a t [@@unboxed]

let __current__ = Atomic.make 0L
let pp ppf (Symbol pid) = Format.fprintf ppf "#Symbol<%s>" (Int64.to_string pid)

let rec make () =
let last = Atomic.get __current__ in
let current = last |> Int64.succ in
if Atomic.compare_and_set __current__ last current then Symbol last else make ()

let equal (Symbol a) (Symbol b) = Int64.equal a b

let type_equal : type a b. a t -> b t -> (a, b) Type.eq option =
fun a b ->
match (a, b) with
| Symbol a', Symbol b' when Int64.equal a' b' -> Some (Obj.magic Type.Equal)
| _ -> None

let is_newer (Symbol a) (Symbol b) = Int64.compare a b = 1
let hash (Symbol a) = Int64.hash a

module Map = Util.Dashmap.Make (struct
type key = unit t

let hash = hash
let equal = equal
end)
File renamed without changes.
2 changes: 1 addition & 1 deletion riot/runtime/import.ml
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ let rec wait_pids pids =

module Timer = struct
type timeout = Util.Timeout.t
type timer = unit Ref.t
type timer = unit Symbol.t

let _set_timer pid msg time mode =
let sch = _get_sch () in
Expand Down
6 changes: 3 additions & 3 deletions riot/runtime/scheduler/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ module Scheduler = struct
if Scheduler_uid.equal sch.uid proc.sid then add_to_run_queue sch proc)
pool.schedulers

let handle_receive k pool sch (proc : Process.t) (ref : 'a Ref.t option)
let handle_receive k pool sch (proc : Process.t) (ref : 'a Symbol.t option)
timeout =
let open Proc_state in
(* When a timeout is specified, we want to create it in the timer
Expand Down Expand Up @@ -165,9 +165,9 @@ module Scheduler = struct
Any skipped messages will go in the same order as received into
the save queue, which will be read after the mailbox is depleted.
*)
| Some ref, Some msg when Ref.is_newer ref msg.uid ->
| Some ref, Some msg when Symbol.is_newer ref msg.uid ->
Log.trace (fun f ->
f "Skipping msg ref=%a msg.uid=%a" Ref.pp ref Ref.pp msg.uid);
f "Skipping msg ref=%a msg.uid=%a" Symbol.pp ref Symbol.pp msg.uid);
Process.add_to_save_queue proc msg;
go (fuel - 1)
(* we are special casing the process monitors here. if we receive a process down
Expand Down
4 changes: 2 additions & 2 deletions riot/runtime/scheduler/scheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ val set_current_process_pid : Pid.t -> unit
val get_random_scheduler : pool -> t

val set_timer :
t -> int64 -> [ `interval | `one_off ] -> (unit -> unit) -> unit Ref.t
t -> int64 -> [ `interval | `one_off ] -> (unit -> unit) -> unit Symbol.t

val remove_timer : t -> unit Ref.t -> unit
val remove_timer : t -> unit Symbol.t -> unit
val add_to_run_queue : t -> Process.t -> unit
val awake_process : pool -> Process.t -> unit
val run : pool -> t -> unit -> unit
Expand Down
9 changes: 5 additions & 4 deletions riot/runtime/time/timer_wheel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ open Util
(** A Timer in the Riot runtime. *)
module Timer = struct
type t = {
id : unit Ref.t;
id : unit Symbol.t;
mode : [ `interval | `one_off ];
mutable status : [ `pending | `finished ];
mutable started_at : Mtime.t;
Expand All @@ -43,16 +43,16 @@ module Timer = struct
let pp fmt t =
let mode = if t.mode = `interval then "interval" else "one_off" in
Format.fprintf fmt "Timer { id=%a; started_at=%a; ends_at=%a; mode=%s }"
Ref.pp t.id Mtime.pp t.started_at Mtime.Span.pp t.ends_at mode
Symbol.pp t.id Mtime.pp t.started_at Mtime.Span.pp t.ends_at mode

let make time mode fn =
let id = Ref.make () in
let id = Symbol.make () in
let started_at = Mtime_clock.now () in
let ends_at = Mtime.Span.of_uint64_ns Int64.(mul 1_000L time) in
let timeouts_at = Mtime.add_span started_at ends_at |> Option.get in
{ id; started_at; ends_at; timeouts_at; fn; mode; status = `pending }

let equal a b = Ref.equal a.id b.id
let equal a b = Symbol.equal a.id b.id
let is_finished t = t.status = `finished

let mark_as_cancelled t =
Expand All @@ -79,6 +79,7 @@ let create () =

let can_tick t = TimeHeap.size t.timers > 0


let is_finished t tid =
match Ref.Map.get t.ids tid with
| None -> true
Expand Down
8 changes: 4 additions & 4 deletions riot/runtime/time/timer_wheel.mli
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ end
type t

val create : unit -> t
val is_finished : t -> unit Ref.t -> bool
val remove_timer : t -> unit Ref.t -> unit
val is_finished : t -> unit Symbol.t -> bool
val remove_timer : t -> unit Symbol.t -> unit

val make_timer :
t -> int64 -> [ `interval | `one_off ] -> (unit -> unit) -> unit Ref.t
t -> int64 -> [ `interval | `one_off ] -> (unit -> unit) -> unit Symbol.t

val clear_timer : t -> unit Ref.t -> unit
val clear_timer : t -> unit Symbol.t -> unit
val tick : t -> unit
val can_tick : t -> bool
2 changes: 1 addition & 1 deletion test/selective_receive_test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ let main () =
(* we will wait so the first message from the process gets sent *)
sleep 0.1;

let ref = Ref.make () in
let ref = Symbol.make () in
send pid1 Continue;

let m1 = receive ~ref ~after:50_000L () in
Expand Down