Skip to content

Commit

Permalink
rework var list finalize
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Nov 11, 2023
1 parent 24934c6 commit f04406a
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 116 deletions.
6 changes: 2 additions & 4 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ class ColumnChunk {
}

void populateWithDefaultVal(common::ValueVector* defaultValueVector);
virtual std::unique_ptr<ColumnChunk> finalize() {
return nullptr; // Nothing to be finalized.
virtual void finalize() { // DO NOTHING.
}

// Returns the value of this chunk's `capacity` member.
inline uint64_t getCapacity() const { return capacity; }
Expand Down Expand Up @@ -211,8 +210,7 @@ class NullColumnChunk : public BoolColumnChunk {

struct ColumnChunkFactory {
static std::unique_ptr<ColumnChunk> createColumnChunk(const common::LogicalType& dataType,
bool enableCompression, bool needFinalize = false,
uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
};

} // namespace storage
Expand Down
9 changes: 4 additions & 5 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class Column;
class NodeGroup {
public:
NodeGroup(const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes,
bool enableCompression, bool needFinalize, uint64_t capacity);
bool enableCompression, uint64_t capacity);
explicit NodeGroup(const std::vector<std::unique_ptr<Column>>& columns, bool enableCompression);
virtual ~NodeGroup() = default;

Expand Down Expand Up @@ -44,10 +44,10 @@ class NodeGroup {
class CSRNodeGroup : public NodeGroup {
public:
CSRNodeGroup(const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes,
bool enableCompression, bool needFinalize)
bool enableCompression)
// By default, initialize all column chunks except for the csrOffsetChunk to empty, as they
// should be resized after csr offset calculation (e.g., during CopyRel).
: NodeGroup{columnTypes, enableCompression, needFinalize, 0 /* capacity */} {
: NodeGroup{columnTypes, enableCompression, 0 /* capacity */} {
csrOffsetChunk = ColumnChunkFactory::createColumnChunk(
common::LogicalType{common::LogicalTypeID::INT64}, enableCompression);
}
Expand All @@ -61,8 +61,7 @@ class CSRNodeGroup : public NodeGroup {
struct NodeGroupFactory {
static std::unique_ptr<NodeGroup> createNodeGroup(common::ColumnDataFormat dataFormat,
const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes,
bool enableCompression, bool needFinalize = false,
uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
};

} // namespace storage
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/struct_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class StructColumnChunk : public ColumnChunk {
KU_ASSERT(childIdx < childChunks.size());
return childChunks[childIdx].get();
}
// Finalizes this struct chunk by calling finalize() on each of its child chunks.
// Has the same signature as the virtual ColumnChunk::finalize(), so it overrides it implicitly.
// NOTE(review): consider spelling this `void finalize() override;` (or `final`, as
// VarListColumnChunk does) so the compiler checks the override relationship.
void finalize();

protected:
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
Expand Down
45 changes: 19 additions & 26 deletions src/include/storage/store/var_list_column_chunk.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#pragma once

#include "common/exception/not_implemented.h"
#include "storage/store/column_chunk.h"

namespace kuzu {
Expand All @@ -26,6 +25,7 @@ struct VarListDataColumnChunk {
};

class VarListColumnChunk : public ColumnChunk {

public:
VarListColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression);

Expand All @@ -36,16 +36,8 @@ class VarListColumnChunk : public ColumnChunk {
void resetToEmpty() final;

void append(common::ValueVector* vector) final;
inline void write(common::ValueVector* /*valueVector*/,
common::ValueVector* /*offsetInChunkVector*/) override {
// LCOV_EXCL_START
throw common::NotImplementedException("VarListColumnChunk::write");
// LCOV_EXCL_STOP
}
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector) final;

// Returns a non-owning pointer to the data column chunk; ownership stays with the
// `varListDataColumnChunk` unique_ptr member.
inline VarListDataColumnChunk* getVarListDataColumnChunk() {
return varListDataColumnChunk.get();
}
inline void resizeDataColumnChunk(uint64_t numBytesForBuffer) {
// TODO(bmwinger): This won't work properly for booleans (will be one eighth as many values
// as could fit)
Expand All @@ -56,39 +48,40 @@ class VarListColumnChunk : public ColumnChunk {
varListDataColumnChunk->resizeBuffer(numValues);
}

void finalize() final;

protected:
void copyListValues(const common::list_entry_t& entry, common::ValueVector* dataVector);

private:
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) final;

inline void initializeIndices() {
indicesColumnChunk = ColumnChunkFactory::createColumnChunk(
common::LogicalType{common::LogicalTypeID::INT64}, false /* enableCompression */);
indicesColumnChunk->getNullChunk()->resetToAllNull();
}
// Length of the list at position `offset`: the distance between the list offset of the
// next position and the list offset of this one.
inline uint64_t getListLen(common::offset_t offset) const {
    const auto startOffset = getListOffset(offset);
    const auto endOffset = getListOffset(offset + 1);
    return endOffset - startOffset;
}
// List offset for position `offset`. Position 0 always maps to 0; any other position maps to
// the uint64_t value stored at the preceding slot (`offset - 1`) of this chunk.
inline common::offset_t getListOffset(common::offset_t offset) const {
    if (offset == 0) {
        return 0;
    }
    return getValue<uint64_t>(offset - 1);
}

void moveFromOtherChunk(VarListColumnChunk* other);
void appendEmptyList(bool isNull);

protected:
bool enableCompression;
std::unique_ptr<VarListDataColumnChunk> varListDataColumnChunk;
};

class AuxVarListColumnChunk : public VarListColumnChunk {
public:
AuxVarListColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression)
: VarListColumnChunk{dataType, capacity * 2 /* for sizeof(list_entry_t) */,
enableCompression},
lastDataOffset{0} {}

void resize(uint64_t newCapacity) final;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector) final;

std::unique_ptr<ColumnChunk> finalize() final;

private:
uint64_t lastDataOffset;
// The following is needed to write var list to random positions in the column chunk.
// We first append var list to the end of the column chunk. Then use indicesColumnChunk to track
// where each var list data is inside the column chunk.
// `needFinalize` is set to true whenever `write` is called.
// During `finalize`, the whole column chunk will be re-written according to indices.
bool needFinalize;
std::unique_ptr<ColumnChunk> indicesColumnChunk;
};

} // namespace storage
Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/persistent/copy_rel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ void CopyRelSharedState::logCopyRelWALRecord(WAL* wal) {

void CopyRel::initLocalStateInternal(ResultSet* /*resultSet_*/, ExecutionContext* /*context*/) {
localState = std::make_unique<CopyRelLocalState>();
localState->nodeGroup = NodeGroupFactory::createNodeGroup(info->dataFormat,
sharedState->columnTypes, info->compressionEnabled, true /* needFinalize */);
localState->nodeGroup = NodeGroupFactory::createNodeGroup(
info->dataFormat, sharedState->columnTypes, info->compressionEnabled);
}

void CopyRel::initGlobalStateInternal(ExecutionContext* /*context*/) {
Expand Down
12 changes: 4 additions & 8 deletions src/storage/store/column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ class FixedListColumnChunk : public ColumnChunk {
};

std::unique_ptr<ColumnChunk> ColumnChunkFactory::createColumnChunk(
const LogicalType& dataType, bool enableCompression, bool needFinalize, uint64_t capacity) {
const LogicalType& dataType, bool enableCompression, uint64_t capacity) {
switch (dataType.getPhysicalType()) {
case PhysicalTypeID::BOOL: {
return std::make_unique<BoolColumnChunk>(capacity);
Expand All @@ -477,8 +477,8 @@ std::unique_ptr<ColumnChunk> ColumnChunkFactory::createColumnChunk(
case PhysicalTypeID::FLOAT:
case PhysicalTypeID::INTERVAL: {
if (dataType.getLogicalTypeID() == LogicalTypeID::SERIAL) {
return std::make_unique<ColumnChunk>(LogicalType(LogicalTypeID::SERIAL), capacity,
false /*enableCompression*/, false /* hasNullChunk */);
return std::make_unique<ColumnChunk>(
dataType, capacity, false /*enableCompression*/, false /* hasNullChunk */);
} else {
return std::make_unique<ColumnChunk>(dataType, capacity, enableCompression);
}
Expand All @@ -495,11 +495,7 @@ std::unique_ptr<ColumnChunk> ColumnChunkFactory::createColumnChunk(
return std::make_unique<StringColumnChunk>(dataType, capacity);
}
case PhysicalTypeID::VAR_LIST: {
if (needFinalize) {
return std::make_unique<AuxVarListColumnChunk>(dataType, capacity, enableCompression);
} else {
return std::make_unique<VarListColumnChunk>(dataType, capacity, enableCompression);
}
return std::make_unique<VarListColumnChunk>(dataType, capacity, enableCompression);
}
case PhysicalTypeID::STRUCT: {
return std::make_unique<StructColumnChunk>(dataType, capacity, enableCompression);
Expand Down
16 changes: 6 additions & 10 deletions src/storage/store/node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ namespace kuzu {
namespace storage {

NodeGroup::NodeGroup(const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes,
bool enableCompression, bool needFinalize, uint64_t capacity)
bool enableCompression, uint64_t capacity)
: nodeGroupIdx{UINT64_MAX}, numNodes{0} {
chunks.reserve(columnTypes.size());
for (auto& type : columnTypes) {
chunks.push_back(ColumnChunkFactory::createColumnChunk(
*type, enableCompression, needFinalize, capacity));
chunks.push_back(ColumnChunkFactory::createColumnChunk(*type, enableCompression, capacity));
}
}

Expand Down Expand Up @@ -97,19 +96,16 @@ void NodeGroup::write(DataChunk* dataChunk, vector_idx_t offsetVectorIdx) {
void NodeGroup::finalize(uint64_t nodeGroupIdx_) {
nodeGroupIdx = nodeGroupIdx_;
for (auto i = 0u; i < chunks.size(); i++) {
auto finalizedChunk = chunks[i]->finalize();
if (finalizedChunk) {
chunks[i] = std::move(finalizedChunk);
}
chunks[i]->finalize();
}
}

std::unique_ptr<NodeGroup> NodeGroupFactory::createNodeGroup(common::ColumnDataFormat dataFormat,
const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes, bool enableCompression,
bool needFinalize, uint64_t capacity) {
uint64_t capacity) {
return dataFormat == ColumnDataFormat::REGULAR ?
std::make_unique<NodeGroup>(columnTypes, enableCompression, needFinalize, capacity) :
std::make_unique<CSRNodeGroup>(columnTypes, enableCompression, needFinalize);
std::make_unique<NodeGroup>(columnTypes, enableCompression, capacity) :
std::make_unique<CSRNodeGroup>(columnTypes, enableCompression);
}

} // namespace storage
Expand Down
8 changes: 7 additions & 1 deletion src/storage/store/struct_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ StructColumnChunk::StructColumnChunk(
childChunks.resize(fieldTypes.size());
for (auto i = 0u; i < fieldTypes.size(); i++) {
childChunks[i] =
ColumnChunkFactory::createColumnChunk(*fieldTypes[i], capacity, enableCompression);
ColumnChunkFactory::createColumnChunk(*fieldTypes[i], enableCompression, capacity);
}
}

void StructColumnChunk::finalize() {
for (auto& childChunk : childChunks) {
childChunk->finalize();
}
}

Expand Down
Loading

0 comments on commit f04406a

Please sign in to comment.