Skip to content

Commit

Permalink
gc-based snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
icristescu committed Nov 4, 2022
1 parent 3dd2231 commit 78ee6a8
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 17 deletions.
48 changes: 41 additions & 7 deletions src/irmin-pack/unix/ext.ml
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ module Maker (Config : Conf.S) = struct
cancelled
| None -> false

let start ~unlink ~use_auto_finalisation t commit_key =
let start ~unlink ~use_auto_finalisation ~path_new_files t commit_key
=
let open Result_syntax in
[%log.info "GC: Starting on %a" pp_key commit_key];
let* () =
Expand Down Expand Up @@ -276,19 +277,21 @@ module Maker (Config : Conf.S) = struct
let gc =
Gc.v ~root ~generation:next_generation ~unlink
~dispatcher:t.dispatcher ~fm:t.fm ~contents:t.contents
~node:t.node ~commit:t.commit commit_key
~node:t.node ~commit:t.commit ~path_new_files commit_key
in
t.running_gc <- Some { gc; use_auto_finalisation };
Ok ()

let start_exn ?(unlink = true) ~use_auto_finalisation t commit_key =
let start_exn ?(unlink = true) ~use_auto_finalisation ~path_new_files
t commit_key =
match t.running_gc with
| Some _ ->
[%log.info "Repo is alreadying running GC. Skipping."];
Lwt.return false
| None -> (
let result =
start ~unlink ~use_auto_finalisation t commit_key
start ~unlink ~use_auto_finalisation ~path_new_files t
commit_key
in
match result with
| Ok _ -> Lwt.return true
Expand Down Expand Up @@ -351,6 +354,31 @@ module Maker (Config : Conf.S) = struct
Pack_key.v_direct ~offset ~length ~hash:entry.hash
in
Some key)

let create_one_commit_store t commit_key path =
let () = Io.mkdir path |> Errs.raise_if_error in
let* _launched =
start_exn ~use_auto_finalisation:false ~path_new_files:path t
commit_key
in
let generation = File_manager.generation t.fm + 1 in
let* () =
match t.running_gc with
| None -> assert false
| Some { gc; _ } -> Gc.finalise_without_swap gc
in
let () =
File_manager.create_one_commit_store t.config ~generation
~new_store_root:path commit_key
|> Errs.raise_if_error
in
let branch_path = Irmin_pack.Layout.V4.branch ~root:path in
let* branch_store =
Branch.v ~fresh:true ~readonly:false branch_path
in
let* () = Branch.close branch_store in
(* TODO fsync dir.*)
Lwt.return_unit
end

let batch t f =
Expand Down Expand Up @@ -538,6 +566,7 @@ module Maker (Config : Conf.S) = struct
let flush = X.Repo.flush
let fsync = X.Repo.fsync
let split = X.Repo.split_exn
let create_one_commit_store = X.Repo.Gc.create_one_commit_store

module Gc = struct
type msg = [ `Msg of string ]
Expand Down Expand Up @@ -568,13 +597,18 @@ module Maker (Config : Conf.S) = struct
`Msg err_msg

let finalise_exn = X.Repo.Gc.finalise_exn
let start_exn = X.Repo.Gc.start_exn ~use_auto_finalisation:false

let start_exn ?unlink t =
let root = Irmin_pack.Conf.root t.X.Repo.config in
X.Repo.Gc.start_exn ?unlink ~use_auto_finalisation:false
~path_new_files:root t

let start repo commit_key =
let root = Irmin_pack.Conf.root repo.X.Repo.config in
try
let* started =
X.Repo.Gc.start_exn ~unlink:true ~use_auto_finalisation:true repo
commit_key
X.Repo.Gc.start_exn ~unlink:true ~use_auto_finalisation:true
~path_new_files:root repo commit_key
in
Lwt.return_ok started
with exn -> catch_errors "Start GC" exn
Expand Down
68 changes: 68 additions & 0 deletions src/irmin-pack/unix/file_manager.ml
Original file line number Diff line number Diff line change
Expand Up @@ -817,4 +817,72 @@ struct
let chunk_start_idx = pl.chunk_start_idx in
let chunk_num = pl.chunk_num in
cleanup ~root ~generation ~chunk_start_idx ~chunk_num

let create_one_commit_store config ~generation ~new_store_root
latest_gc_target =
let open Result_syntax in
let root = Irmin_pack.Conf.root config in
let commit_key, latest_gc_target_offset, suffix_start_offset =
match Pack_key.inspect latest_gc_target with
| Direct { offset; length; _ } as commit_key ->
let suffix_start_offset =
Int63.Syntax.(offset + Int63.of_int length)
in
(commit_key, offset, suffix_start_offset)
| Indexed _ ->
(* The caller of this function lifted the key to a direct one. *)
assert false
in
(* Step 1. Copy the dict *)
let src_dict = Irmin_pack.Layout.V4.dict ~root in
let dst_dict = Irmin_pack.Layout.V4.dict ~root:new_store_root in
let* () = Io.cp_file ~src:src_dict ~dst:dst_dict in
(* Step 2. Create an empty suffix and close it. *)
let* suffix =
Suffix.create_rw ~root:new_store_root ~overwrite:false
~auto_flush_threshold:1_000_000 ~auto_flush_procedure:`Internal
~start_idx:generation
in
let* () = Suffix.close suffix in
(* Step 3. Create the control file and close it. *)
let status =
Payload.Gced
{
suffix_start_offset;
generation;
latest_gc_target_offset;
suffix_dead_bytes = Int63.zero;
}
in
let dict_end_poff = Io.size_of_path dst_dict |> Errs.raise_if_error in
let pl =
{
Payload.dict_end_poff;
suffix_end_poff = Int63.zero;
checksum = Int63.zero;
status;
upgraded_from_v3_to_v4 = false;
chunk_num = 1;
chunk_start_idx = generation;
}
in
let path = Irmin_pack.Layout.V4.control ~root:new_store_root in
let* control = Control.create_rw ~path ~overwrite:false pl in
let* () = Control.close control in
(* Step 4. Create the index. *)
let* index =
let log_size = Conf.index_log_size config in
let throttle = Conf.merge_throttle config in
Index.v ~fresh:true ~flush_callback:Fun.id ~readonly:false ~throttle
~log_size new_store_root
in
(* Step 5. Add the commit to the index, close the index. *)
let () =
match commit_key with
| Direct { hash; offset; length } ->
Index.add index hash (offset, length, Pack_value.Kind.Commit_v2)
| Indexed _ -> assert false
in
let* () = Index.close index in
Ok ()
end
7 changes: 7 additions & 0 deletions src/irmin-pack/unix/file_manager_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,13 @@ module type S = sig
val generation : t -> int
val gc_allowed : t -> bool
val split : t -> (unit, [> Errs.t ]) result

val create_one_commit_store :
Irmin.Backend.Conf.t ->
generation:int ->
new_store_root:string ->
Index.key Pack_key.t ->
(unit, [> open_rw_error | close_error ]) result
end

module type Sigs = sig
Expand Down
16 changes: 13 additions & 3 deletions src/irmin-pack/unix/gc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ module Make (Args : Gc_args.S) = struct
latest_gc_target_offset : int63;
}

let v ~root ~generation ~unlink ~dispatcher ~fm ~contents ~node ~commit
commit_key =
let v ~root ~path_new_files ~generation ~unlink ~dispatcher ~fm ~contents
~node ~commit commit_key =
let offset, latest_gc_target_offset =
let state : _ Pack_key.state = Pack_key.inspect commit_key in
match state with
Expand Down Expand Up @@ -89,7 +89,8 @@ module Make (Args : Gc_args.S) = struct
(* start worker task *)
let task =
Async.async (fun () ->
Worker.run_and_output_result root commit_key offset ~generation)
Worker.run_and_output_result root commit_key offset ~generation
~path_new_files)
in
let partial_stats =
Gc_stats.Main.finish_current_step partial_stats "before finalise"
Expand Down Expand Up @@ -314,6 +315,15 @@ module Make (Args : Gc_args.S) = struct
| `Running -> Lwt.return_ok `Running
| #Async.outcome as status -> go status)

let finalise_without_swap t =
let* status = Async.await t.task in
match status with
| `Success -> Lwt.return_unit
| _ ->
let gc_output = read_gc_output ~root:t.root ~generation:t.generation in
let r = gc_errors status gc_output |> Errs.raise_if_error in
Lwt.return r

let on_finalise t f =
(* Ignore returned promise since the purpose of this
function is to add asynchronous callbacks to the GC
Expand Down
2 changes: 2 additions & 0 deletions src/irmin-pack/unix/gc.mli
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ module Make (Args : Gc_args.S) : sig

val v :
root:string ->
path_new_files:string ->
generation:int ->
unlink:bool ->
dispatcher:Args.Dispatcher.t ->
Expand All @@ -51,5 +52,6 @@ module Make (Args : Gc_args.S) : sig
finalises. *)

val cancel : t -> bool
val finalise_without_swap : t -> unit Lwt.t
end
with module Args = Args
17 changes: 11 additions & 6 deletions src/irmin-pack/unix/gc_worker.ml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ module Make (Args : Gc_args.S) = struct
stats := Gc_stats.Worker.add_file_size !stats "old_prefix" prefix_size;
stats := Gc_stats.Worker.add_file_size !stats "old_mapping" mapping_size

let run ~generation root commit_key new_prefix_end_offset =
let run ~generation ~path_new_files root commit_key new_prefix_end_offset =
let open Result_syntax in
let config =
Irmin_pack.Conf.init ~fresh:false ~readonly:true ~lru_size:0 root
Expand Down Expand Up @@ -167,7 +167,7 @@ module Make (Args : Gc_args.S) = struct
stats := Gc_stats.Worker.add_file_size !stats "mapping" mapping_size
in
(fun f ->
Mapping_file.create ~report_file_sizes ~root ~generation
Mapping_file.create ~report_file_sizes ~root:path_new_files ~generation
~register_entries:f ()
|> Errs.raise_if_error)
@@ fun ~register_entry ->
Expand Down Expand Up @@ -218,7 +218,9 @@ module Make (Args : Gc_args.S) = struct
(* Step 4. Create the new prefix. *)
stats := Gc_stats.Worker.finish_current_step !stats "prefix: start";
let prefix =
let path = Irmin_pack.Layout.V4.prefix ~root ~generation in
let path =
Irmin_pack.Layout.V4.prefix ~root:path_new_files ~generation
in
Ao.create_rw_exn ~path
in
let () =
Expand Down Expand Up @@ -252,7 +254,9 @@ module Make (Args : Gc_args.S) = struct
Dispatcher.read_exn dispatcher accessor buf
in
let prefix =
let path = Irmin_pack.Layout.V4.prefix ~root ~generation in
let path =
Irmin_pack.Layout.V4.prefix ~root:path_new_files ~generation
in
Io.open_ ~path ~readonly:false |> Errs.raise_if_error
in
Errors.finalise_exn (fun _outcome ->
Expand Down Expand Up @@ -330,10 +334,11 @@ module Make (Args : Gc_args.S) = struct

(* No one catches errors when this function terminates. Write the result in a
file and terminate. *)
let run_and_output_result ~generation root commit_key new_prefix_end_offset =
let run_and_output_result ~generation ~path_new_files root commit_key
new_prefix_end_offset =
let result =
Errs.catch (fun () ->
run ~generation root commit_key new_prefix_end_offset)
run ~generation ~path_new_files root commit_key new_prefix_end_offset)
in
let write_result = write_gc_output ~root ~generation result in
write_result |> Errs.log_if_error "writing gc output"
Expand Down
7 changes: 6 additions & 1 deletion src/irmin-pack/unix/gc_worker.mli
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ module Make (Args : Gc_args.S) : sig
module Args : Gc_args.S

val run_and_output_result :
generation:int -> string -> Args.key -> int63 -> unit
generation:int ->
path_new_files:string ->
string ->
Args.key ->
int63 ->
unit

type gc_output = (Stats.Latest_gc.worker, Args.Errs.t) result
[@@deriving irmin]
Expand Down
6 changes: 6 additions & 0 deletions src/irmin-pack/unix/io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,12 @@ module Unix = struct
Ok ()
with Sys_error msg -> Error (`Sys_error msg)

let cp_file ~src ~dst =
let cmd = Filename.quote_command "cp" [ "-p"; src; dst ] in
match Sys.command cmd with
| 0 -> Ok ()
| n -> Error (`Sys_error (Int.to_string n))

let mkdir path =
match (classify_path (Filename.dirname path), classify_path path) with
| `Directory, `No_such_file_or_directory -> (
Expand Down
3 changes: 3 additions & 0 deletions src/irmin-pack/unix/io_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ module type S = sig
val move_file :
src:string -> dst:string -> (unit, [> `Sys_error of string ]) result

val cp_file :
src:string -> dst:string -> (unit, [> `Sys_error of string ]) result

val mkdir : string -> (unit, [> mkdir_error ]) result
val unlink : string -> (unit, [> `Sys_error of string ]) result

Expand Down
9 changes: 9 additions & 0 deletions src/irmin-pack/unix/s.ml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ module type S = sig
TODO: Detail exceptions raised. *)

val create_one_commit_store : repo -> commit_key -> string -> unit Lwt.t
(** [create_one_commit_store t key path] creates a new store at [path] from
the existing one, containing only one commit, specified by the [key]. Note
that this operation is blocking.
It requires that the files existing on disk when the operation is
launched, remain on disk until the operation completes. In particular, a
Gc running in a different process could remove files from disk. *)

module Gc : sig
(** GC *)

Expand Down
Loading

0 comments on commit 78ee6a8

Please sign in to comment.