Skip to content

Commit

Permalink
Merge pull request #464 from talex5/parallel-eval
Browse files Browse the repository at this point in the history
ci: evaluate terms in parallel
  • Loading branch information
talex5 committed Jan 27, 2017
2 parents ea6f402 + 6705ffa commit 1e8e287
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 59 deletions.
34 changes: 24 additions & 10 deletions ci/src/cI_cache.ml
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,37 @@ module Make(B : CI_s.BUILDER) = struct
val find : t -> string -> B.value status option
val set : t -> string -> B.value status -> unit
val remove : t -> string -> unit
val with_lock : t -> string -> (unit -> 'a Lwt.t) -> 'a Lwt.t
end = struct
(* Per-key cache slot: pairs the cached value with the mutex that [with_lock]
   takes for that key. *)
type entry = {
lock : Lwt_mutex.t; (* Passed to [Lwt_mutex.with_lock] by [with_lock] *)
mutable value : B.value status option; (* [None] until [set] stores a value *)
}

type t = {
mutable cache : B.value status String.Map.t;
mutable cache : entry String.Map.t;
}

(* [create ()] returns a cache with no entries. *)
let create () = { cache = String.Map.empty }
let find t k = String.Map.find k t.cache
let set t key value =
t.cache <- String.Map.add key value t.cache
let remove t key =
t.cache <- String.Map.remove key t.cache

(* [entry t key] is the slot for [key] in [t]. If there is none yet, a fresh
   slot (new mutex, no value) is added to [t] and returned. *)
let entry t key =
  let register () =
    let fresh = { value = None; lock = Lwt_mutex.create () } in
    t.cache <- String.Map.add key fresh t.cache;
    fresh
  in
  match String.Map.find key t.cache with
  | None -> register ()
  | Some existing -> existing

(* [find t k] is the value cached under [k], or [None] if nothing has been
   stored. It only reads [t]: a miss does not allocate an entry (and a mutex)
   for [k], so lookups cannot re-populate keys dropped by [remove]. *)
let find t k =
  match String.Map.find k t.cache with
  | Some e -> e.value
  | None -> None

(* [set t k value] stores [value] under [k], creating the entry if needed. *)
let set t k value = (entry t k).value <- Some value

(* [remove t k] drops the whole entry for [k], its mutex included.
   NOTE(review): a later [entry t k] builds a fresh mutex, so a caller still
   inside [with_lock t k] no longer excludes new callers - confirm that
   callers tolerate this. *)
let remove t k = t.cache <- String.Map.remove k t.cache

(* [with_lock t k fn] runs [fn ()] while holding the mutex of [k]'s entry
   (created on demand), via [Lwt_mutex.with_lock]. *)
let with_lock t k fn =
  let { lock; _ } = entry t k in
  Lwt_mutex.with_lock lock fn
end

type t = {
logs : CI_live_log.manager;
mutex : Lwt_mutex.t; (* Held while updating [cache] *)
cache : Cache.t; (* In-memory cache, including pending items *)
builder : B.t; (* The underlying builder *)
}
Expand All @@ -95,7 +110,6 @@ module Make(B : CI_s.BUILDER) = struct
{
logs;
cache = Cache.create ();
mutex = Lwt_mutex.create ();
builder;
}

Expand Down Expand Up @@ -123,7 +137,7 @@ module Make(B : CI_s.BUILDER) = struct
>>*= Lwt.return

let mark_for_rebuild t conn branch_name =
Lwt_mutex.with_lock t.mutex @@ fun () ->
Cache.with_lock t.cache branch_name @@ fun () ->
match Cache.find t.cache branch_name with
| Some {result = Error (`Pending _); _} -> Lwt.return () (* If already building, ignore rebuild request *)
| _ ->
Expand Down Expand Up @@ -248,8 +262,8 @@ module Make(B : CI_s.BUILDER) = struct
| Some v -> v

let lookup t conn ctx k =
Lwt_mutex.with_lock t.mutex @@ fun () ->
let branch_name = B.branch t.builder k in
Cache.with_lock t.cache branch_name @@ fun () ->
match Cache.find t.cache branch_name with
| Some v -> Lwt.return v
| None ->
Expand Down
140 changes: 92 additions & 48 deletions ci/src/cI_engine.ml
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ end
type job = {
name : string;
parent : target;
term_lock : Lwt_mutex.t; (* Held while evaluating term *)
mutable term : string CI_term.t;
mutable cancel : unit -> unit; (* Cancel the previous evaluation, if any *)
mutable state : string * state; (* The last result of evaluating [term]
(commit, state) *)
mutable cancel : unit -> unit; (* Cancel the previous evaluation, if any *)
mutable state : string * state; (* The last result of evaluating [term] (commit, state) *)
}
and target = {
mutable v : CI_target.v;
Expand Down Expand Up @@ -94,12 +94,17 @@ type t = {
web_ui : Uri.t;
connect_dk : unit -> DK.t Lwt.t;
projects : project Repo.Map.t;
term_lock : Lwt_mutex.t; (* Held while evaluating terms *)
mutable dk : DK.t Lwt.t;
mutable snapshot : DK.Tree.t option;
}

(* [dk t] is the promise held in [t.dk] for the DataKit connection. *)
let dk t = t.dk

(* [snapshot t] is the tree most recently stored in [t.snapshot].
   Fails via [CI_utils.failf] if none has been stored yet. *)
let snapshot t =
  match t.snapshot with
  | Some tree -> tree
  | None -> CI_utils.failf "CI engine not yet initialised!"

let rec connect connect_dk =
Lwt.catch
(fun () ->
Expand All @@ -113,6 +118,8 @@ let rec connect connect_dk =
connect connect_dk
)

(* Name of the DataKit branch that is opened with [DK.branch] when the
   metadata is read or updated. *)
let metadata_branch = "github-metadata"

let create ~web_ui ?canaries connect_dk projects =
begin match canaries with
| None -> ()
Expand All @@ -138,27 +145,18 @@ let create ~web_ui ?canaries connect_dk projects =
canaries }
) projects
in
let dk = connect connect_dk in
{
web_ui;
connect_dk;
dk = connect connect_dk;
dk;
projects;
term_lock = Lwt_mutex.create ();
snapshot = None;
}

(* [prs t] maps each repository in [t.projects] to its project's [open_prs]. *)
let prs t =
  Repo.Map.map (fun (p : project) -> p.open_prs) t.projects

(* [refs t] maps each repository in [t.projects] to its project's [refs]. *)
let refs t =
  Repo.Map.map (fun (p : project) -> p.refs) t.projects

let metadata_branch = "github-metadata"

let take_snapshot t =
t.dk >>= fun t ->
DK.branch t metadata_branch >>*= fun metadata ->
DK.Branch.head metadata >|*= function
| None -> failf "Metadata branch does not exist!"
| Some c -> DK.Commit.tree c


let update_status t ~message s =
t.dk >>= fun t ->
DK.branch t metadata_branch >>*= fun metadata ->
Expand Down Expand Up @@ -254,15 +252,15 @@ let rec auto_restart t ?switch label fn =
auto_restart t label fn
)

(* Note: must hold [t.term_lock] while calling this. *)
let rec recalculate t ~snapshot job =
let rec recalculate t job =
Log.debug (fun f -> f "Recalculate %a" pp_job job);
(* Need to avoid either recalculating the same term twice at the same time,
or doing a second calculation with an earlier snapshot. *)
Lwt_mutex.with_lock job.term_lock @@ fun () ->
let snapshot = snapshot t in
let recalc () =
Lwt.async (fun () ->
Lwt_mutex.with_lock t.term_lock (fun () ->
take_snapshot t >>= fun snapshot ->
recalculate t ~snapshot job
)
recalculate t job
)
in
job.cancel (); (* Stop any previous evaluation *)
Expand Down Expand Up @@ -301,7 +299,8 @@ let rec recalculate t ~snapshot job =
let state = (new_hash, { status; descr; logs }) in
job.state <- state

let make_job snapshot ~parent name term =
let make_job t ~parent name term =
let snapshot = snapshot t in
let head_commit = CI_target.head parent.v in
let id = head_commit, datakit_ci name in
Conv.status snapshot id >|= fun status ->
Expand All @@ -315,9 +314,11 @@ let make_job snapshot ~parent name term =
let hash = Commit.hash head_commit in
{ name;
parent;
term_lock = Lwt_mutex.create ();
term;
cancel = ignore;
state = (hash, state); }
state = (hash, state);
}

let apply_canaries canaries prs refs =
match canaries with
Expand All @@ -339,18 +340,66 @@ let is_tag = function
| _, "tags" :: _ -> true
| _ -> false

module Pool : sig
  type t
  (** A set of slots limiting how many jobs run at the same time. *)

  val create : int -> t
  (** [create n] is a pool with [n] free slots and no outstanding jobs.
      Callers are expected to pass [n > 0]. *)

  val iter : t -> ('a -> unit Lwt.t) -> 'a list -> unit Lwt.t
  (** [iter t f xs] walks [xs] in order, starting [f x] for each element.
      When the pool is full it waits for a slot before starting the next job.
      It returns once the last job has been started; it does not wait for
      the jobs to complete. *)

  val wait : t -> unit Lwt.t
  (** [wait t] resolves once no job is outstanding. *)
end = struct
  type t = {
    cond : unit Lwt_condition.t;  (* broadcast every time a job finishes *)
    mutable free : int;           (* slots still available *)
    mutable outstanding : int;    (* jobs started but not yet finished *)
  }

  let create n = { cond = Lwt_condition.create (); free = n; outstanding = 0 }

  (* Give a finished job's slot back and wake everyone blocked on [cond]. *)
  let release t () =
    t.outstanding <- t.outstanding - 1;
    t.free <- t.free + 1;
    Lwt_condition.broadcast t.cond ();
    Lwt.return_unit

  (* Start [job] in the background as soon as a slot is free. The slot is
     released whether [job] succeeds or fails. *)
  let rec push t job =
    if t.free = 0 then
      (Lwt_condition.wait t.cond >>= fun () -> push t job)
    else begin
      t.free <- t.free - 1;
      t.outstanding <- t.outstanding + 1;
      Lwt.async (fun () -> Lwt.finalize job (release t));
      Lwt.return_unit
    end

  let iter t f xs =
    Lwt_list.iter_s (fun x -> push t (fun () -> f x)) xs

  (* Re-check after every wake-up: a broadcast only means that some job
     finished, not that all of them have. *)
  let rec wait t =
    match t.outstanding with
    | 0 -> Lwt.return_unit
    | _ -> Lwt_condition.wait t.cond >>= fun () -> wait t
end

let listen ?switch t =
auto_restart t ?switch "monitor" @@ fun () ->
let pool_size = 50 in
let pool = Pool.create pool_size in
Log.info (fun f -> f "Starting monitor loop");
let check_pr ~snapshot project (id, pr) =
let check_pr project (id, pr) =
Log.debug (fun f -> f "Checking for work on %a" PR.pp_id id);
begin match PR.Index.find id project.open_prs with
| None ->
let open_pr = { v = `PR pr; jobs = [] } in
let terms = project.make_terms (`PR id) in
String.Map.bindings terms
|> Lwt_list.map_s (fun (name, term) ->
make_job snapshot ~parent:open_pr name term)
make_job t ~parent:open_pr name term)
>>= fun jobs ->
open_pr.jobs <- jobs;
project.open_prs <- PR.Index.add id open_pr project.open_prs;
Expand All @@ -360,17 +409,17 @@ let listen ?switch t =
XXX: so compare is very misleading here! *)
Lwt.return open_pr
end >>= fun open_pr ->
Lwt_list.iter_s (recalculate t ~snapshot) open_pr.jobs
Lwt_list.iter_p (recalculate t) open_pr.jobs
in
let check_ref ~snapshot project (id, r) =
let check_ref project (id, r) =
Log.debug (fun f -> f "Checking for work on %a" Ref.pp_id id);
begin match Ref.Index.find id project.refs with
| None ->
let target = { v = `Ref r; jobs = []; } in
let terms = project.make_terms @@ `Ref id in
String.Map.bindings terms
|> Lwt_list.map_s (fun (name, term) ->
make_job snapshot ~parent:target name term)
make_job t ~parent:target name term)
>>= fun jobs ->
target.jobs <- jobs;
project.refs <- Ref.Index.add id target project.refs;
Expand All @@ -379,16 +428,18 @@ let listen ?switch t =
target.v <- `Ref r;
Lwt.return target
end >>= fun target ->
Lwt_list.iter_s (recalculate t ~snapshot) target.jobs
Lwt_list.iter_p (recalculate t) target.jobs
in
enable_monitoring t (List.map fst (Repo.Map.bindings t.projects)) >>= fun () ->
monitor ?switch t (fun snapshot ->
t.snapshot <- Some snapshot;
Prometheus.Counter.inc_one Metrics.update_notifications;
let active_tags = ref 0 in
let active_braches = ref 0 in
let active_prs = ref 0 in

t.projects |> Repo.Map.bindings |> Lwt_list.iter_s (fun (repo, project) ->
t.projects |> Repo.Map.bindings |> Lwt_list.iter_p (fun (repo, project) ->
Log.debug (fun f -> f "Monitor iter");
Conv.prs snapshot ~repos:(Repo.Set.singleton repo) >>= fun prs ->
Conv.refs snapshot ~repos:(Repo.Set.singleton repo) >>= fun refs ->
let prs = match Repo.Map.find repo (PR.index prs) with
Expand All @@ -398,8 +449,7 @@ let listen ?switch t =
let refs = match Repo.Map.find repo (Ref.index refs) with
| None -> Ref.Index.empty
| Some i -> i
in
Log.debug (fun f -> f "Monitor iter");
in
let prs, refs = apply_canaries project.canaries prs refs in
(* PRs *)
let is_current id open_pr =
Expand All @@ -411,11 +461,9 @@ let listen ?switch t =
current
in
project.open_prs <- PR.Index.filter is_current project.open_prs;
Lwt_mutex.with_lock t.term_lock (fun () ->
PR.Index.bindings prs |> Lwt_list.iter_s (fun pr ->
incr active_prs;
check_pr ~snapshot project pr
)
PR.Index.bindings prs |> Pool.iter pool (fun pr ->
incr active_prs;
check_pr project pr
)
>>= fun () ->
(* Refs *)
Expand All @@ -428,17 +476,16 @@ let listen ?switch t =
current
in
project.refs <- Ref.Index.filter is_current project.refs;
Lwt_mutex.with_lock t.term_lock (fun () ->
Ref.Index.bindings refs |> Lwt_list.iter_s (fun r ->
if is_tag (fst r) then incr active_tags else incr active_braches;
check_ref ~snapshot project r
)
Ref.Index.bindings refs |> Pool.iter pool (fun r ->
if is_tag (fst r) then incr active_tags else incr active_braches;
check_ref project r
)
)
>|= fun () ->
>>= fun () ->
Metrics.set_active_targets `Tag !active_tags;
Metrics.set_active_targets `Branch !active_braches;
Metrics.set_active_targets `PR !active_prs;
Pool.wait pool
)

let rebuild t ~branch_name =
Expand Down Expand Up @@ -472,7 +519,4 @@ let rebuild t ~branch_name =
| [], [] -> CI_utils.failf "No job depends on %S, so can't rebuild anything" branch_name
| triggers, jobs_needing_recalc ->
Lwt.join triggers >>= fun () ->
Lwt_mutex.with_lock t.term_lock (fun () ->
take_snapshot t >>= fun snapshot ->
Lwt_list.iter_s (recalculate t ~snapshot) jobs_needing_recalc
)
Lwt_list.iter_s (recalculate t) jobs_needing_recalc
2 changes: 1 addition & 1 deletion ci/src/cI_main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ let connect protocol address =
(* Connect to 9p server *)
Log.info (fun f -> f "Connecting to DataKit server on %s:%s" protocol address);
Lwt.catch
(fun () -> Client9p.connect protocol address ())
(fun () -> Client9p.connect protocol address ~max_fids:Int32.max_int ())
(fun ex ->
failf "Failed to connect to DataKit server at proto=%S addr=%S: %s"
protocol address (Printexc.to_string ex)
Expand Down

0 comments on commit 1e8e287

Please sign in to comment.