Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

Commit

Permalink
Dedup vertex/edge before delete (#1253)
Browse files Browse the repository at this point in the history
Co-authored-by: Yichen Wang <18348405+Aiee@users.noreply.github.com>
  • Loading branch information
Shylock-Hg and Aiee committed Jul 15, 2021
1 parent 0f85fec commit 93c253c
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 76 deletions.
3 changes: 3 additions & 0 deletions src/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ void Executor::drop() {
if (inputVar != nullptr) {
if (inputVar->lastUser.value() == node()->id()) {
ectx_->dropResult(inputVar->name);
VLOG(1) << "Drop variable " << node()->outputVar();
}
}
}
Expand All @@ -592,6 +593,8 @@ Status Executor::finish(Result &&result) {
if (!FLAGS_enable_lifetime_optimize || node()->outputVarPtr()->lastUser.hasValue()) {
numRows_ = result.size();
ectx_->setResult(node()->outputVar(), std::move(result));
} else {
VLOG(1) << "Drop variable " << node()->outputVar();
}
if (FLAGS_enable_lifetime_optimize) {
drop();
Expand Down
110 changes: 53 additions & 57 deletions src/executor/mutate/DeleteExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,67 +80,63 @@ folly::Future<Status> DeleteEdgesExecutor::deleteEdges() {
SCOPED_TIMER(&execTime_);

auto *deNode = asNode<DeleteEdges>(node());
auto edgeKeyRefs = deNode->getEdgeKeyRefs();
auto *edgeKeyRef = DCHECK_NOTNULL(deNode->edgeKeyRef());
std::vector<storage::cpp2::EdgeKey> edgeKeys;
const auto& spaceInfo = qctx()->rctx()->session()->space();
if (!edgeKeyRefs.empty()) {
auto inputVar = deNode->inputVar();
DCHECK(!inputVar.empty());
auto& inputResult = ectx_->getResult(inputVar);
auto iter = inputResult.iter();
edgeKeys.reserve(iter->size());
QueryExpressionContext ctx(ectx_);
for (; iter->valid(); iter->next()) {
for (auto &edgeKeyRef : edgeKeyRefs) {
storage::cpp2::EdgeKey edgeKey;
auto srcId = Expression::eval(edgeKeyRef->srcid(), ctx(iter.get()));
if (srcId.isNull() || srcId.empty()) {
VLOG(3) << "NULL or EMPTY vid";
continue;
}
if (!SchemaUtil::isValidVid(srcId, *spaceInfo.spaceDesc.vid_type_ref())) {
std::stringstream ss;
ss << "Wrong srcId type `" << srcId.type()
<< "`, value `" << srcId.toString() << "'";
return Status::Error(ss.str());
}
auto dstId = Expression::eval(edgeKeyRef->dstid(), ctx(iter.get()));
if (!SchemaUtil::isValidVid(dstId, *spaceInfo.spaceDesc.vid_type_ref())) {
std::stringstream ss;
ss << "Wrong dstId type `" << dstId.type()
<< "', value `" << dstId.toString() << "'";
return Status::Error(ss.str());
}
auto rank = Expression::eval(edgeKeyRef->rank(), ctx(iter.get()));
if (!rank.isInt()) {
std::stringstream ss;
ss << "Wrong rank type `" << rank.type()
<< "', value `" << rank.toString() << "'";
return Status::Error(ss.str());
}
DCHECK(edgeKeyRef->type());
auto type = Expression::eval(edgeKeyRef->type(), ctx(iter.get()));
if (!type.isInt()) {
std::stringstream ss;
ss << "Wrong edge type `" << type.type()
<< "', value `" << type.toString() << "'";
return Status::Error(ss.str());
}
auto inputVar = deNode->inputVar();
DCHECK(!inputVar.empty());
auto& inputResult = ectx_->getResult(inputVar);
auto iter = inputResult.iter();
edgeKeys.reserve(iter->size());
QueryExpressionContext ctx(ectx_);
for (; iter->valid(); iter->next()) {
storage::cpp2::EdgeKey edgeKey;
auto srcId = Expression::eval(edgeKeyRef->srcid(), ctx(iter.get()));
if (srcId.isNull() || srcId.empty()) {
VLOG(3) << "NULL or EMPTY vid";
continue;
}
if (!SchemaUtil::isValidVid(srcId, *spaceInfo.spaceDesc.vid_type_ref())) {
std::stringstream ss;
ss << "Wrong srcId type `" << srcId.type()
<< "`, value `" << srcId.toString() << "'";
return Status::Error(ss.str());
}
auto dstId = Expression::eval(edgeKeyRef->dstid(), ctx(iter.get()));
if (!SchemaUtil::isValidVid(dstId, *spaceInfo.spaceDesc.vid_type_ref())) {
std::stringstream ss;
ss << "Wrong dstId type `" << dstId.type()
<< "', value `" << dstId.toString() << "'";
return Status::Error(ss.str());
}
auto rank = Expression::eval(edgeKeyRef->rank(), ctx(iter.get()));
if (!rank.isInt()) {
std::stringstream ss;
ss << "Wrong rank type `" << rank.type()
<< "', value `" << rank.toString() << "'";
return Status::Error(ss.str());
}
DCHECK(edgeKeyRef->type());
auto type = Expression::eval(edgeKeyRef->type(), ctx(iter.get()));
if (!type.isInt()) {
std::stringstream ss;
ss << "Wrong edge type `" << type.type()
<< "', value `" << type.toString() << "'";
return Status::Error(ss.str());
}

// out edge
edgeKey.set_src(srcId);
edgeKey.set_dst(dstId);
edgeKey.set_ranking(rank.getInt());
edgeKey.set_edge_type(type.getInt());
edgeKeys.emplace_back(edgeKey);
// out edge
edgeKey.set_src(srcId);
edgeKey.set_dst(dstId);
edgeKey.set_ranking(rank.getInt());
edgeKey.set_edge_type(type.getInt());
edgeKeys.emplace_back(edgeKey);

// in edge
edgeKey.set_src(std::move(dstId));
edgeKey.set_dst(std::move(srcId));
edgeKey.set_edge_type(-type.getInt());
edgeKeys.emplace_back(std::move(edgeKey));
}
}
// in edge
edgeKey.set_src(std::move(dstId));
edgeKey.set_dst(std::move(srcId));
edgeKey.set_edge_type(-type.getInt());
edgeKeys.emplace_back(std::move(edgeKey));
}

if (edgeKeys.empty()) {
Expand Down
2 changes: 1 addition & 1 deletion src/executor/query/DedupExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ folly::Future<Status> DedupExecutor::execute() {
return Status::Error("Internal Error: iterator is nullptr");
}

if (UNLIKELY(iter->isGetNeighborsIter())) {
if (UNLIKELY(iter->isGetNeighborsIter() || iter->isDefaultIter())) {
auto e = Status::Error("Invalid iterator kind, %d",
static_cast<uint16_t>(iter->kind()));
LOG(ERROR) << e;
Expand Down
2 changes: 1 addition & 1 deletion src/planner/plan/Mutate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ std::unique_ptr<PlanNodeDescription> DeleteVertices::explain() const {
std::unique_ptr<PlanNodeDescription> DeleteEdges::explain() const {
auto desc = SingleInputNode::explain();
addDescription("space", folly::to<std::string>(space_), desc.get());
addDescription("edgeKeyRefs", folly::toJson(util::toJson(edgeKeyRefs_)), desc.get());
addDescription("edgeKeyRef", folly::toJson(util::toJson(edgeKeyRef_)), desc.get());
return desc;
}

Expand Down
14 changes: 7 additions & 7 deletions src/planner/plan/Mutate.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,8 @@ class DeleteEdges final : public SingleInputNode {
static DeleteEdges* make(QueryContext* qctx,
PlanNode* input,
GraphSpaceID spaceId,
std::vector<EdgeKeyRef*> edgeKeyRefs) {
return qctx->objPool()->add(new DeleteEdges(qctx, input, spaceId, std::move(edgeKeyRefs)));
EdgeKeyRef *edgeKeyRef) {
return qctx->objPool()->add(new DeleteEdges(qctx, input, spaceId, edgeKeyRef));
}

std::unique_ptr<PlanNodeDescription> explain() const override;
Expand All @@ -400,22 +400,22 @@ class DeleteEdges final : public SingleInputNode {
return space_;
}

const std::vector<EdgeKeyRef*>& getEdgeKeyRefs() const {
return edgeKeyRefs_;
EdgeKeyRef* edgeKeyRef() const {
return edgeKeyRef_;
}

private:
DeleteEdges(QueryContext* qctx,
PlanNode* input,
GraphSpaceID spaceId,
std::vector<EdgeKeyRef*> edgeKeyRefs)
EdgeKeyRef *edgeKeyRef)
: SingleInputNode(qctx, Kind::kDeleteEdges, input)
, space_(spaceId)
, edgeKeyRefs_(std::move(edgeKeyRefs)) {}
, edgeKeyRef_(edgeKeyRef) {}

private:
GraphSpaceID space_{-1};
std::vector<EdgeKeyRef*> edgeKeyRefs_;
EdgeKeyRef *edgeKeyRef_{nullptr};
};

} // namespace graph
Expand Down
38 changes: 28 additions & 10 deletions src/validator/MutateValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ Status DeleteVerticesValidator::toPlan() {
vidVar = inputVarName_;
}

auto* dedupVid = Dedup::make(qctx_, nullptr);
dedupVid->setInputVar(vidVar);

std::vector<storage::cpp2::EdgeProp> edgeProps;
// make edgeRefs and edgeProp
auto index = 0u;
Expand Down Expand Up @@ -354,7 +357,7 @@ Status DeleteVerticesValidator::toPlan() {
auto statPropsPtr = std::make_unique<std::vector<storage::cpp2::StatProp>>();
auto exprPtr = std::make_unique<std::vector<storage::cpp2::Expr>>();
auto* getNeighbors = GetNeighbors::make(qctx_,
nullptr,
dedupVid,
spaceId_,
vidRef_,
edgeTypes_,
Expand All @@ -363,22 +366,35 @@ Status DeleteVerticesValidator::toPlan() {
std::move(edgePropsPtr),
std::move(statPropsPtr),
std::move(exprPtr));
getNeighbors->setInputVar(vidVar);

auto *yieldColumns = pool->makeAndAdd<YieldColumns>();
yieldColumns->addColumn(new YieldColumn(EdgeSrcIdExpression::make(pool, "*"), kSrc));
yieldColumns->addColumn(new YieldColumn(EdgeTypeExpression::make(pool, "*"), kType));
yieldColumns->addColumn(new YieldColumn(EdgeRankExpression::make(pool, "*"), kRank));
yieldColumns->addColumn(new YieldColumn(EdgeDstIdExpression::make(pool, "*"), kDst));
auto *edgeKey = Project::make(qctx_, getNeighbors, yieldColumns);

auto *dedupEdgeKey = Dedup::make(qctx_, edgeKey);

// create deleteEdges node
auto *edgeKeyRef = pool->makeAndAdd<EdgeKeyRef>(InputPropertyExpression::make(pool, kSrc),
InputPropertyExpression::make(pool, kDst),
InputPropertyExpression::make(pool, kRank),
true);
edgeKeyRef->setType(InputPropertyExpression::make(pool, kType));
auto *deNode = DeleteEdges::make(qctx_,
getNeighbors,
dedupEdgeKey,
spaceId_,
std::move(edgeKeyRefs_));
edgeKeyRef);

auto *dvNode = DeleteVertices::make(qctx_,
deNode,
spaceId_,
vidRef_);

dvNode->setInputVar(vidVar);
dvNode->setInputVar(dedupVid->outputVar());
root_ = dvNode;
tail_ = getNeighbors;
tail_ = dedupVid;
return Status::OK();
}

Expand Down Expand Up @@ -473,13 +489,15 @@ Status DeleteEdgesValidator::checkInput() {
}

Status DeleteEdgesValidator::toPlan() {
auto dedup = Dedup::make(qctx_, nullptr);
dedup->setInputVar(edgeKeyVar_);

auto *doNode = DeleteEdges::make(qctx_,
nullptr,
dedup,
vctx_->whichSpace().id,
edgeKeyRefs_);
doNode->setInputVar(edgeKeyVar_);
edgeKeyRefs_.front());
root_ = doNode;
tail_ = root_;
tail_ = dedup;
return Status::OK();
}

Expand Down
9 changes: 9 additions & 0 deletions src/validator/test/MutateValidatorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ TEST_F(MutateValidatorTest, DeleteVertexTest) {
std::vector<PlanNode::Kind> expected = {
PK::kDeleteVertices,
PK::kDeleteEdges,
PK::kDedup,
PK::kProject,
PK::kGetNeighbors,
PK::kDedup,
PK::kStart,
};
ASSERT_TRUE(checkResult(cmd, expected));
Expand All @@ -67,7 +70,10 @@ TEST_F(MutateValidatorTest, DeleteVertexTest) {
std::vector<PlanNode::Kind> expected = {
PK::kDeleteVertices,
PK::kDeleteEdges,
PK::kDedup,
PK::kProject,
PK::kGetNeighbors,
PK::kDedup,
PK::kProject,
PK::kGetNeighbors,
PK::kStart,
Expand All @@ -87,6 +93,7 @@ TEST_F(MutateValidatorTest, DeleteEdgeTest) {
auto cmd = "DELETE EDGE like \"A\"->\"B\"";
std::vector<PlanNode::Kind> expected = {
PK::kDeleteEdges,
PK::kDedup,
PK::kStart,
};
ASSERT_TRUE(checkResult(cmd, expected));
Expand All @@ -103,6 +110,7 @@ TEST_F(MutateValidatorTest, DeleteEdgeTest) {
"| DELETE EDGE like $-.src -> $-.dst @ $-.rank";
std::vector<PlanNode::Kind> expected = {
PK::kDeleteEdges,
PK::kDedup,
PK::kProject,
PK::kGetNeighbors,
PK::kStart,
Expand All @@ -116,6 +124,7 @@ TEST_F(MutateValidatorTest, DeleteEdgeTest) {
"; DELETE EDGE like $var.src -> $var.dst @ $var.rank";
std::vector<PlanNode::Kind> expected = {
PK::kDeleteEdges,
PK::kDedup,
PK::kProject,
PK::kGetNeighbors,
PK::kStart,
Expand Down

0 comments on commit 93c253c

Please sign in to comment.