diff --git a/include/bitcoin/server/server_node.hpp b/include/bitcoin/server/server_node.hpp index 2d69a2fb..d90e31cb 100644 --- a/include/bitcoin/server/server_node.hpp +++ b/include/bitcoin/server/server_node.hpp @@ -20,19 +20,17 @@ #ifndef LIBBITCOIN_SERVER_SERVER_NODE_HPP #define LIBBITCOIN_SERVER_SERVER_NODE_HPP -#include -#include #include #include #include #include #include #include -////#include +#include #include #include -////#include -#include +#include +////#include #include namespace libbitcoin { @@ -43,10 +41,6 @@ class BCS_API server_node { public: typedef std::shared_ptr ptr; - typedef std::function - block_notify_callback; - typedef std::function - transaction_notify_callback; /// Construct a server node. server_node(const configuration& configuration); @@ -76,35 +70,17 @@ class BCS_API server_node /// Server configuration settings. virtual const settings& server_settings() const; - // Subscriptions. - // ---------------------------------------------------------------------------- - - /// Subscribe to block announcements and reorgs. - virtual void subscribe_blocks(block_notify_callback notify_block); - - /// Subscribe to new memory pool transactions. - virtual void subscribe_transactions(transaction_notify_callback notify_tx); - private: - typedef std::vector block_notify_list; - typedef std::vector transaction_notify_list; - - bool handle_new_transaction(const code& ec, - const chain::point::indexes& unconfirmed, - const chain::transaction& tx); - bool handle_new_blocks(const code& ec, uint64_t fork_point, - const chain::block::ptr_list& new_blocks, - const chain::block::ptr_list& replaced_blocks); - void handle_running(const code& ec, result_handler handler); void handle_closing(const code& ec, std::promise& wait); - bool start_heart_services(); bool start_query_services(); + bool start_heart_services(); + bool start_block_services(); + bool start_trans_services(); bool start_query_workers(bool secure); const configuration& configuration_; - const size_t last_checkpoint_height_; // These are thread safe. curve_authenticator authenticator_; @@ -112,14 +88,10 @@ class BCS_API server_node query_service public_query_service_; heart_service secure_heart_service_; heart_service public_heart_service_; - - // This is protected by block mutex. - block_notify_list block_subscriptions_; - mutable upgrade_mutex block_mutex_; - - // This is protected by transaction mutex. - transaction_notify_list transaction_subscriptions_; - mutable upgrade_mutex transaction_mutex_; + block_service secure_block_service_; + block_service public_block_service_; + trans_service secure_trans_service_; + trans_service public_trans_service_; }; } // namespace server diff --git a/include/bitcoin/server/services/block_service.hpp b/include/bitcoin/server/services/block_service.hpp index 219245d5..b3b98aac 100644 --- a/include/bitcoin/server/services/block_service.hpp +++ b/include/bitcoin/server/services/block_service.hpp @@ -1,5 +1,5 @@ /** - * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS) + * Copyright (c) 2011-2016 libbitcoin developers (see AUTHORS) * * This file is part of libbitcoin-server. * @@ -25,44 +25,56 @@ #include #include #include -#include namespace libbitcoin { namespace server { class server_node; +// Subscribe to block acceptances into the long chain. class BCS_API block_service - : public enable_shared_from_base + : public bc::protocol::zmq::worker { public: typedef std::shared_ptr ptr; - /// Construct a block endpoint. + /// The fixed inprocess worker endpoints. + static const config::endpoint public_worker; + static const config::endpoint secure_worker; + + /// Construct a block service. block_service(bc::protocol::zmq::authenticator& authenticator, server_node& node, bool secure); - /// This class is not copyable. - block_service(const block_service&) = delete; - void operator=(const block_service&) = delete; - - /// Start the endpoint. - bool start(); + /// Start the service. + bool start() override; - /// Stop the endpoint. + /// Stop the service. bool stop(); +protected: + typedef chain::block::ptr_list block_list; + typedef bc::protocol::zmq::socket socket; + + virtual bool bind(socket& xpub, socket& xsub); + virtual bool unbind(socket& xpub, socket& xsub); + + // Implement the service. + virtual void work(); + private: - void send(uint32_t height, const chain::block::ptr block); + bool handle_reorganization(const code& ec, uint64_t fork_point, + const block_list& new_blocks, const block_list&); + void publish_blocks(uint32_t fork_point, const block_list& blocks); + void publish_block(socket& publisher, uint32_t height, + const chain::block::ptr block); - // BUGBUG: This is NOT thread safe. - bc::protocol::zmq::socket socket_; + const bool secure_; + const server::settings& settings_; - // This is thread safe. + // These are thread safe. + bc::protocol::zmq::authenticator& authenticator_; server_node& node_; - const bc::config::endpoint endpoint_; - const bool enabled_; - const bool secure_; }; } // namespace server diff --git a/include/bitcoin/server/services/heart_service.hpp b/include/bitcoin/server/services/heart_service.hpp index cfc0165e..60bc7902 100644 --- a/include/bitcoin/server/services/heart_service.hpp +++ b/include/bitcoin/server/services/heart_service.hpp @@ -42,14 +42,16 @@ class BCS_API heart_service server_node& node, bool secure); protected: - virtual bool bind(bc::protocol::zmq::socket& publisher); - virtual bool unbind(bc::protocol::zmq::socket& publisher); + typedef bc::protocol::zmq::socket socket; + + virtual bool bind(socket& publisher); + virtual bool unbind(socket& publisher); // Implement the service. virtual void work(); // Publish the heartbeat (integrated worker). - void publish(uint32_t count, bc::protocol::zmq::socket& socket); + void publish(uint32_t count, socket& socket); private: const server::settings& settings_; diff --git a/include/bitcoin/server/services/query_service.hpp b/include/bitcoin/server/services/query_service.hpp index feb4134f..3a6a0afe 100644 --- a/include/bitcoin/server/services/query_service.hpp +++ b/include/bitcoin/server/services/query_service.hpp @@ -45,10 +45,10 @@ class BCS_API query_service server_node& node, bool secure); protected: - virtual bool bind(bc::protocol::zmq::socket& router, - bc::protocol::zmq::socket& dealer); - virtual bool unbind(bc::protocol::zmq::socket& router, - bc::protocol::zmq::socket& dealer); + typedef bc::protocol::zmq::socket socket; + + virtual bool bind(socket& router, socket& dealer); + virtual bool unbind(socket& router, socket& dealer); // Implement the service. virtual void work(); diff --git a/include/bitcoin/server/services/trans_service.hpp b/include/bitcoin/server/services/trans_service.hpp index c20c2200..f9f48e3e 100644 --- a/include/bitcoin/server/services/trans_service.hpp +++ b/include/bitcoin/server/services/trans_service.hpp @@ -1,5 +1,5 @@ /** - * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS) + * Copyright (c) 2011-2016 libbitcoin developers (see AUTHORS) * * This file is part of libbitcoin-server. * @@ -20,49 +20,58 @@ #ifndef LIBBITCOIN_SERVER_TRANS_SERVICE_HPP #define LIBBITCOIN_SERVER_TRANS_SERVICE_HPP -#include #include #include #include #include -#include namespace libbitcoin { namespace server { class server_node; +// Subscribe to transaction acceptances into the transaction memory pool. class BCS_API trans_service - : public enable_shared_from_base + : public bc::protocol::zmq::worker { public: typedef std::shared_ptr ptr; - /// Construct a transaction endpoint. + /// The fixed inprocess worker endpoints. + static const config::endpoint public_worker; + static const config::endpoint secure_worker; + + /// Construct a transaction service. trans_service(bc::protocol::zmq::authenticator& authenticator, server_node& node, bool secure); - /// This class is not copyable. - trans_service(const trans_service&) = delete; - void operator=(const trans_service&) = delete; - - /// Start the endpoint. - bool start(); + /// Start the service. + bool start() override; - /// Stop the endpoint. + /// Stop the service. bool stop(); +protected: + typedef bc::protocol::zmq::socket socket; + typedef bc::chain::point::indexes index_list; + + virtual bool bind(socket& xpub, socket& xsub); + virtual bool unbind(socket& xpub, socket& xsub); + + // Implement the service. + virtual void work(); + private: - void send(const chain::transaction& tx); + bool handle_transaction(const code& ec, const index_list&, + const chain::transaction& tx); + void publish_transaction(const chain::transaction& tx); - // BUGBUG: This is NOT thread safe. - bc::protocol::zmq::socket socket_; + const bool secure_; + const server::settings& settings_; - // This is thread safe. + // These are thread safe. + bc::protocol::zmq::authenticator& authenticator_; server_node& node_; - const bc::config::endpoint endpoint_; - const bool enabled_; - const bool secure_; }; } // namespace server diff --git a/include/bitcoin/server/utility/address_notifier.hpp b/include/bitcoin/server/utility/address_notifier.hpp index 8e3f95b8..641aa5d7 100644 --- a/include/bitcoin/server/utility/address_notifier.hpp +++ b/include/bitcoin/server/utility/address_notifier.hpp @@ -75,6 +75,10 @@ class BCS_API address_notifier subscription_locator locator; }; + + ////typedef resubscriber block_subscriber; + ////typedef resubscriber tx_subscriber; + typedef std::vector subscription_records; typedef std::vector subscription_locators; diff --git a/include/bitcoin/server/workers/query_worker.hpp b/include/bitcoin/server/workers/query_worker.hpp index f1bdbc7a..fe9f23f6 100644 --- a/include/bitcoin/server/workers/query_worker.hpp +++ b/include/bitcoin/server/workers/query_worker.hpp @@ -47,15 +47,17 @@ class BCS_API query_worker server_node& node, bool secure); protected: + typedef bc::protocol::zmq::socket socket; + typedef std::function command_handler; typedef std::unordered_map command_map; - virtual bool connect(bc::protocol::zmq::socket& router); - virtual bool disconnect(bc::protocol::zmq::socket& router); - virtual void attach_interface(); virtual void attach(const std::string& command, command_handler handler); - virtual void query(bc::protocol::zmq::socket& router); + + virtual bool connect(socket& router); + virtual bool disconnect(socket& router); + virtual void query(socket& router); // Implement the worker. virtual void work(); diff --git a/src/messages/incoming.cpp b/src/messages/incoming.cpp index 6b71b363..5eaeadaf 100644 --- a/src/messages/incoming.cpp +++ b/src/messages/incoming.cpp @@ -50,7 +50,7 @@ std::string incoming::address() code incoming::receive(zmq::socket& socket) { zmq::message message; - auto ec = message.receive(socket); + auto ec = socket.receive(message); if (ec) return ec; diff --git a/src/messages/outgoing.cpp b/src/messages/outgoing.cpp index 55233581..cea77dcb 100644 --- a/src/messages/outgoing.cpp +++ b/src/messages/outgoing.cpp @@ -88,7 +88,7 @@ std::string outgoing::address() code outgoing::send(zmq::socket& socket) { - return message_.send(socket); + return socket.send(message_); } } // namespace server diff --git a/src/server_node.cpp b/src/server_node.cpp index de2188a9..33265e3d 100644 --- a/src/server_node.cpp +++ b/src/server_node.cpp @@ -30,24 +30,22 @@ namespace libbitcoin { namespace server { using std::placeholders::_1; -using std::placeholders::_2; -using std::placeholders::_3; -using std::placeholders::_4; -using namespace bc::blockchain; using namespace bc::chain; using namespace bc::node; using namespace bc::protocol; -using namespace bc::wallet; server_node::server_node(const configuration& configuration) : p2p_node(configuration), configuration_(configuration), - last_checkpoint_height_(configuration.last_checkpoint_height()), authenticator_(*this), secure_query_service_(authenticator_, *this, true), public_query_service_(authenticator_, *this, false), secure_heart_service_(authenticator_, *this, true), - public_heart_service_(authenticator_, *this, false) + public_heart_service_(authenticator_, *this, false), + secure_block_service_(authenticator_, *this, true), + public_block_service_(authenticator_, *this, false), + secure_trans_service_(authenticator_, *this, true), + public_trans_service_(authenticator_, *this, false) { } @@ -56,20 +54,6 @@ server_node::server_node(const configuration& configuration) void server_node::stop(result_handler handler) { - // Critical Section - /////////////////////////////////////////////////////////////////////////// - transaction_mutex_.lock(); - transaction_subscriptions_.clear(); - transaction_mutex_.unlock(); - /////////////////////////////////////////////////////////////////////////// - - // Critical Section - /////////////////////////////////////////////////////////////////////////// - block_mutex_.lock(); - block_subscriptions_.clear(); - block_mutex_.unlock(); - /////////////////////////////////////////////////////////////////////////// - // Signals close and blocks until all sockets are closed. authenticator_.stop(); @@ -148,22 +132,13 @@ void server_node::handle_running(const code& ec, result_handler handler) } // Start services and workers. - if (!start_query_services() || !start_heart_services()) + if (!start_query_services() || !start_heart_services() || + !start_block_services() || !start_trans_services()) { handler(error::operation_failed); return; } - // Subscribe to blockchain reorganizations. - subscribe_blockchain( - std::bind(&server_node::handle_new_blocks, - this, _1, _2, _3, _4)); - - // Subscribe to transaction pool acceptances. - subscribe_transaction_pool( - std::bind(&server_node::handle_new_transaction, - this, _1, _2, _3)); - // This is the end of the derived run sequence. handler(error::success); } @@ -231,96 +206,14 @@ bool server_node::start_heart_services() return true; } -// Subscriptions. -// ---------------------------------------------------------------------------- - -// This serves both address subscription and the block publisher. -void server_node::subscribe_blocks(block_notify_callback notify_block) -{ - // Critical Section - /////////////////////////////////////////////////////////////////////////// - unique_lock lock(block_mutex_); - - if (!stopped()) - block_subscriptions_.push_back(notify_block); - /////////////////////////////////////////////////////////////////////////// -} - -// This serves both address subscription and the tx publisher. -void server_node::subscribe_transactions(transaction_notify_callback notify_tx) +bool server_node::start_block_services() { - // Critical Section - /////////////////////////////////////////////////////////////////////////// - unique_lock lock(transaction_mutex_); - - if (!stopped()) - transaction_subscriptions_.push_back(notify_tx); - /////////////////////////////////////////////////////////////////////////// -} - -// Notifications. -// ---------------------------------------------------------------------------- - -bool server_node::handle_new_transaction(const code& ec, - const point::indexes& unconfirmed, const transaction& tx) -{ - if (stopped() || ec == bc::error::service_stopped) - return false; - - if (ec) - { - log::error(LOG_SERVER) - << "Failure handling new tx: " << ec.message(); - return false; - } - - // Critical Section - /////////////////////////////////////////////////////////////////////////// - transaction_mutex_.lock_shared(); - const auto transaction_subscriptions = transaction_subscriptions_; - transaction_mutex_.unlock_shared(); - /////////////////////////////////////////////////////////////////////////// - - // Fire server protocol tx subscription notifications. - for (const auto notify: transaction_subscriptions) - notify(tx); - return true; } -bool server_node::handle_new_blocks(const code& ec, uint64_t fork_point, - const block::ptr_list& new_blocks, const block::ptr_list& replaced_blocks) +bool server_node::start_trans_services() { - if (stopped() || ec == bc::error::service_stopped) - return false; - - if (fork_point < last_checkpoint_height_) - return false; - - if (ec) - { - log::error(LOG_SERVER) - << "Failure handling new blocks: " << ec.message(); - return false; - } - - BITCOIN_ASSERT(fork_point < max_uint32 - new_blocks.size()); - auto height = static_cast(fork_point); - - // Critical Section - /////////////////////////////////////////////////////////////////////////// - block_mutex_.lock_shared(); - const auto block_subscriptions = block_subscriptions_; - block_mutex_.unlock_shared(); - /////////////////////////////////////////////////////////////////////////// - - // Fire server protocol block subscription notifications. - for (auto new_block: new_blocks) - for (const auto notify: block_subscriptions) - notify(++height, new_block); - return true; } - } // namespace server } // namespace libbitcoin diff --git a/src/services/block_service.cpp b/src/services/block_service.cpp index 02f47107..a5b2a6b1 100644 --- a/src/services/block_service.cpp +++ b/src/services/block_service.cpp @@ -1,5 +1,5 @@ /** - * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS) + * Copyright (c) 2011-2016 libbitcoin developers (see AUTHORS) * * This file is part of libbitcoin-server. * @@ -20,124 +20,215 @@ #include #include -#include -#include +#include +#include #include #include #include #include -#include namespace libbitcoin { namespace server { -#define PUBLIC_NAME "public_block" -#define SECURE_NAME "secure_block" - using std::placeholders::_1; using std::placeholders::_2; +using std::placeholders::_3; +using std::placeholders::_4; using namespace bc::chain; using namespace bc::protocol; -static inline bool is_enabled(server_node& node, bool secure) +static const auto domain = "block"; +const config::endpoint block_service::public_worker("inproc://public_block"); +const config::endpoint block_service::secure_worker("inproc://secure_block"); + +block_service::block_service(zmq::authenticator& authenticator, + server_node& node, bool secure) + : worker(node.thread_pool()), + secure_(secure), + settings_(node.server_settings()), + authenticator_(authenticator), + node_(node) { - const auto& settings = node.server_settings(); - return settings.block_service_enabled && - (!secure || settings.server_private_key); } -static inline config::endpoint get_endpoint(server_node& node, bool secure) +// There is no unsubscribe so this class shouldn't be restarted. +bool block_service::start() { - const auto& settings = node.server_settings(); - return secure ? settings.secure_block_endpoint : - settings.public_block_endpoint; + // Subscribe to blockchain reorganizations. + node_.subscribe_blockchain( + std::bind(&block_service::handle_reorganization, + this, _1, _2, _3, _4)); + + return zmq::worker::start(); } -// ZMQ_PUSH (we might want ZMQ_SUB here) -// When a ZMQ_PUSH socket enters an exceptional state due to having reached the -// high water mark for all downstream nodes, or if there are no downstream -// nodes at all, then any zmq_send(3) operations on the socket shall block -// until the exceptional state ends or at least one downstream node becomes -//available for sending; messages are not discarded. -block_service::block_service(zmq::authenticator& authenticator, - server_node& node, bool secure) - : node_(node), - socket_(authenticator, zmq::socket::role::pusher), - endpoint_(get_endpoint(node, secure)), - enabled_(is_enabled(node, secure)), - secure_(secure) + +// No unsubscribe so must be kept in scope until subscriber stop complete. +bool block_service::stop() +{ + return zmq::worker::stop(); +} + +// Implement worker as extended pub-sub. +// The publisher drops messages for lost peers (clients) and high water. +void block_service::work() { - const auto name = secure ? SECURE_NAME : PUBLIC_NAME; + zmq::socket xpub(authenticator_, zmq::socket::role::extended_publisher); + zmq::socket xsub(authenticator_, zmq::socket::role::extended_subscriber); + + // Bind sockets to the service and worker endpoints. + if (!started(bind(xpub, xsub))) + return; - // The authenticator logs apply failures and stopped socket halts start. - if (!enabled_ || !authenticator.apply(socket_, name, secure)) - socket_.stop(); + // TODO: tap in to failure conditions, such as high water. + // Relay messages between subscriber and publisher (blocks on context). + relay(xpub, xsub); + + // Unbind the sockets and exit this thread. + finished(unbind(xpub, xsub)); } -// The endpoint is not restartable. -// The instance is retained in scope by subscribe_blocks until stopped. -bool block_service::start() +// Bind/Unbind. +//----------------------------------------------------------------------------- + +bool block_service::bind(zmq::socket& xpub, zmq::socket& xsub) { - if (!enabled_) - return true; + const auto security = secure_ ? "secure" : "public"; + const auto& worker = secure_ ? secure_worker : public_worker; + const auto& service = secure_ ? settings_.secure_block_endpoint : + settings_.public_block_endpoint; - if (!socket_) + if (secure_ && !authenticator_.apply(xpub, domain, true)) { log::error(LOG_SERVER) - << "Failed to initialize block publish service."; + << "Failed to apply authenticator to secure block service."; return false; } - const auto ec = socket_.bind(endpoint_); + auto ec = xpub.bind(service); if (ec) { log::error(LOG_SERVER) - << "Failed to bind block publish service to " << endpoint_ - << " : " << ec.message(); - stop(); + << "Failed to bind " << security << " block service to " + << service << " : " << ec.message(); return false; } - log::info(LOG_SERVER) - << "Bound " << (secure_ ? "secure " : "public ") - << "block publish service to " << endpoint_; + ec = xsub.bind(worker); - // This is not a libbitcoin re/subscriber. - node_.subscribe_blocks( - std::bind(&block_service::send, - this, _1, _2)); + if (ec) + { + log::error(LOG_SERVER) + << "Failed to bind " << security << " block workers to " + << worker << " : " << ec.message(); + return false; + } + log::info(LOG_SERVER) + << "Bound " << security << " block service to " << service; return true; } -bool block_service::stop() +bool block_service::unbind(zmq::socket& xpub, zmq::socket& xsub) { - if (!socket_ || socket_.stop()) + // Stop both even if one fails. + const auto service_stop = xpub.stop(); + const auto worker_stop = xsub.stop(); + const auto security = secure_ ? "secure" : "public"; + + if (!service_stop) + { + log::error(LOG_SERVER) + << "Failed to unbind " << security << " block service."; + } + + if (!worker_stop) + { + log::error(LOG_SERVER) + << "Failed to unbind " << security << " block workers."; + } + + // Don't log stop success. + return service_stop && worker_stop; +} + +// Publish Execution (integral worker). +// ---------------------------------------------------------------------------- + +// A failure here does not prevent future notifications. +bool block_service::handle_reorganization(const code& ec, uint64_t fork_point, + const block::ptr_list& new_blocks, const block::ptr_list&) +{ + if (stopped() || ec == bc::error::service_stopped) + return false; + + if (ec) + { + log::warning(LOG_SERVER) + << "Failure handling new block: " << ec.message(); return true; - - log::debug(LOG_SERVER) - << "Failed to unbind block publish service from " << endpoint_; + } - return false; + // Blockchain height is 64 bit but obelisk protocol is 32 bit. + BITCOIN_ASSERT(fork_point <= max_uint32); + const auto fork_point32 = static_cast(fork_point); + + publish_blocks(fork_point32, new_blocks); + return true; } -// BUGBUG: this must be translated to the socket thread. -void block_service::send(uint32_t height, const block::ptr block) +void block_service::publish_blocks(uint32_t fork_point, + const block::ptr_list& blocks) { - zmq::message message; - message.enqueue_little_endian(height); - message.enqueue(block->header.to_data(false)); + const auto security = secure_ ? "secure" : "public"; + const auto& endpoint = secure_ ? block_service::secure_worker : + block_service::public_worker; - for (const auto& tx: block->transactions) - message.enqueue(tx.hash()); + // Notifications are off the pub-sub thread so this must connect back. + // This could be optimized by caching the socket as thread static. + zmq::socket publisher(authenticator_, zmq::socket::role::publisher); + const auto ec = publisher.connect(endpoint); - auto ec = message.send(socket_); + if (ec) + { + log::warning(LOG_SERVER) + << "Failed to connect " << security << " block worker: " + << ec.message(); + } + + BITCOIN_ASSERT(blocks.size() <= max_uint32); + BITCOIN_ASSERT(fork_point < max_uint32 - blocks.size()); + auto height = fork_point; + + for (const auto block: blocks) + publish_block(publisher, height++, block); +} + +void block_service::publish_block(zmq::socket& publisher, uint32_t height, + const block::ptr block) +{ + const auto security = secure_ ? "secure" : "public"; + + zmq::message respose; + respose.enqueue_little_endian(height); + respose.enqueue(block->to_data(false)); + const auto ec = publisher.send(respose); if (ec) + { log::warning(LOG_SERVER) - << "Failed to publish block on " << endpoint_ - << " : " << ec.message(); + << "Failed to publish " << security << " bloc [" + << encode_hash(block->header.hash()) << "] " << ec.message(); + } + + // This isn't actually a request, should probably update settings. + if (!settings_.log_requests) + return; + + log::debug(LOG_SERVER) + << "Published " << security << " block [" + << encode_hash(block->header.hash()) << "]"; } } // namespace server diff --git a/src/services/heart_service.cpp b/src/services/heart_service.cpp index 4fd14a7a..cb6136e8 100644 --- a/src/services/heart_service.cpp +++ b/src/services/heart_service.cpp @@ -77,27 +77,6 @@ void heart_service::work() finished(unbind(publisher)); } -void heart_service::publish(uint32_t count, zmq::socket& publisher) -{ - const auto security = secure_ ? "secure" : "public"; - - zmq::message message; - message.enqueue_little_endian(count); - auto ec = message.send(publisher); - - if (ec && ec != error::service_stopped) - log::warning(LOG_SERVER) - << "Failed to publish " << security << " heartbeat: " - << ec.message(); - - // This isn't actually a request, should probably update settings. - if (!settings_.log_requests) - return; - - log::debug(LOG_SERVER) - << "Published " << security << " heartbeat [" << count << "]."; -} - // Bind/Unbind. //----------------------------------------------------------------------------- @@ -144,5 +123,29 @@ bool heart_service::unbind(zmq::socket& publisher) return true; } +// Publish Execution (integral worker). +//----------------------------------------------------------------------------- + +void heart_service::publish(uint32_t count, zmq::socket& publisher) +{ + const auto security = secure_ ? "secure" : "public"; + + zmq::message message; + message.enqueue_little_endian(count); + auto ec = publisher.send(message); + + if (ec && ec != error::service_stopped) + log::warning(LOG_SERVER) + << "Failed to publish " << security << " heartbeat: " + << ec.message(); + + // This isn't actually a request, should probably update settings. + if (!settings_.log_requests) + return; + + log::debug(LOG_SERVER) + << "Published " << security << " heartbeat [" << count << "]."; +} + } // namespace server } // namespace libbitcoin diff --git a/src/services/query_service.cpp b/src/services/query_service.cpp index aa068dd9..fe47e229 100644 --- a/src/services/query_service.cpp +++ b/src/services/query_service.cpp @@ -43,7 +43,6 @@ query_service::query_service(zmq::authenticator& authenticator, } // Implement worker as a broker. -// TODO: implement as a load balancing broker. // The dealer blocks until there are available workers. // The router drops messages for lost peers (clients) and high water. void query_service::work() @@ -55,8 +54,7 @@ void query_service::work() if (!started(bind(router, dealer))) return; - // TODO: replace with native implementation that allows us to log send - // and receive failures in the relay, so we can log high water. + // TODO: tap in to failure conditions, such as high water. // Relay messages between router and dealer (blocks on context). relay(router, dealer); @@ -109,24 +107,24 @@ bool query_service::bind(zmq::socket& router, zmq::socket& dealer) bool query_service::unbind(zmq::socket& router, zmq::socket& dealer) { // Stop both even if one fails. - const auto router_stop = router.stop(); - const auto dealer_stop = dealer.stop(); + const auto service_stop = router.stop(); + const auto worker_stop = dealer.stop(); const auto security = secure_ ? "secure" : "public"; - if (!router_stop) + if (!service_stop) { log::error(LOG_SERVER) << "Failed to unbind " << security << " query service."; } - if (!dealer_stop) + if (!worker_stop) { log::error(LOG_SERVER) << "Failed to unbind " << security << " query workers."; } // Don't log stop success. - return router_stop && dealer_stop; + return service_stop && worker_stop; } } // namespace server diff --git a/src/services/trans_service.cpp b/src/services/trans_service.cpp index 50b99f0a..40da0942 100644 --- a/src/services/trans_service.cpp +++ b/src/services/trans_service.cpp @@ -19,118 +19,195 @@ */ #include -#include -#include +#include +#include #include #include #include #include -#include namespace libbitcoin { namespace server { -#define PUBLIC_NAME "public_transaction" -#define SECURE_NAME "secure_transaction" - using std::placeholders::_1; +using std::placeholders::_2; +using std::placeholders::_3; using namespace bc::chain; using namespace bc::protocol; -static inline bool is_enabled(server_node& node, bool secure) +static const auto domain = "transaction"; +const config::endpoint trans_service::public_worker("inproc://public_tx"); +const config::endpoint trans_service::secure_worker("inproc://secure_tx"); + +trans_service::trans_service(zmq::authenticator& authenticator, + server_node& node, bool secure) + : worker(node.thread_pool()), + secure_(secure), + settings_(node.server_settings()), + authenticator_(authenticator), + node_(node) { - const auto& settings = node.server_settings(); - return settings.transaction_service_enabled && - (!secure || settings.server_private_key); } -static inline config::endpoint get_endpoint(server_node& node, bool secure) +// There is no unsubscribe so this class shouldn't be restarted. +bool trans_service::start() { - const auto& settings = node.server_settings(); - return secure ? settings.secure_transaction_endpoint : - settings.public_transaction_endpoint; + // Subscribe to transaction pool acceptances. + node_.subscribe_transaction_pool( + std::bind(&trans_service::handle_transaction, + this, _1, _2, _3)); + + return zmq::worker::start(); } -// ZMQ_PUSH (we might want ZMQ_SUB here) -// When a ZMQ_PUSH socket enters an exceptional state due to having reached the -// high water mark for all downstream nodes, or if there are no downstream -// nodes at all, then any zmq_send(3) operations on the socket shall block -// until the exceptional state ends or at least one downstream node becomes -//available for sending; messages are not discarded. -trans_service::trans_service(zmq::authenticator& authenticator, - server_node& node, bool secure) - : node_(node), - socket_(authenticator, zmq::socket::role::pusher), - endpoint_(get_endpoint(node, secure)), - enabled_(is_enabled(node, secure)), - secure_(secure) + +// No unsubscribe so must be kept in scope until subscriber stop complete. +bool trans_service::stop() { - const auto name = secure ? SECURE_NAME : PUBLIC_NAME; + return zmq::worker::stop(); +} + +// Implement worker as extended pub-sub. +// The publisher drops messages for lost peers (clients) and high water. +void trans_service::work() +{ + zmq::socket xpub(authenticator_, zmq::socket::role::extended_publisher); + zmq::socket xsub(authenticator_, zmq::socket::role::extended_subscriber); + + // Bind sockets to the service and worker endpoints. + if (!started(bind(xpub, xsub))) + return; + + // TODO: tap in to failure conditions, such as high water. + // Relay messages between subscriber and publisher (blocks on context). + relay(xpub, xsub); - // The authenticator logs apply failures and stopped socket halts start. - if (!enabled_ || !authenticator.apply(socket_, name, secure)) - socket_.stop(); + // Unbind the sockets and exit this thread. + finished(unbind(xpub, xsub)); } -// The endpoint is not restartable. -// The instance is retained in scope by subscribe_transactions until stopped. -bool trans_service::start() +// Bind/Unbind. +//----------------------------------------------------------------------------- + +bool trans_service::bind(zmq::socket& xpub, zmq::socket& xsub) { - if (!enabled_) - return true; + const auto security = secure_ ? "secure" : "public"; + const auto& worker = secure_ ? secure_worker : public_worker; + const auto& service = secure_ ? settings_.secure_transaction_endpoint : + settings_.public_transaction_endpoint; - if (!socket_) + if (secure_ && !authenticator_.apply(xpub, domain, true)) { log::error(LOG_SERVER) - << "Failed to initialize transaction publish service."; + << "Failed to apply authenticator to secure transaction service."; return false; } - const auto ec = socket_.bind(endpoint_); + auto ec = xpub.bind(service); if (ec) { log::error(LOG_SERVER) - << "Failed to bind transaction publish service to " << endpoint_ - << " : " << ec.message(); - stop(); + << "Failed to bind " << security << " transaction service to " + << service << " : " << ec.message(); return false; } - log::info(LOG_SERVER) - << "Bound " << (secure_ ? "secure " : "public ") - << "transaction publish service to " << endpoint_; + ec = xsub.bind(worker); - // This is not a libbitcoin re/subscriber. - node_.subscribe_transactions( - std::bind(&trans_service::send, - this, _1)); + if (ec) + { + log::error(LOG_SERVER) + << "Failed to bind " << security << " transaction workers to " + << worker << " : " << ec.message(); + return false; + } + log::info(LOG_SERVER) + << "Bound " << security << " transaction service to " << service; return true; } -bool trans_service::stop() +bool trans_service::unbind(zmq::socket& xpub, zmq::socket& xsub) +{ + // Stop both even if one fails. + const auto service_stop = xpub.stop(); + const auto worker_stop = xsub.stop(); + const auto security = secure_ ? "secure" : "public"; + + if (!service_stop) + { + log::error(LOG_SERVER) + << "Failed to unbind " << security << " transaction service."; + } + + if (!worker_stop) + { + log::error(LOG_SERVER) + << "Failed to unbind " << security << " transaction workers."; + } + + // Don't log stop success. + return service_stop && worker_stop; +} + +// Publish Execution (integral worker). +// ---------------------------------------------------------------------------- + +// A failure here does not prevent future notifications. +bool trans_service::handle_transaction(const code& ec, + const point::indexes&, const transaction& tx) { - if (!socket_ || socket_.stop()) + if (stopped() || ec == bc::error::service_stopped) + return false; + + if (ec) + { + log::warning(LOG_SERVER) + << "Failure handling new transaction: " << ec.message(); return true; + } - log::debug(LOG_SERVER) - << "Failed to unbind transaction publish service from " << endpoint_; - return false; + publish_transaction(tx); + return true; } -// BUGBUG: this must be translated to the socket thread. -void trans_service::send(const transaction& tx) +void trans_service::publish_transaction(const transaction& tx) { - zmq::message message; - message.enqueue(tx.to_data()); + const auto security = secure_ ? "secure" : "public"; + const auto& endpoint = secure_ ? trans_service::secure_worker : + trans_service::public_worker; + + // Notifications are off the pub-sub thread so this must connect back. + // This could be optimized by caching the socket as thread static. + zmq::socket publisher(authenticator_, zmq::socket::role::publisher); + auto ec = publisher.connect(endpoint); + + if (ec) + { + log::warning(LOG_SERVER) + << "Failed to connect " << security << " transaction worker: " + << ec.message(); + } - auto ec = message.send(socket_); + zmq::message respose; + respose.enqueue(tx.to_data()); + ec = publisher.send(respose); if (ec) + { log::warning(LOG_SERVER) - << "Failed to publish transaction on " << endpoint_ - << " : " << ec.message(); + << "Failed to publish " << security << " transaction [" + << encode_hash(tx.hash()) << "] " << ec.message(); + } + + // This isn't actually a request, should probably update settings. + if (!settings_.log_requests) + return; + + log::debug(LOG_SERVER) + << "Published " << security << " transaction [" + << encode_hash(tx.hash()) << "]"; } } // namespace server diff --git a/src/utility/address_notifier.cpp b/src/utility/address_notifier.cpp index 7cfbd538..4b57f92d 100644 --- a/src/utility/address_notifier.cpp +++ b/src/utility/address_notifier.cpp @@ -51,13 +51,15 @@ address_notifier::address_notifier(server_node& node) // Subscribe against the node's tx and block publishers. bool address_notifier::start() { - node_.subscribe_blocks( - std::bind(&address_notifier::receive_block, - this, _1, _2)); - - node_.subscribe_transactions( - std::bind(&address_notifier::receive_transaction, - this, _1)); + ////////// This is not a libbitcoin re/subscriber. + ////////node_.subscribe_blocks( + //////// std::bind(&address_notifier::receive_block, + //////// this, _1, _2)); + + ////////// This is not a libbitcoin re/subscriber. + ////////node_.subscribe_transactions( + //////// std::bind(&address_notifier::receive_transaction, + //////// this, _1)); return true; } diff --git a/src/workers/query_worker.cpp b/src/workers/query_worker.cpp index a640778f..2e0c8b8e 100644 --- a/src/workers/query_worker.cpp +++ b/src/workers/query_worker.cpp @@ -84,7 +84,7 @@ bool query_worker::connect(zmq::socket& router) const auto& endpoint = secure_ ? query_service::secure_worker : query_service::public_worker; - auto ec = router.connect(endpoint); + const auto ec = router.connect(endpoint); if (ec) { @@ -147,12 +147,6 @@ void query_worker::query(zmq::socket& router) return; } - if (settings_.log_requests) - { - log::info(LOG_SERVER) - << "Query " << request.command << " from " << request.address(); - } - // Locate the request handler for this command. const auto handler = command_handlers_.find(request.command); @@ -165,6 +159,12 @@ void query_worker::query(zmq::socket& router) return; } + if (settings_.log_requests) + { + log::info(LOG_SERVER) + << "Query " << request.command << " from " << request.address(); + } + // Execute the request and forward result to queue. handler->second(request, sender); }