Merge pull request #713 from fomars/develop
FileMultiReader stop event, PhantomReader StopIteration
fomars committed Mar 6, 2019
2 parents fb0386d + a06390d commit 333eb79
Showing 10 changed files with 64 additions and 101 deletions.
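In short, this commit replaces PhantomReader's explicit close()/ready_file handshake with a stop event owned by the data provider: FileMultiReader now takes a threading.Event, its locked reads return None once that event is set and the file is exhausted, and PhantomReader turns that None into StopIteration. Below is a self-contained toy model of the coordination pattern, not the yandextank classes themselves; all names in it are illustrative.

```python
import os
import time
from threading import Event, Thread


class TailReader(object):
    """Toy reader: returns None while data may still arrive, stops iterating
    once the provider's stop event is set and the file is exhausted."""

    def __init__(self, path, stop_event):
        self._file = open(path)
        self._stop = stop_event

    def __iter__(self):
        return self

    def __next__(self):
        line = self._file.readline()
        if line:
            return line.rstrip('\n')
        if self._stop.is_set():       # provider is done and nothing is left to read
            self._file.close()
            raise StopIteration
        return None                   # no new data yet, provider still running

    next = __next__                   # Python 2 spelling, as in the original reader


def writer(path, stop_event):
    with open(path, 'a') as f:
        for i in range(5):
            f.write('sample %d\n' % i)
            f.flush()
            time.sleep(0.01)
    stop_event.set()                  # signal that no more output will appear


if __name__ == '__main__':
    path = 'toy_phout.txt'            # hypothetical output file
    open(path, 'w').close()           # create it so the reader can open it immediately
    stop = Event()
    Thread(target=writer, args=(path, stop)).start()
    for item in TailReader(path, stop):
        if item is not None:          # skip "nothing new yet" ticks
            print(item)
    os.remove(path)
```

The diffs below wire this idea into FileMultiReader/PhantomReader and into the Pandora and Phantom generator plugins that own the stop event.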
9 changes: 6 additions & 3 deletions yandextank/aggregator/tank_aggregator.py
@@ -110,15 +110,18 @@ def _collect_data(self, end=False):
logger.info(ts)
self.__notify_listeners(data_item, StatsReader.stats_item(ts, 0, 0))

def is_aggr_finished(self):
return self.drain._finished.is_set() and self.stats_drain._finished.is_set()

def is_test_finished(self):
self._collect_data()
return -1

def end_test(self, retcode):
retcode = self.generator.end_test(retcode)
if self.reader:
logger.debug('Closing gun reader')
self.reader.close()
# if self.reader:
# logger.debug('Closing gun reader')
# self.reader.close()
if self.stats_reader:
logger.debug('Closing stats reader')
self.stats_reader.close()
16 changes: 3 additions & 13 deletions yandextank/aggregator/tests/test_aggregator.py
@@ -15,18 +15,7 @@ def __init__(self, phout):

def get_reader(self):
if self.reader is None:
# def reader(phout):
# with open(phout) as f:
# while True:
# line = f.readline()
# if line:
# print '.'
# yield string_to_df(line)
# else:
# self.finished.set()
# break
self.reader = PhantomReader(FileMultiReader(self.phout_filename).get_file(),
ready_file=True)
self.reader = PhantomReader(FileMultiReader(self.phout_filename, self.finished).get_file())
return self.reader

def get_stats_reader(self):
@@ -57,7 +46,8 @@ def test_agregator(phout, expected_rps):
listener = ListenerMock(expected_rps)
aggregator.add_result_listener(listener)
aggregator.start_test(poll_period=0)
while not generator.reader.closed:
generator.finished.set()
while not aggregator.is_aggr_finished():
aggregator.is_test_finished()
aggregator.end_test(1)
assert abs(listener.avg - expected_rps) < 0.1 * expected_rps
10 changes: 7 additions & 3 deletions yandextank/common/tests/test_util.py
@@ -1,5 +1,5 @@
import socket
from threading import Thread
from threading import Thread, Event

import pytest
from queue import Queue
@@ -156,25 +156,29 @@ def phout_multi_read(self):
with open(self.filename) as f:
exp = f.read()
errors = []
mr = FileMultiReader(self.filename)
stop = Event()
mr = FileMultiReader(self.filename, stop)
threads = [Thread(target=self.mock_consumer,
args=(mr.get_file(i), exp, i, errors),
name='Thread-%d' % i) for i in [1000, 4000, 8000]]
[th.start() for th in threads]
stop.set()
[th.join() for th in threads]
mr.close()
return errors

def phout_multi_readline(self):
errors = []
mr = FileMultiReader(self.filename)
stop = Event()
mr = FileMultiReader(self.filename, stop)
threads = [Thread(target=self.mock_complex_consumer,
args=(mr.get_file(i), exp, 10, errors),
name='Thread-%d' % i) for i, exp in
[(1000, '\n1543699431'),
(4000, '815\t0\t200\n1543699487'),
(8000, '10968\t3633\t16\t7283\t36\t7387\t1066\t328\t0\t405\n1543699534')]]
[th.start() for th in threads]
stop.set()
[th.join() for th in threads]
mr.close()
return errors
45 changes: 13 additions & 32 deletions yandextank/common/util.py
@@ -642,51 +642,41 @@ def retry(cls, exception):


class FileMultiReader(object):
def __init__(self, filename, cache_size=1024 * 1024 * 50):
def __init__(self, filename, provider_stop_event, cache_size=1024 * 1024 * 50):
self.buffer = ""
self.filename = filename
self.cache_size = cache_size
self._cursor_map = {}
self._is_locked = False
self._opened_file = open(self.filename)
self.can_close_map = {}
self.stop = provider_stop_event

def close(self, force=False):
while not self.can_close() and force:
logger.warning('Cannot close until all children finish reading'.format(
[k for k, v in self.can_close_map.items() if not v]))
self.wait_lock()
self._opened_file.close()
self.unlock()

def get_file(self, cache_size=None):
cache_size = self.cache_size if not cache_size else cache_size
fileobj = FileLike(self, cache_size)
self.can_close_map[id(fileobj)] = False
return fileobj

def read_with_lock(self, pos, _len):
self.wait_lock()
try:
self._opened_file.seek(pos)
result = self._opened_file.read(_len)
stop_pos = self._opened_file.tell()
except ValueError:
result, stop_pos = '', pos
finally:
self.unlock()
return result, stop_pos

def readline_with_lock(self, pos):
def read_with_lock(self, pos, _len=None):
"""
Reads {_len} characters if _len is not None else reads line
:param pos: start reading position
:param _len: number of characters to read
:rtype: (string, int)
"""
self.wait_lock()
try:
self._opened_file.seek(pos)
result = self._opened_file.readline()
result = self._opened_file.read(_len) if _len is not None else self._opened_file.readline()
stop_pos = self._opened_file.tell()
except ValueError:
result, stop_pos = '', pos
finally:
self.unlock()
if not result and self.stop.is_set():
result = None
return result, stop_pos

@retry(wait_random_min=5, wait_random_max=20, stop_max_delay=10000,
@@ -701,12 +691,6 @@ def wait_lock(self):
def unlock(self):
self._is_locked = False

def _close_child(self, _id):
self.can_close_map[_id] = True

def can_close(self):
return all(self.can_close_map.values())


class FileLike(object):
def __init__(self, multireader, cache_size):
@@ -723,8 +707,5 @@ def read(self, _len=None):
return result

def readline(self):
result, self._cursor = self.multireader.readline_with_lock(self._cursor)
result, self._cursor = self.multireader.read_with_lock(self._cursor)
return result

def close(self):
self.multireader._close_child(id(self))
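The resulting FileMultiReader contract, as the updated test_util.py above exercises it: reads return an empty string while more data may still arrive and None once the provider's stop event is set and the file is drained. FileLike.close() and the can_close bookkeeping are gone, so the owner of the FileMultiReader decides when to close the underlying file. A rough usage sketch, with a placeholder path and an illustrative consumer body (not code from this commit):

```python
import time
from threading import Event, Thread

from yandextank.common.util import FileMultiReader


def consume(file_like):
    # read(n) returns '' while the provider may still write more data,
    # and None once the stop event is set and the file is exhausted.
    while True:
        chunk = file_like.read(4096)
        if chunk is None:
            break
        if not chunk:
            time.sleep(0.1)           # nothing new yet; back off instead of spinning
            continue
        # ... process chunk ...


stop = Event()                                 # the provider_stop_event argument
mr = FileMultiReader('phout.dat', stop)        # 'phout.dat' is a placeholder path
consumers = [Thread(target=consume, args=(mr.get_file(),)) for _ in range(3)]
[t.start() for t in consumers]
# ... the load generator writes phout.dat in the meantime ...
stop.set()                                     # provider signals it is done writing
[t.join() for t in consumers]
mr.close()
```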
4 changes: 4 additions & 0 deletions yandextank/plugins/NeUploader/config/example.yml
@@ -0,0 +1,4 @@
neuploader:
enabled: true
package: yandextank.plugins.NeUploader
test_name: my_test
3 changes: 2 additions & 1 deletion yandextank/plugins/NeUploader/plugin.py
Expand Up @@ -40,7 +40,8 @@ def monitoring_data(self, data_list):

def post_process(self, retcode):
for chunk in self.reader:
self.uploader(chunk)
if chunk is not None:
self.uploader(chunk)
return retcode

@property
8 changes: 7 additions & 1 deletion yandextank/plugins/Pandora/plugin.py
@@ -2,6 +2,8 @@
import logging
import subprocess
import time
from threading import Event

import yaml

from netort.resource import manager as resource_manager
@@ -25,6 +27,7 @@ class Plugin(GeneratorPlugin):

def __init__(self, core, cfg, name):
super(Plugin, self).__init__(core, cfg, name)
self.output_finished = Event()
self.enum_ammo = False
self.process_start_time = None
self.pandora_cmd = None
@@ -153,7 +156,7 @@ def __find_report_filename(self):

def get_reader(self, parser=string_to_df):
if self.reader is None:
self.reader = FileMultiReader(self.sample_log)
self.reader = FileMultiReader(self.sample_log, self.output_finished)
return PhantomReader(self.reader.get_file(), parser=parser)

def get_stats_reader(self):
@@ -198,10 +201,12 @@ def is_test_finished(self):
retcode = self.process.poll()
if retcode is not None and retcode == 0:
logger.info("Pandora subprocess done its work successfully and finished w/ retcode 0")
self.output_finished.set()
return retcode
elif retcode is not None and retcode != 0:
lines_amount = 20
logger.info("Pandora finished with non-zero retcode. Last %s logs of Pandora log:", lines_amount)
self.output_finished.set()
last_log_contents = tail_lines(self.process_stderr_file, lines_amount)
for logline in last_log_contents:
logger.info(logline.strip('\n'))
@@ -214,6 +219,7 @@ def end_test(self, retcode):
logger.warning(
"Terminating worker process with PID %s", self.process.pid)
self.process.terminate()
self.output_finished.set()
if self.process_stderr:
self.process_stderr.close()
else:
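The Phantom plugin below follows the same lifecycle as Pandora here: create the event in __init__, hand it to FileMultiReader in get_reader, and set it once the generator process exits or is terminated. Schematically, using a much-simplified stand-in class (not the real GeneratorPlugin subclass):

```python
from threading import Event

from yandextank.common.util import FileMultiReader
from yandextank.plugins.Phantom.reader import PhantomReader, string_to_df


class GeneratorLifecycleSketch(object):
    """Simplified stand-in showing where the stop event is created and set."""

    def __init__(self, sample_log):
        self.sample_log = sample_log           # path written by the load generator
        self.output_finished = Event()         # passed to FileMultiReader as its stop event
        self.reader = None

    def get_reader(self):
        if self.reader is None:
            self.reader = FileMultiReader(self.sample_log, self.output_finished)
        return PhantomReader(self.reader.get_file(), parser=string_to_df)

    def is_test_finished(self, retcode):
        if retcode is not None:                # generator process has exited
            self.output_finished.set()         # let readers drain the file and stop
            return retcode
        return -1

    def end_test(self, retcode):
        self.output_finished.set()             # also set on forced termination
        return retcode
```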
6 changes: 5 additions & 1 deletion yandextank/plugins/Phantom/plugin.py
@@ -4,6 +4,7 @@
import logging
import subprocess
import time
from threading import Event

from .reader import PhantomReader, PhantomStatsReader, string_to_df
from .utils import PhantomConfig
@@ -25,6 +26,7 @@ class Plugin(GeneratorPlugin):

def __init__(self, core, cfg, name):
super(Plugin, self).__init__(core, cfg, name)
self.phout_finished = Event()
self.predefined_phout = None
self.did_phout_import_try = False
self.eta_file = None
@@ -87,7 +89,7 @@ def stat_log(self):

def get_reader(self, parser=string_to_df):
if self.reader is None:
self.reader = FileMultiReader(self.phantom.phout_file)
self.reader = FileMultiReader(self.phantom.phout_file, self.phout_finished)
return PhantomReader(self.reader.get_file(), parser=parser)

def get_stats_reader(self):
@@ -155,6 +157,7 @@ def is_test_finished(self):
retcode = self.process.poll()
if retcode is not None:
logger.info("Phantom done its work with exit code: %s", retcode)
self.phout_finished.set()
return abs(retcode)
else:
info = self.get_info()
@@ -167,6 +170,7 @@ def end_test(self, retcode):
if self.process and self.process.poll() is None:
logger.info("Terminating phantom process with PID %s", self.process.pid)
self.process.terminate()
self.phout_finished.set()
if self.process:
self.process.communicate()
else:
52 changes: 11 additions & 41 deletions yandextank/plugins/Phantom/reader.py
@@ -80,58 +80,28 @@ def string_to_df_microsec(data):


class PhantomReader(object):
def __init__(self, fileobj, cache_size=1024 * 1024 * 50, ready_file=False, parser=string_to_df):
def __init__(self, fileobj, cache_size=1024 * 1024 * 50, parser=string_to_df):
self.buffer = ""
self.phout = fileobj
self.closed = False
self.cache_size = cache_size
self.ready_file = ready_file
self.parser = parser

def _read_phout_chunk(self):
def __iter__(self):
return self

def next(self):
data = self.phout.read(self.cache_size)
if data:
if data is None:
raise StopIteration
else:
parts = data.rsplit('\n', 1)
if len(parts) > 1:
ready_chunk = self.buffer + parts[0] + '\n'
chunk = self.buffer + parts[0] + '\n'
self.buffer = parts[1]
return self.parser(ready_chunk)
return self.parser(chunk)
else:
self.buffer += parts[0]
else:
self.buffer += self.phout.readline()
if self.ready_file:
self.close()
return None

def __iter__(self):
return self
# while not self.closed:
# yield self._read_phout_chunk()
# # read end
# chunk = self._read_phout_chunk()
# while chunk is not None:
# yield chunk
# chunk = self._read_phout_chunk()
# # don't forget the buffer
# if self.buffer:
# yield self.parser(self.buffer)
# self.phout.close()

def next(self):
chunk = self._read_phout_chunk()
if self.closed and chunk is None:
if self.buffer:
buff, self.buffer = self.buffer, ''
return self.parser(buff)
else:
self.phout.close()
raise StopIteration
else:
return chunk

def close(self):
self.closed = True
return None


class PhantomStatsReader(StatsReader):
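With next() raising StopIteration on a None read, consumers drain the reader the way the updated NeUploader and tests do: iterate, skip None chunks (they only mean "nothing new yet"), and let iteration end once the provider's event is set. A sketch against an already complete phout file, reusing the fixture path from the tests below; pd.concat stands in for whatever the consumer actually does with the chunks:

```python
from threading import Event

import pandas as pd

from yandextank.common.util import FileMultiReader
from yandextank.plugins.Phantom.reader import PhantomReader

stop = Event()
multireader = FileMultiReader('yandextank/plugins/Phantom/tests/phout.dat', stop)
stop.set()    # the file is already complete, so signal "no more data" up front

reader = PhantomReader(multireader.get_file(), cache_size=1024)
frames = [chunk for chunk in reader if chunk is not None]   # None = "nothing new yet"
result = pd.concat(frames)
multireader.close()
print(len(result))    # the fixture holds 200 samples, per the test below
```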
12 changes: 6 additions & 6 deletions yandextank/plugins/Phantom/tests/test_reader.py
@@ -1,3 +1,5 @@
from threading import Event

import pandas as pd

from yandextank.common.util import FileMultiReader
@@ -6,7 +8,9 @@

class TestPhantomReader(object):
def setup_class(self):
self.multireader = FileMultiReader('yandextank/plugins/Phantom/tests/phout.dat')
stop = Event()
self.multireader = FileMultiReader('yandextank/plugins/Phantom/tests/phout.dat', stop)
stop.set()

def teardown_class(self):
self.multireader.close()
@@ -16,16 +20,12 @@ def test_read_all(self):
self.multireader.get_file(), cache_size=1024)
df = pd.DataFrame()
for chunk in reader:
if chunk is None:
reader.close()
else:
df = df.append(chunk)
df = df.append(chunk)
assert (len(df) == 200)
assert (df['interval_real'].mean() == 11000714.0)

def test_reader_closed(self):
reader = PhantomReader(self.multireader.get_file(), cache_size=64)
reader.close()
frames = [i for i in reader]
result = pd.concat(frames)
assert len(result) == 200
