Skip to content

Commit

Permalink
Bug 1368989 - Fix integrity error during pulse job ingestion
Browse files Browse the repository at this point in the history
Instead of raising and logging an exception when we get two jobs
with the same guid during ingestion, we now use ``get_or_create``
to attempt the insert, and no longer throw an exception if the job
was created by another process just after our existence check.

This case appears to happen only when a pending and a running job
with the same guid are being processed by different workers/dynos
simultaneously. So the worst that can happen is that a job looks
pending for longer and then becomes complete immediately.
  • Loading branch information
Cameron Dawson committed Jul 12, 2017
1 parent 3a16745 commit cf71576
Showing 1 changed file with 32 additions and 25 deletions.
57 changes: 32 additions & 25 deletions treeherder/etl/jobs.py
Expand Up @@ -251,32 +251,39 @@ def _load_job(repository, job_datum, push_id, lower_tier_signatures):
# exist yet)
job_guid_root = get_guid_root(job_guid)
if not Job.objects.filter(guid__in=[job_guid, job_guid_root]).exists():
# this could theoretically throw an exception if we were processing
# several updates simultaneously, but that should never happen --
# and if it does it's better just to error out
Job.objects.create(
# This could theoretically already have been created by another process
# that is running updates simultaneously. So just attempt to create
# it, but skip the creation if a job with the same guid already exists.
# The odds are extremely high that these are a pending and a running job
# that came in in quick succession and are being processed by two
# different workers.
Job.objects.get_or_create(
guid=job_guid,
repository=repository,
signature=signature,
build_platform=build_platform,
machine_platform=machine_platform,
machine=machine,
option_collection_hash=option_collection_hash,
job_type=job_type,
product=product,
failure_classification=default_failure_classification,
who=who,
reason=reason,
result=result,
state=state,
tier=tier,
submit_time=submit_time,
start_time=start_time,
end_time=end_time,
last_modified=datetime.now(),
running_eta=duration,
push_id=push_id)

defaults={
"repository": repository,
"signature": signature,
"build_platform": build_platform,
"machine_platform": machine_platform,
"machine": machine,
"option_collection_hash": option_collection_hash,
"job_type": job_type,
"product": product,
"failure_classification": default_failure_classification,
"who": who,
"reason": reason,
"result": result,
"state": state,
"tier": tier,
"submit_time": submit_time,
"start_time": start_time,
"end_time": end_time,
"last_modified": datetime.now(),
"running_eta": duration,
"push_id": push_id
}
)
# Can't just use the ``job`` we would get from the ``get_or_create``
# because we need to try the job_guid_root instance first for update,
# rather than a possible retry job instance.
try:
job = Job.objects.get(guid=job_guid_root)
except ObjectDoesNotExist:
Expand Down

0 comments on commit cf71576

Please sign in to comment.