diff --git a/src/irmin-pack/unix/dune b/src/irmin-pack/unix/dune index 75896e19bd1..10e0c2c1942 100644 --- a/src/irmin-pack/unix/dune +++ b/src/irmin-pack/unix/dune @@ -12,7 +12,8 @@ lwt.unix mtime cmdliner - optint) + optint + rusage) (preprocess (pps ppx_irmin.internal)) (instrumentation diff --git a/src/irmin-pack/unix/gc.ml b/src/irmin-pack/unix/gc.ml index a6293c3dd2e..acf2824e48c 100644 --- a/src/irmin-pack/unix/gc.ml +++ b/src/irmin-pack/unix/gc.ml @@ -17,6 +17,233 @@ open! Import include Gc_intf +module Worker_stats : sig + type t + + val create : string -> t + val incr_objects_traversed : t -> t + val incr_suffix_transfer_loops : t -> t + val add_suffix_bytes_copied : t -> int63 -> t + val finish_current_step : t -> string -> t + val finalise : t -> Stats.Gc.worker +end = struct + module S = struct + include Stats.Gc + end + + type t = { + stats : S.worker; + current_stepname : string; + prev_wtime : float; + prev_stime : float; + prev_utime : float; + first_wtime : float; + first_stime : float; + first_utime : float; + } + (** [t] is the running state while computing the stats *) + + let is_darwin = + lazy + (try + match Unix.open_process_in "uname" |> input_line with + | "Darwin" -> true + | _ -> false + with Unix.Unix_error _ -> false) + + let get_wtime () = + Mtime_clock.now () |> Mtime.to_uint64_ns |> Int64.to_float |> ( /. ) 1e9 + + let get_stime () = Rusage.((get Self).stime) + let get_utime () = Rusage.((get Self).utime) + + let get_rusage : unit -> S.rusage = + fun () -> + let Rusage.{ maxrss; minflt; majflt; inblock; oublock; nvcsw; nivcsw; _ } = + Rusage.(get Self) + in + let maxrss = + if Lazy.force is_darwin then Int64.div maxrss 1000L else maxrss + in + S. + { + maxrss_before = maxrss; + maxrss_after = maxrss; + minflt; + majflt; + inblock; + oublock; + nvcsw; + nivcsw; + } + + let get_ocaml_gc : unit -> S.ocaml_gc = + fun () -> + let open Stdlib.Gc in + let v = quick_stat () in + S. + { + minor_words = v.minor_words; + promoted_words = v.promoted_words; + major_words = v.major_words; + minor_collections = v.minor_collections; + major_collections = v.major_collections; + heap_words = v.heap_words; + compactions = v.compactions; + top_heap_words_before = v.top_heap_words; + top_heap_words_after = v.top_heap_words; + stack_size_before = v.stack_size; + stack_size_after = v.stack_size; + } + + let create : string -> t = + fun first_stepname -> + (* Reseting all stats. We'll record their value at the end of worker. *) + Stats.reset_stats (); + + (* Using dummy values for these stats that will be overwritten during finalise. *) + let pack_store = Stats.((get ()).pack_store |> Pack_store.export) in + let index = Stats.((get ()).index |> Index.export) in + let inode = Irmin_pack.Stats.((get ()).inode |> Inode.export) in + + let wtime = get_wtime () in + let stime = get_stime () in + let utime = get_utime () in + + let stats = + S. + { + rusage = get_rusage (); + ocaml_gc = get_ocaml_gc (); + index; + pack_store; + inode; + wall_durations = []; + sys_durations = []; + user_durations = []; + objects_traversed = Int63.zero; + suffix_transfer_loops = 0; + suffix_bytes_copied = Int63.zero; + } + in + { + stats; + current_stepname = first_stepname; + prev_utime = utime; + prev_wtime = wtime; + prev_stime = stime; + first_utime = utime; + first_wtime = wtime; + first_stime = stime; + } + + let incr_objects_traversed t = + let stats = + { t.stats with objects_traversed = Int63.succ t.stats.objects_traversed } + in + { t with stats } + + let incr_suffix_transfer_loops t = + let stats = + { + t.stats with + suffix_transfer_loops = succ t.stats.suffix_transfer_loops; + } + in + { t with stats } + + let add_suffix_bytes_copied t count = + let stats = + { + t.stats with + suffix_bytes_copied = Int63.add t.stats.suffix_bytes_copied count; + } + in + { t with stats } + + let finish_current_step t next_stepname = + let wtime = get_wtime () in + let stime = get_stime () in + let utime = get_utime () in + let step = t.current_stepname in + + (* The durations lists are built in reverse order and reversed in + [finalise] *) + let wall_durations = + (step, wtime -. t.prev_wtime) :: t.stats.wall_durations + in + let sys_durations = + (step, stime -. t.prev_stime) :: t.stats.sys_durations + in + let user_durations = + (step, utime -. t.prev_utime) :: t.stats.user_durations + in + + let stats = + { t.stats with wall_durations; sys_durations; user_durations } + in + { + t with + current_stepname = next_stepname; + stats; + prev_wtime = wtime; + prev_stime = stime; + prev_utime = utime; + } + + let finalise : t -> S.worker = + fun t -> + let t = finish_current_step t "will not appear in the stats" in + let rusage = + let x = t.stats.rusage in + let y = get_rusage () in + let ( - ) = Int64.sub in + S. + { + maxrss_before = x.maxrss_before; + maxrss_after = y.maxrss_before; + minflt = y.minflt - x.minflt; + majflt = y.majflt - x.majflt; + inblock = y.inblock - x.inblock; + oublock = y.oublock - x.oublock; + nvcsw = y.nvcsw - x.nvcsw; + nivcsw = y.nivcsw - x.nivcsw; + } + in + let ocaml_gc = + let x = t.stats.ocaml_gc in + let y = get_ocaml_gc () in + S. + { + minor_words = y.minor_words -. x.minor_words; + promoted_words = y.promoted_words -. x.promoted_words; + major_words = y.major_words -. x.major_words; + minor_collections = y.minor_collections - x.minor_collections; + major_collections = y.major_collections - x.major_collections; + heap_words = y.heap_words - x.heap_words; + compactions = y.compactions - x.compactions; + top_heap_words_before = x.top_heap_words_before; + top_heap_words_after = y.top_heap_words_before; + stack_size_before = x.stack_size_before; + stack_size_after = y.stack_size_before; + } + in + let pack_store = Stats.((get ()).pack_store |> Pack_store.export) in + let index = Stats.((get ()).index |> Index.export) in + let inode = Irmin_pack.Stats.((get ()).inode |> Inode.export) in + { + t.stats with + rusage; + ocaml_gc; + index; + pack_store; + inode; + wall_durations = List.rev t.stats.wall_durations; + sys_durations = List.rev t.stats.sys_durations; + user_durations = List.rev t.stats.user_durations; + } +end + (** Code for running in an async GC worker thread. *) module Worker = struct module Payload = Control_file.Latest_payload diff --git a/src/irmin-pack/unix/import.ml b/src/irmin-pack/unix/import.ml index 9b93d7ea675..160dd2bc7f8 100644 --- a/src/irmin-pack/unix/import.ml +++ b/src/irmin-pack/unix/import.ml @@ -48,7 +48,6 @@ module type S = Irmin_pack.S module Conf = Irmin_pack.Conf module Layout = Irmin_pack.Layout module Pack_key = Irmin_pack.Pack_key -module Stats = Stats module Indexable = Irmin_pack.Indexable module Result_syntax = struct diff --git a/src/irmin-pack/unix/stats.mli b/src/irmin-pack/unix/stats.mli index 3bc36e5d399..9b2f88c5811 100644 --- a/src/irmin-pack/unix/stats.mli +++ b/src/irmin-pack/unix/stats.mli @@ -14,6 +14,8 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) +open Import + module Pack_store : sig type field = | Appended_hashes @@ -93,10 +95,92 @@ module File_manager : sig val export : stat -> t end +module Gc : sig + type rusage = { + maxrss_before : int64; + maxrss_after : int64; + minflt : int64; + majflt : int64; + inblock : int64; + oublock : int64; + nvcsw : int64; + nivcsw : int64; + } + [@@deriving irmin] + + type ocaml_gc = { + minor_words : float; + promoted_words : float; + major_words : float; + minor_collections : int; + major_collections : int; + heap_words : int; + compactions : int; + top_heap_words_before : int; + top_heap_words_after : int; + stack_size_before : int; + stack_size_after : int; + } + [@@deriving irmin] + + type worker = { + rusage : rusage; + ocaml_gc : ocaml_gc; + index : Index.t; + pack_store : Pack_store.t; + inode : Irmin_pack.Stats.Inode.t; + wall_durations : (string * float) list; + sys_durations : (string * float) list; + user_durations : (string * float) list; + objects_traversed : int63; + suffix_transfer_loops : int; + suffix_bytes_copied : int63; + } + [@@deriving irmin] + (** Stats produced by the worker. They are transmited to the parent process + through the gc result JSON file. + + Since the worker lives in a fork, [rusage], [ocaml_gc], [index], + [pack_store] and [inode] are computed in a special way so that they solely + reflect the GC process and not what happened in the main process before + the fork. + + [wall_durations], [sys_durations] and [user_durations] are association + lists that possess the same keys in the same order. Their values are + durations in seconds. Association lists are used here instead of types + because in the future the exact list of tasks may change and we don't want + the rigidity of types, especially because these informations will end up + appearing in a file format. *) + + type finalisation = { + total_duration : float; + read_gc_output_duration : float; + transfer_latest_newies_duration : float; + swap_duration : float; + unlink_duration : float; + suffix_bytes_copied : int63; + } + + type t = { + offset : int63; + before_suffix_offset : int63; + after_suffix_offset : int63; + duration : float; + finalisation : finalisation; + worker : worker; + } + [@@deriving irmin ~pp] + + type stat + + val export : stat -> t +end + type t = { pack_store : Pack_store.stat; index : Index.stat; file_manager : File_manager.stat; + gc : Gc.stat; } (** Record type for all statistics that will be collected. There is a single instance (which we refer to as "the instance" below) which is returned by @@ -147,3 +231,7 @@ val get_offset_stats : unit -> offset_stats val incr_fm_field : File_manager.field -> unit (** [incr_fm_field field] increments the chosen stats field for the {!File_manager} *) + +val report_gc : Gc.t -> unit +(** [report_gc gc_stats] sets [(get ()).gc] to the stats of the latest + successful GC. *)