Permalink
Browse files

WIP: still does not work, making progress though

  • Loading branch information...
stephentu committed Jul 22, 2013
1 parent dafd0f7 commit 52dcd9fc1fe38242173eddd484eb74946fcc89ab
Showing with 163 additions and 56 deletions.
  1. +2 −0 benchmarks/ndb_wrapper.h
  2. +23 −6 circbuf.h
  3. +1 −1 macros.h
  4. +0 −33 persist_test.cc
  5. +42 −0 test.cc
  6. +37 −12 txn_proto2_impl.cc
  7. +58 −4 txn_proto2_impl.h
View
@@ -43,6 +43,8 @@ class ndb_wrapper : public abstract_db {
const std::vector<std::string> &logfiles,
const std::vector<std::vector<unsigned>> &assignments_given);
virtual ssize_t txn_max_batch_size() const OVERRIDE { return 1000; }
virtual void
do_txn_epoch_sync() const
{
View
@@ -3,12 +3,14 @@
#include <cstring>
#include <atomic>
#include <vector>
#include <limits>
#include "macros.h"
#include "amd64.h"
// only one concurrent reader + writer allowed
// Thread safety is ensured for many concurrent enqueuers but only one
// concurrent dequeuer. That is, the head end is thread safe, but the tail end
// can only be manipulated by a single thread.
template <typename Tp, unsigned int Capacity>
class circbuf {
public:
@@ -26,14 +28,29 @@ class circbuf {
!buf_[head_.load(std::memory_order_acquire)].load(std::memory_order_acquire);
}
// assumes there will be capacity
// blocks until something enqs()
// Enqueue p into the ring buffer, spinning until a slot is free.
// Thread safety: safe for many concurrent enqueuers (head side); the
// tail/dequeue side is single-threaded (see class comment).
// NOTE(review): this diff hunk appears to interleave the OLD body (the
// unconditional INVARIANT + postincr store directly below) with the NEW
// CAS-based retry loop; only one of the two stores should survive in the
// final code — confirm against the applied version of circbuf.h.
inline void
enq(Tp *p)
{
INVARIANT(p);
INVARIANT(!buf_[head_.load(std::memory_order_acquire)].load(std::memory_order_acquire));
buf_[postincr(head_)].store(p, std::memory_order_release);
retry:
// snapshot the head slot index; acquire pairs with the dequeuer's release
unsigned icur = head_.load(std::memory_order_acquire);
INVARIANT(icur < Capacity);
if (buf_[icur].load(std::memory_order_acquire)) {
// slot still occupied (buffer full) — spin politely and re-check
nop_pause();
goto retry;
}
// found an empty spot, so we now race for it
unsigned inext = (icur + 1) % Capacity;
if (!head_.compare_exchange_strong(icur, inext, std::memory_order_acq_rel)) {
// lost the race to another enqueuer — start over
nop_pause();
goto retry;
}
// we own slot icur now; publish the pointer with a release store so the
// dequeuer's acquire load sees a fully-constructed payload
INVARIANT(!buf_[icur].load(std::memory_order_acquire));
buf_[icur].store(p, std::memory_order_release);
}
// blocks until something deqs()
View
@@ -10,7 +10,7 @@
//#define DIE_ON_ABORT
//#define TRAP_LARGE_ALLOOCATIONS
#define USE_BUILTIN_MEMFUNCS
//#define CHECK_INVARIANTS
#define CHECK_INVARIANTS
//#define TUPLE_CHECK_KEY
#define USE_SMALL_CONTAINER_OPT
#define BTREE_NODE_ALLOC_CACHE_ALIGNED
View
@@ -1168,39 +1168,6 @@ main(int argc, char **argv)
strategy != "epoch-compress")
ALWAYS_ASSERT(false);
{
// test circbuf
int values[] = {0, 1, 2, 3, 4};
circbuf<int, ARRAY_NELEMS(values)> b;
ALWAYS_ASSERT(b.empty());
for (size_t i = 0; i < ARRAY_NELEMS(values); i++)
b.enq(&values[i]);
vector<int *> pxs;
b.peekall(pxs);
ALWAYS_ASSERT(pxs.size() == ARRAY_NELEMS(values));
ALWAYS_ASSERT(set<int *>(pxs.begin(), pxs.end()).size() == pxs.size());
for (size_t i = 0; i < ARRAY_NELEMS(values); i++)
ALWAYS_ASSERT(pxs[i] == &values[i]);
for (size_t i = 0; i < ARRAY_NELEMS(values); i++) {
ALWAYS_ASSERT(!b.empty());
ALWAYS_ASSERT(b.peek() == &values[i]);
ALWAYS_ASSERT(*b.peek() == values[i]);
ALWAYS_ASSERT(b.deq() == &values[i]);
}
ALWAYS_ASSERT(b.empty());
b.enq(&values[0]);
b.enq(&values[1]);
b.enq(&values[2]);
b.peekall(pxs);
auto testlist = vector<int *>({&values[0], &values[1], &values[2]});
ALWAYS_ASSERT(pxs == testlist);
ALWAYS_ASSERT(b.deq() == &values[0]);
ALWAYS_ASSERT(b.deq() == &values[1]);
ALWAYS_ASSERT(b.deq() == &values[2]);
}
g_database.resize(g_nrecords); // all start at TID=0
vector<int> fds;
View
42 test.cc
@@ -2,8 +2,10 @@
#include <functional>
#include <unordered_map>
#include <tuple>
#include <set>
#include <unistd.h>
#include "circbuf.h"
#include "core.h"
#include "thread.h"
#include "txn.h"
@@ -208,6 +210,44 @@ namespace pxqueuetest {
}
}
// Smoke test for circbuf: fills the buffer to capacity, verifies peekall()
// returns the elements in FIFO order with no duplicates, drains it, then
// re-checks FIFO order on a partial refill. Single-threaded only — does not
// exercise the concurrent-enqueuer path.
void
CircbufTest()
{
// test circbuf
int values[] = {0, 1, 2, 3, 4};
circbuf<int, ARRAY_NELEMS(values)> b;
ALWAYS_ASSERT(b.empty());
// fill to exactly Capacity; enq() must not block since there is room
for (size_t i = 0; i < ARRAY_NELEMS(values); i++) {
b.enq(&values[i]);
}
vector<int *> pxs;
b.peekall(pxs);
ALWAYS_ASSERT(pxs.size() == ARRAY_NELEMS(values));
// no element appears twice
ALWAYS_ASSERT(set<int *>(pxs.begin(), pxs.end()).size() == pxs.size());
for (size_t i = 0; i < ARRAY_NELEMS(values); i++)
ALWAYS_ASSERT(pxs[i] == &values[i]);
// drain in FIFO order; peek() must agree with the subsequent deq()
for (size_t i = 0; i < ARRAY_NELEMS(values); i++) {
ALWAYS_ASSERT(!b.empty());
ALWAYS_ASSERT(b.peek() == &values[i]);
ALWAYS_ASSERT(*b.peek() == values[i]);
ALWAYS_ASSERT(b.deq() == &values[i]);
}
ALWAYS_ASSERT(b.empty());
// partial refill after wrap-around: order must still be FIFO
b.enq(&values[0]);
b.enq(&values[1]);
b.enq(&values[2]);
b.peekall(pxs);
auto testlist = vector<int *>({&values[0], &values[1], &values[2]});
ALWAYS_ASSERT(pxs == testlist);
ALWAYS_ASSERT(b.deq() == &values[0]);
ALWAYS_ASSERT(b.deq() == &values[1]);
ALWAYS_ASSERT(b.deq() == &values[2]);
cout << "circbuf test passed" << endl;
}
void
CounterTest()
{
@@ -984,6 +1024,8 @@ class main_thread : public ndb_thread {
#endif
cerr << "PID: " << getpid() << endl;
CircbufTest();
// initialize the numa allocator subsystem with the number of CPUs running
// + reasonable size per core
::allocator::Initialize(coreid::num_cpus_online(), size_t(128 * (1<<20)));
View
@@ -22,6 +22,8 @@ aligned_padded_elem<atomic<uint64_t>>
txn_logger::system_sync_epoch_(0);
aligned_padded_elem<circbuf<txn_logger::pbuffer, txn_logger::g_perthread_buffers>>
txn_logger::g_all_buffers[NMAXCORES];
bool
txn_logger::g_all_buffers_init[NMAXCORES] = {false};
aligned_padded_elem<circbuf<txn_logger::pbuffer, txn_logger::g_perthread_buffers>>
txn_logger::g_persist_buffers[NMAXCORES];
@@ -55,9 +57,10 @@ txn_logger::Init(
for (size_t i = 0; i < g_nworkers; i++) {
for (size_t j = 0; j < g_perthread_buffers; j++) {
struct pbuffer *p = new pbuffer;
struct pbuffer *p = new pbuffer(i);
g_all_buffers[i]->enq(p);
}
g_all_buffers_init[i] = true;
}
std::vector<std::thread> writers;
@@ -148,6 +151,8 @@ txn_logger::writer(
clock_gettime(CLOCK_MONOTONIC, &last_io_completed);
// NOTE: a core id in the persistence system really represents
// all cores in the regular system modulo g_nworkers
for (;;) {
// don't allow this loop to proceed less than an epoch's worth of time,
@@ -172,19 +177,36 @@ txn_logger::writer(
for (auto px : pxs) {
INVARIANT(px);
INVARIANT(!px->io_scheduled_);
INVARIANT(nwritten <= iovs.size());
INVARIANT(px->header()->nentries_);
INVARIANT((px->core_id_ % g_nworkers) == id);
if (nwritten == iovs.size())
break;
iovs[nwritten].iov_base = (void *) px->buf_.data();
iovs[nwritten].iov_len = px->curoff_;
nbytes_written[sense] += px->curoff_;
px->io_scheduled_ = true;
px->curoff_ = sizeof(logbuf_header);
px->remaining_ = px->header()->nentries_;
txns_written[sense] += px->header()->nentries_;
nwritten++;
INVARIANT(transaction_proto2_static::CoreId(px->header()->last_tid_) == idx);
INVARIANT(epoch_prefixes[sense][idx] <=
transaction_proto2_static::EpochId(px->header()->last_tid_));
INVARIANT(transaction_proto2_static::EpochId(px->header()->last_tid_) > 0);
epoch_prefixes[sense][idx] =
auto last_tid_cid = transaction_proto2_static::CoreId(px->header()->last_tid_);
auto px_cid = px->core_id_;
if (last_tid_cid != px_cid) {
cerr << "header: " << *px->header() << endl;
cerr << g_proto_version_str(last_tid_cid) << endl;
cerr << "last_tid_cid: " << last_tid_cid << endl;
cerr << "px_cid: " << px_cid << endl;
}
INVARIANT(
transaction_proto2_static::CoreId(px->header()->last_tid_) ==
px->core_id_);
INVARIANT(
epoch_prefixes[sense][id] <=
transaction_proto2_static::EpochId(px->header()->last_tid_));
INVARIANT(
transaction_proto2_static::EpochId(px->header()->last_tid_) > 0);
epoch_prefixes[sense][id] =
transaction_proto2_static::EpochId(px->header()->last_tid_) - 1;
}
}
@@ -225,13 +247,16 @@ txn_logger::writer(
sense = !sense;
// return all buffers that have been io_scheduled_ - we can do this as
// soon as write returns
// soon as write returns. we take care to return to the proper buffer
// (not modulo)
for (auto idx : assignment) {
pbuffer *px;
pbuffer *px, *px0;
while ((px = g_persist_buffers[idx]->peek()) &&
px->io_scheduled_) {
g_persist_buffers[idx]->deq();
g_all_buffers[idx]->enq(px);
px0 = g_persist_buffers[idx]->deq();
INVARIANT(px == px0);
px0->reset();
g_all_buffers[px0->core_id_]->enq(px0);
}
}
}
View
@@ -67,9 +67,20 @@ class txn_logger {
bool io_scheduled_; // has the logger scheduled IO yet?
size_t curoff_; // current offset into buf_, either for writing
// or during the dep computation phase
size_t remaining_; // number of deps remaining to compute
std::string buf_; // the actual buffer, of size g_buffer_size
const unsigned core_id_; // which core does this pbuffer belong to?
// Construct a buffer permanently tagged with its owning core; starts clean.
pbuffer(unsigned core_id) : core_id_(core_id) { reset(); }
// Return the buffer to its pristine state so it can be recycled by the
// logger. NOTE(review): remaining_ is not reset here — presumably it is
// (re)initialized by the writer before use; confirm. Also note
// buf_.assign() rewrites all g_buffer_size bytes on every recycle, which
// is O(buffer size) per reuse.
inline void
reset()
{
io_scheduled_ = false;
// leave room for the logbuf_header at the front of buf_
curoff_ = sizeof(logbuf_header);
buf_.assign(g_buffer_size, 0);
}
inline uint8_t *
pointer()
{
@@ -118,6 +129,29 @@ class txn_logger {
return seen.size() == nworkers;
}
typedef circbuf<pbuffer, g_perthread_buffers> pbuffer_circbuf;
// The buffer through which a logger thread pushes clean (recycled) buffers
// back to the worker thread owning core_id.
static inline circbuf<pbuffer, g_perthread_buffers> &
logger_to_core_buffer(size_t core_id)
{
// lazily populate this core's free list on first use
// NOTE(review): this check-then-init on g_all_buffers_init is not
// thread-safe if two threads hit the same uninitialized core_id
// concurrently — both could enqueue g_perthread_buffers buffers.
// Confirm that first access per core_id is single-threaded.
if (unlikely(!g_all_buffers_init[core_id])) {
for (size_t i = 0; i < g_perthread_buffers; i++)
g_all_buffers[core_id]->enq(new pbuffer(core_id));
g_all_buffers_init[core_id] = true;
}
return g_all_buffers[core_id].elem;
}
// The buffer through which a worker thread pushes full buffers to its
// assigned logger. Note the asymmetry with logger_to_core_buffer(): the
// persist side is indexed modulo g_nworkers, so multiple cores share one
// logger queue, while the free-list side is indexed by the raw core_id.
static inline circbuf<pbuffer, g_perthread_buffers> &
core_to_logger_buffer(size_t core_id)
{
return g_persist_buffers[core_id % g_nworkers].elem;
}
private:
static void
@@ -153,10 +187,22 @@ class txn_logger {
static util::aligned_padded_elem<circbuf<pbuffer, g_perthread_buffers>>
g_all_buffers[NMAXCORES];
static bool g_all_buffers_init[NMAXCORES]; // not cache aligned because
// in steady state is only read-only
static util::aligned_padded_elem<circbuf<pbuffer, g_perthread_buffers>>
g_persist_buffers[NMAXCORES];
};
static inline std::ostream &
operator<<(std::ostream &o, txn_logger::logbuf_header &hdr)
{
o << "{nentries_=" << hdr.nentries_ << ", last_tid_="
<< g_proto_version_str(hdr.last_tid_) << "}";
return o;
}
class transaction_proto2_static {
public:
@@ -438,18 +484,26 @@ class transaction_proto2 : public transaction<transaction_proto2, Traits>,
// XXX(stephentu): spinning for now
const unsigned long my_core_id = coreid::core_id();
txn_logger::pbuffer_circbuf &pull_buf =
txn_logger::logger_to_core_buffer(my_core_id);
retry:
txn_logger::pbuffer *px;
while (unlikely(!(px = txn_logger::g_all_buffers[my_core_id]->peek())))
while (unlikely(!(px = pull_buf.peek())))
nop_pause();
INVARIANT(!px->io_scheduled_);
INVARIANT(px->core_id_ == my_core_id);
// check if enough size
if (px->space_remaining() < space_needed) {
txn_logger::pbuffer *px0 = txn_logger::g_all_buffers[my_core_id]->deq();
if (!px->header()->nentries_)
std::cerr << "space_needed: " << space_needed << std::endl;
INVARIANT(px->header()->nentries_);
txn_logger::pbuffer *px0 = pull_buf.deq();
INVARIANT(px == px0);
txn_logger::g_persist_buffers[my_core_id]->enq(px0);
txn_logger::pbuffer_circbuf &push_buf =
txn_logger::core_to_logger_buffer(my_core_id);
push_buf.enq(px0);
goto retry;
}

0 comments on commit 52dcd9f

Please sign in to comment.