Skip to content

Commit

Permalink
added collect_in_directory workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
pveber committed Oct 10, 2018
1 parent 7345cbe commit 3aec571
Show file tree
Hide file tree
Showing 19 changed files with 196 additions and 185 deletions.
6 changes: 4 additions & 2 deletions examples/nlp_demo.ml
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ let definition_analysis w =
let text = wikipedia_query w in
let deps = stanford_parser text in
let deps_graphs =
collection_map (sentences_of_stanford_deps deps) ~f:dependensee in
collection_map (sentences_of_stanford_deps deps) ~f:dependensee
|> collect_in_directory ~ext:"png"
in
Bistro_pack.Repo.[
item [ "definition.txt" ] text ;
item [ "deps" ] deps ;
items [ "deps_graphs" ] ~base:"sentence" ~ext:"png" deps_graphs ;
item [ "deps_graphs" ] deps_graphs ;
]
|> Repo.shift w
Expand Down
11 changes: 5 additions & 6 deletions lib/base/sigs.ml
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ module type DSL = sig
f:('a workflow -> 'b workflow) ->
'b collection

(** [collect_in_directory ?ext c] turns the collection [c] into a single
    workflow that denotes a directory.
    NOTE(review): [ext] is presumably the file extension given to the
    collected elements inside that directory; the code that materializes
    the directory is not visible here, so confirm against it. *)
val collect_in_directory :
?ext:string ->
'a collection ->
_ directory workflow

type template
module Template_dsl : Template_dsl with type 'a dep := 'a workflow
and type 'a deps := 'a collection
Expand Down Expand Up @@ -235,7 +240,6 @@ end

module type Repo = sig
type 'a workflow
type 'a collection
type logger

type item
Expand All @@ -246,11 +250,6 @@ module type Repo = sig

val item : string list -> 'a workflow -> item

val items :
?base:string ->
?ext:string ->
string list -> 'a collection -> item

val singleton : string -> 'a workflow -> t

val add_prefix : string list -> t -> t
Expand Down
25 changes: 16 additions & 9 deletions lib/base/workflow.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ type t =
}
| Shell of shell
| Plugin of plugin
| Collect_in_directory of {
id : string ;
elts : collection ;
ext : string option ;
}

and 'a step = {
id : string ;
Expand All @@ -35,10 +40,6 @@ and env = <
and dep = WDep of t | CDep of collection
and dep_token = WDepT of t | CDepT of collection * string
and collection =
| List of {
id : string ;
elts : t list ;
}
| Glob of {
id : string ;
dir : t ;
Expand All @@ -57,8 +58,9 @@ let dep_of_dep_token = function
let id = function
| Input { id ; _ }
| Select { id ; _}
| Shell { id ; _ } -> id
| Plugin { id ; _ } -> id
| Shell { id ; _ }
| Plugin { id ; _ }
| Collect_in_directory { id ; _ } -> id

(** [compare u v] orders two workflows by applying [String.compare] to
    their identifiers (as returned by [id]); the rest of the workflow is
    not inspected. *)
let compare u v =
String.compare (id u) (id v)
Expand All @@ -75,7 +77,8 @@ let select u q =
| Select { dir ; sel = p ; _ } -> k dir (p @ q)
| Input _
| Plugin _
| Shell _ -> k u q
| Shell _
| Collect_in_directory _ -> k u q

let input ?version path =
let id = digest ("input", path, version) in
Expand All @@ -89,9 +92,9 @@ let rec digestible_workflow = function
| Select { dir ; sel = p ; _ } ->
`Select (digestible_workflow dir, p)
| Plugin c -> `Plugin c.id
| Collect_in_directory c -> `Collect_in_directory c.id

let digestible_workflow_list = function
| List l -> `List l.id
| Glob g -> `Glob g.id
| ListMap lm -> `ListMap lm.id

Expand Down Expand Up @@ -120,7 +123,6 @@ let glob ?pattern dir =
Glob { id ; pattern ; dir }

let list_id = function
| List { id ; _ }
| Glob { id ; _ }
| ListMap { id ; _ } -> id

Expand All @@ -137,8 +139,13 @@ let plugin
let id = digest ("closure", version, id) in
Plugin { descr ; task = f ; deps ; np ; mem ; version ; id }

(** [collect_in_directory ?ext elts] builds a [Collect_in_directory] node
    over the collection [elts].
    The node's [id] is a digest of the tag ["collect_in_directory"], the
    optional [ext] and the identifier of [elts] (obtained with [list_id]),
    so [ext] takes part in the identity of the resulting workflow. *)
let collect_in_directory ?ext elts =
let id = digest ("collect_in_directory", ext, list_id elts) in
Collect_in_directory { id ; elts ; ext }

(** [deps w] lists the direct dependencies of workflow [w]:
    - [Input]: no dependency;
    - [Select]: the workflow the selection is made in, as a [WDep];
    - [Shell] and [Plugin]: the [deps] field stored in the node;
    - [Collect_in_directory]: the collected collection, as a single [CDep]. *)
let deps = function
| Input _ -> []
| Select { dir ; _ } -> [ WDep dir ]
| Shell s -> s.deps
| Plugin s -> s.deps
| Collect_in_directory c -> [ CDep c.elts ]
1 change: 1 addition & 0 deletions lib/bistro.ml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type 'a collection = Workflow.collection

(* Re-exports of functions implemented in [Workflow]. Note the renaming:
   [collection_map] is [Workflow.list_map]; the other two keep their name. *)
let glob = Workflow.glob
let collection_map = Workflow.list_map
let collect_in_directory = Workflow.collect_in_directory

module Private = struct
let reveal (x : 'a workflow) = (x : Workflow.t)
Expand Down
5 changes: 3 additions & 2 deletions lib/engine/db.ml
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,6 @@ let rec path db =
| Input i -> i.path
| Select s ->
Filename.concat (path db s.dir) (Path.to_string s.sel)
| Shell s -> cache db s.id
| Plugin s -> cache db s.id
| Shell { id ; _ }
| Plugin { id ; _ }
| Collect_in_directory { id ; _ } -> cache db id
4 changes: 2 additions & 2 deletions lib/engine/execution_env.ml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type container_mount = {
let container_mount db =
let open Workflow in
function
| Shell _ | Plugin _ as u ->
| Shell _ | Plugin _ | Collect_in_directory _ as u ->
{
mount_host_location = Db.cache_dir db ;
mount_container_location = docker_cache_dir ;
Expand All @@ -62,7 +62,7 @@ let container_mount db =
file_container_location = container_path ;
}

| Select { dir = (Shell _ | Plugin _) as dir ; sel ; _ } ->
| Select { dir = (Shell _ | Plugin _ | Collect_in_directory _) as dir ; sel ; _ } ->
{
mount_host_location = Db.cache_dir db ;
mount_container_location = docker_cache_dir ;
Expand Down
6 changes: 3 additions & 3 deletions lib/engine/execution_trace.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ type t =
_end_ : time ;
outcome : Task_result.t }

| Done_already of Task.t
| Done_already of Workflow.t
| Canceled of {
task : Task.t ;
task : Workflow.t ;
missing_deps : t list ;
}
| Allocation_error of Task.t * string
| Allocation_error of Workflow.t * string
| Invalid_glob of {
dir : Workflow.t ;
}
Expand Down
6 changes: 3 additions & 3 deletions lib/engine/execution_trace.mli
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ type t =
_end_ : time ;
outcome : Task_result.t }

| Done_already of Task.t
| Done_already of Workflow.t
| Canceled of {
task : Task.t ;
task : Workflow.t ;
missing_deps : t list ;
}
| Allocation_error of Task.t * string
| Allocation_error of Workflow.t * string
| Invalid_glob of {
dir : Workflow.t ;
}
Expand Down
6 changes: 3 additions & 3 deletions lib/engine/logger.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ open Bistro_base
type time = float

type event =
| Task_ready of Task.t
| Task_started of Task.t * Allocator.resource
| Task_ready of Workflow.t
| Task_started of Workflow.t * Allocator.resource
| Task_ended of {
outcome : Task_result.t ;
start : time ;
_end_ : time ;
}
| Workflow_skipped of Workflow.t * [ `Done_already | `Missing_dep ]
| Task_allocation_error of Task.t * string
| Task_allocation_error of Workflow.t * string

class type t = object
method event : Db.t -> time -> event -> unit
Expand Down
20 changes: 0 additions & 20 deletions lib/engine/repo.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@ open Bistro

type item =
| Item : string list * _ workflow -> item
| Items : {
base : string option ;
ext : string option ;
path : string list ;
collection : _ collection ;
} -> item

type t = item list

Expand All @@ -26,17 +20,7 @@ let normalized_repo_item repo_path w cache_path = [
}
]

let normalized_repo_items ?(base = "") ?ext repo_path results =
List.mapi results ~f:(fun i (w, cache_path) ->
let fn = sprintf "%s%06d%s" base i (match ext with None -> "" | Some e -> "." ^ e) in
let repo_path = repo_path @ [ fn ] in
normalized_repo_item repo_path w cache_path
)
|> List.concat

let item path w = Item (path, w)
let items ?base ?ext path collection =
Items { base ; ext ; path ; collection }

let ( %> ) path w = item path w

Expand Down Expand Up @@ -104,9 +88,6 @@ let to_expr ~outdir items =
| Item (path, w) ->
let%map path_w = dep w in
normalized_repo_item path w path_w
| Items { base ; ext ; path ; collection } ->
let%map paths = deps collection in
normalized_repo_items ?base ?ext path paths
)
|> Static_scheduler.Expr.list
in
Expand All @@ -127,7 +108,6 @@ let build ?np ?mem ?loggers ?keep_all:_ ?use_docker ?(bistro_dir = "_bistro") ~o
let add_prefix prefix items =
List.map items ~f:(function
| Item (p, w) -> Item (prefix @ p, w)
| Items i -> Items { i with path = prefix @ i.path }
)

let shift dir items = add_prefix [ dir ] items
Expand Down
1 change: 0 additions & 1 deletion lib/engine/repo.mli
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
open Bistro

include Bistro_base.Sigs.Repo with type 'a workflow := 'a workflow
and type 'a collection := 'a collection
and type logger := Logger.t

val to_expr :
Expand Down
13 changes: 0 additions & 13 deletions lib/engine/static_scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,6 @@ and schedule_deps sched w =
and schedule_collection sched =
let open Lwt_eval in
function
| List l ->
map_p l.elts ~f:(schedule_workflow sched) >>= fun _ ->
return (List.map ~f:Bistro.Private.laever l.elts)
| Glob g ->
schedule_workflow sched g.dir >>= fun () ->
Lwt.(Misc.files_in_dir (Db.path sched.config.db g.dir) >|= fun x -> Ok x) >>= fun files ->
Expand All @@ -214,11 +211,7 @@ module Expr = struct
| Pure : 'a -> 'a t
| App : ('a -> 'b) t * 'a t -> 'b t
| Workflow : _ Bistro.workflow -> string t
| Collection : 'a Bistro.collection -> (('a workflow * string) list) t
| List : 'a t list -> 'a list t
and 'a workflow = W of 'a Bistro.workflow [@@unboxed]
(* this type definition is to avoid a typing error, see
https://caml.inria.fr/mantis/print_bug_page.php?bug_id=7605 *)

let rec eval
: type s. scheduler -> s t -> (s, S.t) Lwt_result.t
Expand All @@ -235,10 +228,6 @@ module Expr = struct
let w = Bistro.Private.reveal w in
schedule_workflow sched w >>= fun _ ->
return (Db.path sched.config.db w)
| Collection c ->
let c = Bistro.Private.collection c in
schedule_collection sched c >>= fun workflows ->
return (List.map workflows ~f:(fun w -> W w, Db.path sched.config.db (Bistro.Private.reveal w)))
| List xs ->
map_p xs ~f:(eval sched)

Expand All @@ -248,8 +237,6 @@ module Expr = struct
let map x ~f = pure f $ x
let both x y = pure (fun x y -> x, y) $ x $ y
let dep x = Workflow x
let deps xs =
pure (List.map ~f:(fun (W w, s) -> w, s)) $ (Collection xs)
let list xs = List xs
let return x = pure x
end
Expand Down

0 comments on commit 3aec571

Please sign in to comment.