From d9377649c46f04de9c4bb16cc99848e3d5fa3558 Mon Sep 17 00:00:00 2001 From: evoskuil Date: Fri, 18 Mar 2016 19:29:36 -0700 Subject: [PATCH 1/9] Remove dead code. --- include/bitcoin/network/acceptor.hpp | 2 -- src/acceptor.cpp | 3 +-- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/include/bitcoin/network/acceptor.hpp b/include/bitcoin/network/acceptor.hpp index 6804c41c8..774cc8e89 100644 --- a/include/bitcoin/network/acceptor.hpp +++ b/include/bitcoin/network/acceptor.hpp @@ -61,12 +61,10 @@ class BCT_API acceptor virtual void stop(); private: - bool stopped(); code safe_listen(uint16_t port); void handle_accept(const boost_code& ec, asio::socket_ptr socket, accept_handler handler); - std::atomic stopped_; threadpool& pool_; const settings& settings_; dispatcher dispatch_; diff --git a/src/acceptor.cpp b/src/acceptor.cpp index ad973d379..ee5fb7de1 100644 --- a/src/acceptor.cpp +++ b/src/acceptor.cpp @@ -37,8 +37,7 @@ using std::placeholders::_1; static const auto reuse_address = asio::acceptor::reuse_address(true); acceptor::acceptor(threadpool& pool, const settings& settings) - : stopped_(true), - pool_(pool), + : pool_(pool), settings_(settings), dispatch_(pool, NAME), acceptor_(std::make_shared(pool_.service())), From f8b95227b2069f35585247ed78e834b739c1542d Mon Sep 17 00:00:00 2001 From: evoskuil Date: Fri, 18 Mar 2016 19:42:14 -0700 Subject: [PATCH 2/9] Comments. --- src/proxy.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/proxy.cpp b/src/proxy.cpp index 88bee5ed1..3953d8b82 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -300,12 +300,14 @@ void proxy::handle_send(const boost_code& ec, result_handler handler) // Stop sequence. // ---------------------------------------------------------------------------- -// public: +// This is not short-circuited by a stop test because we need to ensure it +// completes at least once before invoking the handler. This requires a unique +// lock be taken around the entire section, which poses a deadlock risk. +// Instead this is thread safe and idempotent, allowing it to be unguarded. void proxy::stop(const code& ec) { BITCOIN_ASSERT_MSG(ec, "The stop code must be an error code."); - // Stop is thread safe and idempotent, allows stop to be unguarded. stopped_ = true; // Prevent subscription after stop. From 70f6294528b3f10bf39953a059e198397b9f8a2b Mon Sep 17 00:00:00 2001 From: evoskuil Date: Fri, 18 Mar 2016 19:56:39 -0700 Subject: [PATCH 3/9] Improve concurrency safetey, remove unnecessary dispatch. --- include/bitcoin/network/connections.hpp | 59 +++++-------- include/bitcoin/network/hosts.hpp | 5 +- include/bitcoin/network/p2p.hpp | 21 +---- include/bitcoin/network/pending_channels.hpp | 2 +- src/connections.cpp | 88 +++++++++++++++----- src/connector.cpp | 20 ++++- src/hosts.cpp | 5 +- src/p2p.cpp | 32 +++---- src/pending_channels.cpp | 28 +++++-- src/pending_sockets.cpp | 14 +++- test/p2p.cpp | 19 +++-- 11 files changed, 181 insertions(+), 112 deletions(-) diff --git a/include/bitcoin/network/connections.hpp b/include/bitcoin/network/connections.hpp index 2dfa631de..746ecaec7 100644 --- a/include/bitcoin/network/connections.hpp +++ b/include/bitcoin/network/connections.hpp @@ -41,14 +41,13 @@ class BCT_API connections { public: typedef std::shared_ptr ptr; - typedef config::authority authority; typedef std::function truth_handler; typedef std::function count_handler; typedef std::function result_handler; typedef std::function channel_handler; /// Construct an instance. - connections(threadpool& pool); + connections(); /// Validate connections stopped. ~connections(); @@ -57,63 +56,47 @@ class BCT_API connections connections(const connections&) = delete; void operator=(const connections&) = delete; - /// Completion handler returns operation_failed if any channel send failed. + /// Completion handler always returns success. template void broadcast(const Message& message, channel_handler handle_channel, result_handler handle_complete) { - const auto method = &connections::do_broadcast; - dispatch_.concurrent(method, shared_from_this(), message, - handle_channel, handle_complete); - } - - virtual void stop(const code& ec); - virtual void count(count_handler handler) const; - virtual void store(channel::ptr channel, result_handler handler); - virtual void remove(channel::ptr channel, result_handler handler); - virtual void exists(const authority& authority, - truth_handler handler) const; - -private: - typedef std::vector list; + // We cannot use a synchronizer here because handler closure in loop. + auto counter = std::make_shared>(channels_.size()); - template - void do_broadcast(const Message& message, channel_handler handle_channel, - result_handler handle_complete) - { - // The list is copied, which protects the iteration without a lock. - auto channels = safe_copy(); - const auto size = channels.size(); - const auto counter = std::make_shared>(size); - const auto result = std::make_shared>( - error::success); - - for (const auto channel: channels) + for (const auto channel: safe_copy()) { - const auto handle_send = [=](const code ec) + const auto handle_send = [=](code ec) { handle_channel(ec, channel); - if (ec) - result->store(error::operation_failed); - if (counter->fetch_sub(1) == 1) - handle_complete(result->load()); + handle_complete(error::success); }; channel->send(message, handle_send); } } + virtual void stop(const code& ec); + virtual void count(count_handler handler) const; + virtual void store(channel::ptr channel, result_handler handler); + virtual void remove(channel::ptr channel, result_handler handler); + virtual void exists(const config::authority& authority, + truth_handler handler) const; + +private: + typedef std::vector list; + list safe_copy() const; size_t safe_count() const; - bool safe_store(channel::ptr channel); + code safe_store(channel::ptr channel); bool safe_remove(channel::ptr channel); - bool safe_exists(const authority& address) const; + bool safe_exists(const config::authority& address) const; list channels_; - dispatcher dispatch_; - mutable shared_mutex mutex_; + std::atomic stopped_; + mutable upgrade_mutex mutex_; }; } // namespace network diff --git a/include/bitcoin/network/hosts.hpp b/include/bitcoin/network/hosts.hpp index 0cead019e..58ad38b22 100644 --- a/include/bitcoin/network/hosts.hpp +++ b/include/bitcoin/network/hosts.hpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -40,8 +41,10 @@ namespace network { /// The file is a line-oriented set of config::authority serializations. /// Duplicate addresses and those with zero-valued ports are disacarded. class BCT_API hosts + : public enable_shared_from_base { public: + typedef std::shared_ptr ptr; typedef message::network_address address; typedef std::function result_handler; @@ -54,7 +57,7 @@ class BCT_API hosts virtual code load(); virtual code save(); - virtual size_t count(); + virtual size_t count() const; virtual code fetch(address& out); virtual code remove(const address& host); virtual code store(const address& host); diff --git a/include/bitcoin/network/p2p.hpp b/include/bitcoin/network/p2p.hpp index 305712b91..622aa3325 100644 --- a/include/bitcoin/network/p2p.hpp +++ b/include/bitcoin/network/p2p.hpp @@ -70,14 +70,12 @@ class BCT_API p2p // ------------------------------------------------------------------------ - /// Send message to all connections, handler invoked for each channel. + /// Send message to all connections. template void broadcast(const Message& message, channel_handler handle_channel, result_handler handle_complete) { - dispatch_.ordered( - std::bind(&network::p2p::do_broadcast, - this, message, handle_channel, handle_complete)); + connections_->broadcast(message, handle_channel, handle_complete); } // ------------------------------------------------------------------------ @@ -168,17 +166,7 @@ class BCT_API p2p return std::make_shared(*this, std::forward(args)...); } - /// No-operation handler, used in default stop handling. - static result_handler unhandled; - private: - template - void do_broadcast(const Message& message, channel_handler handle_channel, - result_handler handle_complete) - { - connections_->broadcast(message, handle_channel, handle_complete); - } - void handle_stopped(const code& ec); void handle_manual_started(const code& ec, result_handler handler); void handle_inbound_started(const code& ec, result_handler handler); @@ -196,9 +184,8 @@ class BCT_API p2p // These are thread safe. threadpool threadpool_; - dispatcher dispatch_; - hosts hosts_; - std::shared_ptr connections_; + hosts::ptr hosts_; + connections::ptr connections_; stop_subscriber::ptr stop_subscriber_; channel_subscriber::ptr channel_subscriber_; }; diff --git a/include/bitcoin/network/pending_channels.hpp b/include/bitcoin/network/pending_channels.hpp index 91e8b16db..3f9a17e79 100644 --- a/include/bitcoin/network/pending_channels.hpp +++ b/include/bitcoin/network/pending_channels.hpp @@ -57,7 +57,7 @@ class BCT_API pending_channels bool safe_exists(uint64_t version_nonce) const; list channels_; - mutable shared_mutex mutex_; + mutable upgrade_mutex mutex_; }; } // namespace network diff --git a/src/connections.cpp b/src/connections.cpp index a3fefd368..bac51ebdf 100644 --- a/src/connections.cpp +++ b/src/connections.cpp @@ -26,10 +26,12 @@ namespace libbitcoin { namespace network { +using namespace bc::config; + #define NAME "connections" -connections::connections(threadpool& pool) - : dispatch_(pool, NAME) +connections::connections() + : stopped_(false) { } @@ -38,23 +40,43 @@ connections::~connections() BITCOIN_ASSERT_MSG(channels_.empty(), "Connections was not cleared."); } -connections::list connections::safe_copy() const +// This is idempotent. +void connections::stop(const code& ec) { + connections::list channels; + // Critical Section /////////////////////////////////////////////////////////////////////////// - shared_lock lock(mutex_); + mutex_.lock_upgrade(); - return channels_; + if (!stopped_) + { + //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + mutex_.unlock_upgrade_and_lock(); + stopped_ = true; + mutex_.unlock_and_lock_upgrade(); + //--------------------------------------------------------------------- + + // Once stopped this list cannot change, but must copy to escape lock. + channels = channels_; + } + + mutex_.unlock_upgrade(); /////////////////////////////////////////////////////////////////////////// + + // Channel stop handlers should remove channels from list. + for (const auto channel: channels) + channel->stop(ec); } -void connections::stop(const code& ec) +connections::list connections::safe_copy() const { - // The list is copied, which protects the iteration without a lock. - auto channels = safe_copy(); + // Critical Section + /////////////////////////////////////////////////////////////////////////// + shared_lock lock(mutex_); - for (auto channel: channels) - channel->stop(ec); + return channels_; + /////////////////////////////////////////////////////////////////////////// } bool connections::safe_exists(const authority& address) const @@ -82,16 +104,25 @@ bool connections::safe_remove(channel::ptr channel) { // Critical Section /////////////////////////////////////////////////////////////////////////// - unique_lock lock(mutex_); + mutex_.lock_upgrade(); const auto it = std::find(channels_.begin(), channels_.end(), channel); const auto found = it != channels_.end(); if (found) + { + //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + mutex_.unlock_upgrade_and_lock(); channels_.erase(it); + mutex_.unlock(); + //--------------------------------------------------------------------- + return true; + } - return found; + mutex_.unlock_upgrade(); /////////////////////////////////////////////////////////////////////////// + + return false; } void connections::remove(channel::ptr channel, result_handler handler) @@ -99,7 +130,7 @@ void connections::remove(channel::ptr channel, result_handler handler) handler(safe_remove(channel) ? error::success : error::not_found); } -bool connections::safe_store(channel::ptr channel) +code connections::safe_store(channel::ptr channel) { const auto address = channel->authority(); const auto match = [&address](channel::ptr entry) @@ -109,21 +140,36 @@ bool connections::safe_store(channel::ptr channel) // Critical Section /////////////////////////////////////////////////////////////////////////// - unique_lock lock(mutex_); - - const auto it = std::find_if(channels_.begin(), channels_.end(), match); - const auto found = it != channels_.end(); + mutex_.lock_upgrade(); - if (!found) - channels_.push_back(channel); + const auto stopped = stopped_.load(); - return found; + if (!stopped) + { + auto it = std::find_if(channels_.begin(), channels_.end(), match); + const auto found = it != channels_.end(); + + if (!found) + { + //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + mutex_.unlock_upgrade_and_lock(); + channels_.push_back(channel); + mutex_.unlock(); + //--------------------------------------------------------------------- + return error::success; + } + } + + mutex_.unlock_upgrade(); /////////////////////////////////////////////////////////////////////////// + + // Stopped and found are the only ways to get here. + return stopped ? error::service_stopped : error::address_in_use; } void connections::store(channel::ptr channel, result_handler handler) { - handler(safe_store(channel) ? error::address_in_use : error::success); + handler(safe_store(channel)); } size_t connections::safe_count() const diff --git a/src/connector.cpp b/src/connector.cpp index ddc77dde2..1ec570099 100644 --- a/src/connector.cpp +++ b/src/connector.cpp @@ -62,14 +62,23 @@ void connector::safe_stop() { // Critical Section /////////////////////////////////////////////////////////////////////////// - unique_lock lock(mutex_); + mutex_.lock_upgrade(); if (!stopped_) { + //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + mutex_.unlock_upgrade_and_lock(); + // This will asynchronously invoke the handler of each pending resolve. resolver_->cancel(); stopped_ = true; + + mutex_.unlock(); + //--------------------------------------------------------------------- + return; } + + mutex_.unlock_upgrade(); /////////////////////////////////////////////////////////////////////////// } @@ -111,6 +120,9 @@ void connector::connect(const std::string& hostname, uint16_t port, // We preserve the asynchronous contract of the async_resolve. // Dispatch ensures job does not execute in the current thread. dispatch_.concurrent(handler, error::service_stopped, nullptr); + + mutex_.unlock_upgrade(); + //--------------------------------------------------------------------- return; } @@ -138,12 +150,18 @@ void connector::handle_resolve(const boost_code& ec, asio::iterator iterator, if (stopped_) { dispatch_.concurrent(handler, error::service_stopped, nullptr); + + mutex_.unlock_upgrade(); + //--------------------------------------------------------------------- return; } if (ec) { dispatch_.concurrent(handler, error::resolve_failed, nullptr); + + mutex_.unlock_upgrade(); + //--------------------------------------------------------------------- return; } diff --git a/src/hosts.cpp b/src/hosts.cpp index 1b34c4adc..95820a295 100644 --- a/src/hosts.cpp +++ b/src/hosts.cpp @@ -50,7 +50,7 @@ hosts::iterator hosts::find(const address& host) return std::find_if(buffer_.begin(), buffer_.end(), found); } -size_t hosts::count() +size_t hosts::count() const { /////////////////////////////////////////////////////////////////////////// // Critical Section @@ -187,9 +187,10 @@ void hosts::do_store(const address& host, result_handler handler) void hosts::store(const address::list& hosts, result_handler handler) { + // The handler is invoked once all calls to do_store are completed. // We disperse here to allow other addresses messages to interleave hosts. dispatch_.parallel(hosts, "hosts", handler, - &hosts::do_store, this); + &hosts::do_store, shared_from_this()); } } // namespace network diff --git a/src/p2p.cpp b/src/p2p.cpp index 78ab596ef..8ec844431 100644 --- a/src/p2p.cpp +++ b/src/p2p.cpp @@ -45,16 +45,12 @@ namespace network { using std::placeholders::_1; -// No-operation handler, used in default stop handling. -p2p::result_handler p2p::unhandled = [](code){}; - p2p::p2p(const settings& settings) : stopped_(true), height_(0), settings_(settings), - dispatch_(threadpool_, NAME), - hosts_(threadpool_, settings_), - connections_(std::make_shared(threadpool_)), + hosts_(std::make_shared(threadpool_, settings_)), + connections_(std::make_shared()), stop_subscriber_(std::make_shared(threadpool_, NAME "_stop_sub")), channel_subscriber_(std::make_shared(threadpool_, NAME "_sub")) { @@ -133,7 +129,7 @@ void p2p::handle_manual_started(const code& ec, result_handler handler) return; } - handle_hosts_loaded(hosts_.load(), handler); + handle_hosts_loaded(hosts_->load(), handler); } void p2p::handle_hosts_loaded(const code& ec, result_handler handler) @@ -269,16 +265,19 @@ void p2p::connect(const std::string& hostname, uint16_t port, // All shutdown actions must be queued by the end of the stop call. // IOW queued shutdown operations must not enqueue additional work. +// This is not short-circuited by a stop test because we need to ensure it +// completes at least once before invoking the handler. This requires a unique +// lock be taken around the entire section, which poses a deadlock risk. +// Instead this is thread safe and idempotent, allowing it to be unguarded. void p2p::stop(result_handler handler) { // Host save is expensive, so minimize repeats. - const auto ec = stopped_ ? error::success : hosts_.save(); + const auto ec = stopped_ ? error::success : hosts_->save(); if (ec) log::error(LOG_NETWORK) - << "Error saving hosts file: " << ec.message(); + << "Error saving hosts file: " << ec.message(); - // Stop is thread safe and idempotent, allows subscription to be unguarded. stopped_ = true; // Prevent subscription after stop. @@ -289,8 +288,9 @@ void p2p::stop(result_handler handler) channel_subscriber_->stop(); channel_subscriber_->relay(error::service_stopped, nullptr); - // Must be after subscriber stop. + // Stop accepting channels and stop those that exist (self-clearing). connections_->stop(error::service_stopped); + manual_.store(nullptr); threadpool_.shutdown(); @@ -365,27 +365,27 @@ void p2p::connected_count(count_handler handler) void p2p::fetch_address(address_handler handler) { address out; - handler(hosts_.fetch(out), out); + handler(hosts_->fetch(out), out); } void p2p::store(const address& address, result_handler handler) { - handler(hosts_.store(address)); + handler(hosts_->store(address)); } void p2p::store(const address::list& addresses, result_handler handler) { - hosts_.store(addresses, handler); + hosts_->store(addresses, handler); } void p2p::remove(const address& address, result_handler handler) { - handler(hosts_.remove(address)); + handler(hosts_->remove(address)); } void p2p::address_count(count_handler handler) { - handler(hosts_.count()); + handler(hosts_->count()); } } // namespace network diff --git a/src/pending_channels.cpp b/src/pending_channels.cpp index 17ccd7255..9839db1b8 100644 --- a/src/pending_channels.cpp +++ b/src/pending_channels.cpp @@ -47,37 +47,55 @@ bool pending_channels::safe_store(channel::ptr channel) // Critical Section /////////////////////////////////////////////////////////////////////////// - unique_lock lock(mutex_); + mutex_.lock_upgrade(); const auto it = std::find_if(channels_.begin(), channels_.end(), match); const auto found = it != channels_.end(); if (!found) + { + //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + mutex_.unlock_upgrade_and_lock(); channels_.push_back(channel); + mutex_.unlock(); + //--------------------------------------------------------------------- + return true; + } - return found; + mutex_.unlock_upgrade(); /////////////////////////////////////////////////////////////////////////// + + return false; } void pending_channels::store(channel::ptr channel, result_handler handler) { - handler(safe_store(channel) ? error::address_in_use : error::success); + handler(safe_store(channel) ? error::success : error::address_in_use); } bool pending_channels::safe_remove(channel::ptr channel) { // Critical Section /////////////////////////////////////////////////////////////////////////// - unique_lock lock(mutex_); + mutex_.lock_upgrade(); const auto it = std::find(channels_.begin(), channels_.end(), channel); const auto found = it != channels_.end(); if (found) + { + //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + mutex_.unlock_upgrade_and_lock(); channels_.erase(it); + mutex_.unlock(); + //--------------------------------------------------------------------- + return true; + } - return found; + mutex_.unlock_upgrade(); /////////////////////////////////////////////////////////////////////////// + + return false; } void pending_channels::remove(channel::ptr channel, result_handler handler) diff --git a/src/pending_sockets.cpp b/src/pending_sockets.cpp index 116b9baa2..e26567026 100644 --- a/src/pending_sockets.cpp +++ b/src/pending_sockets.cpp @@ -64,13 +64,23 @@ void pending_sockets::remove(asio::socket_ptr socket) { // Critical Section /////////////////////////////////////////////////////////////////////////// - unique_lock lock(mutex_); + mutex_.lock_upgrade(); auto it = std::find(sockets_.begin(), sockets_.end(), socket); + const auto found = it != sockets_.end(); // Clear can be called at any time, so the entry may not be found. - if (it != sockets_.end()) + if (found) + { + //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + mutex_.unlock_upgrade_and_lock(); sockets_.erase(it); + mutex_.unlock(); + //--------------------------------------------------------------------- + return; + } + + mutex_.unlock_upgrade(); /////////////////////////////////////////////////////////////////////////// } diff --git a/test/p2p.cpp b/test/p2p.cpp index 50be71869..29b59ac73 100644 --- a/test/p2p.cpp +++ b/test/p2p.cpp @@ -64,10 +64,11 @@ using namespace bc::network; config.seeds = { { SEED1 } }; \ config.hosts_file = get_log_path(TEST_NAME, "hosts") -#define SETTINGS_TESTNET_TWO_THREADS_ONE_SEED_OUTBOUND(config) \ +#define SETTINGS_TESTNET_THREE_THREADS_ONE_SEED_TWO_OUTBOUND(config) \ auto config = settings::testnet; \ - config.threads = 2; \ + config.threads = 3; \ config.connection_limit = 0; \ + config.outbound_connections = 2; \ config.seeds = { { SEED1 } }; \ config.hosts_file = get_log_path(TEST_NAME, "hosts") @@ -184,12 +185,12 @@ static int send_result(const Message& message, p2p& network, int channels) }; std::promise promise; - const auto completion_counter = [&promise](code ec) + const auto completion_handler = [&promise](code ec) { promise.set_value(ec); }; - network.broadcast(message, channel_counter, completion_counter); + network.broadcast(message, channel_counter, completion_handler); const auto result = promise.get_future().get().value(); BOOST_REQUIRE_EQUAL(channels, 0); @@ -396,8 +397,6 @@ BOOST_AUTO_TEST_CASE(p2p__subscribe__started_stop__service_stopped) { BOOST_REQUIRE(!channel); BOOST_REQUIRE_EQUAL(ec, error::service_stopped); - - // Resubscribe attempt here would prevent subscriber clearance. return false; }; @@ -429,17 +428,17 @@ BOOST_AUTO_TEST_CASE(p2p__broadcast__ping_two_distinct_hosts__two_sends_and_succ BOOST_REQUIRE_EQUAL(send_result(ping(0), network, 2), error::success); } -// This test may be a little slow. BOOST_AUTO_TEST_CASE(p2p__subscribe__seed_outbound__success) { print_headers(TEST_NAME); - SETTINGS_TESTNET_TWO_THREADS_ONE_SEED_OUTBOUND(configuration); + SETTINGS_TESTNET_THREE_THREADS_ONE_SEED_TWO_OUTBOUND(configuration); p2p network(configuration); BOOST_REQUIRE_EQUAL(start_result(network), error::success); std::promise subscribe; const auto subscribe_handler = [&subscribe, &network](code ec, channel::ptr) { + // Fires on first connection. subscribe.set_value(ec); return false; }; @@ -448,12 +447,16 @@ BOOST_AUTO_TEST_CASE(p2p__subscribe__seed_outbound__success) std::promise run; const auto run_handler = [&run, &network](code ec) { + // Fires once the session is started. run.set_value(ec); }; network.run(run_handler); BOOST_REQUIRE_EQUAL(run.get_future().get().value(), error::success); BOOST_REQUIRE_EQUAL(subscribe.get_future().get().value(), error::success); + + // ~network blocks on stopping all channels. + // during channel.stop each channel removes itself from the collection. } BOOST_AUTO_TEST_SUITE_END() From ec67ef3a814f4a2b92741eee4043a104302d9aef Mon Sep 17 00:00:00 2001 From: evoskuil Date: Sat, 19 Mar 2016 00:36:31 -0700 Subject: [PATCH 4/9] Comments. --- src/sessions/session_manual.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/sessions/session_manual.cpp b/src/sessions/session_manual.cpp index f274cb54f..e99b90547 100644 --- a/src/sessions/session_manual.cpp +++ b/src/sessions/session_manual.cpp @@ -112,6 +112,9 @@ void session_manual::handle_connect(const code& ec, channel::ptr channel, << "Failure connecting [" << config::endpoint(hostname, port) << "] manually: " << ec.message(); + // TODO: rename manual_retry_limit to manual_connection_attempts + // and treat zero as infinity. Retry is misleading in this case. + // Retry logic. if (settings_.manual_retry_limit == 0) start_connect(hostname, port, handler, 0); From 93254ca7b89c92f1068f58de82e74571031c0eae Mon Sep 17 00:00:00 2001 From: evoskuil Date: Sat, 19 Mar 2016 00:37:42 -0700 Subject: [PATCH 5/9] Set retry limit for manual connection test (prevent test stall). --- test/p2p.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/test/p2p.cpp b/test/p2p.cpp index 29b59ac73..172d436f0 100644 --- a/test/p2p.cpp +++ b/test/p2p.cpp @@ -56,7 +56,8 @@ using namespace bc::network; config.threads = 1; \ config.host_pool_capacity = 0; \ config.outbound_connections = 0; \ - config.connection_limit = 0 + config.connection_limit = 0; \ + config.manual_retry_limit = 2 #define SETTINGS_TESTNET_ONE_THREAD_ONE_SEED(config) \ SETTINGS_TESTNET_ONE_THREAD_NO_CONNECTIONS(config); \ @@ -64,11 +65,11 @@ using namespace bc::network; config.seeds = { { SEED1 } }; \ config.hosts_file = get_log_path(TEST_NAME, "hosts") -#define SETTINGS_TESTNET_THREE_THREADS_ONE_SEED_TWO_OUTBOUND(config) \ +#define SETTINGS_TESTNET_THREE_THREADS_ONE_SEED_FIVE_OUTBOUND(config) \ auto config = settings::testnet; \ config.threads = 3; \ config.connection_limit = 0; \ - config.outbound_connections = 2; \ + config.outbound_connections = 5; \ config.seeds = { { SEED1 } }; \ config.hosts_file = get_log_path(TEST_NAME, "hosts") @@ -431,7 +432,7 @@ BOOST_AUTO_TEST_CASE(p2p__broadcast__ping_two_distinct_hosts__two_sends_and_succ BOOST_AUTO_TEST_CASE(p2p__subscribe__seed_outbound__success) { print_headers(TEST_NAME); - SETTINGS_TESTNET_THREE_THREADS_ONE_SEED_TWO_OUTBOUND(configuration); + SETTINGS_TESTNET_THREE_THREADS_ONE_SEED_FIVE_OUTBOUND(configuration); p2p network(configuration); BOOST_REQUIRE_EQUAL(start_result(network), error::success); From 52ff8e24043812dfaa5bc9c34a4d361ef5665f29 Mon Sep 17 00:00:00 2001 From: evoskuil Date: Sat, 19 Mar 2016 00:43:18 -0700 Subject: [PATCH 6/9] Make synchronizer error suppression explicit. --- src/connector.cpp | 9 +++++---- src/protocols/protocol_seed.cpp | 2 +- src/protocols/protocol_version.cpp | 2 +- src/sessions/session_batch.cpp | 9 ++++++--- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/connector.cpp b/src/connector.cpp index 1ec570099..840044b34 100644 --- a/src/connector.cpp +++ b/src/connector.cpp @@ -120,7 +120,6 @@ void connector::connect(const std::string& hostname, uint16_t port, // We preserve the asynchronous contract of the async_resolve. // Dispatch ensures job does not execute in the current thread. dispatch_.concurrent(handler, error::service_stopped, nullptr); - mutex_.unlock_upgrade(); //--------------------------------------------------------------------- return; @@ -150,7 +149,6 @@ void connector::handle_resolve(const boost_code& ec, asio::iterator iterator, if (stopped_) { dispatch_.concurrent(handler, error::service_stopped, nullptr); - mutex_.unlock_upgrade(); //--------------------------------------------------------------------- return; @@ -159,7 +157,6 @@ void connector::handle_resolve(const boost_code& ec, asio::iterator iterator, if (ec) { dispatch_.concurrent(handler, error::resolve_failed, nullptr); - mutex_.unlock_upgrade(); //--------------------------------------------------------------------- return; @@ -173,7 +170,7 @@ void connector::handle_resolve(const boost_code& ec, asio::iterator iterator, pending_.store(socket); // Manage the socket-timer race. - const auto handle_connect = synchronize(handler, 1, NAME); + const auto handle_connect = synchronize(handler, 1, NAME, false); // This is branch #1 of the connnect sequence. timer->start( @@ -205,12 +202,16 @@ void connector::handle_timer(const code& ec, asio::socket_ptr socket, else handler(error::channel_timeout, nullptr); + log::info(LOG_NETWORK) << "connector::handle_timer - close socket"; + // Critical Section /////////////////////////////////////////////////////////////////////////// unique_lock lock(mutex_); proxy::close(socket); /////////////////////////////////////////////////////////////////////////// + + log::info(LOG_NETWORK) << "connector::handle_timer - closed socket"; } // Connect sequence. diff --git a/src/protocols/protocol_seed.cpp b/src/protocols/protocol_seed.cpp index 7ef5011be..8a2ada995 100644 --- a/src/protocols/protocol_seed.cpp +++ b/src/protocols/protocol_seed.cpp @@ -59,7 +59,7 @@ void protocol_seed::start(event_handler handler) } protocol_timer::start(settings.channel_germination(), - synchronize(complete, 3, NAME)); + synchronize(complete, 3, NAME, false)); SUBSCRIBE2(address, handle_receive_address, _1, _2); send_own_address(settings); diff --git a/src/protocols/protocol_version.cpp b/src/protocols/protocol_version.cpp index 5e963c571..4aa213ca4 100644 --- a/src/protocols/protocol_version.cpp +++ b/src/protocols/protocol_version.cpp @@ -94,7 +94,7 @@ void protocol_version::start(event_handler handler) // The handler is invoked in the context of the last message receipt. protocol_timer::start(settings.channel_handshake(), - synchronize(handler, 2, NAME)); + synchronize(handler, 2, NAME, false)); const auto self = template_factory(authority(), settings, nonce(), height); SUBSCRIBE2(version, handle_receive_version, _1, _2); diff --git a/src/sessions/session_batch.cpp b/src/sessions/session_batch.cpp index 75753a6ba..a0b63a64c 100644 --- a/src/sessions/session_batch.cpp +++ b/src/sessions/session_batch.cpp @@ -46,9 +46,12 @@ session_batch::session_batch(p2p& network, bool persistent) void session_batch::connect(connector::ptr connect, channel_handler handler) { const auto batch = std::max(settings_.connect_batch_size, 1u); - const auto complete = synchronize(handler, 1, NAME); - // We can't use dispatch::race because it doesn't increment the refcount. + // BUGBUG: connections that lose this race are orphaned until shutdown. + // The completion handler is only invoked on successful connection. + // Otherwise it just loops over connections until externally stopped. + const auto complete = synchronize(handler, 1, NAME, false); + for (uint32_t host = 0; host < batch; ++host) new_connect(connect, complete); } @@ -78,7 +81,7 @@ void session_batch::start_connect(const code& ec, const authority& host, return; } - // This could create a tight loop in the case of a small pool. + // This creates a tight loop in the case of a small address pool. if (blacklisted(host)) { log::debug(LOG_NETWORK) From bdd8f4ba92d2a19347a33f3db9a6e769492efdc0 Mon Sep 17 00:00:00 2001 From: evoskuil Date: Sat, 19 Mar 2016 17:07:37 -0700 Subject: [PATCH 7/9] Add new_channel method to acceptor for symmetry with connector. --- include/bitcoin/network/acceptor.hpp | 1 + src/acceptor.cpp | 9 +++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/include/bitcoin/network/acceptor.hpp b/include/bitcoin/network/acceptor.hpp index 774cc8e89..cc5ff8672 100644 --- a/include/bitcoin/network/acceptor.hpp +++ b/include/bitcoin/network/acceptor.hpp @@ -62,6 +62,7 @@ class BCT_API acceptor private: code safe_listen(uint16_t port); + std::shared_ptr new_channel(asio::socket_ptr socket); void handle_accept(const boost_code& ec, asio::socket_ptr socket, accept_handler handler); diff --git a/src/acceptor.cpp b/src/acceptor.cpp index ee5fb7de1..4cedf42ba 100644 --- a/src/acceptor.cpp +++ b/src/acceptor.cpp @@ -135,9 +135,14 @@ void acceptor::handle_accept(const boost_code& ec, asio::socket_ptr socket, if (ec) handler(error::boost_to_error_code(ec), nullptr); else - handler(error::success, - std::make_shared(pool_, socket, settings_)); + handler(error::success, new_channel(socket)); } +std::shared_ptr acceptor::new_channel(asio::socket_ptr socket) +{ + return std::make_shared(pool_, socket, settings_); +} + + } // namespace network } // namespace libbitcoin From 04150bf9bb58ae8f488edc2832aa89e42925b501 Mon Sep 17 00:00:00 2001 From: evoskuil Date: Sat, 19 Mar 2016 17:08:47 -0700 Subject: [PATCH 8/9] Remove unintended debug statements. --- src/connector.cpp | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/connector.cpp b/src/connector.cpp index 840044b34..09b49e1a8 100644 --- a/src/connector.cpp +++ b/src/connector.cpp @@ -202,16 +202,12 @@ void connector::handle_timer(const code& ec, asio::socket_ptr socket, else handler(error::channel_timeout, nullptr); - log::info(LOG_NETWORK) << "connector::handle_timer - close socket"; - // Critical Section /////////////////////////////////////////////////////////////////////////// unique_lock lock(mutex_); proxy::close(socket); /////////////////////////////////////////////////////////////////////////// - - log::info(LOG_NETWORK) << "connector::handle_timer - closed socket"; } // Connect sequence. From d117eac5caa2ed409c7aa6656edb5801519c2a5b Mon Sep 17 00:00:00 2001 From: evoskuil Date: Sat, 19 Mar 2016 20:26:30 -0700 Subject: [PATCH 9/9] Fix session batch to prevent over-connection. --- .../network/sessions/session_batch.hpp | 20 ++++- src/sessions/session_batch.cpp | 84 +++++++++++++++---- 2 files changed, 84 insertions(+), 20 deletions(-) diff --git a/include/bitcoin/network/sessions/session_batch.hpp b/include/bitcoin/network/sessions/session_batch.hpp index 39da02db4..ebd608a01 100644 --- a/include/bitcoin/network/sessions/session_batch.hpp +++ b/include/bitcoin/network/sessions/session_batch.hpp @@ -20,6 +20,9 @@ #ifndef LIBBITCOIN_NETWORK_SESSION_BATCH_HPP #define LIBBITCOIN_NETWORK_SESSION_BATCH_HPP +#include +#include +#include #include #include #include @@ -45,14 +48,25 @@ class BCT_API session_batch virtual void connect(connector::ptr connect, channel_handler handler); private: + typedef std::atomic atomic_counter; + typedef std::shared_ptr atomic_counter_ptr; + typedef std::shared_ptr upgrade_mutex_ptr; + + void converge(const code& ec, channel::ptr channel, + atomic_counter_ptr counter, upgrade_mutex_ptr mutex, + channel_handler handler); // Connect sequence - void new_connect(connector::ptr connect, channel_handler handler); + void new_connect(connector::ptr connect, atomic_counter_ptr counter, + channel_handler handler); void start_connect(const code& ec, const authority& host, - connector::ptr connect, channel_handler handler); + connector::ptr connect, atomic_counter_ptr counter, + channel_handler handler); void handle_connect(const code& ec, channel::ptr channel, const authority& host, connector::ptr connect, - channel_handler handler); + atomic_counter_ptr counter, channel_handler handler); + + const size_t batch_size_; }; } // namespace network diff --git a/src/sessions/session_batch.cpp b/src/sessions/session_batch.cpp index a0b63a64c..e6f516c50 100644 --- a/src/sessions/session_batch.cpp +++ b/src/sessions/session_batch.cpp @@ -19,7 +19,9 @@ */ #include +#include #include +#include #include #include #include @@ -35,29 +37,67 @@ using std::placeholders::_1; using std::placeholders::_2; session_batch::session_batch(p2p& network, bool persistent) - : session(network, true, persistent) + : session(network, true, persistent), + batch_size_(std::max(settings_.connect_batch_size, 1u)) { } +void session_batch::converge(const code& ec, channel::ptr channel, + atomic_counter_ptr counter, upgrade_mutex_ptr mutex, + channel_handler handler) +{ + /////////////////////////////////////////////////////////////////////////// + // Critical Section + mutex->lock_upgrade(); + + const auto initial_count = counter->load(); + BITCOIN_ASSERT(initial_count <= batch_size_); + + // Already completed, don't call handler. + if (initial_count == batch_size_) + { + mutex->unlock_upgrade(); + //----------------------------------------------------------------- + if (!ec) + channel->stop(error::channel_stopped); + + return; + } + + const auto count = !ec ? batch_size_ : initial_count + 1; + const auto cleared = count == batch_size_; + + //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + mutex->unlock_upgrade_and_lock(); + counter->store(count); + mutex->unlock(); + /////////////////////////////////////////////////////////////////////// + + if (cleared) + { + // If the last connection attempt is an error, normalize the code. + const auto result = ec ? error::operation_failed : error::success; + handler(result, channel); + } +} + // Connect sequence. // ---------------------------------------------------------------------------- // protected: void session_batch::connect(connector::ptr connect, channel_handler handler) { - const auto batch = std::max(settings_.connect_batch_size, 1u); - - // BUGBUG: connections that lose this race are orphaned until shutdown. - // The completion handler is only invoked on successful connection. - // Otherwise it just loops over connections until externally stopped. - const auto complete = synchronize(handler, 1, NAME, false); + // synchronizer state. + const auto mutex = std::make_shared(); + const auto counter = std::make_shared(0); + const auto singular = BIND5(converge, _1, _2, counter, mutex, handler); - for (uint32_t host = 0; host < batch; ++host) - new_connect(connect, complete); + for (uint32_t host = 0; host < batch_size_; ++host) + new_connect(connect, counter, singular); } void session_batch::new_connect(connector::ptr connect, - channel_handler handler) + atomic_counter_ptr counter, channel_handler handler) { if (stopped()) { @@ -66,12 +106,18 @@ void session_batch::new_connect(connector::ptr connect, return; } - fetch_address(BIND4(start_connect, _1, _2, connect, handler)); + if (counter->load() == batch_size_) + return; + + fetch_address(BIND5(start_connect, _1, _2, connect, counter, handler)); } void session_batch::start_connect(const code& ec, const authority& host, - connector::ptr connect, channel_handler handler) + connector::ptr connect, atomic_counter_ptr counter, channel_handler handler) { + if (counter->load() == batch_size_) + return; + // This termination prevents a tight loop in the empty address pool case. if (ec) { @@ -86,7 +132,7 @@ void session_batch::start_connect(const code& ec, const authority& host, { log::debug(LOG_NETWORK) << "Fetched blacklisted address [" << host << "] "; - new_connect(connect, handler); + handler(error::address_blocked, nullptr); return; } @@ -94,19 +140,23 @@ void session_batch::start_connect(const code& ec, const authority& host, << "Connecting to [" << host << "]"; // CONNECT - connect->connect(host, BIND5(handle_connect, _1, _2, host, connect, - handler)); + connect->connect(host, BIND6(handle_connect, _1, _2, host, connect, + counter, handler)); } void session_batch::handle_connect(const code& ec, channel::ptr channel, - const authority& host, connector::ptr connect, channel_handler handler) + const authority& host, connector::ptr connect, atomic_counter_ptr counter, + channel_handler handler) { + if (counter->load() == batch_size_) + return; + if (ec) { log::debug(LOG_NETWORK) << "Failure connecting to [" << host << "] " << ec.message(); - new_connect(connect, handler); + handler(ec, nullptr); return; }