diff --git a/bin/worker.ml b/bin/worker.ml index d47c3a21..c5b57525 100644 --- a/bin/worker.ml +++ b/bin/worker.ml @@ -45,7 +45,7 @@ let update_docker () = let update_normal () = Lwt.return (fun () -> Lwt.return_unit) -let main ?style_renderer level ?formatter registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_item_threshold obuilder_prune_limit state_dir obuilder additional_metrics = +let main ?style_renderer level ?formatter registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_limit state_dir obuilder additional_metrics = setup_log ?style_renderer ?formatter level; let update = if Sys.file_exists "/.dockerenv" then update_docker @@ -57,16 +57,16 @@ let main ?style_renderer level ?formatter registration_path capacity name allow_ Lwt_main.run begin let vat = Capnp_rpc_unix.client_only_vat () in let sr = Capnp_rpc_unix.Cap_file.load vat registration_path |> or_die in - Cluster_worker.run ~capacity ~name ~allow_push ~healthcheck_period ?prune_threshold ?docker_max_df_size ?obuilder_prune_threshold ?obuilder_prune_item_threshold ~obuilder_prune_limit ?obuilder ~additional_metrics ~state_dir ~update sr + Cluster_worker.run ~capacity ~name ~allow_push ~healthcheck_period ?prune_threshold ?docker_max_df_size ~obuilder_prune_threshold ~obuilder_prune_limit ?obuilder ~additional_metrics ~state_dir ~update sr end (* Command-line parsing *) -let main ~install (style_renderer, args1) (level, args2) ((registration_path, capacity, name, allow_push, healthcheck_period, prune_threshold, docker_max_df_size, obuilder_prune_threshold, obuilder_prune_item_threshold, obuilder_prune_limit, state_dir, obuilder, additional_metrics), args3) = +let main ~install (style_renderer, args1) (level, args2) ((registration_path, capacity, name, allow_push, healthcheck_period, prune_threshold, docker_max_df_size, obuilder_prune_threshold, obuilder_prune_limit, state_dir, obuilder, additional_metrics), args3) = if install then Ok (Winsvc_wrapper.install name "OCluster Worker" "Run a build worker" (args1 @ args2 @ args3)) else Ok (Winsvc_wrapper.run name state_dir (fun ?formatter () -> - main ?style_renderer level ?formatter registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_item_threshold obuilder_prune_limit state_dir obuilder additional_metrics)) + main ?style_renderer level ?formatter registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_limit state_dir obuilder additional_metrics)) open Cmdliner @@ -123,22 +123,13 @@ let docker_max_df_size = let obuilder_prune_threshold = Arg.value @@ - Arg.opt Arg.(some float) None @@ + Arg.opt Arg.float 30.0 @@ Arg.info ~doc:"If using OBuilder, this threshold is used to prune the stored builds if the free space falls below this (0-100)." ~docv:"PERCENTAGE" ~docs:"OBUILDER" ["obuilder-prune-threshold"] -let obuilder_prune_item_threshold = - Arg.value @@ - Arg.opt Arg.(some int64) None @@ - Arg.info - ~doc:"If using OBuilder, this threshold is used to prune the stored builds if the number of cached steps exceeds this value." - ~docv:"ITEMS" - ~docs:"OBUILDER" - ["obuilder-prune-item-threshold"] - let obuilder_prune_limit = Arg.value @@ Arg.opt Arg.int 100 @@ @@ -202,11 +193,11 @@ module Obuilder_config = struct end let worker_opts_t = - let worker_opts registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_item_threshold obuilder_prune_limit state_dir obuilder additional_metrics = - (registration_path, capacity, name, allow_push, healthcheck_period, prune_threshold, docker_max_df_size, obuilder_prune_threshold, obuilder_prune_item_threshold, obuilder_prune_limit, state_dir, obuilder, additional_metrics) in + let worker_opts registration_path capacity name allow_push healthcheck_period prune_threshold docker_max_df_size obuilder_prune_threshold obuilder_prune_limit state_dir obuilder additional_metrics = + (registration_path, capacity, name, allow_push, healthcheck_period, prune_threshold, docker_max_df_size, obuilder_prune_threshold, obuilder_prune_limit, state_dir, obuilder, additional_metrics) in Term.(with_used_args (const worker_opts $ connect_addr $ capacity $ worker_name $ allow_push $ healthcheck_period - $ prune_threshold $ docker_max_df_size $ obuilder_prune_threshold $ obuilder_prune_item_threshold $ obuilder_prune_limit $ state_dir $ Obuilder_config.v $ additional_metrics)) + $ prune_threshold $ docker_max_df_size $ obuilder_prune_threshold $ obuilder_prune_limit $ state_dir $ Obuilder_config.v $ additional_metrics)) let cmd ~install = let doc = "Run a build worker" in diff --git a/obuilder b/obuilder index 2d0af3df..15001472 160000 --- a/obuilder +++ b/obuilder @@ -1 +1 @@ -Subproject commit 2d0af3df9d934c936736986e827e9d1f5b94aed9 +Subproject commit 15001472488f92a990cfb7be27fdd64d5f001162 diff --git a/test/mock_builder.ml b/test/mock_builder.ml index cfcb36f1..49e60951 100644 --- a/test/mock_builder.ml +++ b/test/mock_builder.ml @@ -90,14 +90,14 @@ let update () = Lwt.return (fun () -> failwith "Mock restart") let run ?(capacity=1) ?(name="worker-1") ~switch t registration_service = - let thread = Cluster_worker.run ~switch ~capacity ~name ~build:(build t) ~healthcheck_period:600.0 ~update ~state_dir registration_service in + let thread = Cluster_worker.run ~switch ~capacity ~name ~build:(build t) ~update ~state_dir registration_service in Lwt.on_failure thread (fun ex -> if Lwt_switch.is_on switch then raise ex) let run_remote ~builder_switch ~network_switch ?(capacity=1) ?(name="worker-1") t registration_service = let thread = let registration_service = Mock_network.remote ~switch:network_switch registration_service in - Cluster_worker.run ~switch:builder_switch ~capacity ~name ~build:(build t) ~healthcheck_period:600.0 ~update ~state_dir registration_service + Cluster_worker.run ~switch:builder_switch ~capacity ~name ~build:(build t) ~update ~state_dir registration_service in Lwt.on_failure thread (fun ex -> diff --git a/worker/cluster_worker.ml b/worker/cluster_worker.ml index 2f4f62bc..6471dd70 100644 --- a/worker/cluster_worker.ml +++ b/worker/cluster_worker.ml @@ -4,49 +4,6 @@ open Capnp_rpc_lwt module Log_data = Log_data module Process = Process -module Metrics = struct - open Prometheus - - let namespace = "ocluster" - let subsystem = "worker" - - let jobs_accepted = - let help = "Number of jobs accepted in total" in - Counter.v ~help ~namespace ~subsystem "jobs_accepted_total" - - let job_time = - let help = "Time jobs ran for" in - Summary.v_label ~label_name:"result" ~help ~namespace ~subsystem "job_time_seconds" - - let docker_push_time = - let help = "Time uploading to Docker Hub" in - Summary.v ~help ~namespace ~subsystem "docker_push_time_seconds" - - let docker_prune_time = - let help = "Time spent pruning Docker cache" in - Summary.v ~help ~namespace ~subsystem "docker_prune_time_seconds" - - let running_jobs = - let help = "Number of jobs currently running" in - Gauge.v ~help ~namespace ~subsystem "running_jobs" - - let healthcheck_time = - let help = "Time to perform last healthcheck" in - Gauge.v ~help ~namespace ~subsystem "healthcheck_time_seconds" - - let unhealthy = - let help = "Number of unhealthy workers" in - Gauge.v ~help ~namespace ~subsystem "unhealthy" - - let cache_hits = - let help = "Number of OBuilder cache hits" in - Gauge.v ~help ~namespace ~subsystem "cache_hits" - - let cache_misses = - let help = "Number of OBuilder cache misses" in - Gauge.v ~help ~namespace ~subsystem "cache_misses" -end - let buildkit_env = let orig = Unix.environment () |> Array.to_list in "DOCKER_BUILDKIT=1" :: orig |> Array.of_list @@ -310,7 +267,7 @@ let loop ~switch ?obuilder t queue = check_health t ~last_healthcheck ~queue obuilder >>= fun () -> let outcome, set_outcome = Lwt.wait () in let log = Log_data.create () in - Log.info (fun f -> f "Requesting a new job…"); + Log.info (fun f -> f "Requesting a new job… (%i running)" t.in_use); let switch = Lwt_switch.create () in let pop = Capability.with_ref (Cluster_api.Job.local ~switch ~outcome ~stream_log_data:(Log_data.stream log)) @@ fun job -> @@ -318,7 +275,16 @@ let loop ~switch ?obuilder t queue = in t.cancel <- (fun () -> Lwt.cancel pop); pop >>= fun request -> - t.in_use <- t.in_use + 1; + let module R = Cluster_api.Raw.Reader.JobDescr in + let cache_hint = R.cache_hint_get request in + let weights = [ + (Str.regexp {|.*tezos.*|}, 2); + (Str.regexp {|.*octez.*|}, 3); + ] in + let weight = List.fold_left (fun a (re, w) -> if Str.string_match re cache_hint 0 then w else a) 1 weights in + t.in_use <- t.in_use + weight; + Log.info (fun f -> f "Cache_hint %s" cache_hint); + Log.info (fun f -> f "Job weight %i" weight); Prometheus.Gauge.set Metrics.running_jobs (float_of_int t.in_use); Prometheus.Counter.inc_one Metrics.jobs_accepted; Lwt.async (fun () -> @@ -333,6 +299,7 @@ let loop ~switch ?obuilder t queue = (fun (outcome, metric_label) -> let t1 = Unix.gettimeofday () in Prometheus.Summary.observe (Metrics.job_time metric_label) (t1 -. t0); + Log.info (fun f -> f "Build duration: %fs" (t1 -. t0)); Log_data.close log; Lwt.wakeup set_outcome outcome; Lwt.return_unit) @@ -346,7 +313,7 @@ let loop ~switch ?obuilder t queue = Lwt.return_unit) ) (fun () -> - t.in_use <- t.in_use - 1; + t.in_use <- t.in_use - weight; Prometheus.Gauge.set Metrics.running_jobs (float_of_int t.in_use); let h, m = cache_stats obuilder in Prometheus.Gauge.set Metrics.cache_hits (float_of_int h); @@ -501,7 +468,7 @@ let self_update ~update t = Lwt_result.fail (`Msg (Printexc.to_string ex)) ) -let run ?switch ?build ?(allow_push=[]) ~healthcheck_period ?prune_threshold ?docker_max_df_size ?obuilder_prune_threshold ?obuilder_prune_item_threshold ?obuilder_prune_limit ?obuilder ?(additional_metrics=[]) ~update ~capacity ~name ~state_dir registration_service = +let run ?switch ?build ?(allow_push=[]) ?(healthcheck_period = 600.0) ?prune_threshold ?docker_max_df_size ?(obuilder_prune_threshold = 30.0) ?(obuilder_prune_limit = 100) ?obuilder ?(additional_metrics=[]) ~update ~capacity ~name ~state_dir registration_service = begin match prune_threshold, docker_max_df_size with | None, None -> Log.info (fun f -> f "Prune threshold not set and docker max df size is not. Will not check for low disk-space!") | None, Some size -> Log.info (fun f -> f "Pruning docker whenever the memory used exceeds %3.2fGB" size) @@ -510,7 +477,7 @@ let run ?switch ?build ?(allow_push=[]) ~healthcheck_period ?prune_threshold ?do end; begin match obuilder with | None -> Lwt.return_none - | Some config -> Obuilder_build.create ?prune_threshold:obuilder_prune_threshold ?prune_item_threshold:obuilder_prune_item_threshold ?prune_limit:obuilder_prune_limit config >|= Option.some + | Some config -> Obuilder_build.create ~prune_threshold:obuilder_prune_threshold ~prune_limit:obuilder_prune_limit config >|= Option.some end >>= fun obuilder -> let build = match build with diff --git a/worker/cluster_worker.mli b/worker/cluster_worker.mli index b6c9167f..9e411cae 100644 --- a/worker/cluster_worker.mli +++ b/worker/cluster_worker.mli @@ -21,11 +21,10 @@ val run : ?switch:Lwt_switch.t -> ?build:build -> ?allow_push:string list -> - healthcheck_period:float -> + ?healthcheck_period:float -> ?prune_threshold:float -> ?docker_max_df_size:float -> ?obuilder_prune_threshold:float -> - ?obuilder_prune_item_threshold:int64 -> ?obuilder_prune_limit:int -> ?obuilder:Obuilder_config.t -> ?additional_metrics:(string * Uri.t) list -> diff --git a/worker/dune b/worker/dune index 7edb9d59..ae4c768a 100644 --- a/worker/dune +++ b/worker/dune @@ -13,4 +13,4 @@ (library (name cluster_worker) (public_name ocluster-worker) - (libraries ocluster-api digestif fpath logs capnp-rpc-lwt lwt.unix prometheus-app cohttp-lwt-unix obuilder extunix)) + (libraries ocluster-api digestif fpath logs capnp-rpc-lwt lwt.unix prometheus-app cohttp-lwt-unix obuilder extunix str)) diff --git a/worker/metrics.ml b/worker/metrics.ml new file mode 100644 index 00000000..877ccc15 --- /dev/null +++ b/worker/metrics.ml @@ -0,0 +1,45 @@ +open Prometheus + +let namespace = "ocluster" +let subsystem = "worker" + +let jobs_accepted = + let help = "Number of jobs accepted in total" in + Counter.v ~help ~namespace ~subsystem "jobs_accepted_total" + +let job_time = + let help = "Time jobs ran for" in + Summary.v_label ~label_name:"result" ~help ~namespace ~subsystem "job_time_seconds" + +let docker_push_time = + let help = "Time uploading to Docker Hub" in + Summary.v ~help ~namespace ~subsystem "docker_push_time_seconds" + +let docker_prune_time = + let help = "Time spent pruning Docker cache" in + Summary.v ~help ~namespace ~subsystem "docker_prune_time_seconds" + +let running_jobs = + let help = "Number of jobs currently running" in + Gauge.v ~help ~namespace ~subsystem "running_jobs" + +let healthcheck_time = + let help = "Time to perform last healthcheck" in + Gauge.v ~help ~namespace ~subsystem "healthcheck_time_seconds" + +let unhealthy = + let help = "Number of unhealthy workers" in + Gauge.v ~help ~namespace ~subsystem "unhealthy" + +let cache_hits = + let help = "Number of OBuilder cache hits" in + Gauge.v ~help ~namespace ~subsystem "cache_hits" + +let cache_misses = + let help = "Number of OBuilder cache misses" in + Gauge.v ~help ~namespace ~subsystem "cache_misses" + +let obuilder_space_free = + let help = "OBuilder percentage of space free" in + Gauge.v ~help ~namespace ~subsystem "obuilder_space_free" + diff --git a/worker/obuilder_build.ml b/worker/obuilder_build.ml index 9b2d35c4..31df0eaf 100644 --- a/worker/obuilder_build.ml +++ b/worker/obuilder_build.ml @@ -1,7 +1,5 @@ open Lwt.Infix -let prune_margin = 600.0 (* Don't prune anything used less than 10 minutes ago *) - type builder = Builder : (module Obuilder.BUILDER with type t = 'a) * 'a -> builder module Config = struct @@ -18,9 +16,6 @@ type t = { builder : builder; mutable pruning : bool; cond : unit Lwt_condition.t; (* Fires when we finish pruning *) - prune_threshold : float option; - prune_item_threshold : int64 option; (* Threshold number of items to hold in obuilder store *) - prune_limit : int option; (* Number of items to prune from obuilder when threshold is reached *) } let ( / ) = Filename.concat @@ -37,7 +32,7 @@ let log_to log_data tag msg = | `Note -> Log_data.info log_data "\027[01;2m\027[01;35m%a %s\027[0m" pp_timestamp (Unix.gettimeofday ()) msg | `Output -> Log_data.write log_data msg -let create ?prune_threshold ?prune_item_threshold ?prune_limit config = +let create ?(prune_threshold = 30.0) ?(prune_limit = 100) config = let { Config.store; sandbox_config } = config in store >>= fun (Obuilder.Store_spec.Store ((module Store), store)) -> begin match sandbox_config with @@ -58,79 +53,36 @@ let create ?prune_threshold ?prune_item_threshold ?prune_limit config = | Error (`Msg m) -> Fmt.failwith "Initial OBuilder healthcheck failed: %s" m | Ok () -> Log.info (fun f -> f "OBuilder self-test passed"); + let r = { builder = Builder ((module Builder), builder); pruning = false; - prune_threshold; - prune_item_threshold; - prune_limit; cond = Lwt_condition.create (); - } - -(* Prune [t] until free space rises above [prune_threshold] - or number of items falls below [prune_item_threshold]. *) -let do_prune ~prune_threshold ~prune_item_threshold ~prune_limit t = - let Builder ((module Builder), builder) = t.builder in - let rec aux () = - let stop = Unix.gettimeofday () -. prune_margin |> Unix.gmtime in - Builder.prune builder ~before:stop prune_limit >>= fun n -> - Builder.df builder >>= fun free -> - let count = Builder.count builder in - Log.info (fun f -> f "OBuilder partition: %.0f%% free, %Li items after pruning %d items" free count n); - if free > prune_threshold && count < prune_item_threshold - then Lwt.return_unit (* Space problem is fixed! *) - else if n < prune_limit then ( - Log.warn (fun f -> f "Out of space, but nothing left to prune! (will wait and then retry)"); - Lwt_unix.sleep 600.0 >>= aux - ) else ( - (* Continue pruning *) - aux () - ) - in - aux () - -(* Check the free space and/or number of items in [t]'s store. - If less than [t.prune_threshold] or items > [t.prune_item_threshold], spawn a prune operation (if not already running). - If less than half that is remaining, also wait for it to finish. - Returns once there is enough free space to proceed. *) -let check_free_space t = - let prune_limit = Option.value t.prune_limit ~default:100 in - let prune_threshold = Option.value t.prune_threshold ~default:0. in - let prune_item_threshold = Option.value t.prune_item_threshold ~default:Int64.max_int in - if prune_threshold = 0. && prune_item_threshold = Int64.max_int then - Lwt.return_unit (* No limits have been set *) - else - let Builder ((module Builder), builder) = t.builder in - let rec aux () = - Builder.df builder >>= fun free -> - let count = Builder.count builder in - Log.info (fun f -> f "OBuilder partition: %.0f%% free, %Li items" free count); - (* If we're low on space, or over the threshold number of items spawn a pruning thread. *) - if ((prune_threshold > 0. && free < prune_threshold) || - (prune_item_threshold < Int64.max_int && count > prune_item_threshold)) && not t.pruning then ( - t.pruning <- true; - Lwt.async (fun () -> - Lwt.finalize - (fun () -> do_prune ~prune_threshold ~prune_item_threshold ~prune_limit t) - (fun () -> - Lwt.pause () >|= fun () -> - t.pruning <- false; - Lwt_condition.broadcast t.cond () - ) - ); - ); - if free < prune_threshold /. 2.0 then ( - assert (t.pruning); - Log.info (fun f -> f "OBuilder space very low. Waiting for prune to finish…"); - Lwt_condition.wait t.cond >>= aux - ) else ( - Lwt.return_unit - ) - in - aux () + } in + Lwt.async (fun () -> + let rec loop () = + Builder.df builder >>= fun free -> + let count = Builder.count builder in + Log.info (fun f -> f "OBuilder partition: %.0f%% free, %Li items" free count); + Prometheus.Gauge.set Metrics.obuilder_space_free free; + if free > prune_threshold then ( + r.pruning <- false; + Lwt_condition.signal r.cond (); (* release one waiting process *) + Lwt_unix.sleep 30.0 >>= fun () -> loop () + ) else ( + r.pruning <- true; + let stop = Unix.gettimeofday () |> Unix.gmtime in + Builder.prune builder ~before:stop prune_limit >>= fun n -> + Log.info (fun f -> f "Pruned %i items" n); + (if n = 0 then Lwt_unix.sleep 30.0 + else Lwt.return_unit )>>= fun () -> loop () + ) + in loop () + ); r let build t ~switch ~log ~spec ~src_dir ~secrets = - check_free_space t >>= fun () -> + (if t.pruning then Lwt_condition.wait t.cond + else Lwt.return ()) >>= fun () -> let log = log_to log in let context = Obuilder.Context.v ~switch ~log ~src_dir ~secrets () in let Builder ((module Builder), builder) = t.builder in diff --git a/worker/obuilder_build.mli b/worker/obuilder_build.mli index 2b523b2b..60c3ebb8 100644 --- a/worker/obuilder_build.mli +++ b/worker/obuilder_build.mli @@ -8,7 +8,7 @@ module Config : sig -> Obuilder.Store_spec.store Lwt.t -> t end -val create : ?prune_threshold:float -> ?prune_item_threshold:int64 -> ?prune_limit:int -> Config.t -> t Lwt.t +val create : ?prune_threshold:float -> ?prune_limit:int -> Config.t -> t Lwt.t val build : t -> switch:Lwt_switch.t ->