Skip to content

Commit

Permalink
Merge pull request #698 from fomars/develop
Browse files Browse the repository at this point in the history
logging, aggregator fix, phantom reader microsec, netort>=2.8
  • Loading branch information
fomars committed Dec 27, 2018
2 parents a518e5f + cb9127e commit eb7d4f5
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 10 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
'psutil>=1.2.1', 'requests>=2.5.1', 'paramiko>=1.16.0',
'pandas>=0.18.0', 'numpy>=1.12.1', 'future>=0.16.0',
'pip>=8.1.2',
'pyyaml>=3.12', 'cerberus==1.2', 'influxdb>=5.0.0', 'netort==0.2.8',
'pyyaml>=3.12', 'cerberus==1.2', 'influxdb>=5.0.0', 'netort>=0.2.8',
],
setup_requires=[
],
Expand Down
8 changes: 2 additions & 6 deletions yandextank/aggregator/tank_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,11 @@ def end_test(self, retcode):
self.stats_reader.close()
if self.drain:
logger.debug('Waiting for gun drain to finish')
self.drain.wait()
self.drain.join()
logger.debug('Waiting for stats drain to finish')
self.stats_drain.wait()
self.stats_drain.join()
logger.debug('Collecting remaining data')
self._collect_data(end=True)
if self.drain:
self.drain.join()
self.stats_drain.join()

return retcode

def add_result_listener(self, listener):
Expand Down
3 changes: 2 additions & 1 deletion yandextank/core/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def main():

ammofile = ammofiles[0] if len(ammofiles) > 0 else None

init_logging(options.error_log, options.verbose, options.quiet)
handlers = init_logging(options.error_log, options.verbose, options.quiet)

cli_kwargs = {'core': {'lock_dir': options.lock_dir}} if options.lock_dir else {}
if options.ignore_lock:
Expand All @@ -127,6 +127,7 @@ def main():
[cli_kwargs],
options.no_rc,
ammo_file=ammofile if ammofile else None,
log_handlers=handlers
)
except ValidationError as e:
logging.error('Config validation error:\n{}'.format(e.errors))
Expand Down
8 changes: 7 additions & 1 deletion yandextank/core/consoleworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from ..config_converter.converter import convert_ini, convert_single_option

DEFAULT_CONFIG = 'load.yaml'
logger = logging.getLogger('yandextank')
logger = logging.getLogger()


class RealConsoleMarkup(object):
Expand Down Expand Up @@ -429,6 +429,7 @@ def init_logging(self, debug=False):
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s %(filename)s:%(lineno)d\t%(message)s"))
file_handler.addFilter(TankapiLogFilter())
logger.addHandler(file_handler)
logger.info("Log file created")

Expand Down Expand Up @@ -458,6 +459,11 @@ def set_msg(self, msg):
self.msg = msg


class TankapiLogFilter(logging.Filter):
def filter(self, record):
return record.name != 'tankapi'


class DevNullOpts:
def __init__(self):
pass
Expand Down
3 changes: 3 additions & 0 deletions yandextank/plugins/DataUploader/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,3 +674,6 @@ def link_mobile_job(self, lp_key, mobile_key):

def push_events_data(self, number, token, data):
return

def set_imbalance_and_dsc(self, **kwargs):
return
16 changes: 16 additions & 0 deletions yandextank/plugins/Phantom/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,22 @@ def string_to_df(data):
return chunk


def string_to_df_microsec(data):
# start_time = time.time()
try:
df = pd.read_csv(StringIO(data), sep='\t', names=phout_columns, dtype=dtypes, quoting=QUOTE_NONE)
except CParserError as e:
logger.error(e.message)
logger.error('Incorrect phout data: {}'.format(data))
return

df['ts'] = (df['send_ts'] * 1e6 + df['interval_real']).astype(int)
df['ts'] -= df["ts"][0]
df['tag'] = df.tag.str.rsplit('#', 1, expand=True)[0]
# logger.debug("Chunk decode time: %.2fms", (time.time() - start_time) * 1000)
return df


class PhantomReader(object):
def __init__(self, filename, cache_size=1024 * 1024 * 50, ready_file=False):
self.buffer = ""
Expand Down
Binary file added yandextank/plugins/Phantom/tests/expected_df.dat
Binary file not shown.
9 changes: 8 additions & 1 deletion yandextank/plugins/Phantom/tests/test_reader.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pandas as pd

from yandextank.plugins.Phantom.reader import PhantomReader, PhantomStatsReader
from yandextank.plugins.Phantom.reader import PhantomReader, PhantomStatsReader, string_to_df_microsec


class TestPhantomReader(object):
Expand All @@ -24,6 +24,13 @@ def test_reader_closed(self):
assert len(result) == 200
assert (result['interval_real'].mean() == 11000714.0)

def test_reader_us(self):
with open('yandextank/plugins/Phantom/tests/phout.dat') as f:
chunk = f.read()
result = string_to_df_microsec(chunk)
expected = pd.read_pickle('yandextank/plugins/Phantom/tests/expected_df.dat')
assert result.equals(expected)


class MockInfo(object):
def __init__(self, steps):
Expand Down

0 comments on commit eb7d4f5

Please sign in to comment.