Skip to content

Commit

Permalink
Improve visibility of consistency checker (apple#11018) (apple#11038)
Browse files Browse the repository at this point in the history
* improve visibility of consistency checker

* fix for CI failure
  • Loading branch information
kakaiu committed Oct 26, 2023
1 parent 8b96b55 commit d24a62c
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 3 deletions.
47 changes: 45 additions & 2 deletions fdbserver/ConsistencyScan.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,12 +399,18 @@ ACTOR Future<Void> checkDataConsistency(Database cx,
*bytesReadInPrevRound == 0
? maxRate
: std::min(maxRate, static_cast<int64_t>(ceil(*bytesReadInPrevRound / (float)targetInterval)));
TraceEvent("ConsistencyCheck_RateLimitForThisRound")
.detail("RateLimit", rateLimitForThisRound)
.detail("BytesReadInPrevRound", *bytesReadInPrevRound)
.detail("TargetInterval", targetInterval)
.detail("MaxRate", maxRate);
ASSERT(rateLimitForThisRound >= 0 && rateLimitForThisRound <= maxRate);
TraceEvent("ConsistencyCheck_RateLimitForThisRound").detail("RateLimit", rateLimitForThisRound);
state Reference<IRateControl> rateLimiter = Reference<IRateControl>(new SpeedLimit(rateLimitForThisRound, 1));
state double rateLimiterStartTime = now();
state int64_t bytesReadInthisRound = 0;
state bool resume = !(restart || shuffleShards);
state double rateLimiterCumulatedWaitTime = 0;
state bool decideToRunAtMaxRate = false;

state double dbSize = 100e12;
state int ssCount = 1e6;
Expand Down Expand Up @@ -450,6 +456,8 @@ ACTOR Future<Void> checkDataConsistency(Database cx,
SystemDBWriteLockedNow(cx.getReference()), allKeys.begin, allKeys.end));
state int customReplicatedShards = 0;
state int underReplicatedShards = 0;
state int64_t numCheckedShard = 0;
state int64_t numCheckedReadShard = 0;

for (; i < ranges.size(); i++) {
state int shard = shardOrder[i];
Expand All @@ -468,6 +476,10 @@ ACTOR Future<Void> checkDataConsistency(Database cx,
// If the destStorageServers is non-empty, then this shard is being relocated
state bool isRelocating = destStorageServers.size() > 0;

state double shardCheckStartTime = now();
state double rateLimiterWaitTimeForThisShard = 0;
state double dataConsistencyCheckTimeForThisShard = 0;

int desiredReplicas = configuration.storageTeamSize;
if (ddLargeTeamEnabled()) {
// For every custom range that overlaps with this shard range, print it and update the replication count
Expand Down Expand Up @@ -641,6 +653,7 @@ ACTOR Future<Void> checkDataConsistency(Database cx,
// Read a limited number of entries at a time, repeating until all keys in the shard have been read
loop {
try {
state double dataConsistencyCheckBeginTime = now();
lastSampleKey = lastStartSampleKey;

// Get the min version of the storage servers
Expand Down Expand Up @@ -881,6 +894,8 @@ ACTOR Future<Void> checkDataConsistency(Database cx,
}
}

dataConsistencyCheckTimeForThisShard += (now() - dataConsistencyCheckBeginTime);

if (firstValidServer >= 0) {
state VectorRef<KeyValueRef> data = keyValueFutures[firstValidServer].get().get().data;

Expand Down Expand Up @@ -970,15 +985,20 @@ ACTOR Future<Void> checkDataConsistency(Database cx,
TraceEvent("ConsistencyCheck_RateLimit")
.detail("RateLimitForThisRound", rateLimitForThisRound)
.detail("TotalAmountRead", totalReadAmount);
state double rateLimiterBeforeWaitTime = now();
wait(rateLimiter->getAllowance(totalReadAmount));
TraceEvent("ConsistencyCheck_AmountRead1").detail("TotalAmountRead", totalReadAmount);
double rateLimiterCurrentWaitTime = now() - rateLimiterBeforeWaitTime;
rateLimiterWaitTimeForThisShard += rateLimiterCurrentWaitTime;
rateLimiterCumulatedWaitTime += rateLimiterCurrentWaitTime;
TraceEvent("ConsistencyCheck_AmountRead").detail("TotalAmountRead", totalReadAmount);
// Set ratelimit to max allowed if current round has been going on for a while
if (now() - rateLimiterStartTime > 1.1 * targetInterval && rateLimitForThisRound != maxRate) {
rateLimitForThisRound = maxRate;
rateLimiter = Reference<IRateControl>(new SpeedLimit(rateLimitForThisRound, 1));
rateLimiterStartTime = now();
TraceEvent(SevInfo, "ConsistencyCheck_RateLimitSetMaxForThisRound")
.detail("RateLimit", rateLimitForThisRound);
decideToRunAtMaxRate = true;
}
}
bytesReadInRange += totalReadAmount;
Expand Down Expand Up @@ -1121,11 +1141,34 @@ ACTOR Future<Void> checkDataConsistency(Database cx,
}

if (bytesReadInRange > 0) {
numCheckedReadShard++;
TraceEvent("ConsistencyCheck_ReadRange")
.suppressFor(1.0)
.detail("Range", range)
.detail("BytesRead", bytesReadInRange);
}
numCheckedShard++;

TraceEvent("ConsistencyCheck_ShardComplete")
.suppressFor(1.0)
.detail("Index", i)
.detail("BytesReadInthisRound", bytesReadInthisRound)
.detail("NumCheckedReadShard", numCheckedReadShard)
.detail("NumCheckedShard", numCheckedShard)
.detail("Range", range)
.detail("BytesRead", bytesReadInRange)
.detail("ShardCheckTime", now() - shardCheckStartTime)
.detail("DataConsistencyCheckTime", dataConsistencyCheckTimeForThisShard)
.detail("RateLimitTime", rateLimiterWaitTimeForThisShard)
.detail("CumulatedRateLimitTime", rateLimiterCumulatedWaitTime)
.detail("DecideToRunAtMaxRate", decideToRunAtMaxRate)
.detail("ClientId", clientId)
.detail("ClientCount", clientCount)
.detail("FirstClient", firstClient)
.detail("Distributed", distributed)
.detail("PerformTSSCheck", performTSSCheck)
.detail("ShardSampleFactor", shardSampleFactor)
.detail("EffectiveClientCount", effectiveClientCount);
}

if (customReplicatedShards > SERVER_KNOBS->DD_MAX_SHARDS_ON_LARGE_TEAMS) {
Expand Down
5 changes: 4 additions & 1 deletion fdbserver/workloads/ConsistencyCheck.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ struct ConsistencyCheckWorkload : TestWorkload {
ACTOR Future<Void> runCheck(Database cx, ConsistencyCheckWorkload* self) {
CODE_PROBE(self->performQuiescentChecks, "Quiescent consistency check");
CODE_PROBE(!self->performQuiescentChecks, "Non-quiescent consistency check");
state double consistenyCheckerBeginTime = now();

if (self->firstClient || self->distributed) {
try {
Expand Down Expand Up @@ -403,7 +404,9 @@ struct ConsistencyCheckWorkload : TestWorkload {
}
}

TraceEvent("ConsistencyCheck_FinishedCheck").detail("Repetitions", self->repetitions);
TraceEvent("ConsistencyCheck_FinishedCheck")
.detail("Repetitions", self->repetitions)
.detail("TimeSpan", now() - consistenyCheckerBeginTime);

return Void();
}
Expand Down

0 comments on commit d24a62c

Please sign in to comment.