From 28a2a9ff4e2fcaacfa463edb57ca8c06b381ff1e Mon Sep 17 00:00:00 2001 From: Ivan Walulya Date: Fri, 16 Jul 2021 16:18:45 +0200 Subject: [PATCH 1/3] add BucketsClaimer and a thread-safe do_safepoint_scan --- .../share/utilities/concurrentHashTable.hpp | 11 ++ .../utilities/concurrentHashTable.inline.hpp | 106 ++++++++++++++++++ .../utilities/test_concurrentHashtable.cpp | 77 +++++++++++++ 3 files changed, 194 insertions(+) diff --git a/src/hotspot/share/utilities/concurrentHashTable.hpp b/src/hotspot/share/utilities/concurrentHashTable.hpp index a93760fa69a2a..ec334dc4280c2 100644 --- a/src/hotspot/share/utilities/concurrentHashTable.hpp +++ b/src/hotspot/share/utilities/concurrentHashTable.hpp @@ -335,6 +335,10 @@ class ConcurrentHashTable : public CHeapObj { template void do_scan_locked(Thread* thread, FUNC& scan_f); + // Visits nodes for buckets in range [start_idx, stop_id) with FUNC. + template + void do_scan_for_range(FUNC& scan_f, size_t start_idx, size_t stop_idx, InternalTable *table); + // Check for dead items in a bucket. template size_t delete_check_nodes(Bucket* bucket, EVALUATE_FUNC& eval_f, @@ -386,6 +390,8 @@ class ConcurrentHashTable : public CHeapObj { ~ConcurrentHashTable(); + class BucketsClaimer; + TableRateStatistics _stats_rate; size_t get_mem_size(Thread* thread); @@ -471,6 +477,11 @@ class ConcurrentHashTable : public CHeapObj { template void do_safepoint_scan(SCAN_FUNC& scan_f); + // Visit all items with SCAN_FUNC without any protection. + // Thread-safe, but must be called at safepoint. + template + void do_safepoint_scan(SCAN_FUNC& scan_f, BucketsClaimer* bucket_claimer); + // Destroying items matching EVALUATE_FUNC, before destroying items // DELETE_FUNC is called, if resize lock is obtained. Else returns false. 
template diff --git a/src/hotspot/share/utilities/concurrentHashTable.inline.hpp b/src/hotspot/share/utilities/concurrentHashTable.inline.hpp index b5c2389f11ae4..e3926d95a1928 100644 --- a/src/hotspot/share/utilities/concurrentHashTable.inline.hpp +++ b/src/hotspot/share/utilities/concurrentHashTable.inline.hpp @@ -977,6 +977,27 @@ inline void ConcurrentHashTable:: } /* ends critical section */ } +template +template +inline void ConcurrentHashTable:: + do_scan_for_range(FUNC& scan_f, size_t start_idx, size_t stop_idx, InternalTable* table) +{ + assert(start_idx < stop_idx, "Must be"); + assert(stop_idx <= table->_size, "Must be"); + + for (size_t bucket_it = start_idx; bucket_it < stop_idx; ++bucket_it) { + Bucket* bucket = table->get_bucket(bucket_it); + // If bucket has a redirect, the items will be in the new table. + if (!bucket->have_redirect()) { + if (!visit_nodes(table->get_bucket(bucket_it), scan_f)) { + return; // FIXME: do we really need to return in all cases? + } + } else { + assert(bucket->is_locked(), "Bucket must be locked."); + } + } +} + template template inline size_t ConcurrentHashTable:: @@ -1174,6 +1195,22 @@ inline void ConcurrentHashTable:: } } +template +template +inline void ConcurrentHashTable:: + do_safepoint_scan(SCAN_FUNC& scan_f, BucketsClaimer* bucket_claimer) +{ + assert(SafepointSynchronize::is_at_safepoint(), + "must only be called in a safepoint"); + size_t start_idx = 0, stop_idx = 0; + InternalTable* table = NULL; + while (bucket_claimer->claim(&start_idx, &stop_idx, &table)) { + assert(table != NULL, "precondition"); + do_scan_for_range(scan_f, start_idx, stop_idx, table); + table = NULL; + } +} + template template inline bool ConcurrentHashTable:: @@ -1289,4 +1326,73 @@ inline bool ConcurrentHashTable:: return true; } +template +class ConcurrentHashTable::BucketsClaimer { + protected: + ConcurrentHashTable* _cht; + + // Default size of _claim_size_log2 + static const size_t DEFAULT_CLAIM_SIZE_LOG2 = 12; + // The table is 
split into ranges, every increment is one range. + volatile size_t _next_to_claim; + size_t _claim_size_log2; // Number of buckets in claimed range. + size_t _limit; // Limit to number of claims + size_t _size_log2; // Table size. + + // If there is a paused resize, we also need to operate on the already resized items. + volatile size_t _next_to_claim_new_table; + size_t _claim_size_log2_new_table; + size_t _limit_new_table; + size_t _size_log2_new_table; + +public: + BucketsClaimer(ConcurrentHashTable* cht) + : _cht(cht), _next_to_claim(0), _claim_size_log2(DEFAULT_CLAIM_SIZE_LOG2), + _limit(0), _size_log2(_cht->_table->_log2_size), _next_to_claim_new_table(0), + _claim_size_log2_new_table(DEFAULT_CLAIM_SIZE_LOG2), _limit_new_table(0), + _size_log2_new_table(_cht->_table->_log2_size) { + + _claim_size_log2 = MIN2(_claim_size_log2, _size_log2); + size_t tmp = _size_log2 > _claim_size_log2 ? + (_size_log2 - _claim_size_log2) : 0; + _limit = (((size_t)1) << tmp); + + ConcurrentHashTable::InternalTable* new_table = _cht->get_new_table(); + + if (new_table != NULL) { + _size_log2_new_table = new_table->_log2_size; + _claim_size_log2_new_table = MIN2(_claim_size_log2_new_table, _size_log2_new_table); + size_t tmp = _size_log2_new_table > _claim_size_log2_new_table ? + (_size_log2_new_table - _claim_size_log2_new_table) : 0; + _limit_new_table = (((size_t)1) << tmp); + } + } + + // Returns true if you succeeded to claim the range [start, stop). 
+ bool claim(size_t* start, size_t* stop, ConcurrentHashTable::InternalTable** table) { + if (Atomic::load(&_next_to_claim) < _limit) { + size_t claimed = Atomic::fetch_and_add(&_next_to_claim, 1u); + if (claimed < _limit) { + *start = claimed * (((size_t)1) << _claim_size_log2); + *stop = ((*start) + (((size_t)1) << _claim_size_log2)); + *table = _cht->get_table(); + return true; + } + } + if (_limit_new_table == 0) { + return false; + } + ConcurrentHashTable::InternalTable* new_table = _cht->get_new_table(); + assert(new_table != NULL, "Precondition"); + size_t claimed = Atomic::fetch_and_add(&_next_to_claim_new_table, 1u); + if (claimed < _limit_new_table) { + *start = claimed * (((size_t)1) << _claim_size_log2_new_table); + *stop = ((*start) + (((size_t)1) << _claim_size_log2_new_table)); + *table = new_table; + return true; + } + return false; + } +}; + #endif // SHARE_UTILITIES_CONCURRENTHASHTABLE_INLINE_HPP diff --git a/test/hotspot/gtest/utilities/test_concurrentHashtable.cpp b/test/hotspot/gtest/utilities/test_concurrentHashtable.cpp index c867df2eff52d..b839824a799cc 100644 --- a/test/hotspot/gtest/utilities/test_concurrentHashtable.cpp +++ b/test/hotspot/gtest/utilities/test_concurrentHashtable.cpp @@ -23,6 +23,8 @@ #include "precompiled.hpp" #include "runtime/mutex.hpp" +#include "runtime/nonJavaThread.hpp" +#include "runtime/os.hpp" #include "runtime/semaphore.hpp" #include "runtime/thread.hpp" #include "runtime/vmThread.hpp" @@ -1144,3 +1146,78 @@ class Driver_BD_Thread : public JavaTestThread { TEST_VM(ConcurrentHashTable, concurrent_mt_bulk_delete) { mt_test_doer(); } + +class Scanner_Thread: public WorkerThread { + Semaphore _start; + Semaphore* _done; + TestTable* _cht; + TestTable::BucketsClaimer* _bucket_claimer; + size_t _num_scanned; + size_t *_total_scanned; +protected: + void run() override { + _start.wait(); + _cht->do_safepoint_scan(*this, _bucket_claimer); + Atomic::add(_total_scanned, _num_scanned); + _done->signal(); + } +public: + 
Scanner_Thread(Semaphore* done, TestTable* cht, TestTable::BucketsClaimer* bc, size_t *total_scanned) : + _done(done), _cht(cht), _bucket_claimer(bc), _num_scanned(0), _total_scanned(total_scanned) {} + + void run_thread() { + _start.signal(); + } + + bool operator()(uintptr_t* val) { + return ++_num_scanned; + } +}; + +class VM_CHT_MT_Scan: public VM_GTestExecuteAtSafepoint { + TestTable* _cht; + uintptr_t _num_items; +public: + void doit(); + VM_CHT_MT_Scan(TestTable* cht, uintptr_t num_items): _cht(cht), _num_items(num_items) { } +}; + +void VM_CHT_MT_Scan::doit() { + size_t total_scanned = 0; + TestTable::BucketsClaimer bucket_claimer(_cht); + Semaphore done(0); + + // Create and start parallel worker threads. + const int num_threads = 4; + Scanner_Thread* st[num_threads]; + for (int i = 0; i < num_threads; i++) { + st[i] = new Scanner_Thread(&done, _cht, &bucket_claimer, &total_scanned); + os::create_thread(st[i], os::pgc_thread); + os::start_thread(st[i]); + } + + for (int i = 0; i < num_threads; i++) { + st[i]->run_thread(); + } + + for (int i = 0; i < num_threads; i++) { + done.wait(); + } + + EXPECT_TRUE(total_scanned == (size_t)_num_items) << " Should scan all inserted items: " << total_scanned; +} + +TEST_VM(ConcurrentHashTable, concurrent_mt_scan) { + TestTable* cht = new TestTable(16, 16, 2); + + uintptr_t num_items = 99999; + for (uintptr_t v = 1; v <= num_items; v++ ) { + TestLookup tl(v); + EXPECT_TRUE(cht->insert(JavaThread::current(), tl, v)) << "Inserting an unique value should work."; + } + + // Run the test at a safepoint. 
+ VM_CHT_MT_Scan op(cht, num_items); + ThreadInVMfromNative invm(JavaThread::current()); + VMThread::execute(&op); +} From b89f41ef8113027e4d227a772907cce842b67368 Mon Sep 17 00:00:00 2001 From: Ivan Walulya Date: Mon, 19 Jul 2021 15:21:33 +0200 Subject: [PATCH 2/3] clean up class names among other changes --- .../utilities/concurrentHashTable.inline.hpp | 17 ++++++++--------- .../utilities/test_concurrentHashtable.cpp | 16 ++++++++-------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/hotspot/share/utilities/concurrentHashTable.inline.hpp b/src/hotspot/share/utilities/concurrentHashTable.inline.hpp index e3926d95a1928..bb09abc9dfa16 100644 --- a/src/hotspot/share/utilities/concurrentHashTable.inline.hpp +++ b/src/hotspot/share/utilities/concurrentHashTable.inline.hpp @@ -989,9 +989,8 @@ inline void ConcurrentHashTable:: Bucket* bucket = table->get_bucket(bucket_it); // If bucket has a redirect, the items will be in the new table. if (!bucket->have_redirect()) { - if (!visit_nodes(table->get_bucket(bucket_it), scan_f)) { - return; // FIXME: do we really need to return in all cases? - } + bool result = visit_nodes(table->get_bucket(bucket_it), scan_f); + assert(result, "Must be"); } else { assert(bucket->is_locked(), "Bucket must be locked."); } @@ -1349,8 +1348,7 @@ class ConcurrentHashTable::BucketsClaimer { BucketsClaimer(ConcurrentHashTable* cht) : _cht(cht), _next_to_claim(0), _claim_size_log2(DEFAULT_CLAIM_SIZE_LOG2), _limit(0), _size_log2(_cht->_table->_log2_size), _next_to_claim_new_table(0), - _claim_size_log2_new_table(DEFAULT_CLAIM_SIZE_LOG2), _limit_new_table(0), - _size_log2_new_table(_cht->_table->_log2_size) { + _claim_size_log2_new_table(0), _limit_new_table(0), _size_log2_new_table(0) { _claim_size_log2 = MIN2(_claim_size_log2, _size_log2); size_t tmp = _size_log2 > _claim_size_log2 ?
@@ -1361,9 +1359,9 @@ class ConcurrentHashTable::BucketsClaimer { if (new_table != NULL) { _size_log2_new_table = new_table->_log2_size; - _claim_size_log2_new_table = MIN2(_claim_size_log2_new_table, _size_log2_new_table); + _claim_size_log2_new_table = MIN2(DEFAULT_CLAIM_SIZE_LOG2, _size_log2_new_table); size_t tmp = _size_log2_new_table > _claim_size_log2_new_table ? - (_size_log2_new_table - _claim_size_log2_new_table) : 0; + (_size_log2_new_table - _claim_size_log2_new_table) : 0; _limit_new_table = (((size_t)1) << tmp); } } @@ -1373,20 +1371,21 @@ class ConcurrentHashTable::BucketsClaimer { if (Atomic::load(&_next_to_claim) < _limit) { size_t claimed = Atomic::fetch_and_add(&_next_to_claim, 1u); if (claimed < _limit) { - *start = claimed * (((size_t)1) << _claim_size_log2); + *start = claimed << _claim_size_log2; *stop = ((*start) + (((size_t)1) << _claim_size_log2)); *table = _cht->get_table(); return true; } } if (_limit_new_table == 0) { + assert(_cht->get_new_table() == NULL, "Precondition"); return false; } ConcurrentHashTable::InternalTable* new_table = _cht->get_new_table(); assert(new_table != NULL, "Precondition"); size_t claimed = Atomic::fetch_and_add(&_next_to_claim_new_table, 1u); if (claimed < _limit_new_table) { - *start = claimed * (((size_t)1) << _claim_size_log2_new_table); + *start = claimed << _claim_size_log2_new_table; *stop = ((*start) + (((size_t)1) << _claim_size_log2_new_table)); *table = new_table; return true; diff --git a/test/hotspot/gtest/utilities/test_concurrentHashtable.cpp b/test/hotspot/gtest/utilities/test_concurrentHashtable.cpp index b839824a799cc..d0a662e7fd264 100644 --- a/test/hotspot/gtest/utilities/test_concurrentHashtable.cpp +++ b/test/hotspot/gtest/utilities/test_concurrentHashtable.cpp @@ -1147,7 +1147,7 @@ TEST_VM(ConcurrentHashTable, concurrent_mt_bulk_delete) { mt_test_doer(); } -class Scanner_Thread: public WorkerThread { +class ScannerThread: public WorkerThread { Semaphore _start; Semaphore* _done; 
TestTable* _cht; @@ -1162,7 +1162,7 @@ class Scanner_Thread: public WorkerThread { _done->signal(); } public: - Scanner_Thread(Semaphore* done, TestTable* cht, TestTable::BucketsClaimer* bc, size_t *total_scanned) : + ScannerThread(Semaphore* done, TestTable* cht, TestTable::BucketsClaimer* bc, size_t *total_scanned) : _done(done), _cht(cht), _bucket_claimer(bc), _num_scanned(0), _total_scanned(total_scanned) {} void run_thread() { @@ -1174,24 +1174,24 @@ class Scanner_Thread: public WorkerThread { } }; -class VM_CHT_MT_Scan: public VM_GTestExecuteAtSafepoint { +class CHTScanMT: public VM_GTestExecuteAtSafepoint { TestTable* _cht; uintptr_t _num_items; public: + CHTScanMT(TestTable* cht, uintptr_t num_items): _cht(cht), _num_items(num_items) { } void doit(); - VM_CHT_MT_Scan(TestTable* cht, uintptr_t num_items): _cht(cht), _num_items(num_items) { } }; -void VM_CHT_MT_Scan::doit() { +void CHTScanMT::doit() { size_t total_scanned = 0; TestTable::BucketsClaimer bucket_claimer(_cht); Semaphore done(0); // Create and start parallel worker threads. const int num_threads = 4; - Scanner_Thread* st[num_threads]; + ScannerThread* st[num_threads]; for (int i = 0; i < num_threads; i++) { - st[i] = new Scanner_Thread(&done, _cht, &bucket_claimer, &total_scanned); + st[i] = new ScannerThread(&done, _cht, &bucket_claimer, &total_scanned); os::create_thread(st[i], os::pgc_thread); os::start_thread(st[i]); } @@ -1217,7 +1217,7 @@ TEST_VM(ConcurrentHashTable, concurrent_mt_scan) { } // Run the test at a safepoint. 
- VM_CHT_MT_Scan op(cht, num_items); + CHTScanMT op(cht, num_items); ThreadInVMfromNative invm(JavaThread::current()); VMThread::execute(&op); } From b273c81d6892c9b28abfa324c4aee32388b0f207 Mon Sep 17 00:00:00 2001 From: Ivan Walulya Date: Mon, 19 Jul 2021 15:49:34 +0200 Subject: [PATCH 3/3] remove _size_log2 --- .../utilities/concurrentHashTable.inline.hpp | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/hotspot/share/utilities/concurrentHashTable.inline.hpp b/src/hotspot/share/utilities/concurrentHashTable.inline.hpp index bb09abc9dfa16..897b1a469f361 100644 --- a/src/hotspot/share/utilities/concurrentHashTable.inline.hpp +++ b/src/hotspot/share/utilities/concurrentHashTable.inline.hpp @@ -1336,32 +1336,30 @@ class ConcurrentHashTable::BucketsClaimer { volatile size_t _next_to_claim; size_t _claim_size_log2; // Number of buckets in claimed range. size_t _limit; // Limit to number of claims - size_t _size_log2; // Table size. // If there is a paused resize, we also need to operate on the already resized items. volatile size_t _next_to_claim_new_table; size_t _claim_size_log2_new_table; size_t _limit_new_table; - size_t _size_log2_new_table; public: BucketsClaimer(ConcurrentHashTable* cht) - : _cht(cht), _next_to_claim(0), _claim_size_log2(DEFAULT_CLAIM_SIZE_LOG2), _limit(0), - _size_log2(_cht->_table->_log2_size), _next_to_claim_new_table(0), - _claim_size_log2_new_table(0), _limit_new_table(0), _size_log2_new_table(0) { + : _cht(cht), _next_to_claim(0), _claim_size_log2(DEFAULT_CLAIM_SIZE_LOG2), _limit(0), + _next_to_claim_new_table(0), _claim_size_log2_new_table(0), _limit_new_table(0) { size_t size_log2 = _cht->_table->_log2_size; _claim_size_log2 = MIN2(_claim_size_log2, size_log2); size_t tmp = size_log2 > _claim_size_log2 ?
+ (size_log2 - _claim_size_log2) : 0; _limit = (((size_t)1) << tmp); ConcurrentHashTable::InternalTable* new_table = _cht->get_new_table(); if (new_table != NULL) { - _size_log2_new_table = new_table->_log2_size; - _claim_size_log2_new_table = MIN2(DEFAULT_CLAIM_SIZE_LOG2, _size_log2_new_table); - size_t tmp = _size_log2_new_table > _claim_size_log2_new_table ? - (_size_log2_new_table - _claim_size_log2_new_table) : 0; + size_t size_log2_new_table = new_table->_log2_size; + _claim_size_log2_new_table = MIN2(DEFAULT_CLAIM_SIZE_LOG2, size_log2_new_table); + size_t tmp = size_log2_new_table > _claim_size_log2_new_table ? + (size_log2_new_table - _claim_size_log2_new_table) : 0; _limit_new_table = (((size_t)1) << tmp); } } @@ -1377,10 +1375,12 @@ class ConcurrentHashTable::BucketsClaimer { return true; } } + if (_limit_new_table == 0) { assert(_cht->get_new_table() == NULL, "Precondition"); return false; } + ConcurrentHashTable::InternalTable* new_table = _cht->get_new_table(); assert(new_table != NULL, "Precondition"); size_t claimed = Atomic::fetch_and_add(&_next_to_claim_new_table, 1u);