Skip to content

Commit

Permalink
Merge pull request #533 from nettorta/develop
Browse files Browse the repository at this point in the history
Generator plugins refatoring
  • Loading branch information
direvius committed Mar 12, 2018
2 parents 7cdd1ea + 88a5b6f commit fce5fc1
Show file tree
Hide file tree
Showing 15 changed files with 147 additions and 349 deletions.
10 changes: 2 additions & 8 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
'psutil>=1.2.1', 'requests>=2.5.1', 'paramiko>=1.16.0',
'pandas>=0.18.0', 'numpy>=1.12.1', 'future>=0.16.0',
'pip>=8.1.2',
'matplotlib>=1.5.3', 'seaborn>=0.7.1',
'pyyaml>=3.12', 'cerberus>=1.1', 'influxdb>=5.0.0',
'netort>=0.0.3'
'netort>=0.0.11'
],
setup_requires=[
'pytest-runner', 'flake8',
Expand Down Expand Up @@ -59,24 +58,19 @@
'yandextank.core': ['config/*'],
'yandextank.aggregator': ['config/*'],
'yandextank.plugins.Android': ['binary/*', 'config/*'],
'yandextank.plugins.Appium': ['config/*'],
'yandextank.plugins.Autostop': ['config/*'],
'yandextank.plugins.BatteryHistorian': ['config/*'],
'yandextank.plugins.Bfg': ['config/*'],
'yandextank.plugins.Console': ['config/*'],
'yandextank.plugins.DataUploader': ['config/*'],
'yandextank.plugins.GraphiteUploader': ['config/*'],
'yandextank.plugins.Influx': ['config/*'],
'yandextank.plugins.JMeter': ['config/*'],
'yandextank.plugins.JsonReport': ['config/*'],
'yandextank.plugins.Monitoring': ['config/*'],
'yandextank.plugins.Pandora': ['config/*'],
'yandextank.plugins.Phantom': ['config/*'],
'yandextank.plugins.RCAssert': ['config/*'],
'yandextank.plugins.ResourceCheck': ['config/*'],
'yandextank.plugins.ShellExec': ['config/*'],
'yandextank.plugins.ShootExec': ['config/*'],
'yandextank.plugins.Telegraf': ['config/*'],
'yandextank.plugins.TipsAndTricks': ['config/*'],
'yandextank.plugins.Telegraf': ['config/*']
},
use_2to3=False, )
26 changes: 20 additions & 6 deletions yandextank/common/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def __init__(self, core, cfg, cfg_updater=None):
@type core: TankCore
"""
super(AbstractPlugin, self).__init__()
self.log = logging.getLogger(__name__)
self.core = core
self.cfg = cfg
Expand Down Expand Up @@ -118,21 +119,24 @@ def on_aggregated_data(self, data, stats):
data and stats are cached and synchronized by timestamp. Stat items
are holded until corresponding data item is received and vice versa.
"""
raise NotImplementedError("Abstract method needs to be overridden")
raise NotImplementedError("Abstract method should be overridden")


class AbstractInfoWidget(object):
''' InfoWidgets interface
parent class for all InfoWidgets'''
""" InfoWidgets interface
parent class for all InfoWidgets"""

def __init__(self):
pass

def render(self, screen):
raise NotImplementedError("Abstract method needs to be overridden")
raise NotImplementedError("Abstract method should be overridden")

def on_aggregated_data(self, data, stats):
raise NotImplementedError("Abstract method should be overridden")

def get_index(self):
''' get vertical priority index '''
""" get vertical priority index """
return 0


Expand Down Expand Up @@ -178,7 +182,7 @@ def get_type_string():
raise NotImplementedError("Abstract methods requires overriding")


class GeneratorPlugin(object):
class GeneratorPlugin(AbstractPlugin):
DEFAULT_INFO = {
'address': '',
'port': 80,
Expand All @@ -189,6 +193,16 @@ class GeneratorPlugin(object):
'loop_count': 0
}

def __init__(self, core, cfg, cfg_updater):
super(GeneratorPlugin, self).__init__(core, cfg, cfg_updater)
self.stats_reader = None
self.reader = None
self.process = None
self.process_stderr = None
self.start_time = None
self.affinity = None
self.buffered_seconds = 2

class Info(object):
def __init__(
self, address, port, instances, ammo_file, rps_schedule,
Expand Down
18 changes: 7 additions & 11 deletions yandextank/common/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,12 +515,9 @@ def resolve(self, address_str, do_test=False, explicit_port=False):
try:
resolved = self.lookup_fn(address_str, port)
logger.debug("Lookup result: %s", resolved)
except Exception as exc:
logger.debug(
"Exception trying to resolve hostname %s : %s", address_str,
traceback.format_exc(exc))
msg = "Failed to resolve hostname: %s. Error: %s"
raise RuntimeError(msg % (address_str, exc))
except Exception:
logger.debug("Exception trying to resolve hostname %s : %s", address_str, exc_info=True)
raise

for (family, socktype, proto, canonname, sockaddr) in resolved:
is_v6 = family == socket.AF_INET6
Expand All @@ -536,13 +533,12 @@ def resolve(self, address_str, do_test=False, explicit_port=False):

if do_test:
try:
logger.info("Testing connection to resolved address %s and port %s", parsed_ip, port)
self.__test(family, (parsed_ip, port))
except RuntimeError as exc:
logger.warn(
"Failed TCP connection test using [%s]:%s", parsed_ip,
port)
except RuntimeError:
logger.info("Failed TCP connection test using [%s]:%s", parsed_ip, port)
logger.debug("Failed TCP connection test using [%s]:%s", parsed_ip, port, exc_info=True)
continue

return is_v6, parsed_ip, int(port), address_str

msg = "All connection attempts failed for %s, use {phantom.connection_test: false} to disable it"
Expand Down
2 changes: 1 addition & 1 deletion yandextank/core/config/schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ core:
allow_unknown: false
schema:
affinity:
description: specify cpu core(s) to bind tank process to
description: specify cpu core(s) to bind tank process to, http://linuxhowtos.org/manpages/1/taskset.htm
type: string
default: ''
api_jobno:
Expand Down
96 changes: 34 additions & 62 deletions yandextank/core/tankcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ def save_config(self, filename):
@property
def artifacts_base_dir(self):
if not self._artifacts_base_dir:
artifacts_base_dir = os.path.expanduser(
self.get_option(self.SECTION, "artifacts_base_dir"))
artifacts_base_dir = os.path.expanduser(self.get_option(self.SECTION, "artifacts_base_dir"))
if not os.path.exists(artifacts_base_dir):
os.makedirs(artifacts_base_dir)
os.chmod(self.artifacts_base_dir, 0o755)
Expand All @@ -171,11 +170,6 @@ def load_plugins(self):
Tells core to take plugin options and instantiate plugin classes
"""
logger.info("Loading plugins...")

self.taskset_path = self.get_option(
self.SECTION, 'taskset_path')
self.taskset_affinity = self.get_option(self.SECTION, 'affinity')

for (plugin_name, plugin_path, plugin_cfg, cfg_updater) in self.config.plugins:
logger.debug("Loading plugin %s from %s", plugin_name, plugin_path)
if plugin_path is "yandextank.plugins.Overload":
Expand All @@ -187,38 +181,16 @@ def load_plugins(self):
try:
plugin = il.import_module(plugin_path)
except ImportError:
if plugin_path.startswith("yatank_internal_"):
logger.warning(
"Deprecated plugin path format: %s\n"
"Tank plugins are now orginized using"
" namespace packages. Example:\n"
" plugin_jmeter=yandextank.plugins.JMeter",
plugin_path)
plugin_path = plugin_path.replace(
"yatank_internal_", "yandextank.plugins.")
if plugin_path.startswith("yatank_"):
logger.warning(
"Deprecated plugin path format: %s\n"
"Tank plugins are now orginized using"
" namespace packages. Example:\n"
" plugin_jmeter=yandextank.plugins.JMeter",
plugin_path)

plugin_path = plugin_path.replace(
"yatank_", "yandextank.plugins.")
logger.warning("Patched plugin path: %s", plugin_path)
plugin = il.import_module(plugin_path)
logger.warning('Plugin name %s path %s import error', plugin_name, plugin_path)
logger.debug('Plugin name %s path %s import error', plugin_name, plugin_path, exc_info=True)
raise
try:
instance = getattr(plugin, 'Plugin')(self, cfg=plugin_cfg, cfg_updater=cfg_updater)
except AttributeError:
logger.warning(
"Deprecated plugin classname: %s. Should be 'Plugin'",
plugin)
instance = getattr(
plugin, plugin_path.split('.')[-1] + 'Plugin')(self)

self.register_plugin(self.PLUGIN_PREFIX + plugin_name, instance)

logger.warning('Plugin %s classname should be `Plugin`', plugin_name)
raise
else:
self.register_plugin(self.PLUGIN_PREFIX + plugin_name, instance)
logger.debug("Plugin instances: %s", self._plugins)

@property
Expand All @@ -233,10 +205,10 @@ def job(self):
# generator plugin
try:
gen = self.get_plugin_of_type(GeneratorPlugin)
# aggregator
except KeyError:
logger.warning("Load generator not found")
gen = GeneratorPlugin()
# aggregator
aggregator = TankAggregator(gen)
self._job = Job(monitoring_plugin=mon,
generator_plugin=gen,
Expand All @@ -249,8 +221,9 @@ def plugins_configure(self):
self.publish("core", "stage", "configure")

logger.info("Configuring plugins...")
if self.taskset_affinity != '':
self.taskset(os.getpid(), self.taskset_path, self.taskset_affinity)
self.taskset_affinity = self.get_option(self.SECTION, 'affinity')
if self.taskset_affinity:
self.__setup_taskset(self.taskset_affinity, pid=os.getpid())

for plugin in self.plugins.values():
logger.debug("Configuring %s", plugin)
Expand Down Expand Up @@ -282,7 +255,7 @@ def wait_for_finish(self):
"""

logger.info("Waiting for test to finish...")
logger.info('Artifacts dir: {}'.format(self.artifacts_dir))
logger.info('Artifacts dir: {dir}'.format(dir=self.artifacts_dir))
self.publish("core", "stage", "shoot")
if not self.plugins:
raise RuntimeError("It's strange: we have no plugins loaded...")
Expand Down Expand Up @@ -319,10 +292,8 @@ def plugins_end_test(self, retcode):
logger.debug("RC before: %s", retcode)
retcode = plugin.end_test(retcode)
logger.debug("RC after: %s", retcode)
except Exception as ex:
logger.error("Failed finishing plugin %s: %s", plugin, ex)
logger.debug(
"Failed finishing plugin: %s", traceback.format_exc(ex))
except Exception: # FIXME too broad exception clause
logger.error("Failed finishing plugin %s: %s", plugin, exc_info=True)
if not retcode:
retcode = 1
return retcode
Expand All @@ -333,36 +304,37 @@ def plugins_post_process(self, retcode):
"""
logger.info("Post-processing test...")
self.publish("core", "stage", "post_process")

for plugin in self.plugins.values():
logger.debug("Post-process %s", plugin)
try:
logger.debug("RC before: %s", retcode)
retcode = plugin.post_process(retcode)
logger.debug("RC after: %s", retcode)
except Exception as ex:
logger.error("Failed post-processing plugin %s: %s", plugin, ex)
logger.debug(
"Failed post-processing plugin: %s",
traceback.format_exc(ex))
except Exception: # FIXME too broad exception clause
logger.error("Failed post-processing plugin %s: %s", plugin, exc_info=True)
if not retcode:
retcode = 1
self.__collect_artifacts()

return retcode

def taskset(self, pid, path, affinity):
if affinity:
args = "%s -pc %s %s" % (path, affinity, pid)
retcode, stdout, stderr = execute(
args, shell=True, poll_period=0.1, catch_out=True)
logger.debug('taskset stdout: %s', stdout)
if retcode != 0:
raise KeyError(stderr)
def __setup_taskset(self, affinity, pid=None, args=None):
""" if pid specified: set process w/ pid `pid` CPU affinity to specified `affinity` core(s)
if args specified: modify list of args for Popen to start w/ taskset w/ affinity `affinity`
"""
self.taskset_path = self.get_option(self.SECTION, 'taskset_path')

if args:
return [self.taskset_path, '-c', affinity] + args

if pid:
args = "%s -pc %s %s" % (self.taskset_path, affinity, pid)
retcode, stdout, stderr = execute(args, shell=True, poll_period=0.1, catch_out=True)
logger.debug('taskset for pid %s stdout: %s', pid, stdout)
if retcode == 0:
logger.info("Enabled taskset for pid %s with affinity %s", str(pid), affinity)
else:
logger.info(
"Enabled taskset for pid %s with affinity %s",
str(pid), affinity)
logger.debug('Taskset setup failed w/ retcode :%s', retcode)
raise KeyError(stderr)

def __collect_artifacts(self):
logger.debug("Collecting artifacts")
Expand Down
12 changes: 5 additions & 7 deletions yandextank/plugins/Bfg/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,18 @@
from .widgets import BfgInfoWidget
from .worker import BFGMultiprocessing, BFGGreen
from ..Console import Plugin as ConsolePlugin
from ...common.interfaces import AbstractPlugin, GeneratorPlugin
from ...common.interfaces import GeneratorPlugin
from ...stepper import StepperWrapper


class Plugin(AbstractPlugin, GeneratorPlugin):
''' Big Fucking Gun plugin '''
class Plugin(GeneratorPlugin):
""" Big Fucking Gun plugin """
SECTION = 'bfg'

def __init__(self, core, cfg, cfg_updater):
super(Plugin, self).__init__(core, cfg, cfg_updater)
self._bfg = None
self.reader = None
self.stats_reader = None
self.log = logging.getLogger(__name__)
AbstractPlugin.__init__(self, core, cfg, cfg_updater)
self.gun_type = None
self.start_time = time.time()
self.stepper_wrapper = StepperWrapper(core, cfg)
Expand All @@ -43,7 +41,7 @@ def get_key():
def get_available_options(self):
return [
"gun_type", "instances", "cached_stpd", "pip"
] + self.stepper_wrapper.get_available_options
]

def configure(self):
self.log.info("Configuring BFG...")
Expand Down
5 changes: 5 additions & 0 deletions yandextank/plugins/JMeter/config/schema.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
affinity:
description: Use to set CPU affinity
type: string
nullable: true
default: ''
args:
description: additional commandline arguments for JMeter.
type: string
Expand Down

0 comments on commit fce5fc1

Please sign in to comment.