Skip to content
This repository has been archived by the owner on Sep 23, 2020. It is now read-only.

Commit

Permalink
Bug fixes and pretty printing for load test
Browse files Browse the repository at this point in the history
  • Loading branch information
Adam R. Smith committed Jan 29, 2011
1 parent b9f52e0 commit 6cb12ea
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 44 deletions.
19 changes: 13 additions & 6 deletions ion/test/load_runner.py
Expand Up @@ -3,6 +3,7 @@
"""
@file ion/test/load_runner.py
@author Michael Meisinger
@author Adam R. Smith
@brief Spawns a number of Unix processes to create some load
"""

Expand Down Expand Up @@ -85,7 +86,7 @@ def __init__(self):

@defer.inlineCallbacks
def start_load_suite(self, suitecls, spawn_procs, options, argv):
print "Start load suite %s" % (suitecls)
print 'Starting load suite "%s.%s"...' % (suitecls.__module__, suitecls.__name__)
numprocs = int(options['count'])

if spawn_procs:
Expand Down Expand Up @@ -135,8 +136,11 @@ def start_load_suite(self, suitecls, spawn_procs, options, argv):
d = self.start_load_proc(suitecls, str(i), options, argv)
deflist.append(d)

dl = defer.DeferredList(deflist)
yield dl
try:
dl = defer.DeferredList(deflist)
yield dl
except SystemExit, ex:
pass

@defer.inlineCallbacks
def start_load_proc(self, suitecls, loadid, options, argv):
Expand All @@ -150,7 +154,11 @@ def start_load_proc(self, suitecls, loadid, options, argv):

self.load_procs[load_proc.load_id] = load_proc

yield defer.maybeDeferred(load_proc.setUp, argv)
try:
yield defer.maybeDeferred(load_proc.setUp, argv)
except SystemExit, ex:
return

yield defer.maybeDeferred(load_proc.generate_load)

yield defer.maybeDeferred(load_proc.tearDown)
Expand All @@ -168,7 +176,7 @@ def pre_shutdown(self):
if self._is_shutdown:
return
self._shutdown_deferred = defer.Deferred()
self._shutdown_to = reactor.callLater(2, self.shutdown_timeout)
self._shutdown_to = reactor.callLater(5, self.shutdown_timeout)

prockeys = sorted(self.load_procs.keys())
for key in prockeys:
Expand All @@ -195,7 +203,6 @@ def load_runner_main(self):
# Use a separator to distinguish options reserved for derived classes
sep = '-'
opts, extraOpts = sys.argv[1:], []
print opts
try:
sepIndex = opts.index(sep)
extraOpts = opts[sepIndex + 1:]
Expand Down
6 changes: 4 additions & 2 deletions ion/test/loadtest.py
Expand Up @@ -34,10 +34,12 @@ class LoadTest(object):
def __init__(self, *args):
    """Initialize shared load-test bookkeeping state.

    Three snapshot dicts track counters over the life of the test:
      - start_state: values captured at test start (summary() derives the
        whole-run elapsed time from its '_time' entry)
      - base_state:  values at the previous monitor sample
      - cur_state:   live counters updated while load is generated
    Each dict carries a '_time' wall-clock timestamp so per-interval
    rates can be computed from the deltas.
    """
    self.load_options = LoadTestOptions()
    # NOTE(review): presumably flipped when shutdown is requested and
    # read back via is_shutdown() — confirm against the rest of the class.
    self._shutdown = False
    self.start_state = {}
    self.base_state = {}
    self.cur_state = {}
    # base_state starts at 0.0 (no prior sample yet); cur/start are both
    # stamped with the current wall-clock time at construction.
    self.base_state['_time'] = 0.0
    self.cur_state['_time'] = time.time()
    self.start_state['_time'] = time.time()

def setUp(self):
"""
Expand Down Expand Up @@ -100,11 +102,11 @@ def _enable_monitor(self, delay):
self._update_time()
self._copy_state()

def _call_monitor(self):
def _call_monitor(self, output=True):
self.monitor_call = reactor.callLater(self.monitor_delay, self._call_monitor)
self._update_time()
try:
self.monitor()
self.monitor(output)
except Exception, ex:
print "Exception in load process monitor", ex

Expand Down
105 changes: 69 additions & 36 deletions ion/test/loadtests/brokerload.py
Expand Up @@ -20,29 +20,32 @@
import ion.util.procutils as pu

import sys
import time

class BrokerTestOptions(LoadTestOptions):
optParameters = [
['scenario', 's', 'message', 'Load test scenario.']
, ['host', 'h', 'localhost', 'Broker host name.']
, ['port', 'p', 5672, 'Broker port.']
, ['vhost', 'v', '/', 'Broker vhost.']
, ['heartbeat', 'hb', 0, 'Heartbeat rate [seconds].']
, ['monitor', 'm', 3, 'Monitor poll rate [seconds].']
, ['heartbeat', None, 0, 'Heartbeat rate [seconds].']

# Tuning parameters for custom test
, ['exchange', None, None, 'Name of exchange for distributed tests.']
, ['queue', None, None, 'Name of queue for distributed tests. Supplying a name sets the "exclusive" queue parameter to false.']
, ['route', None, None, 'Name of routing key for distributed tests.']
, ['consume', None, True, 'Whether to run a consumer. Set to false to try and make the broker explode.']
, ['ack', None, True, 'Whether the consumer should "ack" messages. Set to false to try and make the broker explode.']

, ['exchanges', None, 1, 'Number of concurrent exchanges per connection. Note that setting the "exchange" name overrides this.']
, ['routes', None, 1, 'Number of concurrent routes per exchange. Note that setting the "route" name overrides this.']
, ['queues', None, 1, 'Number of concurrent queues per route. Note that setting the "queue" name overrides this.']
, ['publishers', None, 1, 'Number of concurrent publishers per route.']
, ['consumers', None, 1, 'Number of concurrent consumers per queue.']
]
optFlags = []
optFlags = [
['no-consume', None, 'Disable message consumers to try and make the broker explode.']
, ['no-ack', None, 'Disable message acks to try and make the broker explode.']
]


class BrokerTest(LoadTest):
Expand All @@ -59,9 +62,9 @@ def setUp(self, argv=None):
self.broker_port = opts['port']
self.broker_vhost = opts['vhost']
self.monitor_rate = opts['monitor']

print 'Running the "%s" scenario on "%s:%d" at "%s".' % (self.scenario, self.broker_host,
self.broker_port, self.broker_vhost)
self.ack_msgs = not opts['no-ack']
self.consume_msgs = not opts['no-consume']
self.publish_msgs = True

self.cur_state['connects'] = 0
self.cur_state['msgsend'] = 0
Expand All @@ -80,6 +83,8 @@ def generate_load(self):
opts = self.opts

if self.scenario == "connect":
print '#%s] <%s> on %s:%d at %s.' % (self.load_id, self.scenario, self.broker_host,
self.broker_port, self.broker_vhost)
while True:
if self.is_shutdown():
break
Expand All @@ -91,14 +96,21 @@ def generate_load(self):

# Generate a bunch of unique ids for each parameter if an explicit name was not supplied
exchange, queue, route = opts['exchange'], opts['queue'], opts['route']
exchanges = [exchange] if exchange else [self.guid() for i in range(opts['exchanges'])]
routes = [route] if route else [self.guid() for i in range(opts['routes'])]
queues = [queue] if queue else [self.guid() for i in range(opts['queues'])]
exchangecount, queuecount, routecount = int(opts['exchanges']), int(opts['queues']), int(opts['routes'])
exchanges = [exchange] if exchange else [self.guid() for i in range(exchangecount)]
routes = [route] if route else [self.guid() for i in range(routecount)]
queues = [queue] if queue else [self.guid() for i in range(queuecount)]

pubcount, concount = int(opts['publishers']), int(opts['consumers'])
print '#%s] Spawning %d exchanges, %d routes, %d queues, %d total publishers, and %d total consumers.' % (
self.load_id, len(exchanges), len(routes), len(queues), len(exchanges)*len(routes)*pubcount,
len(exchanges)*len(routes)*len(queues)*concount
)
print '-'*80

print 'Exchanges: %s, queues: %s, routes: %s' % (exchanges, queues, routes)

yield self._declare_publishers(exchanges, routes)
yield self._declare_consumers(exchanges, routes, queues)
if self.publish_msgs: yield self._declare_publishers(exchanges, routes, pubcount)
if self.consume_msgs: yield self._declare_consumers(exchanges, routes, queues, concount)

# Start the publisher deferred before waiting/yielding with the consumer
self._run_publishers()
Expand All @@ -118,7 +130,7 @@ def _disconnect_broker(self):
yield self.connection._connection.transport.loseConnection()

@defer.inlineCallbacks
def _declare_publishers(self, exchanges, routes, exchange_type='topic', durable=False, auto_delete=True):
def _declare_publishers(self, exchanges, routes, count, exchange_type='topic', durable=False, auto_delete=True):
self.publishers = []
defers = []
backend = self.connection.create_backend()
Expand All @@ -128,30 +140,33 @@ def _declare_publishers(self, exchanges, routes, exchange_type='topic', durable=
durable=durable, auto_delete=auto_delete))

for route in routes:
pub = messaging.Publisher(connection=self.connection, exchange_type=exchange_type, durable=durable,
auto_delete=auto_delete, exchange=exchange, routing_key=route)
self.publishers.append(pub)
for i in range(count):
pub = messaging.Publisher(connection=self.connection, exchange_type=exchange_type, durable=durable,
auto_delete=auto_delete, exchange=exchange, routing_key=route)
self.publishers.append(pub)

yield defer.DeferredList(defers)

@defer.inlineCallbacks
def _declare_consumers(self, exchanges, routes, queues, exchange_type='topic', durable=False, auto_delete=True,
exclusive=False, no_ack=True):
def _declare_consumers(self, exchanges, routes, queues, count, exchange_type='topic', durable=False,
auto_delete=True, exclusive=False, no_ack=True):
self.consumers = []
qdecs, qbinds = [], []
for exchange in exchanges:
for route in routes:
for queue in queues:
con = messaging.Consumer(connection=self.connection, exchange=exchange, exchange_type=exchange_type,
durable=durable, auto_delete=auto_delete, exclusive=exclusive, no_ack=no_ack, routing_key=route)
for i in range(count):
con = messaging.Consumer(connection=self.connection, exchange=exchange,
exchange_type=exchange_type, durable=durable, auto_delete=auto_delete, exclusive=exclusive,
no_ack=no_ack, routing_key=route)

self.consumers.append(con)
con.register_callback(self._recv_callback)
self.consumers.append(con)
con.register_callback(self._recv_callback)

qdecs.append(con.backend.queue_declare(queue=queue, durable=durable, exclusive=exclusive,
auto_delete=auto_delete, warn_if_exists=con.warn_if_exists))
qbinds.append(con.backend.queue_bind(queue=queue, exchange=exchange,
routing_key=route, arguments={}))
qdecs.append(con.backend.queue_declare(queue=queue, durable=durable, exclusive=exclusive,
auto_delete=auto_delete, warn_if_exists=con.warn_if_exists))
qbinds.append(con.backend.queue_bind(queue=queue, exchange=exchange,
routing_key=route, arguments={}))

#yield self.consumer.qos()

Expand Down Expand Up @@ -197,7 +212,7 @@ def _send_messages(self):

def _recv_callback(self, message):
self.cur_state['msgrecv'] += 1
message.ack()
if self.ack_msgs: message.ack()

@defer.inlineCallbacks
def tearDown(self):
Expand All @@ -206,19 +221,37 @@ def tearDown(self):
yield self._disconnect_broker()

self._disable_monitor()
self._call_monitor()
self._call_monitor(False)
self.summary()

def monitor(self):
interval = self._get_interval()

def monitor(self, output=True):
interval = self._get_interval()
rates = self._get_state_rate()
print "%s: new state %s" % (self.load_id, self.cur_state)
print "%s: rate state %s" % (self.load_id, rates)

#print "%s: performed %s connect (rate %s), %s send, %s receive, %s error" % (
# self.load_id, self.connects, connect_rate, self.msgsend, self.msgrecv, self.errors)
if output:
pieces = []
if rates['errors']: pieces.append('had %.2f errors/sec' % rates['errors'])
if rates['connects']: pieces.append('made %.2f connects/sec' % rates['connects'])
if rates['msgsend']: pieces.append('sent %.2f msgs/sec' % rates['msgsend'])
if rates['msgrecv']: pieces.append('received %.2f msgs/sec' % rates['msgrecv'])
print '#%s] (%s) %s' % (self.load_id, time.strftime('%H:%M:%S'), ', '.join(pieces))

def summary(self):
    """Print an end-of-run summary for this load process: total elapsed
    time, total messages sent/received, and average msgs/sec each way,
    framed by separator rules.
    """
    state = self.cur_state
    # Fall back to a tiny epsilon when no time elapsed (e.g. the test
    # was torn down immediately) to avoid division by zero below.
    secsElapsed = (state['_time'] - self.start_state['_time']) or 0.0001
    print '\n'.join([
        '-'*80
        , '#%s Summary' % (self.load_id)
        , 'Test ran for %.2f seconds, sending a total of %d messages and receiving a total of %d messages.' % (
            secsElapsed, state['msgsend'], state['msgrecv'])
        , 'The average messages/second was %.2f sent and %.2f received.' % (
            state['msgsend']/secsElapsed, state['msgrecv']/secsElapsed)
        , '-'*80
    ])


"""
python -m ion.test.load_runner -s -c ion.test.loadtests.brokerload.BrokerTest -s connect
python -m ion.test.load_runner -s -c ion.test.loadtests.brokerload.BrokerTest - -s connect
"""

0 comments on commit 6cb12ea

Please sign in to comment.