Skip to content

Commit

Permalink
Implemented SYNC on replication
Browse files Browse the repository at this point in the history
First version of the SYNC on replication only operation.
SYNC on "replication AND persistence" and SYNC on "replication OR
persistence" are still to be done.

Change-Id: I12e34f74d525910812f043eda8c23e12202b976e
Reviewed-on: http://review.membase.org/4787
Tested-by: Chiyoung Seo <chiyoung.seo@gmail.com>
Reviewed-by: Chiyoung Seo <chiyoung.seo@gmail.com>
  • Loading branch information
fdmanana authored and chiyoung committed Mar 6, 2011
1 parent af5959c commit 78fdf39
Show file tree
Hide file tree
Showing 8 changed files with 352 additions and 28 deletions.
17 changes: 14 additions & 3 deletions callbacks.hh
Expand Up @@ -7,17 +7,20 @@
#include "locks.hh"

class Item;
class StoredValue;

/**
* Value for callback for GET operations.
*/
class GetValue {
public:
GetValue() : value(NULL), id(-1), vb_version(-1), status(ENGINE_KEY_ENOENT) { }
GetValue() : value(NULL), storedValue(NULL), id(-1),
vb_version(-1), status(ENGINE_KEY_ENOENT) { }

explicit GetValue(Item *v, ENGINE_ERROR_CODE s=ENGINE_SUCCESS,
uint64_t i = -1, uint16_t vbucket_version = -1) :
value(v), id(i), vb_version(vbucket_version), status(s) { }
uint64_t i = -1, uint16_t vbucket_version = -1,
StoredValue *sv = NULL) :
value(v), storedValue(sv), id(i), vb_version(vbucket_version), status(s) { }

/**
* The value retrieved for the key.
Expand All @@ -39,9 +42,17 @@ public:
*/
uint16_t getVBucketVersion() { return vb_version; }

/**
* Get the StoredValue instance associated with the item (if applicable).
*/
StoredValue* getStoredValue() const {
return storedValue;
}

private:

Item* value;
StoredValue* storedValue;
uint64_t id;
uint16_t vb_version;
ENGINE_ERROR_CODE status;
Expand Down
11 changes: 6 additions & 5 deletions command_ids.h
Expand Up @@ -33,10 +33,11 @@
/*
* IDs for the events of the SYNC command.
*/
#define SYNC_PERSISTED_EVENT 1
#define SYNC_MODIFIED_EVENT 2
#define SYNC_DELETED_EVENT 3
#define SYNC_INVALID_KEY 4
#define SYNC_INVALID_CAS 5
#define SYNC_PERSISTED_EVENT 1
#define SYNC_MODIFIED_EVENT 2
#define SYNC_DELETED_EVENT 3
#define SYNC_REPLICATED_EVENT 4
#define SYNC_INVALID_KEY 5
#define SYNC_INVALID_CAS 6

#endif /* COMMAND_IDS_H */
4 changes: 2 additions & 2 deletions ep.cc
Expand Up @@ -903,7 +903,7 @@ GetValue EventuallyPersistentStore::get(const std::string &key,
bgFetch(key, vbucket, vbuckets.getBucketVersion(vbucket),
v->getId(), cookie);
}
return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getId());
return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getId(), -1, v);
}

// return an invalid cas value if the item is locked
Expand All @@ -912,7 +912,7 @@ GetValue EventuallyPersistentStore::get(const std::string &key,
: v->getCas();
GetValue rv(new Item(v->getKey(), v->getFlags(), v->getExptime(),
v->getValue(), icas, v->getId(), vbucket),
ENGINE_SUCCESS, v->getId());
ENGINE_SUCCESS, v->getId(), -1, v);
return rv;
} else {
GetValue rv;
Expand Down
31 changes: 27 additions & 4 deletions ep_engine.cc
Expand Up @@ -1521,13 +1521,22 @@ inline tap_event_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie
GetValue gv(epstore->get(item->getKey(), item->getVBucketId(),
cookie, false));
if (gv.getStatus() == ENGINE_SUCCESS) {
*itm = gv.getValue();
delete item;
*itm = item = gv.getValue();
} else {
*itm = item;
}
*vbucket = static_cast<Item*>(*itm)->getVBucketId();

if (gv.getStoredValue() != NULL) {
gv.getStoredValue()->incrementNumReplicas();
syncRegistry.itemReplicated(*item);
} else {
getLogger()->log(EXTENSION_LOG_WARNING, NULL,
"NULL StoredValue* for key %s, vbucket %d",
item->getKey().c_str(), item->getVBucketId());
}

if (!connection->vbucketFilter(*vbucket)) {
// We were going to use the item that we received from
// disk, but the filter says not to, so we need to get rid
Expand Down Expand Up @@ -1555,9 +1564,13 @@ inline tap_event_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie
false, false));
ENGINE_ERROR_CODE r = gv.getStatus();
if (r == ENGINE_SUCCESS) {
assert(gv.getStoredValue() != NULL);
*itm = gv.getValue();
ret = TAP_MUTATION;

gv.getStoredValue()->incrementNumReplicas();
syncRegistry.itemReplicated(*gv.getValue());

++stats.numTapFGFetched;
++connection->queueDrain;
} else if (r == ENGINE_KEY_ENOENT) {
Expand Down Expand Up @@ -3182,7 +3195,17 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::sync(std::set<key_spec_t> *keys,
syncRegistry.addMutationListener(syncListener);
break;
case REP:
// TODO
{
syncRegistry.addReplicationListener(syncListener);

std::vector< std::pair<StoredValue*, uint16_t> >::iterator itv;
for (itv = storedValues.begin(); itv != storedValues.end(); itv++) {
StoredValue *sv = itv->first;
key_spec_t keyspec(sv->getCas(), itv->second, sv->getKey());

syncListener->keySynced(keyspec, sv->getNumReplicas());
}
}
break;
case REP_OR_PERSIST:
// TODO
Expand Down Expand Up @@ -3240,7 +3263,7 @@ static void assembleSyncResponse(std::stringstream &resp, SyncListener *syncList
nkeys += syncListener->getDeletedKeys().size();
break;
case REP:
// TODO
nkeys += syncListener->getReplicatedKeys().size();
break;
case REP_OR_PERSIST:
// TODO
Expand All @@ -3265,7 +3288,7 @@ static void assembleSyncResponse(std::stringstream &resp, SyncListener *syncList
addSyncKeySpecs(resp, syncListener->getDeletedKeys(), SYNC_DELETED_EVENT);
break;
case REP:
// TODO
addSyncKeySpecs(resp, syncListener->getReplicatedKeys(), SYNC_REPLICATED_EVENT);
break;
case REP_OR_PERSIST:
// TODO
Expand Down
190 changes: 190 additions & 0 deletions ep_testsuite.cc
Expand Up @@ -794,6 +794,14 @@ typedef struct {
uint32_t wait;
} set_key_thread_params;

/**
 * Arguments handed to tap_stream_thread() through the void* parameter
 * of pthread_create().
 */
struct tap_stream_thread_params {
    /// Engine handle passed to every engine call made by the thread.
    ENGINE_HANDLE *h;
    /// V1 engine interface used to open the tap iterator and release items.
    ENGINE_HANDLE_V1 *h1;
    /// Keys the thread waits to receive as TAP mutations; their vbucket
    /// ids also make up the vbucket filter. Only read by the thread.
    std::set<key_spec_t> *expectedKeys;
    /// Name given to the tap stream when the iterator is requested.
    std::string streamName;
    /// Delay, in microseconds, before the thread connects its stream.
    uint32_t wait;
};

extern "C" {
static void* conc_del_set_thread(void *arg) {
struct handle_pair *hp = static_cast<handle_pair *>(arg);
Expand Down Expand Up @@ -847,6 +855,96 @@ extern "C" {

return NULL;
}

/**
 * Thread body that acts as one TAP consumer for the SYNC on replication
 * test.
 *
 * After an initial delay it opens a tap stream restricted to the vbuckets
 * of the expected keys and walks it until every expected key has been
 * received as a TAP_MUTATION (or the stream disconnects / yields an
 * unexpected event).
 *
 * @param arg pointer to a tap_stream_thread_params; it is only read here
 *            and stays owned by the caller
 * @return always NULL
 */
static void* tap_stream_thread(void *arg) {
    tap_stream_thread_params *params = static_cast<tap_stream_thread_params *>(arg);
    std::set<key_spec_t> *expectedKeys = params->expectedKeys;

    // POSIX allows usleep() to fail with EINVAL for intervals of one
    // second or more, so sleep the whole seconds separately.
    sleep(params->wait / 1000000);
    usleep(params->wait % 1000000);

    // Collect the distinct vbuckets the expected keys live in.
    std::set<uint16_t> vbuckets;
    for (std::set<key_spec_t>::iterator it = expectedKeys->begin();
         it != expectedKeys->end(); ++it) {

        vbuckets.insert(it->vbucketid);
    }

    // Filter layout: element count followed by the vbucket ids, every
    // field a 16-bit value in network byte order.
    uint16_t *vbucketfilter = new uint16_t[1 + vbuckets.size()];
    vbucketfilter[0] = htons((uint16_t) vbuckets.size());
    off_t off = 1;

    for (std::set<uint16_t>::iterator it = vbuckets.begin();
         it != vbuckets.end(); ++it) {

        vbucketfilter[off++] = htons(*it);
    }

    const void *cookie = testHarness.create_cookie();
    testHarness.lock_cookie(cookie);

    TAP_ITERATOR iter = params->h1->get_tap_iterator(params->h, cookie,
                                                     params->streamName.c_str(),
                                                     params->streamName.length(),
                                                     TAP_CONNECT_FLAG_LIST_VBUCKETS,
                                                     static_cast<void *>(vbucketfilter),
                                                     sizeof(uint16_t) + (sizeof(uint16_t) * vbuckets.size()));
    check(iter != NULL, "Failed to create a tap iterator");

    // Starts out NULL so that a stream which never yields a mutation
    // cannot hand an indeterminate pointer to release().
    item *it = NULL;
    Item *item = NULL;
    void *engine_specific;
    uint16_t nengine_specific;
    uint8_t ttl;
    uint16_t flags;
    uint32_t seqno;
    tap_event_t event;
    std::set<key_spec_t> keysReceived;
    bool done = false;
    uint16_t vbid;

    do {
        event = iter(params->h, cookie, &it, &engine_specific,
                     &nengine_specific, &ttl, &flags,
                     &seqno, &vbid);

        switch (event) {
        case TAP_PAUSE:
            // Nothing queued right now: stop once everything arrived,
            // otherwise block until the cookie is notified.
            done = (keysReceived.size() == expectedKeys->size());
            if (!done) {
                testHarness.waitfor_cookie(cookie);
            }
            break;
        case TAP_NOOP:
        case TAP_OPAQUE:
            break;
        case TAP_MUTATION:
            item = reinterpret_cast<Item *>(it);

            check(vbuckets.find(item->getVBucketId()) != vbuckets.end(),
                  "Received an item for a vbucket we don't subscribe to");

            if (expectedKeys->find(*item) != expectedKeys->end()) {
                keysReceived.insert(*item);
            }

            // Every mutation hands us an item; release each one as soon
            // as it has been inspected instead of only the last one.
            params->h1->release(params->h, cookie, it);
            it = NULL;
            item = NULL;
            break;
        case TAP_DISCONNECT:
            done = true;
            break;
        default:
            std::cerr << "Unexpected event: " << event << std::endl;
            done = true;
        }
    } while (!done);

    testHarness.unlock_cookie(cookie);
    delete [] vbucketfilter;

    check(keysReceived.size() == expectedKeys->size(),
          "Didn't received all the expected items from the tap stream.");

    return NULL;
}
}

static enum test_result test_conc_set(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
Expand Down Expand Up @@ -4172,6 +4270,97 @@ static enum test_result test_sync_mutation(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h
return SUCCESS;
}

/**
 * Test for the SYNC command in "replication" mode.
 *
 * Stores four keys in an active vbucket, starts one tap_stream_thread per
 * replica (each connects after a two second delay), then issues a SYNC
 * request for the four stored keys plus one key that was never stored.
 * Once all threads have finished, the response is expected to list every
 * requested key: SYNC_REPLICATED_EVENT for the stored ones and
 * SYNC_INVALID_KEY (with cas 0) for "bad_key".
 */
static enum test_result test_sync_replication(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
    const uint16_t test_vbid = 307;
    check(set_vbucket_state(h, h1, test_vbid, vbucket_state_active),
          "Failed to set test vbucket state.");

    const uint8_t nReplicas = 4;
    const key_spec_t keyspecs[] = {
        key_spec_t(0, test_vbid, "key1"), key_spec_t(0, test_vbid, "key2"),
        key_spec_t(0, test_vbid, "key3"), key_spec_t(0, test_vbid, "key4"),
        key_spec_t(0, test_vbid, "bad_key")
    };
    const uint16_t nkeys = 5;
    // The last key spec ("bad_key") is requested but never stored.
    const int nStored = nkeys - 1;

    for (int k = 0; k < nStored; ++k) {
        check(store(h, h1, NULL, OPERATION_SET, keyspecs[k].key.c_str(),
                    "foobar", NULL, 0, test_vbid) == ENGINE_SUCCESS,
              "Failed to store an item.");
    }

    // Shared, read-only by the tap threads: the keys each must receive.
    std::set<key_spec_t> *expectedKeyset =
        new std::set<key_spec_t>(keyspecs, keyspecs + nStored);

    pthread_t threads[nReplicas];
    std::vector<tap_stream_thread_params*> params;

    // Prepare one parameter block per replica, named tap_stream_1..N.
    for (int replica = 0; replica < nReplicas; ++replica) {
        std::string name("tap_stream_");
        name += (char) ('0' + (replica + 1));

        tap_stream_thread_params *p = new tap_stream_thread_params();
        p->h = h;
        p->h1 = h1;
        p->expectedKeys = expectedKeyset;
        p->wait = 2000000;
        p->streamName = name;
        params.push_back(p);
    }

    for (int replica = 0; replica < nReplicas; ++replica) {
        int r = pthread_create(&threads[replica], NULL, tap_stream_thread,
                               params[replica]);
        assert(r == 0);
    }

    // The replica count goes into bits 4-7 of the flags word handed to
    // create_sync_packet().
    const uint32_t syncFlags = (uint32_t) ((nReplicas & 0x0f) << 4);
    protocol_binary_request_header *pkt = create_sync_packet(syncFlags, nkeys, keyspecs);

    check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS,
          "SYNC on replication operation failed");

    for (int replica = 0; replica < nReplicas; ++replica) {
        void *trv = NULL;
        int r = pthread_join(threads[replica], &trv);
        assert(r == 0);
    }

    // verify the response sent to the client is correct
    typedef std::list< std::pair<key_spec_t, uint8_t> > sync_response_t;
    sync_response_t resp = parse_sync_response(last_body);

    check(resp.size() == nkeys, "response has the same # of keys");

    for (sync_response_t::iterator entry = resp.begin(); entry != resp.end(); ++entry) {
        key_spec_t keyspec = entry->first;
        uint8_t eventid = entry->second;

        check(keyspec.vbucketid == test_vbid, "right vbucket id");

        if (keyspec.key != "bad_key") {
            check(eventid == SYNC_REPLICATED_EVENT,
                  "right event id (SYNC_REPLICATED_EVENT)");
            continue;
        }

        check(keyspec.cas == 0, "right cas");
        check(eventid == SYNC_INVALID_KEY,
              "right event id (SYNC_INVALID_KEY)");
    }

    for (size_t idx = 0; idx < params.size(); ++idx) {
        delete params[idx];
    }

    delete expectedKeyset;
    free(pkt);

    return SUCCESS;
}

MEMCACHED_PUBLIC_API
engine_test_t* get_tests(void) {

Expand Down Expand Up @@ -4377,6 +4566,7 @@ engine_test_t* get_tests(void) {
{"sync bad flags", test_sync_bad_flags, NULL, teardown, NULL},
{"sync persistence", test_sync_persistence, NULL, teardown, NULL},
{"sync mutation", test_sync_mutation, NULL, teardown, NULL},
{"sync replication", test_sync_replication, NULL, teardown, NULL},
{NULL, NULL, NULL, NULL, NULL}
};
return tests;
Expand Down

0 comments on commit 78fdf39

Please sign in to comment.