Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Ngoguey42 committed Sep 14, 2022
1 parent b1c706a commit 8d337a6
Show file tree
Hide file tree
Showing 4 changed files with 317 additions and 2 deletions.
3 changes: 2 additions & 1 deletion src/irmin-pack/unix/dune
Expand Up @@ -12,7 +12,8 @@
lwt.unix
mtime
cmdliner
optint)
optint
rusage)
(preprocess
(pps ppx_irmin.internal))
(instrumentation
Expand Down
227 changes: 227 additions & 0 deletions src/irmin-pack/unix/gc.ml
Expand Up @@ -17,6 +17,233 @@
open! Import
include Gc_intf

module Worker_stats : sig
type t

val create : string -> t
val incr_objects_traversed : t -> t
val incr_suffix_transfer_loops : t -> t
val add_suffix_bytes_copied : t -> int63 -> t
val finish_current_step : t -> string -> t
val finalise : t -> Stats.Gc.worker
end = struct
module S = struct
include Stats.Gc
end

type t = {
stats : S.worker;
current_stepname : string;
prev_wtime : float;
prev_stime : float;
prev_utime : float;
first_wtime : float;
first_stime : float;
first_utime : float;
}
(** [t] is the running state while computing the stats *)

let is_darwin =
lazy
(try
match Unix.open_process_in "uname" |> input_line with
| "Darwin" -> true
| _ -> false
with Unix.Unix_error _ -> false)

let get_wtime () =
Mtime_clock.now () |> Mtime.to_uint64_ns |> Int64.to_float |> ( /. ) 1e9

let get_stime () = Rusage.((get Self).stime)
let get_utime () = Rusage.((get Self).utime)

let get_rusage : unit -> S.rusage =
fun () ->
let Rusage.{ maxrss; minflt; majflt; inblock; oublock; nvcsw; nivcsw; _ } =
Rusage.(get Self)
in
let maxrss =
if Lazy.force is_darwin then Int64.div maxrss 1000L else maxrss
in
S.
{
maxrss_before = maxrss;
maxrss_after = maxrss;
minflt;
majflt;
inblock;
oublock;
nvcsw;
nivcsw;
}

let get_ocaml_gc : unit -> S.ocaml_gc =
fun () ->
let open Stdlib.Gc in
let v = quick_stat () in
S.
{
minor_words = v.minor_words;
promoted_words = v.promoted_words;
major_words = v.major_words;
minor_collections = v.minor_collections;
major_collections = v.major_collections;
heap_words = v.heap_words;
compactions = v.compactions;
top_heap_words_before = v.top_heap_words;
top_heap_words_after = v.top_heap_words;
stack_size_before = v.stack_size;
stack_size_after = v.stack_size;
}

let create : string -> t =
fun first_stepname ->
(* Reseting all stats. We'll record their value at the end of worker. *)
Stats.reset_stats ();

(* Using dummy values for these stats that will be overwritten during finalise. *)
let pack_store = Stats.((get ()).pack_store |> Pack_store.export) in
let index = Stats.((get ()).index |> Index.export) in
let inode = Irmin_pack.Stats.((get ()).inode |> Inode.export) in

let wtime = get_wtime () in
let stime = get_stime () in
let utime = get_utime () in

let stats =
S.
{
rusage = get_rusage ();
ocaml_gc = get_ocaml_gc ();
index;
pack_store;
inode;
wall_durations = [];
sys_durations = [];
user_durations = [];
objects_traversed = Int63.zero;
suffix_transfer_loops = 0;
suffix_bytes_copied = Int63.zero;
}
in
{
stats;
current_stepname = first_stepname;
prev_utime = utime;
prev_wtime = wtime;
prev_stime = stime;
first_utime = utime;
first_wtime = wtime;
first_stime = stime;
}

let incr_objects_traversed t =
let stats =
{ t.stats with objects_traversed = Int63.succ t.stats.objects_traversed }
in
{ t with stats }

let incr_suffix_transfer_loops t =
let stats =
{
t.stats with
suffix_transfer_loops = succ t.stats.suffix_transfer_loops;
}
in
{ t with stats }

let add_suffix_bytes_copied t count =
let stats =
{
t.stats with
suffix_bytes_copied = Int63.add t.stats.suffix_bytes_copied count;
}
in
{ t with stats }

let finish_current_step t next_stepname =
let wtime = get_wtime () in
let stime = get_stime () in
let utime = get_utime () in
let step = t.current_stepname in

(* The durations lists are built in reverse order and reversed in
[finalise] *)
let wall_durations =
(step, wtime -. t.prev_wtime) :: t.stats.wall_durations
in
let sys_durations =
(step, stime -. t.prev_stime) :: t.stats.sys_durations
in
let user_durations =
(step, utime -. t.prev_utime) :: t.stats.user_durations
in

let stats =
{ t.stats with wall_durations; sys_durations; user_durations }
in
{
t with
current_stepname = next_stepname;
stats;
prev_wtime = wtime;
prev_stime = stime;
prev_utime = utime;
}

let finalise : t -> S.worker =
fun t ->
let t = finish_current_step t "will not appear in the stats" in
let rusage =
let x = t.stats.rusage in
let y = get_rusage () in
let ( - ) = Int64.sub in
S.
{
maxrss_before = x.maxrss_before;
maxrss_after = y.maxrss_before;
minflt = y.minflt - x.minflt;
majflt = y.majflt - x.majflt;
inblock = y.inblock - x.inblock;
oublock = y.oublock - x.oublock;
nvcsw = y.nvcsw - x.nvcsw;
nivcsw = y.nivcsw - x.nivcsw;
}
in
let ocaml_gc =
let x = t.stats.ocaml_gc in
let y = get_ocaml_gc () in
S.
{
minor_words = y.minor_words -. x.minor_words;
promoted_words = y.promoted_words -. x.promoted_words;
major_words = y.major_words -. x.major_words;
minor_collections = y.minor_collections - x.minor_collections;
major_collections = y.major_collections - x.major_collections;
heap_words = y.heap_words - x.heap_words;
compactions = y.compactions - x.compactions;
top_heap_words_before = x.top_heap_words_before;
top_heap_words_after = y.top_heap_words_before;
stack_size_before = x.stack_size_before;
stack_size_after = y.stack_size_before;
}
in
let pack_store = Stats.((get ()).pack_store |> Pack_store.export) in
let index = Stats.((get ()).index |> Index.export) in
let inode = Irmin_pack.Stats.((get ()).inode |> Inode.export) in
{
t.stats with
rusage;
ocaml_gc;
index;
pack_store;
inode;
wall_durations = List.rev t.stats.wall_durations;
sys_durations = List.rev t.stats.sys_durations;
user_durations = List.rev t.stats.user_durations;
}
end

(** Code for running in an async GC worker thread. *)
module Worker = struct
module Payload = Control_file.Latest_payload
Expand Down
1 change: 0 additions & 1 deletion src/irmin-pack/unix/import.ml
Expand Up @@ -48,7 +48,6 @@ module type S = Irmin_pack.S
module Conf = Irmin_pack.Conf
module Layout = Irmin_pack.Layout
module Pack_key = Irmin_pack.Pack_key
module Stats = Stats
module Indexable = Irmin_pack.Indexable

module Result_syntax = struct
Expand Down
88 changes: 88 additions & 0 deletions src/irmin-pack/unix/stats.mli
Expand Up @@ -14,6 +14,8 @@
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*)

open Import

module Pack_store : sig
type field =
| Appended_hashes
Expand Down Expand Up @@ -93,10 +95,92 @@ module File_manager : sig
val export : stat -> t
end

module Gc : sig
type rusage = {
maxrss_before : int64;
maxrss_after : int64;
minflt : int64;
majflt : int64;
inblock : int64;
oublock : int64;
nvcsw : int64;
nivcsw : int64;
}
[@@deriving irmin]

type ocaml_gc = {
minor_words : float;
promoted_words : float;
major_words : float;
minor_collections : int;
major_collections : int;
heap_words : int;
compactions : int;
top_heap_words_before : int;
top_heap_words_after : int;
stack_size_before : int;
stack_size_after : int;
}
[@@deriving irmin]

type worker = {
rusage : rusage;
ocaml_gc : ocaml_gc;
index : Index.t;
pack_store : Pack_store.t;
inode : Irmin_pack.Stats.Inode.t;
wall_durations : (string * float) list;
sys_durations : (string * float) list;
user_durations : (string * float) list;
objects_traversed : int63;
suffix_transfer_loops : int;
suffix_bytes_copied : int63;
}
[@@deriving irmin]
(** Stats produced by the worker. They are transmited to the parent process
through the gc result JSON file.
Since the worker lives in a fork, [rusage], [ocaml_gc], [index],
[pack_store] and [inode] are computed in a special way so that they solely
reflect the GC process and not what happened in the main process before
the fork.
[wall_durations], [sys_durations] and [user_durations] are association
lists that possess the same keys in the same order. Their values are
durations in seconds. Association lists are used here instead of types
because in the future the exact list of tasks may change and we don't want
the rigidity of types, especially because these informations will end up
appearing in a file format. *)

type finalisation = {
total_duration : float;
read_gc_output_duration : float;
transfer_latest_newies_duration : float;
swap_duration : float;
unlink_duration : float;
suffix_bytes_copied : int63;
}

type t = {
offset : int63;
before_suffix_offset : int63;
after_suffix_offset : int63;
duration : float;
finalisation : finalisation;
worker : worker;
}
[@@deriving irmin ~pp]

type stat

val export : stat -> t
end

type t = {
pack_store : Pack_store.stat;
index : Index.stat;
file_manager : File_manager.stat;
gc : Gc.stat;
}
(** Record type for all statistics that will be collected. There is a single
instance (which we refer to as "the instance" below) which is returned by
Expand Down Expand Up @@ -147,3 +231,7 @@ val get_offset_stats : unit -> offset_stats
val incr_fm_field : File_manager.field -> unit
(** [incr_fm_field field] increments the chosen stats field for the
{!File_manager} *)

val report_gc : Gc.t -> unit
(** [report_gc gc_stats] sets [(get ()).gc] to the stats of the latest
successful GC. *)

0 comments on commit 8d337a6

Please sign in to comment.