diff --git a/.gitignore b/.gitignore index 7b8c2497..8a45f518 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,7 @@ libobelisk.pc /src/bitcoin_server +build-libbitcoin-server +console/bs +libbitcoin-server.pc + diff --git a/Makefile.am b/Makefile.am index fdeb0dac..a9bdf03f 100644 --- a/Makefile.am +++ b/Makefile.am @@ -50,6 +50,10 @@ src_libbitcoin_server_la_SOURCES = \ src/services/heartbeat_service.cpp \ src/services/query_service.cpp \ src/services/transaction_service.cpp \ + src/web/block_socket.cpp \ + src/web/heartbeat_socket.cpp \ + src/web/query_socket.cpp \ + src/web/transaction_socket.cpp \ src/workers/authenticator.cpp \ src/workers/notification_worker.cpp \ src/workers/query_worker.cpp @@ -119,6 +123,13 @@ include_bitcoin_server_services_HEADERS = \ include/bitcoin/server/services/query_service.hpp \ include/bitcoin/server/services/transaction_service.hpp +include_bitcoin_server_webdir = ${includedir}/bitcoin/server/web +include_bitcoin_server_web_HEADERS = \ + include/bitcoin/server/web/block_socket.hpp \ + include/bitcoin/server/web/heartbeat_socket.hpp \ + include/bitcoin/server/web/query_socket.hpp \ + include/bitcoin/server/web/transaction_socket.hpp + include_bitcoin_server_workersdir = ${includedir}/bitcoin/server/workers include_bitcoin_server_workers_HEADERS = \ include/bitcoin/server/workers/authenticator.hpp \ diff --git a/builds/cmake/CMakeLists.txt b/builds/cmake/CMakeLists.txt index 85d38ab5..51c63aec 100644 --- a/builds/cmake/CMakeLists.txt +++ b/builds/cmake/CMakeLists.txt @@ -186,6 +186,10 @@ add_library( ${CANONICAL_LIB_NAME} "../../src/services/heartbeat_service.cpp" "../../src/services/query_service.cpp" "../../src/services/transaction_service.cpp" + "../../src/web/block_socket.cpp" + "../../src/web/heartbeat_socket.cpp" + "../../src/web/query_socket.cpp" + "../../src/web/transaction_socket.cpp" "../../src/workers/authenticator.cpp" "../../src/workers/notification_worker.cpp" "../../src/workers/query_worker.cpp" ) diff --git a/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj b/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj index 41b3dfee..e7c73e7c 100644 --- a/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj +++ b/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj @@ -88,6 +88,10 @@ + + + + @@ -111,6 +115,10 @@ + + + + diff --git a/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj.filters b/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj.filters index 4e0bf3e3..7712a9de 100644 --- a/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj.filters +++ b/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj.filters @@ -8,28 +8,31 @@ - {73CE0AC2-ECB2-4E8D-0000-000000000005} + {73CE0AC2-ECB2-4E8D-0000-000000000006} - {73CE0AC2-ECB2-4E8D-0000-000000000006} + {73CE0AC2-ECB2-4E8D-0000-000000000007} - {73CE0AC2-ECB2-4E8D-0000-000000000007} + {73CE0AC2-ECB2-4E8D-0000-000000000008} - {73CE0AC2-ECB2-4E8D-0000-000000000008} + {73CE0AC2-ECB2-4E8D-0000-000000000009} - {73CE0AC2-ECB2-4E8D-0000-000000000009} + {73CE0AC2-ECB2-4E8D-0000-00000000000A} - {73CE0AC2-ECB2-4E8D-0000-00000000000A} + {73CE0AC2-ECB2-4E8D-0000-00000000000B} + + + {73CE0AC2-ECB2-4E8D-0000-00000000000C} - {73CE0AC2-ECB2-4E8D-0000-00000000000B} + {73CE0AC2-ECB2-4E8D-0000-00000000000D} - {73CE0AC2-ECB2-4E8D-0000-00000000000C} + {73CE0AC2-ECB2-4E8D-0000-00000000000E} {73CE0AC2-ECB2-4E8D-0000-000000000000} @@ -43,9 +46,12 @@ {73CE0AC2-ECB2-4E8D-0000-000000000003} - + {73CE0AC2-ECB2-4E8D-0000-000000000004} + + {73CE0AC2-ECB2-4E8D-0000-000000000005} + @@ -93,6 +99,18 @@ src + + src\web + + + src\web + + + src\web + + + src\web + src\workers @@ -158,6 +176,18 @@ include\bitcoin\server + + include\bitcoin\server\web + + + include\bitcoin\server\web + + + include\bitcoin\server\web + + + include\bitcoin\server\web + include\bitcoin\server\workers diff --git a/builds/msvc/vs2015/libbitcoin-server/libbitcoin-server.vcxproj b/builds/msvc/vs2015/libbitcoin-server/libbitcoin-server.vcxproj index 1c6659e4..5d692258 100644 --- a/builds/msvc/vs2015/libbitcoin-server/libbitcoin-server.vcxproj +++ b/builds/msvc/vs2015/libbitcoin-server/libbitcoin-server.vcxproj @@ -88,6 +88,10 @@ + + + + @@ -111,6 +115,10 @@ + + + + diff --git a/builds/msvc/vs2015/libbitcoin-server/libbitcoin-server.vcxproj.filters b/builds/msvc/vs2015/libbitcoin-server/libbitcoin-server.vcxproj.filters index 9c1fe038..455c5690 100644 --- a/builds/msvc/vs2015/libbitcoin-server/libbitcoin-server.vcxproj.filters +++ b/builds/msvc/vs2015/libbitcoin-server/libbitcoin-server.vcxproj.filters @@ -8,28 +8,31 @@ - {73CE0AC2-ECB2-4E8D-0000-000000000005} + {73CE0AC2-ECB2-4E8D-0000-000000000006} - {73CE0AC2-ECB2-4E8D-0000-000000000006} + {73CE0AC2-ECB2-4E8D-0000-000000000007} - {73CE0AC2-ECB2-4E8D-0000-000000000007} + {73CE0AC2-ECB2-4E8D-0000-000000000008} - {73CE0AC2-ECB2-4E8D-0000-000000000008} + {73CE0AC2-ECB2-4E8D-0000-000000000009} - {73CE0AC2-ECB2-4E8D-0000-000000000009} + {73CE0AC2-ECB2-4E8D-0000-00000000000A} - {73CE0AC2-ECB2-4E8D-0000-00000000000A} + {73CE0AC2-ECB2-4E8D-0000-00000000000B} + + + {73CE0AC2-ECB2-4E8D-0000-00000000000C} - {73CE0AC2-ECB2-4E8D-0000-00000000000B} + {73CE0AC2-ECB2-4E8D-0000-00000000000D} - {73CE0AC2-ECB2-4E8D-0000-00000000000C} + {73CE0AC2-ECB2-4E8D-0000-00000000000E} {73CE0AC2-ECB2-4E8D-0000-000000000000} @@ -43,9 +46,12 @@ {73CE0AC2-ECB2-4E8D-0000-000000000003} - + {73CE0AC2-ECB2-4E8D-0000-000000000004} + + {73CE0AC2-ECB2-4E8D-0000-000000000005} + @@ -93,6 +99,18 @@ src + + src\web + + + src\web + + + src\web + + + src\web + src\workers @@ -158,6 +176,18 @@ include\bitcoin\server + + include\bitcoin\server\web + + + include\bitcoin\server\web + + + include\bitcoin\server\web + + + include\bitcoin\server\web + include\bitcoin\server\workers diff --git a/builds/msvc/vs2017/libbitcoin-server/libbitcoin-server.vcxproj b/builds/msvc/vs2017/libbitcoin-server/libbitcoin-server.vcxproj index 59c779cf..de1c632c 100644 --- a/builds/msvc/vs2017/libbitcoin-server/libbitcoin-server.vcxproj +++ b/builds/msvc/vs2017/libbitcoin-server/libbitcoin-server.vcxproj @@ -88,6 +88,10 @@ + + + + @@ -111,6 +115,10 @@ + + + + diff --git a/builds/msvc/vs2017/libbitcoin-server/libbitcoin-server.vcxproj.filters b/builds/msvc/vs2017/libbitcoin-server/libbitcoin-server.vcxproj.filters index 3956c781..8b28edba 100644 --- a/builds/msvc/vs2017/libbitcoin-server/libbitcoin-server.vcxproj.filters +++ b/builds/msvc/vs2017/libbitcoin-server/libbitcoin-server.vcxproj.filters @@ -8,28 +8,31 @@ - {73CE0AC2-ECB2-4E8D-0000-000000000005} + {73CE0AC2-ECB2-4E8D-0000-000000000006} - {73CE0AC2-ECB2-4E8D-0000-000000000006} + {73CE0AC2-ECB2-4E8D-0000-000000000007} - {73CE0AC2-ECB2-4E8D-0000-000000000007} + {73CE0AC2-ECB2-4E8D-0000-000000000008} - {73CE0AC2-ECB2-4E8D-0000-000000000008} + {73CE0AC2-ECB2-4E8D-0000-000000000009} - {73CE0AC2-ECB2-4E8D-0000-000000000009} + {73CE0AC2-ECB2-4E8D-0000-00000000000A} - {73CE0AC2-ECB2-4E8D-0000-00000000000A} + {73CE0AC2-ECB2-4E8D-0000-00000000000B} + + + {73CE0AC2-ECB2-4E8D-0000-00000000000C} - {73CE0AC2-ECB2-4E8D-0000-00000000000B} + {73CE0AC2-ECB2-4E8D-0000-00000000000D} - {73CE0AC2-ECB2-4E8D-0000-00000000000C} + {73CE0AC2-ECB2-4E8D-0000-00000000000E} {73CE0AC2-ECB2-4E8D-0000-000000000000} @@ -43,9 +46,12 @@ {73CE0AC2-ECB2-4E8D-0000-000000000003} - + {73CE0AC2-ECB2-4E8D-0000-000000000004} + + {73CE0AC2-ECB2-4E8D-0000-000000000005} + @@ -93,6 +99,18 @@ src + + src\web + + + src\web + + + src\web + + + src\web + src\workers @@ -158,6 +176,18 @@ include\bitcoin\server + + include\bitcoin\server\web + + + include\bitcoin\server\web + + + include\bitcoin\server\web + + + include\bitcoin\server\web + include\bitcoin\server\workers diff --git a/data/bs.cfg b/data/bs.cfg index bd57be0b..7c6a7203 100644 --- a/data/bs.cfg +++ b/data/bs.cfg @@ -247,30 +247,68 @@ heartbeat_service_seconds = 5 block_service_enabled = true # Enable the transaction publishing service, defaults to true. transaction_service_enabled = true +# Allowed client IP address, multiple entries allowed. +#client_address = 127.0.0.1 +# Blocked client IP address, multiple entries allowed. +#blacklist = 127.0.0.1 + +[websockets] +# The secure query websocket endpoint, defaults to 'tcp://*:9061'. +secure_query_endpoint = tcp://*:9061 +# The secure heartbeat websocket endpoint, defaults to 'tcp://*:9062'. +secure_heartbeat_endpoint = tcp://*:9062 +# The secure block publishing websocket endpoint, defaults to 'tcp://*:9063'. +secure_block_endpoint = tcp://*:9063 +# The secure transaction publishing websocket endpoint, defaults to 'tcp://*:9064'. +secure_transaction_endpoint = tcp://*:9064 +# The public query websocket endpoint, defaults to 'tcp://*:9071'. +public_query_endpoint = tcp://*:9071 +# The public heartbeat websocket endpoint, defaults to 'tcp://*:9072'. +public_heartbeat_endpoint = tcp://*:9072 +# The public block publishing websocket endpoint, defaults to 'tcp://*:9073'. +public_block_endpoint = tcp://*:9073 +# The public transaction publishing websocket endpoint, defaults to 'tcp://*:9074'. +public_transaction_endpoint = tcp://*:9074 +# Enable websocket endpoints, defaults to true. +enabled = true +# The optional directory for serving files via HTTP/S, defaults to 'web'. +root = web +# The SSL certificate authority file, defaults to 'ca.pem', enables secure endpoints. +ca_certificate = secure/ca.pem +# The SSL private key file, defaults to 'key.pem', enables secure endpoints. +server_private_key = secure/key.pem +# The SSL certificate file, defaults to 'server.pem', enables secure endpoints. +server_certificate = secure/server.pem +# The SSL client certificates directory, defaults to 'clients'. +client_certificates = clients +# An acceptable websocket origin, multiple entries allowed. +origin = http://localhost:9061 +origin = http://localhost:9062 +origin = http://localhost:9063 +origin = http://localhost:9064 +origin = http://localhost:9071 +origin = http://localhost:9072 +origin = http://localhost:9073 +origin = http://localhost:9074 -# The secure query endpoint, defaults to 'tcp://*:9081'. +[zeromq] +# The secure query zeromq endpoint, defaults to 'tcp://*:9081'. secure_query_endpoint = tcp://*:9081 -# The secure heartbeat endpoint, defaults to 'tcp://*:9082'. +# The secure heartbeat zeromq endpoint, defaults to 'tcp://*:9082'. secure_heartbeat_endpoint = tcp://*:9082 -# The secure block publishing endpoint, defaults to 'tcp://*:9083'. +# The secure block publishing zeromq endpoint, defaults to 'tcp://*:9083'. secure_block_endpoint = tcp://*:9083 -# The secure transaction publishing endpoint, defaults to 'tcp://*:9084'. +# The secure transaction publishing zeromq endpoint, defaults to 'tcp://*:9084'. secure_transaction_endpoint = tcp://*:9084 - -# The public query endpoint, defaults to 'tcp://*:9091'. +# The public query zeromq endpoint, defaults to 'tcp://*:9091'. public_query_endpoint = tcp://*:9091 -# The public heartbeat endpoint, defaults to 'tcp://*:9092'. +# The public heartbeat zeromq endpoint, defaults to 'tcp://*:9092'. public_heartbeat_endpoint = tcp://*:9092 -# The public block publishing endpoint, defaults to 'tcp://*:9093'. +# The public block publishing zeromq endpoint, defaults to 'tcp://*:9093'. public_block_endpoint = tcp://*:9093 -# The public transaction publishing endpoint, defaults to 'tcp://*:9094'. +# The public transaction publishing zeromq endpoint, defaults to 'tcp://*:9094'. public_transaction_endpoint = tcp://*:9094 - # The Z85-encoded private key of the server, enables secure endpoints. #server_private_key = # Allowed Z85-encoded public key of the client, multiple entries allowed. #client_public_key = -# Allowed client IP address, multiple entries allowed. -#client_address = 127.0.0.1 -# Blocked client IP address, multiple entries allowed. -#blacklist = 127.0.0.1 diff --git a/include/bitcoin/server.hpp b/include/bitcoin/server.hpp index 615cf502..bbb5dc7e 100644 --- a/include/bitcoin/server.hpp +++ b/include/bitcoin/server.hpp @@ -33,6 +33,10 @@ #include #include #include +#include +#include +#include +#include #include #include #include diff --git a/include/bitcoin/server/define.hpp b/include/bitcoin/server/define.hpp index ecd28b9a..7160dd5f 100644 --- a/include/bitcoin/server/define.hpp +++ b/include/bitcoin/server/define.hpp @@ -39,11 +39,12 @@ // Log name. #define LOG_SERVER "server" +#define LOG_SERVER_HTTP "http" // Avoid namespace conflict between boost::placeholders and std::placeholders. #define BOOST_BIND_NO_PLACEHOLDERS -// Include boost only here, so placeholders exclusion works. +#include #include #include #include diff --git a/include/bitcoin/server/server_node.hpp b/include/bitcoin/server/server_node.hpp index 7c5bce53..d900fec4 100644 --- a/include/bitcoin/server/server_node.hpp +++ b/include/bitcoin/server/server_node.hpp @@ -31,6 +31,10 @@ #include #include #include +#include +#include +#include +#include #include #include @@ -109,6 +113,8 @@ class BCS_API server_node authenticator authenticator_; query_service secure_query_service_; query_service public_query_service_; + + // Zeromq services heartbeat_service secure_heartbeat_service_; heartbeat_service public_heartbeat_service_; block_service secure_block_service_; @@ -117,6 +123,16 @@ class BCS_API server_node transaction_service public_transaction_service_; notification_worker secure_notification_worker_; notification_worker public_notification_worker_; + + // Websocket services + query_socket secure_query_websockets_; + query_socket public_query_websockets_; + heartbeat_socket secure_heartbeat_websockets_; + heartbeat_socket public_heartbeat_websockets_; + block_socket secure_block_websockets_; + block_socket public_block_websockets_; + transaction_socket secure_transaction_websockets_; + transaction_socket public_transaction_websockets_; }; } // namespace server diff --git a/include/bitcoin/server/settings.hpp b/include/bitcoin/server/settings.hpp index 9112f71c..3bb56f5a 100644 --- a/include/bitcoin/server/settings.hpp +++ b/include/bitcoin/server/settings.hpp @@ -22,7 +22,6 @@ #include #include #include -#include #include #include #include @@ -30,7 +29,7 @@ namespace libbitcoin { namespace server { -/// Common database configuration settings, properties not thread safe. +/// Common server configuration settings, properties not thread safe. class BCS_API settings { public: @@ -40,36 +39,54 @@ class BCS_API settings /// Helpers. system::asio::duration heartbeat_interval() const; system::asio::duration subscription_expiration() const; - const system::config::endpoint& query_endpoint(bool secure) const; - const system::config::endpoint& heartbeat_endpoint(bool secure) const; - const system::config::endpoint& block_endpoint(bool secure) const; - const system::config::endpoint& transaction_endpoint(bool secure) const; + const system::config::endpoint& zeromq_query_endpoint(bool secure) const; + const system::config::endpoint& zeromq_heartbeat_endpoint(bool secure) const; + const system::config::endpoint& zeromq_block_endpoint(bool secure) const; + const system::config::endpoint& zeromq_transaction_endpoint(bool secure) const; - /// Properties. + const system::config::endpoint& websockets_query_endpoint(bool secure) const; + const system::config::endpoint& websockets_heartbeat_endpoint(bool secure) const; + const system::config::endpoint& websockets_block_endpoint(bool secure) const; + const system::config::endpoint& websockets_transaction_endpoint(bool secure) const; + + /// [server] bool priority; bool secure_only; - uint16_t query_workers; uint32_t subscription_limit; uint32_t subscription_expiration_minutes; uint32_t heartbeat_service_seconds; bool block_service_enabled; bool transaction_service_enabled; + system::config::authority::list client_addresses; + system::config::authority::list blacklists; - system::config::endpoint secure_query_endpoint; - system::config::endpoint secure_heartbeat_endpoint; - system::config::endpoint secure_block_endpoint; - system::config::endpoint secure_transaction_endpoint; + /// [websockets] + system::config::endpoint websockets_secure_query_endpoint; + system::config::endpoint websockets_secure_heartbeat_endpoint; + system::config::endpoint websockets_secure_block_endpoint; + system::config::endpoint websockets_secure_transaction_endpoint; - system::config::endpoint public_query_endpoint; - system::config::endpoint public_heartbeat_endpoint; - system::config::endpoint public_block_endpoint; - system::config::endpoint public_transaction_endpoint; + system::config::endpoint websockets_public_query_endpoint; + system::config::endpoint websockets_public_heartbeat_endpoint; + system::config::endpoint websockets_public_block_endpoint; + system::config::endpoint websockets_public_transaction_endpoint; - system::config::sodium server_private_key; - system::config::sodium::list client_public_keys; - system::config::authority::list client_addresses; - system::config::authority::list blacklists; + bool websockets_enabled; + + /// [zeromq] + system::config::endpoint zeromq_secure_query_endpoint; + system::config::endpoint zeromq_secure_heartbeat_endpoint; + system::config::endpoint zeromq_secure_block_endpoint; + system::config::endpoint zeromq_secure_transaction_endpoint; + + system::config::endpoint zeromq_public_query_endpoint; + system::config::endpoint zeromq_public_heartbeat_endpoint; + system::config::endpoint zeromq_public_block_endpoint; + system::config::endpoint zeromq_public_transaction_endpoint; + + system::config::sodium zeromq_server_private_key; + system::config::sodium::list zeromq_client_public_keys; }; } // namespace server diff --git a/include/bitcoin/server/web/block_socket.hpp b/include/bitcoin/server/web/block_socket.hpp new file mode 100644 index 00000000..8c70c073 --- /dev/null +++ b/include/bitcoin/server/web/block_socket.hpp @@ -0,0 +1,60 @@ +/** + * Copyright (c) 2011-2018 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef LIBBITCOIN_SERVER_WEB_BLOCK_SOCKET_HPP +#define LIBBITCOIN_SERVER_WEB_BLOCK_SOCKET_HPP + +#include +#include +#include + +namespace libbitcoin { +namespace server { + +class server_node; + +// This class is thread safe. +// Subscribe to block acceptances from a dedicated socket endpoint. +class BCS_API block_socket + : public bc::protocol::http::socket +{ +public: + typedef std::shared_ptr ptr; + + /// Construct a block socket service endpoint. + block_socket(bc::protocol::zmq::context& context, server_node& node, + bool secure); + +protected: + // Implement the service. + virtual void work() override; + + virtual const system::config::endpoint& zeromq_endpoint() const override; + virtual const system::config::endpoint& websocket_endpoint() const override; + +private: + bool handle_block(bc::protocol::zmq::socket& subscriber); + + const bc::server::settings& settings_; + const bc::protocol::settings& protocol_settings_; +}; + +} // namespace server +} // namespace libbitcoin + +#endif diff --git a/include/bitcoin/server/web/heartbeat_socket.hpp b/include/bitcoin/server/web/heartbeat_socket.hpp new file mode 100644 index 00000000..e0ec0519 --- /dev/null +++ b/include/bitcoin/server/web/heartbeat_socket.hpp @@ -0,0 +1,61 @@ +/** + * Copyright (c) 2011-2018 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef LIBBITCOIN_SERVER_WEB_HEARTBEAT_SOCKET_HPP +#define LIBBITCOIN_SERVER_WEB_HEARTBEAT_SOCKET_HPP + +#include +#include +#include + +namespace libbitcoin { +namespace server { + +class server_node; + +// This class is thread safe. +// Subscribe to a pulse from a dedicated socket endpoint. +class BCS_API heartbeat_socket + : public bc::protocol::http::socket +{ +public: + typedef std::shared_ptr ptr; + + /// Construct a heartbeat socket service endpoint. + heartbeat_socket(bc::protocol::zmq::context& context, server_node& node, + bool secure); + +protected: + + // Implement the service. + virtual void work() override; + + virtual const system::config::endpoint& zeromq_endpoint() const override; + virtual const system::config::endpoint& websocket_endpoint() const override; + +private: + bool handle_heartbeat(bc::protocol::zmq::socket& subscriber); + + const bc::server::settings& settings_; + const bc::protocol::settings& protocol_settings_; +}; + +} // namespace server +} // namespace libbitcoin + +#endif diff --git a/include/bitcoin/server/web/query_socket.hpp b/include/bitcoin/server/web/query_socket.hpp new file mode 100644 index 00000000..114577e3 --- /dev/null +++ b/include/bitcoin/server/web/query_socket.hpp @@ -0,0 +1,71 @@ +/** + * Copyright (c) 2011-2018 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef LIBBITCOIN_SERVER_WEB_QUERY_SOCKET_HPP +#define LIBBITCOIN_SERVER_WEB_QUERY_SOCKET_HPP + +#include +#include +#include + +namespace libbitcoin { +namespace server { + +class server_node; + +// This class is thread safe. +// Submit queries and address subscriptions and receive address +// notifications on a dedicated socket endpoint. +class BCS_API query_socket + : public bc::protocol::http::socket +{ +public: + typedef std::shared_ptr ptr; + + /// Construct a query socket service endpoint. + query_socket(bc::protocol::zmq::context& context, server_node& node, + bool secure); + +protected: + // Implement the socket. + virtual void work() override; + + virtual bool start_websocket_handler() override; + + // Initialize the query specific zmq socket. + virtual void handle_websockets() override; + + virtual const system::config::endpoint& zeromq_endpoint() const override; + virtual const system::config::endpoint& websocket_endpoint() const override; + virtual const std::shared_ptr service() + const override; + + const system::config::endpoint& query_endpoint() const; + +private: + bool handle_query(bc::protocol::zmq::socket& dealer); + + const bc::server::settings& settings_; + const bc::protocol::settings& protocol_settings_; + std::shared_ptr service_; +}; + +} // namespace server +} // namespace libbitcoin + +#endif diff --git a/include/bitcoin/server/web/transaction_socket.hpp b/include/bitcoin/server/web/transaction_socket.hpp new file mode 100644 index 00000000..40117f61 --- /dev/null +++ b/include/bitcoin/server/web/transaction_socket.hpp @@ -0,0 +1,61 @@ +/** + * Copyright (c) 2011-2018 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef LIBBITCOIN_SERVER_WEB_TRANSACTION_SOCKET_HPP +#define LIBBITCOIN_SERVER_WEB_TRANSACTION_SOCKET_HPP + +#include +#include +#include + +namespace libbitcoin { +namespace server { + +class server_node; + +// This class is thread safe. +// Subscribe to tx acceptances into the pool from a dedicated socket endpoint. +class BCS_API transaction_socket + : public bc::protocol::http::socket +{ +public: + typedef std::shared_ptr ptr; + + /// Construct a transaction socket service endpoint. + transaction_socket(bc::protocol::zmq::context& context, server_node& node, + bool secure); + +protected: + + // Implement the service. + virtual void work() override; + + virtual const system::config::endpoint& zeromq_endpoint() const override; + virtual const system::config::endpoint& websocket_endpoint() const override; + +private: + bool handle_transaction(bc::protocol::zmq::socket& subscriber); + + const bc::server::settings& settings_; + const bc::protocol::settings& protocol_settings_; +}; + +} // namespace server +} // namespace libbitcoin + +#endif diff --git a/include/bitcoin/server/workers/notification_worker.hpp b/include/bitcoin/server/workers/notification_worker.hpp index abeefb20..dfbcdf32 100644 --- a/include/bitcoin/server/workers/notification_worker.hpp +++ b/include/bitcoin/server/workers/notification_worker.hpp @@ -32,7 +32,7 @@ #include #include -// Include after bitcoin.hpp (placeholders). +// Include after define.hpp (placeholders). #include #include diff --git a/src/interface/blockchain.cpp b/src/interface/blockchain.cpp index 5d74b129..50b05aad 100644 --- a/src/interface/blockchain.cpp +++ b/src/interface/blockchain.cpp @@ -588,7 +588,7 @@ void blockchain::stealth_transaction_hashes_fetched(const code& ec, } // Save to blockchain and announce to all connected peers. -void blockchain::broadcast(server_node& node, const message& request, +void blockchain::broadcast(server_node& /* node */, const message& request, send_handler handler) { const auto block = std::make_shared(); @@ -618,7 +618,7 @@ void blockchain::handle_broadcast(const code& ec, const message& request, handler(message(request, ec)); } -void blockchain::validate(server_node& node, const message& request, +void blockchain::validate(server_node& /* node */, const message& request, send_handler handler) { const auto block = std::make_shared(); diff --git a/src/interface/transaction_pool.cpp b/src/interface/transaction_pool.cpp index 89fdcbda..0f9f9a42 100644 --- a/src/interface/transaction_pool.cpp +++ b/src/interface/transaction_pool.cpp @@ -99,7 +99,7 @@ void transaction_pool::transaction_fetched(const code& ec, // Save to tx pool and announce to all connected peers. // FUTURE: conditionally subscribe to penetration notifications. -void transaction_pool::broadcast(server_node& node, const message& request, +void transaction_pool::broadcast(server_node& /* node */, const message& request, send_handler handler) { // TODO: re-implement. @@ -129,7 +129,7 @@ void transaction_pool::handle_broadcast(const code& ec, const message& request, handler(message(request, ec)); } -void transaction_pool::validate2(server_node& node, const message& request, +void transaction_pool::validate2(server_node& /* node */, const message& request, send_handler handler) { // TODO: re-implement. diff --git a/src/parser.cpp b/src/parser.cpp index 7e1460ba..04c6d5db 100644 --- a/src/parser.cpp +++ b/src/parser.cpp @@ -604,20 +604,20 @@ options_metadata parser::load_settings() value(&configured.database.index_addresses), "Enable payment and stealth address indexing, defaults to true." ) - /* Internally this is protocol, but application to server is more intuitive. */ ( + /* Internally this is protocol, but application to server is more intuitive. */ "server.send_high_water", value(&configured.protocol.send_high_water), "Drop messages at this outgoing backlog level, defaults to 100." ) - /* Internally this is protocol, but application to server is more intuitive. */ ( + /* Internally this is protocol, but application to server is more intuitive. */ "server.receive_high_water", value(&configured.protocol.receive_high_water), "Drop messages at this incoming backlog level, defaults to 100." ) - /* Internally this is protocol, but application to server is more intuitive. */ ( + /* Internally this is protocol, but application to server is more intuitive. */ "server.handshake_seconds", value(&configured.protocol.handshake_seconds), "The time limit to complete the connection handshake, defaults to 30." @@ -658,64 +658,143 @@ options_metadata parser::load_settings() "Enable the transaction publishing service, defaults to false." ) ( - "server.secure_query_endpoint", - value(&configured.server.secure_query_endpoint), - "The secure query endpoint, defaults to 'tcp://*:9081'." + "server.client_address", + value(&configured.server.client_addresses), + "Allowed client IP address, multiple entries allowed." ) ( - "server.secure_heartbeat_endpoint", - value(&configured.server.secure_heartbeat_endpoint), - "The secure heartbeat endpoint, defaults to 'tcp://*:9082'." + "server.blacklist", + value(&configured.server.blacklists), + "Blocked client IP address, multiple entries allowed." ) + + /* [websockets] */ ( - "server.secure_block_endpoint", - value(&configured.server.secure_block_endpoint), - "The secure block publishing endpoint, defaults to 'tcp://*:9083'." + "websockets.secure_query_endpoint", + value(&configured.server.websockets_secure_query_endpoint), + "The secure query websocket endpoint, defaults to 'tcp://*:9061'." ) ( - "server.secure_transaction_endpoint", - value(&configured.server.secure_transaction_endpoint), - "The secure transaction publishing endpoint, defaults to 'tcp://*:9084'." + "websockets.secure_heartbeat_endpoint", + value(&configured.server.websockets_secure_heartbeat_endpoint), + "The secure heartbeat websocket endpoint, defaults to 'tcp://*:9062'." ) ( - "server.public_query_endpoint", - value(&configured.server.public_query_endpoint), - "The public query endpoint, defaults to 'tcp://*:9091'." + "websockets.secure_block_endpoint", + value(&configured.server.websockets_secure_block_endpoint), + "The secure block publishing websocket endpoint, defaults to 'tcp://*:9063'." ) ( - "server.public_heartbeat_endpoint", - value(&configured.server.public_heartbeat_endpoint), - "The public heartbeat endpoint, defaults to 'tcp://*:9092'." + "websockets.secure_transaction_endpoint", + value(&configured.server.websockets_secure_transaction_endpoint), + "The secure transaction publishing websocket endpoint, defaults to 'tcp://*:9064'." ) ( - "server.public_block_endpoint", - value(&configured.server.public_block_endpoint), - "The public block publishing endpoint, defaults to 'tcp://*:9093'." + "websockets.public_query_endpoint", + value(&configured.server.websockets_public_query_endpoint), + "The public query websocket endpoint, defaults to 'tcp://*:9071'." ) ( - "server.public_transaction_endpoint", - value(&configured.server.public_transaction_endpoint), - "The public transaction publishing endpoint, defaults to 'tcp://*:9094'." + "websockets.public_heartbeat_endpoint", + value(&configured.server.websockets_public_heartbeat_endpoint), + "The public heartbeat websocket endpoint, defaults to 'tcp://*:9072'." ) ( - "server.server_private_key", - value(&configured.server.server_private_key), - "The Z85-encoded private key of the server, enables secure endpoints." + "websockets.public_block_endpoint", + value(&configured.server.websockets_public_block_endpoint), + "The public block publishing websocket endpoint, defaults to 'tcp://*:9073'." ) ( - "server.client_public_key", - value(&configured.server.client_public_keys), - "Allowed Z85-encoded public key of the client, multiple entries allowed." + "websockets.public_transaction_endpoint", + value(&configured.server.websockets_public_transaction_endpoint), + "The public transaction websocket publishing endpoint, defaults to 'tcp://*:9074'." ) ( - "server.client_address", - value(&configured.server.client_addresses), - "Allowed client IP address, multiple entries allowed." + "websockets.enabled", + value(&configured.server.websockets_enabled), + "Enable websocket endpoints, defaults to true." ) ( - "server.blacklist", - value(&configured.server.blacklists), - "Blocked client IP address, multiple entries allowed." + "websockets.root", + value(&configured.protocol.web_root), + "The optional directory for serving files via HTTP/S, defaults to 'web'." + ) + ( + "websockets.ca_certificate", + value(&configured.protocol.web_ca_certificate), + "The SSL certificate authority file, defaults to '', enables secure endpoints." + ) + ( + "websockets.server_private_key", + value(&configured.protocol.web_server_private_key), + "The SSL private key file, defaults to 'key.pem', enables secure endpoints." + ) + ( + "websockets.server_certificate", + value(&configured.protocol.web_server_certificate), + "The SSL certificate file, defaults to 'server.pem', enables secure endpoints." + ) + ( + "websockets.client_certificates", + value(&configured.protocol.web_client_certificates), + "The SSL client certificates directory, defaults to 'clients'." + ) + ( + "websockets.origin", + value(&configured.protocol.web_origins), + "An acceptable websocket origin, multiple entries allowed." + ) + + /* [zeromq] */ + ( + "zeromq.secure_query_endpoint", + value(&configured.server.zeromq_secure_query_endpoint), + "The secure query zeromq endpoint, defaults to 'tcp://*:9081'." + ) + ( + "zeromq.secure_heartbeat_endpoint", + value(&configured.server.zeromq_secure_heartbeat_endpoint), + "The secure heartbeat zeromq endpoint, defaults to 'tcp://*:9082'." + ) + ( + "zeromq.secure_block_endpoint", + value(&configured.server.zeromq_secure_block_endpoint), + "The secure block publishing zeromq endpoint, defaults to 'tcp://*:9083'." + ) + ( + "zeromq.secure_transaction_endpoint", + value(&configured.server.zeromq_secure_transaction_endpoint), + "The secure transaction publishing zeromq endpoint, defaults to 'tcp://*:9084'." + ) + ( + "zeromq.public_query_endpoint", + value(&configured.server.zeromq_public_query_endpoint), + "The public query zeromq endpoint, defaults to 'tcp://*:9091'." + ) + ( + "zeromq.public_heartbeat_endpoint", + value(&configured.server.zeromq_public_heartbeat_endpoint), + "The public heartbeat zeromq endpoint, defaults to 'tcp://*:9092'." + ) + ( + "zeromq.public_block_endpoint", + value(&configured.server.zeromq_public_block_endpoint), + "The public block publishing zeromq endpoint, defaults to 'tcp://*:9093'." + ) + ( + "zeromq.public_transaction_endpoint", + value(&configured.server.zeromq_public_transaction_endpoint), + "The public transaction publishing zeromq endpoint, defaults to 'tcp://*:9094'." + ) + ( + "zeromq.server_private_key", + value(&configured.server.zeromq_server_private_key), + "The Z85-encoded private key of the server, enables secure endpoints." + ) + ( + "zeromq.client_public_key", + value(&configured.server.zeromq_client_public_keys), + "Allowed Z85-encoded public key of the client, multiple entries allowed." ); return description; diff --git a/src/server_node.cpp b/src/server_node.cpp index a77c92c9..404195b2 100644 --- a/src/server_node.cpp +++ b/src/server_node.cpp @@ -49,7 +49,15 @@ server_node::server_node(const configuration& configuration) secure_transaction_service_(authenticator_, *this, true), public_transaction_service_(authenticator_, *this, false), secure_notification_worker_(authenticator_, *this, true), - public_notification_worker_(authenticator_, *this, false) + public_notification_worker_(authenticator_, *this, false), + secure_query_websockets_(authenticator_, *this, true), + public_query_websockets_(authenticator_, *this, false), + secure_heartbeat_websockets_(authenticator_, *this, true), + public_heartbeat_websockets_(authenticator_, *this, false), + secure_block_websockets_(authenticator_, *this, true), + public_block_websockets_(authenticator_, *this, false), + secure_transaction_websockets_(authenticator_, *this, true), + public_transaction_websockets_(authenticator_, *this, false) { } @@ -107,10 +115,10 @@ void server_node::handle_running(const code& , result_handler handler) // The stop handler is already stopped but the authenticator context gets // started, allowing services to stop. The registration of services with // the stop handler invokes the registered handlers immediately, invoking - // stop o nthe services. The services are running and don't stop... + // stop on the services. The services are running and don't stop... // notification_worker, query_service and authenticator service. // The authenticator is already stopped (before it started) so there will - // be no context stop to stop the services, specifically the relays. + // be no context to stop the services, specifically the relays. if (!start_services()) { handler(error::operation_failed); @@ -176,7 +184,7 @@ bool server_node::start_authenticator() const auto& settings = configuration_.server; // Subscriptions require the query service. - if ((!settings.server_private_key && settings.secure_only) || + if ((!settings.zeromq_server_private_key && settings.secure_only) || ((settings.query_workers == 0) && (settings.heartbeat_service_seconds == 0) && (!settings.block_service_enabled) && @@ -195,7 +203,7 @@ bool server_node::start_query_services() return true; // Start secure service, query workers and notification workers if enabled. - if (settings.server_private_key && + if (settings.zeromq_server_private_key && (!secure_query_service_.start() || !start_query_workers(true) || (settings.subscription_limit > 0 && !start_notification_workers(true)))) return false; @@ -206,6 +214,18 @@ bool server_node::start_query_services() (settings.subscription_limit > 0 && !start_notification_workers(false)))) return false; + if (settings.websockets_enabled) + { + // Start secure service if enabled. + if (settings.zeromq_server_private_key && + !secure_query_websockets_.start()) + return false; + + // Start public service if enabled. + if (!settings.secure_only && !public_query_websockets_.start()) + return false; + } + return true; } @@ -217,13 +237,26 @@ bool server_node::start_heartbeat_services() return true; // Start secure service if enabled. - if (settings.server_private_key && !secure_heartbeat_service_.start()) + if (settings.zeromq_server_private_key && + !secure_heartbeat_service_.start()) return false; // Start public service if enabled. if (!settings.secure_only && !public_heartbeat_service_.start()) return false; + if (settings.websockets_enabled) + { + // Start secure service if enabled. + if (settings.zeromq_server_private_key && + !secure_heartbeat_websockets_.start()) + return false; + + // Start public service if enabled. + if (!settings.secure_only && !public_heartbeat_websockets_.start()) + return false; + } + return true; } @@ -235,13 +268,25 @@ bool server_node::start_block_services() return true; // Start secure service if enabled. - if (settings.server_private_key && !secure_block_service_.start()) + if (settings.zeromq_server_private_key && !secure_block_service_.start()) return false; // Start public service if enabled. if (!settings.secure_only && !public_block_service_.start()) return false; + if (settings.websockets_enabled) + { + // Start secure service if enabled. + if (settings.zeromq_server_private_key && + !secure_block_websockets_.start()) + return false; + + // Start public service if enabled. + if (!settings.secure_only && !public_block_websockets_.start()) + return false; + } + return true; } @@ -253,13 +298,26 @@ bool server_node::start_transaction_services() return true; // Start secure service if enabled. - if (settings.server_private_key && !secure_transaction_service_.start()) + if (settings.zeromq_server_private_key && + !secure_transaction_service_.start()) return false; // Start public service if enabled. if (!settings.secure_only && !public_transaction_service_.start()) return false; + if (settings.websockets_enabled) + { + // Start secure service if enabled. + if (settings.zeromq_server_private_key && + !secure_transaction_websockets_.start()) + return false; + + // Start public service if enabled. + if (!settings.secure_only && !public_transaction_websockets_.start()) + return false; + } + return true; } diff --git a/src/services/block_service.cpp b/src/services/block_service.cpp index 86309da7..9b7484f8 100644 --- a/src/services/block_service.cpp +++ b/src/services/block_service.cpp @@ -48,7 +48,7 @@ block_service::block_service(zmq::authenticator& authenticator, settings_(node.server_settings()), external_(node.protocol_settings()), internal_(external_.send_high_water, external_.receive_high_water), - service_(settings_.block_endpoint(secure)), + service_(settings_.zeromq_block_endpoint(secure)), worker_(secure ? secure_worker : public_worker), authenticator_(authenticator), node_(node), @@ -154,7 +154,7 @@ bool block_service::handle_reorganization(const code& ec, size_t fork_height, LOG_WARNING(LOG_SERVER) << "Failure handling new block: " << ec.message(); - // Don't let a failure here prevent prevent future notifications. + // Don't let a failure here prevent future notifications. return true; } diff --git a/src/services/heartbeat_service.cpp b/src/services/heartbeat_service.cpp index 17055ff3..0d2e0d1c 100644 --- a/src/services/heartbeat_service.cpp +++ b/src/services/heartbeat_service.cpp @@ -42,7 +42,7 @@ heartbeat_service::heartbeat_service(zmq::authenticator& authenticator, security_(secure ? "secure" : "public"), settings_(node.server_settings()), external_(node.protocol_settings()), - service_(settings_.heartbeat_endpoint(secure)), + service_(settings_.zeromq_heartbeat_endpoint(secure)), authenticator_(authenticator), node_(node), diff --git a/src/services/query_service.cpp b/src/services/query_service.cpp index d25c4702..b4eda1f2 100644 --- a/src/services/query_service.cpp +++ b/src/services/query_service.cpp @@ -48,7 +48,7 @@ query_service::query_service(zmq::authenticator& authenticator, settings_(node.server_settings()), external_(node.protocol_settings()), internal_(external_.send_high_water, external_.receive_high_water), - service_(settings_.query_endpoint(secure)), + service_(settings_.zeromq_query_endpoint(secure)), worker_(secure ? secure_worker : public_worker), authenticator_(authenticator) { diff --git a/src/services/transaction_service.cpp b/src/services/transaction_service.cpp index f1270054..5d54c1f0 100644 --- a/src/services/transaction_service.cpp +++ b/src/services/transaction_service.cpp @@ -47,7 +47,7 @@ transaction_service::transaction_service(zmq::authenticator& authenticator, settings_(node.server_settings()), external_(node.protocol_settings()), internal_(external_.send_high_water, external_.receive_high_water), - service_(settings_.transaction_endpoint(secure)), + service_(settings_.zeromq_transaction_endpoint(secure)), worker_(secure ? secure_worker : public_worker), authenticator_(authenticator), node_(node), @@ -153,7 +153,7 @@ bool transaction_service::handle_transaction(const code& ec, LOG_WARNING(LOG_SERVER) << "Failure handling new transaction: " << ec.message(); - // Don't let a failure here prevent prevent future notifications. + // Don't let a failure here prevent future notifications. return true; } diff --git a/src/settings.cpp b/src/settings.cpp index 5a57c1a1..bcc65d04 100644 --- a/src/settings.cpp +++ b/src/settings.cpp @@ -36,15 +36,29 @@ settings::settings() block_service_enabled(true), transaction_service_enabled(true), - secure_query_endpoint("tcp://*:9081"), - secure_heartbeat_endpoint("tcp://*:9082"), - secure_block_endpoint("tcp://*:9083"), - secure_transaction_endpoint("tcp://*:9084"), - - public_query_endpoint("tcp://*:9091"), - public_heartbeat_endpoint("tcp://*:9092"), - public_block_endpoint("tcp://*:9093"), - public_transaction_endpoint("tcp://*:9094") + // [websockets] + websockets_secure_query_endpoint("tcp://*:9061"), + websockets_secure_heartbeat_endpoint("tcp://*:9062"), + websockets_secure_block_endpoint("tcp://*:9063"), + websockets_secure_transaction_endpoint("tcp://*:9064"), + + websockets_public_query_endpoint("tcp://*:9071"), + websockets_public_heartbeat_endpoint("tcp://*:9072"), + websockets_public_block_endpoint("tcp://*:9073"), + websockets_public_transaction_endpoint("tcp://*:9074"), + + websockets_enabled(true), + + // [zeromq] + zeromq_secure_query_endpoint("tcp://*:9081"), + zeromq_secure_heartbeat_endpoint("tcp://*:9082"), + zeromq_secure_block_endpoint("tcp://*:9083"), + zeromq_secure_transaction_endpoint("tcp://*:9084"), + + zeromq_public_query_endpoint("tcp://*:9091"), + zeromq_public_heartbeat_endpoint("tcp://*:9092"), + zeromq_public_block_endpoint("tcp://*:9093"), + zeromq_public_transaction_endpoint("tcp://*:9094") { } @@ -54,34 +68,63 @@ settings::settings(config::settings) { } -const config::endpoint& settings::query_endpoint(bool secure) const +duration settings::heartbeat_interval() const { - return secure ? secure_query_endpoint : public_query_endpoint; + return seconds(heartbeat_service_seconds); } -const config::endpoint& settings::heartbeat_endpoint(bool secure) const +duration settings::subscription_expiration() const { - return secure ? secure_heartbeat_endpoint : public_heartbeat_endpoint; + return minutes(subscription_expiration_minutes); } -const config::endpoint& settings::block_endpoint(bool secure) const +const config::endpoint& settings::websockets_query_endpoint(bool secure) const { - return secure ? secure_block_endpoint : public_block_endpoint; + return secure ? websockets_secure_query_endpoint : + websockets_public_query_endpoint; } -const config::endpoint& settings::transaction_endpoint(bool secure) const +const config::endpoint& settings::websockets_heartbeat_endpoint(bool secure) const { - return secure ? secure_transaction_endpoint : public_transaction_endpoint; + return secure ? websockets_secure_heartbeat_endpoint : + websockets_public_heartbeat_endpoint; } -duration settings::heartbeat_interval() const +const config::endpoint& settings::websockets_block_endpoint(bool secure) const { - return seconds(heartbeat_service_seconds); + return secure ? websockets_secure_block_endpoint : + websockets_public_block_endpoint; } -duration settings::subscription_expiration() const +const config::endpoint& settings::websockets_transaction_endpoint( + bool secure) const { - return minutes(subscription_expiration_minutes); + return secure ? websockets_secure_transaction_endpoint : + websockets_public_transaction_endpoint; +} + +const config::endpoint& settings::zeromq_query_endpoint(bool secure) const +{ + return secure ? zeromq_secure_query_endpoint : + zeromq_public_query_endpoint; +} + +const config::endpoint& settings::zeromq_heartbeat_endpoint(bool secure) const +{ + return secure ? zeromq_secure_heartbeat_endpoint : + zeromq_public_heartbeat_endpoint; +} + +const config::endpoint& settings::zeromq_block_endpoint(bool secure) const +{ + return secure ? zeromq_secure_block_endpoint : + zeromq_public_block_endpoint; +} + +const config::endpoint& settings::zeromq_transaction_endpoint(bool secure) const +{ + return secure ? zeromq_secure_transaction_endpoint : + zeromq_public_transaction_endpoint; } } // namespace server diff --git a/src/web/block_socket.cpp b/src/web/block_socket.cpp new file mode 100644 index 00000000..ffc1ee4e --- /dev/null +++ b/src/web/block_socket.cpp @@ -0,0 +1,157 @@ +/** + * Copyright (c) 2011-2018 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace libbitcoin { +namespace server { + +using namespace bc::protocol; +using namespace bc::system; +using namespace bc::system::chain; +using namespace bc::system::config; +using role = zmq::socket::role; + +static constexpr auto poll_interval_milliseconds = 100u; + +block_socket::block_socket(zmq::context& context, server_node& node, + bool secure) + : http::socket(context, node.protocol_settings(), secure), + settings_(node.server_settings()), + protocol_settings_(node.protocol_settings()) +{ +} + +void block_socket::work() +{ + zmq::socket sub(context_, role::subscriber, protocol_settings_); + + const auto endpoint = zeromq_endpoint().to_local(); + const auto ec = sub.connect(endpoint); + + if (ec) + { + LOG_ERROR(LOG_SERVER) + << "Failed to connect to block service " << endpoint << ": " + << ec.message(); + return; + } + + if (!started(start_websocket_handler())) + { + LOG_ERROR(LOG_SERVER) + << "Failed to start " << security_ << " block websocket handler."; + return; + } + + LOG_INFO(LOG_SERVER) + << "Bound " << security_ << " websocket block service to " + << websocket_endpoint(); + + // TODO: this should be hidden in socket base. + // Hold a shared reference to the websocket thread_ so that we can + // properly call stop_websocket_handler on cleanup. + const auto thread_ref = thread_; + + zmq::poller poller; + poller.add(sub); + + while (!poller.terminated() && !stopped()) + { + if (poller.wait(poll_interval_milliseconds).contains(sub.id()) && + !handle_block(sub)) + break; + } + + const auto sub_stop = sub.stop(); + const auto websocket_stop = stop_websocket_handler(); + + if (!sub_stop) + LOG_ERROR(LOG_SERVER) + << "Failed to disconnect " << security_ + << " block websocket service."; + + if (!websocket_stop) + LOG_ERROR(LOG_SERVER) + << "Failed to stop " << security_ << " block websocket handler."; + + finished(sub_stop && websocket_stop); +} + +// Called by this thread's work() method. +// Returns true to continue future notifications. +bool block_socket::handle_block(zmq::socket& subscriber) +{ + if (stopped()) + return false; + + zmq::message response; + subscriber.receive(response); + + static constexpr size_t block_message_size = 3; + if (response.empty() || response.size() != block_message_size) + { + LOG_WARNING(LOG_SERVER) + << "Failure handling block notification: invalid data"; + + // Don't let a failure here prevent future notifications. + return true; + } + + uint16_t sequence; + uint32_t height; + data_chunk block_data; + response.dequeue(sequence); + response.dequeue(height); + response.dequeue(block_data); + + // Format and send transaction to websocket subscribers. + const auto block = system::chain::block::factory(block_data, true); + broadcast(http::to_json(block, height, sequence)); + + LOG_VERBOSE(LOG_SERVER) + << "Broadcasted " << security_ << " socket block [" + << height << "]"; + return true; +} + +const endpoint& block_socket::zeromq_endpoint() const +{ + // The Websocket to zeromq backend internally always uses the + // local public zeromq endpoint since it does not affect the + // external security of the websocket endpoint and impacts + // configuration and performance for no additional gain. + return settings_.zeromq_block_endpoint(false /* secure_ */); +} + +const endpoint& block_socket::websocket_endpoint() const +{ + return settings_.websockets_block_endpoint(secure_); +} + +} // namespace server +} // namespace libbitcoin diff --git a/src/web/heartbeat_socket.cpp b/src/web/heartbeat_socket.cpp new file mode 100644 index 00000000..a1a5a1b5 --- /dev/null +++ b/src/web/heartbeat_socket.cpp @@ -0,0 +1,150 @@ +/** + * Copyright (c) 2011-2018 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include + +#include +#include +#include +#include +#include + +namespace libbitcoin { +namespace server { + +static constexpr auto poll_interval_milliseconds = 100u; + +using namespace bc::protocol; +using namespace bc::system::config; +using role = zmq::socket::role; + +heartbeat_socket::heartbeat_socket(zmq::context& context, server_node& node, + bool secure) + : http::socket(context, node.protocol_settings(), secure), + settings_(node.server_settings()), + protocol_settings_(node.protocol_settings()) +{ +} + +void heartbeat_socket::work() +{ + zmq::socket sub(context_, role::subscriber, protocol_settings_); + + const auto endpoint = zeromq_endpoint().to_local(); + const auto ec = sub.connect(endpoint); + + if (ec) + { + LOG_ERROR(LOG_SERVER) + << "Failed to connect to heartbeat service " << endpoint << ": " + << ec.message(); + return; + } + + if (!started(start_websocket_handler())) + { + LOG_ERROR(LOG_SERVER) + << "Failed to start " << security_ + << " heartbeat websocket handler."; + return; + } + + LOG_INFO(LOG_SERVER) + << "Bound " << security_ << " websocket heartbeat service to " + << websocket_endpoint(); + + // TODO: this should be hidden in socket base. + // Hold a shared reference to the websocket thread_ so that we can + // properly call stop_websocket_handler on cleanup. + const auto thread_ref = thread_; + + zmq::poller poller; + poller.add(sub); + + while (!poller.terminated() && !stopped()) + { + if (poller.wait(poll_interval_milliseconds).contains(sub.id()) && + !handle_heartbeat(sub)) + break; + } + + const auto sub_stop = sub.stop(); + const auto websocket_stop = stop_websocket_handler(); + + if (!sub_stop) + LOG_ERROR(LOG_SERVER) + << "Failed to disconnect " << security_ + << " hearbeat websocket service."; + + if (!websocket_stop) + LOG_ERROR(LOG_SERVER) + << "Failed to stop " << security_ + << " heartbeat websocket handler."; + + finished(sub_stop && websocket_stop); +} + +// Called by this thread's work() method. +// Returns true to continue future notifications. +bool heartbeat_socket::handle_heartbeat(zmq::socket& subscriber) +{ + if (stopped()) + return false; + + zmq::message response; + subscriber.receive(response); + + static constexpr size_t heartbeat_message_size = 2; + if (response.empty() || response.size() != heartbeat_message_size) + { + LOG_WARNING(LOG_SERVER) + << "Failure handling heartbeat notification: invalid data."; + + // Don't let a failure here prevent future notifications. + return true; + } + + uint16_t sequence; + uint64_t height; + response.dequeue(sequence); + response.dequeue(height); + + broadcast(http::to_json(height, sequence)); + + LOG_VERBOSE(LOG_SERVER) + << "Broadcasted " << security_ << " socket heartbeat [" << height + << ", " << sequence << "]"; + return true; +} + +const endpoint& heartbeat_socket::zeromq_endpoint() const +{ + // The Websocket to zeromq backend internally always uses the + // local public zeromq endpoint since it does not affect the + // external security of the websocket endpoint and impacts + // configuration and performance for no additional gain. + return settings_.zeromq_heartbeat_endpoint(false /* secure_ */); +} + +const endpoint& heartbeat_socket::websocket_endpoint() const +{ + return settings_.websockets_heartbeat_endpoint(secure_); +} + +} // namespace server +} // namespace libbitcoin diff --git a/src/web/query_socket.cpp b/src/web/query_socket.cpp new file mode 100644 index 00000000..fc327221 --- /dev/null +++ b/src/web/query_socket.cpp @@ -0,0 +1,361 @@ +/** + * Copyright (c) 2011-2018 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include + +#include +#include +#include + +namespace libbitcoin { +namespace server { + +using namespace bc::protocol; +using namespace bc::system; +using namespace bc::system::config; +using namespace bc::system::machine; +using role = zmq::socket::role; +using connection_ptr = http::connection_ptr; + +static constexpr auto poll_interval_milliseconds = 100u; + +query_socket::query_socket(zmq::context& context, server_node& node, + bool secure) + : http::socket(context, node.protocol_settings(), secure), + settings_(node.server_settings()), + protocol_settings_(node.protocol_settings()) +{ + // JSON to ZMQ request encoders. + //------------------------------------------------------------------------- + + const auto encode_empty = [](zmq::message& request, + const std::string& command, const std::string& /* arguments */, + uint32_t id) + { + request.enqueue(command); + request.enqueue_little_endian(id); + request.enqueue(data_chunk{}); + }; + + const auto encode_hash = [](zmq::message& request, + const std::string& command, const std::string& arguments, uint32_t id) + { + hash_digest hash; + DEBUG_ONLY(const auto result =) decode_hash(hash, arguments); + BITCOIN_ASSERT(result); + request.enqueue(command); + request.enqueue_little_endian(id); + request.enqueue(to_chunk(hash)); + }; + + // A local clone of the task_sender send logic, for the decoders + // below that don't need to use that mechanism since they're + // already guaranteed to be run on the web thread. + auto decode_send = [](connection_ptr connection, const std::string& json) + { + if (!connection || connection->closed()) + return false; + + if (json.size() > std::numeric_limits::max()) + { + LOG_ERROR(LOG_SERVER_HTTP) + << "Skipping JSON-RPC response of size " << json.size(); + return false; + } + + const int32_t json_size = static_cast(json.size()); + + if (!connection->json_rpc()) + return connection->write(json) == json_size; + + http::http_reply reply; + const auto response = reply.generate(http::protocol_status::ok, {}, + json_size, false) + json; + + LOG_VERBOSE(LOG_SERVER_HTTP) + << "Writing JSON-RPC response: " << response; + + return connection->write(response) == json_size; + }; + + // JSON to ZMQ response decoders. + // ------------------------------------------------------------------------- + // These all run on the websocket thread, so can write on the + // connection directly. + const auto decode_height = [this, decode_send](const data_chunk& data, + const uint32_t id, connection_ptr connection) + { + data_source istream(data); + istream_reader source(istream); + const auto height = source.read_4_bytes_little_endian(); + decode_send(connection, http::to_json(height, id)); + }; + + const auto decode_transaction = [this, &node, decode_send]( + const data_chunk& data, const uint32_t id, connection_ptr connection) + { + const auto witness = chain::script::is_enabled( + node.blockchain_settings().enabled_forks(), rule_fork::bip141_rule); + const auto transaction = chain::transaction::factory(data, true, + witness); + decode_send(connection, http::to_json(transaction, id)); + }; + + const auto decode_block = [this, &node, decode_send]( + const data_chunk& data, const uint32_t id, connection_ptr connection) + { + const auto witness = chain::script::is_enabled( + node.blockchain_settings().enabled_forks(), rule_fork::bip141_rule); + const auto block = chain::block::factory(data, witness); + decode_send(connection, http::to_json(block, id)); + }; + + const auto decode_block_header = [this, decode_send](const data_chunk& data, + const uint32_t id, connection_ptr connection) + { + const auto header = chain::header::factory(data, true); + decode_send(connection, http::to_json(header, id)); + }; + + handlers_["getblockcount"] = handlers + { + "blockchain.fetch_last_height", + encode_empty, + decode_height + }; + + handlers_["getrawtransaction"] = handlers + { + "transaction_pool.fetch_transaction", + encode_hash, + decode_transaction + }; + + handlers_["getblock"] = handlers + { + "blockchain.fetch_block", + encode_hash, + decode_block + }; + + handlers_["getblockheader"] = handlers + { + "blockchain.fetch_block_header", + encode_hash, + decode_block_header + }; + + handlers_["getblockheight"] = handlers + { + "blockchain.fetch_block_height", + encode_hash, + decode_height + }; +} + +void query_socket::work() +{ + zmq::socket dealer(context_, role::dealer, protocol_settings_); + zmq::socket query_receiver(context_, role::pair, protocol_settings_); + + auto ec = query_receiver.bind(query_endpoint()); + + if (ec) + { + LOG_ERROR(LOG_SERVER) + << "Failed to bind " << security_ << " query internal socket: " + << ec.message(); + return; + } + + const auto endpoint = zeromq_endpoint().to_local(); + ec = dealer.connect(endpoint); + + if (ec) + { + LOG_ERROR(LOG_SERVER) + << "Failed to connect to " << security_ << " query socket " + << endpoint << ": " << ec.message(); + return; + } + + if (!started(start_websocket_handler())) + { + LOG_ERROR(LOG_SERVER) + << "Failed to start " << security_ << " websocket handler: " + << ec.message(); + return; + } + + LOG_INFO(LOG_SERVER) + << "Bound " << security_ << " websocket query service to " + << websocket_endpoint(); + + // TODO: this should be hidden in socket base. + // Hold a shared reference to the websocket thread_ so that we can + // properly call stop_websocket_handler on cleanup. + const auto thread_ref = thread_; + + zmq::poller poller; + poller.add(dealer); + poller.add(query_receiver); + + while (!poller.terminated() && !stopped()) + { + const auto identifiers = poller.wait(poll_interval_milliseconds); + + if (identifiers.contains(query_receiver.id()) && + !forward(query_receiver, dealer)) + break; + + if (identifiers.contains(dealer.id()) && !handle_query(dealer)) + break; + } + + const auto query_stop = query_receiver.stop(); + const auto dealer_stop = dealer.stop(); + const auto websocket_stop = stop_websocket_handler(); + + if (!query_stop) + LOG_ERROR(LOG_SERVER) + << "Failed to unbind " << security_ << " websocket query service."; + + if (!dealer_stop) + LOG_ERROR(LOG_SERVER) + << "Failed to disconnect " << security_ << " query connection."; + + if (!websocket_stop) + LOG_ERROR(LOG_SERVER) + << "Failed to stop " << security_ << " query websocket handler."; + + finished(query_stop && dealer_stop && websocket_stop); +} + +// Called by this thread's zmq work() method. Returns true to +// continue future notifications. +bool query_socket::handle_query(zmq::socket& dealer) +{ + if (stopped()) + return false; + + zmq::message response; + auto ec = dealer.receive(response); + + if (ec) + { + LOG_ERROR(LOG_SERVER) + << "Failed to receive response from dealer: " << ec.message(); + return true; + } + + static constexpr size_t query_message_size = 3; + if (response.empty() || response.size() != query_message_size) + { + LOG_WARNING(LOG_SERVER) + << "Failure handling query response: invalid data size."; + return true; + } + + uint32_t sequence; + data_chunk data; + std::string command; + + if (!response.dequeue(command) || !response.dequeue(sequence) || + !response.dequeue(data)) + { + LOG_WARNING(LOG_SERVER) + << "Failure handling query response: invalid data parts."; + return true; + } + + socket::queue_response(sequence, data, command); + return true; +} + +const endpoint& query_socket::zeromq_endpoint() const +{ + // The Websocket to zeromq backend internally always uses the + // local public zeromq endpoint since it does not affect the + // external security of the websocket endpoint and impacts + // configuration and performance for no additional gain. + return settings_.zeromq_query_endpoint(false /* secure_ */); +} + +const endpoint& query_socket::websocket_endpoint() const +{ + return settings_.websockets_query_endpoint(secure_); +} + +const endpoint& query_socket::query_endpoint() const +{ + static const endpoint secure_query("inproc://secure_query_websockets"); + static const endpoint public_query("inproc://public_query_websockets"); + return secure_ ? secure_query : public_query; +} + +const std::shared_ptr query_socket::service() const +{ + return service_; +} + +// This method is run by the web socket thread. +void query_socket::handle_websockets() +{ + // A zmq socket must remain on its single thread. + service_ = std::make_shared(context_, role::pair, + protocol_settings_); + + // Hold a reference to this service_ socket member by this thread + // method so that we can properly shutdown even if the query_socket + // object is destroyed. + const auto service_ref = service_; + const auto ec = service_ref->connect(query_endpoint()); + + if (ec) + { + LOG_ERROR(LOG_SERVER) + << "Failed to connect " << security_ << " query sender socket: " + << ec.message(); + socket_started_.set_value(false); + return; + } + + // socket::handle_websockets does web socket initialization and runs the + // web event loop. Inside that loop, socket::poll eventually calls into + // the static handle_event callback, which calls socket::notify_query_work, + // which uses this 'service_' zmq socket for sending incoming requests and + // reading the json web responses. + socket::handle_websockets(); + + if (!service_ref->stop()) + { + LOG_ERROR(LOG_SERVER) + << "Failed to disconnect " << security_ << " query sender."; + } +} + +bool query_socket::start_websocket_handler() +{ + auto started = socket_started_.get_future(); + thread_ = std::make_shared(&query_socket::handle_websockets, + this); + return started.get(); +} + +} // namespace server +} // namespace libbitcoin diff --git a/src/web/transaction_socket.cpp b/src/web/transaction_socket.cpp new file mode 100644 index 00000000..4254ed78 --- /dev/null +++ b/src/web/transaction_socket.cpp @@ -0,0 +1,156 @@ +/** + * Copyright (c) 2011-2018 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include + +#include +#include +#include +#include + +namespace libbitcoin { +namespace server { + +using namespace std::placeholders; +using namespace bc::protocol; +using namespace bc::system; +using namespace bc::system::chain; +using namespace bc::system::config; +using namespace bc::system::message; +using role = zmq::socket::role; + +static constexpr auto poll_interval_milliseconds = 100u; + +transaction_socket::transaction_socket(zmq::context& context, + server_node& node, bool secure) + : http::socket(context, node.protocol_settings(), secure), + settings_(node.server_settings()), + protocol_settings_(node.protocol_settings()) +{ +} + +void transaction_socket::work() +{ + zmq::socket sub(context_, role::subscriber, protocol_settings_); + + const auto endpoint = zeromq_endpoint().to_local(); + const auto ec = sub.connect(endpoint); + + if (ec) + { + LOG_ERROR(LOG_SERVER) + << "Failed to connect to transaction service " << endpoint << ": " + << ec.message(); + return; + } + + if (!started(start_websocket_handler())) + { + LOG_ERROR(LOG_SERVER) + << "Failed to start " << security_ + << " transaction websocket handler."; + return; + } + + LOG_INFO(LOG_SERVER) + << "Bound " << security_ << " websocket transaction service to " + << websocket_endpoint(); + + // TODO: this should be hidden in socket base. + // Hold a shared reference to the websocket thread_ so that we can + // properly call stop_websocket_handler on cleanup. + const auto thread_ref = thread_; + + zmq::poller poller; + poller.add(sub); + + while (!poller.terminated() && !stopped()) + { + if (poller.wait(poll_interval_milliseconds).contains(sub.id()) && + !handle_transaction(sub)) + break; + } + + const auto sub_stop = sub.stop(); + const auto websocket_stop = stop_websocket_handler(); + + if (!sub_stop) + LOG_ERROR(LOG_SERVER) + << "Failed to disconnect " << security_ + << " transaction websocket service."; + + if (!websocket_stop) + LOG_ERROR(LOG_SERVER) + << "Failed to stop " << security_ + << " transaction websocket handler."; + + finished(sub_stop && websocket_stop); +} + +// Called by this thread's work() method. +// Returns true to continue future notifications. +bool transaction_socket::handle_transaction(zmq::socket& subscriber) +{ + if (stopped()) + return false; + + zmq::message response; + subscriber.receive(response); + + static constexpr size_t transaction_message_size = 2; + if (response.empty() || response.size() != transaction_message_size) + { + LOG_WARNING(LOG_SERVER) + << "Failure handling transaction notification: invalid data"; + + // Don't let a failure here prevent future notifications. + return true; + } + + uint16_t sequence; + data_chunk transaction_data; + response.dequeue(sequence); + response.dequeue(transaction_data); + + chain::transaction tx; + tx.from_data(transaction_data, true, true); + + broadcast(http::to_json(tx, sequence)); + + LOG_VERBOSE(LOG_SERVER) + << "Broadcasted " << security_ << " socket tx [" + << encode_hash(tx.hash()) << "]"; + return true; +} + +const endpoint& transaction_socket::zeromq_endpoint() const +{ + // The Websocket to zeromq backend internally always uses the + // local public zeromq endpoint since it does not affect the + // external security of the websocket endpoint and impacts + // configuration and performance for no additional gain. + return settings_.zeromq_transaction_endpoint(false /* secure_ */); +} + +const endpoint& transaction_socket::websocket_endpoint() const +{ + return settings_.websockets_transaction_endpoint(secure_); +} + +} // namespace server +} // namespace libbitcoin diff --git a/src/workers/authenticator.cpp b/src/workers/authenticator.cpp index 95df80e0..c55c518b 100644 --- a/src/workers/authenticator.cpp +++ b/src/workers/authenticator.cpp @@ -34,10 +34,10 @@ authenticator::authenticator(server_node& node) { const auto& settings = node.server_settings(); - set_private_key(settings.server_private_key); + set_private_key(settings.zeromq_server_private_key); // Secure clients are also affected by address restrictions. - for (const auto& public_key: settings.client_public_keys) + for (const auto& public_key: settings.zeromq_client_public_keys) { LOG_DEBUG(LOG_SERVER) << "Allow client public key [" << public_key << "]"; diff --git a/src/workers/notification_worker.cpp b/src/workers/notification_worker.cpp index 6e1bda38..2a7d62ab 100644 --- a/src/workers/notification_worker.cpp +++ b/src/workers/notification_worker.cpp @@ -67,7 +67,7 @@ notification_worker::notification_worker(zmq::authenticator& authenticator, // There is no unsubscribe so this class shouldn't be restarted. // Notifications are ordered by validation in node but thread safety is still -// required so that purge can run on a seperate time thread. +// required so that purge can run on a separate time thread. bool notification_worker::start() { // Subscribe to blockchain reorganizations. @@ -173,7 +173,7 @@ bool notification_worker::handle_reorganization(const code& ec, LOG_WARNING(LOG_SERVER) << "Failure handling new block: " << ec.message(); - // Don't let a failure here prevent prevent future notifications. + // Don't let a failure here prevent future notifications. return true; } @@ -431,23 +431,22 @@ void notification_worker::purge() // Accumulate removals, send expiration notifications outside locks. std::vector expires; - if (true) - { - /////////////////////////////////////////////////////////////////////// - // Critical Section - unique_lock lock(address_mutex_); + /////////////////////////////////////////////////////////////////////////// + // Critical Section + address_mutex_.lock(); - auto& right = address_subscriptions_.right; + auto& address = address_subscriptions_.right; - for (auto it = right.begin(); it != right.end() && - it->first.updated() < cutoff; it = right.erase(it)) - { - it->first.increment(); - expires.push_back(it->first); - } - /////////////////////////////////////////////////////////////////////// + for (auto it = address.begin(); it != address.end() && + it->first.updated() < cutoff; it = address.erase(it)) + { + it->first.increment(); + expires.push_back(it->first); } + address_mutex_.unlock(); + /////////////////////////////////////////////////////////////////////////// + // Send failure is logged in send. if (dealer) for (auto& expire: expires) @@ -456,23 +455,22 @@ void notification_worker::purge() expires.clear(); - if (true) - { - /////////////////////////////////////////////////////////////////////// - // Critical Section - unique_lock lock(stealth_mutex_); + /////////////////////////////////////////////////////////////////////////// + // Critical Section + stealth_mutex_.lock(); - auto& right = stealth_subscriptions_.right; + auto& stealth = stealth_subscriptions_.right; - for (auto it = right.begin(); it != right.end() && - it->first.updated() < cutoff; it = right.erase(it)) - { - it->first.increment(); - expires.push_back(it->first); - } - /////////////////////////////////////////////////////////////////////// + for (auto it = stealth.begin(); it != stealth.end() && + it->first.updated() < cutoff; it = stealth.erase(it)) + { + it->first.increment(); + expires.push_back(it->first); } + stealth_mutex_.unlock(); + /////////////////////////////////////////////////////////////////////////// + // Send failure is logged in send. if (dealer) for (auto& expire: expires) @@ -543,9 +541,10 @@ code notification_worker::subscribe_address(const message& request, address_mutex_.unlock_upgrade_and_lock(); //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - address_subscriptions_.insert({ + address_subscriptions_.insert( + { std::move(address_hash), - subscription{ request.route(), request.id(), current_time() } + { request.route(), request.id(), current_time() } }); address_mutex_.unlock(); @@ -595,9 +594,10 @@ code notification_worker::subscribe_stealth(const message& request, stealth_mutex_.unlock_upgrade_and_lock(); //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ - stealth_subscriptions_.insert({ + stealth_subscriptions_.insert( + { std::move(prefix_filter), - subscription{ request.route(), request.id(), current_time() } + { request.route(), request.id(), current_time() } }); stealth_mutex_.unlock();