Skip to content

Commit

Permalink
Merge pull request #693 from fomars/release
Browse files Browse the repository at this point in the history
 uploader backend recognition, uploader retry and interrupt
  • Loading branch information
fomars committed Dec 18, 2018
2 parents 112c1d5 + cab8802 commit fc93c97
Show file tree
Hide file tree
Showing 25 changed files with 79 additions and 68 deletions.
12 changes: 7 additions & 5 deletions yandextank/common/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ def get_key():
raise TypeError("Abstract method needs to be overridden")

# TODO: do we realy need cfg_updater here?
def __init__(self, core, cfg):
def __init__(self, core, cfg, name):
"""
:param name:
:type core: TankCore
:type cfg: dict
"""
Expand All @@ -25,6 +26,7 @@ def __init__(self, core, cfg):
self.log = logging.getLogger(__name__)
self.core = core
self.cfg = cfg
self.cfg_section_name = name
self.interrupted = self.core.interrupted

def set_option(self, option, value):
Expand Down Expand Up @@ -207,8 +209,8 @@ class GeneratorPlugin(AbstractPlugin):
'loop_count': 0
}

def __init__(self, core, cfg):
super(GeneratorPlugin, self).__init__(core, cfg)
def __init__(self, core, cfg, name):
super(GeneratorPlugin, self).__init__(core, cfg, name)
self.stats_reader = None
self.reader = None
self.process = None
Expand Down Expand Up @@ -265,8 +267,8 @@ def stats_item(ts, instances, rps):

class MonitoringPlugin(AbstractPlugin):

def __init__(self, core, cfg):
super(MonitoringPlugin, self).__init__(core, cfg)
def __init__(self, core, cfg, name):
super(MonitoringPlugin, self).__init__(core, cfg, name)
self.listeners = []

def add_listener(self, plugin):
Expand Down
2 changes: 1 addition & 1 deletion yandextank/core/consoleworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def __init__(self, configs, cli_options=None, cfg_patches=None, cli_args=None, n

self.interrupted = Event()
self.config_list = self._combine_configs(configs, cli_options, cfg_patches, cli_args, no_local)
self.core = TankCore(self.config_list)
self.core = TankCore(self.config_list, self.interrupted)
self.status = Status.TEST_INITIATED
self.test_id = self.core.test_id
self.retcode = None
Expand Down
20 changes: 10 additions & 10 deletions yandextank/core/tankcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,11 @@ class TankCore(object):
UUID_OPTION = 'uuid'
API_JOBNO = 'api_jobno'

def __init__(self, configs, artifacts_base_dir=None, artifacts_dir_name=None):
def __init__(self, configs, interrupted_event, artifacts_base_dir=None, artifacts_dir_name=None):
"""
:param configs: list of dict
:param interrupted_event: threading.Event
"""
self.output = {}
self.raw_configs = configs
Expand All @@ -114,7 +115,7 @@ def __init__(self, configs, artifacts_base_dir=None, artifacts_dir_name=None):
self._job = None
self._cfg_snapshot = None

self.interrupted = False
self.interrupted = interrupted_event

self.error_log = None

Expand Down Expand Up @@ -197,7 +198,7 @@ def load_plugins(self):
logger.debug('Plugin name %s path %s import error', plugin_name, plugin_path, exc_info=True)
raise
try:
instance = getattr(plugin, 'Plugin')(self, cfg=plugin_cfg)
instance = getattr(plugin, 'Plugin')(self, cfg=plugin_cfg, name=plugin_name)
except AttributeError:
logger.warning('Plugin %s classname should be `Plugin`', plugin_name)
raise
Expand All @@ -215,7 +216,7 @@ def job(self):
gen = self.get_plugin_of_type(GeneratorPlugin)
except KeyError:
logger.warning("Load generator not found")
gen = GeneratorPlugin(self, {})
gen = GeneratorPlugin(self, {}, 'generator dummy')
# aggregator
aggregator = TankAggregator(gen)
self._job = Job(monitoring_plugins=monitorings,
Expand All @@ -234,7 +235,7 @@ def plugins_configure(self):
self.__setup_taskset(self.taskset_affinity, pid=os.getpid())

for plugin in self.plugins.values():
if not self.interrupted:
if not self.interrupted.is_set():
logger.debug("Configuring %s", plugin)
plugin.configure()

Expand All @@ -243,13 +244,13 @@ def plugins_prepare_test(self):
logger.info("Preparing test...")
self.publish("core", "stage", "prepare")
for plugin in self.plugins.values():
if not self.interrupted:
if not self.interrupted.is_set():
logger.debug("Preparing %s", plugin)
plugin.prepare_test()

def plugins_start_test(self):
""" Call start_test() on all plugins """
if not self.interrupted:
if not self.interrupted.is_set():
logger.info("Starting test...")
self.publish("core", "stage", "start")
self.job.aggregator.start_test()
Expand All @@ -264,14 +265,14 @@ def wait_for_finish(self):
"""
Call is_test_finished() on all plugins 'till one of them initiates exit
"""
if not self.interrupted:
if not self.interrupted.is_set():
logger.info("Waiting for test to finish...")
logger.info('Artifacts dir: {dir}'.format(dir=self.artifacts_dir))
self.publish("core", "stage", "shoot")
if not self.plugins:
raise RuntimeError("It's strange: we have no plugins loaded...")

while not self.interrupted:
while not self.interrupted.is_set():
begin_time = time.time()
aggr_retcode = self.job.aggregator.is_test_finished()
if aggr_retcode >= 0:
Expand Down Expand Up @@ -337,7 +338,6 @@ def plugins_post_process(self, retcode):

def interrupt(self):
logger.warning('Interrupting')
self.interrupted = True

def __setup_taskset(self, affinity, pid=None, args=None):
""" if pid specified: set process w/ pid `pid` CPU affinity to specified `affinity` core(s)
Expand Down
8 changes: 5 additions & 3 deletions yandextank/core/tests/test_tankcore.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import glob
import logging
import os
import threading

import pytest
import sys
Expand Down Expand Up @@ -110,7 +111,8 @@ def setup_module(module):
)
])
def test_core_load_plugins(config, expected):
core = TankCore(configs=[load_yaml(os.path.join(os.path.dirname(__file__), '../config'), '00-base.yaml'), config])
core = TankCore([load_yaml(os.path.join(os.path.dirname(__file__), '../config'), '00-base.yaml'), config],
threading.Event())
core.load_plugins()
assert set(core.plugins.keys()) == expected

Expand All @@ -119,7 +121,7 @@ def test_core_load_plugins(config, expected):
(CFG1, None)
])
def test_core_plugins_configure(config, expected):
core = TankCore(configs=[config])
core = TankCore([config], threading.Event())
core.plugins_configure()


Expand All @@ -129,7 +131,7 @@ def test_core_plugins_configure(config, expected):
(CFG_MULTI, None)
])
def test_plugins_prepare_test(config, expected):
core = TankCore(configs=[config])
core = TankCore([config], threading.Event())
core.plugins_prepare_test()


Expand Down
4 changes: 2 additions & 2 deletions yandextank/plugins/Android/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ class Plugin(AbstractPlugin):
SECTION = "android"
SECTION_META = "meta"

def __init__(self, core, cfg):
def __init__(self, core, cfg, name):
self.stats_reader = None
self.reader = None
super(Plugin, self).__init__(core, cfg)
super(Plugin, self).__init__(core, cfg, name)
self.device = None
try:
self.cfg = cfg['volta_options']
Expand Down
4 changes: 2 additions & 2 deletions yandextank/plugins/Autostop/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ class Plugin(AbstractPlugin, AggregateResultListener):
""" Plugin that accepts criterion classes and triggers autostop """
SECTION = 'autostop'

def __init__(self, core, cfg):
AbstractPlugin.__init__(self, core, cfg)
def __init__(self, core, cfg, name):
AbstractPlugin.__init__(self, core, cfg, name)
AggregateResultListener.__init__(self)

self.cause_criterion = None
Expand Down
2 changes: 1 addition & 1 deletion yandextank/plugins/Bfg/guns.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class GunConfigError(Exception):

class AbstractGun(AbstractPlugin):
def __init__(self, core, cfg):
super(AbstractGun, self).__init__(core, cfg)
super(AbstractGun, self).__init__(core, cfg, 'bfg_gun')
self.results = None

@contextmanager
Expand Down
4 changes: 2 additions & 2 deletions yandextank/plugins/Bfg/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ class Plugin(GeneratorPlugin):
""" Big Fucking Gun plugin """
SECTION = 'bfg'

def __init__(self, core, cfg):
super(Plugin, self).__init__(core, cfg)
def __init__(self, core, cfg, name):
super(Plugin, self).__init__(core, cfg, name)
self._bfg = None
self.log = logging.getLogger(__name__)
self.gun_type = None
Expand Down
4 changes: 2 additions & 2 deletions yandextank/plugins/Console/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ class Plugin(AbstractPlugin, AggregateResultListener):
''' Console plugin '''
SECTION = 'console'

def __init__(self, core, cfg):
AbstractPlugin.__init__(self, core, cfg)
def __init__(self, core, cfg, name):
AbstractPlugin.__init__(self, core, cfg, name)
self.log = logging.getLogger(__name__)
self.screen = None
self.render_exception = None
Expand Down
18 changes: 11 additions & 7 deletions yandextank/plugins/DataUploader/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class BackendTypes(object):
LUNAPARK = 'LUNAPARK'

@classmethod
def identify_backend(cls, api_address):
def identify_backend(cls, api_address, cfg_section_name):
clues = [
('overload', cls.OVERLOAD),
('lunapark', cls.LUNAPARK),
Expand All @@ -44,9 +44,13 @@ def identify_backend(cls, api_address):
if clue in api_address:
return backend_type
else:
raise KeyError(
'Config section name doesn\'t match any of the patterns:\n%s' %
'\n'.join(['*%s*' % ptrn[0] for ptrn in clues]))
for clue, backend_type in clues:
if clue in cfg_section_name:
return backend_type
else:
raise KeyError(
'Can not identify backend: neither api address nor section name match any of the patterns:\n%s' %
'\n'.join(['*%s*' % ptrn[0] for ptrn in clues]))


def chop(data_list, chunk_size):
Expand All @@ -66,8 +70,8 @@ class Plugin(AbstractPlugin, AggregateResultListener,
VERSION = '3.0'
SECTION = 'uploader'

def __init__(self, core, cfg):
AbstractPlugin.__init__(self, core, cfg)
def __init__(self, core, cfg, name):
AbstractPlugin.__init__(self, core, cfg, name)
self.data_queue = Queue()
self.monitoring_queue = Queue()
if self.core.error_log:
Expand Down Expand Up @@ -96,7 +100,7 @@ def __init__(self, core, cfg):
self.monitoring.daemon = True

self._is_telegraf = None
self.backend_type = BackendTypes.identify_backend(self.cfg['api_address'])
self.backend_type = BackendTypes.identify_backend(self.cfg['api_address'], self.cfg_section_name)
self._task = None
self._api_token = ''
self._lp_job = None
Expand Down
13 changes: 7 additions & 6 deletions yandextank/plugins/DataUploader/tests/test_uploader_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

class TestBackendTypes(object):

@pytest.mark.parametrize('api_address, expected_type', [
('lunapark.foo-bar.ru', BackendTypes.LUNAPARK),
('lunapark.test.foo-bar.ru', BackendTypes.LUNAPARK),
('overload.yandex.net', BackendTypes.OVERLOAD),
@pytest.mark.parametrize('api_address, section_name, expected_type', [
('lunapark.foo-bar.ru', 'uploader', BackendTypes.LUNAPARK),
('lunapark.test.foo-bar.ru', 'overload', BackendTypes.LUNAPARK),
('overload.yandex.net', 'uploade', BackendTypes.OVERLOAD),
('localhost', 'lunapark', BackendTypes.LUNAPARK)
])
def test_identify(self, api_address, expected_type):
assert BackendTypes.identify_backend(api_address) == expected_type
def test_identify(self, api_address, section_name, expected_type):
assert BackendTypes.identify_backend(api_address, section_name) == expected_type
4 changes: 2 additions & 2 deletions yandextank/plugins/Influx/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class Plugin(AbstractPlugin, AggregateResultListener,
MonitoringDataListener):
SECTION = 'influx'

def __init__(self, core, cfg):
AbstractPlugin.__init__(self, core, cfg)
def __init__(self, core, cfg, name):
AbstractPlugin.__init__(self, core, cfg, name)
self.tank_tag = self.get_option("tank_tag")
address = self.get_option("address")
port = self.get_option("port")
Expand Down
4 changes: 2 additions & 2 deletions yandextank/plugins/JMeter/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class Plugin(GeneratorPlugin):
STOP_TEST_NOW = 'Stop Test'
DISCOVER_PORT_PATTERN = r'Waiting for possible .* message on port (?P<port>\d+)'

def __init__(self, core, cfg):
super(Plugin, self).__init__(core, cfg)
def __init__(self, core, cfg, name):
super(Plugin, self).__init__(core, cfg, name)
self.args = None
self.original_jmx = None
self.jtl_file = None
Expand Down
4 changes: 2 additions & 2 deletions yandextank/plugins/JsonReport/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ class Plugin(AbstractPlugin, AggregateResultListener, MonitoringDataListener):
# pylint:disable=R0902
SECTION = 'json_report'

def __init__(self, core, cfg):
super(Plugin, self).__init__(core, cfg)
def __init__(self, core, cfg, name):
super(Plugin, self).__init__(core, cfg, name)
self.monitoring_stream = io.open(os.path.join(self.core.artifacts_dir,
self.get_option('monitoring_log')),
mode='wb')
Expand Down
4 changes: 2 additions & 2 deletions yandextank/plugins/Pandora/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class Plugin(GeneratorPlugin):
SECTION = "pandora"
DEFAULT_REPORT_FILE = "phout.log"

def __init__(self, core, cfg):
super(Plugin, self).__init__(core, cfg)
def __init__(self, core, cfg, name):
super(Plugin, self).__init__(core, cfg, name)
self.enum_ammo = False
self.process_start_time = None
self.pandora_cmd = None
Expand Down
2 changes: 1 addition & 1 deletion yandextank/plugins/Pandora/tests/test_pandora_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)
])
def test_patch_config(cfg, expected):
plugin = Plugin(MagicMock(), None)
plugin = Plugin(MagicMock(), None, 'pandora')
# '/tmp/9b73d966bcbf27467d4c4190cfe58c2a.downloaded_resource'
filename = plugin.patch_config(cfg)['pools'][0]['ammo']['file']
assert filename.endswith('.downloaded_resource')
4 changes: 2 additions & 2 deletions yandextank/plugins/Phantom/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ class Plugin(GeneratorPlugin):

OPTION_CONFIG = "config"

def __init__(self, core, cfg):
super(Plugin, self).__init__(core, cfg)
def __init__(self, core, cfg, name):
super(Plugin, self).__init__(core, cfg, name)
self.predefined_phout = None
self.did_phout_import_try = False
self.eta_file = None
Expand Down
4 changes: 2 additions & 2 deletions yandextank/plugins/Platform/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ class Plugin(AbstractPlugin):
def get_key():
return __file__

def __init__(self, core, cfg):
AbstractPlugin.__init__(self, core, cfg)
def __init__(self, core, cfg, name):
AbstractPlugin.__init__(self, core, cfg, name)
self.hosts = []
self.port = None
self.logfile = None
Expand Down
4 changes: 2 additions & 2 deletions yandextank/plugins/RCAssert/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
class Plugin(AbstractPlugin):
SECTION = 'rcassert'

def __init__(self, core, cfg):
AbstractPlugin.__init__(self, core, cfg)
def __init__(self, core, cfg, name):
AbstractPlugin.__init__(self, core, cfg, name)
self.ok_codes = []
self.fail_code = 10

Expand Down
4 changes: 2 additions & 2 deletions yandextank/plugins/ResourceCheck/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ class Plugin(AbstractPlugin):
def get_key():
return __file__

def __init__(self, core, cfg):
def __init__(self, core, cfg, name):
''' Constructor '''
AbstractPlugin.__init__(self, core, cfg)
AbstractPlugin.__init__(self, core, cfg, name)
self.interval = "10s"
self.disk_limit = 2048 # 2 GB
self.mem_limit = 512 # 0.5 GB
Expand Down

0 comments on commit fc93c97

Please sign in to comment.