Skip to content

Commit

Permalink
MB-4461 collapse multiple closed checkpoints into one checkpoint.
Browse files Browse the repository at this point in the history
If persistence or replication is very slow on replica vbuckets,
this would cause memory usage to grow continuously because the
number of checkpoints can increase over the time. To address
this issue, this change supports merging multiple checkpoints
into one checkpoint on replica vbuckets.

Change-Id: Ice1c8b145ef54ed3834dbc62eee41a8611825553
Reviewed-on: http://review.couchbase.org/11811
Reviewed-by: Chiyoung Seo <chiyoung.seo@gmail.com>
Tested-by: Chiyoung Seo <chiyoung.seo@gmail.com>
  • Loading branch information
chiyoung committed Dec 22, 2011
1 parent 925dc5f commit dd4d60b
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 5 deletions.
132 changes: 130 additions & 2 deletions checkpoint.cc
Expand Up @@ -102,6 +102,48 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &item, CheckpointManager
return rv;
}

size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) {
size_t numNewItems = 0;
size_t newEntryMemOverhead = 0;
std::list<queued_item>::reverse_iterator rit = pPrevCheckpoint->rbegin();
for (; rit != pPrevCheckpoint->rend(); ++rit) {
const std::string &key = (*rit)->getKey();
if (key.size() == 0) {
continue;
}
checkpoint_index::iterator it = keyIndex.find(key);
if (it == keyIndex.end()) {
// Skip the first two meta items
std::list<queued_item>::iterator pos = toWrite.begin();
for (; pos != toWrite.end(); ++pos) {
if ((*pos)->getKey().compare("") != 0) {
break;
}
}
toWrite.insert(pos, *rit);
index_entry entry = {--pos, pPrevCheckpoint->getMutationIdForKey(key)};
keyIndex[key] = entry;
newEntryMemOverhead += key.size() + sizeof(index_entry);
++numItems;
++numNewItems;
}
}
indexMemOverhead += newEntryMemOverhead;
stats.memOverhead.incr(newEntryMemOverhead);
assert(stats.memOverhead.get() < GIGANTOR);
return numNewItems;
}

uint64_t Checkpoint::getMutationIdForKey(const std::string &key) {
uint64_t mid = 0;
checkpoint_index::iterator it = keyIndex.find(key);
if (it != keyIndex.end()) {
mid = it->second.mutation_id;
}
return mid;
}


Atomic<rel_time_t> CheckpointManager::checkpointPeriod = DEFAULT_CHECKPOINT_PERIOD;
Atomic<size_t> CheckpointManager::checkpointMaxItems = DEFAULT_CHECKPOINT_ITEMS;
Atomic<size_t> CheckpointManager::maxCheckpoints = DEFAULT_MAX_CHECKPOINTS;
Expand All @@ -117,8 +159,7 @@ CheckpointManager::~CheckpointManager() {
}
}

uint64_t CheckpointManager::getOpenCheckpointId() {
LockHolder lh(queueLock);
uint64_t CheckpointManager::getOpenCheckpointId_UNLOCKED() {
if (checkpointList.size() == 0) {
return 0;
}
Expand All @@ -127,6 +168,11 @@ uint64_t CheckpointManager::getOpenCheckpointId() {
return checkpointList.back()->getState() == opened ? id : id + 1;
}

uint64_t CheckpointManager::getOpenCheckpointId() {
LockHolder lh(queueLock);
return getOpenCheckpointId_UNLOCKED();
}

void CheckpointManager::setOpenCheckpointId_UNLOCKED(uint64_t id) {
if (checkpointList.size() > 0) {
checkpointList.back()->setId(id);
Expand Down Expand Up @@ -503,6 +549,14 @@ size_t CheckpointManager::removeClosedUnrefCheckpoints(const RCPtr<VBucket> &vbu
}
unrefCheckpointList.splice(unrefCheckpointList.begin(), checkpointList,
checkpointList.begin(), it);
// If any cursor on a replica vbucket or downstream active vbucket receiving checkpoints from
// the upstream master is very slow and causes more closed checkpoints in memory,
// collapse those closed checkpoints into a single one to reduce the memory overhead.
if (!keepClosedCheckpoints &&
(vbucket->getState() == vbucket_state_replica ||
(vbucket->getState() == vbucket_state_active && inconsistentSlaveCheckpoint))) {
collapseClosedCheckpoints(unrefCheckpointList);
}
lh.unlock();

std::list<Checkpoint*>::iterator chkpoint_it = unrefCheckpointList.begin();
Expand Down Expand Up @@ -541,6 +595,73 @@ void CheckpointManager::removeInvalidCursorsOnCheckpoint(Checkpoint *pCheckpoint
}
}

void CheckpointManager::collapseClosedCheckpoints(std::list<Checkpoint*> &collapsedChks) {
// If there are one open checkpoint and more than one closed checkpoint, collapse those
// closed checkpoints into one checkpoint to reduce the memory overhead.
if (checkpointList.size() > 2) {
std::set<std::string> slowCursors;
std::set<std::string> fastCursors;
std::list<Checkpoint*>::iterator lastClosedChk = checkpointList.end();
--lastClosedChk; --lastClosedChk; // Move to the lastest closed checkpoint.
fastCursors.insert((*lastClosedChk)->getCursorNameList().begin(),
(*lastClosedChk)->getCursorNameList().end());
std::list<Checkpoint*>::reverse_iterator rit = checkpointList.rbegin();
++rit; ++rit;// Move to the second lastest closed checkpoint.
size_t numDuplicatedItems = 0, numMetaItems = 0;
for (; rit != checkpointList.rend(); ++rit) {
size_t numAddedItems = (*lastClosedChk)->mergePrevCheckpoint(*rit);
numDuplicatedItems += ((*rit)->getNumItems() - numAddedItems);
numMetaItems += 2; // checkpoint start and end meta items
slowCursors.insert((*rit)->getCursorNameList().begin(),
(*rit)->getCursorNameList().end());
}
// Reposition the slow cursors to the beginning of the last closed checkpoint.
std::set<std::string>::iterator sit = slowCursors.begin();
for (; sit != slowCursors.end(); ++sit) {
if ((*sit).compare(persistenceCursor.name) == 0) { // Reposition persistence cursor
persistenceCursor.currentCheckpoint = lastClosedChk;
persistenceCursor.currentPos = (*lastClosedChk)->begin();
persistenceCursor.offset = 0;
(*lastClosedChk)->registerCursorName(persistenceCursor.name);
} else if ((*sit).compare(onlineUpdateCursor.name) == 0) { // onlineUpdate cursor
onlineUpdateCursor.currentCheckpoint = lastClosedChk;
onlineUpdateCursor.currentPos = (*lastClosedChk)->begin();
onlineUpdateCursor.offset = 0;
(*lastClosedChk)->registerCursorName(onlineUpdateCursor.name);
} else { // Reposition tap cursors
std::map<const std::string, CheckpointCursor>::iterator mit = tapCursors.find(*sit);
if (mit != tapCursors.end()) {
mit->second.currentCheckpoint = lastClosedChk;
mit->second.currentPos = (*lastClosedChk)->begin();
mit->second.offset = 0;
(*lastClosedChk)->registerCursorName(mit->second.name);
}
}
}

numItems -= (numDuplicatedItems + numMetaItems);
Checkpoint *pOpenCheckpoint = checkpointList.back();
const std::set<std::string> &openCheckpointCursors = pOpenCheckpoint->getCursorNameList();
fastCursors.insert(openCheckpointCursors.begin(), openCheckpointCursors.end());
std::set<std::string>::const_iterator cit = fastCursors.begin();
// Update the offset of each fast cursor.
for (; cit != fastCursors.end(); ++cit) {
if ((*cit).compare(persistenceCursor.name) == 0) {
decrPersistenceCursorOffset(numDuplicatedItems + numMetaItems);
} else if ((*cit).compare(onlineUpdateCursor.name) == 0) {
onlineUpdateCursor.offset -= (numDuplicatedItems + numMetaItems);
} else {
std::map<const std::string, CheckpointCursor>::iterator mit = tapCursors.find(*cit);
if (mit != tapCursors.end()) {
mit->second.offset -= (numDuplicatedItems + numMetaItems);
}
}
}
collapsedChks.splice(collapsedChks.end(), checkpointList,
checkpointList.begin(), lastClosedChk);
}
}

bool CheckpointManager::queueDirty(const queued_item &item, const RCPtr<VBucket> &vbucket) {
LockHolder lh(queueLock);
if (vbucket->getState() != vbucket_state_active &&
Expand Down Expand Up @@ -880,6 +1001,13 @@ bool CheckpointManager::checkAndAddNewCheckpoint(uint64_t id, bool &pCursorRepos
}

if (it == checkpointList.end()) {
if ((checkpointList.back()->getId() + 1) < id) {
isCollapsedCheckpoint = true;
uint64_t oid = getOpenCheckpointId_UNLOCKED();
lastClosedCheckpointId = oid > 0 ? (oid - 1) : 0;
} else if ((checkpointList.back()->getId() + 1) == id) {
isCollapsedCheckpoint = false;
}
if (checkpointList.back()->getState() == opened) {
closeOpenCheckpoint_UNLOCKED(checkpointList.back()->getId());
}
Expand Down
38 changes: 35 additions & 3 deletions checkpoint.hh
Expand Up @@ -187,6 +187,14 @@ public:
return toWrite.end();
}

std::list<queued_item>::reverse_iterator rbegin() {
return toWrite.rbegin();
}

std::list<queued_item>::reverse_iterator rend() {
return toWrite.rend();
}

uint64_t getCasForKey(const std::string &key);

/**
Expand All @@ -199,6 +207,21 @@ public:
return sizeof(Checkpoint) + indexMemOverhead;
}

/**
* Merge the previous checkpoint into the this checkpoint by adding the items from
* the previous checkpoint, which don't exist in this checkpoint.
* @param pPrevCheckpoint pointer to the previous checkpoint.
* @return the number of items added from the previous checkpoint.
*/
size_t mergePrevCheckpoint(Checkpoint *pPrevCheckpoint);

/**
* Get the mutation id for a given key in this checkpoint
* @param key a key to retrieve its mutation id
* @return the mutation id for a given key
*/
uint64_t getMutationIdForKey(const std::string &key);

private:
EPStats &stats;
uint64_t checkpointId;
Expand All @@ -225,19 +248,24 @@ public:
CheckpointManager(EPStats &st, uint16_t vbucket, uint64_t checkpointId = 1) :
stats(st), vbucketId(vbucket), numItems(0),
mutationCounter(0), persistenceCursor("persistence"), onlineUpdateCursor("online_update"),
doOnlineUpdate(false), doHotReload(false) {
isCollapsedCheckpoint(false), doOnlineUpdate(false), doHotReload(false) {

addNewCheckpoint(checkpointId);
registerPersistenceCursor();
}

~CheckpointManager();

uint64_t getOpenCheckpointId_UNLOCKED();
uint64_t getOpenCheckpointId();

uint64_t getLastClosedCheckpointId() {
uint64_t id = getOpenCheckpointId();
return id > 0 ? (id - 1) : 0;
LockHolder lh(queueLock);
if (!isCollapsedCheckpoint) {
uint64_t id = getOpenCheckpointId_UNLOCKED();
lastClosedCheckpointId = id > 0 ? (id - 1) : 0;
}
return lastClosedCheckpointId;
}

void setOpenCheckpointId_UNLOCKED(uint64_t id);
Expand Down Expand Up @@ -542,6 +570,8 @@ private:

bool isCheckpointCreationForHighMemUsage(const RCPtr<VBucket> &vbucket);

void collapseClosedCheckpoints(std::list<Checkpoint*> &collapsedChks);

static bool validateCheckpointMaxItemsParam(size_t checkpoint_max_items);
static bool validateCheckpointPeriodParam(size_t checkpoint_period);
static bool validateMaxCheckpointsParam(size_t max_checkpoints);
Expand All @@ -556,6 +586,8 @@ private:
std::list<Checkpoint*> checkpointList;
CheckpointCursor persistenceCursor;
CheckpointCursor onlineUpdateCursor;
bool isCollapsedCheckpoint;
uint64_t lastClosedCheckpointId;
std::map<const std::string, CheckpointCursor> tapCursors;

// Period of a checkpoint in terms of time in sec
Expand Down

0 comments on commit dd4d60b

Please sign in to comment.