diff --git a/mmtk/Cargo.toml b/mmtk/Cargo.toml index 85dc560..11a834b 100644 --- a/mmtk/Cargo.toml +++ b/mmtk/Cargo.toml @@ -12,7 +12,7 @@ edition = "2021" # Metadata for the Ruby repository [package.metadata.ci-repos.ruby] repo = "mmtk/ruby" # This is used by actions/checkout, so the format is "owner/repo", not URL. -rev = "a11526830e747dc68f250a38bbebf01c02bf6462" +rev = "f3f893c6fb8cbe08c3bc4c68d4cd0deceae42461" [lib] name = "mmtk_ruby" diff --git a/mmtk/cbindgen.toml b/mmtk/cbindgen.toml index eb92dd6..84837b8 100644 --- a/mmtk/cbindgen.toml +++ b/mmtk/cbindgen.toml @@ -47,3 +47,4 @@ include = ["HiddenHeader"] "MIN_OBJ_ALIGN" = "MMTK_MIN_OBJ_ALIGN" "HiddenHeader" = "MMTk_HiddenHeader" "HIDDEN_SIZE_MASK" = "MMTK_HIDDEN_SIZE_MASK" +"ConcurrentSetStats" = "MMTk_ConcurrentSetStats" diff --git a/mmtk/src/abi.rs b/mmtk/src/abi.rs index f9e1272..9ee766a 100644 --- a/mmtk/src/abi.rs +++ b/mmtk/src/abi.rs @@ -305,6 +305,14 @@ pub struct RubyBindingOptions { pub suffix_size: usize, } +#[repr(C)] +#[derive(Clone, Default)] +pub struct ConcurrentSetStats { + pub live: usize, + pub moved: usize, + pub deleted: usize, +} + #[repr(C)] #[derive(Clone)] pub struct RubyUpcalls { @@ -353,8 +361,9 @@ pub struct RubyUpcalls { pub get_cc_refinement_table_size: extern "C" fn() -> usize, pub update_cc_refinement_table: extern "C" fn(), // Get tables for specialized processing + pub get_fstring_table_obj: extern "C" fn() -> ObjectReference, pub get_global_symbols_table: extern "C" fn() -> *mut st_table, - // Detailed table info queries and operations + // Detailed st_table info queries and operations pub st_get_num_entries: extern "C" fn(table: *const st_table) -> usize, pub st_get_size_info: extern "C" fn( table: *const st_table, @@ -372,6 +381,15 @@ pub struct RubyUpcalls { ) -> usize, pub st_update_bins_range: extern "C" fn(table: *mut st_table, begin: libc::size_t, end: libc::size_t) -> usize, + // Detailed concurrent_set info queries and operations + pub 
concurrent_set_get_num_entries: extern "C" fn(set: ObjectReference) -> usize, + pub concurrent_set_get_capacity: extern "C" fn(set: ObjectReference) -> usize, + pub concurrent_set_update_entries_range: extern "C" fn( + set: ObjectReference, + begin: usize, + end: usize, + stats: *mut ConcurrentSetStats, + ), } unsafe impl Sync for RubyUpcalls {} diff --git a/mmtk/src/binding.rs b/mmtk/src/binding.rs index 2ec0335..90395d3 100644 --- a/mmtk/src/binding.rs +++ b/mmtk/src/binding.rs @@ -62,6 +62,7 @@ pub struct RubyBinding { pub wb_unprotected_objects: Mutex>, pub st_entries_chunk_size: usize, pub st_bins_chunk_size: usize, + pub concurrent_set_chunk_size: usize, } unsafe impl Sync for RubyBinding {} @@ -89,9 +90,12 @@ impl RubyBinding { let st_entries_chunk_size = env_default::("RUBY_MMTK_ENTRIES_CHUNK_SIZE", 1024); let st_bins_chunk_size = env_default::("RUBY_MMTK_BINS_CHUNK_SIZE", 4096); + let concurrent_set_chunk_size = + env_default::("RUBY_MMTK_CONCURRENT_SET_CHUNK_SIZE", 1024); debug!("st_entries_chunk_size: {st_entries_chunk_size}"); debug!("st_bins_chunk_size: {st_bins_chunk_size}"); + debug!("concurrent_set_chunk_size: {concurrent_set_chunk_size}"); Self { mmtk, @@ -105,6 +109,7 @@ impl RubyBinding { wb_unprotected_objects: Default::default(), st_entries_chunk_size, st_bins_chunk_size, + concurrent_set_chunk_size, } } diff --git a/mmtk/src/weak_proc.rs b/mmtk/src/weak_proc.rs index 74e7c9d..18ec920 100644 --- a/mmtk/src/weak_proc.rs +++ b/mmtk/src/weak_proc.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; use mmtk::{ scheduler::{GCWork, GCWorker, WorkBucketStage}, @@ -6,12 +6,13 @@ use mmtk::{ vm::ObjectTracerContext, }; -use crate::{ - abi::{st_table, GCThreadTLS}, - extra_assert, is_mmtk_object_safe, upcalls, - utils::AfterAll, - Ruby, -}; +use crate::{abi::GCThreadTLS, extra_assert, is_mmtk_object_safe, upcalls, Ruby}; + +pub mod concurrent_set_parallel; +pub mod st_table_parallel; + +/// Set this to true to use chunked processing 
optimization for the fstring table. +const SPECIALIZE_FSTRING_TABLE_PROCESSING: bool = true; /// Set this to true to use chunked processing optimization for the global symbols table. const SPECIALIZE_GLOBAL_SYMBOLS_TABLE_PROCESSING: bool = true; @@ -74,7 +75,6 @@ impl WeakProcessor { // global symbols table specialized Box::new(UpdateFinalizerAndObjIdTables) as _, Box::new(UpdateGenericFieldsTbl) as _, - Box::new(UpdateFrozenStringsTable) as _, Box::new(UpdateCCRefinementTable) as _, // END: Weak tables Box::new(UpdateWbUnprotectedObjectsList) as _, @@ -82,8 +82,19 @@ impl WeakProcessor { let forward = crate::mmtk().get_plan().current_gc_may_move_object(); + if SPECIALIZE_FSTRING_TABLE_PROCESSING { + concurrent_set_parallel::process_weak_concurrent_set_chunked( + "fstring", + (upcalls().get_fstring_table_obj)(), + worker, + ); + } else { + worker.scheduler().work_buckets[WorkBucketStage::VMRefClosure] + .add_boxed(Box::new(UpdateFrozenStringsTable) as _); + } + if SPECIALIZE_GLOBAL_SYMBOLS_TABLE_PROCESSING { - Self::process_weak_table_chunked( + st_table_parallel::process_weak_table_chunked( "global symbols", (upcalls().get_global_symbols_table)(), false, @@ -96,78 +107,6 @@ impl WeakProcessor { .add_boxed(Box::new(UpdateGlobalSymbolsTable) as _); } } - - pub fn process_weak_table_chunked( - name: &'static str, - table: *mut st_table, - weak_keys: bool, - weak_values: bool, - forward: bool, - worker: &mut GCWorker, - ) { - let mut entries_start = 0; - let mut entries_bound = 0; - let mut bins_num = 0; - (upcalls().st_get_size_info)(table, &mut entries_start, &mut entries_bound, &mut bins_num); - let num_entries = (upcalls().st_get_num_entries)(table); - debug!( - "name: {name}, entries_start: {entries_start}, entries_bound: {entries_bound}, bins_num: {bins_num}, num_entries: {num_entries}" - ); - - let table_name_ptr = name.as_ptr(); - let table_name_len = name.len(); - - probe!( - mmtk_ruby, - initial_weak_table_stats, - entries_start, - entries_bound, - bins_num, 
- num_entries, - table_name_ptr, - table_name_len, - ); - - let entries_chunk_size = crate::binding().st_entries_chunk_size; - let bins_chunk_size = crate::binding().st_bins_chunk_size; - - let after_all = Arc::new(AfterAll::new(WorkBucketStage::VMRefClosure)); - - let entries_packets = (entries_start..entries_bound) - .step_by(entries_chunk_size) - .map(|begin| { - let end = (begin + entries_chunk_size).min(entries_bound); - let after_all = after_all.clone(); - Box::new(UpdateTableEntriesParallel { - name, - table, - begin, - end, - weak_keys, - weak_values, - forward, - after_all, - }) as _ - }) - .collect::>(); - after_all.count_up(entries_packets.len()); - - let bins_packets = (0..bins_num) - .step_by(entries_chunk_size) - .map(|begin| { - let end = (begin + bins_chunk_size).min(bins_num); - Box::new(UpdateTableBinsParallel { - name: name.to_string(), - table, - begin, - end, - }) as _ - }) - .collect::>(); - after_all.add_packets(bins_packets); - - worker.scheduler().work_buckets[WorkBucketStage::VMRefClosure].bulk_add(entries_packets); - } } struct ProcessObjFreeCandidates; @@ -321,89 +260,6 @@ define_global_table_processor!(UpdateCCRefinementTable, { ///////// END: Simple table updating work packets //////// -struct UpdateTableEntriesParallel { - name: &'static str, - table: *mut st_table, - begin: usize, - end: usize, - weak_keys: bool, - weak_values: bool, - forward: bool, - after_all: Arc, -} - -unsafe impl Send for UpdateTableEntriesParallel {} - -impl UpdateTableEntriesParallel {} - -impl GCWork for UpdateTableEntriesParallel { - fn do_work(&mut self, worker: &mut GCWorker, _mmtk: &'static mmtk::MMTK) { - debug!("Updating entries of {} table", self.name); - let deleted_entries = (upcalls().st_update_entries_range)( - self.table, - self.begin, - self.end, - self.weak_keys, - self.weak_values, - self.forward, - ); - debug!("Done updating entries of {} table", self.name); - let table_name = self.name.as_ptr(); - let table_name_len = self.name.len(); - 
probe!( - mmtk_ruby, - update_table_entries_parallel, - self.begin, - self.end, - deleted_entries, - table_name, - table_name_len - ); - - let is_last = self.after_all.count_down(worker); - if is_last { - let num_entries = (upcalls().st_get_num_entries)(self.table); - probe!( - mmtk_ruby, - final_weak_table_stats, - num_entries, - table_name, - table_name_len - ) - } - } -} - -struct UpdateTableBinsParallel { - name: String, - table: *mut st_table, - begin: usize, - end: usize, -} - -unsafe impl Send for UpdateTableBinsParallel {} - -impl UpdateTableBinsParallel {} - -impl GCWork for UpdateTableBinsParallel { - fn do_work(&mut self, _worker: &mut GCWorker, _mmtk: &'static mmtk::MMTK) { - debug!("Updating bins of {} table", self.name); - let deleted_bins = (upcalls().st_update_bins_range)(self.table, self.begin, self.end); - debug!("Done updating bins of {} table", self.name); - let table_name = self.name.as_ptr(); - let table_name_len = self.name.len(); - probe!( - mmtk_ruby, - update_table_bins_parallel, - self.begin, - self.end, - deleted_bins, - table_name, - table_name_len - ); - } -} - struct UpdateWbUnprotectedObjectsList; impl GCWork for UpdateWbUnprotectedObjectsList { diff --git a/mmtk/src/weak_proc/concurrent_set_parallel.rs b/mmtk/src/weak_proc/concurrent_set_parallel.rs new file mode 100644 index 0000000..d05915e --- /dev/null +++ b/mmtk/src/weak_proc/concurrent_set_parallel.rs @@ -0,0 +1,105 @@ +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +use mmtk::{ + scheduler::{GCWork, GCWorker, WorkBucketStage}, + util::ObjectReference, +}; + +use crate::{abi::ConcurrentSetStats, upcalls, Ruby}; + +pub fn process_weak_concurrent_set_chunked( + name: &'static str, + set: ObjectReference, + worker: &mut GCWorker, +) { + let num_entries = (upcalls().concurrent_set_get_num_entries)(set); + let capacity = (upcalls().concurrent_set_get_capacity)(set); + debug!("name: {name}, num_entries: {num_entries}, capacity: {capacity}"); + + let set_name_ptr = 
name.as_ptr(); + let set_name_len = name.len(); + + probe!( + mmtk_ruby, + weak_cs_par_init, + num_entries, + capacity, + set_name_ptr, + set_name_len, + ); + + let chunk_size = crate::binding().concurrent_set_chunk_size; + + let counter = Arc::new(AtomicUsize::new(0)); + + let entries_packets = (0..capacity) + .step_by(chunk_size) + .map(|begin| { + let end = (begin + chunk_size).min(capacity); + Box::new(UpdateConcurrentSetEntriesParallel { + name, + set, + begin, + end, + counter: counter.clone(), + }) as _ + }) + .collect::>(); + + counter.fetch_add(entries_packets.len(), Ordering::SeqCst); + + worker.scheduler().work_buckets[WorkBucketStage::VMRefClosure].bulk_add(entries_packets); +} + +struct UpdateConcurrentSetEntriesParallel { + name: &'static str, + set: ObjectReference, + begin: usize, + end: usize, + counter: Arc, +} + +unsafe impl Send for UpdateConcurrentSetEntriesParallel {} + +impl GCWork for UpdateConcurrentSetEntriesParallel { + fn do_work(&mut self, _worker: &mut GCWorker, _mmtk: &'static mmtk::MMTK) { + debug!( + "Updating concurrent set '{}' range {}-{}", + self.name, self.begin, self.end + ); + let set_name = self.name.as_ptr(); + let set_name_len = self.name.len(); + probe!( + mmtk_ruby, + weak_cs_par_entries_begin, + self.begin, + self.end, + set_name, + set_name_len + ); + + let mut stats = ConcurrentSetStats::default(); + (upcalls().concurrent_set_update_entries_range)(self.set, self.begin, self.end, &mut stats); + + debug!( + "Done updating entries of concurrent set '{}' range {}-{}, live: {}, moved: {}, deleted: {}", + self.name, self.begin, self.end, stats.live, stats.moved, stats.deleted + ); + probe!( + mmtk_ruby, + weak_cs_par_entries_end, + stats.live, + stats.moved, + stats.deleted, + ); + + let old_counter = self.counter.fetch_sub(1, Ordering::SeqCst); + if old_counter == 1 { + let num_entries = (upcalls().concurrent_set_get_num_entries)(self.set); + probe!(mmtk_ruby, weak_cs_par_final, num_entries) + } + } +} diff --git 
a/mmtk/src/weak_proc/st_table_parallel.rs b/mmtk/src/weak_proc/st_table_parallel.rs new file mode 100644 index 0000000..c694422 --- /dev/null +++ b/mmtk/src/weak_proc/st_table_parallel.rs @@ -0,0 +1,160 @@ +use std::sync::Arc; + +use mmtk::scheduler::{GCWork, GCWorker, WorkBucketStage}; + +use crate::{abi::st_table, upcalls, utils::AfterAll, Ruby}; + +pub fn process_weak_table_chunked( + name: &'static str, + table: *mut st_table, + weak_keys: bool, + weak_values: bool, + forward: bool, + worker: &mut GCWorker, +) { + let mut entries_start = 0; + let mut entries_bound = 0; + let mut bins_num = 0; + (upcalls().st_get_size_info)(table, &mut entries_start, &mut entries_bound, &mut bins_num); + let num_entries = (upcalls().st_get_num_entries)(table); + debug!( + "name: {name}, entries_start: {entries_start}, entries_bound: {entries_bound}, bins_num: {bins_num}, num_entries: {num_entries}" + ); + + let table_name_ptr = name.as_ptr(); + let table_name_len = name.len(); + + probe!( + mmtk_ruby, + weak_st_par_init, + entries_start, + entries_bound, + bins_num, + num_entries, + table_name_ptr, + table_name_len, + ); + + let entries_chunk_size = crate::binding().st_entries_chunk_size; + let bins_chunk_size = crate::binding().st_bins_chunk_size; + + let after_all = Arc::new(AfterAll::new(WorkBucketStage::VMRefClosure)); + + let entries_packets = (entries_start..entries_bound) + .step_by(entries_chunk_size) + .map(|begin| { + let end = (begin + entries_chunk_size).min(entries_bound); + let after_all = after_all.clone(); + Box::new(UpdateTableEntriesParallel { + name, + table, + begin, + end, + weak_keys, + weak_values, + forward, + after_all, + }) as _ + }) + .collect::>(); + after_all.count_up(entries_packets.len()); + + let bins_packets = (0..bins_num) + .step_by(bins_chunk_size) + .map(|begin| { + let end = (begin + bins_chunk_size).min(bins_num); + Box::new(UpdateTableBinsParallel { + name: name.to_string(), + table, + begin, + end, + }) as _ + }) + .collect::>(); + 
after_all.add_packets(bins_packets); + + worker.scheduler().work_buckets[WorkBucketStage::VMRefClosure].bulk_add(entries_packets); +} + +struct UpdateTableEntriesParallel { + name: &'static str, + table: *mut st_table, + begin: usize, + end: usize, + weak_keys: bool, + weak_values: bool, + forward: bool, + after_all: Arc, +} + +unsafe impl Send for UpdateTableEntriesParallel {} + +impl UpdateTableEntriesParallel {} + +impl GCWork for UpdateTableEntriesParallel { + fn do_work(&mut self, worker: &mut GCWorker, _mmtk: &'static mmtk::MMTK) { + debug!("Updating entries of {} table", self.name); + let deleted_entries = (upcalls().st_update_entries_range)( + self.table, + self.begin, + self.end, + self.weak_keys, + self.weak_values, + self.forward, + ); + debug!("Done updating entries of {} table", self.name); + let table_name = self.name.as_ptr(); + let table_name_len = self.name.len(); + probe!( + mmtk_ruby, + weak_st_par_entries, + self.begin, + self.end, + deleted_entries, + table_name, + table_name_len + ); + + let is_last = self.after_all.count_down(worker); + if is_last { + let num_entries = (upcalls().st_get_num_entries)(self.table); + probe!( + mmtk_ruby, + weak_st_par_final, + num_entries, + table_name, + table_name_len + ) + } + } +} + +struct UpdateTableBinsParallel { + name: String, + table: *mut st_table, + begin: usize, + end: usize, +} + +unsafe impl Send for UpdateTableBinsParallel {} + +impl UpdateTableBinsParallel {} + +impl GCWork for UpdateTableBinsParallel { + fn do_work(&mut self, _worker: &mut GCWorker, _mmtk: &'static mmtk::MMTK) { + debug!("Updating bins of {} table", self.name); + let deleted_bins = (upcalls().st_update_bins_range)(self.table, self.begin, self.end); + debug!("Done updating bins of {} table", self.name); + let table_name = self.name.as_ptr(); + let table_name_len = self.name.len(); + probe!( + mmtk_ruby, + weak_st_par_bins, + self.begin, + self.end, + deleted_bins, + table_name, + table_name_len + ); + } +} diff --git 
a/tools/tracing/timeline/capture_ruby.bt b/tools/tracing/timeline/capture_ruby.bt index 6959e8b..0dfa715 100644 --- a/tools/tracing/timeline/capture_ruby.bt +++ b/tools/tracing/timeline/capture_ruby.bt @@ -1,3 +1,5 @@ +// PPPs + usdt:$MMTK:mmtk_ruby:pin_ppp_children { if (@enable_print) { printf("pin_ppp_children,meta,%d,%lu,%lu,%lu,%lu\n", tid, nsecs, arg0, arg1, arg2); @@ -16,48 +18,76 @@ usdt:$MMTK:mmtk_ruby:unpin_ppp_children { } } +// Generic weak table processing + usdt:$MMTK:mmtk_ruby:weak_table_size_change { if (@enable_print) { printf("weak_table_size_change,meta,%d,%lu,%lu,%lu\n", tid, nsecs, arg0, arg1); } } +// Specific weak table processing work packets + usdt:$MMTK:mmtk_ruby:update_finalizer_and_obj_id_tables { if (@enable_print) { printf("update_finalizer_and_obj_id_tables,meta,%d,%lu,%lu,%lu,%lu,%lu\n", tid, nsecs, arg0, arg1, arg2, arg3); } } -usdt:$MMTK:mmtk_ruby:initial_weak_table_stats { +// Weak concurrent set optimization + +usdt:$MMTK:mmtk_ruby:weak_cs_par_init { + if (@enable_print) { + printf("weak_cs_par_init,meta,%d,%lu,%lu,%lu,%s\n", tid, nsecs, arg0, arg1, str(arg2, arg3)); + } +} + +usdt:$MMTK:mmtk_ruby:weak_cs_par_final { + if (@enable_print) { + printf("weak_cs_par_final,meta,%d,%lu,%lu\n", tid, nsecs, arg0); + } +} + +usdt:$MMTK:mmtk_ruby:weak_cs_par_entries_begin { if (@enable_print) { - printf("initial_weak_table_stats,meta,%d,%lu,%lu,%lu,%lu,%lu,%s\n", tid, nsecs, arg0, arg1, arg2, arg3, str(arg4, arg5)); + printf("weak_cs_par_entries_begin,meta,%d,%lu,%lu,%lu,%s\n", tid, nsecs, arg0, arg1, str(arg2, arg3)); } } -usdt:$MMTK:mmtk_ruby:final_weak_table_stats { +usdt:$MMTK:mmtk_ruby:weak_cs_par_entries_end { if (@enable_print) { - printf("final_weak_table_stats,meta,%d,%lu,%lu,%s\n", tid, nsecs, arg0, str(arg1, arg2)); + printf("weak_cs_par_entries_end,meta,%d,%lu,%lu,%lu,%lu\n", tid, nsecs, arg0, arg1, arg2); } } -usdt:$MMTK:mmtk_ruby:update_table_entries_parallel { +// Weak st table optimization + 
+usdt:$MMTK:mmtk_ruby:weak_st_par_init { if (@enable_print) { - printf("update_table_entries_parallel,meta,%d,%lu,%lu,%lu,%lu,%s\n", tid, nsecs, arg0, arg1, arg2, str(arg3, arg4)); + printf("weak_st_par_init,meta,%d,%lu,%lu,%lu,%lu,%lu,%s\n", tid, nsecs, arg0, arg1, arg2, arg3, str(arg4, arg5)); } } -usdt:$MMTK:mmtk_ruby:update_table_bins_parallel { +usdt:$MMTK:mmtk_ruby:weak_st_par_final { if (@enable_print) { - printf("update_table_bins_parallel,meta,%d,%lu,%lu,%lu,%lu,%s\n", tid, nsecs, arg0, arg1, arg2, str(arg3, arg4)); + printf("weak_st_par_final,meta,%d,%lu,%lu,%s\n", tid, nsecs, arg0, str(arg1, arg2)); } } -usdt:$MMTK:mmtk_ruby:update_generic_fields_tbl { +usdt:$MMTK:mmtk_ruby:weak_st_par_entries { if (@enable_print) { - printf("update_generic_fields_tbl,meta,%d,%lu,%lu,%lu,%lu\n", tid, nsecs, arg0, arg1, arg2); + printf("weak_st_par_entries,meta,%d,%lu,%lu,%lu,%lu,%s\n", tid, nsecs, arg0, arg1, arg2, str(arg3, arg4)); } } +usdt:$MMTK:mmtk_ruby:weak_st_par_bins { + if (@enable_print) { + printf("weak_st_par_bins,meta,%d,%lu,%lu,%lu,%lu,%s\n", tid, nsecs, arg0, arg1, arg2, str(arg3, arg4)); + } +} + +// Other work packets + usdt:$MMTK:mmtk_ruby:process_obj_free_candidates { if (@enable_print) { printf("process_obj_free_candidates,meta,%d,%lu,%lu,%lu\n", tid, nsecs, arg0, arg1); diff --git a/tools/tracing/timeline/visualize_ruby.py b/tools/tracing/timeline/visualize_ruby.py index 2b410ee..1298014 100755 --- a/tools/tracing/timeline/visualize_ruby.py +++ b/tools/tracing/timeline/visualize_ruby.py @@ -3,6 +3,8 @@ def enrich_meta_extra(log_processor, name, tid, ts, gc, wp, args): if wp is not None: match name: + # PPPs + case "pin_ppp_children": num_ppps, num_no_longer_ppps, num_pinned_children = [int(x) for x in args] num_still_ppps = num_ppps - num_no_longer_ppps @@ -34,6 +36,8 @@ def enrich_meta_extra(log_processor, name, tid, ts, gc, wp, args): "num_ppp_children": num_children, } + # Generic weak table processing + case "weak_table_size_change": before, 
after = [int(x) for x in args] wp["args"] |= { @@ -44,6 +48,8 @@ def enrich_meta_extra(log_processor, name, tid, ts, gc, wp, args): }, } + # Specific weak table processing work packets + case "update_finalizer_and_obj_id_tables": (finalizer_before, finalizer_after, id2ref_before, id2ref_after) = [int(x) for x in args] @@ -52,7 +58,55 @@ def enrich_meta_extra(log_processor, name, tid, ts, gc, wp, args): "id2ref": { "before": id2ref_before, "after": id2ref_after, "diff": id2ref_after - id2ref_before }, } - case "initial_weak_table_stats": + # Weak concurrent set optimization + + case "weak_cs_par_init": + num_entries, capacity = [int(x) for x in args[0:2]] + set_name = args[2] + gc["args"].setdefault(set_name, {}) + gc["args"][set_name] |= { + "num_entries_before": num_entries, + "capacity": capacity, + } + + case "weak_cs_par_final": + num_entries = int(args[0]) + set_name = wp["args"]["set_name"] + gc["args"].setdefault(set_name, {}) + gc["args"][set_name] |= { + "num_entries_after": num_entries, + } + if "num_entries_before" in gc["args"][set_name]: + before = gc["args"][set_name].pop("num_entries_before") + after = gc["args"][set_name].pop("num_entries_after") + gc["args"][set_name]["entries"] = { + "before": before, + "after": after, + "diff": after - before, + } + + case "weak_cs_par_entries_begin": + begin, end = [int(x) for x in args[0:2]] + set_name = args[-1] + num_entries = end - begin + wp["args"] |= { + "begin": begin, + "end": end, + "num_entries": num_entries, + "set_name": set_name, + } + + case "weak_cs_par_entries_end": + live, moved, deleted = [int(x) for x in args[0:3]] + wp["args"] |= { + "live": live, + "moved": moved, + "deleted": deleted, + } + + # Weak st table optimization + + case "weak_st_par_init": entries_start, entries_bound, bins_num, num_entries = [int(x) for x in args[0:4]] table_name = args[4] gc["args"].setdefault(table_name, {}) @@ -63,7 +117,7 @@ def enrich_meta_extra(log_processor, name, tid, ts, gc, wp, args): 
"num_entries_before": num_entries, } - case "final_weak_table_stats": + case "weak_st_par_final": num_entries = int(args[0]) table_name = args[1] gc["args"].setdefault(table_name, {}) @@ -79,7 +133,7 @@ def enrich_meta_extra(log_processor, name, tid, ts, gc, wp, args): "diff": after - before, } - case "update_table_entries_parallel": + case "weak_st_par_entries": begin, end, deleted_entries = [int(x) for x in args[0:3]] table_name = args[3] num_entries = end - begin @@ -91,7 +145,7 @@ def enrich_meta_extra(log_processor, name, tid, ts, gc, wp, args): "table_name": table_name, } - case "update_table_bins_parallel": + case "weak_st_par_bins": begin, end, deleted_bins = [int(x) for x in args[0:3]] table_name = args[3] num_bins = end - begin @@ -103,16 +157,7 @@ def enrich_meta_extra(log_processor, name, tid, ts, gc, wp, args): "table_name": table_name, } - case "update_generic_fields_tbl": - entries_moved, old_entries, new_entries = [int(x) for x in args[0:3]] - wp["args"] |= { - "entries_moved": entries_moved, - "entries": { - "before": old_entries, - "after": new_entries, - "diff": new_entries - old_entries, - }, - } + # Other work packets case "process_obj_free_candidates": old_candidates, new_candidates = [int(x) for x in args[0:2]]