Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix create fulltext index failed #3747

Merged
merged 4 commits into from Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
51 changes: 25 additions & 26 deletions src/kvstore/raftex/RaftPart.cpp
Expand Up @@ -697,19 +697,19 @@ folly::Future<nebula::cpp2::ErrorCode> RaftPart::appendLogAsync(ClusterID source
// until majority accept the logs, the leadership changes, or
// the partition stops
VLOG(2) << idStr_ << "Calling appendLogsInternal()";
AppendLogsIterator it(firstId,
termId,
std::move(swappedOutLogs),
[this](AtomicOp opCB) -> folly::Optional<std::string> {
CHECK(opCB != nullptr);
auto opRet = opCB();
if (!opRet.hasValue()) {
// Failed
sendingPromise_.setOneSingleValue(
nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED);
}
return opRet;
});
AppendLogsIterator it(
firstId,
termId,
std::move(swappedOutLogs),
[this](AtomicOp opCB) -> folly::Optional<std::string> {
CHECK(opCB != nullptr);
auto opRet = opCB();
if (!opRet.hasValue()) {
// Failed
sendingPromise_.setOneSingleValue(nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED);
}
return opRet;
});
appendLogsInternal(std::move(it), termId);

return retFuture;
Expand Down Expand Up @@ -964,19 +964,18 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
// continue to replicate the logs
sendingPromise_ = std::move(cachingPromise_);
cachingPromise_.reset();
iter = AppendLogsIterator(
firstLogId,
currTerm,
std::move(logs_),
[this](AtomicOp op) -> folly::Optional<std::string> {
auto opRet = op();
if (!opRet.hasValue()) {
// Failed
sendingPromise_.setOneSingleValue(
nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED);
}
return opRet;
});
iter = AppendLogsIterator(firstLogId,
currTerm,
std::move(logs_),
[this](AtomicOp op) -> folly::Optional<std::string> {
auto opRet = op();
if (!opRet.hasValue()) {
// Failed
sendingPromise_.setOneSingleValue(
nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED);
}
return opRet;
});
logs_.clear();
bufferOverFlow_ = false;
}
Expand Down
4 changes: 3 additions & 1 deletion src/meta/processors/index/FTIndexProcessor.cpp
Expand Up @@ -100,7 +100,9 @@ void CreateFTIndexProcessor::process(const cpp2::CreateFTIndexReq& req) {
onFinished();
return;
}
if (index.get_depend_schema() == indexItem.get_depend_schema()) {
// Because tagId/edgeType is the space range, judge the spaceId and schemaId
if (index.get_space_id() == indexItem.get_space_id() &&
index.get_depend_schema() == indexItem.get_depend_schema()) {
LOG(ERROR) << "Depends on the same schema , index : " << indexName;
handleErrorCode(nebula::cpp2::ErrorCode::E_EXISTED);
onFinished();
Expand Down
70 changes: 68 additions & 2 deletions src/meta/test/IndexProcessorTest.cpp
Expand Up @@ -1558,6 +1558,15 @@ void mockSchemas(kvstore::KVStore* kv) {
schemas.emplace_back(MetaKeyUtils::schemaEdgeKey(1, edgeType, ver),
MetaKeyUtils::schemaVal("test_edge", srcsch));

// space 2
schemas.emplace_back(MetaKeyUtils::indexTagKey(2, "test_tag"), tagIdVal);
schemas.emplace_back(MetaKeyUtils::schemaTagKey(2, tagId, ver),
MetaKeyUtils::schemaVal("test_tag", srcsch));

schemas.emplace_back(MetaKeyUtils::indexEdgeKey(2, "test_edge"), edgeTypeVal);
schemas.emplace_back(MetaKeyUtils::schemaEdgeKey(2, edgeType, ver),
MetaKeyUtils::schemaVal("test_edge", srcsch));

folly::Baton<true, std::atomic> baton;
kv->asyncMultiPut(0, 0, std::move(schemas), [&](nebula::cpp2::ErrorCode code) {
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code);
Expand All @@ -1570,6 +1579,7 @@ TEST(IndexProcessorTest, CreateFTIndexTest) {
fs::TempDir rootPath("/tmp/CreateFTIndexTest.XXXXXX");
auto kv = MockCluster::initMetaKV(rootPath.path());
TestUtils::assembleSpace(kv.get(), 1, 1);
TestUtils::assembleSpace(kv.get(), 2, 1, 1, 1, true);
mockSchemas(kv.get());
for (auto id : {5, 6}) {
// expected error. column col_fixed_string_2 is fixed_string,
Expand Down Expand Up @@ -1627,7 +1637,7 @@ TEST(IndexProcessorTest, CreateFTIndexTest) {
} else {
schemaId.edge_type_ref() = 6;
}
index.space_id_ref() = 2;
index.space_id_ref() = 3;
index.depend_schema_ref() = std::move(schemaId);
index.fields_ref() = {"col_string"};
req.fulltext_index_name_ref() = "test_ft_index";
Expand Down Expand Up @@ -1825,6 +1835,63 @@ TEST(IndexProcessorTest, CreateFTIndexTest) {
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND, resp.get_code());
}

// expected success
// Different spaces, the same tag name(same tagId), create full-text indexes with different names.
{
{
for (auto i = 0; i < 2; ++i) {
cpp2::CreateFTIndexReq req;
cpp2::FTIndex index;
nebula::cpp2::SchemaID schemaId;
schemaId.tag_id_ref() = 5;
index.space_id_ref() = i + 1;
index.depend_schema_ref() = std::move(schemaId);
index.fields_ref() = {"col_string", "col_fixed_string_1"};
req.fulltext_index_name_ref() = folly::stringPrintf("ft_tag_index_space%d", i + 1);
req.index_ref() = std::move(index);

auto* processor = CreateFTIndexProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
}
{
cpp2::ListFTIndexesReq req;
auto* processor = ListFTIndexesProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
auto indexes = resp.get_indexes();
ASSERT_EQ(2, indexes.size());
for (auto i = 0u; i < indexes.size(); ++i) {
auto key = folly::stringPrintf("ft_tag_index_space%u", i + 1);
auto iter = indexes.find(key);
ASSERT_NE(indexes.end(), iter);
std::vector<std::string> fields = {"col_string", "col_fixed_string_1"};
ASSERT_EQ(fields, iter->second.get_fields());
ASSERT_EQ(i + 1, iter->second.get_space_id());
nebula::cpp2::SchemaID schemaId;
schemaId.tag_id_ref() = 5;
ASSERT_EQ(schemaId, iter->second.get_depend_schema());
}
}
{
for (auto i = 0; i < 2; ++i) {
cpp2::DropFTIndexReq req;
req.space_id_ref() = i + 1;
req.fulltext_index_name_ref() = folly::stringPrintf("ft_tag_index_space%d", i + 1);
auto* processor = DropFTIndexProcessor::instance(kv.get());
auto f = processor->getFuture();
processor->process(req);
auto resp = std::move(f).get();
ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code());
}
}
}
}

TEST(IndexProcessorTest, DropWithFTIndexTest) {
Expand Down Expand Up @@ -2282,7 +2349,6 @@ TEST(ProcessorTest, IndexIdInSpaceRangeTest) {
ASSERT_EQ(14, resp.get_id().get_index_id());
}
}

} // namespace meta
} // namespace nebula

Expand Down
18 changes: 14 additions & 4 deletions src/meta/test/TestUtils.h
Expand Up @@ -175,16 +175,26 @@ class TestUtils {
GraphSpaceID id,
int32_t partitionNum,
int32_t replica = 1,
int32_t totalHost = 1) {
int32_t totalHost = 1,
bool multispace = false) {
// mock the part distribution like create space
cpp2::SpaceDesc properties;
properties.space_name_ref() = "test_space";
if (multispace) {
properties.space_name_ref() = folly::stringPrintf("test_space_%d", id);
} else {
properties.space_name_ref() = "test_space";
}
properties.partition_num_ref() = partitionNum;
properties.replica_factor_ref() = replica;
auto spaceVal = MetaKeyUtils::spaceVal(properties);
std::vector<nebula::kvstore::KV> data;
data.emplace_back(MetaKeyUtils::indexSpaceKey("test_space"),
std::string(reinterpret_cast<const char*>(&id), sizeof(GraphSpaceID)));
if (multispace) {
data.emplace_back(MetaKeyUtils::indexSpaceKey(folly::stringPrintf("test_space_%d", id)),
std::string(reinterpret_cast<const char*>(&id), sizeof(GraphSpaceID)));
} else {
data.emplace_back(MetaKeyUtils::indexSpaceKey("test_space"),
std::string(reinterpret_cast<const char*>(&id), sizeof(GraphSpaceID)));
}
data.emplace_back(MetaKeyUtils::spaceKey(id), MetaKeyUtils::spaceVal(properties));

std::vector<HostAddr> allHosts;
Expand Down