Skip to content

Commit

Permalink
rework var list finalize
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Nov 11, 2023
1 parent 24934c6 commit a3936f2
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 115 deletions.
6 changes: 2 additions & 4 deletions src/include/storage/store/column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ class ColumnChunk {
}

void populateWithDefaultVal(common::ValueVector* defaultValueVector);
virtual std::unique_ptr<ColumnChunk> finalize() {
return nullptr; // Nothing to be finalized.
virtual void finalize() { // DO NOTHING.
}

inline uint64_t getCapacity() const { return capacity; }
Expand Down Expand Up @@ -211,8 +210,7 @@ class NullColumnChunk : public BoolColumnChunk {

struct ColumnChunkFactory {
static std::unique_ptr<ColumnChunk> createColumnChunk(const common::LogicalType& dataType,
bool enableCompression, bool needFinalize = false,
uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
};

} // namespace storage
Expand Down
9 changes: 4 additions & 5 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class Column;
class NodeGroup {
public:
NodeGroup(const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes,
bool enableCompression, bool needFinalize, uint64_t capacity);
bool enableCompression, uint64_t capacity);
explicit NodeGroup(const std::vector<std::unique_ptr<Column>>& columns, bool enableCompression);
virtual ~NodeGroup() = default;

Expand Down Expand Up @@ -44,10 +44,10 @@ class NodeGroup {
class CSRNodeGroup : public NodeGroup {
public:
CSRNodeGroup(const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes,
bool enableCompression, bool needFinalize)
bool enableCompression)
// By default, initialize all column chunks except for the csrOffsetChunk to empty, as they
// should be resized after csr offset calculation (e.g., during CopyRel).
: NodeGroup{columnTypes, enableCompression, needFinalize, 0 /* capacity */} {
: NodeGroup{columnTypes, enableCompression, 0 /* capacity */} {
csrOffsetChunk = ColumnChunkFactory::createColumnChunk(
common::LogicalType{common::LogicalTypeID::INT64}, enableCompression);
}
Expand All @@ -61,8 +61,7 @@ class CSRNodeGroup : public NodeGroup {
struct NodeGroupFactory {
static std::unique_ptr<NodeGroup> createNodeGroup(common::ColumnDataFormat dataFormat,
const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes,
bool enableCompression, bool needFinalize = false,
uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
};

} // namespace storage
Expand Down
5 changes: 5 additions & 0 deletions src/include/storage/store/struct_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ class StructColumnChunk : public ColumnChunk {
KU_ASSERT(childIdx < childChunks.size());
return childChunks[childIdx].get();
}
inline void finalize() {
for (auto& childChunk : childChunks) {
childChunk->finalize();
}
}

protected:
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
Expand Down
45 changes: 20 additions & 25 deletions src/include/storage/store/var_list_column_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ struct VarListDataColumnChunk {
};

class VarListColumnChunk : public ColumnChunk {
friend class VarListColumnChunk;

public:
VarListColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression);

Expand All @@ -36,16 +38,8 @@ class VarListColumnChunk : public ColumnChunk {
void resetToEmpty() final;

void append(common::ValueVector* vector) final;
inline void write(common::ValueVector* /*valueVector*/,
common::ValueVector* /*offsetInChunkVector*/) override {
// LCOV_EXCL_START
throw common::NotImplementedException("VarListColumnChunk::write");
// LCOV_EXCL_STOP
}
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector) final;

inline VarListDataColumnChunk* getVarListDataColumnChunk() {
return varListDataColumnChunk.get();
}
inline void resizeDataColumnChunk(uint64_t numBytesForBuffer) {
// TODO(bmwinger): This won't work properly for booleans (will be one eighth as many values
// as could fit)
Expand All @@ -56,39 +50,40 @@ class VarListColumnChunk : public ColumnChunk {
varListDataColumnChunk->resizeBuffer(numValues);
}

void finalize() final;

protected:
void copyListValues(const common::list_entry_t& entry, common::ValueVector* dataVector);

private:
void append(ColumnChunk* other, common::offset_t startPosInOtherChunk,
uint32_t numValuesToAppend) final;

inline void initializeIndices() {
indicesColumnChunk = ColumnChunkFactory::createColumnChunk(
common::LogicalType{common::LogicalTypeID::INT64}, false /* enableCompression */);
indicesColumnChunk->getNullChunk()->resetToAllNull();
}
// Returns the length of the list at `offset`, computed as the difference
// between the next list's start offset and this list's start offset.
inline uint64_t getListLen(common::offset_t offset) const {
    const auto listStart = getListOffset(offset);
    const auto listEnd = getListOffset(offset + 1);
    return listEnd - listStart;
}
// Returns the start offset of the list at `offset`. The first list starts at 0;
// every other list starts at the value stored for the preceding position.
inline common::offset_t getListOffset(common::offset_t offset) const {
    if (offset == 0) {
        return 0;
    }
    return getValue<uint64_t>(offset - 1);
}

void moveFromOtherChunk(VarListColumnChunk* other);
void appendEmptyList(bool isNull);

protected:
bool enableCompression;
std::unique_ptr<VarListDataColumnChunk> varListDataColumnChunk;
};

class AuxVarListColumnChunk : public VarListColumnChunk {
public:
AuxVarListColumnChunk(common::LogicalType dataType, uint64_t capacity, bool enableCompression)
: VarListColumnChunk{dataType, capacity * 2 /* for sizeof(list_entry_t) */,
enableCompression},
lastDataOffset{0} {}

void resize(uint64_t newCapacity) final;
void write(common::ValueVector* valueVector, common::ValueVector* offsetInChunkVector) final;

std::unique_ptr<ColumnChunk> finalize() final;

private:
uint64_t lastDataOffset;
// The following members support writing var lists to arbitrary positions in the column chunk.
// Each var list is first appended to the end of the column chunk; indicesColumnChunk then
// tracks where each var list's data is inside the column chunk.
// `needFinalize` is set to true whenever `write` is called.
// During `finalize`, the whole column chunk is rewritten according to the indices.
bool needFinalize;
std::unique_ptr<ColumnChunk> indicesColumnChunk;
};

} // namespace storage
Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/persistent/copy_rel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ void CopyRelSharedState::logCopyRelWALRecord(WAL* wal) {

void CopyRel::initLocalStateInternal(ResultSet* /*resultSet_*/, ExecutionContext* /*context*/) {
localState = std::make_unique<CopyRelLocalState>();
localState->nodeGroup = NodeGroupFactory::createNodeGroup(info->dataFormat,
sharedState->columnTypes, info->compressionEnabled, true /* needFinalize */);
localState->nodeGroup = NodeGroupFactory::createNodeGroup(
info->dataFormat, sharedState->columnTypes, info->compressionEnabled);
}

void CopyRel::initGlobalStateInternal(ExecutionContext* /*context*/) {
Expand Down
12 changes: 4 additions & 8 deletions src/storage/store/column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ class FixedListColumnChunk : public ColumnChunk {
};

std::unique_ptr<ColumnChunk> ColumnChunkFactory::createColumnChunk(
const LogicalType& dataType, bool enableCompression, bool needFinalize, uint64_t capacity) {
const LogicalType& dataType, bool enableCompression, uint64_t capacity) {
switch (dataType.getPhysicalType()) {
case PhysicalTypeID::BOOL: {
return std::make_unique<BoolColumnChunk>(capacity);
Expand All @@ -477,8 +477,8 @@ std::unique_ptr<ColumnChunk> ColumnChunkFactory::createColumnChunk(
case PhysicalTypeID::FLOAT:
case PhysicalTypeID::INTERVAL: {
if (dataType.getLogicalTypeID() == LogicalTypeID::SERIAL) {
return std::make_unique<ColumnChunk>(LogicalType(LogicalTypeID::SERIAL), capacity,
false /*enableCompression*/, false /* hasNullChunk */);
return std::make_unique<ColumnChunk>(
dataType, capacity, false /*enableCompression*/, false /* hasNullChunk */);
} else {
return std::make_unique<ColumnChunk>(dataType, capacity, enableCompression);
}
Expand All @@ -495,11 +495,7 @@ std::unique_ptr<ColumnChunk> ColumnChunkFactory::createColumnChunk(
return std::make_unique<StringColumnChunk>(dataType, capacity);
}
case PhysicalTypeID::VAR_LIST: {
if (needFinalize) {
return std::make_unique<AuxVarListColumnChunk>(dataType, capacity, enableCompression);
} else {
return std::make_unique<VarListColumnChunk>(dataType, capacity, enableCompression);
}
return std::make_unique<VarListColumnChunk>(dataType, capacity, enableCompression);
}
case PhysicalTypeID::STRUCT: {
return std::make_unique<StructColumnChunk>(dataType, capacity, enableCompression);
Expand Down
16 changes: 6 additions & 10 deletions src/storage/store/node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@ namespace kuzu {
namespace storage {

NodeGroup::NodeGroup(const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes,
bool enableCompression, bool needFinalize, uint64_t capacity)
bool enableCompression, uint64_t capacity)
: nodeGroupIdx{UINT64_MAX}, numNodes{0} {
chunks.reserve(columnTypes.size());
for (auto& type : columnTypes) {
chunks.push_back(ColumnChunkFactory::createColumnChunk(
*type, enableCompression, needFinalize, capacity));
chunks.push_back(ColumnChunkFactory::createColumnChunk(*type, enableCompression, capacity));
}
}

Expand Down Expand Up @@ -97,19 +96,16 @@ void NodeGroup::write(DataChunk* dataChunk, vector_idx_t offsetVectorIdx) {
void NodeGroup::finalize(uint64_t nodeGroupIdx_) {
nodeGroupIdx = nodeGroupIdx_;
for (auto i = 0u; i < chunks.size(); i++) {
auto finalizedChunk = chunks[i]->finalize();
if (finalizedChunk) {
chunks[i] = std::move(finalizedChunk);
}
chunks[i]->finalize();
}
}

std::unique_ptr<NodeGroup> NodeGroupFactory::createNodeGroup(common::ColumnDataFormat dataFormat,
const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes, bool enableCompression,
bool needFinalize, uint64_t capacity) {
uint64_t capacity) {
return dataFormat == ColumnDataFormat::REGULAR ?
std::make_unique<NodeGroup>(columnTypes, enableCompression, needFinalize, capacity) :
std::make_unique<CSRNodeGroup>(columnTypes, enableCompression, needFinalize);
std::make_unique<NodeGroup>(columnTypes, enableCompression, capacity) :
std::make_unique<CSRNodeGroup>(columnTypes, enableCompression);
}

} // namespace storage
Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/struct_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ StructColumnChunk::StructColumnChunk(
childChunks.resize(fieldTypes.size());
for (auto i = 0u; i < fieldTypes.size(); i++) {
childChunks[i] =
ColumnChunkFactory::createColumnChunk(*fieldTypes[i], capacity, enableCompression);
ColumnChunkFactory::createColumnChunk(*fieldTypes[i], enableCompression, capacity);
}
}

Expand Down
122 changes: 62 additions & 60 deletions src/storage/store/var_list_column_chunk.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "storage/store/var_list_column_chunk.h"

#include "common/cast.h"
#include "common/types/value/value.h"

using namespace kuzu::common;
Expand All @@ -25,10 +26,10 @@ void VarListDataColumnChunk::resizeBuffer(uint64_t numValues) {
VarListColumnChunk::VarListColumnChunk(
LogicalType dataType, uint64_t capacity, bool enableCompression)
: ColumnChunk{std::move(dataType), capacity, enableCompression, true /* hasNullChunk */},
enableCompression{enableCompression} {
varListDataColumnChunk = std::make_unique<VarListDataColumnChunk>(
ColumnChunkFactory::createColumnChunk(*VarListType::getChildType(&this->dataType),
enableCompression, false /* needFinalize */, 0 /* capacity */));
enableCompression{enableCompression}, needFinalize{false} {
varListDataColumnChunk =
std::make_unique<VarListDataColumnChunk>(ColumnChunkFactory::createColumnChunk(
*VarListType::getChildType(&this->dataType), enableCompression, 0 /* capacity */));
KU_ASSERT(this->dataType.getPhysicalType() == PhysicalTypeID::VAR_LIST);
}

Expand Down Expand Up @@ -79,6 +80,39 @@ void VarListColumnChunk::append(ValueVector* vector) {
numValues += vector->state->selVector->selectedSize;
}

void VarListColumnChunk::appendEmptyList(bool isNull) {
auto nextListOffsetInChunk = getListOffset(numValues);
auto offsetBufferToWrite = (offset_t*)(buffer.get());
offsetBufferToWrite[numValues] = nextListOffsetInChunk;
nullChunk->setNull(numValues, isNull);
numValues++;
}

// Writes the selected lists of `valueVector` to the chunk positions given by
// `offsetInChunkVector`.
//
// The lists are appended to the end of this chunk; for each one,
// indicesColumnChunk records at the target position the index at which the list
// was appended. `needFinalize` is set so that finalize() later rewrites the
// chunk according to those indices.
//
// @param valueVector          lists to write; must have the same physical type as this chunk.
// @param offsetInChunkVector  INT64 vector of target positions, with the same
//                             selected size as `valueVector`.
void VarListColumnChunk::write(ValueVector* valueVector, ValueVector* offsetInChunkVector) {
    needFinalize = true;
    if (!indicesColumnChunk) {
        initializeIndices();
    }
    KU_ASSERT(valueVector->dataType.getPhysicalType() == dataType.getPhysicalType() &&
              offsetInChunkVector->dataType.getPhysicalType() == PhysicalTypeID::INT64 &&
              valueVector->state->selVector->selectedSize ==
                  offsetInChunkVector->state->selVector->selectedSize);
    // Index at which the first list of this call is appended.
    auto currentIndex = numValues;
    append(valueVector);
    const auto& offsetSelVector = *offsetInChunkVector->state->selVector;
    for (auto i = 0u; i < offsetSelVector.selectedSize; i++) {
        auto posInChunk =
            offsetInChunkVector->getValue<offset_t>(offsetSelVector.selectedPositions[i]);
        // The target position must fit both this chunk and the indices chunk that
        // is written to below.
        KU_ASSERT(posInChunk < capacity && posInChunk < indicesColumnChunk->getCapacity());
        indicesColumnChunk->setValue(currentIndex++, posInChunk);
        indicesColumnChunk->getNullChunk()->setNull(posInChunk, false);
        if (indicesColumnChunk->getNumValues() <= posInChunk) {
            indicesColumnChunk->setNumValues(posInChunk + 1);
        }
    }
    // Writing the last position makes numValues equal to the capacity, which is
    // a valid (full) state, hence `<=` rather than `<`.
    KU_ASSERT(currentIndex == numValues &&
              indicesColumnChunk->getNumValues() <= indicesColumnChunk->getCapacity());
}

void VarListColumnChunk::copyListValues(const list_entry_t& entry, ValueVector* dataVector) {
auto numListValuesToCopy = entry.size;
auto numListValuesCopied = 0;
Expand All @@ -95,67 +129,35 @@ void VarListColumnChunk::copyListValues(const list_entry_t& entry, ValueVector*
}
}

void AuxVarListColumnChunk::write(ValueVector* vector, ValueVector* offsetInChunkVector) {
KU_ASSERT(vector->dataType.getPhysicalType() == dataType.getPhysicalType() &&
offsetInChunkVector->dataType.getPhysicalType() == PhysicalTypeID::INT64 &&
vector->state->selVector->selectedSize ==
offsetInChunkVector->state->selVector->selectedSize);
auto offsets = (offset_t*)offsetInChunkVector->getData();
auto chunkListEntries = (list_entry_t*)buffer.get();
for (auto i = 0u; i < offsetInChunkVector->state->selVector->selectedSize; i++) {
auto offsetInChunk = offsets[offsetInChunkVector->state->selVector->selectedPositions[i]];
KU_ASSERT(offsetInChunk < capacity);
auto offsetInVector = vector->state->selVector->selectedPositions[i];
uint64_t listLen = vector->isNull(offsetInVector) ?
0 :
vector->getValue<list_entry_t>(offsetInVector).size;
chunkListEntries[offsetInChunk].offset = lastDataOffset;
chunkListEntries[offsetInChunk].size = listLen;
lastDataOffset += listLen;
nullChunk->setNull(offsetInChunk, vector->isNull(offsetInVector));
if (offsetInChunk >= numValues) {
numValues = offsetInChunk + 1;
}
void VarListColumnChunk::finalize() {
if (!needFinalize) {
return;
}
varListDataColumnChunk->resizeBuffer(lastDataOffset);
auto dataVector = ListVector::getDataVector(vector);
dataVector->setState(std::make_unique<DataChunkState>());
dataVector->state->selVector->resetSelectorToValuePosBuffer();
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
auto pos = vector->state->selVector->selectedPositions[i];
if (vector->isNull(pos)) {
continue;
auto newColumnChunk =
ColumnChunkFactory::createColumnChunk(dataType, enableCompression, capacity);
auto totalListLen = getListOffset(numValues);
auto newVarListChunk = ku_dynamic_cast<ColumnChunk*, VarListColumnChunk*>(newColumnChunk.get());
newVarListChunk->getDataColumnChunk()->resize(totalListLen);
for (auto i = 0u; i < indicesColumnChunk->getNumValues(); i++) {
if (indicesColumnChunk->getNullChunk()->isNull(i)) {
newVarListChunk->appendEmptyList(true /* isNull */);
} else {
auto index = indicesColumnChunk->getValue<offset_t>(i);
newColumnChunk->append(this, index, 1);
}
copyListValues(vector->getValue<list_entry_t>(pos), dataVector);
}
// Move offsets, null, data from newVarListChunk to this column chunk. And release indices.
moveFromOtherChunk(newVarListChunk);
}

// Resizes the underlying ColumnChunk to twice the requested capacity, as
// AuxVarListColumnChunk needs double the capacity of a VarListColumnChunk.
void AuxVarListColumnChunk::resize(uint64_t newCapacity) {
    const auto doubledCapacity = newCapacity * 2;
    ColumnChunk::resize(doubledCapacity);
}

std::unique_ptr<ColumnChunk> AuxVarListColumnChunk::finalize() {
std::unique_ptr<ColumnChunk> result = ColumnChunkFactory::createColumnChunk(
dataType, enableCompression, false /* needFinalize */, capacity / 2);
KU_ASSERT(result->getDataType().getPhysicalType() == PhysicalTypeID::VAR_LIST);
auto resultVarListChunk = reinterpret_cast<VarListColumnChunk*>(result.get());
resultVarListChunk->getNullChunk()->append(nullChunk.get(), 0, numValues);
auto resultOffsets = reinterpret_cast<offset_t*>(resultVarListChunk->getData());
resultVarListChunk->getVarListDataColumnChunk()->resizeBuffer(lastDataOffset);
auto listEntries = (list_entry_t*)buffer.get();
auto offsetInResultDataChunk = 0;
for (auto i = 0u; i < numValues; i++) {
if (listEntries[i].size > 0) {
resultVarListChunk->getDataColumnChunk()->append(
varListDataColumnChunk->dataColumnChunk.get(), listEntries[i].offset,
listEntries[i].size);
}
offsetInResultDataChunk += listEntries[i].size;
resultOffsets[i] = offsetInResultDataChunk;
}
resultVarListChunk->setNumValues(numValues);
return result;
// Takes over the contents of `other`: its offset buffer, null chunk and list
// data chunk are moved into this chunk and its value count is copied.
// Afterwards the indices chunk is released and `needFinalize` is cleared.
// `other` is left moved-from.
void VarListColumnChunk::moveFromOtherChunk(VarListColumnChunk* other) {
    // Take ownership of the offsets, null mask and list data.
    buffer = std::move(other->buffer);
    nullChunk = std::move(other->nullChunk);
    varListDataColumnChunk = std::move(other->varListDataColumnChunk);
    numValues = other->numValues;
    // Drop the indices and clear the pending-finalize flag.
    indicesColumnChunk.reset();
    needFinalize = false;
}

} // namespace storage
Expand Down

0 comments on commit a3936f2

Please sign in to comment.