diff --git a/Makefile.am b/Makefile.am
index 251f365a..4e1450e3 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -35,6 +35,7 @@ lib_LTLIBRARIES = src/libbitcoin-server.la
src_libbitcoin_server_la_CPPFLAGS = -I${srcdir}/include -DSYSCONFDIR=\"${sysconfdir}\" ${bitcoin_protocol_CPPFLAGS} ${bitcoin_node_CPPFLAGS}
src_libbitcoin_server_la_LIBADD = ${bitcoin_protocol_LIBS} ${bitcoin_node_LIBS}
src_libbitcoin_server_la_SOURCES = \
+ src/address_key.cpp \
src/configuration.cpp \
src/parser.cpp \
src/server_node.cpp \
@@ -43,8 +44,8 @@ src_libbitcoin_server_la_SOURCES = \
src/interface/blockchain.cpp \
src/interface/protocol.cpp \
src/interface/transaction_pool.cpp \
- src/messages/incoming.cpp \
- src/messages/outgoing.cpp \
+ src/messages/message.cpp \
+ src/messages/route.cpp \
src/services/block_service.cpp \
src/services/heartbeat_service.cpp \
src/services/query_service.cpp \
@@ -108,8 +109,8 @@ include_bitcoin_server_interface_HEADERS = \
include_bitcoin_server_messagesdir = ${includedir}/bitcoin/server/messages
include_bitcoin_server_messages_HEADERS = \
- include/bitcoin/server/messages/incoming.hpp \
- include/bitcoin/server/messages/outgoing.hpp
+ include/bitcoin/server/messages/message.hpp \
+ include/bitcoin/server/messages/route.hpp
include_bitcoin_server_servicesdir = ${includedir}/bitcoin/server/services
include_bitcoin_server_services_HEADERS = \
@@ -120,6 +121,7 @@ include_bitcoin_server_services_HEADERS = \
include_bitcoin_server_utilitydir = ${includedir}/bitcoin/server/utility
include_bitcoin_server_utility_HEADERS = \
+ include/bitcoin/server/utility/address_key.hpp \
include/bitcoin/server/utility/authenticator.hpp \
include/bitcoin/server/utility/fetch_helpers.hpp
diff --git a/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj b/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj
index 59653c88..42e05235 100644
--- a/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj
+++ b/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj
@@ -78,8 +78,8 @@
-
-
+
+
@@ -87,6 +87,7 @@
+
@@ -95,13 +96,14 @@
+
-
-
+
+
diff --git a/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj.filters b/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj.filters
index 5b722c19..b3f1ae1a 100644
--- a/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj.filters
+++ b/builds/msvc/vs2013/libbitcoin-server/libbitcoin-server.vcxproj.filters
@@ -82,12 +82,6 @@
include\bitcoin\server\interface
-
- include\bitcoin\server\messages
-
-
- include\bitcoin\server\messages
-
include\bitcoin\server\workers
@@ -112,6 +106,15 @@
include\bitcoin\server\workers
+
+ include\bitcoin\server\messages
+
+
+ include\bitcoin\server\messages
+
+
+ include\bitcoin\server\utility
+
@@ -138,12 +141,6 @@
src
-
- src\messages
-
-
- src\messages
-
src\utility
@@ -168,6 +165,15 @@
src\workers
+
+ src\messages
+
+
+ src\messages
+
+
+ src\utility
+
diff --git a/console/executor.cpp b/console/executor.cpp
index 247ed86e..4f38853f 100644
--- a/console/executor.cpp
+++ b/console/executor.cpp
@@ -20,7 +20,6 @@
#include "executor.hpp"
#include
-#include
#include
#include
#include
@@ -28,7 +27,6 @@
#include
#include
#include
-#include
#include
#include
#include
@@ -37,9 +35,7 @@ namespace libbitcoin {
namespace server {
using boost::format;
-using namespace std::chrono;
using namespace std::placeholders;
-using namespace std::this_thread;
using namespace boost::system;
using namespace bc::config;
using namespace bc::database;
diff --git a/include/bitcoin/server.hpp b/include/bitcoin/server.hpp
index e905dc6e..8de8d11b 100644
--- a/include/bitcoin/server.hpp
+++ b/include/bitcoin/server.hpp
@@ -26,12 +26,13 @@
#include
#include
#include
-#include
-#include
+#include
+#include
#include
#include
#include
#include
+#include
#include
#include
#include
diff --git a/include/bitcoin/server/interface/address.hpp b/include/bitcoin/server/interface/address.hpp
index 6e70a43d..6fee37bf 100644
--- a/include/bitcoin/server/interface/address.hpp
+++ b/include/bitcoin/server/interface/address.hpp
@@ -21,21 +21,12 @@
#define LIBBITCOIN_SERVER_ADDRESS_HPP
#include
-#include
-#include
+#include
#include
namespace libbitcoin {
namespace server {
-// TODO: move to bc::protocol and integrate with zmq::message.
-class BCS_API route
-{
- bool secure;
- bool delimited;
- data_queue identities;
-};
-
/// Address interface.
/// Class and method names are published and mapped to the zeromq interface.
class BCS_API address
@@ -43,22 +34,30 @@ class BCS_API address
public:
/// Fetch the blockchain and transaction pool history of a payment address.
static void fetch_history2(server_node& node,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
+
+ /// Alias for subscribe, preserved for backward compatability.
+ static void renew(server_node& node, const message& request,
+ send_handler handler);
+
+ /// Subscribe to payment or stealth address notifications by prefix.
+ static void subscribe(server_node& node, const message& request,
+ send_handler handler);
/// Subscribe to payment and stealth address notifications by prefix.
- static void subscribe(server_node& node, const incoming& request,
+ static void subscribe2(server_node& node, const message& request,
+ send_handler handler);
+
+ /// Unsubscribe to payment and stealth address notifications by prefix.
+ static void unsubscribe2(server_node& node, const message& request,
send_handler handler);
- static bool unwrap_subscribe_args(route& reply_to, binary& prefix_filter,
- chain::subscribe_type& type, const incoming& request);
+private:
+ static bool unwrap_subscribe_args(binary& prefix_filter,
+ chain::subscribe_type& type, const message& request);
- // TODO: can't we just call subscribe again? This would prevent duplicates.
- /////// Subscribe to payment and stealth address notifications by prefix.
- ////static void renew(server_node& node,
- //// const incoming& request, send_handler handler);
- ////
- ////static bool unwrap_renew_args(route& reply_to, binary& prefix_filter,
- //// chain::subscribe_type& type, const incoming& request);
+ static bool unwrap_subscribe2_args(binary& prefix_filter,
+ const message& request);
};
} // namespace server
diff --git a/include/bitcoin/server/interface/blockchain.hpp b/include/bitcoin/server/interface/blockchain.hpp
index 840332e2..15818141 100644
--- a/include/bitcoin/server/interface/blockchain.hpp
+++ b/include/bitcoin/server/interface/blockchain.hpp
@@ -23,8 +23,7 @@
#include
#include
#include
-#include
-#include
+#include
#include
namespace libbitcoin {
@@ -38,77 +37,77 @@ class BCS_API blockchain
public:
/// Fetch the blockchain history of a payment address.
static void fetch_history(server_node& node,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
/// Fetch a transaction from the blockchain by its hash.
static void fetch_transaction(server_node& node,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
/// Fetch the current height of the blockchain.
static void fetch_last_height(server_node& node,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
/// Fetch a block header by hash or height (conditional serialization).
static void fetch_block_header(server_node& node,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
/// Fetch tx hashes of block by hash or height (conditional serialization).
static void fetch_block_transaction_hashes(server_node& node,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
/// Fetch the block index of a transaction and the height of its block.
static void fetch_transaction_index(server_node& node,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
/// Fetch the inpoint which is spent by the specified output.
static void fetch_spend(server_node& node,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
/// Fetch the height of a block by its hash.
static void fetch_block_height(server_node& node,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
/// Fetch the blockchain history of a stealth address by its prefix filter.
static void fetch_stealth(server_node& node,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
private:
static void last_height_fetched(const code& ec, size_t last_height,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
static void fetch_block_header_by_hash(server_node& node,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
static void fetch_block_header_by_height(server_node& node,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
static void block_header_fetched(const code& ec,
- const chain::header& block, const incoming& request,
+ const chain::header& block, const message& request,
send_handler handler);
static void fetch_block_transaction_hashes_by_hash(server_node& node,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
static void fetch_block_transaction_hashes_by_height(server_node& node,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
static void block_transaction_hashes_fetched(const code& ec,
- const hash_list& hashes, const incoming& request,
+ const hash_list& hashes, const message& request,
send_handler handler);
static void transaction_index_fetched(const code& ec, size_t block_height,
- size_t index, const incoming& request, send_handler handler);
+ size_t index, const message& request, send_handler handler);
static void spend_fetched(const code& ec,
- const chain::input_point& inpoint, const incoming& request,
+ const chain::input_point& inpoint, const message& request,
send_handler handler);
static void block_height_fetched(const code& ec, size_t block_height,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
static void stealth_fetched(const code& ec,
const chain::stealth_compact::list& stealth_results,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
};
} // namespace server
diff --git a/include/bitcoin/server/interface/protocol.hpp b/include/bitcoin/server/interface/protocol.hpp
index 7f5cab0c..43f036ba 100644
--- a/include/bitcoin/server/interface/protocol.hpp
+++ b/include/bitcoin/server/interface/protocol.hpp
@@ -22,8 +22,7 @@
#include
#include
-#include
-#include
+#include
#include
namespace libbitcoin {
@@ -36,15 +35,15 @@ class BCS_API protocol
public:
/// Broadcast a transaction to all connected peers.
static void broadcast_transaction(server_node& node,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
/// Determine the count of all connected peers.
static void total_connections(server_node& node,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
private:
static void handle_total_connections(size_t count,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
};
} // namespace server
diff --git a/include/bitcoin/server/interface/transaction_pool.hpp b/include/bitcoin/server/interface/transaction_pool.hpp
index cbc1dcba..3764446b 100644
--- a/include/bitcoin/server/interface/transaction_pool.hpp
+++ b/include/bitcoin/server/interface/transaction_pool.hpp
@@ -22,8 +22,7 @@
#include
#include
-#include
-#include
+#include
#include
namespace libbitcoin {
@@ -35,17 +34,21 @@ class BCS_API transaction_pool
{
public:
/// Fetch a transaction from the transaction pool (only), by its hash.
- static void fetch_transaction(server_node& node, const incoming& request,
+ static void fetch_transaction(server_node& node, const message& request,
+ send_handler handler);
+
+ /// Broadcast a transaction with penetration subscription.
+ static void broadcast(server_node& node, const message& request,
send_handler handler);
/// Validate a transaction against the transaction pool and blockchain.
- static void validate(server_node& node, const incoming& request,
+ static void validate(server_node& node, const message& request,
send_handler handler);
private:
static void handle_validated(const code& ec, const chain::transaction& tx,
const hash_digest& tx_hash, const chain::point::indexes& unconfirmed,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
};
} // namespace server
diff --git a/include/bitcoin/server/messages/message.hpp b/include/bitcoin/server/messages/message.hpp
new file mode 100644
index 00000000..a25b30a1
--- /dev/null
+++ b/include/bitcoin/server/messages/message.hpp
@@ -0,0 +1,84 @@
+/**
+ * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS)
+ *
+ * This file is part of libbitcoin-server.
+ *
+ * libbitcoin-server is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License with
+ * additional permissions to the one published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option)
+ * any later version. For more information see LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#ifndef LIBBITCOIN_SERVER_MESSAGE
+#define LIBBITCOIN_SERVER_MESSAGE
+
+#include
+#include
+#include
+#include
+#include
+
+namespace libbitcoin {
+namespace server {
+
+class BCS_API message
+{
+public:
+ static data_chunk to_bytes(const code& ec);
+
+ //// Construct an empty message with security routing context.
+ message(bool secure);
+
+ //// Construct a response for the request (code only).
+ message(const message& request, const code& ec);
+
+ //// Construct a response for the request (data with code).
+ message(const message& request, const data_chunk& data);
+
+ //// Construct a response for the route (subscription code only).
+ message(const server::route& route, const std::string& command,
+ uint32_t id, const code& ec);
+
+ //// Construct a response for the route (subscription data with code).
+ message(const server::route& route, const std::string& command,
+ uint32_t id, const data_chunk& data);
+
+ /// Arbitrary caller data (returned to caller for correlation).
+ uint32_t id() const;
+
+ /// Serialized query or response (defined in relation to command).
+ const data_chunk& data() const;
+
+ /// Query command (used for subscription, always returned to caller).
+ const std::string& command() const;
+
+ /// The message route.
+ const server::route& route() const;
+
+ /// Receive a message via the socket.
+ code receive(bc::protocol::zmq::socket& socket);
+
+ /// Send the message via the socket.
+ code send(bc::protocol::zmq::socket& socket);
+
+private:
+ uint32_t id_;
+ data_chunk data_;
+ server::route route_;
+ std::string command_;
+};
+
+typedef std::function send_handler;
+
+} // namespace server
+} // namespace libbitcoin
+
+#endif
diff --git a/include/bitcoin/server/messages/outgoing.hpp b/include/bitcoin/server/messages/outgoing.hpp
deleted file mode 100644
index b195e8c9..00000000
--- a/include/bitcoin/server/messages/outgoing.hpp
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS)
- *
- * This file is part of libbitcoin-server.
- *
- * libbitcoin-server is free software: you can redistribute it and/or
- * modify it under the terms of the GNU Affero General Public License with
- * additional permissions to the one published by the Free Software
- * Foundation, either version 3 of the License, or (at your option)
- * any later version. For more information see LICENSE.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-#ifndef LIBBITCOIN_SERVER_OUTGOING
-#define LIBBITCOIN_SERVER_OUTGOING
-
-#include
-#include
-#include
-#include
-#include
-
-namespace libbitcoin {
-namespace server {
-
-class BCS_API outgoing
-{
-public:
- /// Return an error code in response to the incoming query.
- outgoing(const incoming& request, const code& ec);
-
- /// Return data in response to a successfully-executed incoming query.
- outgoing(const incoming& request, const data_chunk& data);
-
- /// Return data as a subscription by the given address.
- outgoing(const std::string& command, const data_chunk& data,
- const data_chunk& address1, const data_chunk& address2,
- bool delimited);
-
- /// A printable address for logging only.
- std::string address();
-
- /// Send the message one the socket.
- code send(bc::protocol::zmq::socket& socket);
-
-protected:
- outgoing(const std::string& command, const data_chunk& data,
- const data_chunk& address1, const data_chunk& address2, bool delimited,
- uint32_t id);
-
-private:
- bc::protocol::zmq::message message_;
-};
-
-typedef std::function send_handler;
-
-} // namespace server
-} // namespace libbitcoin
-
-#endif
diff --git a/include/bitcoin/server/messages/incoming.hpp b/include/bitcoin/server/messages/route.hpp
similarity index 53%
rename from include/bitcoin/server/messages/incoming.hpp
rename to include/bitcoin/server/messages/route.hpp
index 0475856a..49a6db4c 100644
--- a/include/bitcoin/server/messages/incoming.hpp
+++ b/include/bitcoin/server/messages/route.hpp
@@ -17,46 +17,63 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-#ifndef LIBBITCOIN_SERVER_INCOMING
-#define LIBBITCOIN_SERVER_INCOMING
+#ifndef LIBBITCOIN_SERVER_ROUTE
+#define LIBBITCOIN_SERVER_ROUTE
-#include
+#include
#include
-#include
+#include
+#include
#include
namespace libbitcoin {
namespace server {
-class BCS_API incoming
+/// This class is not thread safe.
+/// The route is fixed in compliance with v2/v3 limitations.
+class BCS_API route
{
public:
- /// A printable address for logging only.
- std::string address();
+ /// Construct a route.
+ route();
- /// Send a message from the socket.
- code receive(bc::protocol::zmq::socket& socket, bool secure=false);
+ /// A printable address for logging only.
+ std::string display() const;
- /// The message route as seen at workers.
- data_chunk address1;
- data_chunk address2;
- bool delimited;
+ /// Equality operator.
+ bool operator==(const route& other) const;
- /// For deferred work, directs worker to respond on secure endpoint.
+ /// The message requires a secure port.
bool secure;
- /// Query command (used for subscription, always returned to caller).
- std::string command;
+ /// The message route is delimited using an empty frame.
+ bool delimited;
- /// Structure is little-endian.
- /// Arbitrary caller data (returned to caller for correlation).
- uint32_t id;
+ /// The first address.
+ data_chunk address1;
- /// Serialized query (structure defined in relation to command).
- data_chunk data;
+ /// The second address.
+ data_chunk address2;
};
} // namespace server
} // namespace libbitcoin
+namespace std
+{
+ template<>
+ struct hash
+ {
+ size_t operator()(const bc::server::route& value) const
+ {
+ size_t seed = 0;
+ boost::hash_combine(seed, value.secure);
+ boost::hash_combine(seed, value.delimited);
+ boost::hash_combine(seed, value.address1);
+ boost::hash_combine(seed, value.address2);
+ return seed;
+ }
+ };
+} // namespace std
+
#endif
diff --git a/include/bitcoin/server/server_node.hpp b/include/bitcoin/server/server_node.hpp
index ad01e30a..c963b267 100644
--- a/include/bitcoin/server/server_node.hpp
+++ b/include/bitcoin/server/server_node.hpp
@@ -26,8 +26,8 @@
#include
#include
#include
-#include
-#include
+#include
+#include
#include
#include
#include
@@ -83,15 +83,16 @@ class BCS_API server_node
// Notification.
// ------------------------------------------------------------------------
- /////// Subscribe to address and stealth prefix notifications.
- ////virtual void subscribe_address(route& reply_to, binary& prefix_filter,
- //// chain::subscribe_type& type);
+ /// Subscribe to address (including stealth) prefix notifications.
+ /// Stealth prefix is limited to 32 bits, address prefix to 256 bits.
+ virtual void subscribe_address(const route& reply_to, uint32_t id,
+ const binary& prefix_filter, chain::subscribe_type type);
- /////// Subscribe to transaction radar notifications.
- ////virtual void subscribe_radar(route& reply_to, hash_digest& tx_hash);
+ /// Subscribe to transaction penetration notifications.
+ virtual void subscribe_penetration(const route& reply_to, uint32_t id,
+ const hash_digest& tx_hash);
private:
-
void handle_running(const code& ec, result_handler handler);
bool start_services();
@@ -106,7 +107,6 @@ class BCS_API server_node
// These are thread safe.
authenticator authenticator_;
- ////notifications notifications_;
query_service secure_query_service_;
query_service public_query_service_;
heartbeat_service secure_heartbeat_service_;
diff --git a/include/bitcoin/server/utility/address_key.hpp b/include/bitcoin/server/utility/address_key.hpp
new file mode 100644
index 00000000..52e8a221
--- /dev/null
+++ b/include/bitcoin/server/utility/address_key.hpp
@@ -0,0 +1,69 @@
+/**
+ * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS)
+ *
+ * This file is part of libbitcoin-server.
+ *
+ * libbitcoin-server is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License with
+ * additional permissions to the one published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option)
+ * any later version. For more information see LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#ifndef LIBBITCOIN_SERVER_ADDRESS_KEY_HPP
+#define LIBBITCOIN_SERVER_ADDRESS_KEY_HPP
+
+#include
+#include
+#include
+#include
+#include
+
+namespace libbitcoin {
+namespace server {
+
+class BCS_API address_key
+{
+public:
+ address_key(const route& reply_to, const binary& prefix_filter);
+ bool operator==(const address_key& other) const;
+ const route& reply_to() const;
+ const binary& prefix_filter() const;
+
+private:
+ const route& reply_to_;
+ const binary& prefix_filter_;
+};
+
+} // namespace server
+} // namespace libbitcoin
+
+namespace std
+{
+ template<>
+ struct hash
+ {
+ size_t operator()(const bc::server::address_key& value) const
+ {
+ // boost::hash_combine uses boost::hash declarations., but these
+ // are defined as std::hash (for use with std::map). So we must
+ // explicity perform the hash operation before combining.
+ const auto to = std::hash()(value.reply_to());
+ const auto filter = std::hash()(value.prefix_filter());
+
+ size_t seed = 0;
+ boost::hash_combine(seed, to);
+ boost::hash_combine(seed, filter);
+ return seed;
+ }
+ };
+} // namespace std
+
+#endif
diff --git a/include/bitcoin/server/utility/fetch_helpers.hpp b/include/bitcoin/server/utility/fetch_helpers.hpp
index 74b791fe..f524f8e4 100644
--- a/include/bitcoin/server/utility/fetch_helpers.hpp
+++ b/include/bitcoin/server/utility/fetch_helpers.hpp
@@ -24,8 +24,7 @@
#include
#include
#include
-#include
-#include
+#include
#include
namespace libbitcoin {
@@ -38,19 +37,19 @@ static BC_CONSTEXPR size_t point_size = hash_size + sizeof(uint32_t);
// fetch_history stuff
bool BCS_API unwrap_fetch_history_args(wallet::payment_address& address,
- uint32_t& from_height, const incoming& request);
+ uint32_t& from_height, const message& request);
void BCS_API send_history_result(const code& ec,
- const chain::history_compact::list& history, const incoming& request,
+ const chain::history_compact::list& history, const message& request,
send_handler handler);
// fetch_transaction stuff
bool BCS_API unwrap_fetch_transaction_args(hash_digest& hash,
- const incoming& request);
+ const message& request);
void BCS_API transaction_fetched(const code& ec, const chain::transaction& tx,
- const incoming& request, send_handler handler);
+ const message& request, send_handler handler);
} // namespace server
} // namespace libbitcoin
diff --git a/include/bitcoin/server/workers/notification_worker.hpp b/include/bitcoin/server/workers/notification_worker.hpp
index b707e250..44c84189 100644
--- a/include/bitcoin/server/workers/notification_worker.hpp
+++ b/include/bitcoin/server/workers/notification_worker.hpp
@@ -24,31 +24,16 @@
#include
#include
#include
-#include
-#include
+#include
+#include
#include
+#include
namespace libbitcoin {
namespace server {
class server_node;
-////struct subscription_locator
-////{
-//// send_handler handler;
-//// data_chunk address1;
-//// data_chunk address2;
-//// bool delimited;
-////};
-////
-////struct subscription_record
-////{
-//// binary prefix;
-//// chain::subscribe_type type;
-//// boost::posix_time::ptime expiry_time;
-//// subscription_locator locator;
-////};
-
// This class is thread safe.
// Provide address and stealth notifications to the query service.
class BCS_API notification_worker
@@ -67,12 +52,13 @@ class BCS_API notification_worker
/// Stop the worker.
bool stop() override;
- /////// Subscribe to address and stealth prefix notifications.
- ////virtual void subscribe_address(route& reply_to, binary& prefix_filter,
- //// chain::subscribe_type& type);
+ /// Subscribe to address and stealth prefix notifications.
+ virtual void subscribe_address(const route& reply_to, uint32_t id,
+ const binary& prefix_filter, chain::subscribe_type type);
- /////// Subscribe to address and stealth prefix notifications.
- ////virtual void subscribe_radar(route& reply_to, hash_digest& tx_hash);
+ /// Subscribe to transaction penetration notifications.
+ virtual void subscribe_penetration(const route& reply_to, uint32_t id,
+ const hash_digest& tx_hash);
protected:
typedef bc::protocol::zmq::socket socket;
@@ -86,50 +72,72 @@ class BCS_API notification_worker
private:
typedef chain::block::ptr_list block_list;
typedef chain::point::indexes index_list;
-
- typedef resubscriber sequence_ptr;
+
+ typedef notifier payment_subscriber;
+ typedef notifier stealth_subscriber;
+ typedef notifier address_subscriber;
- typedef resubscriber
- inventory_subscriber;
- typedef resubscriber stealth_subscriber;
+ typedef notifier penetration_subscriber;
+
+ // Remove expired subscriptions.
+ void purge();
+ int32_t purge_interval_milliseconds() const;
bool handle_blockchain_reorganization(const code& ec, uint64_t fork_point,
const block_list& new_blocks, const block_list&);
bool handle_transaction_pool(const code& ec, const index_list&,
const chain::transaction& tx);
bool handle_inventory(const code& ec,
- const message::inventory::ptr packet);
+ const bc::message::inventory::ptr packet);
void notify_blocks(uint32_t fork_point, const block_list& blocks);
void notify_block(socket& peer, uint32_t height,
const chain::block::ptr block);
void notify_transaction(uint32_t height, const hash_digest& block_hash,
const chain::transaction& tx);
- void notify_address(const wallet::payment_address& address,
+
+ // v2/v3 (deprecated)
+ void notify_payment(const wallet::payment_address& address,
uint32_t height, const hash_digest& block_hash,
const chain::transaction& tx);
void notify_stealth(uint32_t prefix, uint32_t height,
const hash_digest& block_hash, const chain::transaction& tx);
- void notify_inventory(uint32_t height, const hash_digest& block_hash,
- const hash_digest& tx_hash);
- ////static boost::posix_time::ptime now();
-
- ////void scan(uint32_t height, const hash_digest& block_hash,
- //// const chain::transaction& tx);
+ // v3
+ void notify_address(const binary& field, uint32_t height,
+ const hash_digest& block_hash, const chain::transaction& tx);
+ void notify_penetration(uint32_t height, const hash_digest& block_hash,
+ const hash_digest& tx_hash);
- ////void post_updates(const wallet::payment_address& address,
- //// uint32_t height, const hash_digest& block_hash,
- //// const chain::transaction& tx);
- ////void post_stealth_updates(uint32_t prefix, uint32_t height,
- //// const hash_digest& block_hash, const chain::transaction& tx);
+ // Send a notification to the subscriber.
+ void send(const route& reply_to, const std::string& command,
+ uint32_t id, const data_chunk& payload);
+ void send_payment(const route& reply_to, uint32_t id,
+ const wallet::payment_address& address, uint32_t height,
+ const hash_digest& block_hash, const chain::transaction& tx);
+ void send_stealth(const route& reply_to, uint32_t id, uint32_t prefix,
+ uint32_t height, const hash_digest& block_hash,
+ const chain::transaction& tx);
+ void send_address(const route& reply_to, uint32_t id, uint8_t sequence,
+ uint32_t height, const hash_digest& block_hash,
+ const chain::transaction& tx);
- size_t prune() { return 0; }
- ////code create(const incoming& request, send_handler handler);
- ////code update(const incoming& request, send_handler handler);
- ////bool deserialize(binary& address, chain::subscribe_type& type,
- //// const data_chunk& data);
+ bool handle_payment(const code& ec, const wallet::payment_address& address,
+ uint32_t height, const hash_digest& block_hash,
+ const chain::transaction& tx, const route& reply_to, uint32_t id,
+ const binary& prefix_filter);
+ bool handle_stealth(const code& ec, uint32_t prefix, uint32_t height,
+ const hash_digest& block_hash, const chain::transaction& tx,
+ const route& reply_to, uint32_t id, const binary& prefix_filter);
+ bool handle_address(const code& ec, const binary& field, uint32_t height,
+ const hash_digest& block_hash, const chain::transaction& tx,
+ const route& reply_to, uint32_t id, const binary& prefix_filter,
+ sequence_ptr sequence);
const bool secure_;
const server::settings& settings_;
@@ -138,8 +146,9 @@ class BCS_API notification_worker
server_node& node_;
bc::protocol::zmq::authenticator& authenticator_;
address_subscriber::ptr address_subscriber_;
- inventory_subscriber::ptr inventory_subscriber_;
+ payment_subscriber::ptr payment_subscriber_;
stealth_subscriber::ptr stealth_subscriber_;
+ penetration_subscriber::ptr penetration_subscriber_;
};
} // namespace server
diff --git a/include/bitcoin/server/workers/query_worker.hpp b/include/bitcoin/server/workers/query_worker.hpp
index b23cb34e..cf09dd54 100644
--- a/include/bitcoin/server/workers/query_worker.hpp
+++ b/include/bitcoin/server/workers/query_worker.hpp
@@ -26,8 +26,7 @@
#include
#include
#include
-#include
-#include
+#include
#include
namespace libbitcoin {
@@ -50,7 +49,7 @@ class BCS_API query_worker
protected:
typedef bc::protocol::zmq::socket socket;
- typedef std::function command_handler;
+ typedef std::function command_handler;
typedef std::unordered_map command_map;
virtual void attach_interface();
diff --git a/src/address_key.cpp b/src/address_key.cpp
new file mode 100644
index 00000000..dec7b313
--- /dev/null
+++ b/src/address_key.cpp
@@ -0,0 +1,51 @@
+/**
+ * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS)
+ *
+ * This file is part of libbitcoin-server.
+ *
+ * libbitcoin-server is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License with
+ * additional permissions to the one published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option)
+ * any later version. For more information see LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#include
+
+#include
+#include
+#include
+
+namespace libbitcoin {
+namespace server {
+
+address_key::address_key(const route& reply_to, const binary& prefix_filter)
+ : reply_to_(reply_to), prefix_filter_(prefix_filter)
+{
+}
+
+bool address_key::operator==(const address_key& other) const
+{
+ return reply_to_ == other.reply_to_ &&
+ prefix_filter_ == other.prefix_filter_;
+}
+
+const route& address_key::reply_to() const
+{
+ return reply_to_;
+}
+
+const binary& address_key::prefix_filter() const
+{
+ return prefix_filter_;
+}
+
+} // namespace server
+} // namespace libbitcoin
diff --git a/src/interface/address.cpp b/src/interface/address.cpp
index 69a9f474..0a7f7de8 100644
--- a/src/interface/address.cpp
+++ b/src/interface/address.cpp
@@ -22,8 +22,7 @@
#include
#include
#include
-#include
-#include
+#include
#include
#include
@@ -34,7 +33,7 @@ using namespace std::placeholders;
using namespace bc::chain;
using namespace bc::wallet;
-void address::fetch_history2(server_node& node, const incoming& request,
+void address::fetch_history2(server_node& node, const message& request,
send_handler handler)
{
static constexpr uint64_t limit = 0;
@@ -43,7 +42,7 @@ void address::fetch_history2(server_node& node, const incoming& request,
if (!unwrap_fetch_history_args(address, from_height, request))
{
- handler(outgoing(request, error::bad_stream));
+ handler(message(request, error::bad_stream));
return;
}
@@ -53,50 +52,61 @@ void address::fetch_history2(server_node& node, const incoming& request,
_1, _2, request, handler));
}
-void address::subscribe(server_node& node, const incoming& request,
+// v2/v3 (deprecated), used for resubscription, alias for subscribe in v3.
+void address::renew(server_node& node, const message& request,
+ send_handler handler)
+{
+ subscribe(node, request, handler);
+}
+
+// v2/v3 (deprecated), requires an explicit subscription type.
+void address::subscribe(server_node& node, const message& request,
send_handler handler)
{
- route reply_to;
binary prefix_filter;
subscribe_type type;
- if (!unwrap_subscribe_args(reply_to, prefix_filter, type, request))
+ if (!unwrap_subscribe_args(prefix_filter, type, request))
{
- handler(outgoing(request, error::bad_stream));
+ handler(message(request, error::bad_stream));
return;
}
- // TODO: reenable.
- ////node.subscribe_address(reply_to, prefix_filter, type);
-
- handler(outgoing(request, error::success));
+ node.subscribe_address(request.route(), request.id(), prefix_filter, type);
+ handler(message(request, error::success));
}
-bool address::unwrap_subscribe_args(route& reply_to, binary& prefix_filter,
- subscribe_type& type, const incoming& request)
+bool address::unwrap_subscribe_args(binary& prefix_filter,
+ subscribe_type& type, const message& request)
{
+ static constexpr auto address_bits = hash_size * byte_bits;
+ static constexpr auto stealth_bits = sizeof(uint32_t) * byte_bits;
+
// [ type:1 ] (0 = address prefix, 1 = stealth prefix)
// [ prefix_bitsize:1 ]
- // [ prefix_blocks:... ]
- const auto& data = request.data;
+ // [ prefix_blocks:...]
+ const auto& data = request.data();
if (data.size() < 2)
return false;
// First byte is the subscribe_type enumeration.
- if (data[0] != static_cast(subscribe_type::address) &&
- data[0] != static_cast(subscribe_type::stealth))
- return false;
-
type = static_cast(data[0]);
+ if (type != subscribe_type::payment && type != subscribe_type::stealth)
+ return false;
+
// Second byte is the number of bits.
const auto bit_length = data[1];
+ if ((type == subscribe_type::payment && bit_length > address_bits) ||
+ (type == subscribe_type::stealth && bit_length > stealth_bits))
+ return false;
+
// Convert the bit length to byte length.
const auto byte_length = binary::blocks_size(bit_length);
- if (data.size() != byte_length + 2)
+ if (data.size() - 2 != byte_length)
return false;
const data_chunk bytes({ data.begin() + 2, data.end() });
@@ -104,28 +114,70 @@ bool address::unwrap_subscribe_args(route& reply_to, binary& prefix_filter,
return true;
}
-////void address::renew(server_node& node, const incoming& request,
-//// send_handler handler)
-////{
-//// route reply_to;
-//// binary prefix_filter;
-//// subscribe_type type;
-////
-//// if (!unwrap_subscribe_args(reply_to, prefix_filter, type, request))
-//// {
-//// handler(outgoing(request, error::bad_stream));
-//// return;
-//// }
-////
-//// node.renew(reply_to, prefix_filter, type);
-////}
-////
-////bool address::unwrap_renew_args(route& reply_to, binary& prefix_filter,
-//// subscribe_type& type, const incoming& request)
-////{
-//// // These are currently isomorphic.
-//// return unwrap_subscribe_args(reply_to, prefix_filter, type, request);
-////}
+// v3 eliminates the subscription type, which we map to 'unspecified'.
+void address::subscribe2(server_node& node, const message& request,
+ send_handler handler)
+{
+ static constexpr auto type = subscribe_type::unspecified;
+
+ binary prefix_filter;
+
+ if (!unwrap_subscribe2_args(prefix_filter, request))
+ {
+ handler(message(request, error::bad_stream));
+ return;
+ }
+
+ node.subscribe_address(request.route(), request.id(), prefix_filter, type);
+ handler(message(request, error::success));
+}
+
+// v3 adds unsubscribe2, which we map to subscription_type 'unsubscribe'.
+void address::unsubscribe2(server_node& node, const message& request,
+ send_handler handler)
+{
+ static constexpr auto type = subscribe_type::unsubscribe;
+
+ binary prefix_filter;
+
+ if (!unwrap_subscribe2_args(prefix_filter, request))
+ {
+ handler(message(request, error::bad_stream));
+ return;
+ }
+
+ node.subscribe_address(request.route(), request.id(), prefix_filter, type);
+ handler(message(request, error::success));
+}
+
+bool address::unwrap_subscribe2_args(binary& prefix_filter,
+ const message& request)
+{
+ static constexpr auto address_bits = hash_size * byte_bits;
+
+ // [ prefix_bitsize:1 ]
+ // [ prefix_blocks:...]
+ const auto& data = request.data();
+
+ if (data.empty())
+ return false;
+
+ // First byte is the number of bits.
+ const auto bit_length = data[0];
+
+ if (bit_length > address_bits)
+ return false;
+
+ // Convert the bit length to byte length.
+ const auto byte_length = binary::blocks_size(bit_length);
+
+ if (data.size() - 1 != byte_length)
+ return false;
+
+ const data_chunk bytes({ data.begin() + 1, data.end() });
+ prefix_filter = binary(bit_length, bytes);
+ return true;
+}
} // namespace server
} // namespace libbitcoin
diff --git a/src/interface/blockchain.cpp b/src/interface/blockchain.cpp
index 6f69f507..55ba35f9 100644
--- a/src/interface/blockchain.cpp
+++ b/src/interface/blockchain.cpp
@@ -24,8 +24,7 @@
#include
#include
#include
-#include
-#include
+#include
#include
#include
@@ -37,8 +36,8 @@ using namespace bc::blockchain;
using namespace bc::chain;
using namespace bc::wallet;
-void blockchain::fetch_history(server_node& node,
- const incoming& request, send_handler handler)
+void blockchain::fetch_history(server_node& node, const message& request,
+ send_handler handler)
{
static constexpr uint64_t limit = 0;
uint32_t from_height;
@@ -46,7 +45,7 @@ void blockchain::fetch_history(server_node& node,
if (!unwrap_fetch_history_args(address, from_height, request))
{
- handler(outgoing(request, error::bad_stream));
+ handler(message(request, error::bad_stream));
return;
}
@@ -59,14 +58,14 @@ void blockchain::fetch_history(server_node& node,
_1, _2, request, handler));
}
-void blockchain::fetch_transaction(server_node& node,
- const incoming& request, send_handler handler)
+void blockchain::fetch_transaction(server_node& node, const message& request,
+ send_handler handler)
{
hash_digest tx_hash;
if (!unwrap_fetch_transaction_args(tx_hash, request))
{
- handler(outgoing(request, error::bad_stream));
+ handler(message(request, error::bad_stream));
return;
}
@@ -78,14 +77,14 @@ void blockchain::fetch_transaction(server_node& node,
_1, _2, request, handler));
}
-void blockchain::fetch_last_height(server_node& node,
- const incoming& request, send_handler handler)
+void blockchain::fetch_last_height(server_node& node, const message& request,
+ send_handler handler)
{
- const auto& data = request.data;
+ const auto& data = request.data();
- if (!request.data.empty())
+ if (!data.empty())
{
- handler(outgoing(request, error::bad_stream));
+ handler(message(request, error::bad_stream));
return;
}
@@ -95,39 +94,39 @@ void blockchain::fetch_last_height(server_node& node,
}
void blockchain::last_height_fetched(const code& ec, size_t last_height,
- const incoming& request, send_handler handler)
+ const message& request, send_handler handler)
{
- BITCOIN_ASSERT(last_height <= bc::max_uint32);
+ BITCOIN_ASSERT(last_height <= max_uint32);
auto last_height32 = static_cast(last_height);
- data_chunk result(code_size + sizeof(uint32_t));
- auto serial = make_serializer(result.begin());
- serial.write_error_code(ec);
- BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size);
-
- serial.write_4_bytes_little_endian(last_height32);
- BITCOIN_ASSERT(serial.iterator() == result.end());
+ // [ code:4 ]
+ // [ heigh:4 ]
+ const auto result = build_chunk(
+ {
+ message::to_bytes(ec),
+ to_little_endian(last_height32)
+ });
- handler(outgoing(request, result));
+ handler(message(request, result));
}
-void blockchain::fetch_block_header(server_node& node,
- const incoming& request, send_handler handler)
+void blockchain::fetch_block_header(server_node& node, const message& request,
+ send_handler handler)
{
- const auto& data = request.data;
+ const auto& data = request.data();
if (data.size() == hash_size)
blockchain::fetch_block_header_by_hash(node, request, handler);
else if (data.size() == sizeof(uint32_t))
blockchain::fetch_block_header_by_height(node, request, handler);
else
- handler(outgoing(request, error::bad_stream));
+ handler(message(request, error::bad_stream));
}
void blockchain::fetch_block_header_by_hash(server_node& node,
- const incoming& request, send_handler handler)
+ const message& request, send_handler handler)
{
- const auto& data = request.data;
+ const auto& data = request.data();
BITCOIN_ASSERT(data.size() == hash_size);
auto deserial = make_deserializer(data.begin(), data.end());
@@ -139,9 +138,9 @@ void blockchain::fetch_block_header_by_hash(server_node& node,
}
void blockchain::fetch_block_header_by_height(server_node& node,
- const incoming& request, send_handler handler)
+ const message& request, send_handler handler)
{
- const auto& data = request.data;
+ const auto& data = request.data();
BITCOIN_ASSERT(data.size() == sizeof(uint32_t));
auto deserial = make_deserializer(data.begin(), data.end());
@@ -153,41 +152,36 @@ void blockchain::fetch_block_header_by_height(server_node& node,
}
void blockchain::block_header_fetched(const code& ec,
- const chain::header& block, const incoming& request, send_handler handler)
+ const chain::header& block, const message& request, send_handler handler)
{
- const auto block_size64 = block.serialized_size(false);
- BITCOIN_ASSERT_MSG(block_size64 <= max_size_t, "Clearly Bitcoin is dead.");
- const auto block_size = static_cast(block_size64);
-
- data_chunk result(code_size + block_size);
- auto serial = make_serializer(result.begin());
- serial.write_error_code(ec);
- BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size);
-
- data_chunk block_data = block.to_data(false);
- serial.write_data(block_data);
- BITCOIN_ASSERT(serial.iterator() == result.end());
+ // [ code:4 ]
+ // [ block... ]
+ const auto result = build_chunk(
+ {
+ message::to_bytes(ec),
+ block.to_data(false)
+ });
- handler(outgoing(request, result));
+ handler(message(request, result));
}
void blockchain::fetch_block_transaction_hashes(server_node& node,
- const incoming& request, send_handler handler)
+ const message& request, send_handler handler)
{
- const auto& data = request.data;
+ const auto& data = request.data();
if (data.size() == hash_size)
fetch_block_transaction_hashes_by_hash(node, request, handler);
else if (data.size() == sizeof(uint32_t))
fetch_block_transaction_hashes_by_height(node, request, handler);
else
- handler(outgoing(request, error::bad_stream));
+ handler(message(request, error::bad_stream));
}
void blockchain::fetch_block_transaction_hashes_by_hash(server_node& node,
- const incoming& request, send_handler handler)
+ const message& request, send_handler handler)
{
- const auto& data = request.data;
+ const auto& data = request.data();
BITCOIN_ASSERT(data.size() == hash_size);
auto deserial = make_deserializer(data.begin(), data.end());
@@ -198,9 +192,9 @@ void blockchain::fetch_block_transaction_hashes_by_hash(server_node& node,
}
void blockchain::fetch_block_transaction_hashes_by_height(server_node& node,
- const incoming& request, send_handler handler)
+ const message& request, send_handler handler)
{
- const auto& data = request.data;
+ const auto& data = request.data();
BITCOIN_ASSERT(data.size() == sizeof(uint32_t));
auto deserial = make_deserializer(data.begin(), data.end());
@@ -211,30 +205,28 @@ void blockchain::fetch_block_transaction_hashes_by_height(server_node& node,
}
void blockchain::block_transaction_hashes_fetched(const code& ec,
- const hash_list& hashes, const incoming& request,
- send_handler handler)
+ const hash_list& hashes, const message& request, send_handler handler)
{
+ // [ code:4 ]
+ // [[ hash:32 ]...]
data_chunk result(code_size + hash_size * hashes.size());
auto serial = make_serializer(result.begin());
serial.write_error_code(ec);
- BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size);
for (const auto& tx_hash: hashes)
serial.write_hash(tx_hash);
- BITCOIN_ASSERT(serial.iterator() == result.end());
-
- handler(outgoing(request, result));
+ handler(message(request, result));
}
void blockchain::fetch_transaction_index(server_node& node,
- const incoming& request, send_handler handler)
+ const message& request, send_handler handler)
{
- const auto& data = request.data;
+ const auto& data = request.data();
if (data.size() != hash_size)
{
- handler(outgoing(request, error::bad_stream));
+ handler(message(request, error::bad_stream));
return;
}
@@ -247,34 +239,35 @@ void blockchain::fetch_transaction_index(server_node& node,
}
void blockchain::transaction_index_fetched(const code& ec, size_t block_height,
- size_t index, const incoming& request, send_handler handler)
+ size_t index, const message& request, send_handler handler)
{
BITCOIN_ASSERT(index <= max_uint32);
- auto index32 = static_cast(index);
-
BITCOIN_ASSERT(block_height <= max_uint32);
- auto block_height32 = static_cast(block_height);
- // error_code (4), block_height (4), index (4)
- data_chunk result(code_size + sizeof(uint32_t) + sizeof(uint32_t));
- auto serial = make_serializer(result.begin());
- serial.write_error_code(ec);
- BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size);
+ auto index32 = static_cast(index);
+ auto block_height32 = static_cast(block_height);
- serial.write_4_bytes_little_endian(block_height32);
- serial.write_4_bytes_little_endian(index32);
+ // [ code:4 ]
+ // [ block_height:32 ]
+ // [ tx_index:4 ]
+ const auto result = build_chunk(
+ {
+ message::to_bytes(ec),
+ to_little_endian(block_height32),
+ to_little_endian(index32)
+ });
- handler(outgoing(request, result));
+ handler(message(request, result));
}
-void blockchain::fetch_spend(server_node& node, const incoming& request,
+void blockchain::fetch_spend(server_node& node, const message& request,
send_handler handler)
{
- const auto& data = request.data;
+ const auto& data = request.data();
if (data.size() != point_size)
{
- handler(outgoing(request, error::bad_stream));
+ handler(message(request, error::bad_stream));
return;
}
@@ -291,32 +284,28 @@ void blockchain::fetch_spend(server_node& node, const incoming& request,
}
void blockchain::spend_fetched(const code& ec, const chain::input_point& inpoint,
- const incoming& request, send_handler handler)
+ const message& request, send_handler handler)
{
- // error_code (4), hash (32), index (4)
- const auto inpoint_size64 = inpoint.serialized_size();
- BITCOIN_ASSERT(inpoint_size64 <= max_size_t);
- const auto inpoint_size = static_cast(inpoint_size64);
-
- data_chunk result(code_size + inpoint_size);
- auto serial = make_serializer(result.begin());
- serial.write_error_code(ec);
- BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size);
-
- auto raw_inpoint = inpoint.to_data();
- serial.write_data(raw_inpoint);
+ // [ code:4 ]
+ // [ hash:32 ]
+ // [ index:4 ]
+ const auto result = build_chunk(
+ {
+ message::to_bytes(ec),
+ inpoint.to_data()
+ });
- handler(outgoing(request, result));
+ handler(message(request, result));
}
void blockchain::fetch_block_height(server_node& node,
- const incoming& request, send_handler handler)
+ const message& request, send_handler handler)
{
- const auto& data = request.data;
+ const auto& data = request.data();
if (data.size() != hash_size)
{
- handler(outgoing(request, error::bad_stream));
+ handler(message(request, error::bad_stream));
return;
}
@@ -328,30 +317,30 @@ void blockchain::fetch_block_height(server_node& node,
}
void blockchain::block_height_fetched(const code& ec, size_t block_height,
- const incoming& request, send_handler handler)
+ const message& request, send_handler handler)
{
BITCOIN_ASSERT(block_height <= max_uint32);
auto block_height32 = static_cast(block_height);
- // error_code (4), height (4)
- data_chunk result(code_size + sizeof(uint32_t));
- auto serial = make_serializer(result.begin());
- serial.write_error_code(ec);
-
- BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size);
- serial.write_4_bytes_little_endian(block_height32);
+ // [ code:4 ]
+ // [ height:4 ]
+ const auto result = build_chunk(
+ {
+ message::to_bytes(ec),
+ to_little_endian(block_height32)
+ });
- handler(outgoing(request, result));
+ handler(message(request, result));
}
-void blockchain::fetch_stealth(server_node& node, const incoming& request,
+void blockchain::fetch_stealth(server_node& node, const message& request,
send_handler handler)
{
- const auto& data = request.data;
+ const auto& data = request.data();
if (data.empty())
{
- handler(outgoing(request, error::bad_stream));
+ handler(message(request, error::bad_stream));
return;
}
@@ -363,7 +352,7 @@ void blockchain::fetch_stealth(server_node& node, const incoming& request,
if (data.size() != sizeof(uint8_t) + binary::blocks_size(bitsize) +
sizeof(uint32_t))
{
- handler(outgoing(request, error::bad_stream));
+ handler(message(request, error::bad_stream));
return;
}
@@ -380,17 +369,16 @@ void blockchain::fetch_stealth(server_node& node, const incoming& request,
}
void blockchain::stealth_fetched(const code& ec,
- const stealth_compact::list& stealth_results, const incoming& request,
+ const stealth_compact::list& stealth_results, const message& request,
send_handler handler)
{
- // [ ephemeral_key_hash:32 ]
- // [ address_hash:20 ]
- // [ tx_hash:32 ]
static constexpr size_t row_size = hash_size + short_hash_size + hash_size;
+
+ // [ code:4 ]
+ // [[ ephemeral_key_hash:32 ][ address_hash:20 ][ tx_hash:32 ]...]
data_chunk result(code_size + row_size * stealth_results.size());
auto serial = make_serializer(result.begin());
serial.write_error_code(ec);
- BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size);
for (const auto& row: stealth_results)
{
@@ -399,7 +387,7 @@ void blockchain::stealth_fetched(const code& ec,
serial.write_hash(row.transaction_hash);
}
- handler(outgoing(request, result));
+ handler(message(request, result));
}
} // namespace server
diff --git a/src/interface/protocol.cpp b/src/interface/protocol.cpp
index 5d7b343f..c9a67763 100644
--- a/src/interface/protocol.cpp
+++ b/src/interface/protocol.cpp
@@ -24,6 +24,7 @@
#include
#include
#include
+#include
#include
#include
@@ -34,14 +35,14 @@ using namespace std::placeholders;
// This does NOT save to our memory pool.
// The transaction will hit our memory pool when it is picked up from a peer.
-void protocol::broadcast_transaction(server_node& node,
- const incoming& request, send_handler handler)
+void protocol::broadcast_transaction(server_node& node, const message& request,
+ send_handler handler)
{
chain::transaction tx;
- if (!tx.from_data(request.data))
+ if (!tx.from_data(request.data()))
{
- handler(outgoing(request, error::bad_stream));
+ handler(message(request, error::bad_stream));
return;
}
@@ -52,15 +53,15 @@ void protocol::broadcast_transaction(server_node& node,
node.broadcast(tx, ignore_send, ignore_complete);
// Tell the user everything is fine.
- handler(outgoing(request, error::success));
+ handler(message(request, error::success));
}
-void protocol::total_connections(server_node& node, const incoming& request,
+void protocol::total_connections(server_node& node, const message& request,
send_handler handler)
{
- if (!request.data.empty())
+ if (!request.data().empty())
{
- handler(outgoing(request, error::bad_stream));
+ handler(message(request, error::bad_stream));
return;
}
@@ -69,21 +70,21 @@ void protocol::total_connections(server_node& node, const incoming& request,
_1, request, handler));
}
-void protocol::handle_total_connections(size_t count, const incoming& request,
+void protocol::handle_total_connections(size_t count, const message& request,
send_handler handler)
{
BITCOIN_ASSERT(count <= max_uint32);
const auto total_connections = static_cast(count);
- data_chunk result(code_size + sizeof(uint32_t));
- auto serial = make_serializer(result.begin());
- serial.write_error_code(error::success);
- BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size);
-
- serial.write_4_bytes_little_endian(total_connections);
- BITCOIN_ASSERT(serial.iterator() == result.end());
+ // [ code:4 ]
+ // [ connections:4 ]
+ const auto result = build_chunk(
+ {
+ message::to_bytes(error::success),
+ to_little_endian(total_connections)
+ });
- handler(outgoing(request, result));
+ handler(message(request, result));
}
} // namespace server
diff --git a/src/interface/transaction_pool.cpp b/src/interface/transaction_pool.cpp
index 7648d62b..fc32bbd8 100644
--- a/src/interface/transaction_pool.cpp
+++ b/src/interface/transaction_pool.cpp
@@ -23,6 +23,7 @@
#include
#include
#include
+#include
#include
#include
@@ -33,13 +34,13 @@ using namespace std::placeholders;
using namespace bc::chain;
void transaction_pool::fetch_transaction(server_node& node,
- const incoming& request, send_handler handler)
+ const message& request, send_handler handler)
{
hash_digest hash;
if (!unwrap_fetch_transaction_args(hash, request))
{
- handler(outgoing(request, error::bad_stream));
+ handler(message(request, error::bad_stream));
return;
}
@@ -51,16 +52,32 @@ void transaction_pool::fetch_transaction(server_node& node,
_1, _2, request, handler));
}
-void transaction_pool::validate(server_node& node, const incoming& request,
+// Broadcast a transaction with penetration subscription.
+void transaction_pool::broadcast(server_node& node, const message& request,
send_handler handler)
{
transaction tx;
- if (!tx.from_data(request.data))
+ if (!tx.from_data(request.data()))
{
- // NOTE: the format of this response changed in v3 (send only code).
- // This is our standard behavior and should not break clients.
- handler(outgoing(request, error::bad_stream));
+ handler(message(request, error::bad_stream));
+ return;
+ }
+
+ // TODO: conditionally subscribe to penetration notifications.
+ // TODO: broadcast transaction to receiving peers.
+ handler(message(request, error::operation_failed));
+}
+
+// NOTE: the format of this response changed in v3 (send only code on error).
+void transaction_pool::validate(server_node& node, const message& request,
+ send_handler handler)
+{
+ transaction tx;
+
+ if (!tx.from_data(request.data()))
+ {
+ handler(message(request, error::bad_stream));
return;
}
@@ -71,12 +88,13 @@ void transaction_pool::validate(server_node& node, const incoming& request,
void transaction_pool::handle_validated(const code& ec, const transaction& tx,
const hash_digest& tx_hash, const point::indexes& unconfirmed,
- const incoming& request, send_handler handler)
+ const message& request, send_handler handler)
{
+ // [ code:4 ]
+ // [[ unconfirmed_index:4 ]...]
data_chunk result(code_size + unconfirmed.size() * index_size);
auto serial = make_serializer(result.begin());
serial.write_error_code(ec);
- BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size);
for (const auto unconfirmed_index: unconfirmed)
{
@@ -85,9 +103,7 @@ void transaction_pool::handle_validated(const code& ec, const transaction& tx,
serial.write_4_bytes_little_endian(index32);
}
- BITCOIN_ASSERT(serial.iterator() == result.end());
-
- handler(outgoing(request, result));
+ handler(message(request, result));
}
} // namespace server
diff --git a/src/messages/incoming.cpp b/src/messages/incoming.cpp
deleted file mode 100644
index 707f923c..00000000
--- a/src/messages/incoming.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS)
- *
- * This file is part of libbitcoin-server.
- *
- * libbitcoin-server is free software: you can redistribute it and/or
- * modify it under the terms of the GNU Affero General Public License with
- * additional permissions to the one published by the Free Software
- * Foundation, either version 3 of the License, or (at your option)
- * any later version. For more information see LICENSE.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-#include
-
-#include
-#include
-#include
-
-namespace libbitcoin {
-namespace server {
-
-using namespace bc::protocol;
-
-std::string incoming::address()
-{
- return "[" + encode_base16(address1) + "]";
-}
-
-// Protocol delimitation migration plan.
-//-----------------------------------------------------------------------------
-// v1/v2 server: ROUTER, requires not delimited
-// v3 server: ROUTER, allows/echos delimited
-// v1/v2/v3 client: DEALER (not framed)
-//-----------------------------------------------------------------------------
-// v4 server: ROUTER, requires delimited
-// v4 client: DEALER (delimited) or REQ
-//-----------------------------------------------------------------------------
-
-// TODO: generalize address stripping, store all routes, use (hack) parameter
-// of expected payload length for parsing undelimited addressing.
-// BUGBUG: current implementation assumes client has not added any addresses.
-// This probably prevents the use of generalized zeromq routing to the server.
-code incoming::receive(zmq::socket& socket, bool secure)
-{
- zmq::message message;
- auto ec = socket.receive(message);
-
- if (ec)
- return ec;
-
- if (message.size() < 5 || message.size() > 6)
- return error::bad_stream;
-
- // Client is undelimited DEALER -> 2 addresses with no delimiter.
- // Client is REQ or delimited DEALER -> 2 addresses with delimiter.
- address1 = message.dequeue_data();
- address2 = message.dequeue_data();
-
- // In the reply we echo the delimited-ness of the original request.
- delimited = message.size() == 4;
-
- if (delimited)
- message.dequeue();
-
- // All libbitcoin queries have these three frames.
- //-------------------------------------------------------------------------
-
- // Query command (returned to caller).
- command = message.dequeue_text();
-
- // Arbitrary caller data (returned to caller for correlation).
- if (!message.dequeue(id))
- return error::bad_stream;
-
- // Serialized query.
- data = message.dequeue_data();
-
- // For deferred work, directs worker to respond on secure endpoint.
- this->secure = secure;
- return error::success;
-}
-
-} // namespace server
-} // namespace libbitcoin
diff --git a/src/messages/message.cpp b/src/messages/message.cpp
new file mode 100644
index 00000000..6f56f922
--- /dev/null
+++ b/src/messages/message.cpp
@@ -0,0 +1,184 @@
+/**
+ * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS)
+ *
+ * This file is part of libbitcoin-server.
+ *
+ * libbitcoin-server is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License with
+ * additional permissions to the one published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option)
+ * any later version. For more information see LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#include
+
+#include
+#include
+#include
+#include
+
+namespace libbitcoin {
+namespace server {
+
+using namespace bc::protocol;
+
+// Protocol delimitation migration plan.
+//-----------------------------------------------------------------------------
+// v1/v2 server: ROUTER, requires not delimited
+// v3 server: ROUTER, allows/echos delimited
+// v1/v2/v3 client: DEALER (not delimited)
+//-----------------------------------------------------------------------------
+// v4 server: ROUTER, requires delimited
+// v4 client: DEALER (delimited) or REQ
+//-----------------------------------------------------------------------------
+
+// Convert an error code to data for payload.
+data_chunk message::to_bytes(const code& ec)
+{
+ return build_chunk(
+ {
+ to_little_endian(static_cast(ec.value()))
+ });
+}
+
+// Constructors.
+//-------------------------------------------------------------------------
+
+// Construct an empty message with security routing context.
+message::message(bool secure)
+{
+ // For subscriptions, directs notifier to respond on secure endpoint.
+ route_.secure = secure;
+}
+
+// Construct a response for the request (response code only).
+message::message(const message& request, const code& ec)
+ : message(request, to_bytes(ec))
+{
+}
+
+// Construct a response for the request (response data with code).
+message::message(const message& request, const data_chunk& data)
+ : message(request.route(), request.command(), request.id(), data)
+{
+}
+
+// Construct a response for the route (subscription code only).
+message::message(const server::route& route, const std::string& command,
+ uint32_t id, const code& ec)
+ : message(route, command, id, to_bytes(ec))
+{
+}
+
+// Construct a response for the route (subscription data with code).
+message::message(const server::route& route, const std::string& command,
+ uint32_t id, const data_chunk& data)
+ : route_(route), command_(command), id_(id), data_(data)
+{
+}
+
+// Properties.
+//-------------------------------------------------------------------------
+
+/// Arbitrary caller data (returned to caller for correlation).
+uint32_t message::id() const
+{
+ return id_;
+}
+
+/// Serialized query or response (defined in relation to command).
+const data_chunk& message::data() const
+{
+ return data_;
+}
+
+/// Query command (used for subscription, always returned to caller).
+const std::string& message::command() const
+{
+ return command_;
+}
+
+/// The message route.
+const server::route& message::route() const
+{
+ return route_;
+}
+
+// Transport.
+//-------------------------------------------------------------------------
+
+code message::receive(zmq::socket& socket)
+{
+ zmq::message message;
+ auto ec = socket.receive(message);
+
+ if (ec)
+ return ec;
+
+ if (message.size() < 5 || message.size() > 6)
+ return error::bad_stream;
+
+ // Decode the routing information (TODO: generalize in route).
+ //-------------------------------------------------------------------------
+
+ // Client is undelimited DEALER -> 2 addresses with no delimiter.
+ // Client is REQ or delimited DEALER -> 2 addresses with delimiter.
+ route_.address1 = message.dequeue_data();
+ route_.address2 = message.dequeue_data();
+
+ // In the reply we echo the delimited-ness of the original request.
+ route_.delimited = message.size() == 4;
+
+ if (route_.delimited)
+ message.dequeue();
+
+ // All libbitcoin queries and responses have these three frames.
+ //-------------------------------------------------------------------------
+
+ // Query command (returned to caller).
+ command_ = message.dequeue_text();
+
+ // Arbitrary caller data (returned to caller for correlation).
+ if (!message.dequeue(id_))
+ return error::bad_stream;
+
+ // Serialized query.
+ data_ = message.dequeue_data();
+
+ return error::success;
+}
+
+code message::send(zmq::socket& socket)
+{
+ zmq::message message;
+
+ // Encode the routing information (TODO: generalize in route).
+ //-------------------------------------------------------------------------
+
+ // Client is undelimited DEALER -> 2 addresses with no delimiter.
+ // Client is REQ or delimited DEALER -> 2 addresses with delimiter.
+ message.enqueue(route_.address1);
+ message.enqueue(route_.address2);
+
+ // In the reply we echo the delimited-ness of the original request.
+ if (route_.delimited)
+ message.enqueue();
+
+ // All libbitcoin queries and responses have these three frames.
+ //-------------------------------------------------------------------------
+ message.enqueue(command_);
+ message.enqueue_little_endian(id_);
+ message.enqueue(data_);
+
+ return socket.send(message);
+}
+
+} // namespace server
+} // namespace libbitcoin
diff --git a/src/messages/outgoing.cpp b/src/messages/outgoing.cpp
deleted file mode 100644
index cea77dcb..00000000
--- a/src/messages/outgoing.cpp
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS)
- *
- * This file is part of libbitcoin-server.
- *
- * libbitcoin-server is free software: you can redistribute it and/or
- * modify it under the terms of the GNU Affero General Public License with
- * additional permissions to the one published by the Free Software
- * Foundation, either version 3 of the License, or (at your option)
- * any later version. For more information see LICENSE.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-#include
-
-#include
-#include
-#include
-#include
-#include
-
-namespace libbitcoin {
-namespace server {
-
-using namespace bc::protocol;
-
-// Convert an error code to data for payload.
-inline data_chunk to_chunk(const code& ec)
-{
- return build_chunk(
- {
- to_little_endian(static_cast(ec.value()))
- });
-}
-
-// Return an error code in response to the incoming query.
-outgoing::outgoing(const incoming& request, const code& ec)
- : outgoing(request, to_chunk(ec))
-{
-}
-
-// Return data in response to a successfully-executed incoming query.
-outgoing::outgoing(const incoming& request, const data_chunk& data)
- : outgoing(request.command, data, request.address1, request.address2,
- request.delimited, request.id)
-{
-}
-
-// Return data as a subscription by the given address (zero id).
-outgoing::outgoing(const std::string& command, const data_chunk& data,
- const data_chunk& address1, const data_chunk& address2, bool delimited)
- : outgoing(command, data, address1, address2, delimited, 0)
-{
-}
-
-// protected
-outgoing::outgoing(const std::string& command, const data_chunk& data,
- const data_chunk& address1, const data_chunk& address2, bool delimited,
- uint32_t id)
-{
- // Client is undelimited DEALER -> 2 addresses with no delimiter.
- // Client is REQ or delimited DEALER -> 2 addresses with delimiter.
- message_.enqueue(address1);
- message_.enqueue(address2);
-
- // In the reply we echo the delimited-ness of the original request.
- if (delimited)
- message_.enqueue();
-
- // All libbitcoin queries have these three frames.
- //-------------------------------------------------------------------------
- message_.enqueue(command);
- message_.enqueue_little_endian(id);
- message_.enqueue(data);
-}
-
-std::string outgoing::address()
-{
- auto message = message_;
- return "[" + encode_base16(message.dequeue_data()) + "]";
-}
-
-code outgoing::send(zmq::socket& socket)
-{
- return socket.send(message_);
-}
-
-} // namespace server
-} // namespace libbitcoin
diff --git a/src/messages/route.cpp b/src/messages/route.cpp
new file mode 100644
index 00000000..9416593c
--- /dev/null
+++ b/src/messages/route.cpp
@@ -0,0 +1,46 @@
+/**
+ * Copyright (c) 2011-2015 libbitcoin developers (see AUTHORS)
+ *
+ * This file is part of libbitcoin-server.
+ *
+ * libbitcoin-server is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Affero General Public License with
+ * additional permissions to the one published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option)
+ * any later version. For more information see LICENSE.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#include
+
+#include
+#include
+#include
+
+namespace libbitcoin {
+namespace server {
+
+route::route()
+ : secure(false), delimited(false)
+{
+}
+
+std::string route::display() const
+{
+ return "[" + encode_base16(address1) + ":" + encode_base16(address2) + "]";
+}
+
+bool route::operator==(const route& other) const
+{
+ return secure == other.secure && delimited == other.delimited &&
+ address1 == other.address1 && address2 == other.address2;
+}
+
+} // namespace server
+} // namespace libbitcoin
diff --git a/src/server_node.cpp b/src/server_node.cpp
index 2b38c695..c3ad1915 100644
--- a/src/server_node.cpp
+++ b/src/server_node.cpp
@@ -24,6 +24,7 @@
#include
#include
#include
+#include
#include
namespace libbitcoin {
@@ -38,7 +39,6 @@ server_node::server_node(const configuration& configuration)
: p2p_node(configuration),
configuration_(configuration),
authenticator_(*this),
- ////notifications_();
secure_query_service_(authenticator_, *this, true),
public_query_service_(authenticator_, *this, false),
secure_heartbeat_service_(authenticator_, *this, true),
@@ -61,11 +61,6 @@ server_node::~server_node()
// Properties.
// ----------------------------------------------------------------------------
-////notifications& server_node::notifier()
-////{
-//// return notifications_;
-////}
-
const settings& server_node::server_settings() const
{
return configuration_.server;
@@ -125,26 +120,29 @@ bool server_node::close()
// Notification.
// ----------------------------------------------------------------------------
-////// Subscribe to address and stealth prefix notifications.
-////void server_node::subscribe_address(route& reply_to, binary& prefix_filter,
-//// subscribe_type& type)
-////{
-//// if (true)
-//// secure_notification_worker_
-//// .subscribe_address(reply_to, prefix_filter, type);
-//// else
-//// public_notification_worker_
-//// .subscribe_address(reply_to, prefix_filter, type);
-////}
-////
-////// Subscribe to transaction radar notifications.
-////void server_node::subscribe_radar(route& reply_to, hash_digest& tx_hash)
-////{
-//// if (true)
-//// secure_notification_worker_.subscribe_radar(reply_to, tx_hash);
-//// else
-//// public_notification_worker_.subscribe_radar(reply_to, tx_hash);
-////}
+// Subscribe to address (including stealth) prefix notifications.
+void server_node::subscribe_address(const route& reply_to, uint32_t id,
+ const binary& prefix_filter, subscribe_type type)
+{
+ if (reply_to.secure)
+ secure_notification_worker_
+ .subscribe_address(reply_to, id, prefix_filter, type);
+ else
+ public_notification_worker_
+ .subscribe_address(reply_to, id, prefix_filter, type);
+}
+
+// Subscribe to transaction penetration notifications.
+void server_node::subscribe_penetration(const route& reply_to, uint32_t id,
+ const hash_digest& tx_hash)
+{
+ if (reply_to.secure)
+ secure_notification_worker_
+ .subscribe_penetration(reply_to, id, tx_hash);
+ else
+ public_notification_worker_
+ .subscribe_penetration(reply_to, id, tx_hash);
+}
// Services.
// ----------------------------------------------------------------------------
@@ -179,13 +177,13 @@ bool server_node::start_query_services()
if (!settings.query_service_enabled || settings.query_workers == 0)
return true;
- // Start secure service, query workers and address workers if enabled.
+ // Start secure service, query workers and notification workers if enabled.
if (settings.server_private_key && (!secure_query_service_.start() ||
(settings.subscription_limit > 0 && !secure_notification_worker_.start()) ||
!start_query_workers(true)))
return false;
- // Start public service, query workers and address workers if enabled.
+ // Start public service, query workers and notification workers if enabled.
if (!settings.secure_only && (!public_query_service_.start() ||
(settings.subscription_limit > 0 && !public_notification_worker_.start()) ||
!start_query_workers(false)))
@@ -285,14 +283,14 @@ uint32_t server_node::threads_required(const configuration& configuration)
{
++required;
required += settings.query_workers;
- required += (settings.subscription_limit > 0 ? 1 : 0);
+ required += (settings.subscription_limit > 0 ? 4 : 0);
}
if (!settings.secure_only)
{
++required;
required += settings.query_workers;
- required += (settings.subscription_limit > 0 ? 1 : 0);
+ required += (settings.subscription_limit > 0 ? 4 : 0);
}
}
diff --git a/src/services/block_service.cpp b/src/services/block_service.cpp
index c52c616d..d7ab914b 100644
--- a/src/services/block_service.cpp
+++ b/src/services/block_service.cpp
@@ -201,6 +201,11 @@ void block_service::publish_blocks(uint32_t fork_point,
publish_block(publisher, height++, block);
}
+// [ height:4 ]
+// [ header:80 ]
+// [ txs... ]
+// The payload for block publication is delimited within the zeromq message.
+// This is required for compatability and inconsistent with query payloads.
void block_service::publish_block(zmq::socket& publisher, uint32_t height,
const block::ptr block)
{
@@ -209,10 +214,10 @@ void block_service::publish_block(zmq::socket& publisher, uint32_t height,
const auto security = secure_ ? "secure" : "public";
- zmq::message respose;
- respose.enqueue_little_endian(height);
- respose.enqueue(block->to_data(false));
- const auto ec = publisher.send(respose);
+ zmq::message broadcast;
+ broadcast.enqueue_little_endian(height);
+ broadcast.enqueue(block->to_data(false));
+ const auto ec = publisher.send(broadcast);
if (ec == bc::error::service_stopped)
return;
diff --git a/src/services/query_service.cpp b/src/services/query_service.cpp
index f2524500..845aad71 100644
--- a/src/services/query_service.cpp
+++ b/src/services/query_service.cpp
@@ -57,10 +57,12 @@ void query_service::work()
if (!started(bind(router, query_dealer, notify_dealer)))
return;
- // TODO: integrate notify_dealer into relay.
// TODO: tap in to failure conditions, such as high water.
// Relay messages between router and dealer (blocks on context).
+ //*************************************************************************
+ // TODO: integrate notify_dealer into relay.
relay(router, query_dealer);
+ //*************************************************************************
// Unbind the sockets and exit this thread.
finished(unbind(router, query_dealer, notify_dealer));
diff --git a/src/services/transaction_service.cpp b/src/services/transaction_service.cpp
index 7d16374e..8385d608 100644
--- a/src/services/transaction_service.cpp
+++ b/src/services/transaction_service.cpp
@@ -163,6 +163,7 @@ bool transaction_service::handle_transaction(const code& ec,
return true;
}
+// [ tx... ]
void transaction_service::publish_transaction(const transaction& tx)
{
if (stopped())
@@ -191,9 +192,9 @@ void transaction_service::publish_transaction(const transaction& tx)
if (stopped())
return;
- zmq::message respose;
- respose.enqueue(tx.to_data());
- ec = publisher.send(respose);
+ zmq::message broadcast;
+ broadcast.enqueue(tx.to_data());
+ ec = publisher.send(broadcast);
if (ec == bc::error::service_stopped)
return;
diff --git a/src/utility/fetch_helpers.cpp b/src/utility/fetch_helpers.cpp
index 7b2440ae..dbcde339 100644
--- a/src/utility/fetch_helpers.cpp
+++ b/src/utility/fetch_helpers.cpp
@@ -23,7 +23,7 @@
#include
#include
#include
-#include
+#include
namespace libbitcoin {
namespace server {
@@ -36,12 +36,12 @@ using namespace bc::wallet;
// ----------------------------------------------------------------------------
bool unwrap_fetch_history_args(payment_address& address,
- uint32_t& from_height, const incoming& request)
+ uint32_t& from_height, const message& request)
{
static constexpr size_t history_args_size = sizeof(uint8_t) +
short_hash_size + sizeof(uint32_t);
- const auto& data = request.data;
+ const auto& data = request.data();
if (data.size() != history_args_size)
{
@@ -61,7 +61,7 @@ bool unwrap_fetch_history_args(payment_address& address,
}
void send_history_result(const code& ec, const history_compact::list& history,
- const incoming& request, send_handler handler)
+ const message& request, send_handler handler)
{
static constexpr size_t row_size = sizeof(uint8_t) + point_size +
sizeof(uint32_t) + sizeof(uint64_t);
@@ -82,16 +82,16 @@ void send_history_result(const code& ec, const history_compact::list& history,
BITCOIN_ASSERT(serial.iterator() == result.end());
- handler(outgoing(request, result));
+ handler(message(request, result));
}
// fetch_transaction stuff
// ----------------------------------------------------------------------------
bool unwrap_fetch_transaction_args(hash_digest& hash,
- const incoming& request)
+ const message& request)
{
- const auto& data = request.data;
+ const auto& data = request.data();
if (data.size() != hash_size)
{
@@ -106,22 +106,15 @@ bool unwrap_fetch_transaction_args(hash_digest& hash,
}
void transaction_fetched(const code& ec, const chain::transaction& tx,
- const incoming& request, send_handler handler)
+ const message& request, send_handler handler)
{
- const auto tx_size64 = tx.serialized_size();
- BITCOIN_ASSERT(tx_size64 <= max_size_t);
- const auto tx_size = static_cast(tx_size64);
-
- data_chunk result(code_size + tx_size);
- auto serial = make_serializer(result.begin());
- serial.write_error_code(ec);
- BITCOIN_ASSERT(serial.iterator() == result.begin() + code_size);
-
- data_chunk tx_data = tx.to_data();
- serial.write_data(tx_data);
- BITCOIN_ASSERT(serial.iterator() == result.end());
+ const auto result = build_chunk(
+ {
+ message::to_bytes(ec),
+ tx.to_data()
+ });
- handler(outgoing(request, result));
+ handler(message(request, result));
}
} // namespace server
diff --git a/src/workers/notification_worker.cpp b/src/workers/notification_worker.cpp
index 484d0ae0..d7880623 100644
--- a/src/workers/notification_worker.cpp
+++ b/src/workers/notification_worker.cpp
@@ -19,13 +19,18 @@
*/
#include
+#include
#include
#include
#include
+#include
#include
+#include
+#include
#include
#include
#include
+#include
namespace libbitcoin {
namespace server {
@@ -37,15 +42,14 @@ using namespace bc::chain;
using namespace bc::protocol;
using namespace bc::wallet;
-// TX RADAR
-// By monitoring for tx inventory we do not poll and therefore will not
-// suffer from peers that don't provide redundant responses, will not
-// suffer double-counting, or have complexity and poor perf from caching.
-// However this requires that the user subscribe *before* submitting a
-// transaction. This feature is useful for the submitter of the tx to the
-// network - generally the spender. This is intended to allow the spender
-// to increase fees (ie using replace by fee). It is *not* recommended for
-// use by a receiver (ie for the acceptance of zero confirmation txs).
+// Purge subscriptions at 10% of the expiration period.
+static constexpr int64_t purge_interval_ratio = 10;
+
+// Notifications respond with commands that are distinct from the subscription.
+static const std::string address_update("address.update");
+static const std::string address_stealth("address.stealth_update");
+static const std::string address_update2("address.update2");
+static const std::string penetration_update("penetration.update");
notification_worker::notification_worker(zmq::authenticator& authenticator,
server_node& node, bool secure)
@@ -54,22 +58,28 @@ notification_worker::notification_worker(zmq::authenticator& authenticator,
settings_(node.server_settings()),
node_(node),
authenticator_(authenticator),
+ payment_subscriber_(std::make_shared(
+ node.thread_pool(), NAME "_payment")),
+ stealth_subscriber_(std::make_shared(
+ node.thread_pool(), NAME "_stealth")),
address_subscriber_(std::make_shared(
node.thread_pool(), NAME "_address")),
- inventory_subscriber_(std::make_shared(
- node.thread_pool(), NAME "_inventory")),
- stealth_subscriber_(std::make_shared(
- node.thread_pool(), NAME "_stealth"))
+ penetration_subscriber_(std::make_shared(
+ node.thread_pool(), NAME "_penetration"))
{
}
// There is no unsubscribe so this class shouldn't be restarted.
bool notification_worker::start()
{
- address_subscriber_->start();
- inventory_subscriber_->start();
+ // v2/v3 (deprecated)
+ payment_subscriber_->start();
stealth_subscriber_->start();
+ // v3
+ address_subscriber_->start();
+ penetration_subscriber_->start();
+
// Subscribe to blockchain reorganizations.
node_.subscribe_blockchain(
std::bind(¬ification_worker::handle_blockchain_reorganization,
@@ -81,7 +91,7 @@ bool notification_worker::start()
this, _1, _2, _3));
// Subscribe to all inventory messages from all peers.
- node_.subscribe(
+ node_.subscribe(
std::bind(¬ification_worker::handle_inventory,
this, _1, _2));
@@ -91,15 +101,27 @@ bool notification_worker::start()
// No unsubscribe so must be kept in scope until subscriber stop complete.
bool notification_worker::stop()
{
- address_subscriber_->stop();
- inventory_subscriber_->stop();
+ static const auto code = error::channel_stopped;
+
+ // v2/v3 (deprecated)
+ payment_subscriber_->stop();
+ payment_subscriber_->invoke(code, {}, 0, {}, {});
+
stealth_subscriber_->stop();
+ stealth_subscriber_->invoke(code, 0, 0, {}, {});
+
+ // v3
+ address_subscriber_->stop();
+ address_subscriber_->invoke(code, {}, 0, {}, {});
+
+ penetration_subscriber_->stop();
+ penetration_subscriber_->invoke(code, 0, {}, {});
return zmq::worker::stop();
}
// Implement worker as a router to the query service.
-// The address worker receives no messages from the query service.
+// The notification worker receives no messages from the query service.
void notification_worker::work()
{
zmq::socket router(authenticator_, zmq::socket::role::router);
@@ -110,19 +132,28 @@ void notification_worker::work()
zmq::poller poller;
poller.add(router);
+ const auto interval = purge_interval_milliseconds();
// We do not send/receive on the poller, we use its timer and context stop.
// Other threads connect and disconnect dynamically to send updates.
while (!poller.terminated() && !stopped())
{
- poller.wait();
- prune();
+ poller.wait(interval);
+ purge();
}
// Disconnect the socket and exit this thread.
finished(disconnect(router));
}
+int32_t notification_worker::purge_interval_milliseconds() const
+{
+ const int64_t minutes = settings_.subscription_expiration_minutes;
+ const int64_t milliseconds = minutes * 60 * 1000 / purge_interval_ratio;
+ const auto capped = std::max(milliseconds, static_cast(max_int32));
+ return static_cast(capped);
+}
+
// Connect/Disconnect.
//-----------------------------------------------------------------------------
@@ -137,13 +168,13 @@ bool notification_worker::connect(socket& router)
if (ec)
{
log::error(LOG_SERVER)
- << "Failed to connect " << security << " address worker to "
+ << "Failed to connect " << security << " notification worker to "
<< endpoint << " : " << ec.message();
return false;
}
log::debug(LOG_SERVER)
- << "Connected " << security << " address worker to " << endpoint;
+ << "Connected " << security << " notification worker to " << endpoint;
return true;
}
@@ -156,10 +187,272 @@ bool notification_worker::disconnect(socket& router)
return true;
log::error(LOG_SERVER)
- << "Failed to disconnect " << security << " address worker.";
+ << "Failed to disconnect " << security << " notification worker.";
return false;
}
+// Pruning.
+// ----------------------------------------------------------------------------
+
+// Signal expired subscriptions to self-remove.
+void notification_worker::purge()
+{
+ static const auto code = error::channel_timeout;
+
+ // v2/v3 (deprecated)
+ payment_subscriber_->purge(code, {}, 0, {}, {});
+ stealth_subscriber_->purge(code, 0, 0, {}, {});
+
+ // v3
+ address_subscriber_->purge(code, {}, 0, {}, {});
+ penetration_subscriber_->purge(code, 0, {}, {});
+}
+
+// Sending.
+// ----------------------------------------------------------------------------
+
+void notification_worker::send(const route& reply_to,
+ const std::string& command, uint32_t id, const data_chunk& payload)
+{
+ const auto security = secure_ ? "secure" : "public";
+ const auto& endpoint = secure_ ? query_service::secure_notify :
+ query_service::public_notify;
+
+ zmq::socket notifier(authenticator_, zmq::socket::role::router);
+ auto ec = notifier.connect(endpoint);
+
+ if (ec == bc::error::service_stopped)
+ return;
+
+ if (ec)
+ {
+ log::warning(LOG_SERVER)
+ << "Failed to connect " << security << " notification worker: "
+ << ec.message();
+ return;
+ }
+
+ // Notifications are formatted as query response messages.
+ message notification(reply_to, command, id, payload);
+ ec = notification.send(notifier);
+
+ if (ec && ec != error::service_stopped)
+ log::warning(LOG_SERVER)
+ << "Failed to send notification to "
+ << notification.route().display() << " " << ec.message();
+}
+
+void notification_worker::send_payment(const route& reply_to, uint32_t id,
+ const wallet::payment_address& address, uint32_t height,
+ const hash_digest& block_hash, const chain::transaction& tx)
+{
+ // [ address.version:1 ]
+ // [ address.hash:20 ]
+ // [ height:4 ]
+ // [ block_hash:32 ]
+ // [ tx:... ]
+ const auto payload = build_chunk(
+ {
+ to_array(address.version()),
+ address.hash(),
+ to_little_endian(height),
+ block_hash,
+ tx.to_data()
+ });
+
+ send(reply_to, address_update, id, payload);
+}
+
+void notification_worker::send_stealth(const route& reply_to, uint32_t id,
+ uint32_t prefix, uint32_t height, const hash_digest& block_hash,
+ const chain::transaction& tx)
+{
+ // [ prefix:4 ]
+ // [ height:4 ]
+ // [ block_hash:32 ]
+ // [ tx:... ]
+ const auto payload = build_chunk(
+ {
+ to_little_endian(prefix),
+ to_little_endian(height),
+ block_hash,
+ tx.to_data()
+ });
+
+ send(reply_to, address_stealth, id, payload);
+}
+
+void notification_worker::send_address(const route& reply_to, uint32_t id,
+ uint8_t sequence, uint32_t height, const hash_digest& block_hash,
+ const chain::transaction& tx)
+{
+ // [ code:4 ]
+ // [ sequence:1 ]
+ // [ height:4 ]
+ // [ block_hash:32 ]
+ // [ tx:... ]
+ const auto payload = build_chunk(
+ {
+ message::to_bytes(error::success),
+ to_array(sequence),
+ to_little_endian(height),
+ block_hash,
+ tx.to_data()
+ });
+
+ send(reply_to, address_update2, id, payload);
+}
+
+// Handlers.
+// ----------------------------------------------------------------------------
+
+bool notification_worker::handle_payment(const code& ec,
+ const payment_address& address, uint32_t height,
+ const hash_digest& block_hash, const chain::transaction& tx,
+ const route& reply_to, uint32_t id, const binary& prefix_filter)
+{
+ if (ec)
+ {
+ send(reply_to, address_update, id, message::to_bytes(ec));
+ return false;
+ }
+
+ if (prefix_filter.is_prefix_of(address.hash()))
+ send_payment(reply_to, id, address, height, block_hash, tx);
+
+ return true;
+}
+
+bool notification_worker::handle_stealth(const code& ec,
+ uint32_t prefix, uint32_t height, const hash_digest& block_hash,
+ const chain::transaction& tx, const route& reply_to, uint32_t id,
+ const binary& prefix_filter)
+{
+ if (ec)
+ {
+ send(reply_to, address_stealth, id, message::to_bytes(ec));
+ return false;
+ }
+
+ if (prefix_filter.is_prefix_of(prefix))
+ send_stealth(reply_to, id, prefix, height, block_hash, tx);
+
+ return true;
+}
+
+bool notification_worker::handle_address(const code& ec,
+ const binary& field, uint32_t height, const hash_digest& block_hash,
+ const chain::transaction& tx, const route& reply_to, uint32_t id,
+ const binary& prefix_filter, sequence_ptr sequence)
+{
+ if (ec)
+ {
+ send(reply_to, address_update2, id, message::to_bytes(ec));
+ return false;
+ }
+
+ if (prefix_filter.is_prefix_of(field))
+ {
+ send_address(reply_to, id, *sequence, height, block_hash, tx);
+ ++(*sequence);
+ }
+
+ return true;
+}
+
+// Subscribers.
+// ----------------------------------------------------------------------------
+
+// Subscribe to address and stealth prefix notifications.
+// Each delegate must connect to the appropriate query notification endpoint.
+void notification_worker::subscribe_address(const route& reply_to, uint32_t id,
+ const binary& prefix_filter, subscribe_type type)
+{
+ static const auto error_code = error::channel_stopped;
+ const auto& duration = settings_.subscription_expiration();
+ const address_key key(reply_to, prefix_filter);
+
+ switch (type)
+ {
+ // v2/v3 (deprecated)
+ case subscribe_type::payment:
+ {
+ // This class must be kept in scope until work is terminated.
+ const auto handler =
+ std::bind(¬ification_worker::handle_payment,
+ this, _1, _2, _3, _4, _5, reply_to, id, prefix_filter);
+
+ payment_subscriber_->subscribe(handler, key, duration, error_code,
+ {}, 0, {}, {});
+ break;
+ }
+
+ // v2/v3 (deprecated)
+ case subscribe_type::stealth:
+ {
+ // This class must be kept in scope until work is terminated.
+ const auto handler =
+ std::bind(¬ification_worker::handle_stealth,
+ this, _1, _2, _3, _4, _5, reply_to, id, prefix_filter);
+
+ stealth_subscriber_->subscribe(handler, key, duration, error_code,
+ 0, 0, {}, {});
+ break;
+ }
+
+ // v3
+ case subscribe_type::unspecified:
+ {
+ // The sequence enables the client to detect dropped messages.
+ const auto sequence = std::make_shared(0);
+
+ // This class must be kept in scope until work is terminated.
+ const auto handler =
+ std::bind(¬ification_worker::handle_address,
+ this, _1, _2, _3, _4, _5, reply_to, id, prefix_filter,
+ sequence);
+
+ // v3
+ address_subscriber_->subscribe(handler, key, duration, error_code,
+ {}, 0, {}, {});
+ break;
+ }
+
+ // v3
+ default:
+ case subscribe_type::unsubscribe:
+ {
+ // Just as with an expiration (purge) this will cause the stored
+ // handler (notification_worker::handle_address) to be invoked but
+ // with the specified error code (error::channel_stopped) as
+ // opposed to error::channel_timeout.
+
+ // v3
+ address_subscriber_->unsubscribe(key, error_code, {}, 0, {}, {});
+ break;
+ }
+ }
+}
+
+// Subscribe to transaction penetration notifications.
+// Each delegate must connect to the appropriate query notification endpoint.
+void notification_worker::subscribe_penetration(const route& reply_to,
+ uint32_t id, const hash_digest& tx_hash)
+{
+ // TODO:
+ // Height and hash are zeroized if tx is not chained (inv/mempool).
+ // If chained or penetration is 100 (percent) drop subscription.
+ // Only send messages at configured thresholds (e.g. 20/40/60/80/100%).
+ // Thresholding allows the server to mask its peer count.
+ // Penetration is computed by the relay handler.
+ // No sequence is required because gaps are okay.
+ // [ tx_hash:32 ]
+ // [ penetration:1 ]
+ // [ height:4 ]
+ // [ block_hash:32 ]
+ ////penetration_subscriber_->subscribe();
+}
+
// Notification (via blockchain).
// ----------------------------------------------------------------------------
@@ -228,20 +521,21 @@ void notification_worker::notify_block(zmq::socket& publisher, uint32_t height,
const auto block_hash = block->header.hash();
- for (const auto tx: block->transactions)
+ for (const auto& tx: block->transactions)
{
const auto tx_hash = tx.hash();
notify_transaction(height, block_hash, tx);
- notify_inventory(height, block_hash, tx_hash);
+ notify_penetration(height, block_hash, tx_hash);
}
}
// Notification (via transaction inventory).
// ----------------------------------------------------------------------------
+// This relies on peers always notifying us of new txs via inv messages.
bool notification_worker::handle_inventory(const code& ec,
- const message::inventory::ptr packet)
+ const bc::message::inventory::ptr packet)
{
if (stopped() || ec == bc::error::service_stopped)
return false;
@@ -255,10 +549,10 @@ bool notification_worker::handle_inventory(const code& ec,
return true;
}
- //*************************************************************************
- // TODO: loop inventories and extract transaction hashes.
- notify_inventory(0, null_hash, packet->inventories.front().hash);
- //*************************************************************************
+ // Loop inventories and extract transaction hashes.
+ for (const auto& inventory: packet->inventories)
+ if (inventory.type == bc::message::inventory_type_id::transaction)
+ notify_penetration(0, null_hash, inventory.hash);
return true;
}
@@ -277,7 +571,7 @@ bool notification_worker::handle_transaction_pool(const code& ec,
log::warning(LOG_SERVER)
<< "Failure handling new transaction: " << ec.message();
- // Don't let a failure here prevent prevent future notifications.
+ // Don't let a failure here prevent future notifications.
return true;
}
@@ -285,342 +579,97 @@ bool notification_worker::handle_transaction_pool(const code& ec,
return true;
}
+// This parsing is duplicated by bc::database::data_base.
void notification_worker::notify_transaction(uint32_t height,
const hash_digest& block_hash, const transaction& tx)
{
- if (stopped())
+ uint32_t prefix;
+
+ // TODO: move full integer and array constructors into binary.
+ static constexpr size_t prefix_bits = sizeof(prefix) * byte_bits;
+ static constexpr size_t address_bits = short_hash_size * byte_bits;
+
+ if (stopped() || tx.outputs.empty())
return;
- //*************************************************************************
- // TODO: loop outputs and extract payment addresses and stealth prefixes.
- uint32_t prefix = 42;
- payment_address address;
+ // see data_base::push_inputs
+ // Loop inputs and extract payment addresses.
+ for (const auto& input: tx.inputs)
+ {
+ const auto address = payment_address::extract(input.script);
+
+ if (address)
+ {
+ const binary field(address_bits, address.hash());
+ notify_address(field, height, block_hash, tx);
+ notify_payment(address, height, block_hash, tx);
+ }
+ }
+
+ // see data_base::push_outputs
+ // Loop outputs and extract payment addresses.
+ for (const auto& output: tx.outputs)
+ {
+ const auto address = payment_address::extract(output.script);
+
+ if (address)
+ {
+ const binary field(address_bits, address.hash());
+ notify_address(field, height, block_hash, tx);
+ notify_payment(address, height, block_hash, tx);
+ }
+ }
- while (false)
+ // see data_base::push_stealth
+ // Loop output pairs and extract stealth payments.
+ for (size_t index = 0; index < (tx.outputs.size() - 1); ++index)
{
- notify_address(address, height, block_hash, tx);
- notify_stealth(prefix, height, block_hash, tx);
+ const auto& ephemeral_script = tx.outputs[index].script;
+ const auto& payment_script = tx.outputs[index + 1].script;
+
+ // Try to extract a stealth prefix from the first output.
+ // Try to extract the payment address from the second output.
+ if (to_stealth_prefix(prefix, ephemeral_script) &&
+ payment_address::extract(payment_script))
+ {
+ const binary field(prefix_bits, to_little_endian(prefix));
+ notify_address(field, height, block_hash, tx);
+ notify_stealth(prefix, height, block_hash, tx);
+ }
}
- //*************************************************************************
}
-// TODO: add a sequence value to reply so client can detect dropped message.
-void notification_worker::notify_address(const payment_address& address,
+// v2/v3 (deprecated)
+void notification_worker::notify_payment(const payment_address& address,
uint32_t height, const hash_digest& block_hash, const transaction& tx)
{
- // [ address.version:1 ]
- // [ address.hash:20 ]
- // [ height:4 ]
- // [ block_hash:32 ]
- // [ tx:... ]
- address_subscriber_->relay(address, height, block_hash, tx);
+ static const auto code = error::success;
+ payment_subscriber_->relay(code, address, height, block_hash, tx);
}
-// TODO: add a sequence value to reply so client can detect dropped message.
+// v2/v3 (deprecated)
void notification_worker::notify_stealth(uint32_t prefix, uint32_t height,
const hash_digest& block_hash, const transaction& tx)
{
- // [ prefix:4 ]
- // [ height:4 ]
- // [ block_hash:32 ]
- // [ tx:... ]
- stealth_subscriber_->relay(prefix, height, block_hash, tx);
+ static const auto code = error::success;
+ stealth_subscriber_->relay(code, prefix, height, block_hash, tx);
}
-// No sequence is required as penetration is monotonically increasing.
-void notification_worker::notify_inventory(uint32_t height,
- const hash_digest& block_hash, const hash_digest& tx_hash)
+// v3
+void notification_worker::notify_address(const binary& field, uint32_t height,
+ const hash_digest& block_hash, const transaction& tx)
{
- // Only provide height and hash if chained.
- // If chained or penetration is 100 (percent) drop subscription.
- // Only send messages at configured thresholds (e.g. 20/40/60/80/100%).
- // Thresholding allows the server to mask its peer count.
- // [ tx_hash:32 ]
- // [ penetration:1 ]
- // [ height:4 ]
- // [ block_hash:32 ]
- inventory_subscriber_->relay(height, block_hash, tx_hash);
+ static const auto code = error::success;
+ address_subscriber_->relay(code, field, height, block_hash, tx);
}
-// Subscribers.
-// ----------------------------------------------------------------------------
-
-/////// Subscribe to address and stealth prefix notifications.
-////void notification_worker::subscribe_address(route& reply_to,
-//// binary& prefix_filter, subscribe_type& type)
-////{
-//// // Provide delegate that binds the above parameters and parameterizes
-//// // the appropriate subscriber args, sending the notification,
-//// // The delegate must connect back to a query notification endpoint.
-//// address_subscriber_->subscribe();
-//// stealth_subscriber_->subscribe();
-////}
-////
-/////// Subscribe to transaction radar notifications.
-////void notification_worker::subscribe_radar(route& reply_to,
-//// hash_digest& tx_hash)
-////{
-//// // Provide delegate that binds the above parameters and parameterizes
-//// // the subsciber args, sending the notification and updating state.
-//// // The delegate must connect back to a query notification endpoint.
-//// inventory_subscriber_->subscribe();
-////}
-
-
-// reference
-// ----------------------------------------------------------------------------
-
-////void notification_worker::subscribe(const incoming& request, send_handler handler)
-////{
-//// const auto ec = create(request, handler);
-////
-//// // Send response.
-//// handler(outgoing(request, ec));
-////}
-////
-////// Create new subscription entry.
-////code notification_worker::create(const incoming& request, send_handler handler)
-////{
-//// subscription_record record;
-////
-//// if (!deserialize(record.prefix, record.type, request.data))
-//// return error::bad_stream;
-////
-//// record.expiry_time = now() + settings_.subscription_expiration();
-//// record.locator.handler = std::move(handler);
-//// record.locator.address1 = request.address1;
-//// record.locator.address2 = request.address2;
-//// record.locator.delimited = request.delimited;
-////
-//// ///////////////////////////////////////////////////////////////////////////
-//// // Critical Section
-//// mutex_.lock_upgrade();
-////
-//// if (subscriptions_.size() >= settings_.subscription_limit)
-//// {
-//// mutex_.unlock_upgrade();
-//// //---------------------------------------------------------------------
-//// return error::pool_filled;
-//// }
-////
-//// mutex_.unlock_upgrade_and_lock();
-//// //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
-//// subscriptions_.emplace_back(record);
-////
-//// mutex_.unlock();
-//// ///////////////////////////////////////////////////////////////////////////
-////
-//// // This is the end of the subscribe sequence.
-//// return error::success;
-////}
-////
-////// Renew sequence.
-////// ----------------------------------------------------------------------------
-////
-////void notification_worker::renew(const incoming& request, send_handler handler)
-////{
-//// const auto ec = update(request, handler);
-////
-//// // Send response.
-//// handler(outgoing(request, error::success));
-////}
-////
-////// Find subscription record and update expiration.
-////code notification_worker::update(const incoming& request, send_handler handler)
-////{
-//// binary prefix;
-//// subscribe_type type;
-////
-//// if (!deserialize(prefix, type, request.data))
-//// return error::bad_stream;
-////
-//// ///////////////////////////////////////////////////////////////////////////
-//// // Critical Section
-//// mutex_.lock_upgrade();
-////
-//// const auto expiry_time = now() + settings_.subscription_expiration();
-////
-//// for (auto& subscription: subscriptions_)
-//// {
-//// // TODO: is address1 correct and sufficient?
-//// if (subscription.type != type ||
-//// subscription.locator.address1 != request.address1 ||
-//// !subscription.prefix.is_prefix_of(prefix))
-//// {
-//// continue;
-//// }
-////
-//// mutex_.unlock_upgrade_and_lock();
-//// //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
-//// subscription.expiry_time = expiry_time;
-//// //---------------------------------------------------------------------
-//// mutex_.unlock_and_lock_upgrade();
-//// }
-////
-//// mutex_.unlock_upgrade();
-//// ///////////////////////////////////////////////////////////////////////////
-////
-//// // This is the end of the renew sequence.
-//// return error::success;
-////}
-////
-////// Pruning sequence.
-////// ----------------------------------------------------------------------------
-////
-////// Delete entries that have expired.
-////size_t notification_worker::prune()
-////{
-//// ///////////////////////////////////////////////////////////////////////////
-//// // Critical Section
-//// unique_lock(mutex_);
-////
-//// const auto current_time = now();
-//// const auto start_size = subscriptions_.size();
-////
-//// for (auto it = subscriptions_.begin(); it != subscriptions_.end();)
-//// {
-//// if (current_time < it->expiry_time)
-//// ++it;
-//// else
-//// it = subscriptions_.erase(it);
-//// }
-////
-//// // This is the end of the pruning sequence.
-//// return start_size - subscriptions_.size();
-//// ///////////////////////////////////////////////////////////////////////////
-////}
-////
-////// Scan sequence.
-////// ----------------------------------------------------------------------------
-////
-////void notification_worker::receive_block(uint32_t height, const block::ptr block)
-////{
-//// const auto hash = block->header.hash();
-////
-//// for (const auto& transaction: block->transactions)
-//// scan(height, hash, transaction);
-////
-//// prune();
-////}
-////
-////void notification_worker::receive_transaction(const transaction& transaction)
-////{
-//// scan(0, null_hash, transaction);
-////}
-////
-////void notification_worker::scan(uint32_t height, const hash_digest& block_hash,
-//// const transaction& tx)
-////{
-//// for (const auto& input: tx.inputs)
-//// {
-//// const auto address = payment_address::extract(input.script);
-////
-//// if (address)
-//// post_updates(address, height, block_hash, tx);
-//// }
-////
-//// uint32_t prefix;
-//// for (const auto& output: tx.outputs)
-//// {
-//// const auto address = payment_address::extract(output.script);
-////
-//// if (address)
-//// post_updates(address, height, block_hash, tx);
-//// else if (to_stealth_prefix(prefix, output.script))
-//// post_stealth_updates(prefix, height, block_hash, tx);
-//// }
-////}
-////
-////void notification_worker::post_updates(const payment_address& address,
-//// uint32_t height, const hash_digest& block_hash, const transaction& tx)
-////{
-//// subscription_locators locators;
-////
-//// ///////////////////////////////////////////////////////////////////////////
-//// // Critical Section
-//// mutex_.lock_shared();
-////
-//// for (const auto& subscription: subscriptions_)
-//// if (subscription.type == subscribe_type::address &&
-//// subscription.prefix.is_prefix_of(address.hash()))
-//// locators.push_back(subscription.locator);
-////
-//// mutex_.unlock_shared();
-//// ///////////////////////////////////////////////////////////////////////////
-////
-//// if (locators.empty())
-//// return;
-////
-//// // [ address.version:1 ]
-//// // [ address.hash:20 ]
-//// // [ height:4 ]
-//// // [ block_hash:32 ]
-//// // [ tx ]
-//// const auto data = build_chunk(
-//// {
-//// to_array(address.version()),
-//// address.hash(),
-//// to_little_endian(height),
-//// block_hash,
-//// tx.to_data()
-//// });
-////
-//// // Send the result to everyone interested.
-//// for (const auto& locator: locators)
-//// locator.handler(outgoing("address.update", data, locator.address1,
-//// locator.address2, locator.delimited));
-////
-//// // This is the end of the scan address sequence.
-////}
-////
-////void notification_worker::post_stealth_updates(uint32_t prefix, uint32_t height,
-//// const hash_digest& block_hash, const transaction& tx)
-////{
-//// subscription_locators locators;
-////
-//// ///////////////////////////////////////////////////////////////////////////
-//// // Critical Section
-//// mutex_.lock_shared();
-////
-//// for (const auto& subscription: subscriptions_)
-//// if (subscription.type == subscribe_type::stealth &&
-//// subscription.prefix.is_prefix_of(prefix))
-//// locators.push_back(subscription.locator);
-////
-//// mutex_.unlock_shared();
-//// ///////////////////////////////////////////////////////////////////////////
-////
-//// if (locators.empty())
-//// return;
-////
-//// // [ prefix:4 ]
-//// // [ height:4 ]
-//// // [ block_hash:32 ]
-//// // [ tx ]
-//// const auto data = build_chunk(
-//// {
-//// to_little_endian(prefix),
-//// to_little_endian(height),
-//// block_hash,
-//// tx.to_data()
-//// });
-////
-//// // Send the result to everyone interested.
-//// for (const auto& locator: locators)
-//// locator.handler(outgoing("address.stealth_update", data,
-//// locator.address1, locator.address2, locator.delimited));
-////
-//// // This is the end of the scan stealth sequence.
-////}
-////
-////// Utilities
-////// ----------------------------------------------------------------------------
-////
-////ptime notification_worker::now()
-////{
-//// return second_clock::universal_time();
-////};
-
+// v3.x
+void notification_worker::notify_penetration(uint32_t height,
+ const hash_digest& block_hash, const hash_digest& tx_hash)
+{
+ static const auto code = error::success;
+ penetration_subscriber_->relay(code, height, block_hash, tx_hash);
+}
} // namespace server
} // namespace libbitcoin
diff --git a/src/workers/query_worker.cpp b/src/workers/query_worker.cpp
index cfa9b81e..c5a2d2ce 100644
--- a/src/workers/query_worker.cpp
+++ b/src/workers/query_worker.cpp
@@ -27,8 +27,7 @@
#include
#include
#include
-#include
-#include
+#include
#include
namespace libbitcoin {
@@ -123,17 +122,17 @@ void query_worker::query(zmq::socket& router)
// TODO: rewrite the serial blockchain interface to avoid callbacks.
// We are using a closure vs. bind to take advantage of move arg syntax.
- const auto sender = [&router](outgoing&& response)
+ const auto sender = [&router](message&& response)
{
const auto ec = response.send(router);
if (ec && ec != error::service_stopped)
log::warning(LOG_SERVER)
- << "Failed to send query response to " << response.address()
- << ec.message();
+ << "Failed to send query response to "
+ << response.route().display() << " " << ec.message();
};
- incoming request;
+ message request(secure_);
const auto ec = request.receive(router);
if (ec == error::service_stopped)
@@ -142,29 +141,30 @@ void query_worker::query(zmq::socket& router)
if (ec)
{
log::debug(LOG_SERVER)
- << "Failed to receive query from " << request.address()
- << ec.message();
+ << "Failed to receive query from " << request.route().display()
+ << " " << ec.message();
// Because the query did not parse this is likely to be misaddressed.
- sender(outgoing(request, ec));
+ sender(message(request, ec));
return;
}
// Locate the request handler for this command.
- const auto handler = command_handlers_.find(request.command);
+ const auto handler = command_handlers_.find(request.command());
if (handler == command_handlers_.end())
{
log::debug(LOG_SERVER)
- << "Invalid query command from " << request.address();
+ << "Invalid query command from " << request.route().display();
- sender(outgoing(request, error::not_found));
+ sender(message(request, error::not_found));
return;
}
if (settings_.log_requests)
log::info(LOG_SERVER)
- << "Query " << request.command << " from " << request.address();
+ << "Query " << request.command() << " from "
+ << request.route().display();
// The query executor is the delegate bound by the attach method.
const auto& query_execute = handler->second;
@@ -190,30 +190,29 @@ void query_worker::attach(const std::string& command,
command_handlers_[command] = handler;
}
+//=============================================================================
// TODO: add to client:
-// protocol.total_connections
// blockchain.fetch_spend
// blockchain.fetch_block_transaction_hashes
-//------------------------------------------
-// TODO: add to server:
-// transaction_radar.subscribe
-// electrum.subscribe
-// electrum.fetch_history
-//------------------------------------------
-// TODO: remove protocol.total_connections (administrative) and
-// create administrative query channel (secure only).
-// This will require that client public keys be associated to a ZAP domain.
-//------------------------------------------
+//=============================================================================
// address.fetch_history was present in v1 (obelisk) and v2 (server).
// address.fetch_history was called by client v1 (sx) and v2 (bx).
-// address.renew was present in v2 (server) and dropped in v3
-// address.subscribe performs renewal (as necessary) in v3
-//------------------------------------------
+//-----------------------------------------------------------------------------
+// address.renew is deprecated in v3.
+// address.subscribe is deprecated in v3.
+// address.subscribe2 is new in v3, also call for renew.
+// address.unsubscribe2 is new in v3 (there was never an address.unsubscribe).
+//-----------------------------------------------------------------------------
+//// protocol.broadcast_transaction is deprecated in v3 (deferred).
+//// transaction_pool.broadcast (with radar) is new in v3 (deferred).
+//=============================================================================
// Interface class.method names must match protocol (do not change).
void query_worker::attach_interface()
{
- // Queries (request-response).
- ////ATTACH(electrum, fetch_history, node_);
+ ATTACH(address, renew, node_);
+ ATTACH(address, subscribe, node_);
+ ATTACH(address, subscribe2, node_);
+ ATTACH(address, unsubscribe2, node_);
ATTACH(address, fetch_history2, node_);
ATTACH(blockchain, fetch_history, node_);
ATTACH(blockchain, fetch_block_header, node_);
@@ -226,13 +225,9 @@ void query_worker::attach_interface()
ATTACH(blockchain, fetch_stealth, node_);
ATTACH(transaction_pool, fetch_transaction, node_);
ATTACH(transaction_pool, validate, node_);
- ATTACH(protocol, total_connections, node_);
+ ////ATTACH(transaction_pool, broadcast, node_);
ATTACH(protocol, broadcast_transaction, node_);
-
- // Notifications (subscription response with subsequent notifications).
- ATTACH(address, subscribe, node_);
- ////ATTACH(electrum, subscribe, node_);
- ////ATTACH(transaction_radar, subscribe, node_);
+ ATTACH(protocol, total_connections, node_);
}
#undef ATTACH