Skip to content

Commit

Permalink
Merge branch 'master' into mem
Browse files Browse the repository at this point in the history
  • Loading branch information
codesigner committed Dec 27, 2022
2 parents 2b082d0 + ec5d2ac commit 7f660c2
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 22 deletions.
2 changes: 2 additions & 0 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,8 @@ enum ErrorCode {
E_QUERY_NOT_FOUND = -2073, // Query not found
E_AGENT_HB_FAILUE = -2074, // Failed to receive heartbeat from agent

E_ACCESS_ES_FAILURE = -2080, // Failed to access elasticsearch

E_GRAPH_MEMORY_EXCEEDED = -2600, // Graph memory exceeded

// 3xxx for storaged
Expand Down
48 changes: 31 additions & 17 deletions src/kvstore/listener/elasticsearch/ESListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,6 @@ void ESListener::init() {
}
isIntVid_ = vidTypeRet.value() == nebula::cpp2::PropertyType::INT64;

auto cRet = schemaMan_->getServiceClients(meta::cpp2::ExternalServiceType::ELASTICSEARCH);
if (!cRet.ok() || cRet.value().empty()) {
LOG(FATAL) << "elasticsearch clients error";
}
std::vector<nebula::plugin::ESClient> esClients;
for (const auto& c : cRet.value()) {
auto host = c.host;
std::string user, password;
if (c.user_ref().has_value()) {
user = *c.user_ref();
password = *c.pwd_ref();
}
std::string protocol = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http";
esClients.emplace_back(HttpClient::instance(), protocol, host.toRawString(), user, password);
}
esAdapter_.setClients(std::move(esClients));
auto sRet = schemaMan_->toGraphSpaceName(spaceId_);
if (!sRet.ok()) {
LOG(FATAL) << "space name error";
Expand Down Expand Up @@ -70,7 +54,13 @@ bool ESListener::apply(const BatchHolder& batch) {
pickTagAndEdgeData(std::get<0>(log), std::get<1>(log), std::get<2>(log), callback);
}
if (!bulk.empty()) {
auto status = esAdapter_.bulk(bulk);
auto esAdapterRes = getESAdapter();
if (!esAdapterRes.ok()) {
LOG(ERROR) << esAdapterRes.status();
return false;
}
auto esAdapter = std::move(esAdapterRes).value();
auto status = esAdapter.bulk(bulk);
if (!status.ok()) {
LOG(ERROR) << status;
return false;
Expand Down Expand Up @@ -371,5 +361,29 @@ std::string ESListener::truncateVid(const std::string& vid) {
return vid;
}

StatusOr<::nebula::plugin::ESAdapter> ESListener::getESAdapter() {
auto cRet = schemaMan_->getServiceClients(meta::cpp2::ExternalServiceType::ELASTICSEARCH);
if (!cRet.ok()) {
LOG(ERROR) << cRet.status().message();
return cRet.status();
}
if (cRet.value().empty()) {
LOG(ERROR) << "There is no elasticsearch service";
return ::nebula::Status::Error("There is no elasticsearch service");
}
std::vector<nebula::plugin::ESClient> esClients;
for (const auto& c : cRet.value()) {
auto host = c.host;
std::string user, password;
if (c.user_ref().has_value()) {
user = *c.user_ref();
password = *c.pwd_ref();
}
std::string protocol = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http";
esClients.emplace_back(HttpClient::instance(), protocol, host.toRawString(), user, password);
}
return ::nebula::plugin::ESAdapter(std::move(esClients));
}

} // namespace kvstore
} // namespace nebula
4 changes: 3 additions & 1 deletion src/kvstore/listener/elasticsearch/ESListener.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,11 @@ class ESListener : public Listener {
const PickFunc& func);

std::string truncateVid(const std::string& vid);

StatusOr<::nebula::plugin::ESAdapter> getESAdapter();

std::unique_ptr<std::string> lastApplyLogFile_{nullptr};
std::unique_ptr<std::string> spaceName_{nullptr};
::nebula::plugin::ESAdapter esAdapter_;
int32_t vIdLen_;
bool isIntVid_{false};
};
Expand Down
6 changes: 2 additions & 4 deletions src/meta/processors/index/FTIndexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,8 @@ void CreateFTIndexProcessor::process(const cpp2::CreateFTIndexReq& req) {
plugin::ESAdapter esAdapter(std::move(esClients));
auto createIndexresult = esAdapter.createIndex(name);
if (!createIndexresult.ok()) {
// TODO(hs.zhang): fix error code
LOG(ERROR) << createIndexresult.message();
handleErrorCode(nebula::cpp2::ErrorCode::E_UNKNOWN);
handleErrorCode(nebula::cpp2::ErrorCode::E_ACCESS_ES_FAILURE);
onFinished();
return;
}
Expand Down Expand Up @@ -209,9 +208,8 @@ void DropFTIndexProcessor::process(const cpp2::DropFTIndexReq& req) {
plugin::ESAdapter esAdapter(std::move(esClients));
auto dropIndexresult = esAdapter.dropIndex(req.get_fulltext_index_name());
if (!dropIndexresult.ok()) {
// TODO(hs.zhang): fix error code
LOG(ERROR) << dropIndexresult.message();
handleErrorCode(nebula::cpp2::ErrorCode::E_UNKNOWN);
handleErrorCode(nebula::cpp2::ErrorCode::E_ACCESS_ES_FAILURE);
onFinished();
return;
}
Expand Down
109 changes: 109 additions & 0 deletions tests/tck/features/fulltext_index/FultextIndexDDL.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Copyright (c) 2022 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.
Feature: FulltextIndexTest

Background:
Given an empty graph
And create a space with following options:
| partition_num | 1 |
| replica_factor | 1 |
| vid_type | FIXED_STRING(30) |
And add listeners to space

@ft_index
Scenario: fulltext ddl
When executing query:
"""
CREATE TAG ddl_tag(prop1 string,prop2 fixed_string(20),prop3 int);
CREATE EDGE ddl_edge(prop1 string,prop2 float);
"""
Then the execution should be successful
And wait 3 seconds
When executing query:
"""
CREATE FULLTEXT TAG INDEX nebula_index_ddl_tag_prop1 on ddl_tag(prop1);
"""
Then the execution should be successful
When executing query:
"""
CREATE FULLTEXT TAG INDEX nebula_index_ddl_tag_prop2 on ddl_tag(prop2);
"""
Then the execution should be successful
When executing query:
"""
CREATE FULLTEXT TAG INDEX nebula_index_ddl_tag_prop3 on ddl_tag(prop3);
"""
Then a ExecutionError should be raised at runtime: Unsupported!
When executing query:
"""
SHOW FULLTEXT INDEXES;
"""
Then the result should be, in any order:
| Name | Schema Type | Schema Name | Fields |
| "nebula_index_ddl_tag_prop1" | "Tag" | "ddl_tag" | "prop1" |
| "nebula_index_ddl_tag_prop2" | "Tag" | "ddl_tag" | "prop2" |
When executing query:
"""
DROP FULLTEXT INDEX nebula_index_ddl_tag_prop1;
"""
Then the execution should be successful
When executing query:
"""
DROP FULLTEXT INDEX nebula_index_ddl_tag_prop2;
"""
Then the execution should be successful
When executing query:
"""
SHOW FULLTEXT INDEXES;
"""
Then the result should be, in any order:
| Name | Schema Type | Schema Name | Fields |
When executing query:
"""
CREATE FULLTEXT TAG INDEX nebula_index_ddl_tag_prop1 on ddl_tag(prop2);
"""
Then the execution should be successful
When executing query:
"""
SHOW FULLTEXT INDEXES;
"""
Then the result should be, in any order:
| Name | Schema Type | Schema Name | Fields |
| "nebula_index_ddl_tag_prop1" | "Tag" | "ddl_tag" | "prop2" |
When executing query:
"""
DROP TAG ddl_tag;
"""
Then a ExecutionError should be raised at runtime: Related index exists, please drop index first
When executing query:
"""
ALTER TAG ddl_tag DROP (prop2);
"""
Then a ExecutionError should be raised at runtime: Related fulltext index exists, please drop it first
When executing query:
"""
ALTER TAG ddl_tag DROP (prop1);
"""
Then the execution should be successful
When executing query:
"""
ALTER TAG ddl_tag ADD (prop1 string);
"""
Then the execution should be successful
When executing query:
"""
ALTER TAG ddl_tag CHANGE (prop2 string);
"""
Then a ExecutionError should be raised at runtime: Related fulltext index exists, please drop it first
When executing query:
"""
DROP FULLTEXT INDEX nebula_index_ddl_tag_prop1;
"""
Then the execution should be successful
When executing query:
"""
DROP TAG ddl_tag;
DROP EDGE ddl_edge;
"""
Then the execution should be successful

0 comments on commit 7f660c2

Please sign in to comment.