Skip to content

Commit

Permalink
Merge pull request #2599 from kuzudb/update-functional
Browse files Browse the repository at this point in the history
Use updatePage function for write operations
  • Loading branch information
benjaminwinger authored Feb 20, 2024
2 parents f9a7e38 + 2203b62 commit c79a8a6
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 184 deletions.
35 changes: 0 additions & 35 deletions src/include/storage/storage_structure/db_file_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,6 @@
namespace kuzu {
namespace storage {

struct WALPageIdxAndFrame {
WALPageIdxAndFrame(
common::page_idx_t originalPageIdx, common::page_idx_t pageIdxInWAL, uint8_t* frame)
: originalPageIdx{originalPageIdx}, pageIdxInWAL{pageIdxInWAL}, frame{frame} {}

WALPageIdxAndFrame(WALPageIdxAndFrame& other)
: originalPageIdx{other.originalPageIdx},
pageIdxInWAL{other.pageIdxInWAL}, frame{other.frame} {}

common::page_idx_t originalPageIdx;
common::page_idx_t pageIdxInWAL;
uint8_t* frame;
};

struct WALPageIdxPosInPageAndFrame : WALPageIdxAndFrame {
WALPageIdxPosInPageAndFrame(WALPageIdxAndFrame walPageIdxAndFrame, uint16_t posInPage)
: WALPageIdxAndFrame(walPageIdxAndFrame), posInPage{posInPage} {}

uint16_t posInPage;
};

class DBFileUtils {
public:
constexpr static common::page_idx_t NULL_PAGE_IDX = common::INVALID_PAGE_IDX;
Expand All @@ -42,10 +21,6 @@ class DBFileUtils {
BMFileHandle& fileHandle, common::page_idx_t physicalPageIdx, WAL& wal,
transaction::TransactionType trxType);

static WALPageIdxAndFrame createWALVersionIfNecessaryAndPinPage(
common::page_idx_t originalPageIdx, bool insertingNewPage, BMFileHandle& fileHandle,
DBFileID dbFileID, BufferManager& bufferManager, WAL& wal);

static void readWALVersionOfPage(BMFileHandle& fileHandle, common::page_idx_t originalPageIdx,
BufferManager& bufferManager, WAL& wal, const std::function<void(uint8_t*)>& readOp);

Expand All @@ -61,16 +36,6 @@ class DBFileUtils {
static void updatePage(BMFileHandle& fileHandle, DBFileID dbFileID,
common::page_idx_t originalPageIdx, bool isInsertingNewPage, BufferManager& bufferManager,
WAL& wal, const std::function<void(uint8_t*)>& updateOp);

// Unpins the WAL version of a page that was updated and releases the lock of the page (recall
// we use the same lock to do operations on both the original and WAL versions of the page).
static void unpinWALPageAndReleaseOriginalPageLock(WALPageIdxAndFrame& walPageIdxAndFrame,
BMFileHandle& fileHandle, BufferManager& bufferManager, WAL& wal);

private:
static void unpinPageIdxInWALAndReleaseOriginalPageLock(common::page_idx_t pageIdxInWAL,
common::page_idx_t originalPageIdx, BMFileHandle& fileHandle, BufferManager& bufferManager,
WAL& wal);
};
} // namespace storage
} // namespace kuzu
12 changes: 1 addition & 11 deletions src/include/storage/storage_structure/disk_overflow_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include "common/constants.h"
#include "common/types/types.h"
#include "storage/buffer_manager/buffer_manager.h"
#include "storage/storage_structure/db_file_utils.h"
#include "storage/storage_utils.h"
#include "storage/wal/wal.h"
#include "transaction/transaction.h"
Expand Down Expand Up @@ -57,20 +56,11 @@ class DiskOverflowFile {
}

private:
void addNewPageIfNecessaryWithoutLock(uint32_t numBytesToAppend);
bool addNewPageIfNecessaryWithoutLock(uint32_t numBytesToAppend);
void setStringOverflowWithoutLock(
const char* inMemSrcStr, uint64_t len, common::ku_string_t& diskDstString);
void logNewOverflowFileNextBytePosRecordIfNecessaryWithoutLock();

// If necessary creates a second version (backed by the WAL) of a page that contains the value
// that will be written to. The position of the value, which determines the original page to
// update, is computed from the given elementOffset and numElementsPerPage argument. Obtains
// *and does not release* the lock original page. Pins and updates the WAL version of the
// page. The original page lock will be released when the WALPageIdxPosInPageAndFrame goes out
// of scope
WALPageIdxPosInPageAndFrame createWALVersionOfPageIfNecessaryForElement(
PageCursor originalPageCursor);

private:
static const common::page_idx_t END_OF_PAGE =
common::BufferPoolConstants::PAGE_4KB_SIZE - sizeof(common::page_idx_t);
Expand Down
7 changes: 3 additions & 4 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ class Column {
virtual void writeValue(const ColumnChunkMetadata& chunkMeta,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom);
virtual void writeValues(const ColumnChunkMetadata& chunkMeta,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk, const uint8_t* data,
virtual void writeValues(ReadState& state, common::offset_t offsetInChunk, const uint8_t* data,
common::offset_t dataOffset = 0, common::offset_t numValues = 1);

// Produces a page cursor for the offset relative to the given node group
Expand All @@ -146,8 +145,8 @@ class Column {
// Produces a page cursor for the absolute node offset
PageCursor getPageCursorForOffset(transaction::TransactionType transactionType,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk);
WALPageIdxPosInPageAndFrame createWALVersionOfPageForValue(
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk);
void updatePageWithCursor(
PageCursor cursor, const std::function<void(uint8_t*, common::offset_t)>& writeOp);

virtual std::unique_ptr<ColumnChunk> getEmptyChunkForCommit(uint64_t capacity);

Expand Down
104 changes: 62 additions & 42 deletions src/storage/storage_structure/db_file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,61 @@ using namespace kuzu::common;
namespace kuzu {
namespace storage {

struct WALPageIdxAndFrame {
WALPageIdxAndFrame(
common::page_idx_t originalPageIdx, common::page_idx_t pageIdxInWAL, uint8_t* frame)
: originalPageIdx{originalPageIdx}, pageIdxInWAL{pageIdxInWAL}, frame{frame} {}

WALPageIdxAndFrame(WALPageIdxAndFrame& other)
: originalPageIdx{other.originalPageIdx},
pageIdxInWAL{other.pageIdxInWAL}, frame{other.frame} {}

common::page_idx_t originalPageIdx;
common::page_idx_t pageIdxInWAL;
uint8_t* frame;
};

WALPageIdxAndFrame createWALVersionIfNecessaryAndPinPage(page_idx_t originalPageIdx,
bool insertingNewPage, BMFileHandle& fileHandle, DBFileID dbFileID,
BufferManager& bufferManager, WAL& wal) {
fileHandle.addWALPageIdxGroupIfNecessary(originalPageIdx);
page_idx_t pageIdxInWAL;
uint8_t* walFrame;
fileHandle.acquireWALPageIdxLock(originalPageIdx);
if (fileHandle.hasWALPageVersionNoWALPageIdxLock(originalPageIdx)) {
pageIdxInWAL = fileHandle.getWALPageIdxNoWALPageIdxLock(originalPageIdx);
walFrame = bufferManager.pin(
*wal.fileHandle, pageIdxInWAL, BufferManager::PageReadPolicy::READ_PAGE);
} else {
pageIdxInWAL =
wal.logPageUpdateRecord(dbFileID, originalPageIdx /* pageIdxInOriginalFile */);
walFrame = bufferManager.pin(
*wal.fileHandle, pageIdxInWAL, BufferManager::PageReadPolicy::DONT_READ_PAGE);
if (!insertingNewPage) {
bufferManager.optimisticRead(fileHandle, originalPageIdx, [&](uint8_t* frame) -> void {
memcpy(walFrame, frame, BufferPoolConstants::PAGE_4KB_SIZE);
});
}
fileHandle.setWALPageIdxNoLock(originalPageIdx /* pageIdxInOriginalFile */, pageIdxInWAL);
wal.fileHandle->setLockedPageDirty(pageIdxInWAL);
}
return {originalPageIdx, pageIdxInWAL, walFrame};
}

void unpinPageIdxInWALAndReleaseOriginalPageLock(page_idx_t pageIdxInWAL,
page_idx_t originalPageIdx, BMFileHandle& fileHandle, BufferManager& bufferManager, WAL& wal) {
if (originalPageIdx != INVALID_PAGE_IDX) {
bufferManager.unpin(*wal.fileHandle, pageIdxInWAL);
fileHandle.releaseWALPageIdxLock(originalPageIdx);
}
}

void unpinWALPageAndReleaseOriginalPageLock(WALPageIdxAndFrame& walPageIdxAndFrame,
BMFileHandle& fileHandle, BufferManager& bufferManager, WAL& wal) {
unpinPageIdxInWALAndReleaseOriginalPageLock(walPageIdxAndFrame.pageIdxInWAL,
walPageIdxAndFrame.originalPageIdx, fileHandle, bufferManager, wal);
}

std::pair<BMFileHandle*, page_idx_t> DBFileUtils::getFileHandleAndPhysicalPageIdxToPin(
BMFileHandle& fileHandle, page_idx_t physicalPageIdx, WAL& wal,
transaction::TransactionType trxType) {
Expand Down Expand Up @@ -34,9 +89,14 @@ common::page_idx_t DBFileUtils::insertNewPage(BMFileHandle& fileHandle, DBFileID
void DBFileUtils::updatePage(BMFileHandle& fileHandle, DBFileID dbFileID,
page_idx_t originalPageIdx, bool isInsertingNewPage, BufferManager& bufferManager, WAL& wal,
const std::function<void(uint8_t*)>& updateOp) {
auto walPageIdxAndFrame = DBFileUtils::createWALVersionIfNecessaryAndPinPage(
auto walPageIdxAndFrame = createWALVersionIfNecessaryAndPinPage(
originalPageIdx, isInsertingNewPage, fileHandle, dbFileID, bufferManager, wal);
updateOp(walPageIdxAndFrame.frame);
try {
updateOp(walPageIdxAndFrame.frame);
} catch (Exception& e) {
unpinWALPageAndReleaseOriginalPageLock(walPageIdxAndFrame, fileHandle, bufferManager, wal);
throw;
}
unpinWALPageAndReleaseOriginalPageLock(walPageIdxAndFrame, fileHandle, bufferManager, wal);
}

Expand All @@ -50,45 +110,5 @@ void DBFileUtils::readWALVersionOfPage(BMFileHandle& fileHandle, page_idx_t orig
pageIdxInWAL, originalPageIdx, fileHandle, bufferManager, wal);
}

WALPageIdxAndFrame DBFileUtils::createWALVersionIfNecessaryAndPinPage(page_idx_t originalPageIdx,
bool insertingNewPage, BMFileHandle& fileHandle, DBFileID dbFileID,
BufferManager& bufferManager, WAL& wal) {
fileHandle.addWALPageIdxGroupIfNecessary(originalPageIdx);
page_idx_t pageIdxInWAL;
uint8_t* walFrame;
fileHandle.acquireWALPageIdxLock(originalPageIdx);
if (fileHandle.hasWALPageVersionNoWALPageIdxLock(originalPageIdx)) {
pageIdxInWAL = fileHandle.getWALPageIdxNoWALPageIdxLock(originalPageIdx);
walFrame = bufferManager.pin(
*wal.fileHandle, pageIdxInWAL, BufferManager::PageReadPolicy::READ_PAGE);
} else {
pageIdxInWAL =
wal.logPageUpdateRecord(dbFileID, originalPageIdx /* pageIdxInOriginalFile */);
walFrame = bufferManager.pin(
*wal.fileHandle, pageIdxInWAL, BufferManager::PageReadPolicy::DONT_READ_PAGE);
if (!insertingNewPage) {
bufferManager.optimisticRead(fileHandle, originalPageIdx, [&](uint8_t* frame) -> void {
memcpy(walFrame, frame, BufferPoolConstants::PAGE_4KB_SIZE);
});
}
fileHandle.setWALPageIdxNoLock(originalPageIdx /* pageIdxInOriginalFile */, pageIdxInWAL);
wal.fileHandle->setLockedPageDirty(pageIdxInWAL);
}
return {originalPageIdx, pageIdxInWAL, walFrame};
}

void DBFileUtils::unpinWALPageAndReleaseOriginalPageLock(WALPageIdxAndFrame& walPageIdxAndFrame,
BMFileHandle& fileHandle, BufferManager& bufferManager, WAL& wal) {
DBFileUtils::unpinPageIdxInWALAndReleaseOriginalPageLock(walPageIdxAndFrame.pageIdxInWAL,
walPageIdxAndFrame.originalPageIdx, fileHandle, bufferManager, wal);
}

void DBFileUtils::unpinPageIdxInWALAndReleaseOriginalPageLock(page_idx_t pageIdxInWAL,
page_idx_t originalPageIdx, BMFileHandle& fileHandle, BufferManager& bufferManager, WAL& wal) {
if (originalPageIdx != INVALID_PAGE_IDX) {
bufferManager.unpin(*wal.fileHandle, pageIdxInWAL);
fileHandle.releaseWALPageIdxLock(originalPageIdx);
}
}
} // namespace storage
} // namespace kuzu
48 changes: 15 additions & 33 deletions src/storage/storage_structure/disk_overflow_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "common/exception/message.h"
#include "common/type_utils.h"
#include "common/types/types.h"
#include "storage/storage_structure/db_file_utils.h"
#include "storage/storage_utils.h"

using lock_t = std::unique_lock<std::mutex>;
Expand Down Expand Up @@ -51,20 +52,20 @@ std::string DiskOverflowFile::readString(TransactionType trxType, const ku_strin
}
}

void DiskOverflowFile::addNewPageIfNecessaryWithoutLock(uint32_t numBytesToAppend) {
bool DiskOverflowFile::addNewPageIfNecessaryWithoutLock(uint32_t numBytesToAppend) {
if ((nextPosToWriteTo.elemPosInPage == 0) ||
((nextPosToWriteTo.elemPosInPage + numBytesToAppend - 1) > END_OF_PAGE)) {
page_idx_t newPageIdx =
DBFileUtils::insertNewPage(*fileHandle, dbFileID, *bufferManager, *wal);
// Write new page index to end of previous page
if (nextPosToWriteTo.pageIdx > 0) {
auto walPageIdxAndFrame = DBFileUtils::createWALVersionIfNecessaryAndPinPage(
nextPosToWriteTo.pageIdx - 1, false, *fileHandle, dbFileID, *bufferManager, *wal);
memcpy(walPageIdxAndFrame.frame + END_OF_PAGE, &newPageIdx, sizeof(page_idx_t));
DBFileUtils::unpinWALPageAndReleaseOriginalPageLock(
walPageIdxAndFrame, *fileHandle, *bufferManager, *wal);
DBFileUtils::updatePage(*fileHandle, dbFileID, nextPosToWriteTo.pageIdx - 1,
false /* existing page */, *bufferManager, *wal,
[&](auto frame) { memcpy(frame + END_OF_PAGE, &newPageIdx, sizeof(page_idx_t)); });
}
return true;
}
return false;
}

void DiskOverflowFile::setStringOverflowWithoutLock(
Expand All @@ -80,24 +81,18 @@ void DiskOverflowFile::setStringOverflowWithoutLock(
}
}
int32_t remainingLength = len;
TypeUtils::encodeOverflowPtr(
diskDstString.overflowPtr, nextPosToWriteTo.pageIdx, nextPosToWriteTo.elemPosInPage);
while (remainingLength > 0) {
auto bytesWritten = len - remainingLength;
auto numBytesToWriteInPage = std::min(
static_cast<uint32_t>(remainingLength), END_OF_PAGE - nextPosToWriteTo.elemPosInPage);
addNewPageIfNecessaryWithoutLock(remainingLength);
auto updatedPageInfoAndWALPageFrame =
createWALVersionOfPageIfNecessaryForElement(nextPosToWriteTo);
memcpy(updatedPageInfoAndWALPageFrame.frame + updatedPageInfoAndWALPageFrame.posInPage,
srcRawString + bytesWritten, numBytesToWriteInPage);
DBFileUtils::unpinWALPageAndReleaseOriginalPageLock(
updatedPageInfoAndWALPageFrame, *fileHandle, *bufferManager, *wal);
// The overflow pointer should point to the first position, so it must only run
// the first iteration of the loop
if (static_cast<uint64_t>(remainingLength) == len) {
TypeUtils::encodeOverflowPtr(diskDstString.overflowPtr,
updatedPageInfoAndWALPageFrame.originalPageIdx,
updatedPageInfoAndWALPageFrame.posInPage);
}
bool insertingNewPage = addNewPageIfNecessaryWithoutLock(remainingLength);
DBFileUtils::updatePage(*fileHandle, dbFileID, nextPosToWriteTo.pageIdx, insertingNewPage,
*bufferManager, *wal, [&](auto frame) {
memcpy(frame + nextPosToWriteTo.elemPosInPage, srcRawString + bytesWritten,
numBytesToWriteInPage);
});
remainingLength -= numBytesToWriteInPage;
nextPosToWriteTo.elemPosInPage += numBytesToWriteInPage;
if (nextPosToWriteTo.elemPosInPage >= END_OF_PAGE) {
Expand Down Expand Up @@ -127,18 +122,5 @@ void DiskOverflowFile::logNewOverflowFileNextBytePosRecordIfNecessaryWithoutLock
}
}

WALPageIdxPosInPageAndFrame DiskOverflowFile::createWALVersionOfPageIfNecessaryForElement(
PageCursor originalPageCursor) {
bool insertingNewPage = false;
if (originalPageCursor.pageIdx >= fileHandle->getNumPages()) {
KU_ASSERT(originalPageCursor.pageIdx == fileHandle->getNumPages());
DBFileUtils::insertNewPage(*fileHandle, dbFileID, *bufferManager, *wal);
insertingNewPage = true;
}
auto walPageIdxAndFrame = DBFileUtils::createWALVersionIfNecessaryAndPinPage(
originalPageCursor.pageIdx, insertingNewPage, *fileHandle, dbFileID, *bufferManager, *wal);
return {walPageIdxAndFrame, originalPageCursor.elemPosInPage};
}

} // namespace storage
} // namespace kuzu
Loading

0 comments on commit c79a8a6

Please sign in to comment.