diff --git a/include/bitcoin/network/p2p.hpp b/include/bitcoin/network/p2p.hpp index 5b694aa7a..305712b91 100644 --- a/include/bitcoin/network/p2p.hpp +++ b/include/bitcoin/network/p2p.hpp @@ -46,12 +46,14 @@ class BCT_API p2p public: typedef std::shared_ptr ptr; typedef message::network_address address; + typedef std::function stop_handler; typedef std::function truth_handler; typedef std::function count_handler; typedef std::function result_handler; typedef std::function address_handler; typedef std::function channel_handler; typedef std::function connect_handler; + typedef subscriber stop_subscriber; typedef resubscriber channel_subscriber; // ------------------------------------------------------------------------ @@ -100,12 +102,15 @@ class BCT_API p2p /// Invoke startup and seeding sequence, call from constructing thread. virtual void start(result_handler handler); + /// Subscribe to connection creation events. + virtual void subscribe_connection(connect_handler handler); + + /// Subscribe to service stop event. + virtual void subscribe_stop(result_handler handler); + /// Begin long running sessions, call from start handler. virtual void run(result_handler handler); - /// Subscribe to connection creation and service stop events. - virtual void subscribe_connections(connect_handler handler); - /// Maintain a connection to hostname:port. virtual void connect(const std::string& hostname, uint16_t port); @@ -194,7 +199,8 @@ class BCT_API p2p dispatcher dispatch_; hosts hosts_; std::shared_ptr connections_; - channel_subscriber::ptr subscriber_; + stop_subscriber::ptr stop_subscriber_; + channel_subscriber::ptr channel_subscriber_; }; } // namespace network diff --git a/include/bitcoin/network/proxy.hpp b/include/bitcoin/network/proxy.hpp index 5145e8b38..f71e00397 100644 --- a/include/bitcoin/network/proxy.hpp +++ b/include/bitcoin/network/proxy.hpp @@ -44,10 +44,10 @@ class BCT_API proxy using message_handler = std::function)>; - typedef std::function result_handler; + typedef std::shared_ptr ptr; typedef std::function completion_handler; + typedef std::function result_handler; typedef subscriber stop_subscriber; - typedef std::shared_ptr ptr; static void close(asio::socket_ptr socket); diff --git a/include/bitcoin/network/sessions/session.hpp b/include/bitcoin/network/sessions/session.hpp index 4fc3a083b..e481b930d 100644 --- a/include/bitcoin/network/sessions/session.hpp +++ b/include/bitcoin/network/sessions/session.hpp @@ -78,7 +78,6 @@ class BCT_API session public: typedef std::shared_ptr ptr; typedef config::authority authority; - typedef std::function stop_handler; typedef std::function truth_handler; typedef std::function count_handler; typedef std::function result_handler; @@ -90,7 +89,7 @@ class BCT_API session virtual void start(result_handler handler); /// Subscribe to receive session stop notification. - virtual void subscribe_stop(stop_handler handler); + virtual void subscribe_stop(result_handler handler); protected: @@ -163,15 +162,11 @@ class BCT_API session } // Socket creators. - void do_stop_acceptor(acceptor::ptr connect); - void do_stop_connector(connector::ptr connect); + void do_stop_acceptor(const code& ec, acceptor::ptr connect); + void do_stop_connector(const code& ec, connector::ptr connect); // Start sequence. - void do_stop_session(); - - // Stop sequence - bool handle_connect_event(const code& ec, channel::ptr channel, - stop_handler handler); + void do_stop_session(const code&); // Connect sequence void new_connect(connector::ptr connect, channel_handler handler); diff --git a/src/p2p.cpp b/src/p2p.cpp index 059a4fd7e..78ab596ef 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -55,7 +55,8 @@ p2p::p2p(const settings& settings) dispatch_(threadpool_, NAME), hosts_(threadpool_, settings_), connections_(std::make_shared(threadpool_)), - subscriber_(std::make_shared(threadpool_, NAME "_sub")) + stop_subscriber_(std::make_shared(threadpool_, NAME "_stop_sub")), + channel_subscriber_(std::make_shared(threadpool_, NAME "_sub")) { } @@ -100,22 +101,20 @@ void p2p::start(result_handler handler) return; } - stopped_ = false; - subscriber_->start(); - threadpool_.join(); threadpool_.spawn(settings_.threads, thread_priority::low); - const auto manual_started_handler = - std::bind(&p2p::handle_manual_started, - this, _1, handler); + stopped_ = false; + stop_subscriber_->start(); + channel_subscriber_->start(); // This instance is retained by stop handler and member references. - auto manual = attach(); - manual->start(manual_started_handler); + const auto manual = attach(); manual_.store(manual); - ////handle_manual_started(error::success, handler); + manual->start( + std::bind(&p2p::handle_manual_started, + this, _1, handler)); } void p2p::handle_manual_started(const code& ec, result_handler handler) @@ -193,8 +192,6 @@ void p2p::run(result_handler handler) attach()->start( std::bind(&p2p::handle_inbound_started, this, _1, handler)); - - ////handle_inbound_started(error::success, handler); } void p2p::handle_inbound_started(const code& ec, result_handler handler) @@ -227,12 +224,17 @@ void p2p::handle_outbound_started(const code& ec, result_handler handler) handler(error::success); } -// Channel subscription. +// Subscriptions. // ---------------------------------------------------------------------------- -void p2p::subscribe_connections(connect_handler handler) +void p2p::subscribe_connection(connect_handler handler) { - subscriber_->subscribe(handler, error::service_stopped, nullptr); + channel_subscriber_->subscribe(handler, error::service_stopped, nullptr); +} + +void p2p::subscribe_stop(result_handler handler) +{ + stop_subscriber_->subscribe(handler, error::service_stopped); } // Manual connections. @@ -269,24 +271,27 @@ void p2p::connect(const std::string& hostname, uint16_t port, void p2p::stop(result_handler handler) { - // Stop is thread safe and idempotent, allows subscription to be unguarded. - - // Prevent subscription after stop. - subscriber_->stop(); - subscriber_->relay(error::service_stopped, nullptr); - - // Must be after subscriber stop (why?). - connections_->stop(error::service_stopped); - manual_.store(nullptr); - // Host save is expensive, so minimize repeats. const auto ec = stopped_ ? error::success : hosts_.save(); - stopped_ = true; if (ec) log::error(LOG_NETWORK) - << "Error saving hosts file: " << ec.message(); + << "Error saving hosts file: " << ec.message(); + + // Stop is thread safe and idempotent, allows subscription to be unguarded. + stopped_ = true; + // Prevent subscription after stop. + stop_subscriber_->stop(); + stop_subscriber_->relay(error::service_stopped); + + // Prevent subscription after stop. + channel_subscriber_->stop(); + channel_subscriber_->relay(error::service_stopped, nullptr); + + // Must be after subscriber stop. + connections_->stop(error::service_stopped); + manual_.store(nullptr); threadpool_.shutdown(); // This is the end of the stop sequence. @@ -341,7 +346,7 @@ void p2p::handle_new_connection(const code& ec, channel::ptr channel, handler(ec); if (!ec && channel->notify()) - subscriber_->relay(error::success, channel); + channel_subscriber_->relay(error::success, channel); } void p2p::remove(channel::ptr channel, result_handler handler) diff --git a/src/proxy.cpp b/src/proxy.cpp index c9b63ae6b..88bee5ed1 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -78,7 +78,6 @@ proxy::~proxy() // Properties. // ---------------------------------------------------------------------------- -// public: const config::authority& proxy::authority() const { return authority_; @@ -87,7 +86,6 @@ const config::authority& proxy::authority() const // Start sequence. // ---------------------------------------------------------------------------- -// public: void proxy::start(result_handler handler) { if (!stopped()) @@ -107,58 +105,9 @@ void proxy::start(result_handler handler) read_heading(); } -// Stop sequence. -// ---------------------------------------------------------------------------- - -// public: -void proxy::stop(const code& ec) -{ - BITCOIN_ASSERT_MSG(ec, "The stop code must be an error code."); - - // Stop is thread safe and idempotent, allows subscription to be unguarded. - if (stopped()) - return; - - stopped_ = true; - - // This prevents resubscription after stop is relayed. - message_subscriber_.stop(); - - // This fires all message subscriptions with the channel_stopped code. - message_subscriber_.broadcast(error::channel_stopped); - - // This prevents resubscription after stop is relayed. - stop_subscriber_->stop(); - - // This fires all stop subscriptions with the channel stop reason code. - stop_subscriber_->relay(ec); - - // Give channel opportunity to terminate timers. - handle_stopping(); - - // Critical Section - /////////////////////////////////////////////////////////////////////////// - unique_lock lock(mutex_); - - // The socket_ must be guarded against concurrent use. - proxy::close(socket_); - /////////////////////////////////////////////////////////////////////////// -} - -void proxy::stop(const boost_code& ec) -{ - stop(error::boost_to_error_code(ec)); -} - -bool proxy::stopped() const -{ - return stopped_; -} - -// Stop subscription sequence. +// Stop subscription. // ---------------------------------------------------------------------------- -// public: void proxy::subscribe_stop(result_handler handler) { stop_subscriber_->subscribe(handler, error::channel_stopped); @@ -348,5 +297,46 @@ void proxy::handle_send(const boost_code& ec, result_handler handler) handler(error); } +// Stop sequence. +// ---------------------------------------------------------------------------- + +// public: +void proxy::stop(const code& ec) +{ + BITCOIN_ASSERT_MSG(ec, "The stop code must be an error code."); + + // Stop is thread safe and idempotent, allows stop to be unguarded. + stopped_ = true; + + // Prevent subscription after stop. + message_subscriber_.stop(); + message_subscriber_.broadcast(error::channel_stopped); + + // Prevent subscription after stop. + stop_subscriber_->stop(); + stop_subscriber_->relay(ec); + + // Give channel opportunity to terminate timers. + handle_stopping(); + + // Critical Section + /////////////////////////////////////////////////////////////////////////// + unique_lock lock(mutex_); + + // The socket_ must be guarded against concurrent use. + proxy::close(socket_); + /////////////////////////////////////////////////////////////////////////// +} + +void proxy::stop(const boost_code& ec) +{ + stop(error::boost_to_error_code(ec)); +} + +bool proxy::stopped() const +{ + return stopped_; +} + } // namespace network } // namespace libbitcoin diff --git a/src/sessions/session.cpp b/src/sessions/session.cpp index 8cf2ec832..c38da0c4e 100644 --- a/src/sessions/session.cpp +++ b/src/sessions/session.cpp @@ -97,11 +97,11 @@ bool session::blacklisted(const authority& authority) const acceptor::ptr session::create_acceptor() { const auto accept = std::make_shared(pool_, settings_); - subscribe_stop(BIND_1(do_stop_acceptor, accept)); + subscribe_stop(BIND_2(do_stop_acceptor, _1, accept)); return accept; } -void session::do_stop_acceptor(acceptor::ptr accept) +void session::do_stop_acceptor(const code&, acceptor::ptr accept) { accept->stop(); } @@ -110,11 +110,11 @@ void session::do_stop_acceptor(acceptor::ptr accept) connector::ptr session::create_connector() { const auto connect = std::make_shared(pool_, settings_); - subscribe_stop(BIND_1(do_stop_connector, connect)); + subscribe_stop(BIND_2(do_stop_connector, _1, connect)); return connect; } -void session::do_stop_connector(connector::ptr connect) +void session::do_stop_connector(const code&, connector::ptr connect) { connect->stop(); } @@ -123,7 +123,6 @@ void session::do_stop_connector(connector::ptr connect) // ---------------------------------------------------------------------------- // Must not change context before subscribing. -// public: void session::start(result_handler handler) { if (!stopped()) @@ -133,14 +132,16 @@ void session::start(result_handler handler) } stopped_ = false; - subscribe_stop(BIND_0(do_stop_session)); + subscribe_stop(BIND_1(do_stop_session, _1)); // This is the end of the start sequence. handler(error::success); } -void session::do_stop_session() +void session::do_stop_session(const code&) { + // This signals the session to stop creating connections, but does not stop + // the session. Channels are stopped resulting in session losing scope. stopped_ = true; } @@ -151,31 +152,10 @@ bool session::stopped() const // Subscribe Stop sequence. // ---------------------------------------------------------------------------- -// Must not change context before resubscribing. -// Must not change context in event handler (use bind). -// public: -void session::subscribe_stop(stop_handler handler) +void session::subscribe_stop(result_handler handler) { - // This uses the channel subscriber to detect stop. - network_.subscribe_connections( - BIND_3(handle_connect_event, _1, _2, handler)); -} - -bool session::handle_connect_event(const code& ec, channel::ptr, - stop_handler handler) -{ - if (ec == error::service_stopped) - { - // This is the end of the subscribe stop sequence. - handler(); - return false; - } - - // Resubscribe to connection events. - // This is a problem if the subscriber stops and doesn't reregister. - // We get new channels after subscriber stop (that aren't service stop). - return true; + network_.subscribe_stop(handler); } // Registration sequence. diff --git a/test/p2p.cpp b/test/p2p.cpp index 28fdfd2b4..50be71869 100644 --- a/test/p2p.cpp +++ b/test/p2p.cpp @@ -157,7 +157,7 @@ static int subscribe_result(p2p& network) promise.set_value(ec); return false; }; - network.subscribe_connections(handler); + network.subscribe_connection(handler); return promise.get_future().get().value(); } @@ -169,7 +169,7 @@ static int subscribe_connect_result(p2p& network, const config::endpoint& host) promise.set_value(ec); return false; }; - network.subscribe_connections(handler); + network.subscribe_connection(handler); network.connect(host.host(), host.port()); return promise.get_future().get().value(); } @@ -402,7 +402,7 @@ BOOST_AUTO_TEST_CASE(p2p__subscribe__started_stop__service_stopped) }; // Expect queued handler until destruct because service is started. - network.subscribe_connections(handler); + network.subscribe_connection(handler); } BOOST_AUTO_TEST_CASE(p2p__subscribe__started_connect__success) @@ -443,7 +443,7 @@ BOOST_AUTO_TEST_CASE(p2p__subscribe__seed_outbound__success) subscribe.set_value(ec); return false; }; - network.subscribe_connections(subscribe_handler); + network.subscribe_connection(subscribe_handler); std::promise run; const auto run_handler = [&run, &network](code ec)