Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor fulltext index plan #5077

Merged
merged 8 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -181,17 +181,17 @@ jobs:
case ${{ matrix.os }} in
centos7)
# normal cluster
make CONTAINERIZED=true up
make CONTAINERIZED=true ENABLE_FT_INDEX=true ES_ADDRESS='"elasticsearch":9200' up
;;
ubuntu2004)
# ssl cluster
make CONTAINERIZED=true ENABLE_SSL=true CA_SIGNED=true up
make CONTAINERIZED=true ENABLE_FT_INDEX=true ES_ADDRESS='"elasticsearch":9200' ENABLE_SSL=true CA_SIGNED=true up
;;
esac
;;
clang-*)
# graph ssl only cluster
make CONTAINERIZED=true ENABLE_SSL=false ENABLE_GRAPH_SSL=true up
make CONTAINERIZED=true ENABLE_FT_INDEX=true ES_ADDRESS='"elasticsearch":9200' ENABLE_SSL=false ENABLE_GRAPH_SSL=true up
;;
esac
working-directory: tests/
Expand All @@ -203,7 +203,7 @@ jobs:
timeout-minutes: 15
- name: TCK
run: |
make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} tck
make RM_DIR=false DEBUG=false ENABLE_FT_INDEX=true ES_ADDRESS='"elasticsearch":9200' J=${{ steps.cmake.outputs.j }} tck
working-directory: tests/
timeout-minutes: 60
- name: LDBC
Expand Down
4 changes: 3 additions & 1 deletion src/common/http/HttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ HttpResponse HttpClient::sendRequest(const std::string& url,
setRespHeader(curl, resp.header);
setRespBody(curl, resp.body);
setTimeout(curl);
setAuth(curl, username, password);
if (!username.empty()) {
setAuth(curl, username, password);
}
resp.curlCode = curl_easy_perform(curl);
if (resp.curlCode != 0) {
resp.curlMessage = std::string(curl_easy_strerror(resp.curlCode));
Expand Down
5 changes: 3 additions & 2 deletions src/common/plugin/fulltext/elasticsearch/ESAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ void ESBulk::put(const std::string& indexName,
body["dst"] = dst;
body["rank"] = rank;
body["text"] = text;
documents_[docId] = {std::move(action), std::move(body)};
documents_[indexName].emplace_back(std::move(action));
documents_[indexName].emplace_back(std::move(body));
}

void ESBulk::delete_(const std::string& indexName,
Expand All @@ -54,7 +55,7 @@ void ESBulk::delete_(const std::string& indexName,
metadata["_type"] = "_doc";
metadata["_index"] = indexName;
action["delete"] = std::move(metadata);
documents_[docId] = {std::move(action)};
documents_[indexName].emplace_back(std::move(action));
}

bool ESBulk::empty() {
Expand Down
5 changes: 5 additions & 0 deletions src/common/plugin/fulltext/elasticsearch/ESAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ class ESAdapter {
public:
explicit ESAdapter(std::vector<ESClient>&& clients);
ESAdapter() = default;
ESAdapter(const ESAdapter& adapter) : clients_(adapter.clients_) {}
ESAdapter& operator=(const ESAdapter& adapter) {
clients_ = adapter.clients_;
return *this;
}
virtual ~ESAdapter() = default;
virtual void setClients(std::vector<ESClient>&& clients);
virtual Status createIndex(const std::string& name);
Expand Down
24 changes: 18 additions & 6 deletions src/common/plugin/fulltext/elasticsearch/ESClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ StatusOr<folly::dynamic> ESClient::createIndex(const std::string& name,
auto resp = httpClient_.put(
url, {"Content-Type: application/json"}, folly::toJson(body), username_, password_);
if (resp.curlCode != 0) {
return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage));
std::string msg = fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage);
LOG(ERROR) << msg;
return Status::Error(msg);
}
auto ret = folly::parseJson(resp.body);
return ret;
Expand All @@ -71,7 +73,9 @@ StatusOr<folly::dynamic> ESClient::dropIndex(const std::string& name) {
std::string url = fmt::format("{}://{}/{}", protocol_, address_, name);
auto resp = httpClient_.delete_(url, {"Content-Type: application/json"}, username_, password_);
if (resp.curlCode != 0) {
return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage));
std::string msg = fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage);
LOG(ERROR) << msg;
return Status::Error(msg);
}
return folly::parseJson(resp.body);
}
Expand All @@ -80,7 +84,9 @@ StatusOr<folly::dynamic> ESClient::getIndex(const std::string& name) {
std::string url = fmt::format("{}://{}/{}", protocol_, address_, name);
auto resp = httpClient_.get(url, {"Content-Type: application/json"}, username_, password_);
if (resp.curlCode != 0) {
return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage));
std::string msg = fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage);
LOG(ERROR) << msg;
return Status::Error(msg);
}
return folly::parseJson(resp.body);
}
Expand All @@ -96,7 +102,9 @@ StatusOr<folly::dynamic> ESClient::deleteByQuery(const std::string& index,
auto resp = httpClient_.post(
url, {"Content-Type: application/json"}, folly::toJson(query), username_, password_);
if (resp.curlCode != 0) {
return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage));
std::string msg = fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage);
LOG(ERROR) << msg;
return Status::Error(msg);
}
return folly::parseJson(resp.body);
}
Expand All @@ -111,7 +119,9 @@ StatusOr<folly::dynamic> ESClient::search(const std::string& index,
auto resp = httpClient_.post(
url, {"Content-Type: application/json"}, folly::toJson(query), username_, password_);
if (resp.curlCode != 0) {
return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage));
std::string msg = fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage);
LOG(ERROR) << msg;
return Status::Error(msg);
}
return folly::parseJson(resp.body);
}
Expand All @@ -127,7 +137,9 @@ StatusOr<folly::dynamic> ESClient::bulk(const std::vector<folly::dynamic>& bulk,
auto resp =
httpClient_.post(url, {"Content-Type: application/x-ndjson"}, body, username_, password_);
if (resp.curlCode != 0) {
return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage));
std::string msg = fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage);
LOG(ERROR) << msg;
return Status::Error(msg);
}
return folly::parseJson(resp.body);
}
Expand Down
13 changes: 12 additions & 1 deletion src/common/plugin/fulltext/elasticsearch/ESClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,23 @@ namespace nebula::plugin {

class ESClient {
public:
ESClient(const ESClient& client) = default;
ESClient(HttpClient& httpClient,
const std::string& protocol,
const std::string& address,
const std::string& user,
const std::string& password);

ESClient& operator=(const ESClient& client) {
cangfengzhs marked this conversation as resolved.
Show resolved Hide resolved
if (&client == this) {
return *this;
}
protocol_ = client.protocol_;
httpClient_ = client.httpClient_;
address_ = client.address_;
username_ = client.username_;
password_ = client.password_;
return *this;
}
StatusOr<folly::dynamic> createIndex(const std::string& name, const folly::dynamic& object);
StatusOr<folly::dynamic> dropIndex(const std::string& name);

Expand Down
24 changes: 13 additions & 11 deletions src/common/plugin/fulltext/test/ElasticsearchTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ TEST_F(ESTest, createIndex) {
.WillOnce(Return(esErrorResp_))
.WillOnce(Return(curlErrorResp_));
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
std::vector<plugin::ESClient> clients;
clients.push_back(client);
plugin::ESAdapter adapter(std::move(clients));
{
auto result = adapter.createIndex("nebula_index_1");
ASSERT_TRUE(result.ok());
Expand Down Expand Up @@ -179,7 +181,7 @@ TEST_F(ESTest, dropIndex) {
.WillOnce(Return(esErrorResp_))
.WillOnce(Return(curlErrorResp_));
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.dropIndex("nebula_index_1");
ASSERT_TRUE(result.ok());
Expand Down Expand Up @@ -229,7 +231,7 @@ content-length: 78
.WillOnce(Return(esErrorResp_))
.WillOnce(Return(curlErrorResp_));
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.isIndexExist("nebula_index_1");
ASSERT_TRUE(result.ok());
Expand Down Expand Up @@ -282,7 +284,7 @@ content-length: 78
.WillOnce(Return(curlErrorResp_));

plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.clearIndex("nebula_index_1");
ASSERT_TRUE(result.ok());
Expand Down Expand Up @@ -322,7 +324,7 @@ TEST_F(ESTest, prefix) {
.WillOnce(Return(esErrorResp_))
.WillOnce(Return(curlErrorResp_));
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.prefix("nebula_index_1", "abc", -1, -1);
ASSERT_TRUE(result.ok());
Expand Down Expand Up @@ -366,7 +368,7 @@ TEST_F(ESTest, wildcard) {
.WillOnce(Return(esErrorResp_))
.WillOnce(Return(curlErrorResp_));
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.wildcard("nebula_index_1", "abc", -1, -1);
ASSERT_TRUE(result.ok());
Expand Down Expand Up @@ -410,7 +412,7 @@ TEST_F(ESTest, regexp) {
.WillOnce(Return(esErrorResp_))
.WillOnce(Return(curlErrorResp_));
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.regexp("nebula_index_1", "abc", -1, -1);
ASSERT_TRUE(result.ok());
Expand Down Expand Up @@ -452,7 +454,7 @@ TEST_F(ESTest, bulk) {
.Times(1)
.WillOnce(Return(curlErrorResp_));
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
plugin::ESBulk bulk;
bulk.put("nebula_index_1", "1", "", "", 0, "vertex text");
bulk.delete_("nebula_index_2", "", "a", "b", 10);
Expand Down Expand Up @@ -498,7 +500,7 @@ TEST_F(ESTest, fuzzy) {
.WillOnce(Return(esErrorResp_))
.WillOnce(Return(curlErrorResp_));
plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.fuzzy("nebula_index_1", "abc", "2", -1, -1);
ASSERT_TRUE(result.ok());
Expand All @@ -524,7 +526,7 @@ class RealESTest : public ::testing::Test {};

TEST_F(RealESTest, DISABLED_CREATE_DROP_INDEX) {
plugin::ESClient client(HttpClient::instance(), "http", FLAGS_es_address, "", "");
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.createIndex("nebula_index_1");
ASSERT_TRUE(result.ok()) << result.message();
Expand All @@ -548,7 +550,7 @@ TEST_F(RealESTest, DISABLED_CREATE_DROP_INDEX) {
TEST_F(RealESTest, DISABLED_QUERY) {
plugin::ESClient client(HttpClient::instance(), "http", FLAGS_es_address, "", "");
std::string indexName = "nebula_index_2";
plugin::ESAdapter adapter({client});
plugin::ESAdapter adapter(std::vector<plugin::ESClient>({client}));
{
auto result = adapter.createIndex(indexName);
ASSERT_TRUE(result.ok()) << result.message();
Expand Down
7 changes: 6 additions & 1 deletion src/graph/context/ast/QueryAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,17 @@ struct GoContext final : AstContext {
struct LookupContext final : public AstContext {
bool isEdge{false};
bool dedup{false};
bool isEmptyResultSet{false};
int32_t schemaId{-1};
Expression* filter{nullptr};
YieldColumns* yieldExpr{nullptr};
std::vector<std::string> idxReturnCols;
std::vector<std::string> idxColNames;

// fulltext index
bool isFulltextIndex{false};
std::string fulltextIndex;
Expression* fulltextExpr{nullptr};

// order by
};

Expand Down
1 change: 1 addition & 0 deletions src/graph/executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ nebula_add_library(
query/AggregateExecutor.cpp
query/DedupExecutor.cpp
query/FilterExecutor.cpp
query/FulltextIndexScanExecutor.cpp
query/GetEdgesExecutor.cpp
query/GetNeighborsExecutor.cpp
query/GetVerticesExecutor.cpp
Expand Down
4 changes: 4 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
#include "graph/executor/query/DataCollectExecutor.h"
#include "graph/executor/query/DedupExecutor.h"
#include "graph/executor/query/FilterExecutor.h"
#include "graph/executor/query/FulltextIndexScanExecutor.h"
#include "graph/executor/query/GetDstBySrcExecutor.h"
#include "graph/executor/query/GetEdgesExecutor.h"
#include "graph/executor/query/GetNeighborsExecutor.h"
Expand Down Expand Up @@ -196,6 +197,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kGetNeighbors: {
return pool->makeAndAdd<GetNeighborsExecutor>(node, qctx);
}
case PlanNode::Kind::kFulltextIndexScan: {
return pool->makeAndAdd<FulltextIndexScanExecutor>(node, qctx);
}
case PlanNode::Kind::kLimit: {
return pool->makeAndAdd<LimitExecutor>(node, qctx);
}
Expand Down
Loading