Skip to content

Commit

Permalink
Bug 1301477 - Convert artifact model into a set of etl methods (#2013)
Browse files Browse the repository at this point in the history
  • Loading branch information
wlach committed Dec 20, 2016
1 parent 53e4a95 commit 0f10dff
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 220 deletions.
Expand Up @@ -4,8 +4,8 @@

import pytest

from treeherder.model.derived import (ArtifactsModel,
JobsModel)
from treeherder.etl.artifact import store_job_artifacts
from treeherder.model.derived import JobsModel
from treeherder.model.models import (JobDetail,
TextLogError,
TextLogStep)
Expand Down Expand Up @@ -38,8 +38,7 @@ def max_length(field):
}),
'job_guid': job['job_guid']
}
with ArtifactsModel(test_project) as am:
am.load_job_artifacts([ji_artifact])
store_job_artifacts([ji_artifact])

assert JobDetail.objects.count() == 1

Expand Down Expand Up @@ -76,15 +75,14 @@ def test_load_textlog_summary_twice(test_project, test_job):
'job_guid': test_job.guid
}

with ArtifactsModel(test_project) as am:
am.load_job_artifacts([text_log_summary_artifact])
assert TextLogError.objects.count() == 1
assert TextLogStep.objects.count() == 1
# load again (simulating the job being parsed twice,
# which sometimes happens)
am.load_job_artifacts([text_log_summary_artifact])
assert TextLogError.objects.count() == 1
assert TextLogStep.objects.count() == 1
store_job_artifacts([text_log_summary_artifact])
assert TextLogError.objects.count() == 1
assert TextLogStep.objects.count() == 1
# load again (simulating the job being parsed twice,
# which sometimes happens)
store_job_artifacts([text_log_summary_artifact])
assert TextLogError.objects.count() == 1
assert TextLogStep.objects.count() == 1


def test_load_non_ascii_textlog_errors(test_project, eleven_jobs_stored):
Expand Down Expand Up @@ -122,8 +120,7 @@ def test_load_non_ascii_textlog_errors(test_project, eleven_jobs_stored):
}),
'job_guid': job['job_guid']
}
with ArtifactsModel(test_project) as am:
am.load_job_artifacts([text_log_summary_artifact])
store_job_artifacts([text_log_summary_artifact])

assert TextLogError.objects.count() == 2
assert TextLogError.objects.get(line_number=1587).line == '07:51:28 WARNING - \U000000c3'
Expand Down
10 changes: 5 additions & 5 deletions tests/etl/test_perf_data_adapters.py
Expand Up @@ -6,7 +6,7 @@
import pytest

from tests.test_utils import create_generic_job
from treeherder.etl.perf import load_perf_artifact
from treeherder.etl.perf import store_performance_artifact
from treeherder.model.models import (Push,
Repository)
from treeherder.perf.models import (PerformanceAlert,
Expand Down Expand Up @@ -85,7 +85,7 @@ def _generate_perf_data_range(test_project, test_repository,
submit_datum['blob'] = json.dumps({
'performance_data': submit_datum['blob']
})
load_perf_artifact(job, submit_datum)
store_performance_artifact(job, submit_datum)


def _verify_signature(repo_name, framework_name, suitename,
Expand Down Expand Up @@ -189,7 +189,7 @@ def test_load_generic_data(test_project, test_repository,
'performance_data': submit_datum['blob']
})

load_perf_artifact(perf_job, submit_datum)
store_performance_artifact(perf_job, submit_datum)
assert 8 == PerformanceSignature.objects.all().count()
assert 1 == PerformanceFramework.objects.all().count()
framework = PerformanceFramework.objects.all()[0]
Expand Down Expand Up @@ -238,7 +238,7 @@ def test_load_generic_data(test_project, test_repository,
time=later_timestamp)
later_job = create_generic_job('lateguid', test_repository,
later_push.id, 2, generic_reference_data)
load_perf_artifact(later_job, submit_datum)
store_performance_artifact(later_job, submit_datum)
signature = PerformanceSignature.objects.get(
suite=perf_datum['suites'][0]['name'],
test=perf_datum['suites'][0]['subtests'][0]['name'])
Expand Down Expand Up @@ -288,7 +288,7 @@ def test_same_signature_multiple_performance_frameworks(test_project,
'performance_data': submit_datum['blob']
})

load_perf_artifact(perf_job, submit_datum)
store_performance_artifact(perf_job, submit_datum)

# we should have 2 performance signature objects, one for each framework
# and one datum for each signature
Expand Down
167 changes: 167 additions & 0 deletions treeherder/etl/artifact.py
@@ -0,0 +1,167 @@
import logging

import dateutil.parser
import simplejson as json
from django.db import transaction
from django.db.utils import IntegrityError

from treeherder.etl.perf import store_performance_artifact
from treeherder.etl.text import astral_filter
from treeherder.model import error_summary
from treeherder.model.models import (Job,
JobDetail,
TextLogError,
TextLogStep)

logger = logging.getLogger(__name__)


def store_job_info_artifact(job, job_info_artifact):
    """
    Persist a "Job Info" artifact as JobDetail rows.

    Each entry in the artifact's ``job_details`` list becomes one
    JobDetail attached to ``job``; values longer than the model's
    column widths are truncated (with a warning) before saving.
    """
    for detail in json.loads(job_info_artifact['blob'])['job_details']:
        params = {
            'title': detail.get('title'),
            'value': detail['value'],
            'url': detail.get('url')
        }
        for field_name, field_value in params.items():
            limit = JobDetail._meta.get_field(field_name).max_length
            if field_value is not None and len(field_value) > limit:
                logger.warning("Job detail '{}' for job_guid {} too long, "
                               "truncating".format(
                                   field_value[:limit],
                                   job.guid))
                params[field_name] = field_value[:limit]

        # url is matched through defaults so an existing row with the
        # same (job, title, value) gets its url refreshed in place
        # rather than producing a duplicate
        params['defaults'] = {'url': params['url']}
        del params['url']

        JobDetail.objects.update_or_create(
            job=job,
            **params)


def store_text_log_summary_artifact(job, text_log_summary_artifact):
    """
    Store the contents of the text log summary artifact
    as TextLogStep / TextLogError rows for ``job``, then warm
    the error summary cache.
    """
    steps = json.loads(
        text_log_summary_artifact['blob'])['step_data']['steps']
    result_lookup = dict((v, k) for (k, v) in TextLogStep.RESULTS)
    with transaction.atomic():
        for step in steps:
            truncated_name = step['name'][:TextLogStep._meta.get_field('name').max_length]
            # pick up start/end times when present; timezones are
            # stripped (ignoretz) to avoid a ValueError, since
            # treeherder doesn't support timezones and the summaries
            # we produce should already be expressed in UTC
            times = {}
            for key in ('started', 'finished'):
                raw_time = step.get(key)
                if raw_time:
                    times[key] = dateutil.parser.parse(
                        raw_time, ignoretz=True)

            created_step = TextLogStep.objects.create(
                job=job,
                started_line_number=step['started_linenumber'],
                finished_line_number=step['finished_linenumber'],
                name=truncated_name,
                result=result_lookup[step['result']],
                **times)

            if step.get('errors'):
                for error in step['errors']:
                    TextLogError.objects.create(
                        step=created_step,
                        line_number=error['linenumber'],
                        line=astral_filter(error['line']))

    # get error summary immediately (to warm the cache)
    error_summary.get_error_summary(job)


def store_job_artifacts(artifact_data):
    """
    Store a list of job artifacts. All of the datums in artifact_data need
    to be in the following format:
    {
        'type': 'json',
        'name': 'my-artifact-name',
        # blob can be any kind of structured data
        'blob': { 'stuff': [1, 2, 3, 4, 5] },
        'job_guid': 'd22c74d4aa6d2a1dcba96d95dccbd5fdca70cf33'
    }

    Artifacts with no name, no job guid, or an unknown job guid are
    logged and skipped; the remainder are dispatched to the appropriate
    store_* helper based on their name.
    """
    for artifact in artifact_data:
        # Guard clause: a falsy entry can't be dispatched. (Previously
        # this branch formatted ``artifact_name``, which is unbound when
        # the first entry is falsy and raised NameError.)
        if not artifact:
            logger.error(
                ('store_job_artifacts: artifact type '
                 '{} not understood'.format(artifact)))
            continue

        artifact_name = artifact.get('name')
        if not artifact_name:
            logger.error("store_job_artifacts: Unnamed job artifact, "
                         "skipping")
            continue
        job_guid = artifact.get('job_guid')
        if not job_guid:
            logger.error("store_job_artifacts: Artifact '{}' with no "
                         "job guid set, skipping".format(
                             artifact_name))
            continue

        try:
            job = Job.objects.get(guid=job_guid)
        except Job.DoesNotExist:
            logger.error(
                ('store_job_artifacts: No job_id for '
                 'guid {}'.format(job_guid)))
            continue

        # Dispatch on the artifact name
        if artifact_name == 'performance_data':
            store_performance_artifact(job, artifact)
        elif artifact_name == 'Job Info':
            store_job_info_artifact(job, artifact)
        elif artifact_name == 'text_log_summary':
            try:
                store_text_log_summary_artifact(job, artifact)
            except IntegrityError:
                # duplicate insert: the log was most likely parsed
                # already, so treat this as a benign no-op
                logger.warning("Couldn't insert text log information "
                               "for job with guid %s, this probably "
                               "means the job was already parsed",
                               job_guid)
        elif artifact_name == 'buildapi':
            buildbot_request_id = json.loads(artifact['blob']).get(
                'request_id')
            if buildbot_request_id:
                JobDetail.objects.update_or_create(
                    job=job,
                    title='buildbot_request_id',
                    value=str(buildbot_request_id))
        else:
            logger.warning("Unknown artifact type: %s submitted with job %s",
                           artifact_name, job.guid)


def serialize_artifact_json_blobs(artifacts):
    """
    Ensure that JSON artifact blobs passed as dicts are converted to JSON

    Each artifact of type 'json' whose blob is not already a string has
    its blob serialized in place. The (mutated) list is returned.
    """
    try:
        # Python 2: unicode text must be left untouched as well
        string_types = (str, unicode)
    except NameError:
        # Python 3: ``unicode`` doesn't exist; str covers all text.
        # (The original referenced ``unicode`` unconditionally, which
        # raises NameError on Python 3.)
        string_types = (str,)

    for artifact in artifacts:
        blob = artifact['blob']
        if (artifact['type'].lower() == 'json' and
                not isinstance(blob, string_types)):
            artifact['blob'] = json.dumps(blob)

    return artifacts
2 changes: 1 addition & 1 deletion treeherder/etl/perf.py
Expand Up @@ -173,7 +173,7 @@ def _load_perf_datum(job, perf_datum):
routing_key='generate_perf_alerts')


def load_perf_artifact(job, artifact):
def store_performance_artifact(job, artifact):
blob = json.loads(artifact['blob'])
performance_data = blob['performance_data']

Expand Down
13 changes: 6 additions & 7 deletions treeherder/log_parser/utils.py
Expand Up @@ -3,8 +3,9 @@

import simplejson as json

from treeherder.etl.artifact import (serialize_artifact_json_blobs,
store_job_artifacts)
from treeherder.log_parser.artifactbuildercollection import ArtifactBuilderCollection
from treeherder.model.derived import ArtifactsModel
from treeherder.model.models import JobLog

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -55,13 +56,11 @@ def post_log_artifacts(job_log):
raise

try:
serialized_artifacts = ArtifactsModel.serialize_artifact_json_blobs(
artifact_list)
project = job_log.job.repository.name
with ArtifactsModel(project) as artifacts_model:
artifacts_model.load_job_artifacts(serialized_artifacts)
serialized_artifacts = serialize_artifact_json_blobs(artifact_list)
store_job_artifacts(serialized_artifacts)
job_log.update_status(JobLog.PARSED)
logger.debug("Stored artifact for %s %s", project, job_log.job.id)
logger.debug("Stored artifact for %s %s", job_log.job.repository.name,
job_log.job.id)
except Exception as e:
logger.error("Failed to store parsed artifact for %s: %s", job_log.id, e)
raise

0 comments on commit 0f10dff

Please sign in to comment.