diff --git a/examples/rnaseq.ml b/examples/rnaseq.ml index 9ef358bf..01c80c48 100644 --- a/examples/rnaseq.ml +++ b/examples/rnaseq.ml @@ -63,6 +63,7 @@ let differential_analysis = let () = let open Bistro_utils.Repo in [ + item ["delme"] (counts `WT_BHI_3) ; item ["deseq2"] differential_analysis#directory ; ] |> build_main ~np:4 ~mem:(`GB 4) ~outdir:"res" ~loggers:[Bistro_utils.Console_logger.create ()] diff --git a/examples/zhou2011.ml b/examples/zhou2011.ml index 8bbf742a..e4e8d1bb 100644 --- a/examples/zhou2011.ml +++ b/examples/zhou2011.ml @@ -1,9 +1,8 @@ -(** +(** Paper: https://www.ncbi.nlm.nih.gov/pubmed/21700227 Datasets: https://www.ncbi.nlm.nih.gov/geo/query/acc.cgi?acc=GSE29506 *) open Core -open Bistro open Bistro_bioinfo open Bistro_utils @@ -15,7 +14,7 @@ type chIP_sample = [ `ChIP_Pho4_noPi ] type factor = [ `Pho4 ] [@@deriving show, enumerate] - + let factor = function | `ChIP_Pho4_noPi -> `Pho4 @@ -87,5 +86,5 @@ let () = Repo.build_main ~np ~mem:(`GB 4) ~outdir:"res" - ~loggers:[ Console_logger.create () ] + ~loggers:[ Console_logger.create () ] repo diff --git a/examples/zhou2018.ml b/examples/zhou2018.ml index 4f0068f5..1d1c36ef 100644 --- a/examples/zhou2018.ml +++ b/examples/zhou2018.ml @@ -15,15 +15,13 @@ module Dataset = struct | `SongD1 -> "SongD1" let alignments d = - Workflow.string "https://ndownloader.figshare.com/files/9473962" - |> Bistro_unix.wget + Bistro_unix.wget "https://ndownloader.figshare.com/files/9473962" |> Bistro_unix.tar_xfj |> Fn.flip Workflow.select ["single-gene_alignments" ; to_string d ] |> Workflow.glob ~pattern:"*" let best_trees d = - Workflow.string "https://ndownloader.figshare.com/files/9473953" - |> Bistro_unix.wget + Bistro_unix.wget "https://ndownloader.figshare.com/files/9473953" |> Bistro_unix.tar_xfj |> Fn.flip Workflow.select ["single-gene_trees" ; to_string d ; "Best_observed"] |> Workflow.glob ~pattern:"*" diff --git a/lib/engine/db.ml b/lib/engine/db.ml index 7920df80..21d99a5f 100644 --- a/lib/engine/db.ml +++ b/lib/engine/db.ml @@ -135,8 +135,8 @@ let rec workflow_path db (Bistro_internals.Workflow.Any w) = workflow_path db (Any dir) |> Option.map ~f:(fun d -> Cd (d, sel)) | Shell { id ; _ } -> Some (Cache_id id) - | Value { id ; _ } -> Some (Cache_id id) - | Path { id ; _ } -> Some (Cache_id id) + | Plugin { id ; task = Path_plugin _ ; _ } -> Some (Cache_id id) + | Plugin { id ; task = Value_plugin _ ; _ } -> Some (Cache_id id) | _ -> None let is_in_cache db u = diff --git a/lib/engine/execution_trace.ml b/lib/engine/execution_trace.ml index 93d1ef32..40a21b4f 100644 --- a/lib/engine/execution_trace.ml +++ b/lib/engine/execution_trace.ml @@ -74,3 +74,5 @@ let error_report trace db buf = | (Done_already _ | Canceled _) -> () let all_ok xs = not (List.exists ~f:is_errored xs) + +module Set = S diff --git a/lib/engine/execution_trace.mli b/lib/engine/execution_trace.mli index b41940e7..3d713791 100644 --- a/lib/engine/execution_trace.mli +++ b/lib/engine/execution_trace.mli @@ -27,3 +27,5 @@ val error_report : val all_ok : t list -> bool val gather_failures : t list -> t list + +module Set : Set.S with type elt = t diff --git a/lib/engine/scheduler.ml b/lib/engine/scheduler.ml index 3ad62f1a..ecc5f528 100644 --- a/lib/engine/scheduler.ml +++ b/lib/engine/scheduler.ml @@ -9,14 +9,9 @@ type error = [ module Table = String.Table -module Traces = Caml.Set.Make(struct - type t = Execution_trace.t - let compare = compare - end) - (* Lwt threads that accumulate errors *) module Eval_thread : sig - type 'a t = ('a, Traces.t) Lwt_result.t + type 'a t = ('a, Execution_trace.Set.t) Lwt_result.t val return : 'a -> 'a t (* val fail : Traces.t -> 'a t *) val fail1 : Execution_trace.t -> 'a t @@ -38,7 +33,7 @@ module Eval_thread : sig end = struct - type 'a t = ('a, Traces.t) Lwt_result.t + type 'a t = ('a, Execution_trace.Set.t) Lwt_result.t module Infix = struct let ( >> ) = Lwt.( >>= ) @@ -48,13 +43,13 @@ struct let return = Lwt_result.return (* let fail = Lwt_result.fail *) - let fail1 e = Lwt_result.fail (Traces.singleton e) + let fail1 e = Lwt_result.fail (Execution_trace.Set.singleton e) let result_both x y = match x, y with | Ok x, Ok y -> Ok (x, y) | Ok _, Error e -> Error e | Error e, Ok _ -> Error e - | Error e, Error e' -> Error (Traces.union e e') + | Error e, Error e' -> Error (Execution_trace.Set.union e e') let both x y = Lwt.(x >>= fun x -> @@ -263,14 +258,14 @@ struct register gc ?target l.elts | Input _ -> Lwt.return () | Select x -> register gc ?target x.dir - | Value v -> + | Plugin { task = Value_plugin v ; _ } -> uses gc target w ; if stop_register gc w then Lwt.return () - else register gc ~target:w v.task - | Path p -> + else register gc ~target:w v + | Plugin { task = Path_plugin p ; _ } -> uses gc target w ; if stop_register gc w then Lwt.return () - else register gc ~target:w p.task + else register gc ~target:w p | Shell s -> uses gc target w ; if stop_register gc w then Lwt.return () @@ -332,9 +327,24 @@ struct | None -> Lwt.return () end +module Synchro : sig + type 'a t + val create : unit -> 'a t + val signal : 'a t -> 'a -> unit + val wait : 'a t -> 'a Lwt.t +end += +struct + type 'a t = ('a Lwt.t * 'a Lwt.u) + let create () = Lwt.wait () + let signal (_, u) x = Lwt.wakeup u x + let wait = fst +end -type 'a t = { - target : 'a W.t ; +type t = { + start : unit Synchro.t ; + _end_ : unit Synchro.t ; + mutable closed : bool ; allocator : Allocator.t ; db : Db.t ; logger : Logger.t ; @@ -347,10 +357,12 @@ let create ?(np = 1) ?mem:(`GB mem = `GB 1) ?(use_docker = true) ?(loggers = []) - ?(collect = false) db target = + ?(collect = false) db = let logger = Logger.tee loggers in { - target = Bistro.Private.reveal target ; + start = Synchro.create () ; + _end_ = Synchro.create () ; + closed = false ; allocator = Allocator.create ~np ~mem:(mem * 1024) ; db ; use_docker ; @@ -486,9 +498,9 @@ let rec blocking_evaluator let dir = blocking_evaluator db s.dir in fun () -> W.cd (dir ()) s.sel | W.Input { path ; _ } -> fun () -> W.FS_path path - | W.Value { id ; _ } -> + | W.Plugin { id ; task = Value_plugin _ ; _ } -> fun () -> (load_value (Db.cache db id)) - | W.Path p -> fun () -> W.Cache_id p.id + | W.Plugin { id ; task = Path_plugin _ ; _ } -> fun () -> W.Cache_id id | W.Spawn s -> let elts = blocking_evaluator db s.elts in fun () -> @@ -512,7 +524,7 @@ let rec blocking_evaluator |> List.map ~f:(fun fn -> W.cd dir_path [fn]) let rec shallow_eval - : type s. _ t -> s W.t -> s Lwt.t + : type s. t -> s W.t -> s Lwt.t = fun sched w -> match w with | W.Pure { value ; _ } -> Lwt.return value @@ -529,13 +541,13 @@ let rec shallow_eval shallow_eval sched s.dir >>= fun dir -> Lwt.return (W.cd dir s.sel) | W.Input { path ; _ } -> Lwt.return (W.FS_path path) - | W.Value { id ; _ } -> + | W.Plugin { id ; task = Value_plugin _ ; _ } -> Lwt.return (load_value (Db.cache sched.db id)) (* FIXME: blocking call *) | W.Spawn s -> (* FIXME: much room for improvement *) shallow_eval sched s.elts >>= fun elts -> let targets = List.init (List.length elts) ~f:(fun i -> s.f (W.list_nth s.elts i)) in Lwt_list.map_p (shallow_eval sched) targets - | W.Path s -> Lwt.return (W.Cache_id s.id) + | W.Plugin { id ; task = Path_plugin _ ; _ } -> Lwt.return (W.Cache_id id) | W.Shell s -> Lwt.return (W.Cache_id s.id) | W.List l -> Lwt_list.map_p (shallow_eval sched) l.elts @@ -604,12 +616,11 @@ let np_requirement | List _ -> 0 | List_nth _ -> 0 | Glob _ -> 0 - | Value x -> x.np - | Path x -> x.np + | Plugin x -> x.np | Shell x -> x.np let mem_requirement - : type u v. u t -> v Workflow.t -> int Lwt.t + : type u. t -> u Workflow.t -> int Lwt.t = fun sched -> function | Pure _ -> Lwt.return 0 | App _ -> Lwt.return 0 @@ -621,8 +632,7 @@ let mem_requirement | List _ -> Lwt.return 0 | List_nth _ -> Lwt.return 0 | Glob _ -> Lwt.return 0 - | Value x -> shallow_eval sched x.mem - | Path x -> shallow_eval sched x.mem + | Plugin x -> shallow_eval sched x.mem | Shell x -> shallow_eval sched x.mem let build_trace sched w perform = @@ -668,7 +678,7 @@ let schedule_cached_workflow sched ~id w ~deps ~perform = |> Eval_thread.ignore let rec build - : type u v. _ t -> ?target:v W.t -> u W.t -> unit thread + : type u v. t -> ?target:v W.t -> u W.t -> unit thread = fun sched ?target w -> let open Eval_thread.Infix in match w with @@ -703,7 +713,7 @@ let rec build ) |> Eval_thread.ignore - | W.Value { task = workflow ; id ; descr ; _ } -> + | W.Plugin { task = Value_plugin workflow ; id ; descr ; _ } -> schedule_cached_workflow sched ~id w ~deps:(fun () -> build sched ~target:w workflow) ~perform:(fun _ -> @@ -718,7 +728,7 @@ let rec build | Error msg -> Ok (Task_result.Plugin { id ; outcome = `Failed ; msg = Some msg ; descr }) ) - | W.Path { id ; task = workflow ; descr ; _ } -> + | W.Plugin { id ; task = Path_plugin workflow ; descr ; _ } -> schedule_cached_workflow sched ~id w ~deps:(fun () -> build sched ~target:w workflow) ~perform:(fun _ -> @@ -747,16 +757,17 @@ let rec build | List l -> Eval_thread.join l.elts ~f:(build ?target sched) -let run sched = - Maybe_gc.register sched.gc sched.target >>= fun () -> - build sched sched.target - >>= (fun r -> Maybe_gc.stop sched.gc >|= fun () -> r) - |> Fn.flip Lwt_result.bind Lwt.(fun () -> shallow_eval sched sched.target >|= Result.return) - |> Lwt_result.map_err Traces.elements +let start sched = Synchro.signal sched.start () -let eval ?np ?mem ?use_docker ?loggers ?collect db w = - let sched = create ?np ?mem ?use_docker ?loggers ?collect db w in - run sched +let eval sched target = + if sched.closed then failwith "Scheduler is closed" ; + let target = Bistro.Private.reveal target in + Synchro.wait sched.start >>= fun () -> + Maybe_gc.register sched.gc target >>= fun () -> + build sched target + >>= (fun r -> Maybe_gc.stop sched.gc >|= fun () -> r) + |> Fn.flip Lwt_result.bind Lwt.(fun () -> shallow_eval sched target >|= Result.return) + |> Lwt_result.map_err Execution_trace.Set.elements let error_report db traces = let buf = Buffer.create 1024 in @@ -765,7 +776,11 @@ let error_report db traces = ) ; Buffer.contents buf -let eval_exn ?np ?mem ?use_docker ?loggers ?collect db w = - eval ?np ?mem ?use_docker ?loggers ?collect db w >|= function +let eval_exn sched w = + eval sched w >|= function | Ok r -> r - | Error errors -> failwith (error_report db errors) + | Error errors -> failwith (error_report sched.db errors) + +let stop sched = + Maybe_gc.stop sched.gc >>= fun () -> + sched.logger#stop diff --git a/lib/engine/scheduler.mli b/lib/engine/scheduler.mli index e1af3e2b..39de68cb 100644 --- a/lib/engine/scheduler.mli +++ b/lib/engine/scheduler.mli @@ -12,7 +12,7 @@ module Gc : sig } end -type 'a t +type t val create : ?np:int -> @@ -21,32 +21,19 @@ val create : ?loggers:Logger.t list -> ?collect:bool -> Db.t -> - 'a Bistro.workflow -> - 'a t + t -val run : - 'a t -> - ('a, Execution_trace.t list) Lwt_result.t +val gc_state : t -> Gc.state option -val gc_state : _ t -> Gc.state option +val start : t -> unit val eval : - ?np:int -> - ?mem:[`GB of int] -> - ?use_docker:bool -> - ?loggers:Logger.t list -> - ?collect:bool -> - Db.t -> + t -> 'a Bistro.workflow -> ('a, Execution_trace.t list) Lwt_result.t val eval_exn : - ?np:int -> - ?mem:[`GB of int] -> - ?use_docker:bool -> - ?loggers:Logger.t list -> - ?collect:bool -> - Db.t -> + t -> 'a Bistro.workflow -> 'a Lwt.t @@ -54,3 +41,5 @@ val error_report : Db.t -> Execution_trace.t list -> string + +val stop : t -> unit Lwt.t diff --git a/lib/internals/workflow.ml b/lib/internals/workflow.ml index 7aa4ad49..0f7ff3be 100644 --- a/lib/internals/workflow.ml +++ b/lib/internals/workflow.ml @@ -42,8 +42,7 @@ type _ t = dir : path t ; sel : string list ; } -> path t - | Value : ((unit -> 'a) t, any) step -> 'a t - | Path : ((string -> unit) t, any) step -> path t + | Plugin : ('a plugin, any) step -> 'a t | Shell : (shell_command, any) step -> path t | Glob : { id : string ; @@ -61,6 +60,10 @@ and ('a, 'b) step = { deps : 'b list ; } +and 'a plugin = + | Value_plugin : (unit -> 'a) t -> 'a plugin + | Path_plugin : (string -> unit) t -> path plugin + and shell_command = token Command.t and token = Path_token of path t | String_token of string t @@ -73,8 +76,7 @@ let digest x = let id : type s. s t -> string = function | Input { id ; _ } -> id | Select { id ; _ } -> id - | Value { id ; _ } -> id - | Path { id ; _ } -> id + | Plugin { id ; _ } -> id | Pure { id ; _ } -> id | App { id ; _ } -> id | Spawn { id ; _ } -> id @@ -110,8 +112,7 @@ module Any = struct | List_nth l -> [ Any l.elts ] | Input _ -> [] | Select sel -> [ any sel.dir ] - | Value v -> v.deps - | Path p -> p.deps + | Plugin v -> v.deps | Shell s -> s.deps | Glob g -> [ Any g.dir ] end @@ -124,7 +125,7 @@ let select dir sel = let dir, sel = match dir with | Select { dir ; sel = root ; _ } -> dir, root @ sel - | Input _ | Path _ | Shell _ -> dir, sel + | Input _ | Plugin _ | Shell _ -> dir, sel | _ -> assert false in let id = digest ("select", id dir, sel) in @@ -144,11 +145,11 @@ let both fst snd = let cached_value ?(descr = "") ?(np = 1) ?(mem = int 100) ?version workflow = let id = digest (`Value, id workflow, version) in - Value { id ; descr ; task = workflow ; np ; mem ; version ; deps = [ any mem ; any workflow ] } + Plugin { id ; descr ; task = Value_plugin workflow ; np ; mem ; version ; deps = [ any mem ; any workflow ] } let cached_path ?(descr = "") ?(np = 1) ?(mem = int 100) ?version workflow = let id = digest (`Value, id workflow, version) in - Path { id ; descr ; task = workflow ; np ; mem ; version ; deps = [ any mem ; any workflow ] } + Plugin { id ; descr ; task = Path_plugin workflow ; np ; mem ; version ; deps = [ any mem ; any workflow ] } let eval_path w = Eval_path { id = digest (`Eval_path, id w) ; workflow = w } diff --git a/lib/utils/console_logger.ml b/lib/utils/console_logger.ml index 1eb7da6b..79f1387d 100644 --- a/lib/utils/console_logger.ml +++ b/lib/utils/console_logger.ml @@ -35,13 +35,11 @@ let error_short_descr = let output_step_event t ~id ~descr = let id = String.prefix id 6 in msg t "started %s.%s" descr id - + let output_event t = function | Logger.Workflow_started (Shell { id ; descr ; _ }, _) -> output_step_event t ~id ~descr - | Logger.Workflow_started (Value { id ; descr ; _ }, _) -> - output_step_event t ~id ~descr - | Logger.Workflow_started (Path { id ; descr ; _ }, _) -> + | Logger.Workflow_started (Plugin { id ; descr ; _ }, _) -> output_step_event t ~id ~descr | Workflow_ended { outcome = (Task_result.Shell { id ; descr ; _ } as outcome) ; _ } -> diff --git a/lib/utils/dot_output.ml b/lib/utils/dot_output.ml index f2bffbad..56ae2297 100644 --- a/lib/utils/dot_output.ml +++ b/lib/utils/dot_output.ml @@ -95,8 +95,7 @@ let dot_output ?db oc g ~needed = let label = Path.to_string s.sel in [ `Label label ; `Fontcolor color ; `Color color ; shape ] | Shell { descr ; _ } -> step_attributes ~descr u - | Value { descr ; _ } -> step_attributes ~descr u - | Path { descr ; _ } -> step_attributes ~descr u + | Plugin { descr ; _ } -> step_attributes ~descr u | Pure _ -> [ label "pure" u ; `Shape `Plaintext ] | App _ -> [ label "app" u ; `Shape `Plaintext ] | Spawn _ -> [ label "spawn" u ; `Shape `Ellipse ] diff --git a/lib/utils/repo.ml b/lib/utils/repo.ml index 2392a8ab..1de60387 100644 --- a/lib/utils/repo.ml +++ b/lib/utils/repo.ml @@ -80,6 +80,7 @@ let link dst p_u = ignore (Sys.command cmd) let generate outdir items = + let items = remove_redundancies items in List.iter items ~f:(fun item -> let repo_path = Filename.concat outdir item.repo_path in let file_path = Filename.concat outdir item.file_path in @@ -90,32 +91,33 @@ let generate outdir items = link file_path cache_path ) +let item_to_workflow = function + | Item (path, w) -> + [%workflow [ normalized_repo_item ~repo_path:path ~id:(W.id (Private.reveal w)) ~cache_path:[%path w] ]] + | Item_list l -> + [%workflow + let id = W.id (Private.reveal l.elts) in + let elts = [%eval Workflow.spawn l.elts ~f:Workflow.eval_path] in + let n = List.length elts in + let m = Float.(n |> of_int |> log10 |> to_int) in + let ext = match l.ext with + | None -> "" + | Some s -> "." ^ s + in + let format = + Scanf.format_from_string + (sprintf {|%%s_%%0%dd%%s|} m) + "%s%d%s" in + let list_elt_path i = + l.path @ [ sprintf format l.prefix i ext ] + in + List.mapi elts ~f:(fun i path_w -> + normalized_repo_item ~repo_path:(list_elt_path i) ~id:(Misc.digest (id, i)) ~cache_path:path_w + )] + let to_workflow ~outdir items = let normalized_items = - List.map items ~f:(function - | Item (path, w) -> - [%workflow [ normalized_repo_item ~repo_path:path ~id:(W.id (Private.reveal w)) ~cache_path:[%path w] ]] - | Item_list l -> - [%workflow - let id = W.id (Private.reveal l.elts) in - let elts = [%eval Workflow.spawn l.elts ~f:Workflow.eval_path] in - let n = List.length elts in - let m = Float.(n |> of_int |> log10 |> to_int) in - let ext = match l.ext with - | None -> "" - | Some s -> "." ^ s - in - let format = - Scanf.format_from_string - (sprintf {|%%s_%%0%dd%%s|} m) - "%s%d%s" in - let list_elt_path i = - l.path @ [ sprintf format l.prefix i ext ] - in - List.mapi elts ~f:(fun i path_w -> - normalized_repo_item ~repo_path:(list_elt_path i) ~id:(Misc.digest (id, i)) ~cache_path:path_w - )] - ) + List.map items ~f:item_to_workflow |> Workflow.list in [%workflow @@ -124,15 +126,31 @@ let to_workflow ~outdir items = |> remove_redundancies |> generate outdir] +let partition_results xs = + let rec inner ok err = function + | [] -> ok, err + | Ok x :: t -> inner (x :: ok) err t + | Error e :: t -> inner ok (e :: err) t + in + inner [] [] xs + let build ?np ?mem ?loggers ?use_docker ?(bistro_dir = "_bistro") ?collect ~outdir repo = let db = Db.init_exn bistro_dir in - let expr = to_workflow ~outdir repo in - Scheduler.eval ?np ?mem ?loggers ?use_docker ?collect db expr >|= - function - | Ok () -> () - | Error errors -> + let expressions = List.map repo ~f:(item_to_workflow) in + let sched = Scheduler.create ?np ?mem ?loggers ?use_docker ?collect db in + let results = Lwt_list.map_p (Scheduler.eval sched) expressions in + Scheduler.start sched ; + Lwt.map partition_results results >>= fun (res, errors) -> + Scheduler.stop sched >|= fun () -> + generate outdir (List.concat res) ; + if errors <> [] then ( + let errors = + List.concat errors + |> Execution_trace.gather_failures + in prerr_endline (Scheduler.error_report db errors) ; failwith "Some workflow failed!" + ) let build_main ?np ?mem ?loggers ?use_docker ?bistro_dir ?collect ~outdir repo = build ?np ?mem ?loggers ?use_docker ?bistro_dir ?collect ~outdir repo diff --git a/test/test1.ml b/test/test1.ml index 8b1b1283..9c33ca22 100644 --- a/test/test1.ml +++ b/test/test1.ml @@ -12,8 +12,8 @@ let pipeline = add (Workflow.int 1) (Workflow.int 41) let _ = let open Bistro_engine in - let open Lwt_result.Infix in let db = Db.init_exn "_bistro" in - Scheduler.eval db pipeline - >|= Printf.printf "%d\n" - |> Lwt_main.run + let sched = Scheduler.create db in + let thread = Scheduler.eval_exn sched pipeline in + Scheduler.start sched ; + Printf.printf "%d\n" (Lwt_main.run thread) diff --git a/test/test2.ml b/test/test2.ml index a463d0b5..a6b9a6cf 100644 --- a/test/test2.ml +++ b/test/test2.ml @@ -35,6 +35,8 @@ let _ = let db = Db.init_exn "_bistro" in let pipeline = Repo.to_workflow repo ~outdir:"res" in Dot_output.workflow_to_file ~db "workflow.dot" pipeline ; - let sched = Scheduler.create ~np:4 ~loggers:[Bistro_utils.Console_logger.create ()] ~collect:true db pipeline in - ignore (Scheduler.run sched |> Lwt_main.run) ; - dump_gc_state sched db "gc_final.dot" ; + let sched = Scheduler.create ~np:4 ~loggers:[Bistro_utils.Console_logger.create ()] ~collect:true db in + let thread = Scheduler.eval_exn sched pipeline in + Scheduler.start sched ; + Lwt_main.run thread ; + dump_gc_state sched db "gc_final.dot" diff --git a/test/test_gc.ml b/test/test_gc.ml index c6b7d26d..2c9064e0 100644 --- a/test/test_gc.ml +++ b/test/test_gc.ml @@ -35,6 +35,8 @@ let dump_gc_state sched db fn = let _ = let open Bistro_engine in let db = Db.init_exn "_bistro" in - let sched = Scheduler.create ~np:4 ~loggers:[Bistro_utils.Console_logger.create ()] ~collect:true db pipeline in - ignore (Scheduler.run sched |> Lwt_main.run) ; + let sched = Scheduler.create ~np:4 ~loggers:[Bistro_utils.Console_logger.create ()] ~collect:true db in + let thread = Scheduler.eval_exn sched pipeline in + Scheduler.start sched ; + ignore (Lwt_main.run thread) ; dump_gc_state sched db "gc_final.dot"