Skip to content

Commit

Permalink
Merge pull request #709 from fomars/develop
Browse files Browse the repository at this point in the history
move Phantom and Pandora onto new reader
  • Loading branch information
fomars committed Mar 4, 2019
2 parents 32d23b3 + 0718a65 commit fdd6255
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 47 deletions.
6 changes: 4 additions & 2 deletions yandextank/aggregator/tests/test_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import pytest as pytest

from yandextank.aggregator import TankAggregator
from yandextank.plugins.Phantom import PhantomReader
from yandextank.common.util import FileMultiReader
from yandextank.plugins.Phantom.reader import PhantomReader


class PhantomMock(object):
Expand All @@ -24,7 +25,8 @@ def get_reader(self):
# else:
# self.finished.set()
# break
self.reader = PhantomReader(self.phout_filename, ready_file=True)
self.reader = PhantomReader(FileMultiReader(self.phout_filename).get_file(),
ready_file=True)
return self.reader

def get_stats_reader(self):
Expand Down
32 changes: 17 additions & 15 deletions yandextank/common/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,25 +156,27 @@ def phout_multi_read(self):
with open(self.filename) as f:
exp = f.read()
errors = []
with FileMultiReader(self.filename) as get_reader:
threads = [Thread(target=self.mock_consumer,
args=(get_reader(i), exp, i, errors),
name='Thread-%d' % i) for i in [1000, 4000, 8000]]
[th.start() for th in threads]
[th.join() for th in threads]
mr = FileMultiReader(self.filename)
threads = [Thread(target=self.mock_consumer,
args=(mr.get_file(i), exp, i, errors),
name='Thread-%d' % i) for i in [1000, 4000, 8000]]
[th.start() for th in threads]
[th.join() for th in threads]
mr.close()
return errors

def phout_multi_readline(self):
errors = []
with FileMultiReader(self.filename) as get_reader:
threads = [Thread(target=self.mock_complex_consumer,
args=(get_reader(i), exp, 10, errors),
name='Thread-%d' % i) for i, exp in
[(1000, '\n1543699431'),
(4000, '815\t0\t200\n1543699487'),
(8000, '10968\t3633\t16\t7283\t36\t7387\t1066\t328\t0\t405\n1543699534')]]
[th.start() for th in threads]
[th.join() for th in threads]
mr = FileMultiReader(self.filename)
threads = [Thread(target=self.mock_complex_consumer,
args=(mr.get_file(i), exp, 10, errors),
name='Thread-%d' % i) for i, exp in
[(1000, '\n1543699431'),
(4000, '815\t0\t200\n1543699487'),
(8000, '10968\t3633\t16\t7283\t36\t7387\t1066\t328\t0\t405\n1543699534')]]
[th.start() for th in threads]
[th.join() for th in threads]
mr.close()
return errors

@pytest.mark.benchmark(min_rounds=10)
Expand Down
55 changes: 36 additions & 19 deletions yandextank/common/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,40 +645,48 @@ class FileMultiReader(object):
def __init__(self, filename, cache_size=1024 * 1024 * 50):
self.buffer = ""
self.filename = filename
self.closed = False
self.cache_size = cache_size
self._cursor_map = {}
self._is_locked = False

def __enter__(self):
self._opened_file = open(self.filename)
return self.get_reader
self.can_close_map = {}

def __exit__(self, exc_type, exc_val, exc_tb):
def close(self, force=False):
while not self.can_close() and force:
logger.warning('Cannot close until all children finish reading'.format(
[k for k, v in self.can_close_map.items() if not v]))
self.wait_lock()
self._opened_file.close()
if exc_type:
msg = 'Exception occurred:\n{}: {}\n{}'.format(exc_type, exc_val, '\n'.join(traceback.format_tb(exc_tb)))
logger.error(msg)
self.unlock()

def get_reader(self, cache_size=None):
def get_file(self, cache_size=None):
cache_size = self.cache_size if not cache_size else cache_size
return FileLike(self, cache_size)
fileobj = FileLike(self, cache_size)
self.can_close_map[id(fileobj)] = False
return fileobj

def read_with_lock(self, pos, _len):
self.wait_lock()
self._opened_file.seek(pos)
result = self._opened_file.read(_len)
stop_pos = self._opened_file.tell()
self.unlock()
try:
self._opened_file.seek(pos)
result = self._opened_file.read(_len)
stop_pos = self._opened_file.tell()
except ValueError:
result, stop_pos = '', pos
finally:
self.unlock()
return result, stop_pos

def readline_with_lock(self, pos):
self.wait_lock
self._opened_file.seek(pos)
result = self._opened_file.readline()
stop_pos = self._opened_file.tell()
self.unlock()
self.wait_lock()
try:
self._opened_file.seek(pos)
result = self._opened_file.readline()
stop_pos = self._opened_file.tell()
except ValueError:
result, stop_pos = '', pos
finally:
self.unlock()
return result, stop_pos

@retry(wait_random_min=5, wait_random_max=20, stop_max_delay=10000,
Expand All @@ -693,6 +701,12 @@ def wait_lock(self):
def unlock(self):
    """Clear the ``_is_locked`` flag on this reader."""
    self._is_locked = False

def _close_child(self, _id):
    """Record in ``can_close_map`` that the child stored under ``_id`` is done.

    :param _id: key of the child in ``can_close_map``
    """
    self.can_close_map.update({_id: True})

def can_close(self):
    """Tell whether every entry of ``can_close_map`` is truthy.

    :return: ``True`` if no entry is falsy (including when the map is
        empty), ``False`` otherwise
    """
    for finished in self.can_close_map.values():
        if not finished:
            return False
    return True


class FileLike(object):
def __init__(self, multireader, cache_size):
Expand All @@ -711,3 +725,6 @@ def read(self, _len=None):
def readline(self):
    """Fetch one line through the parent multireader.

    Calls ``readline_with_lock`` with the current cursor, stores the
    position it returns as the new cursor and hands back the text.
    """
    line, next_cursor = self.multireader.readline_with_lock(self._cursor)
    self._cursor = next_cursor
    return line

def close(self):
    """Report to the parent multireader, keyed by ``id(self)``, that this object is closed."""
    own_key = id(self)
    self.multireader._close_child(own_key)
6 changes: 3 additions & 3 deletions yandextank/plugins/Pandora/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from ..Console import screen as ConsoleScreen
from ..Phantom import PhantomReader
from ...common.interfaces import AbstractInfoWidget, GeneratorPlugin
from ...common.util import tail_lines
from ...common.util import tail_lines, FileMultiReader

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -153,8 +153,8 @@ def __find_report_filename(self):

def get_reader(self):
if self.reader is None:
self.reader = PhantomReader(self.sample_log)
return self.reader
self.reader = FileMultiReader(self.sample_log)
return PhantomReader(self.reader.get_file())

def get_stats_reader(self):
if self.stats_reader is None:
Expand Down
6 changes: 3 additions & 3 deletions yandextank/plugins/Phantom/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from ..Autostop import Plugin as AutostopPlugin
from ..Console import Plugin as ConsolePlugin
from ...common.interfaces import AbstractCriterion, GeneratorPlugin
from ...common.util import expand_to_seconds
from ...common.util import expand_to_seconds, FileMultiReader

from netort.process import execute

Expand Down Expand Up @@ -87,8 +87,8 @@ def stat_log(self):

def get_reader(self):
if self.reader is None:
self.reader = PhantomReader(self.phantom.phout_file)
return self.reader
self.reader = FileMultiReader(self.phantom.phout_file)
return PhantomReader(self.reader.get_file())

def get_stats_reader(self):
if self.stats_reader is None:
Expand Down
6 changes: 3 additions & 3 deletions yandextank/plugins/Phantom/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ def string_to_df_microsec(data):


class PhantomReader(object):
def __init__(self, filename, cache_size=1024 * 1024 * 50, ready_file=False, parser=string_to_df):
def __init__(self, fileobj, cache_size=1024 * 1024 * 50, ready_file=False, parser=string_to_df):
self.buffer = ""
self.phout = open(filename, 'r')
self.phout = fileobj
self.closed = False
self.cache_size = cache_size
self.ready_file = ready_file
Expand Down Expand Up @@ -149,7 +149,7 @@ def _decode_stat_data(self, chunk):

offset = chunk_date - 1 - self.start_time
reqps = 0
if offset >= 0 and offset < len(self.phantom_info.steps):
if 0 <= offset < len(self.phantom_info.steps):
reqps = self.phantom_info.steps[offset][0]
yield self.stats_item(chunk_date - 1, instances, reqps)

Expand Down
11 changes: 9 additions & 2 deletions yandextank/plugins/Phantom/tests/test_reader.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import pandas as pd

from yandextank.common.util import FileMultiReader
from yandextank.plugins.Phantom.reader import PhantomReader, PhantomStatsReader, string_to_df_microsec


class TestPhantomReader(object):
def setup_class(self):
    """Create the FileMultiReader over the sample phout file used by the tests."""
    phout_path = 'yandextank/plugins/Phantom/tests/phout.dat'
    self.multireader = FileMultiReader(phout_path)

def teardown_class(self):
    """Close the multireader created in ``setup_class``."""
    shared_reader = self.multireader
    shared_reader.close()

def test_read_all(self):
reader = PhantomReader(
'yandextank/plugins/Phantom/tests/phout.dat', cache_size=1024)
self.multireader.get_file(), cache_size=1024)
df = pd.DataFrame()
for chunk in reader:
if chunk is None:
Expand All @@ -17,7 +24,7 @@ def test_read_all(self):
assert (df['interval_real'].mean() == 11000714.0)

def test_reader_closed(self):
reader = PhantomReader('yandextank/plugins/Phantom/tests/phout.dat', cache_size=64)
reader = PhantomReader(self.multireader.get_file(), cache_size=64)
reader.close()
frames = [i for i in reader]
result = pd.concat(frames)
Expand Down

0 comments on commit fdd6255

Please sign in to comment.