Skip to content

Commit

Permalink
clear hosts.cache if large invalid peer nodes. #271
Browse files Browse the repository at this point in the history
and restart seeding
  • Loading branch information
chengzhpchn committed Apr 23, 2018
1 parent 120da07 commit 188a42a
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 6 deletions.
3 changes: 3 additions & 0 deletions include/metaverse/network/hosts.hpp
Expand Up @@ -69,6 +69,9 @@ class BCT_API hosts
// Save hosts to file.
virtual code stop();

// Clear hosts buffer
virtual code clear();

virtual size_t count() const;
virtual code fetch(address& out, const config::authority::list& excluded_list);
virtual code remove(const address& host);
Expand Down
5 changes: 4 additions & 1 deletion include/metaverse/network/p2p.hpp
Expand Up @@ -202,6 +202,9 @@ class BCT_API p2p
config::authority::ptr get_out_address();
#endif

//restart the seeding session
void restart_seeding();

protected:

/// Attach a session to the network, caller must start the session.
Expand Down Expand Up @@ -239,7 +242,7 @@ class BCT_API p2p
connections::ptr connections_;
stop_subscriber::ptr stop_subscriber_;
channel_subscriber::ptr channel_subscriber_;

session_seed::ptr seed;
};
} // namespace network
} // namespace libbitcoin
Expand Down
5 changes: 5 additions & 0 deletions include/metaverse/network/sessions/session_outbound.hpp
Expand Up @@ -52,6 +52,8 @@ class BCT_API session_outbound
virtual void attach_protocols(channel::ptr channel);
void delay_new_connect(connector::ptr connect);

void delay_reseeding();

private:
void new_connection(connector::ptr connect);
void handle_started(const code& ec, result_handler handler);
Expand All @@ -62,6 +64,9 @@ class BCT_API session_outbound
channel::ptr channel);
void handle_channel_start(const code& ec, connector::ptr connect,
channel::ptr channel);

std::atomic_int outbound_counter;
p2p& network__;
};

} // namespace network
Expand Down
2 changes: 2 additions & 0 deletions include/metaverse/network/sessions/session_seed.hpp
Expand Up @@ -48,6 +48,8 @@ class BCT_API session_seed
/// Start the session.
void start(result_handler handler) override;

void restart(result_handler handler);

protected:
/// Override to attach specialized protocols upon channel start.
virtual void attach_protocols(channel::ptr channel,
Expand Down
21 changes: 21 additions & 0 deletions src/lib/network/hosts.cpp
Expand Up @@ -282,6 +282,27 @@ code hosts::stop()
return error::success;
}

code hosts::clear()
{
// Critical Section
mutex_.lock_upgrade();

if (stopped_)
{
mutex_.unlock_upgrade();
//---------------------------------------------------------------------
return error::service_stopped;
}


mutex_.unlock_upgrade_and_lock();
buffer_.clear();
mutex_.unlock();
///////////////////////////////////////////////////////////////////////////

return error::success;
}

code hosts::remove(const address& host)
{
///////////////////////////////////////////////////////////////////////////
Expand Down
21 changes: 19 additions & 2 deletions src/lib/network/p2p.cpp
Expand Up @@ -66,7 +66,8 @@ p2p::p2p(const settings& settings)
hosts_(std::make_shared<hosts>(threadpool_, settings_)),
connections_(std::make_shared<connections>()),
stop_subscriber_(std::make_shared<stop_subscriber>(threadpool_, NAME "_stop_sub")),
channel_subscriber_(std::make_shared<channel_subscriber>(threadpool_, NAME "_sub"))
channel_subscriber_(std::make_shared<channel_subscriber>(threadpool_, NAME "_sub")),
seed(nullptr)
{
}

Expand Down Expand Up @@ -143,7 +144,7 @@ void p2p::handle_hosts_loaded(const code& ec, result_handler handler)
}

// The instance is retained by the stop handler (until shutdown).
const auto seed = attach_seed_session();
seed = attach_seed_session();

// This is invoked on a new thread.
seed->start(
Expand Down Expand Up @@ -618,6 +619,22 @@ void p2p::map_port(bool use_upnp)
}
}

void p2p::restart_seeding()
{
//1. clear the host::buffer_ cache
const auto result = hosts_->clear();
log::info(LOG_NETWORK) << "restart_seeding clear hosts cache: " << result.message();

//2. start the session_seed
result_handler handler = [](const code& ec) {
log::info(LOG_NETWORK) << "restart_seeding result: " << ec.message();
};

seed->restart(handler);


}

#else
void p2p::map_port(bool)
{
Expand Down
39 changes: 37 additions & 2 deletions src/lib/network/sessions/session_outbound.cpp
Expand Up @@ -37,6 +37,7 @@ using namespace std::placeholders;

session_outbound::session_outbound(p2p& network)
: session_batch(network, true),
network__(network),
CONSTRUCT_TRACK(session_outbound)
{
}
Expand Down Expand Up @@ -65,6 +66,8 @@ void session_outbound::handle_started(const code& ec, result_handler handler)
return;
}

outbound_counter = 0;

const auto connect = create_connector();
for (size_t peer = 0; peer < settings_.outbound_connections; ++peer)
{
Expand Down Expand Up @@ -109,6 +112,30 @@ void session_outbound::delay_new_connect(connector::ptr connect)
});
}

void session_outbound::delay_reseeding()
{
log::error(LOG_NETWORK) << "outbound channel counter decreased, trigger the re-seeding timer!";
auto timer = std::make_shared<deadline>(pool_, asio::seconds(60));
auto self = shared_from_this();
timer->start([this, timer, self](const code& ec){
if (stopped())
{
return;
}
auto pThis = shared_from_this();
auto action = [this, pThis](){
const int counter = outbound_counter;
if (counter > 1) {
log::info(LOG_NETWORK) << "outbound channel counter recovered to [" << counter << "], re-seeding is canceled!";
return;
}
log::error(LOG_NETWORK) << "execute re-seeding!";
network__.restart_seeding();
};
pool_.service().post(action);
});
}

void session_outbound::handle_connect(const code& ec, channel::ptr channel,
connector::ptr connect)
{
Expand All @@ -122,7 +149,7 @@ void session_outbound::handle_connect(const code& ec, channel::ptr channel,

log::trace(LOG_NETWORK)
<< "Connected to outbound channel [" << channel->authority() << "]";

++outbound_counter;
register_channel(channel,
BIND3(handle_channel_start, _1, connect, channel),
BIND3(handle_channel_stop, _1, connect, channel));
Expand Down Expand Up @@ -157,11 +184,19 @@ void session_outbound::handle_channel_stop(const code& ec,
channel->invoke_protocol_start_handler(error::channel_stopped);
log::debug(LOG_NETWORK) << "channel stopped," << ec.message();

const int counter = --outbound_counter;

if(! stopped() && ec.value() != error::service_stopped)
{
delay_new_connect(connect);
delay_new_connect(connect);
//restart the seeding procedure with in 1 minutes when outbound session count reduce to 1.
if (counter == 1) {
delay_reseeding();
}
}
}



} // namespace network
} // namespace libbitcoin
10 changes: 9 additions & 1 deletion src/lib/network/sessions/session_seed.cpp
Expand Up @@ -60,6 +60,11 @@ void session_seed::start(result_handler handler)
session::start(CONCURRENT2(handle_started, _1, handler));
}

void session_seed::restart(result_handler handler)
{
handle_started(error::success, handler);
}

void session_seed::handle_started(const code& ec, result_handler handler)
{
if (ec)
Expand Down Expand Up @@ -182,14 +187,17 @@ void session_seed::attach_protocols(channel::ptr channel,

void session_seed::handle_channel_stop(const code& ec)
{
log::debug(LOG_NETWORK)
log::info(LOG_NETWORK)
<< "Seed channel stopped: " << ec.message();
}

// This accepts no error code because individual seed errors are suppressed.
void session_seed::handle_complete(size_t start_size, result_handler handler)
{
address_count(BIND3(handle_final_count, _1, start_size, handler));

log::info(LOG_NETWORK)
<< "session_seed complete!";
}

// We succeed only if there is a host count increase.
Expand Down

0 comments on commit 188a42a

Please sign in to comment.