Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

MB-7017 Separate entry per high priority checkpoint flush request

The flusher can receive multiple high-priority checkpoint
persistence requests for the same vbucket within a short time
window. Therefore, it should maintain a separate entry per
request, each with its corresponding upstream connection cookie,
and notify that connection cookie of either SUCCESS or TIMEOUT.

Change-Id: I9263d0161363ca88b55ece99a38da81a5f87656f
Reviewed-on: http://review.couchbase.org/22105
Reviewed-by: Jin Lim <jin@couchbase.com>
Tested-by: Chiyoung Seo <chiyoung.seo@gmail.com>
  • Loading branch information...
commit 713e2486b58458f996c9fe6486b11d8ba3170005 1 parent 96d50b2
@chiyoung chiyoung authored Farshid Ghods committed
View
2  docs/stats.org
@@ -192,6 +192,8 @@ For introductory information on stats within membase, start with the
| | checkpoint before a new one is created |
| ep_chk_period | The maximum lifetime of a checkpoint |
| | before a new one is created |
+| ep_chk_persistence_remains | Number of remaining vbuckets for |
+| | checkpoint persistence |
| ep_chk_persistence_timeout | Timeout for vbucket checkpoint |
| | persistence |
| ep_chk_remover_stime | The time interval for purging closed |
View
80 src/ep.cc
@@ -1802,11 +1802,14 @@ vb_flush_queue_t* EventuallyPersistentStore::beginFlush() {
vbuckets.setPersistenceCheckpointId(vbid, chkid);
schedule_vb_snapshot = true;
}
- HighPriorityVBEntry vb_entry = flusher->getHighPriorityVBEntry(vbid);
- if (vb_entry.cookie) {
+ std::list<HighPriorityVBEntry> vb_entries =
+ flusher->getHighPriorityVBEntries(vbid);
+ std::list<HighPriorityVBEntry>::iterator vit = vb_entries.begin();
+ for (; vit != vb_entries.end(); ++vit) {
+ HighPriorityVBEntry &vb_entry = *vit;
if (vb_entry.checkpoint <= chkid) {
engine.notifyIOComplete(vb_entry.cookie, ENGINE_SUCCESS);
- flusher->removeHighPriorityVBucket(vbid);
+ flusher->removeHighPriorityVBEntry(vbid, vb_entry.cookie);
hrtime_t wall_time(gethrtime() - vb_entry.start);
stats.chkPersistenceHisto.add(wall_time / 1000);
flusher->adjustCheckpointFlushTimeout(wall_time / 1000000000);
@@ -1818,7 +1821,7 @@ vb_flush_queue_t* EventuallyPersistentStore::beginFlush() {
size_t spent = (gethrtime() - vb_entry.start) / 1000000000;
if (spent > flusher->getCheckpointFlushTimeout()) {
engine.notifyIOComplete(vb_entry.cookie, ENGINE_TMPFAIL);
- flusher->removeHighPriorityVBucket(vbid);
+ flusher->removeHighPriorityVBEntry(vbid, vb_entry.cookie);
flusher->adjustCheckpointFlushTimeout(spent);
getLogger()->log(EXTENSION_LOG_WARNING, NULL,
"Notified the timeout on checkpoint "
@@ -1989,17 +1992,20 @@ int EventuallyPersistentStore::flushVBQueue(RCPtr<VBucket> &vb,
int oldest = data_age;
if (vb_queue.empty()) {
- HighPriorityVBEntry vb_entry = flusher->getHighPriorityVBEntry(vbid);
- size_t spent = (gethrtime() - vb_entry.start) / 1000000000;
- if (vb_entry.cookie &&
- (!vb || spent > flusher->getCheckpointFlushTimeout())) {
- engine.notifyIOComplete(vb_entry.cookie, ENGINE_TMPFAIL);
- flusher->removeHighPriorityVBucket(vbid);
- flusher->adjustCheckpointFlushTimeout(spent);
- getLogger()->log(EXTENSION_LOG_WARNING, NULL,
- "Notified the timeout on checkpoint "
- "persistence for vbucket %d, cookie %p\n",
- vbid, vb_entry.cookie);
+ std::list<HighPriorityVBEntry> vb_entries = flusher->getHighPriorityVBEntries(vbid);
+ std::list<HighPriorityVBEntry>::iterator vit = vb_entries.begin();
+ for (; vit != vb_entries.end(); ++vit) {
+ HighPriorityVBEntry &vb_entry = *vit;
+ size_t spent = (gethrtime() - vb_entry.start) / 1000000000;
+ if (!vb || spent > flusher->getCheckpointFlushTimeout()) {
+ engine.notifyIOComplete(vb_entry.cookie, ENGINE_TMPFAIL);
+ flusher->removeHighPriorityVBEntry(vbid, vb_entry.cookie);
+ flusher->adjustCheckpointFlushTimeout(spent);
+ getLogger()->log(EXTENSION_LOG_WARNING, NULL,
+ "Notified the timeout on checkpoint "
+ "persistence for vbucket %d, cookie %p\n",
+ vbid, vb_entry.cookie);
+ }
}
return oldest;
}
@@ -2012,8 +2018,6 @@ int EventuallyPersistentStore::flushVBQueue(RCPtr<VBucket> &vb,
}
}
- bool notified = false;
- HighPriorityVBEntry vb_entry = flusher->getHighPriorityVBEntry(vbid);
if (vb) {
if (rejectQueue.empty()) {
uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
@@ -2021,18 +2025,6 @@ int EventuallyPersistentStore::flushVBQueue(RCPtr<VBucket> &vb,
vbuckets.setPersistenceCheckpointId(vbid, chkid);
snapshotVBState = true;
}
- if (vb_entry.checkpoint <= chkid && vb_entry.cookie) {
- engine.notifyIOComplete(vb_entry.cookie, ENGINE_SUCCESS);
- flusher->removeHighPriorityVBucket(vbid);
- notified = true;
- hrtime_t wall_time(gethrtime() - vb_entry.start);
- stats.chkPersistenceHisto.add(wall_time / 1000);
- flusher->adjustCheckpointFlushTimeout(wall_time / 1000000000);
- getLogger()->log(EXTENSION_LOG_WARNING, NULL,
- "Notified the completion of checkpoint "
- "persistence for vbucket %d, cookie %p\n",
- vbid, vb_entry.cookie);
- }
} else {
size_t qsize = rejectQueue.size();
// Requeue the rejects.
@@ -2046,16 +2038,32 @@ int EventuallyPersistentStore::flushVBQueue(RCPtr<VBucket> &vb,
}
}
- if (!notified && vb_entry.cookie) {
- size_t spent = (gethrtime() - vb_entry.start) / 1000000000;
- if (!vb || spent > flusher->getCheckpointFlushTimeout()) {
- engine.notifyIOComplete(vb_entry.cookie, ENGINE_TMPFAIL);
- flusher->removeHighPriorityVBucket(vbid);
- flusher->adjustCheckpointFlushTimeout(spent);
+ uint64_t persisted_chkid = vbuckets.getPersistenceCheckpointId(vbid);
+ std::list<HighPriorityVBEntry> vb_entries = flusher->getHighPriorityVBEntries(vbid);
+ std::list<HighPriorityVBEntry>::iterator vit = vb_entries.begin();
+ for (; vit != vb_entries.end(); ++vit) {
+ HighPriorityVBEntry &vb_entry = *vit;
+ if (vb_entry.checkpoint <= persisted_chkid) {
+ engine.notifyIOComplete(vb_entry.cookie, ENGINE_SUCCESS);
+ flusher->removeHighPriorityVBEntry(vbid, vb_entry.cookie);
+ hrtime_t wall_time(gethrtime() - vb_entry.start);
+ stats.chkPersistenceHisto.add(wall_time / 1000);
+ flusher->adjustCheckpointFlushTimeout(wall_time / 1000000000);
getLogger()->log(EXTENSION_LOG_WARNING, NULL,
- "Notified the timeout on checkpoint "
+ "Notified the completion of checkpoint "
"persistence for vbucket %d, cookie %p\n",
vbid, vb_entry.cookie);
+ } else {
+ size_t spent = (gethrtime() - vb_entry.start) / 1000000000;
+ if (!vb || spent > flusher->getCheckpointFlushTimeout()) {
+ engine.notifyIOComplete(vb_entry.cookie, ENGINE_TMPFAIL);
+ flusher->removeHighPriorityVBEntry(vbid, vb_entry.cookie);
+ flusher->adjustCheckpointFlushTimeout(spent);
+ getLogger()->log(EXTENSION_LOG_WARNING, NULL,
+ "Notified the timeout on checkpoint "
+ "persistence for vbucket %d, cookie %p\n",
+ vbid, vb_entry.cookie);
+ }
}
}
View
5 src/ep_engine.cc
@@ -2710,6 +2710,9 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::doEngineStats(const void *cookie,
add_casted_stat("ep_chk_persistence_timeout",
epstore->getFlusher()->getCheckpointFlushTimeout(),
add_stat, cookie);
+ add_casted_stat("ep_chk_persistence_remains",
+ epstore->getFlusher()->getNumOfHighPriorityVBs(),
+ add_stat, cookie);
return ENGINE_SUCCESS;
}
@@ -3696,7 +3699,7 @@ EventuallyPersistentEngine::handleCheckpointCmds(const void *cookie,
epstore->getVBuckets().getPersistenceCheckpointId(vbucket);
if (chk_id > persisted_chk_id) {
Flusher *flusher = const_cast<Flusher *>(epstore->getFlusher());
- flusher->addHighPriorityVBucket(vbucket, chk_id, cookie);
+ flusher->addHighPriorityVBEntry(vbucket, chk_id, cookie);
storeEngineSpecific(cookie, this);
return ENGINE_EWOULDBLOCK;
}
View
48 src/flusher.cc
@@ -260,36 +260,62 @@ int Flusher::doFlush() {
return flushRv;
}
-void Flusher::addHighPriorityVBucket(uint16_t vbid, uint64_t chkid,
+void Flusher::addHighPriorityVBEntry(uint16_t vbid, uint64_t chkid,
const void *cookie) {
LockHolder lh(priorityVBMutex);
- priorityVBList[vbid] = HighPriorityVBEntry(cookie, chkid);
+ std::map<uint16_t, std::list<HighPriorityVBEntry> >::iterator it =
+ priorityVBList.find(vbid);
+ if (it == priorityVBList.end()) {
+ std::list<HighPriorityVBEntry> vb_entries;
+ vb_entries.push_back(HighPriorityVBEntry(cookie, chkid));
+ priorityVBList.insert(std::make_pair(vbid, vb_entries));
+ } else {
+ it->second.push_back(HighPriorityVBEntry(cookie, chkid));
+ }
}
-void Flusher::removeHighPriorityVBucket(uint16_t vbid) {
+void Flusher::removeHighPriorityVBEntry(uint16_t vbid, const void *cookie) {
LockHolder lh(priorityVBMutex);
- priorityVBList.erase(vbid);
+ std::map<uint16_t, std::list<HighPriorityVBEntry> >::iterator it =
+ priorityVBList.find(vbid);
+ if (it != priorityVBList.end()) {
+ std::list<HighPriorityVBEntry> &vb_entries = it->second;
+ std::list<HighPriorityVBEntry>::iterator vit = vb_entries.begin();
+ for (; vit != vb_entries.end(); ++vit) {
+ if ((*vit).cookie == cookie) {
+ break;
+ }
+ }
+ if (vit != vb_entries.end()) {
+ vb_entries.erase(vit);
+ }
+ if (vb_entries.empty()) {
+ priorityVBList.erase(vbid);
+ }
+ }
}
void Flusher::getAllHighPriorityVBuckets(std::vector<uint16_t> &vbs) {
LockHolder lh(priorityVBMutex);
- std::map<uint16_t, HighPriorityVBEntry>::iterator it = priorityVBList.begin();
+ std::map<uint16_t, std::list<HighPriorityVBEntry> >::iterator it =
+ priorityVBList.begin();
for (; it != priorityVBList.end(); ++it) {
vbs.push_back(it->first);
}
}
-HighPriorityVBEntry Flusher::getHighPriorityVBEntry(uint16_t vbid) {
+std::list<HighPriorityVBEntry> Flusher::getHighPriorityVBEntries(uint16_t vbid) {
LockHolder lh(priorityVBMutex);
- std::map<uint16_t, HighPriorityVBEntry>::iterator it = priorityVBList.find(vbid);
+ std::list<HighPriorityVBEntry> vb_entries;
+ std::map<uint16_t, std::list<HighPriorityVBEntry> >::iterator it =
+ priorityVBList.find(vbid);
if (it != priorityVBList.end()) {
- return it->second;
- } else {
- return HighPriorityVBEntry(NULL, 0);
+ vb_entries.assign(it->second.begin(), it->second.end());
}
+ return vb_entries;
}
-size_t Flusher::getNumOfHighPriorityVBs() {
+size_t Flusher::getNumOfHighPriorityVBs() const {
return priorityVBList.size();
}
View
10 src/flusher.hh
@@ -91,12 +91,12 @@ public:
enum flusher_state state() const;
const char * stateName() const;
- void addHighPriorityVBucket(uint16_t vbid, uint64_t chkid,
+ void addHighPriorityVBEntry(uint16_t vbid, uint64_t chkid,
const void *cookie);
- void removeHighPriorityVBucket(uint16_t vbid);
+ void removeHighPriorityVBEntry(uint16_t vbid, const void *cookie);
void getAllHighPriorityVBuckets(std::vector<uint16_t> &vbs);
- HighPriorityVBEntry getHighPriorityVBEntry(uint16_t vbid);
- size_t getNumOfHighPriorityVBs();
+ std::list<HighPriorityVBEntry> getHighPriorityVBEntries(uint16_t vbid);
+ size_t getNumOfHighPriorityVBs() const;
size_t getCheckpointFlushTimeout() const;
void adjustCheckpointFlushTimeout(size_t wall_time);
@@ -126,7 +126,7 @@ private:
Atomic<bool> forceShutdownReceived;
Mutex priorityVBMutex;
- std::map<uint16_t, HighPriorityVBEntry> priorityVBList;
+ std::map<uint16_t, std::list<HighPriorityVBEntry> > priorityVBList;
size_t flushPhase;
uint16_t nextVbid;
size_t chkFlushTimeout;
View
77 tests/ep_testsuite.cc
@@ -4479,39 +4479,58 @@ static enum test_result test_extend_open_checkpoint(ENGINE_HANDLE *h, ENGINE_HAN
return SUCCESS;
}
-static enum test_result test_checkpoint_persistence(ENGINE_HANDLE *h,
- ENGINE_HANDLE_V1 *h1) {
- for (int j = 0; j < 1000; ++j) {
- std::stringstream ss;
- ss << "key" << j;
- item *i;
- check(store(h, h1, NULL, OPERATION_SET,
- ss.str().c_str(), ss.str().c_str(), &i, 0, 0) == ENGINE_SUCCESS,
- "Failed to store a value");
- h1->release(h, NULL, i);
- }
+extern "C" {
+ static void* checkpoint_persistence_thread(void *arg) {
+ struct handle_pair *hp = static_cast<handle_pair *>(arg);
- createCheckpoint(h, h1);
- check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
- "Expected success response from creating a new checkpoint");
+ // Issue a request with the unexpected large checkpoint id 100, which
+ // will cause timeout.
+ check(checkpointPersistence(hp->h, hp->h1, 100) == ENGINE_TMPFAIL,
+ "Expected temp failure for checkpoint persistence request");
+ check(get_int_stat(hp->h, hp->h1, "ep_chk_persistence_timeout") > 10,
+ "Expected CHECKPOINT_PERSISTENCE_TIMEOUT was adjusted to be greater"
+ " than 10 secs");
- // Last closed checkpoint id for vbucket 0.
- int closed_chk_id = get_int_stat(h, h1, "vb_0:last_closed_checkpoint_id",
- "checkpoint 0");
- // Request to prioritize persisting vbucket 0.
- check(checkpointPersistence(h, h1, closed_chk_id) == ENGINE_SUCCESS,
- "Failed to request checkpoint persistence");
- check(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS,
- "Expected success response for checkpoint persistence");
+ item *it = NULL;
+ for (int j = 0; j < 1000; ++j) {
+ std::stringstream ss;
+ ss << "key" << j;
+ item *i;
+ check(store(hp->h, hp->h1, NULL, OPERATION_SET,
+ ss.str().c_str(), ss.str().c_str(), &i, 0, 0) == ENGINE_SUCCESS,
+ "Failed to store a value");
+ hp->h1->release(hp->h, NULL, i);
+ }
+
+ createCheckpoint(hp->h, hp->h1);
- // Issue another request with unexpected larger checkpoint id 100, which
- // causes timeout.
- check(checkpointPersistence(h, h1, 100) == ENGINE_TMPFAIL,
- "Expected temp failure for checkpoint persistence request");
- check(get_int_stat(h, h1, "ep_chk_persistence_timeout") > 10,
- "Expected CHECKPOINT_PERSISTENCE_TIMEOUT was adjusted to be greater"
- " than 10 secs");
+ // Last closed checkpoint id for vbucket 0.
+ int closed_chk_id = get_int_stat(hp->h, hp->h1, "vb_0:last_closed_checkpoint_id",
+ "checkpoint 0");
+ // Request to prioritize persisting vbucket 0.
+ check(checkpointPersistence(hp->h, hp->h1, closed_chk_id) == ENGINE_SUCCESS,
+ "Failed to request checkpoint persistence");
+
+ return NULL;
+ }
+}
+static enum test_result test_checkpoint_persistence(ENGINE_HANDLE *h,
+ ENGINE_HANDLE_V1 *h1) {
+ const int n_threads = 2;
+ pthread_t threads[n_threads];
+ struct handle_pair hp = {h, h1};
+
+ for (int i = 0; i < n_threads; ++i) {
+ int r = pthread_create(&threads[i], NULL, checkpoint_persistence_thread, &hp);
+ assert(r == 0);
+ }
+
+ for (int i = 0; i < n_threads; ++i) {
+ void *trv = NULL;
+ int r = pthread_join(threads[i], &trv);
+ assert(r == 0);
+ }
return SUCCESS;
}
Please sign in to comment.
Something went wrong with that request. Please try again.