Skip to content

Commit

Permalink
Merge pull request #683 from fomars/develop
Browse files Browse the repository at this point in the history
fix lock & uploader hanging
  • Loading branch information
fomars committed Nov 27, 2018
2 parents 72623c6 + ad50930 commit 5c6761d
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 28 deletions.
2 changes: 2 additions & 0 deletions yandextank/common/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ def __init__(self, core, cfg):
"""
:type core: TankCore
:type cfg: dict
"""
super(AbstractPlugin, self).__init__()
self._cleanup_actions = []
self.log = logging.getLogger(__name__)
self.core = core
self.cfg = cfg
self.interrupted = self.core.interrupted

def set_option(self, option, value):
self.cfg[option] = value
Expand Down
5 changes: 4 additions & 1 deletion yandextank/core/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ def main():
return
worker.start()
try:
worker.join()
while True:
worker.join(timeout=2)
if not worker.is_alive():
break
except KeyboardInterrupt:
worker.stop()
worker.join()
Expand Down
19 changes: 12 additions & 7 deletions yandextank/core/consoleworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import time
import traceback
from ConfigParser import ConfigParser, MissingSectionHeaderError, NoOptionError, NoSectionError
from threading import Thread
from threading import Thread, Event

import yaml
from netort.resource import manager as resource_manager
Expand Down Expand Up @@ -323,6 +323,7 @@ def __init__(self, configs, cli_options=None, cfg_patches=None, cli_args=None, n
self.files = [] if files is None else files
self.ammo_file = ammo_file

self.interrupted = Event()
self.config_list = self._combine_configs(configs, cli_options, cfg_patches, cli_args, no_local)
self.core = TankCore(self.config_list)
self.status = Status.TEST_INITIATED
Expand Down Expand Up @@ -383,6 +384,7 @@ def run(self):
self.status = Status.TEST_FINISHED

def stop(self):
self.interrupted.set()
self.core.interrupt()

def get_status(self):
Expand Down Expand Up @@ -423,21 +425,22 @@ def init_logging(self, debug=False):
logger.handlers = []
logger.setLevel(logging.DEBUG if debug else logging.INFO)

self.file_handler = logging.FileHandler(filename)
self.file_handler.setLevel(logging.DEBUG)
self.file_handler.setFormatter(logging.Formatter(
file_handler = logging.FileHandler(filename)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
"%(asctime)s [%(levelname)s] %(name)s %(filename)s:%(lineno)d\t%(message)s"))
logger.addHandler(self.file_handler)
logger.addHandler(file_handler)
logger.info("Log file created")

for handler in self.log_handlers:
logger.addHandler(handler)
logger.info("Logging handler {} added".format(handler))

def get_lock(self):
while True:
while not self.interrupted.is_set():
try:
lock = Lock(self.test_id, self.folder).acquire(self.core.lock_dir)
lock = Lock(self.test_id, self.folder).acquire(self.core.lock_dir,
self.core.config.get_option(self.SECTION, 'ignore_lock'))
self.set_msg('')
break
except LockError as e:
Expand All @@ -447,6 +450,8 @@ def get_lock(self):
logger.warning(
"Couldn't get lock. Will retry in 5 seconds...")
time.sleep(5)
else:
raise KeyboardInterrupt
return lock

def set_msg(self, msg):
Expand Down
12 changes: 6 additions & 6 deletions yandextank/core/tankcore.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def __init__(self, configs, artifacts_base_dir=None, artifacts_dir_name=None):
configinfo.setdefault(self.SECTION, {})
configinfo[self.SECTION][self.API_JOBNO] = self.test_id
self.add_artifact_to_send(LPRequisites.CONFIGINFO, yaml.dump(configinfo))
logging.info('New test id %s' % self.test_id)
logger.info('New test id %s' % self.test_id)

@property
def cfg_snapshot(self):
Expand Down Expand Up @@ -336,7 +336,7 @@ def plugins_post_process(self, retcode):
return retcode

def interrupt(self):
logging.warning('Interrupting')
logger.warning('Interrupting')
self.interrupted = True

def __setup_taskset(self, affinity, pid=None, args=None):
Expand Down Expand Up @@ -518,7 +518,7 @@ def save_cfg(self, path):

def plugins_cleanup(self):
for plugin_name, plugin in self.plugins.items():
logging.info('Cleaning up plugin {}'.format(plugin_name))
logger.info('Cleaning up plugin {}'.format(plugin_name))
plugin.cleanup()


Expand All @@ -539,7 +539,7 @@ def __init__(self, test_id, test_dir, pid=None):
}
self.lock_file = None

def acquire(self, lock_dir, ignore=None):
def acquire(self, lock_dir, ignore=False):
is_locked = self.is_locked(lock_dir)
if not ignore and is_locked:
raise LockError("Lock file(s) found\n{}".format(is_locked))
Expand Down Expand Up @@ -584,15 +584,15 @@ def is_locked(cls, lock_dir='/var/lock'):
return msg
else:
if not pid_exists(int(running_lock.pid)):
logger.debug("Lock PID %s not exists, ignoring and trying to remove", running_lock.pid)
logger.info("Lock PID %s not exists, ignoring and trying to remove", running_lock.pid)
try:
os.remove(full_name)
except Exception as exc:
logger.warning("Failed to delete lock %s: %s", full_name, exc)
return False
else:
return "Another test is running with pid {}".format(running_lock.pid)
except Exception as exc:
except Exception:
msg = "Failed to load info from lock %s" % full_name
logger.warn(msg, exc_info=True)
return msg
Expand Down
4 changes: 3 additions & 1 deletion yandextank/plugins/DataUploader/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sys

import pwd
import threading
from urlparse import urljoin

from datetime import datetime
Expand Down Expand Up @@ -196,7 +197,8 @@ def post_loader():
TankCore.get_user_agent()))
api_client = client(base_url=config['api_address'],
user_agent=user_agent,
api_token=api_token
api_token=api_token,
core_interrupted=threading.Event()
# todo: add timeouts
)
lp_job = LPJob(
Expand Down
23 changes: 14 additions & 9 deletions yandextank/plugins/DataUploader/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class APIClient(object):

def __init__(
self,
core_interrupted,
base_url=None,
writer_url=None,
network_attempts=10,
Expand All @@ -40,6 +41,7 @@ def __init__(
connection_timeout=5.0,
user_agent=None,
api_token=None):
self.core_interrupted = core_interrupted
self.user_agent = user_agent
self.connection_timeout = connection_timeout
self._base_url = base_url
Expand Down Expand Up @@ -191,15 +193,18 @@ def __make_api_request(
return response_callback(response)
except (Timeout, ConnectionError, ProtocolError):
logger.warn(traceback.format_exc())
try:
timeout = next(network_timeouts)
logger.warn(
"Network error, will retry in %ss..." %
timeout)
time.sleep(timeout)
continue
except StopIteration:
raise self.NetworkError()
if not self.core_interrupted.is_set():
try:
timeout = next(network_timeouts)
logger.warn(
"Network error, will retry in %ss..." %
timeout)
time.sleep(timeout)
continue
except StopIteration:
raise self.NetworkError()
else:
break
except self.UnderMaintenance as e:
try:
timeout = next(maintenance_timeouts)
Expand Down
3 changes: 2 additions & 1 deletion yandextank/plugins/DataUploader/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,8 @@ def __get_api_client(self):
maintenance_timeout=self.get_option('maintenance_timeout'),
connection_timeout=self.get_option('connection_timeout'),
user_agent=self._get_user_agent(),
api_token=self.api_token)
api_token=self.api_token,
core_interrupted=self.interrupted)

@property
def lp_job(self):
Expand Down
3 changes: 2 additions & 1 deletion yandextank/plugins/Pandora/tests/test_pandora_plugin.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
from mock import MagicMock

from yandextank.plugins.Pandora import Plugin

Expand All @@ -21,7 +22,7 @@
)
])
def test_patch_config(cfg, expected):
plugin = Plugin(None, None)
plugin = Plugin(MagicMock(), None)
# '/tmp/9b73d966bcbf27467d4c4190cfe58c2a.downloaded_resource'
filename = plugin.patch_config(cfg)['pools'][0]['ammo']['file']
assert filename.endswith('.downloaded_resource')
5 changes: 3 additions & 2 deletions yandextank/plugins/ShellExec/tests/test_shellexec_plugin.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import pytest
from mock import MagicMock
from yandextank.plugins.ShellExec import Plugin


def test_plugin_execute():
plugin = Plugin(None, {})
plugin = Plugin(MagicMock(), {})
assert plugin.execute('echo foo') == 0


def test_plugin_execute_raises():
plugin = Plugin(None, {})
plugin = Plugin(MagicMock(), {})
with pytest.raises(RuntimeError) as error:
plugin.execute('echo "foo')
assert 'Subprocess returned 2' in error.message

0 comments on commit 5c6761d

Please sign in to comment.