Skip to content

Commit

Permalink
Cherry pick 3.3 (1013-1017) (#4735)
Browse files Browse the repository at this point in the history
* Fix aggregate expression type deduce (#4706)

* fix aggregate expression type deduce

add test case

fix tck

fix tck

* fix ut

* fix hdfs download command will always return succeeded (#4723)

* remove rebuild index guard when job finished (#4722)

* remove rebuild index guard when job finished

* fix reenter problem

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

* split experimental_feature flag (#4728)

* split experimental_feature flag

* eof

* forbid insert vertex when vertex_key flag is off (#4727)

* forbid insert vertex when vertex_key flag is off

* fix issue ent#1420

* fix test case

* fix format

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>

Co-authored-by: kyle.cao <kyle.cao@vesoft.com>
Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com>
Co-authored-by: Alex Xing <90179377+SuperYoko@users.noreply.github.com>
  • Loading branch information
4 people committed Oct 17, 2022
1 parent 8d63052 commit cf152be
Show file tree
Hide file tree
Showing 22 changed files with 180 additions and 39 deletions.
6 changes: 6 additions & 0 deletions conf/nebula-graphd.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,9 @@
########## experimental feature ##########
# if use experimental features
--enable_experimental_feature=false

# if use toss feature, only work if enable_experimental_feature is true
--enable_toss=false

# if use balance data feature, only work if enable_experimental_feature is true
--enable_data_balance=true
6 changes: 6 additions & 0 deletions conf/nebula-graphd.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@
# if use experimental features
--enable_experimental_feature=false

# if use toss feature, only work if enable_experimental_feature is true
--enable_toss=false

# if use balance data feature, only work if enable_experimental_feature is true
--enable_data_balance=true

########## session ##########
# Maximum number of sessions that can be created per IP and per user
--max_sessions_per_ip_per_user=300
6 changes: 6 additions & 0 deletions conf/nebula-standalone.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@
# if use experimental features
--enable_experimental_feature=false

# if use toss feature, only work if enable_experimental_feature is true
--enable_toss=false

# if use balance data feature, only work if enable_experimental_feature is true
--enable_data_balance=true

######### Raft #########
# Raft election timeout
--raft_heartbeat_interval_secs=30
Expand Down
6 changes: 6 additions & 0 deletions src/common/hdfs/HdfsCommandHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ Status HdfsCommandHelper::ls(const std::string& hdfsHost,
auto result = proc.wait();
if (!result.exited()) {
return Status::Error("Failed to ls hdfs");
} else if (result.exitStatus() != 0) {
LOG(INFO) << "Failed to ls: " << result.str();
return Status::Error("Failed to ls hdfs, errno: %d", result.exitStatus());
} else {
return Status::OK();
}
Expand All @@ -47,6 +50,9 @@ Status HdfsCommandHelper::copyToLocal(const std::string& hdfsHost,
auto result = proc.wait();
if (!result.exited()) {
return Status::Error("Failed to download from hdfs");
} else if (result.exitStatus() != 0) {
LOG(INFO) << "Failed to download: " << result.str();
return Status::Error("Failed to download from hdfs, errno: %d", result.exitStatus());
} else {
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/mutate/DeleteExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ folly::Future<Status> DeleteEdgesExecutor::deleteEdges() {
auto plan = qctx()->plan();
StorageClient::CommonRequestParam param(
spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = FLAGS_enable_experimental_feature;
param.useExperimentalFeature = FLAGS_enable_experimental_feature && FLAGS_enable_toss;
return qctx()
->getStorageClient()
->deleteEdges(param, std::move(edgeKeys))
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/mutate/InsertExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ folly::Future<Status> InsertEdgesExecutor::insertEdges() {
auto plan = qctx()->plan();
StorageClient::CommonRequestParam param(
ieNode->getSpace(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = FLAGS_enable_experimental_feature;
param.useExperimentalFeature = FLAGS_enable_experimental_feature && FLAGS_enable_toss;
return qctx()
->getStorageClient()
->addEdges(param,
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/mutate/UpdateExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ folly::Future<Status> UpdateEdgeExecutor::execute() {
auto plan = qctx()->plan();
StorageClient::CommonRequestParam param(
ueNode->getSpaceId(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = FLAGS_enable_experimental_feature;
param.useExperimentalFeature = FLAGS_enable_experimental_feature && FLAGS_enable_toss;
return qctx()
->getStorageClient()
->updateEdge(param,
Expand Down
4 changes: 4 additions & 0 deletions src/graph/service/GraphFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ DEFINE_bool(disable_octal_escape_char,
" in next version to ensure compatibility with cypher.");

DEFINE_bool(enable_experimental_feature, false, "Whether to enable experimental feature");
DEFINE_bool(enable_toss, false, "Whether to enable toss feature");
DEFINE_bool(enable_data_balance, true, "Whether to enable data balance feature");

DEFINE_int32(num_rows_to_check_memory, 1024, "number rows to check memory");
DEFINE_int32(max_sessions_per_ip_per_user,
Expand Down Expand Up @@ -103,3 +105,5 @@ DEFINE_uint32(
gc_worker_size,
0,
"Background garbage clean workers, default number is 0 which means using hardware core size.");

DEFINE_bool(graph_use_vertex_key, false, "whether allow insert or query the vertex key");
5 changes: 5 additions & 0 deletions src/graph/service/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ DECLARE_bool(optimize_appendvertice);
DECLARE_int64(max_allowed_connections);

DECLARE_bool(enable_experimental_feature);
DECLARE_bool(enable_toss);
DECLARE_bool(enable_data_balance);

DECLARE_bool(enable_client_white_list);
DECLARE_string(client_white_list);
Expand All @@ -65,4 +67,7 @@ DECLARE_int32(max_job_size);

DECLARE_bool(enable_async_gc);
DECLARE_uint32(gc_worker_size);

DECLARE_bool(graph_use_vertex_key);

#endif // GRAPH_GRAPHFLAGS_H_
3 changes: 2 additions & 1 deletion src/graph/validator/AdminJobValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@
#include "graph/validator/AdminJobValidator.h"

#include "graph/planner/plan/Admin.h"
#include "graph/service/GraphFlags.h"

namespace nebula {
namespace graph {

Status AdminJobValidator::validateImpl() {
if (sentence_->getJobType() == meta::cpp2::JobType::DATA_BALANCE ||
sentence_->getJobType() == meta::cpp2::JobType::ZONE_BALANCE) {
if (!FLAGS_enable_experimental_feature) {
if (!(FLAGS_enable_experimental_feature && FLAGS_enable_data_balance)) {
return Status::SemanticError("Data balance not support");
}
}
Expand Down
10 changes: 7 additions & 3 deletions src/graph/validator/MutateValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ Status InsertVerticesValidator::check() {
}

auto tagItems = sentence->tagItems();
if (!FLAGS_graph_use_vertex_key && tagItems.empty()) {
return Status::SemanticError("Insert vertex is forbidden, please speicify the tag");
}

schemas_.reserve(tagItems.size());

Expand Down Expand Up @@ -206,7 +209,8 @@ Status InsertEdgesValidator::check() {
// Check validity of vertices data.
// Check edge key type, check properties value, fill to NewEdge structure.
Status InsertEdgesValidator::prepareEdges() {
auto size = FLAGS_enable_experimental_feature ? rows_.size() : rows_.size() * 2;
auto size =
FLAGS_enable_experimental_feature && FLAGS_enable_toss ? rows_.size() : rows_.size() * 2;
edges_.reserve(size);

size_t fieldNum = schema_->getNumFields();
Expand Down Expand Up @@ -291,7 +295,7 @@ Status InsertEdgesValidator::prepareEdges() {
edge.key_ref() = key;
edge.props_ref() = std::move(entirePropValues);
edges_.emplace_back(edge);
if (!FLAGS_enable_experimental_feature) {
if (!(FLAGS_enable_experimental_feature && FLAGS_enable_toss)) {
// inbound
key.src_ref() = dstId;
key.dst_ref() = srcId;
Expand Down Expand Up @@ -826,7 +830,7 @@ Status UpdateEdgeValidator::toPlan() {
{},
condition_,
{});
if (FLAGS_enable_experimental_feature) {
if ((FLAGS_enable_experimental_feature && FLAGS_enable_toss)) {
root_ = outNode;
tail_ = root_;
} else {
Expand Down
18 changes: 6 additions & 12 deletions src/graph/validator/test/YieldValidatorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,11 @@ TEST_F(YieldValidatorTest, TypeCastTest) {
}
{
std::string query = "YIELD (int)\"123abc\"";
auto result = checkResult(query);
EXPECT_EQ(std::string(result.message()),
"SemanticError: `(INT)\"123abc\"' is not a valid expression ");
EXPECT_TRUE(checkResult(query));
}
{
std::string query = "YIELD (int)\"abc123\"";
auto result = checkResult(query);
EXPECT_EQ(std::string(result.message()),
"SemanticError: `(INT)\"abc123\"' is not a valid expression ");
EXPECT_TRUE(checkResult(query));
}
{
std::string query = "YIELD (doublE)\"123\"";
Expand All @@ -182,9 +178,7 @@ TEST_F(YieldValidatorTest, TypeCastTest) {
}
{
std::string query = "YIELD (doublE)\".a123\"";
auto result = checkResult(query);
EXPECT_EQ(std::string(result.message()),
"SemanticError: `(FLOAT)\".a123\"' is not a valid expression ");
EXPECT_TRUE(checkResult(query));
}
{
std::string query = "YIELD (STRING)1.23";
Expand All @@ -200,15 +194,15 @@ TEST_F(YieldValidatorTest, TypeCastTest) {
}
{
std::string query = "YIELD (BOOL)123";
EXPECT_FALSE(checkResult(query, expected_));
EXPECT_TRUE(checkResult(query, expected_));
}
{
std::string query = "YIELD (BOOL)0";
EXPECT_FALSE(checkResult(query, expected_));
EXPECT_TRUE(checkResult(query, expected_));
}
{
std::string query = "YIELD (BOOL)\"12\"";
EXPECT_FALSE(checkResult(query, expected_));
EXPECT_TRUE(checkResult(query, expected_));
}
{
std::string query = "YIELD (MAP)(\"12\")";
Expand Down
24 changes: 16 additions & 8 deletions src/graph/visitor/DeduceTypeVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,13 +249,7 @@ void DeduceTypeVisitor::visit(TypeCastingExpression *expr) {
status_ = Status::SemanticError(out.str());
return;
}
QueryExpressionContext ctx(nullptr);
auto val = expr->eval(ctx(nullptr));
if (val.isNull()) {
status_ = Status::SemanticError("`%s' is not a valid expression ", expr->toString().c_str());
return;
}
type_ = val.type();
type_ = expr->type();
status_ = Status::OK();
}

Expand Down Expand Up @@ -486,7 +480,21 @@ void DeduceTypeVisitor::visit(FunctionCallExpression *expr) {
void DeduceTypeVisitor::visit(AggregateExpression *expr) {
expr->arg()->accept(this);
if (!ok()) return;
type_ = Value::Type::__EMPTY__;
auto func = expr->name();
std::transform(func.begin(), func.end(), func.begin(), ::toupper);
if ("COUNT" == func) {
type_ = Value::Type::INT;
} else if ("COLLECT" == func) {
type_ = Value::Type::LIST;
} else if ("COLLECT_SET" == func) {
type_ = Value::Type::SET;
} else if ("AVG" == func || "SUM" == func) {
type_ = Value::Type::FLOAT;
} else if ("MAX" == func || "MIN" == func) {
// Keep same with arg's type
} else {
type_ = Value::Type::__EMPTY__;
}
}

void DeduceTypeVisitor::visit(UUIDExpression *) {
Expand Down
12 changes: 12 additions & 0 deletions src/storage/admin/RebuildIndexTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ const int32_t kReserveNum = 1024 * 4;
bool RebuildIndexTask::check() {
return env_->kvstore_ != nullptr;
}
void RebuildIndexTask::finish(nebula::cpp2::ErrorCode rc) {
if (changedSpaceGuard_) {
auto space = *ctx_.parameters_.space_id_ref();
for (auto it = env_->rebuildIndexGuard_->begin(); it != env_->rebuildIndexGuard_->end(); ++it) {
if (std::get<0>(it->first) == space) {
env_->rebuildIndexGuard_->insert_or_assign(it->first, IndexState::FINISHED);
}
}
}
AdminTask::finish(rc);
}

RebuildIndexTask::RebuildIndexTask(StorageEnv* env, TaskContext&& ctx)
: AdminTask(env, std::move(ctx)) {
Expand Down Expand Up @@ -71,6 +82,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<AdminSubTask>> RebuildIndexTask::ge
for (const auto& part : parts) {
env_->rebuildIndexGuard_->insert_or_assign(std::make_tuple(space_, part), IndexState::STARTING);
TaskFunction task = std::bind(&RebuildIndexTask::invoke, this, space_, part, items);
changedSpaceGuard_ = true;
tasks.emplace_back(std::move(task));
}
return tasks;
Expand Down
5 changes: 5 additions & 0 deletions src/storage/admin/RebuildIndexTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ using IndexItems = std::vector<std::shared_ptr<meta::cpp2::IndexItem>>;
*/
class RebuildIndexTask : public AdminTask {
public:
using AdminTask::finish;

RebuildIndexTask(StorageEnv* env, TaskContext&& ctx);

~RebuildIndexTask() {
Expand All @@ -31,6 +33,8 @@ class RebuildIndexTask : public AdminTask {

bool check() override;

void finish(nebula::cpp2::ErrorCode rc) override;

/**
* @brief Generate subtasks for rebuilding index.
*
Expand Down Expand Up @@ -71,6 +75,7 @@ class RebuildIndexTask : public AdminTask {

protected:
GraphSpaceID space_;
bool changedSpaceGuard_{false};
};

} // namespace storage
Expand Down
38 changes: 35 additions & 3 deletions src/storage/exec/GetPropNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@ class GetTagPropNode : public QueryNode<VertexID> {
std::vector<TagNode*> tagNodes,
nebula::DataSet* resultDataSet,
Expression* filter,
std::size_t limit)
std::size_t limit,
TagContext* tagContext)
: context_(context),
tagNodes_(std::move(tagNodes)),
resultDataSet_(resultDataSet),
expCtx_(filter == nullptr
? nullptr
: new StorageExpressionContext(context->vIdLen(), context->isIntId())),
filter_(filter),
limit_(limit) {
limit_(limit),
tagContext_(tagContext) {
name_ = "GetTagPropNode";
}

Expand Down Expand Up @@ -69,7 +71,36 @@ class GetTagPropNode : public QueryNode<VertexID> {
} else if (!iter->valid()) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
// if has any tag, will emplace a row with vId

bool hasValidTag = false;
for (; iter->valid(); iter->next()) {
// check if tag schema exists
auto key = iter->key();
auto tagId = NebulaKeyUtils::getTagId(context_->vIdLen(), key);
auto schemaIter = tagContext_->schemas_.find(tagId);
if (schemaIter == tagContext_->schemas_.end()) {
continue;
}
// check if ttl expired
auto schemas = &(schemaIter->second);
RowReaderWrapper reader;
reader.reset(*schemas, iter->val());
if (!reader) {
continue;
}
auto ttl = QueryUtils::getTagTTLInfo(tagContext_, tagId);
if (ttl.has_value() &&
CommonUtils::checkDataExpiredForTTL(
schemas->back().get(), reader.get(), ttl.value().first, ttl.value().second)) {
continue;
}
hasValidTag = true;
break;
}
if (!hasValidTag) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}
// if has any valid tag, will emplace a row with vId
}
}

Expand Down Expand Up @@ -131,6 +162,7 @@ class GetTagPropNode : public QueryNode<VertexID> {
std::unique_ptr<StorageExpressionContext> expCtx_{nullptr};
Expression* filter_{nullptr};
const std::size_t limit_{std::numeric_limits<std::size_t>::max()};
TagContext* tagContext_;
};

class GetEdgePropNode : public QueryNode<cpp2::EdgeKey> {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/query/GetPropProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ StoragePlan<VertexID> GetPropProcessor::buildTagPlan(RuntimeContext* context,
plan.addNode(std::move(tag));
}
auto output = std::make_unique<GetTagPropNode>(
context, tags, result, filter_ == nullptr ? nullptr : filter_->clone(), limit_);
context, tags, result, filter_ == nullptr ? nullptr : filter_->clone(), limit_, &tagContext_);
for (auto* tag : tags) {
output->addDependency(tag);
}
Expand Down
1 change: 0 additions & 1 deletion tests/tck/features/delete/DeleteTag.IntVid.feature
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ Feature: Delete int vid of tag
| id |
Then drop the used space

@wtf
Scenario: delete int vid multiple vertex one tag
Given an empty graph
And load "nba_int_vid" csv data to a new space
Expand Down
Loading

0 comments on commit cf152be

Please sign in to comment.