From ebe84332d37170145405477a4d21966f35cb7f14 Mon Sep 17 00:00:00 2001 From: evoskuil Date: Wed, 20 Jul 2016 12:36:49 -0700 Subject: [PATCH 1/5] Server subscriptions work in progress. --- Makefile.am | 8 +- .../libbitcoin-server.vcxproj | 8 +- .../libbitcoin-server.vcxproj.filters | 24 +- include/bitcoin/server.hpp | 4 +- include/bitcoin/server/interface/address.hpp | 37 +- .../bitcoin/server/interface/blockchain.hpp | 43 +- include/bitcoin/server/interface/protocol.hpp | 9 +- .../server/interface/transaction_pool.hpp | 13 +- include/bitcoin/server/messages/message.hpp | 84 ++++ include/bitcoin/server/messages/outgoing.hpp | 66 --- .../messages/{incoming.hpp => route.hpp} | 38 +- include/bitcoin/server/server_node.hpp | 18 +- .../bitcoin/server/utility/fetch_helpers.hpp | 11 +- .../server/workers/notification_worker.hpp | 99 ++-- .../bitcoin/server/workers/query_worker.hpp | 5 +- src/interface/address.cpp | 120 +++-- src/interface/blockchain.cpp | 110 ++--- src/interface/protocol.cpp | 21 +- src/interface/transaction_pool.cpp | 23 +- src/messages/incoming.cpp | 91 ---- src/messages/message.cpp | 184 +++++++ src/messages/outgoing.cpp | 95 ---- src/messages/route.cpp | 39 ++ src/server_node.cpp | 56 +-- src/services/block_service.cpp | 13 +- src/services/query_service.cpp | 4 +- src/services/transaction_service.cpp | 7 +- src/utility/fetch_helpers.cpp | 18 +- src/workers/notification_worker.cpp | 462 ++++++++++++++---- src/workers/query_worker.cpp | 63 ++- 30 files changed, 1071 insertions(+), 702 deletions(-) create mode 100644 include/bitcoin/server/messages/message.hpp delete mode 100644 include/bitcoin/server/messages/outgoing.hpp rename include/bitcoin/server/messages/{incoming.hpp => route.hpp} (62%) delete mode 100644 src/messages/incoming.cpp create mode 100644 src/messages/message.cpp delete mode 100644 src/messages/outgoing.cpp create mode 100644 src/messages/route.cpp diff --git a/Makefile.am b/Makefile.am index 251f365a..d5da86c5 100644 --- a/Makefile.am +++ b/Makefile.am @@ -43,8 +43,8 @@ src_libbitcoin_server_la_SOURCES = \ src/interface/blockchain.cpp \ src/interface/protocol.cpp \ src/interface/transaction_pool.cpp \ - src/messages/incoming.cpp \ - src/messages/outgoing.cpp \ + src/messages/message.cpp \ + src/messages/route.cpp \ src/services/block_service.cpp \ src/services/heartbeat_service.cpp \ src/services/query_service.cpp \ @@ -108,8 +108,8 @@ include_bitcoin_server_interface_HEADERS = \ include_bitcoin_server_messagesdir = ${includedir}/bitcoin/server/messages include_bitcoin_server_messages_HEADERS = \ - include/bitcoin/server/messages/incoming.hpp \ - include/bitcoin/server/messages/outgoing.hpp + include/bitcoin/server/messages/message.hpp \ + include/bitcoin/server/messages/route.hpp include_bitcoin_server_servicesdir = ${includedir}/bitcoin/server/services include_bitcoin_server_services_HEADERS = \ diff --git a/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj b/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj index 59653c88..a5ea578d 100644 --- a/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj +++ b/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj @@ -78,8 +78,8 @@ - - + + @@ -100,8 +100,8 @@ - - + + diff --git a/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj.filters b/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj.filters index 5b722c19..77e05d79 100644 --- a/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj.filters +++ b/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj.filters @@ -82,12 +82,6 @@ include\bitcoin\server\interface - - include\bitcoin\server\messages - - - include\bitcoin\server\messages - include\bitcoin\server\workers @@ -112,6 +106,12 @@ include\bitcoin\server\workers + + include\bitcoin\server\messages + + + include\bitcoin\server\messages + @@ -138,12 +138,6 @@ src - - src\messages - - - src\messages - src\utility @@ -168,6 +162,12 @@ src\workers + + src\messages + + + src\messages + diff --git a/include/bitcoin/server.hpp b/include/bitcoin/server.hpp index e905dc6e..4dbcd7b2 100644 --- a/include/bitcoin/server.hpp +++ b/include/bitcoin/server.hpp @@ -26,8 +26,8 @@ #include #include #include -#include -#include +#include +#include #include #include #include diff --git a/include/bitcoin/server/interface/address.hpp b/include/bitcoin/server/interface/address.hpp index 6e70a43d..a4a79614 100644 --- a/include/bitcoin/server/interface/address.hpp +++ b/include/bitcoin/server/interface/address.hpp @@ -21,21 +21,12 @@ #define LIBBITCOIN_SERVER_ADDRESS_HPP #include -#include -#include +#include #include namespace libbitcoin { namespace server { -// TODO: move to bc::protocol and integrate with zmq::message. -class BCS_API route -{ - bool secure; - bool delimited; - data_queue identities; -}; - /// Address interface. /// Class and method names are published and mapped to the zeromq interface. class BCS_API address @@ -43,22 +34,26 @@ class BCS_API address public: /// Fetch the blockchain and transaction pool history of a payment address. static void fetch_history2(server_node& node, - const incoming& request, send_handler handler); + const message& request, send_handler handler); + + /// Alias for subscribe, preserved for backward compatability. + static void renew(server_node& node, const message& request, + send_handler handler); + + /// Subscribe to payment or stealth address notifications by prefix. + static void subscribe(server_node& node, const message& request, + send_handler handler); /// Subscribe to payment and stealth address notifications by prefix. - static void subscribe(server_node& node, const incoming& request, + static void subscribe2(server_node& node, const message& request, send_handler handler); - static bool unwrap_subscribe_args(route& reply_to, binary& prefix_filter, - chain::subscribe_type& type, const incoming& request); +private: + static bool unwrap_subscribe_args(binary& prefix_filter, + chain::subscribe_type& type, const message& request); - // TODO: can't we just call subscribe again? This would prevent duplicates. - /////// Subscribe to payment and stealth address notifications by prefix. - ////static void renew(server_node& node, - //// const incoming& request, send_handler handler); - //// - ////static bool unwrap_renew_args(route& reply_to, binary& prefix_filter, - //// chain::subscribe_type& type, const incoming& request); + static bool unwrap_subscribe2_args(binary& prefix_filter, + const message& request); }; } // namespace server diff --git a/include/bitcoin/server/interface/blockchain.hpp b/include/bitcoin/server/interface/blockchain.hpp index 840332e2..15818141 100644 --- a/include/bitcoin/server/interface/blockchain.hpp +++ b/include/bitcoin/server/interface/blockchain.hpp @@ -23,8 +23,7 @@ #include #include #include -#include -#include +#include #include namespace libbitcoin { @@ -38,77 +37,77 @@ class BCS_API blockchain public: /// Fetch the blockchain history of a payment address. static void fetch_history(server_node& node, - const incoming& request, send_handler handler); + const message& request, send_handler handler); /// Fetch a transaction from the blockchain by its hash. static void fetch_transaction(server_node& node, - const incoming& request, send_handler handler); + const message& request, send_handler handler); /// Fetch the current height of the blockchain. static void fetch_last_height(server_node& node, - const incoming& request, send_handler handler); + const message& request, send_handler handler); /// Fetch a block header by hash or height (conditional serialization). static void fetch_block_header(server_node& node, - const incoming& request, send_handler handler); + const message& request, send_handler handler); /// Fetch tx hashes of block by hash or height (conditional serialization). static void fetch_block_transaction_hashes(server_node& node, - const incoming& request, send_handler handler); + const message& request, send_handler handler); /// Fetch the block index of a transaction and the height of its block. static void fetch_transaction_index(server_node& node, - const incoming& request, send_handler handler); + const message& request, send_handler handler); /// Fetch the inpoint which is spent by the specified output. static void fetch_spend(server_node& node, - const incoming& request, send_handler handler); + const message& request, send_handler handler); /// Fetch the height of a block by its hash. static void fetch_block_height(server_node& node, - const incoming& request, send_handler handler); + const message& request, send_handler handler); /// Fetch the blockchain history of a stealth address by its prefix filter. static void fetch_stealth(server_node& node, - const incoming& request, send_handler handler); + const message& request, send_handler handler); private: static void last_height_fetched(const code& ec, size_t last_height, - const incoming& request, send_handler handler); + const message& request, send_handler handler); static void fetch_block_header_by_hash(server_node& node, - const incoming& request, send_handler handler); + const message& request, send_handler handler); static void fetch_block_header_by_height(server_node& node, - const incoming& request, send_handler handler); + const message& request, send_handler handler); static void block_header_fetched(const code& ec, - const chain::header& block, const incoming& request, + const chain::header& block, const message& request, send_handler handler); static void fetch_block_transaction_hashes_by_hash(server_node& node, - const incoming& request, send_handler handler); + const message& request, send_handler handler); static void fetch_block_transaction_hashes_by_height(server_node& node, - const incoming& request, send_handler handler); + const message& request, send_handler handler); static void block_transaction_hashes_fetched(const code& ec, - const hash_list& hashes, const incoming& request, + const hash_list& hashes, const message& request, send_handler handler); static void transaction_index_fetched(const code& ec, size_t block_height, - size_t index, const incoming& request, send_handler handler); + size_t index, const message& request, send_handler handler); static void spend_fetched(const code& ec, - const chain::input_point& inpoint, const incoming& request, + const chain::input_point& inpoint, const message& request, send_handler handler); static void block_height_fetched(const code& ec, size_t block_height, - const incoming& request, send_handler handler); + const message& request, send_handler handler); static void stealth_fetched(const code& ec, const chain::stealth_compact::list& stealth_results, - const incoming& request, send_handler handler); + const message& request, send_handler handler); }; } // namespace server diff --git a/include/bitcoin/server/interface/protocol.hpp b/include/bitcoin/server/interface/protocol.hpp index 7f5cab0c..43f036ba 100644 --- a/include/bitcoin/server/interface/protocol.hpp +++ b/include/bitcoin/server/interface/protocol.hpp @@ -22,8 +22,7 @@ #include #include -#include -#include +#include #include namespace libbitcoin { @@ -36,15 +35,15 @@ class BCS_API protocol public: /// Broadcast a transaction to all connected peers. static void broadcast_transaction(server_node& node, - const incoming& request, send_handler handler); + const message& request, send_handler handler); /// Determine the count of all connected peers. static void total_connections(server_node& node, - const incoming& request, send_handler handler); + const message& request, send_handler handler); private: static void handle_total_connections(size_t count, - const incoming& request, send_handler handler); + const message& request, send_handler handler); }; } // namespace server diff --git a/include/bitcoin/server/interface/transaction_pool.hpp b/include/bitcoin/server/interface/transaction_pool.hpp index cbc1dcba..3764446b 100644 --- a/include/bitcoin/server/interface/transaction_pool.hpp +++ b/include/bitcoin/server/interface/transaction_pool.hpp @@ -22,8 +22,7 @@ #include #include -#include -#include +#include #include namespace libbitcoin { @@ -35,17 +34,21 @@ class BCS_API transaction_pool { public: /// Fetch a transaction from the transaction pool (only), by its hash. - static void fetch_transaction(server_node& node, const incoming& request, + static void fetch_transaction(server_node& node, const message& request, + send_handler handler); + + /// Broadcast a transaction with penetration subscription. + static void broadcast(server_node& node, const message& request, send_handler handler); /// Validate a transaction against the transaction pool and blockchain. - static void validate(server_node& node, const incoming& request, + static void validate(server_node& node, const message& request, send_handler handler); private: static void handle_validated(const code& ec, const chain::transaction& tx, const hash_digest& tx_hash, const chain::point::indexes& unconfirmed, - const incoming& request, send_handler handler); + const message& request, send_handler handler); }; } // namespace server diff --git a/include/bitcoin/server/messages/message.hpp b/include/bitcoin/server/messages/message.hpp new file mode 100644 index 00000000..a25b30a1 --- /dev/null +++ b/include/bitcoin/server/messages/message.hpp @@ -0,0 +1,84 @@ +/** + * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin-server. + * + * libbitcoin-server is free software: you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License with + * additional permissions to the one published by the Free Software + * Foundation, either version 3 of the License, or (at your option) + * any later version. For more information see LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef LIBBITCOIN_SERVER_MESSAGE +#define LIBBITCOIN_SERVER_MESSAGE + +#include +#include +#include +#include +#include + +namespace libbitcoin { +namespace server { + +class BCS_API message +{ +public: + static data_chunk to_bytes(const code& ec); + + //// Construct an empty message with security routing context. + message(bool secure); + + //// Construct a response for the request (code only). + message(const message& request, const code& ec); + + //// Construct a response for the request (data with code). + message(const message& request, const data_chunk& data); + + //// Construct a response for the route (subscription code only). + message(const server::route& route, const std::string& command, + uint32_t id, const code& ec); + + //// Construct a response for the route (subscription data with code). + message(const server::route& route, const std::string& command, + uint32_t id, const data_chunk& data); + + /// Arbitrary caller data (returned to caller for correlation). + uint32_t id() const; + + /// Serialized query or response (defined in relation to command). + const data_chunk& data() const; + + /// Query command (used for subscription, always returned to caller). + const std::string& command() const; + + /// The message route. + const server::route& route() const; + + /// Receive a message via the socket. + code receive(bc::protocol::zmq::socket& socket); + + /// Send the message via the socket. + code send(bc::protocol::zmq::socket& socket); + +private: + uint32_t id_; + data_chunk data_; + server::route route_; + std::string command_; +}; + +typedef std::function send_handler; + +} // namespace server +} // namespace libbitcoin + +#endif diff --git a/include/bitcoin/server/messages/outgoing.hpp b/include/bitcoin/server/messages/outgoing.hpp deleted file mode 100644 index b195e8c9..00000000 --- a/include/bitcoin/server/messages/outgoing.hpp +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS) - * - * This file is part of libbitcoin-server. - * - * libbitcoin-server is free software: you can redistribute it and/or - * modify it under the terms of the GNU Affero General Public License with - * additional permissions to the one published by the Free Software - * Foundation, either version 3 of the License, or (at your option) - * any later version. For more information see LICENSE. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#ifndef LIBBITCOIN_SERVER_OUTGOING -#define LIBBITCOIN_SERVER_OUTGOING - -#include -#include -#include -#include -#include - -namespace libbitcoin { -namespace server { - -class BCS_API outgoing -{ -public: - /// Return an error code in response to the incoming query. - outgoing(const incoming& request, const code& ec); - - /// Return data in response to a successfully-executed incoming query. - outgoing(const incoming& request, const data_chunk& data); - - /// Return data as a subscription by the given address. - outgoing(const std::string& command, const data_chunk& data, - const data_chunk& address1, const data_chunk& address2, - bool delimited); - - /// A printable address for logging only. - std::string address(); - - /// Send the message one the socket. - code send(bc::protocol::zmq::socket& socket); - -protected: - outgoing(const std::string& command, const data_chunk& data, - const data_chunk& address1, const data_chunk& address2, bool delimited, - uint32_t id); - -private: - bc::protocol::zmq::message message_; -}; - -typedef std::function send_handler; - -} // namespace server -} // namespace libbitcoin - -#endif diff --git a/include/bitcoin/server/messages/incoming.hpp b/include/bitcoin/server/messages/route.hpp similarity index 62% rename from include/bitcoin/server/messages/incoming.hpp rename to include/bitcoin/server/messages/route.hpp index 0475856a..3bc69a04 100644 --- a/include/bitcoin/server/messages/incoming.hpp +++ b/include/bitcoin/server/messages/route.hpp @@ -17,43 +17,37 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#ifndef LIBBITCOIN_SERVER_INCOMING -#define LIBBITCOIN_SERVER_INCOMING +#ifndef LIBBITCOIN_SERVER_ROUTE +#define LIBBITCOIN_SERVER_ROUTE -#include -#include #include #include namespace libbitcoin { namespace server { -class BCS_API incoming +/// This class is not thread safe. +/// The route is fixed in compliance with v2/v3 limitations. +class BCS_API route { public: - /// A printable address for logging only. - std::string address(); - - /// Send a message from the socket. - code receive(bc::protocol::zmq::socket& socket, bool secure=false); + /// Construct a route. + route(); - /// The message route as seen at workers. - data_chunk address1; - data_chunk address2; - bool delimited; + /// A printable address for logging only. + std::string display() const; - /// For deferred work, directs worker to respond on secure endpoint. + /// The message requires a secure port. bool secure; - /// Query command (used for subscription, always returned to caller). - std::string command; + /// The message route is delimited using an empty frame. + bool delimited; - /// Structure is little-endian. - /// Arbitrary caller data (returned to caller for correlation). - uint32_t id; + /// The first address. + data_chunk address1; - /// Serialized query (structure defined in relation to command). - data_chunk data; + /// The second address. + data_chunk address2; }; } // namespace server diff --git a/include/bitcoin/server/server_node.hpp b/include/bitcoin/server/server_node.hpp index ad01e30a..739a569b 100644 --- a/include/bitcoin/server/server_node.hpp +++ b/include/bitcoin/server/server_node.hpp @@ -26,8 +26,8 @@ #include #include #include -#include -#include +#include +#include #include #include #include @@ -83,15 +83,16 @@ class BCS_API server_node // Notification. // ------------------------------------------------------------------------ - /////// Subscribe to address and stealth prefix notifications. - ////virtual void subscribe_address(route& reply_to, binary& prefix_filter, - //// chain::subscribe_type& type); + /// Subscribe to address (including stealth) prefix notifications. + /// Stealth prefix is limited to 32 bits, address prefix to 256 bits. + virtual void subscribe_address(const route& reply_to, + const binary& prefix_filter, chain::subscribe_type type); - /////// Subscribe to transaction radar notifications. - ////virtual void subscribe_radar(route& reply_to, hash_digest& tx_hash); + /// Subscribe to transaction penetration notifications. + virtual void subscribe_penetration(const route& reply_to, + const hash_digest& tx_hash); private: - void handle_running(const code& ec, result_handler handler); bool start_services(); @@ -106,7 +107,6 @@ class BCS_API server_node // These are thread safe. authenticator authenticator_; - ////notifications notifications_; query_service secure_query_service_; query_service public_query_service_; heartbeat_service secure_heartbeat_service_; diff --git a/include/bitcoin/server/utility/fetch_helpers.hpp b/include/bitcoin/server/utility/fetch_helpers.hpp index 74b791fe..f524f8e4 100644 --- a/include/bitcoin/server/utility/fetch_helpers.hpp +++ b/include/bitcoin/server/utility/fetch_helpers.hpp @@ -24,8 +24,7 @@ #include #include #include -#include -#include +#include #include namespace libbitcoin { @@ -38,19 +37,19 @@ static BC_CONSTEXPR size_t point_size = hash_size + sizeof(uint32_t); // fetch_history stuff bool BCS_API unwrap_fetch_history_args(wallet::payment_address& address, - uint32_t& from_height, const incoming& request); + uint32_t& from_height, const message& request); void BCS_API send_history_result(const code& ec, - const chain::history_compact::list& history, const incoming& request, + const chain::history_compact::list& history, const message& request, send_handler handler); // fetch_transaction stuff bool BCS_API unwrap_fetch_transaction_args(hash_digest& hash, - const incoming& request); + const message& request); void BCS_API transaction_fetched(const code& ec, const chain::transaction& tx, - const incoming& request, send_handler handler); + const message& request, send_handler handler); } // namespace server } // namespace libbitcoin diff --git a/include/bitcoin/server/workers/notification_worker.hpp b/include/bitcoin/server/workers/notification_worker.hpp index b707e250..14996417 100644 --- a/include/bitcoin/server/workers/notification_worker.hpp +++ b/include/bitcoin/server/workers/notification_worker.hpp @@ -24,8 +24,8 @@ #include #include #include -#include -#include +#include +#include #include namespace libbitcoin { @@ -33,22 +33,6 @@ namespace server { class server_node; -////struct subscription_locator -////{ -//// send_handler handler; -//// data_chunk address1; -//// data_chunk address2; -//// bool delimited; -////}; -//// -////struct subscription_record -////{ -//// binary prefix; -//// chain::subscribe_type type; -//// boost::posix_time::ptime expiry_time; -//// subscription_locator locator; -////}; - // This class is thread safe. // Provide address and stealth notifications to the query service. class BCS_API notification_worker @@ -67,12 +51,13 @@ class BCS_API notification_worker /// Stop the worker. bool stop() override; - /////// Subscribe to address and stealth prefix notifications. - ////virtual void subscribe_address(route& reply_to, binary& prefix_filter, - //// chain::subscribe_type& type); + /// Subscribe to address and stealth prefix notifications. + virtual void subscribe_address(const route& reply_to, + const binary& prefix_filter, chain::subscribe_type type); - /////// Subscribe to address and stealth prefix notifications. - ////virtual void subscribe_radar(route& reply_to, hash_digest& tx_hash); + /// Subscribe to transaction penetration notifications. + virtual void subscribe_penetration(const route& reply_to, + const hash_digest& tx_hash); protected: typedef bc::protocol::zmq::socket socket; @@ -87,49 +72,68 @@ class BCS_API notification_worker typedef chain::block::ptr_list block_list; typedef chain::point::indexes index_list; - typedef resubscriber address_subscriber; - typedef resubscriber - inventory_subscriber; - typedef resubscriber payment_subscriber; + typedef resubscriber stealth_subscriber; + typedef resubscriber address_subscriber; + typedef resubscriber penetration_subscriber; + + // Remove expired subscriptions. + void prune(); bool handle_blockchain_reorganization(const code& ec, uint64_t fork_point, const block_list& new_blocks, const block_list&); bool handle_transaction_pool(const code& ec, const index_list&, const chain::transaction& tx); bool handle_inventory(const code& ec, - const message::inventory::ptr packet); + const bc::message::inventory::ptr packet); void notify_blocks(uint32_t fork_point, const block_list& blocks); void notify_block(socket& peer, uint32_t height, const chain::block::ptr block); void notify_transaction(uint32_t height, const hash_digest& block_hash, const chain::transaction& tx); - void notify_address(const wallet::payment_address& address, + + // v2/v3 (deprecated) + void notify_payment(const wallet::payment_address& address, uint32_t height, const hash_digest& block_hash, const chain::transaction& tx); void notify_stealth(uint32_t prefix, uint32_t height, const hash_digest& block_hash, const chain::transaction& tx); - void notify_inventory(uint32_t height, const hash_digest& block_hash, - const hash_digest& tx_hash); - - ////static boost::posix_time::ptime now(); - ////void scan(uint32_t height, const hash_digest& block_hash, - //// const chain::transaction& tx); + // v3 + void notify_address(const binary& field, uint32_t height, + const hash_digest& block_hash, const chain::transaction& tx); + void notify_penetration(uint32_t height, const hash_digest& block_hash, + const hash_digest& tx_hash); - ////void post_updates(const wallet::payment_address& address, - //// uint32_t height, const hash_digest& block_hash, - //// const chain::transaction& tx); - ////void post_stealth_updates(uint32_t prefix, uint32_t height, - //// const hash_digest& block_hash, const chain::transaction& tx); + // Send a notification to the subscriber. + void send(const route& reply_to, const std::string& command, + uint32_t id, const data_chunk& payload); + void send_payment(const route& reply_to, uint32_t id, + const wallet::payment_address& address, uint32_t height, + const hash_digest& block_hash, const chain::transaction& tx); + void send_stealth(const route& reply_to, uint32_t id, uint32_t prefix, + uint32_t height, const hash_digest& block_hash, + const chain::transaction& tx); + void send_address(const route& reply_to, uint32_t id, uint8_t sequence, + uint32_t height, const hash_digest& block_hash, + const chain::transaction& tx); - size_t prune() { return 0; } - ////code create(const incoming& request, send_handler handler); - ////code update(const incoming& request, send_handler handler); - ////bool deserialize(binary& address, chain::subscribe_type& type, - //// const data_chunk& data); + bool handle_payment(const code& ec, const wallet::payment_address& address, + uint32_t height, const hash_digest& block_hash, + const chain::transaction& tx, const route& reply_to, uint32_t id, + const binary& prefix_filter); + bool handle_stealth(const code& ec, uint32_t prefix, uint32_t height, + const hash_digest& block_hash, const chain::transaction& tx, + const route& reply_to, uint32_t id, const binary& prefix_filter); + bool handle_address(const code& ec, const binary& field, uint32_t height, + const hash_digest& block_hash, const chain::transaction& tx, + const route& reply_to, uint32_t id, const binary& prefix_filter, + std::shared_ptr sequence); const bool secure_; const server::settings& settings_; @@ -138,8 +142,9 @@ class BCS_API notification_worker server_node& node_; bc::protocol::zmq::authenticator& authenticator_; address_subscriber::ptr address_subscriber_; - inventory_subscriber::ptr inventory_subscriber_; + payment_subscriber::ptr payment_subscriber_; stealth_subscriber::ptr stealth_subscriber_; + penetration_subscriber::ptr penetration_subscriber_; }; } // namespace server diff --git a/include/bitcoin/server/workers/query_worker.hpp b/include/bitcoin/server/workers/query_worker.hpp index b23cb34e..cf09dd54 100644 --- a/include/bitcoin/server/workers/query_worker.hpp +++ b/include/bitcoin/server/workers/query_worker.hpp @@ -26,8 +26,7 @@ #include #include #include -#include -#include +#include #include namespace libbitcoin { @@ -50,7 +49,7 @@ class BCS_API query_worker protected: typedef bc::protocol::zmq::socket socket; - typedef std::function command_handler; + typedef std::function command_handler; typedef std::unordered_map command_map; virtual void attach_interface(); diff --git a/src/interface/address.cpp b/src/interface/address.cpp index 69a9f474..8969a1f8 100644 --- a/src/interface/address.cpp +++ b/src/interface/address.cpp @@ -22,8 +22,7 @@ #include #include #include -#include -#include +#include #include #include @@ -34,7 +33,7 @@ using namespace std::placeholders; using namespace bc::chain; using namespace bc::wallet; -void address::fetch_history2(server_node& node, const incoming& request, +void address::fetch_history2(server_node& node, const message& request, send_handler handler) { static constexpr uint64_t limit = 0; @@ -43,7 +42,7 @@ void address::fetch_history2(server_node& node, const incoming& request, if (!unwrap_fetch_history_args(address, from_height, request)) { - handler(outgoing(request, error::bad_stream)); + handler(message(request, error::bad_stream)); return; } @@ -53,50 +52,61 @@ void address::fetch_history2(server_node& node, const incoming& request, _1, _2, request, handler)); } -void address::subscribe(server_node& node, const incoming& request, +// v2/v3 (deprecated), used for resubscription, alias for subscribe in v3. +void address::renew(server_node& node, const message& request, + send_handler handler) +{ + subscribe(node, request, handler); +} + +// v2/v3 (deprecated), requires an explicit subscription type. +void address::subscribe(server_node& node, const message& request, send_handler handler) { - route reply_to; binary prefix_filter; subscribe_type type; - if (!unwrap_subscribe_args(reply_to, prefix_filter, type, request)) + if (!unwrap_subscribe_args(prefix_filter, type, request)) { - handler(outgoing(request, error::bad_stream)); + handler(message(request, error::bad_stream)); return; } - // TODO: reenable. - ////node.subscribe_address(reply_to, prefix_filter, type); - - handler(outgoing(request, error::success)); + node.subscribe_address(request.route(), prefix_filter, type); + handler(message(request, error::success)); } -bool address::unwrap_subscribe_args(route& reply_to, binary& prefix_filter, - subscribe_type& type, const incoming& request) +bool address::unwrap_subscribe_args(binary& prefix_filter, + subscribe_type& type, const message& request) { + static constexpr auto address_bits = hash_size * byte_bits; + static constexpr auto stealth_bits = sizeof(uint32_t) * byte_bits; + // [ type:1 ] (0 = address prefix, 1 = stealth prefix) // [ prefix_bitsize:1 ] - // [ prefix_blocks:... ] - const auto& data = request.data; + // [ prefix_blocks:...] + const auto& data = request.data(); if (data.size() < 2) return false; // First byte is the subscribe_type enumeration. - if (data[0] != static_cast(subscribe_type::address) && - data[0] != static_cast(subscribe_type::stealth)) - return false; - type = static_cast(data[0]); + if (type != subscribe_type::payment && type != subscribe_type::stealth) + return false; + // Second byte is the number of bits. const auto bit_length = data[1]; + if ((type == subscribe_type::payment && bit_length > address_bits) || + (type == subscribe_type::stealth && bit_length > stealth_bits)) + return false; + // Convert the bit length to byte length. const auto byte_length = binary::blocks_size(bit_length); - if (data.size() != byte_length + 2) + if (data.size() - 2 != byte_length) return false; const data_chunk bytes({ data.begin() + 2, data.end() }); @@ -104,28 +114,52 @@ bool address::unwrap_subscribe_args(route& reply_to, binary& prefix_filter, return true; } -////void address::renew(server_node& node, const incoming& request, -//// send_handler handler) -////{ -//// route reply_to; -//// binary prefix_filter; -//// subscribe_type type; -//// -//// if (!unwrap_subscribe_args(reply_to, prefix_filter, type, request)) -//// { -//// handler(outgoing(request, error::bad_stream)); -//// return; -//// } -//// -//// node.renew(reply_to, prefix_filter, type); -////} -//// -////bool address::unwrap_renew_args(route& reply_to, binary& prefix_filter, -//// subscribe_type& type, const incoming& request) -////{ -//// // These are currently isomorphic. -//// return unwrap_subscribe_args(reply_to, prefix_filter, type, request); -////} +// v3 eliminates the subscription type, which we map to 'unspecified'. +void address::subscribe2(server_node& node, const message& request, + send_handler handler) +{ + static constexpr auto type = subscribe_type::unspecified; + + binary prefix_filter; + + if (!unwrap_subscribe2_args(prefix_filter, request)) + { + handler(message(request, error::bad_stream)); + return; + } + + node.subscribe_address(request.route(), prefix_filter, type); + handler(message(request, error::success)); +} + +bool address::unwrap_subscribe2_args(binary& prefix_filter, + const message& request) +{ + static constexpr auto address_bits = hash_size * byte_bits; + + // [ prefix_bitsize:1 ] + // [ prefix_blocks:...] + const auto& data = request.data(); + + if (data.empty()) + return false; + + // First byte is the number of bits. + const auto bit_length = data[0]; + + if (bit_length > address_bits) + return false; + + // Convert the bit length to byte length. + const auto byte_length = binary::blocks_size(bit_length); + + if (data.size() - 1 != byte_length) + return false; + + const data_chunk bytes({ data.begin() + 1, data.end() }); + prefix_filter = binary(bit_length, bytes); + return true; +} } // namespace server } // namespace libbitcoin diff --git a/src/interface/blockchain.cpp b/src/interface/blockchain.cpp index 6f69f507..5b903bc7 100644 --- a/src/interface/blockchain.cpp +++ b/src/interface/blockchain.cpp @@ -24,8 +24,7 @@ #include #include #include -#include -#include +#include #include #include @@ -37,8 +36,8 @@ using namespace bc::blockchain; using namespace bc::chain; using namespace bc::wallet; -void blockchain::fetch_history(server_node& node, - const incoming& request, send_handler handler) +void blockchain::fetch_history(server_node& node, const message& request, + send_handler handler) { static constexpr uint64_t limit = 0; uint32_t from_height; @@ -46,7 +45,7 @@ void blockchain::fetch_history(server_node& node, if (!unwrap_fetch_history_args(address, from_height, request)) { - handler(outgoing(request, error::bad_stream)); + handler(message(request, error::bad_stream)); return; } @@ -59,14 +58,14 @@ void blockchain::fetch_history(server_node& node, _1, _2, request, handler)); } -void blockchain::fetch_transaction(server_node& node, - const incoming& request, send_handler handler) +void blockchain::fetch_transaction(server_node& node, const message& request, + send_handler handler) { hash_digest tx_hash; if (!unwrap_fetch_transaction_args(tx_hash, request)) { - handler(outgoing(request, error::bad_stream)); + handler(message(request, error::bad_stream)); return; } @@ -78,14 +77,14 @@ void blockchain::fetch_transaction(server_node& node, _1, _2, request, handler)); } -void blockchain::fetch_last_height(server_node& node, - const incoming& request, send_handler handler) +void blockchain::fetch_last_height(server_node& node, const message& request, + send_handler handler) { - const auto& data = request.data; + const auto& data = request.data(); - if (!request.data.empty()) + if (!data.empty()) { - handler(outgoing(request, error::bad_stream)); + handler(message(request, error::bad_stream)); return; } @@ -95,7 +94,7 @@ void blockchain::fetch_last_height(server_node& node, } void blockchain::last_height_fetched(const code& ec, size_t last_height, - const incoming& request, send_handler handler) + const message& request, send_handler handler) { BITCOIN_ASSERT(last_height <= bc::max_uint32); auto last_height32 = static_cast(last_height); @@ -108,26 +107,26 @@ void blockchain::last_height_fetched(const code& ec, size_t last_height, serial.write_4_bytes_little_endian(last_height32); BITCOIN_ASSERT(serial.iterator() == result.end()); - handler(outgoing(request, result)); + handler(message(request, result)); } -void blockchain::fetch_block_header(server_node& node, - const incoming& request, send_handler handler) +void blockchain::fetch_block_header(server_node& node, const message& request, + send_handler handler) { - const auto& data = request.data; + const auto& data = request.data(); if (data.size() == hash_size) blockchain::fetch_block_header_by_hash(node, request, handler); else if (data.size() == sizeof(uint32_t)) blockchain::fetch_block_header_by_height(node, request, handler); else - handler(outgoing(request, error::bad_stream)); + handler(message(request, error::bad_stream)); } void blockchain::fetch_block_header_by_hash(server_node& node, - const incoming& request, send_handler handler) + const message& request, send_handler handler) { - const auto& data = request.data; + const auto& data = request.data(); BITCOIN_ASSERT(data.size() == hash_size); auto deserial = make_deserializer(data.begin(), data.end()); @@ -139,9 +138,9 @@ void blockchain::fetch_block_header_by_hash(server_node& node, } void blockchain::fetch_block_header_by_height(server_node& node, - const incoming& request, send_handler handler) + const message& request, send_handler handler) { - const auto& data = request.data; + const auto& data = request.data(); BITCOIN_ASSERT(data.size() == sizeof(uint32_t)); auto deserial = make_deserializer(data.begin(), data.end()); @@ -153,7 +152,7 @@ void blockchain::fetch_block_header_by_height(server_node& node, } void blockchain::block_header_fetched(const code& ec, - const chain::header& block, const incoming& request, send_handler handler) + const chain::header& block, const message& request, send_handler handler) { const auto block_size64 = block.serialized_size(false); BITCOIN_ASSERT_MSG(block_size64 <= max_size_t, "Clearly Bitcoin is dead."); @@ -168,26 +167,26 @@ void blockchain::block_header_fetched(const code& ec, serial.write_data(block_data); BITCOIN_ASSERT(serial.iterator() == result.end()); - handler(outgoing(request, result)); + handler(message(request, result)); } void blockchain::fetch_block_transaction_hashes(server_node& node, - const incoming& request, send_handler handler) + const message& request, send_handler handler) { - const auto& data = request.data; + const auto& data = request.data(); if (data.size() == hash_size) fetch_block_transaction_hashes_by_hash(node, request, handler); else if (data.size() == sizeof(uint32_t)) fetch_block_transaction_hashes_by_height(node, request, handler); else - handler(outgoing(request, error::bad_stream)); + handler(message(request, error::bad_stream)); } void blockchain::fetch_block_transaction_hashes_by_hash(server_node& node, - const incoming& request, send_handler handler) + const message& request, send_handler handler) { - const auto& data = request.data; + const auto& data = request.data(); BITCOIN_ASSERT(data.size() == hash_size); auto deserial = make_deserializer(data.begin(), data.end()); @@ -198,9 +197,9 @@ void blockchain::fetch_block_transaction_hashes_by_hash(server_node& node, } void blockchain::fetch_block_transaction_hashes_by_height(server_node& node, - const incoming& request, send_handler handler) + const message& request, send_handler handler) { - const auto& data = request.data; + const auto& data = request.data(); BITCOIN_ASSERT(data.size() == sizeof(uint32_t)); auto deserial = make_deserializer(data.begin(), data.end()); @@ -211,8 +210,7 @@ void blockchain::fetch_block_transaction_hashes_by_height(server_node& node, } void blockchain::block_transaction_hashes_fetched(const code& ec, - const hash_list& hashes, const incoming& request, - send_handler handler) + const hash_list& hashes, const message& request, send_handler handler) { data_chunk result(code_size + hash_size * hashes.size()); auto serial = make_serializer(result.begin()); @@ -224,17 +222,17 @@ void blockchain::block_transaction_hashes_fetched(const code& ec, BITCOIN_ASSERT(serial.iterator() == result.end()); - handler(outgoing(request, result)); + handler(message(request, result)); } void blockchain::fetch_transaction_index(server_node& node, - const incoming& request, send_handler handler) + const message& request, send_handler handler) { - const auto& data = request.data; + const auto& data = request.data(); if (data.size() != hash_size) { - handler(outgoing(request, error::bad_stream)); + handler(message(request, error::bad_stream)); return; } @@ -247,7 +245,7 @@ void blockchain::fetch_transaction_index(server_node& node, } void blockchain::transaction_index_fetched(const code& ec, size_t block_height, - size_t index, const incoming& request, send_handler handler) + size_t index, const message& request, send_handler handler) { BITCOIN_ASSERT(index <= max_uint32); auto index32 = static_cast(index); @@ -264,17 +262,17 @@ void blockchain::transaction_index_fetched(const code& ec, size_t block_height, serial.write_4_bytes_little_endian(block_height32); serial.write_4_bytes_little_endian(index32); - handler(outgoing(request, result)); + handler(message(request, result)); } -void blockchain::fetch_spend(server_node& node, const incoming& request, +void blockchain::fetch_spend(server_node& node, const message& request, send_handler handler) { - const auto& data = request.data; + const auto& data = request.data(); if (data.size() != point_size) { - handler(outgoing(request, error::bad_stream)); + handler(message(request, error::bad_stream)); return; } @@ -291,7 +289,7 @@ void blockchain::fetch_spend(server_node& node, const incoming& request, } void blockchain::spend_fetched(const code& ec, const chain::input_point& inpoint, - const incoming& request, send_handler handler) + const message& request, send_handler handler) { // error_code (4), hash (32), index (4) const auto inpoint_size64 = inpoint.serialized_size(); @@ -306,17 +304,17 @@ void blockchain::spend_fetched(const code& ec, const chain::input_point& inpoint auto raw_inpoint = inpoint.to_data(); serial.write_data(raw_inpoint); - handler(outgoing(request, result)); + handler(message(request, result)); } void blockchain::fetch_block_height(server_node& node, - const incoming& request, send_handler handler) + const message& request, send_handler handler) { - const auto& data = request.data; + const auto& data = request.data(); if (data.size() != hash_size) { - handler(outgoing(request, error::bad_stream)); + handler(message(request, error::bad_stream)); return; } @@ -328,7 +326,7 @@ void blockchain::fetch_block_height(server_node& node, } void blockchain::block_height_fetched(const code& ec, size_t block_height, - const incoming& request, send_handler handler) + const message& request, send_handler handler) { BITCOIN_ASSERT(block_height <= max_uint32); auto block_height32 = static_cast(block_height); @@ -341,17 +339,17 @@ void blockchain::block_height_fetched(const code& ec, size_t block_height, BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size); serial.write_4_bytes_little_endian(block_height32); - handler(outgoing(request, result)); + handler(message(request, result)); } -void blockchain::fetch_stealth(server_node& node, const incoming& request, +void blockchain::fetch_stealth(server_node& node, const message& request, send_handler handler) { - const auto& data = request.data; + const auto& data = request.data(); if (data.empty()) { - handler(outgoing(request, error::bad_stream)); + handler(message(request, error::bad_stream)); return; } @@ -363,7 +361,7 @@ void blockchain::fetch_stealth(server_node& node, const incoming& request, if (data.size() != sizeof(uint8_t) + binary::blocks_size(bitsize) + sizeof(uint32_t)) { - handler(outgoing(request, error::bad_stream)); + handler(message(request, error::bad_stream)); return; } @@ -380,7 +378,7 @@ void blockchain::fetch_stealth(server_node& node, const incoming& request, } void blockchain::stealth_fetched(const code& ec, - const stealth_compact::list& stealth_results, const incoming& request, + const stealth_compact::list& stealth_results, const message& request, send_handler handler) { // [ ephemeral_key_hash:32 ] @@ -399,7 +397,7 @@ void blockchain::stealth_fetched(const code& ec, serial.write_hash(row.transaction_hash); } - handler(outgoing(request, result)); + handler(message(request, result)); } } // namespace server diff --git a/src/interface/protocol.cpp b/src/interface/protocol.cpp index 5d7b343f..80d3f2bb 100644 --- a/src/interface/protocol.cpp +++ b/src/interface/protocol.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -34,14 +35,14 @@ using namespace std::placeholders; // This does NOT save to our memory pool. // The transaction will hit our memory pool when it is picked up from a peer. -void protocol::broadcast_transaction(server_node& node, - const incoming& request, send_handler handler) +void protocol::broadcast_transaction(server_node& node, const message& request, + send_handler handler) { chain::transaction tx; - if (!tx.from_data(request.data)) + if (!tx.from_data(request.data())) { - handler(outgoing(request, error::bad_stream)); + handler(message(request, error::bad_stream)); return; } @@ -52,15 +53,15 @@ void protocol::broadcast_transaction(server_node& node, node.broadcast(tx, ignore_send, ignore_complete); // Tell the user everything is fine. - handler(outgoing(request, error::success)); + handler(message(request, error::success)); } -void protocol::total_connections(server_node& node, const incoming& request, +void protocol::total_connections(server_node& node, const message& request, send_handler handler) { - if (!request.data.empty()) + if (!request.data().empty()) { - handler(outgoing(request, error::bad_stream)); + handler(message(request, error::bad_stream)); return; } @@ -69,7 +70,7 @@ void protocol::total_connections(server_node& node, const incoming& request, _1, request, handler)); } -void protocol::handle_total_connections(size_t count, const incoming& request, +void protocol::handle_total_connections(size_t count, const message& request, send_handler handler) { BITCOIN_ASSERT(count <= max_uint32); @@ -83,7 +84,7 @@ void protocol::handle_total_connections(size_t count, const incoming& request, serial.write_4_bytes_little_endian(total_connections); BITCOIN_ASSERT(serial.iterator() == result.end()); - handler(outgoing(request, result)); + handler(message(request, result)); } } // namespace server diff --git a/src/interface/transaction_pool.cpp b/src/interface/transaction_pool.cpp index 7648d62b..ef7612a5 100644 --- a/src/interface/transaction_pool.cpp +++ b/src/interface/transaction_pool.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -33,13 +34,13 @@ using namespace std::placeholders; using namespace bc::chain; void transaction_pool::fetch_transaction(server_node& node, - const incoming& request, send_handler handler) + const message& request, send_handler handler) { hash_digest hash; if (!unwrap_fetch_transaction_args(hash, request)) { - handler(outgoing(request, error::bad_stream)); + handler(message(request, error::bad_stream)); return; } @@ -51,16 +52,24 @@ void transaction_pool::fetch_transaction(server_node& node, _1, _2, request, handler)); } -void transaction_pool::validate(server_node& node, const incoming& request, +// Broadcast a transaction with penetration subscription. +void transaction_pool::broadcast(server_node& node, const message& request, + send_handler handler) +{ + // TODO: conditionally subscribe to penetration notifications. + // TODO: broadcast transaction to receiving peers. +} + +void transaction_pool::validate(server_node& node, const message& request, send_handler handler) { transaction tx; - if (!tx.from_data(request.data)) + if (!tx.from_data(request.data())) { // NOTE: the format of this response changed in v3 (send only code). // This is our standard behavior and should not break clients. - handler(outgoing(request, error::bad_stream)); + handler(message(request, error::bad_stream)); return; } @@ -71,7 +80,7 @@ void transaction_pool::validate(server_node& node, const incoming& request, void transaction_pool::handle_validated(const code& ec, const transaction& tx, const hash_digest& tx_hash, const point::indexes& unconfirmed, - const incoming& request, send_handler handler) + const message& request, send_handler handler) { data_chunk result(code_size + unconfirmed.size() * index_size); auto serial = make_serializer(result.begin()); @@ -87,7 +96,7 @@ void transaction_pool::handle_validated(const code& ec, const transaction& tx, BITCOIN_ASSERT(serial.iterator() == result.end()); - handler(outgoing(request, result)); + handler(message(request, result)); } } // namespace server diff --git a/src/messages/incoming.cpp b/src/messages/incoming.cpp deleted file mode 100644 index 707f923c..00000000 --- a/src/messages/incoming.cpp +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS) - * - * This file is part of libbitcoin-server. - * - * libbitcoin-server is free software: you can redistribute it and/or - * modify it under the terms of the GNU Affero General Public License with - * additional permissions to the one published by the Free Software - * Foundation, either version 3 of the License, or (at your option) - * any later version. For more information see LICENSE. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#include - -#include -#include -#include - -namespace libbitcoin { -namespace server { - -using namespace bc::protocol; - -std::string incoming::address() -{ - return "[" + encode_base16(address1) + "]"; -} - -// Protocol delimitation migration plan. -//----------------------------------------------------------------------------- -// v1/v2 server: ROUTER, requires not delimited -// v3 server: ROUTER, allows/echos delimited -// v1/v2/v3 client: DEALER (not framed) -//----------------------------------------------------------------------------- -// v4 server: ROUTER, requires delimited -// v4 client: DEALER (delimited) or REQ -//----------------------------------------------------------------------------- - -// TODO: generalize address stripping, store all routes, use (hack) parameter -// of expected payload length for parsing undelimited addressing. -// BUGBUG: current implementation assumes client has not added any addresses. -// This probably prevents the use of generalized zeromq routing to the server. -code incoming::receive(zmq::socket& socket, bool secure) -{ - zmq::message message; - auto ec = socket.receive(message); - - if (ec) - return ec; - - if (message.size() < 5 || message.size() > 6) - return error::bad_stream; - - // Client is undelimited DEALER -> 2 addresses with no delimiter. - // Client is REQ or delimited DEALER -> 2 addresses with delimiter. - address1 = message.dequeue_data(); - address2 = message.dequeue_data(); - - // In the reply we echo the delimited-ness of the original request. - delimited = message.size() == 4; - - if (delimited) - message.dequeue(); - - // All libbitcoin queries have these three frames. - //------------------------------------------------------------------------- - - // Query command (returned to caller). - command = message.dequeue_text(); - - // Arbitrary caller data (returned to caller for correlation). - if (!message.dequeue(id)) - return error::bad_stream; - - // Serialized query. - data = message.dequeue_data(); - - // For deferred work, directs worker to respond on secure endpoint. - this->secure = secure; - return error::success; -} - -} // namespace server -} // namespace libbitcoin diff --git a/src/messages/message.cpp b/src/messages/message.cpp new file mode 100644 index 00000000..6f56f922 --- /dev/null +++ b/src/messages/message.cpp @@ -0,0 +1,184 @@ +/** + * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin-server. + * + * libbitcoin-server is free software: you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License with + * additional permissions to the one published by the Free Software + * Foundation, either version 3 of the License, or (at your option) + * any later version. For more information see LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include + +#include +#include +#include +#include + +namespace libbitcoin { +namespace server { + +using namespace bc::protocol; + +// Protocol delimitation migration plan. +//----------------------------------------------------------------------------- +// v1/v2 server: ROUTER, requires not delimited +// v3 server: ROUTER, allows/echos delimited +// v1/v2/v3 client: DEALER (not delimited) +//----------------------------------------------------------------------------- +// v4 server: ROUTER, requires delimited +// v4 client: DEALER (delimited) or REQ +//----------------------------------------------------------------------------- + +// Convert an error code to data for payload. +data_chunk message::to_bytes(const code& ec) +{ + return build_chunk( + { + to_little_endian(static_cast(ec.value())) + }); +} + +// Constructors. +//------------------------------------------------------------------------- + +// Construct an empty message with security routing context. +message::message(bool secure) +{ + // For subscriptions, directs notifier to respond on secure endpoint. + route_.secure = secure; +} + +// Construct a response for the request (response code only). +message::message(const message& request, const code& ec) + : message(request, to_bytes(ec)) +{ +} + +// Construct a response for the request (response data with code). +message::message(const message& request, const data_chunk& data) + : message(request.route(), request.command(), request.id(), data) +{ +} + +// Construct a response for the route (subscription code only). +message::message(const server::route& route, const std::string& command, + uint32_t id, const code& ec) + : message(route, command, id, to_bytes(ec)) +{ +} + +// Construct a response for the route (subscription data with code). +message::message(const server::route& route, const std::string& command, + uint32_t id, const data_chunk& data) + : route_(route), command_(command), id_(id), data_(data) +{ +} + +// Properties. +//------------------------------------------------------------------------- + +/// Arbitrary caller data (returned to caller for correlation). +uint32_t message::id() const +{ + return id_; +} + +/// Serialized query or response (defined in relation to command). +const data_chunk& message::data() const +{ + return data_; +} + +/// Query command (used for subscription, always returned to caller). +const std::string& message::command() const +{ + return command_; +} + +/// The message route. +const server::route& message::route() const +{ + return route_; +} + +// Transport. +//------------------------------------------------------------------------- + +code message::receive(zmq::socket& socket) +{ + zmq::message message; + auto ec = socket.receive(message); + + if (ec) + return ec; + + if (message.size() < 5 || message.size() > 6) + return error::bad_stream; + + // Decode the routing information (TODO: generalize in route). + //------------------------------------------------------------------------- + + // Client is undelimited DEALER -> 2 addresses with no delimiter. + // Client is REQ or delimited DEALER -> 2 addresses with delimiter. + route_.address1 = message.dequeue_data(); + route_.address2 = message.dequeue_data(); + + // In the reply we echo the delimited-ness of the original request. + route_.delimited = message.size() == 4; + + if (route_.delimited) + message.dequeue(); + + // All libbitcoin queries and responses have these three frames. + //------------------------------------------------------------------------- + + // Query command (returned to caller). + command_ = message.dequeue_text(); + + // Arbitrary caller data (returned to caller for correlation). + if (!message.dequeue(id_)) + return error::bad_stream; + + // Serialized query. + data_ = message.dequeue_data(); + + return error::success; +} + +code message::send(zmq::socket& socket) +{ + zmq::message message; + + // Encode the routing information (TODO: generalize in route). + //------------------------------------------------------------------------- + + // Client is undelimited DEALER -> 2 addresses with no delimiter. + // Client is REQ or delimited DEALER -> 2 addresses with delimiter. + message.enqueue(route_.address1); + message.enqueue(route_.address2); + + // In the reply we echo the delimited-ness of the original request. + if (route_.delimited) + message.enqueue(); + + // All libbitcoin queries and responses have these three frames. + //------------------------------------------------------------------------- + message.enqueue(command_); + message.enqueue_little_endian(id_); + message.enqueue(data_); + + return socket.send(message); +} + +} // namespace server +} // namespace libbitcoin diff --git a/src/messages/outgoing.cpp b/src/messages/outgoing.cpp deleted file mode 100644 index cea77dcb..00000000 --- a/src/messages/outgoing.cpp +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS) - * - * This file is part of libbitcoin-server. - * - * libbitcoin-server is free software: you can redistribute it and/or - * modify it under the terms of the GNU Affero General Public License with - * additional permissions to the one published by the Free Software - * Foundation, either version 3 of the License, or (at your option) - * any later version. For more information see LICENSE. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#include - -#include -#include -#include -#include -#include - -namespace libbitcoin { -namespace server { - -using namespace bc::protocol; - -// Convert an error code to data for payload. -inline data_chunk to_chunk(const code& ec) -{ - return build_chunk( - { - to_little_endian(static_cast(ec.value())) - }); -} - -// Return an error code in response to the incoming query. -outgoing::outgoing(const incoming& request, const code& ec) - : outgoing(request, to_chunk(ec)) -{ -} - -// Return data in response to a successfully-executed incoming query. -outgoing::outgoing(const incoming& request, const data_chunk& data) - : outgoing(request.command, data, request.address1, request.address2, - request.delimited, request.id) -{ -} - -// Return data as a subscription by the given address (zero id). -outgoing::outgoing(const std::string& command, const data_chunk& data, - const data_chunk& address1, const data_chunk& address2, bool delimited) - : outgoing(command, data, address1, address2, delimited, 0) -{ -} - -// protected -outgoing::outgoing(const std::string& command, const data_chunk& data, - const data_chunk& address1, const data_chunk& address2, bool delimited, - uint32_t id) -{ - // Client is undelimited DEALER -> 2 addresses with no delimiter. - // Client is REQ or delimited DEALER -> 2 addresses with delimiter. - message_.enqueue(address1); - message_.enqueue(address2); - - // In the reply we echo the delimited-ness of the original request. - if (delimited) - message_.enqueue(); - - // All libbitcoin queries have these three frames. - //------------------------------------------------------------------------- - message_.enqueue(command); - message_.enqueue_little_endian(id); - message_.enqueue(data); -} - -std::string outgoing::address() -{ - auto message = message_; - return "[" + encode_base16(message.dequeue_data()) + "]"; -} - -code outgoing::send(zmq::socket& socket) -{ - return socket.send(message_); -} - -} // namespace server -} // namespace libbitcoin diff --git a/src/messages/route.cpp b/src/messages/route.cpp new file mode 100644 index 00000000..86fc843f --- /dev/null +++ b/src/messages/route.cpp @@ -0,0 +1,39 @@ +/** + * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin-server. + * + * libbitcoin-server is free software: you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License with + * additional permissions to the one published by the Free Software + * Foundation, either version 3 of the License, or (at your option) + * any later version. For more information see LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include + +#include +#include + +namespace libbitcoin { +namespace server { + +route::route() + : secure(false), delimited(false) +{ +} + +std::string route::display() const +{ + return "[" + encode_base16(address1) + ":" + encode_base16(address2) + "]"; +} + +} // namespace server +} // namespace libbitcoin diff --git a/src/server_node.cpp b/src/server_node.cpp index 2b38c695..6d21986f 100644 --- a/src/server_node.cpp +++ b/src/server_node.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include namespace libbitcoin { @@ -38,7 +39,6 @@ server_node::server_node(const configuration& configuration) : p2p_node(configuration), configuration_(configuration), authenticator_(*this), - ////notifications_(); secure_query_service_(authenticator_, *this, true), public_query_service_(authenticator_, *this, false), secure_heartbeat_service_(authenticator_, *this, true), @@ -61,11 +61,6 @@ server_node::~server_node() // Properties. // ---------------------------------------------------------------------------- -////notifications& server_node::notifier() -////{ -//// return notifications_; -////} - const settings& server_node::server_settings() const { return configuration_.server; @@ -125,26 +120,27 @@ bool server_node::close() // Notification. // ---------------------------------------------------------------------------- -////// Subscribe to address and stealth prefix notifications. -////void server_node::subscribe_address(route& reply_to, binary& prefix_filter, -//// subscribe_type& type) -////{ -//// if (true) -//// secure_notification_worker_ -//// .subscribe_address(reply_to, prefix_filter, type); -//// else -//// public_notification_worker_ -//// .subscribe_address(reply_to, prefix_filter, type); -////} -//// -////// Subscribe to transaction radar notifications. -////void server_node::subscribe_radar(route& reply_to, hash_digest& tx_hash) -////{ -//// if (true) -//// secure_notification_worker_.subscribe_radar(reply_to, tx_hash); -//// else -//// public_notification_worker_.subscribe_radar(reply_to, tx_hash); -////} +// Subscribe to address (including stealth) prefix notifications. +void server_node::subscribe_address(const route& reply_to, + const binary& prefix_filter, subscribe_type type) +{ + if (reply_to.secure) + secure_notification_worker_ + .subscribe_address(reply_to, prefix_filter, type); + else + public_notification_worker_ + .subscribe_address(reply_to, prefix_filter, type); +} + +// Subscribe to transaction penetration notifications. +void server_node::subscribe_penetration(const route& reply_to, + const hash_digest& tx_hash) +{ + if (reply_to.secure) + secure_notification_worker_.subscribe_penetration(reply_to, tx_hash); + else + public_notification_worker_.subscribe_penetration(reply_to, tx_hash); +} // Services. // ---------------------------------------------------------------------------- @@ -179,13 +175,13 @@ bool server_node::start_query_services() if (!settings.query_service_enabled || settings.query_workers == 0) return true; - // Start secure service, query workers and address workers if enabled. + // Start secure service, query workers and notification workers if enabled. if (settings.server_private_key && (!secure_query_service_.start() || (settings.subscription_limit > 0 && !secure_notification_worker_.start()) || !start_query_workers(true))) return false; - // Start public service, query workers and address workers if enabled. + // Start public service, query workers and notification workers if enabled. if (!settings.secure_only && (!public_query_service_.start() || (settings.subscription_limit > 0 && !public_notification_worker_.start()) || !start_query_workers(false))) @@ -285,14 +281,14 @@ uint32_t server_node::threads_required(const configuration& configuration) { ++required; required += settings.query_workers; - required += (settings.subscription_limit > 0 ? 1 : 0); + required += (settings.subscription_limit > 0 ? 4 : 0); } if (!settings.secure_only) { ++required; required += settings.query_workers; - required += (settings.subscription_limit > 0 ? 1 : 0); + required += (settings.subscription_limit > 0 ? 4 : 0); } } diff --git a/src/services/block_service.cpp b/src/services/block_service.cpp index c52c616d..d7ab914b 100644 --- a/src/services/block_service.cpp +++ b/src/services/block_service.cpp @@ -201,6 +201,11 @@ void block_service::publish_blocks(uint32_t fork_point, publish_block(publisher, height++, block); } +// [ height:4 ] +// [ header:80 ] +// [ txs... ] +// The payload for block publication is delimited within the zeromq message. +// This is required for compatability and inconsistent with query payloads. void block_service::publish_block(zmq::socket& publisher, uint32_t height, const block::ptr block) { @@ -209,10 +214,10 @@ void block_service::publish_block(zmq::socket& publisher, uint32_t height, const auto security = secure_ ? "secure" : "public"; - zmq::message respose; - respose.enqueue_little_endian(height); - respose.enqueue(block->to_data(false)); - const auto ec = publisher.send(respose); + zmq::message broadcast; + broadcast.enqueue_little_endian(height); + broadcast.enqueue(block->to_data(false)); + const auto ec = publisher.send(broadcast); if (ec == bc::error::service_stopped) return; diff --git a/src/services/query_service.cpp b/src/services/query_service.cpp index f2524500..845aad71 100644 --- a/src/services/query_service.cpp +++ b/src/services/query_service.cpp @@ -57,10 +57,12 @@ void query_service::work() if (!started(bind(router, query_dealer, notify_dealer))) return; - // TODO: integrate notify_dealer into relay. // TODO: tap in to failure conditions, such as high water. // Relay messages between router and dealer (blocks on context). + //************************************************************************* + // TODO: integrate notify_dealer into relay. relay(router, query_dealer); + //************************************************************************* // Unbind the sockets and exit this thread. finished(unbind(router, query_dealer, notify_dealer)); diff --git a/src/services/transaction_service.cpp b/src/services/transaction_service.cpp index 7d16374e..8385d608 100644 --- a/src/services/transaction_service.cpp +++ b/src/services/transaction_service.cpp @@ -163,6 +163,7 @@ bool transaction_service::handle_transaction(const code& ec, return true; } +// [ tx... ] void transaction_service::publish_transaction(const transaction& tx) { if (stopped()) @@ -191,9 +192,9 @@ void transaction_service::publish_transaction(const transaction& tx) if (stopped()) return; - zmq::message respose; - respose.enqueue(tx.to_data()); - ec = publisher.send(respose); + zmq::message broadcast; + broadcast.enqueue(tx.to_data()); + ec = publisher.send(broadcast); if (ec == bc::error::service_stopped) return; diff --git a/src/utility/fetch_helpers.cpp b/src/utility/fetch_helpers.cpp index 7b2440ae..314a382a 100644 --- a/src/utility/fetch_helpers.cpp +++ b/src/utility/fetch_helpers.cpp @@ -23,7 +23,7 @@ #include #include #include -#include +#include namespace libbitcoin { namespace server { @@ -36,12 +36,12 @@ using namespace bc::wallet; // ---------------------------------------------------------------------------- bool unwrap_fetch_history_args(payment_address& address, - uint32_t& from_height, const incoming& request) + uint32_t& from_height, const message& request) { static constexpr size_t history_args_size = sizeof(uint8_t) + short_hash_size + sizeof(uint32_t); - const auto& data = request.data; + const auto& data = request.data(); if (data.size() != history_args_size) { @@ -61,7 +61,7 @@ bool unwrap_fetch_history_args(payment_address& address, } void send_history_result(const code& ec, const history_compact::list& history, - const incoming& request, send_handler handler) + const message& request, send_handler handler) { static constexpr size_t row_size = sizeof(uint8_t) + point_size + sizeof(uint32_t) + sizeof(uint64_t); @@ -82,16 +82,16 @@ void send_history_result(const code& ec, const history_compact::list& history, BITCOIN_ASSERT(serial.iterator() == result.end()); - handler(outgoing(request, result)); + handler(message(request, result)); } // fetch_transaction stuff // ---------------------------------------------------------------------------- bool unwrap_fetch_transaction_args(hash_digest& hash, - const incoming& request) + const message& request) { - const auto& data = request.data; + const auto& data = request.data(); if (data.size() != hash_size) { @@ -106,7 +106,7 @@ bool unwrap_fetch_transaction_args(hash_digest& hash, } void transaction_fetched(const code& ec, const chain::transaction& tx, - const incoming& request, send_handler handler) + const message& request, send_handler handler) { const auto tx_size64 = tx.serialized_size(); BITCOIN_ASSERT(tx_size64 <= max_size_t); @@ -121,7 +121,7 @@ void transaction_fetched(const code& ec, const chain::transaction& tx, serial.write_data(tx_data); BITCOIN_ASSERT(serial.iterator() == result.end()); - handler(outgoing(request, result)); + handler(message(request, result)); } } // namespace server diff --git a/src/workers/notification_worker.cpp b/src/workers/notification_worker.cpp index 484d0ae0..785e1b64 100644 --- a/src/workers/notification_worker.cpp +++ b/src/workers/notification_worker.cpp @@ -22,10 +22,14 @@ #include #include #include +#include #include +#include +#include #include #include #include +#include namespace libbitcoin { namespace server { @@ -37,15 +41,9 @@ using namespace bc::chain; using namespace bc::protocol; using namespace bc::wallet; -// TX RADAR -// By monitoring for tx inventory we do not poll and therefore will not -// suffer from peers that don't provide redundant responses, will not -// suffer double-counting, or have complexity and poor perf from caching. -// However this requires that the user subscribe *before* submitting a -// transaction. This feature is useful for the submitter of the tx to the -// network - generally the spender. This is intended to allow the spender -// to increase fees (ie using replace by fee). It is *not* recommended for -// use by a receiver (ie for the acceptance of zero confirmation txs). +static const std::string address_update = "address.update"; +static const std::string address_update2 = "address.update2"; +static const std::string address_stealth = "address.stealth_update"; notification_worker::notification_worker(zmq::authenticator& authenticator, server_node& node, bool secure) @@ -54,22 +52,28 @@ notification_worker::notification_worker(zmq::authenticator& authenticator, settings_(node.server_settings()), node_(node), authenticator_(authenticator), + payment_subscriber_(std::make_shared( + node.thread_pool(), NAME "_payment")), + stealth_subscriber_(std::make_shared( + node.thread_pool(), NAME "_stealth")), address_subscriber_(std::make_shared( node.thread_pool(), NAME "_address")), - inventory_subscriber_(std::make_shared( - node.thread_pool(), NAME "_inventory")), - stealth_subscriber_(std::make_shared( - node.thread_pool(), NAME "_stealth")) + penetration_subscriber_(std::make_shared( + node.thread_pool(), NAME "_penetration")) { } // There is no unsubscribe so this class shouldn't be restarted. bool notification_worker::start() { - address_subscriber_->start(); - inventory_subscriber_->start(); + // v2/v3 (deprecated) + payment_subscriber_->start(); stealth_subscriber_->start(); + // v3 + address_subscriber_->start(); + penetration_subscriber_->start(); + // Subscribe to blockchain reorganizations. node_.subscribe_blockchain( std::bind(¬ification_worker::handle_blockchain_reorganization, @@ -81,7 +85,7 @@ bool notification_worker::start() this, _1, _2, _3)); // Subscribe to all inventory messages from all peers. - node_.subscribe( + node_.subscribe( std::bind(¬ification_worker::handle_inventory, this, _1, _2)); @@ -91,15 +95,27 @@ bool notification_worker::start() // No unsubscribe so must be kept in scope until subscriber stop complete. bool notification_worker::stop() { - address_subscriber_->stop(); - inventory_subscriber_->stop(); + static const auto code = error::channel_stopped; + + // v2/v3 (deprecated) + payment_subscriber_->stop(); + payment_subscriber_->do_relay(code, {}, 0, {}, {}); + stealth_subscriber_->stop(); + stealth_subscriber_->do_relay(code, 0, 0, {}, {}); + + // v3 + address_subscriber_->stop(); + address_subscriber_->do_relay(code, {}, 0, {}, {}); + + penetration_subscriber_->stop(); + penetration_subscriber_->do_relay(code, 0, {}, {}); return zmq::worker::stop(); } // Implement worker as a router to the query service. -// The address worker receives no messages from the query service. +// The notification worker receives no messages from the query service. void notification_worker::work() { zmq::socket router(authenticator_, zmq::socket::role::router); @@ -137,13 +153,13 @@ bool notification_worker::connect(socket& router) if (ec) { log::error(LOG_SERVER) - << "Failed to connect " << security << " address worker to " + << "Failed to connect " << security << " notification worker to " << endpoint << " : " << ec.message(); return false; } log::debug(LOG_SERVER) - << "Connected " << security << " address worker to " << endpoint; + << "Connected " << security << " notification worker to " << endpoint; return true; } @@ -156,10 +172,266 @@ bool notification_worker::disconnect(socket& router) return true; log::error(LOG_SERVER) - << "Failed to disconnect " << security << " address worker."; + << "Failed to disconnect " << security << " notification worker."; return false; } +// Pruning. +// ---------------------------------------------------------------------------- + +// Signal expired subscriptions to self-remove. +void notification_worker::prune() +{ + static const auto code = error::channel_timeout; + + // v2/v3 (deprecated) + payment_subscriber_->relay(code, {}, 0, {}, {}); + stealth_subscriber_->relay(code, 0, 0, {}, {}); + + // v3 + address_subscriber_->relay(code, {}, 0, {}, {}); + penetration_subscriber_->relay(code, 0, {}, {}); +} + +// Sending. +// ---------------------------------------------------------------------------- + +//***************************************************************************** +// TODO: the notify_dealer must be connected to the query_service router. +//***************************************************************************** +void notification_worker::send(const route& reply_to, + const std::string& command, uint32_t id, const data_chunk& payload) +{ + const auto security = secure_ ? "secure" : "public"; + const auto& endpoint = secure_ ? query_service::secure_notify : + query_service::public_notify; + + zmq::socket notifier(authenticator_, zmq::socket::role::router); + auto ec = notifier.connect(endpoint); + + if (ec == bc::error::service_stopped) + return; + + if (ec) + { + log::warning(LOG_SERVER) + << "Failed to connect " << security << " notification worker: " + << ec.message(); + return; + } + + // Notifications are formatted as query response messages. + message notification(reply_to, command, id, payload); + ec = notification.send(notifier); + + if (ec && ec != error::service_stopped) + log::warning(LOG_SERVER) + << "Failed to send notification to " + << notification.route().display() << " " << ec.message(); +} + +void notification_worker::send_payment(const route& reply_to, uint32_t id, + const wallet::payment_address& address, uint32_t height, + const hash_digest& block_hash, const chain::transaction& tx) +{ + // [ address.version:1 ] + // [ address.hash:20 ] + // [ height:4 ] + // [ block_hash:32 ] + // [ tx:... ] + const auto payload = build_chunk( + { + to_array(address.version()), + address.hash(), + to_little_endian(height), + block_hash, + tx.to_data() + }); + + send(reply_to, address_update, id, payload); +} + +void notification_worker::send_stealth(const route& reply_to, uint32_t id, + uint32_t prefix, uint32_t height, const hash_digest& block_hash, + const chain::transaction& tx) +{ + // [ prefix:4 ] + // [ height:4 ] + // [ block_hash:32 ] + // [ tx:... ] + const auto payload = build_chunk( + { + to_little_endian(prefix), + to_little_endian(height), + block_hash, + tx.to_data() + }); + + send(reply_to, address_stealth, id, payload); +} + +void notification_worker::send_address(const route& reply_to, uint32_t id, + uint8_t sequence, uint32_t height, const hash_digest& block_hash, + const chain::transaction& tx) +{ + // [ code:4 ] + // [ sequence:1 ] + // [ height:4 ] + // [ block_hash:32 ] + // [ tx:... ] + const auto payload = build_chunk( + { + message::to_bytes(error::success), + to_array(sequence), + to_little_endian(height), + block_hash, + tx.to_data() + }); + + send(reply_to, address_update2, id, payload); +} + +// Handlers. +// ---------------------------------------------------------------------------- + +bool notification_worker::handle_payment(const code& ec, + const payment_address& address, uint32_t height, + const hash_digest& block_hash, const chain::transaction& tx, + const route& reply_to, uint32_t id, const binary& prefix_filter) +{ + if (ec == error::channel_timeout && true /* not expired */) + { + return true; + } + + if (ec) + { + send(reply_to, address_update, id, message::to_bytes(ec)); + return false; + } + + if (prefix_filter.is_prefix_of(address.hash())) + send_payment(reply_to, id, address, height, block_hash, tx); + + return true; +} + +bool notification_worker::handle_stealth(const code& ec, + uint32_t prefix, uint32_t height, const hash_digest& block_hash, + const chain::transaction& tx, const route& reply_to, uint32_t id, + const binary& prefix_filter) +{ + if (ec == error::channel_timeout && true /* not expired */) + { + return true; + } + + if (ec) + { + send(reply_to, address_stealth, id, message::to_bytes(ec)); + return false; + } + + if (prefix_filter.is_prefix_of(prefix)) + send_stealth(reply_to, id, prefix, height, block_hash, tx); + + return true; +} + +bool notification_worker::handle_address(const code& ec, + const binary& field, uint32_t height, const hash_digest& block_hash, + const chain::transaction& tx, const route& reply_to, uint32_t id, + const binary& prefix_filter, std::shared_ptr sequence) +{ + if (ec == error::channel_timeout && true /* not expired */) + { + return true; + } + + if (ec) + { + send(reply_to, address_update2, id, message::to_bytes(ec)); + return false; + } + + if (prefix_filter.is_prefix_of(field)) + send_address(reply_to, id, *sequence, height, block_hash, tx); + + return true; +} + +// Subscribers. +// ---------------------------------------------------------------------------- + +// Subscribe to address and stealth prefix notifications. +// Each delegate must connect to the appropriate query notification endpoint. +void notification_worker::subscribe_address(const route& reply_to, + const binary& prefix_filter, subscribe_type type) +{ + static const auto code = error::channel_stopped; + + switch (type) + { + // v2/v3 (deprecated) + case subscribe_type::payment: + { + const auto handler = + std::bind(¬ification_worker::handle_payment, + this, _1, _2, _3, _4, _5, reply_to, 0, prefix_filter); + + payment_subscriber_->subscribe(handler, code, {}, 0, {}, {}); + break; + } + + // v2/v3 (deprecated) + case subscribe_type::stealth: + { + const auto handler = + std::bind(¬ification_worker::handle_stealth, + this, _1, _2, _3, _4, _5, reply_to, 0, prefix_filter); + + stealth_subscriber_->subscribe(handler, code, 0, 0, {}, {}); + break; + } + + // v3 + default: + case subscribe_type::unspecified: + { + // The sequence enables the client to detect dropped messages. + const auto sequence = std::make_shared(0); + + const auto handler = + std::bind(¬ification_worker::handle_address, + this, _1, _2, _3, _4, _5, reply_to, 0, prefix_filter, + sequence); + + // v3 + address_subscriber_->subscribe(handler, code, {}, 0, {}, {}); + break; + } + } +} + +// Subscribe to transaction penetration notifications. +// Each delegate must connect to the appropriate query notification endpoint. +void notification_worker::subscribe_penetration(const route& reply_to, + const hash_digest& tx_hash) +{ + // TODO: + // Height and hash are zeroized if tx is not chained (inv/mempool). + // If chained or penetration is 100 (percent) drop subscription. + // Only send messages at configured thresholds (e.g. 20/40/60/80/100%). + // Thresholding allows the server to mask its peer count. + // Penetration is computed by the relay handler. + // No sequence is required because gaps are okay. + // [ tx_hash:32 ] + // [ penetration:1 ] + // [ height:4 ] + // [ block_hash:32 ] + ////penetration_subscriber_->subscribe(); +} + // Notification (via blockchain). // ---------------------------------------------------------------------------- @@ -228,20 +500,21 @@ void notification_worker::notify_block(zmq::socket& publisher, uint32_t height, const auto block_hash = block->header.hash(); - for (const auto tx: block->transactions) + for (const auto& tx: block->transactions) { const auto tx_hash = tx.hash(); notify_transaction(height, block_hash, tx); - notify_inventory(height, block_hash, tx_hash); + notify_penetration(height, block_hash, tx_hash); } } // Notification (via transaction inventory). // ---------------------------------------------------------------------------- +// This relies on peers always notifying us of new txs via inv messages. bool notification_worker::handle_inventory(const code& ec, - const message::inventory::ptr packet) + const bc::message::inventory::ptr packet) { if (stopped() || ec == bc::error::service_stopped) return false; @@ -255,10 +528,10 @@ bool notification_worker::handle_inventory(const code& ec, return true; } - //************************************************************************* - // TODO: loop inventories and extract transaction hashes. - notify_inventory(0, null_hash, packet->inventories.front().hash); - //************************************************************************* + // Loop inventories and extract transaction hashes. + for (const auto& inventory: packet->inventories) + if (inventory.type == bc::message::inventory_type_id::transaction) + notify_penetration(0, null_hash, inventory.hash); return true; } @@ -277,7 +550,7 @@ bool notification_worker::handle_transaction_pool(const code& ec, log::warning(LOG_SERVER) << "Failure handling new transaction: " << ec.message(); - // Don't let a failure here prevent prevent future notifications. + // Don't let a failure here prevent future notifications. return true; } @@ -285,87 +558,97 @@ bool notification_worker::handle_transaction_pool(const code& ec, return true; } +// This parsing is duplicated by bc::database::data_base. void notification_worker::notify_transaction(uint32_t height, const hash_digest& block_hash, const transaction& tx) { - if (stopped()) + uint32_t prefix; + + // TODO: move full integer and array constructors into binary. + static constexpr size_t prefix_bits = sizeof(prefix) * byte_bits; + static constexpr size_t address_bits = short_hash_size * byte_bits; + + if (stopped() || tx.outputs.empty()) return; - //************************************************************************* - // TODO: loop outputs and extract payment addresses and stealth prefixes. - uint32_t prefix = 42; - payment_address address; + // see data_base::push_inputs + // Loop inputs and extract payment addresses. + for (const auto& input: tx.inputs) + { + const auto address = payment_address::extract(input.script); + + if (address) + { + const binary field(address_bits, address.hash()); + notify_address(field, height, block_hash, tx); + notify_payment(address, height, block_hash, tx); + } + } + + // see data_base::push_outputs + // Loop outputs and extract payment addresses. + for (const auto& output: tx.outputs) + { + const auto address = payment_address::extract(output.script); + + if (address) + { + const binary field(address_bits, address.hash()); + notify_address(field, height, block_hash, tx); + notify_payment(address, height, block_hash, tx); + } + } - while (false) + // see data_base::push_stealth + // Loop output pairs and extract stealth payments. + for (size_t index = 0; index < (tx.outputs.size() - 1); ++index) { - notify_address(address, height, block_hash, tx); - notify_stealth(prefix, height, block_hash, tx); + const auto& ephemeral_script = tx.outputs[index].script; + const auto& payment_script = tx.outputs[index + 1].script; + + // Try to extract a stealth prefix from the first output. + // Try to extract the payment address from the second output. + if (to_stealth_prefix(prefix, ephemeral_script) && + payment_address::extract(payment_script)) + { + const binary field(prefix_bits, to_little_endian(prefix)); + notify_address(field, height, block_hash, tx); + notify_stealth(prefix, height, block_hash, tx); + } } - //************************************************************************* } -// TODO: add a sequence value to reply so client can detect dropped message. -void notification_worker::notify_address(const payment_address& address, +// v2/v3 (deprecated) +void notification_worker::notify_payment(const payment_address& address, uint32_t height, const hash_digest& block_hash, const transaction& tx) { - // [ address.version:1 ] - // [ address.hash:20 ] - // [ height:4 ] - // [ block_hash:32 ] - // [ tx:... ] - address_subscriber_->relay(address, height, block_hash, tx); + static const auto code = error::success; + payment_subscriber_->relay(code, address, height, block_hash, tx); } -// TODO: add a sequence value to reply so client can detect dropped message. +// v2/v3 (deprecated) void notification_worker::notify_stealth(uint32_t prefix, uint32_t height, const hash_digest& block_hash, const transaction& tx) { - // [ prefix:4 ] - // [ height:4 ] - // [ block_hash:32 ] - // [ tx:... ] - stealth_subscriber_->relay(prefix, height, block_hash, tx); + static const auto code = error::success; + stealth_subscriber_->relay(code, prefix, height, block_hash, tx); } -// No sequence is required as penetration is monotonically increasing. -void notification_worker::notify_inventory(uint32_t height, - const hash_digest& block_hash, const hash_digest& tx_hash) +// v3 +void notification_worker::notify_address(const binary& field, uint32_t height, + const hash_digest& block_hash, const transaction& tx) { - // Only provide height and hash if chained. - // If chained or penetration is 100 (percent) drop subscription. - // Only send messages at configured thresholds (e.g. 20/40/60/80/100%). - // Thresholding allows the server to mask its peer count. - // [ tx_hash:32 ] - // [ penetration:1 ] - // [ height:4 ] - // [ block_hash:32 ] - inventory_subscriber_->relay(height, block_hash, tx_hash); + static const auto code = error::success; + address_subscriber_->relay(code, field, height, block_hash, tx); } -// Subscribers. -// ---------------------------------------------------------------------------- - -/////// Subscribe to address and stealth prefix notifications. -////void notification_worker::subscribe_address(route& reply_to, -//// binary& prefix_filter, subscribe_type& type) -////{ -//// // Provide delegate that binds the above parameters and parameterizes -//// // the appropriate subscriber args, sending the notification, -//// // The delegate must connect back to a query notification endpoint. -//// address_subscriber_->subscribe(); -//// stealth_subscriber_->subscribe(); -////} -//// -/////// Subscribe to transaction radar notifications. -////void notification_worker::subscribe_radar(route& reply_to, -//// hash_digest& tx_hash) -////{ -//// // Provide delegate that binds the above parameters and parameterizes -//// // the subsciber args, sending the notification and updating state. -//// // The delegate must connect back to a query notification endpoint. -//// inventory_subscriber_->subscribe(); -////} - +// v3.x +void notification_worker::notify_penetration(uint32_t height, + const hash_digest& block_hash, const hash_digest& tx_hash) +{ + static const auto code = error::success; + penetration_subscriber_->relay(code, height, block_hash, tx_hash); +} // reference // ---------------------------------------------------------------------------- @@ -621,6 +904,5 @@ void notification_worker::notify_inventory(uint32_t height, //// return second_clock::universal_time(); ////}; - } // namespace server } // namespace libbitcoin diff --git a/src/workers/query_worker.cpp b/src/workers/query_worker.cpp index cfa9b81e..e0162ac9 100644 --- a/src/workers/query_worker.cpp +++ b/src/workers/query_worker.cpp @@ -27,8 +27,7 @@ #include #include #include -#include -#include +#include #include namespace libbitcoin { @@ -123,17 +122,17 @@ void query_worker::query(zmq::socket& router) // TODO: rewrite the serial blockchain interface to avoid callbacks. // We are using a closure vs. bind to take advantage of move arg syntax. - const auto sender = [&router](outgoing&& response) + const auto sender = [&router](message&& response) { const auto ec = response.send(router); if (ec && ec != error::service_stopped) log::warning(LOG_SERVER) - << "Failed to send query response to " << response.address() - << ec.message(); + << "Failed to send query response to " + << response.route().display() << " " << ec.message(); }; - incoming request; + message request(secure_); const auto ec = request.receive(router); if (ec == error::service_stopped) @@ -142,29 +141,30 @@ void query_worker::query(zmq::socket& router) if (ec) { log::debug(LOG_SERVER) - << "Failed to receive query from " << request.address() - << ec.message(); + << "Failed to receive query from " << request.route().display() + << " " << ec.message(); // Because the query did not parse this is likely to be misaddressed. - sender(outgoing(request, ec)); + sender(message(request, ec)); return; } // Locate the request handler for this command. - const auto handler = command_handlers_.find(request.command); + const auto handler = command_handlers_.find(request.command()); if (handler == command_handlers_.end()) { log::debug(LOG_SERVER) - << "Invalid query command from " << request.address(); + << "Invalid query command from " << request.route().display(); - sender(outgoing(request, error::not_found)); + sender(message(request, error::not_found)); return; } if (settings_.log_requests) log::info(LOG_SERVER) - << "Query " << request.command << " from " << request.address(); + << "Query " << request.command() << " from " + << request.route().display(); // The query executor is the delegate bound by the attach method. const auto& query_execute = handler->second; @@ -190,30 +190,27 @@ void query_worker::attach(const std::string& command, command_handlers_[command] = handler; } +//============================================================================= // TODO: add to client: -// protocol.total_connections // blockchain.fetch_spend // blockchain.fetch_block_transaction_hashes -//------------------------------------------ -// TODO: add to server: -// transaction_radar.subscribe -// electrum.subscribe -// electrum.fetch_history -//------------------------------------------ -// TODO: remove protocol.total_connections (administrative) and -// create administrative query channel (secure only). -// This will require that client public keys be associated to a ZAP domain. -//------------------------------------------ +//============================================================================= // address.fetch_history was present in v1 (obelisk) and v2 (server). // address.fetch_history was called by client v1 (sx) and v2 (bx). -// address.renew was present in v2 (server) and dropped in v3 -// address.subscribe performs renewal (as necessary) in v3 -//------------------------------------------ +//----------------------------------------------------------------------------- +// address.renew is deprecated in v3. +// address.subscribe is deprecated in v3. +// address.subscribe2 is new in v3, also call for renew. +//----------------------------------------------------------------------------- +//// protocol.broadcast_transaction is deprecated in v3 (deferred). +//// transaction_pool.broadcast (with radar) is new in v3 (deferred). +//============================================================================= // Interface class.method names must match protocol (do not change). void query_worker::attach_interface() { - // Queries (request-response). - ////ATTACH(electrum, fetch_history, node_); + ATTACH(address, renew, node_); + ATTACH(address, subscribe, node_); + ATTACH(address, subscribe2, node_); ATTACH(address, fetch_history2, node_); ATTACH(blockchain, fetch_history, node_); ATTACH(blockchain, fetch_block_header, node_); @@ -226,13 +223,9 @@ void query_worker::attach_interface() ATTACH(blockchain, fetch_stealth, node_); ATTACH(transaction_pool, fetch_transaction, node_); ATTACH(transaction_pool, validate, node_); - ATTACH(protocol, total_connections, node_); + ////ATTACH(transaction_pool, broadcast, node_); ATTACH(protocol, broadcast_transaction, node_); - - // Notifications (subscription response with subsequent notifications). - ATTACH(address, subscribe, node_); - ////ATTACH(electrum, subscribe, node_); - ////ATTACH(transaction_radar, subscribe, node_); + ATTACH(protocol, total_connections, node_); } #undef ATTACH From a34de3e22e83fe2febdf9c98026e69f0d6eacd79 Mon Sep 17 00:00:00 2001 From: evoskuil Date: Wed, 20 Jul 2016 13:10:02 -0700 Subject: [PATCH 2/5] Simplify service serializations, return database codes. --- src/interface/blockchain.cpp | 102 +++++++++++++---------------- src/interface/protocol.cpp | 14 ++-- src/interface/transaction_pool.cpp | 17 +++-- src/utility/fetch_helpers.cpp | 17 ++--- 4 files changed, 70 insertions(+), 80 deletions(-) diff --git a/src/interface/blockchain.cpp b/src/interface/blockchain.cpp index 5b903bc7..55ba35f9 100644 --- a/src/interface/blockchain.cpp +++ b/src/interface/blockchain.cpp @@ -96,16 +96,16 @@ void blockchain::fetch_last_height(server_node& node, const message& request, void blockchain::last_height_fetched(const code& ec, size_t last_height, const message& request, send_handler handler) { - BITCOIN_ASSERT(last_height <= bc::max_uint32); + BITCOIN_ASSERT(last_height <= max_uint32); auto last_height32 = static_cast(last_height); - data_chunk result(code_size + sizeof(uint32_t)); - auto serial = make_serializer(result.begin()); - serial.write_error_code(ec); - BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size); - - serial.write_4_bytes_little_endian(last_height32); - BITCOIN_ASSERT(serial.iterator() == result.end()); + // [ code:4 ] + // [ heigh:4 ] + const auto result = build_chunk( + { + message::to_bytes(ec), + to_little_endian(last_height32) + }); handler(message(request, result)); } @@ -154,18 +154,13 @@ void blockchain::fetch_block_header_by_height(server_node& node, void blockchain::block_header_fetched(const code& ec, const chain::header& block, const message& request, send_handler handler) { - const auto block_size64 = block.serialized_size(false); - BITCOIN_ASSERT_MSG(block_size64 <= max_size_t, "Clearly Bitcoin is dead."); - const auto block_size = static_cast(block_size64); - - data_chunk result(code_size + block_size); - auto serial = make_serializer(result.begin()); - serial.write_error_code(ec); - BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size); - - data_chunk block_data = block.to_data(false); - serial.write_data(block_data); - BITCOIN_ASSERT(serial.iterator() == result.end()); + // [ code:4 ] + // [ block... ] + const auto result = build_chunk( + { + message::to_bytes(ec), + block.to_data(false) + }); handler(message(request, result)); } @@ -212,16 +207,15 @@ void blockchain::fetch_block_transaction_hashes_by_height(server_node& node, void blockchain::block_transaction_hashes_fetched(const code& ec, const hash_list& hashes, const message& request, send_handler handler) { + // [ code:4 ] + // [[ hash:32 ]...] data_chunk result(code_size + hash_size * hashes.size()); auto serial = make_serializer(result.begin()); serial.write_error_code(ec); - BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size); for (const auto& tx_hash: hashes) serial.write_hash(tx_hash); - BITCOIN_ASSERT(serial.iterator() == result.end()); - handler(message(request, result)); } @@ -248,19 +242,20 @@ void blockchain::transaction_index_fetched(const code& ec, size_t block_height, size_t index, const message& request, send_handler handler) { BITCOIN_ASSERT(index <= max_uint32); - auto index32 = static_cast(index); - BITCOIN_ASSERT(block_height <= max_uint32); - auto block_height32 = static_cast(block_height); - // error_code (4), block_height (4), index (4) - data_chunk result(code_size + sizeof(uint32_t) + sizeof(uint32_t)); - auto serial = make_serializer(result.begin()); - serial.write_error_code(ec); - BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size); + auto index32 = static_cast(index); + auto block_height32 = static_cast(block_height); - serial.write_4_bytes_little_endian(block_height32); - serial.write_4_bytes_little_endian(index32); + // [ code:4 ] + // [ block_height:32 ] + // [ tx_index:4 ] + const auto result = build_chunk( + { + message::to_bytes(ec), + to_little_endian(block_height32), + to_little_endian(index32) + }); handler(message(request, result)); } @@ -291,18 +286,14 @@ void blockchain::fetch_spend(server_node& node, const message& request, void blockchain::spend_fetched(const code& ec, const chain::input_point& inpoint, const message& request, send_handler handler) { - // error_code (4), hash (32), index (4) - const auto inpoint_size64 = inpoint.serialized_size(); - BITCOIN_ASSERT(inpoint_size64 <= max_size_t); - const auto inpoint_size = static_cast(inpoint_size64); - - data_chunk result(code_size + inpoint_size); - auto serial = make_serializer(result.begin()); - serial.write_error_code(ec); - BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size); - - auto raw_inpoint = inpoint.to_data(); - serial.write_data(raw_inpoint); + // [ code:4 ] + // [ hash:32 ] + // [ index:4 ] + const auto result = build_chunk( + { + message::to_bytes(ec), + inpoint.to_data() + }); handler(message(request, result)); } @@ -331,13 +322,13 @@ void blockchain::block_height_fetched(const code& ec, size_t block_height, BITCOIN_ASSERT(block_height <= max_uint32); auto block_height32 = static_cast(block_height); - // error_code (4), height (4) - data_chunk result(code_size + sizeof(uint32_t)); - auto serial = make_serializer(result.begin()); - serial.write_error_code(ec); - - BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size); - serial.write_4_bytes_little_endian(block_height32); + // [ code:4 ] + // [ height:4 ] + const auto result = build_chunk( + { + message::to_bytes(ec), + to_little_endian(block_height32) + }); handler(message(request, result)); } @@ -381,14 +372,13 @@ void blockchain::stealth_fetched(const code& ec, const stealth_compact::list& stealth_results, const message& request, send_handler handler) { - // [ ephemeral_key_hash:32 ] - // [ address_hash:20 ] - // [ tx_hash:32 ] static constexpr size_t row_size = hash_size + short_hash_size + hash_size; + + // [ code:4 ] + // [[ ephemeral_key_hash:32 ][ address_hash:20 ][ tx_hash:32 ]...] data_chunk result(code_size + row_size * stealth_results.size()); auto serial = make_serializer(result.begin()); serial.write_error_code(ec); - BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size); for (const auto& row: stealth_results) { diff --git a/src/interface/protocol.cpp b/src/interface/protocol.cpp index 80d3f2bb..c9a67763 100644 --- a/src/interface/protocol.cpp +++ b/src/interface/protocol.cpp @@ -76,13 +76,13 @@ void protocol::handle_total_connections(size_t count, const message& request, BITCOIN_ASSERT(count <= max_uint32); const auto total_connections = static_cast(count); - data_chunk result(code_size + sizeof(uint32_t)); - auto serial = make_serializer(result.begin()); - serial.write_error_code(error::success); - BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size); - - serial.write_4_bytes_little_endian(total_connections); - BITCOIN_ASSERT(serial.iterator() == result.end()); + // [ code:4 ] + // [ connections:4 ] + const auto result = build_chunk( + { + message::to_bytes(error::success), + to_little_endian(total_connections) + }); handler(message(request, result)); } diff --git a/src/interface/transaction_pool.cpp b/src/interface/transaction_pool.cpp index ef7612a5..fc32bbd8 100644 --- a/src/interface/transaction_pool.cpp +++ b/src/interface/transaction_pool.cpp @@ -56,10 +56,20 @@ void transaction_pool::fetch_transaction(server_node& node, void transaction_pool::broadcast(server_node& node, const message& request, send_handler handler) { + transaction tx; + + if (!tx.from_data(request.data())) + { + handler(message(request, error::bad_stream)); + return; + } + // TODO: conditionally subscribe to penetration notifications. // TODO: broadcast transaction to receiving peers. + handler(message(request, error::operation_failed)); } +// NOTE: the format of this response changed in v3 (send only code on error). void transaction_pool::validate(server_node& node, const message& request, send_handler handler) { @@ -67,8 +77,6 @@ void transaction_pool::validate(server_node& node, const message& request, if (!tx.from_data(request.data())) { - // NOTE: the format of this response changed in v3 (send only code). - // This is our standard behavior and should not break clients. handler(message(request, error::bad_stream)); return; } @@ -82,10 +90,11 @@ void transaction_pool::handle_validated(const code& ec, const transaction& tx, const hash_digest& tx_hash, const point::indexes& unconfirmed, const message& request, send_handler handler) { + // [ code:4 ] + // [[ unconfirmed_index:4 ]...] data_chunk result(code_size + unconfirmed.size() * index_size); auto serial = make_serializer(result.begin()); serial.write_error_code(ec); - BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size); for (const auto unconfirmed_index: unconfirmed) { @@ -94,8 +103,6 @@ void transaction_pool::handle_validated(const code& ec, const transaction& tx, serial.write_4_bytes_little_endian(index32); } - BITCOIN_ASSERT(serial.iterator() == result.end()); - handler(message(request, result)); } diff --git a/src/utility/fetch_helpers.cpp b/src/utility/fetch_helpers.cpp index 314a382a..dbcde339 100644 --- a/src/utility/fetch_helpers.cpp +++ b/src/utility/fetch_helpers.cpp @@ -108,18 +108,11 @@ bool unwrap_fetch_transaction_args(hash_digest& hash, void transaction_fetched(const code& ec, const chain::transaction& tx, const message& request, send_handler handler) { - const auto tx_size64 = tx.serialized_size(); - BITCOIN_ASSERT(tx_size64 <= max_size_t); - const auto tx_size = static_cast(tx_size64); - - data_chunk result(code_size + tx_size); - auto serial = make_serializer(result.begin()); - serial.write_error_code(ec); - BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size); - - data_chunk tx_data = tx.to_data(); - serial.write_data(tx_data); - BITCOIN_ASSERT(serial.iterator() == result.end()); + const auto result = build_chunk( + { + message::to_bytes(ec), + tx.to_data() + }); handler(message(request, result)); } From ac412ce96598237e04d6edebb90459be245960b3 Mon Sep 17 00:00:00 2001 From: evoskuil Date: Wed, 20 Jul 2016 15:05:14 -0700 Subject: [PATCH 3/5] Remove dead code. --- console/executor.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/console/executor.cpp b/console/executor.cpp index 247ed86e..4f38853f 100644 --- a/console/executor.cpp +++ b/console/executor.cpp @@ -20,7 +20,6 @@ #include "executor.hpp" #include -#include #include #include #include @@ -28,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -37,9 +35,7 @@ namespace libbitcoin { namespace server { using boost::format; -using namespace std::chrono; using namespace std::placeholders; -using namespace std::this_thread; using namespace boost::system; using namespace bc::config; using namespace bc::database; From ae834d2fe9093959dd0e7bb208c0796334094d93 Mon Sep 17 00:00:00 2001 From: evoskuil Date: Fri, 22 Jul 2016 23:24:58 -0700 Subject: [PATCH 4/5] Integrate notification subscription. --- Makefile.am | 2 + .../libbitcoin-server.vcxproj | 2 + .../libbitcoin-server.vcxproj.filters | 6 + include/bitcoin/server.hpp | 1 + include/bitcoin/server/messages/route.hpp | 25 +- include/bitcoin/server/server_node.hpp | 4 +- .../bitcoin/server/utility/address_key.hpp | 69 ++++ .../server/workers/notification_worker.hpp | 28 +- src/address_key.cpp | 51 +++ src/interface/address.cpp | 4 +- src/messages/route.cpp | 7 + src/server_node.cpp | 14 +- src/workers/notification_worker.cpp | 346 +++--------------- 13 files changed, 240 insertions(+), 319 deletions(-) create mode 100644 include/bitcoin/server/utility/address_key.hpp create mode 100644 src/address_key.cpp diff --git a/Makefile.am b/Makefile.am index d5da86c5..4e1450e3 100644 --- a/Makefile.am +++ b/Makefile.am @@ -35,6 +35,7 @@ lib_LTLIBRARIES = src/libbitcoin-server.la src_libbitcoin_server_la_CPPFLAGS = -I${srcdir}/include -DSYSCONFDIR=\"${sysconfdir}\" ${bitcoin_protocol_CPPFLAGS} ${bitcoin_node_CPPFLAGS} src_libbitcoin_server_la_LIBADD = ${bitcoin_protocol_LIBS} ${bitcoin_node_LIBS} src_libbitcoin_server_la_SOURCES = \ + src/address_key.cpp \ src/configuration.cpp \ src/parser.cpp \ src/server_node.cpp \ @@ -120,6 +121,7 @@ include_bitcoin_server_services_HEADERS = \ include_bitcoin_server_utilitydir = ${includedir}/bitcoin/server/utility include_bitcoin_server_utility_HEADERS = \ + include/bitcoin/server/utility/address_key.hpp \ include/bitcoin/server/utility/authenticator.hpp \ include/bitcoin/server/utility/fetch_helpers.hpp diff --git a/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj b/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj index a5ea578d..42e05235 100644 --- a/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj +++ b/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj @@ -87,6 +87,7 @@ + @@ -95,6 +96,7 @@ + diff --git a/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj.filters b/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj.filters index 77e05d79..b3f1ae1a 100644 --- a/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj.filters +++ b/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj.filters @@ -112,6 +112,9 @@ include\bitcoin\server\messages + + include\bitcoin\server\utility + @@ -168,6 +171,9 @@ src\messages + + src\utility + diff --git a/include/bitcoin/server.hpp b/include/bitcoin/server.hpp index 4dbcd7b2..8de8d11b 100644 --- a/include/bitcoin/server.hpp +++ b/include/bitcoin/server.hpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include diff --git a/include/bitcoin/server/messages/route.hpp b/include/bitcoin/server/messages/route.hpp index 3bc69a04..49a6db4c 100644 --- a/include/bitcoin/server/messages/route.hpp +++ b/include/bitcoin/server/messages/route.hpp @@ -20,7 +20,10 @@ #ifndef LIBBITCOIN_SERVER_ROUTE #define LIBBITCOIN_SERVER_ROUTE -#include +#include +#include +#include +#include #include namespace libbitcoin { @@ -37,6 +40,9 @@ class BCS_API route /// A printable address for logging only. std::string display() const; + /// Equality operator. + bool operator==(const route& other) const; + /// The message requires a secure port. bool secure; @@ -53,4 +59,21 @@ class BCS_API route } // namespace server } // namespace libbitcoin +namespace std +{ + template<> + struct hash + { + size_t operator()(const bc::server::route& value) const + { + size_t seed = 0; + boost::hash_combine(seed, value.secure); + boost::hash_combine(seed, value.delimited); + boost::hash_combine(seed, value.address1); + boost::hash_combine(seed, value.address2); + return seed; + } + }; +} // namespace std + #endif diff --git a/include/bitcoin/server/server_node.hpp b/include/bitcoin/server/server_node.hpp index 739a569b..c963b267 100644 --- a/include/bitcoin/server/server_node.hpp +++ b/include/bitcoin/server/server_node.hpp @@ -85,11 +85,11 @@ class BCS_API server_node /// Subscribe to address (including stealth) prefix notifications. /// Stealth prefix is limited to 32 bits, address prefix to 256 bits. - virtual void subscribe_address(const route& reply_to, + virtual void subscribe_address(const route& reply_to, uint32_t id, const binary& prefix_filter, chain::subscribe_type type); /// Subscribe to transaction penetration notifications. - virtual void subscribe_penetration(const route& reply_to, + virtual void subscribe_penetration(const route& reply_to, uint32_t id, const hash_digest& tx_hash); private: diff --git a/include/bitcoin/server/utility/address_key.hpp b/include/bitcoin/server/utility/address_key.hpp new file mode 100644 index 00000000..52e8a221 --- /dev/null +++ b/include/bitcoin/server/utility/address_key.hpp @@ -0,0 +1,69 @@ +/** + * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin-server. + * + * libbitcoin-server is free software: you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License with + * additional permissions to the one published by the Free Software + * Foundation, either version 3 of the License, or (at your option) + * any later version. For more information see LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef LIBBITCOIN_SERVER_ADDRESS_KEY_HPP +#define LIBBITCOIN_SERVER_ADDRESS_KEY_HPP + +#include +#include +#include +#include +#include + +namespace libbitcoin { +namespace server { + +class BCS_API address_key +{ +public: + address_key(const route& reply_to, const binary& prefix_filter); + bool operator==(const address_key& other) const; + const route& reply_to() const; + const binary& prefix_filter() const; + +private: + const route& reply_to_; + const binary& prefix_filter_; +}; + +} // namespace server +} // namespace libbitcoin + +namespace std +{ + template<> + struct hash + { + size_t operator()(const bc::server::address_key& value) const + { + // boost::hash_combine uses boost::hash declarations., but these + // are defined as std::hash (for use with std::map). So we must + // explicity perform the hash operation before combining. + const auto to = std::hash()(value.reply_to()); + const auto filter = std::hash()(value.prefix_filter()); + + size_t seed = 0; + boost::hash_combine(seed, to); + boost::hash_combine(seed, filter); + return seed; + } + }; +} // namespace std + +#endif diff --git a/include/bitcoin/server/workers/notification_worker.hpp b/include/bitcoin/server/workers/notification_worker.hpp index 14996417..44c84189 100644 --- a/include/bitcoin/server/workers/notification_worker.hpp +++ b/include/bitcoin/server/workers/notification_worker.hpp @@ -27,6 +27,7 @@ #include #include #include +#include namespace libbitcoin { namespace server { @@ -52,11 +53,11 @@ class BCS_API notification_worker bool stop() override; /// Subscribe to address and stealth prefix notifications. - virtual void subscribe_address(const route& reply_to, + virtual void subscribe_address(const route& reply_to, uint32_t id, const binary& prefix_filter, chain::subscribe_type type); /// Subscribe to transaction penetration notifications. - virtual void subscribe_penetration(const route& reply_to, + virtual void subscribe_penetration(const route& reply_to, uint32_t id, const hash_digest& tx_hash); protected: @@ -71,18 +72,21 @@ class BCS_API notification_worker private: typedef chain::block::ptr_list block_list; typedef chain::point::indexes index_list; - - typedef resubscriber payment_subscriber; - typedef resubscriber stealth_subscriber; - typedef resubscriber sequence_ptr; + + typedef notifier payment_subscriber; + typedef notifier stealth_subscriber; + typedef notifier address_subscriber; - typedef resubscriber penetration_subscriber; + typedef notifier penetration_subscriber; // Remove expired subscriptions. - void prune(); + void purge(); + int32_t purge_interval_milliseconds() const; bool handle_blockchain_reorganization(const code& ec, uint64_t fork_point, const block_list& new_blocks, const block_list&); @@ -133,7 +137,7 @@ class BCS_API notification_worker bool handle_address(const code& ec, const binary& field, uint32_t height, const hash_digest& block_hash, const chain::transaction& tx, const route& reply_to, uint32_t id, const binary& prefix_filter, - std::shared_ptr sequence); + sequence_ptr sequence); const bool secure_; const server::settings& settings_; diff --git a/src/address_key.cpp b/src/address_key.cpp new file mode 100644 index 00000000..dec7b313 --- /dev/null +++ b/src/address_key.cpp @@ -0,0 +1,51 @@ +/** + * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin-server. + * + * libbitcoin-server is free software: you can redistribute it and/or + * modify it under the terms of the GNU Affero General Public License with + * additional permissions to the one published by the Free Software + * Foundation, either version 3 of the License, or (at your option) + * any later version. For more information see LICENSE. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include + +#include +#include +#include + +namespace libbitcoin { +namespace server { + +address_key::address_key(const route& reply_to, const binary& prefix_filter) + : reply_to_(reply_to), prefix_filter_(prefix_filter) +{ +} + +bool address_key::operator==(const address_key& other) const +{ + return reply_to_ == other.reply_to_ && + prefix_filter_ == other.prefix_filter_; +} + +const route& address_key::reply_to() const +{ + return reply_to_; +} + +const binary& address_key::prefix_filter() const +{ + return prefix_filter_; +} + +} // namespace server +} // namespace libbitcoin diff --git a/src/interface/address.cpp b/src/interface/address.cpp index 8969a1f8..4350b3dd 100644 --- a/src/interface/address.cpp +++ b/src/interface/address.cpp @@ -72,7 +72,7 @@ void address::subscribe(server_node& node, const message& request, return; } - node.subscribe_address(request.route(), prefix_filter, type); + node.subscribe_address(request.route(), request.id(), prefix_filter, type); handler(message(request, error::success)); } @@ -128,7 +128,7 @@ void address::subscribe2(server_node& node, const message& request, return; } - node.subscribe_address(request.route(), prefix_filter, type); + node.subscribe_address(request.route(), request.id(), prefix_filter, type); handler(message(request, error::success)); } diff --git a/src/messages/route.cpp b/src/messages/route.cpp index 86fc843f..9416593c 100644 --- a/src/messages/route.cpp +++ b/src/messages/route.cpp @@ -20,6 +20,7 @@ #include #include +#include #include namespace libbitcoin { @@ -35,5 +36,11 @@ std::string route::display() const return "[" + encode_base16(address1) + ":" + encode_base16(address2) + "]"; } +bool route::operator==(const route& other) const +{ + return secure == other.secure && delimited == other.delimited && + address1 == other.address1 && address2 == other.address2; +} + } // namespace server } // namespace libbitcoin diff --git a/src/server_node.cpp b/src/server_node.cpp index 6d21986f..c3ad1915 100644 --- a/src/server_node.cpp +++ b/src/server_node.cpp @@ -121,25 +121,27 @@ bool server_node::close() // ---------------------------------------------------------------------------- // Subscribe to address (including stealth) prefix notifications. -void server_node::subscribe_address(const route& reply_to, +void server_node::subscribe_address(const route& reply_to, uint32_t id, const binary& prefix_filter, subscribe_type type) { if (reply_to.secure) secure_notification_worker_ - .subscribe_address(reply_to, prefix_filter, type); + .subscribe_address(reply_to, id, prefix_filter, type); else public_notification_worker_ - .subscribe_address(reply_to, prefix_filter, type); + .subscribe_address(reply_to, id, prefix_filter, type); } // Subscribe to transaction penetration notifications. -void server_node::subscribe_penetration(const route& reply_to, +void server_node::subscribe_penetration(const route& reply_to, uint32_t id, const hash_digest& tx_hash) { if (reply_to.secure) - secure_notification_worker_.subscribe_penetration(reply_to, tx_hash); + secure_notification_worker_ + .subscribe_penetration(reply_to, id, tx_hash); else - public_notification_worker_.subscribe_penetration(reply_to, tx_hash); + public_notification_worker_ + .subscribe_penetration(reply_to, id, tx_hash); } // Services. diff --git a/src/workers/notification_worker.cpp b/src/workers/notification_worker.cpp index 785e1b64..e24d8b9e 100644 --- a/src/workers/notification_worker.cpp +++ b/src/workers/notification_worker.cpp @@ -19,6 +19,7 @@ */ #include +#include #include #include #include @@ -41,9 +42,14 @@ using namespace bc::chain; using namespace bc::protocol; using namespace bc::wallet; -static const std::string address_update = "address.update"; -static const std::string address_update2 = "address.update2"; -static const std::string address_stealth = "address.stealth_update"; +// Purge subscriptions at 10% of the expiration period. +static constexpr int64_t purge_interval_ratio = 10; + +// Notifications respond with commands that are distinct from the subscription. +static const std::string address_update("address.update"); +static const std::string address_stealth("address.stealth_update"); +static const std::string address_update2("address.update2"); +static const std::string penetration_update("penetration.update"); notification_worker::notification_worker(zmq::authenticator& authenticator, server_node& node, bool secure) @@ -99,17 +105,17 @@ bool notification_worker::stop() // v2/v3 (deprecated) payment_subscriber_->stop(); - payment_subscriber_->do_relay(code, {}, 0, {}, {}); + payment_subscriber_->invoke(code, {}, 0, {}, {}); stealth_subscriber_->stop(); - stealth_subscriber_->do_relay(code, 0, 0, {}, {}); + stealth_subscriber_->invoke(code, 0, 0, {}, {}); // v3 address_subscriber_->stop(); - address_subscriber_->do_relay(code, {}, 0, {}, {}); + address_subscriber_->invoke(code, {}, 0, {}, {}); penetration_subscriber_->stop(); - penetration_subscriber_->do_relay(code, 0, {}, {}); + penetration_subscriber_->invoke(code, 0, {}, {}); return zmq::worker::stop(); } @@ -126,19 +132,28 @@ void notification_worker::work() zmq::poller poller; poller.add(router); + const auto interval = purge_interval_milliseconds(); // We do not send/receive on the poller, we use its timer and context stop. // Other threads connect and disconnect dynamically to send updates. while (!poller.terminated() && !stopped()) { - poller.wait(); - prune(); + poller.wait(interval); + purge(); } // Disconnect the socket and exit this thread. finished(disconnect(router)); } +int32_t notification_worker::purge_interval_milliseconds() const +{ + const int64_t minutes = settings_.subscription_expiration_minutes; + const int64_t milliseconds = minutes * 60 * 1000 / purge_interval_ratio; + const auto capped = std::max(milliseconds, static_cast(max_int32)); + return static_cast(capped); +} + // Connect/Disconnect. //----------------------------------------------------------------------------- @@ -180,25 +195,22 @@ bool notification_worker::disconnect(socket& router) // ---------------------------------------------------------------------------- // Signal expired subscriptions to self-remove. -void notification_worker::prune() +void notification_worker::purge() { static const auto code = error::channel_timeout; // v2/v3 (deprecated) - payment_subscriber_->relay(code, {}, 0, {}, {}); - stealth_subscriber_->relay(code, 0, 0, {}, {}); + payment_subscriber_->purge(code, {}, 0, {}, {}); + stealth_subscriber_->purge(code, 0, 0, {}, {}); // v3 - address_subscriber_->relay(code, {}, 0, {}, {}); - penetration_subscriber_->relay(code, 0, {}, {}); + address_subscriber_->purge(code, {}, 0, {}, {}); + penetration_subscriber_->purge(code, 0, {}, {}); } // Sending. // ---------------------------------------------------------------------------- -//***************************************************************************** -// TODO: the notify_dealer must be connected to the query_service router. -//***************************************************************************** void notification_worker::send(const route& reply_to, const std::string& command, uint32_t id, const data_chunk& payload) { @@ -299,11 +311,6 @@ bool notification_worker::handle_payment(const code& ec, const hash_digest& block_hash, const chain::transaction& tx, const route& reply_to, uint32_t id, const binary& prefix_filter) { - if (ec == error::channel_timeout && true /* not expired */) - { - return true; - } - if (ec) { send(reply_to, address_update, id, message::to_bytes(ec)); @@ -321,11 +328,6 @@ bool notification_worker::handle_stealth(const code& ec, const chain::transaction& tx, const route& reply_to, uint32_t id, const binary& prefix_filter) { - if (ec == error::channel_timeout && true /* not expired */) - { - return true; - } - if (ec) { send(reply_to, address_stealth, id, message::to_bytes(ec)); @@ -341,13 +343,8 @@ bool notification_worker::handle_stealth(const code& ec, bool notification_worker::handle_address(const code& ec, const binary& field, uint32_t height, const hash_digest& block_hash, const chain::transaction& tx, const route& reply_to, uint32_t id, - const binary& prefix_filter, std::shared_ptr sequence) + const binary& prefix_filter, sequence_ptr sequence) { - if (ec == error::channel_timeout && true /* not expired */) - { - return true; - } - if (ec) { send(reply_to, address_update2, id, message::to_bytes(ec)); @@ -355,7 +352,10 @@ bool notification_worker::handle_address(const code& ec, } if (prefix_filter.is_prefix_of(field)) + { send_address(reply_to, id, *sequence, height, block_hash, tx); + ++(*sequence); + } return true; } @@ -365,32 +365,38 @@ bool notification_worker::handle_address(const code& ec, // Subscribe to address and stealth prefix notifications. // Each delegate must connect to the appropriate query notification endpoint. -void notification_worker::subscribe_address(const route& reply_to, +void notification_worker::subscribe_address(const route& reply_to, uint32_t id, const binary& prefix_filter, subscribe_type type) { - static const auto code = error::channel_stopped; + static const auto error_code = error::channel_stopped; + const auto& duration = settings_.subscription_expiration(); + const address_key key(reply_to, prefix_filter); switch (type) { // v2/v3 (deprecated) case subscribe_type::payment: { + // This class must be kept in scope until work is terminated. const auto handler = std::bind(¬ification_worker::handle_payment, - this, _1, _2, _3, _4, _5, reply_to, 0, prefix_filter); + this, _1, _2, _3, _4, _5, reply_to, id, prefix_filter); - payment_subscriber_->subscribe(handler, code, {}, 0, {}, {}); + payment_subscriber_->subscribe(handler, key, duration, error_code, + {}, 0, {}, {}); break; } // v2/v3 (deprecated) case subscribe_type::stealth: { + // This class must be kept in scope until work is terminated. const auto handler = std::bind(¬ification_worker::handle_stealth, - this, _1, _2, _3, _4, _5, reply_to, 0, prefix_filter); + this, _1, _2, _3, _4, _5, reply_to, id, prefix_filter); - stealth_subscriber_->subscribe(handler, code, 0, 0, {}, {}); + stealth_subscriber_->subscribe(handler, key, duration, error_code, + 0, 0, {}, {}); break; } @@ -401,13 +407,15 @@ void notification_worker::subscribe_address(const route& reply_to, // The sequence enables the client to detect dropped messages. const auto sequence = std::make_shared(0); + // This class must be kept in scope until work is terminated. const auto handler = std::bind(¬ification_worker::handle_address, - this, _1, _2, _3, _4, _5, reply_to, 0, prefix_filter, + this, _1, _2, _3, _4, _5, reply_to, id, prefix_filter, sequence); // v3 - address_subscriber_->subscribe(handler, code, {}, 0, {}, {}); + address_subscriber_->subscribe(handler, key, duration, error_code, + {}, 0, {}, {}); break; } } @@ -416,7 +424,7 @@ void notification_worker::subscribe_address(const route& reply_to, // Subscribe to transaction penetration notifications. // Each delegate must connect to the appropriate query notification endpoint. void notification_worker::subscribe_penetration(const route& reply_to, - const hash_digest& tx_hash) + uint32_t id, const hash_digest& tx_hash) { // TODO: // Height and hash are zeroized if tx is not chained (inv/mempool). @@ -650,259 +658,5 @@ void notification_worker::notify_penetration(uint32_t height, penetration_subscriber_->relay(code, height, block_hash, tx_hash); } -// reference -// ---------------------------------------------------------------------------- - -////void notification_worker::subscribe(const incoming& request, send_handler handler) -////{ -//// const auto ec = create(request, handler); -//// -//// // Send response. -//// handler(outgoing(request, ec)); -////} -//// -////// Create new subscription entry. -////code notification_worker::create(const incoming& request, send_handler handler) -////{ -//// subscription_record record; -//// -//// if (!deserialize(record.prefix, record.type, request.data)) -//// return error::bad_stream; -//// -//// record.expiry_time = now() + settings_.subscription_expiration(); -//// record.locator.handler = std::move(handler); -//// record.locator.address1 = request.address1; -//// record.locator.address2 = request.address2; -//// record.locator.delimited = request.delimited; -//// -//// /////////////////////////////////////////////////////////////////////////// -//// // Critical Section -//// mutex_.lock_upgrade(); -//// -//// if (subscriptions_.size() >= settings_.subscription_limit) -//// { -//// mutex_.unlock_upgrade(); -//// //--------------------------------------------------------------------- -//// return error::pool_filled; -//// } -//// -//// mutex_.unlock_upgrade_and_lock(); -//// //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -//// subscriptions_.emplace_back(record); -//// -//// mutex_.unlock(); -//// /////////////////////////////////////////////////////////////////////////// -//// -//// // This is the end of the subscribe sequence. -//// return error::success; -////} -//// -////// Renew sequence. -////// ---------------------------------------------------------------------------- -//// -////void notification_worker::renew(const incoming& request, send_handler handler) -////{ -//// const auto ec = update(request, handler); -//// -//// // Send response. -//// handler(outgoing(request, error::success)); -////} -//// -////// Find subscription record and update expiration. -////code notification_worker::update(const incoming& request, send_handler handler) -////{ -//// binary prefix; -//// subscribe_type type; -//// -//// if (!deserialize(prefix, type, request.data)) -//// return error::bad_stream; -//// -//// /////////////////////////////////////////////////////////////////////////// -//// // Critical Section -//// mutex_.lock_upgrade(); -//// -//// const auto expiry_time = now() + settings_.subscription_expiration(); -//// -//// for (auto& subscription: subscriptions_) -//// { -//// // TODO: is address1 correct and sufficient? -//// if (subscription.type != type || -//// subscription.locator.address1 != request.address1 || -//// !subscription.prefix.is_prefix_of(prefix)) -//// { -//// continue; -//// } -//// -//// mutex_.unlock_upgrade_and_lock(); -//// //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -//// subscription.expiry_time = expiry_time; -//// //--------------------------------------------------------------------- -//// mutex_.unlock_and_lock_upgrade(); -//// } -//// -//// mutex_.unlock_upgrade(); -//// /////////////////////////////////////////////////////////////////////////// -//// -//// // This is the end of the renew sequence. -//// return error::success; -////} -//// -////// Pruning sequence. -////// ---------------------------------------------------------------------------- -//// -////// Delete entries that have expired. -////size_t notification_worker::prune() -////{ -//// /////////////////////////////////////////////////////////////////////////// -//// // Critical Section -//// unique_lock(mutex_); -//// -//// const auto current_time = now(); -//// const auto start_size = subscriptions_.size(); -//// -//// for (auto it = subscriptions_.begin(); it != subscriptions_.end();) -//// { -//// if (current_time < it->expiry_time) -//// ++it; -//// else -//// it = subscriptions_.erase(it); -//// } -//// -//// // This is the end of the pruning sequence. -//// return start_size - subscriptions_.size(); -//// /////////////////////////////////////////////////////////////////////////// -////} -//// -////// Scan sequence. -////// ---------------------------------------------------------------------------- -//// -////void notification_worker::receive_block(uint32_t height, const block::ptr block) -////{ -//// const auto hash = block->header.hash(); -//// -//// for (const auto& transaction: block->transactions) -//// scan(height, hash, transaction); -//// -//// prune(); -////} -//// -////void notification_worker::receive_transaction(const transaction& transaction) -////{ -//// scan(0, null_hash, transaction); -////} -//// -////void notification_worker::scan(uint32_t height, const hash_digest& block_hash, -//// const transaction& tx) -////{ -//// for (const auto& input: tx.inputs) -//// { -//// const auto address = payment_address::extract(input.script); -//// -//// if (address) -//// post_updates(address, height, block_hash, tx); -//// } -//// -//// uint32_t prefix; -//// for (const auto& output: tx.outputs) -//// { -//// const auto address = payment_address::extract(output.script); -//// -//// if (address) -//// post_updates(address, height, block_hash, tx); -//// else if (to_stealth_prefix(prefix, output.script)) -//// post_stealth_updates(prefix, height, block_hash, tx); -//// } -////} -//// -////void notification_worker::post_updates(const payment_address& address, -//// uint32_t height, const hash_digest& block_hash, const transaction& tx) -////{ -//// subscription_locators locators; -//// -//// /////////////////////////////////////////////////////////////////////////// -//// // Critical Section -//// mutex_.lock_shared(); -//// -//// for (const auto& subscription: subscriptions_) -//// if (subscription.type == subscribe_type::address && -//// subscription.prefix.is_prefix_of(address.hash())) -//// locators.push_back(subscription.locator); -//// -//// mutex_.unlock_shared(); -//// /////////////////////////////////////////////////////////////////////////// -//// -//// if (locators.empty()) -//// return; -//// -//// // [ address.version:1 ] -//// // [ address.hash:20 ] -//// // [ height:4 ] -//// // [ block_hash:32 ] -//// // [ tx ] -//// const auto data = build_chunk( -//// { -//// to_array(address.version()), -//// address.hash(), -//// to_little_endian(height), -//// block_hash, -//// tx.to_data() -//// }); -//// -//// // Send the result to everyone interested. -//// for (const auto& locator: locators) -//// locator.handler(outgoing("address.update", data, locator.address1, -//// locator.address2, locator.delimited)); -//// -//// // This is the end of the scan address sequence. -////} -//// -////void notification_worker::post_stealth_updates(uint32_t prefix, uint32_t height, -//// const hash_digest& block_hash, const transaction& tx) -////{ -//// subscription_locators locators; -//// -//// /////////////////////////////////////////////////////////////////////////// -//// // Critical Section -//// mutex_.lock_shared(); -//// -//// for (const auto& subscription: subscriptions_) -//// if (subscription.type == subscribe_type::stealth && -//// subscription.prefix.is_prefix_of(prefix)) -//// locators.push_back(subscription.locator); -//// -//// mutex_.unlock_shared(); -//// /////////////////////////////////////////////////////////////////////////// -//// -//// if (locators.empty()) -//// return; -//// -//// // [ prefix:4 ] -//// // [ height:4 ] -//// // [ block_hash:32 ] -//// // [ tx ] -//// const auto data = build_chunk( -//// { -//// to_little_endian(prefix), -//// to_little_endian(height), -//// block_hash, -//// tx.to_data() -//// }); -//// -//// // Send the result to everyone interested. -//// for (const auto& locator: locators) -//// locator.handler(outgoing("address.stealth_update", data, -//// locator.address1, locator.address2, locator.delimited)); -//// -//// // This is the end of the scan stealth sequence. -////} -//// -////// Utilities -////// ---------------------------------------------------------------------------- -//// -////ptime notification_worker::now() -////{ -//// return second_clock::universal_time(); -////}; - } // namespace server } // namespace libbitcoin From 6545c24cc7a3154a39b8af19defb43ad53b786dd Mon Sep 17 00:00:00 2001 From: evoskuil Date: Sat, 23 Jul 2016 15:14:58 -0700 Subject: [PATCH 5/5] Add address.unsubscribe2. --- include/bitcoin/server/interface/address.hpp | 4 ++++ src/interface/address.cpp | 18 ++++++++++++++++++ src/workers/notification_worker.cpp | 15 ++++++++++++++- src/workers/query_worker.cpp | 2 ++ 4 files changed, 38 insertions(+), 1 deletion(-) diff --git a/include/bitcoin/server/interface/address.hpp b/include/bitcoin/server/interface/address.hpp index a4a79614..6fee37bf 100644 --- a/include/bitcoin/server/interface/address.hpp +++ b/include/bitcoin/server/interface/address.hpp @@ -48,6 +48,10 @@ class BCS_API address static void subscribe2(server_node& node, const message& request, send_handler handler); + /// Unsubscribe to payment and stealth address notifications by prefix. + static void unsubscribe2(server_node& node, const message& request, + send_handler handler); + private: static bool unwrap_subscribe_args(binary& prefix_filter, chain::subscribe_type& type, const message& request); diff --git a/src/interface/address.cpp b/src/interface/address.cpp index 4350b3dd..0a7f7de8 100644 --- a/src/interface/address.cpp +++ b/src/interface/address.cpp @@ -132,6 +132,24 @@ void address::subscribe2(server_node& node, const message& request, handler(message(request, error::success)); } +// v3 adds unsubscribe2, which we map to subscription_type 'unsubscribe'. +void address::unsubscribe2(server_node& node, const message& request, + send_handler handler) +{ + static constexpr auto type = subscribe_type::unsubscribe; + + binary prefix_filter; + + if (!unwrap_subscribe2_args(prefix_filter, request)) + { + handler(message(request, error::bad_stream)); + return; + } + + node.subscribe_address(request.route(), request.id(), prefix_filter, type); + handler(message(request, error::success)); +} + bool address::unwrap_subscribe2_args(binary& prefix_filter, const message& request) { diff --git a/src/workers/notification_worker.cpp b/src/workers/notification_worker.cpp index e24d8b9e..d7880623 100644 --- a/src/workers/notification_worker.cpp +++ b/src/workers/notification_worker.cpp @@ -401,7 +401,6 @@ void notification_worker::subscribe_address(const route& reply_to, uint32_t id, } // v3 - default: case subscribe_type::unspecified: { // The sequence enables the client to detect dropped messages. @@ -418,6 +417,20 @@ void notification_worker::subscribe_address(const route& reply_to, uint32_t id, {}, 0, {}, {}); break; } + + // v3 + default: + case subscribe_type::unsubscribe: + { + // Just as with an expiration (purge) this will cause the stored + // handler (notification_worker::handle_address) to be invoked but + // with the specified error code (error::channel_stopped) as + // opposed to error::channel_timeout. + + // v3 + address_subscriber_->unsubscribe(key, error_code, {}, 0, {}, {}); + break; + } } } diff --git a/src/workers/query_worker.cpp b/src/workers/query_worker.cpp index e0162ac9..c5a2d2ce 100644 --- a/src/workers/query_worker.cpp +++ b/src/workers/query_worker.cpp @@ -201,6 +201,7 @@ void query_worker::attach(const std::string& command, // address.renew is deprecated in v3. // address.subscribe is deprecated in v3. // address.subscribe2 is new in v3, also call for renew. +// address.unsubscribe2 is new in v3 (there was never an address.unsubscribe). //----------------------------------------------------------------------------- //// protocol.broadcast_transaction is deprecated in v3 (deferred). //// transaction_pool.broadcast (with radar) is new in v3 (deferred). @@ -211,6 +212,7 @@ void query_worker::attach_interface() ATTACH(address, renew, node_); ATTACH(address, subscribe, node_); ATTACH(address, subscribe2, node_); + ATTACH(address, unsubscribe2, node_); ATTACH(address, fetch_history2, node_); ATTACH(blockchain, fetch_history, node_); ATTACH(blockchain, fetch_block_header, node_);