Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

V2.5.0 rebase master #534

Merged
merged 2 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/meta/processors/admin/AdminClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ folly::Future<Status> AdminClient::transLeader(GraphSpaceID spaceId,
if (it == peers.end()) {
return Status::PartNotFound();
}
if (peers.size() == 1 && peers.front() == leader) {
// if there is only one replica, skip transfer leader phase
return Status::OK();
}
auto target = dst;
if (dst == kRandomPeer) {
for (auto& p : peers) {
Expand Down
7 changes: 2 additions & 5 deletions src/meta/processors/admin/Balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ nebula::cpp2::ErrorCode Balancer::recovery() {
return recRet;
}
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
// save the balance plan again because FAILED tasks would be marked as IN_PROGRESS again
return plan_->saveInStore();
}

nebula::cpp2::ErrorCode
Expand Down Expand Up @@ -303,10 +304,6 @@ Balancer::genTasks(GraphSpaceID spaceId,
}
}

if (confirmedHostParts.size() < 2) {
LOG(INFO) << "Too few hosts, no need for balance!";
return nebula::cpp2::ErrorCode::E_NO_VALID_HOST;
}
// 2. Make all hosts in confirmedHostParts balanced
if (balanceParts(plan_->id_, spaceId, confirmedHostParts, totalParts, tasks)) {
return tasks;
Expand Down
4 changes: 2 additions & 2 deletions src/meta/processors/partsMan/ListHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ nebula::cpp2::ErrorCode ListHostsProcessor::fillLeaders() {
}
auto it = std::find(activeHosts.begin(), activeHosts.end(), host);
if (it == activeHosts.end()) {
LOG(INFO) << "skip inactive host: " << host;
VLOG(1) << "skip inactive host: " << host;
continue; // skip inactive host
}

Expand All @@ -209,7 +209,7 @@ nebula::cpp2::ErrorCode ListHostsProcessor::fillLeaders() {
});

if (hostIt == hostItems_.end()) {
LOG(INFO) << "skip inactive host";
VLOG(1) << "skip inactive host";
continue;
}

Expand Down
2 changes: 1 addition & 1 deletion src/storage/StorageFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ DEFINE_int32(waiting_catch_up_retry_times, 30, "retry times when waiting for cat
DEFINE_int32(waiting_catch_up_interval_in_secs, 30,
"interval between two requests for catching up state");

DEFINE_int32(waiting_new_leader_retry_times, 30, "retry times when waiting for catching up data");
DEFINE_int32(waiting_new_leader_retry_times, 5, "retry times when waiting for new leader");

DEFINE_int32(waiting_new_leader_interval_in_secs, 5,
"interval between two requests for catching up state");
Expand Down
7 changes: 1 addition & 6 deletions src/storage/mutate/AddEdgesAtomicProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,8 @@ void AddEdgesAtomicProcessor::processByChain(const cpp2::AddEdgesRequest& req) {
for (auto& part : *req.parts_ref()) {
auto localPart = part.first;
for (auto& edge : part.second) {
auto stPartId = env_->metaClient_->partId(spaceId_,
auto remotePart = env_->metaClient_->partId(spaceId_,
(*(*edge.key_ref()).dst_ref()).getStr());
if (!stPartId.ok()) {
failedPart[localPart] = nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND;
break;
}
auto remotePart = stPartId.value();
ChainId cid{localPart, remotePart};
if (FLAGS_trace_toss) {
auto& ekey = *edge.key_ref();
Expand Down
4 changes: 1 addition & 3 deletions src/storage/test/TossEnvironment.h
Original file line number Diff line number Diff line change
Expand Up @@ -545,9 +545,7 @@ struct TossEnvironment {

int32_t getPartId(const std::string& src) {
// auto stPart = mClient_->partId(spaceId_, edgeKey.src.getStr());
auto stPart = mClient_->partId(spaceId_, src);
LOG_IF(FATAL, !stPart.ok()) << "mClient_->partId failed";
return stPart.value();
return mClient_->partId(spaceId_, src);
}

/**
Expand Down
13 changes: 3 additions & 10 deletions src/storage/transaction/TransactionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,7 @@ TransactionManager::updateEdgeAtomic(size_t vIdLen,
PartitionID partId,
const cpp2::EdgeKey& edgeKey,
GetBatchFunc batchGetter) {
auto stRemotePart = env_->metaClient_->partId(spaceId, (*edgeKey.dst_ref()).getStr());
if (!stRemotePart.ok()) {
return folly::makeFuture(nebula::cpp2::ErrorCode::E_UNKNOWN);
}
auto remotePart = stRemotePart.value();
auto remotePart = env_->metaClient_->partId(spaceId, (*edgeKey.dst_ref()).getStr());
auto localKey = TransactionUtils::edgeKey(vIdLen, partId, edgeKey);

std::vector<KV> data{std::make_pair(localKey, "")};
Expand Down Expand Up @@ -265,11 +261,8 @@ TransactionManager::resumeTransaction(size_t vIdLen,
auto c = folly::makePromiseContract<nebula::cpp2::ErrorCode>();

auto dst = NebulaKeyUtils::getDstId(vIdLen, localKey);
auto stRemotePartId = env_->metaClient_->partId(spaceId, dst.str());
if (!stRemotePartId.ok()) {
return nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND;
}
auto remoteKey = TransactionUtils::reverseRawKey(vIdLen, stRemotePartId.value(), localKey);
auto remotePartId = env_->metaClient_->partId(spaceId, dst.str());
auto remoteKey = TransactionUtils::reverseRawKey(vIdLen, remotePartId, localKey);

LOG_IF(INFO, FLAGS_trace_toss) << "try to get remote key=" << folly::hexlify(remoteKey)
<< ", according to lock=" << folly::hexlify(lockKey);
Expand Down
42 changes: 33 additions & 9 deletions src/tools/db-dump/DbDumper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ Status DbDumper::initSpace() {
}
spaceVidLen_ = spaceVidLen.value();

auto vidTypeStatus = metaClient_->getSpaceVidType(spaceId_);
if (!vidTypeStatus) {
return vidTypeStatus.status();
}
spaceVidType_ = std::move(vidTypeStatus).value();

auto partNum = metaClient_->partsNum(spaceId_);
if (!partNum.ok()) {
return Status::Error("Get partition number from '%s' failed.", FLAGS_space_name.c_str());
Expand All @@ -94,7 +100,15 @@ Status DbDumper::initParams() {
std::vector<std::string> tags, edges;
try {
folly::splitTo<PartitionID>(',', FLAGS_parts, std::inserter(parts_, parts_.begin()), true);
folly::splitTo<VertexID>(',', FLAGS_vids, std::inserter(vids_, vids_.begin()), true);
if (spaceVidType_ == meta::cpp2::PropertyType::INT64) {
std::vector<int64_t> intVids;
folly::splitTo<int64_t>(',', FLAGS_vids, std::inserter(intVids, intVids.begin()), true);
for (auto vid : intVids) {
vids_.emplace(std::string(reinterpret_cast<const char*>(&vid), 8));
}
} else {
folly::splitTo<VertexID>(',', FLAGS_vids, std::inserter(vids_, vids_.begin()), true);
}
folly::splitTo<std::string>(',', FLAGS_tags, std::inserter(tags, tags.begin()), true);
folly::splitTo<std::string>(',', FLAGS_edges, std::inserter(edges, edges.begin()), true);
} catch (const std::exception& e) {
Expand Down Expand Up @@ -209,7 +223,7 @@ void DbDumper::run() {
if (!isValidVidLen(vid)) {
continue;
}
auto partId = std::hash<VertexID>()(vid) % partNum_ + 1;
auto partId = metaClient_->partId(partNum_, vid);
auto prefix = NebulaKeyUtils::vertexPrefix(spaceVidLen_, partId, vid);
seek(prefix);
}
Expand All @@ -221,7 +235,7 @@ void DbDumper::run() {
if (!isValidVidLen(vid)) {
continue;
}
auto partId = std::hash<VertexID>()(vid) % partNum_ + 1;
auto partId = metaClient_->partId(partNum_, vid);
for (auto edgeType : edgeTypes_) {
auto prefix = NebulaKeyUtils::edgePrefix(spaceVidLen_, partId, vid, edgeType);
seek(prefix);
Expand All @@ -235,7 +249,7 @@ void DbDumper::run() {
if (!isValidVidLen(vid)) {
continue;
}
auto partId = std::hash<VertexID>()(vid) % partNum_ + 1;
auto partId = metaClient_->partId(partNum_, vid);
for (auto tagId : tagIds_) {
auto prefix = NebulaKeyUtils::vertexPrefix(spaceVidLen_, partId, vid, tagId);
seek(prefix);
Expand All @@ -249,7 +263,7 @@ void DbDumper::run() {
if (!isValidVidLen(vid)) {
continue;
}
auto partId = std::hash<VertexID>()(vid) % partNum_ + 1;
auto partId = metaClient_->partId(partNum_, vid);
for (auto edgeType : edgeTypes_) {
auto prefix = NebulaKeyUtils::edgePrefix(spaceVidLen_, partId, vid, edgeType);
seek(prefix);
Expand All @@ -260,7 +274,7 @@ void DbDumper::run() {
if (!isValidVidLen(vid)) {
continue;
}
auto partId = std::hash<VertexID>()(vid) % partNum_ + 1;
auto partId = metaClient_->partId(partNum_, vid);
for (auto tagId : tagIds_) {
auto prefix = NebulaKeyUtils::vertexPrefix(spaceVidLen_, partId, vid, tagId);
seek(prefix);
Expand Down Expand Up @@ -515,16 +529,16 @@ void DbDumper::iterates(kvstore::RocksPrefixIter* it) {

inline void DbDumper::printTagKey(const folly::StringPiece& key) {
auto part = NebulaKeyUtils::getPart(key);
auto vid = NebulaKeyUtils::getVertexId(spaceVidLen_, key);
auto vid = getVertexId(NebulaKeyUtils::getVertexId(spaceVidLen_, key));
auto tagId = NebulaKeyUtils::getTagId(spaceVidLen_, key);
std::cout << "[vertex] key: " << part << ", " << vid << ", " << getTagName(tagId);
}

inline void DbDumper::printEdgeKey(const folly::StringPiece& key) {
auto part = NebulaKeyUtils::getPart(key);
auto edgeType = NebulaKeyUtils::getEdgeType(spaceVidLen_, key);
auto src = NebulaKeyUtils::getSrcId(spaceVidLen_, key);
auto dst = NebulaKeyUtils::getDstId(spaceVidLen_, key);
auto src = getVertexId(NebulaKeyUtils::getSrcId(spaceVidLen_, key));
auto dst = getVertexId(NebulaKeyUtils::getDstId(spaceVidLen_, key));
auto rank = NebulaKeyUtils::getRank(spaceVidLen_, key);
std::cout << "[edge] key: " << part << ", " << src << ", " << getEdgeName(edgeType) << ", "
<< rank << ", " << dst;
Expand Down Expand Up @@ -569,5 +583,15 @@ std::string DbDumper::getEdgeName(const EdgeType edgeType) {
return name.value();
}
}

Value DbDumper::getVertexId(const folly::StringPiece &vidStr) {
if (spaceVidType_ == meta::cpp2::PropertyType::INT64) {
int64_t val;
memcpy(reinterpret_cast<void*>(&val), vidStr.begin(), sizeof(int64_t));
return val;
} else {
return vidStr.str();
}
}
} // namespace storage
} // namespace nebula
3 changes: 3 additions & 0 deletions src/tools/db-dump/DbDumper.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,16 @@ class DbDumper {

bool isValidVidLen(VertexID vid);

Value getVertexId(const folly::StringPiece &vidStr);

private:
std::unique_ptr<rocksdb::DB> db_;
rocksdb::Options options_;
std::unique_ptr<meta::MetaClient> metaClient_;
std::unique_ptr<meta::ServerBasedSchemaManager> schemaMng_;
GraphSpaceID spaceId_;
int32_t spaceVidLen_;
meta::cpp2::PropertyType spaceVidType_;
int32_t partNum_;
std::unordered_set<PartitionID> parts_;
std::unordered_set<VertexID> vids_;
Expand Down