Skip to content

Commit

Permalink
Async delete
Browse files Browse the repository at this point in the history
  • Loading branch information
mtelvers committed Feb 27, 2024
1 parent 912ecfd commit c4846bf
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 145 deletions.
25 changes: 8 additions & 17 deletions bin/worker.ml
Expand Up @@ -45,7 +45,7 @@ let update_docker () =
let update_normal () =
Lwt.return (fun () -> Lwt.return_unit)

let main ?style_renderer level ?formatter registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_item_threshold obuilder_prune_limit state_dir obuilder additional_metrics =
let main ?style_renderer level ?formatter registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_limit state_dir obuilder additional_metrics =
setup_log ?style_renderer ?formatter level;
let update =
if Sys.file_exists "/.dockerenv" then update_docker
Expand All @@ -57,16 +57,16 @@ let main ?style_renderer level ?formatter registration_path capacity name allow_
Lwt_main.run begin
let vat = Capnp_rpc_unix.client_only_vat () in
let sr = Capnp_rpc_unix.Cap_file.load vat registration_path |> or_die in
Cluster_worker.run ~capacity ~name ~allow_push ~healthcheck_period ?prune_threshold ?docker_max_df_size ?obuilder_prune_threshold ?obuilder_prune_item_threshold ~obuilder_prune_limit ?obuilder ~additional_metrics ~state_dir ~update sr
Cluster_worker.run ~capacity ~name ~allow_push ~healthcheck_period ?prune_threshold ?docker_max_df_size ~obuilder_prune_threshold ~obuilder_prune_limit ?obuilder ~additional_metrics ~state_dir ~update sr
end

(* Command-line parsing *)
let main ~install (style_renderer, args1) (level, args2) ((registration_path, capacity, name, allow_push, healthcheck_period, prune_threshold, docker_max_df_size, obuilder_prune_threshold, obuilder_prune_item_threshold, obuilder_prune_limit, state_dir, obuilder, additional_metrics), args3) =
let main ~install (style_renderer, args1) (level, args2) ((registration_path, capacity, name, allow_push, healthcheck_period, prune_threshold, docker_max_df_size, obuilder_prune_threshold, obuilder_prune_limit, state_dir, obuilder, additional_metrics), args3) =
if install then
Ok (Winsvc_wrapper.install name "OCluster Worker" "Run a build worker" (args1 @ args2 @ args3))
else
Ok (Winsvc_wrapper.run name state_dir (fun ?formatter () ->
main ?style_renderer level ?formatter registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_item_threshold obuilder_prune_limit state_dir obuilder additional_metrics))
main ?style_renderer level ?formatter registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_limit state_dir obuilder additional_metrics))


open Cmdliner
Expand Down Expand Up @@ -123,22 +123,13 @@ let docker_max_df_size =

let obuilder_prune_threshold =
Arg.value @@
Arg.opt Arg.(some float) None @@
Arg.opt Arg.float 30.0 @@
Arg.info
~doc:"If using OBuilder, this threshold is used to prune the stored builds if the free space falls below this (0-100)."
~docv:"PERCENTAGE"
~docs:"OBUILDER"
["obuilder-prune-threshold"]

let obuilder_prune_item_threshold =
Arg.value @@
Arg.opt Arg.(some int64) None @@
Arg.info
~doc:"If using OBuilder, this threshold is used to prune the stored builds if the number of cached steps exceeds this value."
~docv:"ITEMS"
~docs:"OBUILDER"
["obuilder-prune-item-threshold"]

let obuilder_prune_limit =
Arg.value @@
Arg.opt Arg.int 100 @@
Expand Down Expand Up @@ -202,11 +193,11 @@ module Obuilder_config = struct
end

let worker_opts_t =
let worker_opts registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_item_threshold obuilder_prune_limit state_dir obuilder additional_metrics =
(registration_path, capacity, name, allow_push, healthcheck_period, prune_threshold, docker_max_df_size, obuilder_prune_threshold, obuilder_prune_item_threshold, obuilder_prune_limit, state_dir, obuilder, additional_metrics) in
let worker_opts registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_limit state_dir obuilder additional_metrics =
(registration_path, capacity, name, allow_push, healthcheck_period, prune_threshold, docker_max_df_size, obuilder_prune_threshold, obuilder_prune_limit, state_dir, obuilder, additional_metrics) in
Term.(with_used_args
(const worker_opts $ connect_addr $ capacity $ worker_name $ allow_push $ healthcheck_period
$ prune_threshold $ docker_max_df_size $ obuilder_prune_threshold $ obuilder_prune_item_threshold $ obuilder_prune_limit $ state_dir $ Obuilder_config.v $ additional_metrics))
$ prune_threshold $ docker_max_df_size $ obuilder_prune_threshold $ obuilder_prune_limit $ state_dir $ Obuilder_config.v $ additional_metrics))

let cmd ~install =
let doc = "Run a build worker" in
Expand Down
2 changes: 1 addition & 1 deletion obuilder
4 changes: 2 additions & 2 deletions test/mock_builder.ml
Expand Up @@ -90,14 +90,14 @@ let update () =
Lwt.return (fun () -> failwith "Mock restart")

let run ?(capacity=1) ?(name="worker-1") ~switch t registration_service =
let thread = Cluster_worker.run ~switch ~capacity ~name ~build:(build t) ~healthcheck_period:600.0 ~update ~state_dir registration_service in
let thread = Cluster_worker.run ~switch ~capacity ~name ~build:(build t) ~update ~state_dir registration_service in
Lwt.on_failure thread
(fun ex -> if Lwt_switch.is_on switch then raise ex)

let run_remote ~builder_switch ~network_switch ?(capacity=1) ?(name="worker-1") t registration_service =
let thread =
let registration_service = Mock_network.remote ~switch:network_switch registration_service in
Cluster_worker.run ~switch:builder_switch ~capacity ~name ~build:(build t) ~healthcheck_period:600.0 ~update ~state_dir registration_service
Cluster_worker.run ~switch:builder_switch ~capacity ~name ~build:(build t) ~update ~state_dir registration_service
in
Lwt.on_failure thread
(fun ex ->
Expand Down
63 changes: 15 additions & 48 deletions worker/cluster_worker.ml
Expand Up @@ -4,49 +4,6 @@ open Capnp_rpc_lwt
module Log_data = Log_data
module Process = Process

module Metrics = struct
open Prometheus

let namespace = "ocluster"
let subsystem = "worker"

let jobs_accepted =
let help = "Number of jobs accepted in total" in
Counter.v ~help ~namespace ~subsystem "jobs_accepted_total"

let job_time =
let help = "Time jobs ran for" in
Summary.v_label ~label_name:"result" ~help ~namespace ~subsystem "job_time_seconds"

let docker_push_time =
let help = "Time uploading to Docker Hub" in
Summary.v ~help ~namespace ~subsystem "docker_push_time_seconds"

let docker_prune_time =
let help = "Time spent pruning Docker cache" in
Summary.v ~help ~namespace ~subsystem "docker_prune_time_seconds"

let running_jobs =
let help = "Number of jobs currently running" in
Gauge.v ~help ~namespace ~subsystem "running_jobs"

let healthcheck_time =
let help = "Time to perform last healthcheck" in
Gauge.v ~help ~namespace ~subsystem "healthcheck_time_seconds"

let unhealthy =
let help = "Number of unhealthy workers" in
Gauge.v ~help ~namespace ~subsystem "unhealthy"

let cache_hits =
let help = "Number of OBuilder cache hits" in
Gauge.v ~help ~namespace ~subsystem "cache_hits"

let cache_misses =
let help = "Number of OBuilder cache misses" in
Gauge.v ~help ~namespace ~subsystem "cache_misses"
end

let buildkit_env =
let orig = Unix.environment () |> Array.to_list in
"DOCKER_BUILDKIT=1" :: orig |> Array.of_list
Expand Down Expand Up @@ -310,15 +267,24 @@ let loop ~switch ?obuilder t queue =
check_health t ~last_healthcheck ~queue obuilder >>= fun () ->
let outcome, set_outcome = Lwt.wait () in
let log = Log_data.create () in
Log.info (fun f -> f "Requesting a new job…");
Log.info (fun f -> f "Requesting a new job… (%i running)" t.in_use);
let switch = Lwt_switch.create () in
let pop =
Capability.with_ref (Cluster_api.Job.local ~switch ~outcome ~stream_log_data:(Log_data.stream log)) @@ fun job ->
Cluster_api.Queue.pop queue job
in
t.cancel <- (fun () -> Lwt.cancel pop);
pop >>= fun request ->
t.in_use <- t.in_use + 1;
let module R = Cluster_api.Raw.Reader.JobDescr in
let cache_hint = R.cache_hint_get request in
let weights = [
(Str.regexp {|.*tezos.*|}, 2);
(Str.regexp {|.*octez.*|}, 3);
] in
let weight = List.fold_left (fun a (re, w) -> if Str.string_match re cache_hint 0 then w else a) 1 weights in
t.in_use <- t.in_use + weight;
Log.info (fun f -> f "Cache_hint %s" cache_hint);
Log.info (fun f -> f "Job weight %i" weight);
Prometheus.Gauge.set Metrics.running_jobs (float_of_int t.in_use);
Prometheus.Counter.inc_one Metrics.jobs_accepted;
Lwt.async (fun () ->
Expand All @@ -333,6 +299,7 @@ let loop ~switch ?obuilder t queue =
(fun (outcome, metric_label) ->
let t1 = Unix.gettimeofday () in
Prometheus.Summary.observe (Metrics.job_time metric_label) (t1 -. t0);
Log.info (fun f -> f "Build duration: %fs" (t1 -. t0));
Log_data.close log;
Lwt.wakeup set_outcome outcome;
Lwt.return_unit)
Expand All @@ -346,7 +313,7 @@ let loop ~switch ?obuilder t queue =
Lwt.return_unit)
)
(fun () ->
t.in_use <- t.in_use - 1;
t.in_use <- t.in_use - weight;
Prometheus.Gauge.set Metrics.running_jobs (float_of_int t.in_use);
let h, m = cache_stats obuilder in
Prometheus.Gauge.set Metrics.cache_hits (float_of_int h);
Expand Down Expand Up @@ -501,7 +468,7 @@ let self_update ~update t =
Lwt_result.fail (`Msg (Printexc.to_string ex))
)

let run ?switch ?build ?(allow_push=[]) ~healthcheck_period ?prune_threshold ?docker_max_df_size ?obuilder_prune_threshold ?obuilder_prune_item_threshold ?obuilder_prune_limit ?obuilder ?(additional_metrics=[]) ~update ~capacity ~name ~state_dir registration_service =
let run ?switch ?build ?(allow_push=[]) ?(healthcheck_period = 600.0) ?prune_threshold ?docker_max_df_size ?(obuilder_prune_threshold = 30.0) ?(obuilder_prune_limit = 100) ?obuilder ?(additional_metrics=[]) ~update ~capacity ~name ~state_dir registration_service =
begin match prune_threshold, docker_max_df_size with
| None, None -> Log.info (fun f -> f "Prune threshold not set and docker max df size is not. Will not check for low disk-space!")
| None, Some size -> Log.info (fun f -> f "Pruning docker whenever the memory used exceeds %3.2fGB" size)
Expand All @@ -510,7 +477,7 @@ let run ?switch ?build ?(allow_push=[]) ~healthcheck_period ?prune_threshold ?do
end;
begin match obuilder with
| None -> Lwt.return_none
| Some config -> Obuilder_build.create ?prune_threshold:obuilder_prune_threshold ?prune_item_threshold:obuilder_prune_item_threshold ?prune_limit:obuilder_prune_limit config >|= Option.some
| Some config -> Obuilder_build.create ~prune_threshold:obuilder_prune_threshold ~prune_limit:obuilder_prune_limit config >|= Option.some
end >>= fun obuilder ->
let build =
match build with
Expand Down
3 changes: 1 addition & 2 deletions worker/cluster_worker.mli
Expand Up @@ -21,11 +21,10 @@ val run :
?switch:Lwt_switch.t ->
?build:build ->
?allow_push:string list ->
healthcheck_period:float ->
?healthcheck_period:float ->
?prune_threshold:float ->
?docker_max_df_size:float ->
?obuilder_prune_threshold:float ->
?obuilder_prune_item_threshold:int64 ->
?obuilder_prune_limit:int ->
?obuilder:Obuilder_config.t ->
?additional_metrics:(string * Uri.t) list ->
Expand Down
2 changes: 1 addition & 1 deletion worker/dune
Expand Up @@ -13,4 +13,4 @@
(library
(name cluster_worker)
(public_name ocluster-worker)
(libraries ocluster-api digestif fpath logs capnp-rpc-lwt lwt.unix prometheus-app cohttp-lwt-unix obuilder extunix))
(libraries ocluster-api digestif fpath logs capnp-rpc-lwt lwt.unix prometheus-app cohttp-lwt-unix obuilder extunix str))
45 changes: 45 additions & 0 deletions worker/metrics.ml
@@ -0,0 +1,45 @@
open Prometheus

let namespace = "ocluster"
let subsystem = "worker"

let jobs_accepted =
let help = "Number of jobs accepted in total" in
Counter.v ~help ~namespace ~subsystem "jobs_accepted_total"

let job_time =
let help = "Time jobs ran for" in
Summary.v_label ~label_name:"result" ~help ~namespace ~subsystem "job_time_seconds"

let docker_push_time =
let help = "Time uploading to Docker Hub" in
Summary.v ~help ~namespace ~subsystem "docker_push_time_seconds"

let docker_prune_time =
let help = "Time spent pruning Docker cache" in
Summary.v ~help ~namespace ~subsystem "docker_prune_time_seconds"

let running_jobs =
let help = "Number of jobs currently running" in
Gauge.v ~help ~namespace ~subsystem "running_jobs"

let healthcheck_time =
let help = "Time to perform last healthcheck" in
Gauge.v ~help ~namespace ~subsystem "healthcheck_time_seconds"

let unhealthy =
let help = "Number of unhealthy workers" in
Gauge.v ~help ~namespace ~subsystem "unhealthy"

let cache_hits =
let help = "Number of OBuilder cache hits" in
Gauge.v ~help ~namespace ~subsystem "cache_hits"

let cache_misses =
let help = "Number of OBuilder cache misses" in
Gauge.v ~help ~namespace ~subsystem "cache_misses"

let obuilder_space_free =
let help = "OBuilder percentage of space free" in
Gauge.v ~help ~namespace ~subsystem "obuilder_space_free"

98 changes: 25 additions & 73 deletions worker/obuilder_build.ml
@@ -1,7 +1,5 @@
open Lwt.Infix

let prune_margin = 600.0 (* Don't prune anything used less than 10 minutes ago *)

type builder = Builder : (module Obuilder.BUILDER with type t = 'a) * 'a -> builder

module Config = struct
Expand All @@ -18,9 +16,6 @@ type t = {
builder : builder;
mutable pruning : bool;
cond : unit Lwt_condition.t; (* Fires when we finish pruning *)
prune_threshold : float option;
prune_item_threshold : int64 option; (* Threshold number of items to hold in obuilder store *)
prune_limit : int option; (* Number of items to prune from obuilder when threshold is reached *)
}

let ( / ) = Filename.concat
Expand All @@ -37,7 +32,7 @@ let log_to log_data tag msg =
| `Note -> Log_data.info log_data "\027[01;2m\027[01;35m%a %s\027[0m" pp_timestamp (Unix.gettimeofday ()) msg
| `Output -> Log_data.write log_data msg

let create ?prune_threshold ?prune_item_threshold ?prune_limit config =
let create ?(prune_threshold = 30.0) ?(prune_limit = 100) config =
let { Config.store; sandbox_config } = config in
store >>= fun (Obuilder.Store_spec.Store ((module Store), store)) ->
begin match sandbox_config with
Expand All @@ -58,79 +53,36 @@ let create ?prune_threshold ?prune_item_threshold ?prune_limit config =
| Error (`Msg m) -> Fmt.failwith "Initial OBuilder healthcheck failed: %s" m
| Ok () ->
Log.info (fun f -> f "OBuilder self-test passed");
let r =
{
builder = Builder ((module Builder), builder);
pruning = false;
prune_threshold;
prune_item_threshold;
prune_limit;
cond = Lwt_condition.create ();
}

(* Prune [t] until free space rises above [prune_threshold]
or number of items falls below [prune_item_threshold]. *)
let do_prune ~prune_threshold ~prune_item_threshold ~prune_limit t =
let Builder ((module Builder), builder) = t.builder in
let rec aux () =
let stop = Unix.gettimeofday () -. prune_margin |> Unix.gmtime in
Builder.prune builder ~before:stop prune_limit >>= fun n ->
Builder.df builder >>= fun free ->
let count = Builder.count builder in
Log.info (fun f -> f "OBuilder partition: %.0f%% free, %Li items after pruning %d items" free count n);
if free > prune_threshold && count < prune_item_threshold
then Lwt.return_unit (* Space problem is fixed! *)
else if n < prune_limit then (
Log.warn (fun f -> f "Out of space, but nothing left to prune! (will wait and then retry)");
Lwt_unix.sleep 600.0 >>= aux
) else (
(* Continue pruning *)
aux ()
)
in
aux ()

(* Check the free space and/or number of items in [t]'s store.
If less than [t.prune_threshold] or items > [t.prune_item_threshold], spawn a prune operation (if not already running).
If less than half that is remaining, also wait for it to finish.
Returns once there is enough free space to proceed. *)
let check_free_space t =
let prune_limit = Option.value t.prune_limit ~default:100 in
let prune_threshold = Option.value t.prune_threshold ~default:0. in
let prune_item_threshold = Option.value t.prune_item_threshold ~default:Int64.max_int in
if prune_threshold = 0. && prune_item_threshold = Int64.max_int then
Lwt.return_unit (* No limits have been set *)
else
let Builder ((module Builder), builder) = t.builder in
let rec aux () =
Builder.df builder >>= fun free ->
let count = Builder.count builder in
Log.info (fun f -> f "OBuilder partition: %.0f%% free, %Li items" free count);
(* If we're low on space, or over the threshold number of items spawn a pruning thread. *)
if ((prune_threshold > 0. && free < prune_threshold) ||
(prune_item_threshold < Int64.max_int && count > prune_item_threshold)) && not t.pruning then (
t.pruning <- true;
Lwt.async (fun () ->
Lwt.finalize
(fun () -> do_prune ~prune_threshold ~prune_item_threshold ~prune_limit t)
(fun () ->
Lwt.pause () >|= fun () ->
t.pruning <- false;
Lwt_condition.broadcast t.cond ()
)
);
);
if free < prune_threshold /. 2.0 then (
assert (t.pruning);
Log.info (fun f -> f "OBuilder space very low. Waiting for prune to finish…");
Lwt_condition.wait t.cond >>= aux
) else (
Lwt.return_unit
)
in
aux ()
} in
Lwt.async (fun () ->
let rec loop () =
Builder.df builder >>= fun free ->
let count = Builder.count builder in
Log.info (fun f -> f "OBuilder partition: %.0f%% free, %Li items" free count);
Prometheus.Gauge.set Metrics.obuilder_space_free free;
if free > prune_threshold then (
r.pruning <- false;
Lwt_condition.signal r.cond (); (* release one waiting process *)
Lwt_unix.sleep 30.0 >>= fun () -> loop ()
) else (
r.pruning <- true;
let stop = Unix.gettimeofday () |> Unix.gmtime in
Builder.prune builder ~before:stop prune_limit >>= fun n ->
Log.info (fun f -> f "Pruned %i items" n);
(if n = 0 then Lwt_unix.sleep 30.0
else Lwt.return_unit )>>= fun () -> loop ()
)
in loop ()
); r

let build t ~switch ~log ~spec ~src_dir ~secrets =
check_free_space t >>= fun () ->
(if t.pruning then Lwt_condition.wait t.cond
else Lwt.return ()) >>= fun () ->
let log = log_to log in
let context = Obuilder.Context.v ~switch ~log ~src_dir ~secrets () in
let Builder ((module Builder), builder) = t.builder in
Expand Down

0 comments on commit c4846bf

Please sign in to comment.