From e8d98a04ab8e1fc34bef37bd9b802c49510c9e83 Mon Sep 17 00:00:00 2001 From: Jeff Preshing Date: Sat, 6 Feb 2016 12:55:15 -0500 Subject: [PATCH] Run clang-format --- .clang-format | 1 + junction/ConcurrentMap_Grampa.cpp | 2 +- junction/ConcurrentMap_Grampa.h | 143 ++++++++++-------- junction/ConcurrentMap_LeapFrog.cpp | 2 +- junction/ConcurrentMap_LeapFrog.h | 89 ++++++----- junction/ConcurrentMap_Linear.cpp | 2 +- junction/ConcurrentMap_Linear.h | 102 +++++++------ junction/Core.h | 2 +- junction/QSBR.cpp | 2 +- junction/QSBR.h | 10 +- junction/SimpleJobCoordinator.h | 2 +- junction/SingleMap_Linear.h | 22 +-- junction/details/Grampa.h | 139 +++++++++-------- junction/details/LeapFrog.h | 75 ++++----- junction/details/Linear.h | 67 ++++---- junction/extra/MemHook_NBDS.cpp | 6 +- junction/extra/MemHook_TBB.cpp | 4 +- junction/extra/impl/MapAdapter_CDS_Cuckoo.h | 8 +- junction/extra/impl/MapAdapter_CDS_Michael.h | 19 +-- junction/extra/impl/MapAdapter_Grampa.h | 4 +- junction/extra/impl/MapAdapter_LeapFrog.h | 12 +- junction/extra/impl/MapAdapter_Linear.h | 12 +- junction/extra/impl/MapAdapter_Linear_Mutex.h | 2 +- .../extra/impl/MapAdapter_Linear_RWLock.h | 2 +- junction/extra/impl/MapAdapter_Null.h | 2 +- junction/extra/impl/MapAdapter_StdMap.h | 2 +- junction/striped/ConditionBank.h | 6 +- junction/striped/ManualResetEvent.h | 7 +- junction/striped/Mutex.h | 2 +- .../MapCorrectnessTests.cpp | 14 +- samples/MapCorrectnessTests/TestChurn.h | 129 ++++++++-------- samples/MapCorrectnessTests/TestEnvironment.h | 1 - .../TestInsertDifferentKeys.h | 12 +- .../MapCorrectnessTests/TestInsertSameKeys.h | 12 +- .../MapPerformanceTests.cpp | 31 ++-- .../MapScalabilityTests.cpp | 28 ++-- 36 files changed, 518 insertions(+), 457 deletions(-) diff --git a/.clang-format b/.clang-format index be640a0..9cc8045 100644 --- a/.clang-format +++ b/.clang-format @@ -7,3 +7,4 @@ DerivePointerAlignment: false PointerAlignment: Left SpaceAfterCStyleCast: true AllowShortFunctionsOnASingleLine: None +AlwaysBreakTemplateDeclarations: true diff --git a/junction/ConcurrentMap_Grampa.cpp b/junction/ConcurrentMap_Grampa.cpp index 2acb10f..79b5b5b 100644 --- a/junction/ConcurrentMap_Grampa.cpp +++ b/junction/ConcurrentMap_Grampa.cpp @@ -13,7 +13,7 @@ #include namespace junction { - + TURF_TRACE_DEFINE_BEGIN(ConcurrentMap_Grampa, 27) // autogenerated by TidySource.py TURF_TRACE_DEFINE("[locateTable] flattree lookup redirected") TURF_TRACE_DEFINE("[createInitialTable] race to create initial table") diff --git a/junction/ConcurrentMap_Grampa.h b/junction/ConcurrentMap_Grampa.h index d7fa979..5f03c8b 100644 --- a/junction/ConcurrentMap_Grampa.h +++ b/junction/ConcurrentMap_Grampa.h @@ -109,18 +109,20 @@ class ConcurrentMap_Grampa { // This TableMigration replaces the entire map with a single table. TURF_ASSERT(migration->m_baseHash == 0); TURF_ASSERT(migration->m_numDestinations == 1); - ureg oldRoot = m_root.loadNonatomic(); // There are no racing writes to m_root. + ureg oldRoot = m_root.loadNonatomic(); // There are no racing writes to m_root. // Store the single table in m_root directly. typename Details::Table* newTable = migration->getDestinations()[0]; - m_root.store(uptr(newTable), turf::Release); // Make table contents visible + m_root.store(uptr(newTable), turf::Release); // Make table contents visible newTable->isPublished.signal(); if ((oldRoot & 1) == 0) { - TURF_TRACE(ConcurrentMap_Grampa, 3, "[publishTableMigration] replacing single root with single root", uptr(migration), 0); + TURF_TRACE(ConcurrentMap_Grampa, 3, "[publishTableMigration] replacing single root with single root", + uptr(migration), 0); // If oldRoot is a table, it must be the original source of the migration. TURF_ASSERT((typename Details::Table*) oldRoot == migration->getSources()[0].table); // Don't GC it here. The caller will GC it since it's a source of the TableMigration. } else { - TURF_TRACE(ConcurrentMap_Grampa, 4, "[publishTableMigration] replacing flattree with single root", uptr(migration), 0); + TURF_TRACE(ConcurrentMap_Grampa, 4, "[publishTableMigration] replacing flattree with single root", + uptr(migration), 0); // The entire previous flattree is being replaced. Details::garbageCollectFlatTree((typename Details::FlatTree*) (oldRoot & ~ureg(1))); } @@ -128,7 +130,8 @@ class ConcurrentMap_Grampa { } else { // We are either publishing a subtree of one or more tables, or replacing the entire map with multiple tables. // In either case, there will be a flattree after this function returns. - TURF_ASSERT(migration->m_safeShift < sizeof(Hash) * 8); // If m_numDestinations > 1, some index bits must remain after shifting + TURF_ASSERT(migration->m_safeShift < + sizeof(Hash) * 8); // If m_numDestinations > 1, some index bits must remain after shifting ureg oldRoot = m_root.load(turf::Consume); if ((oldRoot & 1) == 0) { // There's no flattree yet. This means the TableMigration is publishing the full range of hashes. @@ -138,7 +141,8 @@ class ConcurrentMap_Grampa { TURF_ASSERT((typename Details::Table*) oldRoot == migration->getSources()[0].table); // Furthermore, it is guaranteed that there are no racing writes to m_root. // Create a new flattree and store it to m_root. - TURF_TRACE(ConcurrentMap_Grampa, 5, "[publishTableMigration] replacing single root with flattree", uptr(migration), 0); + TURF_TRACE(ConcurrentMap_Grampa, 5, "[publishTableMigration] replacing single root with flattree", + uptr(migration), 0); typename Details::FlatTree* flatTree = Details::FlatTree::create(migration->m_safeShift); typename Details::Table* prevTable = NULL; for (ureg i = 0; i < migration->m_numDestinations; i++) { @@ -149,14 +153,15 @@ class ConcurrentMap_Grampa { prevTable = newTable; } } - m_root.store(uptr(flatTree) | 1, turf::Release); // Ensure visibility of flatTree->tables + m_root.store(uptr(flatTree) | 1, turf::Release); // Ensure visibility of flatTree->tables // Caller will GC the TableMigration. // Caller will also GC the old oldRoot since it's a source of the TableMigration. } else { // There is an existing flattree, and we are publishing one or more tables to it. // Attempt to publish the subtree in a loop. // The loop is necessary because we might get redirected in the middle of publishing. - TURF_TRACE(ConcurrentMap_Grampa, 6, "[publishTableMigration] publishing subtree to existing flattree", uptr(migration), 0); + TURF_TRACE(ConcurrentMap_Grampa, 6, "[publishTableMigration] publishing subtree to existing flattree", + uptr(migration), 0); typename Details::FlatTree* flatTree = (typename Details::FlatTree*) (oldRoot & ~ureg(1)); ureg subTreeEntriesPublished = 0; typename Details::Table* tableToReplace = migration->getSources()[0].table; @@ -164,7 +169,7 @@ class ConcurrentMap_Grampa { // Otherwise, there will be a race between a subtree and its own children. // (If all ManualResetEvent objects supported isPublished(), we could add a TURF_TRACE counter for this. // In previous tests, such a counter does in fact get hit.) - tableToReplace->isPublished.wait(); + tableToReplace->isPublished.wait(); typename Details::Table* prevTable = NULL; for (;;) { publishLoop: @@ -173,10 +178,12 @@ class ConcurrentMap_Grampa { // First, try to create a FlatTreeMigration with the necessary properties. // This will fail if an existing FlatTreeMigration has already been created using the same source. // In that case, we'll help complete the existing FlatTreeMigration, then we'll retry the loop. - TURF_TRACE(ConcurrentMap_Grampa, 7, "[publishTableMigration] existing flattree too small", uptr(migration), 0); - typename Details::FlatTreeMigration* flatTreeMigration = Details::createFlatTreeMigration(*this, flatTree, migration->m_safeShift); + TURF_TRACE(ConcurrentMap_Grampa, 7, "[publishTableMigration] existing flattree too small", + uptr(migration), 0); + typename Details::FlatTreeMigration* flatTreeMigration = + Details::createFlatTreeMigration(*this, flatTree, migration->m_safeShift); tableToReplace->jobCoordinator.runOne(flatTreeMigration); - flatTreeMigration->m_completed.wait(); // flatTreeMigration->m_destination becomes entirely visible + flatTreeMigration->m_completed.wait(); // flatTreeMigration->m_destination becomes entirely visible flatTree = flatTreeMigration->m_destination; // The FlatTreeMigration has already been GC'ed by the last worker. // Retry the loop. @@ -186,10 +193,12 @@ class ConcurrentMap_Grampa { // The subtree we're about to publish fits inside the flattree. TURF_ASSERT(dstStartIndex + migration->m_numDestinations * repeat - 1 <= Hash(-1) >> flatTree->safeShift); // If a previous attempt to publish got redirected, resume publishing into the new flattree, - // starting with the first subtree entry that has not yet been fully published, as given by subTreeEntriesPublished. + // starting with the first subtree entry that has not yet been fully published, as given by + // subTreeEntriesPublished. // (Note: We could, in fact, restart the publish operation starting at entry 0. That would be valid too. // We are the only thread that can modify this particular range of the flattree at this time.) - turf::Atomic* dstLeaf = flatTree->getTables() + dstStartIndex + (subTreeEntriesPublished * repeat); + turf::Atomic* dstLeaf = + flatTree->getTables() + dstStartIndex + (subTreeEntriesPublished * repeat); typename Details::Table** subFlatTree = migration->getDestinations(); while (subTreeEntriesPublished < migration->m_numDestinations) { typename Details::Table* srcTable = subFlatTree[subTreeEntriesPublished]; @@ -199,15 +208,20 @@ class ConcurrentMap_Grampa { if (ureg(probeTable) == Details::RedirectFlatTree) { // We've been redirected. // Help with the FlatTreeMigration, then try again. - TURF_TRACE(ConcurrentMap_Grampa, 8, "[publishTableMigration] redirected", uptr(migration), uptr(dstLeaf)); - typename Details::FlatTreeMigration* flatTreeMigration = Details::getExistingFlatTreeMigration(flatTree); + TURF_TRACE(ConcurrentMap_Grampa, 8, "[publishTableMigration] redirected", uptr(migration), + uptr(dstLeaf)); + typename Details::FlatTreeMigration* flatTreeMigration = + Details::getExistingFlatTreeMigration(flatTree); tableToReplace->jobCoordinator.runOne(flatTreeMigration); - flatTreeMigration->m_completed.wait(); // flatTreeMigration->m_destination becomes entirely visible + flatTreeMigration->m_completed.wait(); + // flatTreeMigration->m_destination becomes entirely visible flatTree = flatTreeMigration->m_destination; goto publishLoop; } - // The only other possibility is that we were previously redirected, and the subtree entry got partially published. - TURF_TRACE(ConcurrentMap_Grampa, 9, "[publishTableMigration] recovering from partial publish", uptr(migration), 0); + // The only other possibility is that we were previously redirected, and the subtree entry got + // partially published. + TURF_TRACE(ConcurrentMap_Grampa, 9, "[publishTableMigration] recovering from partial publish", + uptr(migration), 0); TURF_ASSERT(probeTable == srcTable); } // The caller will GC the table) being replaced them since it's a source of the TableMigration. @@ -257,7 +271,8 @@ class ConcurrentMap_Grampa { // Constructor: Find existing cell Mutator(ConcurrentMap_Grampa& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue)) { - TURF_TRACE(ConcurrentMap_Grampa, 10, "[Mutator] find constructor called", uptr(map.m_root.load(turf::Relaxed)), uptr(key)); + TURF_TRACE(ConcurrentMap_Grampa, 10, "[Mutator] find constructor called", uptr(map.m_root.load(turf::Relaxed)), + uptr(key)); Hash hash = KeyTraits::hash(key); for (;;) { if (!m_map.locateTable(m_table, m_sizeMask, hash)) @@ -267,7 +282,7 @@ class ConcurrentMap_Grampa { return; m_value = m_cell->value.load(turf::Consume); if (m_value != Value(ValueTraits::Redirect)) - return; // Found an existing value + return; // Found an existing value // We've encountered a Redirect value. Help finish the migration. TURF_TRACE(ConcurrentMap_Grampa, 11, "[Mutator] find was redirected", uptr(m_table), 0); m_table->jobCoordinator.participate(); @@ -277,32 +292,33 @@ class ConcurrentMap_Grampa { // Constructor: Insert cell Mutator(ConcurrentMap_Grampa& map, Key key) : m_map(map), m_value(Value(ValueTraits::NullValue)) { - TURF_TRACE(ConcurrentMap_Grampa, 12, "[Mutator] insert constructor called", uptr(map.m_root.load(turf::Relaxed)), uptr(key)); + TURF_TRACE(ConcurrentMap_Grampa, 12, "[Mutator] insert constructor called", uptr(map.m_root.load(turf::Relaxed)), + uptr(key)); Hash hash = KeyTraits::hash(key); for (;;) { if (!m_map.locateTable(m_table, m_sizeMask, hash)) { m_map.createInitialTable(Details::MinTableSize); } else { ureg overflowIdx; - switch (Details::insert(hash, m_table, m_sizeMask, m_cell, overflowIdx)) { // Modifies m_cell - case Details::InsertResult_InsertedNew: { - // We've inserted a new cell. Don't load m_cell->value. - return; - } - case Details::InsertResult_AlreadyFound: { - // The hash was already found in the table. - m_value = m_cell->value.load(turf::Consume); - if (m_value == Value(ValueTraits::Redirect)) { - // We've encountered a Redirect value. - TURF_TRACE(ConcurrentMap_Grampa, 13, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value)); - break; // Help finish the migration. - } - return; // Found an existing value - } - case Details::InsertResult_Overflow: { - Details::beginTableMigration(m_map, m_table, overflowIdx); - break; + switch (Details::insert(hash, m_table, m_sizeMask, m_cell, overflowIdx)) { // Modifies m_cell + case Details::InsertResult_InsertedNew: { + // We've inserted a new cell. Don't load m_cell->value. + return; + } + case Details::InsertResult_AlreadyFound: { + // The hash was already found in the table. + m_value = m_cell->value.load(turf::Consume); + if (m_value == Value(ValueTraits::Redirect)) { + // We've encountered a Redirect value. + TURF_TRACE(ConcurrentMap_Grampa, 13, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value)); + break; // Help finish the migration. } + return; // Found an existing value + } + case Details::InsertResult_Overflow: { + Details::beginTableMigration(m_map, m_table, overflowIdx); + break; + } } // A migration has been started (either by us, or another thread). Participate until it's complete. m_table->jobCoordinator.participate(); @@ -319,22 +335,25 @@ class ConcurrentMap_Grampa { Value exchangeValue(Value desired) { TURF_ASSERT(desired != Value(ValueTraits::NullValue)); - TURF_ASSERT(m_cell); // Cell must have been found or inserted + TURF_ASSERT(m_cell); // Cell must have been found or inserted TURF_TRACE(ConcurrentMap_Grampa, 14, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value)); for (;;) { Value oldValue = m_value; if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) { // Exchange was successful. Return previous value. - TURF_TRACE(ConcurrentMap_Grampa, 15, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), uptr(desired)); + TURF_TRACE(ConcurrentMap_Grampa, 15, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), + uptr(desired)); Value result = m_value; - m_value = desired; // Leave the mutator in a valid state + m_value = desired; // Leave the mutator in a valid state return result; } // The CAS failed and m_value has been updated with the latest value. if (m_value != Value(ValueTraits::Redirect)) { - TURF_TRACE(ConcurrentMap_Grampa, 16, "[Mutator::exchangeValue] detected race to write value", uptr(m_table), uptr(m_value)); + TURF_TRACE(ConcurrentMap_Grampa, 16, "[Mutator::exchangeValue] detected race to write value", uptr(m_table), + uptr(m_value)); if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) { - TURF_TRACE(ConcurrentMap_Grampa, 17, "[Mutator::exchangeValue] racing write inserted new value", uptr(m_table), uptr(m_value)); + TURF_TRACE(ConcurrentMap_Grampa, 17, "[Mutator::exchangeValue] racing write inserted new value", + uptr(m_table), uptr(m_value)); } // There was a racing write (or erase) to this cell. // Pretend we exchanged with ourselves, and just let the racing write win. @@ -354,25 +373,26 @@ class ConcurrentMap_Grampa { TURF_UNUSED(exists); m_value = Value(ValueTraits::NullValue); ureg overflowIdx; - switch (Details::insert(hash, m_table, m_sizeMask, m_cell, overflowIdx)) { // Modifies m_cell + switch (Details::insert(hash, m_table, m_sizeMask, m_cell, overflowIdx)) { // Modifies m_cell case Details::InsertResult_AlreadyFound: m_value = m_cell->value.load(turf::Consume); if (m_value == Value(ValueTraits::Redirect)) { - TURF_TRACE(ConcurrentMap_Grampa, 19, "[Mutator::exchangeValue] was re-redirected", uptr(m_table), uptr(m_value)); + TURF_TRACE(ConcurrentMap_Grampa, 19, "[Mutator::exchangeValue] was re-redirected", uptr(m_table), + uptr(m_value)); break; } goto breakOuter; case Details::InsertResult_InsertedNew: goto breakOuter; case Details::InsertResult_Overflow: - TURF_TRACE(ConcurrentMap_Grampa, 20, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), overflowIdx); + TURF_TRACE(ConcurrentMap_Grampa, 20, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), + overflowIdx); Details::beginTableMigration(m_map, m_table, overflowIdx); break; } // We were redirected... again } - breakOuter: - ; + breakOuter:; // Try again in the new table. } } @@ -382,21 +402,22 @@ class ConcurrentMap_Grampa { } Value eraseValue() { - TURF_ASSERT(m_cell); // Cell must have been found or inserted + TURF_ASSERT(m_cell); // Cell must have been found or inserted TURF_TRACE(ConcurrentMap_Grampa, 21, "[Mutator::eraseValue] called", uptr(m_table), uptr(m_value)); for (;;) { if (m_value == Value(ValueTraits::NullValue)) return m_value; - TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted. + TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted. if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) { // Exchange was successful and a non-NullValue value was erased and returned by reference in m_value. - TURF_ASSERT(m_value != Value(ValueTraits::NullValue)); // Implied by the test at the start of the loop. + TURF_ASSERT(m_value != Value(ValueTraits::NullValue)); // Implied by the test at the start of the loop. Value result = m_value; - m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state + m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state return result; } // The CAS failed and m_value has been updated with the latest value. - TURF_TRACE(ConcurrentMap_Grampa, 22, "[Mutator::eraseValue] detected race to write value", uptr(m_table), uptr(m_value)); + TURF_TRACE(ConcurrentMap_Grampa, 22, "[Mutator::eraseValue] detected race to write value", uptr(m_table), + uptr(m_value)); if (m_value != Value(ValueTraits::Redirect)) { // There was a racing write (or erase) to this cell. // Pretend we erased nothing, and just let the racing write win. @@ -404,7 +425,7 @@ class ConcurrentMap_Grampa { } // We've been redirected to a new table. TURF_TRACE(ConcurrentMap_Grampa, 23, "[Mutator::eraseValue] was redirected", uptr(m_table), uptr(m_cell)); - Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash + Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash for (;;) { // Help complete the migration. m_table->jobCoordinator.participate(); @@ -448,7 +469,7 @@ class ConcurrentMap_Grampa { return Value(ValueTraits::NullValue); Value value = cell->value.load(turf::Consume); if (value != Value(ValueTraits::Redirect)) - return value; // Found an existing value + return value; // Found an existing value // We've been redirected to a new table. Help with the migration. TURF_TRACE(ConcurrentMap_Grampa, 26, "[get] was redirected", uptr(table), 0); table->jobCoordinator.participate(); @@ -511,21 +532,21 @@ class ConcurrentMap_Grampa { void next() { TURF_ASSERT(m_table); - TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating. + TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating. for (;;) { searchInTable: m_idx++; if (m_idx <= m_table->sizeMask) { // Index still inside range of table. typename Details::CellGroup* group = m_table->getCellGroups() + (m_idx >> 2); - typename Details::Cell *cell = group->cells + (m_idx & 3); + typename Details::Cell* cell = group->cells + (m_idx & 3); m_hash = cell->hash.load(turf::Relaxed); if (m_hash != KeyTraits::NullHash) { // Cell has been reserved. m_value = cell->value.load(turf::Relaxed); TURF_ASSERT(m_value != Value(ValueTraits::Redirect)); if (m_value != Value(ValueTraits::NullValue)) - return; // Yield this cell. + return; // Yield this cell. } } else { // We've advanced past the end of this table. @@ -537,7 +558,7 @@ class ConcurrentMap_Grampa { // Found the next table. m_table = nextTable; m_idx = -1; - goto searchInTable; // Continue iterating in this table. + goto searchInTable; // Continue iterating in this table. } } } diff --git a/junction/ConcurrentMap_LeapFrog.cpp b/junction/ConcurrentMap_LeapFrog.cpp index 4f1a539..46ef668 100644 --- a/junction/ConcurrentMap_LeapFrog.cpp +++ b/junction/ConcurrentMap_LeapFrog.cpp @@ -13,7 +13,7 @@ #include namespace junction { - + TURF_TRACE_DEFINE_BEGIN(ConcurrentMap_LeapFrog, 17) // autogenerated by TidySource.py TURF_TRACE_DEFINE("[Mutator] find constructor called") TURF_TRACE_DEFINE("[Mutator] find was redirected") diff --git a/junction/ConcurrentMap_LeapFrog.h b/junction/ConcurrentMap_LeapFrog.h index 3cefc6f..c18af4a 100644 --- a/junction/ConcurrentMap_LeapFrog.h +++ b/junction/ConcurrentMap_LeapFrog.h @@ -83,7 +83,7 @@ class ConcurrentMap_LeapFrog { return; m_value = m_cell->value.load(turf::Consume); if (m_value != Value(ValueTraits::Redirect)) - return; // Found an existing value + return; // Found an existing value // We've encountered a Redirect value. Help finish the migration. TURF_TRACE(ConcurrentMap_LeapFrog, 1, "[Mutator] find was redirected", uptr(m_table), 0); m_table->jobCoordinator.participate(); @@ -92,31 +92,32 @@ class ConcurrentMap_LeapFrog { } // Constructor: Insert cell - Mutator(ConcurrentMap_LeapFrog& map, Key key) : m_map(map), m_table(map.m_root.load(turf::Consume)), m_value(Value(ValueTraits::NullValue)) { + Mutator(ConcurrentMap_LeapFrog& map, Key key) + : m_map(map), m_table(map.m_root.load(turf::Consume)), m_value(Value(ValueTraits::NullValue)) { TURF_TRACE(ConcurrentMap_LeapFrog, 2, "[Mutator] insert constructor called", uptr(m_table), uptr(key)); Hash hash = KeyTraits::hash(key); for (;;) { m_table = m_map.m_root.load(turf::Consume); ureg overflowIdx; - switch (Details::insert(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell - case Details::InsertResult_InsertedNew: { - // We've inserted a new cell. Don't load m_cell->value. - return; - } - case Details::InsertResult_AlreadyFound: { - // The hash was already found in the table. - m_value = m_cell->value.load(turf::Consume); - if (m_value == Value(ValueTraits::Redirect)) { - // We've encountered a Redirect value. - TURF_TRACE(ConcurrentMap_LeapFrog, 3, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value)); - break; // Help finish the migration. - } - return; // Found an existing value - } - case Details::InsertResult_Overflow: { - Details::beginTableMigration(m_map, m_table, overflowIdx); - break; + switch (Details::insert(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell + case Details::InsertResult_InsertedNew: { + // We've inserted a new cell. Don't load m_cell->value. + return; + } + case Details::InsertResult_AlreadyFound: { + // The hash was already found in the table. + m_value = m_cell->value.load(turf::Consume); + if (m_value == Value(ValueTraits::Redirect)) { + // We've encountered a Redirect value. + TURF_TRACE(ConcurrentMap_LeapFrog, 3, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value)); + break; // Help finish the migration. } + return; // Found an existing value + } + case Details::InsertResult_Overflow: { + Details::beginTableMigration(m_map, m_table, overflowIdx); + break; + } } // A migration has been started (either by us, or another thread). Participate until it's complete. m_table->jobCoordinator.participate(); @@ -132,22 +133,25 @@ class ConcurrentMap_LeapFrog { Value exchangeValue(Value desired) { TURF_ASSERT(desired != Value(ValueTraits::NullValue)); - TURF_ASSERT(m_cell); // Cell must have been found or inserted + TURF_ASSERT(m_cell); // Cell must have been found or inserted TURF_TRACE(ConcurrentMap_LeapFrog, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value)); for (;;) { Value oldValue = m_value; if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) { // Exchange was successful. Return previous value. - TURF_TRACE(ConcurrentMap_LeapFrog, 5, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), uptr(desired)); + TURF_TRACE(ConcurrentMap_LeapFrog, 5, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), + uptr(desired)); Value result = m_value; - m_value = desired; // Leave the mutator in a valid state + m_value = desired; // Leave the mutator in a valid state return result; } // The CAS failed and m_value has been updated with the latest value. if (m_value != Value(ValueTraits::Redirect)) { - TURF_TRACE(ConcurrentMap_LeapFrog, 6, "[Mutator::exchangeValue] detected race to write value", uptr(m_table), uptr(m_value)); + TURF_TRACE(ConcurrentMap_LeapFrog, 6, "[Mutator::exchangeValue] detected race to write value", uptr(m_table), + uptr(m_value)); if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) { - TURF_TRACE(ConcurrentMap_LeapFrog, 7, "[Mutator::exchangeValue] racing write inserted new value", uptr(m_table), uptr(m_value)); + TURF_TRACE(ConcurrentMap_LeapFrog, 7, "[Mutator::exchangeValue] racing write inserted new value", + uptr(m_table), uptr(m_value)); } // There was a racing write (or erase) to this cell. // Pretend we exchanged with ourselves, and just let the racing write win. @@ -163,25 +167,26 @@ class ConcurrentMap_LeapFrog { m_table = m_map.m_root.load(turf::Consume); m_value = Value(ValueTraits::NullValue); ureg overflowIdx; - switch (Details::insert(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell + switch (Details::insert(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell case Details::InsertResult_AlreadyFound: m_value = m_cell->value.load(turf::Consume); if (m_value == Value(ValueTraits::Redirect)) { - TURF_TRACE(ConcurrentMap_LeapFrog, 9, "[Mutator::exchangeValue] was re-redirected", uptr(m_table), uptr(m_value)); + TURF_TRACE(ConcurrentMap_LeapFrog, 9, "[Mutator::exchangeValue] was re-redirected", uptr(m_table), + uptr(m_value)); break; } goto breakOuter; case Details::InsertResult_InsertedNew: goto breakOuter; case Details::InsertResult_Overflow: - TURF_TRACE(ConcurrentMap_LeapFrog, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), overflowIdx); + TURF_TRACE(ConcurrentMap_LeapFrog, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), + overflowIdx); Details::beginTableMigration(m_map, m_table, overflowIdx); break; } // We were redirected... again } - breakOuter: - ; + breakOuter:; // Try again in the new table. } } @@ -191,21 +196,22 @@ class ConcurrentMap_LeapFrog { } Value eraseValue() { - TURF_ASSERT(m_cell); // Cell must have been found or inserted + TURF_ASSERT(m_cell); // Cell must have been found or inserted TURF_TRACE(ConcurrentMap_LeapFrog, 11, "[Mutator::eraseValue] called", uptr(m_table), uptr(m_cell)); for (;;) { if (m_value == Value(ValueTraits::NullValue)) return Value(m_value); - TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted. + TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted. if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) { // Exchange was successful and a non-NULL value was erased and returned by reference in m_value. - TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop. + TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop. Value result = m_value; - m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state + m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state return result; } // The CAS failed and m_value has been updated with the latest value. - TURF_TRACE(ConcurrentMap_LeapFrog, 12, "[Mutator::eraseValue] detected race to write value", uptr(m_table), uptr(m_cell)); + TURF_TRACE(ConcurrentMap_LeapFrog, 12, "[Mutator::eraseValue] detected race to write value", uptr(m_table), + uptr(m_cell)); if (m_value != Value(ValueTraits::Redirect)) { // There was a racing write (or erase) to this cell. // Pretend we erased nothing, and just let the racing write win. @@ -213,7 +219,7 @@ class ConcurrentMap_LeapFrog { } // We've been redirected to a new table. TURF_TRACE(ConcurrentMap_LeapFrog, 13, "[Mutator::eraseValue] was redirected", uptr(m_table), uptr(m_cell)); - Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash + Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash for (;;) { // Help complete the migration. m_table->jobCoordinator.participate(); @@ -227,7 +233,8 @@ class ConcurrentMap_LeapFrog { m_value = m_cell->value.load(turf::Relaxed); if (m_value != Value(ValueTraits::Redirect)) break; - TURF_TRACE(ConcurrentMap_LeapFrog, 14, "[Mutator::eraseValue] was re-redirected", uptr(m_table), uptr(m_cell)); + TURF_TRACE(ConcurrentMap_LeapFrog, 14, "[Mutator::eraseValue] was re-redirected", uptr(m_table), + uptr(m_cell)); } } } @@ -252,7 +259,7 @@ class ConcurrentMap_LeapFrog { return Value(ValueTraits::NullValue); Value value = cell->value.load(turf::Consume); if (value != Value(ValueTraits::Redirect)) - return value; // Found an existing value + return value; // Found an existing value // We've been redirected to a new table. Help with the migration. TURF_TRACE(ConcurrentMap_LeapFrog, 16, "[get] was redirected", uptr(table), uptr(hash)); table->jobCoordinator.participate(); @@ -295,18 +302,18 @@ class ConcurrentMap_LeapFrog { void next() { TURF_ASSERT(m_table); - TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating. + TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating. while (++m_idx <= m_table->sizeMask) { // Index still inside range of table. typename Details::CellGroup* group = m_table->getCellGroups() + (m_idx >> 2); - typename Details::Cell *cell = group->cells + (m_idx & 3); + typename Details::Cell* cell = group->cells + (m_idx & 3); m_hash = cell->hash.load(turf::Relaxed); if (m_hash != KeyTraits::NullHash) { // Cell has been reserved. m_value = cell->value.load(turf::Relaxed); TURF_ASSERT(m_value != Value(ValueTraits::Redirect)); if (m_value != Value(ValueTraits::NullValue)) - return; // Yield this cell. + return; // Yield this cell. } } // That's the end of the map. diff --git a/junction/ConcurrentMap_Linear.cpp b/junction/ConcurrentMap_Linear.cpp index f351594..41fbb1e 100644 --- a/junction/ConcurrentMap_Linear.cpp +++ b/junction/ConcurrentMap_Linear.cpp @@ -13,7 +13,7 @@ #include namespace junction { - + TURF_TRACE_DEFINE_BEGIN(ConcurrentMap_Linear, 18) // autogenerated by TidySource.py TURF_TRACE_DEFINE("[Mutator] find constructor called") TURF_TRACE_DEFINE("[Mutator] find was redirected") diff --git a/junction/ConcurrentMap_Linear.h b/junction/ConcurrentMap_Linear.h index 1de64b2..6d0fa98 100644 --- a/junction/ConcurrentMap_Linear.h +++ b/junction/ConcurrentMap_Linear.h @@ -85,7 +85,7 @@ class ConcurrentMap_Linear { return; m_value = m_cell->value.load(turf::Consume); if (m_value != Value(ValueTraits::Redirect)) - return; // Found an existing value + return; // Found an existing value // We've encountered a Redirect value. Help finish the migration. TURF_TRACE(ConcurrentMap_Linear, 1, "[Mutator] find was redirected", uptr(m_table), uptr(0)); m_table->jobCoordinator.participate(); @@ -94,30 +94,31 @@ class ConcurrentMap_Linear { } // Constructor: Insert cell - Mutator(ConcurrentMap_Linear& map, Key key) : m_map(map), m_table(map.m_root.load(turf::Consume)), m_value(Value(ValueTraits::NullValue)) { + Mutator(ConcurrentMap_Linear& map, Key key) + : m_map(map), m_table(map.m_root.load(turf::Consume)), m_value(Value(ValueTraits::NullValue)) { TURF_TRACE(ConcurrentMap_Linear, 2, "[Mutator] insert constructor called", uptr(m_table), uptr(key)); Hash hash = KeyTraits::hash(key); for (;;) { m_table = m_map.m_root.load(turf::Consume); - switch (Details::insert(hash, m_table, m_cell)) { // Modifies m_cell - case Details::InsertResult_InsertedNew: { - // We've inserted a new cell. Don't load m_cell->value. - return; - } - case Details::InsertResult_AlreadyFound: { - // The hash was already found in the table. - m_value = m_cell->value.load(turf::Consume); - if (m_value == Value(ValueTraits::Redirect)) { - // We've encountered a Redirect value. - TURF_TRACE(ConcurrentMap_Linear, 3, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value)); - break; // Help finish the migration. - } - return; // Found an existing value - } - case Details::InsertResult_Overflow: { - Details::beginTableMigration(m_map, m_table); - break; + switch (Details::insert(hash, m_table, m_cell)) { // Modifies m_cell + case Details::InsertResult_InsertedNew: { + // We've inserted a new cell. Don't load m_cell->value. + return; + } + case Details::InsertResult_AlreadyFound: { + // The hash was already found in the table. + m_value = m_cell->value.load(turf::Consume); + if (m_value == Value(ValueTraits::Redirect)) { + // We've encountered a Redirect value. + TURF_TRACE(ConcurrentMap_Linear, 3, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value)); + break; // Help finish the migration. } + return; // Found an existing value + } + case Details::InsertResult_Overflow: { + Details::beginTableMigration(m_map, m_table); + break; + } } // A migration has been started (either by us, or another thread). Participate until it's complete. m_table->jobCoordinator.participate(); @@ -133,7 +134,7 @@ class ConcurrentMap_Linear { Value exchangeValue(Value desired) { TURF_ASSERT(desired != Value(ValueTraits::NullValue)); - TURF_ASSERT(m_cell); // Cell must have been found or inserted + TURF_ASSERT(m_cell); // Cell must have been found or inserted TURF_TRACE(ConcurrentMap_Linear, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value)); for (;;) { Value oldValue = m_value; @@ -143,15 +144,18 @@ class ConcurrentMap_Linear { // Decrement remainingValues to ensure we have permission to (re)insert a value. prevRemainingValues = m_table->valuesRemaining.fetchSub(1, turf::Relaxed); if (prevRemainingValues <= 0) { - TURF_TRACE(ConcurrentMap_Linear, 5, "[Mutator::exchangeValue] ran out of valuesRemaining", uptr(m_table), prevRemainingValues); + TURF_TRACE(ConcurrentMap_Linear, 5, "[Mutator::exchangeValue] ran out of valuesRemaining", uptr(m_table), + prevRemainingValues); // Can't (re)insert any more values. // There are two ways this can happen. One with a TableMigration already in progress, and one without: // 1. A TableMigration puts a cap on the number of values late-arriving threads are allowed to insert. // 2. Two threads race to insert the same key, and it's the last free cell in the table. - // (Note: We could get tid of the valuesRemaining counter by handling the possibility of migration failure, + // (Note: We could get tid of the valuesRemaining counter by handling the possibility of migration + // failure, // as LeapFrog and Grampa do...) - m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement - // Since we don't know whether there's already a TableMigration in progress, always attempt to start one here: + m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement + // Since we don't know whether there's already a TableMigration in progress, always attempt to start one + // here: Details::beginTableMigration(m_map, m_table); goto helpMigrate; } @@ -160,15 +164,17 @@ class ConcurrentMap_Linear { // Exchange was successful. Return previous value. TURF_TRACE(ConcurrentMap_Linear, 6, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), uptr(desired)); Value result = m_value; - m_value = desired; // Leave the mutator in a valid state + m_value = desired; // Leave the mutator in a valid state return result; } // The CAS failed and m_value has been updated with the latest value. if (m_value != Value(ValueTraits::Redirect)) { - TURF_TRACE(ConcurrentMap_Linear, 7, "[Mutator::exchangeValue] detected race to write value", uptr(m_table), uptr(m_value)); + TURF_TRACE(ConcurrentMap_Linear, 7, "[Mutator::exchangeValue] detected race to write value", uptr(m_table), + uptr(m_value)); if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) { - TURF_TRACE(ConcurrentMap_Linear, 8, "[Mutator::exchangeValue] racing write inserted new value", uptr(m_table), uptr(m_value)); - m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement + TURF_TRACE(ConcurrentMap_Linear, 8, "[Mutator::exchangeValue] racing write inserted new value", + uptr(m_table), uptr(m_value)); + m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement } // There was a racing write (or erase) to this cell. // Pretend we exchanged with ourselves, and just let the racing write win. @@ -185,25 +191,26 @@ class ConcurrentMap_Linear { // Try again in the new table. m_table = m_map.m_root.load(turf::Consume); m_value = Value(ValueTraits::NullValue); - switch (Details::insert(hash, m_table, m_cell)) { // Modifies m_cell + switch (Details::insert(hash, m_table, m_cell)) { // Modifies m_cell case Details::InsertResult_AlreadyFound: m_value = m_cell->value.load(turf::Consume); if (m_value == Value(ValueTraits::Redirect)) { - TURF_TRACE(ConcurrentMap_Linear, 10, "[Mutator::exchangeValue] was re-redirected", uptr(m_table), uptr(m_value)); + TURF_TRACE(ConcurrentMap_Linear, 10, "[Mutator::exchangeValue] was re-redirected", uptr(m_table), + uptr(m_value)); break; } goto breakOuter; case Details::InsertResult_InsertedNew: goto breakOuter; case Details::InsertResult_Overflow: - TURF_TRACE(ConcurrentMap_Linear, 11, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), 0); + TURF_TRACE(ConcurrentMap_Linear, 11, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), + 0); Details::beginTableMigration(m_map, m_table); break; } // We were redirected... again } - breakOuter: - ; + breakOuter:; // Try again in the new table. } } @@ -213,30 +220,32 @@ class ConcurrentMap_Linear { } Value eraseValue() { - TURF_ASSERT(m_cell); // Cell must have been found or inserted + TURF_ASSERT(m_cell); // Cell must have been found or inserted TURF_TRACE(ConcurrentMap_Linear, 12, "[Mutator::eraseValue] called", uptr(m_table), m_cell - m_table->getCells()); for (;;) { if (m_value == Value(ValueTraits::NullValue)) return Value(m_value); - TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted. + TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted. if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) { // Exchange was successful and a non-NULL value was erased and returned by reference in m_value. - TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop. + TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop. m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); Value result = m_value; - m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state + m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state return result; } // The CAS failed and m_value has been updated with the latest value. - TURF_TRACE(ConcurrentMap_Linear, 13, "[Mutator::eraseValue] detected race to write value", uptr(m_table), m_cell - m_table->getCells()); + TURF_TRACE(ConcurrentMap_Linear, 13, "[Mutator::eraseValue] detected race to write value", uptr(m_table), + m_cell - m_table->getCells()); if (m_value != Value(ValueTraits::Redirect)) { // There was a racing write (or erase) to this cell. // Pretend we erased nothing, and just let the racing write win. return Value(ValueTraits::NullValue); } // We've been redirected to a new table. - TURF_TRACE(ConcurrentMap_Linear, 14, "[Mutator::eraseValue] was redirected", uptr(m_table), m_cell - m_table->getCells()); - Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash + TURF_TRACE(ConcurrentMap_Linear, 14, "[Mutator::eraseValue] was redirected", uptr(m_table), + m_cell - m_table->getCells()); + Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash for (;;) { // Help complete the migration. m_table->jobCoordinator.participate(); @@ -250,7 +259,8 @@ class ConcurrentMap_Linear { m_value = m_cell->value.load(turf::Relaxed); if (m_value != Value(ValueTraits::Redirect)) break; - TURF_TRACE(ConcurrentMap_Linear, 15, "[Mutator::eraseValue] was re-redirected", uptr(m_table), m_cell - m_table->getCells()); + TURF_TRACE(ConcurrentMap_Linear, 15, "[Mutator::eraseValue] was re-redirected", uptr(m_table), + m_cell - m_table->getCells()); } } } @@ -275,7 +285,7 @@ class ConcurrentMap_Linear { return Value(ValueTraits::NullValue); Value value = cell->value.load(turf::Consume); if (value != Value(ValueTraits::Redirect)) - return value; // Found an existing value + return value; // Found an existing value // We've been redirected to a new table. Help with the migration. TURF_TRACE(ConcurrentMap_Linear, 17, "[get] was redirected", uptr(table), uptr(cell)); table->jobCoordinator.participate(); @@ -318,17 +328,17 @@ class ConcurrentMap_Linear { void next() { TURF_ASSERT(m_table); - TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating. + TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating. while (++m_idx <= m_table->sizeMask) { // Index still inside range of table. - typename Details::Cell *cell = m_table->getCells() + m_idx; + typename Details::Cell* cell = m_table->getCells() + m_idx; m_hash = cell->hash.load(turf::Relaxed); if (m_hash != KeyTraits::NullHash) { // Cell has been reserved. m_value = cell->value.load(turf::Relaxed); TURF_ASSERT(m_value != Value(ValueTraits::Redirect)); if (m_value != Value(ValueTraits::NullValue)) - return; // Yield this cell. + return; // Yield this cell. } } // That's the end of the map. diff --git a/junction/Core.h b/junction/Core.h index aacc8a7..e670b5d 100644 --- a/junction/Core.h +++ b/junction/Core.h @@ -14,7 +14,7 @@ #define JUNCTION_CORE_H //----------------------------------------------- -#include "junction_config.h" // junction_config.h generated by CMake. +#include "junction_config.h" // junction_config.h generated by CMake. // Default to true in case junction_config.h is missing entirely: #ifndef JUNCTION_USE_STRIPING diff --git a/junction/QSBR.cpp b/junction/QSBR.cpp index eaab358..b1ed138 100644 --- a/junction/QSBR.cpp +++ b/junction/QSBR.cpp @@ -94,7 +94,7 @@ void QSBR::flush() { // This is like saying that all contexts are quiescent, // so we can issue all actions at once. // No lock is taken. - TURF_RACE_DETECT_GUARD(m_flushRaceDetector); // There should be no concurrent operations + TURF_RACE_DETECT_GUARD(m_flushRaceDetector); // There should be no concurrent operations for (ureg i = 0; i < m_pendingActions.size(); i++) m_pendingActions[i](); m_pendingActions.clear(); diff --git a/junction/QSBR.h b/junction/QSBR.h index 98a9a1b..e8ab6e1 100644 --- a/junction/QSBR.h +++ b/junction/QSBR.h @@ -25,10 +25,10 @@ class QSBR { private: struct Action { void (*func)(void*); - uptr param[4]; // Size limit found experimentally. Verified by assert below. + uptr param[4]; // Size limit found experimentally. Verified by assert below. Action(void (*f)(void*), void* p, ureg paramSize) : func(f) { - TURF_ASSERT(paramSize <= sizeof(param)); // Verify size limit. + TURF_ASSERT(paramSize <= sizeof(param)); // Verify size limit. memcpy(¶m, p, paramSize); } void operator()() { @@ -70,11 +70,11 @@ class QSBR { void (T::*pmf)(); T* target; static void thunk(void* param) { - Closure* self = (Closure*) param; - TURF_CALL_MEMBER(*self->target, self->pmf)(); + Closure* self = (Closure*) param; + TURF_CALL_MEMBER (*self->target, self->pmf)(); } }; - Closure closure = { pmf, target }; + Closure closure = {pmf, target}; turf::LockGuard guard(m_mutex); TURF_RACE_DETECT_GUARD(m_flushRaceDetector); m_deferredActions.push_back(Action(Closure::thunk, &closure, sizeof(closure))); diff --git a/junction/SimpleJobCoordinator.h b/junction/SimpleJobCoordinator.h index 2732497..1fd90ee 100644 --- a/junction/SimpleJobCoordinator.h +++ b/junction/SimpleJobCoordinator.h @@ -58,7 +58,7 @@ class SimpleJobCoordinator { if (job == prevJob) { turf::LockGuard guard(pair.mutex); for (;;) { - job = m_job.loadNonatomic(); // No concurrent writes inside lock + job = m_job.loadNonatomic(); // No concurrent writes inside lock if (job != prevJob) break; pair.condVar.wait(guard); diff --git a/junction/SingleMap_Linear.h b/junction/SingleMap_Linear.h index 5ea2521..d347b5a 100644 --- a/junction/SingleMap_Linear.h +++ b/junction/SingleMap_Linear.h @@ -39,7 +39,7 @@ class SingleMap_Linear { static Cell* createTable(ureg size) { Cell* cells = (Cell*) TURF_HEAP.alloc(sizeof(Cell) * size); for (ureg i = 0; i < size; i++) - new(cells + i) Cell(KeyTraits::NullHash, Value(ValueTraits::NullValue)); + new (cells + i) Cell(KeyTraits::NullHash, Value(ValueTraits::NullValue)); return cells; } @@ -97,10 +97,10 @@ class SingleMap_Linear { idx &= map.m_sizeMask; m_cell = map.m_cells + idx; if (m_cell->hash == hash) - return; // Key found in table. + return; // Key found in table. if (m_cell->hash != KeyTraits::NullHash) - continue; // Slot is taken by another key. Try next slot. - m_cell = NULL; // Insert not allowed & key not found. + continue; // Slot is taken by another key. Try next slot. + m_cell = NULL; // Insert not allowed & key not found. return; } } @@ -114,13 +114,13 @@ class SingleMap_Linear { idx &= map.m_sizeMask; m_cell = map.m_cells + idx; if (m_cell->hash == hash) - return; // Key found in table. + return; // Key found in table. if (m_cell->hash != KeyTraits::NullHash) - continue; // Slot is taken by another key. Try next slot. + continue; // Slot is taken by another key. Try next slot. // Insert is allowed. Reserve this cell. if (isOverpopulated(map.m_population, map.m_sizeMask)) { map.migrateToNewTable((map.m_sizeMask + 1) * 2); - break; // Retry in new table. + break; // Retry in new table. } map.m_population++; m_cell->hash = hash; @@ -131,7 +131,7 @@ class SingleMap_Linear { } ~Iterator() { - TURF_ASSERT(!m_cell || m_cell->value != NULL); // Forbid leaving a cell half-inserted. + TURF_ASSERT(!m_cell || m_cell->value != NULL); // Forbid leaving a cell half-inserted. } public: @@ -146,7 +146,7 @@ class SingleMap_Linear { Value exchangeValue(Value desired) { TURF_ASSERT(m_cell); - TURF_ASSERT(desired != NULL); // Use eraseValue() + TURF_ASSERT(desired != NULL); // Use eraseValue() Value oldValue = m_cell->value; m_cell->value = desired; return oldValue; @@ -154,10 +154,10 @@ class SingleMap_Linear { Value erase() { TURF_ASSERT(m_cell); - TURF_ASSERT(m_cell->value != NULL); // Forbid erasing a cell that's not actually used. + TURF_ASSERT(m_cell->value != NULL); // Forbid erasing a cell that's not actually used. Value oldValue = m_cell->value; // Remove this cell by shuffling neighboring cells so there are no gaps in anyone's probe chain - ureg cellIdx = m_cell - m_map.m_cells; + ureg cellIdx = m_cell - m_map.m_cells; for (ureg neighborIdx = cellIdx + 1;; neighborIdx++) { neighborIdx &= m_map.m_sizeMask; Cell* neighbor = m_map.m_cells + neighborIdx; diff --git a/junction/details/Grampa.h b/junction/details/Grampa.h index b44a8c4..e628757 100644 --- a/junction/details/Grampa.h +++ b/junction/details/Grampa.h @@ -49,13 +49,13 @@ struct GrampaStats { GrampaCounter numFlatTrees; GrampaCounter numFlatTreeMigrations; - static GrampaStats Instance; // Zero-initialized + static GrampaStats Instance; // Zero-initialized }; #endif TURF_TRACE_DECLARE(Grampa, 37) -template +template struct Grampa { typedef typename Map::Hash Hash; typedef typename Map::Value Value; @@ -68,7 +68,7 @@ struct Grampa { static const ureg FlatTreeMigrationUnitSize = 32; static const ureg LinearSearchLimit = 128; static const ureg CellsInUseSample = LinearSearchLimit; - TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links + TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain static const ureg MinTableSize = 8; @@ -98,14 +98,16 @@ struct Grampa { // eg. If the entire map is stored in a single table, then Table::shift == HASH_BITS. // If the entire map is stored in two tables, then Table::shift == (HASH_BITS - 1) for each table. // FlatTree::shift is always <= Table::shift for all the tables it contains. - const ureg sizeMask; // a power of two minus one + const ureg sizeMask; // a power of two minus one const Hash baseHash; const ureg unsafeRangeShift; - junction::striped::ManualResetEvent isPublished; // To prevent publishing a subtree before its parent is published (happened in testing) - junction::striped::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator) - SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration + junction::striped::ManualResetEvent + isPublished; // To prevent publishing a subtree before its parent is published (happened in testing) + junction::striped::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator) + SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration - Table(ureg sizeMask, Hash baseHash, ureg unsafeRangeShift) : sizeMask(sizeMask), baseHash(baseHash), unsafeRangeShift(unsafeRangeShift) { + Table(ureg sizeMask, Hash baseHash, ureg unsafeRangeShift) + : sizeMask(sizeMask), baseHash(baseHash), unsafeRangeShift(unsafeRangeShift) { } static Table* create(ureg tableSize, ureg baseHash, ureg unsafeShift) { @@ -114,7 +116,7 @@ struct Grampa { TURF_ASSERT(tableSize >= 4); ureg numGroups = tableSize >> 2; Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(CellGroup) * numGroups); - new(table) Table(tableSize - 1, baseHash, (u8) unsafeShift); + new (table) Table(tableSize - 1, baseHash, (u8) unsafeShift); for (ureg i = 0; i < numGroups; i++) { CellGroup* group = table->getCellGroups() + i; for (ureg j = 0; j < 4; j++) { @@ -155,34 +157,36 @@ struct Grampa { }; Map& m_map; - Hash m_baseHash; // The lowest possible hash value in this subtree; determines index in flattree. + Hash m_baseHash; // The lowest possible hash value in this subtree; determines index in flattree. // If m_numDestinations == 1, m_shift == 0. - // Otherwise, m_shift tells (indirectly) the size of the flattree in which our subtree would exactly fit: 1 << (HASH_BITS - m_shift). + // Otherwise, m_shift tells (indirectly) the size of the flattree in which our subtree would exactly fit: 1 << (HASH_BITS + // - m_shift). // This ensures that m_shift is always less than sizeof(Hash) * 8, so that shifting by m_shift is not undefined behavior. // To determine the subtree index for a hash during migration, we use: (hash >> m_shift) & (m_numDestinations - 1) // A mask is used since we are only migrating a subtree -- not necessarily the entire map. ureg m_safeShift; - turf::Atomic m_workerStatus; // number of workers + end flag + turf::Atomic m_workerStatus; // number of workers + end flag turf::Atomic m_overflowTableIndex; turf::Atomic m_unitsRemaining; ureg m_numSources; - ureg m_numDestinations; // The size of the subtree being created. Some table pointers may be repeated. + ureg m_numDestinations; // The size of the subtree being created. Some table pointers may be repeated. TableMigration(Map& map) : m_map(map) { } static TableMigration* create(Map& map, ureg numSources, ureg numDestinations) { - TableMigration* migration = (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources + sizeof(Table*) * numDestinations); - new(migration) TableMigration(map); + TableMigration* migration = (TableMigration*) TURF_HEAP.alloc( + sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources + sizeof(Table*) * numDestinations); + new (migration) TableMigration(map); migration->m_workerStatus.storeNonatomic(0); migration->m_overflowTableIndex.storeNonatomic(-1); migration->m_unitsRemaining.storeNonatomic(0); migration->m_numSources = numSources; migration->m_numDestinations = numDestinations; - // Caller is responsible for filling in source & destination pointers #if JUNCTION_TRACK_GRAMPA_STATS GrampaStats::Instance.numTableMigrations.increment(); #endif + // Caller is responsible for filling in source & destination pointers return migration; } @@ -230,7 +234,7 @@ struct Grampa { // Each time the flattree doubles in size, shift decreases by 1. const ureg safeShift; junction::striped::Mutex mutex; - FlatTreeMigration* migration; // Protected by mutex + FlatTreeMigration* migration; // Protected by mutex FlatTree(ureg safeShift) : safeShift(safeShift), migration(NULL) { // A FlatTree always has at least two tables, so the shift is always safe. @@ -242,11 +246,11 @@ struct Grampa { TURF_ASSERT(safeShift < sizeof(Hash) * 8); ureg numLeaves = (Hash(-1) >> safeShift) + 1; FlatTree* flatTree = (FlatTree*) TURF_HEAP.alloc(sizeof(FlatTree) + sizeof(turf::Atomic) * numLeaves); - new(flatTree) FlatTree(safeShift); - // Caller will initialize flatTree->getTables() + new (flatTree) FlatTree(safeShift); #if JUNCTION_TRACK_GRAMPA_STATS GrampaStats::Instance.numFlatTrees.increment(); #endif + // Caller will initialize flatTree->getTables() return flatTree; } @@ -353,11 +357,7 @@ struct Grampa { } // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound. - enum InsertResult { - InsertResult_AlreadyFound, - InsertResult_InsertedNew, - InsertResult_Overflow - }; + enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow }; static InsertResult insert(Hash hash, Table* table, ureg sizeMask, Cell*& cell, ureg& overflowIdx) { TURF_TRACE(Grampa, 3, "[insert] called", uptr(table), hash); TURF_ASSERT(table); @@ -407,7 +407,7 @@ struct Grampa { probeHash = cell->hash.load(turf::Acquire); } while (probeHash == KeyTraits::NullHash); } - TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked + TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked if (probeHash == hash) { TURF_TRACE(Grampa, 8, "[insert] found in probe chain", uptr(table), idx); return InsertResult_AlreadyFound; @@ -416,7 +416,7 @@ struct Grampa { // Reached the end of the link chain for this bucket. // Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket. ureg prevLinkIdx = idx; - TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range. + TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range. ureg linearProbesRemaining = turf::util::min(maxIdx - idx, LinearSearchLimit); while (linearProbesRemaining-- > 0) { idx++; @@ -430,13 +430,13 @@ struct Grampa { TURF_TRACE(Grampa, 9, "[insert] reserved cell", uptr(table), idx); TURF_ASSERT(probeDelta == 0); u8 desiredDelta = idx - prevLinkIdx; +#if TURF_WITH_ASSERTS // Note: another thread could actually set the link on our behalf (see below). -#if TURF_WITH_ASSERTS probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed); TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta); #else prevLink->store(desiredDelta, turf::Relaxed); -#endif +#endif return InsertResult_InsertedNew; } else { TURF_TRACE(Grampa, 10, "[insert] race to reserve cell", uptr(table), idx); @@ -457,15 +457,15 @@ struct Grampa { // there's no guarantee that our own link chain will be well-formed by the time this function returns. // (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.) u8 desiredDelta = idx - prevLinkIdx; -#if TURF_WITH_ASSERTS +#if TURF_WITH_ASSERTS probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed); TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta); if (probeDelta == 0) TURF_TRACE(Grampa, 13, "[insert] set link on behalf of late-arriving cell", uptr(table), idx); #else prevLink->store(desiredDelta, turf::Relaxed); -#endif - goto followLink; // Try to follow link chain for the bucket again. +#endif + goto followLink; // Try to follow link chain for the bucket again. } // Continue linear search... } @@ -485,7 +485,7 @@ struct Grampa { TURF_TRACE(Grampa, 16, "[beginTableMigrationToSize] new migration already exists", 0, 0); } else { turf::LockGuard guard(table->mutex); - job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK. + job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK. if (job) { TURF_TRACE(Grampa, 17, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0); } else { @@ -498,10 +498,12 @@ struct Grampa { migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits()); migration->getSources()[0].table = table; migration->getSources()[0].sourceIndex.storeNonatomic(0); - ureg subRangeShift = table->unsafeRangeShift - splitShift; // subRangeShift is also "unsafe" (possibly represents entire range) + ureg subRangeShift = + table->unsafeRangeShift - splitShift; // subRangeShift is also "unsafe" (possibly represents entire range) ureg hashOffsetDelta = subRangeShift < (sizeof(Hash) * 8) ? (ureg(1) << subRangeShift) : 0; for (ureg i = 0; i < numDestinations; i++) { - migration->getDestinations()[i] = Table::create(nextTableSize, table->baseHash + hashOffsetDelta * i, subRangeShift); + migration->getDestinations()[i] = + Table::create(nextTableSize, table->baseHash + hashOffsetDelta * i, subRangeShift); } // Publish the new migration. table->jobCoordinator.storeRelease(migration); @@ -540,7 +542,7 @@ struct Grampa { } beginTableMigrationToSize(map, table, nextTableSize, splitShift); } - + static FlatTreeMigration* createFlatTreeMigration(Map& map, FlatTree* flatTree, ureg shift) { turf::LockGuard guard(flatTree->mutex); if (!flatTree->migration) { @@ -551,13 +553,13 @@ struct Grampa { static FlatTreeMigration* getExistingFlatTreeMigration(FlatTree* flatTree) { turf::LockGuard guard(flatTree->mutex); - TURF_ASSERT(flatTree->migration); // Must already exist! + TURF_ASSERT(flatTree->migration); // Must already exist! return flatTree->migration; } }; // Grampa // Return index of the destination table that overflowed, or -1 if none -template +template sreg Grampa::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { ureg srcSizeMask = srcTable->sizeMask; ureg safeShift = m_safeShift; @@ -575,14 +577,15 @@ sreg Grampa::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { srcHash = srcCell->hash.load(turf::Relaxed); if (srcHash == KeyTraits::NullHash) { // An unused cell. Try to put a Redirect marker in its value. - srcValue = srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed); + srcValue = + srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed); if (srcValue == Value(ValueTraits::Redirect)) { // srcValue is already marked Redirect due to previous incomplete migration. TURF_TRACE(Grampa, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx); break; } if (srcValue == Value(ValueTraits::NullValue)) - break; // Redirect has been placed. Break inner loop, continue outer loop. + break; // Redirect has been placed. Break inner loop, continue outer loop. TURF_TRACE(Grampa, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx); // Otherwise, somebody just claimed the cell. Read srcHash again... } else { @@ -591,7 +594,7 @@ sreg Grampa::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { if (srcValue == Value(ValueTraits::NullValue)) { // Try to put a Redirect marker. if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed)) - break; // Redirect has been placed. Break inner loop, continue outer loop. + break; // Redirect has been placed. Break inner loop, continue outer loop. TURF_TRACE(Grampa, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx); if (srcValue == Value(ValueTraits::Redirect)) { // FIXME: I don't think this will happen. Investigate & change to assert @@ -603,7 +606,7 @@ sreg Grampa::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { TURF_TRACE(Grampa, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx); break; } - + // We've got a key/value pair to migrate. // Reserve a destination cell in dstTable. TURF_ASSERT(srcHash != KeyTraits::NullHash); @@ -632,8 +635,10 @@ sreg Grampa::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { // Copy srcValue to the destination. dstCell->value.store(srcValue, turf::Relaxed); // Try to place a Redirect marker in srcValue. - Value doubleCheckedSrcValue = srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed); - TURF_ASSERT(doubleCheckedSrcValue != Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time. + Value doubleCheckedSrcValue = + srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed); + TURF_ASSERT(doubleCheckedSrcValue != + Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time. if (doubleCheckedSrcValue == srcValue) { // No racing writes to the src. We've successfully placed the Redirect marker. // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL @@ -681,21 +686,25 @@ void Grampa::TableMigration::run() { } ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed); if (startIdx >= source.table->sizeMask + 1) - break; // No more migration units in this table. Try next source table. + break; // No more migration units in this table. Try next source table. sreg overflowTableIndex = migrateRange(source.table, startIdx); - if (overflowTableIndex >= 0) { + if (overflowTableIndex >= 0) { // *** FAILED MIGRATION *** // TableMigration failed due to destination table overflow. - // No other thread can declare the migration successful at this point, because *this* unit will never complete, hence m_unitsRemaining won't reach zero. + // No other thread can declare the migration successful at this point, because *this* unit will never complete, + // hence m_unitsRemaining won't reach zero. // However, multiple threads can independently detect a failed migration at the same time. TURF_TRACE(Grampa, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx)); - // The reason we store overflowTableIndex in a shared variable is because we must flush all the worker threads before - // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from the thread + // The reason we store overflowTableIndex in a shared variable is because we must flush all the worker threads + // before + // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from + // the thread // that deals with it. // Store overflowTableIndex unconditionally; racing writes should be rare, and it doesn't matter which one wins. sreg oldIndex = m_overflowTableIndex.exchange(overflowTableIndex, turf::Relaxed); if (oldIndex >= 0) - TURF_TRACE(Grampa, 29, "[TableMigration::run] race to set m_overflowTableIndex", uptr(overflowTableIndex), uptr(oldIndex)); + TURF_TRACE(Grampa, 29, "[TableMigration::run] race to set m_overflowTableIndex", uptr(overflowTableIndex), + uptr(oldIndex)); m_workerStatus.fetchOr(1, turf::Relaxed); goto endMigration; } @@ -713,7 +722,8 @@ void Grampa::TableMigration::run() { endMigration: // Decrement the shared # of workers. - probeStatus = m_workerStatus.fetchSub(2, turf::AcquireRelease); // Ensure all modifications are visible to the thread that will publish + probeStatus = + m_workerStatus.fetchSub(2, turf::AcquireRelease); // Ensure all modifications are visible to the thread that will publish if (probeStatus >= 4) { // There are other workers remaining. Return here so that only the very last worker will proceed. TURF_TRACE(Grampa, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus)); @@ -723,7 +733,7 @@ void Grampa::TableMigration::run() { // We're the very last worker thread. // Perform the appropriate post-migration step depending on whether the migration succeeded or failed. TURF_ASSERT(probeStatus == 3); - sreg overflowTableIndex = m_overflowTableIndex.loadNonatomic(); // No racing writes at this point + sreg overflowTableIndex = m_overflowTableIndex.loadNonatomic(); // No racing writes at this point if (overflowTableIndex < 0) { // The migration succeeded. This is the most likely outcome. Publish the new subtree. m_map.publishTableMigration(this); @@ -738,13 +748,15 @@ void Grampa::TableMigration::run() { turf::LockGuard guard(origTable->mutex); SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume(); if (checkedJob != this) { - TURF_TRACE(Grampa, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), uptr(checkedJob)); + TURF_TRACE(Grampa, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), + uptr(checkedJob)); } else { TableMigration* migration; Table* overflowedTable = getDestinations()[overflowTableIndex]; if (overflowedTable->sizeMask + 1 < LeafSize) { // The entire map is contained in a small table. - TURF_TRACE(Grampa, 33, "[TableMigration::run] overflow occured in a small map", uptr(origTable), uptr(checkedJob)); + TURF_TRACE(Grampa, 33, "[TableMigration::run] overflow occured in a small map", uptr(origTable), + uptr(checkedJob)); TURF_ASSERT(overflowedTable->unsafeRangeShift == sizeof(Hash) * 8); TURF_ASSERT(overflowedTable->baseHash == 0); TURF_ASSERT(m_numDestinations == 1); @@ -753,11 +765,13 @@ void Grampa::TableMigration::run() { migration->m_baseHash = 0; migration->m_safeShift = 0; // Double the destination table size. - migration->getDestinations()[0] = Table::create((overflowedTable->sizeMask + 1) * 2, overflowedTable->baseHash, overflowedTable->unsafeRangeShift); + migration->getDestinations()[0] = Table::create((overflowedTable->sizeMask + 1) * 2, overflowedTable->baseHash, + overflowedTable->unsafeRangeShift); } else { // The overflowed table is already the size of a leaf. Split it into two ranges. if (count == 1) { - TURF_TRACE(Grampa, 34, "[TableMigration::run] doubling subtree size after failure", uptr(origTable), uptr(checkedJob)); + TURF_TRACE(Grampa, 34, "[TableMigration::run] doubling subtree size after failure", uptr(origTable), + uptr(checkedJob)); migration = TableMigration::create(m_map, m_numSources + 1, m_numDestinations * 2); migration->m_baseHash = m_baseHash; migration->m_safeShift = getUnsafeShift() - 1; @@ -767,7 +781,8 @@ void Grampa::TableMigration::run() { } count = 2; } else { - TURF_TRACE(Grampa, 35, "[TableMigration::run] keeping same subtree size after failure", uptr(origTable), uptr(checkedJob)); + TURF_TRACE(Grampa, 35, "[TableMigration::run] keeping same subtree size after failure", uptr(origTable), + uptr(checkedJob)); migration = TableMigration::create(m_map, m_numSources + 1, m_numDestinations); migration->m_baseHash = m_baseHash; migration->m_safeShift = m_safeShift; @@ -779,7 +794,8 @@ void Grampa::TableMigration::run() { migration->getDestinations()[lo + i] = splitTable1; } ureg halfNumHashes = ureg(1) << (origTable->unsafeRangeShift - 1); - Table* splitTable2 = Table::create(LeafSize, origTable->baseHash + halfNumHashes, origTable->unsafeRangeShift - 1); + Table* splitTable2 = + Table::create(LeafSize, origTable->baseHash + halfNumHashes, origTable->unsafeRangeShift - 1); for (; i < count; i++) { migration->getDestinations()[lo + i] = splitTable2; } @@ -806,7 +822,7 @@ void Grampa::TableMigration::run() { DefaultQSBR.enqueue(&TableMigration::destroy, this); } -template +template void Grampa::FlatTreeMigration::run() { // Conditionally increment the shared # of workers. ureg probeStatus = m_workerStatus.load(turf::Relaxed); @@ -828,7 +844,7 @@ void Grampa::FlatTreeMigration::run() { for (;;) { ureg srcStart = m_sourceIndex.fetchAdd(FlatTreeMigrationUnitSize, turf::Relaxed); if (srcStart >= srcSize) - break; // No more migration units in this flattree. + break; // No more migration units in this flattree. // Migrate this range ureg srcEnd = turf::util::min(srcSize, srcStart + FlatTreeMigrationUnitSize); ureg dst = srcStart * repeat; @@ -853,7 +869,8 @@ void Grampa::FlatTreeMigration::run() { } // Decrement the shared # of workers. - probeStatus = m_workerStatus.fetchSub(2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread. + probeStatus = m_workerStatus.fetchSub( + 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread. if (probeStatus >= 4) { // There are other workers remaining. Return here so that only the very last worker will proceed. return; @@ -861,7 +878,7 @@ void Grampa::FlatTreeMigration::run() { // We're the very last worker thread. // Publish the new flattree. - TURF_ASSERT(probeStatus == 3); // End flag must be set + TURF_ASSERT(probeStatus == 3); // End flag must be set m_map.publishFlatTreeMigration(this); m_completed.signal(); diff --git a/junction/details/LeapFrog.h b/junction/details/LeapFrog.h index 7b1e395..e5af7b8 100644 --- a/junction/details/LeapFrog.h +++ b/junction/details/LeapFrog.h @@ -29,7 +29,7 @@ namespace details { TURF_TRACE_DECLARE(LeapFrog, 33) -template +template struct LeapFrog { typedef typename Map::Hash Hash; typedef typename Map::Value Value; @@ -40,7 +40,7 @@ struct LeapFrog { static const ureg TableMigrationUnitSize = 32; static const ureg LinearSearchLimit = 128; static const ureg CellsInUseSample = LinearSearchLimit; - TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links + TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain struct Cell { @@ -60,9 +60,9 @@ struct LeapFrog { }; struct Table { - const ureg sizeMask; // a power of two minus one - turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator) - SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration + const ureg sizeMask; // a power of two minus one + turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator) + SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration Table(ureg sizeMask) : sizeMask(sizeMask) { } @@ -72,7 +72,7 @@ struct LeapFrog { TURF_ASSERT(tableSize >= 4); ureg numGroups = tableSize >> 2; Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(CellGroup) * numGroups); - new(table) Table(tableSize - 1); + new (table) Table(tableSize - 1); for (ureg i = 0; i < numGroups; i++) { CellGroup* group = table->getCellGroups() + i; for (ureg j = 0; j < 4; j++) { @@ -108,7 +108,7 @@ struct LeapFrog { Map& m_map; Table* m_destination; - turf::Atomic m_workerStatus; // number of workers + end flag + turf::Atomic m_workerStatus; // number of workers + end flag turf::Atomic m_overflowed; turf::Atomic m_unitsRemaining; ureg m_numSources; @@ -117,8 +117,9 @@ struct LeapFrog { } static TableMigration* create(Map& map, ureg numSources) { - TableMigration* migration = (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources); - new(migration) TableMigration(map); + TableMigration* migration = + (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources); + new (migration) TableMigration(map); migration->m_workerStatus.storeNonatomic(0); migration->m_overflowed.storeNonatomic(false); migration->m_unitsRemaining.storeNonatomic(0); @@ -184,11 +185,7 @@ struct LeapFrog { } // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound. - enum InsertResult { - InsertResult_AlreadyFound, - InsertResult_InsertedNew, - InsertResult_Overflow - }; + enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow }; static InsertResult insert(Hash hash, Table* table, Cell*& cell, ureg& overflowIdx) { TURF_TRACE(LeapFrog, 3, "[insert] called", uptr(table), hash); TURF_ASSERT(table); @@ -239,7 +236,7 @@ struct LeapFrog { probeHash = cell->hash.load(turf::Acquire); } while (probeHash == KeyTraits::NullHash); } - TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked + TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked if (probeHash == hash) { TURF_TRACE(LeapFrog, 8, "[insert] found in probe chain", uptr(table), idx); return InsertResult_AlreadyFound; @@ -248,7 +245,7 @@ struct LeapFrog { // Reached the end of the link chain for this bucket. // Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket. ureg prevLinkIdx = idx; - TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range. + TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range. ureg linearProbesRemaining = turf::util::min(maxIdx - idx, LinearSearchLimit); while (linearProbesRemaining-- > 0) { idx++; @@ -262,7 +259,7 @@ struct LeapFrog { TURF_TRACE(LeapFrog, 9, "[insert] reserved cell", uptr(table), idx); TURF_ASSERT(probeDelta == 0); u8 desiredDelta = idx - prevLinkIdx; -#if TURF_WITH_ASSERTS +#if TURF_WITH_ASSERTS probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed); TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta); #else @@ -288,15 +285,15 @@ struct LeapFrog { // there's no guarantee that our own link chain will be well-formed by the time this function returns. // (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.) u8 desiredDelta = idx - prevLinkIdx; -#if TURF_WITH_ASSERTS +#if TURF_WITH_ASSERTS probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed); TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta); if (probeDelta == 0) TURF_TRACE(LeapFrog, 13, "[insert] set link on behalf of late-arriving cell", uptr(table), idx); #else prevLink->store(desiredDelta, turf::Relaxed); -#endif - goto followLink; // Try to follow link chain for the bucket again. +#endif + goto followLink; // Try to follow link chain for the bucket again. } // Continue linear search... } @@ -316,7 +313,7 @@ struct LeapFrog { TURF_TRACE(LeapFrog, 16, "[beginTableMigrationToSize] new migration already exists", 0, 0); } else { turf::LockGuard guard(table->mutex); - job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK. + job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK. if (job) { TURF_TRACE(LeapFrog, 17, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0); } else { @@ -357,7 +354,7 @@ struct LeapFrog { } }; // LeapFrog -template +template bool LeapFrog::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { ureg srcSizeMask = srcTable->sizeMask; ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1); @@ -372,14 +369,15 @@ bool LeapFrog::TableMigration::migrateRange(Table* srcTable, ureg startIdx) srcHash = srcCell->hash.load(turf::Relaxed); if (srcHash == KeyTraits::NullHash) { // An unused cell. Try to put a Redirect marker in its value. - srcValue = srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed); + srcValue = + srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed); if (srcValue == Value(ValueTraits::Redirect)) { // srcValue is already marked Redirect due to previous incomplete migration. TURF_TRACE(LeapFrog, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx); break; } if (srcValue == Value(ValueTraits::NullValue)) - break; // Redirect has been placed. Break inner loop, continue outer loop. + break; // Redirect has been placed. Break inner loop, continue outer loop. TURF_TRACE(LeapFrog, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx); // Otherwise, somebody just claimed the cell. Read srcHash again... } else { @@ -388,7 +386,7 @@ bool LeapFrog::TableMigration::migrateRange(Table* srcTable, ureg startIdx) if (srcValue == Value(ValueTraits::NullValue)) { // Try to put a Redirect marker. if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed)) - break; // Redirect has been placed. Break inner loop, continue outer loop. + break; // Redirect has been placed. Break inner loop, continue outer loop. TURF_TRACE(LeapFrog, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx); if (srcValue == Value(ValueTraits::Redirect)) { // FIXME: I don't think this will happen. Investigate & change to assert @@ -400,7 +398,7 @@ bool LeapFrog::TableMigration::migrateRange(Table* srcTable, ureg startIdx) TURF_TRACE(LeapFrog, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx); break; } - + // We've got a key/value pair to migrate. // Reserve a destination cell in the destination. TURF_ASSERT(srcHash != KeyTraits::NullHash); @@ -427,8 +425,10 @@ bool LeapFrog::TableMigration::migrateRange(Table* srcTable, ureg startIdx) // Copy srcValue to the destination. dstCell->value.store(srcValue, turf::Relaxed); // Try to place a Redirect marker in srcValue. - Value doubleCheckedSrcValue = srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed); - TURF_ASSERT(doubleCheckedSrcValue != Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time. + Value doubleCheckedSrcValue = + srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed); + TURF_ASSERT(doubleCheckedSrcValue != + Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time. if (doubleCheckedSrcValue == srcValue) { // No racing writes to the src. We've successfully placed the Redirect marker. // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL @@ -475,20 +475,23 @@ void LeapFrog::TableMigration::run() { } ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed); if (startIdx >= source.table->sizeMask + 1) - break; // No more migration units in this table. Try next source table. + break; // No more migration units in this table. Try next source table. bool overflowed = !migrateRange(source.table, startIdx); if (overflowed) { // *** FAILED MIGRATION *** // TableMigration failed due to destination table overflow. - // No other thread can declare the migration successful at this point, because *this* unit will never complete, hence m_unitsRemaining won't reach zero. + // No other thread can declare the migration successful at this point, because *this* unit will never complete, + // hence m_unitsRemaining won't reach zero. // However, multiple threads can independently detect a failed migration at the same time. TURF_TRACE(LeapFrog, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx)); // The reason we store overflowed in a shared variable is because we can must flush all the worker threads before - // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from the thread + // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from + // the thread // that deals with it. bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed); if (oldOverflowed) - TURF_TRACE(LeapFrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed), uptr(oldOverflowed)); + TURF_TRACE(LeapFrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed), + uptr(oldOverflowed)); m_workerStatus.fetchOr(1, turf::Relaxed); goto endMigration; } @@ -506,7 +509,8 @@ void LeapFrog::TableMigration::run() { endMigration: // Decrement the shared # of workers. - probeStatus = m_workerStatus.fetchSub(2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread. + probeStatus = m_workerStatus.fetchSub( + 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread. if (probeStatus >= 4) { // There are other workers remaining. Return here so that only the very last worker will proceed. TURF_TRACE(LeapFrog, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus)); @@ -516,7 +520,7 @@ void LeapFrog::TableMigration::run() { // We're the very last worker thread. // Perform the appropriate post-migration step depending on whether the migration succeeded or failed. TURF_ASSERT(probeStatus == 3); - bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point + bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point if (!overflowed) { // The migration succeeded. This is the most likely outcome. Publish the new subtree. m_map.publishTableMigration(this); @@ -528,7 +532,8 @@ void LeapFrog::TableMigration::run() { turf::LockGuard guard(origTable->mutex); SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume(); if (checkedJob != this) { - TURF_TRACE(LeapFrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), uptr(checkedJob)); + TURF_TRACE(LeapFrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), + uptr(checkedJob)); } else { TableMigration* migration = TableMigration::create(m_map, m_numSources + 1); // Double the destination table size. diff --git a/junction/details/Linear.h b/junction/details/Linear.h index f466cb0..d2ee367 100644 --- a/junction/details/Linear.h +++ b/junction/details/Linear.h @@ -29,7 +29,7 @@ namespace details { TURF_TRACE_DECLARE(Linear, 22) -template +template struct Linear { typedef typename Map::Hash Hash; typedef typename Map::Value Value; @@ -40,7 +40,7 @@ struct Linear { static const ureg TableMigrationUnitSize = 32; static const ureg LinearSearchLimit = 128; static const ureg CellsInUseSample = LinearSearchLimit; - TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links + TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain struct Cell { @@ -49,21 +49,22 @@ struct Linear { }; struct Table { - const ureg sizeMask; // a power of two minus one + const ureg sizeMask; // a power of two minus one const ureg limitNumValues; turf::Atomic cellsRemaining; turf::Atomic valuesRemaining; - turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator) - SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration + turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator) + SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration - Table(ureg sizeMask, ureg limitNumValues) : sizeMask(sizeMask), limitNumValues(limitNumValues), - cellsRemaining(limitNumValues), valuesRemaining(limitNumValues) { + Table(ureg sizeMask, ureg limitNumValues) + : sizeMask(sizeMask), limitNumValues(limitNumValues), cellsRemaining(limitNumValues), + valuesRemaining(limitNumValues) { } static Table* create(ureg tableSize, ureg limitNumValues) { TURF_ASSERT(turf::util::isPowerOf2(tableSize)); Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(Cell) * tableSize); - new(table) Table(tableSize - 1, limitNumValues); + new (table) Table(tableSize - 1, limitNumValues); for (ureg j = 0; j < tableSize; j++) { table->getCells()[j].hash.storeNonatomic(KeyTraits::NullHash); table->getCells()[j].value.storeNonatomic(Value(ValueTraits::NullValue)); @@ -91,7 +92,7 @@ struct Linear { Table* m_source; turf::Atomic m_sourceIndex; Table* m_destination; - turf::Atomic m_workerStatus; // number of workers + end flag + turf::Atomic m_workerStatus; // number of workers + end flag turf::Atomic m_unitsRemaining; TableMigration(Map& map) : m_map(map), m_sourceIndex(0), m_workerStatus(0), m_unitsRemaining(0) { @@ -131,11 +132,7 @@ struct Linear { } // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound. - enum InsertResult { - InsertResult_AlreadyFound, - InsertResult_InsertedNew, - InsertResult_Overflow - }; + enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow }; static InsertResult insert(Hash hash, Table* table, Cell*& cell) { TURF_TRACE(Linear, 2, "[insert] called", uptr(table), hash); TURF_ASSERT(table); @@ -149,7 +146,7 @@ struct Linear { uptr probeHash = cell->hash.load(turf::Relaxed); if (probeHash == hash) { TURF_TRACE(Linear, 3, "[insert] found existing cell", uptr(table), idx); - return InsertResult_AlreadyFound; // Key found in table. Return the existing cell. + return InsertResult_AlreadyFound; // Key found in table. Return the existing cell. } if (probeHash == KeyTraits::NullHash) { // It's an empty cell. Try to reserve it. @@ -158,7 +155,7 @@ struct Linear { if (prevCellsRemaining <= 0) { // Table is overpopulated. TURF_TRACE(Linear, 4, "[insert] ran out of cellsRemaining", prevCellsRemaining, 0); - table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement + table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement return InsertResult_Overflow; } // Try to reserve this cell. @@ -170,10 +167,10 @@ struct Linear { } // There was a race and another thread reserved that cell from under us. TURF_TRACE(Linear, 6, "[insert] detected race to reserve cell", ureg(hash), idx); - table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement + table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement if (prevHash == hash) { TURF_TRACE(Linear, 7, "[insert] race reserved same hash", ureg(hash), idx); - return InsertResult_AlreadyFound; // They inserted the same key. Return the existing cell. + return InsertResult_AlreadyFound; // They inserted the same key. Return the existing cell. } } // Try again in the next cell. @@ -188,7 +185,7 @@ struct Linear { TURF_TRACE(Linear, 9, "[beginTableMigration] new migration already exists", 0, 0); } else { turf::LockGuard guard(table->mutex); - job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK. + job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK. if (job) { TURF_TRACE(Linear, 10, "[beginTableMigration] new migration already exists (double-checked)", 0, 0); } else { @@ -205,16 +202,18 @@ struct Linear { // re-inserting more values than the new table can hold. // To set the new limitNumValues on the current table in an atomic fashion, // we update its valuesRemaining via CAS loop: - for(;;) { + for (;;) { // We must recalculate desiredValuesRemaining on each iteration of the CAS loop oldValuesInUse = oldValuesLimit - oldValuesRemaining; sreg desiredValuesRemaining = nextLimitNumValues - oldValuesInUse; if (desiredValuesRemaining < 0) { - TURF_TRACE(Linear, 11, "[table] restarting valuesRemaining CAS loop", nextLimitNumValues, desiredValuesRemaining); + TURF_TRACE(Linear, 11, "[table] restarting valuesRemaining CAS loop", nextLimitNumValues, + desiredValuesRemaining); // Must recalculate nextTableSize. Goto, baby! goto calculateNextTableSize; } - if (table->valuesRemaining.compareExchangeWeak(oldValuesRemaining, desiredValuesRemaining, turf::Relaxed, turf::Relaxed)) + if (table->valuesRemaining.compareExchangeWeak(oldValuesRemaining, desiredValuesRemaining, turf::Relaxed, + turf::Relaxed)) break; // Success! // CAS failed because table->valuesRemaining was modified by another thread. // An updated value has been reloaded into oldValuesRemaining (modified by reference). @@ -235,7 +234,7 @@ struct Linear { } }; // Linear -template +template bool Linear::TableMigration::migrateRange(ureg startIdx) { ureg srcSizeMask = m_source->sizeMask; ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1); @@ -250,14 +249,15 @@ bool Linear::TableMigration::migrateRange(ureg startIdx) { srcHash = srcCell->hash.load(turf::Relaxed); if (srcHash == KeyTraits::NullHash) { // An unused cell. Try to put a Redirect marker in its value. - srcValue = srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed); + srcValue = + srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed); if (srcValue == Value(ValueTraits::Redirect)) { // srcValue is already marked Redirect due to previous incomplete migration. TURF_TRACE(Linear, 13, "[migrateRange] empty cell already redirected", uptr(m_source), srcIdx); break; } if (srcValue == Value(ValueTraits::NullValue)) - break; // Redirect has been placed. Break inner loop, continue outer loop. + break; // Redirect has been placed. Break inner loop, continue outer loop. TURF_TRACE(Linear, 14, "[migrateRange] race to insert key", uptr(m_source), srcIdx); // Otherwise, somebody just claimed the cell. Read srcHash again... } else { @@ -266,15 +266,15 @@ bool Linear::TableMigration::migrateRange(ureg startIdx) { if (srcValue == Value(ValueTraits::NullValue)) { // Try to put a Redirect marker. if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed)) - break; // Redirect has been placed. Break inner loop, continue outer loop. + break; // Redirect has been placed. Break inner loop, continue outer loop. TURF_TRACE(Linear, 15, "[migrateRange] race to insert value", uptr(m_source), srcIdx); } - + // We've got a key/value pair to migrate. // Reserve a destination cell in the destination. TURF_ASSERT(srcHash != KeyTraits::NullHash); TURF_ASSERT(srcValue != Value(ValueTraits::NullValue)); - TURF_ASSERT(srcValue != Value(ValueTraits::Redirect)); // Incomplete/concurrent migrations are impossible. + TURF_ASSERT(srcValue != Value(ValueTraits::Redirect)); // Incomplete/concurrent migrations are impossible. Cell* dstCell; InsertResult result = insert(srcHash, m_destination, dstCell); // During migration, a hash can only exist in one place among all the source tables, @@ -287,8 +287,10 @@ bool Linear::TableMigration::migrateRange(ureg startIdx) { // Copy srcValue to the destination. dstCell->value.store(srcValue, turf::Relaxed); // Try to place a Redirect marker in srcValue. - Value doubleCheckedSrcValue = srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed); - TURF_ASSERT(doubleCheckedSrcValue != Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time. + Value doubleCheckedSrcValue = + srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed); + TURF_ASSERT(doubleCheckedSrcValue != + Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time. if (doubleCheckedSrcValue == srcValue) { // No racing writes to the src. We've successfully placed the Redirect marker. // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL @@ -337,7 +339,7 @@ void Linear::TableMigration::run() { } ureg startIdx = m_sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed); if (startIdx >= m_source->sizeMask + 1) - break; // No more migration units. + break; // No more migration units. migrateRange(startIdx); sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed); TURF_ASSERT(prevRemaining > 0); @@ -351,7 +353,8 @@ void Linear::TableMigration::run() { endMigration: // Decrement the shared # of workers. - probeStatus = m_workerStatus.fetchSub(2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread. + probeStatus = m_workerStatus.fetchSub( + 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread. if (probeStatus >= 4) { // There are other workers remaining. Return here so that only the very last worker will proceed. TURF_TRACE(Linear, 21, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus)); diff --git a/junction/extra/MemHook_NBDS.cpp b/junction/extra/MemHook_NBDS.cpp index c736748..a0e1dce 100644 --- a/junction/extra/MemHook_NBDS.cpp +++ b/junction/extra/MemHook_NBDS.cpp @@ -15,14 +15,14 @@ #if JUNCTION_WITH_NBDS && NBDS_USE_TURF_HEAP extern "C" { -void mem_init (void) { +void mem_init(void) { } -void *nbd_malloc (size_t n) { +void* nbd_malloc(size_t n) { return TURF_HEAP.alloc(n); } -void nbd_free (void *x) { +void nbd_free(void* x) { TURF_HEAP.free(x); } } // extern "C" diff --git a/junction/extra/MemHook_TBB.cpp b/junction/extra/MemHook_TBB.cpp index 46dd76b..ea87b8b 100644 --- a/junction/extra/MemHook_TBB.cpp +++ b/junction/extra/MemHook_TBB.cpp @@ -18,7 +18,7 @@ void* tbbWrap_malloc(size_t size) { return TURF_HEAP.alloc(size); } -void tbbWrap_free(void* ptr) { +void tbbWrap_free(void* ptr) { TURF_HEAP.free(ptr); } @@ -26,7 +26,7 @@ void* tbbWrap_padded_allocate(size_t size, size_t alignment) { return TURF_HEAP.allocAligned(size, alignment); } -void tbbWrap_padded_free(void* ptr) { +void tbbWrap_padded_free(void* ptr) { TURF_HEAP.free(ptr); } #endif // JUNCTION_WITH_TBB && TBB_USE_TURF_HEAP diff --git a/junction/extra/impl/MapAdapter_CDS_Cuckoo.h b/junction/extra/impl/MapAdapter_CDS_Cuckoo.h index bb22a67..fd477f4 100644 --- a/junction/extra/impl/MapAdapter_CDS_Cuckoo.h +++ b/junction/extra/impl/MapAdapter_CDS_Cuckoo.h @@ -22,7 +22,7 @@ #include #include #include -#include // memcpy required by cuckoo_map.h +#include // memcpy required by cuckoo_map.h #include namespace junction { @@ -31,8 +31,8 @@ namespace extra { class MapAdapter { public: static TURF_CONSTEXPR const char* MapName = "CDS CuckooMap"; - - cds::gc::HP *m_hpGC; + + cds::gc::HP* m_hpGC; MapAdapter(ureg) { cds::Initialize(); @@ -92,7 +92,7 @@ class MapAdapter { void* get(u32 key) { void* result = NULL; - m_map.find(key, [&result](std::pair& item){ result = item.second; }); + m_map.find(key, [&result](std::pair& item) { result = item.second; }); return result; } diff --git a/junction/extra/impl/MapAdapter_CDS_Michael.h b/junction/extra/impl/MapAdapter_CDS_Michael.h index ce600f3..44c34a2 100644 --- a/junction/extra/impl/MapAdapter_CDS_Michael.h +++ b/junction/extra/impl/MapAdapter_CDS_Michael.h @@ -32,7 +32,7 @@ class MapAdapter { public: static TURF_CONSTEXPR const char* MapName = "CDS MichaelKVList"; - cds::gc::HP *m_hpGC; + cds::gc::HP* m_hpGC; MapAdapter(ureg) { cds::Initialize(); @@ -64,21 +64,18 @@ class MapAdapter { class Map { private: // List traits based on std::less predicate - struct ListTraits : public cds::container::michael_list::traits - { - typedef std::less less; + struct ListTraits : public cds::container::michael_list::traits { + typedef std::less less; }; // Ordered list - typedef cds::container::MichaelKVList< cds::gc::HP, u32, void*, ListTraits> OrderedList; + typedef cds::container::MichaelKVList OrderedList; // Map traits - struct MapTraits : public cds::container::michael_map::traits - { + struct MapTraits : public cds::container::michael_map::traits { struct hash { - size_t operator()( u32 i ) const - { - return cds::opt::v::hash()( i ); + size_t operator()(u32 i) const { + return cds::opt::v::hash()(i); } }; }; @@ -95,7 +92,7 @@ class MapAdapter { void* get(u32 key) { void* result = NULL; - m_map.find(key, [&result](std::pair& item){ result = item.second; }); + m_map.find(key, [&result](std::pair& item) { result = item.second; }); return result; } diff --git a/junction/extra/impl/MapAdapter_Grampa.h b/junction/extra/impl/MapAdapter_Grampa.h index 185fa94..9bf4bf4 100644 --- a/junction/extra/impl/MapAdapter_Grampa.h +++ b/junction/extra/impl/MapAdapter_Grampa.h @@ -24,14 +24,14 @@ namespace extra { class MapAdapter { public: static TURF_CONSTEXPR const char* MapName = "Junction Grampa map"; - + MapAdapter(ureg) { } class ThreadContext { private: QSBR::Context m_qsbrContext; - + public: ThreadContext(MapAdapter&, ureg) { } diff --git a/junction/extra/impl/MapAdapter_LeapFrog.h b/junction/extra/impl/MapAdapter_LeapFrog.h index 02b09ca..c277044 100644 --- a/junction/extra/impl/MapAdapter_LeapFrog.h +++ b/junction/extra/impl/MapAdapter_LeapFrog.h @@ -24,31 +24,31 @@ namespace extra { class MapAdapter { public: static TURF_CONSTEXPR const char* MapName = "Junction LeapFrog map"; - + MapAdapter(ureg) { } class ThreadContext { private: QSBR::Context m_qsbrContext; - + public: ThreadContext(MapAdapter&, ureg) { } - + void registerThread() { m_qsbrContext = DefaultQSBR.createContext(); } - + void unregisterThread() { DefaultQSBR.destroyContext(m_qsbrContext); } - + void update() { DefaultQSBR.update(m_qsbrContext); } }; - + typedef ConcurrentMap_LeapFrog Map; static ureg getInitialCapacity(ureg maxPopulation) { diff --git a/junction/extra/impl/MapAdapter_Linear.h b/junction/extra/impl/MapAdapter_Linear.h index 525eb74..d991782 100644 --- a/junction/extra/impl/MapAdapter_Linear.h +++ b/junction/extra/impl/MapAdapter_Linear.h @@ -24,31 +24,31 @@ namespace extra { class MapAdapter { public: static TURF_CONSTEXPR const char* MapName = "Junction Linear map"; - + MapAdapter(ureg) { } class ThreadContext { private: QSBR::Context m_qsbrContext; - + public: ThreadContext(MapAdapter&, ureg) { } - + void registerThread() { m_qsbrContext = DefaultQSBR.createContext(); } - + void unregisterThread() { DefaultQSBR.destroyContext(m_qsbrContext); } - + void update() { DefaultQSBR.update(m_qsbrContext); } }; - + typedef ConcurrentMap_Linear Map; static ureg getInitialCapacity(ureg maxPopulation) { diff --git a/junction/extra/impl/MapAdapter_Linear_Mutex.h b/junction/extra/impl/MapAdapter_Linear_Mutex.h index 4339596..fbb21df 100644 --- a/junction/extra/impl/MapAdapter_Linear_Mutex.h +++ b/junction/extra/impl/MapAdapter_Linear_Mutex.h @@ -61,7 +61,7 @@ class MapAdapter { turf::LockGuard guard(m_mutex); return m_map.get(key); } - + void* erase(u32 key) { turf::LockGuard guard(m_mutex); return m_map.erase(key); diff --git a/junction/extra/impl/MapAdapter_Linear_RWLock.h b/junction/extra/impl/MapAdapter_Linear_RWLock.h index b4e34aa..e9414e3 100644 --- a/junction/extra/impl/MapAdapter_Linear_RWLock.h +++ b/junction/extra/impl/MapAdapter_Linear_RWLock.h @@ -61,7 +61,7 @@ class MapAdapter { turf::SharedLockGuard guard(m_rwLock); return m_map.get(key); } - + void erase(u32 key) { turf::ExclusiveLockGuard guard(m_rwLock); m_map.erase(key); diff --git a/junction/extra/impl/MapAdapter_Null.h b/junction/extra/impl/MapAdapter_Null.h index 524f6e3..24254fc 100644 --- a/junction/extra/impl/MapAdapter_Null.h +++ b/junction/extra/impl/MapAdapter_Null.h @@ -21,7 +21,7 @@ namespace extra { class MapAdapter { public: static TURF_CONSTEXPR const char* MapName = "Null"; - + MapAdapter(ureg) { } diff --git a/junction/extra/impl/MapAdapter_StdMap.h b/junction/extra/impl/MapAdapter_StdMap.h index 4d287cc..fba38ae 100644 --- a/junction/extra/impl/MapAdapter_StdMap.h +++ b/junction/extra/impl/MapAdapter_StdMap.h @@ -62,7 +62,7 @@ class MapAdapter { MapType::iterator iter = m_map.find(key); return (iter == m_map.end()) ? NULL : iter->second; } - + void erase(u32 key) { std::lock_guard guard(m_mutex); m_map.erase(key); diff --git a/junction/striped/ConditionBank.h b/junction/striped/ConditionBank.h index 1c3e6d3..2556637 100644 --- a/junction/striped/ConditionBank.h +++ b/junction/striped/ConditionBank.h @@ -52,15 +52,15 @@ extern ConditionBank DefaultConditionBank; } // namespace junction #define JUNCTION_STRIPED_CONDITIONBANK_DEFINE_MEMBER() -#define JUNCTION_STRIPED_CONDITIONBANK_GET(objectPtr) (junction::striped::DefaultConditionBank.get(objectPtr)) +#define JUNCTION_STRIPED_CONDITIONBANK_GET(objectPtr) (junction::striped::DefaultConditionBank.get(objectPtr)) #else // JUNCTION_USE_STRIPING //----------------------------------- // Striping disabled //----------------------------------- -#define JUNCTION_STRIPED_CONDITIONBANK_DEFINE_MEMBER() junction::striped::ConditionPair m_conditionPair; -#define JUNCTION_STRIPED_CONDITIONBANK_GET(objectPtr) ((objectPtr)->m_conditionPair) +#define JUNCTION_STRIPED_CONDITIONBANK_DEFINE_MEMBER() junction::striped::ConditionPair m_conditionPair; +#define JUNCTION_STRIPED_CONDITIONBANK_GET(objectPtr) ((objectPtr)->m_conditionPair) #endif // JUNCTION_USE_STRIPING diff --git a/junction/striped/ManualResetEvent.h b/junction/striped/ManualResetEvent.h index e902f9a..2715edc 100644 --- a/junction/striped/ManualResetEvent.h +++ b/junction/striped/ManualResetEvent.h @@ -40,10 +40,11 @@ class ManualResetEvent { } void signal() { - u8 prevState = m_state.fetchOr(Signaled, turf::Release); // Synchronizes-with the load in wait (fast path) + u8 prevState = m_state.fetchOr(Signaled, turf::Release); // Synchronizes-with the load in wait (fast path) if (prevState & HasWaiters) { ConditionPair& pair = JUNCTION_STRIPED_CONDITIONBANK_GET(this); - turf::LockGuard guard(pair.mutex); // Prevents the wake from occuring in the middle of wait()'s critical section + turf::LockGuard guard( + pair.mutex); // Prevents the wake from occuring in the middle of wait()'s critical section pair.condVar.wakeAll(); } } @@ -57,7 +58,7 @@ class ManualResetEvent { } void wait() { - u8 state = m_state.load(turf::Acquire); // Synchronizes-with the fetchOr in signal (fast path) + u8 state = m_state.load(turf::Acquire); // Synchronizes-with the fetchOr in signal (fast path) if ((state & Signaled) == 0) { ConditionPair& pair = JUNCTION_STRIPED_CONDITIONBANK_GET(this); turf::LockGuard guard(pair.mutex); diff --git a/junction/striped/Mutex.h b/junction/striped/Mutex.h index c88b1ab..9921846 100644 --- a/junction/striped/Mutex.h +++ b/junction/striped/Mutex.h @@ -48,7 +48,7 @@ class Mutex { bool tryLock() { return (m_status.compareExchange(-1, 0, turf::Acquire) < 0); } - + void unlock() { if (m_status.exchange(-1, turf::Release) > 0) m_event.signal(); diff --git a/samples/MapCorrectnessTests/MapCorrectnessTests.cpp b/samples/MapCorrectnessTests/MapCorrectnessTests.cpp index ca7f9d2..3fee1b3 100644 --- a/samples/MapCorrectnessTests/MapCorrectnessTests.cpp +++ b/samples/MapCorrectnessTests/MapCorrectnessTests.cpp @@ -16,7 +16,7 @@ #include "TestInsertDifferentKeys.h" #include "TestChurn.h" #include -#include // for GrampaStats +#include // for GrampaStats static const ureg IterationsPerLog = 100; @@ -38,10 +38,14 @@ int main(int argc, const char** argv) { junction::DefaultQSBR.flush(); junction::details::GrampaStats& stats = junction::details::GrampaStats::Instance; printf("---------------------------\n"); - printf("numTables: %d/%d\n", (int) stats.numTables.current.load(turf::Relaxed), (int) stats.numTables.total.load(turf::Relaxed)); - printf("numTableMigrations: %d/%d\n", (int) stats.numTableMigrations.current.load(turf::Relaxed), (int) stats.numTableMigrations.total.load(turf::Relaxed)); - printf("numFlatTrees: %d/%d\n", (int) stats.numFlatTrees.current.load(turf::Relaxed), (int) stats.numFlatTrees.total.load(turf::Relaxed)); - printf("numFlatTreeMigrations: %d/%d\n", (int) stats.numFlatTreeMigrations.current.load(turf::Relaxed), (int) stats.numFlatTreeMigrations.total.load(turf::Relaxed)); + printf("numTables: %d/%d\n", (int) stats.numTables.current.load(turf::Relaxed), + (int) stats.numTables.total.load(turf::Relaxed)); + printf("numTableMigrations: %d/%d\n", (int) stats.numTableMigrations.current.load(turf::Relaxed), + (int) stats.numTableMigrations.total.load(turf::Relaxed)); + printf("numFlatTrees: %d/%d\n", (int) stats.numFlatTrees.current.load(turf::Relaxed), + (int) stats.numFlatTrees.total.load(turf::Relaxed)); + printf("numFlatTreeMigrations: %d/%d\n", (int) stats.numFlatTreeMigrations.current.load(turf::Relaxed), + (int) stats.numFlatTreeMigrations.total.load(turf::Relaxed)); #endif } diff --git a/samples/MapCorrectnessTests/TestChurn.h b/samples/MapCorrectnessTests/TestChurn.h index 5b93ff4..5ca48b8 100644 --- a/samples/MapCorrectnessTests/TestChurn.h +++ b/samples/MapCorrectnessTests/TestChurn.h @@ -50,9 +50,10 @@ class TestChurn { u32 m_relativePrime; std::vector m_threads; - TestChurn(TestEnvironment& env) : m_env(env), m_map(MapAdapter::getInitialCapacity(KeysInBlock * BlocksToMaintain * env.numThreads)) { + TestChurn(TestEnvironment& env) + : m_env(env), m_map(MapAdapter::getInitialCapacity(KeysInBlock * BlocksToMaintain * env.numThreads)) { m_threads.resize(m_env.numThreads); - m_rangePerThread = u32(-3) / m_env.numThreads; // from 2 to 0xffffffff inclusive + m_rangePerThread = u32(-3) / m_env.numThreads; // from 2 to 0xffffffff inclusive TURF_ASSERT(KeysInBlock * (BlocksToMaintain + BlocksToLookup + 1) < m_rangePerThread); u32 startIndex = 2; for (ureg i = 0; i < m_env.numThreads; i++) { @@ -90,76 +91,76 @@ class TestChurn { TURF_ASSERT(thread.insertIndex != thread.eraseIndex); for (sreg stepsRemaining = StepsPerIteration; stepsRemaining > 0; stepsRemaining--) { switch (thread.phase) { - case Phase_Insert: { - for (sreg keysRemaining = KeysInBlock; keysRemaining > 0; keysRemaining--) { - u32 key = thread.insertIndex * m_relativePrime; - key = key ^ (key >> 16); - if (key >= 2) { - m_map.insert(key, (void*) uptr(key)); - } - if (++thread.insertIndex >= thread.rangeHi) - thread.insertIndex = thread.rangeLo; - TURF_ASSERT(thread.insertIndex != thread.eraseIndex); + case Phase_Insert: { + for (sreg keysRemaining = KeysInBlock; keysRemaining > 0; keysRemaining--) { + u32 key = thread.insertIndex * m_relativePrime; + key = key ^ (key >> 16); + if (key >= 2) { + m_map.insert(key, (void*) uptr(key)); } - thread.phase = Phase_Lookup; - thread.lookupIndex = thread.insertIndex; - thread.keysToCheck = KeysInBlock + (thread.random.next32() % (KeysInBlock * (BlocksToLookup - 1))); - break; + if (++thread.insertIndex >= thread.rangeHi) + thread.insertIndex = thread.rangeLo; + TURF_ASSERT(thread.insertIndex != thread.eraseIndex); } - case Phase_Lookup: { - sreg keysRemaining = turf::util::min(thread.keysToCheck, KeysInBlock); - thread.keysToCheck -= keysRemaining; - for (; keysRemaining > 0; keysRemaining--) { - if (thread.lookupIndex == thread.rangeLo) - thread.lookupIndex = thread.rangeHi; - thread.lookupIndex--; - u32 key = thread.lookupIndex * m_relativePrime; - key = key ^ (key >> 16); - if (key >= 2) { - if (m_map.get(key) != (void*) uptr(key)) - TURF_DEBUG_BREAK(); - } - } - if (thread.keysToCheck == 0) { - thread.phase = Phase_Erase; + thread.phase = Phase_Lookup; + thread.lookupIndex = thread.insertIndex; + thread.keysToCheck = KeysInBlock + (thread.random.next32() % (KeysInBlock * (BlocksToLookup - 1))); + break; + } + case Phase_Lookup: { + sreg keysRemaining = turf::util::min(thread.keysToCheck, KeysInBlock); + thread.keysToCheck -= keysRemaining; + for (; keysRemaining > 0; keysRemaining--) { + if (thread.lookupIndex == thread.rangeLo) + thread.lookupIndex = thread.rangeHi; + thread.lookupIndex--; + u32 key = thread.lookupIndex * m_relativePrime; + key = key ^ (key >> 16); + if (key >= 2) { + if (m_map.get(key) != (void*) uptr(key)) + TURF_DEBUG_BREAK(); } - break; } - case Phase_Erase: { - for (sreg keysRemaining = KeysInBlock; keysRemaining > 0; keysRemaining--) { - u32 key = thread.eraseIndex * m_relativePrime; - key = key ^ (key >> 16); - if (key >= 2) { - m_map.erase(key); - } - if (++thread.eraseIndex >= thread.rangeHi) - thread.eraseIndex = thread.rangeLo; - TURF_ASSERT(thread.insertIndex != thread.eraseIndex); - } - thread.phase = Phase_LookupDeleted; - thread.lookupIndex = thread.eraseIndex; - thread.keysToCheck = KeysInBlock + (thread.random.next32() % (KeysInBlock * (BlocksToLookup - 1))); - break; + if (thread.keysToCheck == 0) { + thread.phase = Phase_Erase; } - case Phase_LookupDeleted: { - sreg keysRemaining = turf::util::min(thread.keysToCheck, KeysInBlock); - thread.keysToCheck -= keysRemaining; - for (; keysRemaining > 0; keysRemaining--) { - if (thread.lookupIndex == thread.rangeLo) - thread.lookupIndex = thread.rangeHi; - thread.lookupIndex--; - u32 key = thread.lookupIndex * m_relativePrime; - key = key ^ (key >> 16); - if (key >= 2) { - if (m_map.get(key)) - TURF_DEBUG_BREAK(); - } + break; + } + case Phase_Erase: { + for (sreg keysRemaining = KeysInBlock; keysRemaining > 0; keysRemaining--) { + u32 key = thread.eraseIndex * m_relativePrime; + key = key ^ (key >> 16); + if (key >= 2) { + m_map.erase(key); } - if (thread.keysToCheck == 0) { - thread.phase = Phase_Insert; + if (++thread.eraseIndex >= thread.rangeHi) + thread.eraseIndex = thread.rangeLo; + TURF_ASSERT(thread.insertIndex != thread.eraseIndex); + } + thread.phase = Phase_LookupDeleted; + thread.lookupIndex = thread.eraseIndex; + thread.keysToCheck = KeysInBlock + (thread.random.next32() % (KeysInBlock * (BlocksToLookup - 1))); + break; + } + case Phase_LookupDeleted: { + sreg keysRemaining = turf::util::min(thread.keysToCheck, KeysInBlock); + thread.keysToCheck -= keysRemaining; + for (; keysRemaining > 0; keysRemaining--) { + if (thread.lookupIndex == thread.rangeLo) + thread.lookupIndex = thread.rangeHi; + thread.lookupIndex--; + u32 key = thread.lookupIndex * m_relativePrime; + key = key ^ (key >> 16); + if (key >= 2) { + if (m_map.get(key)) + TURF_DEBUG_BREAK(); } - break; } + if (thread.keysToCheck == 0) { + thread.phase = Phase_Insert; + } + break; + } } } m_env.threads[threadIndex].update(); diff --git a/samples/MapCorrectnessTests/TestEnvironment.h b/samples/MapCorrectnessTests/TestEnvironment.h index 6246272..2ff9997 100644 --- a/samples/MapCorrectnessTests/TestEnvironment.h +++ b/samples/MapCorrectnessTests/TestEnvironment.h @@ -39,5 +39,4 @@ struct TestEnvironment { } }; - #endif // SAMPLES_MAPCORRECTNESSTESTS_TESTENVIRONMENT_H diff --git a/samples/MapCorrectnessTests/TestInsertDifferentKeys.h b/samples/MapCorrectnessTests/TestInsertDifferentKeys.h index df21e29..e545992 100644 --- a/samples/MapCorrectnessTests/TestInsertDifferentKeys.h +++ b/samples/MapCorrectnessTests/TestInsertDifferentKeys.h @@ -21,7 +21,7 @@ class TestInsertDifferentKeys { public: static const ureg KeysToInsert = 2048; TestEnvironment& m_env; - MapAdapter::Map *m_map; + MapAdapter::Map* m_map; turf::extra::Random m_random; u32 m_startIndex; u32 m_relativePrime; @@ -35,7 +35,7 @@ class TestInsertDifferentKeys { while (keysRemaining > 0) { u32 key = index * m_relativePrime; key = key ^ (key >> 16); - if (key >= 2) { // Don't insert 0 or 1 + if (key >= 2) { // Don't insert 0 or 1 m_map->insert(key, (void*) uptr(key)); keysRemaining--; } @@ -50,7 +50,7 @@ class TestInsertDifferentKeys { while (keysRemaining > 0) { u32 key = index * m_relativePrime; key = key ^ (key >> 16); - if (key >= 2) { // Don't insert 0 or 1 + if (key >= 2) { // Don't insert 0 or 1 m_map->erase(key); keysRemaining--; } @@ -78,7 +78,7 @@ class TestInsertDifferentKeys { while (leftToCheck > 0) { u32 key = index * m_relativePrime; key = key ^ (key >> 16); - if (key >= 2) { // Don't insert 0 or 1 + if (key >= 2) { // Don't insert 0 or 1 if (m_map->get(key) != (void*) uptr(key)) TURF_DEBUG_BREAK(); actualChecksum += key; @@ -100,14 +100,14 @@ class TestInsertDifferentKeys { for (MapAdapter::Map::Iterator iter(*m_map); iter.isValid(); iter.next()) { TURF_DEBUG_BREAK(); } - + for (ureg i = 0; i < m_env.numThreads; i++) { u32 index = m_startIndex + i * (KeysToInsert + 2); sreg leftToCheck = KeysToInsert; while (leftToCheck > 0) { u32 key = index * m_relativePrime; key = key ^ (key >> 16); - if (key >= 2) { // Don't insert 0 or 1 + if (key >= 2) { // Don't insert 0 or 1 if (m_map->get(key)) TURF_DEBUG_BREAK(); leftToCheck--; diff --git a/samples/MapCorrectnessTests/TestInsertSameKeys.h b/samples/MapCorrectnessTests/TestInsertSameKeys.h index 3e696af..9e87a23 100644 --- a/samples/MapCorrectnessTests/TestInsertSameKeys.h +++ b/samples/MapCorrectnessTests/TestInsertSameKeys.h @@ -21,7 +21,7 @@ class TestInsertSameKeys { public: static const ureg KeysToInsert = 2048; TestEnvironment& m_env; - MapAdapter::Map *m_map; + MapAdapter::Map* m_map; turf::extra::Random m_random; u32 m_startIndex; u32 m_relativePrime; @@ -35,7 +35,7 @@ class TestInsertSameKeys { while (keysRemaining > 0) { u32 key = index * m_relativePrime; key = key ^ (key >> 16); - if (key >= 2) { // Don't insert 0 or 1 + if (key >= 2) { // Don't insert 0 or 1 m_map->insert(key, (void*) uptr(key)); keysRemaining--; } @@ -50,7 +50,7 @@ class TestInsertSameKeys { while (keysRemaining > 0) { u32 key = index * m_relativePrime; key = key ^ (key >> 16); - if (key >= 2) { // Don't insert 0 or 1 + if (key >= 2) { // Don't insert 0 or 1 m_map->erase(key); keysRemaining--; } @@ -77,7 +77,7 @@ class TestInsertSameKeys { while (leftToCheck > 0) { u32 key = index * m_relativePrime; key = key ^ (key >> 16); - if (key >= 2) { // Don't insert 0 or 1 + if (key >= 2) { // Don't insert 0 or 1 if (m_map->get(key) != (void*) uptr(key)) TURF_DEBUG_BREAK(); actualChecksum += key; @@ -98,13 +98,13 @@ class TestInsertSameKeys { for (MapAdapter::Map::Iterator iter(*m_map); iter.isValid(); iter.next()) { TURF_DEBUG_BREAK(); } - + u32 index = m_startIndex; sreg leftToCheck = KeysToInsert; while (leftToCheck > 0) { u32 key = index * m_relativePrime; key = key ^ (key >> 16); - if (key >= 2) { // Don't insert 0 or 1 + if (key >= 2) { // Don't insert 0 or 1 if (m_map->get(key)) TURF_DEBUG_BREAK(); leftToCheck--; diff --git a/samples/MapPerformanceTests/MapPerformanceTests.cpp b/samples/MapPerformanceTests/MapPerformanceTests.cpp index 61dd0ae..806706b 100644 --- a/samples/MapPerformanceTests/MapPerformanceTests.cpp +++ b/samples/MapPerformanceTests/MapPerformanceTests.cpp @@ -61,7 +61,8 @@ struct SharedState { turf::Atomic doneFlag; SharedState(MapAdapter& adapter, ureg numThreads, ureg numKeysPerThread, ureg readsPerWrite, ureg itersPerChunk) - : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), numThreads(numThreads), readsPerWrite(readsPerWrite), itersPerChunk(itersPerChunk) { + : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), numThreads(numThreads), readsPerWrite(readsPerWrite), + itersPerChunk(itersPerChunk) { delayFactor = 0.5f; doneFlag.storeNonatomic(0); } @@ -103,7 +104,8 @@ class ThreadState { Stats m_stats; - ThreadState(SharedState& shared, ureg threadIndex, u32 rangeLo, u32 rangeHi) : m_shared(shared), m_threadCtx(shared.adapter, threadIndex) { + ThreadState(SharedState& shared, ureg threadIndex, u32 rangeLo, u32 rangeHi) + : m_shared(shared), m_threadCtx(shared.adapter, threadIndex) { m_threadIndex = threadIndex; m_rangeLo = rangeLo; m_rangeHi = rangeHi; @@ -121,7 +123,7 @@ class ThreadState { void initialPopulate() { TURF_ASSERT(m_addIndex == m_removeIndex); - MapAdapter::Map *map = m_shared.map; + MapAdapter::Map* map = m_shared.map; for (ureg i = 0; i < m_shared.numKeysPerThread; i++) { u32 key = m_addIndex * Prime; map->insert(key, (void*) (key & ~uptr(3))); @@ -131,7 +133,7 @@ class ThreadState { } void run() { - MapAdapter::Map *map = m_shared.map; + MapAdapter::Map* map = m_shared.map; turf::CPUTimer::Converter converter; Delay delay(m_shared.delayFactor); Stats stats; @@ -221,10 +223,10 @@ class ThreadState { }; static const turf::extra::Option Options[] = { - { "readsPerWrite", 'r', true, "number of reads per write" }, - { "itersPerChunk", 'i', true, "number of iterations per chunk" }, - { "chunks", 'c', true, "number of chunks to execute" }, - { "keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep" }, + {"readsPerWrite", 'r', true, "number of reads per write"}, + {"itersPerChunk", 'i', true, "number of iterations per chunk"}, + {"chunks", 'c', true, "number of chunks to execute"}, + {"keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep"}, }; int main(int argc, const char** argv) { @@ -262,10 +264,8 @@ int main(int argc, const char** argv) { printf("'itersPerChunk': %d,\n", (int) itersPerChunk); printf("'chunks': %d,\n", (int) chunks); printf("'keepChunkFraction': %f,\n", keepChunkFraction); - printf("'labels': ('delayFactor', 'workUnitsDone', 'mapOpsDone', 'totalTime'),\n"), - printf("'points': [\n"); - for (float delayFactor = 1.f; delayFactor >= 0.0005f; delayFactor *= 0.95f) - { + printf("'labels': ('delayFactor', 'workUnitsDone', 'mapOpsDone', 'totalTime'),\n"), printf("'points': [\n"); + for (float delayFactor = 1.f; delayFactor >= 0.0005f; delayFactor *= 0.95f) { shared.delayFactor = delayFactor; std::vector kickTotals; @@ -285,11 +285,8 @@ int main(int argc, const char** argv) { totals += kickTotals[t]; } - printf(" (%f, %d, %d, %f),\n", - shared.delayFactor, - int(totals.workUnitsDone), - int(totals.mapOpsDone), - totals.duration); + printf(" (%f, %d, %d, %f),\n", shared.delayFactor, int(totals.workUnitsDone), int(totals.mapOpsDone), + totals.duration); } printf("],\n"); printf("}\n"); diff --git a/samples/MapScalabilityTests/MapScalabilityTests.cpp b/samples/MapScalabilityTests/MapScalabilityTests.cpp index 6cf2e05..db6d953 100644 --- a/samples/MapScalabilityTests/MapScalabilityTests.cpp +++ b/samples/MapScalabilityTests/MapScalabilityTests.cpp @@ -40,7 +40,8 @@ struct SharedState { turf::Atomic doneFlag; SharedState(MapAdapter& adapter, ureg numKeysPerThread, ureg readsPerWrite, ureg itersPerChunk) - : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), readsPerWrite(readsPerWrite), itersPerChunk(itersPerChunk) { + : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), readsPerWrite(readsPerWrite), + itersPerChunk(itersPerChunk) { doneFlag.storeNonatomic(0); numThreads = 0; } @@ -79,7 +80,8 @@ class ThreadState { Stats m_stats; - ThreadState(SharedState& shared, ureg threadIndex, u32 rangeLo, u32 rangeHi) : m_shared(shared), m_threadCtx(shared.adapter, threadIndex) { + ThreadState(SharedState& shared, ureg threadIndex, u32 rangeLo, u32 rangeHi) + : m_shared(shared), m_threadCtx(shared.adapter, threadIndex) { m_threadIndex = threadIndex; m_rangeLo = rangeLo; m_rangeHi = rangeHi; @@ -97,7 +99,7 @@ class ThreadState { void initialPopulate() { TURF_ASSERT(m_addIndex == m_removeIndex); - MapAdapter::Map *map = m_shared.map; + MapAdapter::Map* map = m_shared.map; for (ureg i = 0; i < m_shared.numKeysPerThread; i++) { u32 key = m_addIndex * Prime; if (key >= 2) @@ -108,7 +110,7 @@ class ThreadState { } void run() { - MapAdapter::Map *map = m_shared.map; + MapAdapter::Map* map = m_shared.map; turf::CPUTimer::Converter converter; Stats stats; ureg lookupIndex = m_rangeLo; @@ -193,16 +195,16 @@ class ThreadState { }; static const turf::extra::Option Options[] = { - { "readsPerWrite", 'r', true, "number of reads per write" }, - { "itersPerChunk", 'i', true, "number of iterations per chunk" }, - { "chunks", 'c', true, "number of chunks to execute" }, - { "keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep" }, + {"readsPerWrite", 'r', true, "number of reads per write"}, + {"itersPerChunk", 'i', true, "number of iterations per chunk"}, + {"chunks", 'c', true, "number of chunks to execute"}, + {"keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep"}, }; int main(int argc, const char** argv) { turf::extra::Options options(Options, TURF_STATIC_ARRAY_SIZE(Options)); options.parse(argc, argv); - ureg readsPerWrite = options.getInteger("readsPerWrite", DefaultReadsPerWrite); + ureg readsPerWrite = options.getInteger("readsPerWrite", DefaultReadsPerWrite); ureg itersPerChunk = options.getInteger("itersPerChunk", DefaultItersPerChunk); ureg chunks = options.getInteger("chunks", DefaultChunks); double keepChunkFraction = options.getDouble("keepChunkFraction", 1.0); @@ -238,8 +240,7 @@ int main(int argc, const char** argv) { printf("'itersPerChunk': %d,\n", (int) itersPerChunk); printf("'chunks': %d,\n", (int) chunks); printf("'keepChunkFraction': %f,\n", keepChunkFraction); - printf("'labels': ('numThreads', 'mapOpsDone', 'totalTime'),\n"), - printf("'points': [\n"); + printf("'labels': ('numThreads', 'mapOpsDone', 'totalTime'),\n"), printf("'points': [\n"); for (shared.numThreads = 1; shared.numThreads <= numCores; shared.numThreads++) { if (shared.numThreads > 1) { // Spawn and register a new thread @@ -263,10 +264,7 @@ int main(int argc, const char** argv) { totals += kickTotals[t]; } - printf(" (%d, %d, %f),\n", - int(shared.numThreads), - int(totals.mapOpsDone), - totals.duration); + printf(" (%d, %d, %f),\n", int(shared.numThreads), int(totals.mapOpsDone), totals.duration); } printf("],\n"); printf("}\n");