Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
644 lines (593 sloc) 23.1 KB
#ifndef _NDB_TXN_IMPL_H_
#define _NDB_TXN_IMPL_H_
#include "txn.h"
#include "lockguard.h"
// base definitions
// Constructs a transaction.
//
// flags: forwarded unchanged to the transaction_base constructor.
// sa:    string allocator; only its address is stored in the `sa` member
//        (NOTE(review): this implies the allocator must outlive the
//        transaction -- confirm against the declaration in txn.h).
template <template <typename> class Protocol, typename Traits>
transaction<Protocol, Traits>::transaction(uint64_t flags, string_allocator_type &sa)
: transaction_base(flags), sa(&sa)
{
// by this point we are expected to already be inside an RCU region
INVARIANT(rcu::s_instance.in_rcu_region());
#ifdef BTREE_LOCK_OWNERSHIP_CHECKING
// lock-ownership checking builds only: open a node-lock region; the
// destructor pairs this with AssertAllNodeLocksReleased()
concurrent_btree::NodeLockRegionBegin();
#endif
}
// Destroys the transaction.
//
// Asserts that the transaction has been resolved (is not TXN_ACTIVE), tears
// down the RCU guard and, if that guard was at depth 1, invokes the
// protocol's on_post_rcu_region_completion() hook.
template <template <typename> class Protocol, typename Traits>
transaction<Protocol, Traits>::~transaction()
{
// transaction shouldn't fall out of scope w/o resolution
// resolution means TXN_EMBRYO, TXN_COMMITED, and TXN_ABRT
INVARIANT(state != TXN_ACTIVE);
INVARIANT(rcu::s_instance.in_rcu_region());
// the depth must be sampled *before* the guard is destroyed below
const unsigned cur_depth = rcu_guard_->sync()->depth();
rcu_guard_.destroy();
if (cur_depth == 1) {
// depth 1 means destroying our guard took us out of the RCU region, so the
// post-region hook can now run
INVARIANT(!rcu::s_instance.in_rcu_region());
cast()->on_post_rcu_region_completion();
}
#ifdef BTREE_LOCK_OWNERSHIP_CHECKING
// lock-ownership checking builds only: counterpart of the
// NodeLockRegionBegin() call made in the constructor
concurrent_btree::AssertAllNodeLocksReleased();
#endif
}
// Hook called at the end of commit() and abort_impl(). Intentionally a no-op:
// the read/write/absent sets are left for their destructors to clean up.
template <template <typename> class Protocol, typename Traits>
void
transaction<Protocol, Traits>::clear()
{
// it's actually *more* efficient to not call clear explicitly on the
// read/write/absent sets, and let the destructors do the clearing- this is
// because the destructors can take shortcuts since it knows the obj doesn't
// have to end in a valid state
}
// Moves the transaction into TXN_ABRT and records why.
//
// reason: the abort reason to store in this->reason.
//
// Calling this on an already-aborted transaction is a no-op (apart from the
// abort_trap() notification). Calling it on a committed transaction throws
// transaction_unusable_exception. Otherwise every tuple this transaction
// inserted is unlinked from its btree and unlocked.
template <template <typename> class Protocol, typename Traits>
void
transaction<Protocol, Traits>::abort_impl(abort_reason reason)
{
  abort_trap(reason);

  // already aborted: nothing left to undo
  if (state == TXN_ABRT)
    return;
  // a committed transaction can no longer be aborted
  if (state == TXN_COMMITED)
    throw transaction_unusable_exception();
  // TXN_EMBRYO / TXN_ACTIVE continue below

  state = TXN_ABRT;
  this->reason = reason;

  // inserted tuples are still locked by us: remove their markers from the
  // tree and release the locks
  for (auto &entry : write_set) {
    dbtuple * const tuple = entry.get_tuple();
    if (!entry.is_insert())
      continue;
    INVARIANT(tuple->is_locked());
    this->cleanup_inserted_tuple_marker(tuple, entry.get_key(), entry.get_btree());
    tuple->unlock();
  }

  clear();
}
// Undoes a not-yet-committed insert: removes `key` from `btr`, clears the
// marker's "latest" bit and releases the marker tuple.
//
// marker: the placeholder tuple created for the insert; must still carry
//         version MAX_TID and be locked by the caller.
// key:    the key under which the marker was inserted.
// btr:    the tree the marker was inserted into.
//
// The caller remains responsible for unlocking the marker.
template <template <typename> class Protocol, typename Traits>
void
transaction<Protocol, Traits>::cleanup_inserted_tuple_marker(
    dbtuple *marker, const std::string &key, concurrent_btree *btr)
{
  // XXX: this code should really live in txn_proto2_impl.h
  typedef typename concurrent_btree::value_type btree_value_t;

  INVARIANT(marker->version == dbtuple::MAX_TID);
  INVARIANT(marker->is_locked());
  INVARIANT(marker->is_lock_owner());

  btree_value_t evicted = 0;
  if (unlikely(!btr->remove(varkey(key), &evicted))) {
    // the key we inserted ourselves must still be present; anything else is
    // fatal
#ifdef CHECK_INVARIANTS
    std::cerr << " *** could not remove key: " << util::hexify(key) << std::endl;
#ifdef TUPLE_CHECK_KEY
    std::cerr << " *** original key : " << util::hexify(marker->key) << std::endl;
#endif
#endif
    ALWAYS_ASSERT(false);
  }

  // what came out of the tree has to be exactly our marker
  INVARIANT(evicted == (btree_value_t) marker);
  INVARIANT(marker->is_latest());
  marker->clear_latest();
  dbtuple::release(marker); // rcu free
}
namespace {

// Maps a transaction state to its symbolic name. Any value outside the four
// known states trips ALWAYS_ASSERT.
inline const char *
transaction_state_to_cstr(transaction_base::txn_state state)
{
  if (state == transaction_base::TXN_EMBRYO)
    return "TXN_EMBRYO";
  if (state == transaction_base::TXN_ACTIVE)
    return "TXN_ACTIVE";
  if (state == transaction_base::TXN_ABRT)
    return "TXN_ABRT";
  if (state == transaction_base::TXN_COMMITED)
    return "TXN_COMMITED";
  ALWAYS_ASSERT(false);
  return 0;
}

// Renders the known transaction flag bits as a " | "-separated list of flag
// names. Returns the empty string when none of the known bits is set.
inline std::string
transaction_flags_to_str(uint64_t flags)
{
  const bool has_low_level_scan =
    (flags & transaction_base::TXN_FLAG_LOW_LEVEL_SCAN) != 0;
  const bool has_read_only =
    (flags & transaction_base::TXN_FLAG_READ_ONLY) != 0;

  std::ostringstream buf;
  if (has_low_level_scan)
    buf << "TXN_FLAG_LOW_LEVEL_SCAN";
  if (has_read_only)
    // a separator is only needed when another flag was printed first
    buf << (has_low_level_scan ? " | TXN_FLAG_READ_ONLY" : "TXN_FLAG_READ_ONLY");
  return buf.str();
}

}
// Writes a human-readable description of this transaction to std::cerr:
// its address, state, abort reason, flags, and the contents of the read,
// write and absent sets.
template <template <typename> class Protocol, typename Traits>
void
transaction<Protocol, Traits>::dump_debug_info() const
{
  // header
  std::cerr << "Transaction (obj=" << util::hexify(this) << ") -- state "
            << transaction_state_to_cstr(state) << std::endl;
  std::cerr << " Abort Reason: " << AbortReasonStr(reason) << std::endl;
  std::cerr << " Flags: " << transaction_flags_to_str(flags) << std::endl;
  std::cerr << " Read/Write sets:" << std::endl;

  // tuples read
  std::cerr << " === Read Set ===" << std::endl;
  for (const auto &read_rec : read_set)
    std::cerr << read_rec << std::endl;

  // tuples written
  std::cerr << " === Write Set ===" << std::endl;
  for (const auto &write_rec : write_set)
    std::cerr << write_rec << std::endl;

  // btree nodes observed, with the version recorded for each
  std::cerr << " === Absent Set ===" << std::endl;
  const auto absent_end = absent_set.end();
  for (auto node_it = absent_set.begin(); node_it != absent_end; ++node_it)
    std::cerr << " B-tree Node " << util::hexify(node_it->first)
              << " : " << node_it->second << std::endl;
}
// Returns a snapshot of per-transaction size counters, keyed by name. For
// each of the read, absent and write sets it reports the current element
// count ("<set>_size") and whether the set has outgrown its small
// representation ("<set>_is_large?", 1 or 0).
template <template <typename> class Protocol, typename Traits>
std::map<std::string, uint64_t>
transaction<Protocol, Traits>::get_txn_counters() const
{
  std::map<std::string, uint64_t> counters;
  auto record = [&counters](const char *name, uint64_t value) {
    counters[name] = value;
  };

  // read set
  record("read_set_size", read_set.size());
  record("read_set_is_large?", !read_set.is_small_type());

  // absent set
  record("absent_set_size", absent_set.size());
  record("absent_set_is_large?", !absent_set.is_small_type());

  // write set
  record("write_set_size", write_set.size());
  record("write_set_is_large?", !write_set.is_small_type());

  return counters;
}
// Finishes processing a run of write-set entries that all target the same
// tuple, given the final entry of that run.
//
// last:             the last write-info entry of the run.
// did_group_insert: true if the run contained an insert (in which case the
//                   tuple is already locked by this transaction).
//
// Returns true if the commit may proceed, false if the transaction must
// abort.
template <template <typename> class Protocol, typename Traits>
bool
transaction<Protocol, Traits>::handle_last_tuple_in_group(
    dbtuple_write_info &last,
    bool did_group_insert)
{
  if (did_group_insert) {
    // the tuple is already locked by the insert. If further overwrites
    // followed the insert, the final one is the write that must be applied.
    if (!last.is_insert())
      last.entry->set_do_write();
    return true;
  }

  dbtuple *tuple = last.get_tuple();

  // if we race to put/insert w/ another txn which has inserted a new
  // record, we *must* abort b/c the other txn could try to put/insert
  // into a new record which we hold the lock on, so we must abort
  //
  // other ideas:
  // we could *not* abort if this txn did not insert any new records.
  // we could also release our insert locks and try to acquire them
  // again in sorted order
  if (unlikely(tuple->version == dbtuple::MAX_TID))
    return false;

  // take the write lock and remember that we hold it, so the caller unlocks
  // the tuple on both the commit and the abort path
  const dbtuple::version_t locked_version = tuple->lock(true);
  INVARIANT(dbtuple::IsLatest(locked_version) == tuple->is_latest());
  last.mark_locked();

  // XXX(stephentu): overly conservative (with the can_read_tid() check)
  const bool superseded = !dbtuple::IsLatest(locked_version);
  if (unlikely(superseded || !cast()->can_read_tid(tuple->version)))
    return false;

  last.entry->set_do_write();
  return true;
}
// Attempts to commit this transaction.
//
// Phases, in order:
//   1. If the transaction is already resolved, report that result again.
//   2. Copy the write set into write_dbtuples.
//   3. (non-snapshot txns) Sort write_dbtuples, lock each distinct tuple and
//      generate a commit tid.
//   4. (non-snapshot txns) Validate the read set and the absent set.
//   5. (non-snapshot txns) Apply the writes and release the locks.
// Any failure in phases 3-4 jumps to do_abort, which undoes inserts, releases
// the locks taken so far and marks the transaction TXN_ABRT.
//
// doThrow: when true, an abort is reported by throwing
//          transaction_abort_exception(reason) instead of returning false.
// Returns: true if the transaction is committed, false if it aborted (only
//          reachable when doThrow is false).
template <template <typename> class Protocol, typename Traits>
bool
transaction<Protocol, Traits>::commit(bool doThrow)
{
#ifdef TUPLE_MAGIC
try {
#endif
PERF_DECL(
static std::string probe0_name(
std::string(__PRETTY_FUNCTION__) + std::string(":total:")));
ANON_REGION(probe0_name.c_str(), &transaction_base::g_txn_commit_probe0_cg);
// a resolved transaction just reports its outcome again
switch (state) {
case TXN_EMBRYO:
case TXN_ACTIVE:
break;
case TXN_COMMITED:
return true;
case TXN_ABRT:
if (doThrow)
throw transaction_abort_exception(reason);
return false;
}
dbtuple_write_info_vec write_dbtuples;
// (tid was generated?, the tid); .first decides whether on_tid_finish() must
// be called on the way out
std::pair<bool, tid_t> commit_tid(false, 0);
// copy write tuples to vector for sorting
if (!write_set.empty()) {
// NOTE(review): this probe uses the same ":lock_write_nodes:" label as
// probe2 below although this region only copies the write set -- confirm
// whether that is intended
PERF_DECL(
static std::string probe1_name(
std::string(__PRETTY_FUNCTION__) + std::string(":lock_write_nodes:")));
ANON_REGION(probe1_name.c_str(), &transaction_base::g_txn_commit_probe1_cg);
INVARIANT(!is_snapshot());
typename write_set_map::iterator it = write_set.begin();
typename write_set_map::iterator it_end = write_set.end();
// pos records each entry's position within the write set
for (size_t pos = 0; it != it_end; ++it, ++pos) {
INVARIANT(!it->is_insert() || it->get_tuple()->is_locked());
write_dbtuples.emplace_back(it->get_tuple(), &(*it), it->is_insert(), pos);
}
}
// read_only txns require consistent snapshots
INVARIANT(!is_snapshot() || read_set.empty());
INVARIANT(!is_snapshot() || write_set.empty());
INVARIANT(!is_snapshot() || absent_set.empty());
if (!is_snapshot()) {
// we don't have consistent tids, or not a read-only txn
// lock write nodes
if (!write_dbtuples.empty()) {
PERF_DECL(
static std::string probe2_name(
std::string(__PRETTY_FUNCTION__) + std::string(":lock_write_nodes:")));
ANON_REGION(probe2_name.c_str(), &transaction_base::g_txn_commit_probe2_cg);
// lock the logical nodes in sort order
{
PERF_DECL(
static std::string probe6_name(
std::string(__PRETTY_FUNCTION__) + std::string(":sort_write_nodes:")));
ANON_REGION(probe6_name.c_str(), &transaction_base::g_txn_commit_probe6_cg);
write_dbtuples.sort(); // in-place
}
typename dbtuple_write_info_vec::iterator it = write_dbtuples.begin();
typename dbtuple_write_info_vec::iterator it_end = write_dbtuples.end();
// last_px trails `it` by one element; a change of tuple between the two
// marks the end of a run of writes to the same tuple
dbtuple_write_info *last_px = nullptr;
bool inserted_last_run = false;
for (; it != it_end; last_px = &(*it), ++it) {
if (likely(last_px && last_px->tuple != it->tuple)) {
// on boundary
if (unlikely(!handle_last_tuple_in_group(*last_px, inserted_last_run))) {
abort_trap((reason = ABORT_REASON_WRITE_NODE_INTERFERENCE));
goto do_abort;
}
inserted_last_run = false;
}
if (it->is_insert()) {
// an insert is expected to be the first entry of its run and its tuple
// is already locked by us
INVARIANT(!last_px || last_px->tuple != it->tuple);
INVARIANT(it->is_locked());
INVARIANT(it->get_tuple()->is_locked());
INVARIANT(it->get_tuple()->is_lock_owner());
it->entry->set_do_write(); // all inserts are marked do-write
inserted_last_run = true;
} else {
INVARIANT(!it->is_locked());
}
}
// the loop only closes a run when it sees the next one, so the final run
// is handled here
if (likely(last_px) &&
unlikely(!handle_last_tuple_in_group(*last_px, inserted_last_run))) {
abort_trap((reason = ABORT_REASON_WRITE_NODE_INTERFERENCE));
goto do_abort;
}
// from here on a tid has been generated, so on_tid_finish() must run on
// both the commit and the abort path
commit_tid.first = true;
PERF_DECL(
static std::string probe5_name(
std::string(__PRETTY_FUNCTION__) + std::string(":gen_commit_tid:")));
ANON_REGION(probe5_name.c_str(), &transaction_base::g_txn_commit_probe5_cg);
commit_tid.second = cast()->gen_commit_tid(write_dbtuples);
VERBOSE(std::cerr << "commit tid: " << g_proto_version_str(commit_tid.second) << std::endl);
} else {
VERBOSE(std::cerr << "commit tid: <read-only>" << std::endl);
}
// do read validation
{
PERF_DECL(
static std::string probe3_name(
std::string(__PRETTY_FUNCTION__) + std::string(":read_validation:")));
ANON_REGION(probe3_name.c_str(), &transaction_base::g_txn_commit_probe3_cg);
// check the nodes we actually read are still the latest version
if (!read_set.empty()) {
typename read_set_map::iterator it = read_set.begin();
typename read_set_map::iterator it_end = read_set.end();
for (; it != it_end; ++it) {
VERBOSE(std::cerr << "validating dbtuple " << util::hexify(it->get_tuple())
<< " at snapshot_tid "
<< g_proto_version_str(cast()->snapshot_tid())
<< std::endl);
// tuples that are also in our write set use the plain check, all others
// use the stable_ variant (presumably because we already hold the lock on
// the former -- confirm against dbtuple)
const bool found = sorted_dbtuples_contains(
write_dbtuples, it->get_tuple());
if (likely(found ?
it->get_tuple()->is_latest_version(it->get_tid()) :
it->get_tuple()->stable_is_latest_version(it->get_tid())))
continue;
VERBOSE(std::cerr << "validating dbtuple " << util::hexify(it->get_tuple()) << " at snapshot_tid "
<< g_proto_version_str(cast()->snapshot_tid()) << " FAILED" << std::endl
<< " txn read version: " << g_proto_version_str(it->get_tid()) << std::endl
<< " tuple=" << *it->get_tuple() << std::endl);
//std::cerr << "failed tuple: " << *it->get_tuple() << std::endl;
abort_trap((reason = ABORT_REASON_READ_NODE_INTEREFERENCE));
goto do_abort;
}
}
// check btree versions have not changed
if (!absent_set.empty()) {
typename absent_set_map::iterator it = absent_set.begin();
typename absent_set_map::iterator it_end = absent_set.end();
for (; it != it_end; ++it) {
// compare the node's current version with the one recorded when it was read
const uint64_t v = concurrent_btree::ExtractVersionNumber(it->first);
if (unlikely(v != it->second.version)) {
VERBOSE(std::cerr << "expected node " << util::hexify(it->first) << " at v="
<< it->second.version << ", got v=" << v << std::endl);
abort_trap((reason = ABORT_REASON_NODE_SCAN_READ_VERSION_CHANGED));
goto do_abort;
}
}
}
}
// commit actual records
if (!write_dbtuples.empty()) {
PERF_DECL(
static std::string probe4_name(
std::string(__PRETTY_FUNCTION__) + std::string(":write_records:")));
ANON_REGION(probe4_name.c_str(), &transaction_base::g_txn_commit_probe4_cg);
// iterate the write set itself (not the sorted copy); only entries flagged
// do_write during the locking phase are applied
typename write_set_map::iterator it = write_set.begin();
typename write_set_map::iterator it_end = write_set.end();
for (; it != it_end; ++it) {
if (unlikely(!it->do_write()))
continue;
dbtuple * const tuple = it->get_tuple();
INVARIANT(tuple->is_locked());
VERBOSE(std::cerr << "writing dbtuple " << util::hexify(tuple)
<< " at commit_tid " << g_proto_version_str(commit_tid.second)
<< std::endl);
if (it->is_insert()) {
INVARIANT(tuple->version == dbtuple::MAX_TID);
tuple->version = commit_tid.second; // allows write_record_ret() to succeed
// w/o creating a new chain
} else {
tuple->prefetch();
const dbtuple::write_record_ret ret =
tuple->write_record_at(
cast(), commit_tid.second,
it->get_value(), it->get_writer());
bool unlock_head = false;
if (unlikely(ret.head_ != tuple)) {
// tuple was replaced by ret.head_
INVARIANT(ret.rest_ == tuple);
// XXX: write_record_at() should acquire this lock
ret.head_->lock(true);
unlock_head = true;
// need to unlink tuple from underlying btree, replacing
// it with ret.head_ (atomically)
typename concurrent_btree::value_type old_v = 0;
if (it->get_btree()->insert(
varkey(it->get_key()), (typename concurrent_btree::value_type) ret.head_, &old_v, NULL))
// should already exist in tree
INVARIANT(false);
INVARIANT(old_v == (typename concurrent_btree::value_type) tuple);
// we don't RCU free this, because it is now part of the chain
// (the cleaners will take care of this)
++evt_dbtuple_latest_replacement;
}
if (unlikely(ret.rest_))
// spill happened: schedule GC task
cast()->on_dbtuple_spill(ret.head_, ret.rest_);
if (!it->get_value())
// logical delete happened: schedule GC task
cast()->on_logical_delete(ret.head_, it->get_key(), it->get_btree());
if (unlikely(unlock_head))
ret.head_->unlock();
}
VERBOSE(std::cerr << "dbtuple " << util::hexify(tuple) << " is_locked? " << tuple->is_locked() << std::endl);
}
// unlock
// NB: we can no longer un-lock after doing the writes above
for (typename dbtuple_write_info_vec::iterator it = write_dbtuples.begin();
it != write_dbtuples.end(); ++it) {
if (it->is_locked())
it->tuple->unlock();
else
INVARIANT(!it->is_insert());
}
}
}
// success path
state = TXN_COMMITED;
if (commit_tid.first)
cast()->on_tid_finish(commit_tid.second);
clear();
return true;
// failure path: undo inserts, drop every lock we took, mark the txn aborted
do_abort:
// XXX: these values are possibly un-initialized
if (this->is_snapshot())
VERBOSE(std::cerr << "aborting txn @ snapshot_tid " << cast()->snapshot_tid() << std::endl);
else
VERBOSE(std::cerr << "aborting txn" << std::endl);
for (typename dbtuple_write_info_vec::iterator it = write_dbtuples.begin();
it != write_dbtuples.end(); ++it) {
if (it->is_locked()) {
if (it->is_insert()) {
// an inserted marker must also be removed from its btree before unlocking
INVARIANT(it->entry);
this->cleanup_inserted_tuple_marker(
it->tuple.get(), it->entry->get_key(), it->entry->get_btree());
}
// XXX: potential optimization: on unlock() for abort, we don't
// technically need to change the version number
it->tuple->unlock();
} else {
INVARIANT(!it->is_insert());
}
}
state = TXN_ABRT;
// a tid that was generated before the failure still has to be finished
if (commit_tid.first)
cast()->on_tid_finish(commit_tid.second);
clear();
if (doThrow)
throw transaction_abort_exception(reason);
return false;
#ifdef TUPLE_MAGIC
// TUPLE_MAGIC builds only: a failed magic check is fatal; dump state first
} catch (dbtuple::magic_failed_exception &) {
dump_debug_info();
ALWAYS_ASSERT(false);
}
#endif
}
// Tries to insert a brand-new tuple for `key` into `btr` on behalf of this
// transaction.
//
// btr:    the tree to insert into.
// key:    the key to insert; must be non-null.
// value:  the value to store, or null for an empty tuple.
// writer: callback used both to compute the space `value` needs and to
//         write it into the freshly allocated tuple.
//
// Returns a (tuple, interference) pair:
//   (nullptr, false) - the key already exists in the tree; nothing was
//                      inserted and the speculatively allocated tuple was
//                      released.
//   (tuple, false)   - the tuple was inserted and appended to the write set.
//   (tuple, true)    - the tuple was inserted and appended to the write set,
//                      but the btree node it landed in is in the absent set
//                      with a version that does not match; `reason` has been
//                      set to ABORT_REASON_WRITE_NODE_INTERFERENCE. The
//                      transaction state itself is not changed here.
template <template <typename> class Protocol, typename Traits>
std::pair< dbtuple *, bool >
transaction<Protocol, Traits>::try_insert_new_tuple(
    concurrent_btree &btr,
    const std::string *key,
    const void *value,
    dbtuple::tuple_writer_t writer)
{
  INVARIANT(key);
  const size_t sz =
    value ? writer(dbtuple::TUPLE_WRITER_COMPUTE_NEEDED,
                   value, nullptr, 0) : 0;

  // perf: ~900 tsc/alloc on istc11.csail.mit.edu
  dbtuple * const tuple = dbtuple::alloc_first(sz, true);
  if (value)
    writer(dbtuple::TUPLE_WRITER_DO_WRITE,
           value, tuple->get_value_start(), 0);

  // the fresh tuple is expected to come back locked, marked latest and
  // carrying the MAX_TID placeholder version
  INVARIANT(find_read_set(tuple) == read_set.end());
  INVARIANT(tuple->is_latest());
  INVARIANT(tuple->version == dbtuple::MAX_TID);
  INVARIANT(tuple->is_locked());
  INVARIANT(tuple->is_write_intent());
#ifdef TUPLE_CHECK_KEY
  tuple->key.assign(key->data(), key->size());
  tuple->tree = (void *) &btr;
#endif

  // XXX: underlying btree api should return the existing value if insert
  // fails- this would allow us to avoid having to do another search
  typename concurrent_btree::insert_info_t insert_info;
  if (unlikely(!btr.insert_if_absent(
          varkey(*key), (typename concurrent_btree::value_type) tuple, &insert_info))) {
    // log the key contents, not the address of the std::string holding them
    VERBOSE(std::cerr << "insert_if_absent failed for key: " << util::hexify(*key) << std::endl);
    // the tuple was never linked into the tree, so it is released without RCU
    tuple->clear_latest();
    tuple->unlock();
    dbtuple::release_no_rcu(tuple);
    ++transaction_base::g_evt_dbtuple_write_insert_failed;
    return std::pair< dbtuple *, bool >(nullptr, false);
  }
  VERBOSE(std::cerr << "insert_if_absent suceeded for key: " << util::hexify(*key) << std::endl
                    << " new dbtuple is " << util::hexify(tuple) << std::endl);

  // update write_set
  // too expensive to be practical
  // INVARIANT(find_write_set(tuple) == write_set.end());
  write_set.emplace_back(tuple, key, value, writer, &btr, true);

  // update node #s
  INVARIANT(insert_info.node);
  if (!absent_set.empty()) {
    auto it = absent_set.find(insert_info.node);
    if (it != absent_set.end()) {
      if (unlikely(it->second.version != insert_info.old_version)) {
        // someone else changed the node between our scan and our insert
        abort_trap((reason = ABORT_REASON_WRITE_NODE_INTERFERENCE));
        return std::make_pair(tuple, true);
      }
      VERBOSE(std::cerr << "bump node=" << util::hexify(it->first) << " from v=" << insert_info.old_version
                        << " -> v=" << insert_info.new_version << std::endl);
      // otherwise, bump the version
      it->second.version = insert_info.new_version;
      // compare against the record's version field, as every other use of the
      // absent set does
      SINGLE_THREADED_INVARIANT(concurrent_btree::ExtractVersionNumber(it->first) == it->second.version);
    }
  }
  return std::make_pair(tuple, false);
}
// Reads `tuple` on behalf of this transaction, handing the value to
// `value_reader`.
//
// tuple:        the tuple to read; must be non-null.
// value_reader: receives the value that was read.
//
// Returns true if a record was read, false if the tuple is logically empty
// (or, with Traits::read_own_writes, if this transaction's own pending write
// for the tuple has no value).
//
// Throws transaction_abort_exception (after aborting the transaction) if the
// read fails or returns a tid this transaction may not read.
template <template <typename> class Protocol, typename Traits>
template <typename ValueReader>
bool
transaction<Protocol, Traits>::do_tuple_read(
    const dbtuple *tuple, ValueReader &value_reader)
{
  typedef typename ValueReader::value_type reader_value_t;

  INVARIANT(tuple);
  ++evt_local_search_lookups;

  const bool snapshot_mode = is_snapshot();
  // snapshot txns read as of their snapshot tid; all others read the newest
  const transaction_base::tid_t read_as_of =
    snapshot_mode
      ? cast()->snapshot_tid()
      : static_cast<transaction_base::tid_t>(dbtuple::MAX_TID);
  transaction_base::tid_t observed_tid = 0;

  if (Traits::read_own_writes) {
    // this is why read_own_writes is not performant, because we have
    // to do linear scan
    auto own_write = find_write_set(const_cast<dbtuple *>(tuple));
    if (unlikely(own_write != write_set.end())) {
      ++evt_local_search_write_set_hits;
      if (!own_write->get_value())
        return false;
      const reader_value_t * const pending =
        reinterpret_cast<const reader_value_t *>(own_write->get_value());
      value_reader.dup(*pending, this->string_allocator());
      return true;
    }
  }

  // do the actual tuple read
  dbtuple::ReadStatus status;
  {
    PERF_DECL(static std::string probe0_name(std::string(__PRETTY_FUNCTION__) + std::string(":do_read:")));
    ANON_REGION(probe0_name.c_str(), &private_::txn_btree_search_probe0_cg);
    tuple->prefetch();
    status = tuple->stable_read(
        read_as_of, observed_tid, value_reader,
        this->string_allocator(), snapshot_mode);
    if (unlikely(status == dbtuple::READ_FAILED)) {
      const transaction_base::abort_reason why =
        transaction_base::ABORT_REASON_UNSTABLE_READ;
      abort_impl(why);
      throw transaction_abort_exception(why);
    }
  }

  if (unlikely(!cast()->can_read_tid(observed_tid))) {
    const transaction_base::abort_reason why =
      transaction_base::ABORT_REASON_FUTURE_TID_READ;
    abort_impl(why);
    throw transaction_abort_exception(why);
  }

  INVARIANT(status == dbtuple::READ_EMPTY ||
            status == dbtuple::READ_RECORD);
  const bool got_record = (status != dbtuple::READ_EMPTY);
  if (!got_record)
    ++transaction_base::g_evt_read_logical_deleted_node_search;

  // read-only txns do not need read-set tracking
  // (b/c we know the values are consistent)
  if (!snapshot_mode)
    read_set.emplace_back(tuple, observed_tid);

  return got_record;
}
// Records that this transaction observed btree node `n` at version `v`.
//
// n: the node that was read; must be non-null.
// v: the node's version at the time of the read.
//
// Snapshot transactions track nothing. For other transactions, the first
// observation of a node is stored in the absent set; a later observation
// with a different version aborts the transaction and throws
// transaction_abort_exception.
template <template <typename> class Protocol, typename Traits>
void
transaction<Protocol, Traits>::do_node_read(
    const typename concurrent_btree::node_opaque_t *n, uint64_t v)
{
  INVARIANT(n);
  if (is_snapshot())
    return;

  auto seen = absent_set.find(n);
  if (seen == absent_set.end()) {
    // first time we see this node: remember its version
    absent_set[n].version = v;
    return;
  }

  // seen before at the same version: nothing to do
  if (seen->second.version == v)
    return;

  // the node changed between two of our reads
  const transaction_base::abort_reason why =
    transaction_base::ABORT_REASON_NODE_SCAN_READ_VERSION_CHANGED;
  abort_impl(why);
  throw transaction_abort_exception(why);
}
#endif /* _NDB_TXN_IMPL_H_ */