Skip to content

Commit

Permalink
Schedule backfill task through Non-IO dispatcher.
Browse files Browse the repository at this point in the history
Previously, each backfill task was scheduled by creating a separate
thread. We observed that this could cause significant overhead,
especially with more than one replica in a large cluster.

This change addresses the above issue by scheduling a backfill task
through Non-IO dispatcher. If a given backfill task requires
backfill from disk, it kicks off separate disk backfill tasks
through Read-Only dispatcher. Backfill from memory is scheduled per
vbucket by Non-IO dispatcher.

Each memory backfill task sleeps for 1 sec if the current backfill
backlog for the corresponding TAP producer is greater than the
threshold (5000 by default).

Change-Id: I4fb63d89a49ad10748a7b540e6383d3d20f02d7e
Reviewed-on: http://review.couchbase.org/6888
Reviewed-by: Bin Cui <bin.cui@gmail.com>
Reviewed-by: Trond Norbye <trond.norbye@gmail.com>
Tested-by: Chiyoung Seo <chiyoung.seo@gmail.com>
  • Loading branch information
chiyoung committed Jun 14, 2011
1 parent 5d1bd13 commit 24f6eee
Show file tree
Hide file tree
Showing 10 changed files with 373 additions and 350 deletions.
2 changes: 2 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ ep_la_SOURCES = \
atomic/gcc_atomics.h \
atomic/libatomic.h \
atomic.hh \
backfill.hh \
backfill.cc \
callbacks.hh \
checkpoint.hh \
checkpoint.cc \
Expand Down
188 changes: 188 additions & 0 deletions backfill.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include "config.h"
#include "vbucket.hh"
#include "ep_engine.h"
#include "ep.hh"
#include "backfill.hh"


double BackFillVisitor::backfillResidentThreshold = DEFAULT_BACKFILL_RESIDENT_THRESHOLD;

void BackfillDiskLoad::callback(GetValue &gv) {
    // Items read from disk may belong to an older vbucket version than the
    // one captured when this task was scheduled; drop such stale items.
    if (vbucket_version != gv.getVBucketVersion()) {
        delete gv.getValue();
        return;
    }

    ReceivedItemTapOperation tapop(true);
    bool accepted = connMap.performTapOp(name, tapop, gv.getValue());
    if (!accepted) {
        // The TAP connection was closed; reclaim the fetched Item.
        delete gv.getValue();
    }

    NotifyPausedTapOperation notifyOp;
    connMap.performTapOp(name, notifyOp, engine);
}

bool BackfillDiskLoad::callback(Dispatcher &, TaskId) {
    // Only dump the vbucket if the TAP connection is still alive and no
    // flush-all has been scheduled in the meantime.
    bool connected = connMap.checkConnectivity(name) &&
                     !engine->getEpStore()->isFlushAllScheduled();
    if (connected) {
        store->dump(vbucket, *this);
    }

    // Decrement the disk backfill counter regardless of connectivity status.
    CompleteDiskBackfillTapOperation op;
    connMap.performTapOp(name, op, static_cast<void*>(NULL));

    if (connected && connMap.addBackfillCompletionMessage(name)) {
        engine->notifyTapNotificationThread();
    }

    // One-shot task: never reschedule.
    return false;
}

std::string BackfillDiskLoad::description() {
    // Human-readable label shown by the dispatcher for this task.
    std::stringstream ss;
    ss << "Loading TAP backfill from disk for vb " << vbucket;
    return ss.str();
}

bool BackFillVisitor::visitBucket(RCPtr<VBucket> vb) {
    // Flush any disk backfill scheduling and queued items accumulated from
    // the previously visited vbucket before starting on this one.
    apply();

    if (!filter(vb->getId())) {
        return false;
    }

    VBucketVisitor::visitBucket(vb);
    // If the current resident ratio for a given vbucket is below the resident
    // threshold for memory backfill only, schedule a disk backfill so that
    // non-resident items are loaded through more efficient bg fetches.
    double numItems = static_cast<double>(vb->ht.getNumItems());
    double numNonResident = static_cast<double>(vb->ht.getNumNonResidentItems());
    if (numItems == 0) {
        return true;
    }
    // NOTE: the comparison already yields a bool; the original
    // "? true : false" was redundant.
    residentRatioBelowThreshold =
        ((numItems - numNonResident) / numItems) < backfillResidentThreshold;
    if (efficientVBDump && residentRatioBelowThreshold) {
        vbuckets.push_back(vb->getId());
        ScheduleDiskBackfillTapOperation tapop;
        engine->tapConnMap.performTapOp(name, tapop, static_cast<void*>(NULL));
    }
    // When the backfill is scheduled for a given vbucket, set the TAP cursor
    // to the beginning of the open checkpoint.
    engine->tapConnMap.SetCursorToOpenCheckpoint(name, vb->getId());
    return true;
}

void BackFillVisitor::visit(StoredValue *v) {
    // When efficient VBdump is supported and a disk backfill was scheduled
    // for this vbucket, non-resident items are skipped here: the disk
    // backfill task will fetch them instead.
    if (efficientVBDump && residentRatioBelowThreshold && !v->isResident()) {
        return;
    }
    std::string key = v->getKey();
    queued_item item(new QueuedItem(key, currentBucket->getId(), queue_op_set,
                                    -1, v->getId()));
    uint16_t shard = engine->kvstore->getShardId(*item);
    found.push_back(std::make_pair(shard, item));
}

void BackFillVisitor::apply(void) {
    // If efficient VBdump is supported, schedule a disk backfill task for
    // every vbucket collected by visitBucket().
    if (efficientVBDump && !vbuckets.empty()) {
        // The dispatcher and underlying read-only store do not change per
        // vbucket; look them up once instead of on every loop iteration.
        Dispatcher *d = engine->epstore->getRODispatcher();
        KVStore *underlying = engine->epstore->getROUnderlying();
        assert(d);
        std::vector<uint16_t>::iterator it = vbuckets.begin();
        for (; it != vbuckets.end(); ++it) {
            shared_ptr<DispatcherCallback> cb(new BackfillDiskLoad(name,
                                                                   engine,
                                                                   engine->tapConnMap,
                                                                   underlying,
                                                                   *it,
                                                                   validityToken));
            d->schedule(cb, NULL, Priority::TapBgFetcherPriority);
        }
        vbuckets.clear();
    }

    // Hand any items found in memory over to the TAP connection.
    setEvents();
}

void BackFillVisitor::setResidentItemThreshold(double residentThreshold) {
    // Reject thresholds below the allowed minimum and keep the current value.
    if (residentThreshold < MINIMUM_BACKFILL_RESIDENT_THRESHOLD) {
        std::stringstream msg;
        msg << "Resident item threshold " << residentThreshold
            << " for memory backfill only is too low. Ignore this new threshold...";
        getLogger()->log(EXTENSION_LOG_WARNING, NULL, msg.str().c_str());
        return;
    }
    backfillResidentThreshold = residentThreshold;
}

void BackFillVisitor::setEvents() {
    // Don't notify unless the connection is still valid and we have data.
    if (!checkValidity() || found.empty()) {
        return;
    }

    // Order the collected items by their shard id tag.
    TaggedQueuedItemComparator<uint16_t> comparator;
    std::sort(found.begin(), found.end(), comparator);

    std::vector<std::pair<uint16_t, queued_item> >::iterator it;
    for (it = found.begin(); it != found.end(); ++it) {
        queue->push_back(it->second);
    }
    found.clear();
    engine->tapConnMap.setEvents(name, queue);
}

bool BackFillVisitor::pauseVisitor() {
    ssize_t theSize = engine->tapConnMap.backfillQueueDepth(name);
    // A negative depth or failed validity check means the producer is gone;
    // stop the backfill rather than pausing it.
    if (!checkValidity() || theSize < 0) {
        getLogger()->log(EXTENSION_LOG_DEBUG, NULL,
                         "TapProducer %s went away. Stopping backfill.\n",
                         name.c_str());
        valid = false;
        return false;
    }

    // NOTE: the original initialized tooBig to true and then immediately
    // overwrote it; declare it at the point of the real computation instead.
    bool tooBig = theSize > maxBackfillSize;
    if (tooBig) {
        getLogger()->log(EXTENSION_LOG_DEBUG, NULL,
                         "Tap queue depth too big for %s, sleeping\n",
                         name.c_str());
    }
    return tooBig;
}

void BackFillVisitor::complete() {
apply();
CompleteBackfillTapOperation tapop;
engine->tapConnMap.performTapOp(name, tapop, static_cast<void*>(NULL));
if (engine->tapConnMap.addBackfillCompletionMessage(name)) {
engine->notifyTapNotificationThread();
}
releaseEngineResources();
}

bool BackFillVisitor::checkValidity() {
    // Once invalid, stay invalid: don't re-probe connectivity.
    if (!valid) {
        return false;
    }
    valid = engine->tapConnMap.checkConnectivity(name);
    if (!valid) {
        getLogger()->log(EXTENSION_LOG_WARNING, NULL,
                         "Backfilling connectivity for %s went invalid. Stopping backfill.\n",
                         name.c_str());
    }
    return valid;
}

bool BackfillTask::callback(Dispatcher &d, TaskId t) {
    (void) t;  // unused
    // Kick off the vbucket visitation on the given dispatcher; the final
    // argument (1 sec) is the snooze time used when the visitor pauses.
    epstore->visit(bfv, "Backfill task", &d, Priority::BackfillTaskPriority, true, 1);
    return false;  // one-shot task
}
139 changes: 139 additions & 0 deletions backfill.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#ifndef BACKFILL_HH
#define BACKFILL_HH 1

#include <assert.h>
#include <set>

#include "common.hh"
#include "stats.hh"
#include "dispatcher.hh"
#include "ep_engine.h"

#define DEFAULT_BACKFILL_RESIDENT_THRESHOLD 0.9
#define MINIMUM_BACKFILL_RESIDENT_THRESHOLD 0.7

/**
* Dispatcher callback responsible for bulk backfilling tap queues
* from a KVStore.
*
* Note that this is only used if the KVStore reports that it has
* efficient vbucket ops.
*/
class BackfillDiskLoad : public DispatcherCallback, public Callback<GetValue> {
public:

    BackfillDiskLoad(const std::string &n, EventuallyPersistentEngine* e,
                     TapConnMap &tcm, KVStore *s, uint16_t vbid, const void *token)
        : name(n), engine(e), connMap(tcm), store(s), vbucket(vbid), validityToken(token) {

        // Snapshot the vbucket version at scheduling time so stale items
        // fetched under a different version can be detected and skipped.
        vbucket_version = engine->getEpStore()->getVBucketVersion(vbucket);
    }

    // Invoked for each item fetched from disk; hands the item to the TAP
    // connection, or frees it if the connection has been closed.
    void callback(GetValue &gv);

    // Dispatcher entry point: dumps the vbucket from the KVStore and marks
    // the disk backfill as complete. Returns false (not rescheduled).
    bool callback(Dispatcher &, TaskId);

    // Human-readable task description for dispatcher introspection.
    std::string description();

private:
    const std::string name;    // TAP producer name this backfill feeds
    EventuallyPersistentEngine *engine;
    TapConnMap &connMap;
    KVStore *store;            // read-only underlying store to dump from
    uint16_t vbucket;
    uint16_t vbucket_version;  // version captured at construction time
    const void *validityToken; // token tied to this backfill's validity
};

/**
* VBucketVisitor to backfill a TapProducer. This visitor basically performs backfill from memory
* for only resident items if it needs to schedule a separate disk backfill task because of
* low resident ratio.
*/
class BackFillVisitor : public VBucketVisitor {
public:
    BackFillVisitor(EventuallyPersistentEngine *e, TapProducer *tc,
                    const void *token, const VBucketFilter &backfillVBfilter):
        VBucketVisitor(), engine(e), name(tc->getName()),
        queue(new std::list<queued_item>),
        found(), filter(backfillVBfilter), validityToken(token),
        maxBackfillSize(e->tapBacklogLimit), valid(true),
        efficientVBDump(e->epstore->getStorageProperties().hasEfficientVBDump()),
        residentRatioBelowThreshold(false) {
        // Pre-size the staging vector to the backlog limit to avoid
        // repeated reallocation while visiting.
        found.reserve(e->tapBacklogLimit);
    }

    virtual ~BackFillVisitor() {
        // queue is owned by this visitor and allocated in the ctor.
        delete queue;
    }

    // Give the validity token back to the TAP connection map when the
    // backfill is finished with the engine.
    void releaseEngineResources() {
        engine->tapConnMap.releaseValidityToken(validityToken);
    }

    // Visit one vbucket; may schedule a disk backfill for it when the
    // resident ratio is below the threshold.
    bool visitBucket(RCPtr<VBucket> vb);

    // Visit one stored value, staging it for delivery to the TAP queue.
    void visit(StoredValue *v);

    bool shouldContinue() {
        // Keep visiting only while the TAP connection remains valid.
        return checkValidity();
    }

    // Schedule pending disk backfill tasks and flush staged items.
    void apply(void);

    // Finish the backfill: flush, signal completion, release resources.
    void complete(void);

    // Set the process-wide resident-ratio threshold (ignored if below the
    // allowed minimum).
    static void setResidentItemThreshold(double residentThreshold);

private:

    void setEvents();

    bool pauseVisitor();

    bool checkValidity();

    EventuallyPersistentEngine *engine;
    const std::string name;         // TAP producer name
    std::list<queued_item> *queue;  // items handed over to the TAP connection
    std::vector<std::pair<uint16_t, queued_item> > found; // staged (shard, item) pairs
    std::vector<uint16_t> vbuckets; // vbuckets awaiting a disk backfill task
    VBucketFilter filter;           // vbuckets this backfill covers
    const void *validityToken;
    ssize_t maxBackfillSize;        // backlog depth that triggers pausing
    bool valid;                     // false once the connection goes away
    bool efficientVBDump;           // storage supports efficient vbucket dump
    bool residentRatioBelowThreshold;

    static double backfillResidentThreshold;
};

/**
* Backfill task scheduled by non-IO dispatcher. Each backfill task performs backfill from
* memory or disk depending on the resident ratio. Each backfill task can backfill more than one
* vbucket, but will snooze for 1 sec if the current backfill backlog for the corresponding TAP
* producer is greater than the threshold (5000 by default).
*/
class BackfillTask : public DispatcherCallback {
public:

    BackfillTask(EventuallyPersistentEngine *e, TapProducer *tc,
                 EventuallyPersistentStore *s, const void *tok,
                 const VBucketFilter &backfillVBFilter):
        bfv(new BackFillVisitor(e, tc, tok, backfillVBFilter)), engine(e), epstore(s) {}

    virtual ~BackfillTask() {}

    // Dispatcher entry point: starts visiting vbuckets with the visitor.
    bool callback(Dispatcher &d, TaskId t);

    std::string description() {
        return std::string("Backfilling items from memory and disk.");
    }

    shared_ptr<BackFillVisitor> bfv; // shared with the VBCBAdaptor scheduled by visit()
    EventuallyPersistentEngine *engine;
    EventuallyPersistentStore *epstore;
};

#endif /* BACKFILL_HH */
6 changes: 5 additions & 1 deletion ep.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2204,9 +2204,13 @@ void TransactionContext::addUncommittedItem(const queued_item &item) {
++numUncommittedItems;
}

bool VBCBAdaptor::callback(Dispatcher &, TaskId) {
bool VBCBAdaptor::callback(Dispatcher & d, TaskId t) {
RCPtr<VBucket> vb = store->vbuckets.getBucket(currentvb);
if (vb) {
if (visitor->pauseVisitor()) {
d.snooze(t, sleepTime);
return true;
}
if (visitor->visitBucket(vb)) {
vb->ht.visit(*visitor);
}
Expand Down
16 changes: 12 additions & 4 deletions ep.hh
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ public:
*/
virtual void complete() { }

/**
 * Return true if visiting vbuckets should be paused temporarily.
 *
 * The default implementation never pauses; subclasses override this to
 * throttle visitation (e.g. when a downstream queue is too deep).
 */
virtual bool pauseVisitor() {
    return false;
}

protected:
RCPtr<VBucket> currentBucket;
};
Expand Down Expand Up @@ -401,8 +408,8 @@ class VBCBAdaptor : public DispatcherCallback {
public:

VBCBAdaptor(EventuallyPersistentStore *s,
shared_ptr<VBucketVisitor> v, const char *l)
: store(s), visitor(v), label(l), currentvb(0) {}
shared_ptr<VBucketVisitor> v, const char *l, double sleep=0)
: store(s), visitor(v), label(l), sleepTime(sleep), currentvb(0) {}

std::string description() {
std::stringstream rv;
Expand All @@ -416,6 +423,7 @@ private:
EventuallyPersistentStore *store;
shared_ptr<VBucketVisitor> visitor;
const char *label;
double sleepTime;
uint16_t currentvb;

DISALLOW_COPY_AND_ASSIGN(VBCBAdaptor);
Expand Down Expand Up @@ -685,8 +693,8 @@ public:
* Note that this is asynchronous.
*/
void visit(shared_ptr<VBucketVisitor> visitor, const char *lbl,
Dispatcher *d, const Priority &prio, bool isDaemon=true) {
d->schedule(shared_ptr<DispatcherCallback>(new VBCBAdaptor(this, visitor, lbl)),
Dispatcher *d, const Priority &prio, bool isDaemon=true, double sleepTime=0) {
d->schedule(shared_ptr<DispatcherCallback>(new VBCBAdaptor(this, visitor, lbl, sleepTime)),
NULL, prio, 0, isDaemon);
}

Expand Down
Loading

0 comments on commit 24f6eee

Please sign in to comment.