diff --git a/Makefile.am b/Makefile.am index 22f62353..b3654be5 100644 --- a/Makefile.am +++ b/Makefile.am @@ -57,6 +57,7 @@ src_libbitcoin_server_la_SOURCES = \ src/protocols/electrum/protocol_electrum_mempool.cpp \ src/protocols/electrum/protocol_electrum_outputs.cpp \ src/protocols/electrum/protocol_electrum_scripthash.cpp \ + src/protocols/electrum/protocol_electrum_scripthash_subscribe.cpp \ src/protocols/electrum/protocol_electrum_scriptpubkey.cpp \ src/protocols/electrum/protocol_electrum_server.cpp \ src/protocols/electrum/protocol_electrum_transactions.cpp \ diff --git a/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj b/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj index b928e3a0..623d2c23 100644 --- a/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj +++ b/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj @@ -139,6 +139,7 @@ + diff --git a/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj.filters b/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj.filters index 8f672155..c84516db 100644 --- a/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj.filters +++ b/builds/msvc/vs2022/libbitcoin-server/libbitcoin-server.vcxproj.filters @@ -117,6 +117,9 @@ src\protocols\electrum + + src\protocols\electrum + src\protocols\electrum diff --git a/include/bitcoin/server/error.hpp b/include/bitcoin/server/error.hpp index c64c451f..2ad3586c 100644 --- a/include/bitcoin/server/error.hpp +++ b/include/bitcoin/server/error.hpp @@ -61,6 +61,7 @@ enum error_t : uint8_t not_found, not_implemented, invalid_argument, + subscription_limit, unsupported_argument, unconfirmable_transaction, argument_overflow, diff --git a/include/bitcoin/server/protocols/protocol_electrum.hpp b/include/bitcoin/server/protocols/protocol_electrum.hpp index df5286b0..32605c96 100644 --- a/include/bitcoin/server/protocols/protocol_electrum.hpp +++ b/include/bitcoin/server/protocols/protocol_electrum.hpp @@ -19,8 +19,9 @@ #ifndef LIBBITCOIN_SERVER_PROTOCOLS_PROTOCOL_ELECTRUM_HPP #define LIBBITCOIN_SERVER_PROTOCOLS_PROTOCOL_ELECTRUM_HPP +#include #include -#include +#include #include #include #include @@ -52,6 +53,7 @@ class BCS_API protocol_electrum system::wallet::payment_address::mainnet_p2sh : system::wallet::payment_address::testnet_p2sh), channel_(std::dynamic_pointer_cast(channel)), + notification_strand_(channel_->service().get_executor()), network::tracker(session->log) { } @@ -227,48 +229,77 @@ class BCS_API protocol_electrum void do_get_history(const hash_digest& hash) NOEXCEPT; void do_get_mempool(const hash_digest& hash) NOEXCEPT; void do_list_unspent(const hash_digest& hash) NOEXCEPT; - void do_status(const hash_digest& hash, - const status_handler& sender) NOEXCEPT; - - void complete_get_balance(const code& ec, uint64_t confirmed, - int64_t unconfirmed) NOEXCEPT; - void complete_get_history(const code& ec, - const histories& histories) NOEXCEPT; - void complete_get_mempool(const code& ec, - const histories& histories) NOEXCEPT; - void complete_list_unspent(const code& ec, - const unspents& unspents) NOEXCEPT; - void complete_status(const code& ec, const hash_digest& hash, - const hash_digest& status, const status_handler& sender) NOEXCEPT; - - void send_status(const code& ec, const hash_digest& hash, - const hash_digest& status) NOEXCEPT; - void notify_status(const code& ec, const hash_digest& hash, - const hash_digest& status, notify_t type, - node::header_t link) NOEXCEPT; - - /// Notification senders and send handlers. + + void complete_get_balance(const code& ec, uint64_t confirmed, int64_t unconfirmed) NOEXCEPT; + void complete_get_history(const code& ec, const histories& histories) NOEXCEPT; + void complete_get_mempool(const code& ec, const histories& histories) NOEXCEPT; + void complete_list_unspent(const code& ec, const unspents& unspents) NOEXCEPT; + + /// Notification event handlers. /// ----------------------------------------------------------------------- void do_height(node::header_t link) NOEXCEPT; void do_header(node::header_t link) NOEXCEPT; void do_outpoint(node::header_t link) NOEXCEPT; void do_scripthash(node::header_t link) NOEXCEPT; + void do_regressed(node::header_t link) NOEXCEPT; + + /// Address. + /// ----------------------------------------------------------------------- + + // subscription. + void scripthash_subscribe(const hash_digest& hash, + notify_t type) NOEXCEPT; + void do_scripthash_subscribe(const hash_digest& hash, + notify_t type) NOEXCEPT; + void complete_scripthash_subscribe(const code& ec, + hash_digest& status, const hash_digest& hash) NOEXCEPT; + + // unsubscription. + void scripthash_unsubscribe(const hash_digest& hash) NOEXCEPT; + void do_scripthash_unsubscribe(const hash_digest& hash) NOEXCEPT; + void complete_scripthash_unsubscribe(bool found) NOEXCEPT; + + // notification (do_scripthash()). + void scripthash_notify(const hash_digest& status, const hash_digest& hash, + notify_t type) NOEXCEPT; + + /// Outpoint. + /// ----------------------------------------------------------------------- + + // subscription (do_outpoint()). + bool get_outpoint_status(interface::object_t& status, + const system::chain::point& prevout) const NOEXCEPT; + bool send_outpoint_status(const system::chain::point& prevout, + const std::string& spk_hint) NOEXCEPT; + + // unsubscription. + // notification. /// Utilities. /// ----------------------------------------------------------------------- + /// The negotiated version is at least the specified level. inline bool at_least(server::electrum::version version) const NOEXCEPT { return channel_->version() >= version; } + /// Configuration options. inline const options_t& options() const NOEXCEPT { return options_; } private: + // Post to notification strand. + template + inline auto notify(Method&& method, Args&&... args) NOEXCEPT + { + return boost::asio::post(notification_strand_, + BIND_SAFE(BIND_SHARED(method, args))); + } + // Status hash optimization (~200 bytes). struct midstate { @@ -277,6 +308,14 @@ class BCS_API protocol_electrum system::hash::sha256::fast writer{ stream }; }; + // Subscription to address/scripthash/scruptpubkey. + struct subscription + { + notify_t type{}; + midstate state{}; + database::address_link cursor{}; + }; + // Aliases. using array_t = network::rpc::array_t; using object_t = network::rpc::object_t; @@ -284,6 +323,10 @@ class BCS_API protocol_electrum static constexpr electrum::version minimum = version_t::minimum; static constexpr electrum::version maximum = version_t::maximum; + // Scripthash status. + code get_scripthash_status(hash_digest& out, subscription& sub, + const hash_digest& hash) NOEXCEPT; + // Transformations. static std::string to_method_name(notify_t type) NOEXCEPT; static array_t transform(const unspents& unspents) NOEXCEPT; @@ -304,14 +347,6 @@ class BCS_API protocol_electrum code validate_tx(const system::chain::transaction& tx) const NOEXCEPT; code broadcast_tx(const system::chain::transaction::cptr& tx) NOEXCEPT; - // Shared send/get implementations. - void send_scripthash_unsubscribe(const hash_digest& hash) NOEXCEPT; - void send_scripthash_subscribe(const hash_digest& hash) NOEXCEPT; - bool send_outpoint_status(const system::chain::point& prevout, - const std::string& spk_hint) NOEXCEPT; - bool get_outpoint_status(object_t& status, - const system::chain::point& prevout) const NOEXCEPT; - // These are thread safe. const options_t& options_; const bool turbo_; @@ -320,14 +355,18 @@ class BCS_API protocol_electrum std::atomic_bool stopping_{}; std::atomic_bool subscribed_height_{}; std::atomic_bool subscribed_header_{}; + std::atomic_bool subscribed_address_{}; std::atomic_bool subscribed_outpoint_{}; - std::atomic_bool subscribed_scripthash_{}; // This is mostly thread safe, and used in a thread safe manner. const channel_t::ptr channel_; - // This is protected by strand. - std::unordered_set subscriptions_{}; + // This is thread safe, uses network threadpool. + network::asio::strand notification_strand_; + + // These are protected by notification strand. + std::map address_subscriptions_{}; + std::set outpoint_subscriptions_{}; }; } // namespace server diff --git a/src/error.cpp b/src/error.cpp index e138b927..1eef4f26 100644 --- a/src/error.cpp +++ b/src/error.cpp @@ -51,6 +51,7 @@ DEFINE_ERROR_T_MESSAGE_MAP(error) { not_found, "not_found" }, { not_implemented, "not_implemented" }, { invalid_argument, "invalid_argument" }, + { subscription_limit, "subscription_limit" }, { unsupported_argument, "unsupported_argument" }, { unconfirmable_transaction, "unconfirmable_transaction" }, { argument_overflow, "argument_overflow" }, diff --git a/src/protocols/electrum/protocol_electrum.cpp b/src/protocols/electrum/protocol_electrum.cpp index 76da0496..e10ee9ff 100644 --- a/src/protocols/electrum/protocol_electrum.cpp +++ b/src/protocols/electrum/protocol_electrum.cpp @@ -28,6 +28,7 @@ namespace libbitcoin { namespace server { #define CLASS protocol_electrum +#define NOTIFY(method, ...) notify(&CLASS::method, __VA_ARGS__) using namespace system; using namespace network::rpc; @@ -47,7 +48,6 @@ BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) void protocol_electrum::start() NOEXCEPT { BC_ASSERT(stranded()); - if (started()) return; @@ -158,18 +158,26 @@ bool protocol_electrum::handle_event(const code&, node::chase event_, if (subscribed_outpoint_.load(relaxed)) { BC_ASSERT(std::holds_alternative(value)); - POST(do_outpoint, std::get(value)); + NOTIFY(do_outpoint, std::get(value)); } - if (subscribed_scripthash_.load(relaxed)) + if (subscribed_address_.load(relaxed)) { BC_ASSERT(archive().address_enabled()); BC_ASSERT(std::holds_alternative(value)); - POST(do_scripthash, std::get(value)); + NOTIFY(do_scripthash, std::get(value)); } break; } + case node::chase::regressed: + case node::chase::disorganized: + { + // value is regression branch_point. + BC_ASSERT(std::holds_alternative(value)); + NOTIFY(do_regressed, std::get(value)); + break; + } default: { break; @@ -179,7 +187,7 @@ bool protocol_electrum::handle_event(const code&, node::chase event_, return true; } -// notifications +// height/header notifications. // ---------------------------------------------------------------------------- // Each notification is an independent message. @@ -187,7 +195,6 @@ bool protocol_electrum::handle_event(const code&, node::chase event_, void protocol_electrum::do_height(node::header_t link) NOEXCEPT { BC_ASSERT(stranded()); - const auto& query = archive(); const auto height = query.get_height(link); @@ -207,7 +214,6 @@ void protocol_electrum::do_height(node::header_t link) NOEXCEPT void protocol_electrum::do_header(node::header_t link) NOEXCEPT { BC_ASSERT(stranded()); - const auto& query = archive(); const auto height = query.get_height(link); const auto header = query.get_wire_header(link); @@ -225,82 +231,6 @@ void protocol_electrum::do_header(node::header_t link) NOEXCEPT }, 64, BIND(complete, _1)); } -// Notifier for blockchain_outpoint_subscribe events. -void protocol_electrum::do_outpoint(node::header_t link) NOEXCEPT -{ - BC_ASSERT(stranded()); - - // TODO: get prevout from event. - /////////////////////////////////////////////////////////////////////////// - chain::point prevout{}; - /////////////////////////////////////////////////////////////////////////// - - object_t status{}; - if (!get_outpoint_status(status, prevout)) - { - LOGF("Electrum::do_outpoint, outpoint not found (" << link << ")."); - return; - } - - send_notification("blockchain.outpoint.subscribe", array_t - { - array_t{ encode_hash(prevout.hash()), prevout.index() }, - std::move(status) - }, 128, BIND(handle_send, _1)); -} - -// Notifier for blockchain_scripthash_subscribe events. -void protocol_electrum::do_scripthash(node::header_t link) NOEXCEPT -{ - BC_ASSERT(stranded()); - - // TODO: get hash/type from event. - /////////////////////////////////////////////////////////////////////////// - hash_digest hash{}; - constexpr auto type = notify_t::scripthash; - /////////////////////////////////////////////////////////////////////////// - - // Address status is long-running, so cannot tie up strand. - PARALLEL(do_status, hash, BIND(notify_status, _1, _2, _3, type, link)); -} - -void protocol_electrum::notify_status(const code& ec, const hash_digest& hash, - const hash_digest& status, notify_t type, node::header_t link) NOEXCEPT -{ - BC_ASSERT(stranded()); - - if (ec) - { - LOGF("Electrum::do_scripthash, address not found (" << link << ")."); - return; - } - - send_notification(to_method_name(type), array_t - { - encode_hash(hash), - status == null_hash ? value_t{} : value_t{ encode_hash(status) } - }, 128, BIND(complete, _1)); -} - -// utilities -// ---------------------------------------------------------------------------- -// private/static - -// Convert enumeration to json-rpc notification method name. -std::string protocol_electrum::to_method_name(notify_t type) NOEXCEPT -{ - switch (type) - { - case notify_t::address: - return "blockchain.address.subscribe"; - case notify_t::scripthash: - return "blockchain.scripthash.subscribe"; - default: - case notify_t::scriptpubkey: - return "blockchain.scriptpubkey.subscribe"; - } -} - BC_POP_WARNING() } // namespace server diff --git a/src/protocols/electrum/protocol_electrum_addresses.cpp b/src/protocols/electrum/protocol_electrum_addresses.cpp index 4a1e8d00..7efda6c8 100644 --- a/src/protocols/electrum/protocol_electrum_addresses.cpp +++ b/src/protocols/electrum/protocol_electrum_addresses.cpp @@ -118,7 +118,7 @@ void protocol_electrum::handle_blockchain_address_subscribe(const code& ec, return; } - send_scripthash_subscribe(hash); + scripthash_subscribe(hash, notify_t::address); } // utilities diff --git a/src/protocols/electrum/protocol_electrum_headers.cpp b/src/protocols/electrum/protocol_electrum_headers.cpp index 85f741c9..c6960c11 100644 --- a/src/protocols/electrum/protocol_electrum_headers.cpp +++ b/src/protocols/electrum/protocol_electrum_headers.cpp @@ -31,6 +31,7 @@ namespace server { using namespace system; using namespace network::rpc; using namespace std::placeholders; +constexpr auto relaxed = std::memory_order_relaxed; BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) @@ -46,7 +47,7 @@ void protocol_electrum::handle_blockchain_number_of_blocks_subscribe( return; } - subscribed_height_.store(true, std::memory_order_relaxed); + subscribed_height_.store(true, relaxed); const auto top_height = archive().get_top_confirmed(); send_result(top_height, 42, BIND(complete, _1)); } @@ -362,7 +363,7 @@ void protocol_electrum::handle_blockchain_headers_subscribe(const code& ec, return; } - subscribed_header_.store(true, std::memory_order_relaxed); + subscribed_header_.store(true, relaxed); send_result(object_t { { "height", top }, diff --git a/src/protocols/electrum/protocol_electrum_outputs.cpp b/src/protocols/electrum/protocol_electrum_outputs.cpp index b57fbc5d..2cb12242 100644 --- a/src/protocols/electrum/protocol_electrum_outputs.cpp +++ b/src/protocols/electrum/protocol_electrum_outputs.cpp @@ -29,6 +29,7 @@ namespace server { using namespace system; using namespace network::rpc; using namespace std::placeholders; +constexpr auto relaxed = std::memory_order_relaxed; BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) @@ -36,6 +37,7 @@ void protocol_electrum::handle_blockchain_utxo_get_address(const code& ec, rpc_interface::blockchain_utxo_get_address, const std::string& tx_hash, double index) NOEXCEPT { + BC_ASSERT(stranded()); if (stopped(ec)) return; @@ -84,6 +86,7 @@ void protocol_electrum::handle_blockchain_outpoint_get_status(const code& ec, rpc_interface::blockchain_outpoint_get_status, const std::string& tx_hash, double txout_idx, const std::string& spk_hint) NOEXCEPT { + BC_ASSERT(stranded()); if (stopped(ec)) return; @@ -109,6 +112,7 @@ void protocol_electrum::handle_blockchain_outpoint_subscribe(const code& ec, rpc_interface::blockchain_outpoint_subscribe, const std::string& tx_hash, double txout_idx, const std::string& spk_hint) NOEXCEPT { + BC_ASSERT(stranded()); if (stopped(ec)) return; @@ -126,20 +130,21 @@ void protocol_electrum::handle_blockchain_outpoint_subscribe(const code& ec, return; } - chain::point prevout{ hash, index }; - if (!send_outpoint_status(prevout, spk_hint)) - return; - // TODO: subscribe. /////////////////////////////////////////////////////////////////////////// - subscribed_outpoint_.store(true, std::memory_order_relaxed); + subscribed_outpoint_.store(true, relaxed); /////////////////////////////////////////////////////////////////////////// + + chain::point prevout{ hash, index }; + if (!send_outpoint_status(prevout, spk_hint)) + return; } void protocol_electrum::handle_blockchain_outpoint_unsubscribe(const code& ec, rpc_interface::blockchain_outpoint_unsubscribe, const std::string& tx_hash, double txout_idx) NOEXCEPT { + BC_ASSERT(stranded()); if (stopped(ec)) return; @@ -161,18 +166,70 @@ void protocol_electrum::handle_blockchain_outpoint_unsubscribe(const code& ec, // TODO: unsubscribe. /////////////////////////////////////////////////////////////////////////// - const auto prior = subscribed_outpoint_.load(std::memory_order_relaxed); + const auto prior = subscribed_outpoint_.load(relaxed); /////////////////////////////////////////////////////////////////////////// send_result(prior, 16, BIND(complete, _1)); } +// notification. +// ============================================================================ + +// Notifier for blockchain_outpoint_subscribe events. +void protocol_electrum::do_outpoint(node::header_t link) NOEXCEPT +{ + BC_ASSERT(notification_strand_.running_in_this_thread()); + + // TODO: get prevout from event. + /////////////////////////////////////////////////////////////////////////// + chain::point prevout{}; + /////////////////////////////////////////////////////////////////////////// + + object_t status{}; + if (!get_outpoint_status(status, prevout)) + { + LOGF("Electrum::do_outpoint, outpoint not found (" << link << ")."); + return; + } + + // TODO: post_notification(bounce to network strand). + send_notification("blockchain.outpoint.subscribe", array_t + { + array_t{ encode_hash(prevout.hash()), prevout.index() }, + std::move(status) + }, 128, BIND(handle_send, _1)); +} + +// ============================================================================ + // utility. // ---------------------------------------------------------------------------- +bool protocol_electrum::get_outpoint_status(object_t& status, + const chain::point& prevout) const NOEXCEPT +{ + // May be on either network or notification strand (thread safe). + + const auto& query = archive(); + const auto out = query.get_tx_history(prevout.hash()); + if (!out.tx.is_valid()) + return false; + + status = { { "height", to_unsigned(out.tx.height()) } }; + if (const auto ins = query.get_spenders_history(prevout); !ins.empty()) + { + status["spender_txhash"] = encode_hash(ins.front().tx.hash()); + status["spender_height"] = to_unsigned(ins.front().tx.height()); + } + + return true; +} + bool protocol_electrum::send_outpoint_status(const chain::point& prevout, const std::string& spk_hint) NOEXCEPT { + BC_ASSERT(stranded()); + // This is parsed for correctness but is not used. // Script is advisory, and should match output script. if (!spk_hint.empty()) @@ -203,24 +260,6 @@ bool protocol_electrum::send_outpoint_status(const chain::point& prevout, return true; } -bool protocol_electrum::get_outpoint_status(object_t& status, - const chain::point& prevout) const NOEXCEPT -{ - const auto& query = archive(); - const auto out = query.get_tx_history(prevout.hash()); - if (!out.tx.is_valid()) - return false; - - status = { { "height", to_unsigned(out.tx.height()) } }; - if (const auto ins = query.get_spenders_history(prevout); !ins.empty()) - { - status["spender_txhash"] = encode_hash(ins.front().tx.hash()); - status["spender_height"] = to_unsigned(ins.front().tx.height()); - } - - return true; -} - BC_POP_WARNING() } // namespace server diff --git a/src/protocols/electrum/protocol_electrum_scripthash.cpp b/src/protocols/electrum/protocol_electrum_scripthash.cpp index 781f44b6..6f78cc2e 100644 --- a/src/protocols/electrum/protocol_electrum_scripthash.cpp +++ b/src/protocols/electrum/protocol_electrum_scripthash.cpp @@ -25,14 +25,13 @@ namespace libbitcoin { namespace server { #define CLASS protocol_electrum +#define NOTIFY(method, ...) notify(&CLASS::method, __VA_ARGS__) using namespace system; using namespace network::rpc; using namespace std::placeholders; BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) -BC_PUSH_WARNING(SMART_PTR_NOT_NEEDED) -BC_PUSH_WARNING(NO_VALUE_OR_CONST_REF_SHARED_PTR) // get_balance // ---------------------------------------------------------------------------- @@ -42,7 +41,6 @@ void protocol_electrum::handle_blockchain_scripthash_get_balance(const code& ec, const std::string& scripthash) NOEXCEPT { BC_ASSERT(stranded()); - if (stopped(ec)) return; @@ -57,8 +55,10 @@ void protocol_electrum::handle_blockchain_scripthash_get_balance(const code& ec, get_balance(hash); } +// common void protocol_electrum::get_balance(const hash_digest& hash) NOEXCEPT { + BC_ASSERT(stranded()); if (hash == null_hash) { send_code(error::invalid_argument); @@ -78,7 +78,6 @@ void protocol_electrum::get_balance(const hash_digest& hash) NOEXCEPT void protocol_electrum::do_get_balance(const hash_digest& hash) NOEXCEPT { BC_ASSERT(!stranded()); - const auto& query = archive(); uint64_t confirmed{}, unconfirmed{}; auto ec = query.get_balance(stopping_, confirmed, unconfirmed, hash); @@ -89,7 +88,6 @@ void protocol_electrum::complete_get_balance(const code& ec, uint64_t confirmed, int64_t unconfirmed) NOEXCEPT { BC_ASSERT(stranded()); - monitor(false); if (stopped()) return; @@ -115,6 +113,7 @@ void protocol_electrum::handle_blockchain_scripthash_get_history(const code& ec, rpc_interface::blockchain_scripthash_get_history, const std::string& scripthash) NOEXCEPT { + BC_ASSERT(stranded()); if (stopped(ec)) return; @@ -129,8 +128,10 @@ void protocol_electrum::handle_blockchain_scripthash_get_history(const code& ec, get_history(hash); } +// common void protocol_electrum::get_history(const system::hash_digest& hash) NOEXCEPT { + BC_ASSERT(stranded()); if (hash == null_hash) { send_code(error::invalid_argument); @@ -150,7 +151,6 @@ void protocol_electrum::get_history(const system::hash_digest& hash) NOEXCEPT void protocol_electrum::do_get_history(const hash_digest& hash) NOEXCEPT { BC_ASSERT(!stranded()); - histories histories{}; const auto& query = archive(); const auto ec = query.get_history(stopping_, histories, hash, turbo_); @@ -161,7 +161,6 @@ void protocol_electrum::complete_get_history(const code& ec, const histories& histories) NOEXCEPT { BC_ASSERT(stranded()); - monitor(false); if (stopped()) return; @@ -184,6 +183,7 @@ void protocol_electrum::handle_blockchain_scripthash_get_mempool(const code& ec, rpc_interface::blockchain_scripthash_get_mempool, const std::string& scripthash) NOEXCEPT { + BC_ASSERT(stranded()); if (stopped(ec)) return; @@ -198,8 +198,10 @@ void protocol_electrum::handle_blockchain_scripthash_get_mempool(const code& ec, get_mempool(hash); } +// common void protocol_electrum::get_mempool(const system::hash_digest& hash) NOEXCEPT { + BC_ASSERT(stranded()); if (hash == null_hash) { send_code(error::invalid_argument); @@ -219,7 +221,6 @@ void protocol_electrum::get_mempool(const system::hash_digest& hash) NOEXCEPT void protocol_electrum::do_get_mempool(const hash_digest& hash) NOEXCEPT { BC_ASSERT(!stranded()); - histories histories{}; const auto& query = archive(); auto ec = query.get_unconfirmed_history(stopping_, histories, hash, turbo_); @@ -230,7 +231,6 @@ void protocol_electrum::complete_get_mempool(const code& ec, const histories& histories) NOEXCEPT { BC_ASSERT(stranded()); - monitor(false); if (stopped()) return; @@ -252,6 +252,7 @@ void protocol_electrum::handle_blockchain_scripthash_list_unspent(const code& ec rpc_interface::blockchain_scripthash_list_unspent, const std::string& scripthash) NOEXCEPT { + BC_ASSERT(stranded()); if (stopped(ec)) return; @@ -266,8 +267,10 @@ void protocol_electrum::handle_blockchain_scripthash_list_unspent(const code& ec list_unspent(hash); } +// common void protocol_electrum::list_unspent(const system::hash_digest& hash) NOEXCEPT { + BC_ASSERT(stranded()); if (hash == null_hash) { send_code(error::invalid_argument); @@ -287,7 +290,6 @@ void protocol_electrum::list_unspent(const system::hash_digest& hash) NOEXCEPT void protocol_electrum::do_list_unspent(const hash_digest& hash) NOEXCEPT { BC_ASSERT(!stranded()); - unspents unspents{}; const auto& query = archive(); const auto ec = query.get_unspent(stopping_, unspents, hash, turbo_); @@ -298,7 +300,6 @@ void protocol_electrum::complete_list_unspent(const code& ec, const unspents& unspents) NOEXCEPT { BC_ASSERT(stranded()); - monitor(false); if (stopped()) return; @@ -313,153 +314,12 @@ void protocol_electrum::complete_list_unspent(const code& ec, send_result(transform(unspents), size, BIND(complete, _1)); } -// subscribe -// ---------------------------------------------------------------------------- - -void protocol_electrum::handle_blockchain_scripthash_subscribe(const code& ec, - rpc_interface::blockchain_scripthash_subscribe, - const std::string& scripthash) NOEXCEPT -{ - if (stopped(ec)) - return; - - if (!at_least(electrum::version::v1_1)) - { - send_code(error::wrong_version); - return; - } - - hash_digest hash{}; - if (!decode_hash(hash, scripthash)) - { - send_code(error::invalid_argument); - return; - } - - send_scripthash_subscribe(hash); -} - -void protocol_electrum::send_scripthash_subscribe( - const hash_digest& hash) NOEXCEPT -{ - if (!archive().address_enabled()) - { - send_code(error::not_implemented); - return; - } - - monitor(true); - PARALLEL(do_status, hash, BIND(send_status, _1, _2, _3)); -} - -// Invoked by send_scripthash_subscribe() or do_scripthash(). -void protocol_electrum::do_status(const hash_digest& hash, - const status_handler& sender) NOEXCEPT -{ - // TODO: subscribe. - /////////////////////////////////////////////////////////////////////////// - subscribed_scripthash_.store(true, std::memory_order_relaxed); - /////////////////////////////////////////////////////////////////////////// - - histories histories{}; - const auto& query = archive(); - const auto ec = query.get_history(stopping_, histories, hash, turbo_); - POST(complete_status, ec, hash, to_status(histories), sender); -} - -void protocol_electrum::complete_status(const code& ec, - const hash_digest& hash, const hash_digest& status, - const status_handler& sender) NOEXCEPT -{ - BC_ASSERT(stranded()); - - monitor(false); - if (stopped()) - return; - - sender(ec, hash, status); -} - -void protocol_electrum::send_status(const code& ec, const hash_digest& hash, - const hash_digest& status) NOEXCEPT -{ - BC_ASSERT(stranded()); - - if (ec) - { - send_code(ec); - return; - } - - send_result(array_t - { - encode_hash(hash), - status == null_hash ? value_t{} : value_t{ encode_hash(status) } - }, 128, BIND(complete, _1)); -} - -// unsubscribe -// ---------------------------------------------------------------------------- - -void protocol_electrum::handle_blockchain_scripthash_unsubscribe(const code& ec, - rpc_interface::blockchain_scripthash_unsubscribe, - const std::string& scripthash) NOEXCEPT -{ - if (stopped(ec)) - return; - - if (!at_least(electrum::version::v1_4_2)) - { - send_code(error::wrong_version); - return; - } - - hash_digest hash{}; - if (!decode_hash(hash, scripthash)) - { - send_code(error::invalid_argument); - return; - } - - send_scripthash_unsubscribe(hash); -} - -void protocol_electrum::send_scripthash_unsubscribe( - const hash_digest& ) NOEXCEPT -{ - // TODO: unsubscribe. - /////////////////////////////////////////////////////////////////////////// - const auto prior = subscribed_scripthash_.load(std::memory_order_relaxed); - /////////////////////////////////////////////////////////////////////////// - - send_result(prior, 16, BIND(complete, _1)); -} - // utilities // ---------------------------------------------------------------------------- -// TODO: these can be implemented as electrum json serializers (see bitcoind). // private/static // Height is zero (rooted) or max_size_t for unconfirmed history txs. -hash_digest protocol_electrum::to_status(const histories& histories) NOEXCEPT -{ - if (histories.empty()) - return {}; - - midstate out{}; - for (const auto& record: histories) - { - out.writer.write_string(encode_hash(record.tx.hash())); - out.writer.write_string(":"); - out.writer.write_string(std::to_string(to_signed(record.tx.height()))); - out.writer.write_string(":"); - } - - out.writer.flush(); - return out.status; -} - -// Height is zero (rooted) or max_size_t for unconfirmed history txs. +// TODO: this can be implemented as electrum json serializers (see bitcoind). array_t protocol_electrum::transform(const histories& ins) NOEXCEPT { // to_signed() conversion is simple but sacrifices top height bit (ok). @@ -491,6 +351,7 @@ array_t protocol_electrum::transform(const histories& ins) NOEXCEPT } // Height is zero for unconfirmed unspent output txs. +// TODO: this can be implemented as electrum json serializers (see bitcoind). array_t protocol_electrum::transform(const unspents& ins) NOEXCEPT { array_t out(ins.size()); @@ -509,8 +370,6 @@ array_t protocol_electrum::transform(const unspents& ins) NOEXCEPT return out; } -BC_POP_WARNING() -BC_POP_WARNING() BC_POP_WARNING() } // namespace server diff --git a/src/protocols/electrum/protocol_electrum_scripthash_subscribe.cpp b/src/protocols/electrum/protocol_electrum_scripthash_subscribe.cpp new file mode 100644 index 00000000..46c3ce4c --- /dev/null +++ b/src/protocols/electrum/protocol_electrum_scripthash_subscribe.cpp @@ -0,0 +1,296 @@ +/** + * Copyright (c) 2011-2026 libbitcoin developers (see AUTHORS) + * + * This file is part of libbitcoin. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include + +#include +#include + +namespace libbitcoin { +namespace server { + +#define CLASS protocol_electrum +#define NOTIFY(method, ...) notify(&CLASS::method, __VA_ARGS__) + +using namespace system; +using namespace network::rpc; +using namespace std::placeholders; +constexpr auto relaxed = std::memory_order_relaxed; + +BC_PUSH_WARNING(NO_THROW_IN_NOEXCEPT) + +// subscribe +// ---------------------------------------------------------------------------- + +void protocol_electrum::handle_blockchain_scripthash_subscribe(const code& ec, + rpc_interface::blockchain_scripthash_subscribe, + const std::string& scripthash) NOEXCEPT +{ + BC_ASSERT(stranded()); + if (stopped(ec)) + return; + + if (!at_least(electrum::version::v1_1)) + { + send_code(error::wrong_version); + return; + } + + hash_digest hash{}; + if (!decode_hash(hash, scripthash)) + { + send_code(error::invalid_argument); + return; + } + + scripthash_subscribe(hash, notify_t::scripthash); +} + +// common +void protocol_electrum::scripthash_subscribe(const hash_digest& hash, + notify_t type) NOEXCEPT +{ + BC_ASSERT(stranded()); + if (!archive().address_enabled()) + { + send_code(error::not_implemented); + return; + } + + if (address_subscriptions_.size() >= options_.maximum_subscriptions) + { + send_code(error::subscription_limit); + return; + } + + // Post to an independent strand on the network threadpool. This protects + // the subscriptions and ensures that the channel remains both cancellable + // and responsive. The channel listener remains paused during this call, + // which guards against call backlogging (DoS) and requires the monitor to + // allow socket cancellation and server stop to interrupt expensive query. + monitor(true); + NOTIFY(do_scripthash_subscribe, hash, type); +} + +void protocol_electrum::do_scripthash_subscribe(const hash_digest& hash, + notify_t type) NOEXCEPT +{ + // Cancellability is preserved because not on channel strand. + BC_ASSERT(notification_strand_.running_in_this_thread()); + + // Subscription response is idempotent. + subscribed_address_.store(true, relaxed); + auto [it, in] = address_subscriptions_.try_emplace(hash, type, midstate{}); + + hash_digest status{}; + const auto ec = get_scripthash_status(status, it->second, it->first); + POST(complete_scripthash_subscribe, ec, hash, status); +} + +void protocol_electrum::complete_scripthash_subscribe(const code& ec, + hash_digest& status, const hash_digest& hash) NOEXCEPT +{ + BC_ASSERT(stranded()); + monitor(false); + if (stopped()) + return; + + if (ec) + { + send_code(ec); + return; + } + + send_result(array_t + { + encode_hash(hash), + status == null_hash ? value_t{} : value_t{ encode_hash(status) } + }, 128, BIND(complete, _1)); +} + +// unsubscribe +// ---------------------------------------------------------------------------- + +void protocol_electrum::handle_blockchain_scripthash_unsubscribe(const code& ec, + rpc_interface::blockchain_scripthash_unsubscribe, + const std::string& scripthash) NOEXCEPT +{ + BC_ASSERT(stranded()); + if (stopped(ec)) + return; + + if (!at_least(electrum::version::v1_4_2)) + { + send_code(error::wrong_version); + return; + } + + hash_digest hash{}; + if (!decode_hash(hash, scripthash)) + { + send_code(error::invalid_argument); + return; + } + + scripthash_unsubscribe(hash); +} + +// common +void protocol_electrum::scripthash_unsubscribe( + const hash_digest& hash) NOEXCEPT +{ + BC_ASSERT(stranded()); + if (!archive().address_enabled()) + { + send_code(error::not_implemented); + return; + } + + NOTIFY(do_scripthash_unsubscribe, hash); +} + +void protocol_electrum::do_scripthash_unsubscribe( + const hash_digest& hash) NOEXCEPT +{ + BC_ASSERT(notification_strand_.running_in_this_thread()); + + const auto found = to_bool(address_subscriptions_.erase(hash)); + if (is_zero(address_subscriptions_.size())) + subscribed_address_.store(false, relaxed); + + POST(complete_scripthash_unsubscribe, found); +} + +void protocol_electrum::complete_scripthash_unsubscribe(bool found) NOEXCEPT +{ + send_result(found, 16, BIND(complete, _1)); +} + +// notification +// ---------------------------------------------------------------------------- + +// Notifier for blockchain_scripthash_subscribe events. +void protocol_electrum::do_scripthash(node::header_t) NOEXCEPT +{ + // Cancellability is preserved because not on channel strand. + BC_ASSERT(notification_strand_.running_in_this_thread()); + + code ec{}; + hash_digest status{}; + for (auto& [key, sub]: address_subscriptions_) + { + if ((ec = get_scripthash_status(status, sub, key))) + { + if (ec == database::error::canceled) return; + LOGF("Electrum::do_scripthash, " << ec.message()); + } + else + { + // Asio-buffered output (small, not under caller control). + POST(scripthash_notify, status, key, sub.type); + } + } +} + +void protocol_electrum::scripthash_notify(const hash_digest& status, + const hash_digest& hash, notify_t type) NOEXCEPT +{ + BC_ASSERT(stranded()); + + send_notification(to_method_name(type), array_t + { + encode_hash(hash), + status == null_hash ? value_t{} : value_t{ encode_hash(status) } + }, 128, BIND(handle_send, _1)); +} + +// regression +// ---------------------------------------------------------------------------- + +// The chain has regressed, clear all midstate cache and cursors. +void protocol_electrum::do_regressed(node::header_t) NOEXCEPT +{ + BC_ASSERT(notification_strand_.running_in_this_thread()); + + for (auto& [key, sub]: address_subscriptions_) + { + // writer.flush resets hash accumulator, sub.type remains unchanged. + sub.state.writer.flush(); + sub.cursor = {}; + } +} + +// utilities +// ---------------------------------------------------------------------------- +// private/static + +// Convert enumeration to json-rpc notification method name. +std::string protocol_electrum::to_method_name(notify_t type) NOEXCEPT +{ + switch (type) + { + case notify_t::address: + return "blockchain.address.subscribe"; + case notify_t::scripthash: + return "blockchain.scripthash.subscribe"; + default: + case notify_t::scriptpubkey: + return "blockchain.scriptpubkey.subscribe"; + } +} + +// Height is zero (rooted) or max_size_t for unconfirmed history txs. +// TODO: this can be implemented as electrum json serializer (see bitcoind). +hash_digest protocol_electrum::to_status(const histories& histories) NOEXCEPT +{ + if (histories.empty()) + return {}; + + midstate out{}; + for (const auto& record: histories) + { + out.writer.write_string(encode_hash(record.tx.hash())); + out.writer.write_string(":"); + out.writer.write_string(std::to_string(to_signed(record.tx.height()))); + out.writer.write_string(":"); + } + + out.writer.flush(); + return out.status; +} + +// non-static +code protocol_electrum::get_scripthash_status(hash_digest& out, + subscription& /* sub */, const hash_digest& hash) NOEXCEPT +{ + // TODO: use cursors and midstate to optimize succesive queries. + ////auto& state = sub.state; + ////const auto& point = sub.point_cursor; + ////const auto& address = sub.address_cursor; + + histories histories{}; + const auto& query = archive(); + const auto ec = query.get_history(stopping_, histories, hash, turbo_); + if (!ec) out = to_status(histories); + return ec; +} + +BC_POP_WARNING() + +} // namespace server +} // namespace libbitcoin diff --git a/src/protocols/electrum/protocol_electrum_scriptpubkey.cpp b/src/protocols/electrum/protocol_electrum_scriptpubkey.cpp index 13cc511f..ebcfac4e 100644 --- a/src/protocols/electrum/protocol_electrum_scriptpubkey.cpp +++ b/src/protocols/electrum/protocol_electrum_scriptpubkey.cpp @@ -181,7 +181,7 @@ void protocol_electrum::handle_blockchain_scriptpubkey_subscribe( return; } - send_scripthash_subscribe(script.hash()); + scripthash_subscribe(script.hash(), notify_t::scriptpubkey); } void protocol_electrum::handle_blockchain_scriptpubkey_unsubscribe( @@ -211,7 +211,7 @@ void protocol_electrum::handle_blockchain_scriptpubkey_unsubscribe( return; } - send_scripthash_unsubscribe(script.hash()); + scripthash_unsubscribe(script.hash()); } BC_POP_WARNING() diff --git a/test/error.cpp b/test/error.cpp index cf801603..2c76bfbf 100644 --- a/test/error.cpp +++ b/test/error.cpp @@ -209,6 +209,15 @@ BOOST_AUTO_TEST_CASE(error_t__code__invalid_argument__true_expected_message) BOOST_REQUIRE_EQUAL(ec.message(), "invalid_argument"); } +BOOST_AUTO_TEST_CASE(error_t__code__subscription_limit__true_expected_message) +{ + constexpr auto value = error::subscription_limit; + const auto ec = code(value); + BOOST_REQUIRE(ec); + BOOST_REQUIRE(ec == value); + BOOST_REQUIRE_EQUAL(ec.message(), "subscription_limit"); +} + BOOST_AUTO_TEST_CASE(error_t__code__unsupported_argument__true_expected_message) { constexpr auto value = error::unsupported_argument;