-
Notifications
You must be signed in to change notification settings - Fork 26
/
docker.py
186 lines (138 loc) · 5.09 KB
/
docker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
import glob
import itertools
import json
import logging
import os
import re
import subprocess
import sys
import time
from deimos.cmd import Run
from deimos.err import *
from deimos.logger import log
from deimos._struct import _Struct
def run(options, image, command=[], env={}, cpus=None, mems=None, ports=[]):
    """Build the argv for a `docker run` invocation.

    The command line is only assembled here, not executed; the list built by
    docker() is returned for the caller to run. When ports are given,
    inner_ports(image) is consulted to find the container-side ports.

    Args:
        options: list of extra `docker run` flags. It is copied, never
            modified, so the caller may reuse the same list.
        image: name of the image to run.
        command: list of arguments placed after the image name.
        env: dict, or iterable of (key, value) pairs; each pair becomes
            `-e key=value`.
        cpus: passed as `-c <cpus>`; the flag is omitted when falsy.
        mems: passed as `-m <mems>`; the flag is omitted when falsy.
        ports: allocated host ports, paired in order with the ports
            returned by inner_ports(image). Empty or None means no `-p`
            flags and no image lookup.

    Returns:
        The list returned by docker("run", ...).

    NOTE: the mutable defaults are kept for interface compatibility; none
    of them is ever mutated in this function.
    """
    # Work on a copy: extending `options` in place would append the "-p"
    # flags to the caller's own list, where they pile up across calls.
    run_options = list(options)
    env_items = env.items() if isinstance(env, dict) else env
    env_flags = [("-e", "%s=%s" % (k, v)) for k, v in env_items]
    if ports:  # NB: Forces external call to pre-fetch image
        port_pairings = list(itertools.izip_longest(ports, inner_ports(image)))
        log.info("Port pairings (Mesos, Docker) // %r", port_pairings)
        for allocated, target in port_pairings:
            if allocated is None:
                log.warning("Container exposes more ports than were allocated")
                break
            # When more ports were allocated than the image exposes, the
            # surplus ports map to the same port number in the container.
            run_options += ["-p", "%d:%d" % (allocated, target or allocated)]
    argv = ["run"] + run_options
    argv += ["-c", str(cpus)] if cpus else []
    argv += ["-m", str(mems)] if mems else []
    argv += itertools.chain.from_iterable(env_flags)  # flatten the pairs
    argv += [image] + command
    return docker(*argv)
def stop(ident):
    """Return the argv for `docker stop -t=2 <ident>` (nothing is executed)."""
    stop_argv = docker("stop", "-t=2", ident)
    return stop_argv
def rm(ident):
    """Return the argv for `docker rm <ident>` (nothing is executed)."""
    rm_argv = docker("rm", ident)
    return rm_argv
def wait(ident):
    """Return the argv for `docker wait <ident>` (nothing is executed)."""
    wait_argv = docker("wait", ident)
    return wait_argv
# Cache of image information: maps an image name to the first record parsed
# from `docker inspect`. Written by refresh_docker_image_info() and read by
# image_info().
images = {}
def pull(image):
    """Run `docker pull` for *image*, then reload its cached metadata."""
    runner = Run(data=True)
    pull_argv = docker("pull", image)
    runner(pull_argv)
    refresh_docker_image_info(image)
def pull_once(image):
    """Pull *image* only when image_info() has no metadata for it."""
    already_known = image_info(image) is not None
    if already_known:
        return
    pull(image)
def image_info(image):
    """Return metadata for *image*, served from the `images` cache if present.

    On a cache miss the result of refresh_docker_image_info(image) is
    returned, which may be None.
    """
    if image not in images:
        return refresh_docker_image_info(image)
    return images[image]
def refresh_docker_image_info(image):
    """Re-run `docker inspect` on *image* and store the result in `images`.

    Returns the first element of the parsed JSON output, or None (leaving
    the cache untouched) when a subprocess.CalledProcessError is raised.
    """
    try:
        inspect = Run(data=True)
        output = inspect(docker("inspect", image))
        record = json.loads(output)[0]
    except subprocess.CalledProcessError:
        return None
    images[image] = record
    return record
def ensure_image(f):
    """Decorator that calls pull_once(image) before invoking *f*.

    The decorated function must take the image name as its first positional
    argument; all other arguments are passed through unchanged.

    Args:
        f: callable of the form f(image, *args, **kwargs).

    Returns:
        A wrapper with the same call signature that keeps f's name and
        docstring.
    """
    # Function-scope import so this block does not depend on a new
    # file-level import.
    import functools

    @functools.wraps(f)  # keep f.__name__ / f.__doc__ on the wrapper
    def f_(image, *args, **kwargs):
        pull_once(image)
        return f(image, *args, **kwargs)
    return f_
@ensure_image
def inner_ports(image):
    """Return the sorted list of port numbers declared by *image*.

    Looks in the image's "Config" (or "config") section: a non-empty
    "ExposedPorts" dict is preferred, with the port taken from the part of
    each key before "/". Otherwise a non-empty "PortSpecs" list is used,
    with the port taken from the part of each entry after the last ":".
    Returns an empty list when neither is available.
    """
    info = image_info(image)
    lowercase_config = info.get("config")
    config = info.get("Config", lowercase_config)
    if not config:
        return []  # If all else fails...

    exposed = config.get("ExposedPorts", {})
    if exposed and isinstance(exposed, dict):
        numbers = [int(key.partition("/")[0]) for key in exposed]
        numbers.sort()
        return numbers

    specs = config.get("PortSpecs", [])
    if specs and isinstance(specs, list):
        numbers = [int(spec.rpartition(":")[2]) for spec in specs]
        numbers.sort()
        return numbers

    return []  # If all else fails...
# System and process interfaces
class Status(_Struct):
    """Container status record with three fields: cid, pid and exit.

    probe() builds one from the `.ID`, `.State.Pid` and `.State.ExitCode`
    values reported by `docker inspect`.
    """

    def __init__(self, cid=None, pid=None, exit=None):
        # All three values are forwarded to _Struct as keyword arguments.
        # NOTE(review): presumably _Struct exposes them as attributes --
        # confirm in deimos._struct.
        _Struct.__init__(self, cid=cid, pid=pid, exit=exit)
def cgroups(cid):
    """Locate the cgroup directories that belong to container *cid*.

    Four glob patterns are tried, under /sys/fs/cgroup and /cgroup, with and
    without a "docker" parent directory.

    Returns:
        A dict mapping the path component that follows "/cgroup/" to the
        matching path. When several paths share that component, the one
        from the latest pattern wins.
    """
    patterns = (
        "/sys/fs/cgroup/*/" + cid,
        "/sys/fs/cgroup/*/docker/" + cid,
        "/cgroup/*/" + cid,
        "/cgroup/*/docker/" + cid,
    )
    found = {}
    for pattern in patterns:
        for path in glob.glob(pattern):
            after_root = path.split("/cgroup/")[1]
            found[after_root.split("/")[0]] = path
    return found
def matching_image_for_host(distro=None, release=None, *args, **kwargs):
    """Return the image token "<distro>:<release>" for the current host.

    When either value is None, a bash script sources /etc/os-release and
    prints the lower-cased ID and VERSION_ID; those fill in whichever of
    *distro* / *release* is falsy. Remaining arguments are passed on to
    image_token().
    """
    if distro is None or release is None:
        # TODO: Use redhat-release, &c
        probe_script = (
            "\n"
            "set -o errexit -o nounset -o pipefail\n"
            '( source /etc/os-release && tr A-Z a-z <<<"$ID\t$VERSION_ID" )\n'
        )
        output = Run(data=True)(["bash", "-c", probe_script])
        host_distro, host_release = output.strip().split()
        # Explicitly supplied values take precedence over probed ones.
        distro = distro or host_distro
        release = release or host_release
    name = "%s:%s" % (distro, release)
    return image_token(name, *args, **kwargs)
def image_token(name, account=None, index=None, *args, **kwargs):
    """Join index, account and name with "/", leaving out any that are None.

    Extra positional and keyword arguments are accepted and ignored.
    """
    parts = []
    for piece in (index, account, name):
        if piece is not None:
            parts.append(piece)
    return "/".join(parts)
def probe(ident, quiet=False):
    """Inspect container *ident* and return its Status.

    Runs `docker inspect` with a format string that prints the container
    ID, the PID and the exit code, separated by spaces.

    Args:
        ident: container identifier handed to `docker inspect`.
        quiet: when true the runner's error level is logging.DEBUG instead
            of logging.WARNING.

    Returns:
        Status(cid, pid, exit). cid and pid are the strings printed by
        docker. exit is the printed exit code when the PID is "0", and None
        otherwise.

    Raises:
        ValueError: if the output does not split into exactly three fields.
        subprocess.CalledProcessError: presumably raised by the runner when
            `docker inspect` fails (exists() catches it) -- confirm in
            deimos.cmd.
    """
    fields = "{{.ID}} {{.State.Pid}} {{.State.ExitCode}}"
    level = logging.DEBUG if quiet else logging.WARNING
    argv = docker("inspect", "--format=" + fields, ident)
    inspect = Run(data=True, error_level=level)
    cid, pid, exit_code = inspect(argv).strip().split()
    # The fields are text, so the PID must be compared with "0"; comparing
    # it with the integer 0 is never true and would always drop the exit code.
    no_process = pid == "0"
    return Status(cid=cid, pid=pid, exit=(exit_code if no_process else None))
def exists(ident, quiet=False):
    """Return probe(ident, quiet), or None when the probe exits with code 1.

    A subprocess.CalledProcessError with any other return code is re-raised.
    """
    try:
        status = probe(ident, quiet)
    except subprocess.CalledProcessError as e:
        if e.returncode == 1:
            return None
        raise e
    return status
def await(ident, t=0.05, n=10):
for _ in range(0, n):
result = exists(ident, quiet=True)
if result:
return result
time.sleep(t)
result = exists(ident, quiet=True)
if result:
return result
msg = "Container %s not ready after %d sleeps of %g seconds"
log.warning(msg % (ident, n, t))
raise AwaitTimeout("Timed out waiting for %s" % ident)
def read_wait_code(data):
    """Turn the output of `docker wait` into an exit status from 0 to 255.

    Args:
        data: text (or number) holding the code printed by `docker wait`.

    Returns:
        The code modulo 256. A negative code is first mapped to
        128 + abs(code). If *data* cannot be converted to an int, an error
        is logged and 111 is returned.
    """
    try:
        code = int(data)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures are handled here; a bare `except:` would
        # also swallow KeyboardInterrupt and SystemExit.
        log.error("Result of `docker wait` wasn't an int: %r", data)
        return 111
    if code < 0:
        code = 128 + abs(code)
    return code % 256
class AwaitTimeout(Err):
    """Raised by await() when a container is still not found after all checks."""
    pass
# Global settings

# Arguments inserted between the `docker` executable and the subcommand in
# every command line built by docker().
options = []


def docker(*args):
    """Return the argv list: "docker", the global `options`, then *args*."""
    prefix = ["docker"] + options
    return prefix + list(args)