// Patch summary: adds replication-based SYNC support (sync type REP).
//  - callbacks.hh: GetValue carries an optional StoredValue* exposed via getStoredValue().
//  - command_ids.h: introduces SYNC_REPLICATED_EVENT (4); SYNC_INVALID_KEY and
//    SYNC_INVALID_CAS are renumbered from 4/5 to 5/6.
//  - ep.cc: EventuallyPersistentStore::get() passes the StoredValue into the GetValue it
//    returns for both the ENGINE_EWOULDBLOCK and the ENGINE_SUCCESS results.
//  - ep_engine.cc: doWalkTapQueue() increments the StoredValue replica counter and calls
//    syncRegistry.itemReplicated(); sync() and assembleSyncResponse() handle the REP case.
//  - stored-value.hh: StoredValue gains a `replicas` counter with getNumReplicas() and
//    incrementNumReplicas(); it is zeroed in the constructor and in the hunk that follows
//    the "Set a new value for this item" comment.
//  - sync_registry.{cc,hh}: replication listener list + mutex, itemReplicated(), and a
//    SyncListener::keySynced(keyspec, uint8_t) overload that sums per-key replica counts and
//    marks a key replicated once the sum reaches replicasPerKey.
//  - ep_testsuite.cc: tap_stream_thread helper and the "sync replication" test.
// NOTE(review): `replicasDone` in the new keySynced overload is a uint8_t sum; it looks like
//    it can wrap past 255 - confirm and saturate if so.
// NOTE(review): sync() registers the listener before replaying sv->getNumReplicas() and
//    ignores keySynced()'s return value - check for double counting and for a finished
//    listener being left in replicationListeners.
// NOTE(review): in the first doWalkTapQueue hunk the replica increment sits outside the
//    ENGINE_SUCCESS branch - confirm a non-success GetValue should count as a replication.
// NOTE(review): tap_stream_thread calls release() on `it` once, after the loop - confirm
//    `it` is always set by the iterator and that earlier items are not leaked.
// NOTE(review): renumbering SYNC_INVALID_KEY / SYNC_INVALID_CAS presumably changes ids that
//    clients see - verify against the protocol consumers.
// NOTE(review): the getNumReplicas() doc comment contains the typo "replicaded".
// NOTE(review): template argument lists and <...> include names appear to be missing
//    throughout this copy (e.g. `static_cast(arg)`, `Atomic replicas`, bare `#include`),
//    presumably an extraction artifact - verify against the original commit before applying.
diff --git a/callbacks.hh b/callbacks.hh index d2fd31b40..b701f6871 100644 --- a/callbacks.hh +++ b/callbacks.hh @@ -7,17 +7,20 @@ #include "locks.hh" class Item; +class StoredValue; /** * Value for callback for GET operations. */ class GetValue { public: - GetValue() : value(NULL), id(-1), vb_version(-1), status(ENGINE_KEY_ENOENT) { } + GetValue() : value(NULL), storedValue(NULL), id(-1), + vb_version(-1), status(ENGINE_KEY_ENOENT) { } explicit GetValue(Item *v, ENGINE_ERROR_CODE s=ENGINE_SUCCESS, - uint64_t i = -1, uint16_t vbucket_version = -1) : - value(v), id(i), vb_version(vbucket_version), status(s) { } + uint64_t i = -1, uint16_t vbucket_version = -1, + StoredValue *sv = NULL) : + value(v), storedValue(sv), id(i), vb_version(vbucket_version), status(s) { } /** * The value retrieved for the key. @@ -39,9 +42,17 @@ public: */ uint16_t getVBucketVersion() { return vb_version; } + /** + * Get the StoredValue instance associated with the item (if applicable). + */ + StoredValue* getStoredValue() const { + return storedValue; + } + private: Item* value; + StoredValue* storedValue; uint64_t id; uint16_t vb_version; ENGINE_ERROR_CODE status; diff --git a/command_ids.h b/command_ids.h index 524026f08..f41827a88 100644 --- a/command_ids.h +++ b/command_ids.h @@ -33,10 +33,11 @@ /* * IDs for the events of the SYNC command. 
*/ -#define SYNC_PERSISTED_EVENT 1 -#define SYNC_MODIFIED_EVENT 2 -#define SYNC_DELETED_EVENT 3 -#define SYNC_INVALID_KEY 4 -#define SYNC_INVALID_CAS 5 +#define SYNC_PERSISTED_EVENT 1 +#define SYNC_MODIFIED_EVENT 2 +#define SYNC_DELETED_EVENT 3 +#define SYNC_REPLICATED_EVENT 4 +#define SYNC_INVALID_KEY 5 +#define SYNC_INVALID_CAS 6 #endif /* COMMAND_IDS_H */ diff --git a/ep.cc b/ep.cc index b42ab1bb3..70d76d63a 100644 --- a/ep.cc +++ b/ep.cc @@ -903,7 +903,7 @@ GetValue EventuallyPersistentStore::get(const std::string &key, bgFetch(key, vbucket, vbuckets.getBucketVersion(vbucket), v->getId(), cookie); } - return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getId()); + return GetValue(NULL, ENGINE_EWOULDBLOCK, v->getId(), -1, v); } // return an invalid cas value if the item is locked @@ -912,7 +912,7 @@ GetValue EventuallyPersistentStore::get(const std::string &key, : v->getCas(); GetValue rv(new Item(v->getKey(), v->getFlags(), v->getExptime(), v->getValue(), icas, v->getId(), vbucket), - ENGINE_SUCCESS, v->getId()); + ENGINE_SUCCESS, v->getId(), -1, v); return rv; } else { GetValue rv; diff --git a/ep_engine.cc b/ep_engine.cc index 728d2b23e..25992e4d0 100644 --- a/ep_engine.cc +++ b/ep_engine.cc @@ -1521,13 +1521,22 @@ inline tap_event_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie GetValue gv(epstore->get(item->getKey(), item->getVBucketId(), cookie, false)); if (gv.getStatus() == ENGINE_SUCCESS) { - *itm = gv.getValue(); delete item; + *itm = item = gv.getValue(); } else { *itm = item; } *vbucket = static_cast(*itm)->getVBucketId(); + if (gv.getStoredValue() != NULL) { + gv.getStoredValue()->incrementNumReplicas(); + syncRegistry.itemReplicated(*item); + } else { + getLogger()->log(EXTENSION_LOG_WARNING, NULL, + "NULL StoredValue* for key %s, vbucket %d", + item->getKey().c_str(), item->getVBucketId()); + } + if (!connection->vbucketFilter(*vbucket)) { // We were going to use the item that we received from // disk, but the filter says not to, so we 
need to get rid @@ -1555,9 +1564,13 @@ inline tap_event_t EventuallyPersistentEngine::doWalkTapQueue(const void *cookie false, false)); ENGINE_ERROR_CODE r = gv.getStatus(); if (r == ENGINE_SUCCESS) { + assert(gv.getStoredValue() != NULL); *itm = gv.getValue(); ret = TAP_MUTATION; + gv.getStoredValue()->incrementNumReplicas(); + syncRegistry.itemReplicated(*gv.getValue()); + ++stats.numTapFGFetched; ++connection->queueDrain; } else if (r == ENGINE_KEY_ENOENT) { @@ -3182,7 +3195,17 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::sync(std::set *keys, syncRegistry.addMutationListener(syncListener); break; case REP: - // TODO + { + syncRegistry.addReplicationListener(syncListener); + + std::vector< std::pair >::iterator itv; + for (itv = storedValues.begin(); itv != storedValues.end(); itv++) { + StoredValue *sv = itv->first; + key_spec_t keyspec(sv->getCas(), itv->second, sv->getKey()); + + syncListener->keySynced(keyspec, sv->getNumReplicas()); + } + } break; case REP_OR_PERSIST: // TODO @@ -3240,7 +3263,7 @@ static void assembleSyncResponse(std::stringstream &resp, SyncListener *syncList nkeys += syncListener->getDeletedKeys().size(); break; case REP: - // TODO + nkeys += syncListener->getReplicatedKeys().size(); break; case REP_OR_PERSIST: // TODO @@ -3265,7 +3288,7 @@ static void assembleSyncResponse(std::stringstream &resp, SyncListener *syncList addSyncKeySpecs(resp, syncListener->getDeletedKeys(), SYNC_DELETED_EVENT); break; case REP: - // TODO + addSyncKeySpecs(resp, syncListener->getReplicatedKeys(), SYNC_REPLICATED_EVENT); break; case REP_OR_PERSIST: // TODO diff --git a/ep_testsuite.cc b/ep_testsuite.cc index ce9c16e15..851f013f0 100644 --- a/ep_testsuite.cc +++ b/ep_testsuite.cc @@ -794,6 +794,14 @@ typedef struct { uint32_t wait; } set_key_thread_params; +typedef struct { + ENGINE_HANDLE *h; + ENGINE_HANDLE_V1 *h1; + std::set *expectedKeys; + std::string streamName; + uint32_t wait; +} tap_stream_thread_params; + extern "C" { static void* 
conc_del_set_thread(void *arg) { struct handle_pair *hp = static_cast(arg); @@ -847,6 +855,96 @@ extern "C" { return NULL; } + + static void* tap_stream_thread(void *arg) { + tap_stream_thread_params *params = static_cast(arg); + std::set *expectedKeys = params->expectedKeys; + + usleep(params->wait); + + std::set vbuckets; + for (std::set::iterator it = expectedKeys->begin(); + it != expectedKeys->end(); it++) { + + vbuckets.insert(it->vbucketid); + } + + uint16_t *vbucketfilter = new uint16_t[1 + vbuckets.size()]; + vbucketfilter[0] = htons((uint16_t) vbuckets.size()); + off_t off = 1; + + for (std::set::iterator it = vbuckets.begin(); + it != vbuckets.end(); it++) { + + vbucketfilter[off++] = htons(*it); + } + + const void *cookie = testHarness.create_cookie(); + testHarness.lock_cookie(cookie); + + TAP_ITERATOR iter = params->h1->get_tap_iterator(params->h, cookie, + params->streamName.c_str(), + params->streamName.length(), + TAP_CONNECT_FLAG_LIST_VBUCKETS, + static_cast(vbucketfilter), + sizeof(uint16_t) + (sizeof(uint16_t) * vbuckets.size())); + check(iter != NULL, "Failed to create a tap iterator"); + + item *it; + Item *item; + void *engine_specific; + uint16_t nengine_specific; + uint8_t ttl; + uint16_t flags; + uint32_t seqno; + tap_event_t event; + std::set keysReceived; + bool done = false; + uint16_t vbid; + + do { + event = iter(params->h, cookie, &it, &engine_specific, + &nengine_specific, &ttl, &flags, + &seqno, &vbid); + + switch (event) { + case TAP_PAUSE: + done = (keysReceived.size() == expectedKeys->size()); + if (!done) { + testHarness.waitfor_cookie(cookie); + } + break; + case TAP_NOOP: + case TAP_OPAQUE: + break; + case TAP_MUTATION: + item = reinterpret_cast(it); + + check(vbuckets.find(item->getVBucketId()) != vbuckets.end(), + "Received an item for a vbucket we don't subscribe to"); + + if (expectedKeys->find(*item) != expectedKeys->end()) { + keysReceived.insert(*item); + } + break; + case TAP_DISCONNECT: + done = true; + break; + 
default: + std::cerr << "Unexpected event: " << event << std::endl; + done = true; + } + } while (!done); + + testHarness.unlock_cookie(cookie); + params->h1->release(params->h, cookie, it); + delete [] vbucketfilter; + + check(keysReceived.size() == expectedKeys->size(), + "Didn't received all the expected items from the tap stream."); + + return NULL; + } } static enum test_result test_conc_set(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) { @@ -4172,6 +4270,97 @@ static enum test_result test_sync_mutation(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h return SUCCESS; } +static enum test_result test_sync_replication(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) { + const uint16_t test_vbid = 307; + check(set_vbucket_state(h, h1, test_vbid, vbucket_state_active), + "Failed to set test vbucket state."); + + const uint8_t nReplicas = 4; + const key_spec_t keyspecs[] = { + key_spec_t(0, test_vbid, "key1"), key_spec_t(0, test_vbid, "key2"), + key_spec_t(0, test_vbid, "key3"), key_spec_t(0, test_vbid, "key4"), + key_spec_t(0, test_vbid, "bad_key") + }; + const uint16_t nkeys = 5; + + for (int i = 0; i < (nkeys - 1); i++) { + check(store(h, h1, NULL, OPERATION_SET, keyspecs[i].key.c_str(), + "foobar", NULL, 0, test_vbid) == ENGINE_SUCCESS, + "Failed to store an item."); + } + + std::set *expectedKeyset = new std::set(); + for (int i = 0; i < (nkeys - 1); i++) { + expectedKeyset->insert(keyspecs[i]); + } + + pthread_t threads[nReplicas]; + std::vector params; + + for (int i = 0; i < nReplicas; i++) { + tap_stream_thread_params *p = new tap_stream_thread_params(); + std::stringstream ss; + ss << "tap_stream_" << (char) ('0' + (i + 1)); + + p->h = h; + p->h1 = h1; + p->expectedKeys = expectedKeyset; + p->wait = 2000000; + p->streamName = ss.str(); + params.push_back(p); + } + + for (int i = 0; i < nReplicas; i++) { + int r = pthread_create(&threads[i], NULL, tap_stream_thread, params[i]); + assert(r == 0); + } + + protocol_binary_request_header *pkt; + pkt = create_sync_packet((uint32_t) 
((nReplicas & 0x0f) << 4), nkeys, keyspecs); + + check(h1->unknown_command(h, NULL, pkt, add_response) == ENGINE_SUCCESS, + "SYNC on replication operation failed"); + + for (int i = 0; i < nReplicas; i++) { + void *trv = NULL; + int r = pthread_join(threads[i], &trv); + assert(r == 0); + } + + // verify the response sent to the client is correct + std::list< std::pair > resp = parse_sync_response(last_body); + + check(resp.size() == nkeys, "response has the same # of keys"); + + std::list< std::pair >::iterator itresp = resp.begin(); + + for ( ; itresp != resp.end(); itresp++) { + key_spec_t keyspec = itresp->first; + uint8_t eventid = itresp->second; + + check(keyspec.vbucketid == test_vbid, "right vbucket id"); + + if (keyspec.key == "bad_key") { + check(keyspec.cas == 0, "right cas"); + check(eventid == SYNC_INVALID_KEY, + "right event id (SYNC_INVALID_KEY)"); + } else { + check(eventid == SYNC_REPLICATED_EVENT, + "right event id (SYNC_REPLICATED_EVENT)"); + } + } + + std::vector::iterator it = params.begin(); + for ( ; it != params.end(); it++) { + delete *it; + } + + delete expectedKeyset; + free(pkt); + + return SUCCESS; +} + MEMCACHED_PUBLIC_API engine_test_t* get_tests(void) { @@ -4377,6 +4566,7 @@ engine_test_t* get_tests(void) { {"sync bad flags", test_sync_bad_flags, NULL, teardown, NULL}, {"sync persistence", test_sync_persistence, NULL, teardown, NULL}, {"sync mutation", test_sync_mutation, NULL, teardown, NULL}, + {"sync replication", test_sync_replication, NULL, teardown, NULL}, {NULL, NULL, NULL, NULL, NULL} }; return tests; diff --git a/stored-value.hh b/stored-value.hh index 0fecef394..1795716e6 100644 --- a/stored-value.hh +++ b/stored-value.hh @@ -210,6 +210,19 @@ public: return flags; } + /** + * Get the number of times this value was replicated. 
+ * + * @return the number of times this value was replicaded + */ + uint8_t getNumReplicas() const { + return replicas; + } + + void incrementNumReplicas(uint8_t count = 1) { + replicas += count; + } + /** * Set a new value for this item. * @@ -232,6 +245,7 @@ public: } markDirty(); increaseCurrentSize(stats, ht, size()); + replicas = 0; } size_t valLength() { @@ -509,7 +523,7 @@ private: StoredValue(const Item &itm, StoredValue *n, EPStats &stats, HashTable &ht, bool setDirty = true, bool small = false) : value(itm.getValue()), next(n), id(itm.getId()), - dirtiness(0), _isSmall(small), flags(itm.getFlags()) + dirtiness(0), _isSmall(small), flags(itm.getFlags()), replicas(0) { if (_isSmall) { @@ -541,13 +555,14 @@ private: friend class HashTable; friend class StoredValueFactory; - value_t value; // 16 bytes - StoredValue *next; // 8 bytes - int64_t id; // 8 bytes - uint32_t dirtiness : 30; // 30 bits -+ - bool _isSmall : 1; // 1 bit | 4 bytes - bool _isDirty : 1; // 1 bit --+ - uint32_t flags; // 4 bytes + value_t value; // 16 bytes + StoredValue *next; // 8 bytes + int64_t id; // 8 bytes + uint32_t dirtiness : 30; // 30 bits -+ + bool _isSmall : 1; // 1 bit | 4 bytes + bool _isDirty : 1; // 1 bit --+ + uint32_t flags; // 4 bytes + Atomic replicas; // 1 byte union stored_value_bodies extra; diff --git a/sync_registry.cc b/sync_registry.cc index e6c0cfccd..d9f2e36e0 100644 --- a/sync_registry.cc +++ b/sync_registry.cc @@ -66,6 +66,18 @@ void SyncRegistry::itemDeleted(const key_spec_t &keyspec) { } +void SyncRegistry::addReplicationListener(SyncListener *syncListener) { + LockHolder lh(replicationMutex); + replicationListeners.push_back(syncListener); +} + + +void SyncRegistry::itemReplicated(const key_spec_t &keyspec, uint8_t replicaCount) { + LockHolder lh(replicationMutex); + notifyListeners(replicationListeners, keyspec, replicaCount); +} + + void SyncRegistry::notifyListeners(std::list &listeners, const key_spec_t &keyspec, bool deleted) { @@ -82,17 +94,33 @@ 
void SyncRegistry::notifyListeners(std::list &listeners, } +void SyncRegistry::notifyListeners(std::list &listeners, + const key_spec_t &keyspec, + uint8_t replicaCount) { + std::list::iterator it = listeners.begin(); + + while (it != listeners.end()) { + SyncListener *listener = *it; + if (listener->keySynced(keyspec, replicaCount)) { + it = listeners.erase(it); + } else { + it++; + } + } +} + + SyncListener::SyncListener(EventuallyPersistentEngine &epEngine, const void *c, std::set *keys, sync_type_t sync_type, uint8_t replicaCount) : engine(epEngine), cookie(c), keySpecs(keys), - syncType(sync_type), replicas(replicaCount) { + syncType(sync_type), replicasPerKey(replicaCount) { - // TODO: support replication sync, replication and persistence sync, and - // replicator or persistence sync - assert(syncType == PERSIST || syncType == MUTATION); + // TODO: support "replication AND persistence sync", and + // "replication OR persistence sync" + assert(syncType == PERSIST || syncType == MUTATION || syncType == REP); } @@ -125,7 +153,42 @@ bool SyncListener::keySynced(const key_spec_t &keyspec, bool deleted) { finished = ((modifiedKeys.size() + deletedKeys.size()) == keySpecs->size()); break; case REP: - // TODO + case REP_OR_PERSIST: + case REP_AND_PERSIST: + break; + } + + if (finished) { + engine.getServerApi()->cookie->store_engine_specific(cookie, this); + engine.notifyIOComplete(cookie, ENGINE_SUCCESS); + } + } + + return finished; +} + + +bool SyncListener::keySynced(const key_spec_t &keyspec, uint8_t numReplicas) { + bool finished = false; + LockHolder lh(mutex); + std::set::iterator it = keySpecs->find(keyspec); + + if (it != keySpecs->end()) { + uint8_t replicasDone = numReplicas; + + if (replicaCounts.find(keyspec) != replicaCounts.end()) { + replicasDone += replicaCounts[keyspec]; + } + + replicaCounts[keyspec] = replicasDone; + + if (replicasDone >= replicasPerKey) { + replicatedKeys.insert(keyspec); + } + + switch (syncType) { + case REP: + finished = 
(replicatedKeys.size() == keySpecs->size()); break; case REP_OR_PERSIST: // TODO @@ -133,6 +196,9 @@ bool SyncListener::keySynced(const key_spec_t &keyspec, bool deleted) { case REP_AND_PERSIST: // TODO break; + case PERSIST: + case MUTATION: + break; } if (finished) { diff --git a/sync_registry.hh b/sync_registry.hh index 4cc221c22..23e889982 100644 --- a/sync_registry.hh +++ b/sync_registry.hh @@ -19,6 +19,7 @@ #include #include +#include #include "common.hh" #include "item.hh" @@ -75,18 +76,28 @@ public: void itemModified(const key_spec_t &keyspec); void itemDeleted(const key_spec_t &keyspec); + void addReplicationListener(SyncListener *syncListener); + void itemReplicated(const key_spec_t &keyspec, uint8_t replicaCount = 1); + private: void notifyListeners(std::list &listeners, const key_spec_t &keyspec, bool deleted); + void notifyListeners(std::list &listeners, + const key_spec_t &keyspec, + uint8_t replicaCount); + std::list persistenceListeners; Mutex persistenceMutex; std::list mutationListeners; Mutex mutationMutex; + std::list replicationListeners; + Mutex replicationMutex; + DISALLOW_COPY_AND_ASSIGN(SyncRegistry); }; @@ -103,6 +114,7 @@ public: ~SyncListener(); bool keySynced(const key_spec_t &keyspec, bool deleted = false); + bool keySynced(const key_spec_t &keyspec, uint8_t numReplicas); sync_type_t getSyncType() const { return syncType; @@ -120,6 +132,10 @@ public: return deletedKeys; } + std::set& getReplicatedKeys() { + return replicatedKeys; + } + std::set& getNonExistentKeys() { return nonExistentKeys; } @@ -134,10 +150,12 @@ private: const void *cookie; std::set *keySpecs; sync_type_t syncType; - uint8_t replicas; + const uint8_t replicasPerKey; std::set persistedKeys; std::set modifiedKeys; std::set deletedKeys; + std::set replicatedKeys; + std::map replicaCounts; std::set nonExistentKeys; std::set invalidCasKeys; Mutex mutex;