Skip to content

Commit

Permalink
Provide semaphores in the threading library (#9930)
Browse files Browse the repository at this point in the history
This commit adds a new thread-related module Semaphore, implementing
counting semaphores and binary semaphores.

The two kinds of semaphores are presented as two different
abstract types in two sub-modules, Counting and Binary.
  • Loading branch information
xavierleroy committed Oct 12, 2020
1 parent f809e9d commit 426b10c
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 4 deletions.
5 changes: 5 additions & 0 deletions Changes
Expand Up @@ -300,6 +300,11 @@ Working version
(Ivan Gotovchits and Xavier Leroy, review by Sébastien Hinderer and
David Allsopp)

- #9930: new module Semaphore in the thread library, implementing
counting semaphores and binary semaphores
(Xavier Leroy, review by Daniel Bünzli and Damien Doligez,
additional suggestions by Stephen Dolan and Craig Ferguson)

- #9958: Raise exception in case of error in Unix.setsid.
(Nicolás Ojeda Bär, review by Stephen Dolan)

Expand Down
2 changes: 1 addition & 1 deletion manual/manual/library/Makefile
Expand Up @@ -30,7 +30,7 @@ COMPILER_LIBS_INTF = Asthelper.tex Astmapper.tex Asttypes.tex \
$(COMPILER_LIBS_PLUGIN_HOOKS)

OTHERLIB_INTF = Unix.tex UnixLabels.tex Str.tex \
Thread.tex Mutex.tex Condition.tex Event.tex \
Thread.tex Mutex.tex Condition.tex Semaphore.tex Event.tex \
Dynlink.tex Bigarray.tex

INTF = $(CORE_INTF) $(STDLIB_INTF) $(COMPILER_LIBS_INTF) $(OTHERLIB_INTF)
Expand Down
2 changes: 2 additions & 0 deletions manual/manual/library/libthreads.etex
Expand Up @@ -31,11 +31,13 @@ the "-I +threads" option (see chapter~\ref{c:camlc}).
\item \ahref{libref/Thread.html}{Module \texttt{Thread}: lightweight threads}
\item \ahref{libref/Mutex.html}{Module \texttt{Mutex}: locks for mutual exclusion}
\item \ahref{libref/Condition.html}{Module \texttt{Condition}: condition variables to synchronize between threads}
\item \ahref{libref/Semaphore.html}{Module \texttt{Semaphore}: semaphores, another thread synchronization mechanism}
\item \ahref{libref/Event.html}{Module \texttt{Event}: first-class synchronous communication}
\end{links}
\else
\input{Thread.tex}
\input{Mutex.tex}
\input{Condition.tex}
\input{Semaphore.tex}
\input{Event.tex}
\fi
2 changes: 1 addition & 1 deletion ocamldoc/Makefile.docfiles
Expand Up @@ -23,7 +23,7 @@ STR_MLIS = $(addprefix $(SRC)/otherlibs/str/, str.mli)
UNIX_MLIS = $(addprefix $(SRC)/otherlibs/unix/, unix.mli unixLabels.mli)
DYNLINK_MLIS = $(addprefix $(SRC)/otherlibs/dynlink/, dynlink.mli)
THREAD_MLIS = $(addprefix $(SRC)/otherlibs/systhreads/, \
thread.mli condition.mli mutex.mli event.mli threadUnix.mli)
thread.mli condition.mli mutex.mli event.mli semaphore.mli threadUnix.mli)
DRIVER_MLIS = $(SRC)/driver/pparse.mli


Expand Down
9 changes: 9 additions & 0 deletions otherlibs/systhreads/.depend
Expand Up @@ -20,6 +20,15 @@ mutex.cmo : \
mutex.cmx : \
mutex.cmi
mutex.cmi :
semaphore.cmo : \
mutex.cmi \
condition.cmi \
semaphore.cmi
semaphore.cmx : \
mutex.cmx \
condition.cmx \
semaphore.cmi
semaphore.cmi :
thread.cmo : \
thread.cmi
thread.cmx : \
Expand Down
7 changes: 5 additions & 2 deletions otherlibs/systhreads/Makefile
Expand Up @@ -49,12 +49,15 @@ LIBNAME=threads
BYTECODE_C_OBJS=st_stubs.b.$(O)
NATIVECODE_C_OBJS=st_stubs.n.$(O)

THREADS_SOURCES = thread.ml mutex.ml condition.ml event.ml threadUnix.ml
THREADS_SOURCES = thread.ml mutex.ml condition.ml event.ml threadUnix.ml \
semaphore.ml

THREADS_BCOBJS = $(THREADS_SOURCES:.ml=.cmo)
THREADS_NCOBJS = $(THREADS_SOURCES:.ml=.cmx)

MLIFILES=thread.mli mutex.mli condition.mli event.mli threadUnix.mli
MLIFILES=thread.mli mutex.mli condition.mli event.mli threadUnix.mli \
semaphore.mli

CMIFILES=$(MLIFILES:.mli=.cmi)

all: lib$(LIBNAME).$(A) $(LIBNAME).cma $(CMIFILES)
Expand Down
86 changes: 86 additions & 0 deletions otherlibs/systhreads/semaphore.ml
@@ -0,0 +1,86 @@
(**************************************************************************)
(* *)
(* OCaml *)
(* *)
(* Xavier Leroy, Collège de France and INRIA Paris *)
(* *)
(* Copyright 2020 Institut National de Recherche en Informatique et *)
(* en Automatique. *)
(* *)
(* All rights reserved. This file is distributed under the terms of *)
(* the GNU Lesser General Public License version 2.1, with the *)
(* special exception on linking described in the file LICENSE. *)
(* *)
(**************************************************************************)

(** Semaphores *)

type sem = {
mut: Mutex.t; (* protects [v] *)
mutable v: int; (* the current value *)
nonzero: Condition.t (* signaled when [v > 0] *)
}

module Counting = struct

type t = sem

let make v =
if v < 0 then invalid_arg "Semaphore.Counting.init: wrong initial value";
{ mut = Mutex.create(); v; nonzero = Condition.create() }

let release s =
Mutex.lock s.mut;
if s.v < max_int then begin
s.v <- s.v + 1;
Condition.signal s.nonzero;
Mutex.unlock s.mut
end else begin
Mutex.unlock s.mut;
raise (Sys_error "Semaphore.Counting.release: overflow")
end

let acquire s =
Mutex.lock s.mut;
while s.v = 0 do Condition.wait s.nonzero s.mut done;
s.v <- s.v - 1;
Mutex.unlock s.mut

let try_acquire s =
Mutex.lock s.mut;
let ret = if s.v = 0 then false else (s.v <- s.v - 1; true) in
Mutex.unlock s.mut;
ret

let get_value s = s.v

end

module Binary = struct

type t = sem

let make b =
{ mut = Mutex.create();
v = if b then 1 else 0;
nonzero = Condition.create() }

let release s =
Mutex.lock s.mut;
s.v <- 1;
Condition.signal s.nonzero;
Mutex.unlock s.mut

let acquire s =
Mutex.lock s.mut;
while s.v = 0 do Condition.wait s.nonzero s.mut done;
s.v <- 0;
Mutex.unlock s.mut

let try_acquire s =
Mutex.lock s.mut;
let ret = if s.v = 0 then false else (s.v <- 0; true) in
Mutex.unlock s.mut;
ret

end
140 changes: 140 additions & 0 deletions otherlibs/systhreads/semaphore.mli
@@ -0,0 +1,140 @@
(**************************************************************************)
(* *)
(* OCaml *)
(* *)
(* Xavier Leroy, Collège de France and INRIA Paris *)
(* *)
(* Copyright 2020 Institut National de Recherche en Informatique et *)
(* en Automatique. *)
(* *)
(* All rights reserved. This file is distributed under the terms of *)
(* the GNU Lesser General Public License version 2.1, with the *)
(* special exception on linking described in the file LICENSE. *)
(* *)
(**************************************************************************)

(** Semaphores
A semaphore is a thread synchronization device that can be used to
control access to a shared resource.
Two flavors of semaphores are provided: counting semaphores and
binary semaphores.
@since 4.12 *)

(** {2 Counting semaphores} *)

(**
A counting semaphore is a counter that can be accessed concurrently
by several threads. The typical use is to synchronize producers and
consumers of a resource by counting how many units of the resource
are available.
The two basic operations on semaphores are:
- "release" (also called "V", "post", "up", and "signal"), which
increments the value of the counter. This corresponds to producing
one more unit of the shared resource and making it available to others.
- "acquire" (also called "P", "wait", "down", and "pend"), which
waits until the counter is greater than zero and decrements it.
This corresponds to consuming one unit of the shared resource.
@since 4.12 *)

module Counting : sig

type t
(** The type of counting semaphores. *)

val make : int -> t
(** [make n] returns a new counting semaphore, with initial value [n].
The initial value [n] must be nonnegative.
@raise Invalid_argument if [n < 0]
*)

val release : t -> unit
(** [release s] increments the value of semaphore [s].
If other threads are waiting on [s], one of them is restarted.
If the current value of [s] is equal to [max_int], the value of
the semaphore is unchanged and a [Sys_error] exception is raised
to signal overflow.
@raise Sys_error if the value of the semaphore would overflow [max_int]
*)

val acquire : t -> unit
(** [acquire s] blocks the calling thread until the value of semaphore [s]
is not zero, then atomically decrements the value of [s] and returns.
*)

val try_acquire : t -> bool
(** [try_acquire s] immediately returns [false] if the value of semaphore [s]
is zero. Otherwise, the value of [s] is atomically decremented
and [try_acquire s] returns [true].
*)

val get_value : t -> int
(** [get_value s] returns the current value of semaphore [s].
The current value can be modified at any time by concurrent
{!release} and {!acquire} operations. Hence, the [get_value]
operation is racy, and its result should only be used for debugging
or informational messages.
*)

end

(** {2 Binary semaphores} *)

(** Binary semaphores are a variant of counting semaphores
where semaphores can only take two values, 0 and 1.
A binary semaphore can be used to control access to a single
shared resource, with value 1 meaning "resource is available" and
value 0 meaning "resource is unavailable".
The "release" operation of a binary semaphore sets its value to 1,
and "acquire" waits until the value is 1 and sets it to 0.
A binary semaphore can be used instead of a mutex (see module
{!Mutex}) when the mutex discipline (of unlocking the mutex from the
thread that locked it) is too restrictive. The "acquire" operation
corresponds to locking the mutex, and the "release" operation to
unlocking it, but "release" can be performed in a thread different
than the one that performed the "acquire". Likewise, it is safe
to release a binary semaphore that is already available.
@since 4.12
*)

module Binary : sig

type t
(** The type of binary semaphores. *)

val make : bool -> t
(** [make b] returns a new binary semaphore.
If [b] is [true], the initial value of the semaphore is 1, meaning
"available". If [b] is [false], the initial value of the
semaphore is 0, meaning "unavailable".
*)

val release : t -> unit
(** [release s] sets the value of semaphore [s] to 1, putting it in the
"available" state. If other threads are waiting on [s], one of them is
restarted.
*)

val acquire : t -> unit
(** [acquire s] blocks the calling thread until the semaphore [s]
has value 1 (is available), then atomically sets it to 0
and returns.
*)

val try_acquire : t -> bool
(** [try_acquire s] immediately returns [false] if the semaphore [s]
has value 0. If [s] has value 1, its value is atomically set to 0
and [try_acquire s] returns [true].
*)

end

0 comments on commit 426b10c

Please sign in to comment.