Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup hash index initialization #3577

Merged
merged 2 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 1 addition & 43 deletions src/include/storage/index/hash_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <cstdint>
#include <string_view>
#include <type_traits>

#include "common/cast.h"
#include "common/type_utils.h"
Expand Down Expand Up @@ -96,8 +97,6 @@ class HashIndex final : public OnDiskHashIndex {
private:
bool lookupInPersistentIndex(transaction::TransactionType trxType, Key key,
common::offset_t& result);
// The following two functions are only used in prepareCommit, and are not thread-safe.
void insertIntoPersistentIndex(Key key, common::offset_t value);
void deleteFromPersistentIndex(Key key);

entry_pos_t findMatchedEntryInSlot(transaction::TransactionType trxType, const Slot<T>& slot,
Expand Down Expand Up @@ -143,21 +142,7 @@ class HashIndex final : public OnDiskHashIndex {
const T& keyInEntry) const {
return keyToLookup == keyInEntry;
}
template<typename K, bool isCopyEntry>
void copyAndUpdateSlotHeader(Slot<T>& slot, entry_pos_t entryPos, K key, common::offset_t value,
uint8_t fingerprint) {
if constexpr (isCopyEntry) {
slot.entries[entryPos].copyFrom((uint8_t*)&key);
} else {
insert(key, slot.entries[entryPos], value);
}
slot.header.setEntryValid(entryPos, fingerprint);
}

inline void insert(Key key, SlotEntry<T>& entry, common::offset_t offset) {
entry.key = key;
entry.value = offset;
}
inline common::hash_t hashStored(transaction::TransactionType /*trxType*/, const T& key) const {
return HashIndexUtils::hash(key);
}
Expand Down Expand Up @@ -186,30 +171,6 @@ class HashIndex final : public OnDiskHashIndex {

std::vector<std::pair<SlotInfo, Slot<T>>> getChainedSlots(slot_id_t pSlotId);

template<typename K, bool isCopyEntry>
void copyKVOrEntryToSlot(SlotIterator& iter, K key, common::offset_t value,
uint8_t fingerprint) {
if (iter.slot.header.numEntries() == getSlotCapacity<T>()) {
// Allocate a new oSlot, insert the entry to the new oSlot, and update slot's
// nextOvfSlotId.
Slot<T> newSlot;
auto entryPos = 0u; // Always insert to the first entry when there is a new slot.
copyAndUpdateSlotHeader<K, isCopyEntry>(newSlot, entryPos, key, value, fingerprint);
iter.slot.header.nextOvfSlotId = appendOverflowSlot(std::move(newSlot));
} else {
for (auto entryPos = 0u; entryPos < getSlotCapacity<T>(); entryPos++) {
if (!iter.slot.header.isEntryValid(entryPos)) {
copyAndUpdateSlotHeader<K, isCopyEntry>(iter.slot, entryPos, key, value,
fingerprint);
break;
}
}
}
updateSlot(iter.slotInfo, iter.slot);
}

void copyEntryToSlot(slot_id_t slotId, const T& entry, uint8_t fingerprint);

private:
DBFileIDAndName dbFileIDAndName;
BufferManager& bm;
Expand Down Expand Up @@ -315,9 +276,6 @@ class PrimaryKeyIndex {

common::PhysicalTypeID keyTypeID() { return keyDataTypeID; }

static void createEmptyHashIndexFiles(common::PhysicalTypeID typeID, const std::string& fName,
common::VirtualFileSystem* vfs, main::ClientContext* context);

void writeHeaders();

private:
Expand Down
23 changes: 15 additions & 8 deletions src/include/storage/index/hash_index_slot.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

#include <array>
#include <cstdint>
#include <cstring>
#include <type_traits>

#include "common/assert.h"
#include "common/constants.h"
#include "common/types/internal_id_t.h"
#include <bit>
Expand Down Expand Up @@ -48,19 +51,23 @@ class SlotHeader {
uint32_t validityMask;
slot_id_t nextOvfSlotId;
};
static_assert(std::has_unique_object_representations_v<SlotHeader>);

template<typename T>
struct SlotEntry {
SlotEntry() : key{}, value{} {}
SlotEntry(T _key, common::offset_t _value) : key{_key}, value{_value} {
// Zero padding, if any
if constexpr (sizeof(T) + sizeof(common::offset_t) < sizeof(SlotEntry<T>)) {
auto padding = sizeof(SlotEntry<T>) - sizeof(T) - sizeof(common::offset_t);
memset(reinterpret_cast<uint8_t*>(&key) + sizeof(T), 0, padding);
// Assumes that all the padding follows the key
KU_ASSERT((std::byte*)&key + sizeof(key) + padding == (std::byte*)&value);
}
}
SlotEntry() : SlotEntry(T{}, 0) {}

T key;
common::offset_t value;

inline uint8_t* data() const { return (uint8_t*)&key; }

// otherEntry must be a pointer to the beginning of another slot's key field
inline void copyFrom(const uint8_t* otherEntry) {
memcpy(data(), otherEntry, sizeof(SlotEntry<T>));
}
};

template<typename T>
Expand Down
4 changes: 1 addition & 3 deletions src/include/storage/index/in_mem_hash_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,7 @@ class InMemHashIndex final {

inline void insert(Key key, Slot<T>* slot, uint8_t entryPos, common::offset_t value,
uint8_t fingerprint) {
auto& entry = slot->entries[entryPos];
entry.key = key;
entry.value = value;
slot->entries[entryPos] = SlotEntry<T>(key, value);
slot->header.setEntryValid(entryPos, fingerprint);
KU_ASSERT(HashIndexUtils::getFingerprintForHash(HashIndexUtils::hash(key)) == fingerprint);
}
Expand Down
3 changes: 0 additions & 3 deletions src/include/storage/storage_structure/disk_array_collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ class DiskArrayCollection {
DiskArrayCollection(BMFileHandle& fileHandle, DBFileID dbFileID, BufferManager* bufferManager,
WAL* wal, common::page_idx_t firstHeaderPage = 0, bool bypassWAL = false);

static void writeEmptyHeadersToFile(FileHandle& handle, common::page_idx_t firstHeaderPage,
size_t numHeaders);

void prepareCommit();

void checkpointInMemory() {
Expand Down
4 changes: 0 additions & 4 deletions src/include/storage/storage_structure/overflow_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ class OverflowFile {
OverflowFile(const DBFileIDAndName& dbFileIdAndName, BufferManager* bufferManager, WAL* wal,
bool readOnly, common::VirtualFileSystem* vfs, main::ClientContext* context);

// For creating an overflow file from scratch
static void createEmptyFiles(const std::string& fName, common::VirtualFileSystem* vfs,
main::ClientContext* context);

// Handles contain a reference to the overflow file
OverflowFile(OverflowFile&& other) = delete;

Expand Down
31 changes: 0 additions & 31 deletions src/include/storage/wal_replayer_utils.h

This file was deleted.

3 changes: 1 addition & 2 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ add_library(kuzu_storage
storage_utils.cpp
storage_version_info.cpp
undo_buffer.cpp
wal_replayer.cpp
wal_replayer_utils.cpp)
wal_replayer.cpp)

set(ALL_OBJECT_FILES
${ALL_OBJECT_FILES} $<TARGET_OBJECTS:kuzu_storage>
Expand Down
96 changes: 22 additions & 74 deletions src/storage/index/hash_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,22 +207,6 @@ bool HashIndex<T>::lookupInPersistentIndex(TransactionType trxType, Key key, off
return false;
}

template<typename T>
void HashIndex<T>::insertIntoPersistentIndex(Key key, offset_t value) {
auto& header = this->indexHeaderForWriteTrx;
reserve(1);
auto hashValue = HashIndexUtils::hash(key);
auto fingerprint = HashIndexUtils::getFingerprintForHash(hashValue);
auto iter = getSlotIterator(HashIndexUtils::getPrimarySlotIdForHash(header, hashValue),
TransactionType::WRITE);
// Find a slot with free entries
while (iter.slot.header.numEntries() == getSlotCapacity<T>() &&
nextChainedSlot(TransactionType::WRITE, iter))
;
copyKVOrEntryToSlot<Key, false /* insert kv */>(iter, key, value, fingerprint);
header.numEntries++;
}

template<typename T>
void HashIndex<T>::deleteFromPersistentIndex(Key key) {
auto trxType = TransactionType::WRITE;
Expand Down Expand Up @@ -327,13 +311,6 @@ inline bool HashIndex<ku_string_t>::equals(transaction::TransactionType trxType,
return false;
}

template<>
inline void HashIndex<ku_string_t>::insert(std::string_view key, SlotEntry<ku_string_t>& entry,
common::offset_t offset) {
entry.key = overflowFileHandle->writeString(key);
entry.value = offset;
}

template<typename T>
void HashIndex<T>::splitSlots(HashIndexHeader& header, slot_id_t numSlotsToSplit) {
auto originalSlotIterator = pSlots->iter_mut();
Expand All @@ -360,12 +337,12 @@ void HashIndex<T>::splitSlots(HashIndexHeader& header, slot_id_t numSlotsToSplit
}
// Copy entry from old slot to new slot
const auto& key = originalSlot->entries[originalEntryPos].key;
hash_t hash = this->hashStored(TransactionType::WRITE, key);
auto newSlotId = hash & header.higherLevelHashMask;
const hash_t hash = this->hashStored(TransactionType::WRITE, key);
const auto newSlotId = hash & header.higherLevelHashMask;
if (newSlotId != header.nextSplitSlotId) {
KU_ASSERT(newSlotId == newSlotIterator.idx());
copyAndUpdateSlotHeader<const T&, true>(*newSlot, newEntryPos,
originalSlot->entries[originalEntryPos].key, UINT32_MAX,
newSlot->entries[newEntryPos] = originalSlot->entries[originalEntryPos];
newSlot->header.setEntryValid(newEntryPos,
originalSlot->header.fingerprints[originalEntryPos]);
originalSlot->header.setEntryInvalid(originalEntryPos);
newEntryPos++;
Expand Down Expand Up @@ -394,17 +371,6 @@ std::vector<std::pair<SlotInfo, Slot<T>>> HashIndex<T>::getChainedSlots(slot_id_
return slots;
}

template<typename T>
void HashIndex<T>::copyEntryToSlot(slot_id_t slotId, const T& entry, uint8_t fingerprint) {
auto iter = getSlotIterator(slotId, TransactionType::WRITE);
do {
if (iter.slot.header.numEntries() < getSlotCapacity<T>()) {
// Found a slot with empty space.
break;
}
} while (nextChainedSlot(TransactionType::WRITE, iter));
copyKVOrEntryToSlot<const T&, true /* copy entry */>(iter, entry, UINT32_MAX, fingerprint);
}
template<typename T>
void HashIndex<T>::reserve(uint64_t newEntries) {
slot_id_t numRequiredEntries =
Expand Down Expand Up @@ -586,12 +552,12 @@ size_t HashIndex<T>::mergeSlot(const std::vector<HashIndexEntryView>& slotToMerg
}
}
KU_ASSERT(diskEntryPos < getSlotCapacity<T>());
copyAndUpdateSlotHeader<const T&, true>(*diskSlot, diskEntryPos, it->entry->key, UINT32_MAX,
it->fingerprint);
diskSlot->entries[diskEntryPos] = *it->entry;
diskSlot->header.setEntryValid(diskEntryPos, it->fingerprint);
KU_ASSERT([&]() {
auto key = it->entry->key;
auto hash = hashStored(TransactionType::WRITE, key);
auto primarySlot =
const auto& key = it->entry->key;
const auto hash = hashStored(TransactionType::WRITE, key);
const auto primarySlot =
HashIndexUtils::getPrimarySlotIdForHash(indexHeaderForWriteTrx, hash);
KU_ASSERT(it->fingerprint == HashIndexUtils::getFingerprintForHash(hash));
KU_ASSERT(primarySlot == diskSlotId);
Expand Down Expand Up @@ -631,15 +597,12 @@ PrimaryKeyIndex::PrimaryKeyIndex(const DBFileIDAndName& dbFileIDAndName, bool re
: hasRunPrepareCommit{false}, keyDataTypeID(keyDataType),
fileHandle{bufferManager.getBMFileHandle(dbFileIDAndName.fName,
readOnly ? FileHandle::O_PERSISTENT_FILE_READ_ONLY :
FileHandle::O_PERSISTENT_FILE_NO_CREATE,
FileHandle::O_PERSISTENT_FILE_CREATE_NOT_EXISTS,
BMFileHandle::FileVersionedType::VERSIONED_FILE, vfs, context)},
bufferManager{bufferManager}, dbFileIDAndName{dbFileIDAndName}, wal{*wal},
hashIndexDiskArrays{std::make_unique<DiskArrayCollection>(*fileHandle,
dbFileIDAndName.dbFileID, &bufferManager, wal,
INDEX_HEADER_PAGES /*firstHeaderPage follows the index header pages*/,
true /*bypassWAL*/)} {
bufferManager{bufferManager}, dbFileIDAndName{dbFileIDAndName}, wal{*wal} {
bool newIndex = fileHandle->getNumPages() == 0;

if (fileHandle->getNumPages() == 0) {
if (newIndex) {
fileHandle->addNewPages(INDEX_HEADER_PAGES);
hashIndexHeadersForReadTrx.resize(NUM_HASH_INDEXES);
hashIndexHeadersForWriteTrx.resize(NUM_HASH_INDEXES);
Expand All @@ -659,11 +622,20 @@ PrimaryKeyIndex::PrimaryKeyIndex(const DBFileIDAndName& dbFileIDAndName, bool re
hashIndexHeadersForReadTrx.end());
KU_ASSERT(headerIdx == NUM_HASH_INDEXES);
}
hashIndexDiskArrays = std::make_unique<DiskArrayCollection>(*fileHandle,
dbFileIDAndName.dbFileID, &bufferManager, wal,
INDEX_HEADER_PAGES /*firstHeaderPage follows the index header pages*/, true /*bypassWAL*/);

if (keyDataTypeID == PhysicalTypeID::STRING) {
overflowFile = std::make_unique<OverflowFile>(dbFileIDAndName, &bufferManager, wal,
readOnly, vfs, context);
}
if (newIndex) {
// Each index has a primary slot array and an overflow slot array
for (size_t i = 0; i < NUM_HASH_INDEXES * 2; i++) {
benjaminwinger marked this conversation as resolved.
Show resolved Hide resolved
hashIndexDiskArrays->addDiskArray();
}
}

hashIndices.reserve(NUM_HASH_INDEXES);
TypeUtils::visit(
Expand Down Expand Up @@ -815,29 +787,5 @@ void PrimaryKeyIndex::prepareRollback() {

PrimaryKeyIndex::~PrimaryKeyIndex() = default;

void PrimaryKeyIndex::createEmptyHashIndexFiles(PhysicalTypeID typeID, const std::string& fName,
VirtualFileSystem* vfs, main::ClientContext* context) {
FileHandle fileHandle(fName, FileHandle::O_PERSISTENT_FILE_CREATE_NOT_EXISTS, vfs, context);
fileHandle.addNewPages(INDEX_HEADER_PAGES);
// Write HashIndexHeaders
std::array<uint8_t, BufferPoolConstants::PAGE_4KB_SIZE> buffer{};
HashIndexHeaderOnDisk indexHeader{};
auto* data = reinterpret_cast<HashIndexHeaderOnDisk*>(buffer.data());
for (size_t i = 0; i < INDEX_HEADERS_PER_PAGE; i++) {
memcpy(data + i, &indexHeader, sizeof(indexHeader));
}
for (size_t i = 0; i < INDEX_HEADER_PAGES; i++) {
fileHandle.writePage(buffer.data(), i);
}

// Write Disk Array Headers (one for each pSlot and oSlot disk array)
DiskArrayCollection::writeEmptyHeadersToFile(fileHandle, INDEX_HEADER_PAGES,
NUM_HASH_INDEXES * 2);

if (typeID == PhysicalTypeID::STRING) {
OverflowFile::createEmptyFiles(StorageUtils::getOverflowFileName(fName), vfs, context);
}
}

} // namespace storage
} // namespace kuzu
3 changes: 1 addition & 2 deletions src/storage/index/in_mem_hash_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,7 @@ template<>
void InMemHashIndex<ku_string_t>::insert(std::string_view key, Slot<ku_string_t>* slot,
uint8_t entryPos, offset_t offset, uint8_t fingerprint) {
auto& entry = slot->entries[entryPos];
entry.key = overflowFileHandle->writeString(key);
entry.value = offset;
entry = SlotEntry<ku_string_t>(overflowFileHandle->writeString(key), offset);
slot->header.setEntryValid(entryPos, fingerprint);
}

Expand Down
6 changes: 2 additions & 4 deletions src/storage/storage_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include "storage/store/node_table.h"
#include "storage/wal/wal_record.h"
#include "storage/wal_replayer.h"
#include "storage/wal_replayer_utils.h"

using namespace kuzu::catalog;
using namespace kuzu::common;
Expand Down Expand Up @@ -116,8 +115,6 @@ void StorageManager::createNodeTable(table_id_t tableID, NodeTableCatalogEntry*
main::ClientContext* context) {
KU_ASSERT(context != nullptr);
nodesStatisticsAndDeletedIDs->addNodeStatisticsAndDeletedIDs(nodeTableEntry);
WALReplayerUtils::createEmptyHashIndexFiles(nodeTableEntry, databasePath,
context->getVFSUnsafe(), context);
tables[tableID] = std::make_unique<NodeTable>(this, nodeTableEntry, &memoryManager,
context->getVFSUnsafe(), context);
}
Expand Down Expand Up @@ -207,7 +204,8 @@ void StorageManager::dropTable(table_id_t tableID, VirtualFileSystem* vfs) {
switch (tableType) {
case TableType::NODE: {
nodesStatisticsAndDeletedIDs->removeTableStatistic(tableID);
WALReplayerUtils::removeHashIndexFile(vfs, tableID, databasePath);
vfs->removeFileIfExists(
StorageUtils::getNodeIndexFName(vfs, databasePath, tableID, FileVersionType::ORIGINAL));
} break;
case TableType::REL: {
relsStatistics->removeTableStatistic(tableID);
Expand Down
Loading
Loading