Refactor Partitioner to use ChunkedNodeGroupCollection #3123

Merged: 2 commits, merged on Mar 23, 2024
21 changes: 6 additions & 15 deletions src/include/processor/operator/partitioner.h
@@ -1,7 +1,7 @@
#pragma once

#include "processor/operator/sink.h"
#include "storage/store/column_chunk.h"
#include "storage/store/chunked_node_group_collection.h"

namespace kuzu {
namespace storage {
@@ -20,21 +20,15 @@ struct PartitionerFunctions {
// partitioning methods. For example, copy of rel tables require partitioning on both FWD and BWD
// direction. Each partitioning method corresponds to a PartitioningState.
struct PartitioningBuffer {
using ColumnChunkCollection = std::vector<std::unique_ptr<storage::ColumnChunk>>;
struct Partition {
// One chunk for each column, grouped into a list
// so that groups from different threads can be quickly merged without copying
// E.g. [(a,b,c), (a,b,c)] where a is a chunk for column a, b for column b, etc.
std::vector<ColumnChunkCollection> chunks;
};
std::vector<Partition> partitions;
std::vector<storage::ChunkedNodeGroupCollection> partitions;

void merge(std::unique_ptr<PartitioningBuffer> localPartitioningStates);
};

// NOTE: Currently, Partitioner is tightly coupled with RelBatchInsert. We should generalize it
// later when necessary. Here, each partition is essentially a node group.
struct BatchInsertSharedState;
struct PartitioningInfo;
struct PartitionerSharedState {
std::mutex mtx;
storage::MemoryManager* mm;
@@ -51,12 +45,12 @@ struct PartitionerSharedState {

explicit PartitionerSharedState(storage::MemoryManager* mm) : mm{mm} {}

void initialize();
void initialize(std::vector<std::unique_ptr<PartitioningInfo>>& infos);
common::partition_idx_t getNextPartition(common::vector_idx_t partitioningIdx);
void resetState();
void merge(std::vector<std::unique_ptr<PartitioningBuffer>> localPartitioningStates);

inline PartitioningBuffer::Partition& getPartitionBuffer(
inline storage::ChunkedNodeGroupCollection& getPartitionBuffer(
common::vector_idx_t partitioningIdx, common::partition_idx_t partitionIdx) {
KU_ASSERT(partitioningIdx < partitioningBuffers.size());
KU_ASSERT(partitionIdx < partitioningBuffers[partitioningIdx]->partitions.size());
@@ -107,7 +101,7 @@ class Partitioner : public Sink {

std::unique_ptr<PhysicalOperator> clone() final;

static void initializePartitioningStates(
static void initializePartitioningStates(std::vector<std::unique_ptr<PartitioningInfo>>& infos,
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers,
std::vector<common::partition_idx_t> numPartitions);

@@ -121,9 +115,6 @@
common::partition_idx_t partitioningIdx, common::DataChunk chunkToCopyFrom);

private:
// Same size as a value vector. Each thread will allocate a chunk for each node group,
// so this should be kept relatively small to avoid allocating more memory than is needed
static const uint64_t CHUNK_SIZE = 2048;
std::vector<std::unique_ptr<PartitioningInfo>> infos;
std::shared_ptr<PartitionerSharedState> sharedState;
std::unique_ptr<PartitionerLocalState> localState;
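To illustrate the shape of this change, here is a simplified, self-contained sketch with stand-in types (assumed names, not the actual Kùzu classes): each partition used to hold per-column chunk lists, and now holds a collection of row-grouped chunked node groups, so thread-local results can be merged by moving whole groups instead of stitching columns back together.

// Simplified stand-ins (assumed names, not the real storage:: classes).
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

struct Chunk {                        // plays the role of a ColumnChunk
    std::vector<int64_t> values;
};

struct ChunkedGroup {                 // plays the role of a ChunkedNodeGroup
    std::vector<Chunk> columnChunks;  // one chunk per column, same row count
};

class ChunkedGroupCollection {        // plays the role of ChunkedNodeGroupCollection
public:
    static constexpr uint64_t CHUNK_CAPACITY = 2048;

    void append(std::unique_ptr<ChunkedGroup> group) { groups.push_back(std::move(group)); }

    // Merging moves whole row groups from another collection; no per-column copying.
    void merge(ChunkedGroupCollection& other) {
        for (auto& group : other.groups) {
            groups.push_back(std::move(group));
        }
        other.groups.clear();
    }

    uint64_t getNumGroups() const { return groups.size(); }

private:
    std::vector<std::unique_ptr<ChunkedGroup>> groups;
};

// Each entry in PartitioningBuffer::partitions is now one such collection.
struct PartitioningBufferSketch {
    std::vector<ChunkedGroupCollection> partitions;
};

From the signatures above, PartitioningBuffer::merge and PartitionerSharedState::merge appear to reduce to this kind of group-moving merge per partition.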
5 changes: 3 additions & 2 deletions src/include/processor/operator/persistent/index_builder.h
@@ -151,15 +151,16 @@ class IndexBuilder {
IndexBuilder clone() { return IndexBuilder(sharedState); }

void insert(
storage::ColumnChunk* chunk, common::offset_t nodeOffset, common::offset_t numNodes);
const storage::ColumnChunk& chunk, common::offset_t nodeOffset, common::offset_t numNodes);

ProducerToken getProducerToken() const { return ProducerToken(sharedState); }

void finishedProducing();
void finalize(ExecutionContext* context);

private:
void checkNonNullConstraint(storage::NullColumnChunk* nullChunk, common::offset_t numNodes);
void checkNonNullConstraint(
const storage::NullColumnChunk& nullChunk, common::offset_t numNodes);
std::shared_ptr<IndexBuilderSharedState> sharedState;

IndexBuilderLocalBuffers localBuffers;
@@ -4,7 +4,7 @@
#include "processor/operator/call/in_query_call.h"
#include "processor/operator/persistent/batch_insert.h"
#include "processor/operator/persistent/index_builder.h"
#include "storage/store/node_group.h"
#include "storage/store/chunked_node_group.h"
#include "storage/store/node_table.h"

namespace kuzu {
20 changes: 10 additions & 10 deletions src/include/processor/operator/persistent/rel_batch_insert.h
@@ -3,27 +3,27 @@
#include "common/enums/rel_direction.h"
#include "processor/operator/partitioner.h"
#include "processor/operator/persistent/batch_insert.h"
#include "storage/store/chunked_node_group.h"
#include "storage/store/column_chunk.h"
#include "storage/store/node_group.h"

namespace kuzu {
namespace processor {

struct RelBatchInsertInfo final : public BatchInsertInfo {
common::RelDataDirection direction;
uint64_t partitioningIdx;
common::vector_idx_t offsetVectorIdx;
common::column_id_t offsetColumnID;
std::vector<common::LogicalType> columnTypes;

RelBatchInsertInfo(catalog::TableCatalogEntry* tableEntry, bool compressionEnabled,
common::RelDataDirection direction, uint64_t partitioningIdx,
common::vector_idx_t offsetVectorIdx, std::vector<common::LogicalType> columnTypes)
common::column_id_t offsetColumnID, std::vector<common::LogicalType> columnTypes)
: BatchInsertInfo{tableEntry, compressionEnabled}, direction{direction},
partitioningIdx{partitioningIdx}, offsetVectorIdx{offsetVectorIdx}, columnTypes{std::move(
columnTypes)} {}
partitioningIdx{partitioningIdx}, offsetColumnID{offsetColumnID}, columnTypes{std::move(
columnTypes)} {}
RelBatchInsertInfo(const RelBatchInsertInfo& other)
: BatchInsertInfo{other.tableEntry, other.compressionEnabled}, direction{other.direction},
partitioningIdx{other.partitioningIdx}, offsetVectorIdx{other.offsetVectorIdx},
partitioningIdx{other.partitioningIdx}, offsetColumnID{other.offsetColumnID},
columnTypes{common::LogicalType::copy(other.columnTypes)} {}

inline std::unique_ptr<BatchInsertInfo> copy() const override {
@@ -60,20 +60,20 @@ class RelBatchInsert final : public BatchInsert {
}

private:
void prepareCSRNodeGroup(PartitioningBuffer::Partition& partition,
common::offset_t startNodeOffset, common::vector_idx_t offsetVectorIdx,
void prepareCSRNodeGroup(storage::ChunkedNodeGroupCollection& partition,
common::offset_t startNodeOffset, common::column_id_t offsetColumnID,
common::offset_t numNodes);

static common::length_t getGapSize(common::length_t length);
static std::vector<common::offset_t> populateStartCSROffsetsAndLengths(
storage::ChunkedCSRHeader& csrHeader, common::offset_t numNodes,
PartitioningBuffer::Partition& partition, common::vector_idx_t offsetVectorIdx);
storage::ChunkedNodeGroupCollection& partition, common::column_id_t offsetColumnID);
static void populateEndCSROffsets(
storage::ChunkedCSRHeader& csrHeader, std::vector<common::offset_t>& gaps);
static void setOffsetToWithinNodeGroup(
storage::ColumnChunk& chunk, common::offset_t startOffset);
static void setOffsetFromCSROffsets(
storage::ColumnChunk* nodeOffsetChunk, storage::ColumnChunk* csrOffsetChunk);
storage::ColumnChunk& nodeOffsetChunk, storage::ColumnChunk& csrOffsetChunk);

// We only check rel multiplicity constraint (MANY_ONE, ONE_ONE) for now.
std::optional<common::offset_t> checkRelMultiplicityConstraint(
16 changes: 7 additions & 9 deletions src/include/storage/local_storage/local_table.h
@@ -5,7 +5,7 @@
#include "common/enums/rel_multiplicity.h"
#include "common/enums/table_type.h"
#include "common/vector/value_vector.h"
#include "storage/store/node_group.h"
#include "storage/store/chunked_node_group_collection.h"

namespace kuzu {
namespace storage {
@@ -22,16 +22,14 @@ using ChunkCollection = std::vector<ColumnChunk*>;

class LocalChunkedGroupCollection {
public:
static constexpr uint64_t CHUNK_CAPACITY = 2048;

explicit LocalChunkedGroupCollection(std::vector<common::LogicalType> dataTypes)
: dataTypes{std::move(dataTypes)}, numRows{0} {}
: dataTypes{std::move(dataTypes)}, chunkedGroups{this->dataTypes}, numRows{0} {}
DELETE_COPY_DEFAULT_MOVE(LocalChunkedGroupCollection);

static inline std::pair<uint32_t, uint64_t> getChunkIdxAndOffsetInChunk(
common::row_idx_t rowIdx) {
return std::make_pair(rowIdx / LocalChunkedGroupCollection::CHUNK_CAPACITY,
rowIdx % LocalChunkedGroupCollection::CHUNK_CAPACITY);
return std::make_pair(rowIdx / ChunkedNodeGroupCollection::CHUNK_CAPACITY,
rowIdx % ChunkedNodeGroupCollection::CHUNK_CAPACITY);
}

inline common::row_idx_t getRowIdxFromOffset(common::offset_t offset) {
@@ -82,7 +80,7 @@ class LocalChunkedGroupCollection {
inline ChunkCollection getLocalChunk(common::column_id_t columnID) {
ChunkCollection localChunkCollection;
for (auto& chunkedGroup : chunkedGroups.getChunkedGroups()) {
localChunkCollection.push_back(chunkedGroup->getColumnChunkUnsafe(columnID));
localChunkCollection.push_back(&chunkedGroup->getColumnChunkUnsafe(columnID));
}
return localChunkCollection;
}
@@ -91,10 +89,10 @@
common::row_idx_t append(std::vector<common::ValueVector*> vectors);

private:
ChunkedNodeGroupCollection chunkedGroups;
std::vector<common::LogicalType> dataTypes;
storage::ChunkedNodeGroupCollection chunkedGroups;
// The offset here can either be nodeOffset ( for node table) or relOffset (for rel table).
offset_to_row_idx_t offsetToRowIdx;
std::vector<common::LogicalType> dataTypes;
common::row_idx_t numRows;

// Only used for rel tables. Should be moved out later.
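A quick standalone check of the chunk-index arithmetic above (illustrative only; the real code now pulls the constant from ChunkedNodeGroupCollection::CHUNK_CAPACITY, which is 2048):

#include <cassert>
#include <cstdint>
#include <utility>

static constexpr uint64_t CHUNK_CAPACITY = 2048; // mirrors ChunkedNodeGroupCollection::CHUNK_CAPACITY

std::pair<uint32_t, uint64_t> getChunkIdxAndOffsetInChunk(uint64_t rowIdx) {
    return {static_cast<uint32_t>(rowIdx / CHUNK_CAPACITY), rowIdx % CHUNK_CAPACITY};
}

int main() {
    // Row 5000 maps to chunked group 2 at offset 904: 2 * 2048 = 4096, 5000 - 4096 = 904.
    auto [chunkIdx, offsetInChunk] = getChunkIdxAndOffsetInChunk(5000);
    assert(chunkIdx == 2 && offsetInChunk == 904);
    return 0;
}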
@@ -23,14 +23,11 @@ class ChunkedNodeGroup {
KU_ASSERT(columnID < chunks.size());
return *chunks[columnID];
}
inline ColumnChunk* getColumnChunkUnsafe(common::column_id_t columnID) {
KU_ASSERT(columnID < chunks.size());
return chunks[columnID].get();
}
inline const ColumnChunk& getColumnChunk(common::column_id_t columnID) {
inline ColumnChunk& getColumnChunkUnsafe(common::column_id_t columnID) {
KU_ASSERT(columnID < chunks.size());
return *chunks[columnID];
}
inline std::vector<std::unique_ptr<ColumnChunk>>& getColumnChunksUnsafe() { return chunks; }
inline bool isFull() const { return numRows == common::StorageConstants::NODE_GROUP_SIZE; }

void resetToEmpty();
@@ -39,7 +36,7 @@
void resizeChunks(uint64_t newSize);

uint64_t append(const std::vector<common::ValueVector*>& columnVectors,
common::DataChunkState* columnState, uint64_t numValuesToAppend);
common::SelectionVector& selVector, uint64_t numValuesToAppend);
common::offset_t append(ChunkedNodeGroup* other, common::offset_t offsetInOtherNodeGroup);
void write(std::vector<std::unique_ptr<ColumnChunk>>& data, common::vector_idx_t offsetVector);

@@ -98,32 +95,6 @@ class ChunkedCSRNodeGroup : public ChunkedNodeGroup {
ChunkedCSRHeader csrHeader;
};

class ChunkedNodeGroupCollection {
public:
ChunkedNodeGroupCollection() {}

inline const std::vector<std::unique_ptr<ChunkedNodeGroup>>& getChunkedGroups() const {
return chunkedGroups;
}
inline const ChunkedNodeGroup* getChunkedGroup(uint64_t groupIdx) const {
KU_ASSERT(groupIdx < chunkedGroups.size());
return chunkedGroups[groupIdx].get();
}
inline ChunkedNodeGroup* getChunkedGroupUnsafe(uint64_t groupIdx) {
KU_ASSERT(groupIdx < chunkedGroups.size());
return chunkedGroups[groupIdx].get();
}
inline uint64_t getNumChunks() const { return chunkedGroups.size(); }
void append(std::unique_ptr<ChunkedNodeGroup> chunkedGroup);

private:
// Assert that all chunked node groups have the same num columns and same data types.
bool sanityCheckForAppend();

private:
std::vector<std::unique_ptr<ChunkedNodeGroup>> chunkedGroups;
};

struct NodeGroupFactory {
static inline std::unique_ptr<ChunkedNodeGroup> createNodeGroup(
common::ColumnDataFormat dataFormat, const std::vector<common::LogicalType>& columnTypes,
40 changes: 40 additions & 0 deletions src/include/storage/store/chunked_node_group_collection.h
@@ -0,0 +1,40 @@
#pragma once

#include "storage/store/chunked_node_group.h"

namespace kuzu {
namespace storage {

class ChunkedNodeGroupCollection {
public:
static constexpr uint64_t CHUNK_CAPACITY = 2048;

explicit ChunkedNodeGroupCollection(std::vector<common::LogicalType> types)
: types{std::move(types)} {}
DELETE_COPY_DEFAULT_MOVE(ChunkedNodeGroupCollection);

inline const std::vector<std::unique_ptr<ChunkedNodeGroup>>& getChunkedGroups() const {
return chunkedGroups;
}
inline const ChunkedNodeGroup* getChunkedGroup(uint64_t groupIdx) const {
KU_ASSERT(groupIdx < chunkedGroups.size());
return chunkedGroups[groupIdx].get();
}
inline ChunkedNodeGroup* getChunkedGroupUnsafe(uint64_t groupIdx) {
KU_ASSERT(groupIdx < chunkedGroups.size());
return chunkedGroups[groupIdx].get();
}
inline uint64_t getNumChunks() const { return chunkedGroups.size(); }

void append(
const std::vector<common::ValueVector*>& vectors, const common::SelectionVector& selVector);
void append(std::unique_ptr<ChunkedNodeGroup> chunkedGroup);
void merge(ChunkedNodeGroupCollection& chunkedGroupCollection);

private:
std::vector<common::LogicalType> types;
std::vector<std::unique_ptr<ChunkedNodeGroup>> chunkedGroups;
};

} // namespace storage
} // namespace kuzu
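One plausible reading of the two append overloads and merge in this new header, written as a self-contained sketch with simplified stand-in types (the real interface works on ValueVector/SelectionVector and ChunkedNodeGroup, so everything below is an assumption about behaviour, not repository code): selected rows are appended to the last chunked group, and a fresh group is started whenever the current one reaches CHUNK_CAPACITY.

#include <cstdint>
#include <iostream>
#include <memory>
#include <vector>

static constexpr uint64_t CHUNK_CAPACITY = 2048;

struct Group {
    std::vector<std::vector<int64_t>> columns; // one value buffer per column
    uint64_t numRows = 0;
    explicit Group(size_t numColumns) : columns(numColumns) {}
};

class Collection {
public:
    explicit Collection(size_t numColumns) : numColumns{numColumns} {}

    // `vectors`: one value buffer per column; `sel`: row positions to append,
    // playing the role the SelectionVector plays in the real signature.
    void append(const std::vector<std::vector<int64_t>>& vectors,
        const std::vector<uint32_t>& sel) {
        for (auto pos : sel) {
            if (groups.empty() || groups.back()->numRows == CHUNK_CAPACITY) {
                groups.push_back(std::make_unique<Group>(numColumns));
            }
            auto& group = *groups.back();
            for (size_t col = 0; col < numColumns; col++) {
                group.columns[col].push_back(vectors[col][pos]);
            }
            group.numRows++;
        }
    }

    size_t getNumGroups() const { return groups.size(); }

private:
    size_t numColumns;
    std::vector<std::unique_ptr<Group>> groups;
};

int main() {
    Collection collection(2);
    std::vector<std::vector<int64_t>> vectors{{10, 20, 30}, {1, 2, 3}};
    collection.append(vectors, {0, 2});                        // append only rows 0 and 2
    std::cout << collection.getNumGroups() << " chunked group(s)\n"; // prints "1 chunked group(s)"
    return 0;
}

Since this is only a reading of the header, the actual row-filling policy may differ; the point of the sketch is that the collection, rather than each caller, now owns the CHUNK_CAPACITY constant and the group-rollover logic.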
1 change: 1 addition & 0 deletions src/include/storage/store/column_chunk.h
@@ -48,6 +48,7 @@ class ColumnChunk {
}

inline NullColumnChunk* getNullChunk() { return nullChunk.get(); }
inline const NullColumnChunk& getNullChunk() const { return *nullChunk; }
inline common::LogicalType& getDataType() { return dataType; }
inline const common::LogicalType& getDataType() const { return dataType; }

2 changes: 1 addition & 1 deletion src/include/storage/store/node_table.h
@@ -6,7 +6,7 @@
#include "common/cast.h"
#include "storage/index/hash_index.h"
#include "storage/stats/nodes_store_statistics.h"
#include "storage/store/node_group.h"
#include "storage/store/chunked_node_group.h"
#include "storage/store/node_table_data.h"
#include "storage/store/table.h"

2 changes: 1 addition & 1 deletion src/include/storage/store/rel_table_data.h
@@ -1,7 +1,7 @@
#pragma once

#include "common/enums/rel_direction.h"
#include "storage/store/node_group.h"
#include "storage/store/chunked_node_group.h"
#include "storage/store/table_data.h"

namespace kuzu {
2 changes: 1 addition & 1 deletion src/include/storage/store/table_data.h
@@ -1,7 +1,7 @@
#pragma once

#include "storage/store/chunked_node_group.h"
#include "storage/store/column.h"
#include "storage/store/node_group.h"

namespace kuzu {
namespace storage {