Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
594 lines (518 sloc) 16.8 KB
#ifndef _NDB_TYPED_TXN_BTREE_H_
#define _NDB_TYPED_TXN_BTREE_H_
#include "base_txn_btree.h"
#include "txn_btree.h"
#include "record/cursor.h"
template <typename Schema>
struct typed_txn_btree_ {
typedef typename Schema::base_type base_type;
typedef typename Schema::key_type key_type;
typedef typename Schema::value_type value_type;
typedef typename Schema::value_descriptor_type value_descriptor_type;
typedef typename Schema::key_encoder_type key_encoder_type;
typedef typename Schema::value_encoder_type value_encoder_type;
static_assert(value_descriptor_type::nfields() <= 64, "xx");
static const uint64_t AllFieldsMask = (1UL << value_descriptor_type::nfields()) - 1;
static inline constexpr bool
IsAllFields(uint64_t m)
{
return (m & AllFieldsMask) == AllFieldsMask;
}
class key_reader {
public:
constexpr key_reader(bool no_key_results) : no_key_results(no_key_results) {}
inline const key_type &
operator()(const std::string &s)
{
const typename Schema::key_encoder_type key_encoder;
if (!no_key_results)
key_encoder.read(s, &k);
return k;
}
#if NDB_MASSTREE
inline const key_type &
operator()(lcdf::Str s)
{
const typename Schema::key_encoder_type key_encoder;
if (!no_key_results)
key_encoder.read(s, &k);
return k;
}
#endif
private:
key_type k;
bool no_key_results;
};
static inline bool
do_record_read(const uint8_t *data, size_t sz, uint64_t fields_mask, value_type *v)
{
if (IsAllFields(fields_mask)) {
// read the entire record
const value_encoder_type value_encoder;
return value_encoder.failsafe_read(data, sz, v);
} else {
// pick individual fields
read_record_cursor<base_type> r(data, sz);
for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
if ((1UL << i) & fields_mask) {
r.skip_to(i);
if (unlikely(!r.read_current_and_advance(v)))
return false;
}
}
return true;
}
}
class single_value_reader {
public:
typedef typename Schema::value_type value_type;
constexpr single_value_reader(value_type &v, uint64_t fields_mask)
: v(&v), fields_mask(fields_mask) {}
template <typename StringAllocator>
inline bool
operator()(const uint8_t *data, size_t sz, StringAllocator &sa)
{
return do_record_read(data, sz, fields_mask, v);
}
inline value_type &
results()
{
return *v;
}
inline const value_type &
results() const
{
return *v;
}
template <typename StringAllocator>
inline void
dup(const value_type &vdup, StringAllocator &sa)
{
*v = vdup;
}
private:
value_type *v;
uint64_t fields_mask;
};
class value_reader {
public:
typedef typename Schema::value_type value_type;
constexpr value_reader(uint64_t fields_mask) : fields_mask(fields_mask) {}
template <typename StringAllocator>
inline bool
operator()(const uint8_t *data, size_t sz, StringAllocator &sa)
{
return do_record_read(data, sz, fields_mask, &v);
}
inline value_type &
results()
{
return v;
}
inline const value_type &
results() const
{
return v;
}
template <typename StringAllocator>
inline void
dup(const value_type &vdup, StringAllocator &sa)
{
v = vdup;
}
private:
value_type v;
uint64_t fields_mask;
};
class key_writer {
public:
constexpr key_writer(const key_type *k) : k(k) {}
template <typename StringAllocator>
inline const std::string *
fully_materialize(bool stable_input, StringAllocator &sa)
{
if (!k)
return nullptr;
std::string * const ret = sa();
const key_encoder_type key_encoder;
key_encoder.write(*ret, k);
return ret;
}
private:
const key_type *k;
};
static inline size_t
compute_needed_standalone(
const value_type *v, uint64_t fields,
const uint8_t *buf, size_t sz)
{
if (fields == 0) {
// delete
INVARIANT(!v);
return 0;
}
INVARIANT(v);
if (sz == 0) {
// new record (insert)
INVARIANT(IsAllFields(fields));
const value_encoder_type value_encoder;
return value_encoder.nbytes(v);
}
ssize_t new_updates_sum = 0;
for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
if ((1UL << i) & fields) {
const uint8_t * px = reinterpret_cast<const uint8_t *>(v) +
value_descriptor_type::cstruct_offsetof(i);
new_updates_sum += value_descriptor_type::nbytes_fn(i)(px);
}
}
// XXX: should try to cache pointers discovered by read_record_cursor
ssize_t old_updates_sum = 0;
read_record_cursor<base_type> rc(buf, sz);
for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
if ((1UL << i) & fields) {
rc.skip_to(i);
const size_t sz = rc.read_current_raw_size_and_advance();
INVARIANT(sz);
old_updates_sum += sz;
}
}
// XXX: see if approximate version works almost as well (approx version is
// to assume that each field has the minimum possible size, which is
// overly conservative but correct)
const ssize_t ret = static_cast<ssize_t>(sz) - old_updates_sum + new_updates_sum;
INVARIANT(ret > 0);
return ret;
}
// how many bytes do we need to encode a delta record
static inline size_t
compute_needed_delta_standalone(
const value_type *v, uint64_t fields)
{
size_t size_needed = 0;
size_needed += sizeof(uint64_t);
if (fields == 0) {
// delete
INVARIANT(!v);
return size_needed;
}
INVARIANT(v);
if (IsAllFields(fields)) {
// new record (insert)
const value_encoder_type value_encoder;
size_needed += value_encoder.nbytes(v);
return size_needed;
}
for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
if ((1UL << i) & fields) {
const uint8_t * px = reinterpret_cast<const uint8_t *>(v) +
value_descriptor_type::cstruct_offsetof(i);
size_needed += value_descriptor_type::nbytes_fn(i)(px);
}
}
return size_needed;
}
static inline void
do_write_standalone(
const value_type *v, uint64_t fields,
uint8_t *buf, size_t sz)
{
if (fields == 0) {
// no-op for delete
INVARIANT(!v);
return;
}
if (IsAllFields(fields)) {
// special case, just use the standard encoder (faster)
// because it's straight-line w/ no branching
const value_encoder_type value_encoder;
value_encoder.write(buf, v);
return;
}
write_record_cursor<base_type> wc(buf);
for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
if ((1UL << i) & fields) {
wc.skip_to(i);
wc.write_current_and_advance(v, nullptr);
}
}
}
static inline void
do_delta_write_standalone(
const value_type *v, uint64_t fields,
uint8_t *buf, size_t sz)
{
serializer<uint64_t, false> s_uint64_t;
#ifdef CHECK_INVARIANTS
const uint8_t * const orig_buf = buf;
#endif
buf = s_uint64_t.write(buf, fields);
if (fields == 0) {
// no-op for delete
INVARIANT(!v);
return;
}
if (IsAllFields(fields)) {
// special case, just use the standard encoder (faster)
// because it's straight-line w/ no branching
const value_encoder_type value_encoder;
value_encoder.write(buf, v);
return;
}
for (uint64_t i = 0; i < value_descriptor_type::nfields(); i++) {
if ((1UL << i) & fields) {
const uint8_t * px = reinterpret_cast<const uint8_t *>(v) +
value_descriptor_type::cstruct_offsetof(i);
buf = value_descriptor_type::write_fn(i)(buf, px);
}
}
INVARIANT(buf - orig_buf == ptrdiff_t(sz));
}
template <uint64_t Fields>
static inline size_t
tuple_writer(dbtuple::TupleWriterMode mode, const void *v, uint8_t *p, size_t sz)
{
const value_type *vx = reinterpret_cast<const value_type *>(v);
switch (mode) {
case dbtuple::TUPLE_WRITER_NEEDS_OLD_VALUE:
return 1;
case dbtuple::TUPLE_WRITER_COMPUTE_NEEDED:
return compute_needed_standalone(vx, Fields, p, sz);
case dbtuple::TUPLE_WRITER_COMPUTE_DELTA_NEEDED:
return compute_needed_delta_standalone(vx, Fields);
case dbtuple::TUPLE_WRITER_DO_WRITE:
do_write_standalone(vx, Fields, p, sz);
return 0;
case dbtuple::TUPLE_WRITER_DO_DELTA_WRITE:
do_delta_write_standalone(vx, Fields, p, sz);
return 0;
}
ALWAYS_ASSERT(false);
return 0;
}
class value_writer {
public:
constexpr value_writer(const value_type *v, uint64_t fields)
: v(v), fields(fields) {}
// old version of record is stored at
// [buf, buf+sz).
//
// compute the new required size for the update
inline size_t
compute_needed(const uint8_t *buf, size_t sz)
{
return compute_needed_standalone(v, fields, buf, sz);
}
template <typename StringAllocator>
inline const std::string *
fully_materialize(bool stable_input, StringAllocator &sa)
{
INVARIANT(IsAllFields(fields) || fields == 0);
if (fields == 0) {
// delete
INVARIANT(!v);
return nullptr;
}
std::string * const ret = sa();
const value_encoder_type value_encoder;
value_encoder.write(*ret, v);
return ret;
}
// the old value lives in [buf, buf+sz), but [buf, buf+compute_needed())
// is valid memory to write to
inline void
operator()(uint8_t *buf, size_t sz)
{
do_write_standalone(v, fields, buf, sz);
}
private:
const value_type *v;
uint64_t fields;
};
typedef key_type Key;
typedef key_writer KeyWriter;
typedef value_type Value;
typedef value_writer ValueWriter;
typedef uint64_t ValueInfo;
//typedef key_reader KeyReader;
//typedef single_value_reader SingleValueReader;
//typedef value_reader ValueReader;
};
template <template <typename> class Transaction, typename Schema>
class typed_txn_btree : public base_txn_btree<Transaction, typed_txn_btree_<Schema>> {
typedef base_txn_btree<Transaction, typed_txn_btree_<Schema>> super_type;
public:
typedef typename super_type::string_type string_type;
typedef typename super_type::size_type size_type;
typedef typename Schema::base_type base_type;
typedef typename Schema::key_type key_type;
typedef typename Schema::value_type value_type;
typedef typename Schema::value_descriptor_type value_descriptor_type;
typedef typename Schema::key_encoder_type key_encoder_type;
typedef typename Schema::value_encoder_type value_encoder_type;
private:
typedef txn_btree_::key_reader bytes_key_reader;
typedef txn_btree_::single_value_reader bytes_single_value_reader;
typedef txn_btree_::value_reader bytes_value_reader;
typedef
typename typed_txn_btree_<Schema>::key_writer
key_writer;
typedef
typename typed_txn_btree_<Schema>::key_reader
key_reader;
typedef
typename typed_txn_btree_<Schema>::single_value_reader
single_value_reader;
typedef
typename typed_txn_btree_<Schema>::value_reader
value_reader;
template <typename Traits>
static constexpr inline bool
IsSupportable()
{
return Traits::stable_input_memory ||
(private_::is_trivially_copyable<key_type>::value &&
private_::is_trivially_destructible<key_type>::value &&
private_::is_trivially_copyable<value_type>::value &&
private_::is_trivially_destructible<value_type>::value);
}
public:
static const uint64_t AllFieldsMask = typed_txn_btree_<Schema>::AllFieldsMask;
typedef util::Fields<AllFieldsMask> AllFields;
struct search_range_callback {
public:
virtual ~search_range_callback() {}
virtual bool invoke(const key_type &k, const value_type &v) = 0;
};
struct bytes_search_range_callback {
public:
virtual ~bytes_search_range_callback() {}
virtual bool invoke(const string_type &k, const string_type &v) = 0;
};
typed_txn_btree(size_type value_size_hint = 128,
bool mostly_append = false,
const std::string &name = "<unknown>")
: super_type(value_size_hint, mostly_append, name)
{}
template <typename Traits, typename FieldsMask = AllFields>
inline bool search(
Transaction<Traits> &t, const key_type &k, value_type &v,
FieldsMask fm = FieldsMask());
template <typename Traits, typename FieldsMask = AllFields>
inline void search_range_call(
Transaction<Traits> &t, const key_type &lower, const key_type *upper,
search_range_callback &callback,
bool no_key_results = false /* skip decoding of keys? */,
FieldsMask fm = FieldsMask());
// a lower-level variant which does not bother to decode the key/values
template <typename Traits>
inline void bytes_search_range_call(
Transaction<Traits> &t, const key_type &lower, const key_type *upper,
bytes_search_range_callback &callback,
size_type value_fields_prefix = std::numeric_limits<size_type>::max());
template <typename Traits, typename FieldsMask = AllFields>
inline void put(
Transaction<Traits> &t, const key_type &k, const value_type &v,
FieldsMask fm = FieldsMask());
template <typename Traits>
inline void insert(
Transaction<Traits> &t, const key_type &k, const value_type &v);
template <typename Traits>
inline void remove(
Transaction<Traits> &t, const key_type &k);
private:
template <typename Traits>
static inline const std::string *
stablize(Transaction<Traits> &t, const key_type &k)
{
key_writer writer(&k);
return writer.fully_materialize(
Traits::stable_input_memory, t.string_allocator());
}
template <typename Traits>
static inline const value_type *
stablize(Transaction<Traits> &t, const value_type &v)
{
if (Traits::stable_input_memory)
return &v;
std::string * const px = t.string_allocator()();
px->assign(reinterpret_cast<const char *>(&v), sizeof(v));
return reinterpret_cast<const value_type *>(px->data());
}
key_encoder_type key_encoder;
value_encoder_type value_encoder;
};
template <template <typename> class Transaction, typename Schema>
template <typename Traits, typename FieldsMask>
bool
typed_txn_btree<Transaction, Schema>::search(
Transaction<Traits> &t, const key_type &k, value_type &v,
FieldsMask fm)
{
// XXX: template single_value_reader with mask
single_value_reader vr(v, FieldsMask::value);
return this->do_search(t, k, vr);
}
template <template <typename> class Transaction, typename Schema>
template <typename Traits, typename FieldsMask>
void
typed_txn_btree<Transaction, Schema>::search_range_call(
Transaction<Traits> &t,
const key_type &lower, const key_type *upper,
search_range_callback &callback,
bool no_key_results,
FieldsMask fm)
{
key_reader kr(no_key_results);
value_reader vr(FieldsMask::value);
this->do_search_range_call(t, lower, upper, callback, kr, vr);
}
template <template <typename> class Transaction, typename Schema>
template <typename Traits>
void
typed_txn_btree<Transaction, Schema>::bytes_search_range_call(
Transaction<Traits> &t, const key_type &lower, const key_type *upper,
bytes_search_range_callback &callback,
size_type value_fields_prefix)
{
const value_encoder_type value_encoder;
const size_t max_bytes_read =
value_encoder.encode_max_nbytes_prefix(value_fields_prefix);
bytes_key_reader kr;
bytes_value_reader vr(max_bytes_read);
this->do_search_range_call(t, lower, upper, callback, kr, vr);
}
template <template <typename> class Transaction, typename Schema>
template <typename Traits, typename FieldsMask>
void
typed_txn_btree<Transaction, Schema>::put(
Transaction<Traits> &t, const key_type &k, const value_type &v, FieldsMask fm)
{
static_assert(IsSupportable<Traits>(), "xx");
const dbtuple::tuple_writer_t tw =
&typed_txn_btree_<Schema>::template tuple_writer<FieldsMask::value>;
this->do_tree_put(t, stablize(t, k), stablize(t, v), tw, false);
}
template <template <typename> class Transaction, typename Schema>
template <typename Traits>
void
typed_txn_btree<Transaction, Schema>::insert(
Transaction<Traits> &t, const key_type &k, const value_type &v)
{
static_assert(IsSupportable<Traits>(), "xx");
const dbtuple::tuple_writer_t tw =
&typed_txn_btree_<Schema>::template tuple_writer<AllFieldsMask>;
this->do_tree_put(t, stablize(t, k), stablize(t, v), tw, true);
}
template <template <typename> class Transaction, typename Schema>
template <typename Traits>
void
typed_txn_btree<Transaction, Schema>::remove(
Transaction<Traits> &t, const key_type &k)
{
static_assert(IsSupportable<Traits>(), "xx");
const dbtuple::tuple_writer_t tw =
&typed_txn_btree_<Schema>::template tuple_writer<0>;
this->do_tree_put(t, stablize(t, k), nullptr, tw, false);
}
#endif /* _NDB_TYPED_TXN_BTREE_H_ */