Skip to content

Commit

Permalink
Input label is now explicit in protocol instead of encoded in url.
Browse files Browse the repository at this point in the history
  • Loading branch information
pmundkur committed Feb 15, 2013
1 parent 954645d commit c415bf6
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 51 deletions.
36 changes: 14 additions & 22 deletions lib/pipeline_utils.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,24 @@ module P = Protocol
module L = Pipeline
module E = Errors

let task_input_of_one_uri ti id (rid, uri) =
let task_input_of_one_uri ti id label (rid, uri) =
let trans_auth =
match uri.Uri.scheme, uri.Uri.authority with
| Some "dir", Some a
| Some "disco", Some a -> Some {a with Uri.port = Some ti.P.task_disco_port}
| _, auth -> auth in
let url =
match uri.Uri.scheme, uri.Uri.fragment with
| Some "dir", None ->
match uri.Uri.scheme, label with
| Some "dir", P.Input_label_all ->
L.Dir {uri with Uri.authority = trans_auth}
| Some "dir", Some l ->
(try
let label = int_of_string l
in L.Dir_indexed (label, {uri with Uri.authority = trans_auth;
fragment = None})
with Failure _ ->
raise (E.Worker_failure (E.Invalid_task_input_label (id, rid, uri))))
| _, Some l ->
(try
let label = int_of_string l
in L.Data (label, {uri with Uri.authority = trans_auth;
fragment = None})
with Failure _ ->
raise (E.Worker_failure (E.Invalid_task_input_label (id, rid, uri))))
| _, None ->
raise (E.Worker_failure (E.Missing_task_input_label (id, rid, uri)))
| Some "dir", P.Input_label l ->
L.Dir_indexed (l, {uri with Uri.authority = trans_auth;
fragment = None})
| _, P.Input_label l ->
L.Data (l, {uri with Uri.authority = trans_auth;
fragment = None})
| _, P.Input_label_all ->
raise (E.Worker_failure (E.Invalid_task_input_label (id, rid, uri)))
in rid, url

let comparable_task_inputs (_, i1) (_, i2) =
Expand All @@ -37,12 +29,12 @@ let comparable_task_inputs (_, i1) (_, i2) =
| L.Dir _, L.Dir _ -> true
| _, _ -> false

let task_input_of ti id = function
let task_input_of ti id label = function
| [] ->
[]
| (r :: _) as replicas ->
let one = task_input_of_one_uri ti id r in
let all = List.map (task_input_of_one_uri ti id) replicas in
let one = task_input_of_one_uri ti id label r in
let all = List.map (task_input_of_one_uri ti id label) replicas in
(* ensure all urls in the input are consistent *)
if not (List.for_all (comparable_task_inputs one) all)
then
Expand Down
2 changes: 1 addition & 1 deletion lib/pipeline_utils.mli
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
val comparable_task_inputs : Protocol.replica_id * Pipeline.task_input
-> Protocol.replica_id * Pipeline.task_input -> bool

val task_input_of : Protocol.taskinfo -> Protocol.input_id
val task_input_of : Protocol.taskinfo -> Protocol.input_id -> Protocol.input_label
-> Protocol.replica list -> (Protocol.replica_id * Pipeline.task_input) list
19 changes: 16 additions & 3 deletions lib/protocol.ml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type input_status =
| Input_ok
| Input_failed

type input_label =
| Input_label_all
| Input_label of Pipeline.label

let input_status_of_string s =
match String.lowercase s with
| "ok" -> Input_ok
Expand All @@ -70,7 +74,7 @@ let input_status_of_string s =
type input_id = int
type replica_id = int
type replica = replica_id * Uri.t
type input = input_id * input_status * replica list
type input = input_id * input_status * input_label * replica list

type master_msg =
| M_ok
Expand Down Expand Up @@ -126,6 +130,14 @@ let taskinfo_of b =
task_host; task_master; task_disco_port; task_put_port;
task_disco_root; task_ddfs_root; task_rootpath}

let input_label j =
if J.is_int j then Input_label (JC.to_int j)
else if J.is_string j && JC.to_string j = "all" then Input_label_all
else begin
let msg = Printf.sprintf "invalid input label %s" (J.to_string j) in
raise (E.Worker_failure (E.Protocol_error msg))
end

let task_input_of b =
let msg = JC.to_list b in
let status = task_input_status_of_string (JC.to_string (List.hd msg)) in
Expand All @@ -135,15 +147,16 @@ let task_input_of b =
let l = JC.to_list l in
let inp_id = JC.to_int (List.hd l) in
let inp_status = input_status_of_string (JC.to_string (List.nth l 1)) in
let replicas = JC.to_list (List.nth l 2) in
let inp_label = input_label (List.nth l 2) in
let replicas = JC.to_list (List.nth l 3) in
let inps = List.map
(fun jlist ->
let l = JC.to_list jlist in
let rep_id = JC.to_int (List.hd l) in
let rep_url = Uri.of_string (JC.to_string (List.nth l 1)) in
(rep_id, rep_url)
) replicas in
inp_id, inp_status, inps) in
inp_id, inp_status, inp_label, inps) in
status, List.map mk_inp minps

let retry_of b =
Expand Down
6 changes: 5 additions & 1 deletion lib/protocol.mli
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@ type input_status =
| Input_ok
| Input_failed

type input_label =
| Input_label_all
| Input_label of Pipeline.label

type input_id = int
type replica_id = int
type replica = replica_id * Uri.t
type input = input_id * input_status * replica list
type input = input_id * input_status * input_label * replica list

type master_msg =
| M_ok
Expand Down
49 changes: 25 additions & 24 deletions lib/worker.ml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ let send_output_msg ic oc out_files =

type resolved_input =
| Inp_replicas of P.input_id * L.label * (P.replica_id * Uri.t) list
| Inp_splits of P.input_id * P.replica_id * (L.label * Uri.t) list
| Inp_splits of P.input_id * P.input_label * P.replica_id * (L.label * Uri.t) list

(* The first pass of input processing resolves any Dir or Dir_indexed
inputs, by fetching the specified index files and processing the
Expand All @@ -123,7 +123,7 @@ let get_labels entries label =
let resolve taskinfo inputs =
(* Partition the inputs into dir indices and replicated data inputs. *)
let dirs, ireps = List.fold_left
(fun (dirs, ireps) ((id, _status, replicas) as inp) ->
(fun (dirs, ireps) ((id, _status, _ilabel, replicas) as inp) ->
match replicas with
| [] ->
dirs, ireps
Expand All @@ -136,13 +136,13 @@ let resolve taskinfo inputs =
dirs, (U.Right (Inp_replicas (id, l, reps))) :: ireps
) ([], []) inputs in
(* For each of the dir index inputs, fetch the index. *)
let make_req (id, _status, reps) =
let make_req (id, _status, ilabel, reps) =
let map, uris = List.split
(List.map
(fun (rid, dr) -> let u = L.uri_of dr in (u, rid), u)
reps) in
let dr_rep = snd (List.hd reps) in
id, (dr_rep, map), uris in
id, (ilabel, dr_rep, map), uris in
let ndirs = List.length dirs in
let indices =
if ndirs > 0 then begin
Expand All @@ -153,7 +153,7 @@ let resolve taskinfo inputs =
end else [] in
(* Process the retrieved indices *)
let resolved_dirs = List.map
(fun (id, (dr_rep, map), result) ->
(fun (id, (ilabel, dr_rep, map), result) ->
(* We're assuming in the assocs below that Disco internal
index urls don't get changed, e.g. via redirects. *)
match dr_rep, result with
Expand All @@ -162,22 +162,22 @@ let resolve taskinfo inputs =
(match parse_index index with
| None ->
U.dbg "Error parsing index %d: %s" id (Uri.to_string uri);
U.Left (id, [rid], E.Invalid_dir_index (id, rid, uri))
U.Left (id, ilabel, [rid], E.Invalid_dir_index (id, rid, uri))
| Some entries ->
(* Use all entries in Dir inputs *)
U.Right (Inp_splits (id, rid, entries)))
U.Right (Inp_splits (id, ilabel, rid, entries)))
| L.Dir_indexed (l, _), U.Right (uri, index) ->
let rid = List.assoc uri map in
(match parse_index index with
| None ->
U.dbg "Error parsing index %d: %s" id (Uri.to_string uri);
U.Left (id, [rid], E.Invalid_dir_index (id, rid, uri))
U.Left (id, ilabel, [rid], E.Invalid_dir_index (id, rid, uri))
| Some entries ->
(* Use the specified labels only for Dir_indexed inputs *)
U.Right (Inp_splits (id, rid, get_labels entries l)))
U.Right (Inp_splits (id, ilabel, rid, get_labels entries l)))
| _, U.Left e ->
U.dbg "Error downloading input %d: %s" id (E.string_of_error e);
U.Left (id, List.map snd map, e)
U.Left (id, ilabel, List.map snd map, e)
| L.Data _, _ ->
assert false
) indices in
Expand All @@ -199,7 +199,7 @@ let download taskinfo inputs =
let make_reqs = function
| (Inp_replicas (id, label, reps)) as inp ->
[ id, (label, inp), List.map snd reps ]
| (Inp_splits (id, _rid, splits)) as inp ->
| (Inp_splits (id, _ilabel, _rid, splits)) as inp ->
List.map (fun (label, url) -> id, (label, inp), [url]
) splits in
let inp_reqs = List.concat (List.map make_reqs inputs) in
Expand All @@ -220,14 +220,14 @@ let download taskinfo inputs =
let rep_list, split_map = List.fold_left
(fun (rep_list, split_map) -> function
(* collect rids for failed downloads *)
| _, (_, Inp_replicas (id, _, reps)), U.Left e ->
U.Left (id, List.map fst reps, e) :: rep_list, split_map
| _, (_, (Inp_splits (id, rid, _) as inp)), U.Left e ->
rep_list, add_split inp (U.Left (id, [rid], e)) split_map
| _, (_, Inp_replicas (id, label, reps)), U.Left e ->
U.Left (id, (P.Input_label label), List.map fst reps, e) :: rep_list, split_map
| _, (_, (Inp_splits (id, ilabel, rid, _) as inp)), U.Left e ->
rep_list, add_split inp (U.Left (id, ilabel, [rid], e)) split_map
| _, (_, Inp_replicas (id, label, _reps)), U.Right (uri, f) ->
(* localize results for successful downloads *)
U.Right (Local_replica (id, label, uri, f)) :: rep_list, split_map
| _, (label, (Inp_splits (_, _, _splits) as inp)), U.Right (uri, f) ->
| _, (label, (Inp_splits (_, _, _, _splits) as inp)), U.Right (uri, f) ->
(* collect splits for same input_id together *)
rep_list, add_split inp (U.Right (label, uri, f)) split_map
) ([], SplitDownloads.empty) downloads in
Expand All @@ -237,7 +237,7 @@ let download taskinfo inputs =
match inp with
| Inp_replicas _ ->
assert false
| Inp_splits (id, rid, splits) ->
| Inp_splits (id, _ilabel, rid, splits) ->
assert (List.length splits = List.length results);
let errors, downloads = U.lrsplit results in
if errors <> [] then begin
Expand All @@ -246,14 +246,15 @@ let download taskinfo inputs =
U.dbg "Error downloading all %d splits from replica %d of input %d: \
%d errors"
(List.length splits) rid id (List.length errors);
List.iter (fun (_label, _uri, f) -> N.File.close f) downloads;
List.iter (fun (_l, _uri, f) -> N.File.close f) downloads;
U.Left (List.hd errors) :: acc
end else
U.Right (Local_splits (id, downloads)) :: acc
) split_map rep_list

let pipeline_inputs_of ti inputs =
List.map (fun (id, st, replicas) -> id, st, LU.task_input_of ti id replicas
List.map (fun (id, st, ilabel, replicas) ->
id, st, ilabel, LU.task_input_of ti id ilabel replicas
) inputs

let run_task ic oc taskinfo task_init task_process task_done =
Expand All @@ -270,11 +271,11 @@ let run_task ic oc taskinfo task_init task_process task_done =
(Unix.in_channel_of_descr fd);
in_files := fi :: !in_files in
let process_localized_input = function
| Local_replica (id, label, uri, f) ->
process_download label uri f;
| Local_replica (id, l, uri, f) ->
process_download l uri f;
id
| Local_splits (id, inps) ->
List.iter (fun (label, uri, f) -> process_download label uri f) inps;
List.iter (fun (l, uri, f) -> process_download l uri f) inps;
id in
let process inputs =
(* First, resolve any indexed inputs *)
Expand All @@ -292,15 +293,15 @@ let run_task ic oc taskinfo task_init task_process task_done =
processed := done_ @ !processed;
(* Make one pass at processing retries for errors *)
let retries = List.fold_left
(fun acc (id, rids, e) ->
(fun acc (id, label, rids, e) ->
match P.send_request (P.W_input_failure (id, rids)) ic oc with
| P.M_fail ->
U.dbg "Unable to get replacement inputs for failed input %d \
(failure: %s), bailing ..."
id (E.string_of_error e);
raise (E.Worker_failure e)
| P.M_retry reps ->
(id, (* unused *) P.Input_ok, reps) :: acc
(id, (* unused *) P.Input_ok, label, reps) :: acc
| m ->
raise (E.Worker_failure (E.Unexpected_msg (P.master_msg_name m)))
) [] errors in
Expand Down

0 comments on commit c415bf6

Please sign in to comment.