
Commit

Merge pull request #711 from fomars/develop
PhantomReader iterator; NeUploader draft
fomars committed Mar 5, 2019
2 parents eb59c73 + 0b7c523 commit d73bb9d
Showing 8 changed files with 118 additions and 19 deletions.
3 changes: 2 additions & 1 deletion setup.py
@@ -69,6 +69,7 @@
'yandextank.plugins.ResourceCheck': ['config/*'],
'yandextank.plugins.ShellExec': ['config/*'],
'yandextank.plugins.ShootExec': ['config/*'],
'yandextank.plugins.Telegraf': ['config/*']
'yandextank.plugins.Telegraf': ['config/*'],
'yandextank.plugins.NeUploader': ['config/*']
},
use_2to3=False, )
2 changes: 1 addition & 1 deletion yandextank/common/interfaces.py
@@ -18,7 +18,7 @@ def __init__(self, core, cfg, name):
"""
:param name:
:type core: TankCore
:type core: yandextank.core.TankCore
:type cfg: dict
"""
super(AbstractPlugin, self).__init__()
1 change: 1 addition & 0 deletions yandextank/plugins/NeUploader/__init__.py
@@ -0,0 +1 @@
from .plugin import Plugin # noqa:F401
8 changes: 8 additions & 0 deletions yandextank/plugins/NeUploader/config/schema.yaml
@@ -0,0 +1,8 @@
api_address:
description: luna back url
type: string
default: https://volta-back-testing.common-int.yandex-team.ru/
test_name:
description: test name
type: string
default: my test
77 changes: 77 additions & 0 deletions yandextank/plugins/NeUploader/plugin.py
@@ -0,0 +1,77 @@
import logging
from netort.data_manager import DataSession

from yandextank.plugins.Phantom.reader import string_to_df_microsec
from ...common.interfaces import AbstractPlugin,\
MonitoringDataListener

logger = logging.getLogger(__name__) # pylint: disable=C0103


class Plugin(AbstractPlugin, MonitoringDataListener):
SECTION = 'neuploader'

def __init__(self, core, cfg, name):
super(Plugin, self).__init__(core, cfg, name)
self._is_telegraf = None
self.clients_cfg = [{'type': 'luna', 'api_address': self.cfg.get('api_address')}]

def configure(self):
pass

def start_test(self):
self.data_session = DataSession({'clients': self.clients_cfg})
self.data_session.update_job({'name': self.cfg.get('test_name')})
col_map_aggr = {name: 'metric %s' % name for name in
['interval_real', 'connect_time', 'send_time', 'latency',
'receive_time', 'interval_event']}
self.uploader = get_uploader(self.data_session, col_map_aggr, True)
self.reader = self.core.job.generator_plugin.get_reader(parser=string_to_df_microsec)

def is_test_finished(self):
self.uploader(next(self.reader))
return -1

def monitoring_data(self, data_list):
pass

def post_process(self, retcode):
for chunk in self.reader:
self.uploader(chunk)
self.data_session.close()
return retcode

@property
def is_telegraf(self):
return True


def get_uploader(data_session, column_mapping, overall_only=False):
"""
:type data_session: DataSession
"""

def get_router(tags):
"""
:param tags:
:return: {'%tag': {'%column_name': metric_object(name, group)}}
"""
router = {tag: {col_name: data_session.new_aggregated_metric(name + '-' + tag)
for col_name, name in column_mapping.items()} if not overall_only else {}
for tag in tags}
overall = {col_name: data_session.new_aggregated_metric(name + ' overall')
for col_name, name in column_mapping.items()}
return router, overall

def upload_df(df):
router, overall = get_router(df.tag.unique().tolist())
if len(router) > 0:
for tag, df_tagged in df.groupby('tag'):
for col_name, metric in router[tag].items():
df_tagged['value'] = df_tagged[col_name]
metric.put(df_tagged)
for col_name, metric in overall.items():
df['value'] = df[col_name]
metric.put(df)
return upload_df
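
Not part of the commit: a minimal sketch of how get_uploader() splits data into per-tag and overall metrics. StubSession and StubMetric are hypothetical stand-ins for netort's DataSession and its aggregated-metric objects, so the routing can be traced without a Luna backend; importing the plugin module still assumes yandextank and netort are installed.

import pandas as pd

from yandextank.plugins.NeUploader.plugin import get_uploader


class StubMetric(object):
    def __init__(self, name):
        self.name = name
        self.frames = []

    def put(self, df):
        # The real aggregated metric uploads the frame; the stub just keeps it.
        self.frames.append(df)


class StubSession(object):
    def __init__(self):
        self.metrics = []

    def new_aggregated_metric(self, name):
        # Same factory method the uploader calls on a real DataSession.
        metric = StubMetric(name)
        self.metrics.append(metric)
        return metric


session = StubSession()
col_map = {'interval_real': 'metric interval_real', 'latency': 'metric latency'}
uploader = get_uploader(session, col_map, overall_only=False)

df = pd.DataFrame({'tag': ['case1', 'case1', 'case2'],
                   'interval_real': [1200, 1500, 900],
                   'latency': [300, 400, 250]})
uploader(df)

# One '<name>-<tag>' metric per (column, tag) pair plus one '<name> overall'
# metric per column; each received frames whose 'value' column was copied
# from the mapped source column.
print(sorted(m.name for m in session.metrics))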
6 changes: 3 additions & 3 deletions yandextank/plugins/Pandora/plugin.py
@@ -9,7 +9,7 @@
from .reader import PandoraStatsReader
from ..Console import Plugin as ConsolePlugin
from ..Console import screen as ConsoleScreen
from ..Phantom import PhantomReader
from ..Phantom import PhantomReader, string_to_df
from ...common.interfaces import AbstractInfoWidget, GeneratorPlugin
from ...common.util import tail_lines, FileMultiReader

@@ -151,10 +151,10 @@ def __find_report_filename(self):
return report_filename
return self.DEFAULT_REPORT_FILE

def get_reader(self):
def get_reader(self, parser=string_to_df):
if self.reader is None:
self.reader = FileMultiReader(self.sample_log)
return PhantomReader(self.reader.get_file())
return PhantomReader(self.reader.get_file(), parser=parser)

def get_stats_reader(self):
if self.stats_reader is None:
6 changes: 3 additions & 3 deletions yandextank/plugins/Phantom/plugin.py
@@ -5,7 +5,7 @@
import subprocess
import time

from .reader import PhantomReader, PhantomStatsReader
from .reader import PhantomReader, PhantomStatsReader, string_to_df
from .utils import PhantomConfig
from .widget import PhantomInfoWidget, PhantomProgressBarWidget
from ..Autostop import Plugin as AutostopPlugin
@@ -85,10 +85,10 @@ def stat_log(self):
self._stat_log = self.core.mkstemp(".log", "phantom_stat_")
return self._stat_log

def get_reader(self):
def get_reader(self, parser=string_to_df):
if self.reader is None:
self.reader = FileMultiReader(self.phantom.phout_file)
return PhantomReader(self.reader.get_file())
return PhantomReader(self.reader.get_file(), parser=parser)

def get_stats_reader(self):
if self.stats_reader is None:
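
Both generator plugins now accept a parser override while keeping string_to_df as the default, so existing callers of get_reader() behave as before. A hedged sketch, not part of the commit, of how a consumer can request a differently parsed reader (attach_reader is a hypothetical helper; the core.job.generator_plugin path mirrors NeUploader.start_test() above):

from yandextank.plugins.Phantom.reader import string_to_df_microsec


def attach_reader(core):
    # Whichever generator is active (Phantom or Pandora) exposes the same
    # get_reader() signature; passing parser= swaps only the chunk parser.
    generator = core.job.generator_plugin
    return generator.get_reader(parser=string_to_df_microsec)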
34 changes: 23 additions & 11 deletions yandextank/plugins/Phantom/reader.py
@@ -105,18 +105,30 @@ def _read_phout_chunk(self):
return None

def __iter__(self):
while not self.closed:
yield self._read_phout_chunk()
# read end
return self
# while not self.closed:
# yield self._read_phout_chunk()
# # read end
# chunk = self._read_phout_chunk()
# while chunk is not None:
# yield chunk
# chunk = self._read_phout_chunk()
# # don't forget the buffer
# if self.buffer:
# yield self.parser(self.buffer)
# self.phout.close()

def next(self):
chunk = self._read_phout_chunk()
while chunk is not None:
yield chunk
chunk = self._read_phout_chunk()
# don't forget the buffer
if self.buffer:
yield string_to_df(self.buffer)

self.phout.close()
if self.closed and chunk is None:
if self.buffer:
buff, self.buffer = self.buffer, ''
return self.parser(buff)
else:
self.phout.close()
raise StopIteration
else:
return chunk

def close(self):
self.closed = True
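
With this change PhantomReader is a plain (Python 2-style) iterator: __iter__() returns the reader itself, and next() returns a parsed chunk, returns None while the source is open but has no fresh data, and raises StopIteration only after close() has been called and the tail of the internal buffer has been flushed. A hedged sketch of that protocol, not part of the commit; the file name and bare open() call are illustrative, since in the plugins the reader wraps a FileMultiReader handle.

from yandextank.plugins.Phantom.reader import PhantomReader, string_to_df

reader = PhantomReader(open('phout.log'), parser=string_to_df)  # illustrative path

# While the shooting is still running: one chunk per call, or None if nothing
# new has been written yet (NeUploader.is_test_finished() above polls this way).
chunk = next(reader)
if chunk is not None:
    print('%d rows parsed' % len(chunk))

# Once the owner closes the reader, iteration drains the remaining chunks plus
# the parsed tail of the buffer and then stops, which is what the loop in
# NeUploader.post_process() above relies on.
reader.close()
for chunk in reader:
    if chunk is not None:
        print('%d more rows parsed' % len(chunk))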
