Skip to content

Commit

Permalink
Merge branch 'repo-under-errors'
Browse files Browse the repository at this point in the history
  • Loading branch information
pveber committed Dec 14, 2018
2 parents 2f7e469 + 39895b9 commit 69b84a7
Show file tree
Hide file tree
Showing 15 changed files with 150 additions and 124 deletions.
1 change: 1 addition & 0 deletions examples/rnaseq.ml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ let differential_analysis =
let () =
let open Bistro_utils.Repo in
[
item ["delme"] (counts `WT_BHI_3) ;
item ["deseq2"] differential_analysis#directory ;
]
|> build_main ~np:4 ~mem:(`GB 4) ~outdir:"res" ~loggers:[Bistro_utils.Console_logger.create ()]
7 changes: 3 additions & 4 deletions examples/zhou2011.ml
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
(**
(**
Paper: https://www.ncbi.nlm.nih.gov/pubmed/21700227
Datasets: https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi?acc=GSE29506
*)
open Core
open Bistro
open Bistro_bioinfo
open Bistro_utils

Expand All @@ -15,7 +14,7 @@ type chIP_sample = [ `ChIP_Pho4_noPi ]

type factor = [ `Pho4 ]
[@@deriving show, enumerate]

let factor = function
| `ChIP_Pho4_noPi -> `Pho4

Expand Down Expand Up @@ -87,5 +86,5 @@ let () =
Repo.build_main
~np ~mem:(`GB 4)
~outdir:"res"
~loggers:[ Console_logger.create () ]
~loggers:[ Console_logger.create () ]
repo
6 changes: 2 additions & 4 deletions examples/zhou2018.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@ module Dataset = struct
| `SongD1 -> "SongD1"

let alignments d =
Workflow.string "https://ndownloader.figshare.com/files/9473962"
|> Bistro_unix.wget
Bistro_unix.wget "https://ndownloader.figshare.com/files/9473962"
|> Bistro_unix.tar_xfj
|> Fn.flip Workflow.select ["single-gene_alignments" ; to_string d ]
|> Workflow.glob ~pattern:"*"

let best_trees d =
Workflow.string "https://ndownloader.figshare.com/files/9473953"
|> Bistro_unix.wget
Bistro_unix.wget "https://ndownloader.figshare.com/files/9473953"
|> Bistro_unix.tar_xfj
|> Fn.flip Workflow.select ["single-gene_trees" ; to_string d ; "Best_observed"]
|> Workflow.glob ~pattern:"*"
Expand Down
4 changes: 2 additions & 2 deletions lib/engine/db.ml
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ let rec workflow_path db (Bistro_internals.Workflow.Any w) =
workflow_path db (Any dir)
|> Option.map ~f:(fun d -> Cd (d, sel))
| Shell { id ; _ } -> Some (Cache_id id)
| Value { id ; _ } -> Some (Cache_id id)
| Path { id ; _ } -> Some (Cache_id id)
| Plugin { id ; task = Path_plugin _ ; _ } -> Some (Cache_id id)
| Plugin { id ; task = Value_plugin _ ; _ } -> Some (Cache_id id)
| _ -> None

let is_in_cache db u =
Expand Down
2 changes: 2 additions & 0 deletions lib/engine/execution_trace.ml
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,5 @@ let error_report trace db buf =
| (Done_already _ | Canceled _) -> ()

let all_ok xs = not (List.exists ~f:is_errored xs)

module Set = S
2 changes: 2 additions & 0 deletions lib/engine/execution_trace.mli
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ val error_report :
val all_ok : t list -> bool

val gather_failures : t list -> t list

module Set : Set.S with type elt = t
99 changes: 57 additions & 42 deletions lib/engine/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,9 @@ type error = [

module Table = String.Table

module Traces = Caml.Set.Make(struct
type t = Execution_trace.t
let compare = compare
end)

(* Lwt threads that accumulate errors *)
module Eval_thread : sig
type 'a t = ('a, Traces.t) Lwt_result.t
type 'a t = ('a, Execution_trace.Set.t) Lwt_result.t
val return : 'a -> 'a t
(* val fail : Traces.t -> 'a t *)
val fail1 : Execution_trace.t -> 'a t
Expand All @@ -38,7 +33,7 @@ module Eval_thread : sig
end
=
struct
type 'a t = ('a, Traces.t) Lwt_result.t
type 'a t = ('a, Execution_trace.Set.t) Lwt_result.t

module Infix = struct
let ( >> ) = Lwt.( >>= )
Expand All @@ -48,13 +43,13 @@ struct

let return = Lwt_result.return
(* let fail = Lwt_result.fail *)
let fail1 e = Lwt_result.fail (Traces.singleton e)
let fail1 e = Lwt_result.fail (Execution_trace.Set.singleton e)
let result_both x y =
match x, y with
| Ok x, Ok y -> Ok (x, y)
| Ok _, Error e -> Error e
| Error e, Ok _ -> Error e
| Error e, Error e' -> Error (Traces.union e e')
| Error e, Error e' -> Error (Execution_trace.Set.union e e')

let both x y =
Lwt.(x >>= fun x ->
Expand Down Expand Up @@ -263,14 +258,14 @@ struct
register gc ?target l.elts
| Input _ -> Lwt.return ()
| Select x -> register gc ?target x.dir
| Value v ->
| Plugin { task = Value_plugin v ; _ } ->
uses gc target w ;
if stop_register gc w then Lwt.return ()
else register gc ~target:w v.task
| Path p ->
else register gc ~target:w v
| Plugin { task = Path_plugin p ; _ } ->
uses gc target w ;
if stop_register gc w then Lwt.return ()
else register gc ~target:w p.task
else register gc ~target:w p
| Shell s ->
uses gc target w ;
if stop_register gc w then Lwt.return ()
Expand Down Expand Up @@ -332,9 +327,24 @@ struct
| None -> Lwt.return ()
end

module Synchro : sig
type 'a t
val create : unit -> 'a t
val signal : 'a t -> 'a -> unit
val wait : 'a t -> 'a Lwt.t
end
=
struct
type 'a t = ('a Lwt.t * 'a Lwt.u)
let create () = Lwt.wait ()
let signal (_, u) x = Lwt.wakeup u x
let wait = fst
end

type 'a t = {
target : 'a W.t ;
type t = {
start : unit Synchro.t ;
_end_ : unit Synchro.t ;
mutable closed : bool ;
allocator : Allocator.t ;
db : Db.t ;
logger : Logger.t ;
Expand All @@ -347,10 +357,12 @@ let create
?(np = 1) ?mem:(`GB mem = `GB 1)
?(use_docker = true)
?(loggers = [])
?(collect = false) db target =
?(collect = false) db =
let logger = Logger.tee loggers in
{
target = Bistro.Private.reveal target ;
start = Synchro.create () ;
_end_ = Synchro.create () ;
closed = false ;
allocator = Allocator.create ~np ~mem:(mem * 1024) ;
db ;
use_docker ;
Expand Down Expand Up @@ -486,9 +498,9 @@ let rec blocking_evaluator
let dir = blocking_evaluator db s.dir in
fun () -> W.cd (dir ()) s.sel
| W.Input { path ; _ } -> fun () -> W.FS_path path
| W.Value { id ; _ } ->
| W.Plugin { id ; task = Value_plugin _ ; _ } ->
fun () -> (load_value (Db.cache db id))
| W.Path p -> fun () -> W.Cache_id p.id
| W.Plugin { id ; task = Path_plugin _ ; _ } -> fun () -> W.Cache_id id
| W.Spawn s ->
let elts = blocking_evaluator db s.elts in
fun () ->
Expand All @@ -512,7 +524,7 @@ let rec blocking_evaluator
|> List.map ~f:(fun fn -> W.cd dir_path [fn])

let rec shallow_eval
: type s. _ t -> s W.t -> s Lwt.t
: type s. t -> s W.t -> s Lwt.t
= fun sched w ->
match w with
| W.Pure { value ; _ } -> Lwt.return value
Expand All @@ -529,13 +541,13 @@ let rec shallow_eval
shallow_eval sched s.dir >>= fun dir ->
Lwt.return (W.cd dir s.sel)
| W.Input { path ; _ } -> Lwt.return (W.FS_path path)
| W.Value { id ; _ } ->
| W.Plugin { id ; task = Value_plugin _ ; _ } ->
Lwt.return (load_value (Db.cache sched.db id)) (* FIXME: blocking call *)
| W.Spawn s -> (* FIXME: much room for improvement *)
shallow_eval sched s.elts >>= fun elts ->
let targets = List.init (List.length elts) ~f:(fun i -> s.f (W.list_nth s.elts i)) in
Lwt_list.map_p (shallow_eval sched) targets
| W.Path s -> Lwt.return (W.Cache_id s.id)
| W.Plugin { id ; task = Path_plugin _ ; _ } -> Lwt.return (W.Cache_id id)
| W.Shell s -> Lwt.return (W.Cache_id s.id)
| W.List l ->
Lwt_list.map_p (shallow_eval sched) l.elts
Expand Down Expand Up @@ -604,12 +616,11 @@ let np_requirement
| List _ -> 0
| List_nth _ -> 0
| Glob _ -> 0
| Value x -> x.np
| Path x -> x.np
| Plugin x -> x.np
| Shell x -> x.np

let mem_requirement
: type u v. u t -> v Workflow.t -> int Lwt.t
: type u. t -> u Workflow.t -> int Lwt.t
= fun sched -> function
| Pure _ -> Lwt.return 0
| App _ -> Lwt.return 0
Expand All @@ -621,8 +632,7 @@ let mem_requirement
| List _ -> Lwt.return 0
| List_nth _ -> Lwt.return 0
| Glob _ -> Lwt.return 0
| Value x -> shallow_eval sched x.mem
| Path x -> shallow_eval sched x.mem
| Plugin x -> shallow_eval sched x.mem
| Shell x -> shallow_eval sched x.mem

let build_trace sched w perform =
Expand Down Expand Up @@ -668,7 +678,7 @@ let schedule_cached_workflow sched ~id w ~deps ~perform =
|> Eval_thread.ignore

let rec build
: type u v. _ t -> ?target:v W.t -> u W.t -> unit thread
: type u v. t -> ?target:v W.t -> u W.t -> unit thread
= fun sched ?target w ->
let open Eval_thread.Infix in
match w with
Expand Down Expand Up @@ -703,7 +713,7 @@ let rec build
)
|> Eval_thread.ignore

| W.Value { task = workflow ; id ; descr ; _ } ->
| W.Plugin { task = Value_plugin workflow ; id ; descr ; _ } ->
schedule_cached_workflow sched ~id w
~deps:(fun () -> build sched ~target:w workflow)
~perform:(fun _ ->
Expand All @@ -718,7 +728,7 @@ let rec build
| Error msg -> Ok (Task_result.Plugin { id ; outcome = `Failed ; msg = Some msg ; descr })
)

| W.Path { id ; task = workflow ; descr ; _ } ->
| W.Plugin { id ; task = Path_plugin workflow ; descr ; _ } ->
schedule_cached_workflow sched ~id w
~deps:(fun () -> build sched ~target:w workflow)
~perform:(fun _ ->
Expand Down Expand Up @@ -747,16 +757,17 @@ let rec build
| List l ->
Eval_thread.join l.elts ~f:(build ?target sched)

let run sched =
Maybe_gc.register sched.gc sched.target >>= fun () ->
build sched sched.target
>>= (fun r -> Maybe_gc.stop sched.gc >|= fun () -> r)
|> Fn.flip Lwt_result.bind Lwt.(fun () -> shallow_eval sched sched.target >|= Result.return)
|> Lwt_result.map_err Traces.elements
let start sched = Synchro.signal sched.start ()

let eval ?np ?mem ?use_docker ?loggers ?collect db w =
let sched = create ?np ?mem ?use_docker ?loggers ?collect db w in
run sched
let eval sched target =
if sched.closed then failwith "Scheduler is closed" ;
let target = Bistro.Private.reveal target in
Synchro.wait sched.start >>= fun () ->
Maybe_gc.register sched.gc target >>= fun () ->
build sched target
>>= (fun r -> Maybe_gc.stop sched.gc >|= fun () -> r)
|> Fn.flip Lwt_result.bind Lwt.(fun () -> shallow_eval sched target >|= Result.return)
|> Lwt_result.map_err Execution_trace.Set.elements

let error_report db traces =
let buf = Buffer.create 1024 in
Expand All @@ -765,7 +776,11 @@ let error_report db traces =
) ;
Buffer.contents buf

let eval_exn ?np ?mem ?use_docker ?loggers ?collect db w =
eval ?np ?mem ?use_docker ?loggers ?collect db w >|= function
let eval_exn sched w =
eval sched w >|= function
| Ok r -> r
| Error errors -> failwith (error_report db errors)
| Error errors -> failwith (error_report sched.db errors)

let stop sched =
Maybe_gc.stop sched.gc >>= fun () ->
sched.logger#stop
27 changes: 8 additions & 19 deletions lib/engine/scheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ module Gc : sig
}
end

type 'a t
type t

val create :
?np:int ->
Expand All @@ -21,36 +21,25 @@ val create :
?loggers:Logger.t list ->
?collect:bool ->
Db.t ->
'a Bistro.workflow ->
'a t
t

val run :
'a t ->
('a, Execution_trace.t list) Lwt_result.t
val gc_state : t -> Gc.state option

val gc_state : _ t -> Gc.state option
val start : t -> unit

val eval :
?np:int ->
?mem:[`GB of int] ->
?use_docker:bool ->
?loggers:Logger.t list ->
?collect:bool ->
Db.t ->
t ->
'a Bistro.workflow ->
('a, Execution_trace.t list) Lwt_result.t

val eval_exn :
?np:int ->
?mem:[`GB of int] ->
?use_docker:bool ->
?loggers:Logger.t list ->
?collect:bool ->
Db.t ->
t ->
'a Bistro.workflow ->
'a Lwt.t

val error_report :
Db.t ->
Execution_trace.t list ->
string

val stop : t -> unit Lwt.t

0 comments on commit 69b84a7

Please sign in to comment.