Skip to content

Commit

Permalink
Merge pull request #708 from fomars/develop
Browse files Browse the repository at this point in the history
FileMultiReader
  • Loading branch information
fomars committed Mar 1, 2019
2 parents 5ccbb1f + 8eb62a4 commit 32d23b3
Show file tree
Hide file tree
Showing 11 changed files with 10,236 additions and 19 deletions.
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
'psutil>=1.2.1', 'requests>=2.5.1', 'paramiko>=1.16.0',
'pandas>=0.18.0', 'numpy>=1.12.1', 'future>=0.16.0',
'pip>=8.1.2',
'pyyaml>=3.12', 'cerberus==1.2', 'influxdb>=5.0.0', 'netort==0.3.0',
'pyyaml>=3.12', 'cerberus==1.2', 'influxdb>=5.0.0', 'netort>=0.3.0',
'retrying==1.3.3', 'pytest-benchmark==3.2.2'
],
setup_requires=[
],
tests_require=[
'pytest', 'pytest-runner', 'flake8',
'pytest', 'pytest-runner', 'flake8', 'pytest-benchmark'
],
license='LGPLv2',
classifiers=[
Expand Down
4 changes: 2 additions & 2 deletions yandextank/aggregator/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from .chopper import TimeChopper # noqa:F401
from .tank_aggregator import TankAggregator # noqa:F401
from .chopper import TimeChopper # noqa:F401
from .tank_aggregator import TankAggregator # noqa:F401
6 changes: 3 additions & 3 deletions yandextank/aggregator/tank_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(self, generator):
self.generator = generator
self.listeners = [] # [LoggingListener()]
self.results = q.Queue()
self.stats = q.Queue()
self.stats_results = q.Queue()
self.data_cache = {}
self.stat_cache = {}
self.reader = None
Expand Down Expand Up @@ -73,7 +73,7 @@ def start_test(self, poll_period=1):
self.stats_drain = Drain(
Chopper(DataPoller(
source=self.stats_reader, poll_period=poll_period)),
self.stats)
self.stats_results)
self.stats_drain.start()
else:
logger.warning("Generator not found. Generator must provide a reader and a stats_reader interface")
Expand All @@ -83,7 +83,7 @@ def _collect_data(self, end=False):
Collect data, cache it and send to listeners
"""
data = get_nowait_from_queue(self.results)
stats = get_nowait_from_queue(self.stats)
stats = get_nowait_from_queue(self.stats_results)
logger.debug("Data timestamps: %s" % [d.get('ts') for d in data])
logger.debug("Stats timestamps: %s" % [d.get('ts') for d in stats])
for item in data:
Expand Down
4 changes: 3 additions & 1 deletion yandextank/common/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ def __init__(
self.loop_count = loop_count

def get_info(self):
# type: () -> Info
"""
:rtype: GeneratorPlugin.Info
"""
return self.Info(**self.DEFAULT_INFO)

def get_reader(self):
Expand Down
10,075 changes: 10,075 additions & 0 deletions yandextank/common/tests/ph.out

Large diffs are not rendered by default.

58 changes: 57 additions & 1 deletion yandextank/common/tests/test_util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import socket
from threading import Thread

import pytest
from queue import Queue
from yandextank.common.util import FileScanner
from yandextank.common.util import FileScanner, FileMultiReader
from yandextank.common.util import AddressWizard

from netort.data_processing import Drain, Chopper
Expand Down Expand Up @@ -130,3 +132,57 @@ def test_hostname_braces(self):

def test_hostname_braces_port(self):
self.__resolve_hostname_and_test('[ya.ru]:666', 'ya.ru', '666')


class TestFileMultiReader(object):
    """Exercise FileMultiReader with several consumer threads sharing one file.

    Each scenario is wrapped in a callable that returns the list of mismatch
    messages collected by the consumers; the test methods run that callable
    through the ``benchmark`` fixture and require the list to be empty.
    """

    filename = 'yandextank/common/tests/ph.out'

    @staticmethod
    def mock_consumer(f, expected, step, errors):
        """Read ``f`` in ``step``-sized pieces and compare them with ``expected``.

        :param f: object with a ``read(size)`` method
        :param expected: full text the reader is supposed to deliver
        :param step: number of characters requested per ``read`` call
        :param errors: shared list that receives one message per mismatch
        """
        for start in range(0, len(expected), step):
            chunk = expected[start: start + step]
            res = f.read(step)
            if chunk not in res:
                # Report only the chunk that was compared; dumping the whole
                # expected text makes the failure message unreadable.
                errors.append("Expected: {}\nGot: {}".format(chunk, res))

    @staticmethod
    def mock_complex_consumer(f, expected, n_steps, errors):
        """Mix ``read`` and ``readline`` calls and check the final fragment.

        Calls ``f.read()`` ``n_steps`` times, then concatenates one
        ``readline()`` with a ten-character ``read`` and compares the result
        with ``expected``.

        :param f: object with ``read`` and ``readline`` methods
        :param expected: text expected from the final readline + read(10)
        :param n_steps: how many default-sized reads to perform first
        :param errors: shared list that receives a message on mismatch
        """
        for _ in range(n_steps):
            f.read()
        res = f.readline() + f.read(10)
        if res != expected:
            errors.append("Expected: {}\nGot: {}".format(expected, res))

    def phout_multi_read(self):
        """Run three ``mock_consumer`` threads with different chunk sizes.

        :return: list of mismatch messages (empty when every chunk matched)
        """
        with open(self.filename) as f:
            exp = f.read()
        errors = []
        with FileMultiReader(self.filename) as get_reader:
            threads = [Thread(target=self.mock_consumer,
                              args=(get_reader(i), exp, i, errors),
                              name='Thread-%d' % i) for i in [1000, 4000, 8000]]
            for th in threads:
                th.start()
            # Join inside the ``with`` block so the file stays open while
            # the consumers are still reading.
            for th in threads:
                th.join()
        return errors

    def phout_multi_readline(self):
        """Run three ``mock_complex_consumer`` threads with different cache sizes.

        :return: list of mismatch messages (empty when every fragment matched)
        """
        errors = []
        cases = [
            (1000, '\n1543699431'),
            (4000, '815\t0\t200\n1543699487'),
            (8000, '10968\t3633\t16\t7283\t36\t7387\t1066\t328\t0\t405\n1543699534'),
        ]
        with FileMultiReader(self.filename) as get_reader:
            threads = [Thread(target=self.mock_complex_consumer,
                              args=(get_reader(i), exp, 10, errors),
                              name='Thread-%d' % i) for i, exp in cases]
            for th in threads:
                th.start()
            for th in threads:
                th.join()
        return errors

    @pytest.mark.benchmark(min_rounds=10)
    def test_read(self, benchmark):
        errors = benchmark(self.phout_multi_read)
        assert len(errors) == 0

    @pytest.mark.benchmark(min_rounds=5)
    def test_readline(self, benchmark):
        errors = benchmark(self.phout_multi_readline)
        assert len(errors) == 0
82 changes: 82 additions & 0 deletions yandextank/common/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pwd
import socket
import traceback

import http.client
import logging
import errno
Expand All @@ -13,6 +14,7 @@
import argparse

from paramiko import SSHClient, AutoAddPolicy
from retrying import retry

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -629,3 +631,83 @@ def tail_lines(filepath, lines_num, bufsize=8192):
return data[-lines_num:]
except (IOError, OSError):
return data


class FileLockedError(RuntimeError):
    """Signals that a file is currently locked by another reader."""

    @classmethod
    def retry(cls, exception):
        """Report whether ``exception`` is an instance of this error class.

        The one-argument, boolean-returning shape makes it usable as a
        "should we retry?" predicate.

        :param exception: the exception object to inspect
        :rtype: bool
        """
        return isinstance(exception, cls)


class FileMultiReader(object):
    """Let several readers consume one open file, each from its own position.

    Used as a context manager: entering opens the file and returns
    ``get_reader``, a factory for ``FileLike`` objects. Each of those keeps
    its own cursor and performs its reads through ``read_with_lock`` /
    ``readline_with_lock``, which seek the shared handle under a simple
    boolean lock flag.
    """

    def __init__(self, filename, cache_size=1024 * 1024 * 50):
        """
        :param filename: path of the file to open on ``__enter__``
        :param cache_size: default read size handed to readers that do not
            specify their own
        """
        self.buffer = ""
        self.filename = filename
        self.closed = False
        self.cache_size = cache_size
        self._cursor_map = {}
        self._is_locked = False

    def __enter__(self):
        """Open the file and return the reader factory (``get_reader``)."""
        self._opened_file = open(self.filename)
        return self.get_reader

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Wait for the lock, close the file and log any in-flight exception.

        The lock taken here is intentionally not released: the handle is
        closed afterwards. The exception, if any, is logged but not
        suppressed (the method returns ``None``).
        """
        self.wait_lock()
        self._opened_file.close()
        if exc_type:
            msg = 'Exception occurred:\n{}: {}\n{}'.format(exc_type, exc_val, '\n'.join(traceback.format_tb(exc_tb)))
            logger.error(msg)

    def get_reader(self, cache_size=None):
        """Create an independent reader over the shared file.

        :param cache_size: default read size for the new reader; a falsy
            value falls back to this object's ``cache_size``
        :rtype: FileLike
        """
        cache_size = self.cache_size if not cache_size else cache_size
        return FileLike(self, cache_size)

    def read_with_lock(self, pos, _len):
        """Read up to ``_len`` characters starting at ``pos``.

        :param pos: position to seek to before reading
        :param _len: size passed to the underlying ``read``
        :return: ``(text, stop_pos)`` where ``stop_pos`` is the file position
            after the read, to be used as the next ``pos``
        """
        self.wait_lock()
        try:
            self._opened_file.seek(pos)
            result = self._opened_file.read(_len)
            stop_pos = self._opened_file.tell()
        finally:
            # Always release, otherwise one failed read blocks every reader.
            self.unlock()
        return result, stop_pos

    def readline_with_lock(self, pos):
        """Read one line starting at ``pos``.

        :param pos: position to seek to before reading
        :return: ``(line, stop_pos)`` where ``stop_pos`` is the file position
            after the line, to be used as the next ``pos``
        """
        # Must be a call: a bare ``self.wait_lock`` attribute access would
        # skip locking entirely and let this seek race with other readers.
        self.wait_lock()
        try:
            self._opened_file.seek(pos)
            result = self._opened_file.readline()
            stop_pos = self._opened_file.tell()
        finally:
            self.unlock()
        return result, stop_pos

    # NOTE(review): retrying's wait/stop arguments are presumably in
    # milliseconds (5-20 ms random back-off, give up after 10 s), and
    # wrap_exception=True presumably surfaces the failure as RetryError;
    # confirm against the retrying documentation.
    @retry(wait_random_min=5, wait_random_max=20, stop_max_delay=10000,
           retry_on_exception=FileLockedError.retry, wrap_exception=True)
    def wait_lock(self):
        """Take the lock flag, retrying while another reader holds it.

        Raises ``FileLockedError`` when the flag is already set; the ``retry``
        decorator re-invokes the method for that exception type.

        NOTE(review): the check and the assignment are separate statements,
        so this is not an atomic test-and-set; confirm that is acceptable for
        the intended threading model.

        :return: ``True`` once the flag has been taken
        """
        if self._is_locked:
            raise FileLockedError('Generator output file {} is locked'.format(self.filename))
        else:
            self._is_locked = True
            return True

    def unlock(self):
        """Clear the lock flag so the next waiting reader can proceed."""
        self._is_locked = False


class FileLike(object):
    """Minimal file-like view that tracks its own position in a shared file.

    All actual I/O is delegated to the multireader; this object only
    remembers where its next read should start.
    """

    def __init__(self, multireader, cache_size):
        """
        :type multireader: FileMultiReader
        :param cache_size: read size used when ``read`` is called without one
        """
        self._cursor = 0
        self.cache_size = cache_size
        self.multireader = multireader

    def read(self, _len=None):
        """Read from the current position and advance the cursor.

        :param _len: size to request; any falsy value (``None``, ``0``)
            falls back to ``cache_size``
        :return: whatever the multireader read
        """
        size = _len or self.cache_size
        chunk, next_pos = self.multireader.read_with_lock(self._cursor, size)
        self._cursor = next_pos
        return chunk

    def readline(self):
        """Read one line from the current position and advance the cursor."""
        line, next_pos = self.multireader.readline_with_lock(self._cursor)
        self._cursor = next_pos
        return line
2 changes: 1 addition & 1 deletion yandextank/core/tankcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def load_plugins(self):
logger.info("Loading plugins...")
for (plugin_name, plugin_path, plugin_cfg) in self.config.plugins:
logger.debug("Loading plugin %s from %s", plugin_name, plugin_path)
if plugin_path is "yandextank.plugins.Overload":
if plugin_path == "yandextank.plugins.Overload":
logger.warning(
"Deprecated plugin name: 'yandextank.plugins.Overload'\n"
"There is a new generic plugin now.\n"
Expand Down
2 changes: 1 addition & 1 deletion yandextank/plugins/Console/screen.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def add(self, ts, key, value, color=''):
self.data[ts][key] = (color, value)

def get_sparkline(self, key, baseline='zero', spark_len='auto', align='right'):
if spark_len is 'auto':
if spark_len == 'auto':
spark_len = self.window
elif spark_len <= 0:
return ''
Expand Down
5 changes: 3 additions & 2 deletions yandextank/plugins/Phantom/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ def string_to_df_microsec(data):


class PhantomReader(object):
def __init__(self, filename, cache_size=1024 * 1024 * 50, ready_file=False):
def __init__(self, filename, cache_size=1024 * 1024 * 50, ready_file=False, parser=string_to_df):
self.buffer = ""
self.phout = open(filename, 'r')
self.closed = False
self.cache_size = cache_size
self.ready_file = ready_file
self.parser = parser

def _read_phout_chunk(self):
data = self.phout.read(self.cache_size)
Expand All @@ -94,7 +95,7 @@ def _read_phout_chunk(self):
if len(parts) > 1:
ready_chunk = self.buffer + parts[0] + '\n'
self.buffer = parts[1]
return string_to_df(ready_chunk)
return self.parser(ready_chunk)
else:
self.buffer += parts[0]
else:
Expand Down
12 changes: 6 additions & 6 deletions yandextank/stepper/missile.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ def __init__(self, filename, use_cache=True, **kwargs):
def __iter__(self):
def read_chunk_header(ammo_file):
chunk_header = ''
while chunk_header is '':
while chunk_header == '':
line = ammo_file.readline()
if line is '':
if line == '':
return line
chunk_header = line.strip('\r\n')
return chunk_header
Expand All @@ -118,7 +118,7 @@ def read_chunk_header(ammo_file):
# if we got StopIteration here, the file is empty
chunk_header = read_chunk_header(ammo_file)
while chunk_header:
if chunk_header is not '':
if chunk_header != '':
try:
fields = chunk_header.split()
chunk_size = int(fields[0])
Expand Down Expand Up @@ -312,11 +312,11 @@ def __init__(self, filename, headers=None, http_ver='1.1', use_cache=True, **kwa
def __iter__(self):
def read_chunk_header(ammo_file):
chunk_header = ''
while chunk_header is '':
while chunk_header == '':
line = ammo_file.readline()
if line.startswith('['):
self.headers.update(_parse_header(line.strip('\r\n[]\t ')))
elif line is '':
elif line == '':
return line
else:
chunk_header = line.strip('\r\n')
Expand All @@ -328,7 +328,7 @@ def read_chunk_header(ammo_file):
# if we got StopIteration here, the file is empty
chunk_header = read_chunk_header(ammo_file)
while chunk_header:
if chunk_header is not '':
if chunk_header != '':
try:
fields = chunk_header.split()
chunk_size = int(fields[0])
Expand Down

0 comments on commit 32d23b3

Please sign in to comment.