Skip to content

Commit

Permalink
add more metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
critical27 committed Feb 9, 2022
1 parent f68cff6 commit c532203
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 115 deletions.
1 change: 0 additions & 1 deletion src/common/base/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ nebula_add_library(
Status.cpp
SanitizerOptions.cpp
SignalHandler.cpp
SlowOpTracker.cpp
StringValue.cpp
${gdb_debug_script}
)
Expand Down
11 changes: 0 additions & 11 deletions src/common/base/SlowOpTracker.cpp

This file was deleted.

40 changes: 0 additions & 40 deletions src/common/base/SlowOpTracker.h

This file was deleted.

7 changes: 0 additions & 7 deletions src/common/base/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,6 @@ nebula_add_test(
LIBRARIES gtest gtest_main
)

nebula_add_test(
NAME slow_op_tracker_test
SOURCES SlowOpTrackerTest.cpp
OBJECTS $<TARGET_OBJECTS:base_obj> $<TARGET_OBJECTS:time_obj>
LIBRARIES gtest gtest_main
)

nebula_add_test(
NAME lru_test
SOURCES ConcurrentLRUCacheTest.cpp
Expand Down
28 changes: 0 additions & 28 deletions src/common/base/test/SlowOpTrackerTest.cpp

This file was deleted.

8 changes: 7 additions & 1 deletion src/kvstore/raftex/Host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
#include <thrift/lib/cpp/util/EnumUtils.h>

#include "common/network/NetworkUtils.h"
#include "common/stats/StatsManager.h"
#include "common/time/WallClock.h"
#include "kvstore/raftex/RaftPart.h"
#include "kvstore/stats/KVStats.h"
#include "kvstore/wal/FileBasedWal.h"

DEFINE_uint32(max_appendlog_batch_size,
Expand Down Expand Up @@ -158,9 +161,12 @@ void Host::setResponse(const cpp2::AppendLogResponse& r) {

void Host::appendLogsInternal(folly::EventBase* eb, std::shared_ptr<cpp2::AppendLogRequest> req) {
using TransportException = apache::thrift::transport::TTransportException;
auto beforeRpcUs = time::WallClock::fastNowInMicroSec();
sendAppendLogRequest(eb, req)
.via(eb)
.thenValue([eb, self = shared_from_this()](cpp2::AppendLogResponse&& resp) {
.thenValue([eb, beforeRpcUs, self = shared_from_this()](cpp2::AppendLogResponse&& resp) {
stats::StatsManager::addValue(kAppendLogLatencyUs,
time::WallClock::fastNowInMicroSec() - beforeRpcUs);
VLOG_IF(1, FLAGS_trace_raft)
<< self->idStr_ << "AppendLogResponse "
<< "code " << apache::thrift::util::enumNameSafe(resp.get_error_code()) << ", currTerm "
Expand Down
48 changes: 24 additions & 24 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@

#include "common/base/Base.h"
#include "common/base/CollectNSucceeded.h"
#include "common/base/SlowOpTracker.h"
#include "common/network/NetworkUtils.h"
#include "common/stats/StatsManager.h"
#include "common/thread/NamedThread.h"
#include "common/thrift/ThriftClientManager.h"
#include "common/time/ScopedTimer.h"
#include "common/time/WallClock.h"
#include "common/utils/LogStrListIterator.h"
#include "interface/gen-cpp2/RaftexServiceAsyncClient.h"
Expand Down Expand Up @@ -741,18 +741,18 @@ void RaftPart::appendLogsInternal(AppendLogsIterator iter, TermID termId) {
prevLogTerm = lastLogTerm_;
committed = committedLogId_;
// Step 1: Write WAL
SlowOpTracker tracker;
if (!wal_->appendLogs(iter)) {
VLOG_EVERY_N(2, 1000) << idStr_ << "Failed to write into WAL";
res = nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL;
lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
break;
{
SCOPED_TIMER(&execTime_);
if (!wal_->appendLogs(iter)) {
VLOG_EVERY_N(2, 1000) << idStr_ << "Failed to write into WAL";
res = nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL;
lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
break;
}
}
stats::StatsManager::addValue(kAppendWalLatencyUs, execTime_);
lastId = wal_->lastLogId();
if (tracker.slow()) {
tracker.output(idStr_, folly::stringPrintf("Write WAL, total %ld", lastId - prevLogId + 1));
}
VLOG(4) << idStr_ << "Succeeded writing logs [" << iter.firstLogId() << ", " << lastId
<< "] to WAL";
} while (false);
Expand Down Expand Up @@ -797,7 +797,7 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
<< iter.firstLogId() << ", " << lastLogId << "] to all peer hosts";

lastMsgSentDur_.reset();
SlowOpTracker tracker;
auto beforeAppendLogUs = time::WallClock::fastNowInMicroSec();
collectNSucceeded(gen::from(hosts) |
gen::map([self = shared_from_this(),
eb,
Expand Down Expand Up @@ -830,13 +830,11 @@ void RaftPart::replicateLogs(folly::EventBase* eb,
prevLogId,
prevLogTerm,
pHosts = std::move(hosts),
tracker](folly::Try<AppendLogResponses>&& result) mutable {
beforeAppendLogUs](folly::Try<AppendLogResponses>&& result) mutable {
VLOG(4) << self->idStr_ << "Received enough response";
CHECK(!result.hasException());
if (tracker.slow()) {
tracker.output(self->idStr_,
folly::stringPrintf("Total send logs: %ld", lastLogId - prevLogId + 1));
}
stats::StatsManager::addValue(kReplicateLogLatencyUs,
time::WallClock::fastNowInMicroSec() - beforeAppendLogUs);
self->processAppendLogResponses(*result,
eb,
std::move(it),
Expand Down Expand Up @@ -913,7 +911,6 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,

{
auto walIt = wal_->iterator(committedId + 1, lastLogId);
SlowOpTracker tracker;
// Step 3: Commit the batch
auto [code, lastCommitId, lastCommitTerm] = commitLogs(std::move(walIt), true);
if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand All @@ -933,10 +930,6 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps,
} else {
LOG(FATAL) << idStr_ << "Failed to commit logs";
}
if (tracker.slow()) {
tracker.output(idStr_,
folly::stringPrintf("Total commit: %ld", committedLogId_ - committedId));
}
VLOG(4) << idStr_ << "Leader succeeded in committing the logs " << committedId + 1 << " to "
<< lastLogId;
}
Expand Down Expand Up @@ -1185,6 +1178,7 @@ folly::Future<bool> RaftPart::leaderElection(bool isPreVote) {

auto proposedTerm = voteReq.get_term();
auto resps = ElectionResponses();
stats::StatsManager::addValue(kNumStartElect);
if (hosts.empty()) {
auto ret = handleElectionResponses(resps, hosts, proposedTerm, isPreVote);
inElection_ = false;
Expand Down Expand Up @@ -1462,7 +1456,7 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
// Reset the last message time
lastMsgRecvDur_.reset();
isBlindFollower_ = false;
stats::StatsManager::addValue(kNumRaftVotes);
stats::StatsManager::addValue(kNumGrantVotes);
return;
}

Expand Down Expand Up @@ -1585,7 +1579,13 @@ void RaftPart::processAppendLogRequest(const cpp2::AppendLogRequest& req,
std::make_move_iterator(req.get_log_str_list().begin() + diffIndex),
std::make_move_iterator(req.get_log_str_list().end()));
RaftLogIterator logIter(firstId, std::move(logEntries));
if (wal_->appendLogs(logIter)) {
bool result = false;
{
SCOPED_TIMER(&execTime_);
result = wal_->appendLogs(logIter);
}
stats::StatsManager::addValue(kAppendWalLatencyUs, execTime_);
if (result) {
CHECK_EQ(lastId, wal_->lastLogId());
lastLogId_ = wal_->lastLogId();
lastLogTerm_ = wal_->lastLogTerm();
Expand Down
15 changes: 13 additions & 2 deletions src/kvstore/stats/KVStats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,28 @@ namespace nebula {

// Counter ids for the kvstore / raft metrics. They are defined here without
// an initializer and receive their values in initKVStats() below.

// Latency histograms. The ids whose recording sites are visible in this change
// are fed with differences of time::WallClock::fastNowInMicroSec(), i.e.
// microseconds.
// Recording sites for the commit-log / commit-snapshot ids are not visible here.
stats::CounterId kCommitLogLatencyUs;
stats::CounterId kCommitSnapshotLatencyUs;
// Recorded by RaftPart around wal_->appendLogs() (leader and follower paths),
// using the value SCOPED_TIMER stores into execTime_.
// NOTE(review): unit of execTime_ is presumably microseconds -- confirm.
stats::CounterId kAppendWalLatencyUs;
// Recorded in RaftPart::replicateLogs: from just before collectNSucceeded() is
// issued until its continuation runs.
stats::CounterId kReplicateLogLatencyUs;
// Recorded in Host::appendLogsInternal: from just before sendAppendLogRequest()
// until the AppendLogResponse continuation runs.
stats::CounterId kAppendLogLatencyUs;
// Recording site not visible here.
stats::CounterId kTransferLeaderLatencyUs;

// Event counters.
// NOTE(review): kNumRaftVotes looks superseded by kNumGrantVotes (its only
// visible call site in RaftPart::processAskForVoteRequest now uses
// kNumGrantVotes) -- confirm whether this id is still needed.
stats::CounterId kNumRaftVotes;
// Incremented in RaftPart::leaderElection once the vote request is prepared.
stats::CounterId kNumStartElect;
// Incremented in RaftPart::processAskForVoteRequest right before it returns
// after resetting lastMsgRecvDur_ (presumably the vote-granted path -- confirm).
stats::CounterId kNumGrantVotes;

// Registers every kvstore metric with stats::StatsManager and stores the
// returned id in the matching global counter id.
//
// Latency metrics are registered via registerHisto() with the arguments
// (name, 1000, 0, 2000, "avg, p75, p95, p99, p999").
// NOTE(review): the three numbers are presumably bucket size / min / max of
// the histogram -- confirm against StatsManager::registerHisto.
// Event counters are registered via registerStats() with "rate, sum".
void initKVStats() {
// --- latency histograms ---
kCommitLogLatencyUs = stats::StatsManager::registerHisto(
"commit_log_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kCommitSnapshotLatencyUs = stats::StatsManager::registerHisto(
"commit_snapshot_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kAppendWalLatencyUs = stats::StatsManager::registerHisto(
"append_wal_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kReplicateLogLatencyUs = stats::StatsManager::registerHisto(
"replicate_log_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kAppendLogLatencyUs = stats::StatsManager::registerHisto(
"append_log_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
kTransferLeaderLatencyUs = stats::StatsManager::registerHisto(
"transfer_leader_latency_us", 1000, 0, 2000, "avg, p75, p95, p99, p999");
// --- event counters ---
// NOTE(review): "num_raft_votes" appears to be replaced by the two counters
// registered after it -- confirm whether this registration should remain.
kNumRaftVotes = stats::StatsManager::registerStats("num_raft_votes", "rate, sum");
kNumStartElect = stats::StatsManager::registerStats("num_start_elect", "rate, sum");
kNumGrantVotes = stats::StatsManager::registerStats("num_grant_votes", "rate, sum");
}

} // namespace nebula
6 changes: 5 additions & 1 deletion src/kvstore/stats/KVStats.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ namespace nebula {

// Counter ids for the kvstore / raft metrics; defined in KVStats.cpp and
// assigned by initKVStats().

// Latency histograms (registered with "*_latency_us" names).
extern stats::CounterId kCommitLogLatencyUs;
extern stats::CounterId kCommitSnapshotLatencyUs;
extern stats::CounterId kAppendWalLatencyUs;
extern stats::CounterId kReplicateLogLatencyUs;
extern stats::CounterId kAppendLogLatencyUs;
extern stats::CounterId kTransferLeaderLatencyUs;
// Event counters (registered with "rate, sum").
// NOTE(review): kNumRaftVotes looks superseded by kNumGrantVotes -- confirm.
extern stats::CounterId kNumRaftVotes;
extern stats::CounterId kNumStartElect;
extern stats::CounterId kNumGrantVotes;

// Registers the metrics above with stats::StatsManager and assigns their ids.
// NOTE(review): presumably must run before any of the ids is passed to
// StatsManager::addValue -- confirm against process start-up code.
void initKVStats();

Expand Down

0 comments on commit c532203

Please sign in to comment.