Skip to content

Commit

Permalink
* Added transport setting max_send_hertz to TransportSettings (#91)
Browse files Browse the repository at this point in the history
* Added max_send_hertz enforcement to UDP transports
  * uses EpochEnforcer
* Modified file_receiver.py to be a little bit more interesting though it isn't tracking progress correctly
* Updated MFS to include option -sz --send-hz that provides access to TransportSettings::max_send_hertz
  • Loading branch information
jredmondson committed Sep 23, 2018
1 parent 222d0f5 commit 9edc711
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 83 deletions.
6 changes: 6 additions & 0 deletions include/madara/transport/TransportSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ madara::transport::TransportSettings::TransportSettings (
send_reduced_message_header (settings.send_reduced_message_header),
slack_time (settings.slack_time),
read_thread_hertz (settings.read_thread_hertz),
max_send_hertz (settings.max_send_hertz),
hosts (),
no_sending (settings.no_sending),
no_receiving (settings.no_receiving),
Expand Down Expand Up @@ -107,6 +108,7 @@ madara::transport::TransportSettings::operator= (
send_reduced_message_header = settings.send_reduced_message_header;
slack_time = settings.slack_time;
read_thread_hertz = settings.read_thread_hertz;
max_send_hertz = settings.max_send_hertz;

hosts.resize (settings.hosts.size ());
for (unsigned int i = 0; i < settings.hosts.size (); ++i)
Expand Down Expand Up @@ -156,6 +158,7 @@ madara::transport::TransportSettings::load (const std::string & filename,
send_reduced_message_header = knowledge.get (prefix + ".send_reduced_message_header").is_true ();
slack_time = knowledge.get (prefix + ".slack_time").to_double ();
read_thread_hertz = knowledge.get (prefix + ".read_thread_hertz").to_double ();
max_send_hertz = knowledge.get (prefix + ".max_send_hertz").to_double ();

containers::StringVector kb_hosts (prefix + ".hosts", knowledge);

Expand Down Expand Up @@ -203,6 +206,7 @@ madara::transport::TransportSettings::load_text (const std::string & filename,
send_reduced_message_header = knowledge.get (prefix + ".send_reduced_message_header").is_true ();
slack_time = knowledge.get (prefix + ".slack_time").to_double ();
read_thread_hertz = knowledge.get (prefix + ".read_thread_hertz").to_double ();
max_send_hertz = knowledge.get (prefix + ".max_send_hertz").to_double ();

containers::StringVector kb_hosts (prefix + ".hosts", knowledge);

Expand Down Expand Up @@ -257,6 +261,7 @@ madara::transport::TransportSettings::save (const std::string & filename,
Integer (send_reduced_message_header));
knowledge.set (prefix + ".slack_time", slack_time);
knowledge.set (prefix + ".read_thread_hertz", read_thread_hertz);
knowledge.set (prefix + ".max_send_hertz", max_send_hertz);

for (size_t i = 0; i < hosts.size (); ++i)
kb_hosts.set (i, hosts[i]);
Expand Down Expand Up @@ -307,6 +312,7 @@ madara::transport::TransportSettings::save_text (const std::string & filename,
Integer (send_reduced_message_header));
knowledge.set (prefix + ".slack_time", slack_time);
knowledge.set (prefix + ".read_thread_hertz", read_thread_hertz);
knowledge.set (prefix + ".max_send_hertz", max_send_hertz);

for (size_t i = 0; i < hosts.size (); ++i)
kb_hosts.set (i, hosts[i]);
Expand Down
27 changes: 18 additions & 9 deletions include/madara/transport/TransportSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,36 +229,45 @@ namespace madara
/// See madara::transport::Reliabilities for options
uint32_t reliability = DEFAULT_RELIABILITY;

/// the id of this process (DEPRECATED). You do not need to set this
/// The id of this process (DEPRECATED). You do not need to set this
uint32_t id = DEFAULT_ID;

/// number of processes (DEPRECATED). You do not need to set this
/// Number of processes (DEPRECATED). You do not need to set this
uint32_t processes = DEFAULT_PROCESSES;

/// logic to be evaluated after every successful update
/// Logic to be evaluated after every successful update
std::string on_data_received_logic;

/// delay launching transports
/// Delay launching transports until explicit activate call
bool delay_launch = false;

/// prevent MADARA from exiting on fatal errors and invalid state
/// Prevent MADARA from exiting on fatal errors and invalid state
bool never_exit = false;

/// send the reduced message header (clock, size, updates, KaRL id)
/// Send a reduced message header (clock, size, updates, KaRL id)
bool send_reduced_message_header = false;

/// map of fragments received by originator
/// Map of fragments received by originator
mutable OriginatorFragmentMap fragment_map;

/// time to sleep between sends and rebroadcasts
/// Time to sleep between sends and rebroadcasts
double slack_time = 0;

/**
* number of valid messages allowed to be received per second. This
* Number of valid messages allowed to be received per second. This
* value can be -1 or 0.0 to go as fast as possible
**/
double read_thread_hertz = 0.0;

/**
* Maximum rate of sending messages. This is not a bandwidth limit.
* This specifically limits the number of times the transport can
* send in a second. Under the hood, it is causing the transport to
* enforce sleeps between sends to provide time for the OS or transport
* to recover. May be especially useful to UDP transports.
**/
double max_send_hertz = 0.0;

/**
* Host information for transports that require it. The format of these
* is transport specific, but for UDP, you might have "localhost:1234"
Expand Down
8 changes: 7 additions & 1 deletion include/madara/transport/udp/UdpTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ namespace madara { namespace transport {
UdpTransport::UdpTransport (const std::string & id,
knowledge::ThreadSafeContext & context,
TransportSettings & config, bool launch_transport)
: BasicASIOTransport (id, context, config)
: BasicASIOTransport (id, context, config),
enforcer_ (1 / config.max_send_hertz)
{
if (launch_transport)
setup ();
Expand Down Expand Up @@ -96,6 +97,11 @@ UdpTransport::send_buffer (
send_attempts < settings_.resend_attempts))
{

if (settings_.max_send_hertz > 0)
{
enforcer_.sleep_until_next ();
}

// send the fragment
try {
actual_sent = socket_.send_to (
Expand Down
10 changes: 8 additions & 2 deletions include/madara/transport/udp/UdpTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "madara/transport/BasicASIOTransport.h"
#include "madara/transport/Transport.h"
#include "madara/threads/Threader.h"
#include "madara/utility/EpochEnforcer.h"

#include <string>
#include <map>
Expand Down Expand Up @@ -58,7 +59,8 @@ namespace madara
* @param updates listing of all updates that must be sent
* @return result of write operation or -1 if we are shutting down
**/
long send_data (const madara::knowledge::VariableReferenceMap & updates) override;
long send_data (
const madara::knowledge::VariableReferenceMap & updates) override;

protected:
int setup_read_socket () override;
Expand All @@ -67,8 +69,12 @@ namespace madara

long send_message (const char *buf, size_t size);
long send_buffer (const udp::endpoint &target, const char *buf, size_t size);
virtual bool pre_send_buffer (size_t addr_index) { return addr_index != 0; }
virtual bool pre_send_buffer (size_t addr_index)
{
return addr_index != 0;
}

utility::EpochEnforcer <utility::Clock> enforcer_;
friend class UdpTransportReadThread;
};
}
Expand Down
2 changes: 1 addition & 1 deletion include/madara/utility/EpochEnforcer.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace madara
* @param period the period between epochs in seconds
* @param max_duration the maximum duration in seconds
**/
EpochEnforcer (double period, double max_duration)
EpochEnforcer (double period, double max_duration = -1)
{
start ();
set_period (period);
Expand Down
37 changes: 28 additions & 9 deletions port/python/tests/file_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

# create transport settings for a multicast transport
settings = transport.QoSTransportSettings()
settings.hosts.append("239.255.0.1:4150")
settings.type = transport.TransportTypes.MULTICAST
#settings.hosts.append("127.0.0.1:40002")
#settings.hosts.append("127.0.0.1:40001")
#settings.hosts.append("127.0.0.1:40000")
#settings.hosts.append("239.255.0.1:4150")
#settings.type = transport.TransportTypes.MULTICAST
settings.type = transport.TransportTypes.UDP
settings.hosts.append("127.0.0.1:40002")
settings.hosts.append("127.0.0.1:40001")
settings.hosts.append("127.0.0.1:40000")
#settings.type = transport.TransportTypes.ZMQ
settings.queue_length = 12000000

Expand All @@ -32,13 +33,31 @@
settings.add_receive_filter(filter)

# create a knowledge base with the multicast transport settings
knowledge = engine.KnowledgeBase("agent1", settings)
kb = engine.KnowledgeBase("agent1", settings)

sleep_time = 180
filename = "samples/chapter16.mp3"

if len(sys.argv) > 1:
filename = sys.argv[1]
print ("Changing file name to ", filename)

if len(sys.argv) > 2:
print ("Changing sleep time to ", sys.argv[1])
sleep_time = int (sys.argv[1])
sleep_time = float (sys.argv[2])
print ("Changing sleep time to ", sleep_time)

print ("keys will be read from ", "agent.0.sandbox.files.file.", filename)

for i in range (18):
crc = kb.get ("agent.0.sandbox.files.file." + filename + ".crc").to_integer ()
file_size = kb.get ("agent.0.sandbox.files.file." + filename + ".size").to_integer ()
percentage = 0
print ("CRC is ", crc, " and file size is ", file_size)

if (crc):
received_bytes = madara.utility.get_file_progress ("files/" + filename, crc, file_size)
percentage = float (received_bytes / file_size)

time.sleep(sleep_time)
print ("Percentage is ", percentage)
time.sleep(sleep_time)

133 changes: 72 additions & 61 deletions tools/mfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,16 @@ void handle_arguments (int argc, char ** argv)

++i;
}
else if (arg1 == "-sz" || arg1 == "--send-hz")
{
if (i + 1 < argc)
{
std::stringstream buffer (argv[i + 1]);
buffer >> settings.max_send_hertz;
}

++i;
}
else if (arg1 == "-u" || arg1 == "--udp")
{
if (i + 1 < argc)
Expand Down Expand Up @@ -1407,67 +1417,68 @@ void handle_arguments (int argc, char ** argv)
else
{
madara_logger_ptr_log (logger::global_logger.get (), logger::LOG_ALWAYS,
"\nProgram summary for %s [options] [Logic]:\n\n" \
"Serves files up to clients within MFS-defined sandboxes.\n\noptions:\n" \
" [-b|--broadcast ip:port] the broadcast ip to send and listen to\n" \
" [-d|--domain domain] the knowledge domain to send and listen to\n" \
" [-esb|--send-bandwidth limit] bandwidth limit to enforce over a 10s\n" \
" window. If the limit is violated, then the\n" \
" mfs agent will not send again until the send\n" \
" bandwidth is within an acceptable level again\n" \
" [-etb|--total-bandwidth limit] bandwidth limit to enforce over a 10s\n" \
" window. If the limit is violated, then the\n" \
" mfs agent will not send again until the total\n" \
" bandwidth is within an acceptable level again\n" \
" [-f|--logfile file] log to a file\n" \
" [-h|--help] print help menu (i.e., this menu)\n" \
" [-l|--level level] the logger level (0+, higher is higher detail)\n" \
" [-lt|--load-transport file] a file to load transport settings from\n" \
" [-ltp|--load-transport-prefix prfx] prefix of saved settings\n" \
" [-ltt|--load-transport-text file] a text file to load transport settings from\n" \
" [-lz4|--lz4] add lz4 compression filter\n" \
" [-m|--multicast ip:port] the multicast ip to send and listen to\n" \
" [-o|--host hostname] the hostname of this process (def:localhost)\n" \
" [-p|--prefix prefix] prefix of this agent / service (e.g. agent.0)\n" \
" [-q|--queue-length size] size of network buffers in bytes\n" \
" [-r|--reduced] use the reduced message header\n" \
" [-rhz|--read-hz hz] hertz rate of read threads\n" \
" [-rq|--requests size] size of the requests queue (def 50)\n" \
" [-s|--save file] save the resulting knowledge base as karl\n" \
" [-sb|--save-binary file] save the resulting knowledge base as a\n" \
" binary checkpoint\n" \
" [-sc|--save-checkpoint file] save any changes by logics since initial\n" \
" loads as a checkpoint diff to the specified\n" \
" file in an appended layer\n"
" [-scp|--save-checkpoint-prefix prfx]\n" \
" prefix of knowledge to save in checkpoint\n" \
" [-sj|--save-json file] save the resulting knowledge base as JSON\n" \
" [-ss|--save-size bytes] size of buffer needed for file saves\n" \
" [-ssl|--ssl pass] add an ssl filter with a password\n" \
" [-st|--save-transsport file] a file to save transport settings to\n"
" [-stp|--save-transport-prefix prfx] prefix to save settings at\n" \
" [-stt|--save-transport-text file] a text file to save transport settings to\n" \
" [-t|--time time] time to run this service (-1 is forever).\n" \
" [-u|--udp ip:port] the udp ips to send to (first is self to bind to)\n" \
" only runs once. If zero, hertz is infinite.\n" \
" If positive, hertz is that hertz rate.\n" \
" [--zmq|--0mq proto://ip:port] a ZeroMQ endpoint to connect to.\n" \
" examples include tcp://127.0.0.1:30000\n" \
" or any of the other endpoint types like\n" \
" pgm://. For tcp, remember that the first\n" \
" endpoint defined must be your own, the\n" \
" one you are binding to, and all other\n" \
" agent endpoints must also be defined or\n" \
" no messages will ever be sent to them.\n" \
" Similarly, all agents will have to have\n" \
" this endpoint added to their list or\n" \
" this karl agent will not see them.\n" \
" [-0|--init-logic logic] logic containing initial variables (only ran once)\n" \
" [-0f|--init-file file] file containing initial variables (only ran once)\n" \
" [-0b|--init-bin file] file containing binary knowledge base, the result\n" \
" of save_context (only ran once)\n" \
" [--meta-prefix prefix] store checkpoint meta data at knowledge prefix\n" \
" [--use-id] use the id of the checkpointed binary load\n" \
"\nProgram summary for %s [options] [Logic]:\n\n" \
"Serves files up to clients within MFS-defined sandboxes.\n\noptions:\n" \
" [-b|--broadcast ip:port] the broadcast ip to send and listen to\n" \
" [-d|--domain domain] the knowledge domain to send and listen to\n" \
" [-esb|--send-bandwidth limit] bandwidth limit to enforce over a 10s\n" \
" window. If the limit is violated, then the\n" \
" mfs agent will not send again until the send\n" \
" bandwidth is within an acceptable level again\n" \
" [-etb|--total-bandwidth limit] bandwidth limit to enforce over a 10s\n" \
" window. If the limit is violated, then the\n" \
" mfs agent will not send again until the total\n" \
" bandwidth is within an acceptable level again\n" \
" [-f|--logfile file] log to a file\n" \
" [-h|--help] print help menu (i.e., this menu)\n" \
" [-l|--level level] the logger level (0+, higher is higher detail)\n" \
" [-lt|--load-transport file] a file to load transport settings from\n" \
" [-ltp|--load-transport-prefix prfx] prefix of saved settings\n" \
" [-ltt|--load-transport-text file] text file to load transport settings\n" \
" [-lz4|--lz4] add lz4 compression filter\n" \
" [-m|--multicast ip:port] the multicast ip to send and listen to\n" \
" [-o|--host hostname] the hostname of this process (def:localhost)\n" \
" [-p|--prefix prefix] prefix of this agent / service (e.g. agent.0)\n" \
" [-q|--queue-length size] size of network buffers in bytes\n" \
" [-r|--reduced] use the reduced message header\n" \
" [-rhz|--read-hz hz] hertz rate of read threads\n" \
" [-rq|--requests size] size of the requests queue (def 50)\n" \
" [-s|--save file] save the resulting knowledge base as karl\n" \
" [-sb|--save-binary file] save the resulting knowledge base as a\n" \
" binary checkpoint\n" \
" [-sc|--save-checkpoint file] save any changes by logics since initial\n" \
" loads as a checkpoint diff to the specified\n" \
" file in an appended layer\n"
" [-scp|--save-checkpoint-prefix prfx]\n" \
" prefix of knowledge to save in checkpoint\n" \
" [-sj|--save-json file] save the resulting knowledge base as JSON\n" \
" [-ss|--save-size bytes] size of buffer needed for file saves\n" \
" [-ssl|--ssl pass] add an ssl filter with a password\n" \
" [-st|--save-transsport file] a file to save transport settings to\n"
" [-stp|--save-transport-prefix prfx] prefix to save settings at\n" \
" [-stt|--save-transport-text file] text file to save settings to\n" \
" [-sz|--send-hz hz] maximum messages per second (inf by default)\n" \
" [-t|--time time] time to run this service (-1 is forever).\n" \
" [-u|--udp ip:port] the udp ips to send to (first is self to bind\n" \
" to) only runs once. If zero, hertz is infinite.\n"\
" If positive, hertz is that hertz rate.\n" \
" [--zmq|--0mq proto://ip:port] a ZeroMQ endpoint to connect to.\n" \
" examples include tcp://127.0.0.1:30000\n" \
" or any of the other endpoint types like\n" \
" pgm://. For tcp, remember that the first\n" \
" endpoint defined must be your own, the\n" \
" one you are binding to, and all other\n" \
" agent endpoints must also be defined or\n" \
" no messages will ever be sent to them.\n" \
" Similarly, all agents will have to have\n" \
" this endpoint added to their list or\n" \
" this karl agent will not see them.\n" \
" [-0|--init-logic logic] logic with initial variables (only ran once)\n" \
" [-0f|--init-file file] file with initial variables (only ran once)\n" \
" [-0b|--init-bin file] file with binary knowledge base, the result\n" \
" of save_context (only ran once)\n" \
" [--meta-prefix prefix] store checkpoint meta data at knowledge prefix\n" \
" [--use-id] use the id of the checkpointed binary load\n" \
"\n",
argv[0]);
exit (0);
Expand Down

0 comments on commit 9edc711

Please sign in to comment.