Skip to content

Commit

Permalink
cancel request when receiving token error
Browse files Browse the repository at this point in the history
  • Loading branch information
aberaud committed May 2, 2016
1 parent 995196c commit 20662a9
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 21 deletions.
25 changes: 17 additions & 8 deletions include/opendht/network_engine.h
Expand Up @@ -217,6 +217,13 @@ class NetworkEngine final {
and now - last_try <= Node::MAX_RESPONSE_TIME;
}

void cancel() {
if (not completed) {
cancelled = true;
clear();
}
}

Request() {}

private:
Expand All @@ -227,11 +234,17 @@ class NetworkEngine final {
std::function<void(std::shared_ptr<Request> req_status, bool)> on_expired, bool persistent = false) :
node(node), on_done(on_done), on_expired(on_expired), tid(tid), msg(std::move(msg)), persistent(persistent) { }

void clear() {
on_done = {};
on_expired = {};
msg.clear();
}

std::function<void(std::shared_ptr<Request> req_status, ParsedMessage&&)> on_done {};
std::function<void(std::shared_ptr<Request> req_status, bool)> on_expired {};

const uint16_t tid {0}; /* the request id. */
const Blob msg {}; /* the serialized message. */
Blob msg {}; /* the serialized message. */
const bool persistent {false}; /* the request is not erased upon completion. */
};

Expand All @@ -241,9 +254,7 @@ class NetworkEngine final {
*/
void cancelRequest(std::shared_ptr<Request>& req) {
if (req) {
req->cancelled = true;
req->on_done = {};
req->on_expired = {};
req->cancel();
requests.erase(req->tid);
}
}
Expand Down Expand Up @@ -356,10 +367,8 @@ class NetworkEngine final {
};

void clear() {
for (auto& req : requests) {
req.second->on_expired = {};
req.second->on_done = {};
}
for (auto& req : requests)
req.second->cancel();
requests.clear();
}

Expand Down
14 changes: 6 additions & 8 deletions src/dht.cpp
Expand Up @@ -2524,17 +2524,15 @@ Dht::pingNode(const sockaddr *sa, socklen_t salen)
}

void
Dht::onError(std::shared_ptr<NetworkEngine::Request> status, DhtProtocolException e) {
Dht::onError(std::shared_ptr<NetworkEngine::Request> req, DhtProtocolException e) {
if (e.getCode() == DhtProtocolException::UNAUTHORIZED) {
//TODO
//auto esr = searches.find(status);
//if (esr == searches.end()) return;
network_engine.cancelRequest(req);
unsigned cleared = 0;
for (auto& srp : status->node->ss.ss_family == AF_INET ? searches4 : searches6) {
for (auto& srp : req->node->ss.ss_family == AF_INET ? searches4 : searches6) {
auto& sr = srp.second;
for (auto& n : sr->nodes) {
if (n.node != status->node) continue;
n.getStatus = {};
if (n.node != req->node) continue;
network_engine.cancelRequest(n.getStatus);

This comment has been minimized.

Copy link
@sim590

sim590 May 5, 2016

Contributor

@aberaud: is line 2535 meant to cancel a possible diffrent request than req, in case a new request would have been issued before receiving an error for req? Since I'm working on queries and now keeping track of multiple requests for a searchnode, would you say the desired behavior here would be to cancel all get requests associated to this SearchNode ?

This comment has been minimized.

Copy link
@aberaud

aberaud May 5, 2016

Author Member

Yes, a token is valid for all searches for a given node. If it expires, it should be renewed for all requests.

n.last_get_reply = time_point::min();
cleared++;
if (searchSendGetValues(sr))
Expand All @@ -2543,7 +2541,7 @@ Dht::onError(std::shared_ptr<NetworkEngine::Request> status, DhtProtocolExceptio
}
}
DHT_LOG.WARN("[node %s %s] token flush (%d searches affected)",
status->node->id.toString().c_str(), print_addr((sockaddr*)&status->node->ss, status->node->sslen).c_str(), cleared);
req->node->id.toString().c_str(), print_addr((sockaddr*)&req->node->ss, req->node->sslen).c_str(), cleared);
}
}

Expand Down
11 changes: 6 additions & 5 deletions src/network_engine.cpp
Expand Up @@ -137,6 +137,8 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const sockaddr
if (reqp == requests.end())
throw DhtProtocolException {DhtProtocolException::UNKNOWN_TID, "Can't find transaction", msg.id};
auto req = reqp->second;
if (req->cancelled)
return;

auto node = onNewNode(msg.id, from, fromlen, 2);
onReportedAddr(msg.id, (sockaddr*)&msg.addr.first, msg.addr.second);
Expand All @@ -158,15 +160,14 @@ NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const sockaddr
break;
}
case MessageType::Reply:
if (not reqp->second->persistent or reqp->second->cancelled)
// erase before calling callback to make sure iterator is still valid
if (not req->persistent)
requests.erase(reqp);
req->reply_time = scheduler.time();
req->completed = true;
req->on_done(req, std::move(msg));
if (not req->persistent) {
req->on_done = {};
req->on_expired = {};
}
if (not req->persistent)
req->clear();
break;
default:
break;
Expand Down

0 comments on commit 20662a9

Please sign in to comment.