Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use updatePage function for write operations #2599

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 0 additions & 35 deletions src/include/storage/storage_structure/db_file_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,6 @@
namespace kuzu {
namespace storage {

struct WALPageIdxAndFrame {
WALPageIdxAndFrame(
common::page_idx_t originalPageIdx, common::page_idx_t pageIdxInWAL, uint8_t* frame)
: originalPageIdx{originalPageIdx}, pageIdxInWAL{pageIdxInWAL}, frame{frame} {}

WALPageIdxAndFrame(WALPageIdxAndFrame& other)
: originalPageIdx{other.originalPageIdx},
pageIdxInWAL{other.pageIdxInWAL}, frame{other.frame} {}

common::page_idx_t originalPageIdx;
common::page_idx_t pageIdxInWAL;
uint8_t* frame;
};

struct WALPageIdxPosInPageAndFrame : WALPageIdxAndFrame {
WALPageIdxPosInPageAndFrame(WALPageIdxAndFrame walPageIdxAndFrame, uint16_t posInPage)
: WALPageIdxAndFrame(walPageIdxAndFrame), posInPage{posInPage} {}

uint16_t posInPage;
};

class DBFileUtils {
public:
constexpr static common::page_idx_t NULL_PAGE_IDX = common::INVALID_PAGE_IDX;
Expand All @@ -42,10 +21,6 @@ class DBFileUtils {
BMFileHandle& fileHandle, common::page_idx_t physicalPageIdx, WAL& wal,
transaction::TransactionType trxType);

static WALPageIdxAndFrame createWALVersionIfNecessaryAndPinPage(
common::page_idx_t originalPageIdx, bool insertingNewPage, BMFileHandle& fileHandle,
DBFileID dbFileID, BufferManager& bufferManager, WAL& wal);

static void readWALVersionOfPage(BMFileHandle& fileHandle, common::page_idx_t originalPageIdx,
BufferManager& bufferManager, WAL& wal, const std::function<void(uint8_t*)>& readOp);

Expand All @@ -61,16 +36,6 @@ class DBFileUtils {
static void updatePage(BMFileHandle& fileHandle, DBFileID dbFileID,
common::page_idx_t originalPageIdx, bool isInsertingNewPage, BufferManager& bufferManager,
WAL& wal, const std::function<void(uint8_t*)>& updateOp);

// Unpins the WAL version of a page that was updated and releases the lock of the page (recall
// we use the same lock to do operations on both the original and WAL versions of the page).
static void unpinWALPageAndReleaseOriginalPageLock(WALPageIdxAndFrame& walPageIdxAndFrame,
BMFileHandle& fileHandle, BufferManager& bufferManager, WAL& wal);

private:
static void unpinPageIdxInWALAndReleaseOriginalPageLock(common::page_idx_t pageIdxInWAL,
common::page_idx_t originalPageIdx, BMFileHandle& fileHandle, BufferManager& bufferManager,
WAL& wal);
};
} // namespace storage
} // namespace kuzu
12 changes: 1 addition & 11 deletions src/include/storage/storage_structure/disk_overflow_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include "common/constants.h"
#include "common/types/types.h"
#include "storage/buffer_manager/buffer_manager.h"
#include "storage/storage_structure/db_file_utils.h"
#include "storage/storage_utils.h"
#include "storage/wal/wal.h"
#include "transaction/transaction.h"
Expand Down Expand Up @@ -57,20 +56,11 @@ class DiskOverflowFile {
}

private:
void addNewPageIfNecessaryWithoutLock(uint32_t numBytesToAppend);
bool addNewPageIfNecessaryWithoutLock(uint32_t numBytesToAppend);
void setStringOverflowWithoutLock(
const char* inMemSrcStr, uint64_t len, common::ku_string_t& diskDstString);
void logNewOverflowFileNextBytePosRecordIfNecessaryWithoutLock();

// If necessary creates a second version (backed by the WAL) of a page that contains the value
// that will be written to. The position of the value, which determines the original page to
// update, is computed from the given elementOffset and numElementsPerPage argument. Obtains
// *and does not release* the lock original page. Pins and updates the WAL version of the
// page. The original page lock will be released when the WALPageIdxPosInPageAndFrame goes out
// of scope
WALPageIdxPosInPageAndFrame createWALVersionOfPageIfNecessaryForElement(
PageCursor originalPageCursor);

private:
static const common::page_idx_t END_OF_PAGE =
common::BufferPoolConstants::PAGE_4KB_SIZE - sizeof(common::page_idx_t);
Expand Down
7 changes: 3 additions & 4 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ class Column {
virtual void writeValue(const ColumnChunkMetadata& chunkMeta,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom);
virtual void writeValues(const ColumnChunkMetadata& chunkMeta,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk, const uint8_t* data,
virtual void writeValues(ReadState& state, common::offset_t offsetInChunk, const uint8_t* data,
common::offset_t dataOffset = 0, common::offset_t numValues = 1);

// Produces a page cursor for the offset relative to the given node group
Expand All @@ -147,8 +146,8 @@ class Column {
// Produces a page cursor for the absolute node offset
PageCursor getPageCursorForOffset(transaction::TransactionType transactionType,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk);
WALPageIdxPosInPageAndFrame createWALVersionOfPageForValue(
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk);
void updatePageWithCursor(
PageCursor cursor, const std::function<void(uint8_t*, common::offset_t)>& writeOp);

virtual std::unique_ptr<ColumnChunk> getEmptyChunkForCommit(uint64_t capacity);

Expand Down
104 changes: 62 additions & 42 deletions src/storage/storage_structure/db_file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,61 @@
namespace kuzu {
namespace storage {

struct WALPageIdxAndFrame {
WALPageIdxAndFrame(
common::page_idx_t originalPageIdx, common::page_idx_t pageIdxInWAL, uint8_t* frame)
: originalPageIdx{originalPageIdx}, pageIdxInWAL{pageIdxInWAL}, frame{frame} {}

WALPageIdxAndFrame(WALPageIdxAndFrame& other)
: originalPageIdx{other.originalPageIdx},
pageIdxInWAL{other.pageIdxInWAL}, frame{other.frame} {}

common::page_idx_t originalPageIdx;
common::page_idx_t pageIdxInWAL;
uint8_t* frame;
};

WALPageIdxAndFrame createWALVersionIfNecessaryAndPinPage(page_idx_t originalPageIdx,
bool insertingNewPage, BMFileHandle& fileHandle, DBFileID dbFileID,
BufferManager& bufferManager, WAL& wal) {
fileHandle.addWALPageIdxGroupIfNecessary(originalPageIdx);
page_idx_t pageIdxInWAL;
uint8_t* walFrame;
fileHandle.acquireWALPageIdxLock(originalPageIdx);
if (fileHandle.hasWALPageVersionNoWALPageIdxLock(originalPageIdx)) {
pageIdxInWAL = fileHandle.getWALPageIdxNoWALPageIdxLock(originalPageIdx);
walFrame = bufferManager.pin(
*wal.fileHandle, pageIdxInWAL, BufferManager::PageReadPolicy::READ_PAGE);
} else {
pageIdxInWAL =
wal.logPageUpdateRecord(dbFileID, originalPageIdx /* pageIdxInOriginalFile */);
walFrame = bufferManager.pin(
*wal.fileHandle, pageIdxInWAL, BufferManager::PageReadPolicy::DONT_READ_PAGE);
if (!insertingNewPage) {
bufferManager.optimisticRead(fileHandle, originalPageIdx, [&](uint8_t* frame) -> void {
memcpy(walFrame, frame, BufferPoolConstants::PAGE_4KB_SIZE);
});
}
fileHandle.setWALPageIdxNoLock(originalPageIdx /* pageIdxInOriginalFile */, pageIdxInWAL);
wal.fileHandle->setLockedPageDirty(pageIdxInWAL);
}
return {originalPageIdx, pageIdxInWAL, walFrame};
}

void unpinPageIdxInWALAndReleaseOriginalPageLock(page_idx_t pageIdxInWAL,

Check warning on line 49 in src/storage/storage_structure/db_file_utils.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/storage_structure/db_file_utils.cpp#L49

Added line #L49 was not covered by tests
page_idx_t originalPageIdx, BMFileHandle& fileHandle, BufferManager& bufferManager, WAL& wal) {
if (originalPageIdx != INVALID_PAGE_IDX) {

Check warning on line 51 in src/storage/storage_structure/db_file_utils.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/storage_structure/db_file_utils.cpp#L51

Added line #L51 was not covered by tests
bufferManager.unpin(*wal.fileHandle, pageIdxInWAL);
fileHandle.releaseWALPageIdxLock(originalPageIdx);
}
}

Check warning on line 55 in src/storage/storage_structure/db_file_utils.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/storage_structure/db_file_utils.cpp#L55

Added line #L55 was not covered by tests

void unpinWALPageAndReleaseOriginalPageLock(WALPageIdxAndFrame& walPageIdxAndFrame,
BMFileHandle& fileHandle, BufferManager& bufferManager, WAL& wal) {
unpinPageIdxInWALAndReleaseOriginalPageLock(walPageIdxAndFrame.pageIdxInWAL,
walPageIdxAndFrame.originalPageIdx, fileHandle, bufferManager, wal);
}

std::pair<BMFileHandle*, page_idx_t> DBFileUtils::getFileHandleAndPhysicalPageIdxToPin(
BMFileHandle& fileHandle, page_idx_t physicalPageIdx, WAL& wal,
transaction::TransactionType trxType) {
Expand Down Expand Up @@ -34,9 +89,14 @@
void DBFileUtils::updatePage(BMFileHandle& fileHandle, DBFileID dbFileID,
page_idx_t originalPageIdx, bool isInsertingNewPage, BufferManager& bufferManager, WAL& wal,
const std::function<void(uint8_t*)>& updateOp) {
auto walPageIdxAndFrame = DBFileUtils::createWALVersionIfNecessaryAndPinPage(
auto walPageIdxAndFrame = createWALVersionIfNecessaryAndPinPage(
originalPageIdx, isInsertingNewPage, fileHandle, dbFileID, bufferManager, wal);
updateOp(walPageIdxAndFrame.frame);
try {
updateOp(walPageIdxAndFrame.frame);
} catch (Exception& e) {
unpinWALPageAndReleaseOriginalPageLock(walPageIdxAndFrame, fileHandle, bufferManager, wal);
throw;
}

Check warning on line 99 in src/storage/storage_structure/db_file_utils.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/storage_structure/db_file_utils.cpp#L96-L99

Added lines #L96 - L99 were not covered by tests
unpinWALPageAndReleaseOriginalPageLock(walPageIdxAndFrame, fileHandle, bufferManager, wal);
}

Expand All @@ -50,45 +110,5 @@
pageIdxInWAL, originalPageIdx, fileHandle, bufferManager, wal);
}

WALPageIdxAndFrame DBFileUtils::createWALVersionIfNecessaryAndPinPage(page_idx_t originalPageIdx,
bool insertingNewPage, BMFileHandle& fileHandle, DBFileID dbFileID,
BufferManager& bufferManager, WAL& wal) {
fileHandle.addWALPageIdxGroupIfNecessary(originalPageIdx);
page_idx_t pageIdxInWAL;
uint8_t* walFrame;
fileHandle.acquireWALPageIdxLock(originalPageIdx);
if (fileHandle.hasWALPageVersionNoWALPageIdxLock(originalPageIdx)) {
pageIdxInWAL = fileHandle.getWALPageIdxNoWALPageIdxLock(originalPageIdx);
walFrame = bufferManager.pin(
*wal.fileHandle, pageIdxInWAL, BufferManager::PageReadPolicy::READ_PAGE);
} else {
pageIdxInWAL =
wal.logPageUpdateRecord(dbFileID, originalPageIdx /* pageIdxInOriginalFile */);
walFrame = bufferManager.pin(
*wal.fileHandle, pageIdxInWAL, BufferManager::PageReadPolicy::DONT_READ_PAGE);
if (!insertingNewPage) {
bufferManager.optimisticRead(fileHandle, originalPageIdx, [&](uint8_t* frame) -> void {
memcpy(walFrame, frame, BufferPoolConstants::PAGE_4KB_SIZE);
});
}
fileHandle.setWALPageIdxNoLock(originalPageIdx /* pageIdxInOriginalFile */, pageIdxInWAL);
wal.fileHandle->setLockedPageDirty(pageIdxInWAL);
}
return {originalPageIdx, pageIdxInWAL, walFrame};
}

void DBFileUtils::unpinWALPageAndReleaseOriginalPageLock(WALPageIdxAndFrame& walPageIdxAndFrame,
BMFileHandle& fileHandle, BufferManager& bufferManager, WAL& wal) {
DBFileUtils::unpinPageIdxInWALAndReleaseOriginalPageLock(walPageIdxAndFrame.pageIdxInWAL,
walPageIdxAndFrame.originalPageIdx, fileHandle, bufferManager, wal);
}

void DBFileUtils::unpinPageIdxInWALAndReleaseOriginalPageLock(page_idx_t pageIdxInWAL,
page_idx_t originalPageIdx, BMFileHandle& fileHandle, BufferManager& bufferManager, WAL& wal) {
if (originalPageIdx != INVALID_PAGE_IDX) {
bufferManager.unpin(*wal.fileHandle, pageIdxInWAL);
fileHandle.releaseWALPageIdxLock(originalPageIdx);
}
}
} // namespace storage
} // namespace kuzu
48 changes: 15 additions & 33 deletions src/storage/storage_structure/disk_overflow_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "common/exception/message.h"
#include "common/type_utils.h"
#include "common/types/types.h"
#include "storage/storage_structure/db_file_utils.h"
#include "storage/storage_utils.h"

using lock_t = std::unique_lock<std::mutex>;
Expand Down Expand Up @@ -48,20 +49,20 @@ std::string DiskOverflowFile::readString(TransactionType trxType, const ku_strin
}
}

void DiskOverflowFile::addNewPageIfNecessaryWithoutLock(uint32_t numBytesToAppend) {
bool DiskOverflowFile::addNewPageIfNecessaryWithoutLock(uint32_t numBytesToAppend) {
if ((nextPosToWriteTo.elemPosInPage == 0) ||
((nextPosToWriteTo.elemPosInPage + numBytesToAppend - 1) > END_OF_PAGE)) {
page_idx_t newPageIdx =
DBFileUtils::insertNewPage(*fileHandle, dbFileID, *bufferManager, *wal);
// Write new page index to end of previous page
if (nextPosToWriteTo.pageIdx > 0) {
auto walPageIdxAndFrame = DBFileUtils::createWALVersionIfNecessaryAndPinPage(
nextPosToWriteTo.pageIdx - 1, false, *fileHandle, dbFileID, *bufferManager, *wal);
memcpy(walPageIdxAndFrame.frame + END_OF_PAGE, &newPageIdx, sizeof(page_idx_t));
DBFileUtils::unpinWALPageAndReleaseOriginalPageLock(
walPageIdxAndFrame, *fileHandle, *bufferManager, *wal);
DBFileUtils::updatePage(*fileHandle, dbFileID, nextPosToWriteTo.pageIdx - 1,
false /* existing page */, *bufferManager, *wal,
[&](auto frame) { memcpy(frame + END_OF_PAGE, &newPageIdx, sizeof(page_idx_t)); });
}
return true;
}
return false;
}

void DiskOverflowFile::setStringOverflowWithoutLock(
Expand All @@ -77,24 +78,18 @@ void DiskOverflowFile::setStringOverflowWithoutLock(
}
}
int32_t remainingLength = len;
TypeUtils::encodeOverflowPtr(
diskDstString.overflowPtr, nextPosToWriteTo.pageIdx, nextPosToWriteTo.elemPosInPage);
while (remainingLength > 0) {
auto bytesWritten = len - remainingLength;
auto numBytesToWriteInPage = std::min(
static_cast<uint32_t>(remainingLength), END_OF_PAGE - nextPosToWriteTo.elemPosInPage);
addNewPageIfNecessaryWithoutLock(remainingLength);
auto updatedPageInfoAndWALPageFrame =
createWALVersionOfPageIfNecessaryForElement(nextPosToWriteTo);
memcpy(updatedPageInfoAndWALPageFrame.frame + updatedPageInfoAndWALPageFrame.posInPage,
srcRawString + bytesWritten, numBytesToWriteInPage);
DBFileUtils::unpinWALPageAndReleaseOriginalPageLock(
updatedPageInfoAndWALPageFrame, *fileHandle, *bufferManager, *wal);
// The overflow pointer should point to the first position, so it must only run
// the first iteration of the loop
if (static_cast<uint64_t>(remainingLength) == len) {
TypeUtils::encodeOverflowPtr(diskDstString.overflowPtr,
updatedPageInfoAndWALPageFrame.originalPageIdx,
updatedPageInfoAndWALPageFrame.posInPage);
}
bool insertingNewPage = addNewPageIfNecessaryWithoutLock(remainingLength);
DBFileUtils::updatePage(*fileHandle, dbFileID, nextPosToWriteTo.pageIdx, insertingNewPage,
*bufferManager, *wal, [&](auto frame) {
memcpy(frame + nextPosToWriteTo.elemPosInPage, srcRawString + bytesWritten,
numBytesToWriteInPage);
});
remainingLength -= numBytesToWriteInPage;
nextPosToWriteTo.elemPosInPage += numBytesToWriteInPage;
if (nextPosToWriteTo.elemPosInPage >= END_OF_PAGE) {
Expand Down Expand Up @@ -124,18 +119,5 @@ void DiskOverflowFile::logNewOverflowFileNextBytePosRecordIfNecessaryWithoutLock
}
}

WALPageIdxPosInPageAndFrame DiskOverflowFile::createWALVersionOfPageIfNecessaryForElement(
PageCursor originalPageCursor) {
bool insertingNewPage = false;
if (originalPageCursor.pageIdx >= fileHandle->getNumPages()) {
KU_ASSERT(originalPageCursor.pageIdx == fileHandle->getNumPages());
DBFileUtils::insertNewPage(*fileHandle, dbFileID, *bufferManager, *wal);
insertingNewPage = true;
}
auto walPageIdxAndFrame = DBFileUtils::createWALVersionIfNecessaryAndPinPage(
originalPageCursor.pageIdx, insertingNewPage, *fileHandle, dbFileID, *bufferManager, *wal);
return {walPageIdxAndFrame, originalPageCursor.elemPosInPage};
}

} // namespace storage
} // namespace kuzu
Loading
Loading