Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci: evaluate terms in parallel #464

Merged
merged 4 commits into from
Jan 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions ci/src/cI_cache.ml
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,37 @@ module Make(B : CI_s.BUILDER) = struct
val find : t -> string -> B.value status option
val set : t -> string -> B.value status -> unit
val remove : t -> string -> unit
val with_lock : t -> string -> (unit -> 'a Lwt.t) -> 'a Lwt.t
end = struct
(* One cache slot per key: the stored value together with the mutex that
   serialises work on that key. *)
type entry = {
(* Mutex taken by [with_lock] for this key. *)
lock : Lwt_mutex.t;
(* [None] when the slot is first created by [entry]; [set] stores [Some _]. *)
mutable value : B.value status option;
}

type t = {
mutable cache : B.value status String.Map.t;
mutable cache : entry String.Map.t;
}

let create () = { cache = String.Map.empty }
let find t k = String.Map.find k t.cache
let set t key value =
t.cache <- String.Map.add key value t.cache
let remove t key =
t.cache <- String.Map.remove key t.cache

(* [entry t k] is the slot for [k]. If the map has no binding for [k] yet, an
   empty slot (fresh mutex, no value) is added to the map and returned. *)
let entry t k =
  let fresh () =
    let slot = { lock = Lwt_mutex.create (); value = None } in
    t.cache <- String.Map.add k slot t.cache;
    slot
  in
  match String.Map.find k t.cache with
  | None -> fresh ()
  | Some existing -> existing

(* [find t k] is the value currently stored for [k], or [None] if nothing has
   been stored. The map is consulted directly rather than through [entry], so
   a miss does not insert a placeholder slot (and a fresh mutex) into the
   cache: a pure lookup leaves the map unchanged. *)
let find t k =
  match String.Map.find k t.cache with
  | None -> None
  | Some slot -> slot.value
(* [set t k value] stores [value] in the slot for [k], creating the slot first
   if it does not exist. *)
let set t k value =
  let slot = entry t k in
  slot.value <- Some value
(* [remove t k] drops the whole slot for [k] from the map: the stored value
   and its mutex.
   NOTE(review): a later [entry t k] allocates a fresh mutex, so a fiber still
   holding (or queued on) the old one would no longer exclude newcomers.
   Confirm that callers only remove a key when that cannot happen. *)
let remove t k =
  let remaining = String.Map.remove k t.cache in
  t.cache <- remaining

(* [with_lock t k fn] runs [fn] under the mutex belonging to the slot for [k].
   The slot is created on demand, so each key gets its own lock. *)
let with_lock t k fn =
  let { lock; _ } = entry t k in
  Lwt_mutex.with_lock lock fn
end

type t = {
logs : CI_live_log.manager;
mutex : Lwt_mutex.t; (* Held while updating [cache] *)
cache : Cache.t; (* In-memory cache, including pending items *)
builder : B.t; (* The underlying builder *)
}
Expand All @@ -95,7 +110,6 @@ module Make(B : CI_s.BUILDER) = struct
{
logs;
cache = Cache.create ();
mutex = Lwt_mutex.create ();
builder;
}

Expand Down Expand Up @@ -123,7 +137,7 @@ module Make(B : CI_s.BUILDER) = struct
>>*= Lwt.return

let mark_for_rebuild t conn branch_name =
Lwt_mutex.with_lock t.mutex @@ fun () ->
Cache.with_lock t.cache branch_name @@ fun () ->
match Cache.find t.cache branch_name with
| Some {result = Error (`Pending _); _} -> Lwt.return () (* If already building, ignore rebuild request *)
| _ ->
Expand Down Expand Up @@ -248,8 +262,8 @@ module Make(B : CI_s.BUILDER) = struct
| Some v -> v

let lookup t conn ctx k =
Lwt_mutex.with_lock t.mutex @@ fun () ->
let branch_name = B.branch t.builder k in
Cache.with_lock t.cache branch_name @@ fun () ->
match Cache.find t.cache branch_name with
| Some v -> Lwt.return v
| None ->
Expand Down
140 changes: 92 additions & 48 deletions ci/src/cI_engine.ml
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ end
type job = {
name : string;
parent : target;
term_lock : Lwt_mutex.t; (* Held while evaluating term *)
mutable term : string CI_term.t;
mutable cancel : unit -> unit; (* Cancel the previous evaluation, if any *)
mutable state : string * state; (* The last result of evaluating [term]
(commit, state) *)
mutable cancel : unit -> unit; (* Cancel the previous evaluation, if any *)
mutable state : string * state; (* The last result of evaluating [term] (commit, state) *)
}
and target = {
mutable v : CI_target.v;
Expand Down Expand Up @@ -94,12 +94,17 @@ type t = {
web_ui : Uri.t;
connect_dk : unit -> DK.t Lwt.t;
projects : project Repo.Map.t;
term_lock : Lwt_mutex.t; (* Held while evaluating terms *)
mutable dk : DK.t Lwt.t;
mutable snapshot : DK.Tree.t option;
}

(* Accessor for the engine's [dk] field (a promise for the DataKit
   connection). *)
let dk { dk; _ } = dk

(* [snapshot t] is the tree stored in [t.snapshot]. While the field is still
   [None], it fails via [CI_utils.failf]. *)
let snapshot t =
  let not_ready () = CI_utils.failf "CI engine not yet initialised!" in
  match t.snapshot with
  | Some tree -> tree
  | None -> not_ready ()

let rec connect connect_dk =
Lwt.catch
(fun () ->
Expand All @@ -113,6 +118,8 @@ let rec connect connect_dk =
connect connect_dk
)

(* Name of the DataKit branch that this module opens with [DK.branch] to read
   and update metadata. *)
let metadata_branch = "github-metadata"

let create ~web_ui ?canaries connect_dk projects =
begin match canaries with
| None -> ()
Expand All @@ -138,27 +145,18 @@ let create ~web_ui ?canaries connect_dk projects =
canaries }
) projects
in
let dk = connect connect_dk in
{
web_ui;
connect_dk;
dk = connect connect_dk;
dk;
projects;
term_lock = Lwt_mutex.create ();
snapshot = None;
}

(* [prs t] maps every repository in [t.projects] to that project's
   [open_prs]. *)
let prs t =
  Repo.Map.map (fun { open_prs; _ } -> open_prs) t.projects
(* [refs t] maps every repository in [t.projects] to that project's [refs]. *)
let refs t =
  Repo.Map.map (fun { refs; _ } -> refs) t.projects

let metadata_branch = "github-metadata"

let take_snapshot t =
t.dk >>= fun t ->
DK.branch t metadata_branch >>*= fun metadata ->
DK.Branch.head metadata >|*= function
| None -> failf "Metadata branch does not exist!"
| Some c -> DK.Commit.tree c


let update_status t ~message s =
t.dk >>= fun t ->
DK.branch t metadata_branch >>*= fun metadata ->
Expand Down Expand Up @@ -254,15 +252,15 @@ let rec auto_restart t ?switch label fn =
auto_restart t label fn
)

(* Note: must hold [t.term_lock] while calling this. *)
let rec recalculate t ~snapshot job =
let rec recalculate t job =
Log.debug (fun f -> f "Recalculate %a" pp_job job);
(* Need to avoid either recalculating the same term twice at the same time,
or doing a second calculation with an earlier snapshot. *)
Lwt_mutex.with_lock job.term_lock @@ fun () ->
let snapshot = snapshot t in
let recalc () =
Lwt.async (fun () ->
Lwt_mutex.with_lock t.term_lock (fun () ->
take_snapshot t >>= fun snapshot ->
recalculate t ~snapshot job
)
recalculate t job
)
in
job.cancel (); (* Stop any previous evaluation *)
Expand Down Expand Up @@ -301,7 +299,8 @@ let rec recalculate t ~snapshot job =
let state = (new_hash, { status; descr; logs }) in
job.state <- state

let make_job snapshot ~parent name term =
let make_job t ~parent name term =
let snapshot = snapshot t in
let head_commit = CI_target.head parent.v in
let id = head_commit, datakit_ci name in
Conv.status snapshot id >|= fun status ->
Expand All @@ -315,9 +314,11 @@ let make_job snapshot ~parent name term =
let hash = Commit.hash head_commit in
{ name;
parent;
term_lock = Lwt_mutex.create ();
term;
cancel = ignore;
state = (hash, state); }
state = (hash, state);
}

let apply_canaries canaries prs refs =
match canaries with
Expand All @@ -339,18 +340,66 @@ let is_tag = function
| _, "tags" :: _ -> true
| _ -> false

(* A bounded pool of concurrently running Lwt jobs. *)
module Pool : sig
  type t

  val create : int -> t
  (** [create n] is an empty pool that runs at most [n] jobs at once.
      @raise Invalid_argument if [n] is not strictly positive. *)

  val iter : t -> ('a -> unit Lwt.t) -> 'a list -> unit Lwt.t
  (** [iter t f xs] iterates over [xs], starting [f x] for each element.
      If the pool is full, it waits before starting new jobs.
      It does not wait for the jobs to complete. *)

  val wait : t -> unit Lwt.t
  (** [wait t] waits for all jobs to complete. *)
end = struct
  type t = {
    cond : unit Lwt_condition.t;  (* Broadcast each time a job finishes *)
    mutable free : int;           (* Slots still available *)
    mutable outstanding : int;    (* Jobs started and not yet finished *)
  }

  let create free =
    (* A size of 0 would make the first [push] wait forever, and a negative
       size would never hit the "full" case, silently disabling the bound. *)
    if free <= 0 then invalid_arg "Pool.create: size must be positive";
    { free; outstanding = 0; cond = Lwt_condition.create () }

  (* [push t f] waits for a free slot, then starts [f ()] in the background
     and returns without waiting for it to finish. *)
  let rec push t f =
    if t.free <= 0 then
      (* Pool is full: sleep until some job finishes, then re-check. *)
      Lwt_condition.wait t.cond >>= fun () -> push t f
    else begin
      t.free <- t.free - 1;
      t.outstanding <- t.outstanding + 1;
      Lwt.async (fun () ->
          Lwt.finalize f
            (fun () ->
               (* Runs whether [f] succeeded or failed, so the slot is always
                  returned and waiters are always woken. *)
               t.outstanding <- t.outstanding - 1;
               t.free <- t.free + 1;
               Lwt_condition.broadcast t.cond ();
               Lwt.return_unit
            )
        );
      Lwt.return_unit
    end

  let iter t f =
    Lwt_list.iter_s (fun x -> push t (fun () -> f x))

  let rec wait t =
    if t.outstanding = 0 then Lwt.return_unit
    else Lwt_condition.wait t.cond >>= fun () -> wait t
end

let listen ?switch t =
auto_restart t ?switch "monitor" @@ fun () ->
let pool_size = 50 in
let pool = Pool.create pool_size in
Log.info (fun f -> f "Starting monitor loop");
let check_pr ~snapshot project (id, pr) =
let check_pr project (id, pr) =
Log.debug (fun f -> f "Checking for work on %a" PR.pp_id id);
begin match PR.Index.find id project.open_prs with
| None ->
let open_pr = { v = `PR pr; jobs = [] } in
let terms = project.make_terms (`PR id) in
String.Map.bindings terms
|> Lwt_list.map_s (fun (name, term) ->
make_job snapshot ~parent:open_pr name term)
make_job t ~parent:open_pr name term)
>>= fun jobs ->
open_pr.jobs <- jobs;
project.open_prs <- PR.Index.add id open_pr project.open_prs;
Expand All @@ -360,17 +409,17 @@ let listen ?switch t =
XXX: so compare is very misleading here! *)
Lwt.return open_pr
end >>= fun open_pr ->
Lwt_list.iter_s (recalculate t ~snapshot) open_pr.jobs
Lwt_list.iter_p (recalculate t) open_pr.jobs
in
let check_ref ~snapshot project (id, r) =
let check_ref project (id, r) =
Log.debug (fun f -> f "Checking for work on %a" Ref.pp_id id);
begin match Ref.Index.find id project.refs with
| None ->
let target = { v = `Ref r; jobs = []; } in
let terms = project.make_terms @@ `Ref id in
String.Map.bindings terms
|> Lwt_list.map_s (fun (name, term) ->
make_job snapshot ~parent:target name term)
make_job t ~parent:target name term)
>>= fun jobs ->
target.jobs <- jobs;
project.refs <- Ref.Index.add id target project.refs;
Expand All @@ -379,16 +428,18 @@ let listen ?switch t =
target.v <- `Ref r;
Lwt.return target
end >>= fun target ->
Lwt_list.iter_s (recalculate t ~snapshot) target.jobs
Lwt_list.iter_p (recalculate t) target.jobs
in
enable_monitoring t (List.map fst (Repo.Map.bindings t.projects)) >>= fun () ->
monitor ?switch t (fun snapshot ->
t.snapshot <- Some snapshot;
Prometheus.Counter.inc_one Metrics.update_notifications;
let active_tags = ref 0 in
let active_braches = ref 0 in
let active_prs = ref 0 in

t.projects |> Repo.Map.bindings |> Lwt_list.iter_s (fun (repo, project) ->
t.projects |> Repo.Map.bindings |> Lwt_list.iter_p (fun (repo, project) ->
Log.debug (fun f -> f "Monitor iter");
Conv.prs snapshot ~repos:(Repo.Set.singleton repo) >>= fun prs ->
Conv.refs snapshot ~repos:(Repo.Set.singleton repo) >>= fun refs ->
let prs = match Repo.Map.find repo (PR.index prs) with
Expand All @@ -398,8 +449,7 @@ let listen ?switch t =
let refs = match Repo.Map.find repo (Ref.index refs) with
| None -> Ref.Index.empty
| Some i -> i
in
Log.debug (fun f -> f "Monitor iter");
in
let prs, refs = apply_canaries project.canaries prs refs in
(* PRs *)
let is_current id open_pr =
Expand All @@ -411,11 +461,9 @@ let listen ?switch t =
current
in
project.open_prs <- PR.Index.filter is_current project.open_prs;
Lwt_mutex.with_lock t.term_lock (fun () ->
PR.Index.bindings prs |> Lwt_list.iter_s (fun pr ->
incr active_prs;
check_pr ~snapshot project pr
)
PR.Index.bindings prs |> Pool.iter pool (fun pr ->
incr active_prs;
check_pr project pr
)
>>= fun () ->
(* Refs *)
Expand All @@ -428,17 +476,16 @@ let listen ?switch t =
current
in
project.refs <- Ref.Index.filter is_current project.refs;
Lwt_mutex.with_lock t.term_lock (fun () ->
Ref.Index.bindings refs |> Lwt_list.iter_s (fun r ->
if is_tag (fst r) then incr active_tags else incr active_braches;
check_ref ~snapshot project r
)
Ref.Index.bindings refs |> Pool.iter pool (fun r ->
if is_tag (fst r) then incr active_tags else incr active_braches;
check_ref project r
)
)
>|= fun () ->
>>= fun () ->
Metrics.set_active_targets `Tag !active_tags;
Metrics.set_active_targets `Branch !active_braches;
Metrics.set_active_targets `PR !active_prs;
Pool.wait pool
)

let rebuild t ~branch_name =
Expand Down Expand Up @@ -472,7 +519,4 @@ let rebuild t ~branch_name =
| [], [] -> CI_utils.failf "No job depends on %S, so can't rebuild anything" branch_name
| triggers, jobs_needing_recalc ->
Lwt.join triggers >>= fun () ->
Lwt_mutex.with_lock t.term_lock (fun () ->
take_snapshot t >>= fun snapshot ->
Lwt_list.iter_s (recalculate t ~snapshot) jobs_needing_recalc
)
Lwt_list.iter_s (recalculate t) jobs_needing_recalc
2 changes: 1 addition & 1 deletion ci/src/cI_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ let connect protocol address =
(* Connect to 9p server *)
Log.info (fun f -> f "Connecting to DataKit server on %s:%s" protocol address);
Lwt.catch
(fun () -> Client9p.connect protocol address ())
(fun () -> Client9p.connect protocol address ~max_fids:Int32.max_int ())
(fun ex ->
failf "Failed to connect to DataKit server at proto=%S addr=%S: %s"
protocol address (Printexc.to_string ex)
Expand Down