Skip to content

Commit

Permalink
Merge c1264ca into 8626e94
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladimir Kotal committed May 3, 2018
2 parents 8626e94 + c1264ca commit a2b9a0d
Show file tree
Hide file tree
Showing 12 changed files with 240 additions and 50 deletions.
86 changes: 80 additions & 6 deletions tools/sync/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@
import subprocess
import string
import threading
import time


class TimeoutException(Exception):
    """
    Signals that a command ran longer than its timeout allowed.
    """


class Command:
Expand All @@ -38,14 +45,17 @@ class Command:
FINISHED = "finished"
INTERRUPTED = "interrupted"
ERRORED = "errored"
TIMEDOUT = "timed out"

def __init__(self, cmd, args_subst=None, args_append=None, logger=None,
excl_subst=False, work_dir=None, env_vars=None):
excl_subst=False, work_dir=None, env_vars=None, timeout=None):
self.cmd = cmd
self.state = "notrun"
self.excl_subst = excl_subst
self.work_dir = work_dir
self.env_vars = env_vars
self.timeout = timeout
self.pid = None

self.logger = logger or logging.getLogger(__name__)
logging.basicConfig()
Expand All @@ -61,6 +71,41 @@ def execute(self):
Execute the command and capture its output and return code.
"""

class TimeoutThread(threading.Thread):
    """
    Watchdog thread that terminates a process once a timeout expires.

    The thread starts itself on construction and then waits on the
    supplied condition for at most `timeout` seconds. If the condition
    is notified first, the thread exits without touching the process.
    Otherwise it calls terminate() on the Popen object and stores
    a TimeoutException that the caller can retrieve via getexception().
    """

    def __init__(self, logger, timeout, condition, p):
        """
        :param logger: logger used to report the termination
        :param timeout: maximum time to wait, in seconds
        :param condition: threading.Condition the thread waits on;
                          notifying it cancels the watchdog
        :param p: Popen object of the process to watch
        """
        super(TimeoutThread, self).__init__()
        self.timeout = timeout
        self.popen = p
        self.condition = condition
        self.logger = logger
        # Has to be set before start(): once the thread is running,
        # run() may record a timeout and a later assignment here
        # would silently erase it.
        self.exception = None
        self.start()

    def run(self):
        """
        Wait for the notification or the timeout, whichever is first.
        """
        with self.condition:
            if self.condition.wait(self.timeout):
                # Notified before the timeout expired, nothing to do.
                return

            p = self.popen
            self.logger.info("Terminating command {} with PID {} "
                             "after timeout".
                             format(p.args, p.pid))
            p.terminate()
            self.exception = TimeoutException("Command {} with pid"
                                              " {} timed out".
                                              format(p.args,
                                                     p.pid))

    def getexception(self):
        """
        :return: TimeoutException if the timeout expired, None otherwise
        """
        return self.exception

class OutputThread(threading.Thread):
"""
Capture data from subprocess.Popen(). This avoids hangs when
Expand Down Expand Up @@ -116,33 +161,59 @@ def close(self):
format(self.work_dir), exc_info=True)
return

othr = OutputThread()
timeout_thread = None
output_thread = OutputThread()
try:
start_time = time.time()
self.logger.debug("working directory = {}".format(os.getcwd()))
self.logger.debug("command = {}".format(self.cmd))
if self.env_vars:
my_env = os.environ.copy()
my_env.update(self.env_vars)
p = subprocess.Popen(self.cmd, stderr=subprocess.STDOUT,
stdout=othr, env=my_env)
stdout=output_thread, env=my_env)
else:
p = subprocess.Popen(self.cmd, stderr=subprocess.STDOUT,
stdout=othr)
stdout=output_thread)

self.pid = p.pid

if self.timeout:
condition = threading.Condition()
self.logger.debug("Setting timeout to {}".format(self.timeout))
timeout_thread = TimeoutThread(self.logger, self.timeout,
condition, p)

self.logger.debug("Waiting for process with PID {}".format(p.pid))
p.wait()

if self.timeout:
e = timeout_thread.getexception()
if e:
raise e
except KeyboardInterrupt as e:
self.logger.info("Got KeyboardException while processing ",
exc_info=True)
self.state = Command.INTERRUPTED
except OSError as e:
self.logger.error("Got OS error", exc_info=True)
self.state = Command.ERRORED
except TimeoutException as e:
self.logger.error("Timed out", exc_info=True)
self.state = Command.TIMEDOUT
else:
self.state = Command.FINISHED
self.returncode = int(p.returncode)
self.logger.debug("{} -> {}".format(self.cmd, self.getretcode()))
finally:
othr.close()
self.out = othr.getoutput()
if self.timeout != 0 and timeout_thread:
with condition:
condition.notifyAll()
output_thread.close()
self.out = output_thread.getoutput()
elapsed_time = time.time() - start_time
self.logger.debug("Command {} took {} seconds".
format(self.cmd, int(elapsed_time)))

if orig_work_dir:
try:
Expand Down Expand Up @@ -199,3 +270,6 @@ def getoutput(self):

def getstate(self):
    """
    Report the state recorded for this command.

    :return: value of the state attribute
    """
    current_state = self.state
    return current_state

def getpid(self):
    """
    Report the process ID recorded for this command.

    :return: value of the pid attribute
    """
    process_id = self.pid
    return process_id
7 changes: 4 additions & 3 deletions tools/sync/cvs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@


class CVSRepository(Repository):
def __init__(self, logger, path, project, command, env, hooks):
def __init__(self, logger, path, project, command, env, hooks, timeout):

super().__init__(logger, path, project, command, env, hooks)
super().__init__(logger, path, project, command, env, hooks, timeout)

if command:
self.command = command
Expand All @@ -42,7 +42,8 @@ def __init__(self, logger, path, project, command, env, hooks):

def reposync(self):
hg_command = [self.command, "update", "-dP"]
cmd = Command(hg_command, work_dir=self.path, env_vars=self.env)
cmd = self.getCommand(hg_command, work_dir=self.path,
env_vars=self.env, logger=self.logger)
cmd.execute()
self.logger.info(cmd.getoutputstr())
if cmd.getretcode() != 0 or cmd.getstate() != Command.FINISHED:
Expand Down
7 changes: 4 additions & 3 deletions tools/sync/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@


class GitRepository(Repository):
def __init__(self, logger, path, project, command, env, hooks):
def __init__(self, logger, path, project, command, env, hooks, timeout):

super().__init__(logger, path, project, command, env, hooks)
super().__init__(logger, path, project, command, env, hooks, timeout)

if command:
self.command = command
Expand All @@ -42,7 +42,8 @@ def __init__(self, logger, path, project, command, env, hooks):

def reposync(self):
hg_command = [self.command, "pull", "--ff-only"]
cmd = Command(hg_command, work_dir=self.path, env_vars=self.env)
cmd = self.getCommand(hg_command, work_dir=self.path,
env_vars=self.env, logger=self.logger)
cmd.execute()
self.logger.info(cmd.getoutputstr())
if cmd.getretcode() != 0 or cmd.getstate() != Command.FINISHED:
Expand Down
5 changes: 3 additions & 2 deletions tools/sync/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import logging


def run_hook(logger, script, path, env):
def run_hook(logger, script, path, env, timeout):
"""
Change a working directory to specified path, run a command
and change the working directory back to its original value.
Expand All @@ -37,7 +37,8 @@ def run_hook(logger, script, path, env):
ret = 0
logger.debug("Running hook '{}' in directory {}".
format(script, path))
cmd = Command([script], work_dir=path, env_vars=env)
cmd = Command([script], logger=logger, work_dir=path, env_vars=env,
timeout=timeout)
cmd.execute()
if cmd.state is not "finished" or cmd.getretcode() != 0:
logger.error("command failed: {} -> {}".format(cmd, cmd.getretcode()))
Expand Down
16 changes: 10 additions & 6 deletions tools/sync/mercurial.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@


class MercurialRepository(Repository):
def __init__(self, logger, path, project, command, env, hooks):
def __init__(self, logger, path, project, command, env, hooks, timeout):

super().__init__(logger, path, project, command, env, hooks)
super().__init__(logger, path, project, command, env, hooks, timeout)

if command:
self.command = command
Expand All @@ -42,7 +42,8 @@ def __init__(self, logger, path, project, command, env, hooks):

def get_branch(self):
hg_command = [self.command, "branch"]
cmd = Command(hg_command, work_dir=self.path, env_vars=self.env)
cmd = self.getCommand(hg_command, work_dir=self.path,
env_vars=self.env, logger=self.logger)
cmd.execute()
if cmd.getstate() != Command.FINISHED:
self.logger.debug(cmd.getoutput())
Expand All @@ -66,7 +67,8 @@ def reposync(self):
if branch != "default":
hg_command.append("-b")
hg_command.append(branch)
cmd = Command(hg_command, work_dir=self.path, env_vars=self.env)
cmd = self.getCommand(hg_command, work_dir=self.path,
env_vars=self.env, logger=self.logger)
cmd.execute()
self.logger.info(cmd.getoutputstr())
#
Expand All @@ -81,7 +83,8 @@ def reposync(self):
if branch != "default":
hg_command.append("-b")
hg_command.append(branch)
cmd = Command(hg_command, work_dir=self.path, env_vars=self.env)
cmd = self.getCommand(hg_command, work_dir=self.path,
env_vars=self.env, logger=self.logger)
cmd.execute()
self.logger.info(cmd.getoutputstr())
if cmd.getretcode() != 0 or cmd.getstate() != Command.FINISHED:
Expand All @@ -93,7 +96,8 @@ def reposync(self):
# some servers do not support it.
if branch == "default":
hg_command.append("--check")
cmd = Command(hg_command, work_dir=self.path, env_vars=self.env)
cmd = self.getCommand(hg_command, work_dir=self.path,
env_vars=self.env, logger=self.logger)
cmd.execute()
self.logger.info(cmd.getoutputstr())
if cmd.getretcode() != 0 or cmd.getstate() != Command.FINISHED:
Expand Down
73 changes: 56 additions & 17 deletions tools/sync/mirror.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from repository import Repository
from mercurial import MercurialRepository
from repofactory import get_repository
from utils import is_exe, check_create_dir
from utils import is_exe, check_create_dir, get_int
from hook import run_hook
from readconfig import read_config
from opengrok import get_repos, get_config_value, get_repo_type
Expand All @@ -69,6 +69,10 @@
output = []
dirs_to_process = []

# "constants"
HOOK_TIMEOUT_PROPERTY = 'hook_timeout'
CMD_TIMEOUT_PROPERTY = 'command_timeout'

parser = argparse.ArgumentParser(description='project mirroring')

parser.add_argument('project')
Expand Down Expand Up @@ -152,6 +156,16 @@
if hookdir:
logger.debug("Hook directory = {}".format(hookdir))

command_timeout = get_int(logger, "command timeout",
config.get(CMD_TIMEOUT_PROPERTY))
if command_timeout:
logger.debug("Global command timeout = {}".format(command_timeout))

hook_timeout = get_int(logger, "hook timeout",
config.get(HOOK_TIMEOUT_PROPERTY))
if hook_timeout:
logger.debug("Global hook timeout = {}".format(hook_timeout))

prehook = None
posthook = None
ignored_repos = []
Expand All @@ -160,6 +174,24 @@
logger.debug("Project '{}' has specific (non-default) config".
format(args.project))

project_command_timeout = get_int(logger, "command timeout for "
"project {}".format(args.project),
project_config.
get(CMD_TIMEOUT_PROPERTY))
if project_command_timeout:
command_timeout = project_command_timeout
logger.debug("Project command timeout = {}".
format(command_timeout))

project_hook_timeout = get_int(logger, "hook timeout for "
"project {}".format(args.project),
project_config.
get(HOOK_TIMEOUT_PROPERTY))
if project_hook_timeout:
hook_timeout = project_hook_timeout
logger.debug("Project hook timeout = {}".
format(hook_timeout))

if project_config.get('ignored_repos'):
ignored_repos = project_config.get('ignored_repos')
logger.debug("has ignored repositories: {}".
Expand Down Expand Up @@ -242,13 +274,15 @@
args.project + "-mirror.lock"))
try:
with lock.acquire(timeout=0):
if prehook and run_hook(logger, prehook,
os.path.join(source_root, args.project),
config['proxy'] if use_proxy else None
) != 0:
logger.error("pre hook failed")
logging.shutdown()
sys.exit(1)
if prehook:
logger.info("Running pre hook")
if run_hook(logger, prehook,
os.path.join(source_root, args.project),
config['proxy'] if use_proxy else None,
hook_timeout) != 0:
logger.error("pre hook failed")
logging.shutdown()
sys.exit(1)

#
# If one of the repositories fails to sync, the whole project sync
Expand All @@ -258,7 +292,7 @@
logger.debug("Repository path = {}".format(repo_path))

if repo_path in ignored_repos:
logger.debug("repository {} ignored".format(repo_path))
logger.info("repository {} ignored".format(repo_path))
continue

repo_type = get_repo_type(logger, repo_path, messages_file)
Expand All @@ -275,24 +309,29 @@
args.project,
config.get('commands'),
config['proxy'] if use_proxy else None,
None)
None,
command_timeout)
if not repo:
logger.error("Cannot get repository for {}".
format(repo_path))
ret = 1
else:
logger.info("Synchronizing repository {}".
format(repo_path))
if repo.sync() != 0:
logger.error("failed to sync repository {}".
format(repo_path))
ret = 1

if posthook and run_hook(logger, posthook,
os.path.join(source_root, args.project),
config['proxy'] if use_proxy else None
) != 0:
logger.error("post hook failed")
logging.shutdown()
sys.exit(1)
if posthook:
logger.info("Running post hook")
if run_hook(logger, posthook,
os.path.join(source_root, args.project),
config['proxy'] if use_proxy else None,
hook_timeout) != 0:
logger.error("post hook failed")
logging.shutdown()
sys.exit(1)
except Timeout:
logger.warning("Already running, exiting.")
sys.exit(1)
Expand Down
Loading

0 comments on commit a2b9a0d

Please sign in to comment.