diff --git a/checkpoint.cc b/checkpoint.cc index 49b4d0814..8839fe2d3 100644 --- a/checkpoint.cc +++ b/checkpoint.cc @@ -102,6 +102,48 @@ queue_dirty_t Checkpoint::queueDirty(const queued_item &item, CheckpointManager return rv; } +size_t Checkpoint::mergePrevCheckpoint(Checkpoint *pPrevCheckpoint) { + size_t numNewItems = 0; + size_t newEntryMemOverhead = 0; + std::list::reverse_iterator rit = pPrevCheckpoint->rbegin(); + for (; rit != pPrevCheckpoint->rend(); ++rit) { + const std::string &key = (*rit)->getKey(); + if (key.size() == 0) { + continue; + } + checkpoint_index::iterator it = keyIndex.find(key); + if (it == keyIndex.end()) { + // Skip the leading items whose key is empty (the meta items) + std::list::iterator pos = toWrite.begin(); + for (; pos != toWrite.end(); ++pos) { + if ((*pos)->getKey().compare("") != 0) { + break; + } + } + toWrite.insert(pos, *rit); + index_entry entry = {--pos, pPrevCheckpoint->getMutationIdForKey(key)}; + keyIndex[key] = entry; + newEntryMemOverhead += key.size() + sizeof(index_entry); + ++numItems; + ++numNewItems; + } + } + indexMemOverhead += newEntryMemOverhead; + stats.memOverhead.incr(newEntryMemOverhead); + assert(stats.memOverhead.get() < GIGANTOR); + return numNewItems; +} + +uint64_t Checkpoint::getMutationIdForKey(const std::string &key) { + uint64_t mid = 0; + checkpoint_index::iterator it = keyIndex.find(key); + if (it != keyIndex.end()) { + mid = it->second.mutation_id; + } + return mid; +} + + Atomic CheckpointManager::checkpointPeriod = DEFAULT_CHECKPOINT_PERIOD; Atomic CheckpointManager::checkpointMaxItems = DEFAULT_CHECKPOINT_ITEMS; Atomic CheckpointManager::maxCheckpoints = DEFAULT_MAX_CHECKPOINTS; @@ -117,8 +159,7 @@ CheckpointManager::~CheckpointManager() { } } -uint64_t CheckpointManager::getOpenCheckpointId() { - LockHolder lh(queueLock); +uint64_t CheckpointManager::getOpenCheckpointId_UNLOCKED() { if (checkpointList.size() == 0) { return 0; } @@ -127,6 +168,11 @@ uint64_t CheckpointManager::getOpenCheckpointId() { return 
checkpointList.back()->getState() == opened ? id : id + 1; } +uint64_t CheckpointManager::getOpenCheckpointId() { + LockHolder lh(queueLock); + return getOpenCheckpointId_UNLOCKED(); +} + void CheckpointManager::setOpenCheckpointId_UNLOCKED(uint64_t id) { if (checkpointList.size() > 0) { checkpointList.back()->setId(id); @@ -503,6 +549,14 @@ size_t CheckpointManager::removeClosedUnrefCheckpoints(const RCPtr &vbu } unrefCheckpointList.splice(unrefCheckpointList.begin(), checkpointList, checkpointList.begin(), it); + // If any cursor on a replica vbucket or downstream active vbucket receiving checkpoints from + // the upstream master is very slow and causes more closed checkpoints in memory, + // collapse those closed checkpoints into a single one to reduce the memory overhead. + if (!keepClosedCheckpoints && + (vbucket->getState() == vbucket_state_replica || + (vbucket->getState() == vbucket_state_active && inconsistentSlaveCheckpoint))) { + collapseClosedCheckpoints(unrefCheckpointList); + } lh.unlock(); std::list::iterator chkpoint_it = unrefCheckpointList.begin(); @@ -541,6 +595,73 @@ void CheckpointManager::removeInvalidCursorsOnCheckpoint(Checkpoint *pCheckpoint } } +void CheckpointManager::collapseClosedCheckpoints(std::list &collapsedChks) { + // If there is one open checkpoint and more than one closed checkpoint, collapse those + // closed checkpoints into one checkpoint to reduce the memory overhead. + if (checkpointList.size() > 2) { + std::set slowCursors; + std::set fastCursors; + std::list::iterator lastClosedChk = checkpointList.end(); + --lastClosedChk; --lastClosedChk; // Move to the latest closed checkpoint. + fastCursors.insert((*lastClosedChk)->getCursorNameList().begin(), + (*lastClosedChk)->getCursorNameList().end()); + std::list::reverse_iterator rit = checkpointList.rbegin(); + ++rit; ++rit;// Move to the second latest closed checkpoint. 
+ size_t numDuplicatedItems = 0, numMetaItems = 0; + for (; rit != checkpointList.rend(); ++rit) { + size_t numAddedItems = (*lastClosedChk)->mergePrevCheckpoint(*rit); + numDuplicatedItems += ((*rit)->getNumItems() - numAddedItems); + numMetaItems += 2; // checkpoint start and end meta items + slowCursors.insert((*rit)->getCursorNameList().begin(), + (*rit)->getCursorNameList().end()); + } + // Reposition the slow cursors to the beginning of the last closed checkpoint. + std::set::iterator sit = slowCursors.begin(); + for (; sit != slowCursors.end(); ++sit) { + if ((*sit).compare(persistenceCursor.name) == 0) { // Reposition persistence cursor + persistenceCursor.currentCheckpoint = lastClosedChk; + persistenceCursor.currentPos = (*lastClosedChk)->begin(); + persistenceCursor.offset = 0; + (*lastClosedChk)->registerCursorName(persistenceCursor.name); + } else if ((*sit).compare(onlineUpdateCursor.name) == 0) { // Reposition onlineUpdate cursor + onlineUpdateCursor.currentCheckpoint = lastClosedChk; + onlineUpdateCursor.currentPos = (*lastClosedChk)->begin(); + onlineUpdateCursor.offset = 0; + (*lastClosedChk)->registerCursorName(onlineUpdateCursor.name); + } else { // Reposition tap cursors + std::map::iterator mit = tapCursors.find(*sit); + if (mit != tapCursors.end()) { + mit->second.currentCheckpoint = lastClosedChk; + mit->second.currentPos = (*lastClosedChk)->begin(); + mit->second.offset = 0; + (*lastClosedChk)->registerCursorName(mit->second.name); + } + } + } + + numItems -= (numDuplicatedItems + numMetaItems); + Checkpoint *pOpenCheckpoint = checkpointList.back(); + const std::set &openCheckpointCursors = pOpenCheckpoint->getCursorNameList(); + fastCursors.insert(openCheckpointCursors.begin(), openCheckpointCursors.end()); + std::set::const_iterator cit = fastCursors.begin(); + // Update the offset of each fast cursor. 
+ for (; cit != fastCursors.end(); ++cit) { + if ((*cit).compare(persistenceCursor.name) == 0) { + decrPersistenceCursorOffset(numDuplicatedItems + numMetaItems); + } else if ((*cit).compare(onlineUpdateCursor.name) == 0) { + onlineUpdateCursor.offset -= (numDuplicatedItems + numMetaItems); + } else { + std::map::iterator mit = tapCursors.find(*cit); + if (mit != tapCursors.end()) { + mit->second.offset -= (numDuplicatedItems + numMetaItems); + } + } + } + collapsedChks.splice(collapsedChks.end(), checkpointList, + checkpointList.begin(), lastClosedChk); + } +} + bool CheckpointManager::queueDirty(const queued_item &item, const RCPtr &vbucket) { LockHolder lh(queueLock); if (vbucket->getState() != vbucket_state_active && @@ -880,6 +1001,13 @@ bool CheckpointManager::checkAndAddNewCheckpoint(uint64_t id, bool &pCursorRepos } if (it == checkpointList.end()) { + if ((checkpointList.back()->getId() + 1) < id) { + isCollapsedCheckpoint = true; + uint64_t oid = getOpenCheckpointId_UNLOCKED(); + lastClosedCheckpointId = oid > 0 ? (oid - 1) : 0; + } else if ((checkpointList.back()->getId() + 1) == id) { + isCollapsedCheckpoint = false; + } if (checkpointList.back()->getState() == opened) { closeOpenCheckpoint_UNLOCKED(checkpointList.back()->getId()); } diff --git a/checkpoint.hh b/checkpoint.hh index 8eb2a3a3b..6c5f0bf30 100644 --- a/checkpoint.hh +++ b/checkpoint.hh @@ -187,6 +187,14 @@ public: return toWrite.end(); } + std::list::reverse_iterator rbegin() { + return toWrite.rbegin(); + } + + std::list::reverse_iterator rend() { + return toWrite.rend(); + } + uint64_t getCasForKey(const std::string &key); /** @@ -199,6 +207,21 @@ public: return sizeof(Checkpoint) + indexMemOverhead; } + /** + * Merge the previous checkpoint into this checkpoint by adding the items from + * the previous checkpoint that don't exist in this checkpoint. + * @param pPrevCheckpoint pointer to the previous checkpoint. + * @return the number of items added from the previous checkpoint. 
+ */ + size_t mergePrevCheckpoint(Checkpoint *pPrevCheckpoint); + + /** + * Get the mutation id for a given key in this checkpoint + * @param key the key whose mutation id is retrieved + * @return the mutation id for the given key, or 0 if the key is not in this checkpoint's index + */ + uint64_t getMutationIdForKey(const std::string &key); + private: EPStats &stats; uint64_t checkpointId; @@ -225,7 +248,7 @@ public: CheckpointManager(EPStats &st, uint16_t vbucket, uint64_t checkpointId = 1) : stats(st), vbucketId(vbucket), numItems(0), mutationCounter(0), persistenceCursor("persistence"), onlineUpdateCursor("online_update"), - doOnlineUpdate(false), doHotReload(false) { + isCollapsedCheckpoint(false), doOnlineUpdate(false), doHotReload(false) { addNewCheckpoint(checkpointId); registerPersistenceCursor(); @@ -233,11 +256,16 @@ public: ~CheckpointManager(); + uint64_t getOpenCheckpointId_UNLOCKED(); uint64_t getOpenCheckpointId(); uint64_t getLastClosedCheckpointId() { - uint64_t id = getOpenCheckpointId(); - return id > 0 ? (id - 1) : 0; + LockHolder lh(queueLock); + if (!isCollapsedCheckpoint) { + uint64_t id = getOpenCheckpointId_UNLOCKED(); + lastClosedCheckpointId = id > 0 ? (id - 1) : 0; + } + return lastClosedCheckpointId; } void setOpenCheckpointId_UNLOCKED(uint64_t id); @@ -542,6 +570,8 @@ private: bool isCheckpointCreationForHighMemUsage(const RCPtr &vbucket); + void collapseClosedCheckpoints(std::list &collapsedChks); + static bool validateCheckpointMaxItemsParam(size_t checkpoint_max_items); static bool validateCheckpointPeriodParam(size_t checkpoint_period); static bool validateMaxCheckpointsParam(size_t max_checkpoints); @@ -556,6 +586,8 @@ private: std::list checkpointList; CheckpointCursor persistenceCursor; CheckpointCursor onlineUpdateCursor; + bool isCollapsedCheckpoint; + uint64_t lastClosedCheckpointId; std::map tapCursors; // Period of a checkpoint in terms of time in sec