Skip to content

Commit

Permalink
Fix memory leak, remove toss gflag (#5204)
Browse files Browse the repository at this point in the history
* remove toss gflag

* fix memory leak

* loose wait job finish time

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
  • Loading branch information
critical27 and Sophie-Xie committed Jan 28, 2023
1 parent 635f376 commit 6d1bc5d
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/graph/executor/mutate/DeleteExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ folly::Future<Status> DeleteEdgesExecutor::deleteEdges() {
auto plan = qctx()->plan();
StorageClient::CommonRequestParam param(
spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = FLAGS_enable_experimental_feature && FLAGS_enable_toss;
param.useExperimentalFeature = false;
return qctx()
->getStorageClient()
->deleteEdges(param, std::move(edgeKeys))
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/mutate/InsertExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ folly::Future<Status> InsertEdgesExecutor::insertEdges() {
auto plan = qctx()->plan();
StorageClient::CommonRequestParam param(
ieNode->getSpace(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = FLAGS_enable_experimental_feature && FLAGS_enable_toss;
param.useExperimentalFeature = false;
return qctx()
->getStorageClient()
->addEdges(param,
Expand Down
2 changes: 1 addition & 1 deletion src/graph/executor/mutate/UpdateExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ folly::Future<Status> UpdateEdgeExecutor::execute() {
auto plan = qctx()->plan();
StorageClient::CommonRequestParam param(
ueNode->getSpaceId(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = FLAGS_enable_experimental_feature && FLAGS_enable_toss;
param.useExperimentalFeature = false;
return qctx()
->getStorageClient()
->updateEdge(param,
Expand Down
1 change: 0 additions & 1 deletion src/graph/service/GraphFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ DEFINE_bool(disable_octal_escape_char,
" in next version to ensure compatibility with cypher.");

DEFINE_bool(enable_experimental_feature, false, "Whether to enable experimental feature");
DEFINE_bool(enable_toss, false, "Whether to enable toss feature");
DEFINE_bool(enable_data_balance, true, "Whether to enable data balance feature");

DEFINE_int32(num_rows_to_check_memory, 1024, "number rows to check memory");
Expand Down
1 change: 0 additions & 1 deletion src/graph/service/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ DECLARE_bool(optimize_appendvertice);
DECLARE_int64(max_allowed_connections);

DECLARE_bool(enable_experimental_feature);
DECLARE_bool(enable_toss);
DECLARE_bool(enable_data_balance);

DECLARE_bool(enable_client_white_list);
Expand Down
40 changes: 17 additions & 23 deletions src/graph/validator/MutateValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,7 @@ Status InsertEdgesValidator::check() {
// Check validity of vertices data.
// Check edge key type, check properties value, fill to NewEdge structure.
Status InsertEdgesValidator::prepareEdges() {
auto size =
FLAGS_enable_experimental_feature && FLAGS_enable_toss ? rows_.size() : rows_.size() * 2;
auto size = rows_.size() * 2;
edges_.reserve(size);

size_t fieldNum = schema_->getNumFields();
Expand Down Expand Up @@ -297,7 +296,7 @@ Status InsertEdgesValidator::prepareEdges() {
edge.key_ref() = key;
edge.props_ref() = std::move(entirePropValues);
edges_.emplace_back(edge);
if (!(FLAGS_enable_experimental_feature && FLAGS_enable_toss)) {
{
// inbound
key.src_ref() = dstId;
key.dst_ref() = srcId;
Expand Down Expand Up @@ -892,26 +891,21 @@ Status UpdateEdgeValidator::toPlan() {
{},
condition_,
{});
if ((FLAGS_enable_experimental_feature && FLAGS_enable_toss)) {
root_ = outNode;
tail_ = root_;
} else {
auto *inNode = UpdateEdge::make(qctx_,
outNode,
spaceId_,
std::move(name_),
std::move(dstId_),
std::move(srcId_),
-edgeType_,
rank_,
insertable_,
std::move(updatedProps_),
std::move(returnProps_),
std::move(condition_),
std::move(yieldColNames_));
root_ = inNode;
tail_ = outNode;
}
auto *inNode = UpdateEdge::make(qctx_,
outNode,
spaceId_,
std::move(name_),
std::move(dstId_),
std::move(srcId_),
-edgeType_,
rank_,
insertable_,
std::move(updatedProps_),
std::move(returnProps_),
std::move(condition_),
std::move(yieldColNames_));
root_ = inNode;
tail_ = outNode;
return Status::OK();
}

Expand Down
14 changes: 7 additions & 7 deletions src/meta/processors/index/FTIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,6 @@ void DropFTIndexProcessor::process(const cpp2::DropFTIndexReq& req) {
return;
}

auto batchHolder = std::make_unique<kvstore::BatchHolder>();
batchHolder->remove(std::move(indexKey));
auto timeInMilliSec = time::WallClock::fastNowInMilliSec();
LastUpdateTimeMan::update(batchHolder.get(), timeInMilliSec);
auto batch = encodeBatchValue(std::move(batchHolder)->getBatch());
doBatchOperation(std::move(batch));

const auto& serviceKey = MetaKeyUtils::serviceKey(cpp2::ExternalServiceType::ELASTICSEARCH);
auto getRet = doGet(serviceKey);
if (!nebula::ok(getRet)) {
Expand Down Expand Up @@ -213,6 +206,13 @@ void DropFTIndexProcessor::process(const cpp2::DropFTIndexReq& req) {
onFinished();
return;
}

auto batchHolder = std::make_unique<kvstore::BatchHolder>();
batchHolder->remove(std::move(indexKey));
auto timeInMilliSec = time::WallClock::fastNowInMilliSec();
LastUpdateTimeMan::update(batchHolder.get(), timeInMilliSec);
auto batch = encodeBatchValue(std::move(batchHolder)->getBatch());
doBatchOperation(std::move(batch));
}

void ListFTIndexesProcessor::process(const cpp2::ListFTIndexesReq&) {
Expand Down
2 changes: 1 addition & 1 deletion tests/tck/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def running_or_queue_row(row):


def wait_all_jobs_finished(sess, jobs=[]):
times = 4 * get_running_jobs(sess)
times = 5 * get_running_jobs(sess)
while jobs and times > 0:
jobs = [job for job in jobs if not is_job_finished(sess, job)]
time.sleep(1)
Expand Down

0 comments on commit 6d1bc5d

Please sign in to comment.