Skip to content

Commit

Permalink
[two fix] 1. if propNames is empty, replaceDefaultValue wrong. 2. txn…
Browse files Browse the repository at this point in the history
…Mgr may core while shutdown (#3025)

Co-authored-by: CBS <56461666+bright-starry-sky@users.noreply.github.com>
  • Loading branch information
liuyu85cn and bright-starry-sky committed Oct 13, 2021
1 parent 76bdc3f commit afbf9d2
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 34 deletions.
7 changes: 3 additions & 4 deletions src/storage/StorageServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ void StorageServer::stop() {

webSvc_.reset();

if (txnMan_) {
txnMan_->stop();
}
if (taskMgr_) {
taskMgr_->shutdown();
}
Expand All @@ -348,10 +351,6 @@ void StorageServer::stop() {
if (adminServer_) {
adminServer_->stop();
}
if (txnMan_) {
txnMan_->stop();
txnMan_.reset();
}
if (internalStorageServer_) {
internalStorageServer_->stop();
}
Expand Down
112 changes: 82 additions & 30 deletions src/storage/transaction/ChainAddEdgesProcessorLocal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ bool ChainAddEdgesProcessorLocal::prepareRequest(const cpp2::AddEdgesRequest& re
spaceVidType_ = vidType.value();
}
localPartId_ = req.get_parts().begin()->first;
// replaceNullWithDefaultValue(req_);
replaceNullWithDefaultValue(req_);
auto part = env_->kvstore_->part(spaceId_, localPartId_);
if (!nebula::ok(part)) {
pushResultCode(nebula::error(part), localPartId_);
Expand Down Expand Up @@ -425,44 +425,96 @@ std::string ChainAddEdgesProcessorLocal::makeReadableEdge(const cpp2::AddEdgesRe
* in/out edge, but they will calculate independent
* which lead to inconsistance
*
* that why we need to replace the inconsistance prone value
* that's why we need to replace the inconsistance prone value
* at the monment the request comes
* */
void ChainAddEdgesProcessorLocal::replaceNullWithDefaultValue(cpp2::AddEdgesRequest& req) {
auto& edgesOfPart = *req.parts_ref();
if (edgesOfPart.empty()) {
return;
}
auto& edgesOfFirstPart = edgesOfPart.begin()->second;
if (edgesOfFirstPart.empty()) {
return;
}
auto firstEdgeKey = edgesOfFirstPart.front().get_key();
auto edgeType = std::abs(*firstEdgeKey.edge_type_ref());
auto schema = env_->schemaMan_->getEdgeSchema(spaceId_, edgeType);

DefaultValueContext expCtx;
// the coming request has two forms
// 1st "propNames" is empty,
// which means all vals should be write as the same sequence of schema
// 2nd "propNames" is not empty
// vals of request should be write according to propName of schema
// use the following "idxVec" to identify which index a val should be write to.
std::vector<int64_t> idxVec;
auto& propNames = *req.prop_names_ref();
if (propNames.empty()) {
for (auto i = 0U; i < schema->getNumFields(); ++i) {
idxVec.emplace_back(i);
}
} else {
// first scan the origin input propNames
for (auto& name : propNames) {
int64_t index = schema->getFieldIndex(name);
idxVec.emplace_back(index);
}
// second, check if there any cols not filled but has default val
// we need to append these cols
for (auto i = 0U; i < schema->getNumFields(); ++i) {
auto it = std::find(idxVec.begin(), idxVec.end(), i);
if (it == idxVec.end()) {
auto field = schema->field(i);
if (field->hasDefault()) {
idxVec.emplace_back(i);
}
}
}
}

for (auto& part : *req.parts_ref()) {
for (auto& edge : part.second) {
auto edgeKey = edge.get_key();
auto edgeType = std::abs(*edgeKey.edge_type_ref());
auto schema = env_->schemaMan_->getEdgeSchema(spaceId_, edgeType);
auto& vals = *edge.props_ref();
for (auto i = 0U; i < schema->getNumFields(); ++i) {
std::string fieldName(schema->getFieldName(i));
auto it = std::find(propNames.begin(), propNames.end(), fieldName);
if (it == propNames.end()) {
auto field = schema->field(i);
if (field->hasDefault()) {
auto expr = field->defaultValue()->clone();
propNames.emplace_back(fieldName);
auto defVal = Expression::eval(expr, expCtx);
switch (defVal.type()) {
case Value::Type::DATE:
vals.emplace_back(defVal.getDate());
break;
case Value::Type::TIME:
vals.emplace_back(defVal.getTime());
break;
case Value::Type::DATETIME:
vals.emplace_back(defVal.getDateTime());
break;
default:
// for other type, local and remote should behavior same.
break;
}
} else {
// it's ok if this field doesn't have a default value
if (vals.size() > idxVec.size()) {
LOG(ERROR) << folly::sformat(
"error vals.size()={} > idxVec.size()={}", vals.size(), idxVec.size());
continue;
}
for (auto i = vals.size(); i < idxVec.size(); ++i) {
auto field = schema->field(idxVec[i]);
if (field->hasDefault()) {
auto expr = field->defaultValue()->clone();
auto defVal = Expression::eval(expr, expCtx);
switch (defVal.type()) {
case Value::Type::BOOL:
vals.emplace_back(defVal.getBool());
break;
case Value::Type::INT:
vals.emplace_back(defVal.getInt());
break;
case Value::Type::FLOAT:
vals.emplace_back(defVal.getFloat());
break;
case Value::Type::STRING:
vals.emplace_back(defVal.getStr());
break;
case Value::Type::DATE:
vals.emplace_back(defVal.getDate());
break;
case Value::Type::TIME:
vals.emplace_back(defVal.getTime());
break;
case Value::Type::DATETIME:
vals.emplace_back(defVal.getDateTime());
break;
default:
// for other type, local and remote should behavior same.
break;
}
} else {
// set null
vals.emplace_back(Value::kNullValue);
}
}
}
Expand Down

0 comments on commit afbf9d2

Please sign in to comment.