/* -*- MODE: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2010 Couchbase, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Usage: (to run just a single test case)
// make engine_tests EP_TEST_NUM=3
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <condition_variable>
#include <cstdlib>
#include <chrono>
#include <iostream>
#include <iomanip>
#include <map>
#include <mutex>
#include <set>
#include <sstream>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "atomic.h"
#include "ep_test_apis.h"
#include "ep_testsuite_common.h"
#include "locks.h"
#include <libcouchstore/couch_db.h>
#include <memcached/engine.h>
#include <memcached/engine_testapp.h>
#include <platform/cb_malloc.h>
#include <platform/dirutils.h>
#include <JSON_checker.h>
#include <memcached/types.h>
#include <string_utilities.h>
#include <xattr/blob.h>
#include <xattr/utils.h>
#ifdef linux
/* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
* optimize the conversion functions, but the prototypes generate warnings
* from gcc. The conversion methods aren't the bottleneck for my app, so
* just remove the warnings by undef'ing the optimization.
*/
#undef ntohs
#undef ntohl
#undef htons
#undef htonl
#endif
// ptr_fun doesn't like the extern "C" thing for the unlock cookie, so cast
// it away ;)
typedef void (*UNLOCK_COOKIE_T)(const void *cookie);
#define IMMEDIATE_MEM_STATS ";mem_merge_count_threshold=1"
#define MULTI_DISPATCHER_CONFIG \
"ht_size=129;ht_locks=3;chk_remover_stime=1;chk_period=60"
class ThreadData {
public:
ThreadData(ENGINE_HANDLE *eh, ENGINE_HANDLE_V1 *ehv1,
int e=0) : h(eh), h1(ehv1), extra(e) {}
ENGINE_HANDLE *h;
ENGINE_HANDLE_V1 *h1;
int extra;
};
enum class BucketType { EP, Ephemeral };
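/* Helper to validate the payload of an observe-seqno response held in
 * last_body. Judging from the memcpy offsets used below, the body is laid
 * out as:
 *
 *   offset  0: format_type (1 byte)
 *   offset  1: vbucket id (2 bytes, network order)
 *   offset  3: vbucket uuid (8 bytes, network order)
 *   offset 11: last persisted seqno (8 bytes; always 0 for Ephemeral buckets)
 *   offset 19: current seqno (8 bytes)
 *   offset 27: failover vbucket uuid (8 bytes, only present when failover)
 *   offset 35: failover seqno (8 bytes, only present when failover)
 *
 * Example call (hypothetical values) for a persistent bucket with no
 * failover entry:
 *   check_observe_seqno(false, BucketType::EP, 1, 0, vb_uuid,
 *                       persisted_seqno, current_seqno);
 */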
static void check_observe_seqno(bool failover,
BucketType bucket_type,
uint8_t format_type,
uint16_t vb_id,
uint64_t vb_uuid,
uint64_t last_persisted_seqno,
uint64_t current_seqno,
uint64_t failover_vbuuid = 0,
uint64_t failover_seqno = 0) {
uint8_t recv_format_type;
uint16_t recv_vb_id;
uint64_t recv_vb_uuid;
uint64_t recv_last_persisted_seqno;
uint64_t recv_current_seqno;
uint64_t recv_failover_vbuuid;
uint64_t recv_failover_seqno;
memcpy(&recv_format_type, last_body.data(), sizeof(uint8_t));
checkeq(format_type, recv_format_type, "Wrong format type in result");
memcpy(&recv_vb_id, last_body.data() + 1, sizeof(uint16_t));
checkeq(vb_id, ntohs(recv_vb_id), "Wrong vbucket id in result");
memcpy(&recv_vb_uuid, last_body.data() + 3, sizeof(uint64_t));
checkeq(vb_uuid, ntohll(recv_vb_uuid), "Wrong vbucket uuid in result");
memcpy(&recv_last_persisted_seqno, last_body.data() + 11, sizeof(uint64_t));
switch (bucket_type) {
case BucketType::EP:
// Should get the "real" persisted seqno:
checkeq(last_persisted_seqno,
ntohll(recv_last_persisted_seqno),
"Wrong persisted seqno in result (EP)");
break;
case BucketType::Ephemeral:
// For ephemeral, this should always be zero, as there is no
// persistence.
checkeq(uint64_t(0),
ntohll(recv_last_persisted_seqno),
"Wrong persisted seqno in result (Ephemeral)");
break;
}
memcpy(&recv_current_seqno, last_body.data() + 19, sizeof(uint64_t));
checkeq(current_seqno, ntohll(recv_current_seqno), "Wrong current seqno in result");
if (failover) {
memcpy(&recv_failover_vbuuid, last_body.data() + 27, sizeof(uint64_t));
checkeq(failover_vbuuid, ntohll(recv_failover_vbuuid),
"Wrong failover uuid in result");
memcpy(&recv_failover_seqno, last_body.data() + 35, sizeof(uint64_t));
checkeq(failover_seqno, ntohll(recv_failover_seqno),
"Wrong failover seqno in result");
}
}
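/* Replace an evicted key and verify the background-fetch counter. Under
 * value eviction the item's metadata stays resident, so the replace does not
 * need a background fetch; under full_eviction the whole item is gone from
 * memory, so ep_bg_fetched is expected to rise by exactly one in that mode.
 */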
static enum test_result test_replace_with_eviction(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
item *i = NULL;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET,"key", "somevalue", &i),
"Failed to set value.");
h1->release(h, NULL, i);
wait_for_flusher_to_settle(h, h1);
evict_key(h, h1, "key");
int numBgFetched = get_int_stat(h, h1, "ep_bg_fetched");
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_REPLACE,"key", "somevalue1", &i),
"Failed to replace existing value.");
checkeq(ENGINE_SUCCESS,
h1->get_stats(h, NULL, NULL, 0, add_stats),
"Failed to get stats.");
std::string eviction_policy = vals.find("ep_item_eviction_policy")->second;
if (eviction_policy == "full_eviction") {
numBgFetched++;
}
checkeq(numBgFetched,
get_int_stat(h, h1, "ep_bg_fetched"),
"Bg fetched value didn't match");
h1->release(h, NULL, i);
check_key_value(h, h1, "key", "somevalue1", 10);
return SUCCESS;
}
static enum test_result test_wrong_vb_mutation(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
ENGINE_STORE_OPERATION op) {
item *i = NULL;
int numNotMyVBucket = get_int_stat(h, h1, "ep_num_not_my_vbuckets");
uint64_t cas = 11;
if (op == OPERATION_ADD) {
// Add operation with cas != 0 doesn't make sense
cas = 0;
}
checkeq(ENGINE_NOT_MY_VBUCKET,
store(h, h1, NULL, op, "key", "somevalue", &i, cas, 1),
"Expected not_my_vbucket");
h1->release(h, NULL, i);
wait_for_stat_change(h, h1, "ep_num_not_my_vbuckets", numNotMyVBucket);
return SUCCESS;
}
static enum test_result test_pending_vb_mutation(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
ENGINE_STORE_OPERATION op) {
const void *cookie = testHarness.create_cookie();
testHarness.set_ewouldblock_handling(cookie, false);
item *i = NULL;
check(set_vbucket_state(h, h1, 1, vbucket_state_pending),
"Failed to set vbucket state.");
check(verify_vbucket_state(h, h1, 1, vbucket_state_pending),
"Bucket state was not set to pending.");
uint64_t cas = 11;
if (op == OPERATION_ADD) {
// Add operation with cas != 0 doesn't make sense..
cas = 0;
}
checkeq(ENGINE_EWOULDBLOCK,
store(h, h1, cookie, op, "key", "somevalue", &i, cas, 1),
"Expected ewouldblock");
h1->release(h, NULL, i);
testHarness.destroy_cookie(cookie);
return SUCCESS;
}
static enum test_result test_replica_vb_mutation(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
ENGINE_STORE_OPERATION op) {
item *i = NULL;
check(set_vbucket_state(h, h1, 1, vbucket_state_replica),
"Failed to set vbucket state.");
check(verify_vbucket_state(h, h1, 1, vbucket_state_replica),
"Bucket state was not set to replica.");
int numNotMyVBucket = get_int_stat(h, h1, "ep_num_not_my_vbuckets");
uint64_t cas = 11;
if (op == OPERATION_ADD) {
// performing add with a CAS != 0 doesn't make sense...
cas = 0;
}
checkeq(ENGINE_NOT_MY_VBUCKET,
store(h, h1, NULL, op, "key", "somevalue", &i, cas, 1),
"Expected not my vbucket");
wait_for_stat_change(h, h1, "ep_num_not_my_vbuckets", numNotMyVBucket);
h1->release(h, NULL, i);
return SUCCESS;
}
//
// ----------------------------------------------------------------------
// The actual tests are below.
// ----------------------------------------------------------------------
//
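/* Helper used by the flush_shutdown tests: stops persistence, loads
 * numItems2Load items (so they sit only in memory/checkpoints), re-enables
 * persistence, then reloads the engine with either a forced or graceful
 * shutdown and returns curr_items as observed after warmup completes.
 */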
static int checkCurrItemsAfterShutdown(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
int numItems2Load, bool shutdownForce) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
std::vector<std::string> keys;
for (int index = 0; index < numItems2Load; ++index) {
std::stringstream s;
s << "keys_2_load-" << index;
std::string key(s.str());
keys.push_back(key);
}
checkeq(0, get_int_stat(h, h1, "ep_total_persisted"),
"Expected ep_total_persisted equals 0");
checkeq(0, get_int_stat(h, h1, "curr_items"),
"Expected curr_items equals 0");
// stop flusher before loading new items
protocol_binary_request_header *pkt = createPacket(PROTOCOL_BINARY_CMD_STOP_PERSISTENCE);
checkeq(ENGINE_SUCCESS,
h1->unknown_command(h, NULL, pkt, add_response, testHarness.doc_namespace),
"CMD_STOP_PERSISTENCE failed!");
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS,
last_status.load(),
"Failed to stop persistence!");
cb_free(pkt);
std::vector<std::string>::iterator itr;
for (itr = keys.begin(); itr != keys.end(); ++itr) {
item *i;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, itr->c_str(), "oracle", &i, 0, 0),
"Failed to store a value");
h1->release(h, NULL, i);
}
checkeq(0, get_int_stat(h, h1, "ep_total_persisted"),
"Incorrect ep_total_persisted, expected 0");
std::stringstream ss;
ss << "Incorrect curr_items, expected " << numItems2Load;
std::string errmsg(ss.str());
checkeq(numItems2Load, get_int_stat(h, h1, "curr_items"),
errmsg.c_str());
// resume flusher before shutdown + warmup
pkt = createPacket(PROTOCOL_BINARY_CMD_START_PERSISTENCE);
checkeq(ENGINE_SUCCESS, h1->unknown_command(h, NULL, pkt, add_response, testHarness.doc_namespace),
"CMD_START_PERSISTENCE failed!");
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
"Failed to start persistence!");
cb_free(pkt);
// shutdown engine force and restart
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, shutdownForce);
wait_for_warmup_complete(h, h1);
return get_int_stat(h, h1, "curr_items");
}
static enum test_result test_flush_shutdown_force(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
int numItems2load = 3000;
bool shutdownForce = true;
int currItems = checkCurrItemsAfterShutdown(h, h1, numItems2load, shutdownForce);
check (currItems <= numItems2load,
"Number of curr items should be <= 3000, unless previous "
"shutdown force had to wait for the flusher");
return SUCCESS;
}
static enum test_result test_flush_shutdown_noforce(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
int numItems2load = 3000;
bool shutdownForce = false;
int currItems = checkCurrItemsAfterShutdown(h, h1, numItems2load, shutdownForce);
check (currItems == numItems2load,
"Number of curr items should be equal to 3000, unless previous "
"shutdown did not wait for the flusher");
return SUCCESS;
}
static enum test_result test_flush_restart(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
item *i = NULL;
// First try to delete something we know to not be there.
checkeq(ENGINE_KEY_ENOENT, del(h, h1, "key", 0, 0),
"Failed to fail initial delete.");
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i),
"Failed set.");
h1->release(h, NULL, i);
check_key_value(h, h1, "key", "somevalue", 9);
// Restart once to ensure written to disk.
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
// Read value from disk.
check_key_value(h, h1, "key", "somevalue", 9);
// Flush
set_degraded_mode(h, h1, NULL, true);
checkeq(ENGINE_SUCCESS, h1->flush(h, NULL),
"Failed to flush");
set_degraded_mode(h, h1, NULL, false);
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, "key2", "somevalue", &i),
"Failed post-flush set.");
h1->release(h, NULL, i);
check_key_value(h, h1, "key2", "somevalue", 9);
// Restart again, ensure written to disk.
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, "key3", "somevalue", &i),
"Failed post-flush, post-restart set.");
h1->release(h, NULL, i);
check_key_value(h, h1, "key3", "somevalue", 9);
// Read value again, should not be there.
checkeq(ENGINE_KEY_ENOENT, verify_key(h, h1, "key"),
"Expected missing key");
return SUCCESS;
}
static enum test_result test_shutdown_snapshot_range(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
const int num_items = 100;
for (int j = 0; j < num_items; ++j) {
item *i = NULL;
std::stringstream ss;
ss << "key" << j;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, ss.str().c_str(), "data", &i),
"Failed to store a value");
h1->release(h, NULL, i);
}
wait_for_flusher_to_settle(h, h1);
int end = get_int_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno");
/* change vb state to replica before restarting (as it happens in graceful
failover)*/
check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
"Failed set vbucket 0 to replica state.");
/* trigger persist vb state task */
check(set_param(h, h1, protocol_binary_engine_param_flush,
"vb_state_persist_run", "0"),
"Failed to trigger vb state persist");
/* restart the engine */
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
/* Check if snapshot range is persisted correctly */
checkeq(end, get_int_stat(h, h1, "vb_0:last_persisted_snap_start",
"vbucket-seqno"),
"Wrong snapshot start persisted");
checkeq(end, get_int_stat(h, h1, "vb_0:last_persisted_snap_end",
"vbucket-seqno"),
"Wrong snapshot end persisted");
return SUCCESS;
}
static enum test_result test_flush_multiv_restart(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
item *i = NULL;
check(set_vbucket_state(h, h1, 2, vbucket_state_active),
"Failed to set vbucket state.");
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i),
"Failed set.");
h1->release(h, NULL, i);
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, "key2", "somevalue", &i, 0, 2),
"Failed set in vb2.");
h1->release(h, NULL, i);
// Restart once to ensure written to disk.
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
// Read value from disk.
check_key_value(h, h1, "key", "somevalue", 9);
// Flush
set_degraded_mode(h, h1, NULL, true);
checkeq(ENGINE_SUCCESS, h1->flush(h, NULL),
"Failed to flush");
set_degraded_mode(h, h1, NULL, false);
// Restart again, ensure written to disk.
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
// Read value again, should not be there.
checkeq(ENGINE_KEY_ENOENT, verify_key(h, h1, "key"), "Expected missing key");
check(verify_vbucket_missing(h, h1, 2), "Bucket 2 came back.");
return SUCCESS;
}
static enum test_result test_restart(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
item *i = NULL;
static const char val[] = "somevalue";
ENGINE_ERROR_CODE ret = store(h, h1, NULL, OPERATION_SET, "key", val, &i);
checkeq(ENGINE_SUCCESS, ret, "Failed set.");
h1->release(h, NULL, i);
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
check_key_value(h, h1, "key", val, strlen(val));
return SUCCESS;
}
static enum test_result test_restart_session_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
const void* cookie = createTapConn(h, h1, "tap_client_thread");
testHarness.unlock_cookie(cookie);
testHarness.destroy_cookie(cookie);
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
cookie = createTapConn(h, h1, "tap_client_thread");
checkeq(ENGINE_SUCCESS, h1->get_stats(h, NULL, "tap", 3, add_stats),
"Failed to get stats.");
std::string val = vals["eq_tapq:tap_client_thread:backfill_completed"];
checkeq(0, strcmp(val.c_str(), "true"), "Don't expect the backfill upon restart");
testHarness.unlock_cookie(cookie);
testHarness.destroy_cookie(cookie);
return SUCCESS;
}
static enum test_result test_specialKeys(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
item *i = NULL;
ENGINE_ERROR_CODE ret;
// Simplified Chinese "Couchbase"
static const char key0[] = "沙发数据库";
static const char val0[] = "some Chinese value";
check((ret = store(h, h1, NULL, OPERATION_SET, key0, val0, &i)) == ENGINE_SUCCESS,
"Failed set Chinese key");
check_key_value(h, h1, key0, val0, strlen(val0));
h1->release(h, NULL, i);
// Traditional Chinese "Couchbase"
static const char key1[] = "沙發數據庫";
static const char val1[] = "some Traditional Chinese value";
check((ret = store(h, h1, NULL, OPERATION_SET, key1, val1, &i)) == ENGINE_SUCCESS,
"Failed set Traditional Chinese key");
h1->release(h, NULL, i);
// Korean "couch potato"
static const char key2[] = "쇼파감자";
static const char val2[] = "some Korean value";
check((ret = store(h, h1, NULL, OPERATION_SET, key2, val2, &i)) == ENGINE_SUCCESS,
"Failed set Korean key");
h1->release(h, NULL, i);
// Russian "couch potato"
static const char key3[] = "лодырь, лентяй";
static const char val3[] = "some Russian value";
check((ret = store(h, h1, NULL, OPERATION_SET, key3, val3, &i)) == ENGINE_SUCCESS,
"Failed set Russian key");
h1->release(h, NULL, i);
// Japanese "couch potato"
static const char key4[] = "カウチポテト";
static const char val4[] = "some Japanese value";
check((ret = store(h, h1, NULL, OPERATION_SET, key4, val4, &i)) == ENGINE_SUCCESS,
"Failed set Japanese key");
h1->release(h, NULL, i);
// Devanagari (Hindi) key
static const char key5[] = "हरियानवी";
static const char val5[] = "some Indian value";
check((ret = store(h, h1, NULL, OPERATION_SET, key5, val5, &i)) == ENGINE_SUCCESS,
"Failed set Indian key");
h1->release(h, NULL, i);
// Portuguese translation "couch potato"
static const char key6[] = "sedentário";
static const char val6[] = "some Portuguese value";
check((ret = store(h, h1, NULL, OPERATION_SET, key6, val6, &i)) == ENGINE_SUCCESS,
"Failed set Portuguese key");
h1->release(h, NULL, i);
// Arabic translation "couch potato"
static const char key7[] = "الحافلةالبطاطة";
static const char val7[] = "some Arabic value";
check((ret = store(h, h1, NULL, OPERATION_SET, key7, val7, &i)) == ENGINE_SUCCESS,
"Failed set Arabic key");
h1->release(h, NULL, i);
if (isWarmupEnabled(h, h1)) {
// Check that after warmup the keys are still present.
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
check_key_value(h, h1, key0, val0, strlen(val0));
check_key_value(h, h1, key1, val1, strlen(val1));
check_key_value(h, h1, key2, val2, strlen(val2));
check_key_value(h, h1, key3, val3, strlen(val3));
check_key_value(h, h1, key4, val4, strlen(val4));
check_key_value(h, h1, key5, val5, strlen(val5));
check_key_value(h, h1, key6, val6, strlen(val6));
check_key_value(h, h1, key7, val7, strlen(val7));
}
return SUCCESS;
}
static enum test_result test_binKeys(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
item *i = NULL;
ENGINE_ERROR_CODE ret;
// binary key with char values beyond 0x7F
static const char key0[] = "\xe0\xed\xf1\x6f\x7f\xf8\xfa";
static const char val0[] = "some value val8";
check((ret = store(h, h1, NULL, OPERATION_SET, key0, val0, &i)) == ENGINE_SUCCESS,
"Failed set binary key0");
check_key_value(h, h1, key0, val0, strlen(val0));
h1->release(h, NULL, i);
// binary keys with char values beyond 0x7F
static const char key1[] = "\xf1\xfd\xfe\xff\xf0\xf8\xef";
static const char val1[] = "some value val9";
check((ret = store(h, h1, NULL, OPERATION_SET, key1, val1, &i)) == ENGINE_SUCCESS,
"Failed set binary key1");
check_key_value(h, h1, key1, val1, strlen(val1));
h1->release(h, NULL, i);
// binary keys with special utf-8 BOM (Byte Order Mark) values 0xBB 0xBF 0xEF
static const char key2[] = "\xff\xfe\xbb\xbf\xef";
static const char val2[] = "some utf-8 bom value";
check((ret = store(h, h1, NULL, OPERATION_SET, key2, val2, &i)) == ENGINE_SUCCESS,
"Failed set binary utf-8 bom key");
check_key_value(h, h1, key2, val2, strlen(val2));
h1->release(h, NULL, i);
// binary keys with special utf-16BE BOM values "U+FEFF"
static const char key3[] = "U+\xfe\xff\xefU+\xff\xfe";
static const char val3[] = "some utf-16 bom value";
check((ret = store(h, h1, NULL, OPERATION_SET, key3, val3, &i)) == ENGINE_SUCCESS,
"Failed set binary utf-16 bom key");
check_key_value(h, h1, key3, val3, strlen(val3));
h1->release(h, NULL, i);
if (isWarmupEnabled(h, h1)) {
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
check_key_value(h, h1, key0, val0, strlen(val0));
check_key_value(h, h1, key1, val1, strlen(val1));
check_key_value(h, h1, key2, val2, strlen(val2));
check_key_value(h, h1, key3, val3, strlen(val3));
}
return SUCCESS;
}
static enum test_result test_restart_bin_val(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
char binaryData[] = "abcdefg\0gfedcba";
cb_assert(sizeof(binaryData) != strlen(binaryData));
item *i = NULL;
checkeq(ENGINE_SUCCESS,
storeCasVb11(h, h1, NULL, OPERATION_SET, "key",
binaryData, sizeof(binaryData), 82758, &i, 0, 0),
"Failed set.");
h1->release(h, NULL, i);
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
check_key_value(h, h1, "key", binaryData, sizeof(binaryData));
return SUCCESS;
}
static enum test_result test_wrong_vb_get(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
int numNotMyVBucket = get_int_stat(h, h1, "ep_num_not_my_vbuckets");
checkeq(ENGINE_NOT_MY_VBUCKET, verify_key(h, h1, "key", 1),
"Expected wrong bucket.");
wait_for_stat_change(h, h1, "ep_num_not_my_vbuckets", numNotMyVBucket);
return SUCCESS;
}
static enum test_result test_vb_get_pending(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
check(set_vbucket_state(h, h1, 1, vbucket_state_pending),
"Failed to set vbucket state.");
const void *cookie = testHarness.create_cookie();
testHarness.set_ewouldblock_handling(cookie, false);
item *i = NULL;
checkeq(ENGINE_EWOULDBLOCK,
get(h, h1, cookie, &i, "key", 1),
"Expected woodblock.");
h1->release(h, NULL, i);
testHarness.destroy_cookie(cookie);
return SUCCESS;
}
static enum test_result test_vb_get_replica(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
check(set_vbucket_state(h, h1, 1, vbucket_state_replica),
"Failed to set vbucket state.");
int numNotMyVBucket = get_int_stat(h, h1, "ep_num_not_my_vbuckets");
checkeq(ENGINE_NOT_MY_VBUCKET,
verify_key(h, h1, "key", 1),
"Expected not my bucket.");
wait_for_stat_change(h, h1, "ep_num_not_my_vbuckets", numNotMyVBucket);
return SUCCESS;
}
static enum test_result test_wrong_vb_set(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return test_wrong_vb_mutation(h, h1, OPERATION_SET);
}
static enum test_result test_wrong_vb_cas(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return test_wrong_vb_mutation(h, h1, OPERATION_CAS);
}
static enum test_result test_wrong_vb_add(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return test_wrong_vb_mutation(h, h1, OPERATION_ADD);
}
static enum test_result test_wrong_vb_replace(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return test_wrong_vb_mutation(h, h1, OPERATION_REPLACE);
}
static enum test_result test_wrong_vb_del(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
int numNotMyVBucket = get_int_stat(h, h1, "ep_num_not_my_vbuckets");
checkeq(ENGINE_NOT_MY_VBUCKET, del(h, h1, "key", 0, 1),
"Expected wrong bucket.");
wait_for_stat_change(h, h1, "ep_num_not_my_vbuckets", numNotMyVBucket);
return SUCCESS;
}
/* Returns a string in the format "%Y-%m-%d %H:%M:%S" of the specified
* time point.
*/
std::string make_time_string(std::chrono::system_clock::time_point time_point) {
time_t tt = std::chrono::system_clock::to_time_t(time_point);
#ifdef _MSC_VER
// Windows' gmtime() is already thread-safe.
struct tm* split = gmtime(&tt);
#else
struct tm local_storage;
struct tm* split = gmtime_r(&tt, &local_storage);
#endif
char timeStr[20];
strftime(timeStr, 20, "%Y-%m-%d %H:%M:%S", split);
return timeStr;
}
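/* Exercise the expiry pager configuration knobs: the pager starts disabled in
 * this test's configuration, updates to exp_pager_stime and exp_pager_enabled
 * must be reflected in the corresponding stats (and reset across a reload),
 * and after changing exp_pager_initial_run_time / exp_pager_stime the
 * reported ep_expiry_pager_task_time must fall between two timestamps
 * captured just before and just after the update.
 */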
static enum test_result test_expiry_pager_settings(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
cb_assert(!get_bool_stat(h, h1, "ep_exp_pager_enabled"));
checkeq(3600, get_int_stat(h, h1, "ep_exp_pager_stime"),
"Expiry pager sleep time not expected");
set_param(h, h1, protocol_binary_engine_param_flush,
"exp_pager_stime", "1");
checkeq(1, get_int_stat(h, h1, "ep_exp_pager_stime"),
"Expiry pager sleep time not updated");
cb_assert(!get_bool_stat(h, h1, "ep_exp_pager_enabled"));
sleep(1);
checkeq(0, get_int_stat(h, h1, "ep_num_expiry_pager_runs"),
"Expiry pager run count is not zero");
set_param(h, h1, protocol_binary_engine_param_flush,
"exp_pager_enabled", "true");
checkeq(1, get_int_stat(h, h1, "ep_exp_pager_stime"),
"Expiry pager sleep time not updated");
wait_for_stat_to_be_gte(h, h1, "ep_num_expiry_pager_runs", 1);
// Reload engine
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
cb_assert(!get_bool_stat(h, h1, "ep_exp_pager_enabled"));
// Enable expiry pager again
set_param(h, h1, protocol_binary_engine_param_flush,
"exp_pager_enabled", "true");
checkeq(get_int_stat(h, h1, "ep_exp_pager_initial_run_time"), -1,
"Task time should be disable upon warmup");
std::string err_msg;
// Update exp_pager_initial_run_time and ensure the update is successful
set_param(h, h1, protocol_binary_engine_param_flush,
"exp_pager_initial_run_time", "3");
std::string expected_time = "03:00";
std::string str = get_str_stat(h, h1, "ep_expiry_pager_task_time");
err_msg.assign("Updated time incorrect, expect: " +
expected_time + ", actual: " + str.substr(11, 5));
checkeq(0, str.substr(11, 5).compare(expected_time), err_msg.c_str());
// Update exp_pager_stime by 30 minutes and ensure that the update is successful
const std::chrono::minutes update_by{30};
std::string targetTaskTime1{make_time_string(std::chrono::system_clock::now() +
update_by)};
set_param(h, h1, protocol_binary_engine_param_flush, "exp_pager_stime",
std::to_string(update_by.count() * 60).c_str());
str = get_str_stat(h, h1, "ep_expiry_pager_task_time");
std::string targetTaskTime2{make_time_string(std::chrono::system_clock::now() +
update_by)};
// ep_expiry_pager_task_time should fall within the range of
// targetTaskTime1 and targetTaskTime2
err_msg.assign("Unexpected task time range, expect: " +
targetTaskTime1 + " <= " + str + " <= " + targetTaskTime2);
check(targetTaskTime1 <= str, err_msg.c_str());
check(str <= targetTaskTime2, err_msg.c_str());
return SUCCESS;
}
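/* Store a document whose value carries user and system extended attributes
 * plus a normal body, with a 10 second TTL. After the TTL passes the document
 * should be expired, its rev seqno bumped by one, and only the system xattr
 * ("_sync") should survive on the deleted item; the user xattrs and the body
 * are discarded.
 */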
static enum test_result test_expiry_with_xattr(ENGINE_HANDLE* h,
ENGINE_HANDLE_V1* h1) {
const char* key = "test_expiry";
cb::xattr::Blob blob;
//Add a few XAttrs
blob.set(to_const_byte_buffer("user"),
to_const_byte_buffer("{\"author\":\"bubba\"}"));
blob.set(to_const_byte_buffer("_sync"),
to_const_byte_buffer("{\"cas\":\"0xdeadbeefcafefeed\"}"));
blob.set(to_const_byte_buffer("meta"),
to_const_byte_buffer("{\"content-type\":\"text\"}"));
auto xattr_value = blob.finalize();
//Now, append user data to the xattrs and store the data
std::string value_data("test_expiry_value");
std::vector<char> data;
std::copy(xattr_value.buf, xattr_value.buf + xattr_value.len,
std::back_inserter(data));
std::copy(value_data.c_str(), value_data.c_str() + value_data.length(),
std::back_inserter(data));
const void* cookie = testHarness.create_cookie();
item *itm = nullptr;
checkeq(ENGINE_SUCCESS,
storeCasVb11(h, h1, cookie, OPERATION_SET, key,
reinterpret_cast<char*>(data.data()),
data.size(), 9258, &itm, 0, 0, 10,
PROTOCOL_BINARY_DATATYPE_XATTR),
"Failed to store xattr document");
h1->release(h, nullptr, itm);
if (isPersistentBucket(h, h1)) {
wait_for_flusher_to_settle(h, h1);
}
testHarness.time_travel(11);
checkeq(true,
get_meta(h, h1, "test_expiry", true, GetMetaVersion::V2, cookie),
"Get meta command failed");
auto prev_revseqno = last_meta.revSeqno;
checkeq(static_cast<uint8_t>(PROTOCOL_BINARY_DATATYPE_XATTR),
last_datatype.load(), "Datatype is not XATTR");
checkeq(ENGINE_SUCCESS,
get(h, h1, cookie, &itm, key, 0,
DocStateFilter::AliveOrDeleted),
"Unable to get a deleted item");
checkeq(true,
get_meta(h, h1, "test_expiry", false, GetMetaVersion::V1, cookie),
"Get meta command failed");
checkeq(last_meta.revSeqno, prev_revseqno + 1,
"rev seqno must have incremented by 1");
/* Retrieve the item info and create a new blob out of the data */
item_info info;
checkeq(true, h1->get_item_info(h, cookie, itm, &info),
"Unable to retrieve item info");
cb::byte_buffer value_buf{static_cast<uint8_t*>(info.value[0].iov_base),
info.value[0].iov_len};
cb::xattr::Blob new_blob(value_buf);
/* Only system extended attributes need to be present at this point.
* Thus, check the blob length with the system size.
*/
const auto systemsize = new_blob.finalize().len;
checkeq(systemsize, new_blob.get_system_size(),
"The size of the blob doesn't match the size of system attributes");
const std::string& cas_str{"{\"cas\":\"0xdeadbeefcafefeed\"}"};
const std::string& sync_str = to_string(blob.get(to_const_byte_buffer("_sync")));
checkeq(cas_str, sync_str , "system xattr is invalid");
h1->release(h, nullptr, itm);
testHarness.destroy_cookie(cookie);
return SUCCESS;
}
static enum test_result test_expiry(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
const char *key = "test_expiry";
const char *data = "some test data here.";
item *it = NULL;
ENGINE_ERROR_CODE rv;
rv = allocate(h, h1, NULL, &it, key, strlen(data), 0, 2,
PROTOCOL_BINARY_RAW_BYTES, 0);
checkeq(ENGINE_SUCCESS, rv, "Allocation failed.");
item_info info;
if (!h1->get_item_info(h, NULL, it, &info)) {
abort();
}
memcpy(info.value[0].iov_base, data, strlen(data));
uint64_t cas = 0;
rv = h1->store(h, NULL, it, &cas, OPERATION_SET, DocumentState::Alive);
checkeq(ENGINE_SUCCESS, rv, "Set failed.");
check_key_value(h, h1, key, data, strlen(data));
h1->release(h, NULL, it);
testHarness.time_travel(5);
checkeq(ENGINE_KEY_ENOENT,
get(h, h1, NULL, &it, key, 0),
"Item didn't expire");
int expired_access = get_int_stat(h, h1, "ep_expired_access");
int expired_pager = get_int_stat(h, h1, "ep_expired_pager");
int active_expired = get_int_stat(h, h1, "vb_active_expired");
checkeq(0, expired_pager, "Expected zero expired item by pager");
checkeq(1, expired_access, "Expected an expired item on access");
checkeq(1, active_expired, "Expected an expired active item");
checkeq(ENGINE_SUCCESS, store(h, h1, NULL, OPERATION_SET, key, data, &it),
"Failed set.");
h1->release(h, NULL, it);
// When run under full eviction, the total item stats are set from the
// flusher. So we need to wait for it to finish before checking the
// total number of items.
wait_for_flusher_to_settle(h, h1);
std::stringstream ss;
ss << "curr_items stat should be still 1 after ";
ss << "overwriting the key that was expired, but not purged yet";
checkeq(1, get_int_stat(h, h1, "curr_items"), ss.str().c_str());
return SUCCESS;
}
static enum test_result test_expiry_loader(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
const char *key = "test_expiry_loader";
const char *data = "some test data here.";
item *it = NULL;
ENGINE_ERROR_CODE rv;
rv = allocate(h, h1, NULL, &it, key, strlen(data), 0, 2,
PROTOCOL_BINARY_RAW_BYTES, 0);
checkeq(ENGINE_SUCCESS, rv, "Allocation failed.");
item_info info;
if (!h1->get_item_info(h, NULL, it, &info)) {
abort();
}
memcpy(info.value[0].iov_base, data, strlen(data));
uint64_t cas = 0;
rv = h1->store(h, NULL, it, &cas, OPERATION_SET, DocumentState::Alive);
checkeq(ENGINE_SUCCESS, rv, "Set failed.");
check_key_value(h, h1, key, data, strlen(data));
h1->release(h, NULL, it);
testHarness.time_travel(3);
checkeq(ENGINE_KEY_ENOENT,
get(h, h1, NULL, &it, key, 0),
"Item didn't expire");
// Restart the engine to ensure the above expired item is not loaded
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
cb_assert(0 == get_int_stat(h, h1, "ep_warmup_value_count", "warmup"));
return SUCCESS;
}
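/* With the expiry pager switched off, items whose TTL has passed should
 * instead be reaped when the vbucket is compacted: store 50 items with a 10s
 * TTL, advance time past it, compact vbucket 0 and expect
 * ep_expired_compactor to report all 50 expirations.
 */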
static enum test_result test_expiration_on_compaction(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
if (get_bool_stat(h, h1, "ep_exp_pager_enabled")) {
set_param(h, h1, protocol_binary_engine_param_flush,
"exp_pager_enabled", "false");
}
checkeq(1, get_int_stat(h, h1, "vb_0:persistence:num_visits",
"checkpoint"), "Cursor moved before item load");
for (int i = 0; i < 50; i++) {
item *itm = NULL;
std::stringstream ss;
ss << "key" << i;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, ss.str().c_str(),
"somevalue", &itm, 0, 0, 10,
PROTOCOL_BINARY_RAW_BYTES),
"Set failed.");
h1->release(h, NULL, itm);
}
wait_for_flusher_to_settle(h, h1);
checkeq(50, get_int_stat(h, h1, "curr_items"),
"Unexpected number of items on database");
check(1 < get_int_stat(h, h1, "vb_0:persistence:num_visits", "checkpoint"),
"Cursor not moved even after flusher runs");
testHarness.time_travel(15);
// Compaction on VBucket
compact_db(h, h1, 0, 0, 0, 0, 0);
wait_for_stat_to_be(h, h1, "ep_pending_compactions", 0);
checkeq(50, get_int_stat(h, h1, "ep_expired_compactor"),
"Unexpected expirations by compactor");
return SUCCESS;
}
static enum test_result test_expiration_on_warmup(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
set_param(h, h1, protocol_binary_engine_param_flush,
"exp_pager_enabled", "false");
int pager_runs = get_int_stat(h, h1, "ep_num_expiry_pager_runs");
const char *key = "KEY";
const char *data = "VALUE";
item *it = NULL;
ENGINE_ERROR_CODE rv;
rv = allocate(h, h1, NULL, &it, key, strlen(data), 0, 10,
PROTOCOL_BINARY_RAW_BYTES, 0);
checkeq(ENGINE_SUCCESS, rv, "Allocation failed.");
item_info info;
if (!h1->get_item_info(h, NULL, it, &info)) {
abort();
}
memcpy(info.value[0].iov_base, data, strlen(data));
uint64_t cas = 0;
rv = h1->store(h, NULL, it, &cas, OPERATION_SET, DocumentState::Alive);
checkeq(ENGINE_SUCCESS, rv, "Set failed.");
check_key_value(h, h1, key, data, strlen(data));
h1->release(h, NULL, it);
wait_for_flusher_to_settle(h, h1);
checkeq(1, get_int_stat(h, h1, "curr_items"), "Failed store item");
testHarness.time_travel(15);
checkeq(pager_runs, get_int_stat(h, h1, "ep_num_expiry_pager_runs"),
"Expiry pager shouldn't have run during this time");
// Restart the engine to ensure the above item is expired
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
check(get_bool_stat(h, h1, "ep_exp_pager_enabled"),
"Expiry pager should be enabled on warmup");
// Wait for the expiry pager to run and expire our item.
wait_for_stat_to_be_gte(h, h1, "ep_expired_pager", 1, nullptr, /*secs*/10);
// Note: previously we checked that curr_items was zero here (immediately
// after waiting for ep_expired_pager == 1), however we cannot assume that
// - items are actually expired asynchronously.
// See EPStore::deleteExpiredItem - for non-temporary, expired items we
// call processSoftDelete (soft-marking the item as deleted in the
// hashtable), and then call queueDirty to queue a deletion, and then
// increment the expired stat. Only when that delete is actually persisted
// and the deleted callback is invoked -
// PersistenceCallback::callback(int&) - is curr_items finally decremented.
// Therefore we need to wait for the flusher to settle (i.e. delete
// callback to be called) for the curr_items stat to be accurate.
wait_for_flusher_to_settle(h, h1);
checkeq(0, get_int_stat(h, h1, "curr_items"),
"The item should have been expired.");
return SUCCESS;
}
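/* Regression test for bug 3454 (duplicate items at warmup): an item expires,
 * a new item is then added under the same key, and after restart warmup must
 * load exactly one (unexpired) value for the key and report no duplicates
 * (ep_warmup_dups == 0).
 */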
static enum test_result test_bug3454(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
const char *key = "test_expiry_duplicate_warmup";
const char *data = "some test data here.";
item *it = NULL;
ENGINE_ERROR_CODE rv;
rv = allocate(h, h1, NULL, &it, key, strlen(data), 0, 5,
PROTOCOL_BINARY_RAW_BYTES, 0);
checkeq(ENGINE_SUCCESS, rv, "Allocation failed.");
item_info info;
if (!h1->get_item_info(h, NULL, it, &info)) {
abort();
}
memcpy(info.value[0].iov_base, data, strlen(data));
uint64_t cas = 0;
rv = h1->store(h, NULL, it, &cas, OPERATION_SET, DocumentState::Alive);
checkeq(ENGINE_SUCCESS, rv, "Set failed.");
check_key_value(h, h1, key, data, strlen(data));
h1->release(h, NULL, it);
wait_for_flusher_to_settle(h, h1);
// Advance the ep_engine time by 10 sec for the above item to be expired.
testHarness.time_travel(10);
checkeq(ENGINE_KEY_ENOENT,
get(h, h1, NULL, &it, key, 0),
"Item didn't expire");
rv = allocate(h, h1, NULL, &it, key, strlen(data), 0, 0,
PROTOCOL_BINARY_RAW_BYTES, 0);
checkeq(ENGINE_SUCCESS, rv, "Allocation failed.");
if (!h1->get_item_info(h, NULL, it, &info)) {
abort();
}
memcpy(info.value[0].iov_base, data, strlen(data));
cas = 0;
// Add a new item with the same key.
rv = h1->store(h, NULL, it, &cas, OPERATION_ADD, DocumentState::Alive);
checkeq(ENGINE_SUCCESS, rv, "Add failed.");
check_key_value(h, h1, key, data, strlen(data));
h1->release(h, NULL, it);
wait_for_flusher_to_settle(h, h1);
checkeq(ENGINE_SUCCESS,
get(h, h1, NULL, &it, key, 0),
"Item shouldn't expire");
h1->release(h, NULL, it);
// Restart the engine to ensure the above unexpired new item is loaded
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
cb_assert(1 == get_int_stat(h, h1, "ep_warmup_value_count", "warmup"));
cb_assert(0 == get_int_stat(h, h1, "ep_warmup_dups", "warmup"));
return SUCCESS;
}
static enum test_result test_bug3522(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
const char *key = "test_expiry_no_items_warmup";
const char *data = "some test data here.";
item *it = NULL;
ENGINE_ERROR_CODE rv;
rv = allocate(h, h1, NULL, &it, key, strlen(data), 0, 0,
PROTOCOL_BINARY_RAW_BYTES, 0);
checkeq(ENGINE_SUCCESS, rv, "Allocation failed.");
item_info info;
if (!h1->get_item_info(h, NULL, it, &info)) {
abort();
}
memcpy(info.value[0].iov_base, data, strlen(data));
uint64_t cas = 0;
rv = h1->store(h, NULL, it, &cas, OPERATION_SET, DocumentState::Alive);
checkeq(ENGINE_SUCCESS, rv, "Set failed.");
check_key_value(h, h1, key, data, strlen(data));
h1->release(h, NULL, it);
wait_for_flusher_to_settle(h, h1);
// Add a new item with the same key and 2 sec of expiration.
const char *new_data = "new data here.";
rv = allocate(h, h1, NULL, &it, key, strlen(new_data), 0, 2,
PROTOCOL_BINARY_RAW_BYTES, 0);
checkeq(ENGINE_SUCCESS, rv, "Allocation failed.");
if (!h1->get_item_info(h, NULL, it, &info)) {
abort();
}
memcpy(info.value[0].iov_base, new_data, strlen(new_data));
int pager_runs = get_int_stat(h, h1, "ep_num_expiry_pager_runs");
cas = 0;
rv = h1->store(h, NULL, it, &cas, OPERATION_SET, DocumentState::Alive);
checkeq(ENGINE_SUCCESS, rv, "Set failed.");
check_key_value(h, h1, key, new_data, strlen(new_data));
h1->release(h, NULL, it);
testHarness.time_travel(3);
wait_for_stat_change(h, h1, "ep_num_expiry_pager_runs", pager_runs);
wait_for_flusher_to_settle(h, h1);
// Restart the engine.
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
// TODO: modify this for a better test case
cb_assert(0 == get_int_stat(h, h1, "ep_warmup_dups", "warmup"));
return SUCCESS;
}
static enum test_result test_get_replica_active_state(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
protocol_binary_request_header *pkt;
pkt = prepare_get_replica(h, h1, vbucket_state_active);
checkeq(ENGINE_SUCCESS,
h1->unknown_command(h, NULL, pkt, add_response, testHarness.doc_namespace),
"Get Replica Failed");
checkeq(PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, last_status.load(),
"Expected PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET response.");
cb_free(pkt);
return SUCCESS;
}
static enum test_result test_get_replica_pending_state(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
protocol_binary_request_header *pkt;
const void *cookie = testHarness.create_cookie();
testHarness.set_ewouldblock_handling(cookie, false);
pkt = prepare_get_replica(h, h1, vbucket_state_pending);
checkeq(ENGINE_EWOULDBLOCK,
h1->unknown_command(h, cookie, pkt, add_response, testHarness.doc_namespace),
"Should have returned error for pending state");
testHarness.destroy_cookie(cookie);
cb_free(pkt);
return SUCCESS;
}
static enum test_result test_get_replica_dead_state(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
protocol_binary_request_header *pkt;
pkt = prepare_get_replica(h, h1, vbucket_state_dead);
checkeq(ENGINE_SUCCESS,
h1->unknown_command(h, NULL, pkt, add_response, testHarness.doc_namespace),
"Get Replica Failed");
checkeq(PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, last_status.load(),
"Expected PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET response.");
cb_free(pkt);
return SUCCESS;
}
static enum test_result test_get_replica(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
protocol_binary_request_header *pkt;
pkt = prepare_get_replica(h, h1, vbucket_state_replica);
checkeq(ENGINE_SUCCESS,
h1->unknown_command(h, NULL, pkt, add_response, testHarness.doc_namespace),
"Get Replica Failed");
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
"Expected PROTOCOL_BINARY_RESPONSE_SUCCESS response.");
checkeq(std::string("replicadata"), last_body,
"Should have returned identical value");
cb_free(pkt);
return SUCCESS;
}
static enum test_result test_get_replica_non_resident(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
item *i = NULL;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, "key", "value", &i, 0, 0),
"Store Failed");
h1->release(h, NULL, i);
wait_for_flusher_to_settle(h, h1);
wait_for_stat_to_be(h, h1, "ep_total_persisted", 1);
evict_key(h, h1, "key", 0, "Ejected.");
check(set_vbucket_state(h, h1, 0, vbucket_state_replica),
"Failed to set vbucket to replica");
get_replica(h, h1, "key", 0);
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
"Expected success");
return SUCCESS;
}
static enum test_result test_get_replica_invalid_key(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
protocol_binary_request_header *pkt;
bool makeinvalidkey = true;
pkt = prepare_get_replica(h, h1, vbucket_state_replica, makeinvalidkey);
checkeq(ENGINE_SUCCESS,
h1->unknown_command(h, NULL, pkt, add_response, testHarness.doc_namespace),
"Get Replica Failed");
checkeq(PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, last_status.load(),
"Expected PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET response.");
cb_free(pkt);
return SUCCESS;
}
static enum test_result test_vb_del_pending(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
const void *cookie = testHarness.create_cookie();
testHarness.set_ewouldblock_handling(cookie, false);
check(set_vbucket_state(h, h1, 1, vbucket_state_pending),
"Failed to set vbucket state.");
checkeq(ENGINE_EWOULDBLOCK, del(h, h1, "key", 0, 1, cookie),
"Expected woodblock.");
testHarness.destroy_cookie(cookie);
return SUCCESS;
}
static enum test_result test_vb_del_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
check(set_vbucket_state(h, h1, 1, vbucket_state_replica),
"Failed to set vbucket state.");
int numNotMyVBucket = get_int_stat(h, h1, "ep_num_not_my_vbuckets");
checkeq(ENGINE_NOT_MY_VBUCKET, del(h, h1, "key", 0, 1),
"Expected not my vbucket.");
wait_for_stat_change(h, h1, "ep_num_not_my_vbuckets", numNotMyVBucket);
return SUCCESS;
}
static enum test_result test_vbucket_get_miss(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return verify_vbucket_missing(h, h1, 1) ? SUCCESS : FAIL;
}
static enum test_result test_vbucket_get(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return verify_vbucket_state(h, h1, 0, vbucket_state_active) ? SUCCESS : FAIL;
}
static enum test_result test_vbucket_create(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
if (!verify_vbucket_missing(h, h1, 1)) {
fprintf(stderr, "vbucket wasn't missing.\n");
return FAIL;
}
if (!set_vbucket_state(h, h1, 1, vbucket_state_active)) {
fprintf(stderr, "set state failed.\n");
return FAIL;
}
return verify_vbucket_state(h, h1, 1, vbucket_state_active) ? SUCCESS : FAIL;
}
static enum test_result test_takeover_stats_race_with_vb_create_TAP(
ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
check(set_vbucket_state(h, h1, 1, vbucket_state_active),
"Failed to set vbucket state information");
checkeq(0,
get_int_stat(h, h1, "on_disk_deletes", "tap-vbtakeover 1"),
"Invalid number of on-disk deletes");
return SUCCESS;
}
static enum test_result test_takeover_stats_race_with_vb_create_DCP(
ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
check(set_vbucket_state(h, h1, 1, vbucket_state_active),
"Failed to set vbucket state information");
checkeq(0,
get_int_stat(h, h1, "on_disk_deletes", "dcp-vbtakeover 1"),
"Invalid number of on-disk deletes");
return SUCCESS;
}
static enum test_result test_takeover_stats_num_persisted_deletes(
ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
/* set an item */
std::string key("key");
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, key.c_str(), "data", nullptr),
"Failed to store an item");
/* delete the item */
checkeq(ENGINE_SUCCESS, del(h, h1, key.c_str(), 0, 0),
"Failed to delete the item");
/* wait for persistence */
wait_for_flusher_to_settle(h, h1);
/* check if persisted deletes stats is got correctly */
checkeq(1,
get_int_stat(h, h1, "on_disk_deletes", "dcp-vbtakeover 0"),
"Invalid number of on-disk deletes");
return SUCCESS;
}
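/* Compaction with purging: one key ("Carss") is given an 11s TTL while
 * another ("trees") never expires, plus a dummy item so the highest seqno is
 * not the one being purged. After time travel and a compaction with purge
 * parameters, the expired key must be gone, the non-expiring key intact, and
 * vb_0:purge_seqno advanced from 0.
 */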
static enum test_result test_vbucket_compact(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
const char *key = "Carss";
const char *value = "pollute";
if (!verify_vbucket_missing(h, h1, 0)) {
fprintf(stderr, "vbucket wasn't missing.\n");
return FAIL;
}
if (!set_vbucket_state(h, h1, 0, vbucket_state_active)) {
fprintf(stderr, "set state failed.\n");
return FAIL;
}
check(verify_vbucket_state(h, h1, 0, vbucket_state_active),
"VBucket state not active");
// Set two keys - one to be expired and other to remain...
item *itm = NULL;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, key, value, &itm),
"Failed set.");
h1->release(h, NULL, itm);
check_key_value(h, h1, key, value, strlen(value));
// Set a non-expiring key...
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, "trees", "cleanse", &itm),
"Failed set.");
h1->release(h, NULL, itm);
check_key_value(h, h1, "trees", "cleanse", strlen("cleanse"));
checkeq(ENGINE_SUCCESS, touch(h, h1, key, 0, 11), "touch Carss");
testHarness.time_travel(12);
wait_for_flusher_to_settle(h, h1);
// Store a dummy item since we do not purge the item with highest seqno
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, "dummykey", "dummyvalue", &itm,
0, 0, 0),
"Error setting.");
h1->release(h, NULL, itm);
wait_for_flusher_to_settle(h, h1);
checkeq(0, get_int_stat(h, h1, "vb_0:purge_seqno", "vbucket-seqno"),
"purge_seqno not found to be zero before compaction");
// Compaction on VBucket
compact_db(h, h1, 0, 0, 2, 3, 1);
wait_for_stat_to_be(h, h1, "ep_pending_compactions", 0);
// the key tree and its value should be intact...
checkeq(ENGINE_SUCCESS, verify_key(h, h1, "trees"),
"key trees should be found.");
// the key Carss should have disappeared...
ENGINE_ERROR_CODE val = verify_key(h, h1, "Carss");
checkeq(ENGINE_KEY_ENOENT, val, "Key Carss has not expired.");
checkeq(4, get_int_stat(h, h1, "vb_0:purge_seqno", "vbucket-seqno"),
"purge_seqno didn't match expected value");
return SUCCESS;
}
static enum test_result test_compaction_config(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
checkeq(10000,
get_int_stat(h, h1, "ep_compaction_write_queue_cap"),
"Expected compaction queue cap to be 10000");
set_param(h, h1, protocol_binary_engine_param_flush,
"compaction_write_queue_cap", "100000");
checkeq(100000, get_int_stat(h, h1, "ep_compaction_write_queue_cap"),
"Expected compaction queue cap to be 100000");
return SUCCESS;
}
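/* Context handed to compaction_thread() below, so that several vbuckets can
 * be compacted concurrently from separate threads in the multi-vbucket
 * compaction tests.
 */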
struct comp_thread_ctx {
ENGINE_HANDLE *h;
ENGINE_HANDLE_V1 *h1;
uint16_t vbid;
uint16_t db_file_id;
};
extern "C" {
static void compaction_thread(void *arg) {
struct comp_thread_ctx *ctx = static_cast<comp_thread_ctx *>(arg);
compact_db(ctx->h, ctx->h1, ctx->vbid, ctx->db_file_id, 0, 0, 0);
}
}
static enum test_result test_multiple_vb_compactions(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
for (uint16_t i = 0; i < 4; ++i) {
if (!set_vbucket_state(h, h1, i, vbucket_state_active)) {
fprintf(stderr, "set state failed for vbucket %d.\n", i);
return FAIL;
}
check(verify_vbucket_state(h, h1, i, vbucket_state_active),
"VBucket state not active");
}
std::vector<std::string> keys;
for (int j = 0; j < 20000; ++j) {
std::stringstream ss;
ss << "key" << j;
std::string key(ss.str());
keys.push_back(key);
}
int count = 0;
std::vector<std::string>::iterator it;
for (it = keys.begin(); it != keys.end(); ++it) {
uint16_t vbid = count % 4;
item *i;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, it->c_str(), it->c_str(), &i, 0, vbid),
"Failed to store a value");
h1->release(h, NULL, i);
++count;
}
// Compact multiple vbuckets.
const int n_threads = 4;
cb_thread_t threads[n_threads];
struct comp_thread_ctx ctx[n_threads];
const int num_shards = get_int_stat(h, h1, "ep_workload:num_shards",
"workload");
for (int i = 0; i < n_threads; i++) {
ctx[i].h = h;
ctx[i].h1 = h1;
ctx[i].vbid = static_cast<uint16_t>(i);
ctx[i].db_file_id = ctx[i].vbid % num_shards;
int r = cb_create_thread(&threads[i], compaction_thread, &ctx[i], 0);
cb_assert(r == 0);
}
for (int i = 0; i < n_threads; i++) {
int r = cb_join_thread(threads[i]);
cb_assert(r == 0);
}
wait_for_stat_to_be(h, h1, "ep_pending_compactions", 0);
return SUCCESS;
}
static enum test_result
test_multi_vb_compactions_with_workload(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
for (uint16_t i = 0; i < 4; ++i) {
if (!set_vbucket_state(h, h1, i, vbucket_state_active)) {
fprintf(stderr, "set state failed for vbucket %d.\n", i);
return FAIL;
}
check(verify_vbucket_state(h, h1, i, vbucket_state_active),
"VBucket state not active");
}
std::vector<std::string> keys;
for (int j = 0; j < 10000; ++j) {
std::stringstream ss;
ss << "key" << j;
std::string key(ss.str());
keys.push_back(key);
}
int count = 0;
std::vector<std::string>::iterator it;
for (it = keys.begin(); it != keys.end(); ++it) {
uint16_t vbid = count % 4;
item *i;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, it->c_str(), it->c_str(),
&i, 0, vbid),
"Failed to store a value");
h1->release(h, NULL, i);
++count;
}
wait_for_flusher_to_settle(h, h1);
for (int i = 0; i < 2; ++i) {
count = 0;
for (it = keys.begin(); it != keys.end(); ++it) {
uint16_t vbid = count % 4;
item *i = NULL;
checkeq(ENGINE_SUCCESS,
get(h, h1, NULL, &i, it->c_str(), vbid),
"Unable to get stored item");
h1->release(h, NULL, i);
++count;
}
}
wait_for_stat_to_be(h, h1, "ep_workload_pattern", std::string{"read_heavy"});
// Compact multiple vbuckets.
const int n_threads = 4;
cb_thread_t threads[n_threads];
struct comp_thread_ctx ctx[n_threads];
for (int i = 0; i < n_threads; i++) {
ctx[i].h = h;
ctx[i].h1 = h1;
ctx[i].vbid = static_cast<uint16_t>(i);
int r = cb_create_thread(&threads[i], compaction_thread, &ctx[i], 0);
cb_assert(r == 0);
}
for (int i = 0; i < n_threads; i++) {
int r = cb_join_thread(threads[i]);
cb_assert(r == 0);
}
wait_for_stat_to_be(h, h1, "ep_pending_compactions", 0);
return SUCCESS;
}
static enum test_result vbucket_destroy(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
const char* value = NULL) {
check(set_vbucket_state(h, h1, 1, vbucket_state_active),
"Failed to set vbucket state.");
vbucketDelete(h, h1, 2, value);
checkeq(PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET,
last_status.load(),
"Expected failure deleting non-existent bucket.");
check(set_vbucket_state(h, h1, 1, vbucket_state_dead),
"Failed set set vbucket 1 state.");
vbucketDelete(h, h1, 1, value);
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
"Expected failure deleting non-existent bucket.");
check(verify_vbucket_missing(h, h1, 1),
"vbucket 0 was not missing after deleting it.");
return SUCCESS;
}
static enum test_result test_vbucket_destroy_stats(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
int cacheSize = get_int_stat(h, h1, "ep_total_cache_size");
int overhead = get_int_stat(h, h1, "ep_overhead");
int nonResident = get_int_stat(h, h1, "ep_num_non_resident");
check(set_vbucket_state(h, h1, 1, vbucket_state_active),
"Failed to set vbucket state.");
std::vector<std::string> keys;
for (int j = 0; j < 2000; ++j) {
std::stringstream ss;
ss << "key" << j;
std::string key(ss.str());
keys.push_back(key);
}
int itemsRemoved = get_int_stat(h, h1, "ep_items_rm_from_checkpoints");
std::vector<std::string>::iterator it;
for (it = keys.begin(); it != keys.end(); ++it) {
item *i;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, it->c_str(), it->c_str(),
&i, 0, 1),
"Failed to store a value");
h1->release(h, NULL, i);
}
wait_for_flusher_to_settle(h, h1);
testHarness.time_travel(65);
wait_for_stat_change(h, h1, "ep_items_rm_from_checkpoints", itemsRemoved);
check(set_vbucket_state(h, h1, 1, vbucket_state_dead),
"Failed set set vbucket 1 state.");
int vbucketDel = get_int_stat(h, h1, "ep_vbucket_del");
vbucketDelete(h, h1, 1);
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS,
last_status.load(),
"Expected failure deleting non-existent bucket.");
check(verify_vbucket_missing(h, h1, 1),
"vbucket 1 was not missing after deleting it.");
wait_for_stat_change(h, h1, "ep_vbucket_del", vbucketDel);
wait_for_stat_to_be(h, h1, "ep_total_cache_size", cacheSize);
wait_for_stat_to_be(h, h1, "ep_overhead", overhead);
wait_for_stat_to_be(h, h1, "ep_num_non_resident", nonResident);
return SUCCESS;
}
static enum test_result vbucket_destroy_restart(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1,
const char* value = NULL) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
check(set_vbucket_state(h, h1, 1, vbucket_state_active),
"Failed to set vbucket state.");
// Store a value so the restart will try to resurrect it.
item *i = NULL;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i, 0, 1),
"Failed to set a value");
check_key_value(h, h1, "key", "somevalue", 9, 1);
h1->release(h, NULL, i);
// Reload to get a flush forced.
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
check(verify_vbucket_state(h, h1, 1, vbucket_state_active),
"Bucket state was what it was initially, after restart.");
check(set_vbucket_state(h, h1, 1, vbucket_state_active),
"Failed to set vbucket state.");
check_key_value(h, h1, "key", "somevalue", 9, 1);
check(set_vbucket_state(h, h1, 1, vbucket_state_dead),
"Failed set set vbucket 1 state.");
vbucketDelete(h, h1, 1, value);
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
"Expected failure deleting non-existent bucket.");
check(verify_vbucket_missing(h, h1, 1),
"vbucket 1 was not missing after deleting it.");
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
if (verify_vbucket_state(h, h1, 1, vbucket_state_pending, true)) {
std::cerr << "Bucket came up in pending state after delete." << std::endl;
abort();
}
check(verify_vbucket_missing(h, h1, 1),
"vbucket 1 was not missing after restart.");
return SUCCESS;
}
static enum test_result test_async_vbucket_destroy(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return vbucket_destroy(h, h1);
}
static enum test_result test_sync_vbucket_destroy(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return vbucket_destroy(h, h1, "async=0");
}
static enum test_result test_async_vbucket_destroy_restart(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return vbucket_destroy_restart(h, h1);
}
static enum test_result test_sync_vbucket_destroy_restart(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return vbucket_destroy_restart(h, h1, "async=0");
}
static enum test_result test_vb_set_pending(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return test_pending_vb_mutation(h, h1, OPERATION_SET);
}
static enum test_result test_vb_add_pending(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return test_pending_vb_mutation(h, h1, OPERATION_ADD);
}
static enum test_result test_vb_cas_pending(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return test_pending_vb_mutation(h, h1, OPERATION_CAS);
}
static enum test_result test_vb_set_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return test_replica_vb_mutation(h, h1, OPERATION_SET);
}
static enum test_result test_vb_replace_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return test_replica_vb_mutation(h, h1, OPERATION_REPLACE);
}
static enum test_result test_vb_replace_pending(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return test_pending_vb_mutation(h, h1, OPERATION_REPLACE);
}
static enum test_result test_vb_add_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return test_replica_vb_mutation(h, h1, OPERATION_ADD);
}
static enum test_result test_vb_cas_replica(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
return test_replica_vb_mutation(h, h1, OPERATION_CAS);
}
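/* Verify the "vbucket-seqno" stat group: after storing 100 items in vb 0 the
 * high_seqno (and, for persistent buckets, last_persisted_seqno) must reach
 * 100, the empty vb 1 must report zeros, vb_1:uuid must match the failover
 * table entry, and malformed or unknown vbucket arguments must return
 * EINVAL / NOT_MY_VBUCKET respectively.
 */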
static enum test_result test_stats_seqno(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
check(set_vbucket_state(h, h1, 1, vbucket_state_active),
"Failed to set vbucket state.");
int num_keys = 100;
for (int ii = 0; ii < num_keys; ++ii) {
std::stringstream ss;
ss << "key" << ii;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, ss.str().c_str(),
"value", NULL, 0, 0),
"Failed to store an item.");
}
wait_for_flusher_to_settle(h, h1);
checkeq(100, get_int_stat(h, h1, "vb_0:high_seqno", "vbucket-seqno"),
"Invalid seqno");
if (isPersistentBucket(h, h1)) {
checkeq(100,
get_int_stat(h, h1, "vb_0:last_persisted_seqno", "vbucket-seqno"),
"Unexpected last_persisted_seqno");
}
checkeq(0, get_int_stat(h, h1, "vb_1:high_seqno", "vbucket-seqno"),
"Invalid seqno");
checkeq(0, get_int_stat(h, h1, "vb_1:high_seqno", "vbucket-seqno 1"),
"Invalid seqno");
if (isPersistentBucket(h, h1)) {
checkeq(0,
get_int_stat(h, h1, "vb_1:last_persisted_seqno", "vbucket-seqno 1"),
"Invalid last_persisted_seqno");
}
uint64_t vb_uuid = get_ull_stat(h, h1, "vb_1:0:id", "failovers");
auto seqno_stats = get_all_stats(h, h1, "vbucket-seqno 1");
checkeq(vb_uuid, uint64_t(std::stoull(seqno_stats.at("vb_1:uuid"))),
"Invalid uuid");
checkeq(size_t(7), seqno_stats.size(), "Expected seven stats");
// Check invalid vbucket
checkeq(ENGINE_NOT_MY_VBUCKET,
h1->get_stats(h, NULL, "vbucket-seqno 2", 15, add_stats),
"Expected not my vbucket");
// Check bad vbucket parameter (not numeric)
checkeq(ENGINE_EINVAL,
h1->get_stats(h, NULL, "vbucket-seqno tt2", 17, add_stats),
"Expected invalid");
// Check extra spaces at the end
checkeq(ENGINE_EINVAL,
h1->get_stats(h, NULL, "vbucket-seqno ", 17, add_stats),
"Expected invalid");
return SUCCESS;
}
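// Verifies the "diskinfo" and "diskinfo detail" stats groups report non-zero
// file/data sizes after persisting items, and that malformed sub-commands
// return EINVAL.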
static enum test_result test_stats_diskinfo(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
check(set_vbucket_state(h, h1, 1, vbucket_state_active),
"Failed to set vbucket state.");
int num_keys = 100;
for (int ii = 0; ii < num_keys; ++ii) {
std::stringstream ss;
ss << "key" << ii;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, ss.str().c_str(),
"value", NULL, 0, 1),
"Failed to store an item.");
}
wait_for_flusher_to_settle(h, h1);
size_t file_size = get_int_stat(h, h1, "ep_db_file_size", "diskinfo");
size_t data_size = get_int_stat(h, h1, "ep_db_data_size", "diskinfo");
check(file_size > 0, "DB file size should be greater than 0");
check(data_size > 0, "DB data size should be greater than 0");
check(file_size >= data_size, "DB file size should be >= DB data size");
check(get_int_stat(h, h1, "vb_1:data_size", "diskinfo detail") > 0,
"VB 1 data size should be greater than 0");
checkeq(ENGINE_EINVAL,
h1->get_stats(h, NULL, "diskinfo ", 9, add_stats),
"Expected invalid");
checkeq(ENGINE_EINVAL,
h1->get_stats(h, NULL, "diskinfo detai", 14, add_stats),
"Expected invalid");
checkeq(ENGINE_EINVAL,
h1->get_stats(h, NULL, "diskinfo detaillll", 18, add_stats),
"Expected invalid");
return SUCCESS;
}
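// Verifies the "uuid" stat; the expected value ("foobar") is assumed to be
// supplied via the test case configuration.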
static enum test_result test_uuid_stats(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1)
{
vals.clear();
checkeq(ENGINE_SUCCESS,
h1->get_stats(h, NULL, "uuid", 4, add_stats),
"Failed to get stats.");
check(vals["uuid"] == "foobar", "Incorrect uuid");
return SUCCESS;
}
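// Exercises create/update/delete operations on two keys and verifies the
// vb_active_ops_create/update/delete counters reflect them.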
static enum test_result test_item_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
item *i = NULL;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, "key", "somevalue", &i, 0, 0),
"Failed set.");
h1->release(h, NULL, i);
wait_for_flusher_to_settle(h, h1);
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, "key", "somevalueX", &i, 0, 0),
"Failed set.");
h1->release(h, NULL, i);
wait_for_flusher_to_settle(h, h1);
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, "key1", "somevalueY", &i, 0, 0),
"Failed set.");
h1->release(h, NULL, i);
wait_for_flusher_to_settle(h, h1);
check_key_value(h, h1, "key", "somevalueX", 10);
check_key_value(h, h1, "key1", "somevalueY", 10);
checkeq(ENGINE_SUCCESS, del(h, h1, "key1", 0, 0),
"Failed remove with value.");
wait_for_flusher_to_settle(h, h1);
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, "key1", "someothervalue", &i, 0, 0),
"Failed set.");
h1->release(h, NULL, i);
wait_for_flusher_to_settle(h, h1);
check_key_value(h, h1, "key1", "someothervalue", 14);
checkeq(3,
get_int_stat(h, h1, "vb_active_ops_create"),
"Expected 3 creations");
checkeq(1,
get_int_stat(h, h1, "vb_active_ops_update"),
"Expected 1 updation");
checkeq(1,
get_int_stat(h, h1, "vb_active_ops_delete"),
"Expected 1 deletion");
return SUCCESS;
}
static enum test_result test_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
vals.clear();
checkeq(ENGINE_SUCCESS,
h1->get_stats(h, NULL, NULL, 0, add_stats),
"Failed to get stats.");
check(vals.size() > 10, "Kind of expected more stats than that.");
check(vals.find("ep_version") != vals.end(), "Found no ep_version.");
return SUCCESS;
}
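// Checks memory accounting stats (mem_used, ep_total_cache_size, ep_overhead,
// ep_value_size) around persisting, evicting and re-loading a single item.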
static enum test_result test_mem_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
char value[2048];
memset(value, 'b', sizeof(value));
strcpy(value + sizeof(value) - 4, "\r\n");
int itemsRemoved = get_int_stat(h, h1, "ep_items_rm_from_checkpoints");
wait_for_persisted_value(h, h1, "key", value);
testHarness.time_travel(65);
if (isPersistentBucket(h, h1)) {
wait_for_stat_change(h, h1, "ep_items_rm_from_checkpoints", itemsRemoved);
}
int mem_used = get_int_stat(h, h1, "mem_used");
int cache_size = get_int_stat(h, h1, "ep_total_cache_size");
int overhead = get_int_stat(h, h1, "ep_overhead");
int value_size = get_int_stat(h, h1, "ep_value_size");
    check((mem_used - overhead) > cache_size,
          "mem_used minus ep_overhead should be greater than the hashtable cache size due to the checkpoint overhead");
if (isPersistentBucket(h, h1)) {
evict_key(h, h1, "key", 0, "Ejected.");
check(get_int_stat(h, h1, "ep_total_cache_size") <= cache_size,
"Evict a value shouldn't increase the total cache size");
check(get_int_stat(h, h1, "mem_used") < mem_used,
"Expected mem_used to decrease when an item is evicted");
check_key_value(h, h1, "key", value, strlen(value), 0); // Load an item from disk again.
check(get_int_stat(h, h1, "mem_used") >= mem_used,
"Expected mem_used to remain the same after an item is loaded from disk");
check(get_int_stat(h, h1, "ep_value_size") == value_size,
"Expected ep_value_size to remain the same after item is loaded from disk");
}
return SUCCESS;
}
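// Checks the per-KVStore io_num_read/write and io_read_bytes/io_write_bytes
// stats across a stats reset, a persisted write and a bg-fetched read. The
// expected byte counts are backend specific (couchdb vs forestdb) and are
// currently hard coded.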
static enum test_result test_io_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
int exp_read_bytes = 4, exp_write_bytes;
std::string backend = get_str_stat(h, h1, "ep_backend");
if (backend == "forestdb") {
exp_write_bytes = 35; /* TBD: Do not hard code the value */
} else if (backend == "couchdb") {
exp_write_bytes = 22; /* TBD: Do not hard code the value */
} else {
return SKIPPED;
}
h1->reset_stats(h, NULL);
checkeq(0, get_int_stat(h, h1, "rw_0:io_num_read", "kvstore"),
"Expected reset stats to set io_num_read to zero");
checkeq(0, get_int_stat(h, h1, "rw_0:io_num_write", "kvstore"),
"Expected reset stats to set io_num_write to zero");
checkeq(0, get_int_stat(h, h1, "rw_0:io_read_bytes", "kvstore"),
"Expected reset stats to set io_read_bytes to zero");
checkeq(0, get_int_stat(h, h1, "rw_0:io_write_bytes", "kvstore"),
"Expected reset stats to set io_write_bytes to zero");
wait_for_persisted_value(h, h1, "a", "b\r\n");
checkeq(0, get_int_stat(h, h1, "rw_0:io_num_read", "kvstore"),
"Expected storing one value to not change the read counter");
checkeq(0, get_int_stat(h, h1, "rw_0:io_read_bytes", "kvstore"),
"Expected storing one value to not change the read bytes");
checkeq(1, get_int_stat(h, h1, "rw_0:io_num_write", "kvstore"),
"Expected storing the key to update the write counter");
checkeq(exp_write_bytes,
get_int_stat(h, h1, "rw_0:io_write_bytes", "kvstore"),
"Expected storing the key to update the write bytes");
evict_key(h, h1, "a", 0, "Ejected.");
check_key_value(h, h1, "a", "b\r\n", 3, 0);
std::stringstream numReadStatStr;
std::stringstream readBytesStatStr;
if (backend == "couchdb") {
numReadStatStr << "ro_" << 0 << ":io_num_read";
readBytesStatStr << "ro_" << 0 << ":io_read_bytes";
} else if (backend == "forestdb") {
numReadStatStr << "rw_" << 0 << ":io_num_read";
readBytesStatStr << "rw_" << 0 << ":io_read_bytes";
} else {
cb_assert(false);
}
checkeq(1, get_int_stat(h, h1, numReadStatStr.str().c_str(), "kvstore"),
"Expected reading the value back in to update the read counter");
checkeq(exp_read_bytes,
get_int_stat(h, h1, readBytesStatStr.str().c_str(), "kvstore"),
"Expected reading the value back in to update the read bytes");
checkeq(1, get_int_stat(h, h1, "rw_0:io_num_write", "kvstore"),
"Expected reading the value back in to not update the write counter");
checkeq(exp_write_bytes,
get_int_stat(h, h1, "rw_0:io_write_bytes", "kvstore"),
"Expected reading the value back in to not update the write bytes");
return SUCCESS;
}
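// Verifies ep_db_data_size / ep_db_file_size (and their per-vbucket
// equivalents in "vbucket-details") grow after persisting a value.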
static enum test_result test_vb_file_stats(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
wait_for_flusher_to_settle(h, h1);
wait_for_stat_change(h, h1, "ep_db_data_size", 0);
int old_data_size = get_int_stat(h, h1, "ep_db_data_size");
int old_file_size = get_int_stat(h, h1, "ep_db_file_size");
check(old_file_size != 0, "Expected a non-zero value for ep_db_file_size");
// Write a value and test ...
wait_for_persisted_value(h, h1, "a", "b\r\n");
check(get_int_stat(h, h1, "ep_db_data_size") > old_data_size,
"Expected the DB data size to increase");
check(get_int_stat(h, h1, "ep_db_file_size") > old_file_size,
"Expected the DB file size to increase");
check(get_int_stat(h, h1, "vb_0:db_data_size", "vbucket-details 0") > 0,
"Expected the vbucket DB data size to non-zero");
check(get_int_stat(h, h1, "vb_0:db_file_size", "vbucket-details 0") > 0,
"Expected the vbucket DB file size to non-zero");
return SUCCESS;
}
static enum test_result test_vb_file_stats_after_warmup(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
item *it = NULL;
for (int i = 0; i < 100; ++i) {
std::stringstream key;
key << "key-" << i;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, key.str().c_str(), "somevalue", &it),
"Error setting.");
h1->release(h, NULL, it);
}
wait_for_flusher_to_settle(h, h1);
int fileSize = get_int_stat(h, h1, "vb_0:db_file_size", "vbucket-details 0");
int spaceUsed = get_int_stat(h, h1, "vb_0:db_data_size", "vbucket-details 0");
// Restart the engine.
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
int newFileSize = get_int_stat(h, h1, "vb_0:db_file_size", "vbucket-details 0");
int newSpaceUsed = get_int_stat(h, h1, "vb_0:db_data_size", "vbucket-details 0");
check((float)newFileSize >= 0.9 * fileSize, "Unexpected fileSize for vbucket");
check((float)newSpaceUsed >= 0.9 * spaceUsed, "Unexpected spaceUsed for vbucket");
return SUCCESS;
}
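// Verifies background-fetch timing stats: one sample per bg fetch, presence
// of the min/max/avg wait and load keys, and that reset_stats clears
// ep_bg_fetched.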
static enum test_result test_bg_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
h1->reset_stats(h, NULL);
wait_for_persisted_value(h, h1, "a", "b\r\n");
evict_key(h, h1, "a", 0, "Ejected.");
testHarness.time_travel(43);
check_key_value(h, h1, "a", "b\r\n", 3, 0);
auto stats = get_all_stats(h, h1);
checkeq(1, std::stoi(stats.at("ep_bg_num_samples")),
"Expected one sample");
const char* bg_keys[] = { "ep_bg_min_wait",
"ep_bg_max_wait",
"ep_bg_wait_avg",
"ep_bg_min_load",
"ep_bg_max_load",
"ep_bg_load_avg"};
for (const auto* key : bg_keys) {
check(stats.find(key) != stats.end(),
(std::string("Found no ") + key).c_str());
}
evict_key(h, h1, "a", 0, "Ejected.");
check_key_value(h, h1, "a", "b\r\n", 3, 0);
check(get_int_stat(h, h1, "ep_bg_num_samples") == 2,
"Expected one sample");
h1->reset_stats(h, NULL);
checkeq(0, get_int_stat(h, h1, "ep_bg_fetched"),
"ep_bg_fetched is not reset to 0");
return SUCCESS;
}
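// Verifies ep_bg_fetched vs ep_bg_meta_fetched: get_meta on a deleted key
// only bumps the meta counter, while a full get of an ejected key bumps the
// value counter.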
static enum test_result test_bg_meta_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
item *itm = NULL;
h1->reset_stats(h, NULL);
wait_for_persisted_value(h, h1, "k1", "v1");
wait_for_persisted_value(h, h1, "k2", "v2");
evict_key(h, h1, "k1", 0, "Ejected.");
checkeq(ENGINE_SUCCESS,
del(h, h1, "k2", 0, 0), "Failed remove with value.");
wait_for_stat_to_be(h, h1, "curr_items", 1);
checkeq(0, get_int_stat(h, h1, "ep_bg_fetched"), "Expected bg_fetched to be 0");
checkeq(0, get_int_stat(h, h1, "ep_bg_meta_fetched"), "Expected bg_meta_fetched to be 0");
check(get_meta(h, h1, "k2"), "Get meta failed");
checkeq(0, get_int_stat(h, h1, "ep_bg_fetched"), "Expected bg_fetched to be 0");
checkeq(1, get_int_stat(h, h1, "ep_bg_meta_fetched"), "Expected bg_meta_fetched to be 1");
checkeq(ENGINE_SUCCESS, get(h, h1, NULL, &itm, "k1", 0), "Missing key");
checkeq(1, get_int_stat(h, h1, "ep_bg_fetched"), "Expected bg_fetched to be 1");
checkeq(1, get_int_stat(h, h1, "ep_bg_meta_fetched"), "Expected bg_meta_fetched to be 1");
h1->release(h, NULL, itm);
// store new key with some random metadata
const size_t keylen = strlen("k3");
ItemMetaData itemMeta;
itemMeta.revSeqno = 10;
itemMeta.cas = 0xdeadbeef;
itemMeta.exptime = 0;
itemMeta.flags = 0xdeadbeef;
add_with_meta(h, h1, "k3", keylen, NULL, 0, 0, &itemMeta);
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(), "Set meta failed");
check(get_meta(h, h1, "k2"), "Get meta failed");
checkeq(1, get_int_stat(h, h1, "ep_bg_fetched"), "Expected bg_fetched to be 1");
checkeq(1, get_int_stat(h, h1, "ep_bg_meta_fetched"),
"Expected bg_meta_fetched to remain at 1");
return SUCCESS;
}
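// Verifies the per-key stats command; the stat key format is
// "key <keyname> <vbucket-id>", e.g. "key k1 0".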
static enum test_result test_key_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
item *i = NULL;
check(set_vbucket_state(h, h1, 1, vbucket_state_active), "Failed set vbucket 1 state.");
// set (k1,v1) in vbucket 0
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET,"k1", "v1", &i, 0, 0),
"Failed to store an item.");
h1->release(h, NULL, i);
// set (k2,v2) in vbucket 1
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET,"k2", "v2", &i, 0, 1),
"Failed to store an item.");
h1->release(h, NULL, i);
const void *cookie = testHarness.create_cookie();
// stat for key "k1" and vbucket "0"
const char *statkey1 = "key k1 0";
checkeq(ENGINE_SUCCESS,
h1->get_stats(h, cookie, statkey1, strlen(statkey1), add_stats),
"Failed to get stats.");
check(vals.find("key_is_dirty") != vals.end(), "Found no key_is_dirty");
check(vals.find("key_exptime") != vals.end(), "Found no key_exptime");
check(vals.find("key_flags") != vals.end(), "Found no key_flags");
check(vals.find("key_cas") != vals.end(), "Found no key_cas");
check(vals.find("key_vb_state") != vals.end(), "Found no key_vb_state");
// stat for key "k2" and vbucket "1"
const char *statkey2 = "key k2 1";
checkeq(ENGINE_SUCCESS,
h1->get_stats(h, cookie, statkey2, strlen(statkey2), add_stats),
"Failed to get stats.");
check(vals.find("key_is_dirty") != vals.end(), "Found no key_is_dirty");
check(vals.find("key_exptime") != vals.end(), "Found no key_exptime");
check(vals.find("key_flags") != vals.end(), "Found no key_flags");
check(vals.find("key_cas") != vals.end(), "Found no key_cas");
check(vals.find("key_vb_state") != vals.end(), "Found no key_vb_state");
testHarness.destroy_cookie(cookie);
return SUCCESS;
}
static enum test_result test_vkey_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
check(set_vbucket_state(h, h1, 1, vbucket_state_active), "Failed set vbucket 1 state.");
check(set_vbucket_state(h, h1, 2, vbucket_state_active), "Failed set vbucket 2 state.");
check(set_vbucket_state(h, h1, 3, vbucket_state_active), "Failed set vbucket 3 state.");
check(set_vbucket_state(h, h1, 4, vbucket_state_active), "Failed set vbucket 4 state.");
wait_for_persisted_value(h, h1, "k1", "v1");
wait_for_persisted_value(h, h1, "k2", "v2", 1);
wait_for_persisted_value(h, h1, "k3", "v3", 2);
wait_for_persisted_value(h, h1, "k4", "v4", 3);
wait_for_persisted_value(h, h1, "k5", "v5", 4);
check(set_vbucket_state(h, h1, 2, vbucket_state_replica), "Failed to set VB2 state.");
check(set_vbucket_state(h, h1, 3, vbucket_state_pending), "Failed to set VB3 state.");
check(set_vbucket_state(h, h1, 4, vbucket_state_dead), "Failed to set VB4 state.");
const void *cookie = testHarness.create_cookie();
// stat for key "k1" and vbucket "0"
const char *statkey1 = "vkey k1 0";
checkeq(ENGINE_SUCCESS,
h1->get_stats(h, cookie, statkey1, strlen(statkey1), add_stats),
"Failed to get stats.");
check(vals.find("key_is_dirty") != vals.end(), "Found no key_is_dirty");
check(vals.find("key_exptime") != vals.end(), "Found no key_exptime");
check(vals.find("key_flags") != vals.end(), "Found no key_flags");
check(vals.find("key_cas") != vals.end(), "Found no key_cas");
check(vals.find("key_vb_state") != vals.end(), "Found no key_vb_state");
check(vals.find("key_valid") != vals.end(), "Found no key_valid");
// stat for key "k2" and vbucket "1"
const char *statkey2 = "vkey k2 1";
checkeq(ENGINE_SUCCESS,
h1->get_stats(h, cookie, statkey2, strlen(statkey2), add_stats),
"Failed to get stats.");
check(vals.find("key_is_dirty") != vals.end(), "Found no key_is_dirty");
check(vals.find("key_exptime") != vals.end(), "Found no key_exptime");
check(vals.find("key_flags") != vals.end(), "Found no key_flags");
check(vals.find("key_cas") != vals.end(), "Found no key_cas");
check(vals.find("key_vb_state") != vals.end(), "Found no key_vb_state");
check(vals.find("key_valid") != vals.end(), "Found no key_valid");
// stat for key "k3" and vbucket "2"
const char *statkey3 = "vkey k3 2";
checkeq(ENGINE_SUCCESS,
h1->get_stats(h, cookie, statkey3, strlen(statkey3), add_stats),
"Failed to get stats.");
check(vals.find("key_is_dirty") != vals.end(), "Found no key_is_dirty");
check(vals.find("key_exptime") != vals.end(), "Found no key_exptime");
check(vals.find("key_flags") != vals.end(), "Found no key_flags");
check(vals.find("key_cas") != vals.end(), "Found no key_cas");
check(vals.find("key_vb_state") != vals.end(), "Found no key_vb_state");
check(vals.find("key_valid") != vals.end(), "Found no key_valid");
// stat for key "k4" and vbucket "3"
const char *statkey4 = "vkey k4 3";
checkeq(ENGINE_SUCCESS,
h1->get_stats(h, cookie, statkey4, strlen(statkey4), add_stats),
"Failed to get stats.");
check(vals.find("key_is_dirty") != vals.end(), "Found no key_is_dirty");
check(vals.find("key_exptime") != vals.end(), "Found no key_exptime");
check(vals.find("key_flags") != vals.end(), "Found no key_flags");
check(vals.find("key_cas") != vals.end(), "Found no key_cas");
check(vals.find("key_vb_state") != vals.end(), "Found no key_vb_state");
check(vals.find("key_valid") != vals.end(), "Found no key_valid");
// stat for key "k5" and vbucket "4"
const char *statkey5 = "vkey k5 4";
checkeq(ENGINE_SUCCESS,
h1->get_stats(h, cookie, statkey5, strlen(statkey5), add_stats),
"Failed to get stats.");
check(vals.find("key_is_dirty") != vals.end(), "Found no key_is_dirty");
check(vals.find("key_exptime") != vals.end(), "Found no key_exptime");
check(vals.find("key_flags") != vals.end(), "Found no key_flags");
check(vals.find("key_cas") != vals.end(), "Found no key_cas");
check(vals.find("key_vb_state") != vals.end(), "Found no key_vb_state");
check(vals.find("key_valid") != vals.end(), "Found no key_valid");
testHarness.destroy_cookie(cookie);
return SUCCESS;
}
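// Verifies warmup_min_items_threshold / warmup_min_memory_threshold can be
// read and updated at runtime (rejecting non-numeric values), and that a
// restart with warmup_min_memory_threshold=0 warms up keys but no values.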
static enum test_result test_warmup_conf(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
checkeq(100, get_int_stat(h, h1, "ep_warmup_min_items_threshold"),
"Incorrect initial warmup min items threshold.");
checkeq(100, get_int_stat(h, h1, "ep_warmup_min_memory_threshold"),
"Incorrect initial warmup min memory threshold.");
check(!set_param(h, h1, protocol_binary_engine_param_flush,
"warmup_min_items_threshold", "a"),
"Set warmup_min_items_threshold should have failed");
    check(!set_param(h, h1, protocol_binary_engine_param_flush,
                     "warmup_min_memory_threshold", "a"),
          "Set warmup_min_memory_threshold should have failed");
check(set_param(h, h1, protocol_binary_engine_param_flush,
"warmup_min_items_threshold", "80"),
"Set warmup_min_items_threshold should have worked");
check(set_param(h, h1, protocol_binary_engine_param_flush,
"warmup_min_memory_threshold", "80"),
"Set warmup_min_memory_threshold should have worked");
checkeq(80, get_int_stat(h, h1, "ep_warmup_min_items_threshold"),
"Incorrect smaller warmup min items threshold.");
checkeq(80, get_int_stat(h, h1, "ep_warmup_min_memory_threshold"),
"Incorrect smaller warmup min memory threshold.");
item *it = NULL;
for (int i = 0; i < 100; ++i) {
std::stringstream key;
key << "key-" << i;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, key.str().c_str(), "somevalue", &it),
"Error setting.");
h1->release(h, NULL, it);
}
// Restart the server.
std::string config(testHarness.get_current_testcase()->cfg);
config = config + "warmup_min_memory_threshold=0";
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
config.c_str(),
true, false);
wait_for_warmup_complete(h, h1);
const std::string eviction_policy = get_str_stat(h, h1, "ep_item_eviction_policy");
if (eviction_policy == "value_only") {
checkeq(100, get_int_stat(h, h1, "ep_warmup_key_count", "warmup"),
"Expected 100 keys loaded after warmup");
} else { // Full eviction mode
checkeq(0, get_int_stat(h, h1, "ep_warmup_key_count", "warmup"),
"Expected 0 keys loaded after warmup");
}
checkeq(0, get_int_stat(h, h1, "ep_warmup_value_count", "warmup"),
"Expected 0 values loaded after warmup");
return SUCCESS;
}
static enum test_result test_bloomfilter_conf(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
if (get_bool_stat(h, h1, "ep_bfilter_enabled") == false) {
check(set_param(h, h1, protocol_binary_engine_param_flush,
"bfilter_enabled", "true"),
"Set bloomfilter_enabled should have worked");
}
check(get_bool_stat(h, h1, "ep_bfilter_enabled"),
"Bloom filter wasn't enabled");
check(get_float_stat(h, h1, "ep_bfilter_residency_threshold") == (float)0.1,
"Incorrect initial bfilter_residency_threshold.");
check(set_param(h, h1, protocol_binary_engine_param_flush,
"bfilter_enabled", "false"),
"Set bloomfilter_enabled should have worked.");
check(set_param(h, h1, protocol_binary_engine_param_flush,
"bfilter_residency_threshold", "0.15"),
"Set bfilter_residency_threshold should have worked.");
check(get_bool_stat(h, h1, "ep_bfilter_enabled") == false,
"Bloom filter should have been disabled.");
check(get_float_stat(h, h1, "ep_bfilter_residency_threshold") == (float)0.15,
"Incorrect bfilter_residency_threshold.");
return SUCCESS;
}
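// Exercises the per-vbucket bloom filter with evicted and deleted items,
// checking the filter key count and the number of bg fetches triggered in
// both value-only and full eviction modes, before and after a compaction
// that drops deletes.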
static enum test_result test_bloomfilters(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
if (get_bool_stat(h, h1, "ep_bfilter_enabled") == false) {
check(set_param(h, h1, protocol_binary_engine_param_flush,
"bfilter_enabled", "true"),
"Set bloomfilter_enabled should have worked");
}
check(get_bool_stat(h, h1, "ep_bfilter_enabled"),
"Bloom filter wasn't enabled");
// Key is only present if bgOperations is non-zero.
int num_read_attempts = get_int_stat_or_default(h, h1, 0,
"ep_bg_num_samples");
// Ensure vbucket's bloom filter is enabled
checkeq(std::string("ENABLED"),
get_str_stat(h, h1, "vb_0:bloom_filter", "vbucket-details 0"),
"Vbucket 0's bloom filter wasn't enabled upon setup!");
int i;
item *it = NULL;
// Insert 10 items.
for (i = 0; i < 10; ++i) {
std::stringstream key;
key << "key-" << i;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, key.str().c_str(),
"somevalue", &it),
"Error setting.");
h1->release(h, NULL, it);
}
wait_for_flusher_to_settle(h, h1);
// Evict all 10 items.
for (i = 0; i < 10; ++i) {
std::stringstream key;
key << "key-" << i;
evict_key(h, h1, key.str().c_str(), 0, "Ejected.");
}
wait_for_flusher_to_settle(h, h1);
// Ensure 10 items are non-resident.
cb_assert(10 == get_int_stat(h, h1, "ep_num_non_resident"));
// Issue delete on first 5 items.
for (i = 0; i < 5; ++i) {
std::stringstream key;
key << "key-" << i;
checkeq(ENGINE_SUCCESS,
del(h, h1, key.str().c_str(), 0, 0),
"Failed remove with value.");
}
wait_for_flusher_to_settle(h, h1);
// Ensure that there are 5 non-resident items
cb_assert(5 == get_int_stat(h, h1, "ep_num_non_resident"));
cb_assert(5 == get_int_stat(h, h1, "curr_items"));
checkeq(ENGINE_SUCCESS,
h1->get_stats(h, NULL, NULL, 0, add_stats),
"Failed to get stats.");
std::string eviction_policy = vals.find("ep_item_eviction_policy")->second;
useconds_t sleepTime = 128;
if (eviction_policy == "value_only") { // VALUE-ONLY EVICTION MODE
checkeq(5,
get_int_stat(h, h1, "vb_0:bloom_filter_key_count",
"vbucket-details 0"),
"Unexpected no. of keys in bloom filter");
checkeq(num_read_attempts,
get_int_stat_or_default(h, h1, 0, "ep_bg_num_samples"),
"Expected bgFetch attempts to remain unchanged");
for (i = 0; i < 5; ++i) {
std::stringstream key;
key << "key-" << i;
check(get_meta(h, h1, key.str().c_str()), "Get meta failed");
}
// GetMeta would cause bgFetches as bloomfilter contains
// the deleted items.
checkeq(num_read_attempts + 5,
get_int_stat(h, h1, "ep_bg_num_samples"),
"Expected bgFetch attempts to increase by five");
// Run compaction, with drop_deletes
compact_db(h, h1, 0, 0, 15, 15, 1);
while (get_int_stat(h, h1, "ep_pending_compactions") != 0) {
decayingSleep(&sleepTime);
}
for (i = 0; i < 5; ++i) {
std::stringstream key;
key << "key-" << i;
check(get_meta(h, h1, key.str().c_str()), "Get meta failed");
}
checkeq(num_read_attempts + 5,
get_int_stat(h, h1, "ep_bg_num_samples"),
"Expected bgFetch attempts to stay as before");
} else { // FULL EVICTION MODE
checkeq(10,
get_int_stat(h, h1, "vb_0:bloom_filter_key_count",
"vbucket-details 0"),
"Unexpected no. of keys in bloom filter");
// Because of issuing deletes on non-resident items
checkeq(num_read_attempts + 5,
get_int_stat(h, h1, "ep_bg_num_samples"),
"Expected bgFetch attempts to increase by five, after deletes");
// Run compaction, with drop_deletes, to exclude deleted items
// from bloomfilter.
compact_db(h, h1, 0, 0, 15, 15, 1);
while (get_int_stat(h, h1, "ep_pending_compactions") != 0) {
decayingSleep(&sleepTime);
}
for (i = 0; i < 5; i++) {
std::stringstream key;
key << "key-" << i;
            checkeq(ENGINE_KEY_ENOENT,
                    get(h, h1, NULL, &it, key.str(), 0),
                    "Expected ENOENT for a deleted key after compaction");
}
// + 6 because last delete is not purged by the compactor
checkeq(num_read_attempts + 6,
get_int_stat(h, h1, "ep_bg_num_samples"),
"Expected bgFetch attempts to stay as before");
}
return SUCCESS;
}
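// In full eviction, verifies that get_meta, set_with_meta, add and delete on
// keys that were never stored do not trigger bg fetches, because the bloom
// filter screens out the misses.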
static enum test_result test_bloomfilters_with_store_apis(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
if (get_bool_stat(h, h1, "ep_bfilter_enabled") == false) {
check(set_param(h, h1, protocol_binary_engine_param_flush,
"bfilter_enabled", "true"),
"Set bloomfilter_enabled should have worked");
}
check(get_bool_stat(h, h1, "ep_bfilter_enabled"),
"Bloom filter wasn't enabled");
int num_read_attempts = get_int_stat_or_default(h, h1, 0,
"ep_bg_num_samples");
// Ensure vbucket's bloom filter is enabled
checkeq(std::string("ENABLED"),
get_str_stat(h, h1, "vb_0:bloom_filter", "vbucket-details 0"),
"Vbucket 0's bloom filter wasn't enabled upon setup!");
for (int i = 0; i < 1000; i++) {
std::stringstream key;
key << "key-" << i;
check(!get_meta(h, h1, key.str().c_str()),
"Get meta should fail.");
}
checkeq(num_read_attempts,
get_int_stat_or_default(h, h1, 0, "ep_bg_num_samples"),
"Expected no bgFetch attempts");
checkeq(ENGINE_SUCCESS,
h1->get_stats(h, NULL, NULL, 0, add_stats),
"Failed to get stats.");
std::string eviction_policy = vals.find("ep_item_eviction_policy")->second;
if (eviction_policy == "full_eviction") { // FULL EVICTION MODE
// Set with Meta
int j;
for (j = 0; j < 10; j++) {
uint64_t cas_for_set = last_cas;
// init some random metadata
ItemMetaData itm_meta;
itm_meta.revSeqno = 10;
itm_meta.cas = 0xdeadbeef;
itm_meta.exptime = time(NULL) + 300;
itm_meta.flags = 0xdeadbeef;
std::stringstream key;
key << "swm-" << j;
set_with_meta(h, h1, key.str().c_str(), key.str().length(),
"somevalue", 9, 0, &itm_meta, cas_for_set);
}
checkeq(num_read_attempts,
get_int_stat_or_default(h, h1, 0, "ep_bg_num_samples"),
"Expected no bgFetch attempts");
item *itm = NULL;
// Add
for (j = 0; j < 10; j++) {
std::stringstream key;
key << "add-" << j;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_ADD, key.str().c_str(),
"newvalue", &itm),
"Failed to add value again.");
h1->release(h, NULL, itm);
}
checkeq(num_read_attempts,
get_int_stat_or_default(h, h1, 0, "ep_bg_num_samples"),
"Expected no bgFetch attempts");
// Delete
for (j = 0; j < 10; j++) {
std::stringstream key;
key << "del-" << j;
            checkeq(ENGINE_KEY_ENOENT,
                    del(h, h1, key.str().c_str(), 0, 0),
                    "Expected ENOENT when deleting a key that was never stored.");
}
checkeq(num_read_attempts,
get_int_stat_or_default(h, h1, 0, "ep_bg_num_samples"),
"Expected no bgFetch attempts");
}
return SUCCESS;
}
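// Covers a delete of a persisted key followed by a set of the same key while
// persistence is stopped: depending on whether the delete reached disk before
// persistence stopped, the vbucket bloom filter ends up holding either zero
// or one key.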
static enum test_result test_bloomfilter_delete_plus_set_scenario(
ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
if (get_bool_stat(h, h1, "ep_bfilter_enabled") == false) {
check(set_param(h, h1, protocol_binary_engine_param_flush,
"bfilter_enabled", "true"),
"Set bloomfilter_enabled should have worked");
}
check(get_bool_stat(h, h1, "ep_bfilter_enabled"),
"Bloom filter wasn't enabled");
// Ensure vbucket's bloom filter is enabled
checkeq(std::string("ENABLED"),
get_str_stat(h, h1, "vb_0:bloom_filter", "vbucket-details 0"),
"Vbucket 0's bloom filter wasn't enabled upon setup!");
item *itm = NULL;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET,"k1", "v1", &itm),
"Failed to fail to store an item.");
h1->release(h, NULL, itm);
wait_for_flusher_to_settle(h, h1);
int num_writes = get_int_stat(h, h1, "rw_0:io_num_write", "kvstore");
int num_persisted = get_int_stat(h, h1, "ep_total_persisted");
cb_assert(num_writes == 1 && num_persisted == 1);
checkeq(ENGINE_SUCCESS,
del(h, h1, "k1", 0, 0), "Failed remove with value.");
stop_persistence(h, h1);
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET,"k1", "v2", &itm, 0, 0),
"Failed to fail to store an item.");
h1->release(h, NULL, itm);
int key_count = get_int_stat(h, h1, "vb_0:bloom_filter_key_count",
"vbucket-details 0");
if (key_count == 0) {
check(get_int_stat(h, h1, "rw_0:io_num_write", "kvstore") <= 2,
"Unexpected number of writes");
start_persistence(h, h1);
wait_for_flusher_to_settle(h, h1);
checkeq(0, get_int_stat(h, h1, "vb_0:bloom_filter_key_count",
"vbucket-details 0"),
"Unexpected number of keys in bloomfilter");
} else {
cb_assert(key_count == 1);
checkeq(2, get_int_stat(h, h1, "rw_0:io_num_write", "kvstore"),
"Unexpected number of writes");
start_persistence(h, h1);
wait_for_flusher_to_settle(h, h1);
checkeq(1, get_int_stat(h, h1, "vb_0:bloom_filter_key_count",
"vbucket-details 0"),
"Unexpected number of keys in bloomfilter");
}
return SUCCESS;
}
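// Verifies that the JSON datatype survives a plain set, a get, and a
// set_with_meta when the connection advertises datatype support.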
static enum test_result test_datatype(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
const void *cookie = testHarness.create_cookie();
testHarness.set_datatype_support(cookie, true);
item *itm = NULL;
const std::string key("{\"foo\":\"bar\"}");
const protocol_binary_datatype_t datatype = PROTOCOL_BINARY_DATATYPE_JSON;
uint64_t cas = 0;
std::string value("x");
checkeq(ENGINE_SUCCESS,
storeCasOut(h, h1, NULL, 0, key, value, datatype, itm, cas),
"Expected set to succeed");
checkeq(ENGINE_SUCCESS,
get(h, h1, cookie, &itm, key, 0),
"Unable to get stored item");
item_info info;
h1->get_item_info(h, cookie, itm, &info);
h1->release(h, cookie, itm);
checkeq(static_cast<uint8_t>(PROTOCOL_BINARY_DATATYPE_JSON),
info.datatype, "Invalid datatype");
const char* key1 = "foo";
const char* val1 = "{\"foo1\":\"bar1\"}";
ItemMetaData itm_meta;
itm_meta.revSeqno = 10;
itm_meta.cas = info.cas;
itm_meta.exptime = info.exptime;
itm_meta.flags = info.flags;
set_with_meta(h, h1, key1, strlen(key1), val1, strlen(val1), 0, &itm_meta,
last_cas, 0, info.datatype, cookie);
checkeq(ENGINE_SUCCESS,
get(h, h1, cookie, &itm, key1, 0),
"Unable to get stored item");
h1->get_item_info(h, cookie, itm, &info);
h1->release(h, cookie, itm);
checkeq(static_cast<uint8_t>(PROTOCOL_BINARY_DATATYPE_JSON),
info.datatype, "Invalid datatype, when setWithMeta");
testHarness.destroy_cookie(cookie);
return SUCCESS;
}
static enum test_result test_datatype_with_unknown_command(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
const void *cookie = testHarness.create_cookie();
testHarness.set_datatype_support(cookie, true);
item *itm = NULL;
const char* key = "foo";
const char* val = "{\"foo\":\"bar\"}";
uint8_t datatype = PROTOCOL_BINARY_DATATYPE_JSON;
ItemMetaData itm_meta;
itm_meta.revSeqno = 10;
itm_meta.cas = 0x1;
itm_meta.exptime = 0;
itm_meta.flags = 0;
//SET_WITH_META
set_with_meta(h, h1, key, strlen(key), val, strlen(val), 0, &itm_meta,
0, 0, datatype, cookie);
checkeq(ENGINE_SUCCESS,
get(h, h1, cookie, &itm, key, 0),
"Unable to get stored item");
item_info info;
h1->get_item_info(h, cookie, itm, &info);
h1->release(h, NULL, itm);
checkeq(static_cast<uint8_t>(PROTOCOL_BINARY_DATATYPE_JSON),
info.datatype, "Invalid datatype, when setWithMeta");
//SET_RETURN_META
set_ret_meta(h, h1, "foo1", 4, val, strlen(val), 0, 0, 0, 0, datatype,
cookie);
checkeq(PROTOCOL_BINARY_RESPONSE_SUCCESS, last_status.load(),
"Expected set returing meta to succeed");
checkeq(static_cast<uint8_t>(PROTOCOL_BINARY_DATATYPE_JSON),
last_datatype.load(), "Invalid datatype, when set_return_meta");
testHarness.destroy_cookie(cookie);
return SUCCESS;
}
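// Verifies session CAS validation for SET_VBUCKET: the extras carry the new
// vbucket state as a 4-byte network-order value. The first CAS used below is
// assumed not to match the engine's session CAS (hence KEY_EEXISTS), while
// the second is assumed to match (hence SUCCESS).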
static enum test_result test_session_cas_validation(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
//Testing PROTOCOL_BINARY_CMD_SET_VBUCKET..
char ext[4];
protocol_binary_request_header *pkt;
vbucket_state_t state = vbucket_state_active;
uint32_t val = static_cast<uint32_t>(state);
val = htonl(val);
memcpy(ext, (char*)&val, sizeof(val));
uint64_t cas = 0x0101010101010101;
pkt = createPacket(PROTOCOL_BINARY_CMD_SET_VBUCKET, 0, cas, ext, 4);
checkeq(ENGINE_SUCCESS,
h1->unknown_command(h, NULL, pkt, add_response, testHarness.doc_namespace),
"SET_VBUCKET command failed");
cb_free(pkt);
cb_assert(last_status == PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS);
cas = 0x0102030405060708;
pkt = createPacket(PROTOCOL_BINARY_CMD_SET_VBUCKET, 0, cas, ext, 4);
checkeq(ENGINE_SUCCESS,
h1->unknown_command(h, NULL, pkt, add_response, testHarness.doc_namespace),
"SET_VBUCKET command failed");
cb_free(pkt);
cb_assert(last_status == PROTOCOL_BINARY_RESPONSE_SUCCESS);
return SUCCESS;
}
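// Verifies alog_task_time / alog_sleep_time configuration: the scheduled
// ep_access_scanner_task_time must track updates to either parameter.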
static enum test_result test_access_scanner_settings(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
// Access scanner n/a without warmup.
return SKIPPED;
}
// Create a unique access log path by combining with the db path.
checkeq(ENGINE_SUCCESS,
h1->get_stats(h, NULL, NULL, 0, add_stats),
"Failed to get stats.");
std::string dbname = vals.find("ep_dbname")->second;
const auto alog_path = std::string("alog_path=") + dbname +
DIRECTORY_SEPARATOR_CHARACTER + "access.log";
std::string newconfig = std::string(testHarness.get_current_testcase()->cfg)
+ alog_path;
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
newconfig.c_str(),
true, false);
wait_for_warmup_complete(h, h1);
std::string err_msg;
// Check access scanner is enabled and alog_task_time is at default
checkeq(true, get_bool_stat(h, h1, "ep_access_scanner_enabled"),
"Expected access scanner to be enabled");
cb_assert(get_int_stat(h, h1, "ep_alog_task_time") == 2);
    // Ensure access_scanner_task_time is what it's expected to be.
    // Need to wait until the AccessScanner task has been set up.
wait_for_stat_change(h, h1, "ep_access_scanner_task_time",
std::string{"NOT_SCHEDULED"});
std::string str = get_str_stat(h, h1, "ep_access_scanner_task_time");
std::string expected_time = "02:00";
err_msg.assign("Initial time incorrect, expect: " +
expected_time + ", actual: " + str.substr(11, 5));
checkeq(0, str.substr(11, 5).compare(expected_time), err_msg.c_str());
// Update alog_task_time and ensure the update is successful
set_param(h, h1, protocol_binary_engine_param_flush, "alog_task_time", "5");
expected_time = "05:00";
str = get_str_stat(h, h1, "ep_access_scanner_task_time");
err_msg.assign("Updated time incorrect, expect: " +
expected_time + ", actual: " + str.substr(11, 5));
checkeq(0, str.substr(11, 5).compare(expected_time), err_msg.c_str());
// Update alog_sleep_time by 10 mins and ensure the update is successful.
const std::chrono::minutes update_by{10};
std::string targetTaskTime1{make_time_string(std::chrono::system_clock::now() +
update_by)};
set_param(h, h1, protocol_binary_engine_param_flush, "alog_sleep_time",
std::to_string(update_by.count()).c_str());
str = get_str_stat(h, h1, "ep_access_scanner_task_time");
// Recalculate now() + 10mins as upper bound on when the task should be
// scheduled.
std::string targetTaskTime2{make_time_string(std::chrono::system_clock::now() +
update_by)};
// ep_access_scanner_task_time should fall within the range of
// targetTaskTime1 and targetTaskTime2
err_msg.assign("Unexpected task time range, expect: " +
targetTaskTime1 + " <= " + str + " <= " + targetTaskTime2);
check(targetTaskTime1 <= str, err_msg.c_str());
check(str <= targetTaskTime2, err_msg.c_str());
return SUCCESS;
}
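// Drives the access scanner end to end: fill the vbucket until the resident
// ratio drops below ~95%, trigger one run per shard and expect an access log
// file to be generated; then delete the vbucket and expect a subsequent run
// to skip and remove the log.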
static enum test_result test_access_scanner(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
// Access scanner not applicable without warmup.
return SKIPPED;
}
// Create a unique access log path by combining with the db path.
checkeq(ENGINE_SUCCESS,
h1->get_stats(h, NULL, NULL, 0, add_stats),
"Failed to get stats.");
const auto dbname = vals.find("ep_dbname")->second;
const auto alog_path = std::string("alog_path=") + dbname +
DIRECTORY_SEPARATOR_CHARACTER + "access.log";
/* We do not want the access scanner task to be running while we initiate it
explicitly below. Hence set the alog_task_time to about 1 ~ 2 hours
from now */
const time_t now = time(nullptr);
struct tm tm_now;
cb_gmtime_r(&now, &tm_now);
const auto two_hours_hence = (tm_now.tm_hour + 2) % 24;
const auto alog_task_time = std::string("alog_task_time=") +
std::to_string(two_hours_hence);
const auto newconfig = std::string(testHarness.get_current_testcase()->cfg)
+ alog_path + ";" + alog_task_time;
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
newconfig.c_str(),
true, false);
wait_for_warmup_complete(h, h1);
/* Check that alog_task_time was correctly updated. */
checkeq(get_int_stat(h, h1, "ep_alog_task_time"),
two_hours_hence,
"Failed to set alog_task_time to 2 hours in the future");
checkeq(ENGINE_SUCCESS,
h1->get_stats(h, NULL, NULL, 0, add_stats),
"Failed to get stats.");
std::string name = vals.find("ep_alog_path")->second;
/* Check access scanner is enabled */
checkeq(true, get_bool_stat(h, h1, "ep_access_scanner_enabled"),
"Access scanner task not enabled by default. Check test config");
const int num_shards = get_int_stat(h, h1, "ep_workload:num_shards",
"workload");
name = name + ".0";
std::string prev(name + ".old");
/* Get the resident ratio down to below 95% - point at which access.log
* generation occurs.
*/
int num_items = 0;
// Size chosen to create ~2000 items (i.e. 2x more than we sanity-check below)
// with the given max_size for this test.
const std::string value(2000, 'x');
while (true) {
// Gathering stats on every store is expensive, just check every 100 iterations
if ((num_items % 100) == 0) {
if (get_int_stat(h, h1, "vb_active_perc_mem_resident") < 94) {
break;
}
}
item *itm = NULL;
std::string key("key" + std::to_string(num_items));
ENGINE_ERROR_CODE ret = store(h, h1, NULL, OPERATION_SET,
key.c_str(), value.c_str(), &itm);
switch (ret) {
case ENGINE_SUCCESS:
num_items++;
h1->release(h, NULL, itm);
break;
case ENGINE_ENOMEM:
case ENGINE_TMPFAIL:
// Returned when at high watermark; simply retry the op.
h1->release(h, NULL, itm);
break;
default:
fprintf(stderr, "test_access_scanner: Unexpected result from store(): %d\n",
ret);
abort();
}
}
    // Sanity check - ensure we have enough vBucket quota (max_size)
    // such that we have 1000 items - enough to give us 0.1%
    // granularity in any residency calculations.
if (num_items < 1000) {
std::cerr << "Error: test_access_scanner: "
"expected at least 1000 items after filling vbucket, "
"but only have " << num_items << ". "
"Check max_size setting for test." << std::endl;
return FAIL;
}
wait_for_flusher_to_settle(h, h1);
verify_curr_items(h, h1, num_items, "Wrong number of items");
int num_non_resident = get_int_stat(h, h1, "vb_active_num_non_resident");
checkge(num_non_resident, num_items * 6 / 100,
"Expected num_non_resident to be at least 6% of total items");
/* Run access scanner task once and expect it to generate access log */
check(set_param(h, h1, protocol_binary_engine_param_flush,
"access_scanner_run", "true"),
"Failed to trigger access scanner");
// Wait for the number of runs to equal the number of shards.
wait_for_stat_to_be(h, h1, "ep_num_access_scanner_runs", num_shards);
/* This time since resident ratio is < 95% access log should be generated */
    checkeq(0, access(name.c_str(), F_OK),
            (std::string("access log file (") + name +
             ") should exist (got errno:" + std::to_string(errno) + ")").c_str());
/* Increase resident ratio by deleting items */
vbucketDelete(h, h1, 0);
check(set_vbucket_state(h, h1, 0, vbucket_state_active),
"Failed to set VB0 state.");
/* Run access scanner task once */
const int access_scanner_skips =
get_int_stat(h, h1, "ep_num_access_scanner_skips");
check(set_param(h, h1, protocol_binary_engine_param_flush,
"access_scanner_run", "true"),
"Failed to trigger access scanner");
wait_for_stat_to_be(h, h1, "ep_num_access_scanner_skips",
access_scanner_skips + num_shards);
/* Access log files should be removed because resident ratio > 95% */
checkeq(-1, access(prev.c_str(), F_OK),
".old access log file should not exist");
checkeq(-1, access(name.c_str(), F_OK), "access log file should not exist");
return SUCCESS;
}
static enum test_result test_set_param_message(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
set_param(h, h1, protocol_binary_engine_param_flush, "alog_task_time", "50");
checkeq(PROTOCOL_BINARY_RESPONSE_EINVAL, last_status.load(),
"Expected an invalid value error for an out of bounds alog_task_time");
check(std::string("Validation Error").compare(last_body), "Expected a "
"validation error in the response body");
return SUCCESS;
}
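// Stores 5000 items, restarts the engine and verifies the "warmup",
// "prev-vbucket" and "vbucket-details" stat groups reflect the pre-restart
// state.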
static enum test_result test_warmup_stats(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
item *it = NULL;
check(set_vbucket_state(h, h1, 0, vbucket_state_active), "Failed to set VB0 state.");
check(set_vbucket_state(h, h1, 1, vbucket_state_replica), "Failed to set VB1 state.");
for (int i = 0; i < 5000; ++i) {
std::stringstream key;
key << "key-" << i;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, key.str().c_str(),
"somevalue", &it),
"Error setting.");
h1->release(h, NULL, it);
}
// Restart the server.
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
const auto warmup_stats = get_all_stats(h, h1, "warmup");
// Check all expected warmup stats exists.
const char* warmup_keys[] = { "ep_warmup_thread",
"ep_warmup_value_count",
"ep_warmup_key_count",
"ep_warmup_dups",
"ep_warmup_oom",
"ep_warmup_time"};
for (const auto* key : warmup_keys) {
check(warmup_stats.find(key) != warmup_stats.end(),
(std::string("Found no ") + key).c_str());
}
std::string warmup_time = warmup_stats.at("ep_warmup_time");
cb_assert(std::stoi(warmup_time) > 0);
const auto prev_vb_stats = get_all_stats(h, h1, "prev-vbucket");
check(prev_vb_stats.find("vb_0") != prev_vb_stats.end(),
"Found no previous state for VB0");
check(prev_vb_stats.find("vb_1") != prev_vb_stats.end(),
"Found no previous state for VB1");
checkeq(std::string("active"), prev_vb_stats.at("vb_0"),
"Unexpected stats for vb 0");
checkeq(std::string("replica"), prev_vb_stats.at("vb_1"),
"Unexpected stats for vb 1");
const auto vb_details_stats = get_all_stats(h, h1, "vbucket-details");
checkeq(5000, std::stoi(vb_details_stats.at("vb_0:num_items")),
"Unexpected item count for vb 0");
checkeq(0, std::stoi(vb_details_stats.at("vb_1:num_items")),
"Unexpected item count for vb 1");
return SUCCESS;
}
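// Verifies warmup honours a 1% minimum item threshold across four active
// vbuckets: keys are fully loaded under value eviction while at most ~1% of
// values are warmed up.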
static enum test_result test_warmup_with_threshold(ENGINE_HANDLE *h,
ENGINE_HANDLE_V1 *h1) {
if (!isWarmupEnabled(h, h1)) {
return SKIPPED;
}
item *it = NULL;
check(set_vbucket_state(h, h1, 0, vbucket_state_active), "Failed set vbucket 1 state.");
check(set_vbucket_state(h, h1, 1, vbucket_state_active), "Failed set vbucket 2 state.");
check(set_vbucket_state(h, h1, 2, vbucket_state_active), "Failed set vbucket 3 state.");
check(set_vbucket_state(h, h1, 3, vbucket_state_active), "Failed set vbucket 4 state.");
for (int i = 0; i < 10000; ++i) {
std::stringstream key;
key << "key+" << i;
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, key.str().c_str(), "somevalue", &it,
0, (i % 4)),
"Error setting.");
h1->release(h, NULL, it);
}
// Restart the server.
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
checkeq(1,
get_int_stat(h, h1, "ep_warmup_min_item_threshold", "warmup"),
"Unable to set warmup_min_item_threshold to 1%");
const std::string policy = get_str_stat(h, h1, "ep_item_eviction_policy");
if (policy == "full_eviction") {
checkeq(get_int_stat(h, h1, "ep_warmup_key_count", "warmup"),
get_int_stat(h, h1, "ep_warmup_value_count", "warmup"),
"Warmed up key count didn't match warmed up value count");
} else {
checkeq(10000, get_int_stat(h, h1, "ep_warmup_key_count", "warmup"),
"Warmup didn't warmup all keys");
}
check(get_int_stat(h, h1, "ep_warmup_value_count", "warmup") <= 110,
"Warmed up value count found to be greater than 1%");
cb_assert(get_int_stat(h, h1, "ep_warmup_time", "warmup") > 0);
return SUCCESS;
}
#if 0
// Comment out the entire test since the hack gave warnings on win32
static enum test_result test_warmup_accesslog(ENGINE_HANDLE *h, ENGINE_HANDLE_V1 *h1) {
#ifdef __APPLE__
/* I'm getting a weird link error from clang.. disable the test until I
** understand why
*/
return SKIPPED;
#else
item *it = NULL;
int n_items_to_store1 = 10;
for (int i = 0; i < n_items_to_store1; ++i) {
std::stringstream key;
key << "key-" << i;
const char* keystr = key.str().c_str();
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, keystr, "somevalue", &it, 0, 0),
"Error setting.");
h1->release(h, NULL, it);
}
wait_for_flusher_to_settle(h, h1);
int n_items_to_access = 10;
for (int i = 0; i < n_items_to_access; ++i) {
std::stringstream key;
key << "key-" << i;
const char* keystr = key.str().c_str();
checkeq(ENGINE_SUCCESS,
get(h, h1, NULL, &it, keystr, 0),
"Error getting.");
h1->release(h, NULL, it);
}
    // sleep so that scanner task can have time to generate access log
sleep(61);
// store additional items
int n_items_to_store2 = 10;
for (int i = 0; i < n_items_to_store2; ++i) {
std::stringstream key;
key << "key2-" << i;
const char* keystr = key.str().c_str();
checkeq(ENGINE_SUCCESS,
store(h, h1, NULL, OPERATION_SET, keystr, "somevalue", &it, 0, 0),
"Error setting.");
h1->release(h, NULL, it);
}
// Restart the server.
testHarness.reload_engine(&h, &h1,
testHarness.engine_path,
testHarness.get_current_testcase()->cfg,
true, false);
wait_for_warmup_complete(h, h1);
// n_items_to_access items should be loaded from access log first
// but we continue to load until we hit 75% item watermark
int warmedup = get_int_stat(h, h1, "ep_warmup_value_count", "warmup");
// std::cout << "ep_warmup_value_count = " << warmedup << std::end