Skip to content

Commit

Permalink
Merge pull request #612 from yandex/master
Browse files Browse the repository at this point in the history
changes from release
  • Loading branch information
fomars committed Jun 15, 2018
2 parents 0ca241f + 1024c72 commit 96ea6fc
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 78 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
'pandas>=0.18.0', 'numpy>=1.12.1', 'future>=0.16.0',
'pip>=8.1.2',
'pyyaml>=3.12', 'cerberus==1.2', 'influxdb>=5.0.0',
'netort>=0.0.11'
'netort>=0.0.11', 'pyopenssl>=17.5.0'
],
setup_requires=[
'pytest-runner', 'flake8',
Expand Down
2 changes: 1 addition & 1 deletion yandextank/core/consoleworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def patch_ini_config_with_monitoring(ini_config, mon_section_name):
def load_tank_core(config_files, cmd_options, no_rc, depr_options, other_opts, patches=None):
if patches is None:
patches = []
other_opts = list(other_opts) if other_opts else []
other_opts = [other_opts] if other_opts else []
config_files = config_files if len(config_files) > 0 else [DEFAULT_CONFIG]
if no_rc:
configs = [load_cfg(cfg) for cfg in config_files] +\
Expand Down
212 changes: 136 additions & 76 deletions yandextank/plugins/YASM/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@
]


def signals_stream(yasmapi_cfg):
def signals_stream(panel):
'''
:type yasmapi_cfg: YasmCfg
:return:
:type yasmapi_cfg: Panel
:return: Panel, float, dict
'''
for point in RtGolovanRequest(yasmapi_cfg.as_dict()):
for point in RtGolovanRequest(panel.as_dict):
logger.debug('received YASM data for {} for following hosts an tags: {}'.
format(point.ts,
{host: tags.keys() for host, tags in point.values.items()}))
yield point.ts, {panel.alias: point.values[panel.host][panel.tags] for panel in yasmapi_cfg.panels}
yield point.ts, point.values[panel.host][panel.tags]


def map_metric_name(name):
Expand Down Expand Up @@ -69,27 +69,37 @@ def monitoring_data(ts, data, comment=''):


class YasmCfg(object):

def __init__(self, panels):
self.panels = [self.Panel(alias, **attrs) for alias, attrs in panels.items()]
self.panels = [Panel(alias, **attrs) for alias, attrs in panels.items()]
self._as_dict = None

@property
def as_dict(self):
yasmapi_cfg = {}
for panel in self.panels:
yasmapi_cfg.setdefault(panel.host, {})[panel.tags] = panel.signals
logger.info('yasmapi cfg: {}'.format(yasmapi_cfg))
return yasmapi_cfg

class Panel(object):
def __init__(self, alias, host, tags, signals=None, default_signals=True):
self.alias = alias
custom_signals = signals if signals else []
self.signals = DEFAULT_SIGNALS + custom_signals if default_signals else custom_signals
self.host = host
self.tags = tags
if len(self.signals) == 0:
logger.warning('No signals specified for {} panel'.format(self.alias))
self.dict_cfg = {self.host: {self.tags: self.signals}}
if self._as_dict is None:
yasmapi_cfg = {}
for panel in self.panels:
yasmapi_cfg.setdefault(panel.host, {})[panel.tags] = panel.signals
logger.info('yasmapi cfg: {}'.format(yasmapi_cfg))
self._as_dict = yasmapi_cfg
return self._as_dict


class Panel(object):
def __init__(self, alias, host, tags, signals=None, default_signals=True):
self.queue = Queue()
self.alias = alias
custom_signals = signals if signals else []
self.signals = DEFAULT_SIGNALS + custom_signals if default_signals else custom_signals
self.host = host
self.tags = tags
if len(self.signals) == 0:
logger.warning('No signals specified for {} panel'.format(self.alias))
self.as_dict = {self.host: {self.tags: self.signals}}
self.last_ts = 0
self.stop_trigger = Event()

def stop(self):
self.stop_trigger.set()


class ImmutableDict(dict):
Expand All @@ -108,18 +118,110 @@ def set_copy(self, key=None, value=None):
return copy


class Plugin(MonitoringPlugin):
class YasmMPReceiver(object):
def __init__(self, yasm_cfg, yasmapi_timeout):
"""
:type data_queue: Queue
:type yasm_cfg: YasmCfg
"""
self.panels = yasm_cfg.panels
self.data_queue = Queue()
self.timeout = yasmapi_timeout
self._data_buffer = []
self._start_event = Event()
self._stop_event = Event()
self.ps_pool = {panel.alias: Process(target=self.single_receiver,
args=(panel,))
for panel in self.panels}
self.consumers = {panel.alias: Thread(target=self.single_controller, args=(panel, self.ps_pool[panel.alias]))
for panel in self.panels}

def get_buffered_data(self):
data, self._data_buffer = self._data_buffer, []
return data

def start_collecting(self):
[p.start() for p in self.ps_pool.values()]

def start_transmitting(self):
self._start_event.set()
[consumer.start() for consumer in self.consumers.values()]

def single_receiver(self, panel):
# ignore SIGINT (process is controlled by .stop_event)
"""
:type panel: Panel
"""
signal.signal(signal.SIGINT, signal.SIG_IGN)

stream = signals_stream(panel)
try:
while not panel.stop_trigger.is_set():
ts, data = stream.next()
if self._start_event.is_set():
panel.queue.put((ts, {panel.alias: data}))
finally:
logger.info('Closing panel {} receiver thread'.format(panel.alias))

def single_controller(self, panel, ps):
"""
:type ps: Process
:type panel: Panel
"""
while not panel.stop_trigger.is_set():
try:
ts, data = panel.queue.get(timeout=self.timeout)
panel.last_ts = ts
# logger.info('Received monitoring data for {}'.format(ts))
self._data_buffer.append(monitoring_data(ts, data))
except Empty:
logger.warning(
'Not receiving any data from YASM. Probably your hosts/tags specification is not correct')
panel.stop_trigger.set()
if ps.is_alive():
ps.terminate()
break
except KeyboardInterrupt:
logging.warning('Interrupting collecting metrics for panel {}'.format(panel.alias))
panel.stop_trigger.set()
if ps.is_alive():
ps.terminate()
break

def stop_now(self):
end_time = time.time()
active_panels = self.panels
while len(active_panels) > 0:
try:
for panel in active_panels:
if panel.last_ts < end_time and not self._stop_event.is_set():
logger.info('Waiting for yasm metrics for panel {}'.format(panel.alias))
else:
panel.stop_trigger.set()
self.ps_pool[panel.alias].join()
self.consumers[panel.alias].join()
active_panels = [panel for panel in active_panels if not panel.stop_trigger.is_set()]
if len(active_panels) > 0:
time.sleep(5)
except KeyboardInterrupt:
logger.info('Metrics receiving interrupted')
[panel.stop_trigger.set() for panel in active_panels]
[(self.ps_pool[panel.alias].join(), self.consumers[panel.alias].join()) for panel in active_panels]


class Plugin(MonitoringPlugin):
def __init__(self, core, cfg, cfg_updater=None):
super(Plugin, self).__init__(core, cfg)
self.data_queue = Queue()
self.start_event = Event()
self.stop_event = Event()
self.last_ts = 0
self.data_buffer = []
self.timeout = self.get_option('timeout')
if self.get_option('verbose_logging'):
logger.setLevel(logging.DEBUG)
self.yasm_receiver = YasmMPReceiver(YasmCfg(self.get_option('panels')),
self.get_option('timeout'))

def add_listener(self, plugin):
self.listeners.append(plugin)
Expand All @@ -130,66 +232,24 @@ def send_collected_data(self, data):
listener.monitoring_data(data)

def is_test_finished(self):
if len(self.data_buffer) > 0:
data, self.data_buffer = self.data_buffer, []
data = self.yasm_receiver.get_buffered_data()
if len(data) > 0:
self.send_collected_data(data)
return -1

def prepare_test(self):
yasmapi_cfg = YasmCfg(self.get_option('panels'))
self.yasm_receiver_ps = Process(target=self.yasm_receiver, args=(yasmapi_cfg,),)
self.yasm_receiver_ps.start()
self.consumer_thread = Thread(target=self.consumer)
self.consumer_thread.start()
self.yasm_receiver.start_collecting()

def start_test(self):
self.start_event.set()
self.yasm_receiver.start_transmitting()
logger.info('Listeners: {}'.format(self.listeners))

def end_test(self, retcode):
self.end_time = time.time()
while self.last_ts < self.end_time and not self.stop_event.is_set():
try:
logger.info('Waiting for yasm metrics till {}'.format(self.end_time))
time.sleep(5)
except KeyboardInterrupt:
logger.info('Metrics receiving interrupted')
break
self.stop_event.set()
self.consumer_thread.join()
self.yasm_receiver_ps.join()
self.yasm_receiver.stop_now()
self.send_rest()
return retcode

def send_rest(self):
if len(self.data_buffer) > 0:
data, self.data_buffer = self.data_buffer, []
data = self.yasm_receiver.get_buffered_data()
if len(data) > 0:
self.send_collected_data(data)

def consumer(self):
while not self.stop_event.is_set():
try:
ts, data = self.data_queue.get(timeout=self.timeout)
# logger.info('Received monitoring data for {}'.format(ts))
self.last_ts = ts
self.data_buffer.append(monitoring_data(ts, data))
except Empty:
logger.warning(
'Not receiving any data from YASM. Probably your hosts/tags specification is not correct')
self.stop_event.set()
if self.yasm_receiver_ps.is_alive():
self.yasm_receiver_ps.terminate()
break

def yasm_receiver(self, yasmapi_cfg):
# ignore SIGINT (process is controlled by .stop_event)
signal.signal(signal.SIGINT, signal.SIG_IGN)

stream = signals_stream(yasmapi_cfg)
try:
while not self.stop_event.is_set():
ts, data = stream.next()
if self.start_event.is_set():
self.data_queue.put((ts, data))
finally:
logger.info('Closing YASM receiver thread')

0 comments on commit 96ea6fc

Please sign in to comment.