Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions data/bs.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ log_requests = false
secure_only = false
# Enable the query service, defaults to true.
query_service_enabled = true
# Enable the heartbeat service, defaults to true.
heartbeat_service_enabled = true
# Enable the block publishing service, defaults to true.
block_service_enabled = true
# Enable the transaction publishing service, defaults to true.
transaction_service_enabled = true
# Enable the heartbeat service, defaults to false.
heartbeat_service_enabled = false
# Enable the block publishing service, defaults to false.
block_service_enabled = false
# Enable the transaction publishing service, defaults to false.
transaction_service_enabled = false
# The public query endpoint, defaults to 'tcp://*:9091'.
public_query_endpoint = tcp://*:9091
# The public heartbeat endpoint, defaults to 'tcp://*:9092'.
Expand Down
6 changes: 5 additions & 1 deletion include/bitcoin/server/messages/incoming.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ namespace server {
class BCS_API incoming
{
public:
bool receive(bc::protocol::zmq::socket& socket);
/// A printable address for logging only.
std::string address();

/// Send a message from the socket.
code receive(bc::protocol::zmq::socket& socket);

/// The message route as seen at workers.
data_chunk address1;
Expand Down
6 changes: 5 additions & 1 deletion include/bitcoin/server/messages/outgoing.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ class BCS_API outgoing
const data_chunk& address1, const data_chunk& address2,
bool delimited);

bool send(bc::protocol::zmq::socket& socket);
/// A printable address for logging only.
std::string address();

/// Send the message one the socket.
code send(bc::protocol::zmq::socket& socket);

protected:
outgoing(const std::string& command, const data_chunk& data,
Expand Down
9 changes: 6 additions & 3 deletions include/bitcoin/server/server_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include <bitcoin/server/configuration.hpp>
#include <bitcoin/server/define.hpp>
////#include <bitcoin/server/services/block_service.hpp>
////#include <bitcoin/server/services/heart_service.hpp>
#include <bitcoin/server/services/heart_service.hpp>
#include <bitcoin/server/services/query_service.hpp>
////#include <bitcoin/server/services/trans_service.hpp>
#include <bitcoin/server/utility/address_notifier.hpp>
Expand Down Expand Up @@ -99,8 +99,9 @@ class BCS_API server_node
void handle_running(const code& ec, result_handler handler);
void handle_closing(const code& ec, std::promise<code>& wait);

void start_query_services(result_handler handler);
void allocate_query_workers(bool secure, result_handler handler);
bool start_heart_services();
bool start_query_services();
bool start_query_workers(bool secure);

const configuration& configuration_;
const size_t last_checkpoint_height_;
Expand All @@ -109,6 +110,8 @@ class BCS_API server_node
curve_authenticator authenticator_;
query_service secure_query_service_;
query_service public_query_service_;
heart_service secure_heart_service_;
heart_service public_heart_service_;

// This is protected by block mutex.
block_notify_list block_subscriptions_;
Expand Down
38 changes: 13 additions & 25 deletions include/bitcoin/server/services/heart_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,19 @@
#ifndef LIBBITCOIN_SERVER_HEART_SERVICE_HPP
#define LIBBITCOIN_SERVER_HEART_SERVICE_HPP

#include <atomic>
#include <chrono>
#include <cstdint>
#include <future>
#include <memory>
#include <bitcoin/protocol.hpp>
#include <bitcoin/server/define.hpp>
#include <bitcoin/server/settings.hpp>

namespace libbitcoin {
namespace server {

class server_node;

class BCS_API heart_service
: public enable_shared_from_base<heart_service>
: public bc::protocol::zmq::worker
{
public:
typedef std::shared_ptr<heart_service> ptr;
Expand All @@ -43,33 +41,23 @@ class BCS_API heart_service
heart_service(bc::protocol::zmq::authenticator& authenticator,
server_node& node, bool secure);

/// This class is not copyable.
heart_service(const heart_service&) = delete;
void operator=(const heart_service&) = delete;
protected:
virtual bool bind(bc::protocol::zmq::socket& publisher);
virtual bool unbind(bc::protocol::zmq::socket& publisher);

/// Start the endpoint.
bool start();
// Implement the service.
virtual void work();

/// Stop the endpoint.
/// Stopping the authenticated context does not stop the publisher.
bool stop();
// Publish the heartbeat (integrated worker).
void publish(uint32_t count, bc::protocol::zmq::socket& socket);

private:
void publisher(std::promise<code>& started);
void send(uint32_t count, bc::protocol::zmq::socket& socket);
const server::settings& settings_;
const int32_t period_;
const bool secure_;

// These are protected by mutex.
// This is thread safe.
bc::protocol::zmq::authenticator& authenticator_;
dispatcher dispatch_;
std::atomic<bool> stopped_;
std::promise<code> stopping_;
mutable shared_mutex mutex_;

const bc::config::endpoint endpoint_;
const std::chrono::seconds interval_;
const bool log_;
const bool enabled_;
const bool secure_;
};

} // namespace server
Expand Down
7 changes: 4 additions & 3 deletions include/bitcoin/server/workers/query_worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ class BCS_API query_worker
typedef std::function<void(const incoming&, send_handler)> command_handler;
typedef std::unordered_map<std::string, command_handler> command_map;

virtual bool connect(bc::protocol::zmq::socket& router);
virtual bool disconnect(bc::protocol::zmq::socket& router);

virtual void attach_interface();
virtual void attach(const std::string& command, command_handler handler);
virtual bool connect(bc::protocol::zmq::socket& socket);
virtual bool disconnect(bc::protocol::zmq::socket& socket);
virtual void query(bc::protocol::zmq::socket& socket);
virtual void query(bc::protocol::zmq::socket& router);

// Implement the worker.
virtual void work();
Expand Down
33 changes: 22 additions & 11 deletions src/messages/incoming.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,35 @@ namespace server {

using namespace bc::protocol;

std::string incoming::address()
{
return "[" + encode_base16(address1) + "]";
}

// Protocol delimitation migration plan.
//-----------------------------------------------------------------------------
// v1/v2 server: ROUTER, requires not framed
// v3 server: ROUTER, allows/echos framed
// v1/v2 server: ROUTER, requires not delimited
// v3 server: ROUTER, allows/echos delimited
// v1/v2/v3 client: DEALER (not framed)
//-----------------------------------------------------------------------------
// v4 server: ROUTER, requires framed
// v4 client: DEALER (framed) or REQ
// v4 server: ROUTER, requires delimited
// v4 client: DEALER (delimited) or REQ
//-----------------------------------------------------------------------------

// TODO: generalize address stripping with hack parameter of expected
// payload length for parsing undelimited addressing.
bool incoming::receive(zmq::socket& socket)
// TODO: generalize address stripping, store all routes, use (hack) parameter
// of expected payload length for parsing undelimited addressing.
// BUGBUG: current implementation assumes client has not added any addresses.
// This probably prevents the use of generalized zeromq routing to the server.
code incoming::receive(zmq::socket& socket)
{
zmq::message message;
auto ec = message.receive(socket);

if (!message.receive(socket) || message.size() < 5 || message.size() > 6)
return false;
if (ec)
return ec;

if (message.size() < 5 || message.size() > 6)
return error::bad_stream;

// Client is undelimited DEALER -> 2 addresses with no delimiter.
// Client is REQ or delimited DEALER -> 2 addresses with delimiter.
Expand All @@ -66,11 +77,11 @@ bool incoming::receive(zmq::socket& socket)

// Arbitrary caller data (returned to caller for correlation).
if (!message.dequeue(id))
return false;
return error::bad_stream;

// Serialized query.
data = message.dequeue_data();
return true;
return error::success;
}

} // namespace server
Expand Down
8 changes: 7 additions & 1 deletion src/messages/outgoing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,13 @@ outgoing::outgoing(const std::string& command, const data_chunk& data,
message_.enqueue(data);
}

bool outgoing::send(zmq::socket& socket)
std::string outgoing::address()
{
auto message = message_;
return "[" + encode_base16(message.dequeue_data()) + "]";
}

code outgoing::send(zmq::socket& socket)
{
return message_.send(socket);
}
Expand Down
6 changes: 3 additions & 3 deletions src/parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,17 +325,17 @@ options_metadata parser::load_settings()
(
"server.heartbeat_service_enabled",
value<bool>(&configured.server.heartbeat_service_enabled),
"Enable the heartbeat service, defaults to true."
"Enable the heartbeat service, defaults to false."
)
(
"server.block_service_enabled",
value<bool>(&configured.server.block_service_enabled),
"Enable the block publishing service, defaults to true."
"Enable the block publishing service, defaults to false."
)
(
"server.transaction_service_enabled",
value<bool>(&configured.server.transaction_service_enabled),
"Enable the transaction publishing service, defaults to true."
"Enable the transaction publishing service, defaults to false."
)
(
"server.public_query_endpoint",
Expand Down
70 changes: 42 additions & 28 deletions src/server_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ server_node::server_node(const configuration& configuration)
last_checkpoint_height_(configuration.last_checkpoint_height()),
authenticator_(*this),
secure_query_service_(authenticator_, *this, true),
public_query_service_(authenticator_, *this, false)
public_query_service_(authenticator_, *this, false),
secure_heart_service_(authenticator_, *this, true),
public_heart_service_(authenticator_, *this, false)
{
}

Expand Down Expand Up @@ -145,8 +147,12 @@ void server_node::handle_running(const code& ec, result_handler handler)
return;
}

// Start query services and workers.
start_query_services(handler);
// Start services and workers.
if (!start_query_services() || !start_heart_services())
{
handler(error::operation_failed);
return;
}

// Subscribe to blockchain reorganizations.
subscribe_blockchain(
Expand All @@ -167,7 +173,7 @@ void server_node::handle_running(const code& ec, result_handler handler)

// The number of threads available in the thread pool must be sufficient
// to allocate the workers. This will wait forever on thread availability.
void server_node::allocate_query_workers(bool secure, result_handler handler)
bool server_node::start_query_workers(bool secure)
{
auto& server = *this;
const auto& settings = configuration_.server;
Expand All @@ -178,43 +184,51 @@ void server_node::allocate_query_workers(bool secure, result_handler handler)
server, secure);

if (!worker->start())
{
handler(error::operation_failed);
return;
}
return false;

subscribe_stop([=](const code&) { worker->stop(); });
}

return true;
}

void server_node::start_query_services(result_handler handler)
bool server_node::start_query_services()
{
const auto& settings = configuration_.server;

if (!settings.query_service_enabled || settings.query_workers == 0)
return;
return true;

if (settings.server_private_key)
{
if (!secure_query_service_.start())
{
handler(error::operation_failed);
return;
}
// Start secure service and workers if enabled.
if (settings.server_private_key && (!secure_query_service_.start() ||
!start_query_workers(true)))
return false;

allocate_query_workers(true, handler);
}
// Start public service and workers if enabled.
if (!settings.secure_only && (!public_query_service_.start() ||
!start_query_workers(false)))
return false;

if (!settings.secure_only)
{
if (!public_query_service_.start())
{
handler(error::operation_failed);
return;
}
return true;
}

allocate_query_workers(false, handler);
}
bool server_node::start_heart_services()
{
const auto& settings = configuration_.server;

if (!settings.heartbeat_service_enabled ||
settings.heartbeat_interval_seconds == 0)
return true;

// Start public service if enabled.
if (settings.server_private_key && !secure_heart_service_.start())
return false;

// Start public service if enabled.
if (!settings.secure_only && !public_heart_service_.start())
return false;

return true;
}

// Subscriptions.
Expand Down
Loading