diff --git a/data/bs.cfg b/data/bs.cfg index 89c2acad..514ea19d 100644 --- a/data/bs.cfg +++ b/data/bs.cfg @@ -125,12 +125,12 @@ log_requests = false secure_only = false # Enable the query service, defaults to true. query_service_enabled = true -# Enable the heartbeat service, defaults to true. -heartbeat_service_enabled = true -# Enable the block publishing service, defaults to true. -block_service_enabled = true -# Enable the transaction publishing service, defaults to true. -transaction_service_enabled = true +# Enable the heartbeat service, defaults to false. +heartbeat_service_enabled = false +# Enable the block publishing service, defaults to false. +block_service_enabled = false +# Enable the transaction publishing service, defaults to false. +transaction_service_enabled = false # The public query endpoint, defaults to 'tcp://*:9091'. public_query_endpoint = tcp://*:9091 # The public heartbeat endpoint, defaults to 'tcp://*:9092'. diff --git a/include/bitcoin/server/messages/incoming.hpp b/include/bitcoin/server/messages/incoming.hpp index f8d011cf..cd71c5f3 100644 --- a/include/bitcoin/server/messages/incoming.hpp +++ b/include/bitcoin/server/messages/incoming.hpp @@ -31,7 +31,11 @@ namespace server { class BCS_API incoming { public: - bool receive(bc::protocol::zmq::socket& socket); + /// A printable address for logging only. + std::string address(); + + /// Send a message from the socket. + code receive(bc::protocol::zmq::socket& socket); /// The message route as seen at workers. data_chunk address1; diff --git a/include/bitcoin/server/messages/outgoing.hpp b/include/bitcoin/server/messages/outgoing.hpp index fdaecf3d..b195e8c9 100644 --- a/include/bitcoin/server/messages/outgoing.hpp +++ b/include/bitcoin/server/messages/outgoing.hpp @@ -43,7 +43,11 @@ class BCS_API outgoing const data_chunk& address1, const data_chunk& address2, bool delimited); - bool send(bc::protocol::zmq::socket& socket); + /// A printable address for logging only. + std::string address(); + + /// Send the message one the socket. + code send(bc::protocol::zmq::socket& socket); protected: outgoing(const std::string& command, const data_chunk& data, diff --git a/include/bitcoin/server/server_node.hpp b/include/bitcoin/server/server_node.hpp index f327783d..2d69a2fb 100644 --- a/include/bitcoin/server/server_node.hpp +++ b/include/bitcoin/server/server_node.hpp @@ -29,7 +29,7 @@ #include #include ////#include -////#include +#include #include ////#include #include @@ -99,8 +99,9 @@ class BCS_API server_node void handle_running(const code& ec, result_handler handler); void handle_closing(const code& ec, std::promise& wait); - void start_query_services(result_handler handler); - void allocate_query_workers(bool secure, result_handler handler); + bool start_heart_services(); + bool start_query_services(); + bool start_query_workers(bool secure); const configuration& configuration_; const size_t last_checkpoint_height_; @@ -109,6 +110,8 @@ class BCS_API server_node curve_authenticator authenticator_; query_service secure_query_service_; query_service public_query_service_; + heart_service secure_heart_service_; + heart_service public_heart_service_; // This is protected by block mutex. block_notify_list block_subscriptions_; diff --git a/include/bitcoin/server/services/heart_service.hpp b/include/bitcoin/server/services/heart_service.hpp index 04d92223..cfc0165e 100644 --- a/include/bitcoin/server/services/heart_service.hpp +++ b/include/bitcoin/server/services/heart_service.hpp @@ -20,13 +20,11 @@ #ifndef LIBBITCOIN_SERVER_HEART_SERVICE_HPP #define LIBBITCOIN_SERVER_HEART_SERVICE_HPP -#include -#include #include -#include #include #include #include +#include namespace libbitcoin { namespace server { @@ -34,7 +32,7 @@ namespace server { class server_node; class BCS_API heart_service - : public enable_shared_from_base + : public bc::protocol::zmq::worker { public: typedef std::shared_ptr ptr; @@ -43,33 +41,23 @@ class BCS_API heart_service heart_service(bc::protocol::zmq::authenticator& authenticator, server_node& node, bool secure); - /// This class is not copyable. - heart_service(const heart_service&) = delete; - void operator=(const heart_service&) = delete; +protected: + virtual bool bind(bc::protocol::zmq::socket& publisher); + virtual bool unbind(bc::protocol::zmq::socket& publisher); - /// Start the endpoint. - bool start(); + // Implement the service. + virtual void work(); - /// Stop the endpoint. - /// Stopping the authenticated context does not stop the publisher. - bool stop(); + // Publish the heartbeat (integrated worker). + void publish(uint32_t count, bc::protocol::zmq::socket& socket); private: - void publisher(std::promise& started); - void send(uint32_t count, bc::protocol::zmq::socket& socket); + const server::settings& settings_; + const int32_t period_; + const bool secure_; - // These are protected by mutex. + // This is thread safe. bc::protocol::zmq::authenticator& authenticator_; - dispatcher dispatch_; - std::atomic stopped_; - std::promise stopping_; - mutable shared_mutex mutex_; - - const bc::config::endpoint endpoint_; - const std::chrono::seconds interval_; - const bool log_; - const bool enabled_; - const bool secure_; }; } // namespace server diff --git a/include/bitcoin/server/workers/query_worker.hpp b/include/bitcoin/server/workers/query_worker.hpp index 8ca4c27e..f1bdbc7a 100644 --- a/include/bitcoin/server/workers/query_worker.hpp +++ b/include/bitcoin/server/workers/query_worker.hpp @@ -50,11 +50,12 @@ class BCS_API query_worker typedef std::function command_handler; typedef std::unordered_map command_map; + virtual bool connect(bc::protocol::zmq::socket& router); + virtual bool disconnect(bc::protocol::zmq::socket& router); + virtual void attach_interface(); virtual void attach(const std::string& command, command_handler handler); - virtual bool connect(bc::protocol::zmq::socket& socket); - virtual bool disconnect(bc::protocol::zmq::socket& socket); - virtual void query(bc::protocol::zmq::socket& socket); + virtual void query(bc::protocol::zmq::socket& router); // Implement the worker. virtual void work(); diff --git a/src/messages/incoming.cpp b/src/messages/incoming.cpp index 10207d8d..6b71b363 100644 --- a/src/messages/incoming.cpp +++ b/src/messages/incoming.cpp @@ -28,24 +28,35 @@ namespace server { using namespace bc::protocol; +std::string incoming::address() +{ + return "[" + encode_base16(address1) + "]"; +} + // Protocol delimitation migration plan. //----------------------------------------------------------------------------- -// v1/v2 server: ROUTER, requires not framed -// v3 server: ROUTER, allows/echos framed +// v1/v2 server: ROUTER, requires not delimited +// v3 server: ROUTER, allows/echos delimited // v1/v2/v3 client: DEALER (not framed) //----------------------------------------------------------------------------- -// v4 server: ROUTER, requires framed -// v4 client: DEALER (framed) or REQ +// v4 server: ROUTER, requires delimited +// v4 client: DEALER (delimited) or REQ //----------------------------------------------------------------------------- -// TODO: generalize address stripping with hack parameter of expected -// payload length for parsing undelimited addressing. -bool incoming::receive(zmq::socket& socket) +// TODO: generalize address stripping, store all routes, use (hack) parameter +// of expected payload length for parsing undelimited addressing. +// BUGBUG: current implementation assumes client has not added any addresses. +// This probably prevents the use of generalized zeromq routing to the server. +code incoming::receive(zmq::socket& socket) { zmq::message message; + auto ec = message.receive(socket); - if (!message.receive(socket) || message.size() < 5 || message.size() > 6) - return false; + if (ec) + return ec; + + if (message.size() < 5 || message.size() > 6) + return error::bad_stream; // Client is undelimited DEALER -> 2 addresses with no delimiter. // Client is REQ or delimited DEALER -> 2 addresses with delimiter. @@ -66,11 +77,11 @@ bool incoming::receive(zmq::socket& socket) // Arbitrary caller data (returned to caller for correlation). if (!message.dequeue(id)) - return false; + return error::bad_stream; // Serialized query. data = message.dequeue_data(); - return true; + return error::success; } } // namespace server diff --git a/src/messages/outgoing.cpp b/src/messages/outgoing.cpp index 035410d0..55233581 100644 --- a/src/messages/outgoing.cpp +++ b/src/messages/outgoing.cpp @@ -80,7 +80,13 @@ outgoing::outgoing(const std::string& command, const data_chunk& data, message_.enqueue(data); } -bool outgoing::send(zmq::socket& socket) +std::string outgoing::address() +{ + auto message = message_; + return "[" + encode_base16(message.dequeue_data()) + "]"; +} + +code outgoing::send(zmq::socket& socket) { return message_.send(socket); } diff --git a/src/parser.cpp b/src/parser.cpp index f1d61a11..8e3a6598 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -325,17 +325,17 @@ options_metadata parser::load_settings() ( "server.heartbeat_service_enabled", value(&configured.server.heartbeat_service_enabled), - "Enable the heartbeat service, defaults to true." + "Enable the heartbeat service, defaults to false." ) ( "server.block_service_enabled", value(&configured.server.block_service_enabled), - "Enable the block publishing service, defaults to true." + "Enable the block publishing service, defaults to false." ) ( "server.transaction_service_enabled", value(&configured.server.transaction_service_enabled), - "Enable the transaction publishing service, defaults to true." + "Enable the transaction publishing service, defaults to false." ) ( "server.public_query_endpoint", diff --git a/src/server_node.cpp b/src/server_node.cpp index 8b4dae9a..de2188a9 100644 --- a/src/server_node.cpp +++ b/src/server_node.cpp @@ -45,7 +45,9 @@ server_node::server_node(const configuration& configuration) last_checkpoint_height_(configuration.last_checkpoint_height()), authenticator_(*this), secure_query_service_(authenticator_, *this, true), - public_query_service_(authenticator_, *this, false) + public_query_service_(authenticator_, *this, false), + secure_heart_service_(authenticator_, *this, true), + public_heart_service_(authenticator_, *this, false) { } @@ -145,8 +147,12 @@ void server_node::handle_running(const code& ec, result_handler handler) return; } - // Start query services and workers. - start_query_services(handler); + // Start services and workers. + if (!start_query_services() || !start_heart_services()) + { + handler(error::operation_failed); + return; + } // Subscribe to blockchain reorganizations. subscribe_blockchain( @@ -167,7 +173,7 @@ void server_node::handle_running(const code& ec, result_handler handler) // The number of threads available in the thread pool must be sufficient // to allocate the workers. This will wait forever on thread availability. -void server_node::allocate_query_workers(bool secure, result_handler handler) +bool server_node::start_query_workers(bool secure) { auto& server = *this; const auto& settings = configuration_.server; @@ -178,43 +184,51 @@ void server_node::allocate_query_workers(bool secure, result_handler handler) server, secure); if (!worker->start()) - { - handler(error::operation_failed); - return; - } + return false; subscribe_stop([=](const code&) { worker->stop(); }); } + + return true; } -void server_node::start_query_services(result_handler handler) +bool server_node::start_query_services() { const auto& settings = configuration_.server; if (!settings.query_service_enabled || settings.query_workers == 0) - return; + return true; - if (settings.server_private_key) - { - if (!secure_query_service_.start()) - { - handler(error::operation_failed); - return; - } + // Start secure service and workers if enabled. + if (settings.server_private_key && (!secure_query_service_.start() || + !start_query_workers(true))) + return false; - allocate_query_workers(true, handler); - } + // Start public service and workers if enabled. + if (!settings.secure_only && (!public_query_service_.start() || + !start_query_workers(false))) + return false; - if (!settings.secure_only) - { - if (!public_query_service_.start()) - { - handler(error::operation_failed); - return; - } + return true; +} - allocate_query_workers(false, handler); - } +bool server_node::start_heart_services() +{ + const auto& settings = configuration_.server; + + if (!settings.heartbeat_service_enabled || + settings.heartbeat_interval_seconds == 0) + return true; + + // Start public service if enabled. + if (settings.server_private_key && !secure_heart_service_.start()) + return false; + + // Start public service if enabled. + if (!settings.secure_only && !public_heart_service_.start()) + return false; + + return true; } // Subscriptions. diff --git a/src/services/block_service.cpp b/src/services/block_service.cpp index 7dc2cc73..02f47107 100644 --- a/src/services/block_service.cpp +++ b/src/services/block_service.cpp @@ -81,10 +81,20 @@ bool block_service::start() if (!enabled_) return true; - if (!socket_ || !socket_.bind(endpoint_)) + if (!socket_) { log::error(LOG_SERVER) - << "Failed to bind block publish service to " << endpoint_; + << "Failed to initialize block publish service."; + return false; + } + + const auto ec = socket_.bind(endpoint_); + + if (ec) + { + log::error(LOG_SERVER) + << "Failed to bind block publish service to " << endpoint_ + << " : " << ec.message(); stop(); return false; } @@ -122,11 +132,12 @@ void block_service::send(uint32_t height, const block::ptr block) for (const auto& tx: block->transactions) message.enqueue(tx.hash()); - if (message.send(socket_)) - return; + auto ec = message.send(socket_); - log::warning(LOG_SERVER) - << "Failed to publish block on " << endpoint_; + if (ec) + log::warning(LOG_SERVER) + << "Failed to publish block on " << endpoint_ + << " : " << ec.message(); } } // namespace server diff --git a/src/services/heart_service.cpp b/src/services/heart_service.cpp index d519baf1..4fd14a7a 100644 --- a/src/services/heart_service.cpp +++ b/src/services/heart_service.cpp @@ -21,185 +21,127 @@ #include #include -#include -#include -#include #include #include +#include namespace libbitcoin { namespace server { - -#define NAME "heartbeat_endpoint" -#define PUBLIC_NAME "public_heartbeat" -#define SECURE_NAME "secure_heartbeat" - -using std::placeholders::_1; -using namespace std::chrono; -using namespace std::this_thread; -using namespace bc::protocol; -static inline bool is_enabled(server_node& node, bool secure) -{ - const auto& settings = node.server_settings(); - return settings.heartbeat_service_enabled && - settings.heartbeat_interval_seconds > 0 && - (!secure || settings.server_private_key); -} +static const auto domain = "heartbeat"; -static inline config::endpoint get_endpoint(server_node& node, bool secure) -{ - const auto& settings = node.server_settings(); - return secure ? settings.secure_heartbeat_endpoint : - settings.public_heartbeat_endpoint; -} +using namespace bc::config; +using namespace bc::protocol; -static inline seconds get_interval(server_node& node) +static uint32_t to_milliseconds(uint16_t seconds) { - return seconds(node.server_settings().heartbeat_interval_seconds); -} + const auto milliseconds = static_cast(seconds) * 1000; + return std::min(milliseconds, max_uint32); +}; -// ZMQ_PUB (ok to drop, never block) -// When a ZMQ_PUB socket enters an exceptional state due to having reached -// the high water mark for a subscriber, then any messages that would be sent -// to the subscriber in question shall instead be dropped until the exceptional -// state ends. The zmq_send() function shall never block for this socket type. +// Heartbeat is capped at ~ 25 days by signed/millsecond conversions. heart_service::heart_service(zmq::authenticator& authenticator, server_node& node, bool secure) - : authenticator_(authenticator), - dispatch_(node.thread_pool(), NAME), - stopped_(true), - endpoint_(get_endpoint(node, secure)), - interval_(get_interval(node)), - log_(node.server_settings().log_requests), - enabled_(is_enabled(node, secure)), + : worker(node.thread_pool()), + settings_(node.server_settings()), + period_(to_milliseconds(settings_.heartbeat_interval_seconds)), + authenticator_(authenticator), secure_(secure) { } -// The endpoint is restartable. -bool heart_service::start() +// Implement service as a publisher. +// The publisher does not block if there are no subscribers or at high water. +void heart_service::work() { - if (!enabled_) - return true; + zmq::socket publisher(authenticator_, zmq::socket::role::publisher); - std::promise started; - code ec(error::operation_failed); - - /////////////////////////////////////////////////////////////////////////// - // Critical Section - mutex_.lock(); - - if (stopped_) - { - // Enable publisher start. - stopped_ = false; - - // Create the pubisher thread and socket and start sending. - dispatch_.concurrent( - std::bind(&heart_service::publisher, - this, std::ref(started))); + // Bind socket to the worker endpoint. + if (!started(bind(publisher))) + return; - // Wait on publisher start. - ec = started.get_future().get(); - } + zmq::poller poller; + poller.add(publisher); - mutex_.unlock(); - /////////////////////////////////////////////////////////////////////////// + // Pick a random counter start, will wrap around at overflow. + auto count = static_cast(pseudo_random()); - if (ec) + // We will not receive on the poller, we use its timer and context stop. + while (!poller.terminated() && !stopped()) { - log::error(LOG_SERVER) - << "Failed to bind heartbeat service to " << endpoint_; - return false; + poller.wait(period_); + publish(count++, publisher); } - log::info(LOG_SERVER) - << "Bound " << (secure_ ? "secure " : "public ") - << "heartbeat service to " << endpoint_; - return true; + // Unbind the socket and exit this thread. + finished(unbind(publisher)); } -bool heart_service::stop() +void heart_service::publish(uint32_t count, zmq::socket& publisher) { - code ec(error::success); - - /////////////////////////////////////////////////////////////////////////// - // Critical Section - mutex_.lock(); + const auto security = secure_ ? "secure" : "public"; - if (!stopped_) - { - // Stop the publisher. - stopped_ = true; - - // Wait on publisher stop. - ec = stopping_.get_future().get(); + zmq::message message; + message.enqueue_little_endian(count); + auto ec = message.send(publisher); - // Enable restart capability. - stopping_ = std::promise(); - } - - mutex_.unlock(); - /////////////////////////////////////////////////////////////////////////// + if (ec && ec != error::service_stopped) + log::warning(LOG_SERVER) + << "Failed to publish " << security << " heartbeat: " + << ec.message(); - if (ec) - { - log::error(LOG_SERVER) - << "Failed to unbind heartbeat service from " << endpoint_; - return false; - } + // This isn't actually a request, should probably update settings. + if (!settings_.log_requests) + return; - return true; + log::debug(LOG_SERVER) + << "Published " << security << " heartbeat [" << count << "]."; } -// A context stop does not stop the publisher. -void heart_service::publisher(std::promise& started) +// Bind/Unbind. +//----------------------------------------------------------------------------- + +bool heart_service::bind(zmq::socket& publisher) { - const auto name = secure_ ? SECURE_NAME : PUBLIC_NAME; - zmq::socket socket(authenticator_, zmq::socket::role::publisher); + const auto security = secure_ ? "secure" : "public"; + const auto& endpoint = secure_ ? settings_.secure_heartbeat_endpoint : + settings_.public_heartbeat_endpoint; - if (!socket || !authenticator_.apply(socket, name, secure_) || - !socket.bind(endpoint_)) + if (secure_ && !authenticator_.apply(publisher, domain, true)) { - stopping_.set_value(error::success); - started.set_value(error::operation_failed); - return; + log::error(LOG_SERVER) + << "Failed to apply authenticator to secure heartbeat service."; + return false; } - started.set_value(error::success); - - // Pick a random counter start, overflow is okay. - auto counter = static_cast(pseudo_random()); + const auto ec = publisher.bind(endpoint); - // TODO: use poller against outbound socket, allowing context to close it. - // A simple loop is optimal and keeps us on a single thread. - while (!stopped_) + if (ec) { - send(counter++, socket); - sleep_for(interval_); + log::error(LOG_SERVER) + << "Failed to bind " << security << " heartbeat service to " + << endpoint << " : " << ec.message(); + return false; } - auto result = socket.stop() ? error::success : error::operation_failed; - stopping_.set_value(result); + log::info(LOG_SERVER) + << "Bound " << security << " heartbeat service to " << endpoint; + return true; } -void heart_service::send(uint32_t count, zmq::socket& socket) +bool heart_service::unbind(zmq::socket& publisher) { - zmq::message message; - message.enqueue_little_endian(count); - const auto result = message.send(socket); + const auto security = secure_ ? "secure" : "public"; - if (!result) + if (!publisher.stop()) { - log::warning(LOG_SERVER) - << "Failed to send heartbeat on " << endpoint_; - return; + log::error(LOG_SERVER) + << "Failed to disconnect " << security << " heartbeat worker."; + return false; } - if (log_) - log::debug(LOG_SERVER) - << "Heartbeat [" << count << "] sent on " << endpoint_; + // Don't log stop success. + return true; } } // namespace server diff --git a/src/services/query_service.cpp b/src/services/query_service.cpp index cea40617..aa068dd9 100644 --- a/src/services/query_service.cpp +++ b/src/services/query_service.cpp @@ -44,6 +44,8 @@ query_service::query_service(zmq::authenticator& authenticator, // Implement worker as a broker. // TODO: implement as a load balancing broker. +// The dealer blocks until there are available workers. +// The router drops messages for lost peers (clients) and high water. void query_service::work() { zmq::socket router(authenticator_, zmq::socket::role::router); @@ -53,6 +55,8 @@ void query_service::work() if (!started(bind(router, dealer))) return; + // TODO: replace with native implementation that allows us to log send + // and receive failures in the relay, so we can log high water. // Relay messages between router and dealer (blocks on context). relay(router, dealer); @@ -77,19 +81,23 @@ bool query_service::bind(zmq::socket& router, zmq::socket& dealer) return false; } - if (!router.bind(service)) + auto ec = router.bind(service); + + if (ec) { log::error(LOG_SERVER) << "Failed to bind " << security << " query service to " - << service; + << service << " : " << ec.message(); return false; } - if (!dealer.bind(worker)) + ec = dealer.bind(worker); + + if (ec) { log::error(LOG_SERVER) << "Failed to bind " << security << " query workers to " - << worker; + << worker << " : " << ec.message(); return false; } diff --git a/src/services/trans_service.cpp b/src/services/trans_service.cpp index 2d296332..50b99f0a 100644 --- a/src/services/trans_service.cpp +++ b/src/services/trans_service.cpp @@ -79,10 +79,20 @@ bool trans_service::start() if (!enabled_) return true; - if (!socket_ || !socket_.bind(endpoint_)) + if (!socket_) { log::error(LOG_SERVER) - << "Failed to bind transaction publish service to " << endpoint_; + << "Failed to initialize transaction publish service."; + return false; + } + + const auto ec = socket_.bind(endpoint_); + + if (ec) + { + log::error(LOG_SERVER) + << "Failed to bind transaction publish service to " << endpoint_ + << " : " << ec.message(); stop(); return false; } @@ -115,11 +125,12 @@ void trans_service::send(const transaction& tx) zmq::message message; message.enqueue(tx.to_data()); - if (message.send(socket_)) - return; + auto ec = message.send(socket_); - log::warning(LOG_SERVER) - << "Failed to publish transaction on " << endpoint_; + if (ec) + log::warning(LOG_SERVER) + << "Failed to publish transaction on " << endpoint_ + << " : " << ec.message(); } } // namespace server diff --git a/src/settings.cpp b/src/settings.cpp index 32e7b4ee..1eb42b02 100644 --- a/src/settings.cpp +++ b/src/settings.cpp @@ -34,9 +34,9 @@ settings::settings() log_requests(false), secure_only(false), query_service_enabled(true), - heartbeat_service_enabled(true), - block_service_enabled(true), - transaction_service_enabled(true), + heartbeat_service_enabled(false), + block_service_enabled(false), + transaction_service_enabled(false), public_query_endpoint("tcp://*:9091"), public_heartbeat_endpoint("tcp://*:9092"), public_block_endpoint("tcp://*:9093"), diff --git a/src/workers/query_worker.cpp b/src/workers/query_worker.cpp index 7824a0e4..a640778f 100644 --- a/src/workers/query_worker.cpp +++ b/src/workers/query_worker.cpp @@ -52,7 +52,8 @@ query_worker::query_worker(zmq::authenticator& authenticator, } // Implement worker as a router. -// NOTE: v2 libbitcoin-client DEALER does not add delimiter frame. +// v2 libbitcoin-client DEALER does not add delimiter frame. +// The router drops messages for lost peers (query service) and high water. void query_worker::work() { zmq::socket router(authenticator_, zmq::socket::role::router); @@ -63,8 +64,7 @@ void query_worker::work() zmq::poller poller; poller.add(router); - - // We can drop messages here because this is a router. + while (!poller.terminated() && !stopped()) { if (poller.wait().contains(router.id())) @@ -78,17 +78,19 @@ void query_worker::work() // Connect/Disconnect. //----------------------------------------------------------------------------- -bool query_worker::connect(zmq::socket& socket) +bool query_worker::connect(zmq::socket& router) { const auto security = secure_ ? "secure" : "public"; const auto& endpoint = secure_ ? query_service::secure_worker : query_service::public_worker; - if (!socket.connect(endpoint)) + auto ec = router.connect(endpoint); + + if (ec) { log::error(LOG_SERVER) << "Failed to connect " << security << " query worker to " - << endpoint; + << endpoint << " : " << ec.message(); return false; } @@ -97,11 +99,11 @@ bool query_worker::connect(zmq::socket& socket) return true; } -bool query_worker::disconnect(zmq::socket& socket) +bool query_worker::disconnect(zmq::socket& router) { const auto security = secure_ ? "secure" : "public"; - if (!socket.stop()) + if (!router.stop()) { log::error(LOG_SERVER) << "Failed to disconnect " << security << " query worker."; @@ -112,25 +114,43 @@ bool query_worker::disconnect(zmq::socket& socket) return true; } -// Utilities. +// Query Execution. //----------------------------------------------------------------------------- -void query_worker::query(zmq::socket& socket) +// Because the socket is a router we may simply drop invalid queries. +// As a single thread worker this router should not reach high water. +// If we implemented as a replier we would need to always provide a response. +void query_worker::query(zmq::socket& router) { + // TODO: rewrite the serial blockchain interface to avoid callbacks. + const auto sender = [&router](outgoing&& response) + { + const auto ec = response.send(router); + + if (ec) + log::warning(LOG_SERVER) + << "Failed to send query response to " << response.address() + << ec.message(); + }; + incoming request; + const auto ec = request.receive(router); - if (!request.receive(socket)) + if (ec) { - log::warning(LOG_SERVER) - << "Malformed query from " << encode_base16(request.address1); + log::debug(LOG_SERVER) + << "Failed to receive query from " << request.address() + << ec.message(); + + // Because the query did not parse this is likely to be misaddressed. + sender(outgoing(request, ec)); return; } if (settings_.log_requests) { log::info(LOG_SERVER) - << "Query " << request.command << " from " - << encode_base16(request.address1); + << "Query " << request.command << " from " << request.address(); } // Locate the request handler for this command. @@ -138,20 +158,13 @@ void query_worker::query(zmq::socket& socket) if (handler == command_handlers_.end()) { - log::warning(LOG_SERVER) - << "Invalid query command from " - << encode_base16(request.address1); + log::debug(LOG_SERVER) + << "Invalid query command from " << request.address(); + + sender(outgoing(request, error::not_found)); return; } - // TODO: rewrite the serial blockchain interface to avoid callbacks. - const auto sender = [&socket](outgoing&& response) - { - if (!response.send(socket)) - log::warning(LOG_SERVER) - << "Failed to send query response."; - }; - // Execute the request and forward result to queue. handler->second(request, sender); }