Skip to content

Commit

Permalink
Bug 1385401 - Support optional upstreamArtifact (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
JohanLorenzo committed Jan 3, 2018
1 parent 12cedaa commit 4b6a85d
Show file tree
Hide file tree
Showing 6 changed files with 532 additions and 100 deletions.
66 changes: 56 additions & 10 deletions scriptworker/artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from scriptworker.client import validate_artifact_url
from scriptworker.exceptions import ScriptWorkerRetryException, ScriptWorkerTaskException
from scriptworker.task import get_task_id, get_run_id, get_decision_task_id
from scriptworker.utils import download_file, filepaths_in_dir, raise_future_exceptions, retry_async
from scriptworker.utils import download_file, filepaths_in_dir, raise_future_exceptions, retry_async, add_enumerable_item_to_dict


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -325,24 +325,43 @@ def get_upstream_artifacts_full_paths_per_task_id(context):
context (scriptworker.context.Context): the scriptworker context.
Returns:
dict: lists of the paths to existing upstream artifacts, sorted by task_id
dict, dict: lists of the paths to upstream artifacts, sorted by task_id.
First dict represents the existing upstream artifacts. The second one
maps the optional artifacts that couldn't be downloaded
Raises:
scriptworker.exceptions.ScriptWorkerTaskException: when an artifact doesn't exist.
"""
upstream_artifacts = context.task['payload']['upstreamArtifacts']
task_ids_and_relative_paths = [
(artifact_definition['taskId'], artifact_definition['paths'])
for artifact_definition in context.task['payload']['upstreamArtifacts']
for artifact_definition in upstream_artifacts
]

return {
task_id: [
get_and_check_single_upstream_artifact_full_path(context, task_id, path)
for path in paths
]
for task_id, paths in task_ids_and_relative_paths
}
optional_artifacts_per_task_id = get_optional_artifacts_per_task_id(upstream_artifacts)

upstream_artifacts_full_paths_per_task_id = {}
failed_paths_per_task_id = {}
for task_id, paths in task_ids_and_relative_paths:
for path in paths:
try:
path_to_add = get_and_check_single_upstream_artifact_full_path(context, task_id, path)
add_enumerable_item_to_dict(
dict_=upstream_artifacts_full_paths_per_task_id,
key=task_id, item=path_to_add
)
except ScriptWorkerTaskException:
if path in optional_artifacts_per_task_id.get(task_id, []):
log.warn('Optional artifact "{}" of task "{}" not found'.format(path, task_id))
add_enumerable_item_to_dict(
dict_=failed_paths_per_task_id,
key=task_id, item=path
)
else:
raise

return upstream_artifacts_full_paths_per_task_id, failed_paths_per_task_id


def get_and_check_single_upstream_artifact_full_path(context, task_id, path):
Expand Down Expand Up @@ -387,3 +406,30 @@ def get_single_upstream_artifact_full_path(context, task_id, path):
"""
return os.path.abspath(os.path.join(context.config['work_dir'], 'cot', task_id, path))


def get_optional_artifacts_per_task_id(upstream_artifacts):
"""Return every optional artifact defined in ``upstream_artifacts``, ordered by taskId.
Args:
upstream_artifacts: the list of upstream artifact definitions
Returns:
dict: list of paths to downloaded artifacts ordered by taskId
"""
# A given taskId might be defined many times in upstreamArtifacts. Thus, we can't
# use a dict comprehension
optional_artifacts_per_task_id = {}

for artifact_definition in upstream_artifacts:
if artifact_definition.get('optional', False) is True:
task_id = artifact_definition['taskId']
artifacts_paths = artifact_definition['paths']

add_enumerable_item_to_dict(
dict_=optional_artifacts_per_task_id,
key=task_id, item=artifacts_paths
)

return optional_artifacts_per_task_id
172 changes: 124 additions & 48 deletions scriptworker/cot/verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@
import sys
import tempfile
from urllib.parse import unquote, urlparse
from scriptworker.artifacts import download_artifacts, get_artifact_url, get_single_upstream_artifact_full_path
from scriptworker.artifacts import download_artifacts, get_artifact_url, get_single_upstream_artifact_full_path, \
get_optional_artifacts_per_task_id
from scriptworker.config import read_worker_creds
from scriptworker.constants import DEFAULT_CONFIG
from scriptworker.context import Context
from scriptworker.exceptions import CoTError, DownloadError, ScriptWorkerGPGException
from scriptworker.gpg import get_body, GPG
from scriptworker.log import contextual_log_handler
from scriptworker.task import get_decision_task_id, get_parent_task_id, get_worker_type, get_task_id
from scriptworker.utils import format_json, get_hash, load_json, makedirs, match_url_regex, raise_future_exceptions, rm
from scriptworker.utils import format_json, get_hash, load_json, makedirs, match_url_regex, raise_future_exceptions, rm, \
get_results_and_future_exceptions, add_enumerable_item_to_dict
from taskcluster.exceptions import TaskclusterFailure

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -504,6 +506,12 @@ def verify_docker_image_sha(chain, link):
cot = link.cot
task = link.task
errors = []

if not cot:
log.warn('Chain of Trust for {} does not exist. See above log for more details. \
Skipping docker image sha verification'.format(task['taskId']))
return

if isinstance(task['payload'].get('image'), dict):
# Using pre-built image from docker-image task
docker_image_task_id = task['extra']['chainOfTrust']['inputs']['docker-image']
Expand Down Expand Up @@ -655,23 +663,37 @@ async def download_cot(chain):
DownloadError: on failure.
"""
async_tasks = []
mandatory_artifact_tasks = []
optional_artifact_tasks = []
# only deal with chain.links, which are previously finished tasks with
# signed chain of trust artifacts. ``chain.task`` is the current running
# task, and will not have a signed chain of trust artifact yet.
for link in chain.links:
task_id = link.task_id
url = get_artifact_url(chain.context, task_id, 'public/chainOfTrust.json.asc')
parent_dir = link.cot_dir
async_tasks.append(
asyncio.ensure_future(
download_artifacts(
chain.context, [url], parent_dir=parent_dir,
valid_artifact_task_ids=[task_id]
)
coroutine = asyncio.ensure_future(
download_artifacts(
chain.context, [url], parent_dir=parent_dir,
valid_artifact_task_ids=[task_id]
)
)
paths = await raise_future_exceptions(async_tasks)

if is_task_required_by_any_mandatory_artifact(chain, task_id):
mandatory_artifact_tasks.append(coroutine)
else:
optional_artifact_tasks.append(coroutine)

mandatory_artifacts_paths = await raise_future_exceptions(mandatory_artifact_tasks)
succeeded_optional_artifacts_paths, failed_optional_artifacts = \
await get_results_and_future_exceptions(optional_artifact_tasks)

if failed_optional_artifacts:
error_messages = '\n'.join([' * {}'.format(failure) for failure in failed_optional_artifacts])
log.warn('Could not download {} "chainOfTrust.json.asc". Although, they were not needed by \
any mandatory artifact. Continuing CoT verifications. Errors gotten: {}'.format(len(failed_optional_artifacts), error_messages))

paths = mandatory_artifacts_paths + succeeded_optional_artifacts_paths
for path in paths:
sha = get_hash(path[0])
log.debug("{} downloaded; hash is {}".format(path[0], sha))
Expand All @@ -695,6 +717,11 @@ async def download_cot_artifact(chain, task_id, path):
"""
link = chain.get_link(task_id)
log.debug("Verifying {} is in {} cot artifacts...".format(path, task_id))
if not link.cot:
log.warn('Chain of Trust for "{}" in {} does not exist. See above log for more details. \
Skipping download of this artifact'.format(path, task_id))
return

if path not in link.cot['artifacts']:
raise CoTError("path {} not in {} {} chain of trust artifacts!".format(path, link.name, link.task_id))
url = get_artifact_url(chain.context, task_id, path)
Expand All @@ -714,64 +741,109 @@ async def download_cot_artifact(chain, task_id, path):


# download_cot_artifacts {{{1
async def download_cot_artifacts(chain, artifact_dict):
"""Call ``download_cot_artifact`` in parallel for each key/value in ``artifact_dict``.
async def download_cot_artifacts(chain):
"""Call ``download_cot_artifact`` in parallel for each "upstreamArtifacts".
Optional artifacts are allowed to not be downloaded.
Args:
chain (ChainOfTrust): the chain of trust object
artifact_dict (dict): maps task_id to list of paths of artifacts to download
Returns:
list: list of full paths to artifacts
list: list of full paths to downloaded artifacts. Failed optional artifacts
aren't returned
Raises:
CoTError: on chain of trust sha validation error
DownloadError: on download error
CoTError: on chain of trust sha validation error, on a mandatory artifact
DownloadError: on download error on a mandatory artifact
"""
tasks = []
for task_id, paths in artifact_dict.items():
upstream_artifacts = chain.task['payload'].get('upstreamArtifacts', [])
all_artifacts_per_task_id = get_all_artifacts_per_task_id(chain, upstream_artifacts)

mandatory_artifact_tasks = []
optional_artifact_tasks = []
for task_id, paths in all_artifacts_per_task_id.items():
for path in paths:
tasks.append(
asyncio.ensure_future(
download_cot_artifact(
chain, task_id, path
)
)
)
full_paths = await raise_future_exceptions(tasks)
return full_paths
coroutine = asyncio.ensure_future(download_cot_artifact(chain, task_id, path))

if is_artifact_optional(chain, task_id, path):
optional_artifact_tasks.append(coroutine)
else:
mandatory_artifact_tasks.append(coroutine)

mandatory_artifacts_paths = await raise_future_exceptions(mandatory_artifact_tasks)
succeeded_optional_artifacts_paths, failed_optional_artifacts = \
await get_results_and_future_exceptions(optional_artifact_tasks)

if failed_optional_artifacts:
log.warn('Could not download {} artifacts: {}'.format(len(failed_optional_artifacts), failed_optional_artifacts))

# download_firefox_cot_artifacts {{{1
async def download_firefox_cot_artifacts(chain):
"""Download artifacts needed for firefox chain of trust verification.
return mandatory_artifacts_paths + succeeded_optional_artifacts_paths

This is only task-graph.json so far.

def is_task_required_by_any_mandatory_artifact(chain, task_id):
"""Tells whether a task has only optional artifacts or not.
Args:
chain (ChainOfTrust): the chain of trust object
task_id (str): the id of the aforementioned task
Returns:
list: list of full paths to artifacts
bool: True if task has one or several mandatory artifacts
"""
upstream_artifacts = chain.task['payload'].get('upstreamArtifacts', [])
all_artifacts_per_task_id = get_all_artifacts_per_task_id(chain, upstream_artifacts)
optional_artifacts_per_task_id = get_optional_artifacts_per_task_id(upstream_artifacts)

all_artifacts = all_artifacts_per_task_id.get(task_id, [])
optional_artifacts = optional_artifacts_per_task_id.get(task_id, [])
mandatory_artifacts = set(all_artifacts) - set(optional_artifacts)
return bool(mandatory_artifacts)

Raises:
CoTError: on chain of trust sha validation error
DownloadError: on download error

def is_artifact_optional(chain, task_id, path):
"""Tells whether an artifact is flagged as optional or not.
Args:
chain (ChainOfTrust): the chain of trust object
task_id (str): the id of the aforementioned task
Returns:
bool: True if artifact is optional
"""
upstream_artifacts = chain.task['payload'].get('upstreamArtifacts', [])
optional_artifacts_per_task_id = get_optional_artifacts_per_task_id(upstream_artifacts)
return path in optional_artifacts_per_task_id.get(task_id, [])


def get_all_artifacts_per_task_id(chain, upstream_artifacts):
"""Return every artifact to download, including the Chain Of Trust Artifacts.
Args:
chain (ChainOfTrust): the chain of trust object
upstream_artifacts: the list of upstream artifact definitions
Returns:
dict: list of paths to downloaded artifacts ordered by taskId
"""
artifact_dict = {}
all_artifacts_per_task_id = {}
for link in chain.links:
task_type = link.task_type
if task_type in PARENT_TASK_TYPES:
artifact_dict.setdefault(link.task_id, [])
artifact_dict[link.task_id].append('public/task-graph.json')
if 'upstreamArtifacts' in chain.task['payload']:
for upstream_dict in chain.task['payload']['upstreamArtifacts']:
artifact_dict.setdefault(upstream_dict['taskId'], [])
for path in upstream_dict['paths']:
artifact_dict[upstream_dict['taskId']].append(path)
return await download_cot_artifacts(chain, artifact_dict)
if link.task_type in PARENT_TASK_TYPES:
add_enumerable_item_to_dict(
dict_=all_artifacts_per_task_id, key=link.task_id, item='public/task-graph.json'
)

if upstream_artifacts:
for upstream_dict in upstream_artifacts:
add_enumerable_item_to_dict(
dict_=all_artifacts_per_task_id, key=upstream_dict['taskId'], item=upstream_dict['paths']
)

return all_artifacts_per_task_id


# verify_cot_signatures {{{1
Expand All @@ -798,7 +870,11 @@ def verify_cot_signatures(chain):
with open(path, "r") as fh:
contents = fh.read()
except OSError as exc:
raise CoTError("Can't read {}: {}!".format(path, str(exc)))
if is_task_required_by_any_mandatory_artifact(chain, link.task_id):
raise CoTError("Can't read mandatory {}: {}!".format(path, str(exc)))
else:
log.warn("Could not read optional {}. Continuing. Error gotten: {}!".format(path, str(exc)))
continue
try:
body = get_body(
gpg, contents,
Expand Down Expand Up @@ -1484,7 +1560,7 @@ async def verify_chain_of_trust(chain):
# verify the signatures and populate the ``link.cot``s
verify_cot_signatures(chain)
# download all other artifacts needed to verify chain of trust
await download_firefox_cot_artifacts(chain)
await download_cot_artifacts(chain)
# verify the task types, e.g. decision
task_count = await verify_task_types(chain)
check_num_tasks(chain, task_count)
Expand Down
Loading

0 comments on commit 4b6a85d

Please sign in to comment.