Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push aggregate in LookupIndex #3504

Merged
merged 2 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,8 @@ struct LookupIndexResp {
// Each column represents one property. the column name is in the form of "tag_name.prop_alias"
// or "edge_type_name.prop_alias" in the same order which specified in return_columns of request
2: optional common.DataSet data,
Nivras marked this conversation as resolved.
Show resolved Hide resolved
// stat_data only have one column, the column name is the order in LookupIndexRequest.stat_prop
3: optional common.DataSet stat_data,
Nivras marked this conversation as resolved.
Show resolved Hide resolved
}

enum ScanType {
Expand Down Expand Up @@ -546,6 +548,7 @@ struct LookupIndexRequest {
// max row count of each partition in this response
6: optional i64 limit,
7: optional list<OrderBy> order_by,
8: optional list<StatProp> stat_columns,
}


Expand Down
1 change: 1 addition & 0 deletions src/meta/processors/BaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#pragma once

#include "interface/gen-cpp2/storage_types.h"
#include "meta/processors/BaseProcessor.h"

namespace nebula {
Expand Down
27 changes: 27 additions & 0 deletions src/storage/BaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,5 +186,32 @@ StatusOr<std::string> BaseProcessor<RESP>::encodeRowVal(const meta::NebulaSchema
return std::move(rowWrite).moveEncodedStr();
}

template <typename RESP>
nebula::cpp2::ErrorCode BaseProcessor<RESP>::checkStatType(
const meta::SchemaProviderIf::Field& field, cpp2::StatType statType) {
// todo(doodle): how to deal with nullable fields? For now, null add anything
// is null, if there is even one null, the result will be invalid
auto fType = field.type();
switch (statType) {
case cpp2::StatType::SUM:
case cpp2::StatType::AVG:
case cpp2::StatType::MIN:
case cpp2::StatType::MAX: {
if (fType == nebula::cpp2::PropertyType::INT64 ||
fType == nebula::cpp2::PropertyType::INT32 ||
fType == nebula::cpp2::PropertyType::INT16 || fType == nebula::cpp2::PropertyType::INT8 ||
fType == nebula::cpp2::PropertyType::FLOAT ||
fType == nebula::cpp2::PropertyType::DOUBLE) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
return nebula::cpp2::ErrorCode::E_INVALID_STAT_TYPE;
}
case cpp2::StatType::COUNT: {
break;
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

} // namespace storage
} // namespace nebula
3 changes: 3 additions & 0 deletions src/storage/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ class BaseProcessor {

void handleAsync(GraphSpaceID spaceId, PartitionID partId, nebula::cpp2::ErrorCode code);

nebula::cpp2::ErrorCode checkStatType(const meta::SchemaProviderIf::Field& field,
cpp2::StatType statType);

StatusOr<std::string> encodeRowVal(const meta::NebulaSchemaProvider* schema,
const std::vector<std::string>& propNames,
const std::vector<Value>& props,
Expand Down
1 change: 1 addition & 0 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ nebula_add_library(
exec/IndexDedupNode.cpp
exec/IndexEdgeScanNode.cpp
exec/IndexLimitNode.cpp
exec/IndexAggregateNode.cpp
exec/IndexProjectionNode.cpp
exec/IndexScanNode.cpp
exec/IndexSelectionNode.cpp
Expand Down
125 changes: 125 additions & 0 deletions src/storage/exec/IndexAggregateNode.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#include "storage/exec/IndexAggregateNode.h"

namespace nebula {
namespace storage {

IndexAggregateNode::IndexAggregateNode(const IndexAggregateNode& node)
: IndexNode(node), statInfos_(node.statInfos_), returnColumnsCount_(node.returnColumnsCount_) {
stats_ = node.stats_;
retColMap_ = node.retColMap_;
}

IndexAggregateNode::IndexAggregateNode(
RuntimeContext* context,
const std::vector<std::pair<std::string, cpp2::StatType>>& statInfos,
size_t returnColumnsCount)
: IndexNode(context, "IndexAggregateNode"),
statInfos_(statInfos),
returnColumnsCount_(returnColumnsCount) {}

nebula::cpp2::ErrorCode IndexAggregateNode::init(InitContext& ctx) {
Nivras marked this conversation as resolved.
Show resolved Hide resolved
DCHECK_EQ(children_.size(), 1);
for (const auto& statInfo : statInfos_) {
ctx.statColumns.insert(statInfo.first);
}
auto ret = children_[0]->init(ctx);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
return ret;
}
initStatValue();
retColMap_.clear();
retColMap_ = ctx.retColMap;
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

void IndexAggregateNode::initStatValue() {
stats_.clear();
if (statInfos_.size() > 0) {
stats_.reserve(statInfos_.size());
for (const auto& statInfo : statInfos_) {
stats_.emplace_back(statInfo.second);
}
}
}

void IndexAggregateNode::addStatValue(const Value& value, ColumnStat* stat) {
switch (stat->statType_) {
case cpp2::StatType::SUM: {
stat->sum_ = stat->sum_ + value;
break;
}
case cpp2::StatType::COUNT: {
stat->count_ = stat->count_ + 1;
break;
}
case cpp2::StatType::MAX: {
stat->max_ = value > stat->max_ ? value : stat->max_;
break;
}
case cpp2::StatType::MIN: {
stat->min_ = value < stat->min_ ? value : stat->min_;
break;
}
default:
LOG(ERROR) << "get invalid stat type";
return;
}
}

Row IndexAggregateNode::project(Row&& row) {
Row ret;
ret.reserve(returnColumnsCount_);
for (size_t i = 0; i < returnColumnsCount_; i++) {
ret.emplace_back(std::move(row[i]));
}
return ret;
}

Row IndexAggregateNode::calculateStats() {
Row result;
result.values.reserve(stats_.size());
for (const auto& stat : stats_) {
if (stat.statType_ == cpp2::StatType::SUM) {
result.values.emplace_back(stat.sum_);
} else if (stat.statType_ == cpp2::StatType::COUNT) {
result.values.emplace_back(stat.count_);
} else if (stat.statType_ == cpp2::StatType::MAX) {
result.values.emplace_back(stat.max_);
} else if (stat.statType_ == cpp2::StatType::MIN) {
result.values.emplace_back(stat.min_);
}
}
return result;
}

IndexNode::Result IndexAggregateNode::doNext() {
DCHECK_EQ(children_.size(), 1);
auto& child = *children_[0];
Result result = child.next();
const auto& row = result.row();
if (result.hasData()) {
for (size_t i = 0; i < statInfos_.size(); i++) {
const auto& columnName = statInfos_[i].first;
addStatValue(row[retColMap_[columnName]], &stats_[i]);
}
result = Result(project(std::move(result).row()));
}
return result;
}

std::unique_ptr<IndexNode> IndexAggregateNode::copy() {
return std::make_unique<IndexAggregateNode>(*this);
}

std::string IndexAggregateNode::identify() {
return "";
}

} // namespace storage

} // namespace nebula
49 changes: 49 additions & 0 deletions src/storage/exec/IndexAggregateNode.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#pragma once
#include "storage/exec/IndexNode.h"

namespace nebula {
namespace storage {

// used to save stat value for each column
struct ColumnStat {
ColumnStat() = default;

explicit ColumnStat(const cpp2::StatType& statType) : statType_(statType) {}

cpp2::StatType statType_;
mutable Value sum_ = 0L;
mutable Value count_ = 0L;
mutable Value min_ = std::numeric_limits<int64_t>::max();
mutable Value max_ = std::numeric_limits<int64_t>::min();
};

class IndexAggregateNode : public IndexNode {
public:
IndexAggregateNode(const IndexAggregateNode& node);
explicit IndexAggregateNode(RuntimeContext* context,
const std::vector<std::pair<std::string, cpp2::StatType>>& statInfos,
size_t returnColumnsCount);

nebula::cpp2::ErrorCode init(InitContext& ctx) override;
void initStatValue();
void addStatValue(const Value& value, ColumnStat* stat);
Row project(Row&& row);
Row calculateStats();

std::unique_ptr<IndexNode> copy() override;
std::string identify() override;

private:
Result doNext() override;
std::vector<std::pair<std::string, cpp2::StatType>> statInfos_;
std::vector<ColumnStat> stats_;
Map<std::string, size_t> retColMap_;
size_t returnColumnsCount_;
};

} // namespace storage
} // namespace nebula
89 changes: 89 additions & 0 deletions src/storage/exec/IndexExprContext.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#pragma once

#include "common/expression/Expression.h"
#include "storage/exec/IndexNode.h"

namespace nebula {
namespace storage {

class IndexExprContext : public ExpressionContext {
public:
explicit IndexExprContext(const Map<std::string, size_t> &colPos) : colPos_(colPos) {}
void setRow(const Row &row) {
row_ = &row;
}

Value getEdgeProp(const std::string &edgeType, const std::string &prop) const override {
UNUSED(edgeType);
DCHECK(row_ != nullptr);
auto iter = colPos_.find(prop);
DCHECK(iter != colPos_.end());
DCHECK(iter->second < row_->size());
return (*row_)[iter->second];
}

Value getTagProp(const std::string &tag, const std::string &prop) const override {
UNUSED(tag);
DCHECK(row_ != nullptr);
auto iter = colPos_.find(prop);
DCHECK(iter != colPos_.end());
DCHECK(iter->second < row_->size());
return (*row_)[iter->second];
}

// override
const Value &getVar(const std::string &var) const override {
UNUSED(var);
return fatal(__FILE__, __LINE__);
}
const Value &getVersionedVar(const std::string &var, int64_t version) const override {
UNUSED(var), UNUSED(version);
return fatal(__FILE__, __LINE__);
}
const Value &getVarProp(const std::string &var, const std::string &prop) const override {
UNUSED(var), UNUSED(prop);
return fatal(__FILE__, __LINE__);
}
Value getSrcProp(const std::string &tag, const std::string &prop) const override {
UNUSED(tag), UNUSED(prop);
return fatal(__FILE__, __LINE__);
}
const Value &getDstProp(const std::string &tag, const std::string &prop) const override {
UNUSED(tag), UNUSED(prop);
return fatal(__FILE__, __LINE__);
}
const Value &getInputProp(const std::string &prop) const override {
UNUSED(prop);
return fatal(__FILE__, __LINE__);
}
Value getVertex(const std::string &) const override {
return fatal(__FILE__, __LINE__);
}
Value getEdge() const override {
return fatal(__FILE__, __LINE__);
}
Value getColumn(int32_t index) const override {
UNUSED(index);
return fatal(__FILE__, __LINE__);
}
void setVar(const std::string &var, Value val) override {
UNUSED(var), UNUSED(val);
fatal(__FILE__, __LINE__);
}

private:
const Map<std::string, size_t> &colPos_;
const Row *row_;
inline const Value &fatal(const std::string &file, int line) const {
LOG(FATAL) << "Unexpect at " << file << ":" << line;
static Value placeholder;
return placeholder;
}
};

} // namespace storage
} // namespace nebula
3 changes: 3 additions & 0 deletions src/storage/exec/IndexNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ struct InitContext {
std::vector<std::string> returnColumns;
// The index of name in `returncolumns`
Map<std::string, size_t> retColMap;
// The columns in statColumns
// TODO(nivras) need refactor this, put statColumns in returnColumns
Set<std::string> statColumns;
Nivras marked this conversation as resolved.
Show resolved Hide resolved
};

class IndexNode {
Expand Down
6 changes: 6 additions & 0 deletions src/storage/exec/IndexProjectionNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ nebula::cpp2::ErrorCode IndexProjectionNode::init(InitContext& ctx) {
for (auto& col : requiredColumns_) {
ctx.requiredColumns.insert(col);
}
for (auto& col : ctx.statColumns) {
if (ctx.requiredColumns.find(col) == ctx.requiredColumns.end()) {
ctx.requiredColumns.insert(col);
requiredColumns_.push_back(col);
}
}
auto ret = children_[0]->init(ctx);
if (UNLIKELY(ret != ::nebula::cpp2::ErrorCode::SUCCEEDED)) {
return ret;
Expand Down
Loading