Skip to content
Permalink
Browse files

dht: add configuration to persist state

  • Loading branch information...
aberaud committed Feb 24, 2019
1 parent a30f1b8 commit 04850e1701370c843a4dbf970d7661e3ccaab7c9
Showing with 85 additions and 15 deletions.
  1. +3 −0 include/opendht/callbacks.h
  2. +12 −7 include/opendht/dht.h
  3. +2 −1 include/opendht/dhtrunner.h
  4. +61 −6 src/dht.cpp
  5. +1 −0 tools/dhtnode.cpp
  6. +6 −1 tools/tools_common.h
@@ -103,6 +103,9 @@ struct OPENDHT_PUBLIC Config {

/** Makes the DHT responsible to maintain its stored values. Consumes more ressources. */
bool maintain_storage;

/** If set, the dht will load its state from this file on start and save its state in this file on shutdown */
std::string persist_path;
};

/**
@@ -266,6 +266,9 @@ class OPENDHT_PUBLIC Dht final : public DhtInterface {
std::vector<ValuesExport> exportValues() const;
void importValues(const std::vector<ValuesExport>&);

void saveState(const std::string& path) const;
void loadState(const std::string& path);

NodeStats getNodesStats(sa_family_t af) const;

std::string getStorageLog() const;
@@ -359,13 +362,6 @@ class OPENDHT_PUBLIC Dht final : public DhtInterface {
// registred types
TypeStore types;

// are we a bootstrap node ?
// note: Any running node can be used as a bootstrap node.
// Only nodes running only as bootstrap nodes should
// be put in bootstrap mode.
const bool is_bootstrap {false};
const bool maintain_storage {false};

// the stuff
RoutingTable buckets4 {};
RoutingTable buckets6 {};
@@ -400,6 +396,15 @@ class OPENDHT_PUBLIC Dht final : public DhtInterface {

std::mt19937_64 rd {crypto::getSeededRandomEngine<std::mt19937_64>()};

std::string persistPath;

// are we a bootstrap node ?
// note: Any running node can be used as a bootstrap node.
// Only nodes running only as bootstrap nodes should
// be put in bootstrap mode.
const bool is_bootstrap {false};
const bool maintain_storage {false};

void rotateSecrets();

Blob makeToken(const SockAddr&, bool old) const;
@@ -354,7 +354,8 @@ class OPENDHT_PUBLIC DhtRunner {
/*.node_id = */{},
/*.network = */network,
/*.is_bootstrap = */false,
/*.maintain_storage*/false
/*.maintain_storage = */false,
/*.persist_path = */{}
},
/*.id = */identity
},
@@ -29,6 +29,7 @@
#include <algorithm>
#include <random>
#include <sstream>
#include <fstream>

namespace dht {

@@ -54,6 +55,9 @@ Dht::getStatus(sa_family_t af) const
void
Dht::shutdown(ShutdownCallback cb)
{
if (not persistPath.empty())
saveState(persistPath);

if (not maintain_storage) {
if (cb) cb();
return;
@@ -1722,9 +1726,7 @@ Dht::~Dht()
Dht::Dht() : store(), network_engine(DHT_LOG, scheduler) {}

Dht::Dht(const int& s, const int& s6, Config config)
: myid(config.node_id ? config.node_id : InfoHash::getRandom()),
is_bootstrap(config.is_bootstrap),
maintain_storage(config.maintain_storage), store(), store_quota(),
: myid(config.node_id ? config.node_id : InfoHash::getRandom()), store(), store_quota(),
network_engine(myid, config.network, s, s6, DHT_LOG, scheduler,
std::bind(&Dht::onError, this, _1, _2),
std::bind(&Dht::onNewNode, this, _1, _2),
@@ -1734,7 +1736,10 @@ Dht::Dht(const int& s, const int& s6, Config config)
std::bind(&Dht::onGetValues, this, _1, _2, _3, _4),
std::bind(&Dht::onListen, this, _1, _2, _3, _4, _5),
std::bind(&Dht::onAnnounce, this, _1, _2, _3, _4, _5),
std::bind(&Dht::onRefresh, this, _1, _2, _3, _4))
std::bind(&Dht::onRefresh, this, _1, _2, _3, _4)),
persistPath(config.persist_path),
is_bootstrap(config.is_bootstrap),
maintain_storage(config.maintain_storage)
{
scheduler.syncTime();
if (s < 0 && s6 < 0)
@@ -1765,8 +1770,10 @@ Dht::Dht(const int& s, const int& s6, Config config)
expire();

DHT_LOG.d("DHT initialised with node ID %s", myid.toString().c_str());
}

if (not persistPath.empty())
loadState(persistPath);
}

bool
Dht::neighbourhoodMaintenance(RoutingTable& list)
@@ -2283,7 +2290,7 @@ Dht::onListen(Sp<Node> node, const InfoHash& hash, const Blob& token, size_t soc
}

void
Dht::onListenDone(const Sp<Node>& node, net::RequestAnswer& answer, Sp<Search>& sr)
Dht::onListenDone(const Sp<Node>& /* node */, net::RequestAnswer& /* answer */, Sp<Search>& sr)
{
// DHT_LOG.d(sr->id, node->id, "[search %s] [node %s] got listen confirmation",
// sr->id.toString().c_str(), node->toString().c_str(), answer.values.size());
@@ -2424,4 +2431,52 @@ Dht::onAnnounceDone(const Sp<Node>& node, net::RequestAnswer& answer, Sp<Search>
sr->checkAnnounced(answer.vid);
}


void
Dht::saveState(const std::string& path) const
{
std::ofstream file(path);
msgpack::pack(file, exportNodes());
msgpack::pack(file, exportValues());
}

void
Dht::loadState(const std::string& path)
{
DHT_LOG.d("Importing state from %s", path.c_str());
try {
// Import nodes from binary file
msgpack::unpacker pac;
{
// Read whole file
std::ifstream file(path, std::ios::binary|std::ios::ate);
if (!file.is_open()) {
return;
}
auto size = file.tellg();
file.seekg (0, std::ios::beg);
pac.reserve_buffer(size);
file.read (pac.buffer(), size);
pac.buffer_consumed(size);
}
// Import nodes
msgpack::object_handle oh;
if (pac.next(oh)) {
{
auto imported_nodes = oh.get().as<std::vector<NodeExport>>();
DHT_LOG.d("Importing %zu nodes", imported_nodes.size());
for (const auto& node : imported_nodes)
insertNode(node);
}
if (pac.next(oh)) {
auto imported_values = oh.get().as<std::vector<ValuesExport>>();
DHT_LOG.d("Importing %zu values", imported_values.size());
importValues(imported_values);
}
}
} catch (const std::exception& e) {
DHT_LOG.w("Error importing state from %s: %s", path.c_str(), e.what());
}
}

}
@@ -500,6 +500,7 @@ main(int argc, char **argv)
dht::DhtRunner::Config config {};
config.dht_config.node_config.network = params.network;
config.dht_config.node_config.maintain_storage = false;
config.dht_config.node_config.persist_path = params.persist_path;
config.dht_config.id = crt;
config.threaded = true;
config.proxy_server = params.proxyclient;
@@ -103,6 +103,7 @@ struct dht_params {
std::string proxyclient {};
std::string pushserver {};
std::string devicekey {};
std::string persist_path {};
};

static const constexpr struct option long_options[] = {
@@ -114,6 +115,7 @@ static const constexpr struct option long_options[] = {
{"verbose", no_argument , nullptr, 'v'},
{"daemonize", no_argument , nullptr, 'd'},
{"service", no_argument , nullptr, 's'},
{"persist", required_argument, nullptr, 'f'},
{"logfile", required_argument, nullptr, 'l'},
{"syslog", no_argument , nullptr, 'L'},
{"proxyserver",required_argument, nullptr, 'S'},
@@ -127,7 +129,7 @@ dht_params
parseArgs(int argc, char **argv) {
dht_params params;
int opt;
while ((opt = getopt_long(argc, argv, "hidsvp:n:b:l:", long_options, nullptr)) != -1) {
while ((opt = getopt_long(argc, argv, "hidsvp:n:b:f:l:", long_options, nullptr)) != -1) {
switch (opt) {
case 'p': {
int port_arg = atoi(optarg);
@@ -154,6 +156,9 @@ parseArgs(int argc, char **argv) {
case 'D':
params.devicekey = optarg;
break;
case 'f':
params.persist_path = optarg;
break;
case 'n':
params.network = strtoul(optarg, nullptr, 0);
break;

0 comments on commit 04850e1

Please sign in to comment.
You can’t perform that action at this time.