Skip to content

Commit

Permalink
* Added debug_to_kb to ZMQ transport
Browse files Browse the repository at this point in the history
* Removed unused TransportDebug struct
  • Loading branch information
jredmondson committed Sep 24, 2018
1 parent 582c44a commit 17a2366
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 46 deletions.
1 change: 0 additions & 1 deletion include/madara/transport/Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include "ReducedMessageHeader.h"
#include "madara/transport/BandwidthMonitor.h"
#include "madara/transport/PacketScheduler.h"
#include "madara/transport/TransportDebug.h"

#include "madara/knowledge/KnowledgeRecord.h"
#include "madara/knowledge/ThreadSafeContext.h"
Expand Down
42 changes: 0 additions & 42 deletions include/madara/transport/TransportDebug.h

This file was deleted.

42 changes: 39 additions & 3 deletions include/madara/transport/zmq/ZMQTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ madara::transport::ZMQTransport::ZMQTransport (const std::string & id,
if (launch_transport)
setup ();

if (config.debug_to_kb_prefix != "")
{
knowledge::KnowledgeBase kb;
kb.use (context);

sent_packets_.set_name (config.debug_to_kb_prefix + ".sent_packets", kb);
failed_sends_.set_name (config.debug_to_kb_prefix + ".failed_sends", kb);
sent_data_max_.set_name (
config.debug_to_kb_prefix + ".sent_data_max", kb);
sent_data_min_.set_name (
config.debug_to_kb_prefix + ".sent_data_min", kb);
sent_data_.set_name (config.debug_to_kb_prefix + ".sent_data", kb);
}
}

madara::transport::ZMQTransport::~ZMQTransport ()
Expand Down Expand Up @@ -270,9 +283,32 @@ madara::transport::ZMQTransport::send_data (
result = (long) zmq_send (
write_socket_, (void *)buffer_.get_ptr (), (size_t)result, 0);

madara_logger_log (context_.get_logger (), logger::LOG_MAJOR,
"ZMQTransport::send:" \
" sent %d bytes on socket\n", (int)result);
if (result > 0)
{
if (settings_.debug_to_kb_prefix != "")
{
sent_data_ += result;
++sent_packets_;
if (sent_data_max_ < result)
{
sent_data_max_ = result;
}
if (sent_data_min_ > result || sent_data_min_ == 0)
{
sent_data_min_ = result;
}
}

madara_logger_log (context_.get_logger (), logger::LOG_MAJOR,
"ZMQTransport::send:" \
" sent %d bytes on socket\n", (int)result);
}
else
{
madara_logger_log (context_.get_logger (), logger::LOG_MAJOR,
"ZMQTransport::send:" \
" failed to send message. Error code %d\n", (int)result);
}
}
}

Expand Down
16 changes: 16 additions & 0 deletions include/madara/transport/zmq/ZMQTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,22 @@ namespace madara

/// underlying socket for sending
void * write_socket_;

/// sent packets
knowledge::containers::Integer sent_packets_;

/// failed sends
knowledge::containers::Integer failed_sends_;

/// sent data
knowledge::containers::Integer sent_data_;

/// max data sent
knowledge::containers::Integer sent_data_max_;

/// min data sent
knowledge::containers::Integer sent_data_min_;

};
}
}
Expand Down
34 changes: 34 additions & 0 deletions include/madara/transport/zmq/ZMQTransportReadThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ madara::transport::ZMQTransportReadThread::init (
int zero = 0;
size_t opt_len = sizeof (int);

if (settings_.debug_to_kb_prefix != "")
{
received_packets_.set_name (
settings_.debug_to_kb_prefix + ".received_packets", knowledge);
failed_receives_.set_name (
settings_.debug_to_kb_prefix + ".failed_receives", knowledge);
received_data_max_.set_name (
settings_.debug_to_kb_prefix + ".received_data_max", knowledge);
received_data_min_.set_name (
settings_.debug_to_kb_prefix + ".received_data_min", knowledge);
received_data_.set_name (
settings_.debug_to_kb_prefix + ".received_data", knowledge);
}

// setup the receive buffer
if (settings_.queue_length > 0)
buffer_ = new char[settings_.queue_length];
Expand Down Expand Up @@ -319,6 +333,21 @@ madara::transport::ZMQTransportReadThread::run (void)

if (buffer_remaining > 0)
{
if (settings_.debug_to_kb_prefix != "")
{
received_data_ += buffer_remaining;
++received_packets_;

if (received_data_max_ < buffer_remaining)
{
received_data_max_ = buffer_remaining;
}
if (received_data_min_ > buffer_remaining || received_data_min_ == 0)
{
received_data_min_ = buffer_remaining;
}
}

MessageHeader * header = (MessageHeader *)buffer;

madara_logger_log (context_->get_logger (), logger::LOG_MINOR,
Expand Down Expand Up @@ -351,6 +380,11 @@ madara::transport::ZMQTransportReadThread::run (void)
"%s:" \
" wait timeout on new messages. Proceeding to next wait\n",
print_prefix);

if (settings_.debug_to_kb_prefix != "")
{
++failed_receives_;
}
}
}
}
15 changes: 15 additions & 0 deletions include/madara/transport/zmq/ZMQTransportReadThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,21 @@ namespace madara

/// scheduler for mimicking target network conditions
PacketScheduler & packet_scheduler_;

/// received packets
knowledge::containers::Integer received_packets_;

/// bad receives
knowledge::containers::Integer failed_receives_;

/// received data
knowledge::containers::Integer received_data_;

/// max data received
knowledge::containers::Integer received_data_max_;

/// min data received
knowledge::containers::Integer received_data_min_;
};
}
}
Expand Down

0 comments on commit 17a2366

Please sign in to comment.