Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

MB-5010 Reset the thread local data whenever leaving a bucket.

A thread local data is set to a bucket pointer when a given
thread enters into that bucket for supporting the memory
accounting per bucket. This means that the thread local data
should be reset when the thread leaves from its bucket.

Conflicts:

        ep_engine.cc

Change-Id: I02124045c4503bf5d860e1063ab4d0aa11fd43b1
Reviewed-on: http://review.couchbase.org/14773
Tested-by: Chiyoung Seo <chiyoung.seo@gmail.com>
Reviewed-by: Michael Wiederhold <mike@couchbase.com>
Reviewed-on: http://review.couchbase.org/14919
  • Loading branch information...
commit 5ff0677d102335f4b31ae50795766e2606036fcd 1 parent 8c7a79a
@chiyoung chiyoung authored
View
369 ep_engine.cc
@@ -58,6 +58,32 @@ static inline EventuallyPersistentEngine* getHandle(ENGINE_HANDLE* handle)
return ret;
}
+static inline void releaseHandle(ENGINE_HANDLE* handle) {
+ (void) handle;
+ ObjectRegistry::onSwitchThread(NULL);
+}
+
+/**
+ * Call the response callback and return the appropriate value so that
+ * the core knows what to do..
+ */
+static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
+ uint16_t keylen,
+ const void *ext, uint8_t extlen,
+ const void *body, uint32_t bodylen,
+ uint8_t datatype, uint16_t status,
+ uint64_t cas, const void *cookie)
+{
+ ENGINE_ERROR_CODE rv = ENGINE_FAILED;
+ EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
+ if (response(key, keylen, ext, extlen, body, bodylen, datatype,
+ status, cas, cookie)) {
+ rv = ENGINE_SUCCESS;
+ }
+ ObjectRegistry::onSwitchThread(e);
+ return rv;
+}
+
void LookupCallback::callback(GetValue &value) {
if (value.getStatus() == ENGINE_SUCCESS) {
engine->addLookupResult(cookie, value.getValue());
@@ -79,19 +105,24 @@ extern "C" {
static const engine_info* EvpGetInfo(ENGINE_HANDLE* handle)
{
- return getHandle(handle)->getInfo();
+ engine_info* info = getHandle(handle)->getInfo();
+ releaseHandle(handle);
+ return info;
}
static ENGINE_ERROR_CODE EvpInitialize(ENGINE_HANDLE* handle,
const char* config_str)
{
- return getHandle(handle)->initialize(config_str);
+ ENGINE_ERROR_CODE err_code = getHandle(handle)->initialize(config_str);
+ releaseHandle(handle);
+ return err_code;
}
static void EvpDestroy(ENGINE_HANDLE* handle, const bool force)
{
getHandle(handle)->destroy(force);
delete getHandle(handle);
+ releaseHandle(NULL);
}
static ENGINE_ERROR_CODE EvpItemAllocate(ENGINE_HANDLE* handle,
@@ -103,8 +134,11 @@ extern "C" {
const int flags,
const rel_time_t exptime)
{
- return getHandle(handle)->itemAllocate(cookie, itm, key,
- nkey, nbytes, flags, exptime);
+ ENGINE_ERROR_CODE err_code = getHandle(handle)->itemAllocate(cookie, itm, key,
+ nkey, nbytes, flags,
+ exptime);
+ releaseHandle(handle);
+ return err_code;
}
static ENGINE_ERROR_CODE EvpItemDelete(ENGINE_HANDLE* handle,
@@ -114,7 +148,10 @@ extern "C" {
uint64_t cas,
uint16_t vbucket)
{
- return getHandle(handle)->itemDelete(cookie, key, nkey, cas, vbucket);
+ ENGINE_ERROR_CODE err_code = getHandle(handle)->itemDelete(cookie, key, nkey,
+ cas, vbucket);
+ releaseHandle(handle);
+ return err_code;
}
static void EvpItemRelease(ENGINE_HANDLE* handle,
@@ -122,6 +159,7 @@ extern "C" {
item* itm)
{
getHandle(handle)->itemRelease(cookie, itm);
+ releaseHandle(handle);
}
static ENGINE_ERROR_CODE EvpGet(ENGINE_HANDLE* handle,
@@ -131,7 +169,9 @@ extern "C" {
const int nkey,
uint16_t vbucket)
{
- return getHandle(handle)->get(cookie, itm, key, nkey, vbucket);
+ ENGINE_ERROR_CODE err_code = getHandle(handle)->get(cookie, itm, key, nkey, vbucket);
+ releaseHandle(handle);
+ return err_code;
}
static ENGINE_ERROR_CODE EvpGetStats(ENGINE_HANDLE* handle,
@@ -140,7 +180,10 @@ extern "C" {
int nkey,
ADD_STAT add_stat)
{
- return getHandle(handle)->getStats(cookie, stat_key, nkey, add_stat);
+ ENGINE_ERROR_CODE err_code = getHandle(handle)->getStats(cookie, stat_key, nkey,
+ add_stat);
+ releaseHandle(handle);
+ return err_code;
}
static ENGINE_ERROR_CODE EvpStore(ENGINE_HANDLE* handle,
@@ -150,7 +193,10 @@ extern "C" {
ENGINE_STORE_OPERATION operation,
uint16_t vbucket)
{
- return getHandle(handle)->store(cookie, itm, cas, operation, vbucket);
+ ENGINE_ERROR_CODE err_code = getHandle(handle)->store(cookie, itm, cas, operation,
+ vbucket);
+ releaseHandle(handle);
+ return err_code;
}
static ENGINE_ERROR_CODE EvpArithmetic(ENGINE_HANDLE* handle,
@@ -166,20 +212,25 @@ extern "C" {
uint64_t *result,
uint16_t vbucket)
{
- return getHandle(handle)->arithmetic(cookie, key, nkey, increment,
- create, delta, initial, exptime,
- cas, result, vbucket);
+ ENGINE_ERROR_CODE ecode = getHandle(handle)->arithmetic(cookie, key, nkey, increment,
+ create, delta, initial,
+ exptime, cas, result, vbucket);
+ releaseHandle(handle);
+ return ecode;
}
static ENGINE_ERROR_CODE EvpFlush(ENGINE_HANDLE* handle,
const void* cookie, time_t when)
{
- return getHandle(handle)->flush(cookie, when);
+ ENGINE_ERROR_CODE err_code = getHandle(handle)->flush(cookie, when);
+ releaseHandle(handle);
+ return err_code;
}
static void EvpResetStats(ENGINE_HANDLE* handle, const void *)
{
getHandle(handle)->resetStats();
+ releaseHandle(handle);
}
static protocol_binary_response_status stopFlusher(EventuallyPersistentEngine *e,
@@ -598,8 +649,8 @@ extern "C" {
// Return invalid if no key or observe set is specified
if (keylen == 0 || (bodylen - keylen - extlen) == 0) {
- response(NULL, 0, NULL, 0, "", 0, PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
+ sendResponse(response, NULL, 0, NULL, 0, "", 0, PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
return ENGINE_FAILED;
}
@@ -625,8 +676,8 @@ extern "C" {
// Return invalid if no key or observe set is specified
if (keylen == 0 || (bodylen - keylen) == 0) {
- response(NULL, 0, NULL, 0, "", 0, PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
+ sendResponse(response, NULL, 0, NULL, 0, "", 0, PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
return ENGINE_FAILED;
}
@@ -653,16 +704,15 @@ extern "C" {
RCPtr<VBucket> vb = e->getVBucket(vbucket);
if (!vb) {
const std::string msg("That's not my bucket.");
- response(NULL, 0, NULL, 0, msg.c_str(), msg.length(),
- PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0, cookie);
+ return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(), msg.length(),
+ PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0, cookie);
} else {
vbucket_state_t state = (vbucket_state_t)ntohl(vb->getState());
- response(NULL, 0, NULL, 0, &state, sizeof(state),
- PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
+ return sendResponse(response, NULL, 0, NULL, 0, &state, sizeof(state),
+ PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
}
- return ENGINE_SUCCESS;
}
static ENGINE_ERROR_CODE setVBucket(EventuallyPersistentEngine *e,
@@ -677,9 +727,9 @@ extern "C" {
- ntohs(req->message.header.request.keylen);
if (bodylen != sizeof(vbucket_state_t)) {
const std::string msg("Incorrect packet format");
- response(NULL, 0, NULL, 0, msg.c_str(), msg.length(),
- PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
+ sendResponse(response, NULL, 0, NULL, 0, msg.c_str(), msg.length(),
+ PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
}
vbucket_state_t state;
@@ -688,16 +738,15 @@ extern "C" {
if (!is_valid_vbucket_state_t(state)) {
const std::string msg("Invalid vbucket state");
- response(NULL, 0, NULL, 0, msg.c_str(), msg.length(),
- PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
+ sendResponse(response, NULL, 0, NULL, 0, msg.c_str(), msg.length(),
+ PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
}
e->setVBucketState(ntohs(req->message.header.request.vbucket), state);
- response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
-
- return ENGINE_SUCCESS;
+ return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
+ PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
}
static ENGINE_ERROR_CODE delVBucket(EventuallyPersistentEngine *e,
@@ -706,30 +755,29 @@ extern "C" {
ADD_RESPONSE response) {
uint16_t vbucket = ntohs(req->request.vbucket);
if (e->deleteVBucket(vbucket)) {
- response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
+ return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
+ PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
} else {
// If we fail to delete, try to figure out why.
RCPtr<VBucket> vb = e->getVBucket(vbucket);
if (!vb) {
const std::string msg("Failed to delete vbucket. Bucket not found.");
- response(NULL, 0, NULL, 0, msg.c_str(), msg.length(),
- PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0, cookie);
+ return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(), msg.length(),
+ PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0, cookie);
} else if(vb->getState() != vbucket_state_dead) {
const std::string msg("Failed to delete vbucket. Must be in the dead state.");
- response(NULL, 0, NULL, 0, msg.c_str(), msg.length(),
- PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
+ return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(), msg.length(),
+ PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
} else {
const std::string msg("Failed to delete vbucket. Unknown reason.");
- response(NULL, 0, NULL, 0, msg.c_str(), msg.length(),
- PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0, cookie);
+ return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(), msg.length(),
+ PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0, cookie);
}
}
-
- return ENGINE_SUCCESS;
}
static ENGINE_ERROR_CODE getReplicaCmd(EventuallyPersistentEngine *e,
@@ -770,13 +818,12 @@ extern "C" {
protocol_binary_request_header *request,
ADD_RESPONSE response)
{
- protocol_binary_response_status res =
- PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
+ EventuallyPersistentEngine *h = getHandle(handle);
+ protocol_binary_response_status res = PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
const char *msg = NULL;
size_t msg_size = 0;
Item *itm = NULL;
- EventuallyPersistentEngine *h = getHandle(handle);
EPStats &stats = h->getEpStats();
ENGINE_ERROR_CODE rv = ENGINE_SUCCESS;
@@ -784,32 +831,40 @@ extern "C" {
case PROTOCOL_BINARY_CMD_GET_VBUCKET:
{
BlockTimer timer(&stats.getVbucketCmdHisto);
- return getVBucket(h, cookie, request, response);
+ rv = getVBucket(h, cookie, request, response);
+ releaseHandle(handle);
+ return rv;
}
-
case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
{
BlockTimer timer(&stats.delVbucketCmdHisto);
- return delVBucket(h, cookie, request, response);
+ rv = delVBucket(h, cookie, request, response);
+ releaseHandle(handle);
+ return rv;
}
- break;
-
case PROTOCOL_BINARY_CMD_SET_VBUCKET:
{
BlockTimer timer(&stats.setVbucketCmdHisto);
- return setVBucket(h, cookie, request, response);
+ rv = setVBucket(h, cookie, request, response);
+ releaseHandle(handle);
+ return rv;
}
- break;
case PROTOCOL_BINARY_CMD_TOUCH:
case PROTOCOL_BINARY_CMD_GAT:
case PROTOCOL_BINARY_CMD_GATQ:
- return h->touch(cookie, request, response);
-
+ {
+ rv = h->touch(cookie, request, response);
+ releaseHandle(handle);
+ return rv;
+ }
case CMD_RESTORE_FILE:
case CMD_RESTORE_ABORT:
case CMD_RESTORE_COMPLETE:
- return h->handleRestoreCmd(cookie, request, response);
-
+ {
+ rv = h->handleRestoreCmd(cookie, request, response);
+ releaseHandle(handle);
+ return rv;
+ }
case CMD_STOP_PERSISTENCE:
res = stopFlusher(h, &msg, &msg_size);
break;
@@ -827,6 +882,7 @@ extern "C" {
rv = getLocked(h, request, cookie, &itm, &msg, &msg_size, &res);
if (rv == ENGINE_EWOULDBLOCK) {
// we dont have the value for the item yet
+ releaseHandle(handle);
return rv;
}
break;
@@ -834,40 +890,70 @@ extern "C" {
res = unlockKey(h, request, &msg, &msg_size);
break;
case CMD_OBSERVE:
- return observeCmd(h, request, cookie, response);
- break;
+ {
+ rv = observeCmd(h, request, cookie, response);
+ releaseHandle(handle);
+ return rv;
+ }
case CMD_UNOBSERVE:
- return unobserveCmd(h, request, cookie, response);
- break;
+ {
+ rv = unobserveCmd(h, request, cookie, response);
+ releaseHandle(handle);
+ return rv;
+ }
case CMD_DEREGISTER_TAP_CLIENT:
- return h->deregisterTapClient(cookie, request, response);
- break;
+ {
+ rv = h->deregisterTapClient(cookie, request, response);
+ releaseHandle(handle);
+ return rv;
+ }
case CMD_RESET_REPLICATION_CHAIN:
- return h->resetReplicationChain(cookie, request, response);
+ {
+ rv = h->resetReplicationChain(cookie, request, response);
+ releaseHandle(handle);
+ return rv;
+ }
case CMD_LAST_CLOSED_CHECKPOINT:
case CMD_CREATE_CHECKPOINT:
case CMD_EXTEND_CHECKPOINT:
- return h->handleCheckpointCmds(cookie, request, response);
+ {
+ rv = h->handleCheckpointCmds(cookie, request, response);
+ releaseHandle(handle);
+ return rv;
+ }
case CMD_GET_META:
case CMD_GETQ_META:
- return h->getMeta(cookie,
- reinterpret_cast<protocol_binary_request_get_meta*>(request),
- response);
+ {
+ rv = h->getMeta(cookie,
+ reinterpret_cast<protocol_binary_request_get_meta*>(request),
+ response);
+ releaseHandle(handle);
+ return rv;
+ }
case CMD_SET_WITH_META:
case CMD_SETQ_WITH_META:
case CMD_ADD_WITH_META:
case CMD_ADDQ_WITH_META:
- return h->setWithMeta(cookie,
- reinterpret_cast<protocol_binary_request_set_with_meta*>(request),
- response);
+ {
+ rv = h->setWithMeta(cookie,
+ reinterpret_cast<protocol_binary_request_set_with_meta*>(request),
+ response);
+ releaseHandle(handle);
+ return rv;
+ }
case CMD_DEL_WITH_META:
case CMD_DELQ_WITH_META:
- return h->deleteWithMeta(cookie,
- reinterpret_cast<protocol_binary_request_delete_with_meta*>(request),
- response);
+ {
+ rv = h->deleteWithMeta(cookie,
+ reinterpret_cast<protocol_binary_request_delete_with_meta*>(request),
+ response);
+ releaseHandle(handle);
+ return rv;
+ }
case CMD_GET_REPLICA:
rv = getReplicaCmd(h, request, cookie, &itm, &msg, &res);
if (rv != ENGINE_SUCCESS) {
+ releaseHandle(handle);
return rv;
}
break;
@@ -876,37 +962,35 @@ extern "C" {
// Send a special response for getl since we don't want to send the key
if (itm && request->request.opcode == CMD_GET_LOCKED) {
uint32_t flags = itm->getFlags();
-
- response(NULL, 0, (const void *)&flags, sizeof(uint32_t),
- static_cast<const void *>(itm->getData()),
- itm->getNBytes(),
- PROTOCOL_BINARY_RAW_BYTES,
- static_cast<uint16_t>(res), itm->getCas(),
- cookie);
+ rv = sendResponse(response, NULL, 0, (const void *)&flags, sizeof(uint32_t),
+ static_cast<const void *>(itm->getData()),
+ itm->getNBytes(),
+ PROTOCOL_BINARY_RAW_BYTES,
+ static_cast<uint16_t>(res), itm->getCas(),
+ cookie);
delete itm;
} else if (itm) {
std::string key = itm->getKey();
uint32_t flags = itm->getFlags();
-
- response(static_cast<const void *>(key.data()),
- itm->getNKey(),
- (const void *)&flags, sizeof(uint32_t),
- static_cast<const void *>(itm->getData()),
- itm->getNBytes(),
- PROTOCOL_BINARY_RAW_BYTES,
- static_cast<uint16_t>(res), itm->getCas(),
- cookie);
+ rv = sendResponse(response, static_cast<const void *>(key.data()),
+ itm->getNKey(),
+ (const void *)&flags, sizeof(uint32_t),
+ static_cast<const void *>(itm->getData()),
+ itm->getNBytes(),
+ PROTOCOL_BINARY_RAW_BYTES,
+ static_cast<uint16_t>(res), itm->getCas(),
+ cookie);
delete itm;
} else {
-
msg_size = (msg_size > 0 || msg == NULL) ? msg_size : strlen(msg);
- response(NULL, 0, NULL, 0,
- msg, static_cast<uint16_t>(msg_size),
- PROTOCOL_BINARY_RAW_BYTES,
- static_cast<uint16_t>(res), 0, cookie);
+ rv = sendResponse(response, NULL, 0, NULL, 0,
+ msg, static_cast<uint16_t>(msg_size),
+ PROTOCOL_BINARY_RAW_BYTES,
+ static_cast<uint16_t>(res), 0, cookie);
}
- return ENGINE_SUCCESS;
+ releaseHandle(handle);
+ return rv;
}
static void EvpItemSetCas(ENGINE_HANDLE* , const void *,
@@ -931,11 +1015,12 @@ extern "C" {
size_t ndata,
uint16_t vbucket)
{
- return getHandle(handle)->tapNotify(cookie, engine_specific, nengine,
- ttl, tap_flags, tap_event,
- tap_seqno, key, nkey, flags,
- exptime, cas, data, ndata,
+ ENGINE_ERROR_CODE err_code = getHandle(handle)->tapNotify(cookie, engine_specific,
+ nengine, ttl, tap_flags, tap_event, tap_seqno,
+ key, nkey, flags, exptime, cas, data, ndata,
vbucket);
+ releaseHandle(handle);
+ return err_code;
}
static tap_event_t EvpTapIterator(ENGINE_HANDLE* handle,
@@ -943,8 +1028,10 @@ extern "C" {
void **es, uint16_t *nes, uint8_t *ttl,
uint16_t *flags, uint32_t *seqno,
uint16_t *vbucket) {
- return getHandle(handle)->walkTapQueue(cookie, itm, es, nes, ttl,
- flags, seqno, vbucket);
+ tap_event_t tap_event = getHandle(handle)->walkTapQueue(cookie, itm, es, nes, ttl,
+ flags, seqno, vbucket);
+ releaseHandle(handle);
+ return tap_event;
}
static TAP_ITERATOR EvpGetTapIterator(ENGINE_HANDLE* handle,
@@ -953,14 +1040,17 @@ extern "C" {
size_t nclient,
uint32_t flags,
const void* userdata,
- size_t nuserdata) {
+ size_t nuserdata)
+ {
+ EventuallyPersistentEngine *h = getHandle(handle);
std::string c(static_cast<const char*>(client), nclient);
// Figure out what we want from the userdata before adding it to the API
// to the handle
- if (getHandle(handle)->createTapQueue(cookie, c, flags,
- userdata, nuserdata)) {
+ if (h->createTapQueue(cookie, c, flags, userdata, nuserdata)) {
+ releaseHandle(handle);
return EvpTapIterator;
} else {
+ releaseHandle(handle);
return NULL;
}
}
@@ -974,6 +1064,7 @@ extern "C" {
assert(event_data == NULL);
void *c = const_cast<void*>(cb_data);
getHandle(static_cast<ENGINE_HANDLE*>(c))->handleDisconnect(cookie);
+ releaseHandle(static_cast<ENGINE_HANDLE*>(c));
}
@@ -1037,25 +1128,6 @@ extern "C" {
}
} // C linkage
-/**
- * Call the response callback and return the appropriate value so that
- * the core knows what to do..
- */
-static ENGINE_ERROR_CODE sendResponse(ADD_RESPONSE response, const void *key,
- uint16_t keylen,
- const void *ext, uint8_t extlen,
- const void *body, uint32_t bodylen,
- uint8_t datatype, uint16_t status,
- uint64_t cas, const void *cookie)
-{
- if (response(key, keylen, ext, extlen, body, bodylen, datatype,
- status, cas, cookie)) {
- return ENGINE_SUCCESS;
- } else {
- return ENGINE_FAILED;
- }
-}
-
static SERVER_EXTENSION_API *extensionApi;
EXTENSION_LOGGER_DESCRIPTOR *getLogger(void) {
@@ -3278,8 +3350,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::touch(const void *cookie,
ADD_RESPONSE response)
{
if (request->request.extlen != 4 || request->request.keylen == 0) {
- return sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
- PROTOCOL_BINARY_RAW_BYTES,
+ return sendResponse(response, NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
}
@@ -3298,38 +3369,30 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::touch(const void *cookie,
exptime));
ENGINE_ERROR_CODE rv = gv.getStatus();
if (rv == ENGINE_SUCCESS) {
- bool ret;
Item *it = gv.getValue();
if (request->request.opcode == PROTOCOL_BINARY_CMD_TOUCH) {
- ret = response(NULL, 0, NULL, 0, NULL, 0,
- PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
+ rv = sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
+ PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
} else {
uint32_t flags = it->getFlags();
- ret = response(NULL, 0, &flags, sizeof(flags),
- it->getData(), it->getNBytes(),
- PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_SUCCESS, it->getCas(),
- cookie);
+ rv = sendResponse(response, NULL, 0, &flags, sizeof(flags),
+ it->getData(), it->getNBytes(),
+ PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_SUCCESS, it->getCas(),
+ cookie);
}
delete it;
- if (ret) {
- rv = ENGINE_SUCCESS;
- } else {
- rv = ENGINE_FAILED;
- }
} else if (rv == ENGINE_KEY_ENOENT) {
if (request->request.opcode == PROTOCOL_BINARY_CMD_GATQ) {
// GATQ should not return response upon cache miss
rv = ENGINE_SUCCESS;
} else {
- rv = sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
- PROTOCOL_BINARY_RAW_BYTES,
+ rv = sendResponse(response, NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0, cookie);
}
} else if (rv == ENGINE_NOT_MY_VBUCKET) {
- rv = sendResponse(response, NULL, 0, NULL, 0, NULL, 0,
- PROTOCOL_BINARY_RAW_BYTES,
+ rv = sendResponse(response, NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0, cookie);
}
@@ -3379,12 +3442,9 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::handleRestoreCmd(const void *cooki
LockHolder lh(restore.mutex);
if (restore.manager == NULL) { // we need another "mode" variable
std::string msg = "Restore mode is not enabled.";
- if (response(NULL, 0, NULL, 0, msg.c_str(), msg.length(),
- PROTOCOL_BINARY_RAW_BYTES,
- PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0, cookie)) {
- return ENGINE_SUCCESS;
- }
- return ENGINE_FAILED;
+ return sendResponse(response, NULL, 0, NULL, 0, msg.c_str(), msg.length(),
+ PROTOCOL_BINARY_RAW_BYTES,
+ PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0, cookie);
}
if (request->request.opcode == CMD_RESTORE_FILE) {
@@ -3393,8 +3453,7 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::handleRestoreCmd(const void *cooki
try {
restore.manager->initialize(filename);
} catch (std::string e) {
- return sendResponse(response, NULL, 0, NULL, 0, e.c_str(),
- e.length(),
+ return sendResponse(response, NULL, 0, NULL, 0, e.c_str(), e.length(),
PROTOCOL_BINARY_RAW_BYTES,
PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0, cookie);
}
View
10 objectregistry.cc
@@ -107,9 +107,15 @@ void ObjectRegistry::onDeleteItem(Item *pItem)
}
}
-void ObjectRegistry::onSwitchThread(EventuallyPersistentEngine *engine)
+EventuallyPersistentEngine *ObjectRegistry::onSwitchThread(EventuallyPersistentEngine *engine,
+ bool want_old_thread_local)
{
- th->set(engine);
+ EventuallyPersistentEngine *old_engine = NULL;
+ if (want_old_thread_local) {
+ old_engine = th->get();
+ }
+ th->set(engine);
+ return old_engine;
}
bool ObjectRegistry::memoryAllocated(size_t mem) {
View
3  objectregistry.hh
@@ -32,7 +32,8 @@ public:
static void onCreateItem(Item *pItem);
static void onDeleteItem(Item *pItem);
- static void onSwitchThread(EventuallyPersistentEngine *engine);
+ static EventuallyPersistentEngine *onSwitchThread(EventuallyPersistentEngine *engine,
+ bool want_old_thread_local = false);
static bool memoryAllocated(size_t mem);
static bool memoryDeallocated(size_t mem);
View
4 restore.hh
@@ -22,6 +22,7 @@
#include <memcached/engine.h>
class EventuallyPersistentEngine;
+class ObjectRegistry;
/**
* The restore manager is responsible for running the restore of a set of
@@ -98,9 +99,12 @@ protected:
std::stringstream value;
value << val;
std::string n = name.str();
+
+ EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
add_stat(n.data(), static_cast<uint16_t>(n.length()),
value.str().data(), static_cast<uint32_t>(value.str().length()),
cookie);
+ ObjectRegistry::onSwitchThread(e);
}
void addStat(const char *nm, bool val, ADD_STAT add_stat, const void *c) {
View
3  statwriter.hh
@@ -5,14 +5,17 @@
#include <memcached/engine.h>
#include <memcached/protocol_binary.h>
+#include "ep_engine.h"
#include "histo.hh"
namespace STATWRITER_NAMESPACE {
inline void add_casted_stat(const char *k, const char *v,
ADD_STAT add_stat, const void *cookie) {
+ EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
add_stat(k, static_cast<uint16_t>(strlen(k)),
v, static_cast<uint32_t>(strlen(v)), cookie);
+ ObjectRegistry::onSwitchThread(e);
}
template <typename T>
View
3  tapconnection.hh
@@ -335,9 +335,12 @@ protected:
std::stringstream value;
value << val;
std::string n = tap.str();
+
+ EventuallyPersistentEngine *e = ObjectRegistry::onSwitchThread(NULL, true);
add_stat(n.data(), static_cast<uint16_t>(n.length()),
value.str().data(), static_cast<uint32_t>(value.str().length()),
c);
+ ObjectRegistry::onSwitchThread(e);
}
void addStat(const char *nm, bool val, ADD_STAT add_stat, const void *c) {
Please sign in to comment.
Something went wrong with that request. Please try again.