This repository has been archived by the owner on Oct 10, 2020. It is now read-only.
/
containers.py
175 lines (150 loc) · 7.64 KB
/
containers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import json
from . import util
from . import Atomic
import datetime
from dateutil.parser import parse as dateparse
def cli(subparser):
# atomic containers
c = subparser.add_parser("containers")
containers_subparser = c.add_subparsers(title='images subcommands',
description="operate on images",
help='additional help')
pss = containers_subparser.add_parser("list",
help=_("list the containers"),
epilog="By default this shows only the running containers.")
pss.set_defaults(_class=Containers, func='ps_tty')
pss.add_argument("-a", "--all", action='store_true',dest="all", default=False,
help=_("show all containers"))
pss.add_argument("-f", "--filter", metavar='FILTER', action='append', dest="filter",
help=_("Filter output based on conditions given in the VARIABLE=VALUE form"))
pss.add_argument("--json", action='store_true',dest="json", default=False,
help=_("print in a machine parseable form"))
pss.add_argument("-n", "--noheading", dest="heading", default=True,
action="store_false",
help=_("do not print heading when listing the containers"))
pss.add_argument("--no-trunc", action='store_false', dest="truncate", default=True,
help=_("Don't truncate output"))
pss.add_argument("-q", "--quiet", action='store_true', dest="quiet", default=False,
help=_("Only display container IDs"))
util.add_opt(pss)
class Containers(Atomic):
def ps_tty(self):
all_container_info = self.ps()
all_containers = []
for each in all_container_info:
if each["Type"] == "system":
container = each["Id"]
status = "exited"
created = datetime.datetime.fromtimestamp(each["Created"])
info = self.syscontainers.get_container_runtime_info(container)
if 'status' in info:
status = info["status"]
if 'created' in info:
created = info['created']
if not self.args.all and status != "running":
continue
image = each['Image']
imageId = each['ImageID']
command = each["Command"]
created = created.strftime("%F %H:%M") # pylint: disable=no-member
container_info = {"type" : "system", "container" : container,
"image" : image, "command" : command, "image_id" : imageId,
"created" : created, "status" : status,
"runtime" : "runc", "vulnerable" : each["vulnerable"]}
if self.args.filter:
if not self._filter_include_container(container_info):
continue
all_containers.append(container_info)
elif each["Type"] == "docker":
# Collect the docker containers
container = each["Id"]
ret = self._inspect_container(name=container)
status = ret["State"]["Status"]
image = ret['Config']['Image']
imageId = ret['Image']
command = u' '.join(ret['Config']['Cmd']) if ret['Config']['Cmd'] else ""
created = dateparse(ret['Created']).strftime("%F %H:%M") # pylint: disable=no-member
container_info = {"type" : "docker", "container" : container,
"image" : image, "image_id" : imageId, "command" : command,
"created" : created, "status" : status,
"runtime" : "Docker", "vulnerable" : each["vulnerable"]}
if self.args.filter:
if not self._filter_include_container(container_info):
continue
all_containers.append(container_info)
if not all_containers:
return
if self.args.json:
util.write_out(json.dumps(all_containers))
return
if self.args.truncate:
max_len_container = 12
max_len_image = 20
max_len_command = 20
else:
max_len_container = max(max([len(s["container"]) for s in all_containers]), 12)
max_len_image = max(max([len(s["image"]) for s in all_containers]), 20)
max_len_command = max(max([len(s["command"]) for s in all_containers]), 20)
if self.args.quiet:
for container in all_containers:
util.write_out(container["container"][0:max_len_container])
return
col_out = "{0:2} {1:%s} {2:%s} {3:%s} {4:16} {5:9} {6:10}" % (max_len_container, max_len_image, max_len_command)
if self.args.heading:
util.write_out(col_out.format(" ",
"CONTAINER ID",
"IMAGE",
"COMMAND",
"CREATED",
"STATUS",
"RUNTIME"))
#if self.args.truncate:
for container in all_containers:
indicator = ""
if container["vulnerable"]:
if util.is_python2:
indicator = indicator + self.skull + " "
else:
indicator = indicator + str(self.skull, "utf-8") + " "
util.write_out(col_out.format(indicator,
container["container"][0:max_len_container],
container["image"][0:max_len_image],
container["command"][0:max_len_command],
container["created"][0:16],
container["status"][0:9],
container["runtime"][0:10]))
def ps(self):
all_containers = []
vuln_ids = self.get_vulnerable_ids()
all_vuln_info = json.loads(self.get_all_vulnerable_info())
# Collect the system containers
for i in self.syscontainers.get_system_containers():
i["vulnerable"] = i['Id'] in vuln_ids
if i["vulnerable"]:
i["vuln_info"] = all_vuln_info[i['Id']]
else:
i["vuln_info"] = dict()
all_containers.append(i)
# Collect the docker containers
for container in [x["Id"] for x in self.d.containers(all=self.args.all)]:
ret = self._inspect_container(name=container)
ret["Type"] = "docker"
ret["vulnerable"] = ret["Image"] in vuln_ids
if ret["vulnerable"]:
ret["vuln_info"] = all_vuln_info[ret["ImageId"]]
else:
ret["vuln_info"] = dict()
all_containers.append(ret)
return all_containers
def _filter_include_container(self, container_info):
filterables = ["container", "image", "command", "created", "status", "runtime"]
for j in self.args.filter:
var, value = str(j).split("=")
var = var.lower()
if var == "id" or var == "containerid":
var = "container"
if var not in filterables: # If the filter does not exist, default to allowing all containers through
continue
if value not in container_info[var]:
return False
return True