Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PHT] Key consistency #78

Merged
merged 8 commits into from Jul 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
213 changes: 89 additions & 124 deletions include/opendht/indexation/pht.h
Expand Up @@ -52,8 +52,12 @@ struct Prefix {
* @param pos : Pos of the needed bit
* @return : true if the bit is at 1
* false otherwise
* @throw out_of_range Throw out of range if the bit at 'pos' does not exist
*/
bool isActivBit(size_t pos) const {
bool isActiveBit(size_t pos) const {
if ( pos >= size_ )
throw std::out_of_range("Can't detect active bit at pos, pos larger than prefix size or empty prefix");

return ((this->content_[pos / 8] >> (7 - (pos % 8)) ) & 1) == 1;
}

Expand All @@ -65,12 +69,7 @@ struct Prefix {
* @return The prefix of this sibling.
*/
Prefix getSibling() const {
Prefix copy = *this;
if (size_) {
size_t last_bit = (8 - size_) % 8;
copy.content_.back() ^= (1 << last_bit);
}
return copy;
return swapBit(size_ - 1);
}

InfoHash hash() const {
Expand Down Expand Up @@ -115,33 +114,37 @@ struct Prefix {
return 8 * i + j;
}

size_t size_ {0};
Blob content_ {};
};

using Value = std::pair<InfoHash, dht::Value::Id>;
/**
* This method swap the bit a the position 'bit' and return the new prefix
*
* @param bit Position of the bit to swap
*
* @return The prefix with the bit at position 'bit' swapped
*
* @throw out_of_range Throw out of range if bit does not exist
*/
Prefix swapBit(size_t bit) const {
if ( bit >= size_ )
throw std::out_of_range("bit larger than prefix size.");

struct IndexEntry : public dht::Value::Serializable<IndexEntry> {
static const ValueType TYPE;
Prefix copy = *this;

virtual void unpackValue(const dht::Value& v) {
Serializable<IndexEntry>::unpackValue(v);
name = v.user_type;
}
size_t offset_bit = (8 - bit) % 8;
copy.content_[bit / 8] ^= (1 << offset_bit);

virtual dht::Value packValue() const {
auto pack = Serializable<IndexEntry>::packValue();
pack.user_type = name;
return pack;
return copy;
}

Blob prefix;
Value value;
std::string name;
MSGPACK_DEFINE_MAP(prefix, value);
size_t size_ {0};
Blob content_ {};
};

using Value = std::pair<InfoHash, dht::Value::Id>;

class Pht {
static constexpr const char* INVALID_KEY = "Key does not match the PHT key spec.";

/* Prefixes the user_type for all dht values put on the DHT */
static constexpr const char* INDEX_PREFIX = "index.pht.";

public:
Expand All @@ -150,7 +153,11 @@ class Pht {
*/
static constexpr const size_t MAX_NODE_ENTRY_COUNT {16};

using Key = std::map<std::string, Prefix>;
/* A key for a an index entry */
using Key = std::map<std::string, Blob>;
/* Specifications of the keys. It defines the number, the length and the
* serialization order of fields. */
using KeySpec = std::map<std::string, size_t>;

using LookupCallback = std::function<void(std::vector<std::shared_ptr<Value>>& values, Prefix p)>;
typedef void (*LookupCallbackRaw)(std::vector<std::shared_ptr<Value>>* values, Prefix* p, void *user_data);
Expand All @@ -161,15 +168,33 @@ class Pht {
raw_cb((std::vector<std::shared_ptr<Value>>*) &values, (Prefix*) &p, user_data);
};
}
using LookupCallbackSimple = std::function<void(std::vector<std::shared_ptr<Value>>& values)>;
typedef void (*LookupCallbackSimpleRaw)(std::vector<std::shared_ptr<Value>>* values, void *user_data);
static LookupCallbackSimple
bindLookupCbSimple(LookupCallbackSimpleRaw raw_cb, void* user_data) {
if (not raw_cb) return {};
return [=](std::vector<std::shared_ptr<Value>>& values) {
raw_cb((std::vector<std::shared_ptr<Value>>*) &values, user_data);
};
}

Pht(std::string name, std::shared_ptr<DhtRunner> dht)
: name_(INDEX_PREFIX + name), canary_(name_ + ".canary"), dht_(dht) { }
Pht(std::string name, KeySpec k_spec, std::shared_ptr<DhtRunner> dht)
: name_(INDEX_PREFIX + name), canary_(name_ + ".canary"), keySpec_(k_spec), dht_(dht)
{
if (k_spec.size() != 1)
throw std::invalid_argument("PHT only supports unidimensional data.");
}
virtual ~Pht () { }

/**
* Lookup a key for a value.
*/
void lookup(Key k, LookupCallback cb = {}, DoneCallbackSimple doneCb = {}, bool exact_match = true);
void lookup(Key k, LookupCallback cb = {}, DoneCallbackSimple done_cb = {}, bool exact_match = true);
void lookup(Key k, LookupCallbackSimple cb = {}, DoneCallbackSimple done_cb = {}, bool exact_match = true)
{
lookup(k, [=](std::vector<std::shared_ptr<Value>>& values, Prefix) { cb(values); }, done_cb, exact_match);
}

/**
* Adds an entry into the index.
*/
Expand All @@ -182,97 +207,15 @@ class Pht {
* Insert all needed node into the tree according to a prefix
* @param p : Prefix that we need to insert
*/
void insert(const Prefix& p) {
size_t i = 0;
auto now = clock::now();

std::shared_ptr<Node> curr_node;

while ( ( leaves_.size() > 0 && leaves_.begin()->first + NODE_EXPIRE_TIME < now )
|| leaves_.size() > MAX_ELEMENT ) {

leaves_.erase(leaves_.begin());
}

if ( !(curr_node = root_.lock()) ) {

/* Root does not exist, need to create one*/
curr_node = std::make_shared<Node>();
root_ = curr_node;
}

curr_node->last_reply = now;

/* Iterate through all bit of the Blob */
for ( i = 0; i < p.size_; i++ ) {

/* According to the bit define which node is the next one */
auto& next = ( p.isActivBit(i) ) ? curr_node->right_child : curr_node->left_child;

/**
* If lock, node exists
* else create it
*/
if (auto n = next.lock()) {
curr_node = std::move(n);
} else {
/* Create the next node if doesn't exist*/
auto tmp_curr_node = std::make_shared<Node>();
tmp_curr_node->parent = curr_node;
next = tmp_curr_node;
curr_node = std::move(tmp_curr_node);
}

curr_node->last_reply = now;
}

/* Insert the leaf (curr_node) into the multimap */
leaves_.emplace(std::move(now), std::move(curr_node) );
}
void insert(const Prefix& p);

/**
* Lookup into the tree to return the maximum prefix length in the cache tree
*
* @param p : Prefix that we are looking for
* @return : The size of the longest prefix known in the cache between 0 and p.size_
*/
int lookup(const Prefix& p) {
int pos = 0;
auto now = clock::now(), last_node_time = now;

/* Before lookup remove the useless one [i.e. too old] */
while ( leaves_.size() > 0 && leaves_.begin()->first + NODE_EXPIRE_TIME < now ) {
leaves_.erase(leaves_.begin());
}

auto next = root_;
std::shared_ptr<Node> curr_node;

while ( auto n = next.lock() ) {
/* Safe since pos is equal to 0 until here */
if ( (unsigned) pos >= p.size_ ) break;

curr_node = n;
last_node_time = curr_node->last_reply;
curr_node->last_reply = now;

/* Get the Prefix bit by bit, starting from left */
next = ( p.isActivBit(pos) ) ? curr_node->right_child : curr_node->left_child;

++pos;
}

if ( pos > 0 ) {
auto to_erase = leaves_.find(last_node_time);
if ( to_erase != leaves_.end() )
leaves_.erase( to_erase );

leaves_.emplace( std::move(now), std::move(curr_node) );
}

return --pos;
}

int lookup(const Prefix& p);

private:
static constexpr const size_t MAX_ELEMENT {1024};
Expand All @@ -295,27 +238,48 @@ class Pht {
std::multimap<time_point, std::shared_ptr<Node>> leaves_;
};

/**
* Performs a step in the lookup operation. Each steps are performed
* asynchronously.
*/
void lookupStep(Prefix k, std::shared_ptr<int> lo, std::shared_ptr<int> hi,
std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals, LookupCallback cb,
DoneCallbackSimple done_cb, std::shared_ptr<unsigned> max_common_prefix_len,
int start = -1, bool all_values = false);

/**
* Linearizes the key into a unidimensional key. A pht only takes
* unidimensional key.
*
* @param Key The initial key.
*
* @return return The linearized key.
* @return the prefix of the linearized key.
*/
static Prefix linearize(Key k) {
if (k.size() != 1) { throw std::invalid_argument("PHT only supports unidimensional data."); }
return k.begin()->second;
virtual Prefix linearize(Key k) const {
if (not validKey(k)) { throw std::invalid_argument(INVALID_KEY); }

Prefix p = Blob {k.begin()->second.begin(), k.begin()->second.end()};

auto bit_loc = p.size_ + 1;
for ( auto i = p.content_.size(); i <= keySpec_.begin()->second; i++ ) {
p.content_.push_back(0);
p.size_ += 8;
}

return p.swapBit(bit_loc);
};

/**
* Performs a step in the lookup operation. Each steps are performed
* asynchronously.
* Tells if the key is valid according to the key spec.
*/
void lookupStep(Prefix k, std::shared_ptr<int> lo, std::shared_ptr<int> hi,
std::shared_ptr<std::vector<std::shared_ptr<Value>>> vals, LookupCallback cb,
DoneCallbackSimple done_cb, std::shared_ptr<unsigned> max_common_prefix_len,
int start = -1, bool all_values = false);
bool validKey(const Key& k) const {
return k.size() == keySpec_.size() and
std::equal(k.begin(), k.end(), keySpec_.begin(),
[&](const Key::value_type& key, const KeySpec::value_type& key_spec) {
return key.first == key_spec.first and key.second.size() <= key_spec.second;
}
);
}

/**
* Updates the canary token on the node responsible for the specified
Expand All @@ -325,6 +289,7 @@ class Pht {

const std::string name_;
const std::string canary_;
const KeySpec keySpec_;
Cache cache_;
std::shared_ptr<DhtRunner> dht_;
};
Expand Down
11 changes: 7 additions & 4 deletions python/opendht.pyx
Expand Up @@ -406,8 +406,11 @@ cdef class IndexValue(object):

cdef class Pht(object):
cdef cpp.Pht* thisptr
def __cinit__(self, bytes name, DhtRunner dht):
self.thisptr = new cpp.Pht(name, dht.thisptr)
def __cinit__(self, bytes name, key_spec, DhtRunner dht):
cdef cpp.IndexKeySpec cpp_key_spec
for kk, size in key_spec.items():
cpp_key_spec[bytes(kk, 'utf-8')] = size
self.thisptr = new cpp.Pht(name, cpp_key_spec, dht.thisptr)
property MAX_NODE_ENTRY_COUNT:
def __get__(self):
return cpp.PHT_MAX_NODE_ENTRY_COUNT
Expand All @@ -424,7 +427,7 @@ cdef class Pht(object):
ref.Py_INCREF(cb_obj)
cdef cpp.IndexKey cppk
for kk, v in key.items():
cppk[bytes(kk, 'utf-8')] = cpp.Prefix(bytes(v))
cppk[bytes(kk, 'utf-8')] = bytes(v)
self.thisptr.lookup(
cppk,
cpp.Pht.bindLookupCb(lookup_callback, <void*>cb_obj),
Expand All @@ -441,7 +444,7 @@ cdef class Pht(object):
ref.Py_INCREF(cb_obj)
cdef cpp.IndexKey cppk
for kk, v in key.items():
cppk[bytes(kk, 'utf-8')] = cpp.Prefix(bytes(v))
cppk[bytes(kk, 'utf-8')] = bytes(v)
cdef cpp.IndexValue val
val.first = (<InfoHash>value.getKey())._infohash
val.second = value.getValueId()
Expand Down
5 changes: 3 additions & 2 deletions python/opendht_cpp.pxd
Expand Up @@ -167,12 +167,13 @@ cdef extern from "opendht/indexation/pht.h" namespace "dht::indexation":
Prefix(vector[uint8_t]) except +
string toString() const
ctypedef pair[InfoHash, uint64_t] IndexValue "dht::indexation::Value"
ctypedef map[string, Prefix] IndexKey "dht::indexation::Pht::Key"
ctypedef map[string, vector[uint8_t]] IndexKey "dht::indexation::Pht::Key"
ctypedef map[string, uint32_t] IndexKeySpec "dht::indexation::Pht::KeySpec"
ctypedef void (*LookupCallbackRaw)(vector[shared_ptr[IndexValue]]* values, Prefix* p, void* user_data);
cdef cppclass Pht:
cppclass LookupCallback:
LookupCallback() except +
Pht(string, shared_ptr[DhtRunner]) except +
Pht(string, IndexKeySpec, shared_ptr[DhtRunner]) except +
void lookup(IndexKey k, LookupCallback cb, DoneCallbackSimple doneCb);
void insert(IndexKey k, IndexValue v, DoneCallbackSimple cb)
@staticmethod
Expand Down
10 changes: 6 additions & 4 deletions python/tools/dht/tests.py
Expand Up @@ -10,6 +10,7 @@
import time
import subprocess
import re
import collections

from matplotlib.ticker import FuncFormatter
import math
Expand Down Expand Up @@ -276,17 +277,18 @@ def _insertTest(self):
"""
bootstrap = self._bootstrap
bootstrap.resize(2)

dht = bootstrap.get(1)
pht = Pht(b'foo_index', dht)

NUM_DIG = max(math.log(self._num_keys, 2)/4, 5) # at least 5 digit keys.
keyspec = collections.OrderedDict([('foo', NUM_DIG)])
pht = Pht(b'foo_index', keyspec, dht)

DhtNetwork.log('PHT has',
pht.MAX_NODE_ENTRY_COUNT,
'node'+ ('s' if pht.MAX_NODE_ENTRY_COUNT > 1 else ''),
'per leaf bucket.')
NUM_DIG = max(math.log(self._num_keys, 2)/4, 5) # at least 5 digit keys.
keys = [{
'foo' :
[_ for _ in keyspec.keys()][0] :
''.join(random.SystemRandom().choice(string.hexdigits)
for _ in range(NUM_DIG)).encode()
} for n in range(self._num_keys)]
Expand Down