Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
520 lines (466 sloc) 14.4 KB
#include <iostream>
#include <sstream>
#include <vector>
#include <utility>
#include <string>
#include <set>
#include <stdlib.h>
#include <unistd.h>
#include <getopt.h>
#include <numa.h>
#include "../macros.h"
#include "../varkey.h"
#include "../thread.h"
#include "../util.h"
#include "../spinbarrier.h"
#include "../core.h"
#include "bench.h"
using namespace std;
using namespace util;
static size_t nkeys;
static const size_t YCSBRecordSize = 100;
// [R, W, RMW, Scan]
// we're missing remove for now
// the default is a modification of YCSB "A" we made (80/20 R/W)
static unsigned g_txn_workload_mix[] = { 80, 20, 0, 0 };
class ycsb_worker : public bench_worker {
public:
ycsb_worker(unsigned int worker_id,
unsigned long seed, abstract_db *db,
const map<string, abstract_ordered_index *> &open_tables,
spin_barrier *barrier_a, spin_barrier *barrier_b)
: bench_worker(worker_id, true, seed, db,
open_tables, barrier_a, barrier_b),
tbl(open_tables.at("USERTABLE")),
computation_n(0)
{
obj_key0.reserve(str_arena::MinStrReserveLength);
obj_key1.reserve(str_arena::MinStrReserveLength);
obj_v.reserve(str_arena::MinStrReserveLength);
}
txn_result
txn_read()
{
void * const txn = db->new_txn(txn_flags, arena, txn_buf(), abstract_db::HINT_KV_GET_PUT);
scoped_str_arena s_arena(arena);
try {
const uint64_t k = r.next() % nkeys;
ALWAYS_ASSERT(tbl->get(txn, u64_varkey(k).str(obj_key0), obj_v));
computation_n += obj_v.size();
measure_txn_counters(txn, "txn_read");
if (likely(db->commit_txn(txn)))
return txn_result(true, 0);
} catch (abstract_db::abstract_abort_exception &ex) {
db->abort_txn(txn);
}
return txn_result(false, 0);
}
static txn_result
TxnRead(bench_worker *w)
{
return static_cast<ycsb_worker *>(w)->txn_read();
}
txn_result
txn_write()
{
void * const txn = db->new_txn(txn_flags, arena, txn_buf(), abstract_db::HINT_KV_GET_PUT);
scoped_str_arena s_arena(arena);
try {
tbl->put(txn, u64_varkey(r.next() % nkeys).str(str()), str().assign(YCSBRecordSize, 'b'));
measure_txn_counters(txn, "txn_write");
if (likely(db->commit_txn(txn)))
return txn_result(true, 0);
} catch (abstract_db::abstract_abort_exception &ex) {
db->abort_txn(txn);
}
return txn_result(false, 0);
}
static txn_result
TxnWrite(bench_worker *w)
{
return static_cast<ycsb_worker *>(w)->txn_write();
}
txn_result
txn_rmw()
{
void * const txn = db->new_txn(txn_flags, arena, txn_buf(), abstract_db::HINT_KV_RMW);
scoped_str_arena s_arena(arena);
try {
const uint64_t key = r.next() % nkeys;
ALWAYS_ASSERT(tbl->get(txn, u64_varkey(key).str(obj_key0), obj_v));
computation_n += obj_v.size();
tbl->put(txn, obj_key0, str().assign(YCSBRecordSize, 'c'));
measure_txn_counters(txn, "txn_rmw");
if (likely(db->commit_txn(txn)))
return txn_result(true, 0);
} catch (abstract_db::abstract_abort_exception &ex) {
db->abort_txn(txn);
}
return txn_result(false, 0);
}
static txn_result
TxnRmw(bench_worker *w)
{
return static_cast<ycsb_worker *>(w)->txn_rmw();
}
class worker_scan_callback : public abstract_ordered_index::scan_callback {
public:
worker_scan_callback() : n(0) {}
virtual bool
invoke(const char *, size_t, const string &value)
{
n += value.size();
return true;
}
size_t n;
};
txn_result
txn_scan()
{
void * const txn = db->new_txn(txn_flags, arena, txn_buf(), abstract_db::HINT_KV_SCAN);
scoped_str_arena s_arena(arena);
const size_t kstart = r.next() % nkeys;
const string &kbegin = u64_varkey(kstart).str(obj_key0);
const string &kend = u64_varkey(kstart + 100).str(obj_key1);
worker_scan_callback c;
try {
tbl->scan(txn, kbegin, &kend, c);
computation_n += c.n;
measure_txn_counters(txn, "txn_scan");
if (likely(db->commit_txn(txn)))
return txn_result(true, 0);
} catch (abstract_db::abstract_abort_exception &ex) {
db->abort_txn(txn);
}
return txn_result(false, 0);
}
static txn_result
TxnScan(bench_worker *w)
{
return static_cast<ycsb_worker *>(w)->txn_scan();
}
virtual workload_desc_vec
get_workload() const
{
//w.push_back(workload_desc("Read", 0.95, TxnRead));
//w.push_back(workload_desc("ReadModifyWrite", 0.04, TxnRmw));
//w.push_back(workload_desc("Write", 0.01, TxnWrite));
//w.push_back(workload_desc("Read", 1.0, TxnRead));
//w.push_back(workload_desc("Write", 1.0, TxnWrite));
// YCSB workload "A" - 50/50 read/write
//w.push_back(workload_desc("Read", 0.5, TxnRead));
//w.push_back(workload_desc("Write", 0.5, TxnWrite));
// YCSB workload custom - 80/20 read/write
//w.push_back(workload_desc("Read", 0.8, TxnRead));
//w.push_back(workload_desc("Write", 0.2, TxnWrite));
workload_desc_vec w;
unsigned m = 0;
for (size_t i = 0; i < ARRAY_NELEMS(g_txn_workload_mix); i++)
m += g_txn_workload_mix[i];
ALWAYS_ASSERT(m == 100);
if (g_txn_workload_mix[0])
w.push_back(workload_desc("Read", double(g_txn_workload_mix[0])/100.0, TxnRead));
if (g_txn_workload_mix[1])
w.push_back(workload_desc("Write", double(g_txn_workload_mix[1])/100.0, TxnWrite));
if (g_txn_workload_mix[2])
w.push_back(workload_desc("ReadModifyWrite", double(g_txn_workload_mix[2])/100.0, TxnRmw));
if (g_txn_workload_mix[3])
w.push_back(workload_desc("Scan", double(g_txn_workload_mix[3])/100.0, TxnScan));
return w;
}
protected:
virtual void
on_run_setup() OVERRIDE
{
if (!pin_cpus)
return;
const size_t a = worker_id % coreid::num_cpus_online();
const size_t b = a % nthreads;
rcu::s_instance.pin_current_thread(b);
}
inline ALWAYS_INLINE string &
str() {
return *arena.next();
}
private:
abstract_ordered_index *tbl;
string obj_key0;
string obj_key1;
string obj_v;
uint64_t computation_n;
};
static void
ycsb_load_keyrange(
uint64_t keystart,
uint64_t keyend,
unsigned int pinid,
abstract_db *db,
abstract_ordered_index *tbl,
str_arena &arena,
uint64_t txn_flags,
void *txn_buf)
{
if (pin_cpus) {
ALWAYS_ASSERT(pinid < nthreads);
rcu::s_instance.pin_current_thread(pinid);
rcu::s_instance.fault_region();
}
const size_t batchsize = (db->txn_max_batch_size() == -1) ?
10000 : db->txn_max_batch_size();
ALWAYS_ASSERT(batchsize > 0);
const size_t nkeys = keyend - keystart;
ALWAYS_ASSERT(nkeys > 0);
const size_t nbatches = nkeys < batchsize ? 1 : (nkeys / batchsize);
for (size_t batchid = 0; batchid < nbatches;) {
scoped_str_arena s_arena(arena);
void * const txn = db->new_txn(txn_flags, arena, txn_buf);
try {
const size_t rend = (batchid + 1 == nbatches) ?
keyend : keystart + ((batchid + 1) * batchsize);
for (size_t i = batchid * batchsize + keystart; i < rend; i++) {
ALWAYS_ASSERT(i >= keystart && i < keyend);
const string k = u64_varkey(i).str();
const string v(YCSBRecordSize, 'a');
tbl->insert(txn, k, v);
}
if (db->commit_txn(txn))
batchid++;
else
db->abort_txn(txn);
} catch (abstract_db::abstract_abort_exception &ex) {
db->abort_txn(txn);
}
}
if (verbose)
cerr << "[INFO] finished loading USERTABLE range [kstart="
<< keystart << ", kend=" << keyend << ") - nkeys: " << nkeys << endl;
}
class ycsb_usertable_loader : public bench_loader {
public:
ycsb_usertable_loader(unsigned long seed,
abstract_db *db,
const map<string, abstract_ordered_index *> &open_tables)
: bench_loader(seed, db, open_tables)
{}
protected:
virtual void
load()
{
abstract_ordered_index *tbl = open_tables.at("USERTABLE");
const size_t nkeysperthd = nkeys / nthreads;
for (size_t i = 0; i < nthreads; i++) {
const size_t keystart = i * nkeysperthd;
const size_t keyend = min((i + 1) * nkeysperthd, nkeys);
ycsb_load_keyrange(
keystart,
keyend,
i,
db,
tbl,
arena,
txn_flags,
txn_buf());
}
}
};
class ycsb_parallel_usertable_loader : public bench_loader {
public:
ycsb_parallel_usertable_loader(unsigned long seed,
abstract_db *db,
const map<string, abstract_ordered_index *> &open_tables,
unsigned int pinid,
uint64_t keystart,
uint64_t keyend)
: bench_loader(seed, db, open_tables),
pinid(pinid), keystart(keystart), keyend(keyend)
{
INVARIANT(keyend > keystart);
if (verbose)
cerr << "[INFO] YCSB par loader cpu " << pinid
<< " [" << keystart << ", " << keyend << ")" << endl;
}
protected:
virtual void
load()
{
abstract_ordered_index *tbl = open_tables.at("USERTABLE");
ycsb_load_keyrange(
keystart,
keyend,
pinid,
db,
tbl,
arena,
txn_flags,
txn_buf());
}
private:
unsigned int pinid;
uint64_t keystart;
uint64_t keyend;
};
class ycsb_bench_runner : public bench_runner {
public:
ycsb_bench_runner(abstract_db *db)
: bench_runner(db)
{
open_tables["USERTABLE"] = db->open_index("USERTABLE", YCSBRecordSize);
}
protected:
virtual vector<bench_loader *>
make_loaders()
{
vector<bench_loader *> ret;
const unsigned long ncpus = coreid::num_cpus_online();
if (enable_parallel_loading && nkeys >= nthreads) {
// divide the key space amongst all the loaders
const size_t nkeysperloader = nkeys / ncpus;
if (nthreads > ncpus) {
for (size_t i = 0; i < ncpus; i++) {
const uint64_t kend = (i + 1 == ncpus) ?
nkeys : (i + 1) * nkeysperloader;
ret.push_back(
new ycsb_parallel_usertable_loader(
0, db, open_tables, i,
i * nkeysperloader, kend));
}
} else {
// load balance the loaders amongst numa nodes in RR fashion
//
// XXX: here we hardcode an assumption about the NUMA topology of
// the system
const vector<unsigned> numa_nodes_used = get_numa_nodes_used(nthreads);
// assign loaders to cores based on numa node assignment in RR fashion
const unsigned loaders_per_node = ncpus / numa_nodes_used.size();
vector<unsigned> node_allocations(numa_nodes_used.size(), loaders_per_node);
// RR the remaining
for (unsigned i = 0;
i < (ncpus - loaders_per_node * numa_nodes_used.size());
i++)
node_allocations[i]++;
size_t loader_i = 0;
for (size_t i = 0; i < numa_nodes_used.size(); i++) {
// allocate loaders_per_node loaders to this numa node
const vector<unsigned> cpus = numa_node_to_cpus(numa_nodes_used[i]);
const vector<unsigned> cpus_avail = exclude(cpus, nthreads);
const unsigned nloaders = node_allocations[i];
for (size_t j = 0; j < nloaders; j++, loader_i++) {
const uint64_t kend = (loader_i + 1 == ncpus) ?
nkeys : (loader_i + 1) * nkeysperloader;
ret.push_back(
new ycsb_parallel_usertable_loader(
0, db, open_tables, cpus_avail[j % cpus_avail.size()],
loader_i * nkeysperloader, kend));
}
}
}
} else {
ret.push_back(new ycsb_usertable_loader(0, db, open_tables));
}
return ret;
}
virtual vector<bench_worker *>
make_workers()
{
const unsigned alignment = coreid::num_cpus_online();
const int blockstart =
coreid::allocate_contiguous_aligned_block(nthreads, alignment);
ALWAYS_ASSERT(blockstart >= 0);
ALWAYS_ASSERT((blockstart % alignment) == 0);
fast_random r(8544290);
vector<bench_worker *> ret;
for (size_t i = 0; i < nthreads; i++)
ret.push_back(
new ycsb_worker(
blockstart + i, r.next(), db, open_tables,
&barrier_a, &barrier_b));
return ret;
}
private:
static vector<unsigned>
get_numa_nodes_used(unsigned nthds)
{
// assuming CPUs [0, nthds) are used, what are all the
// NUMA nodes touched by [0, nthds)
set<unsigned> ret;
for (unsigned i = 0; i < nthds; i++) {
const int node = numa_node_of_cpu(i);
ALWAYS_ASSERT(node >= 0);
ret.insert(node);
}
return vector<unsigned>(ret.begin(), ret.end());
}
static vector<unsigned>
numa_node_to_cpus(unsigned node)
{
struct bitmask *bm = numa_allocate_cpumask();
ALWAYS_ASSERT(!::numa_node_to_cpus(node, bm));
vector<unsigned> ret;
for (int i = 0; i < numa_num_configured_cpus(); i++)
if (numa_bitmask_isbitset(bm, i))
ret.push_back(i);
numa_free_cpumask(bm);
return ret;
}
static vector<unsigned>
exclude(const vector<unsigned> &cpus, unsigned nthds)
{
vector<unsigned> ret;
for (auto n : cpus)
if (n < nthds)
ret.push_back(n);
return ret;
}
};
void
ycsb_do_test(abstract_db *db, int argc, char **argv)
{
nkeys = size_t(scale_factor * 1000.0);
ALWAYS_ASSERT(nkeys > 0);
// parse options
optind = 1;
while (1) {
static struct option long_options[] = {
{"workload-mix" , required_argument , 0 , 'w'},
{0, 0, 0, 0}
};
int option_index = 0;
int c = getopt_long(argc, argv, "w:", long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 0:
if (long_options[option_index].flag != 0)
break;
abort();
break;
case 'w':
{
const vector<string> toks = split(optarg, ',');
ALWAYS_ASSERT(toks.size() == ARRAY_NELEMS(g_txn_workload_mix));
unsigned s = 0;
for (size_t i = 0; i < toks.size(); i++) {
unsigned p = strtoul(toks[i].c_str(), nullptr, 10);
ALWAYS_ASSERT(p >= 0 && p <= 100);
s += p;
g_txn_workload_mix[i] = p;
}
ALWAYS_ASSERT(s == 100);
}
break;
case '?':
/* getopt_long already printed an error message. */
exit(1);
default:
abort();
}
}
if (verbose) {
cerr << "ycsb settings:" << endl;
cerr << " workload_mix: "
<< format_list(g_txn_workload_mix, g_txn_workload_mix + ARRAY_NELEMS(g_txn_workload_mix))
<< endl;
}
ycsb_bench_runner r(db);
r.run();
}