Skip to content

Commit

Permalink
Allow user-defined pools
Browse files Browse the repository at this point in the history
Add `Pool.of_fn` so that users can provide their own pools. This allows
waiting for resources from an external service before marking a job as
started.
  • Loading branch information
talex5 committed Jul 6, 2020
1 parent d56695b commit 31ee63d
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 91 deletions.
24 changes: 18 additions & 6 deletions lib/current.mli
Expand Up @@ -181,13 +181,22 @@ end
(** Resource pools, to control how many jobs can use a resource at a time.
To use a pool within a job, pass the pool to {!Job.start} or call {!Job.use_pool}. *)
module Pool : sig
type t
type 'a t

type priority = [ `High | `Low ]

val create : label:string -> int -> t
val create : label:string -> int -> unit t
(** [create ~label n] is a pool with [n] resources.
@param label Used for metric reporting and logging. *)

val of_fn :
label : string ->
(priority:priority -> switch:Switch.t -> 'a Lwt.t * (unit -> unit Lwt.t)) ->
'a t
(** [of_fn ~label f] is a pool that uses [f] to get a resource.
It should return a promise for the resource and a function that cancels
the request (if still queued). Return the resource to the pool when
[switch] is turned off. *)
end

(** Jobs with log files. This is mostly an internal interface - use {!Current_cache} instead. *)
Expand All @@ -207,12 +216,15 @@ module Job : sig
@param priority Passed to the pool when {!start} is called. Default is [`Low].
@param label A label to use in the job's filename (for debugging). *)

val start : ?timeout:Duration.t -> ?pool:Pool.t -> level:Level.t -> t -> unit Lwt.t
val start : ?timeout:Duration.t -> ?pool:unit Pool.t -> level:Level.t -> t -> unit Lwt.t
(** [start t ~level] marks [t] as running. This can only be called once per job.
If confirmation has been configured for [level], then this will wait for confirmation first.
@param timeout If given, the job will be cancelled automatically after this period of time.
@param pool If given, the job cannot start until a pool resource is available.
The resource is freed when the job finishes. *)
@param pool Deprecated. Use [start_with] instead. *)

val start_with : ?timeout:Duration.t -> pool:'a Pool.t -> level:Level.t -> t -> 'a Lwt.t
(** [start_with] is like [start] except that it waits for a resource from [pool].
The resource is freed when the job finishes. *)

val start_time : t -> float Lwt.t
(** [start_time t] is the time when [start] was called, or an
Expand Down Expand Up @@ -275,7 +287,7 @@ module Job : sig
val register_actions : job_id -> actions -> unit
(** [register_actions job_id actions] is used to register handlers for e.g. rebuilding jobs. *)

val use_pool : ?priority:Pool.priority -> switch:Switch.t -> t -> Pool.t -> unit Lwt.t
val use_pool : ?priority:Pool.priority -> switch:Switch.t -> t -> 'a Pool.t -> 'a Lwt.t
(** [use_pool ~switch t pool] gets one resource from [pool].
The resource is returned to the pool when the switch is turned off.
The operation will be aborted if the job is cancelled. *)
Expand Down
34 changes: 20 additions & 14 deletions lib/job.ml
Expand Up @@ -159,9 +159,14 @@ let with_handler t ~on_cancel fn =
Lwt.finalize fn (fun () -> Lwt_dllist.remove node; Lwt.return_unit)

let use_pool ?(priority=`Low) ~switch t pool =
Pool.get ~priority ~on_cancel:(on_cancel t) ~switch pool
let th, cancel = Pool.get ~priority ~switch pool in
on_cancel t (fun _ -> cancel ()) >>= fun () ->
th

let confirm t ?pool level =
let no_pool =
Pool.of_fn ~label:"no pool" (fun ~priority:_ ~switch:_ -> Lwt.return_unit, Lwt.return)

let confirm t ~pool level =
let confirmed =
let confirmed = Config.confirmed level t.config in
on_cancel t (fun _ -> Lwt.cancel confirmed; Lwt.return_unit) >>= fun () ->
Expand All @@ -183,24 +188,22 @@ let confirm t ?pool level =
Lwt.return_unit
in
confirmed >>= fun () ->
match pool with
| None -> Lwt.return_unit
| Some pool ->
let res = use_pool t ~priority:t.priority ~switch:t.switch pool in
if Lwt.is_sleeping res then (
log t "Waiting for resource in pool %a" Pool.pp pool;
res >|= fun () ->
log t "Got resource from pool %a" Pool.pp pool
) else res
let res = use_pool t ~priority:t.priority ~switch:t.switch pool in
if Lwt.is_sleeping res then (
log t "Waiting for resource in pool %a" Pool.pp pool;
res >|= fun r ->
log t "Got resource from pool %a" Pool.pp pool;
r
) else res

let pp_duration f d =
let d = Duration.to_f d in
if d > 120.0 then Fmt.pf f "%.1f minutes" (d /. 60.)
else if d > 2.0 then Fmt.pf f "%.1f seconds" d
else Fmt.pf f "%f seconds" d

let start ?timeout ?pool ~level t =
confirm t ?pool level >|= fun () ->
let start_with ?timeout ~pool ~level t =
confirm t ~pool level >|= fun r ->
if is_running t then (
Log.warn (fun f -> f "start called, but job %s is already running!" t.id);
Fmt.failwith "Job.start called twice!"
Expand All @@ -214,7 +217,10 @@ let start ?timeout ?pool ~level t =
| `Cancelled _ -> ()
| `Hooks _ -> cancel t (Fmt.strf "Timeout (%a)" pp_duration duration)
)
)
);
r

let start ?timeout ?(pool=no_pool) = start_with ?timeout ~pool

let start_time t = t.start_time

Expand Down
128 changes: 73 additions & 55 deletions lib/pool.ml
Expand Up @@ -32,66 +32,84 @@ end

type priority = [ `High | `Low ]

type t = {
label : string;
mutable used : int;
capacity : int;
queue_low : [`Use | `Cancel] Lwt.u Lwt_dllist.t;
queue_high : [`Use | `Cancel] Lwt.u Lwt_dllist.t;
type 'a t = {
name : string;
get : priority:priority -> switch:Switch.t -> 'a Lwt.t * (unit -> unit Lwt.t);
}

let create ~label capacity =
Prometheus.Gauge.set (Metrics.capacity label) (float_of_int capacity);
{ label; used = 0; capacity;
queue_low = Lwt_dllist.create ();
queue_high = Lwt_dllist.create ()
module Local = struct
type t = {
label : string;
mutable used : int;
capacity : int;
queue_low : [`Use | `Cancel] Lwt.u Lwt_dllist.t;
queue_high : [`Use | `Cancel] Lwt.u Lwt_dllist.t;
}

let check t =
if t.used < t.capacity then (
let next =
match Lwt_dllist.take_opt_l t.queue_high with
| None -> Lwt_dllist.take_opt_l t.queue_low
| Some _ as x -> x
let check t =
if t.used < t.capacity then (
let next =
match Lwt_dllist.take_opt_l t.queue_high with
| None -> Lwt_dllist.take_opt_l t.queue_low
| Some _ as x -> x
in
next |> Option.iter @@ fun waiter ->
t.used <- t.used + 1;
Lwt.wakeup_later waiter `Use
)

let get t ~priority ~switch =
let ready, set_ready = Lwt.wait () in
let queue =
match priority with
| `High -> t.queue_high
| `Low -> t.queue_low
in
next |> Option.iter @@ fun waiter ->
t.used <- t.used + 1;
Lwt.wakeup_later waiter `Use
)

let get ~priority ~on_cancel ~switch t =
let ready, set_ready = Lwt.wait () in
let queue =
match priority with
| `High -> t.queue_high
| `Low -> t.queue_low
in
let node = Lwt_dllist.add_r set_ready queue in
on_cancel (fun _ ->
Lwt_dllist.remove node;
if Lwt.is_sleeping ready then Lwt.wakeup_later set_ready `Cancel;
Lwt.return_unit
) >>= fun () ->
check t;
let start_wait = Unix.gettimeofday () in
Prometheus.Gauge.inc_one (Metrics.qlen t.label);
ready >|= fun ready ->
Prometheus.Gauge.dec_one (Metrics.qlen t.label);
match ready with
| `Cancel -> Fmt.failwith "Cancelled waiting for resource from pool %S" t.label
| `Use ->
let stop_wait = Unix.gettimeofday () in
Prometheus.Summary.observe (Metrics.wait_time t.label) (stop_wait -. start_wait);
Prometheus.Gauge.inc_one (Metrics.resources_in_use t.label);
Switch.add_hook_or_fail switch (fun _reason ->
assert (t.used > 0);
Prometheus.Gauge.dec_one (Metrics.resources_in_use t.label);
t.used <- t.used - 1;
let release_time = Unix.gettimeofday () in
Prometheus.Summary.observe (Metrics.use_time t.label) (release_time -. stop_wait);
check t;
let node = Lwt_dllist.add_r set_ready queue in
let cancel () =
Lwt_dllist.remove node;
if Lwt.is_sleeping ready then Lwt.wakeup_later set_ready `Cancel;
Lwt.return_unit
)
in
check t;
let start_wait = Unix.gettimeofday () in
Prometheus.Gauge.inc_one (Metrics.qlen t.label);
let th =
Switch.add_hook_or_exec switch cancel >>= fun () ->
ready >|= fun ready ->
Prometheus.Gauge.dec_one (Metrics.qlen t.label);
match ready with
| `Cancel -> Fmt.failwith "Cancelled waiting for resource from pool %S" t.label
| `Use ->
let stop_wait = Unix.gettimeofday () in
Prometheus.Summary.observe (Metrics.wait_time t.label) (stop_wait -. start_wait);
Prometheus.Gauge.inc_one (Metrics.resources_in_use t.label);
Switch.add_hook_or_fail switch (fun _reason ->
assert (t.used > 0);
Prometheus.Gauge.dec_one (Metrics.resources_in_use t.label);
t.used <- t.used - 1;
let release_time = Unix.gettimeofday () in
Prometheus.Summary.observe (Metrics.use_time t.label) (release_time -. stop_wait);
check t;
Lwt.return_unit
)
in
th, cancel

let create ~label capacity =
Prometheus.Gauge.set (Metrics.capacity label) (float_of_int capacity);
let t = { label; used = 0; capacity;
queue_low = Lwt_dllist.create ();
queue_high = Lwt_dllist.create ()
} in
{ name = label; get = get t }
end

let create = Local.create

let of_fn ~label get = { name = label; get }

let get t = t.get

let pp f t =
Fmt.string f t.label
Fmt.string f t.name
18 changes: 12 additions & 6 deletions lib/pool.mli
@@ -1,15 +1,21 @@
type t
type 'a t

type priority = [ `High | `Low ]

val create : label:string -> int -> t
val create : label:string -> int -> unit t

val of_fn :
label : string ->
(priority:priority -> switch:Switch.t -> 'a Lwt.t * (unit -> unit Lwt.t)) ->
'a t

val get :
'a t ->
priority:priority ->
on_cancel:((string -> unit Lwt.t) -> unit Lwt.t) ->
switch:Switch.t ->
t -> unit Lwt.t
(** [get ~priority ~on_cancel ~switch t] waits for a resource and then returns.
'a Lwt.t * (unit -> unit Lwt.t)
(** [get ~priority ~switch t] waits for a resource and then returns.
It also returns a function that can be used to cancel the request.
The resource will be returned to the pool when [switch] is turned off. *)

val pp : t Fmt.t
val pp : _ t Fmt.t
2 changes: 1 addition & 1 deletion plugins/docker/build.ml
Expand Up @@ -2,7 +2,7 @@ open Lwt.Infix

type t = {
pull : bool;
pool : Current.Pool.t option;
pool : unit Current.Pool.t option;
timeout : Duration.t option;
}

Expand Down
6 changes: 3 additions & 3 deletions plugins/docker/current_docker.mli
Expand Up @@ -31,22 +31,22 @@ module Raw : sig
?timeout:Duration.t ->
?squash:bool ->
?dockerfile:[`File of Fpath.t | `Contents of Dockerfile.t] ->
?pool:Current.Pool.t ->
?pool:unit Current.Pool.t ->
?build_args:string list ->
pull:bool ->
[ `Git of Current_git.Commit.t | `No_context ] ->
Image.t Current.Primitive.t

val run :
docker_context:string option ->
?pool:Current.Pool.t ->
?pool:unit Current.Pool.t ->
?run_args:string list ->
Image.t -> args:string list ->
unit Current.Primitive.t

val pread :
docker_context:string option ->
?pool:Current.Pool.t ->
?pool:unit Current.Pool.t ->
?run_args:string list ->
Image.t -> args:string list ->
string Current.Primitive.t
Expand Down
2 changes: 1 addition & 1 deletion plugins/docker/pread.ml
@@ -1,7 +1,7 @@
open Lwt.Infix

type t = {
pool : Current.Pool.t option;
pool : unit Current.Pool.t option;
}

let id = "docker-pread"
Expand Down
2 changes: 1 addition & 1 deletion plugins/docker/run.ml
@@ -1,7 +1,7 @@
open Lwt.Infix

type t = {
pool : Current.Pool.t option;
pool : unit Current.Pool.t option;
}

let id = "docker-run"
Expand Down
6 changes: 3 additions & 3 deletions plugins/docker/s.ml
Expand Up @@ -27,7 +27,7 @@ module type DOCKER = sig
?squash:bool ->
?label:string ->
?dockerfile:[`File of Fpath.t | `Contents of Dockerfile.t] Current.t ->
?pool:Current.Pool.t ->
?pool:unit Current.Pool.t ->
?build_args:string list ->
pull:bool ->
source ->
Expand All @@ -41,7 +41,7 @@ module type DOCKER = sig

val run :
?label:string ->
?pool:Current.Pool.t ->
?pool:unit Current.Pool.t ->
?run_args:string list ->
Image.t Current.t -> args:string list ->
unit Current.t
Expand All @@ -51,7 +51,7 @@ module type DOCKER = sig

val pread :
?label:string ->
?pool:Current.Pool.t ->
?pool:unit Current.Pool.t ->
?run_args:string list ->
Image.t Current.t -> args:string list ->
string Current.t
Expand Down
2 changes: 1 addition & 1 deletion plugins/git/current_git.mli
Expand Up @@ -45,7 +45,7 @@ val clone : schedule:Current_cache.Schedule.t -> ?gref:string -> string -> Commi
val fetch : Commit_id.t Current.t -> Commit.t Current.t

val with_checkout :
?pool:Current.Pool.t ->
?pool:unit Current.Pool.t ->
job:Current.Job.t ->
Commit.t ->
(Fpath.t -> 'a Current.or_error Lwt.t) ->
Expand Down

0 comments on commit 31ee63d

Please sign in to comment.