Skip to content

Commit

Permalink
Merge pull request #585 from fomars/develop
Browse files Browse the repository at this point in the history
YASM plugin
  • Loading branch information
fomars committed May 11, 2018
2 parents fc091f5 + 7ecfec0 commit e6baa98
Show file tree
Hide file tree
Showing 12 changed files with 300 additions and 74 deletions.
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[pytest]
norecursedirs = build dist .eggs .tox .env tmp .env3
testpaths = yandextank/validator
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name='yandextank',
version='1.9.13',
version='1.10.0',
description='a performance measurement tool',
longer_description='''
Yandex.Tank is a performance measurement and load testing automatization tool.
Expand All @@ -19,7 +19,7 @@
'psutil>=1.2.1', 'requests>=2.5.1', 'paramiko>=1.16.0',
'pandas>=0.18.0', 'numpy>=1.12.1', 'future>=0.16.0',
'pip>=8.1.2',
'pyyaml>=3.12', 'cerberus==1.1', 'influxdb>=5.0.0',
'pyyaml>=3.12', 'cerberus==1.2', 'influxdb>=5.0.0',
'netort>=0.0.11'
],
setup_requires=[
Expand Down Expand Up @@ -64,6 +64,7 @@
'yandextank.plugins.Influx': ['config/*'],
'yandextank.plugins.JMeter': ['config/*'],
'yandextank.plugins.JsonReport': ['config/*'],
'yandextank.plugins.YASM': ['config/*'],
'yandextank.plugins.Pandora': ['config/*'],
'yandextank.plugins.Phantom': ['config/*'],
'yandextank.plugins.RCAssert': ['config/*'],
Expand Down
12 changes: 11 additions & 1 deletion yandextank/common/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def close(self):


class MonitoringDataListener(object):
""" Monitoring interface
""" Monitoring listener interface
parent class for Monitoring data listeners"""

def __init__(self):
Expand Down Expand Up @@ -244,3 +244,13 @@ def stats_item(ts, instances, rps):
'reqps': rps
}
}


class MonitoringPlugin(AbstractPlugin):

def __init__(self, core, cfg):
super(MonitoringPlugin, self).__init__(core, cfg)
self.listeners = []

def add_listener(self, plugin):
self.listeners.append(plugin)
34 changes: 18 additions & 16 deletions yandextank/core/tankcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
from builtins import str

from yandextank.common.exceptions import PluginNotPrepared
from yandextank.common.interfaces import GeneratorPlugin
from yandextank.common.interfaces import GeneratorPlugin, MonitoringPlugin
from yandextank.validator.validator import TankConfig, ValidationError
from yandextank.aggregator import TankAggregator
from ..common.util import update_status, pid_exists
from ..plugins.Telegraf import Plugin as TelegrafPlugin

from netort.resource import manager as resource
from netort.process import execute
Expand All @@ -42,26 +41,25 @@
class Job(object):
def __init__(
self,
monitoring_plugin,
monitoring_plugins,
aggregator,
tank,
generator_plugin=None):
"""
:type aggregator: TankAggregator
:type monitoring_plugins: list of
"""
self.monitoring_plugin = monitoring_plugin
self.monitoring_plugins = monitoring_plugins
self.aggregator = aggregator
self.tank = tank
self._phantom_info = None
self.generator_plugin = generator_plugin

def subscribe_plugin(self, plugin):
self.aggregator.add_result_listener(plugin)
try:
self.monitoring_plugin.monitoring.add_listener(plugin)
except AttributeError:
logging.info('Monitoring plugin is not enabled')
for monitoring_plugin in self.monitoring_plugins:
monitoring_plugin.add_listener(plugin)

@property
def phantom_info(self):
Expand Down Expand Up @@ -206,20 +204,16 @@ def load_plugins(self):
def job(self):
if not self._job:
# monitoring plugin
try:
mon = self.get_plugin_of_type(TelegrafPlugin)
except KeyError:
logger.debug("Telegraf plugin not found:", exc_info=True)
mon = None
monitorings = [plugin for plugin in self.plugins.values() if isinstance(plugin, MonitoringPlugin)]
# generator plugin
try:
gen = self.get_plugin_of_type(GeneratorPlugin)
except KeyError:
logger.warning("Load generator not found")
gen = GeneratorPlugin()
gen = GeneratorPlugin(self, {}, None)
# aggregator
aggregator = TankAggregator(gen)
self._job = Job(monitoring_plugin=mon,
self._job = Job(monitoring_plugins=monitorings,
generator_plugin=gen,
aggregator=aggregator,
tank=socket.getfqdn())
Expand Down Expand Up @@ -295,7 +289,15 @@ def plugins_end_test(self, retcode):
logger.info("Stopping load generator and aggregator")
retcode = self.job.aggregator.end_test(retcode)
logger.debug("RC after: %s", retcode)
for plugin in [p for p in self.plugins.values() if p is not self.job.generator_plugin]:

logger.info('Stopping monitoring')
for plugin in self.job.monitoring_plugins:
logger.info('Stopping %s', plugin)
retcode = plugin.end_test(retcode)
logger.info('RC after: %s', retcode)

for plugin in [p for p in self.plugins.values() if
p is not self.job.generator_plugin and p not in self.job.monitoring_plugins]:
logger.debug("Finalize %s", plugin)
try:
logger.debug("RC before: %s", retcode)
Expand Down
22 changes: 13 additions & 9 deletions yandextank/plugins/DataUploader/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def __make_api_request(
data=None,
response_callback=lambda x: x,
writer=False,
interrupted_event=None,
trace=False,
json=None,
maintenance_timeouts=None,
Expand All @@ -184,7 +185,7 @@ def __make_api_request(
network_timeouts = self.network_timeouts()
maintenance_timeouts = maintenance_timeouts or self.maintenance_timeouts()
maintenance_msg = maintenance_msg or "%s is under maintenance" % (self._base_url)
while True:
while interrupted_event is None or not interrupted_event.is_set():
try:
response = self.__send_single_request(request, ids.next(), trace=trace)
return response_callback(response)
Expand Down Expand Up @@ -264,16 +265,17 @@ def __get(self, addr, trace=False, maintenance_timeouts=None, maintenance_msg=No
maintenance_msg=maintenance_msg
)

def __post_raw(self, addr, txt_data, trace=False):
def __post_raw(self, addr, txt_data, trace=False, interrupted_event=None):
return self.__make_api_request(
'POST', addr, txt_data, lambda r: r.content, trace=trace)
'POST', addr, txt_data, lambda r: r.content, trace=trace, interrupted_event=interrupted_event)

def __post(self, addr, data, trace=False):
def __post(self, addr, data, interrupted_event=None, trace=False):
return self.__make_api_request(
'POST',
addr,
json=data,
response_callback=lambda r: r.json(),
interrupted_event=interrupted_event,
trace=trace)

def __put(self, addr, data, trace=False):
Expand Down Expand Up @@ -344,7 +346,7 @@ def new_job(
raise self.JobNotCreated('Failed to create job on lunapark\n{}'.format(e.response.content))
except Exception as e:
logger.warn('Failed to create job on lunapark')
logger.warn(e.message)
logger.warn(repr(e), )
raise self.JobNotCreated()

def get_job_summary(self, jobno):
Expand Down Expand Up @@ -471,6 +473,7 @@ def push_test_data(
upload_token,
data_item,
stat_item,
interrupted_event,
trace=False):
items = []
uri = 'api/job/{0}/push_data.json?upload_token={1}'.format(
Expand All @@ -489,7 +492,7 @@ def push_test_data(
items.append(overall)

api_timeouts = self.api_timeouts()
while True:
while not interrupted_event.is_set():
try:
if self.writer_url:
res = self.__make_writer_request(
Expand All @@ -504,7 +507,7 @@ def push_test_data(
logger.debug("Writer response: %s", res.text)
return res.json()["success"]
else:
res = self.__post(uri, items, trace=trace)
res = self.__post(uri, items, interrupted_event, trace=trace)
logger.debug("API response: %s", res)
success = int(res[0]['success'])
return success
Expand All @@ -522,12 +525,13 @@ def push_monitoring_data(
jobno,
upload_token,
send_data,
interrupted_event,
trace=False):
if send_data:
addr = "api/monitoring/receiver/push?job_id=%s&upload_token=%s" % (
jobno, upload_token)
api_timeouts = self.api_timeouts()
while True:
while not interrupted_event.is_set():
try:
if self.writer_url:
res = self.__make_writer_request(
Expand All @@ -543,7 +547,7 @@ def push_monitoring_data(
return res.json()["success"]
else:
res = self.__post_raw(
addr, json.dumps(send_data), trace=trace)
addr, json.dumps(send_data), trace=trace, interrupted_event=interrupted_event)
logger.debug("API response: %s", res)
success = res == 'ok'
return success
Expand Down

0 comments on commit e6baa98

Please sign in to comment.