Skip to content

Commit

Permalink
Meta server: fix extra replicas removal due to evacuation completion …
Browse files Browse the repository at this point in the history
…by adding stale chunk notify RPC (with single chunk ID) emitted due chunk evacuation, to chunk in flight RPCs in order to prevent starting re-balancing and/or extra replicas removal while the RPC is still in flight.
  • Loading branch information
mikeov committed Aug 9, 2020
1 parent 1b5a3f5 commit 432924b
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 22 deletions.
3 changes: 2 additions & 1 deletion src/cc/emulator/ChunkServerEmulator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ ChunkServerEmulator::Init(int64_t totalSpace, int64_t usedSpace,

void
ChunkServerEmulator::Enqueue(MetaChunkRequest& req,
int timeout, bool staleChunkIdFlag, bool loggedFlag, bool removeReplicaFlag)
int timeout, bool staleChunkIdFlag, bool loggedFlag, bool removeReplicaFlag,
chunkId_t addChunkIdInFlight)
{
if (0 != mDispatchRecursionCount) {
panic("dispatch recursion");
Expand Down
3 changes: 2 additions & 1 deletion src/cc/emulator/ChunkServerEmulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class ChunkServerEmulator : private TcpSocket, public ChunkServer

protected:
virtual void Enqueue(MetaChunkRequest& req, int timeout,
bool staleChunkIdFlag, bool loggedFlag, bool removeReplicaFlag);
bool staleChunkIdFlag, bool loggedFlag, bool removeReplicaFlag,
chunkId_t addChunkIdInFlight);

private:
typedef vector<MetaChunkRequest*> PendingReqs;
Expand Down
29 changes: 18 additions & 11 deletions src/cc/meta/ChunkServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -581,9 +581,9 @@ ChunkServer::Submit(MetaRequest& op)
inline void
ChunkServer::RemoveInFlight(MetaChunkRequest& req)
{
if (0 <= req.chunkId) {
if (MetaChunkRequest::kNullIterator == req.inFlightIt) {
panic("chunk server invalid chunks in flight iterator");
if (MetaChunkRequest::kNullIterator != req.inFlightIt) {
if (req.chunkId < 0 && ! req.GetChunkIds()) {
panic("chunk server invalid chunks in flight op");
return;
}
sChunkOpsInFlight.erase(req.inFlightIt);
Expand Down Expand Up @@ -2561,9 +2561,8 @@ ChunkServer::Enqueue(MetaChunkLogInFlight& r)
if (r.replayFlag || r.submitCount <= 0 ||
(! r.logseq.IsValid() && 0 <= r.status) ||
! req || 0 != req->submitCount ||
(0 <= req->chunkId ?
req != req->inFlightIt->second :
MetaChunkRequest::kNullIterator != req->inFlightIt)) {
(MetaChunkRequest::kNullIterator != req->inFlightIt ?
req != req->inFlightIt->second : 0 <= req->chunkId)) {
panic("chunk server: invalid submit attempt");
r.status = -EFAULT;
}
Expand Down Expand Up @@ -2606,7 +2605,8 @@ ChunkServer::Enqueue(MetaChunkLogInFlight& r)
///
void
ChunkServer::Enqueue(MetaChunkRequest& req,
int timeout, bool staleChunkIdFlag, bool loggedFlag, bool removeReplicaFlag)
int timeout, bool staleChunkIdFlag, bool loggedFlag, bool removeReplicaFlag,
chunkId_t addChunkIdInFlight)
{
if (this != &*req.server || ! mHelloDone) {
panic(mHelloDone ?
Expand Down Expand Up @@ -2650,9 +2650,11 @@ ChunkServer::Enqueue(MetaChunkRequest& req,
req.suspended = true;
req.shortRpcFormatFlag = mShortRpcFormatFlag;
if (! loggedFlag) {
if (0 <= req.chunkId) {
const chunkId_t chunkIdInFlight = 0 <= addChunkIdInFlight ?
addChunkIdInFlight : req.chunkId;
if (0 <= chunkIdInFlight) {
req.inFlightIt = sChunkOpsInFlight.insert(
make_pair(req.chunkId, &req));
make_pair(chunkIdInFlight, &req));
}
if (! req.replayFlag) {
mLogInFlightCount++;
Expand Down Expand Up @@ -2998,14 +3000,19 @@ ChunkServer::NotifyStaleChunks(
}

void
ChunkServer::NotifyStaleChunk(chunkId_t staleChunkId, bool evacuatedFlag)
ChunkServer::NotifyStaleChunkSelf(chunkId_t staleChunkId, bool evacuatedFlag)
{
MetaChunkStaleNotify& req = *(new MetaChunkStaleNotify(
GetSelfPtr(), evacuatedFlag,
mStaleChunksHexFormatFlag, 0));
req.staleChunkIds.Insert(staleChunkId);
mChunksToEvacuate.Erase(staleChunkId);
Enqueue(req);
const int kTimeout = -1;
const bool kStaleChunkIdFlag = false;
const bool kLoggedFlag = false;
const bool kRemoveReplicaFlag = false;
Enqueue(req, kTimeout, kStaleChunkIdFlag, kLoggedFlag, kRemoveReplicaFlag,
staleChunkId);
}

void
Expand Down
16 changes: 13 additions & 3 deletions src/cc/meta/ChunkServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,10 @@ class ChunkServer :
{ NotifyStaleChunks(staleChunks, false, true, &ca); }
void NotifyStaleChunks(InFlightChunks& staleChunks, MetaHello& hello)
{ NotifyStaleChunks(staleChunks, false, true, 0, &hello); }
void NotifyStaleChunk(chunkId_t staleChunk, bool evacuatedFlag = false);
void NotifyStaleChunk(chunkId_t staleChunk)
{ NotifyStaleChunkSelf(staleChunk, false); }
void NotifyStaleChunkEvacuated(chunkId_t staleChunk)
{ NotifyStaleChunkSelf(staleChunk, true); }

/// There is a difference between the version # as stored
/// at the chunkserver and what is on the metaserver. By sending
Expand Down Expand Up @@ -994,10 +997,16 @@ class ChunkServer :
const NetConnectionPtr& conn, const ServerLocation& loc);
/// Chunk server emulator overrides this.
virtual void Enqueue(MetaChunkRequest& req, int timeout,
bool staleChunkIdFlag, bool loggedFlag, bool removeReplicaFlag);
bool staleChunkIdFlag, bool loggedFlag, bool removeReplicaFlag,
chunkId_t addChunkIdInFlight);
void Enqueue(MetaChunkRequest& req, int timeout,
bool staleChunkIdFlag, bool loggedFlag, bool removeReplicaFlag) {
Enqueue(req, timeout, staleChunkIdFlag, loggedFlag, removeReplicaFlag,
-1);
}
void Enqueue(MetaChunkRequest& req, int timeout,
bool staleChunkIdFlag, bool loggedFlag) {
Enqueue(req, timeout, staleChunkIdFlag, loggedFlag, false);
Enqueue(req, timeout, staleChunkIdFlag, loggedFlag, false, -1);
}
void Enqueue(MetaChunkRequest& req, int timeout, bool staleChunkIdFlag) {
Enqueue(req, timeout, staleChunkIdFlag, false);
Expand Down Expand Up @@ -1462,6 +1471,7 @@ class ChunkServer :
inline ChunkServerPtr GetSelfPtr();
bool ReplayValidate(MetaRequest& r) const;
inline void RemoveInFlight(MetaChunkRequest& req);
void NotifyStaleChunkSelf(chunkId_t staleChunk, bool evacuatedFlag);
void SubmitMetaBye();
static inline int GetMaxPendingHelloBytes();
};
Expand Down
17 changes: 11 additions & 6 deletions src/cc/meta/LayoutManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11484,7 +11484,7 @@ LayoutManager::CanReplicateChunkNow(
// number of replicas, then delete all extra copies at once.
// Take into the account hibernated servers, delay the re-replication /
// recovery until the hibernated servers are removed.
// For now count hibernated server only in the case if
// For now count hibernated server only if it is not chunk recovery.
if (extraReplicas <= 0) {
extraReplicas -= numRetiringServers;
} else if (recoveryInfo && (int)hibernatedCount <= extraReplicas) {
Expand All @@ -11504,9 +11504,6 @@ LayoutManager::CanReplicateChunkNow(
// appended to the transaction log. Handle possible out of date map entry by
// delaying / rescheduling extra replicas removal if / when any stale notify
// for this chunk is in flight.
// In theory delete is redundant in the pending op list below at the moment,
// as delete should not be issued to remove extra replicas. Is is in the list
// for additional safety / in case if this changes in the future.
MetaOp const kPendingOpTypes[] = {
META_CHUNK_STALENOTIFY,
META_CHUNK_DELETE,
Expand Down Expand Up @@ -12145,10 +12142,18 @@ LayoutManager::RemoveRetiring(
i++;
continue;
}
KFS_LOG_STREAM_DEBUG <<
"remove retiring:"
" srv: " << server->GetServerLocation() <<
" chunk: " << ci.GetChunkId() <<
(server->IsReplay() ? " replay" : "") <<
" retiring: " << server->IsRetiring() <<
" down: " << server->IsDown() <<
" del: " << deleteRetiringFlag <<
KFS_LOG_EOM;
if (server->IsDown() || ! server->IsRetiring()) {
// Queue RPC to log and remove entry, and replica.
const bool kEvacuateChunkFlag = true;
server->NotifyStaleChunk(chunkId, kEvacuateChunkFlag);
server->NotifyStaleChunkEvacuated(chunkId);
} else {
if (deleteRetiringFlag) {
server->DeleteChunk(chunkId);
Expand Down

0 comments on commit 432924b

Please sign in to comment.