Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework var list finalize #2392

Merged
merged 1 commit into from
Nov 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ class ColumnChunk {
}

void populateWithDefaultVal(common::ValueVector* defaultValueVector);
virtual std::unique_ptr<ColumnChunk> finalize() {
return nullptr; // Nothing to be finalized.
virtual void finalize() { // DO NOTHING.
}

inline uint64_t getCapacity() const { return capacity; }
Expand Down Expand Up @@ -211,8 +210,7 @@ class NullColumnChunk : public BoolColumnChunk {

struct ColumnChunkFactory {
static std::unique_ptr<ColumnChunk> createColumnChunk(const common::LogicalType& dataType,
bool enableCompression, bool needFinalize = false,
uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
};

} // namespace storage
Expand Down
9 changes: 4 additions & 5 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class Column;
class NodeGroup {
public:
NodeGroup(const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes,
bool enableCompression, bool needFinalize, uint64_t capacity);
bool enableCompression, uint64_t capacity);
explicit NodeGroup(const std::vector<std::unique_ptr<Column>>& columns, bool enableCompression);
virtual ~NodeGroup() = default;

Expand Down Expand Up @@ -44,10 +44,10 @@ class NodeGroup {
class CSRNodeGroup : public NodeGroup {
public:
CSRNodeGroup(const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes,
bool enableCompression, bool needFinalize)
bool enableCompression)
// By default, initialize all column chunks except for the csrOffsetChunk to empty, as they
// should be resized after csr offset calculation (e.g., during CopyRel).
: NodeGroup{columnTypes, enableCompression, needFinalize, 0 /* capacity */} {
: NodeGroup{columnTypes, enableCompression, 0 /* capacity */} {
csrOffsetChunk = ColumnChunkFactory::createColumnChunk(
common::LogicalType{common::LogicalTypeID::INT64}, enableCompression);
}
Expand All @@ -61,8 +61,7 @@ class CSRNodeGroup : public NodeGroup {
struct NodeGroupFactory {
static std::unique_ptr<NodeGroup> createNodeGroup(common::ColumnDataFormat dataFormat,
const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes,
bool enableCompression, bool needFinalize = false,
uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
};

} // namespace storage
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/struct_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class StructColumnChunk : public ColumnChunk {
KU_ASSERT(childIdx < childChunks.size());
return childChunks[childIdx].get();
}
// Finalizes this struct chunk; the out-of-line definition calls finalize() on every child chunk.
// NOTE(review): the base class ColumnChunk declares a virtual finalize(); consider marking this
// declaration `override` so the compiler verifies the signature stays in sync — confirm.
void finalize();

protected:
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
Expand Down
45 changes: 19 additions & 26 deletions src/include/storage/store/var_list_column_chunk.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#pragma once

#include "common/exception/not_implemented.h"
#include "storage/store/column_chunk.h"

namespace kuzu {
Expand All @@ -26,6 +25,7 @@ struct VarListDataColumnChunk {
};

class VarListColumnChunk : public ColumnChunk {

public:
VarListColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression);

Expand All @@ -36,16 +36,8 @@ class VarListColumnChunk : public ColumnChunk {
void resetToEmpty() final;

void append(common::ValueVector* vector) final;
inline void write(common::ValueVector* /*valueVector*/,
common::ValueVector* /*offsetInChunkVector*/) override {
// LCOV_EXCL_START
throw common::NotImplementedException("VarListColumnChunk::write");
// LCOV_EXCL_STOP
}
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector) final;

// Returns a non-owning pointer to the data chunk held by the `varListDataColumnChunk`
// unique_ptr member; ownership stays with this column chunk.
inline VarListDataColumnChunk* getVarListDataColumnChunk() {
return varListDataColumnChunk.get();
}
inline void resizeDataColumnChunk(uint64_t numBytesForBuffer) {
// TODO(bmwinger): This won't work properly for booleans (will be one eighth as many values
// as could fit)
Expand All @@ -56,39 +48,40 @@ class VarListColumnChunk : public ColumnChunk {
varListDataColumnChunk->resizeBuffer(numValues);
}

void finalize() final;

protected:
void copyListValues(const common::list_entry_t& entry, common::ValueVector* dataVector);

private:
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) final;

inline void initializeIndices() {
indicesColumnChunk = ColumnChunkFactory::createColumnChunk(
common::LogicalType{common::LogicalTypeID::INT64}, false /* enableCompression */);
indicesColumnChunk->getNullChunk()->resetToAllNull();
}
// Returns the length of the list stored at `offset`, computed as the difference between the
// list offset of the next entry and the list offset of this entry.
inline uint64_t getListLen(common::offset_t offset) const {
    const auto listStart = getListOffset(offset);
    const auto listEnd = getListOffset(offset + 1);
    return listEnd - listStart;
}
// Returns the list offset for position `offset`: 0 for the first position, otherwise the
// uint64_t value stored in this chunk at `offset - 1`.
// NOTE(review): presumably each slot holds the cumulative end offset of its list — confirm.
inline common::offset_t getListOffset(common::offset_t offset) const {
    if (offset == 0) {
        // The first list always starts at the beginning of the data chunk.
        return 0;
    }
    return getValue<uint64_t>(offset - 1);
}

void moveFromOtherChunk(VarListColumnChunk* other);
void appendEmptyList(bool isNull);

protected:
bool enableCompression;
std::unique_ptr<VarListDataColumnChunk> varListDataColumnChunk;
};

class AuxVarListColumnChunk : public VarListColumnChunk {
public:
AuxVarListColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression)
: VarListColumnChunk{dataType, capacity * 2 /* for sizeof(list_entry_t) */,
enableCompression},
lastDataOffset{0} {}

void resize(uint64_t newCapacity) final;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector) final;

std::unique_ptr<ColumnChunk> finalize() final;

private:
uint64_t lastDataOffset;
// The following members are needed to write var lists at arbitrary positions in the column chunk.
// Each var list is first appended to the end of the column chunk; indicesColumnChunk then tracks
// where each var list's data lives inside the column chunk.
// `needFinalize` is set to true whenever `write` is called.
// During `finalize`, the whole column chunk is rewritten according to the indices.
bool needFinalize;
std::unique_ptr<ColumnChunk> indicesColumnChunk;
};

} // namespace storage
Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/persistent/copy_rel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ void CopyRelSharedState::logCopyRelWALRecord(WAL* wal) {

void CopyRel::initLocalStateInternal(ResultSet* /*resultSet_*/, ExecutionContext* /*context*/) {
localState = std::make_unique<CopyRelLocalState>();
localState->nodeGroup = NodeGroupFactory::createNodeGroup(info->dataFormat,
sharedState->columnTypes, info->compressionEnabled, true /* needFinalize */);
localState->nodeGroup = NodeGroupFactory::createNodeGroup(
info->dataFormat, sharedState->columnTypes, info->compressionEnabled);
}

void CopyRel::initGlobalStateInternal(ExecutionContext* /*context*/) {
Expand Down
12 changes: 4 additions & 8 deletions src/storage/store/column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ class FixedListColumnChunk : public ColumnChunk {
};

std::unique_ptr<ColumnChunk> ColumnChunkFactory::createColumnChunk(
const LogicalType& dataType, bool enableCompression, bool needFinalize, uint64_t capacity) {
const LogicalType& dataType, bool enableCompression, uint64_t capacity) {
switch (dataType.getPhysicalType()) {
case PhysicalTypeID::BOOL: {
return std::make_unique<BoolColumnChunk>(capacity);
Expand All @@ -477,8 +477,8 @@ std::unique_ptr<ColumnChunk> ColumnChunkFactory::createColumnChunk(
case PhysicalTypeID::FLOAT:
case PhysicalTypeID::INTERVAL: {
if (dataType.getLogicalTypeID() == LogicalTypeID::SERIAL) {
return std::make_unique<ColumnChunk>(LogicalType(LogicalTypeID::SERIAL), capacity,
false /*enableCompression*/, false /* hasNullChunk */);
return std::make_unique<ColumnChunk>(
dataType, capacity, false /*enableCompression*/, false /* hasNullChunk */);
} else {
return std::make_unique<ColumnChunk>(dataType, capacity, enableCompression);
}
Expand All @@ -495,11 +495,7 @@ std::unique_ptr<ColumnChunk> ColumnChunkFactory::createColumnChunk(
return std::make_unique<StringColumnChunk>(dataType, capacity);
}
case PhysicalTypeID::VAR_LIST: {
if (needFinalize) {
return std::make_unique<AuxVarListColumnChunk>(dataType, capacity, enableCompression);
} else {
return std::make_unique<VarListColumnChunk>(dataType, capacity, enableCompression);
}
return std::make_unique<VarListColumnChunk>(dataType, capacity, enableCompression);
}
case PhysicalTypeID::STRUCT: {
return std::make_unique<StructColumnChunk>(dataType, capacity, enableCompression);
Expand Down
16 changes: 6 additions & 10 deletions src/storage/store/node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ namespace kuzu {
namespace storage {

NodeGroup::NodeGroup(const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes,
bool enableCompression, bool needFinalize, uint64_t capacity)
bool enableCompression, uint64_t capacity)
: nodeGroupIdx{UINT64_MAX}, numNodes{0} {
chunks.reserve(columnTypes.size());
for (auto& type : columnTypes) {
chunks.push_back(ColumnChunkFactory::createColumnChunk(
*type, enableCompression, needFinalize, capacity));
chunks.push_back(ColumnChunkFactory::createColumnChunk(*type, enableCompression, capacity));
}
}

Expand Down Expand Up @@ -97,19 +96,16 @@ void NodeGroup::write(DataChunk* dataChunk, vector_idx_t offsetVectorIdx) {
void NodeGroup::finalize(uint64_t nodeGroupIdx_) {
nodeGroupIdx = nodeGroupIdx_;
for (auto i = 0u; i < chunks.size(); i++) {
auto finalizedChunk = chunks[i]->finalize();
if (finalizedChunk) {
chunks[i] = std::move(finalizedChunk);
}
chunks[i]->finalize();
}
}

std::unique_ptr<NodeGroup> NodeGroupFactory::createNodeGroup(common::ColumnDataFormat dataFormat,
const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes, bool enableCompression,
bool needFinalize, uint64_t capacity) {
uint64_t capacity) {
return dataFormat == ColumnDataFormat::REGULAR ?
std::make_unique<NodeGroup>(columnTypes, enableCompression, needFinalize, capacity) :
std::make_unique<CSRNodeGroup>(columnTypes, enableCompression, needFinalize);
std::make_unique<NodeGroup>(columnTypes, enableCompression, capacity) :
std::make_unique<CSRNodeGroup>(columnTypes, enableCompression);
}

} // namespace storage
Expand Down
8 changes: 7 additions & 1 deletion src/storage/store/struct_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ StructColumnChunk::StructColumnChunk(
childChunks.resize(fieldTypes.size());
for (auto i = 0u; i < fieldTypes.size(); i++) {
childChunks[i] =
ColumnChunkFactory::createColumnChunk(*fieldTypes[i], capacity, enableCompression);
ColumnChunkFactory::createColumnChunk(*fieldTypes[i], enableCompression, capacity);
}
}

void StructColumnChunk::finalize() {
for (auto& childChunk : childChunks) {
childChunk->finalize();
}
}

Expand Down
Loading