Skip to content

Commit

Permalink
MB-16181: Add collections.set_manifest support
Browse files Browse the repository at this point in the history
Add a method which will accept the new manifest and apply it to
all active vbuckets.

The latest manifest is saved in memory and also used for when any VB
is set to active

Change-Id: Ic6a339bc5af279d105b679f528ff3675d1f16ac7
Reviewed-on: http://review.couchbase.org/77436
Reviewed-by: Dave Rigby <daver@couchbase.com>
Tested-by: Build Bot <build@couchbase.com>
  • Loading branch information
jimwwalker authored and daverigby committed May 23, 2017
1 parent f137afd commit 126ed5a
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 2 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Expand Up @@ -151,7 +151,8 @@ SET(OBJECTREGISTRY_SOURCE src/objectregistry.cc)
SET(CONFIG_SOURCE src/configuration.cc
${CMAKE_CURRENT_BINARY_DIR}/src/generated_configuration.cc)

SET(COLLECTIONS_SOURCE src/collections/manifest.cc
SET(COLLECTIONS_SOURCE src/collections/manager.cc
src/collections/manifest.cc
src/collections/vbucket_manifest.cc
src/collections/vbucket_manifest_entry.cc)

Expand Down
76 changes: 76 additions & 0 deletions src/collections/manager.cc
@@ -0,0 +1,76 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2017 Couchbase, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "collections/manager.h"
#include "collections/manifest.h"
#include "kv_bucket.h"
#include "vbucket.h"

Collections::Manager::Manager() : current(std::make_unique<Manifest>()) {
}

cb::engine_error Collections::Manager::update(KVBucket& bucket,
const std::string& json) {
std::unique_lock<std::mutex> ul(lock, std::try_to_lock);
if (!ul.owns_lock()) {
// Make concurrent updates fail, in realiy there should only be one
// admin connection making changes.
return cb::engine_error(cb::engine_errc::temporary_failure,
"Collections::Manager::update already locked");
}

std::unique_ptr<Manifest> newManifest;
// Construct a newManifest (will throw if JSON was illegal)
try {
newManifest = std::make_unique<Manifest>(json);
} catch (std::exception& e) {
LOG(EXTENSION_LOG_NOTICE,
"Collections::Manager::update can't construct manifest e.what:%s",
e.what());
return cb::engine_error(
cb::engine_errc::invalid_arguments,
"Collections::Manager::update manifest json invalid:" + json);
}

// Validate manifest revision is increasing
if (newManifest->getRevision() <= current->getRevision()) {
return cb::engine_error(
cb::engine_errc::invalid_arguments,
"Collections::Manager::update manifest revision:" +
std::to_string(current->getRevision()) + " json:" +
json);
}

current = std::move(newManifest);

for (int i = 0; i < bucket.getVBuckets().getSize(); i++) {
auto vb = bucket.getVBuckets().getBucket(i);

if (vb && vb->getState() == vbucket_state_active) {
vb->updateFromManifest(*current);
}
}

return cb::engine_error(cb::engine_errc::success,
"Collections::Manager::update");
}

void Collections::Manager::update(VBucket& vb) const {
// Lock manager updates
std::lock_guard<std::mutex> ul(lock);
vb.updateFromManifest(*current);
}
65 changes: 65 additions & 0 deletions src/collections/manager.h
@@ -0,0 +1,65 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2017 Couchbase, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <memcached/engine_error.h>

#include <memory>
#include <mutex>

class KVBucket;
class VBucket;

namespace Collections {

class Manifest;

/**
* Collections::Manager provides some bucket level management functions
* such as the code which enables the MCBP set_collections command.
*/
class Manager {
public:
Manager();

/**
* Update the bucket with the latest JSON collections manifest.
*
* Locks the Manager and prevents concurrent updates, concurrent updates
* are failed with TMPFAIL as in reality there should be 1 admin connection.
*
* @param bucket the bucket receiving a set-collections command.
* @param json the json manifest form a set-collections command.
* @returns engine_error indicating why the update failed.
*/
cb::engine_error update(KVBucket& bucket, const std::string& json);

/**
* Update the vbucket's manifest with the current Manifest
* The Manager is locked to prevent current changing whilst this update
* occurs.
*/
void update(VBucket& vb) const;

private:
mutable std::mutex lock;

/// Store the most recent (current) manifest received
std::unique_ptr<Manifest> current;
};
}
7 changes: 7 additions & 0 deletions src/ep_engine.cc
Expand Up @@ -1764,6 +1764,12 @@ static ENGINE_ERROR_CODE EvpGetClusterConfig(ENGINE_HANDLE* handle,
return callback(cookie, config, len);
}

static cb::engine_error EvpCollectionsSetManifest(ENGINE_HANDLE* handle,
cb::const_char_buffer json) {
auto engine = acquireEngine(handle);
return engine->getKVBucket()->setCollections(json);
}

void LOG(EXTENSION_LOG_LEVEL severity, const char *fmt, ...) {
va_list va;
va_start(va, fmt);
Expand Down Expand Up @@ -1832,6 +1838,7 @@ EventuallyPersistentEngine::EventuallyPersistentEngine(
ENGINE_HANDLE_V1::dcp.response_handler = EvpDcpResponseHandler;
ENGINE_HANDLE_V1::dcp.system_event = EvpDcpSystemEvent;
ENGINE_HANDLE_V1::set_log_level = EvpSetLogLevel;
ENGINE_HANDLE_V1::collections.set_manifest = EvpCollectionsSetManifest;

serverApi = getServerApiFunc();
memset(&info, 0, sizeof(info));
Expand Down
19 changes: 18 additions & 1 deletion src/kv_bucket.cc
Expand Up @@ -34,6 +34,7 @@

#include "access_scanner.h"
#include "checkpoint_remover.h"
#include "collections/manager.h"
#include "conflict_resolution.h"
#include "connmap.h"
#include "dcp/dcpconnmap.h"
Expand Down Expand Up @@ -351,7 +352,8 @@ KVBucket::KVBucket(EventuallyPersistentEngine& theEngine)
bgFetchDelay(0),
backfillMemoryThreshold(0.95),
statsSnapshotTaskId(0),
lastTransTimePerItem(0) {
lastTransTimePerItem(0),
collectionsManager(std::make_unique<Collections::Manager>()) {
cachedResidentRatio.activeRatio.store(0);
cachedResidentRatio.replicaRatio.store(0);

Expand Down Expand Up @@ -818,6 +820,11 @@ ENGINE_ERROR_CODE KVBucket::setVBucketState_UNLOCKED(
// Before adding the VB to the map increment the revision
getRWUnderlying(vbid)->incrementRevision(vbid);

// If active, update the VB from the bucket's collection state
if (to == vbucket_state_active) {
collectionsManager->update(*newvb);
}

if (vbMap.addBucket(newvb) == ENGINE_ERANGE) {
return ENGINE_ERANGE;
}
Expand Down Expand Up @@ -2791,3 +2798,13 @@ void KVBucket::initializeExpiryPager(Configuration& config) {
config.addValueChangedListener("exp_pager_initial_run_time",
new EPStoreValueChangeListener(*this));
}

cb::engine_error KVBucket::setCollections(cb::const_char_buffer json) {
// cJSON can't accept a size so we must create a string
std::string manifest(json.data(), json.size());

// Inhibit VB state changes whilst updating the vbuckets
LockHolder lh(vbsetMutex);

return collectionsManager->update(*this, manifest);
}
11 changes: 11 additions & 0 deletions src/kv_bucket.h
Expand Up @@ -30,6 +30,9 @@
#include <deque>

class VBucketCountVisitor;
namespace Collections {
class Manager;
}

/**
* VBucket visitor callback adaptor.
Expand Down Expand Up @@ -740,6 +743,12 @@ class KVBucket : public KVBucketIface {
uint64_t maxCas = 0,
const std::string& collectionsManifest = "") = 0;

/**
* Method to handle set_collections commands
* @param json a buffer containing a JSON manifest to apply to the bucket
*/
cb::engine_error setCollections(cb::const_char_buffer json);

protected:
// During the warmup phase we might want to enable external traffic
// at a given point in time.. The LoadStorageKvPairCallback will be
Expand Down Expand Up @@ -860,6 +869,8 @@ class KVBucket : public KVBucketIface {
std::mutex compactionLock;
std::list<CompTaskEntry> compactionTasks;

std::unique_ptr<Collections::Manager> collectionsManager;

friend class KVBucketTest;

DISALLOW_COPY_AND_ASSIGN(KVBucket);
Expand Down
6 changes: 6 additions & 0 deletions src/kv_bucket_iface.h
Expand Up @@ -822,6 +822,12 @@ class KVBucketIface {
*/
virtual size_t getNumPersistedDeletes(uint16_t vbid) = 0;

/**
* Method to handle set_collections commands
* @param json a buffer containing a JSON manifest to apply to the bucket
*/
virtual cb::engine_error setCollections(cb::const_char_buffer json) = 0;

protected:

// Methods called during warmup
Expand Down
66 changes: 66 additions & 0 deletions tests/module_tests/collections/evp_store_collections_test.cc
Expand Up @@ -880,3 +880,69 @@ TEST_F(CollectionsDcpTest, test_dcp_separator_many) {
// And done
EXPECT_EQ(ENGINE_SUCCESS, producer->step(producers.get()));
}

class CollectionsManagerTest : public CollectionsTest {};

/**
* Test checks that setCollections propagates the collection data to active
* vbuckets.
*/
TEST_F(CollectionsManagerTest, basic) {
// Add some more VBuckets just so there's some iteration happening
const int extraVbuckets = 2;
for (int vb = vbid + 1; vb <= (vbid + extraVbuckets); vb++) {
store->setVBucketState(vb, vbucket_state_active, false);
}

store->setCollections(
{R"({"revision":1,"separator":"@@","collections":["$default", "meat"]})"});

// Check all vbuckets got the collections
for (int vb = vbid; vb <= (vbid + extraVbuckets); vb++) {
auto vbp = store->getVBucket(vb);
EXPECT_EQ("@@", vbp->lockCollections().getSeparator());
EXPECT_TRUE(vbp->lockCollections().doesKeyContainValidCollection(
{"meat@@bacon", DocNamespace::Collections}));
EXPECT_TRUE(vbp->lockCollections().doesKeyContainValidCollection(
{"anykey", DocNamespace::DefaultCollection}));
}
}

/**
* Test checks that setCollections propagates the collection data to active
* vbuckets and not the replicas
*/
TEST_F(CollectionsManagerTest, basic2) {
// Add some more VBuckets just so there's some iteration happening
const int extraVbuckets = 2;
// Add active and replica
for (int vb = vbid + 1; vb <= (vbid + extraVbuckets); vb++) {
if (vb & 1) {
store->setVBucketState(vb, vbucket_state_active, false);
} else {
store->setVBucketState(vb, vbucket_state_replica, false);
}
}

store->setCollections(
{R"({"revision":1,"separator":"@@","collections":["$default", "meat"]})"});

// Check all vbuckets got the collections
for (int vb = vbid; vb <= (vbid + extraVbuckets); vb++) {
auto vbp = store->getVBucket(vb);
if (vbp->getState() == vbucket_state_active) {
EXPECT_EQ("@@", vbp->lockCollections().getSeparator());
EXPECT_TRUE(vbp->lockCollections().doesKeyContainValidCollection(
{"meat@@bacon", DocNamespace::Collections}));
EXPECT_TRUE(vbp->lockCollections().doesKeyContainValidCollection(
{"anykey", DocNamespace::DefaultCollection}));
} else {
// Replica will be in default constructed settings
EXPECT_EQ("::", vbp->lockCollections().getSeparator());
EXPECT_FALSE(vbp->lockCollections().doesKeyContainValidCollection(
{"meat@@bacon", DocNamespace::Collections}));
EXPECT_TRUE(vbp->lockCollections().doesKeyContainValidCollection(
{"anykey", DocNamespace::DefaultCollection}));
}
}
}

0 comments on commit 126ed5a

Please sign in to comment.