Permalink
Browse files

add pht insert test

  • Loading branch information...
sim590 authored and aberaud committed Jan 19, 2016
1 parent 5a239cd commit 5a5f7012d823d15f9d5ee9fce3422ea3a9fde51c
Showing with 180 additions and 29 deletions.
  1. +10 −0 .gitignore
  2. +1 −1 include/opendht/indexation/pht.h
  3. +6 −1 python/opendht.pyx
  4. +1 −0 python/opendht_cpp.pxd
  5. +9 −1 python/tools/benchmark.py
  6. +153 −26 python/tools/dht/tests.py
View
@@ -45,3 +45,13 @@ Makefile.in
# KDevelop
.kdev4/
*.kdev4
# Python
*.pyc
# git backup files
*.orig
# vim swap files
*.swp
*.swo
@@ -132,12 +132,12 @@ struct IndexEntry : public dht::Value::Serializable<IndexEntry> {
class Pht {
static constexpr const char* INDEX_PREFIX = "index.pht.";
public:
/* This is the maximum number of entries per node. This parameter is
* critical and influences the traffic alot during a lookup operation.
*/
static constexpr const size_t MAX_NODE_ENTRY_COUNT {16};
public:
using Key = std::map<std::string, Prefix>;
using LookupCallback = std::function<void(std::vector<std::shared_ptr<Value>>& values, Prefix p)>;
View
@@ -393,9 +393,11 @@ cdef class DhtRunner(_WithID):
cdef class IndexValue(object):
cdef cpp.shared_ptr[cpp.IndexValue] _value
def __init__(self, InfoHash h, cpp.uint64_t vid=0):
def __init__(self, InfoHash h=None, cpp.uint64_t vid=0):
cdef cpp.InfoHash hh = h._infohash
self._value.reset(new cpp.IndexValue(hh, vid))
def __str__(self):
return "(" + self.getKey().toString().decode() +", "+ str(self.getValueId()) +")"
def getKey(self):
h = InfoHash()
h._infohash = self._value.get().first
@@ -407,6 +409,9 @@ cdef class Pht(object):
cdef cpp.Pht* thisptr
def __cinit__(self, bytes name, DhtRunner dht):
self.thisptr = new cpp.Pht(name, dht.thisptr)
property MAX_NODE_ENTRY_COUNT:
def __get__(self):
return cpp.PHT_MAX_NODE_ENTRY_COUNT
def lookup(self, key, lookup_cb=None, done_cb=None):
"""Query the Index with a specified key.
View
@@ -159,6 +159,7 @@ cdef extern from "opendht/log.h" namespace "dht::log":
void enableFileLogging(DhtRunner& dht, const string& path)
cdef extern from "opendht/indexation/pht.h" namespace "dht::indexation":
size_t PHT_MAX_NODE_ENTRY_COUNT "dht::indexation::Pht::MAX_NODE_ENTRY_COUNT"
cdef cppclass Prefix:
Prefix() except +
Prefix(vector[uint8_t]) except +
View
@@ -27,7 +27,7 @@
from dht.network import DhtNetwork
from dht.network import DhtNetworkSubProcess
from dht.tests import PerformanceTest, PersistenceTest
from dht.tests import PerformanceTest, PersistenceTest, PhtTest
from dht import virtual_network_builder
from dht import network as dhtnetwork
@@ -168,6 +168,10 @@ def resize_clusters(self, n):
featureArgs = parser.add_mutually_exclusive_group(required=True)
featureArgs.add_argument('--performance', action='store_true', default=False,
help='Launches performance benchmark test. Available args for "-t" are: gets.')
featureArgs.add_argument('--pht', action='store_true', default=False,
help='Launches PHT benchmark test. '\
'Available args for "-t" are: insert. '\
'Use "-m" option for fixing number of keys to create during the test.')
featureArgs.add_argument('--data-persistence', action='store_true', default=0,
help='Launches data persistence benchmark test. '\
'Available args for "-t" are: delete, replace, mult_time. '\
@@ -217,6 +221,10 @@ def toggle_bs_dht_log(signum, frame):
PerformanceTest(args.test, wb, test_opt).run()
elif args.data_persistence:
PersistenceTest(args.test, wb, test_opt).run()
elif args.pht:
if args.m:
test_opt.update({ 'num_keys' : args.m })
PhtTest(args.test, wb, test_opt).run()
except Exception as e:
print(e)
View
@@ -13,9 +13,12 @@
import re
import traceback
from matplotlib.ticker import FuncFormatter
import math
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import networkx as nx
from opendht import *
from dht.network import DhtNetwork, DhtNetworkSubProcess
@@ -156,13 +159,13 @@ def iftop_traffic_data(ifname, interval=2, rate_type='send_receive'):
###########
class FeatureTest(object):
done = 0
lock = None
"""
This is a base test.
"""
done = 0
lock = None
def __init__(self, test, workbench):
"""
@param test: The test string indicating the test to run. This string is
@@ -174,6 +177,7 @@ def __init__(self, test, workbench):
"""
self._test = test
self._workbench = workbench
self._bootstrap = self._workbench.get_bootstrap()
def _reset(self):
"""
@@ -188,14 +192,146 @@ def run(self):
raise NotImplementedError('This method must be implemented.')
##################################
# PHT #
##################################
class PhtTest(FeatureTest):
"""TODO
"""
indexEntries = None
prefix = None
key = None
def __init__(self, test, workbench, opts):
"""
@param test: is one of the following:
- 'insert': indexes a considerable amount of data in
the PHT structure.
TODO
@type test: string
@param opts: Dictionnary containing options for the test. Allowed
options are:
- 'num_keys': this specifies the number of keys to insert
in the PHT during the test.
@type opts: dict
"""
super(PhtTest, self).__init__(test, workbench)
self._num_keys = opts['num_keys'] if 'num_keys' in opts else 32
def _reset(self):
super(PhtTest, self)._reset()
PhtTest.indexEntries = []
@staticmethod
def lookupCb(vals, prefix):
PhtTest.indexEntries = list(vals)
PhtTest.prefix = prefix.decode()
DhtNetwork.log('Index name: <todo>')
DhtNetwork.log('Leaf prefix:', prefix)
for v in vals:
DhtNetwork.log('[ENTRY]:', v)
@staticmethod
def lookupDoneCb(ok):
DhtNetwork.log('[LOOKUP]:', PhtTest.key, "--", "success!" if ok else "Fail...")
with FeatureTest.lock:
FeatureTest.lock.notify()
@staticmethod
def insertDoneCb(ok):
DhtNetwork.log('[INSERT]:', PhtTest.key, "--", "success!" if ok else "Fail...")
with FeatureTest.lock:
FeatureTest.lock.notify()
@staticmethod
def drawTrie(trie_dict):
"""
Draws the trie structure of the PHT from dictionnary.
@param trie_dict: Dictionnary of index entries (prefix -> entry).
@type trie_dict: dict
"""
prefixes = list(trie_dict.keys())
if len(prefixes) == 0:
return
edges = set([])
for prefix in prefixes:
for i in range(-1, len(prefix)-1):
u = prefix[:i+1]
edges.add( ("" if i == -1 else u, u+"0") )
edges.add( ("" if i == -1 else u, u+"1") )
# TODO: use a binary tree position layout...
G = nx.Graph(list(edges))
nx.draw(G, with_labels=True, node_color='white')
plt.show()
def run(self):
try:
if self._test == 'insert':
self._massIndexTest()
except Exception as e:
print(e)
finally:
self._bootstrap.resize(1)
###########
# Tests #
###########
@reset_before_test
def _insertTest(self):
"""TODO: Docstring for _massIndexTest.
"""
bootstrap = self._bootstrap
bootstrap.resize(2)
dht = bootstrap.get(1)
pht = Pht(b'foo_index', dht)
DhtNetwork.log('PHT has',
pht.MAX_NODE_ENTRY_COUNT,
'node'+ ('s' if pht.MAX_NODE_ENTRY_COUNT > 1 else ''),
'per leaf bucket.')
NUM_DIG = max(math.log(self._num_keys, 2)/4, 5) # at least 5 digit keys.
keys = [{
'foo' :
''.join(random.SystemRandom().choice(string.hexdigits)
for _ in range(NUM_DIG)).encode()
} for n in range(self._num_keys)]
all_entries = {}
# Index all entries.
for key in keys:
PhtTest.key = key
pht.insert(key, IndexValue(random_hash()), PhtTest.insertDoneCb)
with FeatureTest.lock:
FeatureTest.lock.wait()
# Recover entries now that the trie is complete.
for key in keys:
PhtTest.key = key
pht.lookup(key, PhtTest.lookupCb, PhtTest.lookupDoneCb)
with FeatureTest.lock:
FeatureTest.lock.wait()
all_entries[PhtTest.prefix] = [e.__str__()
for e in PhtTest.indexEntries]
PhtTest.drawTrie(all_entries)
##################################
# DHT #
##################################
class DhtFeatureTest(FeatureTest):
"""
This is base test. A method run() implementation is required.
This is a base dht test.
"""
#static variables used by class callbacks
successfullTransfer = lambda lv,fv: len(lv) == len(fv)
@@ -204,7 +340,6 @@ class DhtFeatureTest(FeatureTest):
def __init__(self, test, workbench):
super(DhtFeatureTest, self).__init__(test, workbench)
self.bootstrap = self._workbench.get_bootstrap()
def _reset(self):
super(DhtFeatureTest, self)._reset()
@@ -323,17 +458,17 @@ def _trigger_dp(self, trigger_nodes, _hash, count=1):
config.setNodeId(InfoHash(_hash_str.encode()))
n = DhtRunner()
n.run(config=config)
n.bootstrap(self.bootstrap.ip4,
str(self.bootstrap.port))
DhtNetwork.Log.log('Node','['+_hash_str+']',
n.bootstrap(self._bootstrap.ip4,
str(self._bootstrap.port))
DhtNetwork.log('Node','['+_hash_str+']',
'started around', _hash.toString().decode()
if n.isRunning() else
'failed to start...'
)
trigger_nodes.append(n)
def _result(self, local_values, new_nodes):
bootstrap = self.bootstrap
bootstrap = self._bootstrap
if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues):
DhtNetwork.Log.log('[GET]: Only %s on %s values persisted.' %
(len(DhtFeatureTest.foreignValues), len(local_values)))
@@ -372,7 +507,7 @@ def run(self):
plot_fname = "traffic-plot"
print('plot saved to', plot_fname)
plt.savefig(plot_fname)
self.bootstrap.resize(1)
self._bootstrap.resize(1)
###########
# Tests #
@@ -385,8 +520,7 @@ def _totallyNormalTest(self):
"""
trigger_nodes = []
wb = self._workbench
bootstrap = self.bootstrap
bootstrap = self._bootstrap
# Value representing an ICE packet. Each ICE packet is around 1KB.
VALUE_SIZE = 1024
num_values_per_hash = self._num_values/wb.node_num if self._num_values else 5
@@ -481,13 +615,7 @@ def _deleteTest(self):
It uses Dht shutdown call from the API to gracefuly finish the nodes one
after the other.
"""
wb = self._workbench
if self._traffic_plot:
traffic_plot_thread = threading.Thread(target=display_traffic_plot, args=tuple(['br'+wb.ifname]))
traffic_plot_thread.daemon = True
traffic_plot_thread.start()
bootstrap = self.bootstrap
bootstrap = self._bootstrap
ops_count = []
@@ -573,7 +701,7 @@ def _replaceClusterTest(self):
"""
clusters = 8
bootstrap = self.bootstrap
bootstrap = self._bootstrap
bootstrap.resize(3)
consumer = bootstrap.get(1)
@@ -610,7 +738,7 @@ def _multTimeTest(self):
minutes for the nodes to trigger storage maintenance.
"""
trigger_nodes = []
bootstrap = self.bootstrap
bootstrap = self._bootstrap
N_PRODUCERS = self._num_producers if self._num_values else 16
DP_TIMEOUT = 1
@@ -693,7 +821,7 @@ def run(self):
traceback.print_tb(e.__traceback__)
print(type(e).__name__+':', e, file=sys.stderr)
finally:
self.bootstrap.resize(1)
self._bootstrap.resize(1)
###########
@@ -705,7 +833,7 @@ def _getsTimesTest(self):
"""
Tests for performance of the DHT doing multiple get() operation.
"""
bootstrap = self.bootstrap
bootstrap = self._bootstrap
plt.ion()
@@ -794,8 +922,7 @@ def _delete(self):
deleting around the target hash.
"""
bootstrap = self.bootstrap
bootstrap = self._bootstrap
bootstrap.resize(3)
consumer = bootstrap.get(1)

0 comments on commit 5a5f701

Please sign in to comment.