Skip to content

Commit

Permalink
Refactor fulltext index plan (#5077)
Browse files Browse the repository at this point in the history
* allow multi fulltext index on a tag/edge

add FulltextIndexScan

modify fulltext index plan

support adding listener in tck

add ft index test demo in tck

fmt

fix standalone

fix standalone

fix standalone

1. access es on create index
2. fix host address

fix bug

* fix bug

* fix bug

* fmt

* disable some ut

* address some comments

* address some comments
  • Loading branch information
cangfengzhs committed Dec 23, 2022
1 parent 53d26dc commit 8e7aa68
Show file tree
Hide file tree
Showing 33 changed files with 673 additions and 184 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,17 @@ jobs:
case ${{ matrix.os }} in
centos7)
# normal cluster
make CONTAINERIZED=true up
make CONTAINERIZED=true ENABLE_FT_INDEX=true ES_ADDRESS='"elasticsearch":9200' up
;;
ubuntu2004)
# ssl cluster
make CONTAINERIZED=true ENABLE_SSL=true CA_SIGNED=true up
make CONTAINERIZED=true ENABLE_FT_INDEX=true ES_ADDRESS='"elasticsearch":9200' ENABLE_SSL=true CA_SIGNED=true up
;;
esac
;;
clang-*)
# graph ssl only cluster
make CONTAINERIZED=true ENABLE_SSL=false ENABLE_GRAPH_SSL=true up
make CONTAINERIZED=true ENABLE_FT_INDEX=true ES_ADDRESS='"elasticsearch":9200' ENABLE_SSL=false ENABLE_GRAPH_SSL=true up
;;
esac
working-directory: tests/
Expand All @@ -203,7 +203,7 @@ jobs:
timeout-minutes: 15
- name: TCK
run: |
make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} tck
make RM_DIR=false DEBUG=false ENABLE_FT_INDEX=true ES_ADDRESS='"elasticsearch":9200' J=${{ steps.cmake.outputs.j }} tck
working-directory: tests/
timeout-minutes: 60
- name: LDBC
Expand Down
4 changes: 3 additions & 1 deletion src/common/http/HttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ HttpResponse HttpClient::sendRequest(const std::string& url,
setRespHeader(curl, resp.header);
setRespBody(curl, resp.body);
setTimeout(curl);
setAuth(curl, username, password);
if (!username.empty()) {
setAuth(curl, username, password);
}
resp.curlCode = curl_easy_perform(curl);
if (resp.curlCode != 0) {
resp.curlMessage = std::string(curl_easy_strerror(resp.curlCode));
Expand Down
5 changes: 3 additions & 2 deletions src/common/plugin/fulltext/elasticsearch/ESAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ void ESBulk::put(const std::string& indexName,
body["dst"] = dst;
body["rank"] = rank;
body["text"] = text;
documents_[docId] = {std::move(action), std::move(body)};
documents_[indexName].emplace_back(std::move(action));
documents_[indexName].emplace_back(std::move(body));
}

void ESBulk::delete_(const std::string& indexName,
Expand All @@ -54,7 +55,7 @@ void ESBulk::delete_(const std::string& indexName,
metadata["_type"] = "_doc";
metadata["_index"] = indexName;
action["delete"] = std::move(metadata);
documents_[docId] = {std::move(action)};
documents_[indexName].emplace_back(std::move(action));
}

bool ESBulk::empty() {
Expand Down
5 changes: 5 additions & 0 deletions src/common/plugin/fulltext/elasticsearch/ESAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ class ESAdapter {
public:
explicit ESAdapter(std::vector<ESClient>&& clients);
ESAdapter() = default;
// Copy constructor: initializes this adapter's `clients_` from `adapter.clients_`.
// Only `clients_` is copied here.
// NOTE(review): if ESAdapter has other data members they are default-initialized,
// not copied — confirm against the full class definition.
ESAdapter(const ESAdapter& adapter) : clients_(adapter.clients_) {}
// Copy assignment: replaces this adapter's `clients_` with a copy of
// `adapter.clients_` and returns `*this` so assignments can be chained.
// NOTE(review): only `clients_` is assigned; any other data members keep their
// current values — confirm against the full class definition.
ESAdapter& operator=(const ESAdapter& adapter) {
clients_ = adapter.clients_;
return *this;
}
virtual ~ESAdapter() = default;
virtual void setClients(std::vector<ESClient>&& clients);
virtual Status createIndex(const std::string& name);
Expand Down
24 changes: 18 additions & 6 deletions src/common/plugin/fulltext/elasticsearch/ESClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ StatusOr<folly::dynamic> ESClient::createIndex(const std::string& name,
auto resp = httpClient_.put(
url, {"Content-Type: application/json"}, folly::toJson(body), username_, password_);
if (resp.curlCode != 0) {
return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage));
std::string msg = fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage);
LOG(ERROR) << msg;
return Status::Error(msg);
}
auto ret = folly::parseJson(resp.body);
return ret;
Expand All @@ -71,7 +73,9 @@ StatusOr<folly::dynamic> ESClient::dropIndex(const std::string& name) {
std::string url = fmt::format("{}://{}/{}", protocol_, address_, name);
auto resp = httpClient_.delete_(url, {"Content-Type: application/json"}, username_, password_);
if (resp.curlCode != 0) {
return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage));
std::string msg = fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage);
LOG(ERROR) << msg;
return Status::Error(msg);
}
return folly::parseJson(resp.body);
}
Expand All @@ -80,7 +84,9 @@ StatusOr<folly::dynamic> ESClient::getIndex(const std::string& name) {
std::string url = fmt::format("{}://{}/{}", protocol_, address_, name);
auto resp = httpClient_.get(url, {"Content-Type: application/json"}, username_, password_);
if (resp.curlCode != 0) {
return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage));
std::string msg = fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage);
LOG(ERROR) << msg;
return Status::Error(msg);
}
return folly::parseJson(resp.body);
}
Expand All @@ -96,7 +102,9 @@ StatusOr<folly::dynamic> ESClient::deleteByQuery(const std::string& index,
auto resp = httpClient_.post(
url, {"Content-Type: application/json"}, folly::toJson(query), username_, password_);
if (resp.curlCode != 0) {
return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage));
std::string msg = fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage);
LOG(ERROR) << msg;
return Status::Error(msg);
}
return folly::parseJson(resp.body);
}
Expand All @@ -111,7 +119,9 @@ StatusOr<folly::dynamic> ESClient::search(const std::string& index,
auto resp = httpClient_.post(
url, {"Content-Type: application/json"}, folly::toJson(query), username_, password_);
if (resp.curlCode != 0) {
return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage));
std::string msg = fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage);
LOG(ERROR) << msg;
return Status::Error(msg);
}
return folly::parseJson(resp.body);
}
Expand All @@ -127,7 +137,9 @@ StatusOr<folly::dynamic> ESClient::bulk(const std::vector<folly::dynamic>& bulk,
auto resp =
httpClient_.post(url, {"Content-Type: application/x-ndjson"}, body, username_, password_);
if (resp.curlCode != 0) {
return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage));
std::string msg = fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage);
LOG(ERROR) << msg;
return Status::Error(msg);
}
return folly::parseJson(resp.body);
}
Expand Down
13 changes: 12 additions & 1 deletion src/common/plugin/fulltext/elasticsearch/ESClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,23 @@ namespace nebula::plugin {

class ESClient {
public:
ESClient(const ESClient& client) = default;
ESClient(HttpClient& httpClient,
const std::string& protocol,
const std::string& address,
const std::string& user,
const std::string& password);

// Copy assignment: copies protocol_, httpClient_, address_, username_ and
// password_ from `client` into this object and returns `*this`.
ESClient& operator=(const ESClient& client) {
// Self-assignment guard: nothing to do when assigning an object to itself.
if (&client == this) {
return *this;
}
protocol_ = client.protocol_;
// NOTE(review): the constructor above takes `HttpClient&`. If `httpClient_` is
// stored as a reference member, this line does not rebind it; it assigns into
// the HttpClient object this client already refers to. Confirm against the
// member declaration (not visible in this hunk).
httpClient_ = client.httpClient_;
address_ = client.address_;
username_ = client.username_;
password_ = client.password_;
return *this;
}
StatusOr<folly::dynamic> createIndex(const std::string& name, const folly::dynamic& object);
StatusOr<folly::dynamic> dropIndex(const std::string& name);

Expand Down
24 changes: 13 additions & 11 deletions src/common/plugin/fulltext/test/ElasticsearchTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ TEST_F(ESTest, createIndex) {
.WillOnce(Return(esErrorResp_))
.WillOnce(Return(curlErrorResp_));
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
std::vector<plugin::ESClient> clients;
clients.push_back(client);
plugin::ESAdapter adapter(std::move(clients));
{
auto result = adapter.createIndex("nebula_index_1");
ASSERT_TRUE(result.ok());
Expand Down Expand Up @@ -179,7 +181,7 @@ TEST_F(ESTest, dropIndex) {
.WillOnce(Return(esErrorResp_))
.WillOnce(Return(curlErrorResp_));
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.dropIndex("nebula_index_1");
ASSERT_TRUE(result.ok());
Expand Down Expand Up @@ -229,7 +231,7 @@ content-length: 78
.WillOnce(Return(esErrorResp_))
.WillOnce(Return(curlErrorResp_));
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.isIndexExist("nebula_index_1");
ASSERT_TRUE(result.ok());
Expand Down Expand Up @@ -282,7 +284,7 @@ content-length: 78
.WillOnce(Return(curlErrorResp_));

plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.clearIndex("nebula_index_1");
ASSERT_TRUE(result.ok());
Expand Down Expand Up @@ -322,7 +324,7 @@ TEST_F(ESTest, prefix) {
.WillOnce(Return(esErrorResp_))
.WillOnce(Return(curlErrorResp_));
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.prefix("nebula_index_1", "abc", -1, -1);
ASSERT_TRUE(result.ok());
Expand Down Expand Up @@ -366,7 +368,7 @@ TEST_F(ESTest, wildcard) {
.WillOnce(Return(esErrorResp_))
.WillOnce(Return(curlErrorResp_));
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.wildcard("nebula_index_1", "abc", -1, -1);
ASSERT_TRUE(result.ok());
Expand Down Expand Up @@ -410,7 +412,7 @@ TEST_F(ESTest, regexp) {
.WillOnce(Return(esErrorResp_))
.WillOnce(Return(curlErrorResp_));
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.regexp("nebula_index_1", "abc", -1, -1);
ASSERT_TRUE(result.ok());
Expand Down Expand Up @@ -452,7 +454,7 @@ TEST_F(ESTest, bulk) {
.Times(1)
.WillOnce(Return(curlErrorResp_));
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
plugin::ESBulk bulk;
bulk.put("nebula_index_1", "1", "", "", 0, "vertex text");
bulk.delete_("nebula_index_2", "", "a", "b", 10);
Expand Down Expand Up @@ -498,7 +500,7 @@ TEST_F(ESTest, fuzzy) {
.WillOnce(Return(esErrorResp_))
.WillOnce(Return(curlErrorResp_));
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.fuzzy("nebula_index_1", "abc", "2", -1, -1);
ASSERT_TRUE(result.ok());
Expand All @@ -524,7 +526,7 @@ class RealESTest : public ::testing::Test {};

TEST_F(RealESTest, DISABLED_CREATE_DROP_INDEX) {
plugin::ESClient client(HttpClient::instance(), "http", FLAGS_es_address, "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.createIndex("nebula_index_1");
ASSERT_TRUE(result.ok()) << result.message();
Expand All @@ -548,7 +550,7 @@ TEST_F(RealESTest, DISABLED_CREATE_DROP_INDEX) {
TEST_F(RealESTest, DISABLED_QUERY) {
plugin::ESClient client(HttpClient::instance(), "http", FLAGS_es_address, "", "");
std::string indexName = "nebula_index_2";
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.createIndex(indexName);
ASSERT_TRUE(result.ok()) << result.message();
Expand Down
7 changes: 6 additions & 1 deletion src/graph/context/ast/QueryAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,17 @@ struct GoContext final : AstContext {
struct LookupContext final : public AstContext {
bool isEdge{false};
bool dedup{false};
bool isEmptyResultSet{false};
int32_t schemaId{-1};
Expression* filter{nullptr};
YieldColumns* yieldExpr{nullptr};
std::vector<std::string> idxReturnCols;
std::vector<std::string> idxColNames;

// fulltext index
bool isFulltextIndex{false};
std::string fulltextIndex;
Expression* fulltextExpr{nullptr};

// order by
};

Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ nebula_add_library(
query/AggregateExecutor.cpp
query/DedupExecutor.cpp
query/FilterExecutor.cpp
query/FulltextIndexScanExecutor.cpp
query/GetEdgesExecutor.cpp
query/GetNeighborsExecutor.cpp
query/GetVerticesExecutor.cpp
Expand Down
4 changes: 4 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
#include "graph/executor/query/DataCollectExecutor.h"
#include "graph/executor/query/DedupExecutor.h"
#include "graph/executor/query/FilterExecutor.h"
#include "graph/executor/query/FulltextIndexScanExecutor.h"
#include "graph/executor/query/GetDstBySrcExecutor.h"
#include "graph/executor/query/GetEdgesExecutor.h"
#include "graph/executor/query/GetNeighborsExecutor.h"
Expand Down Expand Up @@ -196,6 +197,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kGetNeighbors: {
return pool->makeAndAdd<GetNeighborsExecutor>(node, qctx);
}
case PlanNode::Kind::kFulltextIndexScan: {
return pool->makeAndAdd<FulltextIndexScanExecutor>(node, qctx);
}
case PlanNode::Kind::kLimit: {
return pool->makeAndAdd<LimitExecutor>(node, qctx);
}
Expand Down
Loading

0 comments on commit 8e7aa68

Please sign in to comment.