Skip to content

Commit

Permalink
irmin-pack: refine append only flush callback
Browse files Browse the repository at this point in the history
Add support for two types of flush behavior when the flush threshold is
reached: automatic internal flushes and manually controlled external
flushes.
  • Loading branch information
metanivek committed Sep 16, 2022
1 parent b72a868 commit 3b63db4
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 39 deletions.
26 changes: 16 additions & 10 deletions src/irmin-pack/unix/append_only_file.ml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
open Import
include Append_only_file_intf

module Make (Io : Io.S) = struct
module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct
module Io = Io
module Errs = Errs

type t = {
io : Io.t;
Expand All @@ -27,14 +28,16 @@ module Make (Io : Io.S) = struct
rw_perm : rw_perm option;
}

and auto_flush_procedure = [ `Internal | `External of t -> unit ]

and rw_perm = {
buf : Buffer.t;
auto_flush_threshold : int;
auto_flush_callback : t -> unit;
auto_flush_procedure : auto_flush_procedure;
}
(** [rw_perm] contains the data necessary to operate in readwrite mode. *)

let create_rw ~path ~overwrite ~auto_flush_threshold ~auto_flush_callback =
let create_rw ~path ~overwrite ~auto_flush_threshold ~auto_flush_procedure =
let open Result_syntax in
let+ io = Io.create ~path ~overwrite in
let persisted_end_offset = Int63.zero in
Expand All @@ -43,7 +46,7 @@ module Make (Io : Io.S) = struct
io;
persisted_end_offset;
dead_header_size = Int63.zero;
rw_perm = Some { buf; auto_flush_threshold; auto_flush_callback };
rw_perm = Some { buf; auto_flush_threshold; auto_flush_procedure };
}

(** A store is consistent if the real offset of the suffix/dict files is the
Expand Down Expand Up @@ -73,7 +76,7 @@ module Make (Io : Io.S) = struct
Ok ())

let open_rw ~path ~end_offset ~dead_header_size ~auto_flush_threshold
~auto_flush_callback =
~auto_flush_procedure =
let open Result_syntax in
let* io = Io.open_ ~path ~readonly:false in
let+ () = check_consistent_store ~end_offset ~dead_header_size io in
Expand All @@ -84,7 +87,7 @@ module Make (Io : Io.S) = struct
io;
persisted_end_offset;
dead_header_size;
rw_perm = Some { buf; auto_flush_threshold; auto_flush_callback };
rw_perm = Some { buf; auto_flush_threshold; auto_flush_procedure };
}

let open_ro ~path ~end_offset ~dead_header_size =
Expand Down Expand Up @@ -160,10 +163,13 @@ module Make (Io : Io.S) = struct
let append_exn t s =
match t.rw_perm with
| None -> raise Errors.RO_not_allowed
| Some rw_perm ->
| Some rw_perm -> (
assert (Buffer.length rw_perm.buf < rw_perm.auto_flush_threshold);
Buffer.add_string rw_perm.buf s;
if Buffer.length rw_perm.buf >= rw_perm.auto_flush_threshold then (
rw_perm.auto_flush_callback t;
assert (empty_buffer t))
if Buffer.length rw_perm.buf >= rw_perm.auto_flush_threshold then
match rw_perm.auto_flush_procedure with
| `Internal -> flush t |> Errs.raise_if_error
| `External cb ->
cb t;
assert (empty_buffer t))
end
24 changes: 18 additions & 6 deletions src/irmin-pack/unix/append_only_file_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,24 @@ module type S = sig
automatically shifting offsets to deal with legacy file headers. *)

module Io : Io.S
module Errs : Io_errors.S

type t

type auto_flush_procedure = [ `Internal | `External of t -> unit ]
(** [auto_flush_procedure] defines behavior when the flush threshold is
reached.
- Use [`Internal] to have the buffer automatically flushed.
- Use [`External f] to have [f] called when the flush threshold is
reached. It is the responsibility of [f] to call flush, in addition to
any other processing it does. *)

val create_rw :
path:string ->
overwrite:bool ->
auto_flush_threshold:int ->
auto_flush_callback:(t -> unit) ->
auto_flush_procedure:auto_flush_procedure ->
(t, [> Io.create_error ]) result
(** Create a rw instance of [t] by creating the file at [path]. *)

Expand All @@ -42,7 +52,7 @@ module type S = sig
end_offset:int63 ->
dead_header_size:int ->
auto_flush_threshold:int ->
auto_flush_callback:(t -> unit) ->
auto_flush_procedure:auto_flush_procedure ->
( t,
[> Io.open_error
| `Closed
Expand Down Expand Up @@ -75,9 +85,10 @@ module type S = sig
One of the goals of the [Append_only_file] abstraction is to provide
buffered appends. [auto_flush_threshold] is the soft cap after which the
buffer should be flushed. If a call to [append_exn] fills the buffer,
[auto_flush_callback] will be called so that the parent abstraction takes
care of the flush procedure, which is expected to call [flush]. *)
buffer should be flushed. When a call to [append_exn] fills the buffer,
either the buffer will be flushed automatically, if
[auto_flush_procedure = `Internal], or the supplied external function [f]
will be called, if [auto_flush_procedure = `External f]. *)

val open_ro :
path:string ->
Expand Down Expand Up @@ -176,5 +187,6 @@ end
module type Sigs = sig
module type S = S

module Make (Io : Io.S) : S with module Io = Io
module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) :
S with module Io = Io and module Errs = Errs
end
2 changes: 1 addition & 1 deletion src/irmin-pack/unix/ext.ml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ module Maker (Config : Conf.S) = struct
module Io = Io.Unix
module Errs = Io_errors.Make (Io)
module Control = Control_file.Make (Io)
module Aof = Append_only_file.Make (Io)
module Aof = Append_only_file.Make (Io) (Errs)
module File_manager = File_manager.Make (Control) (Aof) (Aof) (Index) (Errs)
module Dict = Dict.Make (File_manager)
module Dispatcher = Dispatcher.Make (File_manager)
Expand Down
7 changes: 4 additions & 3 deletions src/irmin-pack/unix/file_manager.ml
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ struct
in
let cb _ = suffix_requires_a_flush_exn t in
Suffix.open_rw ~path ~end_offset ~dead_header_size ~auto_flush_threshold
~auto_flush_callback:cb
~auto_flush_procedure:(`External cb)
in
let suffix0 = t.suffix in
t.suffix <- suffix1;
Expand Down Expand Up @@ -310,7 +310,8 @@ struct
Irmin_pack.Conf.suffix_auto_flush_threshold config
in
let cb _ = suffix_requires_a_flush_exn (get_instance ()) in
make_suffix ~path ~auto_flush_threshold ~auto_flush_callback:cb
make_suffix ~path ~auto_flush_threshold
~auto_flush_procedure:(`External cb)
in
let* prefix =
let path = Irmin_pack.Layout.V3.prefix ~root ~generation in
Expand All @@ -323,7 +324,7 @@ struct
Irmin_pack.Conf.dict_auto_flush_threshold config
in
let cb _ = dict_requires_a_flush_exn (get_instance ()) in
make_dict ~path ~auto_flush_threshold ~auto_flush_callback:cb
make_dict ~path ~auto_flush_threshold ~auto_flush_procedure:(`External cb)
in
let* index =
let log_size = Conf.index_log_size config in
Expand Down
19 changes: 8 additions & 11 deletions src/irmin-pack/unix/gc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ module Worker = struct
module Mapping_file = Fm.Mapping_file

module Ao = struct
include Append_only_file.Make (Fm.Io)
include Append_only_file.Make (Fm.Io) (Errs)

let create_rw_exn ~path ~auto_flush_callback =
let create_rw_exn ~path =
create_rw ~path ~overwrite:true ~auto_flush_threshold:1_000_000
~auto_flush_callback
~auto_flush_procedure:`Internal
|> Errs.raise_if_error
end

Expand Down Expand Up @@ -148,8 +148,7 @@ module Worker = struct

let create_new_suffix ~root ~generation =
let path = Irmin_pack.Layout.V3.suffix ~root ~generation in
let auto_flush_callback x = Ao.flush x |> Errs.raise_if_error in
Ao.create_rw_exn ~path ~auto_flush_callback
Ao.create_rw_exn ~path

let run ~generation root commit_key =
let open Result_syntax in
Expand Down Expand Up @@ -233,8 +232,7 @@ module Worker = struct
(* Step 4. Create the new prefix. *)
let prefix =
let path = Irmin_pack.Layout.V3.prefix ~root ~generation in
let auto_flush_callback x = Ao.flush x |> Errs.raise_if_error in
Ao.create_rw_exn ~path ~auto_flush_callback
Ao.create_rw_exn ~path
in
let () =
Errors.finalise_exn (fun _outcome ->
Expand Down Expand Up @@ -343,7 +341,7 @@ module Make (Args : Args) = struct
module Args = Args
open Args
module Io = Fm.Io
module Ao = Append_only_file.Make (Io)
module Ao = Append_only_file.Make (Io) (Errs)
module Worker = Worker.Make (Args)

type t = {
Expand Down Expand Up @@ -418,10 +416,9 @@ module Make (Args : Args) = struct
0. *)
let dead_header_size = 0 in
let auto_flush_threshold = 1_000_000 in
let auto_flush_callback x = Ao.flush x |> Errs.raise_if_error in
let* suffix =
Ao.open_rw ~path ~end_offset ~dead_header_size ~auto_flush_callback
~auto_flush_threshold
Ao.open_rw ~path ~end_offset ~dead_header_size
~auto_flush_procedure:`Internal ~auto_flush_threshold
in
Ok suffix

Expand Down
8 changes: 3 additions & 5 deletions src/irmin-pack/unix/mapping_file.ml
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,8 @@ let calculate_extents_oc ~(src_is_sorted : unit) ~(src : int_bigarray)

module Make (Io : Io.S) = struct
module Io = Io
module Ao = Append_only_file.Make (Io)
module Errs = Io_errors.Make (Io)
module Ao = Append_only_file.Make (Io) (Errs)

type t = { arr : int64_bigarray; root : string; generation : int }

Expand Down Expand Up @@ -371,10 +371,9 @@ module Make (Io : Io.S) = struct
Io.unlink path2 |> ignore;

(* Create [file0] *)
let auto_flush_callback x = Ao.flush x |> Errs.raise_if_error in
let* file0 =
Ao.create_rw ~path:path0 ~overwrite:true ~auto_flush_threshold:1_000_000
~auto_flush_callback
~auto_flush_procedure:`Internal
in

(* Fill and close [file0] *)
Expand Down Expand Up @@ -404,10 +403,9 @@ module Make (Io : Io.S) = struct
Io.unlink path0 |> ignore;

(* Create [file2] *)
let auto_flush_callback x = Ao.flush x |> Errs.raise_if_error in
let* file2 =
Ao.create_rw ~path:path2 ~overwrite:true ~auto_flush_threshold:1_000_000
~auto_flush_callback
~auto_flush_procedure:`Internal
in

(* Fill and close [file2] *)
Expand Down
2 changes: 1 addition & 1 deletion test/irmin-pack/common.ml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ module Key = Irmin_pack_unix.Pack_key.Make (Schema.Hash)
module Io = Irmin_pack_unix.Io.Unix
module Errs = Irmin_pack_unix.Io_errors.Make (Io)
module Control = Irmin_pack_unix.Control_file.Make (Io)
module Aof = Irmin_pack_unix.Append_only_file.Make (Io)
module Aof = Irmin_pack_unix.Append_only_file.Make (Io) (Errs)

module File_manager =
Irmin_pack_unix.File_manager.Make (Control) (Aof) (Aof) (Index) (Errs)
Expand Down
2 changes: 1 addition & 1 deletion test/irmin-pack/test_inode.ml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct
module Io = Irmin_pack_unix.Io.Unix
module Errs = Irmin_pack_unix.Io_errors.Make (Io)
module Control = Irmin_pack_unix.Control_file.Make (Io)
module Aof = Irmin_pack_unix.Append_only_file.Make (Io)
module Aof = Irmin_pack_unix.Append_only_file.Make (Io) (Errs)

module File_manager =
Irmin_pack_unix.File_manager.Make (Control) (Aof) (Aof) (Index) (Errs)
Expand Down
2 changes: 1 addition & 1 deletion test/irmin-pack/test_pack_version_bump.ml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module Private = struct
module Io = Irmin_pack_unix.Io.Unix
module Errs = Irmin_pack_unix.Io_errors.Make (Io)
module Control = Irmin_pack_unix.Control_file.Make (Io)
module Aof = Irmin_pack_unix.Append_only_file.Make (Io)
module Aof = Irmin_pack_unix.Append_only_file.Make (Io) (Errs)

module File_manager =
Irmin_pack_unix.File_manager.Make (Control) (Aof) (Aof) (Index) (Errs)
Expand Down

0 comments on commit 3b63db4

Please sign in to comment.