Skip to content

Commit

Permalink
Bookkeeping for reachability of nodes; reporting them to Lokid
Browse files Browse the repository at this point in the history
  • Loading branch information
msgmaxim committed Sep 2, 2019
1 parent d05e65e commit effcd7e
Show file tree
Hide file tree
Showing 12 changed files with 304 additions and 17 deletions.
13 changes: 9 additions & 4 deletions common/include/loki_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,24 @@

#include <boost/optional.hpp>

// TODO: this should be a proper struct w/o heap allocation!
using sn_pub_key_t = std::string;

struct sn_record_t {

// our 32 byte pub keys should always be 52 bytes long in base32z
static constexpr size_t BASE_LEN = 52;

private:
uint16_t port_;
std::string sn_address_; // Snode address
std::string pub_key_;
std::string sn_address_; // Snode address (pubkey plus .snode)
std::string pub_key_; // base32z
std::string pub_key_hex_;
std::string ip_; // Snode ip
public:
sn_record_t(uint16_t port, const std::string& address,
const std::string& ip)
: port_(port), ip_(ip) {
const std::string& pk_hex, const std::string& ip)
: port_(port), pub_key_hex_(pk_hex), ip_(ip) {
set_address(address);
}

Expand All @@ -45,6 +49,7 @@ struct sn_record_t {
uint16_t port() const { return port_; }
const std::string& sn_address() const { return sn_address_; }
const std::string& pub_key() const { return pub_key_; }
const std::string& pub_key_hex() const { return pub_key_hex_; }
const std::string& ip() const { return ip_; }

template <typename OStream>
Expand Down
2 changes: 2 additions & 0 deletions httpserver/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ set(HEADER_FILES
command_line.h
net_stats.h
dns_text_records.h
reachability_testing.h
)

set(SRC_FILES
Expand All @@ -32,6 +33,7 @@ set(SRC_FILES
security.cpp
command_line.cpp
dns_text_records.cpp
reachability_testing.cpp
)

add_library(httpserver_lib STATIC ${HEADER_FILES} ${SRC_FILES})
Expand Down
2 changes: 1 addition & 1 deletion httpserver/dns_text_records.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ void check_latest_version() {
} else {
LOKI_LOG(debug,
"You are using the latest version of the storage server ({})",
latest_version_str);
STORAGE_SERVER_VERSION_STRING);
}
}

Expand Down
2 changes: 2 additions & 0 deletions httpserver/https_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ void make_https_request(boost::asio::io_context& ioc,
if (sn_address == "0.0.0.0") {
LOKI_LOG(warn, "Could not initiate request to snode (we don't know "
"their IP yet).");

cb(sn_response_t{SNodeError::NO_REACH, nullptr});
return;
}

Expand Down
92 changes: 92 additions & 0 deletions httpserver/reachability_testing.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@

#include "loki_logger.h"
#include "reachability_testing.h"

using std::chrono::steady_clock;
using namespace std::chrono_literals;

namespace loki {

namespace detail {

/// Creates a record for a node that has just failed a reachability test:
/// both timestamps start out as the same steady-clock reading.
/// (Relies on `first_failure` being declared before `last_tested`.)
reach_record_t::reach_record_t()
    : first_failure(steady_clock::now()), last_tested(first_failure) {}

} // namespace detail

/// How long a node must stay unreachable (measured from its first failed
/// test) before it is reported to Lokid
constexpr std::chrono::minutes UNREACH_GRACE_PERIOD = 120min;

bool reachability_records_t::record_unreachable(const sn_pub_key_t& sn) {

auto it = offline_nodes_.find(sn);

if (it == offline_nodes_.end()) {
LOKI_LOG(info, "adding a new node to UNREACHABLE: {}", sn);
offline_nodes_.insert({sn, {}});
} else {
LOKI_LOG(info, "node is ALREAY known to be UNREACHABLE: {}", sn);

it->second.last_tested = steady_clock::now();

const auto elapsed = it->second.last_tested - it->second.first_failure;
const auto elapsed_sec =
std::chrono::duration_cast<std::chrono::seconds>(elapsed).count();
LOKI_LOG(info, " - first time failed {} seconds ago", elapsed_sec);

/// TODO: Might still want to report as unreachable since this status
/// gets reset to `true` on Lokid restart
if (elapsed > UNREACH_GRACE_PERIOD && !it->second.reported) {
LOKI_LOG(warn, " - will REPORT this node to Lokid!");
return true;
} else {
if (it->second.reported) {
LOKI_LOG(warn, " - Already reported node: {}", sn);
}
}
}

return false;
}

/// Marks `sn` as reachable again by dropping any record of earlier failed
/// tests. Returns `true` if such a record existed (and was removed),
/// `false` if the node was not being tracked.
bool reachability_records_t::record_reachable(const sn_pub_key_t& sn) {
    // Previously this function flowed off its end without returning a
    // value, which is undefined behaviour for a non-void function.
    return expire(sn);
}

/// Removes the bookkeeping entry for `sn`, if there is one.
/// Returns `true` if an entry was removed, `false` if there was none.
bool reachability_records_t::expire(const sn_pub_key_t& sn) {

    // `erase` by key returns the number of elements removed (0 or 1 here)
    const bool erased = offline_nodes_.erase(sn) > 0;

    if (erased) {
        LOKI_LOG(warn, " - removed entry for {}", sn);
    }

    return erased;
}

/// Flags the record for `sn` as already reported to Lokid.
/// Does nothing if `sn` is not currently tracked as offline.
void reachability_records_t::set_reported(const sn_pub_key_t& sn) {

    const auto entry = offline_nodes_.find(sn);

    if (entry == offline_nodes_.end()) {
        return;
    }

    entry->second.reported = true;
}

/// Picks the tracked offline node whose `last_tested` timestamp is the
/// oldest. Returns `boost::none` when no offline nodes are tracked.
boost::optional<sn_pub_key_t> reachability_records_t::next_to_test() {

    if (offline_nodes_.empty()) {
        return boost::none;
    }

    // Non-empty range, so the result is always a valid element
    const auto least_recent = std::min_element(
        offline_nodes_.begin(), offline_nodes_.end(),
        [](const auto& a, const auto& b) {
            return a.second.last_tested < b.second.last_tested;
        });

    LOKI_LOG(warn, "~~~ Selecting to be re-tested: {}", least_recent->first);

    return least_recent->first;
}

} // namespace loki
59 changes: 59 additions & 0 deletions httpserver/reachability_testing.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#pragma once

#include "loki_common.h"
#include <unordered_map>
#include <chrono>

namespace loki {

namespace detail {

/// TODO: make this class "private"?
/// Reachability bookkeeping for a single node that failed a test.
class reach_record_t {

    using time_point_t = std::chrono::steady_clock::time_point;

  public:
    /// The time the node failed for the first time
    /// (and hasn't come back online)
    time_point_t first_failure;
    /// The time of the most recent test of this node
    time_point_t last_tested;
    /// Whether the node has been reported to Lokid
    bool reported = false;

    /// Defined in reachability_testing.cpp
    reach_record_t();
};
} // namespace detail

// Keeps track of nodes that failed our reachability tests, so that they can
// be re-tested with priority and eventually reported to Lokid.
class reachability_records_t {

// TODO: sn_records are heavy (3 strings), so how about we only store the
// pubkey?

// Nodes that failed the reachability test, keyed by their pubkey.
// Note: I don't expect this list to be large, so a flat `std::vector`
// would probably be faster than the node-based map used here
std::unordered_map<sn_pub_key_t, detail::reach_record_t> offline_nodes_;

public:
// (Not implemented yet)
// Return nodes that should be tested first: decommissioned nodes
// and those that failed our earlier tests (but not reported yet)
// std::vector<> priority_nodes() const;

// Records node as unreachable, return `true` if the node should be
// reported to Lokid as being unreachable for a long time.
// The first failure only creates a record (returns `false`); later
// failures refresh the record's last-tested time.
bool record_unreachable(const sn_pub_key_t& sn);

// Records node as reachable again; forwards to `expire`.
// Return value: presumably whether a record was removed -- TODO confirm
// against the definition
bool record_reachable(const sn_pub_key_t& sn);

// Removes the record for `sn` (if any) from the offline nodes.
// Return value: presumably whether a record was removed -- TODO confirm
// against the definition
bool expire(const sn_pub_key_t& sn);

// Marks the record for `sn` as already reported to Lokid;
// no-op if `sn` is not tracked
void set_reported(const sn_pub_key_t& sn);

// Return the least recently tested offline node,
// or `boost::none` if no offline nodes are tracked
boost::optional<sn_pub_key_t> next_to_test();
};

} // namespace loki
122 changes: 112 additions & 10 deletions httpserver/service_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,12 @@ parse_swarm_update(const std::shared_ptr<std::string>& response_body) {

const swarm_id_t swarm_id =
sn_json.at("swarm_id").get<swarm_id_t>();
std::string snode_address = util::hex64_to_base32z(pubkey);
std::string snode_address = util::hex_to_base32z(pubkey);

const uint16_t port = sn_json.at("storage_port").get<uint16_t>();
const std::string snode_ip =
sn_json.at("public_ip").get<std::string>();
const sn_record_t sn{port, std::move(snode_address),
const sn_record_t sn{port, std::move(snode_address), pubkey,
std::move(snode_ip)};

swarm_map[swarm_id].push_back(sn);
Expand Down Expand Up @@ -697,23 +697,44 @@ void ServiceNode::ping_peers_tick() {
this->peer_ping_timer_.async_wait(
std::bind(&ServiceNode::ping_peers_tick, this));

auto node = swarm_->choose_other_node();
/// We always test one node already known to be offline
/// plus one random other node (could even be the same node)

if (!node) {
LOKI_LOG(debug, "No nodes to ping test yet");
return;
const auto other_node = swarm_->choose_other_node();

if (other_node) {
LOKI_LOG(debug, "Selected random node for testing: {}", (*other_node).pub_key());
test_reachability(*other_node);
} else {
LOKI_LOG(debug, "No nodes to test for reachability");
}

test_reachability(*node);
// TODO: there is an edge case where SS reported some offending
// nodes, but then restarted, so SS won't give priority to those
// nodes. SS will still test them eventually (through random selection) and
// update Lokid, but this scenario could be made more robust.
const auto offline_node = reach_records_.next_to_test();

if (offline_node) {
const boost::optional<sn_record_t> sn = swarm_->get_node_by_pk(*offline_node);
LOKI_LOG(debug, "No nodes offline nodes to ping test yet");
if (sn) {
test_reachability(*sn);
} else {
LOKI_LOG(debug, "Node does not seem to exist anymore: {}", *offline_node);
// delete its entry from test records as irrelevant
reach_records_.expire(*offline_node);
}
}

}

void ServiceNode::test_reachability(const sn_record_t& sn) {

LOKI_LOG(info, "TODO: testing node {}", sn);
LOKI_LOG(debug, "testing node for reachability {}", sn);

auto callback = [](sn_response_t&& res) {
LOKI_LOG(info, "PING RESPONSE STATUS: {}", res.error_code);
auto callback = [this, sn](sn_response_t&& res) {
this->process_reach_test_response(std::move(res), sn.pub_key());
};

nlohmann::json json_body;
Expand All @@ -734,6 +755,7 @@ void ServiceNode::test_reachability(const sn_record_t& sn) {

void ServiceNode::lokid_ping_timer_tick() {

/// TODO: Note that this is not actually an SN response! (but Lokid)
auto cb = [](const sn_response_t&& res) {
if (res.error_code == SNodeError::NO_ERROR) {

Expand Down Expand Up @@ -933,6 +955,86 @@ void ServiceNode::send_blockchain_test_req(const sn_record_t& testee,
this->block_height_));
}

/// Reports to Lokid whether the node with pubkey `sn_pk` is reachable.
///
/// Does nothing (apart from a debug log) if the swarm no longer knows a node
/// with that pubkey. When Lokid acknowledges the report with status "OK",
/// the local reachability record is expired (reachable) or flagged as
/// reported (unreachable); on any failure the record is left untouched.
void ServiceNode::report_node_reachability(const sn_pub_key_t& sn_pk,
                                           bool reachable) {

    const auto sn = swarm_->get_node_by_pk(sn_pk);

    if (!sn) {
        LOKI_LOG(debug, "No Service node with pubkey: {}", sn_pk);
        return;
    }

    json params;
    params["type"] = "reachability";
    params["pubkey"] = sn->pub_key_hex();
    params["value"] = reachable;

    /// Note that if Lokid restarts, all its reachability records will be
    /// updated to "true".

    auto cb = [this, sn_pk, reachable](const sn_response_t&& res) {
        if (res.error_code != SNodeError::NO_ERROR) {
            LOKI_LOG(error, "Could not report node status");
            return;
        }

        if (!res.body) {
            LOKI_LOG(error, "Empty body on Lokid report node status");
            return;
        }

        bool acknowledged = false;

        try {
            const json res_json = json::parse(*res.body);

            const auto status =
                res_json.at("result").at("status").get<std::string>();

            if (status == "OK") {
                acknowledged = true;
            } else {
                LOKI_LOG(error, "Could not report node. Status: {}", status);
            }
        } catch (...) {
            LOKI_LOG(error,
                     "Could not report node status: bad json in response");
        }

        if (!acknowledged) {
            return;
        }

        // Only update our own bookkeeping once Lokid confirmed the report
        if (reachable) {
            LOKI_LOG(debug, "Successfully reported node as reachable: {}",
                     sn_pk);
            this->reach_records_.expire(sn_pk);
        } else {
            LOKI_LOG(debug, "Successfully reported node as unreachable {}",
                     sn_pk);
            this->reach_records_.set_reported(sn_pk);
        }
    };

    lokid_client_.make_lokid_request("report_peer_storage_server_status",
                                     params, std::move(cb));
}

/// Handles the outcome of a reachability test of the node with pubkey `pk`.
///
/// A successful response is always reported to Lokid as "reachable". A
/// failed one is recorded locally and only reported as "unreachable" when
/// the records say so (see `record_unreachable`).
void ServiceNode::process_reach_test_response(sn_response_t&& res,
                                              const sn_pub_key_t& pk) {

    const bool node_responded = (res.error_code == SNodeError::NO_ERROR);

    if (!node_responded) {
        if (reach_records_.record_unreachable(pk)) {
            report_node_reachability(pk, false);
        }
        return;
    }

    // NOTE: We don't need to report healthy nodes that have not previously
    // been reported to Lokid as unreachable, but I'm worried there might
    // be some race conditions, so do it anyway for now.
    report_node_reachability(pk, true);
}

void ServiceNode::process_blockchain_test_response(
sn_response_t&& res, blockchain_test_answer_t our_answer,
sn_record_t testee, uint64_t bc_height) {
Expand Down
Loading

0 comments on commit effcd7e

Please sign in to comment.