Skip to content

Commit

Permalink
added a per-minute tolerance
Browse files Browse the repository at this point in the history
  • Loading branch information
tarekziade committed Aug 2, 2017
1 parent 0734800 commit 7e90372
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 14 deletions.
40 changes: 29 additions & 11 deletions molotov/fmwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@


_STOP = False
_STARTED_AT = _TOLERANCE = None
_REFRESH = .3


Expand Down Expand Up @@ -94,11 +95,20 @@ async def step(worker_id, step_id, session, quiet, verbose, stream,
_HOWLONG = 0


def _reached_tolerance(results, args):
def _reached_tolerance(minute_res, current_time, args):
if not args.sizing:
return False
OK = results['OK']
FAILED = results['FAILED']

global _TOLERANCE
if current_time - _TOLERANCE > 60:
# we need to reset the tolerance counters
_TOLERANCE = current_time
minute_res['OK'] = 0
minute_res['FAILED'] = 0
return False

OK = minute_res['OK']
FAILED = minute_res['FAILED']

if OK + FAILED < 100:
# we don't have enough samples
Expand All @@ -109,7 +119,7 @@ def _reached_tolerance(results, args):
return current_ratio > args.sizing_tolerance


async def worker(num, loop, results, args, stream, statsd, delay):
async def worker(num, loop, results, minute_res, args, stream, statsd, delay):
global _STOP
if delay > 0.:
await asyncio.sleep(delay)
Expand Down Expand Up @@ -154,15 +164,19 @@ async def worker(num, loop, results, args, stream, statsd, delay):
while howlong < duration and not _STOP:
if args.max_runs and count > args.max_runs:
break
howlong = _now() - start
current_time = _now()
howlong = current_time - start
session.step = count
result = await step(num, count, session, quiet, verbose,
stream, single)
if result == 1:
results['OK'] += 1
minute_res['OK'] += 1
elif result == -1:
results['FAILED'] += 1
if (exception or _reached_tolerance(results, args)):
minute_res['FAILED'] += 1
if (exception or _reached_tolerance(minute_res, current_time,
args)):
await stream.put('WORKER_STOPPED')
_STOP = True
elif result == 0:
Expand Down Expand Up @@ -192,7 +206,7 @@ def _worker_done(num, future):
log(e)


def _runner(loop, args, results, stream, statsd):
def _runner(loop, args, results, minute_res, stream, statsd):
def _prepare():
tasks = []
delay = 0
Expand All @@ -202,8 +216,9 @@ def _prepare():
step = 0.

for i in range(args.workers):
future = asyncio.ensure_future(worker(i, loop, results, args,
stream, statsd, delay))
f = worker(i, loop, results, minute_res, args, stream, statsd,
delay)
future = asyncio.ensure_future(f)
future.add_done_callback(partial(_worker_done, i))
tasks.append(future)
delay += step
Expand All @@ -219,7 +234,9 @@ def _prepare():


def _process(args):
global _STOP
global _STOP, _STARTED_AT, _TOLERANCE
_STARTED_AT = _TOLERANCE = _now()

if args.processes > 1:
signal.signal(signal.SIGINT, _shutdown)
signal.signal(signal.SIGTERM, _shutdown)
Expand All @@ -233,6 +250,7 @@ def _process(args):
loop.set_debug(True)

results = {'OK': 0, 'FAILED': 0}
minute_res = {'OK': 0, 'FAILED': 0}
stream = asyncio.Queue(loop=loop)

co_tasks = []
Expand All @@ -249,7 +267,7 @@ def _process(args):

co_tasks = asyncio.gather(*co_tasks, loop=loop, return_exceptions=True)

workers = _runner(loop, args, results, stream, statsd)
workers = _runner(loop, args, results, minute_res, stream, statsd)
run_task = asyncio.gather(*workers, loop=loop, return_exceptions=True)

_TASKS.extend(workers)
Expand Down
9 changes: 6 additions & 3 deletions molotov/tests/test_fmwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,12 @@ async def test_two(session):
pass

results = {'OK': 0, 'FAILED': 0}
min = {'OK': 0, 'FAILED': 0}
stream = asyncio.Queue()
args = self.get_args()
statsd = None

await worker(1, loop, results, args, stream, statsd, delay=0)
await worker(1, loop, results, min, args, stream, statsd, delay=0)

self.assertTrue(results['OK'] > 0)
self.assertEqual(results['FAILED'], 0)
Expand Down Expand Up @@ -203,12 +204,13 @@ async def test_two(session):
pass

results = {'OK': 0, 'FAILED': 0}
min = {'OK': 0, 'FAILED': 0}
stream = asyncio.Queue()
args = self.get_args()
args.exception = False
statsd = None

await worker(1, loop, results, args, stream, statsd, delay=0)
await worker(1, loop, results, min, args, stream, statsd, delay=0)

self.assertTrue(results['OK'] > 0)
self.assertEqual(results['FAILED'], 0)
Expand All @@ -222,11 +224,12 @@ async def test_failing(session):
raise ValueError()

results = {'OK': 0, 'FAILED': 0}
min = {'OK': 0, 'FAILED': 0}
stream = asyncio.Queue()
args = self.get_args()
statsd = None

await worker(1, loop, results, args, stream, statsd, delay=0)
await worker(1, loop, results, min, args, stream, statsd, delay=0)

self.assertEqual(results['OK'], 0)
self.assertTrue(results['FAILED'] > 0)
Expand Down

0 comments on commit 7e90372

Please sign in to comment.