Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more fulltext index tck case #5122

Merged
merged 2 commits into from
Dec 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,8 @@ enum ErrorCode {
E_QUERY_NOT_FOUND = -2073, // Query not found
E_AGENT_HB_FAILUE = -2074, // Failed to receive heartbeat from agent

E_ACCESS_ES_FAILURE = -2080, // Failed to access elasticsearch

// 3xxx for storaged
E_CONSENSUS_ERROR = -3001, // Consensus cannot be reached during an election
E_KEY_HAS_EXISTS = -3002, // Key already exists
Expand Down
48 changes: 31 additions & 17 deletions src/kvstore/listener/elasticsearch/ESListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,6 @@ void ESListener::init() {
}
isIntVid_ = vidTypeRet.value() == nebula::cpp2::PropertyType::INT64;

auto cRet = schemaMan_->getServiceClients(meta::cpp2::ExternalServiceType::ELASTICSEARCH);
if (!cRet.ok() || cRet.value().empty()) {
LOG(FATAL) << "elasticsearch clients error";
}
std::vector<nebula::plugin::ESClient> esClients;
for (const auto& c : cRet.value()) {
auto host = c.host;
std::string user, password;
if (c.user_ref().has_value()) {
user = *c.user_ref();
password = *c.pwd_ref();
}
std::string protocol = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http";
esClients.emplace_back(HttpClient::instance(), protocol, host.toRawString(), user, password);
}
esAdapter_.setClients(std::move(esClients));
auto sRet = schemaMan_->toGraphSpaceName(spaceId_);
if (!sRet.ok()) {
LOG(FATAL) << "space name error";
Expand Down Expand Up @@ -70,7 +54,13 @@ bool ESListener::apply(const BatchHolder& batch) {
pickTagAndEdgeData(std::get<0>(log), std::get<1>(log), std::get<2>(log), callback);
}
if (!bulk.empty()) {
auto status = esAdapter_.bulk(bulk);
auto esAdapterRes = getESAdapter();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we need to retrieve adapter for each data?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the password of elasticsearch may be modified during the system operation. Therefore, you need to obtain the configuration of the elasticsearch service again.

In addition, there was a problem before that. If there was no sign in text service when adding listener, the nebula-listener would crash directly. Therefore, the address of elasticsearch cannot be obtained during initialization.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌

if (!esAdapterRes.ok()) {
LOG(ERROR) << esAdapterRes.status();
return false;
}
auto esAdapter = std::move(esAdapterRes).value();
auto status = esAdapter.bulk(bulk);
if (!status.ok()) {
LOG(ERROR) << status;
return false;
Expand Down Expand Up @@ -371,5 +361,29 @@ std::string ESListener::truncateVid(const std::string& vid) {
return vid;
}

StatusOr<::nebula::plugin::ESAdapter> ESListener::getESAdapter() {
auto cRet = schemaMan_->getServiceClients(meta::cpp2::ExternalServiceType::ELASTICSEARCH);
if (!cRet.ok()) {
LOG(ERROR) << cRet.status().message();
return cRet.status();
}
if (cRet.value().empty()) {
LOG(ERROR) << "There is no elasticsearch service";
return ::nebula::Status::Error("There is no elasticsearch service");
}
std::vector<nebula::plugin::ESClient> esClients;
for (const auto& c : cRet.value()) {
auto host = c.host;
std::string user, password;
if (c.user_ref().has_value()) {
user = *c.user_ref();
password = *c.pwd_ref();
}
std::string protocol = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http";
esClients.emplace_back(HttpClient::instance(), protocol, host.toRawString(), user, password);
}
return ::nebula::plugin::ESAdapter(std::move(esClients));
}

} // namespace kvstore
} // namespace nebula
4 changes: 3 additions & 1 deletion src/kvstore/listener/elasticsearch/ESListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,11 @@ class ESListener : public Listener {
const PickFunc& func);

std::string truncateVid(const std::string& vid);

StatusOr<::nebula::plugin::ESAdapter> getESAdapter();

std::unique_ptr<std::string> lastApplyLogFile_{nullptr};
std::unique_ptr<std::string> spaceName_{nullptr};
::nebula::plugin::ESAdapter esAdapter_;
int32_t vIdLen_;
bool isIntVid_{false};
};
Expand Down
6 changes: 2 additions & 4 deletions src/meta/processors/index/FTIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,8 @@ void CreateFTIndexProcessor::process(const cpp2::CreateFTIndexReq& req) {
plugin::ESAdapter esAdapter(std::move(esClients));
auto createIndexresult = esAdapter.createIndex(name);
if (!createIndexresult.ok()) {
// TODO(hs.zhang): fix error code
LOG(ERROR) << createIndexresult.message();
handleErrorCode(nebula::cpp2::ErrorCode::E_UNKNOWN);
handleErrorCode(nebula::cpp2::ErrorCode::E_ACCESS_ES_FAILURE);
onFinished();
return;
}
Expand Down Expand Up @@ -209,9 +208,8 @@ void DropFTIndexProcessor::process(const cpp2::DropFTIndexReq& req) {
plugin::ESAdapter esAdapter(std::move(esClients));
auto dropIndexresult = esAdapter.dropIndex(req.get_fulltext_index_name());
if (!dropIndexresult.ok()) {
// TODO(hs.zhang): fix error code
LOG(ERROR) << dropIndexresult.message();
handleErrorCode(nebula::cpp2::ErrorCode::E_UNKNOWN);
handleErrorCode(nebula::cpp2::ErrorCode::E_ACCESS_ES_FAILURE);
onFinished();
return;
}
Expand Down
109 changes: 109 additions & 0 deletions tests/tck/features/fulltext_index/FultextIndexDDL.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Copyright (c) 2022 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.
Feature: FulltextIndexTest

Background:
Given an empty graph
And create a space with following options:
| partition_num | 1 |
| replica_factor | 1 |
| vid_type | FIXED_STRING(30) |
And add listeners to space

@ft_index
Scenario: fulltext ddl
When executing query:
"""
CREATE TAG ddl_tag(prop1 string,prop2 fixed_string(20),prop3 int);
CREATE EDGE ddl_edge(prop1 string,prop2 float);
"""
Then the execution should be successful
And wait 3 seconds
When executing query:
"""
CREATE FULLTEXT TAG INDEX nebula_index_ddl_tag_prop1 on ddl_tag(prop1);
"""
Then the execution should be successful
When executing query:
"""
CREATE FULLTEXT TAG INDEX nebula_index_ddl_tag_prop2 on ddl_tag(prop2);
"""
Then the execution should be successful
When executing query:
"""
CREATE FULLTEXT TAG INDEX nebula_index_ddl_tag_prop3 on ddl_tag(prop3);
"""
Then a ExecutionError should be raised at runtime: Unsupported!
When executing query:
"""
SHOW FULLTEXT INDEXES;
"""
Then the result should be, in any order:
| Name | Schema Type | Schema Name | Fields |
| "nebula_index_ddl_tag_prop1" | "Tag" | "ddl_tag" | "prop1" |
| "nebula_index_ddl_tag_prop2" | "Tag" | "ddl_tag" | "prop2" |
When executing query:
"""
DROP FULLTEXT INDEX nebula_index_ddl_tag_prop1;
"""
Then the execution should be successful
When executing query:
"""
DROP FULLTEXT INDEX nebula_index_ddl_tag_prop2;
"""
Then the execution should be successful
When executing query:
"""
SHOW FULLTEXT INDEXES;
"""
Then the result should be, in any order:
| Name | Schema Type | Schema Name | Fields |
When executing query:
"""
CREATE FULLTEXT TAG INDEX nebula_index_ddl_tag_prop1 on ddl_tag(prop2);
"""
Then the execution should be successful
When executing query:
"""
SHOW FULLTEXT INDEXES;
"""
Then the result should be, in any order:
| Name | Schema Type | Schema Name | Fields |
| "nebula_index_ddl_tag_prop1" | "Tag" | "ddl_tag" | "prop2" |
When executing query:
"""
DROP TAG ddl_tag;
"""
Then a ExecutionError should be raised at runtime: Related index exists, please drop index first
When executing query:
"""
ALTER TAG ddl_tag DROP (prop2);
"""
Then a ExecutionError should be raised at runtime: Related fulltext index exists, please drop it first
When executing query:
"""
ALTER TAG ddl_tag DROP (prop1);
"""
Then the execution should be successful
When executing query:
"""
ALTER TAG ddl_tag ADD (prop1 string);
"""
Then the execution should be successful
When executing query:
"""
ALTER TAG ddl_tag CHANGE (prop2 string);
"""
Then a ExecutionError should be raised at runtime: Related fulltext index exists, please drop it first
When executing query:
"""
DROP FULLTEXT INDEX nebula_index_ddl_tag_prop1;
"""
Then the execution should be successful
When executing query:
"""
DROP TAG ddl_tag;
DROP EDGE ddl_edge;
"""
Then the execution should be successful