Permalink
Browse files

proxy: only get old values at first subscribe

  • Loading branch information...
AmarOk1412 authored and aberaud committed Jan 17, 2019
1 parent d523e9f commit 75a14d360892b0267bc980deac8b62b8f868feaa
Showing with 68 additions and 35 deletions.
  1. +10 −2 include/opendht/dht_proxy_client.h
  2. +27 −17 src/dht_proxy_client.cpp
  3. +31 −16 src/dht_proxy_server.cpp
@@ -280,7 +280,15 @@ class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
bool doCancelListen(const InfoHash& key, size_t token);

struct ListenState;
void sendListen(const std::shared_ptr<restbed::Request>& request, const ValueCallback&, const Value::Filter& filter, const Sp<ListenState>& state, bool usePush = false);
enum class ListenMethod {
LISTEN,
SUBSCRIBE,
RESUBSCRIBE,
};
void sendListen(const std::shared_ptr<restbed::Request> &request,
const ValueCallback &, const Value::Filter &filter,
const Sp<ListenState> &state,
ListenMethod method = ListenMethod::LISTEN);

void doPut(const InfoHash&, Sp<Value>, DoneCallback, time_point created, bool permanent);

@@ -372,7 +380,7 @@ class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
const std::function<void()> loopSignal_;

#if OPENDHT_PUSH_NOTIFICATIONS
void fillBody(std::shared_ptr<restbed::Request> request);
void fillBody(std::shared_ptr<restbed::Request> request, bool resubscribe);
void getPushRequest(Json::Value&) const;
#endif // OPENDHT_PUSH_NOTIFICATIONS

@@ -692,20 +692,24 @@ DhtProxyClient::cancelListen(const InfoHash& key, size_t gtoken) {
return canceled;
}


void
DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request>& req, const ValueCallback& cb, const Value::Filter& filter, const Sp<ListenState>& state, bool usePush) {
auto settings = std::make_shared<restbed::Settings>();
if (usePush) {
req->set_method("SUBSCRIBE");
} else {
std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds.
req->set_method("LISTEN");
}
try {
void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req,
const ValueCallback &cb,
const Value::Filter &filter,
const Sp<ListenState> &state,
ListenMethod method) {
auto settings = std::make_shared<restbed::Settings>();
if (method != ListenMethod::LISTEN) {
req->set_method("SUBSCRIBE");
} else {
std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
settings->set_connection_timeout(
timeout); // Avoid the client to close the socket after 5 seconds.
req->set_method("LISTEN");
}
try {
#if OPENDHT_PUSH_NOTIFICATIONS
if (usePush) fillBody(req);
if (method != ListenMethod::LISTEN)
fillBody(req, method == ListenMethod::RESUBSCRIBE);
#endif
restbed::Http::async(req,
[this, filter, cb, state](const std::shared_ptr<restbed::Request>& req,
@@ -843,8 +847,10 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi
}
});
}
l->second.thread = std::thread([this, req, vcb, filter, state](){
sendListen(req, vcb, filter, state, !deviceKey_.empty());
l->second.thread = std::thread([this, req, vcb, filter, state]() {
sendListen(req, vcb, filter, state,
deviceKey_.empty() ? ListenMethod::LISTEN
: ListenMethod::SUBSCRIBE);
});
return token;
}
@@ -1057,7 +1063,7 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener)
auto vcb = listener.cb;
auto filter = listener.filter;
listener.thread = std::thread([this, req, vcb, filter, state]() {
sendListen(req, vcb, filter, state, true);
sendListen(req, vcb, filter, state, ListenMethod::RESUBSCRIBE);
});
#endif
}
@@ -1077,14 +1083,18 @@ DhtProxyClient::getPushRequest(Json::Value& body) const
}

void
DhtProxyClient::fillBody(std::shared_ptr<restbed::Request> req)
DhtProxyClient::fillBody(std::shared_ptr<restbed::Request> req, bool resubscribe)
{
// Fill body with
// {
// "key":"device_key",
// }
Json::Value body;
getPushRequest(body);
if (!resubscribe) {
// This is the first listen, we want to retrieve previous values.
body["previous_values"] = true;
}
Json::StreamWriterBuilder wbuilder;
wbuilder["commentStyle"] = "None";
wbuilder["indentation"] = "";
@@ -429,22 +429,37 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session)
scheduler_.edit(listener.expireJob, timeout);
scheduler_.edit(listener.expireNotifyJob, timeout - proxy::OP_MARGIN);
s->yield(restbed::OK);
dht_->get(infoHash,
[this, s](const Sp<Value>& value) {
if (s->is_closed()) return false;
// Send values as soon as we get them
Json::StreamWriterBuilder wbuilder;
wbuilder["commentStyle"] = "None";
wbuilder["indentation"] = "";
auto output = Json::writeString(wbuilder, value->toJson()) + "\n";
s->yield(output, [](const Sp<restbed::Session>& /*session*/){ });
return true;
}, [s](bool /*ok* */) {
// Communication is finished
if (not s->is_closed()) {
s->close("{}\n");
}
});

if (root.isMember("previous_values") &&
root["previous_values"].asBool()) {
dht_->get(
infoHash,
[this, s](const Sp<Value> &value) {
if (s->is_closed())
return false;
// Send values as soon as we get them
Json::StreamWriterBuilder wbuilder;
wbuilder["commentStyle"] = "None";
wbuilder["indentation"] = "";
auto output = Json::writeString(
wbuilder, value->toJson()) +
"\n";
s->yield(output, [](const Sp<restbed::Session>
& /*session*/) {});
return true;
},
[s](bool /*ok* */) {
// Communication is finished
if (not s->is_closed()) {
s->close("{}\n");
}
});
} else {
// Communication is finished
if (not s->is_closed()) {
s->close("{}\n");
}
}
schedulerCv_.notify_one();
return;
}

0 comments on commit 75a14d3

Please sign in to comment.