Skip to content

Commit

Permalink
Merge remote-tracking branch 'gerrit/2.0.2'
Browse files Browse the repository at this point in the history
Change-Id: I47ee0dabca3073240259441ccad0ee8a20579513
  • Loading branch information
Mike Wiederhold committed Apr 24, 2013
2 parents 10bead1 + c21211c commit e3f4a40
Show file tree
Hide file tree
Showing 27 changed files with 539 additions and 174 deletions.
2 changes: 1 addition & 1 deletion Makefile.am
Expand Up @@ -286,7 +286,7 @@ priority_test_SOURCES = tests/module_tests/priority_test.cc src/priority.h \
src/priority.cc

sizes_CPPFLAGS = $(AM_CPPFLAGS)
sizes_SOURCES = src/sizes.cc
sizes_SOURCES = src/sizes.cc src/mutex.h src/mutex.cc src/testlogger.cc
sizes_DEPENDENCIES = src/vbucket.h src/stored-value.h src/item.h

gen_config_CPPFLAGS = -I$(top_srcdir)/tools $(AM_CPPFLAGS)
Expand Down
2 changes: 2 additions & 0 deletions management/cbepctl
@@ -1,6 +1,8 @@
#!/usr/bin/env python

import time
import sys

import clitool
import mc_bin_client
import memcacheConstants
Expand Down
8 changes: 7 additions & 1 deletion management/cbstats
Expand Up @@ -187,6 +187,11 @@ def stats_vkey(mc, key, vb):
print "verification for key", key
stats_formatter(vbs)

@cmd
def stats_vb_takeover(mc, vb, name):
cmd = "tap-vbtakeover %s %s" % (str(vb), name)
stats_formatter(stats_perform(mc, cmd))

@cmd
def stats_all(mc):
stats_formatter(stats_perform(mc))
Expand Down Expand Up @@ -307,7 +312,7 @@ def stats_hash(mc, with_detail=None):
maxes.append(int(v))
if 'min_dep' in k:
mins.append(int(v))
if 'counted' in k:
if ':counted' in k:
counts.append(int(v))
if ':histo' in k:
vb, kbucket = k.split(':')
Expand Down Expand Up @@ -402,6 +407,7 @@ def main():
c.addCommand('tap', stats_tap, 'tap')
c.addCommand('tapagg', stats_tapagg, 'tapagg')
c.addCommand('timings', stats_timings, 'timings')
c.addCommand('vb-takeover', stats_vb_takeover, 'vb-takeover vb name')
c.addCommand('vbucket', stats_vbucket, 'vbucket')
c.addCommand('vbucket-details', stats_vbucket_details, 'vbucket-details')
c.addCommand('vkey', stats_vkey, 'vkey keyname vbid')
Expand Down
8 changes: 8 additions & 0 deletions src/checkpoint.cc
Expand Up @@ -937,6 +937,14 @@ bool CheckpointManager::moveCursorToNextCheckpoint(CheckpointCursor &cursor) {
return true;
}

size_t CheckpointManager::getNumOpenChkItems() {
LockHolder lh(queueLock);
if (checkpointList.empty()) {
return 0;
}
return checkpointList.back()->getNumItems();
}

uint64_t CheckpointManager::checkOpenCheckpoint_UNLOCKED(bool forceCreation, bool timeBound) {
int checkpoint_id = 0;

Expand Down
2 changes: 2 additions & 0 deletions src/checkpoint.h
Expand Up @@ -408,6 +408,8 @@ class CheckpointManager {
return numItems;
}

size_t getNumOpenChkItems();

size_t getNumCheckpoints();

/**
Expand Down
10 changes: 9 additions & 1 deletion src/couch-kvstore/couch-fs-stats.cc
Expand Up @@ -29,19 +29,21 @@ static ssize_t cfs_pread(couch_file_handle, void *, size_t, cs_off_t);
static ssize_t cfs_pwrite(couch_file_handle, const void *, size_t, cs_off_t);
static cs_off_t cfs_goto_eof(couch_file_handle);
static couchstore_error_t cfs_sync(couch_file_handle);
static couchstore_error_t cfs_advise(couch_file_handle, cs_off_t, cs_off_t, couchstore_file_advice_t);
static void cfs_destroy(couch_file_handle);
}

couch_file_ops getCouchstoreStatsOps(CouchstoreStats* stats) {
couch_file_ops ops = {
3,
4,
cfs_construct,
cfs_open,
cfs_close,
cfs_pread,
cfs_pwrite,
cfs_goto_eof,
cfs_sync,
cfs_advise,
cfs_destroy,
stats
};
Expand Down Expand Up @@ -104,6 +106,12 @@ static couchstore_error_t cfs_sync(couch_file_handle h) {
return sf->orig_ops->sync(sf->orig_handle);
}

static couchstore_error_t cfs_advise(couch_file_handle h, cs_off_t offs, cs_off_t len,
couchstore_file_advice_t adv) {
StatFile* sf = reinterpret_cast<StatFile*>(h);
sf->orig_ops->advise(sf->orig_handle, offs, len, adv);
}

static void cfs_destroy(couch_file_handle h) {
StatFile* sf = reinterpret_cast<StatFile*>(h);
sf->orig_ops->destructor(sf->orig_handle);
Expand Down
2 changes: 1 addition & 1 deletion src/couch-kvstore/couch-kvstore-dummy.cc
Expand Up @@ -19,7 +19,7 @@

#include "couch-kvstore/couch-kvstore-dummy.h"

CouchKVStore::CouchKVStore(EventuallyPersistentEngine &, bool) : KVStore()
CouchKVStore::CouchKVStore(EPStats &, Configuration &, bool) : KVStore()
{
throw std::runtime_error("This kvstore should never be used");
}
Expand Down
3 changes: 1 addition & 2 deletions src/couch-kvstore/couch-kvstore-dummy.h
Expand Up @@ -29,7 +29,6 @@

#include "kvstore.h"

class EventuallyPersistentEngine;
class EPStats;

/**
Expand All @@ -39,7 +38,7 @@ class EPStats;
class CouchKVStore : public KVStore
{
public:
CouchKVStore(EventuallyPersistentEngine &theEngine, bool read_only = false);
CouchKVStore(EPStats &stats, Configuration &config, bool read_only = false);
CouchKVStore(const CouchKVStore &from);
void reset();
bool begin();
Expand Down
46 changes: 26 additions & 20 deletions src/couch-kvstore/couch-kvstore.cc
Expand Up @@ -37,7 +37,6 @@
#include "common.h"
#include "couch-kvstore/couch-kvstore.h"
#include "couch-kvstore/dirutils.h"
#include "ep_engine.h"
#define STATWRITER_NAMESPACE couchstore_engine
#include "statwriter.h"
#undef STATWRITER_NAMESPACE
Expand Down Expand Up @@ -186,7 +185,7 @@ struct LoadResponseCtx {
shared_ptr<Callback<GetValue> > callback;
uint16_t vbucketId;
bool keysonly;
EventuallyPersistentEngine *engine;
EPStats *stats;
};

CouchRequest::CouchRequest(const Item &it, uint64_t rev, CouchRequestCallback &cb, bool del) :
Expand Down Expand Up @@ -233,13 +232,9 @@ CouchRequest::CouchRequest(const Item &it, uint64_t rev, CouchRequestCallback &c
start = gethrtime();
}

CouchKVStore::CouchKVStore(EventuallyPersistentEngine &theEngine,
bool read_only) :
KVStore(read_only), engine(theEngine),
epStats(theEngine.getEpStats()),
configuration(theEngine.getConfiguration()),
dbname(configuration.getDbname()),
couchNotifier(NULL), pendingCommitCnt(0),
CouchKVStore::CouchKVStore(EPStats &stats, Configuration &config, bool read_only) :
KVStore(read_only), epStats(stats), configuration(config),
dbname(configuration.getDbname()), couchNotifier(NULL), pendingCommitCnt(0),
intransaction(false), dbFileRevMapPopulated(false)
{
open();
Expand All @@ -253,8 +248,7 @@ CouchKVStore::CouchKVStore(EventuallyPersistentEngine &theEngine,
}

CouchKVStore::CouchKVStore(const CouchKVStore &copyFrom) :
KVStore(copyFrom), engine(copyFrom.engine),
epStats(copyFrom.epStats),
KVStore(copyFrom), epStats(copyFrom.epStats),
configuration(copyFrom.configuration),
dbname(copyFrom.dbname),
couchNotifier(NULL), dbFileRevMap(copyFrom.dbFileRevMap),
Expand Down Expand Up @@ -730,7 +724,7 @@ bool CouchKVStore::setVBucketState(uint16_t vbucketId, vbucket_state &vbstate,
"Warning: failed to notify CouchDB of update, "
"vbid=%u rev=%llu error=0x%x\n",
vbucketId, fileRev, lcb.val);
if (!engine.isShutdownMode()) {
if (!epStats.shutdown.isShutdown) {
closeDatabaseHandle(db);
return false;
}
Expand Down Expand Up @@ -939,7 +933,7 @@ void CouchKVStore::loadDB(shared_ptr<Callback<GetValue> > cb, bool keysOnly,
ctx.vbucketId = itr->first;
ctx.keysonly = keysOnly;
ctx.callback = cb;
ctx.engine = &engine;
ctx.stats = &epStats;
errorCode = couchstore_changes_since(db, 0, options, recordDbDumpC,
static_cast<void *>(&ctx));
if (errorCode != COUCHSTORE_SUCCESS) {
Expand Down Expand Up @@ -968,7 +962,7 @@ void CouchKVStore::open()
intransaction = false;
if (!isReadOnly()) {
delete couchNotifier;
couchNotifier = new CouchNotifier(&engine, configuration);
couchNotifier = new CouchNotifier(epStats, configuration);
}

struct stat dbstat;
Expand Down Expand Up @@ -1225,11 +1219,9 @@ int CouchKVStore::recordDbDump(Db *db, DocInfo *docinfo, void *ctx)
{
LoadResponseCtx *loadCtx = (LoadResponseCtx *)ctx;
shared_ptr<Callback<GetValue> > cb = loadCtx->callback;
EventuallyPersistentEngine *engine= loadCtx->engine;

// by setting volatile let the compiler know that the value
// can change any time by anyone
volatile bool warmup = !engine->getEpStats().warmupComplete.get();
EPStats *stats= loadCtx->stats;
volatile bool warmup = !stats->warmupComplete.get();

Doc *doc = NULL;
void *valuePtr = NULL;
Expand Down Expand Up @@ -1287,7 +1279,7 @@ int CouchKVStore::recordDbDump(Db *db, DocInfo *docinfo, void *ctx)

int returnCode = COUCHSTORE_SUCCESS;
if (warmup) {
if (engine->getEpStats().warmupComplete.get()) {
if (stats->warmupComplete.get()) {
// warmup has completed, return COUCHSTORE_ERROR_CANCEL to
// cancel remaining data dumps from couchstore
LOG(EXTENSION_LOG_WARNING,
Expand Down Expand Up @@ -1411,7 +1403,7 @@ couchstore_error_t CouchKVStore::saveDocs(uint16_t vbid, uint64_t rev, Doc **doc
return errCode;
}

if (engine.isShutdownMode()) {
if (epStats.shutdown.isShutdown) {
// shutdown is in progress, no need to notify mccouch
// the compactor must have already exited!
closeDatabaseHandle(db);
Expand Down Expand Up @@ -1440,6 +1432,12 @@ couchstore_error_t CouchKVStore::saveDocs(uint16_t vbid, uint64_t rev, Doc **doc
}
}
st.batchSize.add(docCount);

if (db && !retry_save_docs) {
DbInfo info;
couchstore_db_info(db, &info);
cachedDeleteCount[vbid] = info.deleted_count;
}
closeDatabaseHandle(db);
}
} while (retry_save_docs);
Expand Down Expand Up @@ -1695,4 +1693,12 @@ bool CouchKVStore::getEstimatedItemCount(size_t &items)
return true;
}

size_t CouchKVStore::getNumPersistedDeletes(uint16_t vbid) {
std::map<uint16_t, size_t>::iterator itr = cachedDeleteCount.find(vbid);
if (itr != cachedDeleteCount.end()) {
return itr->second;
}
return 0;
}

/* end of couch-kvstore.cc */
13 changes: 10 additions & 3 deletions src/couch-kvstore/couch-kvstore.h
Expand Up @@ -269,8 +269,7 @@ class CouchKVStore : public KVStore
* @param theEngine EventuallyPersistentEngine instance
* @param read_only flag indicating if this kvstore instance is for read-only operations
*/
CouchKVStore(EventuallyPersistentEngine &theEngine,
bool read_only = false);
CouchKVStore(EPStats &stats, Configuration &config, bool read_only = false);

/**
* Copy constructor
Expand Down Expand Up @@ -451,6 +450,13 @@ class CouchKVStore : public KVStore
*/
bool getEstimatedItemCount(size_t &items);

/**
* Get the number of deleted items that are persisted to a vbucket file
*
* @param vbid The vbucket if of the file to get the number of deletes for
*/
size_t getNumPersistedDeletes(uint16_t vbid);

/**
* Perform the pre-optimizations before persisting dirty items
*
Expand Down Expand Up @@ -535,7 +541,6 @@ class CouchKVStore : public KVStore
void setDocsCommitted(uint16_t docs);
void closeDatabaseHandle(Db *db);

EventuallyPersistentEngine &engine;
EPStats &epStats;
Configuration &configuration;
const std::string dbname;
Expand All @@ -552,6 +557,8 @@ class CouchKVStore : public KVStore
couch_file_ops statCollectingFileOps;
/* vbucket state cache*/
vbucket_map_t cachedVBStates;
/* deleted docs in each file*/
std::map<uint16_t, size_t> cachedDeleteCount;
};

#endif // SRC_COUCH_KVSTORE_COUCH_KVSTORE_H_
10 changes: 5 additions & 5 deletions src/couch-kvstore/couch-notifier.cc
Expand Up @@ -174,11 +174,11 @@ class NotifyVbucketUpdateResponseHandler: public BinaryPacketHandler {
/*
* Implementation of the member functions in the CouchNotifier class
*/
CouchNotifier::CouchNotifier(EventuallyPersistentEngine *e, Configuration &config) :
sock(INVALID_SOCKET), configuration(config), configurationError(true),
seqno(0),
CouchNotifier::CouchNotifier(EPStats &st, Configuration &config) :
sock(INVALID_SOCKET), stats(st), configuration(config),
configurationError(true), seqno(0),
currentCommand(0xff), lastSentCommand(0xff), lastReceivedCommand(0xff),
engine(e), connected(false), inSelectBucket(false)
connected(false), inSelectBucket(false)
{
memset(&sendMsg, 0, sizeof(sendMsg));
sendMsg.msg_iov = sendIov;
Expand Down Expand Up @@ -336,7 +336,7 @@ void CouchNotifier::ensureConnection()

LOG(EXTENSION_LOG_WARNING, "%s\n", rv.str().c_str());
while (!connect()) {
if (engine->isForceShutdown() && engine->isShutdownMode()) {
if (stats.forceShutdown && stats.shutdown.isShutdown) {
return ;
}

Expand Down
4 changes: 2 additions & 2 deletions src/couch-kvstore/couch-notifier.h
Expand Up @@ -131,7 +131,7 @@ class VBStateNotification {

class CouchNotifier {
public:
CouchNotifier(EventuallyPersistentEngine *engine, Configuration &config);
CouchNotifier(EPStats &st, Configuration &config);

void flush(Callback<bool> &cb);
void delVBucket(uint16_t vb, Callback<bool> &cb);
Expand Down Expand Up @@ -180,6 +180,7 @@ class CouchNotifier {

evutil_socket_t sock;

EPStats &stats;
Configuration &configuration;
bool configurationError;

Expand Down Expand Up @@ -248,7 +249,6 @@ class CouchNotifier {

Mutex mutex;
std::list<BinaryPacketHandler*> responseHandler;
EventuallyPersistentEngine *engine;
bool connected;
bool inSelectBucket;

Expand Down

0 comments on commit e3f4a40

Please sign in to comment.