Skip to content

Commit

Permalink
Make storage server aware of decommissioned nodes; test them
Browse files Browse the repository at this point in the history
  • Loading branch information
msgmaxim committed Sep 2, 2019
1 parent a61195c commit 69aa778
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 101 deletions.
12 changes: 4 additions & 8 deletions common/include/loki_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ struct sn_record_t {
private:
uint16_t port_;
std::string sn_address_; // Snode address (pubkey plus .snode)
std::string pub_key_; // base32z
std::string pub_key_; // base32z
std::string pub_key_hex_;
std::string ip_; // Snode ip
public:
Expand Down Expand Up @@ -70,13 +70,11 @@ class user_pubkey_t {

user_pubkey_t() {}

user_pubkey_t(std::string&& pk) : pubkey_(std::move(pk)) {
}
user_pubkey_t(std::string&& pk) : pubkey_(std::move(pk)) {}

user_pubkey_t(const std::string& pk) : pubkey_(pk) {}

public:

public:
static user_pubkey_t create(std::string&& pk, bool& success) {
success = true;
if (pk.size() != USER_PUBKEY_SIZE) {
Expand All @@ -95,9 +93,7 @@ class user_pubkey_t {
return user_pubkey_t(pk);
}

const std::string& str() const {
return pubkey_;
}
const std::string& str() const { return pubkey_; }
};

namespace loki {
Expand Down
21 changes: 13 additions & 8 deletions httpserver/http_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,11 +697,13 @@ void connection_t::process_store(const json& params) {
const auto data = params["data"].get<std::string>();

bool created;
auto pk = user_pubkey_t::create(params["pubKey"].get<std::string>(), created);
auto pk =
user_pubkey_t::create(params["pubKey"].get<std::string>(), created);

if (!created) {
response_.result(http::status::bad_request);
body_stream_ << fmt::format("Pubkey must be {} characters long\n", USER_PUBKEY_SIZE);
body_stream_ << fmt::format("Pubkey must be {} characters long\n",
USER_PUBKEY_SIZE);
LOKI_LOG(error, "Pubkey must be {} characters long", USER_PUBKEY_SIZE);
return;
}
Expand Down Expand Up @@ -805,16 +807,17 @@ void connection_t::process_snodes_by_pk(const json& params) {
}

bool success;
const auto pk = user_pubkey_t::create(params["pubKey"].get<std::string>(), success);
const auto pk =
user_pubkey_t::create(params["pubKey"].get<std::string>(), success);
if (!success) {
response_.result(http::status::bad_request);
body_stream_ << fmt::format("Pubkey must be {} characters long\n", USER_PUBKEY_SIZE);
body_stream_ << fmt::format("Pubkey must be {} characters long\n",
USER_PUBKEY_SIZE);
LOKI_LOG(debug, "Pubkey must be {} characters long ", USER_PUBKEY_SIZE);
return;
}

const std::vector<sn_record_t> nodes =
service_node_.get_snodes_by_pk(pk);
const std::vector<sn_record_t> nodes = service_node_.get_snodes_by_pk(pk);
const json res_body = snodes_to_json(nodes);

response_.result(http::status::ok);
Expand Down Expand Up @@ -968,11 +971,13 @@ void connection_t::process_retrieve(const json& params) {
}

bool success;
const auto pk = user_pubkey_t::create(params["pubKey"].get<std::string>(), success);
const auto pk =
user_pubkey_t::create(params["pubKey"].get<std::string>(), success);

if (!success) {
response_.result(http::status::bad_request);
body_stream_ << fmt::format("Pubkey must be {} characters long\n", USER_PUBKEY_SIZE);
body_stream_ << fmt::format("Pubkey must be {} characters long\n",
USER_PUBKEY_SIZE);
LOKI_LOG(debug, "Pubkey must be {} characters long ", USER_PUBKEY_SIZE);
return;
}
Expand Down
38 changes: 21 additions & 17 deletions httpserver/http_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,26 +278,30 @@ void run(boost::asio::io_context& ioc, const std::string& ip, uint16_t port,

namespace fmt {

template <>
struct formatter<loki::SNodeError> {
template <>
struct formatter<loki::SNodeError> {

template <typename ParseContext>
constexpr auto parse(ParseContext &ctx) { return ctx.begin(); }
constexpr auto parse(ParseContext& ctx) {
return ctx.begin();
}

template <typename FormatContext>
auto format(const loki::SNodeError& err, FormatContext &ctx) {

switch (err) {
case loki::SNodeError::NO_ERROR: return format_to(ctx.out(), "NO_ERROR");
case loki::SNodeError::ERROR_OTHER: return format_to(ctx.out(), "ERROR_OTHER");
case loki::SNodeError::NO_REACH: return format_to(ctx.out(), "NO_REACH");
case loki::SNodeError::HTTP_ERROR: return format_to(ctx.out(), "HTTP_ERROR");
default: return format_to(ctx.out(), "[UNKNOWN]");
}

auto format(const loki::SNodeError& err, FormatContext& ctx) {

switch (err) {
case loki::SNodeError::NO_ERROR:
return format_to(ctx.out(), "NO_ERROR");
case loki::SNodeError::ERROR_OTHER:
return format_to(ctx.out(), "ERROR_OTHER");
case loki::SNodeError::NO_REACH:
return format_to(ctx.out(), "NO_REACH");
case loki::SNodeError::HTTP_ERROR:
return format_to(ctx.out(), "HTTP_ERROR");
default:
return format_to(ctx.out(), "[UNKNOWN]");
}
}
};

};


}
} // namespace fmt
3 changes: 1 addition & 2 deletions httpserver/reachability_testing.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

#include "loki_logger.h"
#include "reachability_testing.h"
#include "loki_logger.h"

using std::chrono::steady_clock;
using namespace std::chrono_literals;
Expand Down Expand Up @@ -60,7 +60,6 @@ bool reachability_records_t::expire(const sn_pub_key_t& sn) {
if (offline_nodes_.erase(sn)) {
LOKI_LOG(warn, " - removed entry for {}", sn);
}

}

void reachability_records_t::set_reported(const sn_pub_key_t& sn) {
Expand Down
5 changes: 2 additions & 3 deletions httpserver/reachability_testing.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#pragma once

#include "loki_common.h"
#include <unordered_map>
#include <chrono>
#include <unordered_map>

namespace loki {

Expand All @@ -16,14 +16,13 @@ class reach_record_t {

using time_point_t = std::chrono::time_point<std::chrono::steady_clock>;

public:
public:
time_point_t first_failure;
time_point_t last_tested;
// whether it's been reported to Lokid
bool reported = false;

reach_record_t();

};
} // namespace detail

Expand Down
83 changes: 52 additions & 31 deletions httpserver/service_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,21 @@ parse_swarm_update(const std::shared_ptr<std::string>& response_body) {
const sn_record_t sn{port, std::move(snode_address), pubkey,
std::move(snode_ip)};

swarm_map[swarm_id].push_back(sn);
const bool fully_funded = sn_json.at("funded").get<bool>();

/// We want to include (test) decommissioned nodes, but not
/// partially funded ones.
if (!fully_funded) {
continue;
}

/// Decommissioned nodes (which carry a dummy swarm id) are stored in
/// a separate data structure, as that seems less error-prone
if (swarm_id == INVALID_SWARM_ID) {
bu.decommissioned_nodes.push_back(sn);
} else {
swarm_map[swarm_id].push_back(sn);
}
}

bu.height = body.at("result").at("height").get<uint64_t>();
Expand Down Expand Up @@ -268,6 +282,7 @@ void ServiceNode::bootstrap_data() {
fields["height"] = true;
fields["block_hash"] = true;
fields["hardfork"] = true;
fields["funded"] = true;

params["fields"] = fields;

Expand All @@ -286,6 +301,8 @@ void ServiceNode::bootstrap_data() {
if (res.error_code == SNodeError::NO_ERROR) {
try {
const block_update_t bu = parse_swarm_update(res.body);
// TODO: this should be disabled in the "testnet" mode
// (or changed to point to testnet seeds)
on_bootstrap_update(bu);
} catch (const std::exception& e) {
LOKI_LOG(
Expand Down Expand Up @@ -344,7 +361,7 @@ ServiceNode::~ServiceNode() {
};

void ServiceNode::relay_data_reliable(const std::shared_ptr<request_t>& req,
const sn_record_t& sn) const {
const sn_record_t& sn) const {

LOKI_LOG(debug, "Relaying data to: {}", sn);

Expand Down Expand Up @@ -605,7 +622,7 @@ void ServiceNode::on_swarm_update(const block_update_t& bu) {
}
}

swarm_->update_state(bu.swarms, events);
swarm_->update_state(bu.swarms, bu.decommissioned_nodes, events);

if (!events.new_snodes.empty()) {
bootstrap_peers(events.new_snodes);
Expand All @@ -615,7 +632,7 @@ void ServiceNode::on_swarm_update(const block_update_t& bu) {
bootstrap_swarms(events.new_swarms);
}

if (events.decommissioned) {
if (events.dissolved) {
/// Go through all our PK and push them accordingly
salvage_data();
}
Expand Down Expand Up @@ -656,11 +673,12 @@ void ServiceNode::swarm_timer_tick() {
fields["height"] = true;
fields["block_hash"] = true;
fields["hardfork"] = true;
fields["funded"] = true;

params["fields"] = fields;

/// Include decommissioned nodes as well (i.e. not only active ones)
params["active_only"] = true;
params["active_only"] = false;

lokid_client_.make_lokid_request(
"get_n_service_nodes", params, [this](const sn_response_t&& res) {
Expand Down Expand Up @@ -697,14 +715,24 @@ void ServiceNode::ping_peers_tick() {
this->peer_ping_timer_.async_wait(
std::bind(&ServiceNode::ping_peers_tick, this));


/// TODO: To be safe, let's not even test peers until we
/// have reached the right hardfork height

/// We always test one node already known to be offline
/// plus one random other node (could even be the same node)

const auto other_node = swarm_->choose_other_node();
const auto random_node = swarm_->choose_funded_node();

if (other_node) {
LOKI_LOG(debug, "Selected random node for testing: {}", (*other_node).pub_key());
test_reachability(*other_node);
if (random_node) {

if (random_node == our_address_) {
LOKI_LOG(debug, "Would test our own node, skipping");
} else {
LOKI_LOG(debug, "Selected random node for testing: {}",
(*random_node).pub_key());
test_reachability(*random_node);
}
} else {
LOKI_LOG(debug, "No nodes to test for reachability");
}
Expand All @@ -716,17 +744,18 @@ void ServiceNode::ping_peers_tick() {
const auto offline_node = reach_records_.next_to_test();

if (offline_node) {
const boost::optional<sn_record_t> sn = swarm_->get_node_by_pk(*offline_node);
const boost::optional<sn_record_t> sn =
swarm_->get_node_by_pk(*offline_node);
LOKI_LOG(debug, "No nodes offline nodes to ping test yet");
if (sn) {
test_reachability(*sn);
} else {
LOKI_LOG(debug, "Node does not seem to exist anymore: {}", *offline_node);
LOKI_LOG(debug, "Node does not seem to exist anymore: {}",
*offline_node);
// delete its entry from test records as irrelevant
reach_records_.expire(*offline_node);
}
}

}

void ServiceNode::test_reachability(const sn_record_t& sn) {
Expand All @@ -750,7 +779,6 @@ void ServiceNode::test_reachability(const sn_record_t& sn) {
#endif

make_sn_request(ioc_, sn, req, std::move(callback));

}

void ServiceNode::lokid_ping_timer_tick() {
Expand Down Expand Up @@ -1004,17 +1032,19 @@ void ServiceNode::report_node_reachability(const sn_pub_key_t& sn_pk,

if (success) {
if (reachable) {
LOKI_LOG(debug, "Successfully reported node as reachable: {}", sn_pk);
LOKI_LOG(debug, "Successfully reported node as reachable: {}",
sn_pk);
this->reach_records_.expire(sn_pk);
} else {
LOKI_LOG(debug, "Successfully reported node as unreachable {}", sn_pk);
LOKI_LOG(debug, "Successfully reported node as unreachable {}",
sn_pk);
this->reach_records_.set_reported(sn_pk);
}
}
};

lokid_client_.make_lokid_request("report_peer_storage_server_status", params,
std::move(cb));
lokid_client_.make_lokid_request("report_peer_storage_server_status",
params, std::move(cb));
}

void ServiceNode::process_reach_test_response(sn_response_t&& res,
Expand Down Expand Up @@ -1312,7 +1342,7 @@ void ServiceNode::bootstrap_swarms(
LOKI_LOG(info, "Bootstrapping swarms: {}", vec_to_string(swarms));
}

const auto& all_swarms = swarm_->all_swarms();
const auto& all_swarms = swarm_->all_valid_swarms();

std::vector<Item> all_entries;
if (!get_all_messages(all_entries)) {
Expand Down Expand Up @@ -1554,14 +1584,15 @@ bool ServiceNode::is_pubkey_for_us(const user_pubkey_t& pk) const {
return swarm_->is_pubkey_for_us(pk);
}

std::vector<sn_record_t> ServiceNode::get_snodes_by_pk(const user_pubkey_t& pk) {
std::vector<sn_record_t>
ServiceNode::get_snodes_by_pk(const user_pubkey_t& pk) {

if (!swarm_) {
LOKI_LOG(error, "Swarm data missing");
return {};
}

const auto& all_swarms = swarm_->all_swarms();
const auto& all_swarms = swarm_->all_valid_swarms();

swarm_id_t swarm_id = get_swarm_by_pk(all_swarms, pk);

Expand All @@ -1586,17 +1617,7 @@ bool ServiceNode::is_snode_address_known(const std::string& sn_address) {
return {};
}

const auto& all_swarms = swarm_->all_swarms();

return std::any_of(all_swarms.begin(), all_swarms.end(),
[&sn_address](const SwarmInfo& swarm_info) {
return std::any_of(
swarm_info.snodes.begin(),
swarm_info.snodes.end(),
[&sn_address](const sn_record_t& sn_record) {
return sn_record.sn_address() == sn_address;
});
});
return swarm_->is_fully_funded_node(sn_address);
}

} // namespace loki
3 changes: 2 additions & 1 deletion httpserver/service_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ class ServiceNode {

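/// Check if status is OK and handle a failed test otherwise; note
/// that we want a copy of `sn` here because of the way it is called.
/// NOTE(review): the declaration takes `sn` by const reference, not by
/// value — confirm that the caller keeps the key alive or passes a copy.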
void process_reach_test_response(sn_response_t&& res, const sn_pub_key_t& sn);
void process_reach_test_response(sn_response_t&& res,
const sn_pub_key_t& sn);

/// From a peer
void process_blockchain_test_response(sn_response_t&& res,
Expand Down
Loading

0 comments on commit 69aa778

Please sign in to comment.