Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 37 additions & 26 deletions include/bitcoin/network/p2p.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,31 +56,49 @@ class BCT_API p2p
typedef subscriber<const code&> stop_subscriber;
typedef resubscriber<const code&, channel::ptr> channel_subscriber;

/// Send message to all connections.
template <typename Message>
void broadcast(const Message& message, channel_handler handle_channel,
result_handler handle_complete)
{
connections_->broadcast(message, handle_channel, handle_complete);
}

// ------------------------------------------------------------------------

/// Construct an instance.
p2p(const settings& settings);

/// Ensure all threads are coalesced.
~p2p();
virtual ~p2p();

/// This class is not copyable.
p2p(const p2p&) = delete;
void operator=(const p2p&) = delete;

// Start/Run/Stop/Close sequences.
// ------------------------------------------------------------------------

/// Send message to all connections.
template <typename Message>
void broadcast(const Message& message, channel_handler handle_channel,
result_handler handle_complete)
{
connections_->broadcast(message, handle_channel, handle_complete);
}
/// Invoke startup and seeding sequence, call from constructing thread.
virtual void start(result_handler handler);

/// Synchronize the blockchain and then begin long running sessions,
/// call from start result handler. Call base method to skip sync.
virtual void run(result_handler handler);

/// Non-blocking call to coalesce all work, start may be reinvoked after.
/// Handler returns the result of file save operations.
virtual void stop(result_handler handler);

/// Blocking call to coalesce all work and then terminate all threads.
/// Call from thread that constructed this class, or don't call at all.
/// This calls stop, and start may be reinvoked after calling this.
virtual void close();

// Properties.
// ------------------------------------------------------------------------

/// Return a reference to the network configuration settings.
/// Network configuration settings.
virtual const settings& network_settings() const;

/// Return the current block height.
Expand All @@ -95,19 +113,17 @@ class BCT_API p2p
/// Return a reference to the network threadpool.
virtual threadpool& thread_pool();

// Subscriptions.
// ------------------------------------------------------------------------

/// Invoke startup and seeding sequence, call from constructing thread.
virtual void start(result_handler handler);

/// Subscribe to connection creation events.
virtual void subscribe_connection(connect_handler handler);

/// Subscribe to service stop event.
virtual void subscribe_stop(result_handler handler);

/// Begin long running sessions, call from start handler.
virtual void run(result_handler handler);
// Manual connections.
// ----------------------------------------------------------------------------

/// Maintain a connection to hostname:port.
virtual void connect(const std::string& hostname, uint16_t port);
Expand All @@ -117,15 +133,7 @@ class BCT_API p2p
virtual void connect(const std::string& hostname, uint16_t port,
channel_handler handler);

/// Non-blocking call to coalesce all work, start may be reinvoked after
/// handler fired. Handler returns the result of host file save operation.
virtual void stop(result_handler handler);

/// Blocking call to coalesce all work and then terminate all threads.
/// Call from thread that constructed this class, or don't call at all.
/// This calls stop, and start may be reinvoked after calling this.
virtual void close();

// Connections collection.
// ------------------------------------------------------------------------

/// Determine if there exists a connection to the address.
Expand All @@ -140,6 +148,7 @@ class BCT_API p2p
/// Get the number of connections.
virtual void connected_count(count_handler handler);

// Hosts collection.
// ------------------------------------------------------------------------

/// Get a randomly-selected adress.
Expand Down Expand Up @@ -167,16 +176,18 @@ class BCT_API p2p
}

private:
void handle_stopped(const code& ec);
void handle_manual_started(const code& ec, result_handler handler);
void handle_inbound_started(const code& ec, result_handler handler);
void handle_outbound_started(const code& ec, result_handler handler);
void handle_hosts_loaded(const code& ec, result_handler handler);
void handle_hosts_seeded(const code& ec, result_handler handler);
void handle_hosts_saved(const code& ec, result_handler handler);
void handle_new_connection(const code& ec, channel::ptr channel,
result_handler handler);

void handle_started(const code& ec, result_handler handler);
void handle_running(const code& ec, result_handler handler);
void handle_stopped(const code& ec, result_handler handler);
void handle_closing(const code& ec);

std::atomic<bool> stopped_;
std::atomic<size_t> height_;
bc::atomic<session_manual::ptr> manual_;
Expand Down
174 changes: 92 additions & 82 deletions src/p2p.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,36 +56,6 @@ p2p::p2p(const settings& settings)
{
}

// Properties.
// ----------------------------------------------------------------------------

const settings& p2p::network_settings() const
{
return settings_;
}

// The blockchain height is set in our version message for handshake.
size_t p2p::height() const
{
return height_;
}

// The height is set externally and is safe as an atomic.
void p2p::set_height(size_t value)
{
height_ = value;
}

bool p2p::stopped() const
{
return stopped_;
}

threadpool& p2p::thread_pool()
{
return threadpool_;
}

// Start sequence.
// ----------------------------------------------------------------------------

Expand Down Expand Up @@ -152,11 +122,11 @@ void p2p::handle_hosts_loaded(const code& ec, result_handler handler)
// This is invoked on a new thread.
// The instance is retained by the stop handler (until shutdown).
attach<session_seed>()->start(
std::bind(&p2p::handle_hosts_seeded,
std::bind(&p2p::handle_started,
this, _1, handler));
}

void p2p::handle_hosts_seeded(const code& ec, result_handler handler)
void p2p::handle_started(const code& ec, result_handler handler)
{
if (stopped())
{
Expand Down Expand Up @@ -206,11 +176,11 @@ void p2p::handle_inbound_started(const code& ec, result_handler handler)
// This is invoked on a new thread.
// This instance is retained by the stop handler (until shutdown).
attach<session_outbound>()->start(
std::bind(&p2p::handle_outbound_started,
std::bind(&p2p::handle_running,
this, _1, handler));
}

void p2p::handle_outbound_started(const code& ec, result_handler handler)
void p2p::handle_running(const code& ec, result_handler handler)
{
if (ec)
{
Expand All @@ -224,49 +194,6 @@ void p2p::handle_outbound_started(const code& ec, result_handler handler)
handler(error::success);
}

// Subscriptions.
// ----------------------------------------------------------------------------

void p2p::subscribe_connection(connect_handler handler)
{
channel_subscriber_->subscribe(handler, error::service_stopped, nullptr);
}

void p2p::subscribe_stop(result_handler handler)
{
stop_subscriber_->subscribe(handler, error::service_stopped);
}

// Manual connections.
// ----------------------------------------------------------------------------

void p2p::connect(const std::string& hostname, uint16_t port)
{
if (stopped())
return;

auto manual = manual_.load();
if (manual)
manual->connect(hostname, port);
}

void p2p::connect(const std::string& hostname, uint16_t port,
channel_handler handler)
{
if (stopped())
{
handler(error::service_stopped, nullptr);
return;
}

auto manual = manual_.load();
if (manual)
{
// Connect is invoked on a new thread.
manual->connect(hostname, port, handler);
}
}

// Stop sequence.
// ----------------------------------------------------------------------------
// All shutdown actions must be queued by the end of the stop call.
Expand Down Expand Up @@ -301,32 +228,115 @@ void p2p::stop(result_handler handler)
manual_.store(nullptr);
threadpool_.shutdown();

// This indirection is not required but presented for consistency.
handle_stopped(ec, handler);
}

void p2p::handle_stopped(const code& ec, result_handler handler)
{
// This is the end of the stop sequence.
handler(ec);
}

// Destruct sequence.
// Close sequence.
// ----------------------------------------------------------------------------

// This allows for shutdown based on destruct without need to call stop.
p2p::~p2p()
{
// This allows for shutdown based on destruct without need to call stop.
p2p::close();
}

// This must block until handle_closing completes.
// This must be called from the thread that constructed this class (see join).
void p2p::close()
{
// Stop must either complete sequentially or we must wait here for closed.
p2p::stop(
std::bind(&p2p::handle_stopped,
std::bind(&p2p::handle_closing,
this, _1));
}

void p2p::handle_stopped(const code&)
// Okay to ignore code as we are in the destructor, use stop if code is needed.
void p2p::handle_closing(const code&)
{
// This is the end of the destruct sequence.
// This is the end of the close sequence.
threadpool_.join();
}

// Properties.
// ----------------------------------------------------------------------------

const settings& p2p::network_settings() const
{
return settings_;
}

// The blockchain height is set in our version message for handshake.
size_t p2p::height() const
{
return height_;
}

// The height is set externally and is safe as an atomic.
void p2p::set_height(size_t value)
{
height_ = value;
}

bool p2p::stopped() const
{
return stopped_;
}

threadpool& p2p::thread_pool()
{
return threadpool_;
}

// Subscriptions.
// ----------------------------------------------------------------------------

void p2p::subscribe_connection(connect_handler handler)
{
channel_subscriber_->subscribe(handler, error::service_stopped, nullptr);
}

void p2p::subscribe_stop(result_handler handler)
{
stop_subscriber_->subscribe(handler, error::service_stopped);
}

// Manual connections.
// ----------------------------------------------------------------------------

void p2p::connect(const std::string& hostname, uint16_t port)
{
if (stopped())
return;

auto manual = manual_.load();
if (manual)
manual->connect(hostname, port);
}

void p2p::connect(const std::string& hostname, uint16_t port,
channel_handler handler)
{
if (stopped())
{
handler(error::service_stopped, nullptr);
return;
}

auto manual = manual_.load();
if (manual)
{
// Connect is invoked on a new thread.
manual->connect(hostname, port, handler);
}
}

// Connections collection.
// ----------------------------------------------------------------------------

Expand Down