Skip to content

Commit

Permalink
dhtproxy: remove sendSubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
AmarOk1412 authored and aberaud committed Jan 14, 2019
1 parent 70f09de commit 3a3933e
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 96 deletions.
3 changes: 1 addition & 2 deletions include/opendht/dht_proxy_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,7 @@ class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
bool doCancelListen(const InfoHash& key, size_t token);

struct ListenState;
void sendListen(const std::shared_ptr<restbed::Request>& request, const ValueCallback&, const Value::Filter& filter, const Sp<ListenState>& state);
void sendSubscribe(const std::shared_ptr<restbed::Request>& request, const ValueCallback&, const Value::Filter& filter, const Sp<ListenState>& state);
void sendListen(const std::shared_ptr<restbed::Request>& request, const ValueCallback&, const Value::Filter& filter, const Sp<ListenState>& state, bool usePush = false);

void doPut(const InfoHash&, Sp<Value>, DoneCallback, time_point created, bool permanent);

Expand Down
109 changes: 19 additions & 90 deletions src/dht_proxy_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -694,12 +694,19 @@ DhtProxyClient::cancelListen(const InfoHash& key, size_t gtoken) {


void
DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request>& req, const ValueCallback& cb, const Value::Filter& filter, const Sp<ListenState>& state) {
DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request>& req, const ValueCallback& cb, const Value::Filter& filter, const Sp<ListenState>& state, bool usePush) {
auto settings = std::make_shared<restbed::Settings>();
std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds.
req->set_method("LISTEN");
if (usePush) {
req->set_method("SUBSCRIBE");
} else {
std::chrono::milliseconds timeout(std::numeric_limits<int>::max());
settings->set_connection_timeout(timeout); // Avoid the client to close the socket after 5 seconds.
req->set_method("LISTEN");
}
try {
#if OPENDHT_PUSH_NOTIFICATIONS
if (usePush) fillBody(req);
#endif
restbed::Http::async(req,
[this, filter, cb, state](const std::shared_ptr<restbed::Request>& req,
const std::shared_ptr<restbed::Response>& reply) {
Expand All @@ -714,84 +721,13 @@ DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request>& req, const V
reply->get_body(body);
reply->set_body(""); // Reset the body for the next fetch

Json::Value json;
std::string err;
Json::CharReaderBuilder rbuilder;
auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
if (reader->parse(body.data(), body.data() + body.size(), &json, &err)) {
auto expired = json.get("expired", Json::Value(false)).asBool();
auto value = std::make_shared<Value>(json);
if ((not filter or filter(*value)) and cb) {
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([cb, value, state, expired]() {
if (not state->cancel and not cb({value}, expired))
state->cancel = true;
});
loopSignal_();
}
}
}
} catch (const std::exception& e) {
if (not state->cancel) {
DHT_LOG.w("Listen closed by the proxy server: %s", e.what());
state->ok = false;
}
}
} else {
state->ok = false;
}
}, settings).get();
} catch (const std::exception& e) {
state->ok = false;
}
auto& s = *state;
if (not s.ok and not s.cancel)
opFailed();
}

void
DhtProxyClient::sendSubscribe(const std::shared_ptr<restbed::Request>& req, const ValueCallback& cb, const Value::Filter& filter, const Sp<ListenState>& state) {
#if OPENDHT_PUSH_NOTIFICATIONS
req->set_method("SUBSCRIBE");
try {
fillBody(req);
restbed::Http::async(req, [this, state, cb, filter, req](const std::shared_ptr<restbed::Request>&,
const std::shared_ptr<restbed::Response>& reply) {

auto code = reply->get_status_code();
if (code == 200) {
try {
DHT_LOG.e("################1.");
while (restbed::Http::is_open(req) and not state->cancel) {
DHT_LOG.e("################2.");
restbed::Http::fetch("\n", reply);
DHT_LOG.e("################3.");
auto code = reply->get_status_code();
DHT_LOG.e("################4.");
state->ok = code == 200;
if (state->cancel) {
DHT_LOG.e("################5.");

break;
}
std::string body;
reply->get_body(body);
reply->set_body(""); // Reset the body for the next fetch
DHT_LOG.e("###BODY: %s\n", body.c_str());;
if (body.empty() || body == "{}") {
DHT_LOG.e("################6.");
break;

}
DHT_LOG.e("################7.");

Json::Value json;
std::string err;
Json::CharReaderBuilder rbuilder;
auto reader = std::unique_ptr<Json::CharReader>(rbuilder.newCharReader());
if (reader->parse(body.data(), body.data() + body.size(), &json, &err)) {
if (json.size() == 0) {
DHT_LOG.e("################*.");
// Empty value, it's the end
break;
}
auto expired = json.get("expired", Json::Value(false)).asBool();
Expand All @@ -808,21 +744,20 @@ DhtProxyClient::sendSubscribe(const std::shared_ptr<restbed::Request>& req, cons
}
} catch (const std::exception& e) {
if (not state->cancel) {
DHT_LOG.e("sendSubscribe: error: %s", e.what());
DHT_LOG.w("Listen closed by the proxy server: %s", e.what());
state->ok = false;
}
}
} else {
state->ok = false;
}
}).get();
} catch(const std::exception& e) {
}, settings).get();
} catch (const std::exception& e) {
state->ok = false;
}
auto& s = *state;
if (not s.ok and not s.cancel)
opFailed();
#endif
}

size_t
Expand Down Expand Up @@ -907,16 +842,10 @@ DhtProxyClient::doListen(const InfoHash& key, ValueCallback cb, Value::Filter fi
}
}
});
auto vcb = l->second.cb;
auto filter = l->second.filter;
l->second.thread = std::thread([this, req, vcb, filter, state](){
sendSubscribe(req, vcb, filter, state);
});
} else {
l->second.thread = std::thread([this, req, vcb, filter, state]{
sendListen(req, vcb, filter, state);
});
}
l->second.thread = std::thread([this, req, vcb, filter, state](){
sendListen(req, vcb, filter, state, !deviceKey_.empty());
});
return token;
}

Expand Down Expand Up @@ -1128,7 +1057,7 @@ DhtProxyClient::resubscribe(const InfoHash& key, Listener& listener)
auto vcb = listener.cb;
auto filter = listener.filter;
listener.thread = std::thread([this, req, vcb, filter, state]() {
sendSubscribe(req, vcb, filter, state);
sendListen(req, vcb, filter, state, true);
});
#endif
}
Expand Down
7 changes: 3 additions & 4 deletions src/dht_proxy_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ DhtProxyServer::get(const Sp<restbed::Session>& session) const
Json::StreamWriterBuilder wbuilder;
wbuilder["commentStyle"] = "None";
wbuilder["indentation"] = "";
auto output = Json::writeString(wbuilder, value->toJson()) + "\n";
auto output = Json::writeString(wbuilder, value->toJson()) + "\n";
s->yield(output, [](const Sp<restbed::Session>& /*session*/){ });
return true;
}, [s](bool /*ok* */) {
Expand Down Expand Up @@ -428,6 +428,7 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session)
if (listener.clientId == clientId) {
scheduler_.edit(listener.expireJob, timeout);
scheduler_.edit(listener.expireNotifyJob, timeout - proxy::OP_MARGIN);
s->yield(restbed::OK);
dht_->get(infoHash,
[this, s](const Sp<Value>& value) {
if (s->is_closed()) return false;
Expand All @@ -441,8 +442,7 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session)
}, [s](bool /*ok* */) {
// Communication is finished
if (not s->is_closed()) {
std::cout << "X: " << std::endl;
s->close();
s->close("{}\n");
}
});
schedulerCv_.notify_one();
Expand Down Expand Up @@ -483,7 +483,6 @@ DhtProxyServer::subscribe(const std::shared_ptr<restbed::Session>& session)
);
}
schedulerCv_.notify_one();
std::cout << "CLOSE: " << std::endl;
s->close(restbed::OK, "{}\n");
} catch (...) {
s->close(restbed::INTERNAL_SERVER_ERROR, "{\"err\":\"Internal server error\"}");
Expand Down

0 comments on commit 3a3933e

Please sign in to comment.