/
buildcomponent.py
596 lines (491 loc) · 23.7 KB
/
buildcomponent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
import datetime
import os
import time
import logging
import json
import trollius
from autobahn.wamp.exception import ApplicationError
from trollius import From, Return
from buildman.server import BuildJobResult
from buildman.component.basecomponent import BaseComponent
from buildman.component.buildparse import extract_current_step
from buildman.jobutil.buildjob import BuildJobLoadException
from buildman.jobutil.buildstatus import StatusHandler
from buildman.jobutil.workererror import WorkerError
from app import app
from data.database import BUILD_PHASE, UseThenDisconnect
from data.model import InvalidRepositoryBuildException
from data.registry_model import registry_model
from util import slash_join
# Maximum age a worker heartbeat may reach before the component is timed out
# (checked against utcnow() in the _heartbeat loop).
HEARTBEAT_DELTA = datetime.timedelta(seconds=60)
# Grace period granted after the build call is issued: start_build pushes the
# next expected heartbeat this far into the future.
BUILD_HEARTBEAT_DELAY = datetime.timedelta(seconds=30)
# Seconds slept between iterations of the heartbeat check loop.
HEARTBEAT_TIMEOUT = 10
# Seconds slept before the very first heartbeat check after a worker is ready.
INITIAL_TIMEOUT = 25

# Worker protocol versions accepted by _on_ready; anything else is rejected.
SUPPORTED_WORKER_VERSIONS = ["0.3"]

# Label which marks a manifest with its source build ID.
INTERNAL_LABEL_BUILD_UUID = "quay.build.uuid"

logger = logging.getLogger(__name__)
class ComponentStatus(object):
    """ ComponentStatus represents the possible states of a component. """

    # Component has connected but has not yet finished joining the WAMP realm.
    JOINING = "joining"
    # Component has joined and is waiting to be assigned a build job.
    WAITING = "waiting"
    # Component is registered and ready; no build currently executing.
    RUNNING = "running"
    # Component is actively executing a build job.
    BUILDING = "building"
    # Component's worker heartbeat expired; the component is unusable.
    TIMED_OUT = "timeout"
class BuildComponent(BaseComponent):
    """ An application session component which conducts one (or more) builds. """

    def __init__(self, config, realm=None, token=None, **kwargs):
        self.expected_token = token
        self.builder_realm = realm

        # Set by the component's manager after construction.
        self.parent_manager = None
        self.registry_hostname = None

        self._component_status = ComponentStatus.JOINING
        self._last_heartbeat = None
        self._current_job = None
        self._build_status = None
        self._image_info = None
        self._worker_version = None

        BaseComponent.__init__(self, config, **kwargs)

    def kind(self):
        """ Returns the kind of this component, for registration with the manager. """
        return "builder"

    def onConnect(self):
        """ WAMP callback: joins the realm assigned to this builder. """
        self.join(self.builder_realm)

    @trollius.coroutine
    def onJoin(self, details):
        """ WAMP callback: registers RPC endpoints and subscriptions, then marks
            the component as waiting for a build job.
        """
        logger.debug("Registering methods and listeners for component %s", self.builder_realm)
        yield From(self.register(self._on_ready, u"io.quay.buildworker.ready"))
        yield From(
            self.register(self._determine_cache_tag, u"io.quay.buildworker.determinecachetag")
        )
        yield From(self.register(self._ping, u"io.quay.buildworker.ping"))
        yield From(self.register(self._on_log_message, u"io.quay.builder.logmessagesynchronously"))

        yield From(self.subscribe(self._on_heartbeat, u"io.quay.builder.heartbeat"))

        yield From(self._set_status(ComponentStatus.WAITING))

    @trollius.coroutine
    def start_build(self, build_job):
        """ Starts a build. """
        if self._component_status not in (ComponentStatus.WAITING, ComponentStatus.RUNNING):
            logger.debug(
                "Could not start build for component %s (build %s, worker version: %s): %s",
                self.builder_realm,
                build_job.repo_build.uuid,
                self._worker_version,
                self._component_status,
            )
            raise Return()

        logger.debug(
            "Starting build for component %s (build %s, worker version: %s)",
            self.builder_realm,
            build_job.repo_build.uuid,
            self._worker_version,
        )

        self._current_job = build_job
        self._build_status = StatusHandler(self.build_logs, build_job.repo_build.uuid)
        self._image_info = {}

        yield From(self._set_status(ComponentStatus.BUILDING))

        # Send the notification that the build has started.
        build_job.send_notification("build_start")

        # Parse the build configuration.
        try:
            build_config = build_job.build_config
        except BuildJobLoadException as irbe:
            yield From(self._build_failure("Could not load build job information", irbe))
            raise Return()

        base_image_information = {}

        # Add the pull robot information, if any.
        if build_job.pull_credentials:
            base_image_information["username"] = build_job.pull_credentials.get("username", "")
            base_image_information["password"] = build_job.pull_credentials.get("password", "")

        # Retrieve the repository's fully qualified name.
        repo = build_job.repo_build.repository
        repository_name = repo.namespace_user.username + "/" + repo.name

        # Parse the build queue item into build arguments.
        #   build_package: URL to the build package to download and untar/unzip.
        #                  defaults to empty string to avoid requiring a pointer on the builder.
        #   sub_directory: The location within the build package of the Dockerfile and the build context.
        #   repository: The repository for which this build is occurring.
        #   registry: The registry for which this build is occuring (e.g. 'quay.io').
        #   pull_token: The token to use when pulling the cache for building.
        #   push_token: The token to use to push the built image.
        #   tag_names: The name(s) of the tag(s) for the newly built image.
        #   base_image: The image name and credentials to use to conduct the base image pull.
        #     username: The username for pulling the base image (if any).
        #     password: The password for pulling the base image (if any).
        context, dockerfile_path = self.extract_dockerfile_args(build_config)

        build_arguments = {
            "build_package": build_job.get_build_package_url(self.user_files),
            "context": context,
            "dockerfile_path": dockerfile_path,
            "repository": repository_name,
            "registry": self.registry_hostname,
            "pull_token": build_job.repo_build.access_token.get_code(),
            "push_token": build_job.repo_build.access_token.get_code(),
            "tag_names": build_config.get("docker_tags", ["latest"]),
            "base_image": base_image_information,
        }

        # If the trigger has a private key, it's using git, thus we should add
        # git data to the build args.
        #   url: url used to clone the git repository
        #   sha: the sha1 identifier of the commit to check out
        #   private_key: the key used to get read access to the git repository
        private_key = None
        if (
            build_job.repo_build.trigger is not None
            and build_job.repo_build.trigger.secure_private_key is not None
        ):
            private_key = build_job.repo_build.trigger.secure_private_key.decrypt()

        if private_key is not None:
            build_arguments["git"] = {
                "url": build_config["trigger_metadata"].get("git_url", ""),
                "sha": BuildComponent._commit_sha(build_config),
                "private_key": private_key or "",
            }

        # If the build args have no buildpack, mark it as a failure before sending
        # it to a builder instance.
        # NOTE: the "git" key is only present when a private key exists (above), so
        # .get() is required here -- indexing would raise KeyError on exactly the
        # no-buildpack, no-git path this check is meant to catch.
        if not build_arguments["build_package"] and not build_arguments.get("git"):
            logger.error(
                "%s: insufficient build args: %s",
                self._current_job.repo_build.uuid,
                build_arguments,
            )
            yield From(self._build_failure("Insufficient build arguments. No buildpack available."))
            raise Return()

        # Invoke the build.
        logger.debug("Invoking build: %s", self.builder_realm)
        logger.debug("With Arguments: %s", build_arguments)

        def build_complete_callback(result):
            """ This function is used to execute a coroutine as the callback. """
            trollius.ensure_future(self._build_complete(result))

        self.call("io.quay.builder.build", **build_arguments).add_done_callback(
            build_complete_callback
        )

        # Set the heartbeat for the future. If the builder never receives the build call,
        # then this will cause a timeout after 30 seconds. We know the builder has registered
        # by this point, so it makes sense to have a timeout.
        self._last_heartbeat = datetime.datetime.utcnow() + BUILD_HEARTBEAT_DELAY

    @staticmethod
    def extract_dockerfile_args(build_config):
        """ Returns the (context, dockerfile_path) pair derived from the build config,
            normalizing the Dockerfile location relative to the context when possible.
        """
        dockerfile_path = build_config.get("build_subdir", "")
        context = build_config.get("context", "")
        if not (dockerfile_path == "" or context == ""):
            # This should not happen and can be removed when we centralize validating build_config
            dockerfile_abspath = slash_join("", dockerfile_path)
            if ".." in os.path.relpath(dockerfile_abspath, context):
                # Dockerfile lies outside the context; fall back to splitting the raw path.
                return os.path.split(dockerfile_path)
            dockerfile_path = os.path.relpath(dockerfile_abspath, context)

        return context, dockerfile_path

    @staticmethod
    def _commit_sha(build_config):
        """ Determines whether the metadata is using an old schema or not and returns the commit. """
        commit_sha = build_config["trigger_metadata"].get("commit", "")
        old_commit_sha = build_config["trigger_metadata"].get("commit_sha", "")
        return commit_sha or old_commit_sha

    @staticmethod
    def name_and_path(subdir):
        """ Returns the dockerfile path and name """
        if subdir.endswith("/"):
            subdir += "Dockerfile"
        elif not subdir.endswith("Dockerfile"):
            subdir += "/Dockerfile"
        return os.path.split(subdir)

    @staticmethod
    def _total_completion(statuses, total_images):
        """ Returns the current amount completion relative to the total completion of a build. """
        percentage_with_sizes = float(len(statuses)) / total_images
        sent_bytes = sum([status["current"] for status in statuses.values()])
        total_bytes = sum([status["total"] for status in statuses.values()])
        if total_bytes == 0:
            # Nothing to transfer yet (e.g. all reported layers are empty); report no
            # measurable progress rather than raising ZeroDivisionError.
            return 0.0
        return float(sent_bytes) / total_bytes * percentage_with_sizes

    @staticmethod
    def _process_pushpull_status(status_dict, current_phase, docker_data, images):
        """ Processes the status of a push or pull by updating the provided status_dict and images. """
        if not docker_data:
            return

        num_images = 0
        status_completion_key = ""

        if current_phase == "pushing":
            status_completion_key = "push_completion"
            num_images = status_dict["total_commands"]
        elif current_phase == "pulling":
            status_completion_key = "pull_completion"
        elif current_phase == "priming-cache":
            status_completion_key = "cache_completion"
        else:
            return

        if "progressDetail" in docker_data and "id" in docker_data:
            image_id = docker_data["id"]
            detail = docker_data["progressDetail"]

            if "current" in detail and "total" in detail:
                images[image_id] = detail
                status_dict[status_completion_key] = BuildComponent._total_completion(
                    images, max(len(images), num_images)
                )

    @trollius.coroutine
    def _on_log_message(self, phase, json_data):
        """ Tails log messages and updates the build status. """
        # Update the heartbeat.
        self._last_heartbeat = datetime.datetime.utcnow()

        # Parse any of the JSON data logged.
        log_data = {}
        if json_data:
            try:
                log_data = json.loads(json_data)
            except ValueError:
                pass

        # Extract the current status message (if any).
        fully_unwrapped = ""
        keys_to_extract = ["error", "status", "stream"]
        for key in keys_to_extract:
            if key in log_data:
                fully_unwrapped = log_data[key]
                break

        # Determine if this is a step string.
        current_step = None
        current_status_string = str(fully_unwrapped.encode("utf-8"))

        if current_status_string and phase == BUILD_PHASE.BUILDING:
            current_step = extract_current_step(current_status_string)

        # Parse and update the phase and the status_dict. The status dictionary contains
        # the pull/push progress, as well as the current step index.
        with self._build_status as status_dict:
            try:
                changed_phase = yield From(
                    self._build_status.set_phase(phase, log_data.get("status_data"))
                )
                if changed_phase:
                    logger.debug("Build %s has entered a new phase: %s", self.builder_realm, phase)
                elif self._current_job.repo_build.phase == BUILD_PHASE.CANCELLED:
                    build_id = self._current_job.repo_build.uuid
                    logger.debug(
                        "Trying to move cancelled build into phase: %s with id: %s", phase, build_id
                    )
                    raise Return(False)
            except InvalidRepositoryBuildException:
                build_id = self._current_job.repo_build.uuid
                logger.warning("Build %s was not found; repo was probably deleted", build_id)
                raise Return(False)

            BuildComponent._process_pushpull_status(status_dict, phase, log_data, self._image_info)

            # If the current message represents the beginning of a new step, then update the
            # current command index.
            if current_step is not None:
                status_dict["current_command"] = current_step

            # If the json data contains an error, then something went wrong with a push or pull.
            if "error" in log_data:
                yield From(self._build_status.set_error(log_data["error"]))

        if current_step is not None:
            yield From(self._build_status.set_command(current_status_string))
        elif phase == BUILD_PHASE.BUILDING:
            yield From(self._build_status.append_log(current_status_string))
        raise Return(True)

    @trollius.coroutine
    def _determine_cache_tag(
        self, command_comments, base_image_name, base_image_tag, base_image_id
    ):
        """ RPC endpoint: returns the cached tag (if any) for the given base image and
            command list, or the empty string when no cache is available.
        """
        with self._build_status as status_dict:
            status_dict["total_commands"] = len(command_comments) + 1

        logger.debug(
            "Checking cache on realm %s. Base image: %s:%s (%s)",
            self.builder_realm,
            base_image_name,
            base_image_tag,
            base_image_id,
        )

        tag_found = self._current_job.determine_cached_tag(base_image_id, command_comments)
        raise Return(tag_found or "")

    @trollius.coroutine
    def _build_failure(self, error_message, exception=None):
        """ Handles and logs a failed build. """
        yield From(
            self._build_status.set_error(
                error_message, {"internal_error": str(exception) if exception else None}
            )
        )

        build_id = self._current_job.repo_build.uuid
        logger.warning("Build %s failed with message: %s", build_id, error_message)

        # Mark that the build has finished (in an error state)
        yield From(self._build_finished(BuildJobResult.ERROR))

    @trollius.coroutine
    def _build_complete(self, result):
        """ Wraps up a completed build. Handles any errors and calls self._build_finished. """
        build_id = self._current_job.repo_build.uuid

        try:
            # Retrieve the result. This will raise an ApplicationError on any error that occurred.
            result_value = result.result()
            kwargs = {}

            # Note: If we are hitting an older builder that didn't return ANY map data, then the result
            # value will be a bool instead of a proper CallResult object (which has no kwresults
            # attribute); catch the resulting AttributeError so we don't hit this pitfall.
            try:
                kwargs = result_value.kwresults
            except AttributeError:
                pass

            try:
                yield From(self._build_status.set_phase(BUILD_PHASE.COMPLETE))
            except InvalidRepositoryBuildException:
                logger.warning("Build %s was not found; repo was probably deleted", build_id)
                raise Return()

            yield From(self._build_finished(BuildJobResult.COMPLETE))

            # Label the pushed manifests with the build metadata.
            manifest_digests = kwargs.get("digests") or []
            repository = registry_model.lookup_repository(
                self._current_job.namespace, self._current_job.repo_name
            )
            if repository is not None:
                for digest in manifest_digests:
                    with UseThenDisconnect(app.config):
                        manifest = registry_model.lookup_manifest_by_digest(
                            repository, digest, require_available=True
                        )
                        if manifest is None:
                            continue

                        registry_model.create_manifest_label(
                            manifest, INTERNAL_LABEL_BUILD_UUID, build_id, "internal", "text/plain"
                        )

            # Send the notification that the build has completed successfully.
            self._current_job.send_notification(
                "build_success", image_id=kwargs.get("image_id"), manifest_digests=manifest_digests
            )
        except ApplicationError as aex:
            worker_error = WorkerError(aex.error, aex.kwargs.get("base_error"))

            # Write the error to the log.
            yield From(
                self._build_status.set_error(
                    worker_error.public_message(),
                    worker_error.extra_data(),
                    internal_error=worker_error.is_internal_error(),
                    requeued=self._current_job.has_retries_remaining(),
                )
            )

            # Send the notification that the build has failed.
            self._current_job.send_notification(
                "build_failure", error_message=worker_error.public_message()
            )

            # Mark the build as completed.
            if worker_error.is_internal_error():
                logger.exception(
                    "[BUILD INTERNAL ERROR: Remote] Build ID: %s: %s",
                    build_id,
                    worker_error.public_message(),
                )
                yield From(self._build_finished(BuildJobResult.INCOMPLETE))
            else:
                logger.debug("Got remote failure exception for build %s: %s", build_id, aex)
                yield From(self._build_finished(BuildJobResult.ERROR))

        # Remove the current job.
        self._current_job = None

    @trollius.coroutine
    def _build_finished(self, job_status):
        """ Alerts the parent that a build has completed and sets the status back to running. """
        yield From(self.parent_manager.job_completed(self._current_job, job_status, self))

        # Set the component back to a running state.
        yield From(self._set_status(ComponentStatus.RUNNING))

    @staticmethod
    def _ping():
        """ Ping pong. """
        return "pong"

    @trollius.coroutine
    def _on_ready(self, token, version):
        """ RPC endpoint: called by the worker when it is ready to accept builds.
            Validates the worker's version and token, starts the heartbeat loop, and
            returns whether the worker was accepted.
        """
        logger.debug('On ready called (token "%s")', token)
        self._worker_version = version

        if version not in SUPPORTED_WORKER_VERSIONS:
            logger.warning(
                'Build component (token "%s") is running an out-of-date version: %s', token, version
            )
            raise Return(False)

        if self._component_status != ComponentStatus.WAITING:
            logger.warning('Build component (token "%s") is already connected', self.expected_token)
            raise Return(False)

        if token != self.expected_token:
            logger.warning(
                'Builder token mismatch. Expected: "%s". Found: "%s"', self.expected_token, token
            )
            raise Return(False)

        yield From(self._set_status(ComponentStatus.RUNNING))

        # Start the heartbeat check and updating loop.
        loop = trollius.get_event_loop()
        loop.create_task(self._heartbeat())
        logger.debug("Build worker %s is connected and ready", self.builder_realm)
        raise Return(True)

    @trollius.coroutine
    def _set_status(self, phase):
        """ Updates the component's status, informing the parent manager when the
            component becomes ready to receive work.
        """
        if phase == ComponentStatus.RUNNING:
            yield From(self.parent_manager.build_component_ready(self))

        self._component_status = phase

    def _on_heartbeat(self):
        """ Updates the last known heartbeat. """
        if self._component_status == ComponentStatus.TIMED_OUT:
            return

        logger.debug("Got heartbeat on realm %s", self.builder_realm)
        self._last_heartbeat = datetime.datetime.utcnow()

    @trollius.coroutine
    def _heartbeat(self):
        """ Coroutine that runs every HEARTBEAT_TIMEOUT seconds, both checking the worker's heartbeat
            and updating the heartbeat in the build status dictionary (if applicable). This allows
            the build system to catch crashes from either end.
        """
        yield From(trollius.sleep(INITIAL_TIMEOUT))

        while True:
            # If the component is no longer running or actively building, nothing more to do.
            if (
                self._component_status != ComponentStatus.RUNNING
                and self._component_status != ComponentStatus.BUILDING
            ):
                raise Return()

            # If there is an active build, write the heartbeat to its status.
            if self._build_status is not None:
                with self._build_status as status_dict:
                    status_dict["heartbeat"] = int(time.time())

            # Mark the build item.
            current_job = self._current_job
            if current_job is not None:
                yield From(self.parent_manager.job_heartbeat(current_job))

            # Check the heartbeat from the worker.
            logger.debug("Checking heartbeat on realm %s", self.builder_realm)
            if (
                self._last_heartbeat
                and self._last_heartbeat < datetime.datetime.utcnow() - HEARTBEAT_DELTA
            ):
                logger.debug(
                    "Heartbeat on realm %s has expired: %s",
                    self.builder_realm,
                    self._last_heartbeat,
                )

                yield From(self._timeout())
                raise Return()

            logger.debug(
                "Heartbeat on realm %s is valid: %s (%s).",
                self.builder_realm,
                self._last_heartbeat,
                self._component_status,
            )

            yield From(trollius.sleep(HEARTBEAT_TIMEOUT))

    @trollius.coroutine
    def _timeout(self):
        """ Marks the component as timed out, failing any in-flight build and disposing
            of the component so it cannot be invoked again.
        """
        if self._component_status == ComponentStatus.TIMED_OUT:
            raise Return()

        yield From(self._set_status(ComponentStatus.TIMED_OUT))
        logger.warning("Build component with realm %s has timed out", self.builder_realm)

        # If we still have a running job, then it has not completed and we need to tell the parent
        # manager.
        if self._current_job is not None:
            yield From(
                self._build_status.set_error(
                    "Build worker timed out",
                    internal_error=True,
                    requeued=self._current_job.has_retries_remaining(),
                )
            )

            # NOTE(review): other paths read repo_build.uuid; build_uuid is presumably an
            # equivalent accessor on the job object -- confirm against BuildJob.
            build_id = self._current_job.build_uuid
            logger.error("[BUILD INTERNAL ERROR: Timeout] Build ID: %s", build_id)
            yield From(
                self.parent_manager.job_completed(
                    self._current_job, BuildJobResult.INCOMPLETE, self
                )
            )

        # Unregister the current component so that it cannot be invoked again.
        self.parent_manager.build_component_disposed(self, True)

        # Remove the job reference.
        self._current_job = None

    @trollius.coroutine
    def cancel_build(self):
        """ Disposes of the component's current job and returns it to a running state. """
        self.parent_manager.build_component_disposed(self, True)
        self._current_job = None
        yield From(self._set_status(ComponentStatus.RUNNING))