Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion include/bitcoin/network/channels/channel_http.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ class BCT_API channel_http
void send(http::response&& response,
result_handler&& handler) NOEXCEPT;

/// Send without restarting the listener.
void notify(http::response&& notification,
result_handler&& handler) NOEXCEPT;

/// Resume reading from the socket (requires strand).
void resume() NOEXCEPT override;

Expand Down Expand Up @@ -95,7 +99,7 @@ class BCT_API channel_http
/// Handlers.
virtual void handle_receive(const code& ec, size_t bytes,
const http::request_cptr& request) NOEXCEPT;
virtual void handle_send(const code& ec, size_t bytes,
virtual void handle_send(const code& ec, size_t bytes, bool notification,
const std::string& message, const result_handler& handler) NOEXCEPT;

private:
Expand Down
43 changes: 23 additions & 20 deletions include/bitcoin/network/impl/messages/json_body.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -99,25 +99,29 @@ size_t CLASS::reader::put(const buffer_type& buffer, boost_code& ec) NOEXCEPT
return {};
}

// Finishing can be very confusing. The derived rpc body calls this base
// method, and then the virtual done() is tested to determine whether the
// logical object is fully read (including optionally required terminator).
// Any error code here signals the beast reader (and any reader that terminates
// based on the end of the framed data, such as websockets) that the parse has
// failed (terminal error). However for custom stream readers that may not be
// aware of byte termination, the `need_more` implies that the parse has not
// failed and that more bytes may be parsed. In either case, when this is
// called and the parse is complete, then the parsed json object is moved to
// the model and the parser is released. In the case of the derived json-rpc
// reader, the json is also then converted to the rpc model if valid, otherwise
// returning a failure code. In no case is the underlying parser_.finish(ec)
// ever called, as that would preclude use in the unbounded scenario.
TEMPLATE
void CLASS::reader::finish(boost_code& ec) NOEXCEPT
{
// Finishing can be very confusing. The derived rpc body calls this base
// method, and then the virtual done() is tested to determine whether the
// logical object is fully read (including optionally required terminator).
// Any error code here signals the beast reader (and any reader that
// terminates based on the end of the framed data, such as websockets) that
// the parse has failed (terminal error). However for custom stream readers
// that may not be aware of byte termination, the `need_more` implies that
// the parse has not failed and that more bytes may be parsed. In either
// case, when this is called and the parse is complete, then the parsed
// json object is moved to the model and the parser is released. In the
// case of the derived json-rpc reader, the json is also then converted to
// the rpc model if valid, otherwise returning a failure code. In no case
// is the underlying parser_.finish(ec) ever called, as that would preclude
// use in the unbounded scenario.

using namespace network::error;

// The internal boost::json parser will always return !done() when parsing
// a top-level primitive as a whole document, specifically a number number
// value, because it has no way to know if more digits are coming. So this
// will return need_more even when a fixed-size buffer is being read. For
// this reason this body does not support *reading* for top-level objects.
if (!done())
{
ec = to_http_code(http_error_t::need_more);
Expand Down Expand Up @@ -203,9 +207,9 @@ CLASS::writer::get(boost_code& ec) NOEXCEPT
try
{
// Always prepares the configured max_size.
const auto buffer = value_.buffer->prepare(size);
const auto data = system::pointer_cast<char>(buffer.data());
const auto view = serializer_.read(data, buffer.size());
const auto scratch = value_.buffer->prepare(size);
const auto data = system::pointer_cast<char>(scratch.data());
const auto view = serializer_.read(data, scratch.size());

// No progress (edge case).
if (view.empty() && !serializer_.done())
Expand All @@ -215,8 +219,7 @@ CLASS::writer::get(boost_code& ec) NOEXCEPT
}

ec.clear();
value_.buffer->commit(view.size());
value_.buffer->consume(view.size());
value_.buffer->consume(scratch.size());
const auto more = !serializer_.done();
return out_buffer{ std::make_pair(boost::asio::buffer(view), more) };
}
Expand Down
7 changes: 7 additions & 0 deletions include/bitcoin/network/protocols/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,20 @@ class BCT_API protocol
inline void send(Message&& message, Method&& method, Args&&... args) NOEXCEPT \
{ channel_->send(std::forward<Message>(message), BIND_SHARED(method, args)); }

#define DECLARE_NOTIFY() \
template <class Derived, class Message, typename Method, typename... Args> \
inline void notify(Message&& message, Method&& method, Args&&... args) NOEXCEPT \
{ channel_->notify(std::forward<Message>(message), BIND_SHARED(method, args)); }

#define DECLARE_SUBSCRIBE_CHANNEL() \
template <class Derived, class Message, typename Method, typename... Args> \
inline void subscribe_channel(Method&& method, Args&&... args) NOEXCEPT \
{ channel_->template subscribe<Message>(BIND_SHARED(method, args)); }

#define SEND(message, method, ...) \
send<CLASS>(message, &CLASS::method, __VA_ARGS__)
#define NOTIFY(message, method, ...) \
notify<CLASS>(message, &CLASS::method, __VA_ARGS__)
#define SUBSCRIBE_CHANNEL(message, method, ...) \
subscribe_channel<CLASS, message>(&CLASS::method, __VA_ARGS__)
#define SUBSCRIBE_BROADCAST(message, method, ...) \
Expand Down
2 changes: 2 additions & 0 deletions include/bitcoin/network/protocols/protocol_http.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ class BCT_API protocol_http
protocol_http(const session::ptr& session, const channel::ptr& channel,
const options_t& options) NOEXCEPT;

/// Forwards to channel::send.
DECLARE_SEND()
DECLARE_NOTIFY()
DECLARE_SUBSCRIBE_CHANNEL()

/// Message handlers by http method.
Expand Down
1 change: 1 addition & 0 deletions include/bitcoin/network/protocols/protocol_peer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class BCT_API protocol_peer
protocol_peer(const session::ptr& session,
const channel::ptr& channel) NOEXCEPT;

/// Forwards to channel::send.
DECLARE_SEND()
DECLARE_SUBSCRIBE_CHANNEL()

Expand Down
31 changes: 26 additions & 5 deletions src/channels/channel_http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,14 @@ void channel_http::dispatch(const request_cptr& request) NOEXCEPT

flat_buffer& channel_http::request_buffer() NOEXCEPT
{
BC_ASSERT(stranded());
return request_buffer_;
}

request_ptr channel_http::create_request() const NOEXCEPT
{
BC_ASSERT(stranded());

const auto out = to_shared<request>();
if (websocket())
{
Expand All @@ -167,7 +170,7 @@ request_ptr channel_http::create_request() const NOEXCEPT
return out;
}

// Send.
// Send/notify.
// ----------------------------------------------------------------------------

void channel_http::send(response&& response, result_handler&& handler) NOEXCEPT
Expand All @@ -179,25 +182,43 @@ void channel_http::send(response&& response, result_handler&& handler) NOEXCEPT

write(std::move(response),
std::bind(&channel_http::handle_send,
shared_from_base<channel_http>(), _1, _2, std::move(message),
shared_from_base<channel_http>(), _1, _2, false, std::move(message),
std::move(handler)));
}

void channel_http::notify(response&& notification,
result_handler&& handler) NOEXCEPT
{
BC_ASSERT(stranded());
BC_ASSERT(websocket());

auto message = log_message(notification);

write(std::move(notification),
std::bind(&channel_http::handle_send,
shared_from_base<channel_http>(), _1, _2, true, std::move(message),
std::move(handler)));
}

void channel_http::handle_send(const code& ec, size_t bytes,
void channel_http::handle_send(const code& ec, size_t bytes, bool notification,
const std::string& message, const result_handler& handler) NOEXCEPT
{
BC_ASSERT(stranded());

if (ec) stop(ec);
LOGA(boost::format(message) % bytes);
handler(ec);

// Restart the listener (only in response to requests).
// TODO: use new ::send(rresponse, handler) method to differentiate.
receive();
if (!notification)
receive();
}

// private
void channel_http::assign_json_buffer(response& response) NOEXCEPT
{
BC_ASSERT(stranded());

// websocket is full duplex, so cannot use shared json repsonse buffer.
if (!websocket())
{
Expand Down
4 changes: 3 additions & 1 deletion src/net/proxy_actions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ void proxy::cancel(result_handler&& handler) NOEXCEPT
// WS (generic, framed).
// ----------------------------------------------------------------------------

// flat_buffer must have configured max_size, which will be allocated.
void proxy::read(http::flat_buffer& out, count_handler&& handler) NOEXCEPT
{
do_reading();
Expand Down Expand Up @@ -109,6 +110,7 @@ void proxy::do_tcp_write(const asio::const_buffer& payload,
// RPC (TCP: electrum/stratum_v1, WS: btcd).
// ----------------------------------------------------------------------------

// flat_buffer must have configured max_size, which will be allocated.
void proxy::read(http::flat_buffer& buffer, rpc::request& request,
count_handler&& handler) NOEXCEPT
{
Expand Down Expand Up @@ -161,7 +163,7 @@ void proxy::do_notification_write(const rpc::request_ptr& notification,
// HTTP/WS (generic/rpc).
// ----------------------------------------------------------------------------

// Method reading() is invoked directly if read() is called from strand().
// flat_buffer must have configured max_size, which will be allocated.
void proxy::read(http::flat_buffer& buffer, http::request& request,
count_handler&& handler) NOEXCEPT
{
Expand Down
15 changes: 14 additions & 1 deletion src/net/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ void socket::async_write(const asio::const_buffer& buffer, bool binary,
void socket::async_read_some(const asio::mutable_buffer& buffer,
const count_handler& handler) NOEXCEPT
{
BC_ASSERT(stranded());

try
{
if (websocket())
Expand Down Expand Up @@ -325,6 +327,8 @@ void socket::async_read_some(const asio::mutable_buffer& buffer,
void socket::async_read(http::flat_buffer& buffer,
const count_handler& handler) NOEXCEPT
{
BC_ASSERT(stranded());

try
{
if (websocket())
Expand All @@ -336,8 +340,15 @@ void socket::async_read(http::flat_buffer& buffer,
}
else
{
auto remain = floored_subtract(buffer.max_size(), buffer.size());
if (is_zero(remain))
{
handler(error::buffer_overflow, {});
return;
}

VARIANT_DISPATCH_METHOD(get_tcp(),
async_read_some(buffer.prepare(buffer.max_size()),
async_read_some(buffer.prepare(remain),
std::bind(&socket::handle_async, shared_from_this(),
_1, _2, handler, "async_read_some")));
}
Expand All @@ -353,6 +364,8 @@ void socket::async_read(http::flat_buffer& buffer,
void socket::async_read(const asio::mutable_buffer& buffer,
const count_handler& handler) NOEXCEPT
{
BC_ASSERT(stranded());

try
{
if (websocket())
Expand Down
Loading