Skip to content

Commit

Permalink
enable compression for INTERNAL_ID
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Mar 22, 2024
1 parent 6d39076 commit 2df56a2
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 26 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.15)

project(Kuzu VERSION 0.3.2.2 LANGUAGES CXX C)
project(Kuzu VERSION 0.3.2.3 LANGUAGES CXX C)

find_package(Threads REQUIRED)

Expand Down
1 change: 1 addition & 0 deletions src/include/storage/compression/compression.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ struct PageCursor;

// Returns the size of the data type in bytes
uint32_t getDataTypeSizeInChunk(const common::LogicalType& dataType);
uint32_t getDataTypeSizeInChunk(const common::PhysicalTypeID& dataType);

// Compression type is written to the data header both so we can usually catch issues when we
// decompress uncompressed data by mistake, and to allow for runtime-configurable compression.
Expand Down
2 changes: 1 addition & 1 deletion src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ class InternalIDColumn : public Column {
public:
InternalIDColumn(std::string name, const MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats stats);
transaction::Transaction* transaction, RWPropertyStats stats, bool enableCompression);

inline void scan(transaction::Transaction* transaction, common::ValueVector* nodeIDVector,
common::ValueVector* resultVector) override {
Expand Down
6 changes: 3 additions & 3 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ static void getRelColumnNamesInCopyOrder(TableCatalogEntry* tableEntry,
columnNames.emplace_back(InternalKeyword::SRC_OFFSET);
columnNames.emplace_back(InternalKeyword::DST_OFFSET);
columnNames.emplace_back(InternalKeyword::ROW_OFFSET);
columnTypes.emplace_back(LogicalType(LogicalTypeID::INT64));
columnTypes.emplace_back(LogicalType(LogicalTypeID::INT64));
columnTypes.emplace_back(LogicalType(LogicalTypeID::INT64));
columnTypes.emplace_back(LogicalType(LogicalTypeID::INTERNAL_ID));
columnTypes.emplace_back(LogicalType(LogicalTypeID::INTERNAL_ID));
columnTypes.emplace_back(LogicalType(LogicalTypeID::INTERNAL_ID));
auto& properties = tableEntry->getPropertiesRef();
for (auto i = 1u; i < properties.size(); ++i) { // skip internal ID
columnNames.push_back(properties[i].getName());
Expand Down
6 changes: 3 additions & 3 deletions src/processor/operator/partitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ void Partitioner::copyDataToPartitions(partition_idx_t partitioningIdx, DataChun
partition.chunks.emplace_back();
partition.chunks.back().reserve(chunkToCopyFrom.getNumValueVectors());
for (auto j = 0u; j < chunkToCopyFrom.getNumValueVectors(); j++) {
partition.chunks.back().emplace_back(ColumnChunkFactory::createColumnChunk(
chunkToCopyFrom.getValueVector(j)->dataType, false /*enableCompression*/,
Partitioner::CHUNK_SIZE));
partition.chunks.back().emplace_back(
ColumnChunkFactory::createColumnChunk(infos[partitioningIdx]->columnTypes[j],
false /*enableCompression*/, Partitioner::CHUNK_SIZE));
}
}
KU_ASSERT(partition.chunks.back().size() == chunkToCopyFrom.getNumValueVectors());
Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/persistent/rel_batch_insert.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ std::vector<offset_t> RelBatchInsert::populateStartCSROffsetsAndLengths(ChunkedC
}

void RelBatchInsert::setOffsetToWithinNodeGroup(ColumnChunk& chunk, offset_t startOffset) {
KU_ASSERT(chunk.getDataType().getPhysicalType() == PhysicalTypeID::INT64);
KU_ASSERT(chunk.getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID);
auto offsets = (offset_t*)chunk.getData();
for (auto i = 0u; i < chunk.getNumValues(); i++) {
offsets[i] -= startOffset;
Expand All @@ -145,7 +145,7 @@ void RelBatchInsert::setOffsetToWithinNodeGroup(ColumnChunk& chunk, offset_t sta

void RelBatchInsert::setOffsetFromCSROffsets(
ColumnChunk* nodeOffsetChunk, ColumnChunk* csrOffsetChunk) {
KU_ASSERT(nodeOffsetChunk->getDataType().getPhysicalType() == PhysicalTypeID::INT64);
KU_ASSERT(nodeOffsetChunk->getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID);
for (auto i = 0u; i < nodeOffsetChunk->getNumValues(); i++) {
auto nodeOffset = nodeOffsetChunk->getValue<offset_t>(i);
auto csrOffset = csrOffsetChunk->getValue<offset_t>(nodeOffset);
Expand Down
15 changes: 12 additions & 3 deletions src/storage/compression/compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ uint32_t getDataTypeSizeInChunk(const common::LogicalType& dataType) {
if (dataType.getLogicalTypeID() == LogicalTypeID::SERIAL) {
return 0;
}
switch (dataType.getPhysicalType()) {
return getDataTypeSizeInChunk(dataType.getPhysicalType());
}

uint32_t getDataTypeSizeInChunk(const common::PhysicalTypeID& dataType) {
switch (dataType) {
case PhysicalTypeID::STRUCT: {
return 0;
}
Expand All @@ -34,7 +38,7 @@ uint32_t getDataTypeSizeInChunk(const common::LogicalType& dataType) {
return sizeof(offset_t);
}
default: {
auto size = StorageUtils::getDataTypeSize(dataType);
auto size = PhysicalTypeUtils::getFixedTypeSize(dataType);
KU_ASSERT(size <= BufferPoolConstants::PAGE_4KB_SIZE);
return size;
}
Expand Down Expand Up @@ -66,7 +70,7 @@ bool CompressionMetadata::canUpdateInPlace(
switch (compression) {
case CompressionType::CONSTANT: {
// Value can be updated in place only if it is identical to the value already stored.
auto size = PhysicalTypeUtils::getFixedTypeSize(physicalType);
auto size = getDataTypeSizeInChunk(physicalType);
return memcmp(data + pos * size, this->data.data(), size) == 0;
}
case CompressionType::BOOLEAN_BITPACKING:
Expand Down Expand Up @@ -95,6 +99,7 @@ bool CompressionMetadata::canUpdateInPlace(
return IntegerBitpacking<int8_t>::canUpdateInPlace(
value, BitpackHeader::readHeader(this->data));
}
case PhysicalTypeID::INTERNAL_ID:
case PhysicalTypeID::VAR_LIST:
case PhysicalTypeID::UINT64: {
auto value = reinterpret_cast<const uint64_t*>(data)[pos];
Expand Down Expand Up @@ -150,6 +155,7 @@ uint64_t CompressionMetadata::numValues(uint64_t pageSize, const LogicalType& da
return IntegerBitpacking<int16_t>::numValues(pageSize, BitpackHeader::readHeader(data));
case PhysicalTypeID::INT8:
return IntegerBitpacking<int8_t>::numValues(pageSize, BitpackHeader::readHeader(data));
case PhysicalTypeID::INTERNAL_ID:
case PhysicalTypeID::VAR_LIST:
case PhysicalTypeID::UINT64:
return IntegerBitpacking<uint64_t>::numValues(
Expand Down Expand Up @@ -629,6 +635,7 @@ void ReadCompressedValuesFromPageToVector::operator()(const uint8_t* frame, Page
return IntegerBitpacking<int8_t>().decompressFromPage(frame, pageCursor.elemPosInPage,
resultVector->getData(), posInVector, numValuesToRead, metadata);
}
case PhysicalTypeID::INTERNAL_ID:
case PhysicalTypeID::VAR_LIST:
case PhysicalTypeID::UINT64: {
return IntegerBitpacking<uint64_t>().decompressFromPage(frame, pageCursor.elemPosInPage,
Expand Down Expand Up @@ -689,6 +696,7 @@ void ReadCompressedValuesFromPage::operator()(const uint8_t* frame, PageCursor&
return IntegerBitpacking<int8_t>().decompressFromPage(frame, pageCursor.elemPosInPage,
result, startPosInResult, numValuesToRead, metadata);
}
case PhysicalTypeID::INTERNAL_ID:
case PhysicalTypeID::VAR_LIST:
case PhysicalTypeID::UINT64: {
return IntegerBitpacking<uint64_t>().decompressFromPage(frame, pageCursor.elemPosInPage,
Expand Down Expand Up @@ -750,6 +758,7 @@ void WriteCompressedValuesToPage::operator()(uint8_t* frame, uint16_t posInFrame
return IntegerBitpacking<int8_t>().setValuesFromUncompressed(
data, dataOffset, frame, posInFrame, numValues, metadata);
}
case PhysicalTypeID::INTERNAL_ID:
case PhysicalTypeID::VAR_LIST:
case PhysicalTypeID::UINT64: {
return IntegerBitpacking<uint64_t>().setValuesFromUncompressed(
Expand Down
6 changes: 3 additions & 3 deletions src/storage/store/column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,9 @@ class SerialColumn final : public Column {

InternalIDColumn::InternalIDColumn(std::string name, const MetadataDAHInfo& metaDAHeaderInfo,
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats stats)
transaction::Transaction* transaction, RWPropertyStats stats, bool enableCompression)
: Column{name, *LogicalType::INTERNAL_ID(), metaDAHeaderInfo, dataFH, metadataFH, bufferManager,
wal, transaction, stats, false /*enableCompression*/},
wal, transaction, stats, enableCompression},
commonTableID{INVALID_TABLE_ID} {}

void InternalIDColumn::populateCommonTableID(ValueVector* resultVector) const {
Expand Down Expand Up @@ -914,7 +914,7 @@ std::unique_ptr<Column> ColumnFactory::createColumn(std::string name, LogicalTyp
}
case LogicalTypeID::INTERNAL_ID: {
return std::make_unique<InternalIDColumn>(name, metaDAHeaderInfo, dataFH, metadataFH,
bufferManager, wal, transaction, propertyStatistics);
bufferManager, wal, transaction, propertyStatistics, enableCompression);
}
case LogicalTypeID::BLOB:
case LogicalTypeID::STRING: {
Expand Down
37 changes: 30 additions & 7 deletions src/storage/store/column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ static std::shared_ptr<CompressionAlg> getCompression(
case PhysicalTypeID::INT8: {
return std::make_shared<IntegerBitpacking<int8_t>>();
}
case PhysicalTypeID::INTERNAL_ID:
case PhysicalTypeID::VAR_LIST:
case PhysicalTypeID::UINT64: {
return std::make_shared<IntegerBitpacking<uint64_t>>();
Expand Down Expand Up @@ -181,6 +182,7 @@ void ColumnChunk::initializeFunction() {
case PhysicalTypeID::INT32:
case PhysicalTypeID::INT16:
case PhysicalTypeID::INT8:
case PhysicalTypeID::INTERNAL_ID:
case PhysicalTypeID::VAR_LIST:
case PhysicalTypeID::UINT64:
case PhysicalTypeID::UINT32:
Expand Down Expand Up @@ -239,7 +241,7 @@ void ColumnChunk::lookup(

void ColumnChunk::write(ColumnChunk* chunk, ColumnChunk* dstOffsets, RelMultiplicity multiplicity) {
KU_ASSERT(chunk->dataType.getPhysicalType() == dataType.getPhysicalType() &&
dstOffsets->dataType.getPhysicalType() == PhysicalTypeID::INT64 &&
dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID &&
chunk->getNumValues() == dstOffsets->getNumValues());
for (auto i = 0u; i < dstOffsets->getNumValues(); i++) {
auto dstOffset = dstOffsets->getValue<offset_t>(i);
Expand Down Expand Up @@ -453,7 +455,7 @@ void BoolColumnChunk::lookup(
void BoolColumnChunk::write(
ColumnChunk* chunk, ColumnChunk* dstOffsets, RelMultiplicity /*multiplicity*/) {
KU_ASSERT(chunk->getDataType().getPhysicalType() == PhysicalTypeID::BOOL &&
dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INT64 &&
dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID &&
chunk->getNumValues() == dstOffsets->getNumValues());
for (auto i = 0u; i < dstOffsets->getNumValues(); i++) {
auto dstOffset = dstOffsets->getValue<offset_t>(i);
Expand Down Expand Up @@ -525,18 +527,28 @@ void NullColumnChunk::append(
class InternalIDColumnChunk final : public ColumnChunk {
public:
// Physically, we only materialize offset of INTERNAL_ID, which is same as UINT64,
explicit InternalIDColumnChunk(uint64_t capacity)
: ColumnChunk(*LogicalType::INT64(), capacity, false /*enableCompression*/),
explicit InternalIDColumnChunk(uint64_t capacity, bool enableCompression)
: ColumnChunk(*LogicalType::INTERNAL_ID(), capacity, enableCompression),
commonTableID{INVALID_TABLE_ID} {}

void append(ValueVector* vector, common::SelectionVector& selVector) override {
KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::INTERNAL_ID);
copyVectorToBuffer(vector, numValues, selVector);
switch (vector->dataType.getPhysicalType()) {
case PhysicalTypeID::INTERNAL_ID: {
copyVectorToBuffer(vector, numValues, selVector);
} break;
case PhysicalTypeID::INT64: {
copyInt64VectorToBuffer(vector, numValues, selVector);
} break;
default: {

Check warning on line 542 in src/storage/store/column_chunk.cpp

View check run for this annotation

Codecov / codecov/patch

src/storage/store/column_chunk.cpp#L542

Added line #L542 was not covered by tests
KU_UNREACHABLE;
}
}
numValues += selVector.selectedSize;
}

void copyVectorToBuffer(ValueVector* vector, offset_t startPosInChunk,
common::SelectionVector& selVector) override {
KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::INTERNAL_ID);
auto relIDsInVector = (internalID_t*)vector->getData();
if (commonTableID == INVALID_TABLE_ID) {
commonTableID = relIDsInVector[selVector.selectedPositions[0]].tableID;
Expand All @@ -550,6 +562,17 @@ class InternalIDColumnChunk final : public ColumnChunk {
}
}

void copyInt64VectorToBuffer(
ValueVector* vector, offset_t startPosInChunk, common::SelectionVector& selVector) {
KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::INT64);
for (auto i = 0u; i < selVector.selectedSize; i++) {
auto pos = selVector.selectedPositions[i];
nullChunk->setNull(startPosInChunk + i, vector->isNull(pos));
memcpy(buffer.get() + (startPosInChunk + i) * numBytesPerValue,
&vector->getValue<offset_t>(pos), numBytesPerValue);
}
}

void lookup(
offset_t offsetInChunk, ValueVector& output, sel_t posInOutputVector) const override {
KU_ASSERT(offsetInChunk < capacity);
Expand Down Expand Up @@ -609,7 +632,7 @@ std::unique_ptr<ColumnChunk> ColumnChunkFactory::createColumnChunk(
}
// Physically, we only materialize offset of INTERNAL_ID, which is same as INT64,
case PhysicalTypeID::INTERNAL_ID: {
return std::make_unique<InternalIDColumnChunk>(capacity);
return std::make_unique<InternalIDColumnChunk>(capacity, enableCompression);
}
case PhysicalTypeID::STRING: {
return std::make_unique<StringColumnChunk>(
Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/string_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ void StringColumnChunk::write(
void StringColumnChunk::write(
ColumnChunk* chunk, ColumnChunk* dstOffsets, RelMultiplicity /*multiplicity*/) {
KU_ASSERT(chunk->getDataType().getPhysicalType() == PhysicalTypeID::STRING &&
dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INT64 &&
dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID &&
chunk->getNumValues() == dstOffsets->getNumValues());
for (auto i = 0u; i < chunk->getNumValues(); i++) {
auto offsetInChunk = dstOffsets->getValue<offset_t>(i);
Expand Down
3 changes: 2 additions & 1 deletion src/storage/store/struct_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ void StructColumnChunk::write(

void StructColumnChunk::write(
ColumnChunk* chunk, ColumnChunk* dstOffsets, RelMultiplicity multiplicity) {
KU_ASSERT(chunk->getDataType().getPhysicalType() == PhysicalTypeID::STRUCT);
KU_ASSERT(chunk->getDataType().getPhysicalType() == PhysicalTypeID::STRUCT &&
dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID);
for (auto i = 0u; i < dstOffsets->getNumValues(); i++) {
auto offsetInChunk = dstOffsets->getValue<offset_t>(i);
KU_ASSERT(offsetInChunk < capacity);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/var_list_column_chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ void VarListColumnChunk::lookup(

void VarListColumnChunk::write(
ColumnChunk* chunk, ColumnChunk* dstOffsets, RelMultiplicity /*multiplicity*/) {
KU_ASSERT(dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INTERNAL_ID);
needFinalize = true;
if (!indicesColumnChunk) {
initializeIndices();
}
KU_ASSERT(chunk->getDataType().getPhysicalType() == dataType.getPhysicalType() &&
dstOffsets->getDataType().getPhysicalType() == PhysicalTypeID::INT64 &&
chunk->getNumValues() == dstOffsets->getNumValues());
auto currentIndex = numValues;
append(chunk, 0, chunk->getNumValues());
Expand Down

0 comments on commit 2df56a2

Please sign in to comment.