Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions include/bitcoin/network/p2p.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ class BCT_API p2p
public:
typedef std::shared_ptr<p2p> ptr;
typedef message::network_address address;
typedef std::function<void()> stop_handler;
typedef std::function<void(bool)> truth_handler;
typedef std::function<void(size_t)> count_handler;
typedef std::function<void(const code&)> result_handler;
typedef std::function<void(const code&, const address&)> address_handler;
typedef std::function<void(const code&, channel::ptr)> channel_handler;
typedef std::function<bool(const code&, channel::ptr)> connect_handler;
typedef subscriber<const code&> stop_subscriber;
typedef resubscriber<const code&, channel::ptr> channel_subscriber;

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -100,12 +102,15 @@ class BCT_API p2p
/// Invoke startup and seeding sequence, call from constructing thread.
virtual void start(result_handler handler);

/// Subscribe to connection creation events.
virtual void subscribe_connection(connect_handler handler);

/// Subscribe to service stop event.
virtual void subscribe_stop(result_handler handler);

/// Begin long running sessions, call from start handler.
virtual void run(result_handler handler);

/// Subscribe to connection creation and service stop events.
virtual void subscribe_connections(connect_handler handler);

/// Maintain a connection to hostname:port.
virtual void connect(const std::string& hostname, uint16_t port);

Expand Down Expand Up @@ -194,7 +199,8 @@ class BCT_API p2p
dispatcher dispatch_;
hosts hosts_;
std::shared_ptr<connections> connections_;
channel_subscriber::ptr subscriber_;
stop_subscriber::ptr stop_subscriber_;
channel_subscriber::ptr channel_subscriber_;
};

} // namespace network
Expand Down
4 changes: 2 additions & 2 deletions include/bitcoin/network/proxy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ class BCT_API proxy
using message_handler = std::function<bool(const code&,
std::shared_ptr<Message>)>;

typedef std::function<void(const code&)> result_handler;
typedef std::shared_ptr<proxy> ptr;
typedef std::function<void()> completion_handler;
typedef std::function<void(const code&)> result_handler;
typedef subscriber<const code&> stop_subscriber;
typedef std::shared_ptr<proxy> ptr;

static void close(asio::socket_ptr socket);

Expand Down
13 changes: 4 additions & 9 deletions include/bitcoin/network/sessions/session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ class BCT_API session
public:
typedef std::shared_ptr<session> ptr;
typedef config::authority authority;
typedef std::function<void()> stop_handler;
typedef std::function<void(bool)> truth_handler;
typedef std::function<void(size_t)> count_handler;
typedef std::function<void(const code&)> result_handler;
Expand All @@ -90,7 +89,7 @@ class BCT_API session
virtual void start(result_handler handler);

/// Subscribe to receive session stop notification.
virtual void subscribe_stop(stop_handler handler);
virtual void subscribe_stop(result_handler handler);

protected:

Expand Down Expand Up @@ -163,15 +162,11 @@ class BCT_API session
}

// Socket creators.
void do_stop_acceptor(acceptor::ptr connect);
void do_stop_connector(connector::ptr connect);
void do_stop_acceptor(const code& ec, acceptor::ptr connect);
void do_stop_connector(const code& ec, connector::ptr connect);

// Start sequence.
void do_stop_session();

// Stop sequence
bool handle_connect_event(const code& ec, channel::ptr channel,
stop_handler handler);
void do_stop_session(const code&);

// Connect sequence
void new_connect(connector::ptr connect, channel_handler handler);
Expand Down
61 changes: 33 additions & 28 deletions src/p2p.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ p2p::p2p(const settings& settings)
dispatch_(threadpool_, NAME),
hosts_(threadpool_, settings_),
connections_(std::make_shared<connections>(threadpool_)),
subscriber_(std::make_shared<channel_subscriber>(threadpool_, NAME "_sub"))
stop_subscriber_(std::make_shared<stop_subscriber>(threadpool_, NAME "_stop_sub")),
channel_subscriber_(std::make_shared<channel_subscriber>(threadpool_, NAME "_sub"))
{
}

Expand Down Expand Up @@ -100,22 +101,20 @@ void p2p::start(result_handler handler)
return;
}

stopped_ = false;
subscriber_->start();

threadpool_.join();
threadpool_.spawn(settings_.threads, thread_priority::low);

const auto manual_started_handler =
std::bind(&p2p::handle_manual_started,
this, _1, handler);
stopped_ = false;
stop_subscriber_->start();
channel_subscriber_->start();

// This instance is retained by stop handler and member references.
auto manual = attach<session_manual>();
manual->start(manual_started_handler);
const auto manual = attach<session_manual>();
manual_.store(manual);

////handle_manual_started(error::success, handler);
manual->start(
std::bind(&p2p::handle_manual_started,
this, _1, handler));
}

void p2p::handle_manual_started(const code& ec, result_handler handler)
Expand Down Expand Up @@ -193,8 +192,6 @@ void p2p::run(result_handler handler)
attach<session_inbound>()->start(
std::bind(&p2p::handle_inbound_started,
this, _1, handler));

////handle_inbound_started(error::success, handler);
}

void p2p::handle_inbound_started(const code& ec, result_handler handler)
Expand Down Expand Up @@ -227,12 +224,17 @@ void p2p::handle_outbound_started(const code& ec, result_handler handler)
handler(error::success);
}

// Channel subscription.
// Subscriptions.
// ----------------------------------------------------------------------------

void p2p::subscribe_connections(connect_handler handler)
void p2p::subscribe_connection(connect_handler handler)
{
subscriber_->subscribe(handler, error::service_stopped, nullptr);
channel_subscriber_->subscribe(handler, error::service_stopped, nullptr);
}

void p2p::subscribe_stop(result_handler handler)
{
stop_subscriber_->subscribe(handler, error::service_stopped);
}

// Manual connections.
Expand Down Expand Up @@ -269,24 +271,27 @@ void p2p::connect(const std::string& hostname, uint16_t port,

void p2p::stop(result_handler handler)
{
// Stop is thread safe and idempotent, allows subscription to be unguarded.

// Prevent subscription after stop.
subscriber_->stop();
subscriber_->relay(error::service_stopped, nullptr);

// Must be after subscriber stop (why?).
connections_->stop(error::service_stopped);
manual_.store(nullptr);

// Host save is expensive, so minimize repeats.
const auto ec = stopped_ ? error::success : hosts_.save();
stopped_ = true;

if (ec)
log::error(LOG_NETWORK)
<< "Error saving hosts file: " << ec.message();
<< "Error saving hosts file: " << ec.message();

// Stop is thread safe and idempotent, allows subscription to be unguarded.
stopped_ = true;

// Prevent subscription after stop.
stop_subscriber_->stop();
stop_subscriber_->relay(error::service_stopped);

// Prevent subscription after stop.
channel_subscriber_->stop();
channel_subscriber_->relay(error::service_stopped, nullptr);

// Must be after subscriber stop.
connections_->stop(error::service_stopped);
manual_.store(nullptr);
threadpool_.shutdown();

// This is the end of the stop sequence.
Expand Down Expand Up @@ -341,7 +346,7 @@ void p2p::handle_new_connection(const code& ec, channel::ptr channel,
handler(ec);

if (!ec && channel->notify())
subscriber_->relay(error::success, channel);
channel_subscriber_->relay(error::success, channel);
}

void p2p::remove(channel::ptr channel, result_handler handler)
Expand Down
94 changes: 42 additions & 52 deletions src/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ proxy::~proxy()
// Properties.
// ----------------------------------------------------------------------------

// public:
const config::authority& proxy::authority() const
{
return authority_;
Expand All @@ -87,7 +86,6 @@ const config::authority& proxy::authority() const
// Start sequence.
// ----------------------------------------------------------------------------

// public:
void proxy::start(result_handler handler)
{
if (!stopped())
Expand All @@ -107,58 +105,9 @@ void proxy::start(result_handler handler)
read_heading();
}

// Stop sequence.
// ----------------------------------------------------------------------------

// public:
void proxy::stop(const code& ec)
{
BITCOIN_ASSERT_MSG(ec, "The stop code must be an error code.");

// Stop is thread safe and idempotent, allows subscription to be unguarded.
if (stopped())
return;

stopped_ = true;

// This prevents resubscription after stop is relayed.
message_subscriber_.stop();

// This fires all message subscriptions with the channel_stopped code.
message_subscriber_.broadcast(error::channel_stopped);

// This prevents resubscription after stop is relayed.
stop_subscriber_->stop();

// This fires all stop subscriptions with the channel stop reason code.
stop_subscriber_->relay(ec);

// Give channel opportunity to terminate timers.
handle_stopping();

// Critical Section
///////////////////////////////////////////////////////////////////////////
unique_lock lock(mutex_);

// The socket_ must be guarded against concurrent use.
proxy::close(socket_);
///////////////////////////////////////////////////////////////////////////
}

void proxy::stop(const boost_code& ec)
{
stop(error::boost_to_error_code(ec));
}

bool proxy::stopped() const
{
return stopped_;
}

// Stop subscription sequence.
// Stop subscription.
// ----------------------------------------------------------------------------

// public:
void proxy::subscribe_stop(result_handler handler)
{
stop_subscriber_->subscribe(handler, error::channel_stopped);
Expand Down Expand Up @@ -348,5 +297,46 @@ void proxy::handle_send(const boost_code& ec, result_handler handler)
handler(error);
}

// Stop sequence.
// ----------------------------------------------------------------------------

// public:
void proxy::stop(const code& ec)
{
BITCOIN_ASSERT_MSG(ec, "The stop code must be an error code.");

// Stop is thread safe and idempotent, allows stop to be unguarded.
stopped_ = true;

// Prevent subscription after stop.
message_subscriber_.stop();
message_subscriber_.broadcast(error::channel_stopped);

// Prevent subscription after stop.
stop_subscriber_->stop();
stop_subscriber_->relay(ec);

// Give channel opportunity to terminate timers.
handle_stopping();

// Critical Section
///////////////////////////////////////////////////////////////////////////
unique_lock lock(mutex_);

// The socket_ must be guarded against concurrent use.
proxy::close(socket_);
///////////////////////////////////////////////////////////////////////////
}

void proxy::stop(const boost_code& ec)
{
stop(error::boost_to_error_code(ec));
}

bool proxy::stopped() const
{
return stopped_;
}

} // namespace network
} // namespace libbitcoin
Loading