# -*- coding: utf-8 -*-
#
# Copyright (c) 2019-2021 VMware, Inc. All Rights Reserved.
# SPDX-License-Identifier: BSD-2-Clause

"""
Run analysis on a Dockerfile
"""

import docker
import logging
import os
import subprocess  # nosec

from tern.utils import constants
from tern.utils import rootfs
from tern.classes.notice import Notice
from tern.classes.image_layer import ImageLayer
from tern.classes.image import Image
from tern.classes.package import Package
from tern.load import docker_api
from tern import prep
from tern.analyze import common
from tern.analyze.default import filter as fltr
from tern.analyze.default.dockerfile import parse
from tern.analyze.default.dockerfile import lock
from tern.analyze.default.container import run as crun
from tern.analyze.default.container import image as cimage
from tern.report import report
from tern.report import formats

# global logger
logger = logging.getLogger(constants.logger_name)
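
# Overall flow (for orientation): execute_dockerfile() is the entry point.
# It parses the Dockerfile, then dispatches to analyze_single_dockerfile()
# or, for multistage builds, analyze_multistage_dockerfile(). If the image
# builds successfully, the full image is analyzed; otherwise the base image
# and the Dockerfile's RUN commands are analyzed as a fallback.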


def get_dockerfile_packages():
    '''Given a Dockerfile, return an approximate image object. This is mostly
    guesswork and shouldn't be relied on for accurate information. Add
    Notice messages indicating as much:
        1. Create an image with a placeholder repotag
        2. For each RUN command, create a package list
        3. Create layer objects with incremental integers and add the package
           list to that layer with a Notice about parsing
        4. Return the stub image'''
    stub_image = Image('easteregg:cookie')
    layer_count = 0
    for cmd in lock.docker_commands:
        if cmd['instruction'] == 'RUN':
            layer_count = layer_count + 1
            layer = ImageLayer(layer_count)
            install_commands, msg = fltr.filter_install_commands(cmd['value'])
            if msg:
                layer.origins.add_notice_to_origins(
                    cmd['value'], Notice(msg, 'info'))
            pkg_names = []
            for command in install_commands:
                # each install command may name several packages; collect
                # them into one flat list of names
                pkg_names.extend(fltr.get_installed_package_names(command))
            for pkg_name in pkg_names:
                pkg = Package(pkg_name)
                # shell parser does not parse version pins yet
                # when that is enabled, Notices for no versions need to be
                # added here
                layer.add_package(pkg)
    return stub_image
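
# Illustrative usage of get_dockerfile_packages() (the file path is
# hypothetical); it assumes lock.load_docker_commands() has already been
# called on a parsed Dockerfile object so that lock.docker_commands is
# populated:
#
#   dfobj = parse.get_dockerfile_obj('Dockerfile')
#   lock.load_docker_commands(dfobj)
#   stub = get_dockerfile_packages()   # Image with guessed package names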


def analyze_full_image(full_image, options):
    """If we are able to load a full image after a build, we can run an
    analysis on it"""
    # set up for analysis
    crun.setup(full_image)
    # analyze image
    cimage.default_analyze(full_image, options)
    # clean up after analysis
    rootfs.clean_up()
    # we should now be able to set imported layers
    lock.set_imported_layers(full_image)
    # save to the cache
    common.save_to_cache(full_image)
    return [full_image]


def load_base_image():
    '''Create base image from dockerfile instructions and return the image'''
    base_image, dockerfile_lines = lock.get_dockerfile_base()
    # try to get image metadata
    if docker_api.dump_docker_image(base_image.repotag):
        # now see if we can load the image
        try:
            base_image.load_image(load_until_layer=0)
        except (NameError,
                subprocess.CalledProcessError,
                IOError,
                docker.errors.APIError,
                ValueError,
                EOFError) as error:
            logger.warning('Error in loading base image: %s', str(error))
            base_image.origins.add_notice_to_origins(
                dockerfile_lines, Notice(str(error), 'error'))
        return base_image
    return None
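
# Note: load_base_image() returns None when the base image cannot be pulled
# or dumped at all; callers (see base_and_run_analysis() below) must check
# for that before using the result.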


def analyze_base_image(base_image, options):
    """If we are unable to load the full image, we will try to analyze
    the base image and extrapolate from there"""
    # set up for analysis
    crun.setup(base_image)
    # analyze image
    cimage.default_analyze(base_image, options)
    # clean up
    rootfs.clean_up()
    # save the base image to cache
    common.save_to_cache(base_image)
    # let's try to figure out what packages were going to be installed in
    # the dockerfile anyway
    stub_image = get_dockerfile_packages()
    return [base_image, stub_image]


def full_image_analysis(dfile, options):
    """This subroutine is executed when a Dockerfile is successfully built"""
    image_list = []
    # attempt to load the built image metadata
    full_image = cimage.load_full_image(dfile, image_type='docker')
    if full_image.origins.is_empty():
        # Add an image origin here
        full_image.origins.add_notice_origin(
            formats.dockerfile_image.format(dockerfile=dfile))
        image_list = analyze_full_image(full_image, options)
    else:
        # we cannot analyze the full image, but maybe we can
        # analyze the base image
        logger.error('Cannot retrieve full image metadata')
    # cleanup for full images
    if not options.keep_wd:
        prep.clean_image_tars(full_image)
    return image_list


def base_and_run_analysis(dfile, options):
    """This subroutine is executed when a Dockerfile fails to build. It
    returns a base image and any RUN commands in the Dockerfile."""
    image_list = []
    # Try to analyze the base image
    logger.debug('Analyzing base image...')
    # this will pull, dump and load the base image
    base_image = load_base_image()
    if base_image:
        if base_image.origins.is_empty():
            # add a notice stating failure to build image
            base_image.origins.add_notice_to_origins(dfile, Notice(
                formats.image_build_failure, 'warning'))
            image_list = analyze_base_image(base_image, options)
        else:
            # we cannot load the base image
            logger.warning('Cannot retrieve base image metadata')
        # cleanup for base images
        if not options.keep_wd:
            prep.clean_image_tars(base_image)
    else:
        logger.error('Cannot analyze base image')
    return image_list


def analyze_single_dockerfile(dockerfile, options):
    """Run image analysis for a single Dockerfile. Return a list of images.
    Inputs are:
        dockerfile: path to the Dockerfile
        options: the command line options namespace, which includes flags
                 such as redo (redo the analysis), driver (the filesystem
                 driver to use), keep_wd (do not clean up the working
                 directory) and extend (the extension to use for analysis)"""
    # attempt to build the image
    logger.debug('Building Docker image with Dockerfile: %s', dockerfile)
    image_info = docker_api.build_and_dump(dockerfile)
    image_list = []
    if image_info:
        logger.debug('Docker image successfully built. Analyzing...')
        # analyze the full image
        image_list = full_image_analysis(dockerfile, options)
        # if analysis produced an image, record where its base image came from
        if image_list:
            base_import_str = image_list[0].layers[0].import_str
            if not base_import_str and image_info.get('RepoDigests'):
                image_string = image_info.get('RepoDigests')[0]
                image_list[0].layers[0].import_str = image_string
    else:
        # cannot build the image
        logger.warning('Cannot build image')
        # analyze the base image and any RUN lines in the Dockerfile
        image_list = base_and_run_analysis(dockerfile, options)
    return image_list
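
# A rough sketch (hypothetical values) of how analyze_single_dockerfile()
# is driven; 'options' is the parsed command line namespace, of which only
# keep_wd is read directly in this module:
#
#   import argparse
#   options = argparse.Namespace(keep_wd=False)  # plus any other tern options
#   images = analyze_single_dockerfile('Dockerfile', options)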


def execute_dockerfile(args, locking=False):
    """Execution path for Dockerfiles"""
    dfile = ''
    if locking:
        dfile = args.lock
    else:
        dfile = args.dockerfile
    image_list = []
    logger.debug("Parsing Dockerfile...")
    dfobj = parse.get_dockerfile_obj(dfile)
    # expand potential ARG values so base image tag is correct
    parse.expand_arg(dfobj)
    parse.expand_vars(dfobj)
    # Store dockerfile path and commands so we can access them during execution
    lock.load_docker_commands(dfobj)
    if dfobj.is_multistage:
        image_list = analyze_multistage_dockerfile(dfobj, args)
    else:
        image_list = analyze_single_dockerfile(dfile, args)
    # generate report based on what images were created
    if image_list:
        if not locking:
            report.report_out(args, *image_list)
        else:
            logger.debug('Generating locked Dockerfile...')
            parse.expand_from_images(dfobj, image_list)
            # we can only lock one image for now
            locked_dfobj = lock.lock_dockerfile(dfobj, image_list[0])
            output = lock.create_locked_dockerfile(locked_dfobj)
            lock.write_locked_dockerfile(output, args.output_file)
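
# Sketch of the two entry points (attribute names follow the code above;
# the values are hypothetical, and report.report_out() may expect further
# attributes on the namespace):
#
#   args = argparse.Namespace(dockerfile='Dockerfile', keep_wd=False)
#   execute_dockerfile(args)                  # analyze and report
#
#   args = argparse.Namespace(lock='Dockerfile',
#                             output_file='Dockerfile.lock', keep_wd=False)
#   execute_dockerfile(args, locking=True)    # write a locked Dockerfile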


def write_dockerfile_stages(dfobj):
    """Given a Dockerfile object, create a Dockerfile for each of the stages
    for analysis. Return a list of the stage Dockerfile paths"""
    stages = parse.get_dockerfile_stages(dfobj)
    dockerfiles = []
    filepath, filename = os.path.split(dfobj.filepath)
    for stage in stages:
        stagefile = os.path.join(
            filepath, '{}_{}'.format(filename, stages.index(stage) + 1))
        with open(stagefile, 'w', encoding='utf-8') as f:
            f.write(stage)
        dockerfiles.append(stagefile)
    return dockerfiles
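
# For example, a multistage Dockerfile at ./docker/Dockerfile with two stages
# would produce intermediate files ./docker/Dockerfile_1 and
# ./docker/Dockerfile_2, which are removed again by clean_dockerfile_stages().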


def clean_dockerfile_stages(dockerfiles):
    """Remove all the intermediate dockerfiles"""
    for dockerfile in dockerfiles:
        os.remove(dockerfile)


def analyze_multistage_dockerfile(dfobj, options):
    """Split the multistage Dockerfile into single stages and analyze each
    stage separately.
    Inputs:
        dfobj: the Dockerfile object
        options: the command line options namespace (see
                 analyze_single_dockerfile)"""
    # split the multistage dockerfile into single stages for analysis
    dockerfiles = write_dockerfile_stages(dfobj)
    image_list = []
    for dfile in dockerfiles:
        imlist = analyze_single_dockerfile(dfile, options)
        image_list.extend(imlist)
    clean_dockerfile_stages(dockerfiles)
    # finally build the existing Dockerfile
    finallist = analyze_single_dockerfile(dfobj.filepath, options)
    image_list.extend(finallist)
    return image_list
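
# Illustration of the multistage flow above with a hypothetical Dockerfile:
#
#   FROM golang:1.16 AS builder
#   RUN go build -o /app ./...
#
#   FROM debian:buster
#   COPY --from=builder /app /usr/local/bin/app
#
# Each intermediate stage file produced by write_dockerfile_stages() is
# analyzed with analyze_single_dockerfile(), the stage files are removed,
# and finally the original Dockerfile itself is built and analyzed; the
# resulting image lists are combined and returned.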