Skip to content

Commit

Permalink
Merge pull request #549 from fomars/develop
Browse files Browse the repository at this point in the history
Stepping before Preparing, Allowed to disable caching, Refactored Uploader
  • Loading branch information
fomars committed Apr 3, 2018
2 parents 7d19315 + 29647eb commit 0c5af69
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 135 deletions.
26 changes: 13 additions & 13 deletions docs/tutorial.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ Create a file on a server with Yandex.Tank: **load.yaml**
.. code-block:: yaml
phantom:
address: 203.0.113.1:80 # [Target's address]:[target's port]
uris:
- /
load_profile:
load_type: rps # schedule load by defining requests per second
schedule: line(1, 10, 10m) # starting from 1rps growing linearly to 10rps during 10 minutes
address: 203.0.113.1:80 # [Target's address]:[target's port]
uris:
- /
load_profile:
load_type: rps # schedule load by defining requests per second
schedule: line(1, 10, 10m) # starting from 1rps growing linearly to 10rps during 10 minutes
console:
enabled: true # enable console output
telegraf:
Expand Down Expand Up @@ -74,13 +74,13 @@ have following lines:
.. code-block:: yaml
phantom:
address: 203.0.113.1:80 # [Target's address]:[target's port]
uris:
- /uri1
- /uri2
load_profile:
load_type: rps # schedule load by defining requests per second
schedule: const(10, 10m) # constant load of 10 rps held for 10 minutes
address: 203.0.113.1:80 # [Target's address]:[target's port]
uris:
- /uri1
- /uri2
load_profile:
load_type: rps # schedule load by defining requests per second
schedule: const(10, 10m) # constant load of 10 rps held for 10 minutes
console:
enabled: true # enable console output
telegraf:
Expand Down
2 changes: 1 addition & 1 deletion yandextank/core/tests/test_tankcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def load_yaml(directory, filename):
'phantom': {
'package': 'yandextank.plugins.Phantom',
'enabled': True,
'address': 'lunapark.test.yandex-team.ru',
'address': 'lunapark.yandex-team.ru',
'header_http': '1.1',
'uris': ['/'],
'load_profile': {'load_type': 'rps', 'schedule': 'line(1, 10, 1m)'},
Expand Down
2 changes: 1 addition & 1 deletion yandextank/plugins/Bfg/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def get_available_options(self):
def configure(self):
self.log.info("Configuring BFG...")
self.stepper_wrapper.read_config()
self.stepper_wrapper.prepare_stepper()

def get_reader(self):
if self.reader is None:
Expand All @@ -61,7 +62,6 @@ def get_stats_reader(self):
def bfg(self):
if self._bfg is None:
BFG = BFGGreen if self.get_option("worker_type", "") == "green" else BFGMultiprocessing
self.stepper_wrapper.prepare_stepper()
self._bfg = BFG(
gun=self.gun,
instances=self.stepper_wrapper.instances,
Expand Down
98 changes: 27 additions & 71 deletions yandextank/plugins/DataUploader/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

from netort.data_processing import Drain


logger = logging.getLogger(__name__) # pylint: disable=C0103


Expand Down Expand Up @@ -158,7 +157,7 @@ def configure(self):
def check_task_is_open(self):
if self.backend_type == BackendTypes.OVERLOAD:
return
TASK_TIP = 'The task should be connected to Lunapark.'\
TASK_TIP = 'The task should be connected to Lunapark.' \
'Open startrek task page, click "actions" -> "load testing".'

logger.debug("Check if task %s is open", self.task)
Expand Down Expand Up @@ -399,98 +398,55 @@ def __send_status(self):
break
except APIClient.StoppedFromOnline:
logger.info("Test stopped from Lunapark")
lp_job.is_alive = False
self.retcode = 8
break
if self.finished:
break
logger.info("Closing Status sender thread")

def __data_uploader(self):
logger.info('Data uploader thread started')
lp_job = self.lp_job
queue = self.data_queue
while lp_job.is_alive:
def __uploader(self, queue, sender_method, name='Uploader'):
logger.info('{} thread started'.format(name))
while self.lp_job.is_alive:
try:
entry = queue.get(timeout=1)
if entry is not None:
data, stats = entry
else:
logger.info("Data uploader queue returned None")
if entry is None:
logger.info("{} queue returned None".format(name))
break
lp_job.push_test_data(data, stats)
sender_method(entry)
except Empty:
continue
except APIClient.StoppedFromOnline:
logger.info("Test stopped from Lunapark")
lp_job.is_alive = False
self.retcode = 8
break
except (APIClient.NetworkError, APIClient.NotAvailable, APIClient.UnderMaintenance) as e:
logger.warn('Failed to push {} data'.format(name))
logger.warn(e.message)
self.lp_job.is_alive = False
except Exception:
exc_type, exc_value, exc_traceback = sys.exc_info()
logger.info("Mysterious exception:\n%s\n%s\n%s", (exc_type, exc_value, exc_traceback))
break
logger.info("Closing Data uploader thread")
# purge queue
while not queue.empty():
if queue.get_nowait() is None:
break
logger.info("Closing {} thread".format(name))

def __data_uploader(self):
self.__uploader(self.data_queue,
lambda entry: self.lp_job.push_test_data(*entry),
'Data Uploader')

def __monitoring_uploader(self):
logger.info('Monitoring uploader thread started')
lp_job = self.lp_job
queue = self.monitoring_queue
while lp_job.is_alive:
try:
data = queue.get(timeout=1)
if data is not None:
lp_job.push_monitoring_data(data)
else:
logger.info('Monitoring queue returned None')
break
except Empty:
continue
except (APIClient.NetworkError, APIClient.NotAvailable, APIClient.UnderMaintenance) as e:
logger.warn('Failed to push monitoring data')
logger.warn(e.message)
break
except APIClient.StoppedFromOnline:
logger.info("Test stopped from Lunapark")
lp_job.is_alive = False
self.retcode = 8
break
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
logger.info("Mysterious exception:\n%s\n%s\n%s", (exc_type, exc_value, exc_traceback))
break
logger.info('Closing Monitoring uploader thread')
self.__uploader(self.monitoring_queue,
self.lp_job.push_monitoring_data,
'Monitoring Uploader')

def __events_uploader(self):
logger.info('Events uploader thread started')
lp_job = self.lp_job
queue = self.events_queue

while lp_job.is_alive:
try:
data = queue.get(timeout=1)
if data is not None:
logger.debug('Events data sending...: %s', data)
lp_job.push_events_data(data)
else:
logger.info('Events queue returned None')
break
except Empty:
continue
except (APIClient.NetworkError, APIClient.NotAvailable, APIClient.UnderMaintenance) as e:
logger.warn('Failed to push events data')
logger.warn(e.message)
break
except APIClient.StoppedFromOnline:
logger.info("Test stopped from Lunapark")
lp_job.is_alive = False
self.retcode = 8
break
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
logger.info("Mysterious exception:\n%s\n%s\n%s", (exc_type, exc_value, exc_traceback))
break
logger.info('Closing Events uploader thread')
self.__uploader(self.events_queue,
self.lp_job.push_events_data,
'Events Uploader')

# TODO: why we do it here? should be in core
def __save_conf(self):
Expand Down
1 change: 1 addition & 0 deletions yandextank/plugins/Phantom/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def configure(self):
self.predefined_phout = self.get_option(PhantomConfig.OPTION_PHOUT, '')
if not self.get_option(self.OPTION_CONFIG, '') and self.predefined_phout:
self.phout_import_mode = True
self.phantom_config = self.phantom.config_file

@property
def phantom(self):
Expand Down
8 changes: 5 additions & 3 deletions yandextank/stepper/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ def __init__(
autocases=None,
enum_ammo=False,
ammo_type='phantom',
chosen_cases=None, ):
chosen_cases=None,
use_cache=True):
self.log = logging.getLogger(__name__)
self.ammo_file = ammo_file
self.ammo_type = ammo_type
Expand All @@ -50,6 +51,7 @@ def __init__(
self.headers = headers
self.marker = get_marker(autocases, enum_ammo)
self.chosen_cases = chosen_cases or []
self.use_cache = use_cache

def get_load_plan(self):
"""
Expand Down Expand Up @@ -94,7 +96,7 @@ def get_ammo_generator(self):
if self.ammo_type in af_readers:
if self.ammo_type == 'phantom':
opener = resource.get_opener(self.ammo_file)
with opener() as ammo:
with opener(self.use_cache) as ammo:
try:
if not ammo.next()[0].isdigit():
self.ammo_type = 'uri'
Expand All @@ -114,7 +116,7 @@ def get_ammo_generator(self):
raise NotImplementedError(
'No such ammo type implemented: "%s"' % self.ammo_type)
ammo_gen = af_readers[self.ammo_type](
self.ammo_file, headers=self.headers, http_ver=self.http_ver)
self.ammo_file, headers=self.headers, http_ver=self.http_ver, use_cache=self.use_cache)
else:
raise StepperConfigurationError(
'Ammo not found. Specify uris or ammo file')
Expand Down
4 changes: 2 additions & 2 deletions yandextank/stepper/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ def __init__(self, core, cfg):
self.headers = []
self.autocases = 0
self.enum_ammo = False
self.use_caching = True
self.force_stepping = None
self.chosen_cases = []

Expand Down Expand Up @@ -314,6 +313,7 @@ def __make_stpd_file(self):
autocases=self.autocases,
enum_ammo=self.enum_ammo,
ammo_type=self.ammo_type,
chosen_cases=self.chosen_cases, )
chosen_cases=self.chosen_cases,
use_cache=self.use_caching)
with open(self.stpd, 'w', self.file_cache) as os:
stepper.write(os)

0 comments on commit 0c5af69

Please sign in to comment.