/**
 * A stand-alone binary which does not depend on the rest of the system,
 * used to test the current persistence strategies
 */
#include <cassert>
#include <iostream>
#include <cstdint>
#include <random>
#include <vector>
#include <set>
#include <atomic>
#include <thread>
#include <sstream>
#include <unistd.h>
#include <sys/uio.h>
#include <sys/types.h>
#include <fcntl.h>
#include <getopt.h>
#include <time.h>
#include <lz4.h>
#include "macros.h"
#include "circbuf.h"
#include "amd64.h"
#include "record/serializer.h"
#include "util.h"
using namespace std;
using namespace util;
struct tidhelpers {
// copied from txn_proto2_impl.h
static const uint64_t NBitsNumber = 24;
static const size_t CoreBits = NMAXCOREBITS; // allow 2^CoreBits distinct threads
static const size_t NMaxCores = NMAXCORES;
static const uint64_t CoreMask = (NMaxCores - 1);
static const uint64_t NumIdShift = CoreBits;
static const uint64_t NumIdMask = ((((uint64_t)1) << NBitsNumber) - 1) << NumIdShift;
static const uint64_t EpochShift = CoreBits + NBitsNumber;
static const uint64_t EpochMask = ((uint64_t)-1) << EpochShift;
static inline
uint64_t CoreId(uint64_t v)
{
return v & CoreMask;
}
static inline
uint64_t NumId(uint64_t v)
{
return (v & NumIdMask) >> NumIdShift;
}
static inline
uint64_t EpochId(uint64_t v)
{
return (v & EpochMask) >> EpochShift;
}
static inline
uint64_t MakeTid(uint64_t core_id, uint64_t num_id, uint64_t epoch_id)
{
// some sanity checking
static_assert((CoreMask | NumIdMask | EpochMask) == ((uint64_t)-1),
"TID fields must cover all 64 bits");
static_assert((CoreMask & NumIdMask) == 0, "core/num fields must not overlap");
static_assert((NumIdMask & EpochMask) == 0, "num/epoch fields must not overlap");
return (core_id) | (num_id << NumIdShift) | (epoch_id << EpochShift);
}
static uint64_t
vecidmax(uint64_t coremax, const vector<uint64_t> &v)
{
uint64_t ret = NumId(coremax);
for (size_t i = 0; i < v.size(); i++)
ret = max(ret, NumId(v[i]));
return ret;
}
static string
Str(uint64_t v)
{
ostringstream b;
b << "[core=" << CoreId(v) << " | n="
<< NumId(v) << " | epoch="
<< EpochId(v) << "]";
return b.str();
}
};
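// Illustrative round-trip check of the TID layout above (not part of the
// original driver; assumes the field values fit their bit widths, i.e.
// core < NMaxCores and num < 2^NBitsNumber):
static inline void
tidhelpers_example()
{
const uint64_t t = tidhelpers::MakeTid(/*core_id=*/3, /*num_id=*/7, /*epoch_id=*/2);
ALWAYS_ASSERT(tidhelpers::CoreId(t) == 3);
ALWAYS_ASSERT(tidhelpers::NumId(t) == 7);
ALWAYS_ASSERT(tidhelpers::EpochId(t) == 2);
// the epoch occupies the high bits, so TIDs from later epochs always
// compare greater than TIDs from earlier ones
ALWAYS_ASSERT(t < tidhelpers::MakeTid(3, 7, 3));
}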
//static void
//fillstring(std::string &s, size_t t)
//{
//  s.resize(t); // note: s.clear() here would make the stores below out-of-bounds
//  for (size_t i = 0; i < t; i++)
//    s[i] = (char) i;
//}
template <typename PRNG>
static inline void
fillkey(std::string &s, uint64_t idx, size_t sz, PRNG &prng)
{
INVARIANT(sz >= sizeof(uint64_t)); // the key must have room for the serialized id
s.resize(sz);
serializer<uint64_t, false> ser;
ser.write((uint8_t *) s.data(), idx);
}
template <typename PRNG>
static inline void
fillvalue(std::string &s, uint64_t idx, size_t sz, PRNG &prng)
{
uniform_int_distribution<uint32_t> dist(0, 10000);
s.resize(sz);
serializer<uint32_t, false> s_uint32_t;
for (size_t i = 0; i < sz; i += sizeof(uint32_t)) {
if (i + sizeof(uint32_t) <= sz) {
const uint32_t x = dist(prng);
s_uint32_t.write((uint8_t *) &s[i], x);
}
}
}
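// Example (illustrative): with the defaults, fillkey(s, idx, 8, prng)
// produces an 8-byte fixed-width encoding of the record index (prng is
// unused there), while fillvalue(s, idx, 32, prng) fills the value with
// 32/4 = 8 random uint32s; a tail shorter than sizeof(uint32_t) would be
// left untouched.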
/** simulate global database state */
static const size_t g_nrecords = 1000000;
static const size_t g_ntxns_worker = 1000000;
static const size_t g_nmax_loggers = 16;
static vector<uint64_t> g_database;
static atomic<uint64_t> g_ntxns_committed(0);
static atomic<uint64_t> g_ntxns_written(0);
static atomic<uint64_t> g_bytes_written[g_nmax_loggers];
static size_t g_nworkers = 1;
static int g_verbose = 0;
static int g_fsync_background = 0;
static size_t g_readset = 30;
static size_t g_writeset = 16;
static size_t g_keysize = 8; // in bytes
static size_t g_valuesize = 32; // in bytes
/** simulation framework */
// all simulations are epoch based
class database_simulation {
public:
static const unsigned long g_epoch_time_ns = 30000000; /* 30ms in ns */
database_simulation()
: keep_going_(true),
epoch_thread_(),
epoch_number_(1), // start at 1 so 0 can be fully persistent initially
system_sync_epoch_(0)
{
// XXX: depends on g_nworkers being set by now
for (size_t i = 0; i < g_nworkers; i++)
per_thread_epochs_[i]->store(1, memory_order_release);
for (size_t i = 0; i < g_nmax_loggers; i++)
for (size_t j = 0; j < g_nworkers; j++)
per_thread_sync_epochs_[i].epochs_[j].store(0, memory_order_release);
}
virtual ~database_simulation() {}
virtual void
init()
{
epoch_thread_ = thread(&database_simulation::epoch_thread, this);
}
virtual void worker(unsigned id) = 0;
virtual void logger(const vector<int> &fd,
const vector<vector<unsigned>> &assignments) = 0;
virtual void
terminate()
{
keep_going_->store(false, memory_order_release);
epoch_thread_.join();
}
static bool
AssignmentsValid(const vector<vector<unsigned>> &assignments,
unsigned nfds,
unsigned nworkers)
{
// each worker must be assigned exactly once in the assignment
// there must be <= nfds assignments
if (assignments.size() > nfds)
return false;
set<unsigned> seen;
for (auto &assignment : assignments)
for (auto w : assignment) {
if (seen.count(w) || w >= nworkers)
return false;
seen.insert(w);
}
return seen.size() == nworkers;
}
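// For example (illustrative): with nworkers=4 and nfds=2,
//   {{0,1},{2,3}} is valid (each worker appears exactly once),
//   {{0,1},{1,2,3}} is not (worker 1 assigned twice), and
//   {{0,1}} is not (workers 2 and 3 are never assigned).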
protected:
void
epoch_thread()
{
while (keep_going_->load(memory_order_acquire)) {
struct timespec t;
t.tv_sec = g_epoch_time_ns / ONE_SECOND_NS;
t.tv_nsec = g_epoch_time_ns % ONE_SECOND_NS;
nanosleep(&t, nullptr);
// make sure all threads are at the current epoch
const uint64_t curepoch = epoch_number_->load(memory_order_acquire);
retry:
bool allthere = true;
for (size_t i = 0;
i < g_nworkers && keep_going_->load(memory_order_acquire);
i++) {
if (per_thread_epochs_[i]->load(memory_order_acquire) < curepoch) {
allthere = false;
break;
}
}
if (!keep_going_->load(memory_order_acquire))
return;
if (!allthere) {
nop_pause();
goto retry;
}
//cerr << "bumping epoch" << endl;
epoch_number_->store(curepoch + 1, memory_order_release); // bump it
}
}
aligned_padded_elem<atomic<bool>> keep_going_;
thread epoch_thread_;
aligned_padded_elem<atomic<uint64_t>> epoch_number_;
aligned_padded_elem<atomic<uint64_t>> per_thread_epochs_[NMAXCORES];
// v = per_thread_sync_epochs_[i].epochs_[j]: logger i has persisted up
// through (including) all transactions <= epoch v on core j. since core =>
// logger mapping is static, taking:
// min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
// yields the entire system's persistent epoch
struct {
atomic<uint64_t> epochs_[NMAXCORES];
CACHE_PADOUT;
} per_thread_sync_epochs_[g_nmax_loggers] CACHE_ALIGNED;
// conservative estimate (<=) for:
// min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
aligned_padded_elem<atomic<uint64_t>> system_sync_epoch_;
};
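// Worked example of the persistence-epoch computation above (illustrative):
// with 2 loggers each owning one core, suppose
//   per_thread_sync_epochs_[0].epochs_ = {5, 0} (logger 0 owns core 0)
//   per_thread_sync_epochs_[1].epochs_ = {0, 3} (logger 1 owns core 1)
// Because the core => logger mapping is static, the max over loggers for a
// core is simply its owning logger's entry, so
//   min_{core} max_{logger} per_thread_sync_epochs_[logger].epochs_[core]
//     = min(5, 3) = 3
// and the system as a whole is persistent through epoch 3.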
struct logbuf_header {
uint64_t nentries_; // > 0 for all valid log buffers
uint64_t last_tid_; // TID of the last commit
} PACKED;
struct pbuffer {
bool io_scheduled_; // has the logger scheduled IO yet?
size_t curoff_; // current offset into buf_, either for writing
// or during the dep computation phase
size_t remaining_; // number of deps remaining to compute
std::string buf_; // the actual buffer, of size g_buffer_size
inline uint8_t *
pointer()
{
return (uint8_t *) buf_.data() + curoff_;
}
inline logbuf_header *
header()
{
return (logbuf_header *) buf_.data();
}
inline const logbuf_header *
header() const
{
return (const logbuf_header *) buf_.data();
}
};
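// Layout of a pbuffer's payload as handed to the logger (uncompressed case):
//   [logbuf_header][record 0][record 1]...[record nentries_-1]
// In the compressed case the payload is instead a sequence of
//   [uint32_t compressed size][LZ4-compressed horizon]
// chunks; see write_horizon() below.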
class onecopy_logbased_simulation : public database_simulation {
public:
static const size_t g_perthread_buffers = 64; // 64 outstanding buffers
static const size_t g_buffer_size = (1<<20); // in bytes
static const size_t g_horizon_size = (1<<16); // in bytes, for compression only
static circbuf<pbuffer, g_perthread_buffers> g_all_buffers[NMAXCORES];
static circbuf<pbuffer, g_perthread_buffers> g_persist_buffers[NMAXCORES];
protected:
virtual const uint8_t *
read_log_entry(const uint8_t *p, uint64_t &tid,
std::function<void(uint64_t)> readfunctor) = 0;
virtual uint64_t
compute_log_record_space() const = 0;
virtual void
write_log_record(uint8_t *p,
uint64_t tidcommit,
const vector<uint64_t> &readset,
const vector<pair<string, string>> &writeset) = 0;
virtual void
logger_on_io_completion() {}
virtual bool
do_compression() const = 0;
pbuffer *
getbuffer(unsigned id)
{
// block until we get a buf
pbuffer *ret = g_all_buffers[id].deq();
ret->io_scheduled_ = false;
ret->buf_.assign(g_buffer_size, 0);
ret->curoff_ = sizeof(logbuf_header);
ret->remaining_ = 0;
return ret;
}
public:
void
init() OVERRIDE
{
database_simulation::init();
for (size_t i = 0; i < g_nworkers; i++) {
for (size_t j = 0; j < g_perthread_buffers; j++) {
struct pbuffer *p = new pbuffer;
g_all_buffers[i].enq(p);
}
}
}
private:
inline size_t
inplace_update_persistent_info(
vector<pair<uint64_t, uint64_t>> &outstanding_commits,
uint64_t cursyncepoch)
{
size_t ncommits_synced = 0;
// can erase all entries with x.first <= cursyncepoch
size_t idx = 0;
for (; idx < outstanding_commits.size(); idx++) {
if (outstanding_commits[idx].first <= cursyncepoch)
ncommits_synced += outstanding_commits[idx].second;
else
break;
}
// erase entries [0, idx)
// XXX: slow
outstanding_commits.erase(outstanding_commits.begin(),
outstanding_commits.begin() + idx);
return ncommits_synced;
}
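// e.g. (illustrative): with outstanding_commits = {(1,100),(2,50),(3,70)}
// and cursyncepoch = 2, the function above erases the first two entries and
// returns 150; the 70 epoch-3 commits remain outstanding.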
inline pbuffer *
ensure_buffer_with_space(unsigned id, pbuffer *cur, size_t space_needed)
{
if (!cur) {
cur = getbuffer(id);
} else if (g_buffer_size - cur->curoff_ < space_needed) {
g_persist_buffers[id].enq(cur);
cur = getbuffer(id);
}
INVARIANT(cur);
INVARIANT(g_buffer_size - cur->curoff_ >= space_needed);
return cur;
}
/**
* write the horizon from [p, p+sz) into cur, assuming that cur has enough
* space. space needed is at least:
* sizeof(uint32_t) + LZ4_compressBound(sz)
*
* also updates the buffer's headers and offset to reflect the write
*
* returns the compressed size of the horizon
*/
inline uint64_t
write_horizon(void *lz4ctx,
const uint8_t *p, uint64_t sz,
uint64_t nentries, uint64_t lasttid,
pbuffer *cur)
{
#ifdef CHECK_INVARIANTS
const uint64_t needed = sizeof(uint32_t) + LZ4_compressBound(sz);
INVARIANT(g_buffer_size - cur->curoff_ >= needed);
#endif
const int ret = LZ4_compress_heap(
lz4ctx,
(const char *) p,
(char *) cur->pointer() + sizeof(uint32_t),
sz);
INVARIANT(ret >= 0);
serializer<uint32_t, false> s_uint32_t;
s_uint32_t.write(cur->pointer(), ret);
cur->curoff_ += sizeof(uint32_t) + ret;
cur->header()->nentries_ += nentries;
cur->header()->last_tid_ = lasttid;
return ret;
}
protected:
void
worker(unsigned id) OVERRIDE
{
const bool compress = do_compression();
uint8_t horizon[g_horizon_size]; // LZ4 looks at 64KB windows
// where are we in the window, how many elems in this window?
size_t horizon_p = 0, horizon_nentries = 0;
uint64_t horizon_last_tid = 0; // last committed TID in the horizon
double cratios = 0.0;
unsigned long ncompressions = 0;
void *lz4ctx = nullptr; // holds a heap-allocated LZ4 hash table
if (compress)
lz4ctx = LZ4_create();
mt19937 prng(id);
// read/write sets are uniform for now
uniform_int_distribution<unsigned> dist(0, g_nrecords - 1);
vector<uint64_t> readset(g_readset);
vector<pair<string, string>> writeset(g_writeset);
for (auto &pr : writeset) {
pr.first.reserve(g_keysize);
pr.second.reserve(g_valuesize);
}
struct pbuffer *curbuf = nullptr;
uint64_t lasttid = 0,
ncommits_currentepoch = 0,
ncommits_synced = 0;
vector<pair<uint64_t, uint64_t>> outstanding_commits;
for (size_t i = 0; i < g_ntxns_worker; i++) {
// update epoch info
const uint64_t lastepoch = per_thread_epochs_[id]->load(memory_order_acquire);
const uint64_t curepoch = epoch_number_->load(memory_order_acquire);
if (lastepoch != curepoch) {
// try to sync outstanding commits
INVARIANT(curepoch == (lastepoch + 1));
const size_t cursyncepoch = system_sync_epoch_->load(memory_order_acquire);
ncommits_synced +=
inplace_update_persistent_info(outstanding_commits, cursyncepoch);
// add information about the last epoch
outstanding_commits.emplace_back(lastepoch, ncommits_currentepoch);
ncommits_currentepoch = 0;
per_thread_epochs_[id]->store(curepoch, memory_order_release);
}
for (size_t j = 0; j < g_readset; j++)
readset[j] = g_database[dist(prng)];
const uint64_t idmax = tidhelpers::vecidmax(lasttid, readset);
// XXX: ignore future epochs for now
const uint64_t tidcommit = tidhelpers::MakeTid(id, idmax + 1, curepoch);
lasttid = tidcommit;
for (size_t j = 0; j < g_writeset; j++) {
auto idx = dist(prng);
g_database[idx] = lasttid;
fillkey(writeset[j].first, idx, g_keysize, prng);
fillvalue(writeset[j].second, idx, g_valuesize, prng);
}
const uint64_t space_needed = compute_log_record_space();
if (compress) {
if (horizon_p + space_needed > g_horizon_size) {
// need to compress and write horizon
curbuf = ensure_buffer_with_space(id, curbuf,
sizeof(uint32_t) + LZ4_compressBound(horizon_p));
const uint64_t compsz =
write_horizon(lz4ctx, &horizon[0], horizon_p,
horizon_nentries, horizon_last_tid,
curbuf);
const double cratio = double(horizon_p) / double(compsz);
cratios += cratio;
ncompressions++;
// can reset horizon
horizon_p = horizon_nentries = horizon_last_tid = 0;
}
write_log_record(&horizon[0] + horizon_p, tidcommit, readset, writeset);
horizon_p += space_needed;
horizon_nentries++;
horizon_last_tid = tidcommit;
ncommits_currentepoch++;
} else {
curbuf = ensure_buffer_with_space(id, curbuf, space_needed);
uint8_t *p = curbuf->pointer();
write_log_record(p, tidcommit, readset, writeset);
//cerr << "write tidcommit=" << tidhelpers::Str(tidcommit) << endl;
curbuf->curoff_ += space_needed;
curbuf->header()->nentries_++;
curbuf->header()->last_tid_ = tidcommit;
ncommits_currentepoch++;
}
}
if (compress) {
if (horizon_nentries) {
curbuf = ensure_buffer_with_space(id, curbuf,
sizeof(uint32_t) + LZ4_compressBound(horizon_p));
const uint64_t compsz =
write_horizon(lz4ctx, &horizon[0], horizon_p,
horizon_nentries, horizon_last_tid,
curbuf);
const double cratio = double(horizon_p) / double(compsz);
cratios += cratio;
ncompressions++;
horizon_p = horizon_nentries = horizon_last_tid = 0;
}
LZ4_free(lz4ctx);
}
if (curbuf) {
// XXX: hacky - a future epoch, agreed upon in advance, that all threads
// converge on once they finish
const uint64_t FutureEpoch = 100000;
const uint64_t waitfor = tidhelpers::EpochId(
curbuf->header()->last_tid_);
INVARIANT(per_thread_epochs_[id]->load(memory_order_acquire) == waitfor);
ALWAYS_ASSERT(waitfor < FutureEpoch);
curbuf->header()->last_tid_ =
tidhelpers::MakeTid(id, 0, FutureEpoch);
g_persist_buffers[id].enq(curbuf);
outstanding_commits.emplace_back(waitfor, ncommits_currentepoch);
//cerr << "worker " << id << " waitfor epoch " << waitfor << endl;
// get these commits persisted
while (system_sync_epoch_->load(memory_order_acquire) < waitfor)
nop_pause();
ncommits_synced +=
inplace_update_persistent_info(outstanding_commits, waitfor);
ALWAYS_ASSERT(outstanding_commits.empty());
}
if (g_verbose && compress)
cerr << "Average compression ratio: " << cratios / double(ncompressions) << endl;
g_ntxns_committed.fetch_add(ncommits_synced, memory_order_release);
}
private:
void
fsyncer(unsigned id, int fd, one_way_post<int> &channel)
{
for (;;) {
int ret;
channel.peek(ret);
if (ret == -1)
return;
ret = fdatasync(fd);
if (ret == -1) {
perror("fdatasync");
exit(1);
}
channel.consume(ret);
}
}
void
writer(unsigned id, int fd, const vector<unsigned> &assignment)
{
vector<iovec> iovs(g_nworkers * g_perthread_buffers);
vector<pbuffer *> pxs;
struct timespec last_io_completed;
one_way_post<int> *channel =
g_fsync_background ? new one_way_post<int> : nullptr;
uint64_t total_nbytes_written = 0,
total_txns_written = 0;
bool sense = false; // cur is at sense, prev is at !sense
uint64_t nbytes_written[2], txns_written[2];
uint64_t epoch_prefixes[2][NMAXCORES]; // fixed bound; a VLA over g_nworkers is non-standard C++
memset(&nbytes_written[0], 0, sizeof(nbytes_written));
memset(&txns_written[0], 0, sizeof(txns_written));
memset(&epoch_prefixes[0], 0, sizeof(epoch_prefixes[0]));
memset(&epoch_prefixes[1], 0, sizeof(epoch_prefixes[1]));
clock_gettime(CLOCK_MONOTONIC, &last_io_completed);
thread fsync_thread;
if (g_fsync_background) {
fsync_thread = thread(
&onecopy_logbased_simulation::fsyncer, this, id, fd, ref(*channel));
fsync_thread.detach();
}
while (keep_going_->load(memory_order_acquire)) {
// don't let a single iteration of this loop take less than an epoch's
// worth of time, so that we batch IO at epoch granularity
struct timespec now, diff;
clock_gettime(CLOCK_MONOTONIC, &now);
timespec_utils::subtract(&now, &last_io_completed, &diff);
if (diff.tv_sec == 0 && diff.tv_nsec < long(g_epoch_time_ns)) {
// need to sleep it out
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = g_epoch_time_ns - diff.tv_nsec;
nanosleep(&ts, nullptr);
}
clock_gettime(CLOCK_MONOTONIC, &last_io_completed);
size_t nwritten = 0;
nbytes_written[sense] = txns_written[sense] = 0;
for (auto idx : assignment) {
INVARIANT(idx < g_nworkers); // idx is unsigned, so no lower-bound check needed
g_persist_buffers[idx].peekall(pxs);
for (auto px : pxs) {
INVARIANT(px);
INVARIANT(!px->io_scheduled_);
iovs[nwritten].iov_base = (void *) px->buf_.data();
iovs[nwritten].iov_len = px->curoff_;
nbytes_written[sense] += px->curoff_;
px->io_scheduled_ = true;
px->curoff_ = sizeof(logbuf_header);
px->remaining_ = px->header()->nentries_;
txns_written[sense] += px->header()->nentries_;
nwritten++;
INVARIANT(tidhelpers::CoreId(px->header()->last_tid_) == idx);
INVARIANT(epoch_prefixes[sense][idx] <=
tidhelpers::EpochId(px->header()->last_tid_));
INVARIANT(tidhelpers::EpochId(px->header()->last_tid_) > 0);
epoch_prefixes[sense][idx] =
tidhelpers::EpochId(px->header()->last_tid_) - 1;
}
}
if (!nwritten) {
// XXX: should probably sleep here
nop_pause();
if (!g_fsync_background || !channel->can_post()) {
//cerr << "writer skipping because no work to do" << endl;
continue;
}
}
//cerr << "writer " << id << " nwritten " << nwritten << endl;
const ssize_t ret =
nwritten ? writev(fd, &iovs[0], nwritten) : 0;
if (ret == -1) {
perror("writev");
exit(1);
}
bool dosense;
if (g_fsync_background) {
// wait for fsync from the previous write
if (nwritten)
channel->post(0, true);
else
INVARIANT(channel->can_post());
dosense = !sense;
} else {
int ret = fdatasync(fd);
if (ret == -1) {
perror("fdatasync");
exit(1);
}
dosense = sense;
}
// update metadata from previous write
for (size_t i = 0; i < g_nworkers; i++) {
const uint64_t x0 =
per_thread_sync_epochs_[id].epochs_[i].load(memory_order_acquire);
const uint64_t x1 = epoch_prefixes[dosense][i];
if (x1 > x0)
per_thread_sync_epochs_[id].epochs_[i].store(
x1, memory_order_release);
}
total_nbytes_written += nbytes_written[dosense];
total_txns_written += txns_written[dosense];
// bump the sense
sense = !sense;
// return all buffers that have been io_scheduled_ - we can do this as
// soon as write returns
for (auto idx : assignment) {
pbuffer *px;
while ((px = g_persist_buffers[idx].peek()) &&
px->io_scheduled_) {
g_persist_buffers[idx].deq();
g_all_buffers[idx].enq(px);
}
}
}
g_bytes_written[id].store(total_nbytes_written, memory_order_release);
g_ntxns_written.fetch_add(total_txns_written, memory_order_release);
}
inline void
advance_system_sync_epoch(const vector<vector<unsigned>> &assignments)
{
uint64_t min_so_far = numeric_limits<uint64_t>::max();
for (size_t i = 0; i < assignments.size(); i++)
for (auto j : assignments[i])
min_so_far =
min(per_thread_sync_epochs_[i].epochs_[j].load(memory_order_acquire), min_so_far);
#ifdef CHECK_INVARIANTS
const uint64_t syssync = system_sync_epoch_->load(memory_order_acquire);
INVARIANT(syssync <= min_so_far);
#endif
system_sync_epoch_->store(min_so_far, memory_order_release);
}
public:
void
logger(const vector<int> &fds,
const vector<vector<unsigned>> &assignments_given) OVERRIDE
{
// compute thread => logger assignment
vector<thread> writers;
vector<vector<unsigned>> assignments(assignments_given);
if (assignments.empty()) {
// compute assuming homogeneous disks
if (g_nworkers <= fds.size()) {
// each thread gets its own logging worker
for (size_t i = 0; i < g_nworkers; i++)
assignments.push_back({(unsigned) i});
} else {
// XXX: currently we assume each logger is equally fast; we should
// adjust the ratios accordingly for non-homogeneous loggers
const size_t threads_per_logger = g_nworkers / fds.size();
for (size_t i = 0; i < fds.size(); i++) {
assignments.emplace_back(
MakeRange<unsigned>(
i * threads_per_logger,
((i + 1) == fds.size()) ?
g_nworkers :
(i + 1) * threads_per_logger));
}
}
}
INVARIANT(AssignmentsValid(assignments, fds.size(), g_nworkers));
timer tt;
for (size_t i = 0; i < assignments.size(); i++)
writers.emplace_back(
&onecopy_logbased_simulation::writer,
this, i, fds[i], ref(assignments[i]));
if (g_verbose)
cerr << "assignments: " << assignments << endl;
while (keep_going_->load(memory_order_acquire)) {
// periodically compute which epoch is the persistence epoch,
// and update system_sync_epoch_
struct timespec t;
t.tv_sec = g_epoch_time_ns / ONE_SECOND_NS;
t.tv_nsec = g_epoch_time_ns % ONE_SECOND_NS;
nanosleep(&t, nullptr);
advance_system_sync_epoch(assignments);
}
for (auto &t : writers)
t.join();
if (g_verbose) {
cerr << "current epoch: " << epoch_number_->load(memory_order_acquire) << endl;
cerr << "sync epoch : " << system_sync_epoch_->load(memory_order_acquire) << endl;
const double xsec = tt.lap_ms() / 1000.0;
for (size_t i = 0; i < writers.size(); i++)
cerr << "writer " << i << " " <<
(double(g_bytes_written[i].load(memory_order_acquire)) /
double(1UL << 20) /
xsec) << " MB/sec" << endl;
}
}
protected:
vector<pbuffer *> pxs_; // just some scratch space
};
circbuf<pbuffer, onecopy_logbased_simulation::g_perthread_buffers>
onecopy_logbased_simulation::g_all_buffers[NMAXCORES];
circbuf<pbuffer, onecopy_logbased_simulation::g_perthread_buffers>
onecopy_logbased_simulation::g_persist_buffers[NMAXCORES];
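// Buffer lifecycle (per worker core): a worker dequeues a free pbuffer from
// g_all_buffers[core], appends log records to it, and enqueues it onto
// g_persist_buffers[core]; the writer thread writev()s all scheduled buffers
// to its log file and, once the write has returned, recycles them back onto
// g_all_buffers[core].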
class explicit_deptracking_simulation : public onecopy_logbased_simulation {
public:
/** global state about our persistence calculations */
// contains the latest TID inclusive, per core, which is (transitively)
// persistent. note that the prefix of the DB which is totally persistent is
// the min of this table (cf. the min_{core} computation above).
static uint64_t g_persistence_vc[NMAXCORES];
protected:
bool do_compression() const OVERRIDE { return false; }
const uint8_t *
read_log_entry(const uint8_t *p, uint64_t &tid,
std::function<void(uint64_t)> readfunctor) OVERRIDE
{
serializer<uint8_t, false> s_uint8_t;
serializer<uint64_t, false> s_uint64_t;
uint8_t readset_sz, writeset_sz, key_sz, value_sz;
uint64_t v;
p = s_uint64_t.read(p, &tid);
p = s_uint8_t.read(p, &readset_sz);
INVARIANT(size_t(readset_sz) == g_readset);
for (size_t i = 0; i < size_t(readset_sz); i++) {
p = s_uint64_t.read(p, &v);
readfunctor(v);
}
p = s_uint8_t.read(p, &writeset_sz);
INVARIANT(size_t(writeset_sz) == g_writeset);
for (size_t i = 0; i < size_t(writeset_sz); i++) {
p = s_uint8_t.read(p, &key_sz);
INVARIANT(size_t(key_sz) == g_keysize);
p += size_t(key_sz);
p = s_uint8_t.read(p, &value_sz);
INVARIANT(size_t(value_sz) == g_valuesize);
p += size_t(value_sz);
}
return p;
}
uint64_t
compute_log_record_space() const OVERRIDE
{
// compute how much space we need for this entry
uint64_t space_needed = 0;
// 8 bytes to indicate TID
space_needed += sizeof(uint64_t);
// one byte to indicate # of read deps
space_needed += 1;
// each dep occupies 8 bytes
space_needed += g_readset * sizeof(uint64_t);
// one byte to indicate # of records written
space_needed += 1;
// each record occupies (1 + key_length + 1 + value_length) bytes
space_needed += g_writeset * (1 + g_keysize + 1 + g_valuesize);
return space_needed;
}
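// For example, with the defaults (readset=30, writeset=16, keysize=8,
// valuesize=32) the function above yields
//   8 + 1 + 30*8 + 1 + 16*(1 + 8 + 1 + 32) = 922 bytes per record.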
void
write_log_record(uint8_t *p,
uint64_t tidcommit,
const vector<uint64_t> &readset,
const vector<pair<string, string>> &writeset) OVERRIDE
{
serializer<uint8_t, false> s_uint8_t;
serializer<uint64_t, false> s_uint64_t;
p = s_uint64_t.write(p, tidcommit);
p = s_uint8_t.write(p, readset.size());
for (auto t : readset)
p = s_uint64_t.write(p, t);
p = s_uint8_t.write(p, writeset.size());
for (auto &pr : writeset) {
p = s_uint8_t.write(p, pr.first.size());
memcpy(p, pr.first.data(), pr.first.size()); p += pr.first.size();
p = s_uint8_t.write(p, pr.second.size());
memcpy(p, pr.second.data(), pr.second.size()); p += pr.second.size();
}
}
void
logger_on_io_completion() OVERRIDE
{
ALWAYS_ASSERT(false); // currently broken
bool changed = true;
while (changed) {
changed = false;
for (size_t i = 0; i < NMAXCORES; i++) {
g_persist_buffers[i].peekall(pxs_);
for (auto px : pxs_) {
INVARIANT(px);
if (!px->io_scheduled_)
break;
INVARIANT(px->remaining_ > 0);
INVARIANT(px->curoff_ < g_buffer_size);
const uint8_t *p = px->pointer();
uint64_t committid;
bool allsat = true;
//cerr << "processing buffer " << px << " with curoff_=" << px->curoff_ << endl
// << " p=" << intptr_t(p) << endl;
while (px->remaining_ && allsat) {
allsat = true;
const uint8_t *nextp =
read_log_entry(p, committid, [&allsat](uint64_t readdep) {
if (!allsat)
return;
const uint64_t cid = tidhelpers::CoreId(readdep);
if (readdep > g_persistence_vc[cid])
allsat = false;
});
if (allsat) {
//cerr << "committid=" << tidhelpers::Str(committid)
// << ", g_persistence_vc=" << tidhelpers::Str(g_persistence_vc[i])
// << endl;
INVARIANT(tidhelpers::CoreId(committid) == i);
INVARIANT(g_persistence_vc[i] < committid);
g_persistence_vc[i] = committid;
changed = true;
p = nextp;
px->remaining_--;
px->curoff_ = intptr_t(p) - intptr_t(px->buf_.data());
g_ntxns_committed++;
} else {
// done, no further entries will be satisfied
}
}
if (allsat) {
INVARIANT(px->remaining_ == 0);
// finished entire buffer
struct pbuffer *pxcheck = g_persist_buffers[i].deq();
if (pxcheck != px)
INVARIANT(false);
g_all_buffers[i].enq(px);
//cerr << "buffer flused at g_persistence_vc=" << tidhelpers::Str(g_persistence_vc[i]) << endl;
} else {
INVARIANT(px->remaining_ > 0);
break; // cannot process core's list any further
}
}
}
}
}
};
uint64_t explicit_deptracking_simulation::g_persistence_vc[NMAXCORES] = {0};
class epochbased_simulation : public onecopy_logbased_simulation {
public:
epochbased_simulation(bool compress)
: compress_(compress)
{
}
protected:
bool do_compression() const OVERRIDE { return compress_; }
protected:
const uint8_t *
read_log_entry(const uint8_t *p, uint64_t &tid,
std::function<void(uint64_t)> readfunctor) OVERRIDE
{
serializer<uint8_t, false> s_uint8_t;
serializer<uint64_t, false> s_uint64_t;
uint8_t writeset_sz, key_sz, value_sz;
p = s_uint64_t.read(p, &tid);
p = s_uint8_t.read(p, &writeset_sz);
INVARIANT(size_t(writeset_sz) == g_writeset);
for (size_t i = 0; i < size_t(writeset_sz); i++) {
p = s_uint8_t.read(p, &key_sz);
INVARIANT(size_t(key_sz) == g_keysize);
p += size_t(key_sz);
p = s_uint8_t.read(p, &value_sz);
INVARIANT(size_t(value_sz) == g_valuesize);
p += size_t(value_sz);
}
return p;
}
uint64_t
compute_log_record_space() const OVERRIDE
{
// compute how much space we need for this entry
uint64_t space_needed = 0;
// 8 bytes to indicate TID
space_needed += sizeof(uint64_t);
// one byte to indicate # of records written
space_needed += 1;
// each record occupies (1 + key_length + 1 + value_length) bytes
space_needed += g_writeset * (1 + g_keysize + 1 + g_valuesize);
return space_needed;
}
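// With the defaults (writeset=16, keysize=8, valuesize=32) the function
// above yields
//   8 + 1 + 16*(1 + 8 + 1 + 32) = 681 bytes per record; the epoch-based
// log omits the read set entirely, saving 241 bytes relative to the
// deptracking format.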
void
write_log_record(uint8_t *p,
uint64_t tidcommit,
const vector<uint64_t> &readset,
const vector<pair<string, string>> &writeset) OVERRIDE
{
serializer<uint8_t, false> s_uint8_t;
serializer<uint64_t, false> s_uint64_t;
p = s_uint64_t.write(p, tidcommit);
p = s_uint8_t.write(p, writeset.size());
for (auto &pr : writeset) {
p = s_uint8_t.write(p, pr.first.size());
memcpy(p, pr.first.data(), pr.first.size()); p += pr.first.size();
p = s_uint8_t.write(p, pr.second.size());
memcpy(p, pr.second.data(), pr.second.size()); p += pr.second.size();
}
}
private:
bool compress_;
};
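// Example invocation (hypothetical binary name and paths):
//   ./persist_test --num-threads 4 --strategy epoch-compress \
//       --logfile /tmp/log0.bin --logfile /tmp/log1.bin --verbose
// Workers can also be mapped to log files explicitly, e.g.
//   --assignment 0,1 --assignment 2,3
// (ranges such as 0-1 are accepted by the range-aware parser).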
int
main(int argc, char **argv)
{
string strategy = "epoch";
vector<string> logfiles;
vector<vector<unsigned>> assignments;
while (1) {
static struct option long_options[] =
{
{"verbose" , no_argument , &g_verbose , 1} ,
{"fsync-back" , no_argument , &g_fsync_background, 1},
{"num-threads" , required_argument , 0 , 't'} ,
{"strategy" , required_argument , 0 , 's'} ,
{"readset" , required_argument , 0 , 'r'} ,
{"writeset" , required_argument , 0 , 'w'} ,
{"keysize" , required_argument , 0 , 'k'} ,
{"valuesize" , required_argument , 0 , 'v'} ,
{"logfile" , required_argument , 0 , 'l'} ,
{"assignment" , required_argument , 0 , 'a'} ,
{0, 0, 0, 0}
};
int option_index = 0;
int c = getopt_long(argc, argv, "t:s:r:w:k:v:l:a:", long_options, &option_index);
if (c == -1)
break;
switch (c) {
case 0:
if (long_options[option_index].flag != 0)
break;
abort();
break;
case 't':
g_nworkers = strtoul(optarg, nullptr, 10);
break;
case 's':
strategy = optarg;
break;
case 'r':
g_readset = strtoul(optarg, nullptr, 10);
break;
case 'w':
g_writeset = strtoul(optarg, nullptr, 10);
break;
case 'k':
g_keysize = strtoul(optarg, nullptr, 10);
break;
case 'v':
g_valuesize = strtoul(optarg, nullptr, 10);
break;
case 'l':
logfiles.emplace_back(optarg);
break;
case 'a':
assignments.emplace_back(
ParseCSVString<unsigned, RangeAwareParser<unsigned>>(optarg));
break;
case '?':
/* getopt_long already printed an error message. */
exit(1);
default:
abort();
}
}
ALWAYS_ASSERT(g_nworkers >= 1);
ALWAYS_ASSERT(g_readset >= 0);
ALWAYS_ASSERT(g_writeset > 0);
ALWAYS_ASSERT(g_keysize > 0);
ALWAYS_ASSERT(g_valuesize >= 0);
ALWAYS_ASSERT(!logfiles.empty());
ALWAYS_ASSERT(logfiles.size() <= g_nmax_loggers);
ALWAYS_ASSERT(
assignments.empty() ||
database_simulation::AssignmentsValid(
assignments, logfiles.size(), g_nworkers));
if (g_verbose)
cerr << "{nworkers=" << g_nworkers
<< ", readset=" << g_readset
<< ", writeset=" << g_writeset
<< ", keysize=" << g_keysize
<< ", valuesize=" << g_valuesize
<< ", logfiles=" << logfiles
<< ", strategy=" << strategy
<< ", fsync_background=" << g_fsync_background
<< ", assignments=" << assignments
<< "}" << endl;
if (strategy != "deptracking" &&
strategy != "epoch" &&
strategy != "epoch-compress")
ALWAYS_ASSERT(false);
g_database.resize(g_nrecords); // all start at TID=0
vector<int> fds;
for (auto &fname : logfiles) {
int fd = open(fname.c_str(), O_CREAT|O_WRONLY|O_TRUNC, 0664);
if (fd == -1) {
perror("open");
return 1;
}
fds.push_back(fd);
}
unique_ptr<database_simulation> sim;
if (strategy == "deptracking")
sim.reset(new explicit_deptracking_simulation);
else if (strategy == "epoch")
sim.reset(new epochbased_simulation(false));
else if (strategy == "epoch-compress")
sim.reset(new epochbased_simulation(true));
else
ALWAYS_ASSERT(false);
sim->init();
thread logger_thread(
&database_simulation::logger, sim.get(), fds, ref(assignments));
vector<thread> workers;
util::timer tt;
for (size_t i = 0; i < g_nworkers; i++)
workers.emplace_back(&database_simulation::worker, sim.get(), i);
for (auto &p: workers)
p.join();
sim->terminate();
logger_thread.join();
const double ntxns_committed = g_ntxns_committed.load();
const double xsec = tt.lap_ms() / 1000.0;
const double rate = double(ntxns_committed) / xsec;
if (g_verbose) {
cerr << "txns commited rate: " << rate << " txns/sec" << endl;
cerr << " (" << size_t(ntxns_committed) << " in " << xsec << " sec)" << endl;
const double ntxns_written = g_ntxns_written.load();
const double rate1 = double(ntxns_written) / xsec;
cerr << "txns written rate: " << rate1 << " txns/sec" << endl;
cerr << " (" << size_t(ntxns_written) << " in " << xsec << " sec)" << endl;
} else {
cout << rate << endl;
}
return 0;
}