Skip to content

Commit

Permalink
Merge pull request #751 from fomars/develop
Browse files Browse the repository at this point in the history
NeUpload monitoring, BFG compatible
  • Loading branch information
fomars committed Jun 5, 2019
2 parents 47c6099 + be52899 commit fef677f
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 5 deletions.
4 changes: 2 additions & 2 deletions yandextank/common/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class MonitoringPlugin(AbstractPlugin):

def __init__(self, core, cfg, name):
super(MonitoringPlugin, self).__init__(core, cfg, name)
self.listeners = []
self.listeners = set()

def add_listener(self, plugin):
self.listeners.append(plugin)
self.listeners.add(plugin)
1 change: 1 addition & 0 deletions yandextank/core/consoleworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ class TankWorker(Thread):
def __init__(self, configs, cli_options=None, cfg_patches=None, cli_args=None, no_local=False,
log_handlers=None, wait_lock=False, files=None, ammo_file=None, api_start=False):
super(TankWorker, self).__init__()
self.daemon = True
self.api_start = api_start
self.wait_lock = wait_lock
self.log_handlers = log_handlers if log_handlers is not None else []
Expand Down
4 changes: 3 additions & 1 deletion yandextank/core/tankcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from builtins import str

from yandextank.common.exceptions import PluginNotPrepared
from yandextank.common.interfaces import GeneratorPlugin, MonitoringPlugin
from yandextank.common.interfaces import GeneratorPlugin, MonitoringPlugin, MonitoringDataListener
from yandextank.plugins.DataUploader.client import LPRequisites
from yandextank.validator.validator import TankConfig, ValidationError
from yandextank.aggregator import TankAggregator
Expand Down Expand Up @@ -243,6 +243,8 @@ def plugins_configure(self):
if not self.interrupted.is_set():
logger.debug("Configuring %s", plugin)
plugin.configure()
if isinstance(plugin, MonitoringDataListener):
[mon.add_listener(plugin) for mon in self.job.monitoring_plugins]

def plugins_prepare_test(self):
""" Call prepare_test() on all plugins """
Expand Down
2 changes: 1 addition & 1 deletion yandextank/plugins/Bfg/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def _write_results_into_file(self):
'interval_event', 'size_out', 'size_in', 'net_code', 'proto_code']
for entry in reader:
if entry is not None:
entry.receive_ts = round(entry.receive_ts, 3)
entry.receive_ts = entry.receive_ts.round(3)
with open(self.report_filename, 'a') as report_file:
report_file.write(entry.to_csv(index=False, header=False, sep='\t', columns=columns))
time.sleep(0.1)
Expand Down
32 changes: 31 additions & 1 deletion yandextank/plugins/NeUploader/plugin.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
""" Plugin uploading metrics from yandextank to Luna. """
import logging

import pandas
from netort.data_manager import DataSession

from yandextank.plugins.Phantom.reader import string_to_df_microsec
Expand All @@ -19,6 +21,7 @@ def __init__(self, core, cfg, name):
'api_address': self.cfg.get('api_address'),
'db_name': self.cfg.get('db_name')}]
self.metrics_objs = {} # map of case names and metric objects
self.monitoring_metrics = {}

def configure(self):
pass
Expand Down Expand Up @@ -57,7 +60,7 @@ def is_test_finished(self):
return -1

def monitoring_data(self, data_list):
pass
self.upload_monitoring(data_list)

def post_process(self, retcode):
for chunk in self.reader:
Expand Down Expand Up @@ -102,6 +105,33 @@ def upload(self, df):
# result_df = self.filter_df_by_case(df, case_name)
# case_metric_obj.put(result_df)

def upload_monitoring(self, data):
    """Send monitoring chunks to Luna, one true metric per '<panel>:<metric>' pair.

    Metric objects are created lazily on first sight of a name and cached in
    self.monitoring_metrics so subsequent chunks reuse the same uploader.
    """
    dataframes = self.monitoring_data_to_dfs(data)
    for name, frame in dataframes.items():
        metric_obj = self.monitoring_metrics.get(name)
        if metric_obj is None:
            # Names are '<panel>:<metric>'; the panel becomes the host tag.
            host, metric_id = name.split(':', 1)
            metric_obj = self.data_session.new_true_metric(metric_id,
                                                           group='monitoring',
                                                           host=host)
            self.monitoring_metrics[name] = metric_obj
        metric_obj.put(frame)

@staticmethod
def monitoring_data_to_dfs(data):
panels = {}
for chunk in data:
for panel_name, content in chunk['data'].items():
if panel_name in panels:
for metric_name, value in content['metrics'].items():
if metric_name in panels[panel_name]:
panels[panel_name][metric_name]['value'].append(value)
panels[panel_name][metric_name]['ts'].append(chunk['timestamp'])
else:
panels[panel_name][metric_name] = {'value': [value], 'ts': [chunk['timestamp']]}
else:
panels[panel_name] = {name: {'value': [value], 'ts': [chunk['timestamp']]} for name, value in content['metrics'].items()}
return {'{}:{}'.format(panelk, name): pandas.DataFrame({'ts': values['ts'], 'value': values['value']})
for panelk, panelv in panels.items() for name, values in panelv.items()}
# return {pandas.DataFrame({'ts': [], 'value': []})] * sum([len(metrics) for metrics in panels.values()})

@staticmethod
def filter_df_by_case(df, case):
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
[
{
"timestamp": 1559212915,
"data": {
"panel1": {
"comment": "",
"metrics": {
"custom:portoinst-cpu_guarantee_cores_tmmv": 0.25,
"custom:portoinst-memory_usage_gb_tmmv": 0.045146942138671875,
"custom:portoinst-net_limit_mb_summ": 10,
"custom:portoinst-memory_limit_gb_tmmv": 1,
"custom:portoinst-net_guarantee_mb_summ": 2.5,
"custom:portoinst-io_write_fs_bytes_tmmv": 0,
"custom:unistat-auto_disk_rootfs_total_bytes_axxx": 5,
"custom:unistat-auto_disk_rootfs_usage_bytes_axxx": 0.0001068115234375,
"custom:portoinst-cpu_wait_cores_tmmv": 1.1546799999706536e-05,
"custom:portoinst-io_limit_bytes_tmmv": 52428800,
"custom:portoinst-net_mb_summ": 0.009309768676757812,
"custom:portoinst-io_read_fs_bytes_tmmv": 0,
"custom:portoinst-cpu_usage_cores_tmmv": 0.0043661741997311765,
"custom:portoinst-cpu_limit_cores_tmmv": 0.25
}
}
}
},
{
"timestamp": 1559212915,
"data": {
"panel2": {
"comment": "",
"metrics": {
"custom:portoinst-cpu_guarantee_cores_tmmv": 7.5,
"custom:portoinst-memory_usage_gb_tmmv": 36.21156692504883,
"custom:portoinst-io_read_fs_bytes_tmmv": 4096,
"custom:portoinst-memory_limit_gb_tmmv": 50,
"custom:portoinst-net_guarantee_mb_summ": 75,
"custom:portoinst-io_write_fs_bytes_tmmv": 7720960,
"custom:portoinst-cpu_wait_cores_tmmv": 0.6967493196016221,
"custom:portoinst-io_limit_bytes_tmmv": 655360000,
"custom:portoinst-net_mb_summ": 0.26386165618896484,
"custom:portoinst-net_limit_mb_summ": 640,
"custom:portoinst-cpu_usage_cores_tmmv": 1.1054252468020422,
"custom:portoinst-cpu_limit_cores_tmmv": 10
}
}
}
},
{
"timestamp": 1559212915,
"data": {
"myt1": {
"comment": "",
"metrics": {
"custom:portoinst-cpu_guarantee_cores_tmmv": 7,
"custom:portoinst-memory_usage_gb_tmmv": 23.724361419677734,
"custom:portoinst-io_read_fs_bytes_tmmv": 0,
"custom:portoinst-memory_limit_gb_tmmv": 24,
"custom:portoinst-net_guarantee_mb_summ": 120,
"custom:portoinst-io_write_fs_bytes_tmmv": 0,
"custom:unistat-auto_disk_rootfs_total_bytes_axxx": 200,
"custom:unistat-auto_disk_rootfs_usage_bytes_axxx": 23.592609405517578,
"custom:portoinst-cpu_wait_cores_tmmv": 3.422800546104554e-06,
"custom:portoinst-io_limit_bytes_tmmv": 52428800,
"custom:portoinst-net_mb_summ": 0.0017881393432617188,
"custom:portoinst-net_limit_mb_summ": 480,
"custom:portoinst-cpu_usage_cores_tmmv": 0.005141159399499884,
"custom:portoinst-cpu_limit_cores_tmmv": 7
}
}
}
},
{
"timestamp": 1559212915,
"data": {
"man2": {
"comment": "",
"metrics": {
"custom:portoinst-cpu_guarantee_cores_tmmv": 2,
"custom:portoinst-memory_usage_gb_tmmv": 1.6009635925292969,
"custom:portoinst-io_read_fs_bytes_tmmv": 0,
"custom:portoinst-memory_limit_gb_tmmv": 8,
"custom:portoinst-net_guarantee_mb_summ": 20,
"custom:portoinst-io_write_fs_bytes_tmmv": 0,
"custom:unistat-auto_disk_rootfs_total_bytes_axxx": 60,
"custom:unistat-auto_disk_rootfs_usage_bytes_axxx": 0.14423751831054688,
"custom:portoinst-cpu_wait_cores_tmmv": 5.069680014457845e-05,
"custom:portoinst-io_limit_bytes_tmmv": 52428800,
"custom:portoinst-net_mb_summ": 0.018650054931640625,
"custom:portoinst-net_limit_mb_summ": 80,
"custom:portoinst-cpu_usage_cores_tmmv": 0.02161935000003723,
"custom:portoinst-cpu_limit_cores_tmmv": 2
}
}
}
}
]
26 changes: 26 additions & 0 deletions yandextank/plugins/NeUploader/tests/test_neuploader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import pytest
import json
from yandextank.plugins.NeUploader.plugin import Plugin


class TestMonitoringData(object):
    """Unit tests for Plugin.monitoring_data_to_dfs."""

    @pytest.mark.parametrize('mon_data, length', [
        ('yandextank/plugins/NeUploader/tests/monitoring_data/monitoring1.json', 54),
    ])
    def test_df_num_and_cols(self, mon_data, length):
        # One DataFrame per unique '<panel>:<metric>' pair, each with ts/value columns.
        with open(mon_data) as f:
            jsondata = json.load(f)
        dfs = Plugin.monitoring_data_to_dfs(jsondata)
        assert len(dfs) == length
        assert all(list(df.columns) == ['ts', 'value'] for df in dfs.values())

    # The original declared a 'names' parametrize argument, fed it an empty
    # tuple, and never used it; the expected key set is derived from the input.
    @pytest.mark.parametrize('mon_data', [
        'yandextank/plugins/NeUploader/tests/monitoring_data/monitoring1.json',
    ])
    def test_metrics_names(self, mon_data):
        with open(mon_data) as f:
            jsondata = json.load(f)
        dfs = Plugin.monitoring_data_to_dfs(jsondata)
        expected = {'{}:{}'.format(panelk, name)
                    for chunk in jsondata
                    for panelk, panelv in chunk['data'].items()
                    for name in panelv['metrics'].keys()}
        assert set(dfs.keys()) == expected

0 comments on commit fef677f

Please sign in to comment.