Skip to content

Commit

Permalink
implementation of getl command (lock and get)
Browse files Browse the repository at this point in the history
Implements pessimistic locking for ep_engine
if an item is locked, only a cas op is allowed
The cas value returned for a locked item is set to -1

Change-Id: I90b62d5f393a58c4f00f7de36add858c357dcb1d
Reviewed-on: http://review.northscale.com:8080/49
Reviewed-by: Dustin Sallings <dustin@spy.net>
Tested-by: Dustin Sallings <dustin@spy.net>
  • Loading branch information
maniktaneja authored and dustin committed May 18, 2010
1 parent d9dd164 commit ee21db8
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 28 deletions.
1 change: 1 addition & 0 deletions Makefile.am
Expand Up @@ -12,6 +12,7 @@ ep_la_SOURCES = \
dispatcher.cc dispatcher.hh \
ep.cc ep.hh \
ep_engine.cc ep_engine.h \
ep_extension.cc ep_extension.h \
flusher.cc flusher.hh \
item.cc item.hh \
kvstore.hh \
Expand Down
12 changes: 12 additions & 0 deletions callbacks.hh
Expand Up @@ -17,6 +17,18 @@ public:
* Method called on callback.
*/
virtual void callback(RV &value) = 0;

virtual void setStatus(int status) {
myStatus = status;
}

virtual int getStatus() {
return myStatus;
}

private:

int myStatus;
};

/**
Expand Down
6 changes: 0 additions & 6 deletions configure.ac
Expand Up @@ -39,8 +39,6 @@ AC_DEFUN([AC_C_HTONLL],

AC_C_HTONLL

AC_CHECK_HEADERS([arpa/inet.h])

AS_IF([test "x$SUNCC" = "xyes"],
[
CFLAGS="-D_XOPEN_SOURCE=600 $CFLAGS"
Expand All @@ -54,10 +52,6 @@ AH_BOTTOM([
#else
#define EXPORT_FUNCTION
#endif
#if HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif
])

trymemcacheddir=""
Expand Down
45 changes: 43 additions & 2 deletions ep.cc
Expand Up @@ -122,14 +122,16 @@ void EventuallyPersistentStore::set(const Item &item, Callback<bool> &cb) {
mutation_type_t mtype = storage.set(item);
bool rv = true;

if (mtype == INVALID_CAS) {
if (mtype == INVALID_CAS || mtype == IS_LOCKED) {
rv = false;
} else if (mtype == WAS_CLEAN || mtype == NOT_FOUND) {
queueDirty(item.getKey());
if (mtype == NOT_FOUND) {
stats.curr_items.incr();
}
}

cb.setStatus((int)mtype);
cb.callback(rv);
}

Expand All @@ -140,8 +142,9 @@ void EventuallyPersistentStore::get(const std::string &key,
StoredValue *v = storage.unlocked_find(key, bucket_num);

if (v) {
// return an invalid cas value if the item is locked
GetValue rv(new Item(v->getKey(), v->getFlags(), v->getExptime(),
v->getValue(), v->getCas()));
v->getValue(), v->isLocked(ep_current_time()) ? -1 : v->getCas()));
cb.callback(rv);
} else {
GetValue rv(false);
Expand All @@ -150,6 +153,44 @@ void EventuallyPersistentStore::get(const std::string &key,
lh.unlock();
}

bool EventuallyPersistentStore::getLocked(const std::string &key,
Callback<GetValue> &cb,
rel_time_t currentTime,
uint32_t lockTimeout) {

int bucket_num = storage.bucket(key);
LockHolder lh(storage.getMutex(bucket_num));
StoredValue *v = storage.unlocked_find(key, bucket_num);

if (v) {
if (v->isLocked(currentTime)) {
GetValue rv(false);
cb.callback(rv);
lh.unlock();
return false;
}

// acquire lock and increment cas value

v->lock(currentTime + lockTimeout);

Item *it = new Item(v->getKey(), v->getFlags(), v->getExptime(),
v->getValue(), v->getCas());

it->setCas();
v->setCas(it->getCas());

GetValue rv(it);
cb.callback(rv);

} else {
GetValue rv(false);
cb.callback(rv);
}
lh.unlock();
return true;
}

bool EventuallyPersistentStore::getKeyStats(const std::string &key,
struct key_stats &kstats)
{
Expand Down
4 changes: 2 additions & 2 deletions ep.hh
Expand Up @@ -2,8 +2,6 @@
#ifndef EP_HH
#define EP_HH 1

#include "config.h"

#include <pthread.h>
#include <assert.h>
#include <stdbool.h>
Expand Down Expand Up @@ -130,6 +128,8 @@ public:

bool getKeyStats(const std::string &key, key_stats &kstats);

bool getLocked(const std::string &key, Callback<GetValue> &cb, rel_time_t currentTime, uint32_t lockTimeout);

private:
/* Queue an item to be written to persistent layer. */
void queueDirty(const std::string &key) {
Expand Down
5 changes: 3 additions & 2 deletions ep_engine.cc
@@ -1,7 +1,8 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
#include "config.h"

#include <arpa/inet.h>
#include <assert.h>

#include <memcached/engine.h>

#include "ep_engine.h"
Expand Down Expand Up @@ -331,7 +332,7 @@ extern "C" {
return ENGINE_SUCCESS;
}

static void *EvpNotifyTapIo(void*arg) {
void *EvpNotifyTapIo(void*arg) {
static_cast<EventuallyPersistentEngine*>(arg)->notifyTapIoThread();
return NULL;
}
Expand Down
44 changes: 31 additions & 13 deletions ep_engine.h
Expand Up @@ -4,6 +4,7 @@
#include "ep.hh"
#include "flusher.hh"
#include "sqlite-kvstore.hh"
#include "ep_extension.h"
#include <memcached/util.h>

#include <cstdio>
Expand All @@ -22,7 +23,7 @@ extern "C" {
ENGINE_EVENT_TYPE type,
const void *event_data,
const void *cb_data);
static void *EvpNotifyTapIo(void*arg);
void *EvpNotifyTapIo(void*arg);
}

#ifdef linux
Expand Down Expand Up @@ -320,6 +321,12 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
sleep(1);
}
}

if (ret == ENGINE_SUCCESS) {
getlExtension = new GetlExtension(backend, getServerApi);
getlExtension->initialize();
}

getLogger()->log(EXTENSION_LOG_DEBUG, NULL, "Engine init complete.\n");

return ret;
Expand Down Expand Up @@ -368,6 +375,8 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
addDeleteEvent(key);
return ENGINE_SUCCESS;
} else {
// in case of the item being locked, we should probably
// return a more relavent return code. @TODO
return ENGINE_KEY_ENOENT;
}
}
Expand Down Expand Up @@ -395,6 +404,7 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
} else {
return ENGINE_KEY_ENOENT;
}

}

ENGINE_ERROR_CODE getStats(const void* cookie,
Expand Down Expand Up @@ -435,7 +445,8 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
// FALLTHROUGH
case OPERATION_SET:
backend->set(*it, callback);
if (callback.getValue()) {
if (callback.getValue() &&
((mutation_type_t)callback.getStatus() != IS_LOCKED)) {
*cas = it->getCas();
addMutationEvent(it);
ret = ENGINE_SUCCESS;
Expand All @@ -451,6 +462,10 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
ret = ENGINE_NOT_STORED;
} else {
backend->set(*it, callback);
// unable to set if the key is locked
if ((mutation_type_t)callback.getStatus() == IS_LOCKED) {
return ENGINE_KEY_EEXISTS;
}
*cas = it->getCas();
addMutationEvent(it);
ret = ENGINE_SUCCESS;
Expand All @@ -462,6 +477,10 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
if (get(cookie, &i, it->getKey().c_str(), it->getNKey()) == ENGINE_SUCCESS) {
itemRelease(cookie, i);
backend->set(*it, callback);
// unable to set if the key is locked
if ((mutation_type_t)callback.getStatus() == IS_LOCKED) {
return ENGINE_KEY_EEXISTS;
}
*cas = it->getCas();
addMutationEvent(it);
ret = ENGINE_SUCCESS;
Expand Down Expand Up @@ -537,13 +556,12 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
}
}


std::stringstream vals;
vals << val << "\r\n";
size_t nb = vals.str().length();
char value[80];
size_t nb = snprintf(value, sizeof(value), "%llu\r\n",
(unsigned long long)val);
*result = val;
Item *nit = new Item(key, (uint16_t)nkey, item->getFlags(),
exptime, vals.str().c_str(), nb);
exptime, value, nb);
nit->setCas(item->getCas());
ret = store(cookie, nit, cas, OPERATION_CAS);
delete nit;
Expand All @@ -553,13 +571,11 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {

delete item;
} else if (ret == ENGINE_KEY_ENOENT && create) {
std::stringstream vals;
vals << initial << "\r\n";
size_t nb = vals.str().length();

char value[80];
size_t nb = snprintf(value, sizeof(value), "%llu\r\n",
(unsigned long long)initial);
*result = initial;
Item *item = new Item(key, (uint16_t)nkey, 0, exptime,
vals.str().c_str(), nb);
Item *item = new Item(key, (uint16_t)nkey, 0, exptime, value, nb);
ret = store(cookie, item, cas, OPERATION_ADD);
delete item;
}
Expand Down Expand Up @@ -844,6 +860,7 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
~EventuallyPersistentEngine() {
delete epstore;
delete sqliteDb;
delete getlExtension;
}

engine_info *getInfo() {
Expand Down Expand Up @@ -1227,6 +1244,7 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
engine_info info;
char buffer[sizeof(engine_info) + 10 * sizeof(feature_info) ];
} info;
GetlExtension *getlExtension;
};

class BackFillVisitor : public HashTableVisitor {
Expand Down

0 comments on commit ee21db8

Please sign in to comment.