Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

execution: unify docker cmdline usage (prepping for kubernetes drop-i… #18

Merged
merged 1 commit into from
Nov 9, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
253 changes: 117 additions & 136 deletions packtivity/handlers/execution_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,81 +22,59 @@ def sourcepath(path):
else:
return path

def state_context_to_mounts(state):
readwrites = state.readwrite
readonlies = state.readonly
mounts = ''
for rw in readwrites:
mounts += '-v {}:{}:rw'.format(sourcepath(os.path.abspath(rw)),rw)
for ro in readonlies:
mounts += ' -v {}:{}:ro'.format(sourcepath(ro),ro)
return mounts

def prepare_par_mounts(parmounts,state):
mounts = []
for i,x in enumerate(parmounts):
parmountfile = os.path.join(state.readwrite[0],'_yadage_parmount_{}.txt'.format(i))
with open(parmountfile,'w') as f:
f.write(x['mountcontent'])

mounts.append('{}:{}'.format(
mounts.append(' -v {}:{}'.format(
os.path.abspath(parmountfile),
x['mountpath']
))

return mounts

def state_context_to_mounts(state):
readwrites = state.readwrite
readonlies = state.readonly
mounts = ''
for rw in readwrites:
mounts += '-v {}:{}:rw'.format(sourcepath(os.path.abspath(rw)),rw)
for ro in readonlies:
mounts += ' -v {}:{}:ro'.format(sourcepath(ro),ro)
return mounts

def cvmfs_from_volume_plugin(command_line,cvmfs_repos = None):
def cvmfs_from_volume_plugin(cvmfs_repos = None):
if not cvmfs_repos:
cvmfs_repos = yaml.load(os.environ.get('PACKTIVITY_CVMFS_REPOS','null'))
if not cvmfs_repos:
cvmfs_repos = ['atlas.cern.ch','atlas-condb.cern.ch','sft.cern.ch']
command_line += ' --security-opt label:disable'
command_line = ' --security-opt label:disable'
for repo in cvmfs_repos:
command_line += ' --volume-driver cvmfs -v {cvmfs_repo}:/cvmfs/{cvmfs_repo}'.format(cvmfs_repo = repo)
return command_line

def cvmfs_from_external_mount(command_line):
command_line+=' -v {}:/cvmfs'.format(os.environ.get('PACKTIVITY_CVMFS_LOCATION','/cvmfs'))
return command_line

def prepare_docker(state,do_cvmfs,do_auth,par_mounts,log,metadata):
docker_mod = state_context_to_mounts(state)



if do_cvmfs:
cvmfs_source = os.environ.get('PACKTIVITY_CVMFS_SOURCE','external')
if cvmfs_source == 'external':
docker_mod = cvmfs_from_external_mount(docker_mod)
elif cvmfs_source == 'voldriver':
docker_mod = cvmfs_from_volume_plugin(docker_mod)
else:
raise RuntimeError('unknown CVMFS location requested')

if do_auth:
if 'PACKTIVITY_AUTH_LOCATION' not in os.environ:
docker_mod+=' -v /home/recast/recast_auth:/recast_auth'
else:
docker_mod+=' -v {}:/recast_auth'.format(os.environ['PACKTIVITY_AUTH_LOCATION'])


for x in par_mounts:
docker_mod+=' -v {}'.format(x)


cidfile = '{}/{}.cid'.format(state.metadir,metadata['name'])

if os.path.exists(cidfile):
log.warning('cid file %s seems to exist, container execution will crash',cidfile)
docker_mod += ' --cidfile {}'.format(cidfile)
def cvmfs_from_external_mount():
return ' -v {}:/cvmfs'.format(os.environ.get('PACKTIVITY_CVMFS_LOCATION','/cvmfs'))

def cvmfs_mount():
cvmfs_source = os.environ.get('PACKTIVITY_CVMFS_SOURCE','external')
if cvmfs_source == 'external':
return cvmfs_from_external_mount()
elif cvmfs_source == 'voldriver':
return cvmfs_from_volume_plugin()
else:
raise RuntimeError('unknown CVMFS location requested')

docker_mod += ' {}'.format(os.environ.get('PACKTIVITY_DOCKER_CMD_MOD',''))

return docker_mod
def auth_mount():
if 'PACKTIVITY_AUTH_LOCATION' not in os.environ:
return ' -v /home/recast/recast_auth:/recast_auth'
else:
return ' -v {}:/recast_auth'.format(os.environ['PACKTIVITY_AUTH_LOCATION'])

def prepare_docker_context(state,environment,log,metadata):
def resource_mounts(state,environment,log,metadata):
report = '''\n\
--------------
run in docker container image: {image}
Expand All @@ -113,9 +91,50 @@ def prepare_docker_context(state,environment,log,metadata):
do_auth = ('GRIDProxy' in environment['resources']) or ('KRB5Auth' in environment['resources'])
log.debug('do_auth: %s do_cvmfs: %s',do_auth,do_cvmfs)

par_mounts = prepare_par_mounts(environment['par_mounts'], state)

return prepare_docker(state,do_cvmfs,do_auth,par_mounts,log,metadata)
resource_mounts = ''
if do_cvmfs:
resource_mounts+=cvmfs_mount()

if do_auth:
resource_mounts+=auth_mount()

return resource_mounts

def docker_execution_cmdline(state,environment,log,metadata,combined_flags,cmd_argv):
quoted_string = ' '.join(map(pipes.quote,cmd_argv))

image = environment['image']
imagetag = environment['imagetag']

# generic non-volume mount flags
workdir_flag = '-w {}'.format(environment['workdir']) if environment['workdir'] is not None else ''

cidfile = '{}/{}.cid'.format(state.metadir,metadata['name'])
if os.path.exists(cidfile):
log.warning('cid file %s seems to exist, container execution will crash',cidfile)
cid_file = '--cidfile {}'.format(cidfile)

custom_mod = ' {}'.format(os.environ.get('PACKTIVITY_DOCKER_CMD_MOD',''))

# volume mounts (resources, parameter mounts and state mounts)
state_mounts = state_context_to_mounts(state)
rsrcs_mounts = resource_mounts(state,environment,log,metadata)

par_mounts = ' '.join(prepare_par_mounts(environment['par_mounts'], state))

return 'docker run {combined} {cid} {workdir} {custom} {state_mounts} {rsrcs} {par_mounts} {img}:{tag} {command}'.format(
combined = combined_flags,
cid = cid_file,
workdir = workdir_flag,
custom = custom_mod,
state_mounts = state_mounts,
rsrcs = rsrcs_mounts,
par_mounts = par_mounts,
img = image,
tag = imagetag,
command = quoted_string
)

def run_docker_with_script(state,environment,job,log,metadata):
script = job['script']
Expand All @@ -130,59 +149,12 @@ def run_docker_with_script(state,environment,job,log,metadata):
envmod = 'source {} && '.format(environment['envscript']) if environment['envscript'] else ''
indocker = envmod+indocker

try:
with logutils.setup_logging_topic(metadata,state,'run', return_logger = True) as runlog:
subcmd = docker_execution_cmdline(
state,environment,log,metadata,
combined_flags = '--rm -i',
cmd_argv = ['sh', '-c', indocker]
)
log.debug('running docker cmd: %s',subcmd)
proc = subprocess.Popen(shlex.split(subcmd), stdin = subprocess.PIPE, stderr = subprocess.STDOUT, stdout = subprocess.PIPE, bufsize=1, close_fds = True)

log.debug('started run subprocess with pid %s. now piping script',proc.pid)
proc.stdin.write(script.encode('utf-8'))
proc.stdin.close()
time.sleep(0.5)

for line in iter(proc.stdout.readline, b''):
runlog.info(line.strip())
while proc.poll() is None:
pass

proc.stdout.close()

log.debug('container execution finished. return code: %s',proc.returncode)
if proc.returncode:
log.error('non-zero return code raising exception')
raise subprocess.CalledProcessError(returncode = proc.returncode, cmd = subcmd)
log.debug('moving on from run')
except subprocess.CalledProcessError as exc:
log.exception('subprocess failed. code: %s, command %s',exc.returncode,exc.cmd)
raise RuntimeError('failed container execution.')
except:
log.exception("Unexpected error: %s",sys.exc_info())
raise
finally:

log.debug('finally for run')

def docker_execution_cmdline(state,environment,log,metadata,combined_flags,cmd_argv):
quoted_string = ' '.join(map(pipes.quote,cmd_argv))

image = environment['image']
imagetag = environment['imagetag']
docker_mod = prepare_docker_context(state,environment,log,metadata)
workdir_flag = '-w {}'.format(environment['workdir']) if environment['workdir'] is not None else ''

return 'docker run {} {} {} {}:{} {}'.format(
combined_flags,
workdir_flag,
docker_mod,
image,
imagetag,
quoted_string
docker_run_cmd_str = docker_execution_cmdline(
state,environment,log,metadata,
combined_flags = '--rm -i',
cmd_argv = ['sh', '-c', indocker]
)
execute_docker(metadata,state,log,docker_run_cmd_str,stdin_content=script)

def run_docker_with_oneliner(state,environment,command,log,metadata):
log.debug('''\n\
Expand All @@ -200,76 +172,86 @@ def run_docker_with_oneliner(state,environment,command,log,metadata):
combined_flags = '--rm',
cmd_argv = ['sh', '-c', in_docker_cmd]
)
docker_run_cmd(docker_run_cmd_str,log,state,metadata)
execute_docker(metadata,state,log,docker_run_cmd_str)

def docker_pull(docker_pull_cmd,log,state,metadata):
log.debug('container image pull command: \n %s',docker_pull_cmd)
def execute_docker(metadata,state,log,docker_run_cmd_str,stdin_content = None):
log.debug('container execution command: \n%s',docker_run_cmd_str)
if 'PACKTIVITY_DRYRUN' in os.environ:
return
try:
with logutils.setup_logging_topic(metadata,state,'pull', return_logger = True) as pulllog:
proc = subprocess.Popen(shlex.split(docker_pull_cmd), stderr = subprocess.STDOUT, stdout = subprocess.PIPE, bufsize=1, close_fds = True)
log.debug('started pull subprocess with pid %s. now wait to finish',proc.pid)
with logutils.setup_logging_topic(metadata,state,'run', return_logger = True) as runlog:

proc = None
if stdin_content:
log.debug('stdin: \n%s',stdin_content)
proc = subprocess.Popen(shlex.split(docker_run_cmd_str),
stdin = subprocess.PIPE,
stderr = subprocess.STDOUT,
stdout = subprocess.PIPE,
bufsize=1,
close_fds = True)
proc.stdin.write(stdin_content.encode('utf-8'))
proc.stdin.close()
else:
proc = subprocess.Popen(shlex.split(docker_run_cmd_str), stderr = subprocess.STDOUT, stdout = subprocess.PIPE, bufsize=1, close_fds = True)

log.debug('started run subprocess with pid %s. now wait to finish',proc.pid)
time.sleep(0.5)
log.debug('process children: %s',[x for x in psutil.Process(proc.pid).children(recursive = True)])

for line in iter(proc.stdout.readline, b''):
pulllog.info(line.strip())
runlog.info(line.strip())
while proc.poll() is None:
pass

proc.stdout.close()

log.debug('pull subprocess finished. return code: %s',proc.returncode)
log.debug('container execution subprocess finished. return code: %s',proc.returncode)
if proc.returncode:
log.error('non-zero return code raising exception')
raise subprocess.CalledProcessError(returncode = proc.returncode, cmd = docker_pull_cmd)
log.debug('moving on from pull')
except RuntimeError as e:
log.exception('caught RuntimeError')
raise e
raise subprocess.CalledProcessError(returncode = proc.returncode, cmd = docker_run_cmd_str)
log.debug('moving on from run')
except subprocess.CalledProcessError as exc:
log.exception('subprocess failed. code: %s, command %s',exc.returncode,exc.cmd)
raise RuntimeError('failed container image pull subprocess in docker_enc_handler.')
raise RuntimeError('failed container execution subprocess.')
except:
log.exception("Unexpected error: %s",sys.exc_info())
raise
finally:
log.debug('finally for pull')
log.debug('finally for run')

def docker_run_cmd(fullest_command,log,state,metadata):
log.debug('container execution command: \n%s',fullest_command)
def docker_pull(docker_pull_cmd,log,state,metadata):
log.debug('container image pull command: \n %s',docker_pull_cmd)
if 'PACKTIVITY_DRYRUN' in os.environ:
return
try:
with logutils.setup_logging_topic(metadata,state,'run', return_logger = True) as runlog:
proc = subprocess.Popen(shlex.split(fullest_command), stderr = subprocess.STDOUT, stdout = subprocess.PIPE, bufsize=1, close_fds = True)
log.debug('started run subprocess with pid %s. now wait to finish',proc.pid)
with logutils.setup_logging_topic(metadata,state,'pull', return_logger = True) as pulllog:
proc = subprocess.Popen(shlex.split(docker_pull_cmd), stderr = subprocess.STDOUT, stdout = subprocess.PIPE, bufsize=1, close_fds = True)
log.debug('started pull subprocess with pid %s. now wait to finish',proc.pid)
time.sleep(0.5)
log.debug('process children: %s',[x for x in psutil.Process(proc.pid).children(recursive = True)])

for line in iter(proc.stdout.readline, b''):
runlog.info(line.strip())
pulllog.info(line.strip())
while proc.poll() is None:
pass

proc.stdout.close()

log.debug('container execution subprocess finished. return code: %s',proc.returncode)
log.debug('pull subprocess finished. return code: %s',proc.returncode)
if proc.returncode:
log.error('non-zero return code raising exception')
raise subprocess.CalledProcessError(returncode = proc.returncode, cmd = fullest_command)
log.debug('moving on from run')
raise subprocess.CalledProcessError(returncode = proc.returncode, cmd = docker_pull_cmd)
log.debug('moving on from pull')
except RuntimeError as e:
log.exception('caught RuntimeError')
raise e
except subprocess.CalledProcessError as exc:
log.exception('subprocess failed. code: %s, command %s',exc.returncode,exc.cmd)
raise RuntimeError('failed container execution subprocess.')
raise RuntimeError('failed container image pull subprocess in docker_enc_handler.')
except:
log.exception("Unexpected error: %s",sys.exc_info())
raise
finally:
log.debug('finally for run')


log.debug('finally for pull')


@executor('docker-encapsulated')
Expand All @@ -288,7 +270,6 @@ def docker_enc_handler(environment,state,job,metadata):

if 'command' in job:
run_docker_with_oneliner(state,environment,job['command'],log,metadata)
log.debug('reached return for docker_enc_handler')
elif 'script' in job:
run_docker_with_script(state,environment,job,log,metadata)
else:
Expand Down