Skip to content

Commit

Permalink
Raise errors in worker
Browse files Browse the repository at this point in the history
  • Loading branch information
icristescu committed Sep 1, 2022
1 parent 97bcdab commit 24aee6c
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 58 deletions.
128 changes: 70 additions & 58 deletions src/irmin-pack/unix/gc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,36 @@ module Worker = struct

module Make (Args : Args) : S with module Args := Args = struct
open Args
module Io = Fm.Io

module Io = struct
include Fm.Io

let fsync_and_close x ~log_if_error =
let open Result_syntax in
fsync x >>= (fun () -> close x) |> Errs.log_if_error log_if_error

let open_exn ~path ~readonly =
open_ ~path ~readonly |> Errs.raise_if_error
end

module Mapping_file = Fm.Mapping_file
module Ao = Append_only_file.Make (Io)

module Ao = struct
include Append_only_file.Make (Fm.Io)

let create_rw_exn ~path ~auto_flush_callback =
create_rw ~path ~overwrite:true ~auto_flush_threshold:1_000_000
~auto_flush_callback
|> Errs.raise_if_error

let flush_exn x = flush x |> Errs.raise_if_error

let fsync_and_close x ~log_if_error =
let open Result_syntax in
fsync x >>= (fun () -> close x) |> Errs.log_if_error log_if_error

let close_exn x ~log_if_error = close x |> Errs.log_if_error log_if_error
end

module X = struct
type t = int63 [@@deriving irmin]
Expand Down Expand Up @@ -139,42 +166,37 @@ module Worker = struct
write_exn ~off:poff ~len (Bytes.unsafe_to_string buffer)

let create_new_suffix ~root ~generation =
let open Result_syntax in
let path = Irmin_pack.Layout.V3.suffix ~root ~generation in
let auto_flush_threshold = 1_000_000 in
let auto_flush_callback x = Ao.flush x |> Errs.raise_if_error in
let* suffix =
Ao.create_rw ~path ~overwrite:true ~auto_flush_threshold
~auto_flush_callback
in
Ok suffix
let auto_flush_callback = Ao.flush_exn in
Ao.create_rw_exn ~path ~auto_flush_callback

let run ~generation root commit_key =
let open Result_syntax in
let config =
Irmin_pack.Conf.init ~fresh:false ~readonly:true ~lru_size:0 root
in

(* Step 1. Open the files *)
[%log.debug "GC: opening files in RO mode"];
let* fm = Fm.open_ro config in
let fm = Fm.open_ro config |> Errs.raise_if_error in
Errors.finalise (fun _outcome ->
Fm.close fm |> Errs.log_if_error "GC: Close File_manager")
@@ fun () ->
let* dict = Dict.v fm in
let* dispatcher = Dispatcher.v ~root fm in
let dict = Dict.v fm |> Errs.raise_if_error in
let dispatcher = Dispatcher.v ~root fm |> Errs.raise_if_error in
let node_store = Node_store.v ~config ~fm ~dict ~dispatcher in
let commit_store = Commit_store.v ~config ~fm ~dict ~dispatcher in

(* Step 2. Load commit which will make [commit_key] [Direct] if it's not
already the case. *)
let* commit =
let commit =
match
Commit_store.unsafe_find ~check_integrity:false commit_store
commit_key
with
| None -> Error (`Commit_key_is_dangling (string_of_key commit_key))
| Some commit -> Ok commit
| None ->
Errs.raise_error
(`Commit_key_is_dangling (string_of_key commit_key))
| Some commit -> commit
in
let commit_offset, _ =
let state : _ Irmin_pack.Pack_key.state =
Expand All @@ -186,10 +208,12 @@ module Worker = struct
in

(* Step 3. Create the new mapping. *)
let* mapping =
let mapping =
(* Step 3.1 Start [Mapping_file] routine which will create the
reachable file. *)
(fun f -> Mapping_file.create ~root ~generation ~register_entries:f)
(fun f ->
Mapping_file.create ~root ~generation ~register_entries:f
|> Errs.raise_if_error)
@@ fun ~register_entry ->
(* Step 3.2 Put the commit parents in the reachable file.
The parent(s) of [commit_key] must be included in the iteration
Expand Down Expand Up @@ -225,17 +249,16 @@ module Worker = struct
()
in

let* () =
let () =
(* Step 4. Create the new prefix. *)
let auto_flush_callback x = Ao.flush x |> Errs.raise_if_error in
let* prefix =
let prefix =
let path = Irmin_pack.Layout.V3.prefix ~root ~generation in
Ao.create_rw ~path ~overwrite:true ~auto_flush_threshold:1_000_000
~auto_flush_callback
let auto_flush_callback = Ao.flush_exn in
Ao.create_rw_exn ~path ~auto_flush_callback
in
let* () =
let () =
Errors.finalise (fun _outcome ->
Ao.close prefix |> Errs.log_if_error "GC: Close prefix")
Ao.close_exn prefix ~log_if_error:"GC: Close prefix")
@@ fun () ->
();

Expand All @@ -249,42 +272,35 @@ module Worker = struct
let len = Int63.of_int len in
transfer_append_exn ~read_exn ~append_exn ~off ~len buffer
in
let* () = Mapping_file.iter mapping f in
Ao.flush prefix
let () = Mapping_file.iter_exn mapping f in
Ao.flush_exn prefix
in
(* Step 5.2. Transfer again the parent commits but with a modified
magic. Reopen the new prefix, this time _not_ in append-only
as we have to modify data inside the file. *)
let read_exn = Dispatcher.read_exn dispatcher in
let* prefix =
let prefix =
let path = Irmin_pack.Layout.V3.prefix ~root ~generation in
Io.open_ ~path ~readonly:false
in
let* () =
Errors.finalise (fun _outcome ->
Io.fsync prefix
>>= (fun _ -> Io.close prefix)
|> Errs.log_if_error "GC: Close prefix after parent rewrite")
@@ fun () ->
let write_exn = Io.write_exn prefix in
List.iter
(fun key ->
transfer_parent_commit_exn ~read_exn ~write_exn ~mapping key)
(Commit_value.parents commit);
Ok ()
Io.open_exn ~path ~readonly:false
in
Ok ()
Errors.finalise (fun _outcome : unit ->
Io.fsync_and_close prefix
~log_if_error:"GC: Close prefix after parent rewrite")
@@ fun () ->
let write_exn = Io.write_exn prefix in
List.iter
(fun key ->
transfer_parent_commit_exn ~read_exn ~write_exn ~mapping key)
(Commit_value.parents commit)
in

(* Step 6. Create the new suffix and prepare 2 functions for read and write
operations. *)
let buffer = Bytes.create buffer_size in
[%log.debug "GC: creating new suffix"];
let* suffix = create_new_suffix ~root ~generation in
Errors.finalise (fun _outcome ->
Ao.fsync suffix
>>= (fun _ -> Ao.close suffix)
|> Errs.log_if_error "GC: Close suffix")
let suffix = create_new_suffix ~root ~generation in
Errors.finalise (fun _outcome : unit ->
Ao.fsync_and_close suffix ~log_if_error:"GC: Close suffix")
@@ fun () ->
let read_exn = Dispatcher.read_exn dispatcher in
let append_exn = Ao.append_exn suffix in
Expand Down Expand Up @@ -317,20 +333,16 @@ module Worker = struct
let off = Int63.Syntax.(off + len) in
transfer_loop ~off (i - 1)
in
let flush_and_raise () = Ao.flush suffix |> Errs.raise_if_error in
let* offs =
Errs.catch (fun () ->
let offs = transfer_loop ~off:commit_offset num_iterations in
flush_and_raise ();
offs)
in
let offs = transfer_loop ~off:commit_offset num_iterations in
Ao.flush_exn suffix;

(* Step 8. Inform the caller of the end_offset copied. *)
Ok offs
offs

(* No one catches errors when this function terminates. Write the result in a
file and terminate the process with an exception, if needed. *)
let run_and_output_result ~generation root commit_key =
let result = run ~generation root commit_key in
let result = Errs.catch (fun () -> run ~generation root commit_key) in
let write_result = Fm.write_gc_output ~root ~generation result in
write_result |> Errs.raise_if_error;
result |> Errs.raise_if_error
Expand Down
5 changes: 5 additions & 0 deletions src/irmin-pack/unix/mapping_file.ml
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,11 @@ module Make (Io : Io.S) = struct
let entry_poff arr i = arr.{entry_idx i + 1} |> conv_int64 |> Int63.of_int
let entry_len arr i = arr.{entry_idx i + 2} |> conv_int64

let iter_exn { arr; _ } f =
for i = 0 to entry_count arr - 1 do
f ~off:(entry_off arr i) ~len:(entry_len arr i)
done

let iter { arr; _ } f =
Errs.catch (fun () ->
for i = 0 to entry_count arr - 1 do
Expand Down
3 changes: 3 additions & 0 deletions src/irmin-pack/unix/mapping_file_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ module type S = sig
The exceptions raised by [f] are caught and returned (as long as they are
known by [Errs]). *)

val iter_exn : t -> (off:int63 -> len:int -> unit) -> unit
(** Similar to [iter mapping f] but raises exceptions. *)

type entry = { off : int63; poff : int63; len : int }

val find_nearest_leq : t -> int63 -> entry option
Expand Down

0 comments on commit 24aee6c

Please sign in to comment.