diff --git a/include/bitcoin/network/p2p.hpp b/include/bitcoin/network/p2p.hpp index 222e53fb4..3f47b3243 100644 --- a/include/bitcoin/network/p2p.hpp +++ b/include/bitcoin/network/p2p.hpp @@ -56,31 +56,49 @@ class BCT_API p2p typedef subscriber stop_subscriber; typedef resubscriber channel_subscriber; + /// Send message to all connections. + template + void broadcast(const Message& message, channel_handler handle_channel, + result_handler handle_complete) + { + connections_->broadcast(message, handle_channel, handle_complete); + } + // ------------------------------------------------------------------------ /// Construct an instance. p2p(const settings& settings); /// Ensure all threads are coalesced. - ~p2p(); + virtual ~p2p(); /// This class is not copyable. p2p(const p2p&) = delete; void operator=(const p2p&) = delete; + // Start/Run/Stop/Close sequences. // ------------------------------------------------------------------------ - /// Send message to all connections. - template - void broadcast(const Message& message, channel_handler handle_channel, - result_handler handle_complete) - { - connections_->broadcast(message, handle_channel, handle_complete); - } + /// Invoke startup and seeding sequence, call from constructing thread. + virtual void start(result_handler handler); + + /// Synchronize the blockchain and then begin long running sessions, + /// call from start result handler. Call base method to skip sync. + virtual void run(result_handler handler); + /// Non-blocking call to coalesce all work, start may be reinvoked after. + /// Handler returns the result of file save operations. + virtual void stop(result_handler handler); + + /// Blocking call to coalesce all work and then terminate all threads. + /// Call from thread that constructed this class, or don't call at all. + /// This calls stop, and start may be reinvoked after calling this. + virtual void close(); + + // Properties. // ------------------------------------------------------------------------ - /// Return a reference to the network configuration settings. + /// Network configuration settings. virtual const settings& network_settings() const; /// Return the current block height. @@ -95,19 +113,17 @@ class BCT_API p2p /// Return a reference to the network threadpool. virtual threadpool& thread_pool(); + // Subscriptions. // ------------------------------------------------------------------------ - /// Invoke startup and seeding sequence, call from constructing thread. - virtual void start(result_handler handler); - /// Subscribe to connection creation events. virtual void subscribe_connection(connect_handler handler); /// Subscribe to service stop event. virtual void subscribe_stop(result_handler handler); - /// Begin long running sessions, call from start handler. - virtual void run(result_handler handler); + // Manual connections. + // ---------------------------------------------------------------------------- /// Maintain a connection to hostname:port. virtual void connect(const std::string& hostname, uint16_t port); @@ -117,15 +133,7 @@ class BCT_API p2p virtual void connect(const std::string& hostname, uint16_t port, channel_handler handler); - /// Non-blocking call to coalesce all work, start may be reinvoked after - /// handler fired. Handler returns the result of host file save operation. - virtual void stop(result_handler handler); - - /// Blocking call to coalesce all work and then terminate all threads. - /// Call from thread that constructed this class, or don't call at all. - /// This calls stop, and start may be reinvoked after calling this. - virtual void close(); - + // Connections collection. // ------------------------------------------------------------------------ /// Determine if there exists a connection to the address. @@ -140,6 +148,7 @@ class BCT_API p2p /// Get the number of connections. virtual void connected_count(count_handler handler); + // Hosts collection. // ------------------------------------------------------------------------ /// Get a randomly-selected adress. @@ -167,16 +176,18 @@ class BCT_API p2p } private: - void handle_stopped(const code& ec); void handle_manual_started(const code& ec, result_handler handler); void handle_inbound_started(const code& ec, result_handler handler); - void handle_outbound_started(const code& ec, result_handler handler); void handle_hosts_loaded(const code& ec, result_handler handler); - void handle_hosts_seeded(const code& ec, result_handler handler); void handle_hosts_saved(const code& ec, result_handler handler); void handle_new_connection(const code& ec, channel::ptr channel, result_handler handler); + void handle_started(const code& ec, result_handler handler); + void handle_running(const code& ec, result_handler handler); + void handle_stopped(const code& ec, result_handler handler); + void handle_closing(const code& ec); + std::atomic stopped_; std::atomic height_; bc::atomic manual_; diff --git a/src/p2p.cpp b/src/p2p.cpp index 4cf977ce6..14357118f 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -56,36 +56,6 @@ p2p::p2p(const settings& settings) { } -// Properties. -// ---------------------------------------------------------------------------- - -const settings& p2p::network_settings() const -{ - return settings_; -} - -// The blockchain height is set in our version message for handshake. -size_t p2p::height() const -{ - return height_; -} - -// The height is set externally and is safe as an atomic. -void p2p::set_height(size_t value) -{ - height_ = value; -} - -bool p2p::stopped() const -{ - return stopped_; -} - -threadpool& p2p::thread_pool() -{ - return threadpool_; -} - // Start sequence. // ---------------------------------------------------------------------------- @@ -152,11 +122,11 @@ void p2p::handle_hosts_loaded(const code& ec, result_handler handler) // This is invoked on a new thread. // The instance is retained by the stop handler (until shutdown). attach()->start( - std::bind(&p2p::handle_hosts_seeded, + std::bind(&p2p::handle_started, this, _1, handler)); } -void p2p::handle_hosts_seeded(const code& ec, result_handler handler) +void p2p::handle_started(const code& ec, result_handler handler) { if (stopped()) { @@ -206,11 +176,11 @@ void p2p::handle_inbound_started(const code& ec, result_handler handler) // This is invoked on a new thread. // This instance is retained by the stop handler (until shutdown). attach()->start( - std::bind(&p2p::handle_outbound_started, + std::bind(&p2p::handle_running, this, _1, handler)); } -void p2p::handle_outbound_started(const code& ec, result_handler handler) +void p2p::handle_running(const code& ec, result_handler handler) { if (ec) { @@ -224,49 +194,6 @@ void p2p::handle_outbound_started(const code& ec, result_handler handler) handler(error::success); } -// Subscriptions. -// ---------------------------------------------------------------------------- - -void p2p::subscribe_connection(connect_handler handler) -{ - channel_subscriber_->subscribe(handler, error::service_stopped, nullptr); -} - -void p2p::subscribe_stop(result_handler handler) -{ - stop_subscriber_->subscribe(handler, error::service_stopped); -} - -// Manual connections. -// ---------------------------------------------------------------------------- - -void p2p::connect(const std::string& hostname, uint16_t port) -{ - if (stopped()) - return; - - auto manual = manual_.load(); - if (manual) - manual->connect(hostname, port); -} - -void p2p::connect(const std::string& hostname, uint16_t port, - channel_handler handler) -{ - if (stopped()) - { - handler(error::service_stopped, nullptr); - return; - } - - auto manual = manual_.load(); - if (manual) - { - // Connect is invoked on a new thread. - manual->connect(hostname, port, handler); - } -} - // Stop sequence. // ---------------------------------------------------------------------------- // All shutdown actions must be queued by the end of the stop call. @@ -301,32 +228,115 @@ void p2p::stop(result_handler handler) manual_.store(nullptr); threadpool_.shutdown(); + // This indirection is not required but presented for consistency. + handle_stopped(ec, handler); +} + +void p2p::handle_stopped(const code& ec, result_handler handler) +{ // This is the end of the stop sequence. handler(ec); } -// Destruct sequence. +// Close sequence. // ---------------------------------------------------------------------------- +// This allows for shutdown based on destruct without need to call stop. p2p::~p2p() { - // This allows for shutdown based on destruct without need to call stop. p2p::close(); } +// This must block until handle_closing completes. +// This must be called from the thread that constructed this class (see join). void p2p::close() { + // Stop must either complete sequentially or we must wait here for closed. p2p::stop( - std::bind(&p2p::handle_stopped, + std::bind(&p2p::handle_closing, this, _1)); } -void p2p::handle_stopped(const code&) +// Okay to ignore code as we are in the destructor, use stop if code is needed. +void p2p::handle_closing(const code&) { - // This is the end of the destruct sequence. + // This is the end of the close sequence. threadpool_.join(); } +// Properties. +// ---------------------------------------------------------------------------- + +const settings& p2p::network_settings() const +{ + return settings_; +} + +// The blockchain height is set in our version message for handshake. +size_t p2p::height() const +{ + return height_; +} + +// The height is set externally and is safe as an atomic. +void p2p::set_height(size_t value) +{ + height_ = value; +} + +bool p2p::stopped() const +{ + return stopped_; +} + +threadpool& p2p::thread_pool() +{ + return threadpool_; +} + +// Subscriptions. +// ---------------------------------------------------------------------------- + +void p2p::subscribe_connection(connect_handler handler) +{ + channel_subscriber_->subscribe(handler, error::service_stopped, nullptr); +} + +void p2p::subscribe_stop(result_handler handler) +{ + stop_subscriber_->subscribe(handler, error::service_stopped); +} + +// Manual connections. +// ---------------------------------------------------------------------------- + +void p2p::connect(const std::string& hostname, uint16_t port) +{ + if (stopped()) + return; + + auto manual = manual_.load(); + if (manual) + manual->connect(hostname, port); +} + +void p2p::connect(const std::string& hostname, uint16_t port, + channel_handler handler) +{ + if (stopped()) + { + handler(error::service_stopped, nullptr); + return; + } + + auto manual = manual_.load(); + if (manual) + { + // Connect is invoked on a new thread. + manual->connect(hostname, port, handler); + } +} + // Connections collection. // ----------------------------------------------------------------------------