Skip to content

Commit

Permalink
Added custom trace printing callback
Browse files Browse the repository at this point in the history
  • Loading branch information
saleyn committed May 23, 2012
1 parent 72f3cf7 commit d8b8071
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 99 deletions.
48 changes: 33 additions & 15 deletions include/eixx/connect/basic_otp_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,12 @@ class basic_otp_connection
void timer_reconnect(const boost::system::error_code& ec) {
if (ec == boost::asio::error::operation_aborted)
return; // timer reset

if (verbose() >= VERBOSE_TRACE)
std::cerr << "basic_otp_connection::timer_reconnect: " << ec.message() << std::endl;

if (unlikely(verbose() >= VERBOSE_TRACE)) {
std::stringstream s;
s << "basic_otp_connection::timer_reconnect: " << ec.message();
report_status(REPORT_INFO, s.str());
}

m_transport = connection_type::create(
m_io_service, this, m_node->nodename().to_string(),
Expand Down Expand Up @@ -165,8 +168,10 @@ class basic_otp_connection
BOOST_ASSERT(m_transport.get() == a_con);
if (m_on_connect_status)
m_on_connect_status(this, std::string());
if (verbose() > VERBOSE_NONE)
std::cerr << "Connected to node: " << a_con->remote_node() << std::endl;
if (unlikely(verbose() > VERBOSE_NONE)) {
report_status(REPORT_INFO,
"Connected to node: " + a_con->remote_node());
}
m_connected = true;
}

Expand All @@ -178,18 +183,23 @@ class basic_otp_connection
m_connected = false;
if (m_on_connect_status)
m_on_connect_status(this, a_error);
else if (verbose() > VERBOSE_NONE)
std::cerr << "Failed to connect to node " << a_con->remote_node()
<< ": " << a_error << std::endl;
else if (unlikely(verbose() > VERBOSE_NONE)) {
std::stringstream s;
s << "Failed to connect to node " << a_con->remote_node() << ": " << a_error;
report_status(REPORT_ERROR, s.str());
}
reconnect();
}

void on_disconnect(connection_type* a_con, const boost::system::error_code& err) {
m_connected = false;

if (verbose() > VERBOSE_NONE)
std::cerr << "Disconnected from node: " << a_con->remote_node()
<< " (" << err.message() << ')' << std::endl;
if (unlikely(verbose() > VERBOSE_NONE)) {
std::stringstream s;
s << "Disconnected from node: " << a_con->remote_node()
<< " (" << err.message() << ')';
report_status(REPORT_ERROR, s.str());
}
if (m_node)
m_node->on_disconnect_internal(*this, a_con->remote_node(), err);

Expand All @@ -198,20 +208,28 @@ class basic_otp_connection
}

void on_error(connection_type* a_con, const std::string& s) {
std::cerr << "Error in communication with node: " << a_con->remote_node() << std::endl
<< " " << s << std::endl;
std::stringstream str;
str << "Error in communication with node: " << a_con->remote_node()
<< "\n " << s;
report_status(REPORT_ERROR, str.str());
}

void on_message(connection_type* a_con, const transport_msg<Alloc>& a_tm) {
try {
m_node->deliver(a_tm);
} catch (std::exception& e) {
std::cerr << "Got message " << a_tm.type_string() << std::endl
std::stringstream s;
s << "Got message " << a_tm.type_string() << std::endl
<< " cntrl: " << a_tm.cntrl() << std::endl
<< " msg..: " << a_tm.msg() << std::endl
<< " error: " << e.what() << std::endl;
<< " error: " << e.what();
report_status(REPORT_INFO, s.str());
}
}

void report_status(report_level a_level, const std::string& s) {
node()->report_status(a_level, this, s);
}
};

} // namespace connect
Expand Down
2 changes: 2 additions & 0 deletions include/eixx/connect/basic_otp_mailbox.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ class basic_otp_mailbox
boost::asio::io_service& io_service() const { return m_io_service; }
/// Queue of pending received messages.
queue_type& queue() { return m_queue; }
/// Indicates if mailbox doesn't have any pending messages
bool empty() const { return m_queue.empty(); }

void name(const atom& a_name) { m_name = a_name; }

Expand Down
11 changes: 11 additions & 0 deletions include/eixx/connect/basic_otp_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ class basic_otp_node: public basic_otp_node_local {
on_disconnect(*this, a_con, a_remote_node, err);
}

void report_status(report_level a_level, const connection_t* a_con, const std::string& s);

protected:
/// Publish the node port to epmd making this node known to the world.
void publish_port() throw (err_connection);
Expand Down Expand Up @@ -289,6 +291,15 @@ class basic_otp_node: public basic_otp_node_local {
void (self&, const connection_t&, const std::string&, const boost::system::error_code&)
> on_disconnect;

/**
* Callback invoked if verbosity is different from VERBOSE_NONE. If not assigned,
* the content is printed to stderr.
*/
boost::function<
// OtpNode OtpConnection Status Message
void (self&, const connection_t*, report_level, const std::string&)
> on_status;

/**
* Accept connections from client processes.
* This method sets the socket listener for incoming connections and
Expand Down
15 changes: 14 additions & 1 deletion include/eixx/connect/basic_otp_node.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,17 @@ void basic_otp_node<Alloc, Mutex>::stop_server()
throw std::runtime_error("Not implemented");
}

template <typename Alloc, typename Mutex>
void basic_otp_node<Alloc, Mutex>::
report_status(report_level a_level, const connection_t* a_con, const std::string& s)
{
static const char* s_levels[] = {"INFO", "WARN", "ERROR"};
if (on_status)
on_status(*this, a_con, a_level, s);
else
std::cerr << s_levels[a_level] << "| " << s << std::endl;
}

template <typename Alloc, typename Mutex>
void basic_otp_node<Alloc, Mutex>::deliver(const transport_msg<Alloc>& a_msg)
throw (err_bad_argument, err_no_process, err_connection)
Expand All @@ -154,7 +165,9 @@ void basic_otp_node<Alloc, Mutex>::deliver(const transport_msg<Alloc>& a_msg)
l_mbox->deliver(a_msg);
} catch (std::exception& e) {
// FIXME: Add proper error reporting.
std::cerr << "Cannot deliver message " << a_msg.to_string() << ": " << e.what() << std::endl;
std::stringstream s;
s << "Cannot deliver message " << a_msg.to_string() << ": " << e.what();
report_status(REPORT_WARNING, NULL, s.str());
}
}

Expand Down
36 changes: 23 additions & 13 deletions include/eixx/connect/transport_otp_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,11 @@ class connection
, m_is_writing(false)
, m_connection_aborted(false)
{
if (handler()->verbose() >= VERBOSE_TRACE)
std::cout << "Calling connection::connection(type=" << m_type << ')' << std::endl;
if (unlikely(handler()->verbose() >= VERBOSE_TRACE)) {
std::stringstream s;
s << "Calling connection::connection(type=" << m_type << ')';
a_h->report_status(REPORT_INFO, s.str());
}
}

char* allocate(size_t a_sz) {
Expand Down Expand Up @@ -150,10 +153,13 @@ class connection
m_is_writing = true;
flip_queues(); // Work on the data accumulated in the available_queue.
if (unlikely(verbose() >= VERBOSE_WIRE))
for(cb_t::const_iterator it=buffers.begin(); it != buffers.end(); ++it)
std::cout << " async_write " << boost::asio::buffer_size(*it) << " bytes: "
<< to_binary_string(boost::asio::buffer_cast<const char*>(*it),
boost::asio::buffer_size(*it)) << std::endl;
for(cb_t::const_iterator it=buffers.begin(); it != buffers.end(); ++it) {
std::stringstream s;
s << " async_write " << boost::asio::buffer_size(*it) << " bytes: "
<< to_binary_string(boost::asio::buffer_cast<const char*>(*it),
boost::asio::buffer_size(*it));
m_handler->report_status(REPORT_INFO, s.str());
}
async_write(buffers, boost::asio::transfer_all(),
boost::bind(&connection<Handler, Alloc>::handle_write, this->shared_from_this(),
boost::asio::placeholders::error));
Expand Down Expand Up @@ -195,10 +201,12 @@ class connection
a_msg.encode(data, sz, s_header_size, true);
BOOST_ASSERT(*(data - 1) == s_header_magic);

if (unlikely(verbose() >= VERBOSE_MESSAGE))
std::cout << "client -> agent: " << a_msg.to_string() << std::endl;
if (unlikely(verbose() >= VERBOSE_WIRE))
to_binary_string(std::cout << "client -> agent: ", data, sz) << std::endl;
if (unlikely(verbose() >= VERBOSE_MESSAGE)) {
m_handler->report_status(REPORT_INFO, "client -> agent: " + a_msg.to_string());
if (unlikely(verbose() >= VERBOSE_WIRE))
m_handler->report_status(REPORT_INFO, "client -> agent: " +
to_binary_string(data, sz));
}

boost::asio::const_buffer b(data, sz);
m_io_service.post(
Expand Down Expand Up @@ -236,7 +244,8 @@ class connection
return;

if (handler()->verbose() >= VERBOSE_TRACE)
std::cout << "Calling connection::start()" << std::endl;
m_handler->report_status(REPORT_INFO, "Calling connection::start()");

m_connection_aborted = false;
m_handler->on_connect(this);

Expand Down Expand Up @@ -282,7 +291,7 @@ class connection

virtual ~connection() {
if (handler()->verbose() >= VERBOSE_TRACE)
std::cout << "Calling connection::~connection()" << std::endl;
m_handler->report_status(REPORT_INFO, "Calling ~connection::connection()");
}

/// Close connection channel orderly by user.
Expand All @@ -305,7 +314,8 @@ class connection
return;

if (handler()->verbose() >= VERBOSE_TRACE)
std::cout << "Calling connection::stop(" << e.message() << ')' << std::endl;
m_handler->report_status(REPORT_INFO,
std::string("Calling ~connection::connection()") + e.message());

m_connection_aborted = true;
m_handler->on_disconnect(this, e);
Expand Down
94 changes: 55 additions & 39 deletions include/eixx/connect/transport_otp_connection.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -171,17 +171,18 @@ void connection<Handler, Alloc>::
handle_write(const boost::system::error_code& err)
{
if (m_connection_aborted) {
if (verbose() >= VERBOSE_TRACE)
std::cout << "Connection aborted - "
"exiting connection<Handler, Alloc>::handle_write"
<< std::endl;
if (unlikely(verbose() >= VERBOSE_TRACE))
m_handler->report_status(REPORT_INFO,
"Connection aborted - exiting connection::handle_write");
return;
}

if (unlikely(err)) {
if (verbose() >= VERBOSE_TRACE)
std::cout << "connection<Handler, Alloc>::handle_write("
<< err.value() << ')' << std::endl;
if (verbose() >= VERBOSE_TRACE) {
std::stringstream s;
s << "connection::handle_write(" << err.value() << ')';
m_handler->report_status(REPORT_INFO, s.str());
}
// We use operation_aborted as a user-initiated connection reset,
// therefore check to substitute the error since bytes_transferred == 0
// means a connection loss.
Expand Down Expand Up @@ -210,22 +211,25 @@ template <class Handler, class Alloc>
void connection<Handler, Alloc>::
handle_read(const boost::system::error_code& err, size_t bytes_transferred)
{
if (unlikely(verbose() >= VERBOSE_WIRE))
std::cout << "connection::handle_read(transferred="
<< bytes_transferred << ", got_header="
<< (m_got_header ? "true" : "false")
<< ", rd_buf.size=" << m_rd_buf.capacity()
<< ", rd_ptr=" << (m_rd_ptr - &m_rd_buf[0])
<< ", rd_end=" << (m_rd_end - &m_rd_buf[0])
<< ", rd_capacity=" << rd_capacity()
<< ", pkt_sz=" << m_packet_size << " (ec="
<< err.value() << ')' << std::endl;
if (unlikely(verbose() >= VERBOSE_WIRE)) {
std::stringstream s;
s << "connection::handle_read(transferred="
<< bytes_transferred << ", got_header="
<< (m_got_header ? "true" : "false")
<< ", rd_buf.size=" << m_rd_buf.capacity()
<< ", rd_ptr=" << (m_rd_ptr - &m_rd_buf[0])
<< ", rd_end=" << (m_rd_end - &m_rd_buf[0])
<< ", rd_capacity=" << rd_capacity()
<< ", pkt_sz=" << m_packet_size << " (ec="
<< err.value() << ')';
m_handler->report_status(REPORT_INFO, s.str());
}

if (unlikely(m_connection_aborted)) {
if (verbose() >= VERBOSE_WIRE)
std::cout << "Connection aborted - "
"exiting connection<Handler, Alloc>::handle_read"
<< std::endl;
if (verbose() >= VERBOSE_WIRE) {
m_handler->report_status(REPORT_INFO,
"Connection aborted - exiting connection::handle_read");
}
return;
} else if (unlikely(err)) {
// We use operation_aborted as a user-initiated connection reset,
Expand Down Expand Up @@ -328,15 +332,17 @@ handle_read(const boost::system::error_code& err, size_t bytes_transferred)
crunched = true;
}

if (unlikely(verbose() >= VERBOSE_WIRE))
std::cout << "Scheduling connection::async_read(offset="
<< (m_rd_end-&m_rd_buf[0])
<< ", capacity=" << rd_capacity() << ", pkt_size="
<< m_packet_size << ", need=" << need_bytes
<< ", got_header=" << (m_got_header ? "true" : "false")
<< ", crunched=" << (crunched ? "true" : "false")
<< ", aborted=" << (m_connection_aborted ? "true" : "false") << ')'
<< std::endl;
if (unlikely(verbose() >= VERBOSE_WIRE)) {
std::stringstream s;
s << "Scheduling connection::async_read(offset="
<< (m_rd_end-&m_rd_buf[0])
<< ", capacity=" << rd_capacity() << ", pkt_size="
<< m_packet_size << ", need=" << need_bytes
<< ", got_header=" << (m_got_header ? "true" : "false")
<< ", crunched=" << (crunched ? "true" : "false")
<< ", aborted=" << (m_connection_aborted ? "true" : "false") << ')';
m_handler->report_status(REPORT_INFO, s.str());
}

boost::asio::mutable_buffers_1 buffers(m_rd_end, rd_capacity());
async_read(
Expand Down Expand Up @@ -431,11 +437,18 @@ process_message(const char* a_buf, size_t a_size)
break;
*/
default:
if (unlikely(verbose() >= VERBOSE_WIRE))
std::cout << "Got transport msg - (cntrl): " << tm.cntrl() << std::endl;
if (unlikely(verbose() >= VERBOSE_MESSAGE))
if (tm.has_msg())
std::cout << "Got transport msg - (msg): " << tm.msg() << std::endl;
if (unlikely(verbose() >= VERBOSE_MESSAGE)) {
if (unlikely(verbose() >= VERBOSE_WIRE)) {
std::stringstream s;
s << "Got transport msg - (cntrl): " << tm.cntrl();
m_handler->report_status(REPORT_INFO, s.str());
}
if (tm.has_msg()) {
std::stringstream s;
s << "Got transport msg - (msg): " << tm.msg();
m_handler->report_status(REPORT_INFO, s.str());
}
}
m_handler->on_message(this, tm);
}
}
Expand All @@ -460,10 +473,13 @@ send(const transport_msg<Alloc>& a_msg)
if (l_has_msg)
a_msg.msg().encode(s + cntrl_sz, msg_sz, 0, true);

if (unlikely(verbose() >= VERBOSE_MESSAGE))
std::cout << "SEND cntrl="
<< l_cntrl.to_string() << (l_has_msg ? ", msg=" : "")
<< (l_has_msg ? a_msg.msg().to_string() : std::string("")) << std::endl;
if (unlikely(verbose() >= VERBOSE_MESSAGE)) {
std::stringstream s;
s << "SEND cntrl="
<< l_cntrl.to_string() << (l_has_msg ? ", msg=" : "")
<< (l_has_msg ? a_msg.msg().to_string() : std::string(""));
m_handler->report_status(REPORT_INFO, s.str());
}
//if (unlikely(verbose() >= VERBOSE_WIRE))
// std::cout << "SEND " << len << " bytes " << to_binary_string(data, len) << std::endl;

Expand Down
Loading

0 comments on commit d8b8071

Please sign in to comment.