Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable compression for INTERNAL_ID #3116

Merged
merged 1 commit into from
Mar 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.15)

project(Kuzu VERSION 0.3.2.2 LANGUAGES CXX C)
project(Kuzu VERSION 0.3.2.3 LANGUAGES CXX C)

find_package(Threads REQUIRED)

Expand Down
1 change: 1 addition & 0 deletions src/include/storage/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ struct PageCursor;

// Returns the size of the data type in bytes
uint32_t getDataTypeSizeInChunk(const common::LogicalType& dataType);
uint32_t getDataTypeSizeInChunk(const common::PhysicalTypeID& dataType);

// Compression type is written to the data header both so we can usually catch issues when we
// decompress uncompressed data by mistake, and to allow for runtime-configurable compression.
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class InternalIDColumn : public Column {
public:
InternalIDColumn(std::string name, const MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats stats);
transaction::Transaction* transaction, RWPropertyStats stats, bool enableCompression);

inline void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) override {
Expand Down
6 changes: 3 additions & 3 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ static void getRelColumnNamesInCopyOrder(TableCatalogEntry* tableEntry,
columnNames.emplace_back(InternalKeyword::SRC_OFFSET);
columnNames.emplace_back(InternalKeyword::DST_OFFSET);
columnNames.emplace_back(InternalKeyword::ROW_OFFSET);
columnTypes.emplace_back(LogicalType(LogicalTypeID::INT64));
columnTypes.emplace_back(LogicalType(LogicalTypeID::INT64));
columnTypes.emplace_back(LogicalType(LogicalTypeID::INT64));
columnTypes.emplace_back(LogicalType(LogicalTypeID::INTERNAL_ID));
columnTypes.emplace_back(LogicalType(LogicalTypeID::INTERNAL_ID));
columnTypes.emplace_back(LogicalType(LogicalTypeID::INTERNAL_ID));
auto& properties = tableEntry->getPropertiesRef();
for (auto i = 1u; i < properties.size(); ++i) { // skip internal ID
columnNames.push_back(properties[i].getName());
Expand Down
6 changes: 3 additions & 3 deletions src/processor/operator/partitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ void Partitioner::copyDataToPartitions(partition_idx_t partitioningIdx, DataChun
partition.chunks.emplace_back();
partition.chunks.back().reserve(chunkToCopyFrom.getNumValueVectors());
for (auto j = 0u; j < chunkToCopyFrom.getNumValueVectors(); j++) {
partition.chunks.back().emplace_back(ColumnChunkFactory::createColumnChunk(
chunkToCopyFrom.getValueVector(j)->dataType, false /*enableCompression*/,
Partitioner::CHUNK_SIZE));
partition.chunks.back().emplace_back(
ColumnChunkFactory::createColumnChunk(infos[partitioningIdx]->columnTypes[j],
false /*enableCompression*/, Partitioner::CHUNK_SIZE));
}
}
KU_ASSERT(partition.chunks.back().size() == chunkToCopyFrom.getNumValueVectors());
Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/persistent/rel_batch_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ std::vector<offset_t> RelBatchInsert::populateStartCSROffsetsAndLengths(ChunkedC
}

void RelBatchInsert::setOffsetToWithinNodeGroup(ColumnChunk& chunk, offset_t startOffset) {
KU_ASSERT(chunk.getDataType().getPhysicalType() == PhysicalTypeID::INT64);
KU_ASSERT(chunk.getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID);
auto offsets = (offset_t*)chunk.getData();
for (auto i = 0u; i < chunk.getNumValues(); i++) {
offsets[i] -= startOffset;
Expand All @@ -145,7 +145,7 @@ void RelBatchInsert::setOffsetToWithinNodeGroup(ColumnChunk& chunk, offset_t sta

void RelBatchInsert::setOffsetFromCSROffsets(
ColumnChunk* nodeOffsetChunk, ColumnChunk* csrOffsetChunk) {
KU_ASSERT(nodeOffsetChunk->getDataType().getPhysicalType() == PhysicalTypeID::INT64);
KU_ASSERT(nodeOffsetChunk->getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID);
for (auto i = 0u; i < nodeOffsetChunk->getNumValues(); i++) {
auto nodeOffset = nodeOffsetChunk->getValue<offset_t>(i);
auto csrOffset = csrOffsetChunk->getValue<offset_t>(nodeOffset);
Expand Down
15 changes: 12 additions & 3 deletions src/storage/compression/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ uint32_t getDataTypeSizeInChunk(const common::LogicalType& dataType) {
if (dataType.getLogicalTypeID() == LogicalTypeID::SERIAL) {
return 0;
}
switch (dataType.getPhysicalType()) {
return getDataTypeSizeInChunk(dataType.getPhysicalType());
}

uint32_t getDataTypeSizeInChunk(const common::PhysicalTypeID& dataType) {
switch (dataType) {
case PhysicalTypeID::STRUCT: {
return 0;
}
Expand All @@ -34,7 +38,7 @@ uint32_t getDataTypeSizeInChunk(const common::LogicalType& dataType) {
return sizeof(offset_t);
}
default: {
auto size = StorageUtils::getDataTypeSize(dataType);
auto size = PhysicalTypeUtils::getFixedTypeSize(dataType);
KU_ASSERT(size <= BufferPoolConstants::PAGE_4KB_SIZE);
return size;
}
Expand Down Expand Up @@ -66,7 +70,7 @@ bool CompressionMetadata::canUpdateInPlace(
switch (compression) {
case CompressionType::CONSTANT: {
// Value can be updated in place only if it is identical to the value already stored.
auto size = PhysicalTypeUtils::getFixedTypeSize(physicalType);
auto size = getDataTypeSizeInChunk(physicalType);
return memcmp(data + pos * size, this->data.data(), size) == 0;
}
case CompressionType::BOOLEAN_BITPACKING:
Expand Down Expand Up @@ -95,6 +99,7 @@ bool CompressionMetadata::canUpdateInPlace(
return IntegerBitpacking<int8_t>::canUpdateInPlace(
value, BitpackHeader::readHeader(this->data));
}
case PhysicalTypeID::INTERNAL_ID:
case PhysicalTypeID::VAR_LIST:
case PhysicalTypeID::UINT64: {
auto value = reinterpret_cast<const uint64_t*>(data)[pos];
Expand Down Expand Up @@ -150,6 +155,7 @@ uint64_t CompressionMetadata::numValues(uint64_t pageSize, const LogicalType& da
return IntegerBitpacking<int16_t>::numValues(pageSize, BitpackHeader::readHeader(data));
case PhysicalTypeID::INT8:
return IntegerBitpacking<int8_t>::numValues(pageSize, BitpackHeader::readHeader(data));
case PhysicalTypeID::INTERNAL_ID:
case PhysicalTypeID::VAR_LIST:
case PhysicalTypeID::UINT64:
return IntegerBitpacking<uint64_t>::numValues(
Expand Down Expand Up @@ -629,6 +635,7 @@ void ReadCompressedValuesFromPageToVector::operator()(const uint8_t* frame, Page
return IntegerBitpacking<int8_t>().decompressFromPage(frame, pageCursor.elemPosInPage,
resultVector->getData(), posInVector, numValuesToRead, metadata);
}
case PhysicalTypeID::INTERNAL_ID:
case PhysicalTypeID::VAR_LIST:
case PhysicalTypeID::UINT64: {
return IntegerBitpacking<uint64_t>().decompressFromPage(frame, pageCursor.elemPosInPage,
Expand Down Expand Up @@ -689,6 +696,7 @@ void ReadCompressedValuesFromPage::operator()(const uint8_t* frame, PageCursor&
return IntegerBitpacking<int8_t>().decompressFromPage(frame, pageCursor.elemPosInPage,
result, startPosInResult, numValuesToRead, metadata);
}
case PhysicalTypeID::INTERNAL_ID:
case PhysicalTypeID::VAR_LIST:
case PhysicalTypeID::UINT64: {
return IntegerBitpacking<uint64_t>().decompressFromPage(frame, pageCursor.elemPosInPage,
Expand Down Expand Up @@ -750,6 +758,7 @@ void WriteCompressedValuesToPage::operator()(uint8_t* frame, uint16_t posInFrame
return IntegerBitpacking<int8_t>().setValuesFromUncompressed(
data, dataOffset, frame, posInFrame, numValues, metadata);
}
case PhysicalTypeID::INTERNAL_ID:
case PhysicalTypeID::VAR_LIST:
case PhysicalTypeID::UINT64: {
return IntegerBitpacking<uint64_t>().setValuesFromUncompressed(
Expand Down
6 changes: 3 additions & 3 deletions src/storage/store/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ class SerialColumn final : public Column {

InternalIDColumn::InternalIDColumn(std::string name, const MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats stats)
transaction::Transaction* transaction, RWPropertyStats stats, bool enableCompression)
: Column{name, *LogicalType::INTERNAL_ID(), metaDAHeaderInfo, dataFH, metadataFH, bufferManager,
wal, transaction, stats, false /*enableCompression*/},
wal, transaction, stats, enableCompression},
commonTableID{INVALID_TABLE_ID} {}

void InternalIDColumn::populateCommonTableID(ValueVector* resultVector) const {
Expand Down Expand Up @@ -914,7 +914,7 @@ std::unique_ptr<Column> ColumnFactory::createColumn(std::string name, LogicalTyp
}
case LogicalTypeID::INTERNAL_ID: {
return std::make_unique<InternalIDColumn>(name, metaDAHeaderInfo, dataFH, metadataFH,
bufferManager, wal, transaction, propertyStatistics);
bufferManager, wal, transaction, propertyStatistics, enableCompression);
}
case LogicalTypeID::BLOB:
case LogicalTypeID::STRING: {
Expand Down
37 changes: 30 additions & 7 deletions src/storage/store/column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
case PhysicalTypeID::INT8: {
return std::make_shared<IntegerBitpacking<int8_t>>();
}
case PhysicalTypeID::INTERNAL_ID:
case PhysicalTypeID::VAR_LIST:
case PhysicalTypeID::UINT64: {
return std::make_shared<IntegerBitpacking<uint64_t>>();
Expand Down Expand Up @@ -181,6 +182,7 @@
case PhysicalTypeID::INT32:
case PhysicalTypeID::INT16:
case PhysicalTypeID::INT8:
case PhysicalTypeID::INTERNAL_ID:
case PhysicalTypeID::VAR_LIST:
case PhysicalTypeID::UINT64:
case PhysicalTypeID::UINT32:
Expand Down Expand Up @@ -239,7 +241,7 @@

void ColumnChunk::write(ColumnChunk* chunk, ColumnChunk* dstOffsets, RelMultiplicity multiplicity) {
KU_ASSERT(chunk->dataType.getPhysicalType() == dataType.getPhysicalType() &&
dstOffsets->dataType.getPhysicalType() == PhysicalTypeID::INT64 &&
dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID &&
chunk->getNumValues() == dstOffsets->getNumValues());
for (auto i = 0u; i < dstOffsets->getNumValues(); i++) {
auto dstOffset = dstOffsets->getValue<offset_t>(i);
Expand Down Expand Up @@ -453,7 +455,7 @@
void BoolColumnChunk::write(
ColumnChunk* chunk, ColumnChunk* dstOffsets, RelMultiplicity /*multiplicity*/) {
KU_ASSERT(chunk->getDataType().getPhysicalType() == PhysicalTypeID::BOOL &&
dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INT64 &&
dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID &&
chunk->getNumValues() == dstOffsets->getNumValues());
for (auto i = 0u; i < dstOffsets->getNumValues(); i++) {
auto dstOffset = dstOffsets->getValue<offset_t>(i);
Expand Down Expand Up @@ -525,18 +527,28 @@
class InternalIDColumnChunk final : public ColumnChunk {
public:
// Physically, we only materialize offset of INTERNAL_ID, which is same as UINT64,
explicit InternalIDColumnChunk(uint64_t capacity)
: ColumnChunk(*LogicalType::INT64(), capacity, false /*enableCompression*/),
explicit InternalIDColumnChunk(uint64_t capacity, bool enableCompression)
: ColumnChunk(*LogicalType::INTERNAL_ID(), capacity, enableCompression),
commonTableID{INVALID_TABLE_ID} {}

void append(ValueVector* vector, common::SelectionVector& selVector) override {
KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::INTERNAL_ID);
copyVectorToBuffer(vector, numValues, selVector);
switch (vector->dataType.getPhysicalType()) {
case PhysicalTypeID::INTERNAL_ID: {
copyVectorToBuffer(vector, numValues, selVector);
} break;
case PhysicalTypeID::INT64: {
copyInt64VectorToBuffer(vector, numValues, selVector);
} break;
default: {

Check warning on line 542 in src/storage/store/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/store/column_chunk.cpp#L542

Added line #L542 was not covered by tests
KU_UNREACHABLE;
}
}
numValues += selVector.selectedSize;
}

void copyVectorToBuffer(ValueVector* vector, offset_t startPosInChunk,
common::SelectionVector& selVector) override {
KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::INTERNAL_ID);
auto relIDsInVector = (internalID_t*)vector->getData();
if (commonTableID == INVALID_TABLE_ID) {
commonTableID = relIDsInVector[selVector.selectedPositions[0]].tableID;
Expand All @@ -550,6 +562,17 @@
}
}

void copyInt64VectorToBuffer(
ValueVector* vector, offset_t startPosInChunk, common::SelectionVector& selVector) {
KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::INT64);
for (auto i = 0u; i < selVector.selectedSize; i++) {
ray6080 marked this conversation as resolved.
Show resolved Hide resolved
auto pos = selVector.selectedPositions[i];
nullChunk->setNull(startPosInChunk + i, vector->isNull(pos));
memcpy(buffer.get() + (startPosInChunk + i) * numBytesPerValue,
&vector->getValue<offset_t>(pos), numBytesPerValue);
}
}

void lookup(
offset_t offsetInChunk, ValueVector& output, sel_t posInOutputVector) const override {
KU_ASSERT(offsetInChunk < capacity);
Expand Down Expand Up @@ -609,7 +632,7 @@
}
// Physically, we only materialize offset of INTERNAL_ID, which is same as INT64,
case PhysicalTypeID::INTERNAL_ID: {
return std::make_unique<InternalIDColumnChunk>(capacity);
return std::make_unique<InternalIDColumnChunk>(capacity, enableCompression);
}
case PhysicalTypeID::STRING: {
return std::make_unique<StringColumnChunk>(
Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/string_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ void StringColumnChunk::write(
void StringColumnChunk::write(
ColumnChunk* chunk, ColumnChunk* dstOffsets, RelMultiplicity /*multiplicity*/) {
KU_ASSERT(chunk->getDataType().getPhysicalType() == PhysicalTypeID::STRING &&
dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INT64 &&
ray6080 marked this conversation as resolved.
Show resolved Hide resolved
dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID &&
chunk->getNumValues() == dstOffsets->getNumValues());
for (auto i = 0u; i < chunk->getNumValues(); i++) {
auto offsetInChunk = dstOffsets->getValue<offset_t>(i);
Expand Down
3 changes: 2 additions & 1 deletion src/storage/store/struct_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ void StructColumnChunk::write(

void StructColumnChunk::write(
ColumnChunk* chunk, ColumnChunk* dstOffsets, RelMultiplicity multiplicity) {
KU_ASSERT(chunk->getDataType().getPhysicalType() == PhysicalTypeID::STRUCT);
KU_ASSERT(chunk->getDataType().getPhysicalType() == PhysicalTypeID::STRUCT &&
dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID);
for (auto i = 0u; i < dstOffsets->getNumValues(); i++) {
auto offsetInChunk = dstOffsets->getValue<offset_t>(i);
KU_ASSERT(offsetInChunk < capacity);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/var_list_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ void VarListColumnChunk::lookup(

void VarListColumnChunk::write(
ColumnChunk* chunk, ColumnChunk* dstOffsets, RelMultiplicity /*multiplicity*/) {
KU_ASSERT(dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID);
needFinalize = true;
if (!indicesColumnChunk) {
initializeIndices();
}
KU_ASSERT(chunk->getDataType().getPhysicalType() == dataType.getPhysicalType() &&
dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INT64 &&
chunk->getNumValues() == dstOffsets->getNumValues());
auto currentIndex = numValues;
append(chunk, 0, chunk->getNumValues());
Expand Down
Loading