Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/hotspot/share/utilities/concurrentHashTable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ class ConcurrentHashTable : public CHeapObj<F> {
template <typename FUNC>
void do_scan_locked(Thread* thread, FUNC& scan_f);

// Visits nodes for buckets in range [start_idx, stop_id) with FUNC.
template <typename FUNC>
void do_scan_for_range(FUNC& scan_f, size_t start_idx, size_t stop_idx, InternalTable *table);

// Check for dead items in a bucket.
template <typename EVALUATE_FUNC>
size_t delete_check_nodes(Bucket* bucket, EVALUATE_FUNC& eval_f,
Expand Down Expand Up @@ -386,6 +390,8 @@ class ConcurrentHashTable : public CHeapObj<F> {

~ConcurrentHashTable();

class BucketsClaimer;

TableRateStatistics _stats_rate;

size_t get_mem_size(Thread* thread);
Expand Down Expand Up @@ -471,6 +477,11 @@ class ConcurrentHashTable : public CHeapObj<F> {
template <typename SCAN_FUNC>
void do_safepoint_scan(SCAN_FUNC& scan_f);

// Visit all items with SCAN_FUNC without any protection.
// Thread-safe, but must be called at safepoint.
template <typename SCAN_FUNC>
void do_safepoint_scan(SCAN_FUNC& scan_f, BucketsClaimer* bucket_claimer);

// Destroying items matching EVALUATE_FUNC, before destroying items
// DELETE_FUNC is called, if resize lock is obtained. Else returns false.
template <typename EVALUATE_FUNC, typename DELETE_FUNC>
Expand Down
105 changes: 105 additions & 0 deletions src/hotspot/share/utilities/concurrentHashTable.inline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,26 @@ inline void ConcurrentHashTable<CONFIG, F>::
} /* ends critical section */
}

template <typename CONFIG, MEMFLAGS F>
template <typename FUNC>
inline void ConcurrentHashTable<CONFIG, F>::
do_scan_for_range(FUNC& scan_f, size_t start_idx, size_t stop_idx, InternalTable* table)
{
assert(start_idx < stop_idx, "Must be");
assert(stop_idx <= table->_size, "Must be");

for (size_t bucket_it = start_idx; bucket_it < stop_idx; ++bucket_it) {
Bucket* bucket = table->get_bucket(bucket_it);
// If bucket has a redirect, the items will be in the new table.
if (!bucket->have_redirect()) {
bool result = visit_nodes(table->get_bucket(bucket_it), scan_f);
assert(result, "Must be");
} else {
assert(bucket->is_locked(), "Bucket must be locked.");
}
}
}

template <typename CONFIG, MEMFLAGS F>
template <typename EVALUATE_FUNC>
inline size_t ConcurrentHashTable<CONFIG, F>::
Expand Down Expand Up @@ -1174,6 +1194,22 @@ inline void ConcurrentHashTable<CONFIG, F>::
}
}

template <typename CONFIG, MEMFLAGS F>
template <typename SCAN_FUNC>
inline void ConcurrentHashTable<CONFIG, F>::
do_safepoint_scan(SCAN_FUNC& scan_f, BucketsClaimer* bucket_claimer)
{
assert(SafepointSynchronize::is_at_safepoint(),
"must only be called in a safepoint");
size_t start_idx = 0, stop_idx = 0;
InternalTable* table = NULL;
while (bucket_claimer->claim(&start_idx, &stop_idx, &table)) {
assert(table != NULL, "precondition");
do_scan_for_range(scan_f, start_idx, stop_idx, table);
table = NULL;
}
}

template <typename CONFIG, MEMFLAGS F>
template <typename EVALUATE_FUNC, typename DELETE_FUNC>
inline bool ConcurrentHashTable<CONFIG, F>::
Expand Down Expand Up @@ -1289,4 +1325,73 @@ inline bool ConcurrentHashTable<CONFIG, F>::
return true;
}

template <typename CONFIG, MEMFLAGS F>
class ConcurrentHashTable<CONFIG, F>::BucketsClaimer {
protected:
ConcurrentHashTable<CONFIG, F>* _cht;

// Default size of _claim_size_log2
static const size_t DEFAULT_CLAIM_SIZE_LOG2 = 12;
// The table is split into ranges, every increment is one range.
volatile size_t _next_to_claim;
size_t _claim_size_log2; // Number of buckets in claimed range.
size_t _limit; // Limit to number of claims

// If there is a paused resize, we also need to operate on the already resized items.
volatile size_t _next_to_claim_new_table;
size_t _claim_size_log2_new_table;
size_t _limit_new_table;

public:
BucketsClaimer(ConcurrentHashTable<CONFIG, F>* cht)
: _cht(cht), _next_to_claim(0), _claim_size_log2(DEFAULT_CLAIM_SIZE_LOG2), _limit(0),
_next_to_claim_new_table(0), _claim_size_log2_new_table(0), _limit_new_table(0) {

size_t size_log2 = _cht->_table->_log2_size;
_claim_size_log2 = MIN2(_claim_size_log2, size_log2);
size_t tmp = size_log2 > _claim_size_log2 ?
(size_log2 - _claim_size_log2) : 0;
_limit = (((size_t)1) << tmp);

ConcurrentHashTable<CONFIG, F>::InternalTable* new_table = _cht->get_new_table();

if (new_table != NULL) {
size_t size_log2_new_table = new_table->_log2_size;
_claim_size_log2_new_table = MIN2(DEFAULT_CLAIM_SIZE_LOG2, size_log2_new_table);
size_t tmp = size_log2_new_table > _claim_size_log2_new_table ?
(size_log2_new_table - _claim_size_log2_new_table) : 0;
_limit_new_table = (((size_t)1) << tmp);
}
}

// Returns true if you succeeded to claim the range [start, stop).
bool claim(size_t* start, size_t* stop, ConcurrentHashTable<CONFIG, F>::InternalTable** table) {
if (Atomic::load(&_next_to_claim) < _limit) {
size_t claimed = Atomic::fetch_and_add(&_next_to_claim, 1u);
if (claimed < _limit) {
*start = claimed << _claim_size_log2;
*stop = ((*start) + (((size_t)1) << _claim_size_log2));
*table = _cht->get_table();
return true;
}
}

if (_limit_new_table == 0) {
assert(_cht->get_new_table() == NULL, "Precondition");
return false;
}

ConcurrentHashTable<CONFIG, F>::InternalTable* new_table = _cht->get_new_table();
assert(new_table != NULL, "Precondition");
size_t claimed = Atomic::fetch_and_add(&_next_to_claim_new_table, 1u);
if (claimed < _limit_new_table) {
*start = claimed << _claim_size_log2_new_table;
*stop = ((*start) + (((size_t)1) << _claim_size_log2_new_table));
*table = new_table;
return true;
}
return false;
}
};

#endif // SHARE_UTILITIES_CONCURRENTHASHTABLE_INLINE_HPP
77 changes: 77 additions & 0 deletions test/hotspot/gtest/utilities/test_concurrentHashtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

#include "precompiled.hpp"
#include "runtime/mutex.hpp"
#include "runtime/nonJavaThread.hpp"
#include "runtime/os.hpp"
#include "runtime/semaphore.hpp"
#include "runtime/thread.hpp"
#include "runtime/vmThread.hpp"
Expand Down Expand Up @@ -1144,3 +1146,78 @@ class Driver_BD_Thread : public JavaTestThread {
TEST_VM(ConcurrentHashTable, concurrent_mt_bulk_delete) {
mt_test_doer<Driver_BD_Thread>();
}

class ScannerThread: public WorkerThread {
Semaphore _start;
Semaphore* _done;
TestTable* _cht;
TestTable::BucketsClaimer* _bucket_claimer;
size_t _num_scanned;
size_t *_total_scanned;
protected:
void run() override {
_start.wait();
_cht->do_safepoint_scan(*this, _bucket_claimer);
Atomic::add(_total_scanned, _num_scanned);
_done->signal();
}
public:
ScannerThread(Semaphore* done, TestTable* cht, TestTable::BucketsClaimer* bc, size_t *total_scanned) :
_done(done), _cht(cht), _bucket_claimer(bc), _num_scanned(0), _total_scanned(total_scanned) {}

void run_thread() {
_start.signal();
}

bool operator()(uintptr_t* val) {
return ++_num_scanned;
}
};

class CHTScanMT: public VM_GTestExecuteAtSafepoint {
TestTable* _cht;
uintptr_t _num_items;
public:
CHTScanMT(TestTable* cht, uintptr_t num_items): _cht(cht), _num_items(num_items) { }
void doit();
};

void CHTScanMT::doit() {
size_t total_scanned = 0;
TestTable::BucketsClaimer bucket_claimer(_cht);
Semaphore done(0);

// Create and start parallel worker threads.
const int num_threads = 4;
ScannerThread* st[num_threads];
for (int i = 0; i < num_threads; i++) {
st[i] = new ScannerThread(&done, _cht, &bucket_claimer, &total_scanned);
os::create_thread(st[i], os::pgc_thread);
os::start_thread(st[i]);
}

for (int i = 0; i < num_threads; i++) {
st[i]->run_thread();
}

for (int i = 0; i < num_threads; i++) {
done.wait();
}

EXPECT_TRUE(total_scanned == (size_t)_num_items) << " Should scan all inserted items: " << total_scanned;
}

TEST_VM(ConcurrentHashTable, concurrent_mt_scan) {
TestTable* cht = new TestTable(16, 16, 2);

uintptr_t num_items = 99999;
for (uintptr_t v = 1; v <= num_items; v++ ) {
TestLookup tl(v);
EXPECT_TRUE(cht->insert(JavaThread::current(), tl, v)) << "Inserting an unique value should work.";
}

// Run the test at a safepoint.
CHTScanMT op(cht, num_items);
ThreadInVMfromNative invm(JavaThread::current());
VMThread::execute(&op);
}