Skip to content

Commit

Permalink
Merge pull request #2426 from kuzudb/transaction-scan
Browse files (browse the repository at this point in the history)
Add transaction pointer to column chunk scan
  • Loading branch information
ray6080 committed Nov 16, 2023
2 parents ff759d2 + db7809a commit a616abf
Show file tree
Hide file tree
Showing 19 changed files with 82 additions and 65 deletions.
16 changes: 10 additions & 6 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class Column {
virtual void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector);
virtual void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk);
virtual void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
ColumnChunk* columnChunk);
virtual void lookup(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector);

Expand All @@ -62,8 +63,9 @@ class Column {
return metadataDA->getNumElements(transaction->getType());
}

void prepareCommitForChunk(common::node_group_idx_t nodeGroupIdx,
LocalVectorCollection* localColumnChunk, bool isNewNodeGroup);
void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localColumnChunk,
bool isNewNodeGroup);
virtual void checkpointInMemory();
virtual void rollbackInMemory();

Expand Down Expand Up @@ -106,10 +108,12 @@ class Column {

private:
static bool containsVarList(common::LogicalType& dataType);
bool canCommitInPlace(common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk);
bool canCommitInPlace(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk);
void commitLocalChunkInPlace(LocalVectorCollection* localChunk);
void commitLocalChunkOutOfPlace(common::node_group_idx_t nodeGroupIdx,
LocalVectorCollection* localChunk, bool isNewNodeGroup);
void commitLocalChunkOutOfPlace(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk,
bool isNewNodeGroup);

void applyLocalChunkToColumnChunk(LocalVectorCollection* localChunk, ColumnChunk* columnChunk,
common::offset_t nodeGroupStartOffset,
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/node_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class NodeTable : public Table {
common::ValueVector* defaultValueVector) final;
inline void dropColumn(common::column_id_t columnID) final { tableData->dropColumn(columnID); }

void prepareCommit(LocalTableData* localTable) final;
void prepareCommit(transaction::Transaction* transaction, LocalTableData* localTable) final;
void prepareRollback(LocalTableData* localTable) final;
void checkpointInMemory() final;
void rollbackInMemory() final;
Expand Down
13 changes: 6 additions & 7 deletions src/include/storage/store/node_table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace storage {

class LocalTableData;

class NodeTableData : public TableData {
class NodeTableData final : public TableData {
public:
NodeTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH, common::table_id_t tableID,
BufferManager* bufferManager, WAL* wal, const std::vector<catalog::Property*>& properties,
Expand All @@ -20,11 +20,9 @@ class NodeTableData : public TableData {
readState->columnIDs = std::move(columnIDs);
}
void scan(transaction::Transaction* transaction, TableReadState& readState,
common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& outputVectors) final;
common::ValueVector* nodeIDVector, const std::vector<common::ValueVector*>& outputVectors);
void lookup(transaction::Transaction* transaction, TableReadState& readState,
common::ValueVector* nodeIDVector,
const std::vector<common::ValueVector*>& outputVectors) final;
common::ValueVector* nodeIDVector, const std::vector<common::ValueVector*>& outputVectors);

// These two interfaces are node table specific, as rel table requires also relIDVector.
void insert(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
Expand All @@ -33,9 +31,10 @@ class NodeTableData : public TableData {
common::ValueVector* nodeIDVector, common::ValueVector* propertyVector);
void delete_(transaction::Transaction* transaction, common::ValueVector* nodeIDVector);

void append(NodeGroup* nodeGroup) final;
void append(NodeGroup* nodeGroup);

void prepareLocalTableToCommit(LocalTableData* localTable);
void prepareLocalTableToCommit(
transaction::Transaction* transaction, LocalTableData* localTable);
};

} // namespace storage
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/rel_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class RelTable : public Table {
bwdRelTableData->append(nodeGroup);
}

void prepareCommit(LocalTableData* localTable) final;
void prepareCommit(transaction::Transaction* transaction, LocalTableData* localTable) final;
void prepareRollback(LocalTableData* localTable) final;
void checkpointInMemory() final;
void rollbackInMemory() final;
Expand Down
15 changes: 8 additions & 7 deletions src/include/storage/store/rel_table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ struct RelDataReadState : public TableReadState {
};

class RelsStoreStats;
class RelTableData : public TableData {
class RelTableData final : public TableData {
public:
static constexpr common::column_id_t REL_ID_COLUMN_ID = 0;

Expand All @@ -46,22 +46,22 @@ class RelTableData : public TableData {
common::ValueVector* inNodeIDVector, RelDataReadState* readState);
inline void scan(transaction::Transaction* transaction, TableReadState& readState,
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors) final {
const std::vector<common::ValueVector*>& outputVectors) {
auto& relReadState = common::ku_dynamic_cast<TableReadState&, RelDataReadState&>(readState);
dataFormat == common::ColumnDataFormat::REGULAR ?
scanRegularColumns(transaction, relReadState, inNodeIDVector, outputVectors) :
scanCSRColumns(transaction, relReadState, inNodeIDVector, outputVectors);
}
void lookup(transaction::Transaction* transaction, TableReadState& readState,
common::ValueVector* inNodeIDVector,
const std::vector<common::ValueVector*>& outputVectors) final;
void append(NodeGroup* nodeGroup) final;
const std::vector<common::ValueVector*>& outputVectors);
void append(NodeGroup* nodeGroup);

inline Column* getAdjColumn() const { return adjColumn.get(); }
inline common::ColumnDataFormat getDataFormat() const { return dataFormat; }

void checkpointInMemory() final;
void rollbackInMemory() final;
void checkpointInMemory();
void rollbackInMemory();

private:
void scanRegularColumns(transaction::Transaction* transaction, RelDataReadState& readState,
Expand All @@ -78,7 +78,8 @@ class RelTableData : public TableData {
common::ColumnDataFormat::CSR;
}

void prepareLocalTableToCommit(LocalTableData* localTable);
void prepareLocalTableToCommit(
transaction::Transaction* transaction, LocalTableData* localTable);

private:
std::unique_ptr<Column> adjColumn;
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/store/string_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ class StringColumn : public Column {
void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0) final;
void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) final;
void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
ColumnChunk* columnChunk) final;

void append(ColumnChunk* columnChunk, common::node_group_idx_t nodeGroupIdx) final;

Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/store/struct_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ class StructColumn : public Column {
BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction,
RWPropertyStats propertyStatistics, bool enableCompression);

void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) final;
void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
ColumnChunk* columnChunk) final;
void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector) final;
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/store/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class Table {
common::ValueVector* defaultValueVector) = 0;
virtual void dropColumn(common::column_id_t columnID) = 0;

virtual void prepareCommit(LocalTableData* localTable) = 0;
virtual void prepareCommit(
transaction::Transaction* transaction, LocalTableData* localTable) = 0;
virtual void prepareRollback(LocalTableData* localTable) = 0;
virtual void checkpointInMemory() = 0;
virtual void rollbackInMemory() = 0;
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/store/table_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class TableData {
return columns[0]->getNumNodeGroups(transaction);
}

virtual void prepareLocalTableToCommit(LocalTableData* localTable) = 0;
virtual void prepareLocalTableToCommit(
transaction::Transaction* transaction, LocalTableData* localTable) = 0;
virtual void checkpointInMemory();
virtual void rollbackInMemory();

Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/store/var_list_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class VarListColumn : public Column {
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0) final;

void scan(common::node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) final;
void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
ColumnChunk* columnChunk) final;

protected:
void scanInternal(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
Expand Down
2 changes: 1 addition & 1 deletion src/storage/storage_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ void StorageManager::prepareCommit(transaction::Transaction* transaction) {
auto localStorage = transaction->getLocalStorage();
for (auto tableID : localStorage->getTableIDsWithUpdates()) {
KU_ASSERT(tables.contains(tableID));
tables.at(tableID)->prepareCommit(localStorage->getLocalTableData(tableID));
tables.at(tableID)->prepareCommit(transaction, localStorage->getLocalTableData(tableID));
}
if (nodesStatisticsAndDeletedIDs->hasUpdates()) {
wal->logTableStatisticsRecord(true /* isNodeTable */);
Expand Down
35 changes: 19 additions & 16 deletions src/storage/store/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,10 @@ class NullColumn : public Column {
}
}

void scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) final {
void scan(transaction::Transaction* transaction, node_group_idx_t nodeGroupIdx,
ColumnChunk* columnChunk) final {
if (propertyStatistics.mayHaveNull(DUMMY_WRITE_TRANSACTION)) {
Column::scan(nodeGroupIdx, columnChunk);
Column::scan(transaction, nodeGroupIdx, columnChunk);
} else {
static_cast<NullColumnChunk*>(columnChunk)->resetToNoNull();
}
Expand Down Expand Up @@ -342,14 +343,15 @@ void Column::scan(transaction::Transaction* transaction, node_group_idx_t nodeGr
transaction, pageCursor, numValuesToScan, resultVector, chunkMeta, offsetInVector);
}

void Column::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) {
void Column::scan(
Transaction* transaction, node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) {
if (nullColumn) {
nullColumn->scan(nodeGroupIdx, columnChunk->getNullChunk());
nullColumn->scan(transaction, nodeGroupIdx, columnChunk->getNullChunk());
}
if (nodeGroupIdx >= metadataDA->getNumElements()) {
if (nodeGroupIdx >= metadataDA->getNumElements(transaction->getType())) {
columnChunk->setNumValues(0);
} else {
auto chunkMetadata = metadataDA->get(nodeGroupIdx, TransactionType::WRITE);
auto chunkMetadata = metadataDA->get(nodeGroupIdx, transaction->getType());
auto cursor = PageElementCursor(chunkMetadata.pageIdx, 0);
uint64_t numValuesPerPage =
chunkMetadata.compMeta.numValues(BufferPoolConstants::PAGE_4KB_SIZE, *dataType);
Expand Down Expand Up @@ -535,18 +537,18 @@ void Column::setNull(offset_t nodeOffset) {
}
}

void Column::prepareCommitForChunk(
node_group_idx_t nodeGroupIdx, LocalVectorCollection* localColumnChunk, bool isNewNodeGroup) {
void Column::prepareCommitForChunk(Transaction* transaction, node_group_idx_t nodeGroupIdx,
LocalVectorCollection* localColumnChunk, bool isNewNodeGroup) {
if (isNewNodeGroup) {
// If this is a new node group, updateInfo should be empty. We should perform out-of-place
// commit with a new column chunk.
commitLocalChunkOutOfPlace(nodeGroupIdx, localColumnChunk, isNewNodeGroup);
commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, localColumnChunk, isNewNodeGroup);
} else {
// If this is not a new node group, we should first check if we can perform in-place commit.
if (canCommitInPlace(nodeGroupIdx, localColumnChunk)) {
if (canCommitInPlace(transaction, nodeGroupIdx, localColumnChunk)) {
commitLocalChunkInPlace(localColumnChunk);
} else {
commitLocalChunkOutOfPlace(nodeGroupIdx, localColumnChunk, isNewNodeGroup);
commitLocalChunkOutOfPlace(transaction, nodeGroupIdx, localColumnChunk, isNewNodeGroup);
}
}
}
Expand All @@ -570,12 +572,13 @@ bool Column::containsVarList(LogicalType& dataType) {
}

// TODO(Guodong): This should be moved inside `LocalVectorCollection`.
bool Column::canCommitInPlace(node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk) {
bool Column::canCommitInPlace(
Transaction* transaction, node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk) {
if (containsVarList(*dataType)) {
// Always perform out of place commit for VAR_LIST data type.
return false;
}
auto metadata = getMetadata(nodeGroupIdx, TransactionType::WRITE);
auto metadata = getMetadata(nodeGroupIdx, transaction->getType());
if (metadata.compMeta.canAlwaysUpdateInPlace()) {
return true;
}
Expand Down Expand Up @@ -603,8 +606,8 @@ void Column::commitLocalChunkInPlace(LocalVectorCollection* localChunk) {
applyLocalChunkToColumn(localChunk, localChunk->getInsertInfoRef());
}

void Column::commitLocalChunkOutOfPlace(
node_group_idx_t nodeGroupIdx, LocalVectorCollection* localChunk, bool isNewNodeGroup) {
void Column::commitLocalChunkOutOfPlace(Transaction* transaction, node_group_idx_t nodeGroupIdx,
LocalVectorCollection* localChunk, bool isNewNodeGroup) {
auto columnChunk = ColumnChunkFactory::createColumnChunk(dataType->copy(), enableCompression);
if (isNewNodeGroup) {
KU_ASSERT(localChunk->getUpdateInfoRef().empty());
Expand All @@ -613,7 +616,7 @@ void Column::commitLocalChunkOutOfPlace(
nodeGroupIdx << StorageConstants::NODE_GROUP_SIZE_LOG2, localChunk->getInsertInfoRef());
} else {
// First, scan the whole column chunk from persistent storage.
scan(nodeGroupIdx, columnChunk.get());
scan(transaction, nodeGroupIdx, columnChunk.get());
// Then, apply updates from the local chunk.
applyLocalChunkToColumnChunk(localChunk, columnChunk.get(),
nodeGroupIdx << StorageConstants::NODE_GROUP_SIZE_LOG2, localChunk->getUpdateInfoRef());
Expand Down
4 changes: 2 additions & 2 deletions src/storage/store/node_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ void NodeTable::addColumn(transaction::Transaction* transaction, const catalog::
wal->addToUpdatedTables(tableID);
}

void NodeTable::prepareCommit(LocalTableData* localTable) {
void NodeTable::prepareCommit(Transaction* transaction, LocalTableData* localTable) {
if (pkIndex) {
pkIndex->prepareCommit();
}
tableData->prepareLocalTableToCommit(localTable);
tableData->prepareLocalTableToCommit(transaction, localTable);
wal->addToUpdatedTables(tableID);
}

Expand Down
6 changes: 4 additions & 2 deletions src/storage/store/node_table_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ void NodeTableData::append(kuzu::storage::NodeGroup* nodeGroup) {
}
}

void NodeTableData::prepareLocalTableToCommit(LocalTableData* localTable) {
void NodeTableData::prepareLocalTableToCommit(
Transaction* transaction, LocalTableData* localTable) {
auto numNodeGroups = getNumNodeGroups(&DUMMY_WRITE_TRANSACTION);
for (auto& [nodeGroupIdx, nodeGroup] : localTable->nodeGroups) {
for (auto columnID = 0; columnID < columns.size(); columnID++) {
Expand All @@ -114,7 +115,8 @@ void NodeTableData::prepareLocalTableToCommit(LocalTableData* localTable) {
if (columnChunk->getNumRows() == 0) {
continue;
}
column->prepareCommitForChunk(nodeGroupIdx, columnChunk, nodeGroupIdx >= numNodeGroups);
column->prepareCommitForChunk(
transaction, nodeGroupIdx, columnChunk, nodeGroupIdx >= numNodeGroups);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/rel_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void RelTable::addColumn(
wal->addToUpdatedTables(tableID);
}

void RelTable::prepareCommit(LocalTableData* /*localTable*/) {
void RelTable::prepareCommit(Transaction* /*transaction*/, LocalTableData* /*localTable*/) {
wal->addToUpdatedTables(tableID);
}

Expand Down
7 changes: 4 additions & 3 deletions src/storage/store/rel_table_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ RelTableData::RelTableData(BMFileHandle* dataFH, BMFileHandle* metadataFH,
dynamic_cast<InternalIDColumn*>(columns[REL_ID_COLUMN_ID].get())->setCommonTableID(tableID);
}

void RelTableData::initializeReadState(Transaction* /*transaction*/, RelDataDirection direction,
void RelTableData::initializeReadState(Transaction* transaction, RelDataDirection direction,
std::vector<common::column_id_t> columnIDs, ValueVector* inNodeIDVector,
RelDataReadState* readState) {
readState->direction = direction;
Expand All @@ -89,7 +89,7 @@ void RelTableData::initializeReadState(Transaction* /*transaction*/, RelDataDire
if (readState->isOutOfRange(nodeOffset)) {
// Scan csr offsets and populate csr list entries for the new node group.
readState->startNodeOffsetInState = startNodeOffset;
csrOffsetColumn->scan(nodeGroupIdx, readState->csrOffsetChunk.get());
csrOffsetColumn->scan(transaction, nodeGroupIdx, readState->csrOffsetChunk.get());
readState->numNodesInState = readState->csrOffsetChunk->getNumValues();
readState->populateCSRListEntries();
}
Expand Down Expand Up @@ -169,7 +169,8 @@ void RelTableData::append(NodeGroup* nodeGroup) {
}
}

void RelTableData::prepareLocalTableToCommit(LocalTableData* /*localTable*/) {
void RelTableData::prepareLocalTableToCommit(
Transaction* /*transaction*/, LocalTableData* /*localTable*/) {
KU_UNREACHABLE;
}

Expand Down
5 changes: 3 additions & 2 deletions src/storage/store/string_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ void StringColumn::scan(Transaction* transaction, node_group_idx_t nodeGroupIdx,
}
}

void StringColumn::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) {
Column::scan(nodeGroupIdx, columnChunk);
void StringColumn::scan(
Transaction* transaction, node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) {
Column::scan(transaction, nodeGroupIdx, columnChunk);
auto stringColumnChunk = reinterpret_cast<StringColumnChunk*>(columnChunk);
auto overflowMetadata = overflowMetadataDA->get(nodeGroupIdx, TransactionType::WRITE);
auto inMemOverflowFile = stringColumnChunk->getOverflowFile();
Expand Down
11 changes: 6 additions & 5 deletions src/storage/store/struct_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ StructColumn::StructColumn(std::unique_ptr<LogicalType> dataType,
}
}

void StructColumn::scan(node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) {
void StructColumn::scan(
Transaction* transaction, node_group_idx_t nodeGroupIdx, ColumnChunk* columnChunk) {
KU_ASSERT(columnChunk->getDataType()->getPhysicalType() == PhysicalTypeID::STRUCT);
nullColumn->scan(nodeGroupIdx, columnChunk->getNullChunk());
if (nodeGroupIdx >= metadataDA->getNumElements()) {
nullColumn->scan(transaction, nodeGroupIdx, columnChunk->getNullChunk());
if (nodeGroupIdx >= metadataDA->getNumElements(transaction->getType())) {
columnChunk->setNumValues(0);
} else {
auto chunkMetadata = metadataDA->get(nodeGroupIdx, TransactionType::WRITE);
auto chunkMetadata = metadataDA->get(nodeGroupIdx, transaction->getType());
columnChunk->setNumValues(chunkMetadata.numValues);
}
auto structColumnChunk = ku_dynamic_cast<ColumnChunk*, StructColumnChunk*>(columnChunk);
for (auto i = 0u; i < childColumns.size(); i++) {
childColumns[i]->scan(nodeGroupIdx, structColumnChunk->getChild(i));
childColumns[i]->scan(transaction, nodeGroupIdx, structColumnChunk->getChild(i));
}
}

Expand Down
Loading

0 comments on commit a616abf

Please sign in to comment.