Skip to content

Commit

Permalink
Add CIPD pin reporting to swarming.
Browse files Browse the repository at this point in the history
This will cause run_isolated.py to report the fully resolved versions of the CIPD packages that it actually ended up using. DM will use this information to ensure that all executions of a given Attempt use the same versions.

R=maruel@chromium.org, nodir@chromium.org, vadimsh@chromium.org
BUG=639975

Review-Url: https://codereview.chromium.org/2267363004
  • Loading branch information
riannucci authored and Commit bot committed Aug 30, 2016
1 parent e34a6e6 commit 1200be8
Show file tree
Hide file tree
Showing 17 changed files with 544 additions and 56 deletions.
99 changes: 97 additions & 2 deletions appengine/swarming/cipd.py
Expand Up @@ -4,6 +4,8 @@

"""CIPD-specific code is concentrated here."""

import contextlib
import logging
import re

# Regular expressions below are copied from
Expand All @@ -26,13 +28,99 @@
# os can be "linux", "mac" or "windows" and arch can be "386", "amd64" or
# "armv6l".
PARAM_PLATFORM = '${platform}'
PARAM_PLATFORM_ESC = re.escape(PARAM_PLATFORM)
# OS version parameter defines major and minor version of the OS distribution.
# It is useful if package depends on .dll/.so libraries provided by the OS.
# Example values: "ubuntu14_04", "mac10_9", "win6_1".
PARAM_OS_VER = '${os_ver}'
PARAM_OS_VER_ESC = re.escape(PARAM_OS_VER)
ALL_PARAMS = (PARAM_PLATFORM, PARAM_OS_VER)


@contextlib.contextmanager
def pin_check_fn(platform, os_ver):
"""Yields a function that verifies that an input CipdPackage could have been
plausibly expanded, via pinning, to another CipdPackage. Repeated invocations
of the function will retain knowledge of any resolved name template paramters
like ${platform} and ${os_ver}.
Args:
platform - a pre-defined expansion of ${platform}, or None to learn from the
first valid checked CipdPackage containing ${platform}.
os_ver - a pre-defined expansion of ${os_ver}, or None to learn from the
first valid checked CipdPackage containing ${os_ver}.
Args of yielded function:
original - a CipdPackage which may contain template params like ${platform}
expanded - a CipdPackage which is nominally an expansion of original.
CipdPackage is a duck-typed object which has three string properties:
'package_name', 'path' and 'version'.
Yielded function raises:
ValueError if expanded is not a valid derivation of original.
Example:
with pin_check_fn(None, None) as check:
check(CipdPackage('', '${platform}', 'ref'),
CipdPackage('', 'windows-amd64', 'deadbeef'*5))
check(CipdPackage('', '${platform}', 'ref'),
CipdPackage('', 'linux-amd64', 'deadbeef'*5)) ## will raise ValueError
"""
plat_ref = [platform]
os_ver_ref = [os_ver]
def _check_fn(original, expanded):
if original.path != expanded.path:
logging.warn('Mismatched path: %r v %r', original.path, expanded.path)
raise ValueError('Mismatched path')

def sub_param(regex, param_esc, param_re, param_const):
# This is validated at task creation time as well, but just to make sure.
if regex.count(param_esc) > 1:
logging.warn('Duplicate template param %r: %r', param_esc, regex)
raise ValueError('%s occurs more than once in name.' % param_esc)

ret = False
if param_const is None:
ret = param_esc in regex
if ret:
regex = regex.replace(param_esc, param_re, 1)
else:
regex = regex.replace(param_esc, param_const, 1)
return regex, ret

name_regex = re.escape(original.package_name)
name_regex, scan_plat = sub_param(
name_regex, PARAM_PLATFORM_ESC, r'(?P<platform>\w+-[a-z0-9]+)',
plat_ref[0])
name_regex, scan_os_ver = sub_param(
name_regex, PARAM_OS_VER_ESC, r'(?P<os_ver>[_a-z0-9]+)',
os_ver_ref[0])

match = re.match(name_regex, expanded.package_name)
if not match:
logging.warn('Mismatched package_name: %r | %r v %r',
original.package_name, name_regex, expanded.package_name)
raise ValueError('Mismatched package_name')

if is_valid_instance_id(original.version):
if original.version != expanded.version:
logging.warn('Mismatched pins: %r v %r', original.version,
expanded.version)
raise ValueError('Mismatched pins')
else:
if not is_valid_instance_id(expanded.version):
logging.warn('Pin not a pin: %r', expanded.version)
raise ValueError('Pin value is not a pin')

if scan_plat:
plat_ref[0] = re.escape(match.group('platform'))
if scan_os_ver:
os_ver_ref[0] = re.escape(match.group('os_ver'))

yield _check_fn


def is_valid_package_name(package_name):
"""Returns True if |package_name| is a valid CIPD package name."""
return bool(PACKAGE_NAME_RE.match(package_name))
Expand All @@ -43,13 +131,15 @@ def is_valid_package_name_template(template):
# Render known parameters first.
for p in ALL_PARAMS:
template = template.replace(p, 'x')
if template.count(p) > 1:
return False
return is_valid_package_name(template)


def is_valid_version(version):
"""Returns True if |version| is a valid CIPD package version."""
return bool(
INSTANCE_ID_RE.match(version) or
is_valid_instance_id(version) or
is_valid_tag(version) or
REF_RE.match(version)
)
Expand All @@ -63,6 +153,11 @@ def is_valid_tag(tag):
return bool(TAG_KEY_RE.match(tag.split(':', 1)[0]))


def is_valid_instance_id(version):
"""Returns True if |version| is an insance_id."""
return bool(INSTANCE_ID_RE.match(version))


def is_pinned_version(version):
"""Returns True if |version| is pinned."""
return bool(INSTANCE_ID_RE.match(version)) or is_valid_tag(version)
return is_valid_instance_id(version) or is_valid_tag(version)
84 changes: 84 additions & 0 deletions appengine/swarming/cipd_test.py
@@ -0,0 +1,84 @@
#!/usr/bin/env python
# coding: utf-8
# Copyright 2016 The LUCI Authors. All rights reserved.
# Use of this source code is governed under the Apache License, Version 2.0
# that can be found in the LICENSE file.

import collections
import logging
import re
import sys
import unittest

# Setups environment.
import test_env_handlers

import cipd
import swarming_rpcs


class TestPinChecker(unittest.TestCase):
def setUp(self):
super(TestPinChecker, self).setUp()
self.cp = collections.namedtuple('CipdPackage', 'path package_name version')

def test_correct_pins(self):
a = self.cp('path', 'package_name/${platform}-${os_ver}', 'latest')
b = self.cp('path', 'package_name/windows-amd64-something_10', 'deadbeef'*5)

with cipd.pin_check_fn(None, None) as check:
# will not raise
check(a, b)

a = self.cp('path', 'other/${platform}-${os_ver}', 'latest')
b = self.cp('path', 'other/windows-amd64-something_10', 'deadbeef'*5)

# will not raise
check(a, b)

def test_mismatched_pins(self):
# if a is already a pin, b must match its version exactly
a = self.cp('path', 'package_name/${platform}-${os_ver}', 'deadbeef'*5)
b = self.cp('path', 'package_name/windows-amd64-something_10', 'badc0ffe'*5)

with cipd.pin_check_fn(None, None) as check:
with self.assertRaisesRegexp(ValueError, 'Mismatched pins'):
check(a, b)

def test_mismatched_paths(self):
a = self.cp('path', 'package_name/${platform}-${os_ver}', 'latest')
b = self.cp('else', 'package_name/windows-amd64-something_10', 'deadbeef'*5)

with cipd.pin_check_fn(None, None) as check:
with self.assertRaisesRegexp(ValueError, 'Mismatched path'):
check(a, b)

def test_mismatched_names(self):
a = self.cp('', 'package_name/${platform}-${os_ver}', 'latest')
b = self.cp('', 'else/windows-amd64-something_10', 'deadbeef'*5)

with cipd.pin_check_fn(None, None) as check:
with self.assertRaisesRegexp(ValueError, 'Mismatched package_name'):
check(a, b)

a = self.cp('', 'package_name/${platform}-${os_ver}', 'latest')
b = self.cp('', 'package_name/windows-amd64-something_10', 'deadbeef'*5)
# will not raise
check(a, b)

# This doesn't match the previous knowledge of platform or os_ver, so it
# will not match.
a = self.cp('', 'package_name/${platform}-${os_ver}', 'latest')
b = self.cp('', 'package_name/linux-32-nerds', 'deadbeef'*5)

with self.assertRaisesRegexp(ValueError, 'Mismatched package_name'):
check(a, b)


if __name__ == '__main__':
if '-v' in sys.argv:
unittest.TestCase.maxDiff = None
logging.basicConfig(
level=logging.DEBUG if '-v' in sys.argv else logging.CRITICAL,
format='%(levelname)-7s %(filename)s:%(lineno)3d %(message)s')
unittest.main()
18 changes: 14 additions & 4 deletions appengine/swarming/handlers_bot.py
Expand Up @@ -632,9 +632,9 @@ class BotTaskUpdateHandler(_BotApiHandler):
out-of-order packets.
"""
ACCEPTED_KEYS = {
u'bot_overhead', u'cipd_stats', u'cost_usd', u'duration', u'exit_code',
u'hard_timeout', u'id', u'io_timeout', u'isolated_stats', u'output',
u'output_chunk_start', u'outputs_ref', u'task_id',
u'bot_overhead', u'cipd_pins', u'cipd_stats', u'cost_usd', u'duration',
u'exit_code', u'hard_timeout', u'id', u'io_timeout', u'isolated_stats',
u'output', u'output_chunk_start', u'outputs_ref', u'task_id',
}
REQUIRED_KEYS = {u'id', u'task_id'}

Expand All @@ -657,13 +657,14 @@ def post(self, task_id=None):
bot_auth.validate_bot_id_and_fetch_config(bot_id)

bot_overhead = request.get('bot_overhead')
cipd_pins = request.get('cipd_pins')
cipd_stats = request.get('cipd_stats')
cost_usd = request.get('cost_usd', 0)
duration = request.get('duration')
exit_code = request.get('exit_code')
hard_timeout = request.get('hard_timeout')
io_timeout = request.get('io_timeout')
isolated_stats = request.get('isolated_stats')
cipd_stats = request.get('cipd_stats')
output = request.get('output')
output_chunk_start = request.get('output_chunk_start')
outputs_ref = request.get('outputs_ref')
Expand Down Expand Up @@ -720,6 +721,14 @@ def unpack_base64(d, k):
if outputs_ref:
outputs_ref = task_request.FilesRef(**outputs_ref)

if cipd_pins:
cipd_pins = task_result.CipdPins(
client_package=task_request.CipdPackage(
**cipd_pins['client_package']),
packages=[
task_request.CipdPackage(**args) for args in cipd_pins['packages']]
)

try:
state = task_scheduler.bot_update_task(
run_result_key=run_result_key,
Expand All @@ -732,6 +741,7 @@ def unpack_base64(d, k):
io_timeout=io_timeout,
cost_usd=cost_usd,
outputs_ref=outputs_ref,
cipd_pins=cipd_pins,
performance_stats=performance_stats)
if not state:
logging.info('Failed to update, please retry')
Expand Down
22 changes: 22 additions & 0 deletions appengine/swarming/handlers_bot_test.py
Expand Up @@ -436,6 +436,17 @@ def test_complete_task_isolated(self):
u'isolatedserver': u'http://localhost:1',
u'namespace': u'default-gzip',
},
'cipd_pins': {
u'client_package': {
u'package_name': u'infra/tools/cipd/windows-amd64',
u'version': u'deadbeef'*5,
},
u'packages': [{
u'package_name': u'rm',
u'path': u'bin',
u'version': u'badc0fee'*5,
}]
},
'task_id': task_id,
}
response = self.post_json('/swarming/api/v1/bot/task_update', params)
Expand Down Expand Up @@ -464,6 +475,17 @@ def test_complete_task_isolated(self):
u'isolatedserver': u'http://localhost:1',
u'namespace': u'default-gzip',
},
'cipd_pins': {
u'client_package': {
u'package_name': u'infra/tools/cipd/windows-amd64',
u'version': u'deadbeef'*5,
},
u'packages': [{
u'package_name': u'rm',
u'path': u'bin',
u'version': u'badc0fee'*5,
}]
},
u'server_versions': [u'v1a'],
u'started_ts': str_now,
u'state': u'COMPLETED',
Expand Down
29 changes: 29 additions & 0 deletions appengine/swarming/handlers_frontend.py
Expand Up @@ -616,6 +616,17 @@ def get_request_and_result(self, task_id):
class TaskHandler(BaseTaskHandler):
"""Show the full text of a task request and its result."""

@staticmethod
def packages_grouped_by_path(flat_packages):
"""Returns sorted [(path, [PinInfo, ...])].
Used by user_task.html.
"""
retval = collections.defaultdict(list)
for pkg in flat_packages:
retval[pkg.path].append(pkg)
return sorted(retval.iteritems())

@auth.autologin
@auth.require(acl.is_user)
def get(self, task_id):
Expand Down Expand Up @@ -663,9 +674,27 @@ def get(self, task_id):
parent_task = parent_task_future.get_result()
children_tasks = [c.get_result() for c in children_tasks_futures]

cipd = None
if request.properties.cipd_input:
cipd = {
'server': request.properties.cipd_input.server,
'client_package': request.properties.cipd_input.client_package,
'packages': self.packages_grouped_by_path(
request.properties.cipd_input.packages),
}

cipd_pins = None
if result.cipd_pins:
cipd_pins = {
'client_package': result.cipd_pins.client_package,
'packages': self.packages_grouped_by_path(result.cipd_pins.packages),
}

params = {
'bot': bot_future.get_result() if bot_future else None,
'children_tasks': children_tasks,
'cipd': cipd,
'cipd_pins': cipd_pins,
'is_admin': acl.is_admin(),
'is_gae_admin': users.is_current_user_admin(),
'is_privileged_user': acl.is_privileged_user(),
Expand Down
16 changes: 15 additions & 1 deletion appengine/swarming/message_conversion.py
Expand Up @@ -200,6 +200,19 @@ def task_result_to_rpc(entity, send_stats):
outputs_ref = (
_ndb_to_rpc(swarming_rpcs.FilesRef, entity.outputs_ref)
if entity.outputs_ref else None)
cipd_pins = None
if entity.cipd_pins:
cipd_pins = swarming_rpcs.CipdPins(
client_package=(
_ndb_to_rpc(swarming_rpcs.CipdPackage,
entity.cipd_pins.client_package)
if entity.cipd_pins.client_package else None
),
packages=[
_ndb_to_rpc(swarming_rpcs.CipdPackage, pkg)
for pkg in entity.cipd_pins.packages
] if entity.cipd_pins.packages else None
)
performance_stats = None
if send_stats and entity.performance_stats.is_valid:
def op(entity):
Expand All @@ -213,8 +226,9 @@ def op(entity):
isolated_upload=op(entity.performance_stats.isolated_upload))
kwargs = {
'bot_dimensions': _string_list_pairs_from_dict(entity.bot_dimensions or {}),
'performance_stats': performance_stats,
'cipd_pins': cipd_pins,
'outputs_ref': outputs_ref,
'performance_stats': performance_stats,
'state': swarming_rpcs.StateField(entity.state),
}
if entity.__class__ is task_result.TaskRunResult:
Expand Down

0 comments on commit 1200be8

Please sign in to comment.