From 188a42a71d15289cd72d5eee7a7583f1b886df89 Mon Sep 17 00:00:00 2001 From: chengzhiping Date: Mon, 23 Apr 2018 17:36:04 +0800 Subject: [PATCH] clear hosts.cache if large invalid peer nodes. #271 and restart seeding --- include/metaverse/network/hosts.hpp | 3 ++ include/metaverse/network/p2p.hpp | 5 ++- .../network/sessions/session_outbound.hpp | 5 +++ .../network/sessions/session_seed.hpp | 2 + src/lib/network/hosts.cpp | 21 ++++++++++ src/lib/network/p2p.cpp | 21 +++++++++- src/lib/network/sessions/session_outbound.cpp | 39 ++++++++++++++++++- src/lib/network/sessions/session_seed.cpp | 10 ++++- 8 files changed, 100 insertions(+), 6 deletions(-) diff --git a/include/metaverse/network/hosts.hpp b/include/metaverse/network/hosts.hpp index 2c05b7e15..4937e209d 100644 --- a/include/metaverse/network/hosts.hpp +++ b/include/metaverse/network/hosts.hpp @@ -69,6 +69,9 @@ class BCT_API hosts // Save hosts to file. virtual code stop(); + // Clear hosts buffer + virtual code clear(); + virtual size_t count() const; virtual code fetch(address& out, const config::authority::list& excluded_list); virtual code remove(const address& host); diff --git a/include/metaverse/network/p2p.hpp b/include/metaverse/network/p2p.hpp index 6e8d06eb4..e894daafc 100644 --- a/include/metaverse/network/p2p.hpp +++ b/include/metaverse/network/p2p.hpp @@ -202,6 +202,9 @@ class BCT_API p2p config::authority::ptr get_out_address(); #endif + //restart the seeding session + void restart_seeding(); + protected: /// Attach a session to the network, caller must start the session. @@ -239,7 +242,7 @@ class BCT_API p2p connections::ptr connections_; stop_subscriber::ptr stop_subscriber_; channel_subscriber::ptr channel_subscriber_; - + session_seed::ptr seed; }; } // namespace network } // namespace libbitcoin diff --git a/include/metaverse/network/sessions/session_outbound.hpp b/include/metaverse/network/sessions/session_outbound.hpp index d8d315b88..f683428b6 100644 --- a/include/metaverse/network/sessions/session_outbound.hpp +++ b/include/metaverse/network/sessions/session_outbound.hpp @@ -52,6 +52,8 @@ class BCT_API session_outbound virtual void attach_protocols(channel::ptr channel); void delay_new_connect(connector::ptr connect); + void delay_reseeding(); + private: void new_connection(connector::ptr connect); void handle_started(const code& ec, result_handler handler); @@ -62,6 +64,9 @@ class BCT_API session_outbound channel::ptr channel); void handle_channel_start(const code& ec, connector::ptr connect, channel::ptr channel); + + std::atomic_int outbound_counter; + p2p& network__; }; } // namespace network diff --git a/include/metaverse/network/sessions/session_seed.hpp b/include/metaverse/network/sessions/session_seed.hpp index e897bfdc8..106432793 100644 --- a/include/metaverse/network/sessions/session_seed.hpp +++ b/include/metaverse/network/sessions/session_seed.hpp @@ -48,6 +48,8 @@ class BCT_API session_seed /// Start the session. void start(result_handler handler) override; + void restart(result_handler handler); + protected: /// Override to attach specialized protocols upon channel start. virtual void attach_protocols(channel::ptr channel, diff --git a/src/lib/network/hosts.cpp b/src/lib/network/hosts.cpp index b661eb992..422c4a901 100644 --- a/src/lib/network/hosts.cpp +++ b/src/lib/network/hosts.cpp @@ -282,6 +282,27 @@ code hosts::stop() return error::success; } +code hosts::clear() +{ + // Critical Section + mutex_.lock_upgrade(); + + if (stopped_) + { + mutex_.unlock_upgrade(); + //--------------------------------------------------------------------- + return error::service_stopped; + } + + + mutex_.unlock_upgrade_and_lock(); + buffer_.clear(); + mutex_.unlock(); + /////////////////////////////////////////////////////////////////////////// + + return error::success; +} + code hosts::remove(const address& host) { /////////////////////////////////////////////////////////////////////////// diff --git a/src/lib/network/p2p.cpp b/src/lib/network/p2p.cpp index 4704e2c36..559a604a5 100644 --- a/src/lib/network/p2p.cpp +++ b/src/lib/network/p2p.cpp @@ -66,7 +66,8 @@ p2p::p2p(const settings& settings) hosts_(std::make_shared(threadpool_, settings_)), connections_(std::make_shared()), stop_subscriber_(std::make_shared(threadpool_, NAME "_stop_sub")), - channel_subscriber_(std::make_shared(threadpool_, NAME "_sub")) + channel_subscriber_(std::make_shared(threadpool_, NAME "_sub")), + seed(nullptr) { } @@ -143,7 +144,7 @@ void p2p::handle_hosts_loaded(const code& ec, result_handler handler) } // The instance is retained by the stop handler (until shutdown). - const auto seed = attach_seed_session(); + seed = attach_seed_session(); // This is invoked on a new thread. seed->start( @@ -618,6 +619,22 @@ void p2p::map_port(bool use_upnp) } } +void p2p::restart_seeding() +{ + //1. clear the host::buffer_ cache + const auto result = hosts_->clear(); + log::info(LOG_NETWORK) << "restart_seeding clear hosts cache: " << result.message(); + + //2. start the session_seed + result_handler handler = [](const code& ec) { + log::info(LOG_NETWORK) << "restart_seeding result: " << ec.message(); + }; + + seed->restart(handler); + + +} + #else void p2p::map_port(bool) { diff --git a/src/lib/network/sessions/session_outbound.cpp b/src/lib/network/sessions/session_outbound.cpp index 400252f90..1d2f10314 100644 --- a/src/lib/network/sessions/session_outbound.cpp +++ b/src/lib/network/sessions/session_outbound.cpp @@ -37,6 +37,7 @@ using namespace std::placeholders; session_outbound::session_outbound(p2p& network) : session_batch(network, true), + network__(network), CONSTRUCT_TRACK(session_outbound) { } @@ -65,6 +66,8 @@ void session_outbound::handle_started(const code& ec, result_handler handler) return; } + outbound_counter = 0; + const auto connect = create_connector(); for (size_t peer = 0; peer < settings_.outbound_connections; ++peer) { @@ -109,6 +112,30 @@ void session_outbound::delay_new_connect(connector::ptr connect) }); } +void session_outbound::delay_reseeding() +{ + log::error(LOG_NETWORK) << "outbound channel counter decreased, trigger the re-seeding timer!"; + auto timer = std::make_shared(pool_, asio::seconds(60)); + auto self = shared_from_this(); + timer->start([this, timer, self](const code& ec){ + if (stopped()) + { + return; + } + auto pThis = shared_from_this(); + auto action = [this, pThis](){ + const int counter = outbound_counter; + if (counter > 1) { + log::info(LOG_NETWORK) << "outbound channel counter recovered to [" << counter << "], re-seeding is canceled!"; + return; + } + log::error(LOG_NETWORK) << "execute re-seeding!"; + network__.restart_seeding(); + }; + pool_.service().post(action); + }); +} + void session_outbound::handle_connect(const code& ec, channel::ptr channel, connector::ptr connect) { @@ -122,7 +149,7 @@ void session_outbound::handle_connect(const code& ec, channel::ptr channel, log::trace(LOG_NETWORK) << "Connected to outbound channel [" << channel->authority() << "]"; - + ++outbound_counter; register_channel(channel, BIND3(handle_channel_start, _1, connect, channel), BIND3(handle_channel_stop, _1, connect, channel)); @@ -157,11 +184,19 @@ void session_outbound::handle_channel_stop(const code& ec, channel->invoke_protocol_start_handler(error::channel_stopped); log::debug(LOG_NETWORK) << "channel stopped," << ec.message(); + const int counter = --outbound_counter; + if(! stopped() && ec.value() != error::service_stopped) { - delay_new_connect(connect); + delay_new_connect(connect); + //restart the seeding procedure with in 1 minutes when outbound session count reduce to 1. + if (counter == 1) { + delay_reseeding(); + } } } + + } // namespace network } // namespace libbitcoin diff --git a/src/lib/network/sessions/session_seed.cpp b/src/lib/network/sessions/session_seed.cpp index 649820319..22561ee7a 100644 --- a/src/lib/network/sessions/session_seed.cpp +++ b/src/lib/network/sessions/session_seed.cpp @@ -60,6 +60,11 @@ void session_seed::start(result_handler handler) session::start(CONCURRENT2(handle_started, _1, handler)); } +void session_seed::restart(result_handler handler) +{ + handle_started(error::success, handler); +} + void session_seed::handle_started(const code& ec, result_handler handler) { if (ec) @@ -182,7 +187,7 @@ void session_seed::attach_protocols(channel::ptr channel, void session_seed::handle_channel_stop(const code& ec) { - log::debug(LOG_NETWORK) + log::info(LOG_NETWORK) << "Seed channel stopped: " << ec.message(); } @@ -190,6 +195,9 @@ void session_seed::handle_channel_stop(const code& ec) void session_seed::handle_complete(size_t start_size, result_handler handler) { address_count(BIND3(handle_final_count, _1, start_size, handler)); + + log::info(LOG_NETWORK) + << "session_seed complete!"; } // We succeed only if there is a host count increase.