Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Ngoguey42 committed Jun 30, 2022
1 parent b5e563a commit b0d70ad
Show file tree
Hide file tree
Showing 18 changed files with 251 additions and 78 deletions.
6 changes: 1 addition & 5 deletions src/irmin-pack/unix/control_file_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ module Payload_v3 = struct
entries before that point may be v1 entries. V1 entries need an entry in
index because it is the only place their length is stored. *)

type from_v3_gced = {
(* TODO(good gc): Uncomment entry_offset_suffix_start *)
(* entry_offset_suffix_start : int63; *)
generation : int;
}
type from_v3_gced = { entry_offset_suffix_start : int63; generation : int }
[@@deriving irmin]
(** [entry_offset_suffix_start] is 0 if the suffix file was never garbage
collected. Otherwise it is the offset of the very first entry of the
Expand Down
53 changes: 53 additions & 0 deletions src/irmin-pack/unix/dispatcher.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
open Import

module type S = sig
module Fm : File_manager.S

type t

val v : Fm.t -> (t, [> Fm.Errs.t ]) result
(** [v fm] is a dispatcher that serves its reads from the files held by the
file manager [fm]. *)

val read_exn : t -> off:int63 -> len:int -> bytes -> unit
(** [read_exn t ~off ~len buf] either reads in the prefix or the suffix file,
depending on [off]. See [Io.read_exn] for the arguments.

NOTE(review): reads that fall before the start of the suffix are not
implemented yet in [Make]; they currently fail with [Failure]. *)

val end_offset : t -> int63
(** [end_offset t] is the end offset of the pack entries. It is the end offset
of the suffix file shifted by the offset at which the suffix starts, so that
it accounts for the entries not starting at 0 in the suffix.

NOTE(review): the previous wording said it counts the entries "not yet
flushed from the prefix"; presumably entries not yet flushed to the suffix
were meant -- confirm against [Suffix.end_offset]. *)
end

(** [Make (Fm)] implements {!S} on top of the file manager [Fm].

    Offsets handled here are global pack offsets. The suffix file is addressed
    with offsets relative to [entry_offset_suffix_start], so global offsets are
    translated before being handed to [Suffix]. *)
module Make (Fm : File_manager.S) : S with module Fm = Fm = struct
  module Fm = Fm
  module Suffix = Fm.Suffix

  type t = { fm : Fm.t }

  (** [v fm] wraps [fm]. It never fails in the current implementation. *)
  let v fm = Ok { fm }

  (* let entry_offset_suffix_start t =
   *   let pl = Control.payload t.control in
   *   match pl.status with
   *   | Payload.From_v1_v2_post_upgrade _
   *   | From_v3_used_non_minimal_indexing_strategy | From_v3_no_gc_yet ->
   *       Int63.zero
   *   | T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13 | T14
   *   | T15 ->
   *       assert false
   *   | From_v3_gced { entry_offset_suffix_start; _ } -> entry_offset_suffix_start *)

  (* Placeholder for the implementation commented out above: until the control
     file payload is consulted, the suffix is considered to start at 0. *)
  let entry_offset_suffix_start _ = Int63.zero

  (** [read_exn t ~off ~len buf] reads [len] bytes at global offset [off] into
      [buf].

      @raise Failure
        if [off] is below the start of the suffix, since reading in the prefix
        is not implemented yet. *)
  let read_exn t ~off ~len buf =
    let open Int63.Syntax in
    let entry_offset_suffix_start = entry_offset_suffix_start t in
    if off >= entry_offset_suffix_start then
      (* [end_offset] below adds [entry_offset_suffix_start] to the suffix
         offsets, so the inverse translation is needed when reading. *)
      Suffix.read_exn (Fm.suffix t.fm)
        ~off:(Int63.sub off entry_offset_suffix_start)
        ~len buf
    else
      (* TODO: read in prefix *)
      Fmt.failwith "unexpected read %a" Int63.pp off

  (** [end_offset t] is the end offset of the suffix, expressed as a global
      offset. *)
  let end_offset t =
    let open Int63.Syntax in
    Suffix.end_offset (Fm.suffix t.fm) + entry_offset_suffix_start t
end
3 changes: 2 additions & 1 deletion src/irmin-pack/unix/errors.ml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ type base_error =
| `Commit_parent_key_is_indexed of string
| `Gc_process_error of string
| `Corrupted_gc_result_file of string
| `Gc_process_died_without_result_file of string ]
| `Gc_process_died_without_result_file of string
| `Gc_forbidden_on_32bit_platforms ]
[@@deriving irmin ~pp]
(** [base_error] is the type of most errors that can occur in a [result], except
for errors that have associated exceptions (see below) and backend-specific
Expand Down
23 changes: 16 additions & 7 deletions src/irmin-pack/unix/ext.ml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ module Maker (Config : Conf.S) = struct
module Aof = Append_only_file.Make (Io)
module File_manager = File_manager.Make (Control) (Aof) (Aof) (Index) (Errs)
module Dict = Dict.Make (File_manager)
module Dispatcher = Dispatcher.Make (File_manager)
module XKey = Pack_key.Make (H)

module X = struct
Expand All @@ -78,7 +79,8 @@ module Maker (Config : Conf.S) = struct
module Pack_value = Pack_value.Of_contents (Config) (H) (XKey) (C)

module CA =
Pack_store.Make (File_manager) (Dict) (H) (Pack_value) (Errs)
Pack_store.Make (File_manager) (Dict) (Dispatcher) (H) (Pack_value)
(Errs)

include Irmin.Contents.Store_indexable (CA) (H) (C)
end
Expand All @@ -91,7 +93,8 @@ module Maker (Config : Conf.S) = struct
Irmin_pack.Inode.Make_internal (Config) (H) (XKey) (Value)

module Pack' =
Pack_store.Make (File_manager) (Dict) (H) (Inter.Raw) (Errs)
Pack_store.Make (File_manager) (Dict) (Dispatcher) (H) (Inter.Raw)
(Errs)

include Inode.Make_persistent (H) (Value) (Inter) (Pack')
end
Expand All @@ -118,7 +121,8 @@ module Maker (Config : Conf.S) = struct
module Pack_value = Pack_value.Of_commit (H) (XKey) (Value)

module CA =
Pack_store.Make (File_manager) (Dict) (H) (Pack_value) (Errs)
Pack_store.Make (File_manager) (Dict) (Dispatcher) (H) (Pack_value)
(Errs)

include
Irmin.Commit.Generic_key.Store (Schema.Info) (Node) (CA) (H) (Value)
Expand Down Expand Up @@ -157,6 +161,7 @@ module Maker (Config : Conf.S) = struct
module Fm = File_manager
module Errs = Errs
module Dict = Dict
module Dispatcher = Dispatcher
module Hash = Schema.Hash
module Node_value = Node.CA.Inter.Val
module Node_store = Node.CA
Expand All @@ -178,6 +183,7 @@ module Maker (Config : Conf.S) = struct
branch : Branch.t;
fm : File_manager.t;
dict : Dict.t;
dispatcher : Dispatcher.t;
mutable during_batch : bool;
mutable during_gc : during_gc option;
}
Expand Down Expand Up @@ -240,9 +246,10 @@ module Maker (Config : Conf.S) = struct
| (`File | `Other), _ -> Errs.raise_error (`Not_a_directory root)
in
let dict = Dict.v fm |> Errs.raise_if_error in
let contents = Contents.CA.v ~config ~fm ~dict in
let node = Node.CA.v ~config ~fm ~dict in
let commit = Commit.CA.v ~config ~fm ~dict in
let dispatcher = Dispatcher.v fm |> Errs.raise_if_error in
let contents = Contents.CA.v ~config ~fm ~dict ~dispatcher in
let node = Node.CA.v ~config ~fm ~dict ~dispatcher in
let commit = Commit.CA.v ~config ~fm ~dict ~dispatcher in
let+ branch =
let root = Conf.root config in
let fresh = Conf.fresh config in
Expand All @@ -262,6 +269,7 @@ module Maker (Config : Conf.S) = struct
dict;
during_batch;
during_gc;
dispatcher;
}

let close t =
Expand Down Expand Up @@ -626,7 +634,8 @@ module Maker (Config : Conf.S) = struct
module Hash = H
module Inode = X.Node.CA
module Contents_pack = X.Contents.CA
module File_manager = File_manager
module Fm = File_manager
module Dispatcher = Dispatcher
end)

include S
Expand Down
105 changes: 84 additions & 21 deletions src/irmin-pack/unix/file_manager.ml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ module Make
(Errs : Io_errors.S with module Io = Control.Io) =
struct
module Io = Control.Io
module Control = Control
module Dict = Dict
module Suffix = Suffix
module Index = Index
module Errs = Errs
module Prefix = Io
module Mapping = Io

type dict_consumer_data = {
after_reload : unit -> (unit, Io.read_error) result;
Expand All @@ -40,6 +47,8 @@ struct
dict : Dict.t;
control : Control.t;
mutable suffix : Suffix.t;
mutable prefix : Prefix.t option;
mutable mapping : Mapping.t option;
index : Index.t;
mutable dict_consumers : dict_consumer_data list;
mutable suffix_consumers : suffix_consumer_data list;
Expand All @@ -48,12 +57,6 @@ struct
root : string;
}

module Control = Control
module Dict = Dict
module Suffix = Suffix
module Index = Index
module Errs = Errs

let control t = t.control
let dict t = t.dict
let suffix t = t.suffix
Expand All @@ -73,8 +76,6 @@ struct
let register_suffix_consumer t ~after_flush =
t.suffix_consumers <- { after_flush } :: t.suffix_consumers

(* Reload ***************************************************************** *)

let generation = function
| Payload.From_v1_v2_post_upgrade _
| From_v3_used_non_minimal_indexing_strategy | From_v3_no_gc_yet ->
Expand All @@ -85,6 +86,50 @@ struct
assert false
| From_v3_gced x -> x.generation

(* let reopen_prefix t ~generation =
* let open Result_syntax in
* let* prefix1 =
* let path = Irmin_pack.Layout.V3.prefix ~root:t.root ~generation in
* [%log.debug "reload: generation changed, opening %s" path];
* Prefix.open_ ~readonly:true ~path
* in
* let prefix0 = t.prefix in
* t.prefix <- Some prefix1;
* match prefix0 with None -> Ok () | Some io -> Prefix.close io
*
* let reopen_mapping t ~generation =
* let open Result_syntax in
* let* mapping1 =
* let path = Irmin_pack.Layout.V3.mapping ~root:t.root ~generation in
* [%log.debug "reload: generation changed, opening %s" path];
* Mapping.open_ ~readonly:true ~path
* in
* let mapping0 = t.mapping in
* t.mapping <- Some mapping1;
* match mapping0 with None -> Ok () | Some io -> Mapping.close io *)

(** [reopen_suffix t ~generation ~end_offset] opens, read-only, the suffix file
    of [generation] under [t.root], installs it in [t] and then closes the
    suffix that was previously installed. If opening fails, [t] is left
    untouched and the error is returned. *)
let reopen_suffix t ~generation ~end_offset =
  let open Result_syntax in
  let path = Irmin_pack.Layout.V3.suffix ~root:t.root ~generation in
  [%log.debug "reload: generation changed, opening %s" path];
  let* fresh = Suffix.open_ro ~path ~end_offset ~dead_header_size:0 in
  (* Swap first, close second: [t] never points to a closed suffix. *)
  let stale = t.suffix in
  t.suffix <- fresh;
  Suffix.close stale

(** [only_open_after_gc ~generation ~path] is a placeholder: both arguments are
    currently ignored and the result is always [Ok None]. *)
let only_open_after_gc ~generation:_ ~path:_ =
  (* Intended implementation, not enabled yet:

     let open Result_syntax in
     if generation = 0 then Ok None
     else
       let* t = Io.open_ ~path ~readonly:true in
       Ok (Some t) *)
  Ok None

(* Reload ***************************************************************** *)

let reload t =
let open Result_syntax in
let* () = Index.reload t.index in
Expand All @@ -93,23 +138,16 @@ struct
let pl1 : Payload.t = Control.payload t.control in
if pl0 = pl1 then Ok ()
else
(* Check if generation changed. If it did, reopen suffix. *)
(* Check if generation changed. If it did, reopen suffix, prefix and
mapping. *)
let* () =
let gen0 = generation pl0.status in
let gen1 = generation pl1.status in
if gen0 = gen1 then Ok ()
else
let* suffix1 =
let path =
Irmin_pack.Layout.V3.suffix ~root:t.root ~generation:gen1
in
let end_offset = pl1.entry_offset_suffix_end in
[%log.debug "reload: generation changed, opening %s" path];
Suffix.open_ro ~path ~end_offset ~dead_header_size:0
in
let suffix0 = t.suffix in
t.suffix <- suffix1;
Suffix.close suffix0
let end_offset = pl1.entry_offset_suffix_end in
let* () = reopen_suffix t ~generation:gen1 ~end_offset in
Ok ()
in
(* Update end offsets. This prevents the readonly instance to read data
flushed to disk by the readwrite, between calls to reload. *)
Expand Down Expand Up @@ -278,6 +316,14 @@ struct
let cb () = suffix_requires_a_flush_exn (get_instance ()) in
make_suffix ~path ~auto_flush_threshold ~auto_flush_callback:cb
in
let* prefix =
let path = Irmin_pack.Layout.V3.prefix ~root ~generation in
only_open_after_gc ~generation ~path
in
let* mapping =
let path = Irmin_pack.Layout.V3.mapping ~root ~generation in
only_open_after_gc ~generation ~path
in
let* dict =
let path = Irmin_pack.Layout.V3.dict ~root in
let auto_flush_threshold =
Expand All @@ -299,6 +345,8 @@ struct
dict;
control;
suffix;
prefix;
mapping;
use_fsync;
index;
dict_consumers = [];
Expand Down Expand Up @@ -488,6 +536,14 @@ struct
let end_offset = pl.entry_offset_suffix_end in
Suffix.open_ro ~path ~end_offset ~dead_header_size
in
let* prefix =
let path = Irmin_pack.Layout.V3.prefix ~root ~generation in
only_open_after_gc ~path ~generation
in
let* mapping =
let path = Irmin_pack.Layout.V3.mapping ~root ~generation in
only_open_after_gc ~path ~generation
in
let* dict =
let path = Irmin_pack.Layout.V3.dict ~root in
let end_offset = pl.dict_offset_end in
Expand All @@ -504,6 +560,8 @@ struct
dict;
control;
suffix;
prefix;
mapping;
use_fsync;
indexing_strategy;
index;
Expand Down Expand Up @@ -573,7 +631,10 @@ struct
| T1 | T2 | T3 | T4 | T5 | T6 | T7 | T8 | T9 | T10 | T11 | T12 | T13
| T14 | T15 ->
assert false
| From_v3_gced _ | From_v3_no_gc_yet -> From_v3_gced { generation }
| From_v3_gced _ | From_v3_no_gc_yet ->
let entry_offset_suffix_start = Int63.zero in
(* TODO: [entry_offset_suffix_start] should come from parameters *)
From_v3_gced { entry_offset_suffix_start; generation }
in
{ pl with status }
in
Expand Down Expand Up @@ -614,4 +675,6 @@ struct
Errs.of_json_string x
|> Result.map_error (fun err ->
`Gc_process_error (Fmt.str "%a" Errs.pp err))

(** [readonly t] is the answer of [Suffix.readonly] for the suffix file
    currently installed in [t]. *)
let readonly { suffix; _ } = Suffix.readonly suffix
end
12 changes: 7 additions & 5 deletions src/irmin-pack/unix/file_manager_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

open Import

module type S = sig
(** Abstraction that governs the lifetime of the various files that are part
of a pack store (except the branch store).
Expand Down Expand Up @@ -179,7 +181,7 @@ module type S = sig
val swap :
t ->
generation:int ->
copy_end_offset:Import.int63 ->
copy_end_offset:int63 ->
(unit, [> swap_error ]) result

type write_gc_output_error :=
Expand All @@ -192,7 +194,7 @@ module type S = sig
val write_gc_output :
root:string ->
generation:int ->
(Import.int63, Errs.t) result ->
(int63, Errs.t) result ->
(unit, [> write_gc_output_error ]) result
(** Used by the gc process at the end to write its output in
store.<generation>.out. *)
Expand All @@ -201,11 +203,11 @@ module type S = sig
[ `Corrupted_gc_result_file of string | `Gc_process_error of string ]

val read_gc_output :
root:string ->
generation:int ->
(Import.int63, [> read_gc_output_error ]) result
root:string -> generation:int -> (int63, [> read_gc_output_error ]) result
(** Used by the main process, after the gc process finished, to read
store.<generation>.out. *)

val readonly : t -> bool
(** [readonly t] tells whether [t] is a read-only instance.

NOTE(review): the implementation forwards to [Suffix.readonly] on the suffix
file of [t]; presumably this is [true] exactly when [t] was opened with
[open_ro] -- confirm. *)
end

module type Sigs = sig
Expand Down
Loading

0 comments on commit b0d70ad

Please sign in to comment.