Skip to content

Commit

Permalink
Merge pull request #589 from yandex/release
Browse files Browse the repository at this point in the history
Fixes from release
  • Loading branch information
fomars committed May 17, 2018
2 parents 91599c3 + dc8aeee commit 2feb055
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 38 deletions.
2 changes: 2 additions & 0 deletions yandextank/config_converter/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ def converter(self):
return self._converter

def _get_scheme_converter(self):
if self.name == 'enabled':
return self.TYPE_CASTERS['boolean']
if self.schema.get(self.name) is None:
logger.warning('Unknown option {}:{}'.format(self.plugin, self.name))
raise UnknownOption
Expand Down
3 changes: 1 addition & 2 deletions yandextank/plugins/DataUploader/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,7 @@ def __uploader(self, queue, sender_method, name='Uploader'):
except Empty:
continue
except APIClient.StoppedFromOnline:
logger.info("Test stopped from Lunapark")
self.retcode = 8
logger.warning("Lunapark is rejecting {} data".format(name))
break
except (APIClient.NetworkError, APIClient.NotAvailable, APIClient.UnderMaintenance) as e:
logger.warn('Failed to push {} data'.format(name))
Expand Down
45 changes: 9 additions & 36 deletions yandextank/plugins/YASM/plugin.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import ctypes
import re

import time
Expand All @@ -9,7 +8,7 @@
import logging
from yandextank.common.interfaces import MonitoringPlugin
from yasmapi import RtGolovanRequest
from threading import Thread, _active
from threading import Thread

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,40 +49,15 @@ def convert_value(name, value):
return value


def ctype_async_raise(thread_obj, exception):
found = False
target_tid = 0
for tid, tobj in _active.items():
if tobj is thread_obj:
found = True
target_tid = tid
break

if not found:
raise ValueError("Invalid thread object")

ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(target_tid, ctypes.py_object(exception))
# ref: http://docs.python.org/c-api/init.html#PyThreadState_SetAsyncExc
if ret == 0:
raise ValueError("Invalid thread ID")
elif ret > 1:
# Huh? Why would we notify more than one threads?
# Because we punch a hole into C level interpreter.
# So it is better to clean up the mess.
ctypes.pythonapi.PyThreadState_SetAsyncExc(target_tid, 0)
raise SystemError("PyThreadState_SetAsyncExc failed")
logger.debug("Successfully set asynchronized exception for %s", target_tid)


def monitoring_data(ts, data, comment=''):
return {
return ImmutableDict({
"timestamp": ts,
"data": {
host: {
"comment": comment,
"metrics": {map_metric_name(name): convert_value(name, value) for name, value in host_data.items()}
}
for host, host_data in data.items()}}
for host, host_data in data.items()}})


class ImmutableDict(dict):
Expand Down Expand Up @@ -146,7 +120,7 @@ def end_test(self, retcode):
self.end_time = time.time()
while self.last_ts < self.end_time and not self.stop_event.is_set():
try:
logger.info('Waiting for yasm metrics')
logger.info('Waiting for yasm metrics till {}'.format(self.end_time))
time.sleep(1)
except KeyboardInterrupt:
logger.info('Metrics receiving interrupted')
Expand All @@ -165,8 +139,10 @@ def send_rest(self):
def consumer(self):
while not self.stop_event.is_set():
try:
data = self.data_queue.get(timeout=self.RECEIVE_TIMEOUT)
self.data_buffer.append(ImmutableDict(data))
ts, data = self.data_queue.get(timeout=self.RECEIVE_TIMEOUT)
logger.info('Received monitoring data for {}'.format(ts))
self.last_ts = ts
self.data_buffer.append(monitoring_data(ts, data))
except Empty:
logger.warning(
'Not receiving any data from YASM. Probably your hosts/tags specification is not correct')
Expand All @@ -185,11 +161,8 @@ def yasm_receiver(self, hosts, tags, custom_signals=None,
try:
while not self.stop_event.is_set():
ts, data = stream.next()
logger.info('Received monitoring data for {}'.format(ts))
self.last_ts = int(ts)
chunk = monitoring_data(ts, data)
if self.start_event.is_set():
self.data_queue.put(chunk)
self.data_queue.put((ts, data))
finally:
logger.info('Closing YASM receiver thread')
# logger.info('Putting to monitoring data queue: {}'.format(chunk))
Expand Down

0 comments on commit 2feb055

Please sign in to comment.