Skip to content

Commit

Permalink
fix ft (#5594)
Browse files Browse the repository at this point in the history
* fix ft

* fix memleak

* remove some debug info

* address some comment
  • Loading branch information
cangfengzhs committed Jun 14, 2023
1 parent 70b4eb2 commit b59eaf1
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 32 deletions.
6 changes: 4 additions & 2 deletions src/common/plugin/fulltext/elasticsearch/ESAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ void ESBulk::put(const std::string& indexName,
const std::string& src,
const std::string& dst,
int64_t rank,
const std::string& text) {
std::map<std::string, std::string> data) {
folly::dynamic action = folly::dynamic::object();
folly::dynamic metadata = folly::dynamic::object();
folly::dynamic body = folly::dynamic::object();
Expand All @@ -35,7 +35,9 @@ void ESBulk::put(const std::string& indexName,
body["src"] = src;
body["dst"] = dst;
body["rank"] = rank;
body["text"] = text;
for (auto& [key, value] : data) {
body[key] = std::move(value);
}
documents_[indexName].emplace_back(std::move(action));
documents_[indexName].emplace_back(std::move(body));
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/plugin/fulltext/elasticsearch/ESAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ESBulk {
const std::string& src,
const std::string& dst,
int64_t rank,
const std::string& text);
std::map<std::string, std::string> data);
void delete_(const std::string& indexName,
const std::string& vid,
const std::string& src,
Expand Down
3 changes: 2 additions & 1 deletion src/common/plugin/fulltext/test/ElasticsearchTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ TEST_F(ESTest, bulk) {
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
plugin::ESBulk bulk;
bulk.put("nebula_index_1", "1", "", "", 0, "vertex text");
std::map<std::string, std::string> data{{"text", "vretex text"}};
bulk.put("nebula_index_1", "1", "", "", 0, data);
bulk.delete_("nebula_index_2", "", "a", "b", 10);
{
auto result = adapter.bulk(bulk, true);
Expand Down
24 changes: 14 additions & 10 deletions src/graph/executor/query/FulltextIndexScanExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "common/datatypes/DataSet.h"
#include "common/datatypes/Edge.h"
#include "graph/planner/plan/Query.h"
#include "graph/util/Constants.h"
#include "graph/util/FTIndexUtils.h"

using nebula::storage::StorageClient;
Expand All @@ -31,7 +32,7 @@ folly::Future<Status> FulltextIndexScanExecutor::execute() {
const auto& space = qctx()->rctx()->session()->space();
if (!isIntVidType(space)) {
if (ftIndexScan->isEdge()) {
DataSet edges({"edge"});
DataSet edges({"id", kScore});
for (auto& item : esResultValue.items) {
Edge edge;
edge.src = item.src;
Expand All @@ -42,25 +43,26 @@ folly::Future<Status> FulltextIndexScanExecutor::execute() {
}
finish(ResultBuilder().value(Value(std::move(edges))).iter(Iterator::Kind::kProp).build());
} else {
DataSet vertices({kVid});
DataSet vertices({"id", kScore});
for (auto& item : esResultValue.items) {
vertices.emplace_back(Row({item.vid, item.score}));
}
finish(ResultBuilder().value(Value(std::move(vertices))).iter(Iterator::Kind::kProp).build());
}
} else {
if (ftIndexScan->isEdge()) {
DataSet edges({kSrc, kRank, kDst});
DataSet edges({"id", kScore});
for (auto& item : esResultValue.items) {
std::string srcStr = item.src;
std::string dstStr = item.dst;
int64_t src = *reinterpret_cast<int64_t*>(srcStr.data());
int64_t dst = *reinterpret_cast<int64_t*>(dstStr.data());
edges.emplace_back(Row({src, item.rank, dst}));
Edge edge;
edge.src = item.src;
edge.dst = item.dst;
edge.ranking = item.rank;
edge.type = ftIndexScan->schemaId();
edges.emplace_back(Row({std::move(edge), item.score}));
}
finish(ResultBuilder().value(Value(std::move(edges))).iter(Iterator::Kind::kProp).build());
} else {
DataSet vertices({kVid});
DataSet vertices({"id", kScore});
for (auto& item : esResultValue.items) {
std::string vidStr = item.vid;
int64_t vid = *reinterpret_cast<int64_t*>(vidStr.data());
Expand All @@ -83,7 +85,9 @@ StatusOr<plugin::ESQueryResult> FulltextIndexScanExecutor::accessFulltextIndex(
auto index = arg->index();
auto query = arg->query();
int64_t offset = ftIndexScan->offset();
int64_t count = ftIndexScan->limit();
int64_t count = ftIndexScan->limit() > std::numeric_limits<int32_t>::max()
? std::numeric_limits<int32_t>::max()
: ftIndexScan->limit();
execFunc = [=, &esAdapter]() { return esAdapter.queryString(index, query, offset, count); };
break;
}
Expand Down
4 changes: 2 additions & 2 deletions src/graph/planner/ngql/LookupPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ StatusOr<SubPlan> LookupPlanner::transform(AstContext* astCtx) {
GetEdges::make(qctx, plan.root, spaceId, src, type, rank, dst, std::move(edgeProps));

hashKeys = {ColumnExpression::make(pool, 0)};
probeKeys = {ColumnExpression::make(pool, 0)};
probeKeys = {EdgeExpression::make(pool)};
} else {
storage::cpp2::VertexProp vertexProp;
vertexProp.tag_ref() = lookupCtx->schemaId;
Expand All @@ -80,7 +80,7 @@ StatusOr<SubPlan> LookupPlanner::transform(AstContext* astCtx) {
qctx, plan.root, spaceId, ColumnExpression::make(pool, 0), std::move(vertexProps));

hashKeys = {ColumnExpression::make(pool, 0)};
probeKeys = {FunctionCallExpression::make(pool, "id", {ColumnExpression::make(pool, 0)})};
probeKeys = {FunctionCallExpression::make(pool, "id", {VertexExpression::make(pool)})};
}

if (lookupCtx->hasScore) {
Expand Down
29 changes: 16 additions & 13 deletions src/kvstore/listener/elasticsearch/ESListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ bool ESListener::apply(const BatchHolder& batch) {
const std::string& src,
const std::string& dst,
int64_t rank,
const std::string& text) {
std::map<std::string, std::string> data) {
if (type == BatchLogType::OP_BATCH_PUT) {
bulk.put(index, vid, src, dst, rank, text);
bulk.put(index, vid, src, dst, rank, std::move(data));
} else if (type == BatchLogType::OP_BATCH_REMOVE) {
bulk.delete_(index, vid, src, dst, rank);
} else {
Expand Down Expand Up @@ -134,21 +134,24 @@ void ESListener::pickTagAndEdgeData(BatchLogType type,
if (index.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
std::string text;
std::map<std::string, std::string> data;
std::string indexName = index.first;
if (type == BatchLogType::OP_BATCH_PUT) {
auto field = index.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() == Value::Type::NULLVALUE) {
callback(BatchLogType::OP_BATCH_REMOVE, indexName, vid, src, dst, 0, text);
continue;
}
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
for (auto& field : index.second.get_fields()) {
auto v = reader->getValueByName(field);
if (v.type() == Value::Type::NULLVALUE) {
data[field] = "";
continue;
}
if (v.type() != Value::Type::STRING) {
data[field] = "";
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
continue;
}
data[field] = std::move(v).getStr();
}
text = std::move(v).getStr();
}
callback(type, indexName, vid, src, dst, rank, text);
callback(type, indexName, vid, src, dst, rank, std::move(data));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/listener/elasticsearch/ESListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class ESListener : public Listener {
const std::string& src,
const std::string& dst,
int64_t rank,
const std::string& text)>;
std::map<std::string, std::string> data)>;
void pickTagAndEdgeData(BatchLogType type,
const std::string& key,
const std::string& value,
Expand Down
10 changes: 8 additions & 2 deletions src/parser/parser.yy
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,7 @@ unreserved_keyword
| KW_DIVIDE { $$ = new std::string("divide"); }
| KW_RENAME { $$ = new std::string("rename"); }
| KW_CLEAR { $$ = new std::string("clear"); }
| KW_ANALYZER { $$ = new std::string("analyzer"); }
;

expression
Expand Down Expand Up @@ -2618,10 +2619,15 @@ opt_analyzer

create_fulltext_index_sentence
: KW_CREATE KW_FULLTEXT KW_TAG KW_INDEX name_label KW_ON name_label L_PAREN name_label_list R_PAREN opt_analyzer {
$$ = new CreateFTIndexSentence(false, $5, $7, $9, $11);
auto sentence = new CreateFTIndexSentence(false, $5, $7, $9, $11);
delete $9;
$$ = sentence;

}
| KW_CREATE KW_FULLTEXT KW_EDGE KW_INDEX name_label KW_ON name_label L_PAREN name_label_list R_PAREN opt_analyzer {
$$ = new CreateFTIndexSentence(true, $5, $7, $9, $11);
auto sentence = new CreateFTIndexSentence(true, $5, $7, $9, $11);
delete $9;
$$ = sentence;
}
;

Expand Down
1 change: 1 addition & 0 deletions src/parser/scanner.lex
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ LABEL_FULL_WIDTH {CN_EN_FULL_WIDTH}{CN_EN_NUM_FULL_WIDTH}*
"HTTP" { return TokenType::KW_HTTP; }
"HTTPS" { return TokenType::KW_HTTPS; }
"FULLTEXT" { return TokenType::KW_FULLTEXT; }
"ANALYZER" { return TokenType::KW_ANALYZER; }
"AUTO" { return TokenType::KW_AUTO; }
"ES_QUERY" { return TokenType::KW_ES_QUERY; }
"TEXT" { return TokenType::KW_TEXT; }
Expand Down
10 changes: 10 additions & 0 deletions src/parser/test/ParserTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2928,6 +2928,16 @@ TEST_F(ParserTest, Zone) {
}

TEST_F(ParserTest, FullText) {
{
std::string query = "CREATE FULLTEXT TAG INDEX i1 on t1(str)";
auto result = parse(query);
EXPECT_TRUE(result.ok()) << result.status();
}
{
std::string query = "CREATE FULLTEXT TAG INDEX i1 on t1(str) analyzer=\"standard\"";
auto result = parse(query);
EXPECT_TRUE(result.ok()) << result.status();
}
{
std::string query = "LOOKUP ON t1 WHERE ES_QUERY(abc, \"qwerty\")";
auto result = parse(query);
Expand Down

0 comments on commit b59eaf1

Please sign in to comment.