diff --git a/httpserver/command_line.cpp b/httpserver/command_line.cpp index d1b931e8..5b20075f 100644 --- a/httpserver/command_line.cpp +++ b/httpserver/command_line.cpp @@ -70,6 +70,10 @@ void command_line_parser::parse_args(int argc, char* argv[]) { return; } + if (options_.testnet && !vm.count("lokid-rpc-port")) { + options_.lokid_rpc_port = 38157; + } + if (!vm.count("ip") || !vm.count("port")) { throw std::runtime_error( "Invalid option: address and/or port missing."); diff --git a/httpserver/command_line.h b/httpserver/command_line.h index 38e4a66d..a1438776 100644 --- a/httpserver/command_line.h +++ b/httpserver/command_line.h @@ -7,7 +7,7 @@ namespace loki { struct command_line_options { uint16_t port; - uint16_t lokid_rpc_port = 22023; + uint16_t lokid_rpc_port = 22023; // Or 38157 if `testnet` bool force_start = false; bool print_version = false; bool print_help = false; diff --git a/httpserver/http_connection.cpp b/httpserver/http_connection.cpp index c9733edb..ddb6e814 100644 --- a/httpserver/http_connection.cpp +++ b/httpserver/http_connection.cpp @@ -105,15 +105,15 @@ void LokidClient::make_lokid_request(boost::string_view method, const nlohmann::json& params, http_callback_t&& cb) const { - make_lokid_request(local_ip_, lokid_rpc_port_, method, params, - std::move(cb)); + make_custom_lokid_request(local_ip_, lokid_rpc_port_, method, params, + std::move(cb)); } -void LokidClient::make_lokid_request(const std::string& daemon_ip, - const uint16_t daemon_port, - boost::string_view method, - const nlohmann::json& params, - http_callback_t&& cb) const { +void LokidClient::make_custom_lokid_request(const std::string& daemon_ip, + const uint16_t daemon_port, + boost::string_view method, + const nlohmann::json& params, + http_callback_t&& cb) const { auto req = std::make_shared(); @@ -242,8 +242,12 @@ void connection_t::do_handshake() { } void connection_t::on_handshake(boost::system::error_code ec) { + + const auto sockfd = stream_.lowest_layer().native_handle(); + LOKI_LOG(debug, "Open https socket: {}", sockfd); + get_net_stats().record_socket_open(sockfd); if (ec) { - LOKI_LOG(warn, "ssl handshake failed: {}", ec.message()); + LOKI_LOG(warn, "ssl handshake failed: ec: {} ({})", ec.value(), ec.message()); deadline_.cancel(); return; } @@ -433,7 +437,8 @@ void connection_t::process_blockchain_test_req(uint64_t, void connection_t::process_swarm_req(boost::string_view target) { #ifndef DISABLE_SNODE_SIGNATURE - if (!validate_snode_request()) { + // allow ping request as a quick workaround (and they are cheap) + if (!validate_snode_request() && (target != "/swarms/ping_test/v1")) { return; } #endif @@ -527,6 +532,7 @@ void connection_t::process_swarm_req(boost::string_view target) { } } else if (target == "/swarms/ping_test/v1") { + LOKI_LOG(debug, "Received ping_test"); response_.result(http::status::ok); } else if (target == "/swarms/push/v1") { @@ -615,6 +621,12 @@ void connection_t::process_request() { response_.result(http::status::ok); write_response(); ioc_.stop(); + } else if (target == "/sleep") { + ioc_.post([]() { + LOKI_LOG(warn, "Sleeping for some time..."); + std::this_thread::sleep_for(std::chrono::seconds(30)); + }); + response_.result(http::status::ok); } #endif else { @@ -1164,10 +1176,14 @@ void connection_t::on_shutdown(boost::system::error_code ec) { // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error ec.assign(0, ec.category()); } - if (ec) + if (ec) { LOKI_LOG(error, "Could not close ssl stream gracefully, ec: {}", ec.message()); + } + const auto sockfd = stream_.lowest_layer().native_handle(); + LOKI_LOG(debug, "Close https socket: {}", sockfd); + get_net_stats().record_socket_close(sockfd); stream_.lowest_layer().close(); } @@ -1224,7 +1240,9 @@ HttpClientSession::HttpClientSession(boost::asio::io_context& ioc, void HttpClientSession::on_connect() { - LOKI_LOG(trace, "on connect"); + const auto sockfd = socket_.native_handle(); + LOKI_LOG(debug, "Open http socket: {}", sockfd); + get_net_stats().record_socket_open(sockfd); http::async_write(socket_, *req_, std::bind(&HttpClientSession::on_write, shared_from_this(), std::placeholders::_1, @@ -1283,11 +1301,21 @@ void HttpClientSession::start() { if (ec) { // We should make sure that we print the error a few levels above, // where we have more context - LOKI_LOG( - debug, - "[http client]: could not connect to {}:{}, message: {} ({})", - endpoint_.address().to_string(), endpoint_.port(), ec.message(), - ec.value()); + + if (ec == boost::system::errc::connection_refused) { + LOKI_LOG(debug, + "[http client]: could not connect to {}:{}, message: " + "{} ({})", + endpoint_.address().to_string(), endpoint_.port(), + ec.message(), ec.value()); + } else { + LOKI_LOG(error, + "[http client]: could not connect to {}:{}, message: " + "{} ({})", + endpoint_.address().to_string(), endpoint_.port(), + ec.message(), ec.value()); + } + trigger_callback(SNodeError::NO_REACH, nullptr); return; } @@ -1307,7 +1335,7 @@ void HttpClientSession::start() { } } else { LOKI_LOG(warn, "client socket timed out"); - self->socket_.close(); + self->clean_up(); } }); } @@ -1320,20 +1348,20 @@ void HttpClientSession::trigger_callback(SNodeError error, deadline_timer_.cancel(); } -/// We execute callback (if haven't already) here to make sure it is called -HttpClientSession::~HttpClientSession() { +void HttpClientSession::clean_up() { - if (!used_callback_) { - // If we destroy the session before posting the callback, - // it must be due to some error - ioc_.post(std::bind(callback_, - sn_response_t{SNodeError::ERROR_OTHER, nullptr})); + if (!needs_cleanup) { + // This can happen because the deadline timer + // triggered and cleaned up the connection already + LOKI_LOG(debug, "No need for cleanup"); + return; } - get_net_stats().http_connections_out--; + needs_cleanup = false; if (!socket_.is_open()) { - LOKI_LOG(debug, "Socket is already closed"); + /// This should never happen! + LOKI_LOG(critical, "Socket is already closed"); return; } @@ -1349,12 +1377,32 @@ HttpClientSession::~HttpClientSession() { ec.message()); } + const auto sockfd = socket_.native_handle(); socket_.close(ec); if (ec) { - LOKI_LOG(error, "On close socket [{}: {}]", ec.value(), ec.message()); + LOKI_LOG(error, "Closing socket {} failed [{}: {}]", sockfd, ec.value(), ec.message()); + } else { + LOKI_LOG(debug, "Close http socket: {}", sockfd); + get_net_stats().record_socket_close(sockfd); + } +} + +/// We execute callback (if haven't already) here to make sure it is called +HttpClientSession::~HttpClientSession() { + + if (!used_callback_) { + // If we destroy the session before posting the callback, + // it must be due to some error + ioc_.post(std::bind(callback_, + sn_response_t{SNodeError::ERROR_OTHER, nullptr})); } + get_net_stats().http_connections_out--; + + this->clean_up(); + + } } // namespace loki diff --git a/httpserver/http_connection.h b/httpserver/http_connection.h index 213409bd..9845daf3 100644 --- a/httpserver/http_connection.h +++ b/httpserver/http_connection.h @@ -70,11 +70,11 @@ class LokidClient { void make_lokid_request(boost::string_view method, const nlohmann::json& params, http_callback_t&& cb) const; - void make_lokid_request(const std::string& daemon_ip, - const uint16_t daemon_port, - boost::string_view method, - const nlohmann::json& params, - http_callback_t&& cb) const; + void make_custom_lokid_request(const std::string& daemon_ip, + const uint16_t daemon_port, + boost::string_view method, + const nlohmann::json& params, + http_callback_t&& cb) const; }; constexpr auto SESSION_TIME_LIMIT = std::chrono::seconds(30); @@ -104,6 +104,7 @@ class HttpClientSession response_t res_; bool used_callback_ = false; + bool needs_cleanup = true; void on_connect(); @@ -114,6 +115,8 @@ class HttpClientSession void trigger_callback(SNodeError error, std::shared_ptr&& body); + void clean_up(); + public: // Resolver and socket require an io_context HttpClientSession(boost::asio::io_context& ioc, const tcp::endpoint& ep, diff --git a/httpserver/https_client.cpp b/httpserver/https_client.cpp index ff82ff26..9a2e234a 100644 --- a/httpserver/https_client.cpp +++ b/httpserver/https_client.cpp @@ -79,6 +79,9 @@ HttpsClientSession::HttpsClientSession( server_pub_key_b32z(sn_pubkey_b32z) { get_net_stats().https_connections_out++; + + static uint64_t connection_count = 0; + this->connection_idx = connection_count++; } void HttpsClientSession::start() { @@ -128,7 +131,12 @@ void HttpsClientSession::start() { } void HttpsClientSession::on_connect() { - LOKI_LOG(trace, "on connect"); + LOKI_LOG(trace, "on connect, connection idx: {}", this->connection_idx); + + const auto sockfd = stream_.lowest_layer().native_handle(); + LOKI_LOG(debug, "Open https socket: {}", sockfd); + get_net_stats().record_socket_open(sockfd); + stream_.set_verify_mode(ssl::verify_none); stream_.set_verify_callback( [this](bool preverified, ssl::verify_context& ctx) -> bool { @@ -260,6 +268,10 @@ void HttpsClientSession::on_shutdown(boost::system::error_code ec) { ec.message()); } + const auto sockfd = stream_.lowest_layer().native_handle(); + LOKI_LOG(debug, "Close https socket: {}", sockfd); + get_net_stats().record_socket_close(sockfd); + stream_.lowest_layer().close(); // If we get here then the connection is closed gracefully diff --git a/httpserver/https_client.h b/httpserver/https_client.h index 3d0218f7..04c6ee68 100644 --- a/httpserver/https_client.h +++ b/httpserver/https_client.h @@ -14,6 +14,9 @@ void make_https_request(boost::asio::io_context& ioc, const std::string& ip, class HttpsClientSession : public std::enable_shared_from_this { + // For debugging purposes mostly + uint64_t connection_idx; + using tcp = boost::asio::ip::tcp; boost::asio::io_context& ioc_; diff --git a/httpserver/main.cpp b/httpserver/main.cpp index e03a0bb3..ef62c8a7 100644 --- a/httpserver/main.cpp +++ b/httpserver/main.cpp @@ -8,6 +8,7 @@ #include "service_node.h" #include "swarm.h" #include "version.h" +#include "utils.hpp" #include #include @@ -119,6 +120,15 @@ int main(int argc, char* argv[]) { return EXIT_FAILURE; } + { + const auto fd_limit = util::get_fd_limit(); + if (fd_limit != -1) { + LOKI_LOG(debug, "Open file descriptor limit: {}", fd_limit); + } else { + LOKI_LOG(debug, "Open descriptor limit: N/A"); + } + } + try { // ed25519 key diff --git a/httpserver/net_stats.h b/httpserver/net_stats.h index 151d7705..e166121d 100644 --- a/httpserver/net_stats.h +++ b/httpserver/net_stats.h @@ -1,10 +1,33 @@ #pragma once +#include +#include "loki_logger.h" + struct net_stats_t { uint32_t connections_in = 0; uint32_t http_connections_out = 0; uint32_t https_connections_out = 0; + + std::set open_fds; + + void record_socket_open(int sockfd) { +#ifdef INTEGRATION_TEST + if (open_fds.find(sockfd) != open_fds.end()) { + LOKI_LOG(critical, "Already recorded as open: {}!", sockfd); + } +#endif + open_fds.insert(sockfd); + } + + void record_socket_close(int sockfd) { +#ifdef INTEGRATION_TEST + if (open_fds.find(sockfd) == open_fds.end()) { + LOKI_LOG(critical, "Socket is NOT recorded as open: {}", sockfd); + } +#endif + open_fds.erase(sockfd); + } }; inline net_stats_t& get_net_stats() { diff --git a/httpserver/service_node.cpp b/httpserver/service_node.cpp index e03970f1..4dbb903c 100644 --- a/httpserver/service_node.cpp +++ b/httpserver/service_node.cpp @@ -35,6 +35,8 @@ constexpr std::array RETRY_INTERVALS = { std::chrono::seconds(40), std::chrono::seconds(80), std::chrono::seconds(160), std::chrono::seconds(320)}; +constexpr std::chrono::milliseconds RELAY_INTERVAL = 350ms; + static void make_sn_request(boost::asio::io_context& ioc, const sn_record_t& sn, const std::shared_ptr& req, http_callback_t&& cb) { @@ -167,8 +169,8 @@ ServiceNode::ServiceNode(boost::asio::io_context& ioc, swarm_update_timer_(ioc), lokid_ping_timer_(ioc), stats_cleanup_timer_(ioc), pow_update_timer_(worker_ioc), check_version_timer_(worker_ioc), peer_ping_timer_(ioc), - lokid_key_pair_(lokid_key_pair), lokid_client_(lokid_client), - force_start_(force_start) { + relay_timer_(ioc), lokid_key_pair_(lokid_key_pair), + lokid_client_(lokid_client), force_start_(force_start) { char buf[64] = {0}; if (char const* dest = @@ -306,7 +308,7 @@ void ServiceNode::bootstrap_data() { auto req_counter = std::make_shared(0); for (auto seed_node : seed_nodes) { - lokid_client_.make_lokid_request( + lokid_client_.make_custom_lokid_request( seed_node.first, seed_node.second, "get_n_service_nodes", params, [this, seed_node, req_counter, node_count = seed_nodes.size()](const sn_response_t&& res) { @@ -487,36 +489,6 @@ void ServiceNode::reset_listeners() { pk_to_listeners.clear(); } -/// initiate a /swarms/push request -void ServiceNode::push_message(const message_t& msg) { - - if (!swarm_) - return; - - const auto& others = swarm_->other_nodes(); - - LOKI_LOG(trace, "push_message to {} other nodes", others.size()); - - std::string body; - serialize_message(body, msg); - -#ifndef DISABLE_SNODE_SIGNATURE - const auto hash = hash_data(body); - const auto signature = generate_signature(hash, lokid_key_pair_); -#endif - - auto req = make_push_request(std::move(body)); - -#ifndef DISABLE_SNODE_SIGNATURE - attach_signature(req, signature); -#endif - - for (const auto& address : others) { - /// send a request asynchronously - relay_data_reliable(req, address); - } -} - /// do this asynchronously on a different thread? (on the same thread?) bool ServiceNode::process_store(const message_t& msg) { @@ -529,12 +501,12 @@ bool ServiceNode::process_store(const message_t& msg) { all_stats_.bump_store_requests(); - /// store to the database - save_if_new(msg); + /// store in the database + this->save_if_new(msg); - /// initiate a /swarms/push request; - /// (done asynchronously) - this->push_message(msg); + // Instead of sending the messages immediatly, store them in a buffer + // and periodically send all messages from there as batches + this->relay_buffer_.push_back(msg); return true; } @@ -630,6 +602,11 @@ void ServiceNode::on_swarm_update(const block_update_t& bu) { static bool active = false; if (!active) { LOKI_LOG(info, "Storage server is now active!"); + + relay_timer_.expires_after(RELAY_INTERVAL); + relay_timer_.async_wait( + boost::bind(&ServiceNode::relay_buffered_messages, this)); + active = true; } } @@ -652,6 +629,22 @@ void ServiceNode::on_swarm_update(const block_update_t& bu) { initiate_peer_test(); } +void ServiceNode::relay_buffered_messages() { + + // Should we wait for the response first? + relay_timer_.expires_after(RELAY_INTERVAL); + relay_timer_.async_wait( + boost::bind(&ServiceNode::relay_buffered_messages, this)); + + if (relay_buffer_.empty()) + return; + + LOKI_LOG(debug, "Relaying {} messages from buffer", relay_buffer_.size()); + + this->relay_messages(relay_buffer_, swarm_->other_nodes()); + relay_buffer_.clear(); +} + void ServiceNode::check_version_timer_tick() { check_version_timer_.expires_after(VERSION_CHECK_INTERVAL); @@ -704,11 +697,13 @@ void ServiceNode::swarm_timer_tick() { } else { LOKI_LOG(critical, "Failed to contact local Lokid"); } - }); - swarm_update_timer_.expires_after(SWARM_UPDATE_INTERVAL); - swarm_update_timer_.async_wait( - boost::bind(&ServiceNode::swarm_timer_tick, this)); + // It would make more sense to wait the difference between the time + // elapsed and SWARM_UPDATE_INTERVAL, but this is good enough: + swarm_update_timer_.expires_after(SWARM_UPDATE_INTERVAL); + swarm_update_timer_.async_wait( + boost::bind(&ServiceNode::swarm_timer_tick, this)); + }); } void ServiceNode::cleanup_timer_tick() { @@ -1478,7 +1473,8 @@ void ServiceNode::bootstrap_swarms( } } -void ServiceNode::relay_messages(const std::vector& messages, +template +void ServiceNode::relay_messages(const std::vector& messages, const std::vector& snodes) const { std::vector data = serialize_messages(messages); @@ -1585,6 +1581,7 @@ std::string ServiceNode::get_stats() const { val["connections_in"] = get_net_stats().connections_in; val["http_connections_out"] = get_net_stats().http_connections_out; val["https_connections_out"] = get_net_stats().https_connections_out; + val["open_socket_count"] = get_net_stats().open_fds.size(); /// we want pretty (indented) json, but might change that in the future constexpr bool PRETTY = true; @@ -1684,7 +1681,7 @@ bool ServiceNode::is_snode_address_known(const std::string& sn_address) { // TODO: need more robust handling of uninitialized swarm_ if (!swarm_) { LOKI_LOG(error, "Swarm data missing"); - return {}; + return false; } return swarm_->is_fully_funded_node(sn_address); diff --git a/httpserver/service_node.h b/httpserver/service_node.h index 2cf21cd0..e474cc78 100644 --- a/httpserver/service_node.h +++ b/httpserver/service_node.h @@ -128,6 +128,9 @@ class ServiceNode { boost::asio::steady_timer peer_ping_timer_; + /// Used to periodially send messages from relay_buffer_ + boost::asio::steady_timer relay_timer_; + /// map pubkeys to a list of connections to be notified std::unordered_map pk_to_listeners; @@ -135,7 +138,9 @@ class ServiceNode { reachability_records_t reach_records_; - void push_message(const message_t& msg); + /// Container for recently received messages directly from + /// clients; + std::vector relay_buffer_; void save_if_new(const message_t& msg); @@ -168,7 +173,8 @@ class ServiceNode { void relay_data_reliable(const std::shared_ptr& req, const sn_record_t& address) const; - void relay_messages(const std::vector& messages, + template + void relay_messages(const std::vector& messages, const std::vector& snodes) const; /// Request swarm structure from the deamon and reset the timer @@ -178,6 +184,8 @@ class ServiceNode { void ping_peers_tick(); + void relay_buffered_messages(); + /// Check the latest version from DNS text record void check_version_timer_tick(); /// Update PoW difficulty from DNS text record diff --git a/httpserver/swarm.cpp b/httpserver/swarm.cpp index 03f2e8bc..fdedeb7a 100644 --- a/httpserver/swarm.cpp +++ b/httpserver/swarm.cpp @@ -213,10 +213,9 @@ boost::optional Swarm::choose_funded_node() const { boost::optional Swarm::get_node_by_pk(const sn_pub_key_t& pk) const { - for (const auto& si : all_valid_swarms_) { - for (const auto& sn : si.snodes) { - if (sn.pub_key_base32z() == pk) - return sn; + for (const auto& sn : all_funded_nodes_) { + if (sn.pub_key_base32z() == pk) { + return sn; } } diff --git a/httpserver/version.h b/httpserver/version.h index 37544f9c..7d66f2e3 100644 --- a/httpserver/version.h +++ b/httpserver/version.h @@ -6,7 +6,7 @@ #define VERSION_MAJOR 1 #define VERSION_MINOR 0 -#define VERSION_PATCH 6 +#define VERSION_PATCH 7 #define LOKI_STRINGIFY2(val) #val #define LOKI_STRINGIFY(val) LOKI_STRINGIFY2(val) diff --git a/utils/include/utils.hpp b/utils/include/utils.hpp index 2a8149c4..bbedccc1 100644 --- a/utils/include/utils.hpp +++ b/utils/include/utils.hpp @@ -115,4 +115,7 @@ uint64_t uniform_distribution_portable(uint64_t n); uint64_t uniform_distribution_portable(std::mt19937_64& mersenne_twister, uint64_t n); +/// Return the open file limit (-1 on failure) +int get_fd_limit(); + } // namespace util diff --git a/utils/src/utils.cpp b/utils/src/utils.cpp index eb1da58f..ac38a81f 100644 --- a/utils/src/utils.cpp +++ b/utils/src/utils.cpp @@ -2,6 +2,10 @@ #include +#ifndef _WIN32 +#include +#endif + namespace util { uint64_t get_time_ms() { @@ -113,4 +117,13 @@ uint64_t uniform_distribution_portable(std::mt19937_64& mersenne_twister, return x / (secure_max / n); } +int get_fd_limit() { + +#ifdef _WIN32 + return -1; +#endif + + return sysconf(_SC_OPEN_MAX); +} + } // namespace util