Skip to content

Commit

Permalink
Merge pull request #268 from msgmaxim/reachable-nodes
Browse files Browse the repository at this point in the history
Bookkeeping for reachability of nodes; reporting them to Lokid
  • Loading branch information
msgmaxim committed Sep 6, 2019
2 parents 56ddc92 + bab0ad5 commit e60b24e
Show file tree
Hide file tree
Showing 14 changed files with 449 additions and 105 deletions.
29 changes: 16 additions & 13 deletions common/include/loki_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,26 @@

#include <boost/optional.hpp>

// TODO: this should be a proper struct w/o heap allocation!
using sn_pub_key_t = std::string;

struct sn_record_t {

// our 32 byte pub keys should always be 52 bytes long in base32z
static constexpr size_t BASE_LEN = 52;

private:
uint16_t port_;
std::string sn_address_; // Snode address
std::string pub_key_;
std::string sn_address_; // Snode address (pubkey plus .snode)
// TODO: create separate types for different encodings of pubkeys,
// so if we confuse them, it will be a compiler error
std::string pub_key_base_32z_;
std::string pub_key_hex_;
std::string ip_; // Snode ip
public:
sn_record_t(uint16_t port, const std::string& address,
const std::string& ip)
: port_(port), ip_(ip) {
const std::string& pk_hex, const std::string& ip)
: port_(port), pub_key_hex_(pk_hex), ip_(ip) {
set_address(address);
}

Expand All @@ -39,12 +45,13 @@ struct sn_record_t {

sn_address_ = addr;
sn_address_.append(".snode");
pub_key_ = addr;
pub_key_base_32z_ = addr;
}

uint16_t port() const { return port_; }
const std::string& sn_address() const { return sn_address_; }
const std::string& pub_key() const { return pub_key_; }
const std::string& pub_key_base32z() const { return pub_key_base_32z_; }
const std::string& pub_key_hex() const { return pub_key_hex_; }
const std::string& ip() const { return ip_; }

template <typename OStream>
Expand All @@ -65,13 +72,11 @@ class user_pubkey_t {

user_pubkey_t() {}

user_pubkey_t(std::string&& pk) : pubkey_(std::move(pk)) {
}
user_pubkey_t(std::string&& pk) : pubkey_(std::move(pk)) {}

user_pubkey_t(const std::string& pk) : pubkey_(pk) {}

public:

public:
static user_pubkey_t create(std::string&& pk, bool& success) {
success = true;
if (pk.size() != USER_PUBKEY_SIZE) {
Expand All @@ -90,9 +95,7 @@ class user_pubkey_t {
return user_pubkey_t(pk);
}

const std::string& str() const {
return pubkey_;
}
const std::string& str() const { return pubkey_; }
};

namespace loki {
Expand Down
2 changes: 2 additions & 0 deletions httpserver/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ set(HEADER_FILES
command_line.h
net_stats.h
dns_text_records.h
reachability_testing.h
)

set(SRC_FILES
Expand All @@ -32,6 +33,7 @@ set(SRC_FILES
security.cpp
command_line.cpp
dns_text_records.cpp
reachability_testing.cpp
)

add_library(httpserver_lib STATIC ${HEADER_FILES} ${SRC_FILES})
Expand Down
2 changes: 1 addition & 1 deletion httpserver/dns_text_records.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ void check_latest_version() {
} else {
LOKI_LOG(debug,
"You are using the latest version of the storage server ({})",
latest_version_str);
STORAGE_SERVER_VERSION_STRING);
}
}

Expand Down
21 changes: 13 additions & 8 deletions httpserver/http_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,11 +697,13 @@ void connection_t::process_store(const json& params) {
const auto data = params["data"].get<std::string>();

bool created;
auto pk = user_pubkey_t::create(params["pubKey"].get<std::string>(), created);
auto pk =
user_pubkey_t::create(params["pubKey"].get<std::string>(), created);

if (!created) {
response_.result(http::status::bad_request);
body_stream_ << fmt::format("Pubkey must be {} characters long\n", USER_PUBKEY_SIZE);
body_stream_ << fmt::format("Pubkey must be {} characters long\n",
USER_PUBKEY_SIZE);
LOKI_LOG(error, "Pubkey must be {} characters long", USER_PUBKEY_SIZE);
return;
}
Expand Down Expand Up @@ -805,16 +807,17 @@ void connection_t::process_snodes_by_pk(const json& params) {
}

bool success;
const auto pk = user_pubkey_t::create(params["pubKey"].get<std::string>(), success);
const auto pk =
user_pubkey_t::create(params["pubKey"].get<std::string>(), success);
if (!success) {
response_.result(http::status::bad_request);
body_stream_ << fmt::format("Pubkey must be {} characters long\n", USER_PUBKEY_SIZE);
body_stream_ << fmt::format("Pubkey must be {} characters long\n",
USER_PUBKEY_SIZE);
LOKI_LOG(debug, "Pubkey must be {} characters long ", USER_PUBKEY_SIZE);
return;
}

const std::vector<sn_record_t> nodes =
service_node_.get_snodes_by_pk(pk);
const std::vector<sn_record_t> nodes = service_node_.get_snodes_by_pk(pk);
const json res_body = snodes_to_json(nodes);

response_.result(http::status::ok);
Expand Down Expand Up @@ -968,11 +971,13 @@ void connection_t::process_retrieve(const json& params) {
}

bool success;
const auto pk = user_pubkey_t::create(params["pubKey"].get<std::string>(), success);
const auto pk =
user_pubkey_t::create(params["pubKey"].get<std::string>(), success);

if (!success) {
response_.result(http::status::bad_request);
body_stream_ << fmt::format("Pubkey must be {} characters long\n", USER_PUBKEY_SIZE);
body_stream_ << fmt::format("Pubkey must be {} characters long\n",
USER_PUBKEY_SIZE);
LOKI_LOG(debug, "Pubkey must be {} characters long ", USER_PUBKEY_SIZE);
return;
}
Expand Down
38 changes: 21 additions & 17 deletions httpserver/http_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,26 +278,30 @@ void run(boost::asio::io_context& ioc, const std::string& ip, uint16_t port,

namespace fmt {

template <>
struct formatter<loki::SNodeError> {
template <>
struct formatter<loki::SNodeError> {

template <typename ParseContext>
constexpr auto parse(ParseContext &ctx) { return ctx.begin(); }
constexpr auto parse(ParseContext& ctx) {
return ctx.begin();
}

template <typename FormatContext>
auto format(const loki::SNodeError& err, FormatContext &ctx) {

switch (err) {
case loki::SNodeError::NO_ERROR: return format_to(ctx.out(), "NO_ERROR");
case loki::SNodeError::ERROR_OTHER: return format_to(ctx.out(), "ERROR_OTHER");
case loki::SNodeError::NO_REACH: return format_to(ctx.out(), "NO_REACH");
case loki::SNodeError::HTTP_ERROR: return format_to(ctx.out(), "HTTP_ERROR");
default: return format_to(ctx.out(), "[UNKNOWN]");
}

auto format(const loki::SNodeError& err, FormatContext& ctx) {

switch (err) {
case loki::SNodeError::NO_ERROR:
return format_to(ctx.out(), "NO_ERROR");
case loki::SNodeError::ERROR_OTHER:
return format_to(ctx.out(), "ERROR_OTHER");
case loki::SNodeError::NO_REACH:
return format_to(ctx.out(), "NO_REACH");
case loki::SNodeError::HTTP_ERROR:
return format_to(ctx.out(), "HTTP_ERROR");
default:
return format_to(ctx.out(), "[UNKNOWN]");
}
}
};

};


}
} // namespace fmt
4 changes: 3 additions & 1 deletion httpserver/https_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ void make_https_request(boost::asio::io_context& ioc,
#else

if (sn_address == "0.0.0.0") {
LOKI_LOG(warn, "Could not initiate request to snode (we don't know "
LOKI_LOG(debug, "Could not initiate request to snode (we don't know "
"their IP yet).");

cb(sn_response_t{SNodeError::NO_REACH, nullptr});
return;
}

Expand Down
91 changes: 91 additions & 0 deletions httpserver/reachability_testing.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@

#include "reachability_testing.h"
#include "loki_logger.h"

using std::chrono::steady_clock;
using namespace std::chrono_literals;

namespace loki {

namespace detail {

/// Creates a record for a node that has just failed a test: both timestamps
/// start out equal to the current `steady_clock` time.
reach_record_t::reach_record_t() {
    const auto now = steady_clock::now();
    first_failure = now;
    last_tested = now;
}

} // namespace detail

/// Grace period: a node must have been failing for longer than this before
/// it gets reported to Lokid as unreachable
constexpr std::chrono::minutes UNREACH_GRACE_PERIOD{120};

/// Records a failed reachability test for `sn`.
///
/// The first failure only creates a fresh record (timestamps set to "now")
/// and never triggers a report. Every later failure refreshes `last_tested`
/// and checks how long the node has been failing.
///
/// Returns `true` only if the node has been failing for longer than
/// `UNREACH_GRACE_PERIOD` and has not been marked via `set_reported()` yet,
/// i.e. the caller should report it to Lokid now. Returns `false` otherwise.
bool reachability_records_t::record_unreachable(const sn_pub_key_t& sn) {

    const auto it = offline_nodes_.find(sn);

    if (it == offline_nodes_.end()) {
        LOKI_LOG(debug, "Adding a new node to UNREACHABLE: {}", sn);
        offline_nodes_.insert({sn, {}});
        return false;
    }

    LOKI_LOG(debug, "Node is ALREADY known to be UNREACHABLE: {}", sn);

    auto& record = it->second;
    record.last_tested = steady_clock::now();

    // Time between the very first failure and the test that just failed
    const auto elapsed = record.last_tested - record.first_failure;
    const auto elapsed_sec =
        std::chrono::duration_cast<std::chrono::seconds>(elapsed).count();
    LOKI_LOG(debug, "First time failed {} seconds ago", elapsed_sec);

    /// TODO: Might still want to report as unreachable since this status
    /// gets reset to `true` on Lokid restart
    if (record.reported) {
        LOKI_LOG(debug, "Already reported node: {}", sn);
        return false;
    }

    if (elapsed > UNREACH_GRACE_PERIOD) {
        LOKI_LOG(debug, "Will REPORT this node to Lokid!");
        return true;
    }

    return false;
}

/// Records `sn` as reachable again by dropping whatever unreachability
/// record is kept for it.
///
/// Returns `true` if a record existed and was removed, `false` if the node
/// was not being tracked as unreachable.
bool reachability_records_t::record_reachable(const sn_pub_key_t& sn) {
    return expire(sn);
}

/// Removes the unreachability record for `sn`, if there is one.
///
/// Returns `true` if a record was removed, `false` otherwise.
bool reachability_records_t::expire(const sn_pub_key_t& sn) {

    // `erase` by key returns the number of removed elements (0 or 1 here)
    const bool erased = offline_nodes_.erase(sn) > 0;

    if (erased) {
        LOKI_LOG(debug, "Removed entry for {}", sn);
    }

    return erased;
}

/// Flags the record of `sn` as already reported to Lokid.
/// Does nothing when no record is kept for `sn`.
void reachability_records_t::set_reported(const sn_pub_key_t& sn) {

    const auto entry = offline_nodes_.find(sn);

    if (entry == offline_nodes_.end()) {
        return;
    }

    entry->second.reported = true;
}

/// Picks the tracked node whose `last_tested` timestamp is the oldest.
/// Returns `boost::none` when no nodes are currently tracked.
boost::optional<sn_pub_key_t> reachability_records_t::next_to_test() {

    // Orders map entries by the time they were last tested
    const auto tested_earlier = [](const auto& lhs, const auto& rhs) {
        return lhs.second.last_tested < rhs.second.last_tested;
    };

    const auto oldest = std::min_element(
        offline_nodes_.begin(), offline_nodes_.end(), tested_earlier);

    if (oldest == offline_nodes_.end()) {
        return boost::none;
    }

    LOKI_LOG(debug, "Selecting to be re-tested: {}", oldest->first);

    return oldest->first;
}

} // namespace loki
58 changes: 58 additions & 0 deletions httpserver/reachability_testing.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#pragma once

#include "loki_common.h"
#include <chrono>
#include <unordered_map>

namespace loki {

namespace detail {

/// TODO: make this class "private"?
/// Bookkeeping entry for a single node that failed a reachability test.
/// TODO: make this class "private"?
class reach_record_t {

    using time_point_t = std::chrono::steady_clock::time_point;

  public:
    // Sets both timestamps to the current time
    reach_record_t();

    // When the node failed for the first time
    // (without having come back online since)
    time_point_t first_failure;
    // When the node was tested most recently
    time_point_t last_tested;
    // Set once the node has been reported to Lokid
    bool reported = false;
};
} // namespace detail

// Keeps track of nodes that failed our reachability tests, so that they can
// be re-tested and eventually reported to Lokid.
class reachability_records_t {

// Records are keyed by the node's pubkey only (not by a full `sn_record_t`,
// which is heavy: several strings).

// Nodes that failed the reachability test, with the bookkeeping data for each.
// Note (original author): this list is not expected to be large, so a flat
// container might turn out to be faster than the hash map used here.
std::unordered_map<sn_pub_key_t, detail::reach_record_t> offline_nodes_;

public:
// Return nodes that should be tested first: decommissioned nodes
// and those that failed our earlier tests (but not reported yet)
// (not implemented yet, declaration kept for reference)
// std::vector<> priority_nodes() const;

// Records node as unreachable, return `true` if the node should be
// reported to Lokid as being unreachable for a long time
bool record_unreachable(const sn_pub_key_t& sn);

// Records node as reachable again: drops the record kept for it (if any).
// NOTE(review): presumably meant to return whether a record was removed;
// confirm against the definition.
bool record_reachable(const sn_pub_key_t& sn);

// Removes the record kept for the node (if any).
// NOTE(review): presumably meant to return whether a record was removed;
// confirm against the definition.
bool expire(const sn_pub_key_t& sn);

// Marks the node's record as already reported to Lokid;
// does nothing if there is no record for the node
void set_reported(const sn_pub_key_t& sn);

// Return the least recently tested node,
// or `boost::none` if no nodes are being tracked
boost::optional<sn_pub_key_t> next_to_test();
};

} // namespace loki
Loading

0 comments on commit e60b24e

Please sign in to comment.