Skip to content
Permalink
Browse files

proxy client: remove unneeded filter

Filters are applied at the SearchCache level
  • Loading branch information...
aberaud committed Jul 1, 2019
1 parent be0baef commit b64820af593f73e96b0afe19c9341de0125597dc
Showing with 24 additions and 24 deletions.
  1. +1 −1 include/opendht/dht_proxy_client.h
  2. +23 −23 src/dht_proxy_client.cpp
@@ -283,7 +283,7 @@ class OPENDHT_PUBLIC DhtProxyClient final : public DhtInterface {
RESUBSCRIBE,
};
void sendListen(const std::shared_ptr<restbed::Request> &request,
const ValueCallback &, const Value::Filter &filter,
const ValueCallback &,
const Sp<ListenState> &state,
ListenMethod method = ListenMethod::LISTEN);

@@ -45,14 +45,13 @@ struct DhtProxyClient::Listener
{
OpValueCache cache;
ValueCallback cb;
Value::Filter filter;
Sp<restbed::Request> req;
std::thread thread;
unsigned callbackId;
Sp<ListenState> state;
Sp<Scheduler::Job> refreshJob;
Listener(OpValueCache&& c, const Sp<restbed::Request>& r, Value::Filter&& f)
: cache(std::move(c)), filter(std::move(f)),req(r) {}
Listener(OpValueCache&& c, const Sp<restbed::Request>& r)
: cache(std::move(c)), req(r) {}
};

struct PermanentPut {
@@ -690,50 +689,51 @@ size_t
DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filter, Where where) {
DHT_LOG.d(key, "[search %s]: listen", key.to_c_str());
auto& search = searches_[key];
auto query = std::make_shared<Query>(Select{}, where);
auto token = search.ops.listen(cb, query, filter, [this, key, filter](Sp<Query> /*q*/, ValueCallback cb, SyncCallback /*scb*/) -> size_t {
auto query = std::make_shared<Query>(Select{}, std::move(where));
auto token = search.ops.listen(cb, query, filter, [this, key](Sp<Query> /*q*/, ValueCallback cb, SyncCallback /*scb*/) -> size_t {
scheduler.syncTime();
restbed::Uri uri(serverHost_ + "/" + key.toString());
std::lock_guard<std::mutex> lock(searchLock_);
// Find search
auto search = searches_.find(key);
if (search == searches_.end()) {
DHT_LOG.e(key, "[search %s] listen: search not found", key.to_c_str());
return 0;
}
DHT_LOG.d(key, "[search %s] sending %s", key.to_c_str(), deviceKey_.empty() ? "listen" : "subscribe");

// Add listener
auto req = std::make_shared<restbed::Request>(uri);
auto token = ++listenerToken_;
auto l = search->second.listeners.find(token);
if (l == search->second.listeners.end()) {
auto f = filter;
l = search->second.listeners.emplace(std::piecewise_construct,
std::forward_as_tuple(token),
std::forward_as_tuple(std::move(cb), req, std::move(f))).first;
std::forward_as_tuple(std::move(cb), req)).first;
} else {
if (l->second.state)
l->second.state->cancel = true;
l->second.req = req;
}

// Add callback
auto state = std::make_shared<ListenState>();
l->second.state = state;
l->second.cb = [this,key,token,state](const std::vector<Sp<Value>>& values, bool expired) {
if (state->cancel)
return false;
std::lock_guard<std::mutex> lock(searchLock_);
auto s = searches_.find(key);
if (s == searches_.end()) {
return false;
}
auto l = s->second.listeners.find(token);
if (l == s->second.listeners.end()) {
return false;
if (s != searches_.end()) {
auto l = s->second.listeners.find(token);
if (l != s->second.listeners.end()) {
return l->second.cache.onValue(values, expired);
}
}
return l->second.cache.onValue(values, expired);
return false;
};

auto vcb = l->second.cb;
l->second.req = req;

if (not deviceKey_.empty()) {
// Relaunch push listeners even if a timeout is not received (if the proxy crash for any reason)
@@ -750,8 +750,10 @@ DhtProxyClient::listen(const InfoHash& key, ValueCallback cb, Value::Filter filt
}
});
}
l->second.thread = std::thread([this, req, vcb, filter, state]() {
sendListen(req, vcb, filter, state,

// Send listen to servers
l->second.thread = std::thread([this, req, vcb, state]() {
sendListen(req, vcb, state,
deviceKey_.empty() ? ListenMethod::LISTEN : ListenMethod::SUBSCRIBE);
});
return token;
@@ -788,7 +790,6 @@ DhtProxyClient::cancelListen(const InfoHash& key, size_t gtoken) {

void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req,
const ValueCallback &cb,
const Value::Filter &filter,
const Sp<ListenState> &state,
ListenMethod method) {
auto settings = std::make_shared<restbed::Settings>();
@@ -805,7 +806,7 @@ void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req,
fillBody(req, method == ListenMethod::RESUBSCRIBE);
#endif
restbed::Http::async(req,
[this, filter, cb, state](const std::shared_ptr<restbed::Request>& req,
[this, cb, state](const std::shared_ptr<restbed::Request>& req,
const std::shared_ptr<restbed::Response>& reply)
{
auto code = reply->get_status_code();
@@ -830,7 +831,7 @@ void DhtProxyClient::sendListen(const std::shared_ptr<restbed::Request> &req,
}
auto expired = json.get("expired", Json::Value(false)).asBool();
auto value = std::make_shared<Value>(json);
if ((not filter or filter(*value)) and cb) {
if (cb) {
{
std::lock_guard<std::mutex> lock(lockCallbacks);
callbacks_.emplace_back([cb, value, state, expired]() {
@@ -998,14 +999,13 @@ DhtProxyClient::restartListeners()
// Redo listen
state->cancel = false;
state->ok = true;
auto filter = listener.filter;
auto cb = listener.cb;
restbed::Uri uri(serverHost_ + "/" + search.first.toString());
auto req = std::make_shared<restbed::Request>(uri);
req->set_method("LISTEN");
listener.req = req;
listener.thread = std::thread([this, req, cb, filter, state]() {
sendListen(req, cb, filter, state);
listener.thread = std::thread([this, req, cb, state]() {
sendListen(req, cb, state);
});
}
}

0 comments on commit b64820a

Please sign in to comment.
You can’t perform that action at this time.