Skip to content

Commit

Permalink
network: add signaling for expired, refreshed values
Browse files Browse the repository at this point in the history
  • Loading branch information
aberaud committed Mar 15, 2018
1 parent 80b944d commit 1617c79
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 21 deletions.
7 changes: 6 additions & 1 deletion include/opendht/network_engine.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2014-2017 Savoir-faire Linux Inc.
* Copyright (C) 2014-2018 Savoir-faire Linux Inc.
* Author(s) : Adrien Béraud <adrien.beraud@savoirfairelinux.com>
* Simon Désaulniers <simon.desaulniers@savoirfairelinux.com>
*
Expand Down Expand Up @@ -87,6 +87,8 @@ struct RequestAnswer {
Blob ntoken {};
Value::Id vid {};
std::vector<Sp<Value>> values {};
std::vector<Value::Id> refreshed_values {};
std::vector<Value::Id> expired_values {};
std::vector<Sp<FieldValueIndex>> fields {};
std::vector<Sp<Node>> nodes4 {};
std::vector<Sp<Node>> nodes6 {};
Expand Down Expand Up @@ -235,6 +237,9 @@ class NetworkEngine final
std::vector<Sp<Node>>&& nodes, std::vector<Sp<Node>>&& nodes6,
std::vector<Sp<Value>>&& values, const Query& q);

void tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values);
void tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash& hash, const Blob& ntoken, const std::vector<Value::Id>& values);

bool isRunning(sa_family_t af) const;
inline want_t want () const { return dht_socket >= 0 && dht_socket6 >= 0 ? (WANT4 | WANT6) : -1; }

Expand Down
37 changes: 31 additions & 6 deletions src/dht.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1242,11 +1242,33 @@ Dht::storageAddListener(const InfoHash& id, const Sp<Node>& node, size_t socket_
void
Dht::expireStore(decltype(store)::iterator i)
{
auto stats = i->second.expire(i->first, scheduler.time());
total_store_size += stats.size_diff;
total_values += stats.values_diff;
if (stats.values_diff) {
DHT_LOG.d(i->first, "[store %s] discarded %ld expired values (%ld bytes)", i->first.toString().c_str(), -stats.values_diff, -stats.size_diff);
const auto& id = i->first;
auto& st = i->second;
auto stats = st.expire(id, scheduler.time());
total_store_size += stats.first;
total_values -= stats.second.size();
if (not stats.second.empty()) {
DHT_LOG.d(id, "[store %s] discarded %ld expired values (%ld bytes)",
id.toString().c_str(), stats.second.size(), -stats.first);

if (not st.listeners.empty()) {
DHT_LOG.d(id, "[store %s] %lu remote listeners", id.toString().c_str(), st.listeners.size());

std::vector<Value::Id> ids;
ids.reserve(stats.second.size());
for (const auto& v : stats.second)
ids.emplace_back(v->id);

for (const auto& node_listeners : st.listeners) {
for (const auto& l : node_listeners.second) {
DHT_LOG.w(id, node_listeners.first->id, "[store %s] [node %s] sending expired",
id.toString().c_str(),
node_listeners.first->toString().c_str());
Blob ntoken = makeToken(node_listeners.first->getAddr(), false);
network_engine.tellListenerExpired(node_listeners.first, l.first, id, ntoken, ids);
}
}
}
}
}

Expand Down Expand Up @@ -1872,7 +1894,7 @@ Dht::periodic(const uint8_t *buf, size_t buflen, const SockAddr& from)
try {
network_engine.processMessage(buf, buflen, from);
} catch (const std::exception& e) {
DHT_LOG.e("Can't parse message from %s: %s", from.toString().c_str(), e.what());
DHT_LOG.e("Can't process message from %s: %s", from.toString().c_str(), e.what());
}
}
return scheduler.run();
Expand Down Expand Up @@ -2182,6 +2204,9 @@ void Dht::onGetValuesDone(const Sp<Node>& node,
}
for (auto& l : tmp_lists)
l.first(l.second);
} else if (not a.expired_values.empty()) {
DHT_LOG.w(sr->id, node->id, "[search %s] [node %s] %u expired values",
sr->id.toString().c_str(), node->toString().c_str(), a.expired_values.size());
}
} else {
DHT_LOG.w(sr->id, "[node %s] no token provided. Ignoring response content.", node->toString().c_str());
Expand Down
87 changes: 77 additions & 10 deletions src/network_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,22 @@ serializeValues(const std::vector<Sp<Value>>& st)
return svals;
}

void
packToken(msgpack::packer<msgpack::sbuffer>& pk, const Blob& token)
{
pk.pack_bin(token.size());
pk.pack_bin_body((char*)token.data(), token.size());
}

RequestAnswer::RequestAnswer(ParsedMessage&& msg)
: ntoken(std::move(msg.token)), values(std::move(msg.values)), fields(std::move(msg.fields)),
nodes4(std::move(msg.nodes4)), nodes6(std::move(msg.nodes6)) {}
: ntoken(std::move(msg.token)),
values(std::move(msg.values)),
refreshed_values(std::move(msg.refreshed_values)),
expired_values(std::move(msg.expired_values)),
fields(std::move(msg.fields)),
nodes4(std::move(msg.nodes4)),
nodes6(std::move(msg.nodes6))
{}

NetworkEngine::NetworkEngine(Logger& log, Scheduler& scheduler, const int& s, const int& s6)
: myid(zeroes), DHT_LOG(log), scheduler(scheduler), dht_socket(s), dht_socket6(s6)
Expand Down Expand Up @@ -171,6 +184,67 @@ NetworkEngine::tellListener(Sp<Node> node, Tid socket_id, const InfoHash& hash,
}
}

void
NetworkEngine::tellListenerRefreshed(Sp<Node> n, Tid socket_id, const InfoHash& hash, const Blob& token, const std::vector<Value::Id>& values)
{
msgpack::sbuffer buffer;
msgpack::packer<msgpack::sbuffer> pk(&buffer);
pk.pack_map(4+(network?1:0));

pk.pack(std::string("u"));
pk.pack_map(1 + (not values.empty()?1:0) + (not token.empty()?1:0));
pk.pack(std::string("id")); pk.pack(myid);
if (not token.empty()) {
pk.pack(std::string("token")); packToken(pk, token);
}
if (not values.empty()) {
pk.pack(std::string("re"));
pk.pack(values);
DHT_LOG.d(n->id, "[node %s] sending %zu refreshed values", n->toString().c_str(), values.size());
}

pk.pack(std::string("t")); pk.pack(socket_id);
pk.pack(std::string("y")); pk.pack(std::string("r"));
pk.pack(std::string("v")); pk.pack(my_v);
if (network) {
pk.pack(std::string("n")); pk.pack(network);
}

// send response
send(buffer.data(), buffer.size(), 0, n->getAddr());
}

void
NetworkEngine::tellListenerExpired(Sp<Node> n, Tid socket_id, const InfoHash& hash, const Blob& token, const std::vector<Value::Id>& values)
{
msgpack::sbuffer buffer;
msgpack::packer<msgpack::sbuffer> pk(&buffer);
pk.pack_map(4+(network?1:0));

pk.pack(std::string("u"));
pk.pack_map(1 + (not values.empty()?1:0) + (not token.empty()?1:0));
pk.pack(std::string("id")); pk.pack(myid);
if (not token.empty()) {
pk.pack(std::string("token")); packToken(pk, token);
}
if (not values.empty()) {
pk.pack(std::string("exp"));
pk.pack(values);
DHT_LOG.d(n->id, "[node %s] sending %zu expired values", n->toString().c_str(), values.size());
}

pk.pack(std::string("t")); pk.pack(socket_id);
pk.pack(std::string("y")); pk.pack(std::string("r"));
pk.pack(std::string("v")); pk.pack(my_v);
if (network) {
pk.pack(std::string("n")); pk.pack(network);
}

// send response
send(buffer.data(), buffer.size(), 0, n->getAddr());
}


bool
NetworkEngine::isRunning(sa_family_t af) const
{
Expand Down Expand Up @@ -337,7 +411,7 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const SockAddr&
msgpack::unpacked msg_res = msgpack::unpack((const char*)buf, buflen);
msg->msgpack_unpack(msg_res.get());
} catch (const std::exception& e) {
DHT_LOG.w("Can't process message of size %lu: %s", buflen, e.what());
DHT_LOG.w("Can't parse message of size %lu: %s", buflen, e.what());
DHT_LOG.DEBUG.logPrintable(buf, buflen);
return;
}
Expand Down Expand Up @@ -550,13 +624,6 @@ NetworkEngine::process(std::unique_ptr<ParsedMessage>&& msg, const SockAddr& fro
}
}

void
packToken(msgpack::packer<msgpack::sbuffer>& pk, const Blob& token)
{
pk.pack_bin(token.size());
pk.pack_bin_body((char*)token.data(), token.size());
}

void
insertAddr(msgpack::packer<msgpack::sbuffer>& pk, const SockAddr& addr)
{
Expand Down
6 changes: 6 additions & 0 deletions src/parsed_message.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ struct ParsedMessage {
std::vector<Sp<Node>> nodes4, nodes6;
/* values to store or retreive request */
std::vector<Sp<Value>> values;
std::vector<Value::Id> refreshed_values {};
std::vector<Value::Id> expired_values {};
/* index for fields values */
std::vector<Sp<FieldValueIndex>> fields;
/** When part of the message header: {index -> (total size, {})}
Expand Down Expand Up @@ -282,6 +284,10 @@ ParsedMessage::msgpack_unpack(msgpack::object msg)
} else {
throw msgpack::type_error();
}
} else if (auto raw_fields = findMapValue(req, "exp")) {
expired_values = raw_fields->as<decltype(expired_values)>();
} else if (auto raw_fields = findMapValue(req, "re")) {
refreshed_values = raw_fields->as<decltype(refreshed_values)>();
}

if (auto w = findMapValue(req, "w")) {
Expand Down
10 changes: 6 additions & 4 deletions src/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ struct Storage {

StoreDiff remove(const InfoHash& id, Value::Id);

StoreDiff expire(const InfoHash& id, time_point now);
std::pair<ssize_t, std::vector<Sp<Value>>> expire(const InfoHash& id, time_point now);

private:
Storage(const Storage&) = delete;
Expand Down Expand Up @@ -245,7 +245,7 @@ Storage::clear()
return {-tot_size, -num_values, 0};
}

Storage::StoreDiff
std::pair<ssize_t, std::vector<Sp<Value>>>
Storage::expire(const InfoHash& id, time_point now)
{
// expire listeners
Expand All @@ -271,16 +271,18 @@ Storage::expire(const InfoHash& id, time_point now)
auto r = std::partition(values.begin(), values.end(), [&](const ValueStorage& v) {
return v.expiration > now;
});
ssize_t del_num = -std::distance(r, values.end());
std::vector<Sp<Value>> ret;
ret.reserve(std::distance(r, values.end()));
ssize_t size_diff {};
std::for_each(r, values.end(), [&](const ValueStorage& v) {
size_diff -= v.data->size();
if (v.store_bucket)
v.store_bucket->erase(id, *v.data, v.expiration);
ret.emplace_back(std::move(v.data));
});
total_size += size_diff;
values.erase(r, values.end());
return {size_diff, del_num, del_listen};
return {size_diff, std::move(ret)};
}

}

0 comments on commit 1617c79

Please sign in to comment.