Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/console/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void client::cmd_connect(std::stringstream& args)

zmq::socket socket(context_, zmq::socket::role::dealer);

if (!socket.connect(server))
if (socket.connect(server) != bc::error::success)
std::cout << "error: failed to connect" << std::endl;
else
connection_ = std::make_shared<connection>(socket, 6000);
Expand Down
28 changes: 17 additions & 11 deletions examples/console/read_line.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
#include "read_line.hpp"

#include <cassert>
#include <cstring>
#include <iostream>
#include <memory>
Expand All @@ -36,14 +37,16 @@ read_line::~read_line()
{
zmq::message message;
message.enqueue_little_endian(signal_halt);
message.send(socket_);
const auto ec = message.send(socket_);
assert(!ec);
thread_->join();
}

read_line::read_line(zmq::context& context)
: socket_(context, zmq::socket::role::requester)
{
socket_.bind({ "inproc://terminal" });
const auto ec = socket_.bind({ "inproc://terminal" });
assert(!ec);

// The thread must be constructed after the socket is already bound.
thread_ = std::make_shared<std::thread>(
Expand All @@ -56,7 +59,8 @@ void read_line::show_prompt()
std::cout << "> " << std::flush;
zmq::message message;
message.enqueue_little_endian(signal_continue);
message.send(socket_);
const auto ec = message.send(socket_);
assert(!ec);
}

std::string read_line::get_line()
Expand All @@ -67,8 +71,9 @@ std::string read_line::get_line()
if (poller.wait().contains(socket_.id()))
{
zmq::message message;
if (message.receive(socket_))
return message.dequeue_text();
auto ec = message.receive(socket_);
assert(!ec);
return message.dequeue_text();
}

return{};
Expand All @@ -77,18 +82,18 @@ std::string read_line::get_line()
void read_line::run(zmq::context& context)
{
zmq::socket socket(context, zmq::socket::role::replier);
socket.connect({ "inproc://terminal" });
auto ec = socket.connect({ "inproc://terminal" });
assert(!ec);

while (true)
{
uint32_t signal;
zmq::message message;
ec = message.receive(socket);
assert(!ec);

if (!message.receive(socket) || !message.dequeue(signal) ||
signal == signal_halt)
{
if (!message.dequeue(signal) || signal == signal_halt)
break;
}

// Read input:
char line[1000];
Expand All @@ -97,7 +102,8 @@ void read_line::run(zmq::context& context)

zmq::message response;
response.enqueue(text);
response.send(socket);
ec = response.send(socket);
assert(!ec);
}
}

Expand Down
2 changes: 1 addition & 1 deletion examples/get_height/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ int main(int argc, char* argv[])
zmq::context context;
zmq::socket socket(context, zmq::socket::role::dealer);

if (!socket.connect({ argv[1] }))
if (socket.connect({ argv[1] }) != error::success)
{
std::cerr << "Cannot connect to " << argv[1] << std::endl;
return 1;
Expand Down
8 changes: 4 additions & 4 deletions include/bitcoin/client/dealer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class BCC_API dealer
virtual bool read(stream& stream) override;

/// Write the specified data to this stream.
virtual void write(const data_stack& data) override;
virtual bool write(const data_stack& data) override;

protected:
typedef std::chrono::system_clock clock;
Expand Down Expand Up @@ -102,13 +102,13 @@ class BCC_API dealer
static int32_t remaining(const time& deadline);

// send_request->send
void send(const obelisk_message& message);
bool send(const obelisk_message& message);

// write->receive->decode_reply
void receive(const obelisk_message& message);
bool receive(const obelisk_message& message);

// Sends an outgoing request, and adds handlers to pending request table.
void send_request(const std::string& command, const data_chunk& payload,
bool send_request(const std::string& command, const data_chunk& payload,
error_handler on_error, decoder on_reply);

// Decodes an incoming message, invoking the error and/or reply handler.
Expand Down
2 changes: 1 addition & 1 deletion include/bitcoin/client/socket_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class BCC_API socket_stream
// stream interface.
virtual int32_t refresh();
virtual bool read(stream& stream);
virtual void write(const data_stack& data);
virtual bool write(const data_stack& data);

private:
protocol::zmq::socket& socket_;
Expand Down
2 changes: 1 addition & 1 deletion include/bitcoin/client/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class BCC_API stream
virtual bool read(stream& stream) = 0;

/// Write the specified data to this stream.
virtual void write(const data_stack& data) = 0;
virtual bool write(const data_stack& data) = 0;
};

} // namespace client
Expand Down
25 changes: 12 additions & 13 deletions src/dealer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ int32_t dealer::remaining(const time& deadline)

// Create a mssage with identity and send it via the message stream.
// This is invoked by derived class message senders, such as the proxy.
void dealer::send_request(const std::string& command,
bool dealer::send_request(const std::string& command,
const data_chunk& payload, error_handler on_error, decoder on_reply)
{
const auto now = steady_clock::now();
Expand All @@ -176,11 +176,11 @@ void dealer::send_request(const std::string& command,
request.on_reply = std::move(on_reply);
request.resends = 0;
request.deadline = now + milliseconds(timeout_milliseconds_);
send(request.message);
return send(request.message);
}

// Send or resend an existing message by writing it to the message stream.
void dealer::send(const obelisk_message& message)
bool dealer::send(const obelisk_message& message)
{
data_stack data;

Expand All @@ -192,9 +192,8 @@ void dealer::send(const obelisk_message& message)
data.push_back(to_chunk(to_little_endian(message.id)));
data.push_back(message.payload);

// BUGBUG: we are losing error state here, prevents fast fail.
// This creates a message and sends it on the socket.
out_.write(data);
return out_.write(data);
}

// Stream interface, not utilized on this class.
Expand All @@ -204,11 +203,10 @@ bool dealer::read(stream& stream)
}

// stream interface.
void dealer::write(const data_stack& data)
bool dealer::write(const data_stack& data)
{
// BUGBUG: we are losing error state here, prevents fast fail.
if (data.size() < 3 || data.size() > 4)
return;
return false;

obelisk_message message;
auto it = data.begin();
Expand All @@ -232,35 +230,36 @@ void dealer::write(const data_stack& data)
message.payload = *(++it);
}

receive(message);
return receive(message);
}

// Handle a message, call from write.
void dealer::receive(const obelisk_message& message)
bool dealer::receive(const obelisk_message& message)
{
// Subscription updates are not tracked in pending.
if (message.command == "address.update")
{
decode_update(message);
return;
return true;
}

// Subscription updates are not tracked in pending.
if (message.command == "address.stealth_update")
{
decode_stealth_update(message);
return;
return true;
}

const auto command = pending_.find(message.id);
if (command == pending_.end())
{
on_unknown_(message.command);
return;
return false;
}

decode_reply(message, command->second.on_error, command->second.on_reply);
pending_.erase(command);
return true;
}

void dealer::decode_reply(const obelisk_message& message,
Expand Down
4 changes: 3 additions & 1 deletion src/obelisk_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
*/
#include <bitcoin/client/obelisk_client.hpp>

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <thread>
Expand All @@ -42,6 +43,7 @@ static uint32_t to_milliseconds(uint16_t seconds)
static const auto on_unknown = [](const std::string&){};

// Retries is overloaded as configuration for resends as well.
// Timeout is capped at ~ 25 days by signed/millsecond conversions.
obelisk_client::obelisk_client(uint16_t timeout_seconds, uint8_t retries)
: socket_(context_, zmq::socket::role::dealer),
stream_(socket_),
Expand All @@ -67,7 +69,7 @@ bool obelisk_client::connect(const endpoint& address)

for (auto attempt = 0; attempt < 1 + retries_; ++attempt)
{
if (socket_.connect(host_address))
if (socket_.connect(host_address) == error::success)
return true;

// Arbitrary delay between connection attempts.
Expand Down
45 changes: 18 additions & 27 deletions src/socket_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,60 +50,51 @@ bool socket_stream::read(stream& stream)
data_stack data;
zmq::message message;

if (!message.receive(socket_))
if (message.receive(socket_) != error::success)
return false;

// Copy the message to a data stack.
while (!message.empty())
data.push_back(message.dequeue_data());

stream.write(data);
return true;
return stream.write(data);
}

////bool socket_stream::read(std::shared_ptr<response_stream> stream)
////{
//// if (stream)
//// {
//// response_message message;
//// if (!stream)
//// return false;
////
//// if (message.receive(socket_))
//// {
//// std::shared_ptr<bc::protocol::response> response =
//// message.get_response();
//// response_message message;
////
//// if (response)
//// {
//// stream->write(response);
//// return true;
//// }
//// }
//// }
//// if (message.receive(socket_) != error::success)
//// return false;
////
//// return false;
//// auto response = message.get_response();
//// return response && (stream->write(response) == error::success);
////}

// TODO: optimize by passing the internal type of the message object.
// Send a message built from the stack parameter to this socket.
void socket_stream::write(const data_stack& data)
bool socket_stream::write(const data_stack& data)
{
zmq::message message;

// Copy the data stack to a message.
for (const auto& chunk: data)
message.enqueue(chunk);

message.send(socket_);
return message.send(socket_) == error::success;
}

////void socket_stream::write(const std::shared_ptr<request>& request)
////bool socket_stream::write(const std::shared_ptr<request>& request)
////{
//// if (request)
//// {
//// request_message message;
//// message.set_request(request);
//// message.send(socket_);
//// }
//// if (!request)
//// return false;
////
//// request_message message;
//// message.set_request(request);
//// return message.send(socket_) == error::success;
////}

} // namespace client
Expand Down
3 changes: 2 additions & 1 deletion test/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ class stream_fixture
return false;
}

virtual void write(const data_stack& data) override
virtual bool write(const data_stack& data) override
{
out = data;
return true;
}
};

Expand Down