Skip to content

Commit

Permalink
Merge pull request #2244 from art-w/out-of-order
Browse files Browse the repository at this point in the history
  • Loading branch information
metanivek committed May 3, 2023
2 parents a17f820 + 057c8d1 commit e1ef987
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 76 deletions.
82 changes: 6 additions & 76 deletions src/irmin-pack/unix/gc_worker.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,76 +28,6 @@ module Make (Args : Gc_args.S) = struct

(** Renders a key as a string through its [key_t] type representation. *)
let string_of_key = Irmin.Type.to_string key_t

(** [Live] accumulates [(off, len)] ranges and replays them through [iter].
A range that ends exactly where the previously added one starts is fused
with it; any other range starts a new entry.

NOTE(review): [add] never compares offsets beyond the adjacency test, so it
appears to rely on callers adding ranges with strictly decreasing offsets.
A range added out of that order is stored as-is, and [iter] would then
report ranges that are neither sorted nor guaranteed disjoint -- confirm
against callers. *)
module Live : sig
type t

val make : unit -> t
val add : off:int63 -> len:int -> t -> unit
val count : t -> int
val iter : (off:int63 -> len:int63 -> unit) -> t -> unit
end = struct
(* A linked list of fixed-size arrays. Each range occupies two consecutive
cells of [arr]: the offset, then the length. *)
module Stack = struct
type t =
| Empty
| Stack of { mutable len : int; arr : int63 array; prev : t }

let capacity =
131_072 (* = 128*1024, a large but not too large chunk size *)

(* Allocates a fresh, empty chunk linked in front of [prev]. *)
let make prev =
Stack { len = 0; arr = Array.make capacity Int63.zero; prev }

(* [Empty] counts as full so that the first push allocates a chunk. *)
let is_full = function Empty -> true | Stack s -> s.len >= capacity

(* Writes [off] and [len] in the next two cells of the top chunk, allocating
a new chunk first when the top one is full. Returns the (possibly new)
top of the stack. *)
let rec push_pair ~off ~len t =
match t with
| Stack s when not (is_full t) ->
let i = s.len in
s.len <- i + 2;
s.arr.(i) <- off;
s.arr.(i + 1) <- Int63.of_int len;
t
| _ -> push_pair ~off ~len (make t)

(* Calls [fn] on every stored pair, from the most recently pushed pair of the
top chunk down to its first pair, then continues with the older chunks. *)
let rec iter_pair fn = function
| Empty -> ()
| Stack { len; arr; prev } ->
assert (len mod 2 = 0);
for i = (len / 2) - 1 downto 0 do
let off = arr.(2 * i) in
let len = arr.((2 * i) + 1) in
fn ~off ~len
done;
iter_pair fn prev
end

(* [last] is the range currently being built (it can still be extended by
fusion), [ranges] holds the ranges that were completed before it, and
[count] is the number of calls to [add]. *)
type t = {
mutable last : (int63 * int) option;
mutable ranges : Stack.t;
mutable count : int;
}

let make () = { last = None; ranges = Stack.Empty; count = 0 }
let count t = t.count

(* Records the range [(off, len)]. If it ends exactly at the start of [last]
the two are fused; otherwise [last] is pushed onto [ranges] and the new
range becomes [last]. *)
let add ~off ~len t =
t.count <- t.count + 1;
let off_end = Int63.(Syntax.(off + of_int len)) in
match t.last with
| None -> t.last <- Some (off, len)
| Some (off', len') when off_end = off' -> t.last <- Some (off, len + len')
| Some (off', len') ->
t.last <- Some (off, len);
t.ranges <- Stack.push_pair ~off:off' ~len:len' t.ranges

(* Calls [fn] on [last] first, then on the stacked ranges from the most
recently pushed to the oldest. With no [last], nothing was ever added. *)
let iter fn t =
match t.last with
| None -> assert (t.count = 0)
| Some (off, len) ->
fn ~off ~len:(Int63.of_int len);
Stack.iter_pair fn t.ranges
end

module Priority_queue = struct
module Offset_rev = struct
type t = int63
Expand Down Expand Up @@ -142,7 +72,7 @@ module Make (Args : Gc_args.S) = struct
- [min_offset] restricts the traversal to objects/commits with a larger or
equal offset. *)
let iter_reachable ~parents ~min_offset commit_key commit_store node_store =
let live = Live.make () in
let live = Ranges.make () in
let todos = Priority_queue.create 1024 in
let rec loop () =
if not (Priority_queue.is_empty todos) then (
Expand All @@ -153,7 +83,7 @@ module Make (Args : Gc_args.S) = struct
| (Contents | Node) as kind -> (
let node_key = Node_store.key_of_offset node_store off in
let len = Node_store.get_length node_store node_key in
Live.add live ~off ~len;
Ranges.add live ~off ~len;
if kind = Node then
match Node_store.unsafe_find_no_prefetch node_store node_key with
| None ->
Expand All @@ -165,7 +95,7 @@ module Make (Args : Gc_args.S) = struct
| Commit -> (
let commit_key = Commit_store.key_of_offset commit_store off in
let len = Commit_store.get_length commit_store commit_key in
Live.add live ~off ~len;
Ranges.add live ~off ~len;
match
Commit_store.unsafe_find ~check_integrity:false commit_store
commit_key
Expand Down Expand Up @@ -323,7 +253,7 @@ module Make (Args : Gc_args.S) = struct
stats :=
Gc_stats.Worker.finish_current_step !stats "mapping: of reachable";
stats :=
Gc_stats.Worker.set_objects_traversed !stats (Live.count live_entries);
Gc_stats.Worker.set_objects_traversed !stats (Ranges.count live_entries);
live_entries
in

Expand All @@ -345,7 +275,7 @@ module Make (Args : Gc_args.S) = struct
|> Errs.log_if_error "GC: Close prefix after data copy")
@@ fun () ->
(* Step 5.1. Transfer all. *)
Live.iter
Ranges.iter
(fun ~off ~len ->
let str = Dispatcher.read_seq_exn dispatcher ~off ~len in
Sparse.Ao.append_seq_exn prefix ~off str)
Expand Down Expand Up @@ -452,7 +382,7 @@ module Make (Args : Gc_args.S) = struct
Gc_stats.Worker.finish_current_step !stats "archive: iter reachable";
let min_offset = Dispatcher.suffix_start_offset dispatcher in
let to_archive = ref [] in
Live.iter
Ranges.iter
(fun ~off ~len ->
to_archive :=
(off, Dispatcher.read_seq_exn dispatcher ~off ~len)
Expand Down
1 change: 1 addition & 0 deletions src/irmin-pack/unix/irmin_pack_unix.ml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ module Io_errors = Io_errors
module Control_file = Control_file
module Append_only_file = Append_only_file
module Chunked_suffix = Chunked_suffix
module Ranges = Ranges
module Sparse_file = Sparse_file
module File_manager = File_manager
module Lower = Lower
Expand Down
1 change: 1 addition & 0 deletions src/irmin-pack/unix/irmin_pack_unix.mli
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ module Io_errors = Io_errors
module Control_file = Control_file
module Append_only_file = Append_only_file
module Chunked_suffix = Chunked_suffix
module Ranges = Ranges
module Sparse_file = Sparse_file
module File_manager = File_manager
module Lower = Lower
Expand Down
133 changes: 133 additions & 0 deletions src/irmin-pack/unix/ranges.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
(*
* Copyright (c) 2022-2023 Tarides <contact@tarides.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

open! Import

(** A contiguous span that starts at offset [off] and has length [len]. *)
type range = { off : int63; len : int63 }

(** A stack of ranges stored as a linked list of fixed-size arrays. Each range
    takes two consecutive cells of a chunk: its offset, then its length. *)
module Stack = struct
  type t = Empty | Stack of { mutable len : int; arr : int63 array; prev : t }

  (* = 128*1024, a large but not too large chunk size *)
  let capacity = 131_072

  (** [make prev] allocates an empty chunk linked in front of [prev]. *)
  let make prev = Stack { len = 0; arr = Array.make capacity Int63.zero; prev }

  (** [Empty] is reported as full so that the first push allocates a chunk. *)
  let is_full = function Empty -> true | Stack { len; _ } -> len >= capacity

  (** [push x t] stores [x] on top of [t] and returns the new top of the
      stack, which is a fresh chunk when [t] had no room left. *)
  let rec push x t =
    match t with
    | Empty -> push x (make t)
    | Stack chunk ->
        if chunk.len >= capacity then push x (make t)
        else begin
          let slot = chunk.len in
          chunk.len <- slot + 2;
          chunk.arr.(slot) <- x.off;
          chunk.arr.(slot + 1) <- x.len;
          t
        end

  (** [to_seq t] lazily enumerates the ranges of [t], most recently pushed
      first: the top chunk from its last pair to its first, then the older
      chunks. *)
  let rec to_seq t () =
    match t with
    | Empty -> Seq.Nil
    | Stack { len; arr; prev } ->
        assert (len mod 2 = 0);
        let rec from pair () =
          if pair >= 0 then
            let cell = 2 * pair in
            let range = { off = arr.(cell); len = arr.(cell + 1) } in
            Seq.Cons (range, from (pair - 1))
          else to_seq prev ()
        in
        from ((len / 2) - 1) ()
end

(* State of a set of ranges being built by successive calls to [add]. *)
type t = {
(* The most recently added in-order range, possibly already fused with the
ranges added just before it. [None] until the first [add]. *)
mutable last : range option;
(* In-order ranges that were completed before [last]. *)
mutable ranges : Stack.t;
(* Number of calls to [add], whatever branch they took. *)
mutable count : int;
(* Ranges that were not strictly below [last] when added, most recent first.
They are unsorted and may overlap other ranges. *)
mutable out_of_order : range list;
}

(* [make ()] is an empty set: no range, and a [count] of zero. *)
let make () =
{ last = None; ranges = Stack.Empty; count = 0; out_of_order = [] }

(* [count t] is the number of [add]s performed on [t]. *)
let count t = t.count

(** [add ~off ~len t] records the range [(off, len)] in [t] and increments the
    number of additions reported by [count].

    The range is compared with the latest in-order range [last]:
    - if it ends exactly where [last] starts, both are fused into one range;
    - if it ends strictly before [last] starts, [last] is pushed on the stack
      and the new range takes its place;
    - otherwise it is set aside in [out_of_order]. This is only expected on
      legacy data with wrong object ordering and is handled as a special
      case. *)
let add ~off ~len t =
  t.count <- succ t.count;
  let incoming = { off; len = Int63.of_int len } in
  match t.last with
  | None -> t.last <- Some incoming
  | Some last ->
      let open Int63.Syntax in
      let stop = off + incoming.len in
      if stop = last.off then
        (* adjacent: extend the latest range downwards *)
        t.last <- Some { off; len = incoming.len + last.len }
      else if stop < last.off then begin
        (* disjoint and strictly smaller *)
        t.last <- Some incoming;
        t.ranges <- Stack.push last t.ranges
      end
      else
        (* not strictly smaller than the latest range *)
        t.out_of_order <- incoming :: t.out_of_order

(** [ranges_to_seq t] lazily enumerates the in-order ranges of [t]: the latest
    range first, then the stacked ranges from the most recently pushed to the
    oldest. The sequence is empty when nothing was added in order. *)
let ranges_to_seq t : range Seq.t =
 fun () ->
  match t.last with
  | Some newest -> Seq.Cons (newest, Stack.to_seq t.ranges)
  | None -> Seq.Nil

(** [out_of_order_to_seq t] enumerates the out-of-order ranges of [t] sorted
    by increasing offset, keeping a single range per distinct offset. *)
let out_of_order_to_seq t =
  let by_offset a b = Int63.compare a.off b.off in
  t.out_of_order |> List.sort_uniq by_offset |> List.to_seq

(** [seq_merge xs ys] lazily merges two sequences, each sorted by increasing
    offset, into one sequence sorted by increasing offset.

    When both sequences hold a range starting at the same offset, only one
    range is produced: the longer of the two. The lengths can legitimately
    differ, because a range from the in-order sequence may have been fused
    with its neighbours while the same offset was also recorded on its own,
    so this case must not be treated as an invariant violation. *)
let seq_merge xs ys () =
  (* [merge] works on already forced nodes, so the head of the sequence that
     was not consumed is carried over instead of being forced again. *)
  let rec merge xnode ynode =
    match (xnode, ynode) with
    | Seq.Nil, rest | rest, Seq.Nil -> rest
    | Seq.Cons (x, xs'), Seq.Cons (y, ys') -> (
        match Int63.compare x.off y.off with
        | 0 ->
            (* same start: the longer range covers the shorter one *)
            let widest = if Int63.compare x.len y.len < 0 then y else x in
            Seq.Cons (widest, fun () -> merge (xs' ()) (ys' ()))
        | c when c < 0 -> Seq.Cons (x, fun () -> merge (xs' ()) ynode)
        | _ -> Seq.Cons (y, fun () -> merge xnode (ys' ())))
  in
  merge (xs ()) (ys ())

(** Result of combining two ranges: either they stay separate, listed by
    increasing offset, or they are replaced by the single range covering
    both. *)
type fused = Disjoint of range * range | Overlap of range

(** [fuse a b] is [Disjoint] when one range ends strictly before the other
    starts, with the lower range first. Ranges that overlap or touch are
    combined into one [Overlap] range spanning from the smallest start to the
    largest end. *)
let fuse a b =
  let open Int63.Syntax in
  let a_stop = a.off + a.len in
  let b_stop = b.off + b.len in
  match (a_stop < b.off, b_stop < a.off) with
  | true, _ -> Disjoint (a, b)
  | false, true -> Disjoint (b, a)
  | false, false ->
      let off = min a.off b.off in
      let stop = max a_stop b_stop in
      Overlap { off; len = stop - off }

(** [seq_fuse s] lazily rewrites [s] so that consecutive ranges which overlap
    or touch are replaced by the single range covering them. [?prev] is the
    range accumulated so far and not yet emitted; callers normally omit it. *)
let rec seq_fuse ?prev s () =
  match s () with
  | Seq.Nil -> (
      (* end of input: flush the pending range, if any *)
      match prev with
      | None -> Seq.Nil
      | Some pending -> Seq.Cons (pending, Seq.empty))
  | Seq.Cons (x, rest) -> (
      match prev with
      | None -> seq_fuse ~prev:x rest ()
      | Some pending -> (
          match fuse x pending with
          | Disjoint (lower, upper) ->
              Seq.Cons (lower, seq_fuse ~prev:upper rest)
          | Overlap merged -> seq_fuse ~prev:merged rest ()))

(** [iter fn t] calls [fn ~off ~len] on each range of [t]. When no range was
    added out of order, the in-order ranges are visited directly; otherwise
    they are first merged with the sorted out-of-order ranges and fused. *)
let iter fn t =
  let visit { off; len } = fn ~off ~len in
  let sorted = ranges_to_seq t in
  let disjoint =
    match t.out_of_order with
    | [] -> sorted
    | _ :: _ -> seq_fuse (seq_merge sorted (out_of_order_to_seq t))
  in
  Seq.iter visit disjoint
38 changes: 38 additions & 0 deletions src/irmin-pack/unix/ranges.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
(*
* Copyright (c) 2022-2023 Tarides <contact@tarides.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

open! Import

type t
(** An ordered set of disjoint [(offset, length)] ranges. *)

val make : unit -> t
(** [make ()] returns a new empty set of ranges. *)

val add : off:int63 -> len:int -> t -> unit
(** [add ~off ~len t] inserts the range [(off, len)] into [t]. Ranges may be
added in any order, but successive calls to [add] are optimized for strictly
decreasing offset arguments. *)

val iter : (off:int63 -> len:int63 -> unit) -> t -> unit
(** [iter fn t] calls [fn ~off ~len] on every disjoint range [(off, len)] in the
set [t]. The function [fn ~off ~len] is called with strictly increasing
offsets. If two or more consecutive ranges [(off,len)] and [(off+len,len')]
were added to the set [t], a single call to [fn] will be performed on the
enclosing interval [(off,len+len')]. Ranges that were added out of order and
overlap or touch other ranges are reported the same way, as one enclosing
interval. *)

val count : t -> int
(** [count t] returns the number of [add]s performed on [t]. *)
1 change: 1 addition & 0 deletions test/irmin-pack/dune
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
test_upgrade
test_gc
test_flush_reload
test_ranges
test_mapping
test_nearest_geq
test_dispatcher
Expand Down
1 change: 1 addition & 0 deletions test/irmin-pack/test_pack.ml
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ let misc =
("gc archival", Test_gc.Gc_archival.tests);
("split", Test_gc.Split.tests);
("flush", Test_flush_reload.tests);
("ranges", Test_ranges.tests);
("mapping", Test_mapping.tests);
("test_nearest_geq", Test_nearest_geq.tests);
("layout", Layout.tests);
Expand Down
38 changes: 38 additions & 0 deletions test/irmin-pack/test_ranges.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
(*
* Copyright (c) 2018-2022 Tarides <contact@tarides.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

open! Import
module Int63 = Optint.Int63
module Ranges = Irmin_pack_unix.Ranges

(** Adds ranges in a mostly decreasing order with a few out-of-order entries,
    then checks that [Ranges.iter] reports the fused, disjoint ranges. The
    visited ranges are accumulated by prepending, hence the expected list is
    written from the highest offset to the lowest. *)
let test () =
  let ranges = Ranges.make () in
  let insert (off, len) = Ranges.add ~off:(Int63.of_int off) ~len ranges in
  List.iter insert
    [ (90, 10); (80, 5); (70, 10); (87, 1); (60, 5); (50, 5); (65, 2); (55, 5) ];
  let visited = ref [] in
  let record ~off ~len =
    visited := (Int63.to_int off, Int63.to_int len) :: !visited
  in
  Ranges.iter record ranges;
  Alcotest.(check (list (pair int int)))
    "out of order"
    [ (90, 10); (87, 1); (70, 15); (50, 17) ]
    !visited;
  Lwt.return_unit

(** Test cases exported to the main test runner. *)
let tests =
  let run _switch () = test () in
  [ Alcotest_lwt.test_case "test ranges" `Quick run ]

0 comments on commit e1ef987

Please sign in to comment.