Skip to content

Commit

Permalink
Add meta offline detect (#4610)
Browse files Browse the repository at this point in the history
  • Loading branch information
SuperYoko committed Sep 8, 2022
1 parent 085381b commit 510119f
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 2 deletions.
11 changes: 11 additions & 0 deletions src/kvstore/raftex/Host.h
Expand Up @@ -216,6 +216,14 @@ class Host final : public std::enable_shared_from_this<Host> {
*/
void setResponse(const cpp2::AppendLogResponse& resp);

/**
 * @brief Record the time of the most recent heartbeat response from this peer
 *
 * @param time Timestamp to store in lastHeartbeatTime_. The unit is whatever
 * the caller uses; NOTE(review): presumably wall-clock milliseconds - confirm
 * against callers.
 */
void setLastHeartbeatTime(int64_t time) {
lastHeartbeatTime_ = time;
}

/**
 * @brief Get the timestamp last stored by setLastHeartbeatTime
 *
 * @return int64_t The stored value of lastHeartbeatTime_ (initialised to 0, so
 * 0 is returned if no heartbeat time has been recorded yet)
 */
int64_t getLastHeartbeatTime() const {
return lastHeartbeatTime_;
}

/**
* @brief If there are more logs to send, build the append log request
*
Expand Down Expand Up @@ -262,6 +270,9 @@ class Host final : public std::enable_shared_from_this<Host> {

// CommittedLogId of follower
LogID followerCommittedLogId_{0};

// last HB response time from the peer
int64_t lastHeartbeatTime_{0};
};

} // namespace raftex
Expand Down
20 changes: 20 additions & 0 deletions src/kvstore/raftex/RaftPart.cpp
Expand Up @@ -19,6 +19,7 @@
#include "common/time/ScopedTimer.h"
#include "common/time/WallClock.h"
#include "common/utils/LogStrListIterator.h"
#include "common/utils/MetaKeyUtils.h"
#include "interface/gen-cpp2/RaftexServiceAsyncClient.h"
#include "kvstore/LogEncoder.h"
#include "kvstore/raftex/Host.h"
Expand Down Expand Up @@ -626,6 +627,21 @@ void RaftPart::updateQuorum() {
quorum_ = (total + 1) / 2;
}

/**
 * @brief Check whether a peer has answered a heartbeat recently enough to be
 * considered alive
 *
 * The peer is looked up in hosts_ by address. It is reported alive when it is
 * found and the time elapsed since its last recorded heartbeat does not exceed
 * two heartbeat intervals (FLAGS_raft_heartbeat_interval_secs).
 *
 * @param addr Address compared against each host's address()
 * @return true if the peer is known and its last heartbeat is recent;
 * false if the peer is not in hosts_ or its last heartbeat is too old
 */
bool RaftPart::checkAlive(const HostAddr& addr) {
  // Recomputed on every call instead of being cached in a function-local
  // static, so a change of the flag after the first call is honoured. The flag
  // is widened to 64 bits before multiplying so the product cannot wrap in the
  // flag's own (narrower) type.
  const int64_t timeoutInMs =
      static_cast<int64_t>(FLAGS_raft_heartbeat_interval_secs) * 1000 * 2;
  const int64_t now = time::WallClock::fastNowInMilliSec();

  // NOTE(review): hosts_ is read here without acquiring raftLock_ - confirm
  // that callers guarantee hosts_ is not modified concurrently.
  const auto it = std::find_if(
      hosts_.begin(), hosts_.end(), [&addr](const auto& h) { return h->address() == addr; });
  if (it == hosts_.end()) {
    // Unknown peer: never reported as alive.
    return false;
  }

  const int64_t last = (*it)->getLastHeartbeatTime();
  return now - last <= timeoutInMs;
}

void RaftPart::addPeer(const HostAddr& peer) {
CHECK(!raftLock_.try_lock());
if (peer == addr_) {
Expand Down Expand Up @@ -2079,6 +2095,10 @@ void RaftPart::sendHeartbeat() {
if (!hosts[resp.first]->isLearner() &&
resp.second.get_error_code() == nebula::cpp2::ErrorCode::SUCCEEDED) {
++numSucceeded;
// Only metad's space 0 / part 0 needs this state for now.
if (spaceId_ == kDefaultSpaceId) {
hosts[resp.first]->setLastHeartbeatTime(time::WallClock::fastNowInMilliSec());
}
}
highestTerm = std::max(highestTerm, resp.second.get_current_term());
}
Expand Down
2 changes: 2 additions & 0 deletions src/kvstore/raftex/RaftPart.h
Expand Up @@ -377,6 +377,8 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
*/
std::vector<HostAddr> peers() const;

bool checkAlive(const HostAddr& host);

/**
* @brief All listeners address
*
Expand Down
13 changes: 11 additions & 2 deletions src/meta/processors/parts/ListHostsProcessor.cpp
Expand Up @@ -5,6 +5,7 @@

#include "meta/processors/parts/ListHostsProcessor.h"

#include "common/utils/Utils.h"
#include "meta/ActiveHostsMan.h"
#include "meta/processors/admin/AdminClient.h"
#include "version/Version.h"
Expand Down Expand Up @@ -83,17 +84,25 @@ nebula::cpp2::ErrorCode ListHostsProcessor::allMetaHostsStatus() {
LOG(INFO) << "List Hosts Failed, error: " << apache::thrift::util::enumNameSafe(retCode);
return retCode;
}
auto metaPeers = nebula::value(errOrPart)->peers();
auto part = nebula::value(errOrPart);
auto metaPeers = part->peers();
// transform raft port to server port
for (auto& metaHost : metaPeers) {
metaHost = Utils::getStoreAddrFromRaftAddr(metaHost);
}
for (auto& host : metaPeers) {
cpp2::HostItem item;
// The leader itself must be online
auto partAddr = Utils::getRaftAddrFromStoreAddr(host);
item.status_ref() = cpp2::HostStatus::ONLINE;
if (partAddr != part->address()) {
if (!part->checkAlive(partAddr)) {
item.status_ref() = cpp2::HostStatus::OFFLINE;
}
}
item.hostAddr_ref() = std::move(host);
item.role_ref() = cpp2::HostRole::META;
item.git_info_sha_ref() = gitInfoSha();
item.status_ref() = cpp2::HostStatus::ONLINE;
item.version_ref() = getOriginVersion();
hostItems_.emplace_back(item);
}
Expand Down

0 comments on commit 510119f

Please sign in to comment.