Permalink
Find file
9c708f5 Mar 13, 2014
@solidsnack @pandrew @takedakn @davidhoyt
executable file 317 lines (286 sloc) 10.6 KB
#!/usr/bin/env python
import functools
import itertools
import json
import logging
import logging.handlers
import os
import re
import signal
import subprocess
import sys
import threading
import time
import traceback

import mesos
import mesos_pb2
def main():
    """Install signal handlers, start the Mesos executor driver and exit.

    The process exit status is 0 when the driver reports DRIVER_STOPPED and
    1 otherwise. Every exit path goes through the module-level exit(), so
    the wrapped container is cleaned up first.
    """
    def handler(signum, _):
        log.info('Exiting due to signal: ' + str(signum))
        # Signals are handed to exit() as negative numbers.
        exit(-signum)

    handled_signals = (
        signal.SIGINT,
        signal.SIGTERM,
        signal.SIGABRT,
        signal.SIGPIPE,
        signal.SIGSEGV,
    )
    for signo in handled_signals:
        signal.signal(signo, handler)

    executor = DockerExecutor(sys.argv[1:])
    driver = mesos.MesosExecutorDriver(executor)
    log.info('Ready to serve!')
    stopped_cleanly = driver.run() == mesos_pb2.DRIVER_STOPPED
    exit(0 if stopped_cleanly else 1)
# This script wraps one and only one container.
# cid: ID of that container as read from the cidfile (see read_cid); None
# while it is not known.
cid = None
# cidfile: path handed to `docker run --cidfile` by run_with_settings; None
# until a container has been launched and again after cleanup.
cidfile = None
def read_cid():
    """Load the container ID from ``cidfile`` into the global ``cid``.

    This is a best-effort read: when no cidfile has been chosen yet, the
    file cannot be opened, or the file is still empty, ``cid`` is left
    untouched so that a later call can try again.
    """
    global cid
    if cidfile is None:
        return
    try:
        with open(cidfile) as f:
            content = f.read().strip()
    except IOError:
        # The file may not exist (yet); callers simply retry later.
        return
    # An empty file means the ID has not been written yet. Storing '' would
    # look like a real container ID (it is not None) to cleanup code.
    if content:
        cid = content
# Module-wide logger: everything at DEBUG and above is sent to the local
# syslog socket, prefixed with the logger name and the process ID.
syslog_format = logging.Formatter("%(name)s[%(process)d]: %(message)s")
syslog = logging.handlers.SysLogHandler(address='/dev/log')
syslog.setFormatter(syslog_format)

log = logging.getLogger('mesos-docker')
log.setLevel(logging.DEBUG)
log.addHandler(syslog)
class DockerExecutor(mesos.Executor):
    """Mesos executor that runs exactly one task as one Docker container.

    ``args`` are the executor's command-line arguments. When the task data
    does not name an image, ``args[0]`` is used as the image and the rest as
    the container's arguments; otherwise all of ``args`` are passed to the
    container.
    """

    def __init__(self, args):
        self.args = args
        self.task = None              # The single task this executor runs.
        self.driver = None            # Driver used to send status updates.
        self.image = None             # Docker image name, once known.
        self.docker_options = []      # Extra options for `docker run`.
        self.runner_thread = None     # Thread executing run().
        self.shutdown_thread = None   # Thread executing cleanup_container().
        self.data = {}                # Decoded JSON from task.data.
        self.env = {}                 # Environment passed to the container.

    def run(self):
        """Run the container to completion and terminate the process.

        Sends TASK_RUNNING once the container process has been started and
        a final state (FINISHED, KILLED or FAILED) when it ends, then calls
        exit() with the container's return code (2 if it never ran).
        """
        global proc
        exitcode = 2
        finalstate = mesos_pb2.TASK_FAILED
        resources = self.task.resources
        try:
            if self.image is None:
                self.image = self.args[0]
                args = self.args[1:]
            else:
                args = self.args[0:]
            cpus = [r.scalar.value for r in resources if r.name == 'cpus']
            megs = [r.scalar.value for r in resources if r.name == 'mem']
            # Only the first cpus/mem resource is used; None means "no limit
            # passed to docker".
            relative_cpu = int(1024 * cpus[0]) if cpus else None
            memory_bytes = int((2**20) * megs[0]) if megs else None
            ports = self.allocated_ports()
            proc = run_with_settings(self.image, self.docker_options,
                                     args, self.env, ports,
                                     relative_cpu, memory_bytes)
            self.send_state(mesos_pb2.TASK_RUNNING)
            proc.wait()
            log.info('Container exited with code: %d' % proc.returncode)
            exitcode = proc.returncode
            if proc.returncode == 0:
                finalstate = mesos_pb2.TASK_FINISHED
            elif self.shutdown_thread:
                # A non-zero exit after a shutdown was requested is a kill.
                finalstate = mesos_pb2.TASK_KILLED
            else:
                finalstate = mesos_pb2.TASK_FAILED
        except Exception:
            log_exc()
        finally:
            self.send_state(finalstate)
            exit(exitcode)

    def send_state(self, state):
        """Send a status update with ``state`` for the managed task.

        Failures are logged and otherwise ignored.
        """
        try:
            update = mesos_pb2.TaskStatus()
            update.task_id.value = self.task.task_id.value
            update.state = state
            self.driver.sendStatusUpdate(update)
        except Exception:
            log_exc()

    #### Mesos Executor API methods ####

    def registered(self, driver, executorInfo, frameworkInfo, slaveInfo):
        log.info('Registered with Mesos slave')

    def reregistered(self, driver, slaveInfo):
        log.info('Reregistered with Mesos slave')

    def disconnected(self, driver):
        log.warning('Disconnected from Mesos slave')

    def launchTask(self, driver, task):
        """Record the task, decode its JSON data and start the runner thread.

        ``task.data`` may be empty or a JSON object with optional keys:
        ``env`` (a mapping merged over the generated PORT* variables) and
        ``container`` (with ``image`` as a ``docker:///`` URL and an
        optional ``options`` list). Invalid data is logged and the launch
        continues with whatever was decoded up to that point.
        """
        if self.task is not None:
            log.error('Executor was reused but this executor is not reuseable')
            exit(2)
        self.task = task
        self.driver = driver
        log.info('Task is: %s' % task.task_id.value)
        try:
            self.data = json.loads(task.data) if task.data else {}
            # Nothing worth pretty-printing for empty data.
            if self.data:
                for line in json_pp(self.data).splitlines():
                    log.info(line)
            for i, port in enumerate(self.allocated_ports()):
                if port != 0:
                    self.env['PORT' + str(i)] = str(port)
            if 'PORT0' in self.env:
                self.env['PORT'] = self.env['PORT0']
            if 'env' in self.data:  # NB: user-specified PORT* vars override ours
                self.env.update(self.data['env'])
            if 'container' in self.data:
                container = self.data['container']
                m = re.match(r'^docker:///(.+)$', container['image'])
                if m is None:
                    raise ValueError('container.image must be a docker:/// URL')
                self.image = m.group(1)
                # Options are optional; a missing key must not abort setup.
                self.docker_options = container.get('options', [])
            for k, v in self.env.items():
                os.environ[k] = str(v)
        except Exception:
            log.error('JSON from framework is rubbish')
            log_exc()
        try:
            self.runner_thread = threading.Thread(target=self.run)
            # Keep the attribute name this method used historically.
            self.run_thread = self.runner_thread
            self.runner_thread.daemon = True
            self.runner_thread.start()
        except Exception:
            log_exc()
            self.send_state(mesos_pb2.TASK_FAILED)
            exit(2)

    def killTask(self, driver, task_id):
        """Start container cleanup if ``task_id`` is the managed task."""
        managed = self.task is not None and \
            self.task.task_id.value == task_id.value
        if managed:
            log.info('Asked to shutdown managed task %s' % task_id.value)
            self.cleanup()
        else:
            log.info('Asked to shutdown unknown task %s' % task_id.value)

    def cleanup(self):
        """Run cleanup_container() in a background thread, at most once."""
        if self.shutdown_thread is None:
            self.shutdown_thread = threading.Thread(target=cleanup_container)
            self.shutdown_thread.daemon = True
            self.shutdown_thread.start()

    def allocated_ports(self):
        """Return every port in the task's 'ports' range resources as a list."""
        ports = []
        for resource in self.task.resources:
            if resource.name != 'ports':
                continue
            for port_range in resource.ranges.range:
                # NB: Casting long() to int() so there's no trailing 'L' in
                # later stringifications. Ports should only ever be shorts,
                # anyways.
                ports.extend(range(int(port_range.begin),
                                   int(port_range.end) + 1))
        return ports

    def shutdown(self, driver):
        self.cleanup()
# Handles signals, passed as negative numbers, and ensures the worker process
# is cleaned up if it exists.
#
# This function shows up in many places but because its final statement is a
# call to os._exit() we can be sure it is only ever called one time.
def exit(returncode):
    """Clean up the container, then terminate the process immediately.

    A negative ``returncode`` is treated as a negated signal number and is
    turned into the exit status 128 + signal number; any other value is
    used as the exit status unchanged. Errors during cleanup are logged and
    never prevent the process from exiting.
    """
    try:
        cleanup_container()
    except Exception:
        log_exc()
    finally:
        if returncode < 0:
            status = 128 + (-returncode)
        else:
            status = returncode
        os._exit(status)
def log_exc():
    """Log the current exception's traceback, one ERROR record per line."""
    trace_text = traceback.format_exc()
    for trace_line in trace_text.splitlines():
        log.error(trace_line)
def json_pp(thing):
    """Return ``thing`` as indented JSON with the brackets folded inline.

    The opening bracket shares a line with the first item and the closing
    bracket shares a line with the last one, e.g.::

        { "a": 2,
          "b": 1 }

    Values that serialise to fewer than three lines (scalars and empty
    containers such as ``{}``) are returned as plain JSON, since there is
    nothing to fold.
    """
    s = json.dumps(thing, indent=2, separators=(',', ': '), sort_keys=True)
    lines = s.splitlines()
    if len(lines) < 3:
        return s
    opener, body, closer = lines[0], lines[1:-1], lines[-1]
    # Drop the two-space indent of the first item; it now follows the opener.
    folded = '\n'.join([body[0][2:]] + body[1:])
    return opener + ' ' + folded + ' ' + closer
def ensure_image(f):
    """Decorator: call pull_once(image) before every call to ``f``.

    ``f`` must take the image name as its first positional argument. The
    wrapper keeps ``f``'s name and docstring.
    """
    @functools.wraps(f)
    def f_(image, *args, **kwargs):
        pull_once(image)
        return f(image, *args, **kwargs)
    return f_
def try_cid(f):
    """Decorator: try to load the container ID before calling ``f``.

    read_cid() is only invoked while the global ``cid`` is still None. The
    wrapper keeps ``f``'s name and docstring.
    """
    @functools.wraps(f)
    def f_(*args, **kwargs):
        if cid is None:
            read_cid()
        return f(*args, **kwargs)
    return f_
cleaning_up_already = False # Hackish lock.

@try_cid
def cleanup_container():
    """Stop and remove the wrapped container and delete its cidfile.

    Does nothing when no container ID is known or when a cleanup has
    already been started. A failing command raises
    subprocess.CalledProcessError and the remaining steps are skipped.
    """
    global cid
    global cidfile
    global cleaning_up_already
    if cid is None or cleaning_up_already:
        return
    # Set the flag before doing any work so a second caller backs off.
    cleaning_up_already = True
    log.info('Cleaning up container %s' % cid)
    subprocess.check_call(['docker', 'stop', '-t=2', cid])
    subprocess.check_call(['docker', 'rm', cid])
    subprocess.check_call(['rm', '-f', cidfile])
    cid = None
    cidfile = None
@ensure_image
def run_with_settings(image, docker_opts=None, args=None, env=None, ports=None,
                      relative_cpu=None, memory_bytes=None):
    """Start ``docker run`` for ``image`` and return the Popen object.

    image         -- name of the Docker image to run.
    docker_opts   -- list of extra options placed before the image name.
    args          -- arguments placed after the image name.
    env           -- mapping of environment variables, passed with ``-e``.
    ports         -- host ports, paired in order with the image's sorted
                     inner ports (see inner_ports) and passed with ``-p``.
    relative_cpu  -- value for ``-c``; omitted when None.
    memory_bytes  -- value for ``-m``; omitted when None.

    Sets the global ``cidfile`` to a fresh path under /tmp, which is passed
    to docker via ``--cidfile``.
    """
    global cidfile
    # None defaults replace the former shared mutable default arguments.
    docker_opts = [] if docker_opts is None else docker_opts
    args = [] if args is None else list(args)
    env = {} if env is None else env
    ports = [] if ports is None else list(ports)

    cidfile = '/tmp/docker_cid.' + os.urandom(8).encode('hex')
    cmd = ['run', '--cidfile', cidfile]
    if relative_cpu is not None:
        cmd += ['-c', str(relative_cpu)]
    if memory_bytes is not None:
        cmd += ['-m', str(memory_bytes)]
    for k, v in env.items():
        cmd += ['-e', '%s=%s' % (k, v)]

    log.info('Mesos ports: ' + str(ports))
    docker_ports = inner_ports(image)
    log.info('Docker ports: ' + str(docker_ports))
    # Map as many ports as both sides provide, then report any mismatch.
    for allocated, target in zip(ports, docker_ports):
        cmd += ['-p', '%d:%d' % (allocated, target)]
    if len(ports) < len(docker_ports):
        log.warning('Too few ports were allocated to this image.')
    elif len(ports) > len(docker_ports):
        log.warning('Too many ports were allocated to this image.')

    argv = ['docker'] + cmd + docker_opts + [image] + args
    log.info('ARGV ' + ' '.join(str(arg) for arg in argv))
    return subprocess.Popen(argv)
# Provide subprocess.check_output where the standard library lacks it.
try:
    subprocess.check_output
except AttributeError:
    # For python 2.6 or earlier.
    def check_output(*args, **kwargs):
        """Run a command and return its standard output.

        Accepts the same arguments as subprocess.Popen, except ``stdout``,
        which is always a pipe. Raises subprocess.CalledProcessError when
        the command exits with a non-zero status.
        """
        p = subprocess.Popen(stdout=subprocess.PIPE, *args, **kwargs)
        stdout = p.communicate()[0]
        exitcode = p.returncode
        if exitcode:
            cmd = args[0] if args else kwargs.get('args')
            raise subprocess.CalledProcessError(exitcode, cmd)
        return stdout
    subprocess.check_output = check_output
@ensure_image
def inner_ports(image):
    """Return the sorted list of ports declared by ``image``.

    The ports are taken from the first record of ``docker inspect``: from
    the keys of ``ExposedPorts`` when that is a non-empty dict, otherwise
    from the last ``:``-separated field of each ``PortSpecs`` entry. The
    configuration is looked up under ``Config`` and then ``config``. An
    empty list is returned when nothing usable is found.
    """
    inspect_output = subprocess.check_output(['docker', 'inspect', image])
    parsed = json.loads(inspect_output)[0]

    config = parsed['Config'] if 'Config' in parsed else None
    if config is None and 'config' in parsed:
        config = parsed['config']
    if not config:
        return [] # If all else fails...

    exposed = config.get('ExposedPorts', {})
    if exposed and isinstance(exposed, dict):
        # Keys look like "<port>/<protocol>".
        return sorted(int(key.split('/')[0]) for key in exposed.keys())

    specs = config.get('PortSpecs', [])
    if specs and isinstance(specs, list):
        return sorted(int(spec.split(':')[-1]) for spec in specs)

    return [] # If all else fails...
def pull(image):
    """Run ``docker pull`` for ``image`` and refresh the cached image info.

    Raises subprocess.CalledProcessError when the pull fails.
    """
    pull_command = ['docker', 'pull', image]
    subprocess.check_call(pull_command)
    refresh_docker_image_info(image)
def pull_once(image):
    """Pull ``image`` unless image_info() already reports it."""
    known = image_info(image)
    if known:
        return
    pull(image)
def image_info(image):
    """Return the cached entry for ``image``, querying docker on a miss."""
    if image not in images:
        return refresh_docker_image_info(image)
    return images[image]
# Cache of image names already found by refresh_docker_image_info().
images = {}

def refresh_docker_image_info(image):
    """Look ``image`` up in ``docker images`` and remember it when found.

    Returns the matching output line split into columns, or None when the
    image is not listed. ``image`` may carry a tag (``repo:tag``); it is
    then matched against the first two columns, which are assumed to be
    REPOSITORY and TAG.
    """
    # Split off a tag, taking care not to mistake a registry port
    # ("host:5000/repo") for one.
    repo, sep, tag = image.rpartition(':')
    if not sep or '/' in tag:
        repo, tag = image, None

    delim = re.compile(' +')
    text = subprocess.check_output(['docker', 'images', image])
    for line in text.splitlines():
        record = delim.split(line)
        untagged_match = record[0] == image
        tagged_match = tag is not None and record[:2] == [repo, tag]
        if untagged_match or tagged_match:
            # NOTE(review): the cache stores the name, not the record, so
            # image_info() returns the name on a cache hit; callers in this
            # file only test truthiness.
            images[image] = image
            return record
    return None
# Start the executor only when this file is run as a script.
if __name__ == '__main__':
    main()