Skip to content

Commit

Permalink
Support disposal of Lwt_pool.t elements
Browse files Browse the repository at this point in the history
  • Loading branch information
hcarty committed Oct 14, 2017
1 parent af91fad commit 3f3068e
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 23 deletions.
35 changes: 27 additions & 8 deletions src/core/lwt_pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type 'a t = {
(* Check a member when its use failed. *)
validate : 'a -> bool Lwt.t;
(* Validate old pool members. *)
dispose : 'a -> unit Lwt.t;
(* Dispose of a pool member. *)
max : int;
(* Size of the pool. *)
mutable count : int;
Expand All @@ -44,11 +46,12 @@ type 'a t = {
(* Threads waiting for a member. *)
}

let create m ?(check = fun _ f -> f true) ?(validate = fun _ -> Lwt.return_true) create =
let create m ?(check = fun _ f -> f true) ?(validate = fun _ -> Lwt.return_true) ?(dispose = fun _ -> Lwt.return_unit) create =
{ max = m;
create = create;
validate = validate;
check = check;
dispose = dispose;
count = 0;
list = Queue.create ();
waiters = Lwt_sequence.create () }
Expand Down Expand Up @@ -104,10 +107,12 @@ let check_elt p c =
| false ->
(* Remove this member and create a new one. *)
p.count <- p.count - 1;
p.dispose c >>= fun () ->
create_member p)
(fun e ->
(* Validation failed: create a new member if at least one
thread is waiting. *)
p.dispose c >>= fun () ->
replace_acquired p;
Lwt.fail e)
Expand All @@ -127,12 +132,19 @@ let acquire p =
(* Release a member when its use failed. *)
let checked_release p c =
p.check c begin fun ok ->
if ok then
release p c
else
replace_acquired p
end
let ok = ref false in
p.check c (fun result -> ok := result);
if !ok then (
(* Element is ok - release it back to the pool *)
release p c;
Lwt.return_unit
)
else (
(* Element is not ok - dispose of it and replace with a new one *)
p.dispose c >>= fun () ->
replace_acquired p;
Lwt.return_unit
)
let use p f =
acquire p >>= fun c ->
Expand All @@ -143,5 +155,12 @@ let use p f =
release p c;
t)
(fun e ->
checked_release p c;
checked_release p c >>= fun () ->
Lwt.fail e)
let clear p =
Queue.fold (
fun promise element ->
promise >>= fun () ->
p.dispose element
) Lwt.return_unit p.list
38 changes: 26 additions & 12 deletions src/core/lwt_pool.mli
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,45 @@
The pool also provides a limit on the number of connections that can
simultaneously be open.
Note that pools are not for keeping Lwt threads. Lwt threads are very cheap
to create and are pure. It is neither desirable nor possible to reuse them.
If you want to have a pool of {e system} threads, consider module
Note that pools are not for keeping Lwt promises. Lwt promises are very
cheap to create. It is neither desirable nor possible to reuse them. If
you want to have a pool of {e system} threads, consider using
[Lwt_preemptive]. *)

type 'a t
(** Pools containing values of type ['a]. *)
(** Pools containing elements of type ['a]. *)

val create :
int ->
?check : ('a -> (bool -> unit) -> unit) ->
?validate : ('a -> bool Lwt.t) ->
?dispose : ('a -> unit Lwt.t) ->
(unit -> 'a Lwt.t) -> 'a t
(** [create n ?check ?validate f] creates a new pool with at most
[n] elements. [f] is the function to use to create a new element
(** [create n ?check ?validate ?dispose f] creates a new pool with at most
[n] elements. [f] is the function to use to create a new element.
Elements are created on demand.
An element of the pool is validated by the optional [validate]
function before its {!use}. Invalid elements are re-created.
An element of the pool is validated by the optional [validate] function
before it is accessed by {!use}. Invalid elements are passed to [dispose]
and then re-created with [f].
The optional function [check] is called after a [use] of an
element failed. It must call its argument exactly once with
[true] if the element is still valid and [false] otherwise. *)
If a call to {!use} fails with a pool element that element will be passed
to the optional function [check] as [check element callback]. [check]
must call [callback] exactly once with [true] if [element] is still valid
and [false] otherwise. If [check] calls [callback false] then [dispose]
will be run on [element].
Note that [dispose] is {b not} guaranteed to be called on the elements in
a pool when the pool is garbage collected. The {!clear} function should
be used if the elements of the pool need to be explicitly disposed of. *)

val use : 'a t -> ('a -> 'b Lwt.t) -> 'b Lwt.t
(** [use p f] takes one free element of the pool [p] and gives it to
the function [f]. The element is put back into the pool after the
thread created by [f] completes. *)
promise created by [f] completes. *)

val clear : 'a t -> unit Lwt.t
(** [clear p] will call the [dispose] function associated with [p] on every
element in [p] if [dispose] was defined. Otherwise it is a no-op.
Disposals are performed sequentially in an undefined order. *)
20 changes: 17 additions & 3 deletions test/core/test_lwt_pool.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* 02111-1307, USA.
*)

open Lwt.Infix
open Test

exception Dummy_error
Expand Down Expand Up @@ -100,15 +101,28 @@ let suite =
Lwt.return (Lwt.state u2 = Lwt.Return 2)
);

test "on check, bad elements are replaced"
test "on check, bad elements are disposed of and replaced"
(fun () ->
let gen = (fun () -> let n = ref 1 in Lwt.return n) in
let c = (fun n f -> f (!n > 0)) in
let p = Lwt_pool.create 1 ~check: c gen in
let disposed = ref false in
let d _ = disposed := true; Lwt.return_unit in
let p = Lwt_pool.create 1 ~check: c ~dispose:d gen in
let task = (fun n -> n := !n + 1; Lwt.return !n) in
let _ = Lwt_pool.use p (fun n -> n := 0; Lwt.fail Dummy_error) in
let u2 = Lwt_pool.use p task in
Lwt.return (Lwt.state u2 = Lwt.Return 2)
Lwt.return (Lwt.state u2 = Lwt.Return 2 && !disposed)
);

test "clear disposes of all elements"
(fun () ->
let gen = (fun () -> let n = ref 1 in Lwt.return n) in
let count = ref 0 in
let d _ = incr count; Lwt.return_unit in
let p = Lwt_pool.create 1 ~dispose:d gen in
let _ = Lwt_pool.use p (fun _ -> Lwt.return_unit) in
Lwt_pool.clear p >>= fun () ->
Lwt.return (!count = 1)
);

test "waiter are notified on replacement"
Expand Down

0 comments on commit 3f3068e

Please sign in to comment.