Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions include/bitcoin/network/acceptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,11 @@ class BCT_API acceptor
virtual void stop();

private:
bool stopped();
code safe_listen(uint16_t port);
std::shared_ptr<channel> new_channel(asio::socket_ptr socket);
void handle_accept(const boost_code& ec, asio::socket_ptr socket,
accept_handler handler);

std::atomic<bool> stopped_;
threadpool& pool_;
const settings& settings_;
dispatcher dispatch_;
Expand Down
59 changes: 21 additions & 38 deletions include/bitcoin/network/connections.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,13 @@ class BCT_API connections
{
public:
typedef std::shared_ptr<connections> ptr;
typedef config::authority authority;
typedef std::function<void(bool)> truth_handler;
typedef std::function<void(size_t)> count_handler;
typedef std::function<void(const code&)> result_handler;
typedef std::function<void(const code&, channel::ptr)> channel_handler;

/// Construct an instance.
connections(threadpool& pool);
connections();

/// Validate connections stopped.
~connections();
Expand All @@ -57,63 +56,47 @@ class BCT_API connections
connections(const connections&) = delete;
void operator=(const connections&) = delete;

/// Completion handler returns operation_failed if any channel send failed.
/// Completion handler always returns success.
template <typename Message>
void broadcast(const Message& message, channel_handler handle_channel,
result_handler handle_complete)
{
const auto method = &connections::do_broadcast<Message>;
dispatch_.concurrent(method, shared_from_this(), message,
handle_channel, handle_complete);
}

virtual void stop(const code& ec);
virtual void count(count_handler handler) const;
virtual void store(channel::ptr channel, result_handler handler);
virtual void remove(channel::ptr channel, result_handler handler);
virtual void exists(const authority& authority,
truth_handler handler) const;

private:
typedef std::vector<channel::ptr> list;
// We cannot use a synchronizer here because handler closure in loop.
auto counter = std::make_shared<std::atomic<size_t>>(channels_.size());

template <typename Message>
void do_broadcast(const Message& message, channel_handler handle_channel,
result_handler handle_complete)
{
// The list is copied, which protects the iteration without a lock.
auto channels = safe_copy();
const auto size = channels.size();
const auto counter = std::make_shared<std::atomic<size_t>>(size);
const auto result = std::make_shared<std::atomic<error::error_code_t>>(
error::success);

for (const auto channel: channels)
for (const auto channel: safe_copy())
{
const auto handle_send = [=](const code ec)
const auto handle_send = [=](code ec)
{
handle_channel(ec, channel);

if (ec)
result->store(error::operation_failed);

if (counter->fetch_sub(1) == 1)
handle_complete(result->load());
handle_complete(error::success);
};

channel->send(message, handle_send);
}
}

virtual void stop(const code& ec);
virtual void count(count_handler handler) const;
virtual void store(channel::ptr channel, result_handler handler);
virtual void remove(channel::ptr channel, result_handler handler);
virtual void exists(const config::authority& authority,
truth_handler handler) const;

private:
typedef std::vector<channel::ptr> list;

list safe_copy() const;
size_t safe_count() const;
bool safe_store(channel::ptr channel);
code safe_store(channel::ptr channel);
bool safe_remove(channel::ptr channel);
bool safe_exists(const authority& address) const;
bool safe_exists(const config::authority& address) const;

list channels_;
dispatcher dispatch_;
mutable shared_mutex mutex_;
std::atomic<bool> stopped_;
mutable upgrade_mutex mutex_;
};

} // namespace network
Expand Down
5 changes: 4 additions & 1 deletion include/bitcoin/network/hosts.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <vector>
#include <boost/circular_buffer.hpp>
Expand All @@ -40,8 +41,10 @@ namespace network {
/// The file is a line-oriented set of config::authority serializations.
/// Duplicate addresses and those with zero-valued ports are disacarded.
class BCT_API hosts
: public enable_shared_from_base<hosts>
{
public:
typedef std::shared_ptr<hosts> ptr;
typedef message::network_address address;
typedef std::function<void(const code&)> result_handler;

Expand All @@ -54,7 +57,7 @@ class BCT_API hosts

virtual code load();
virtual code save();
virtual size_t count();
virtual size_t count() const;
virtual code fetch(address& out);
virtual code remove(const address& host);
virtual code store(const address& host);
Expand Down
21 changes: 4 additions & 17 deletions include/bitcoin/network/p2p.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,12 @@ class BCT_API p2p

// ------------------------------------------------------------------------

/// Send message to all connections, handler invoked for each channel.
/// Send message to all connections.
template <typename Message>
void broadcast(const Message& message, channel_handler handle_channel,
result_handler handle_complete)
{
dispatch_.ordered(
std::bind(&network::p2p::do_broadcast<Message>,
this, message, handle_channel, handle_complete));
connections_->broadcast(message, handle_channel, handle_complete);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -168,17 +166,7 @@ class BCT_API p2p
return std::make_shared<Session>(*this, std::forward<Args>(args)...);
}

/// No-operation handler, used in default stop handling.
static result_handler unhandled;

private:
template <typename Message>
void do_broadcast(const Message& message, channel_handler handle_channel,
result_handler handle_complete)
{
connections_->broadcast(message, handle_channel, handle_complete);
}

void handle_stopped(const code& ec);
void handle_manual_started(const code& ec, result_handler handler);
void handle_inbound_started(const code& ec, result_handler handler);
Expand All @@ -196,9 +184,8 @@ class BCT_API p2p

// These are thread safe.
threadpool threadpool_;
dispatcher dispatch_;
hosts hosts_;
std::shared_ptr<connections> connections_;
hosts::ptr hosts_;
connections::ptr connections_;
stop_subscriber::ptr stop_subscriber_;
channel_subscriber::ptr channel_subscriber_;
};
Expand Down
2 changes: 1 addition & 1 deletion include/bitcoin/network/pending_channels.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class BCT_API pending_channels
bool safe_exists(uint64_t version_nonce) const;

list channels_;
mutable shared_mutex mutex_;
mutable upgrade_mutex mutex_;
};

} // namespace network
Expand Down
20 changes: 17 additions & 3 deletions include/bitcoin/network/sessions/session_batch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
#ifndef LIBBITCOIN_NETWORK_SESSION_BATCH_HPP
#define LIBBITCOIN_NETWORK_SESSION_BATCH_HPP

#include <atomic>
#include <cstddef>
#include <memory>
#include <bitcoin/bitcoin.hpp>
#include <bitcoin/network/channel.hpp>
#include <bitcoin/network/connector.hpp>
Expand All @@ -45,14 +48,25 @@ class BCT_API session_batch
virtual void connect(connector::ptr connect, channel_handler handler);

private:
typedef std::atomic<size_t> atomic_counter;
typedef std::shared_ptr<atomic_counter> atomic_counter_ptr;
typedef std::shared_ptr<upgrade_mutex> upgrade_mutex_ptr;

void converge(const code& ec, channel::ptr channel,
atomic_counter_ptr counter, upgrade_mutex_ptr mutex,
channel_handler handler);

// Connect sequence
void new_connect(connector::ptr connect, channel_handler handler);
void new_connect(connector::ptr connect, atomic_counter_ptr counter,
channel_handler handler);
void start_connect(const code& ec, const authority& host,
connector::ptr connect, channel_handler handler);
connector::ptr connect, atomic_counter_ptr counter,
channel_handler handler);
void handle_connect(const code& ec, channel::ptr channel,
const authority& host, connector::ptr connect,
channel_handler handler);
atomic_counter_ptr counter, channel_handler handler);

const size_t batch_size_;
};

} // namespace network
Expand Down
12 changes: 8 additions & 4 deletions src/acceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ using std::placeholders::_1;
static const auto reuse_address = asio::acceptor::reuse_address(true);

acceptor::acceptor(threadpool& pool, const settings& settings)
: stopped_(true),
pool_(pool),
: pool_(pool),
settings_(settings),
dispatch_(pool, NAME),
acceptor_(std::make_shared<asio::acceptor>(pool_.service())),
Expand Down Expand Up @@ -136,9 +135,14 @@ void acceptor::handle_accept(const boost_code& ec, asio::socket_ptr socket,
if (ec)
handler(error::boost_to_error_code(ec), nullptr);
else
handler(error::success,
std::make_shared<channel>(pool_, socket, settings_));
handler(error::success, new_channel(socket));
}

std::shared_ptr<channel> acceptor::new_channel(asio::socket_ptr socket)
{
return std::make_shared<channel>(pool_, socket, settings_);
}


} // namespace network
} // namespace libbitcoin
Loading