-
Notifications
You must be signed in to change notification settings - Fork 352
/
tasks.py
179 lines (139 loc) · 6.07 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at http://mozilla.org/MPL/2.0/.
import os
from celery import task
from django.core.management import call_command
from django.conf import settings
from treeherder.model.models import Datasource, Repository
from treeherder.model.exchanges import TreeherderPublisher
from treeherder.model.pulse_publisher import load_schemas
# Load schemas for validation of messages published on pulse.
# Directory containing this module (symlinks resolved).
SOURCE_FOLDER = os.path.dirname(os.path.realpath(__file__))
# The pulse message schemas live two directory levels above this module.
SCHEMA_FOLDER = os.path.join(SOURCE_FOLDER, '..', '..', 'schemas')
# Parsed schemas, loaded once at import time and shared by all tasks below.
PULSE_SCHEMAS = load_schemas(SCHEMA_FOLDER)
class LazyPublisher():
    """
    Singleton for lazily connecting to the pulse publisher.

    The connection is only attempted on first use; the result (a
    TreeherderPublisher, or None when pulse publishing is not
    configured) is cached and returned on every subsequent call.
    """

    def __init__(self):
        # False means "not initialized yet"; after the first
        # get_publisher() call this holds either a TreeherderPublisher
        # instance or None (pulse publishing disabled).
        self.publisher = False

    def get_publisher(self):
        """
        Return the cached publisher, creating it on the first call.

        Returns a TreeherderPublisher when PULSE_EXCHANGE_NAMESPACE is
        configured in settings, otherwise None.
        """
        if self.publisher is not False:
            return self.publisher

        # Create the publisher only when a pulse exchange namespace is
        # configured; otherwise publishing stays disabled (local installs
        # don't need pulse credentials).
        if settings.PULSE_EXCHANGE_NAMESPACE:
            self.publisher = TreeherderPublisher(
                namespace=settings.PULSE_EXCHANGE_NAMESPACE,
                uri=settings.PULSE_URI,
                schemas=PULSE_SCHEMAS
            )
        else:
            self.publisher = None

        # BUG FIX: the original fell through here and implicitly returned
        # None on the very first call, even when a publisher had just been
        # created -- so the first task to request it skipped publishing.
        # Return the freshly cached value explicitly.
        return self.publisher


pulse_connection = LazyPublisher()
@task(name='process-objects')
def process_objects(limit=None, project=None):
    """
    Move a batch of objects from the objectstore into the jobs store.

    When ``project`` is given, only that project is processed; otherwise
    every distinct project known to the datasources is handled.  At most
    ``limit`` objects (default 100) are processed per project.
    """
    from treeherder.model.derived.jobs import JobsModel

    # Fall back to the default batch size of 100.
    batch_size = limit or 100

    if project:
        target_projects = [project]
    else:
        target_projects = Datasource.objects.values_list(
            'project', flat=True).distinct()

    for name in target_projects:
        with JobsModel(name) as jm:
            jm.process_objects(batch_size)
# Run a maximum of 1 per hour
@task(name='cycle-data', rate_limit='1/h')
def cycle_data():
    # Thin task wrapper around the 'cycle_data' Django management command
    # (presumably expires old data -- see the command's own docs).
    call_command('cycle_data')
@task(name='calculate-eta', rate_limit='1/h')
def calculate_eta(sample_window_seconds=21600, debug=False):
    """
    Recompute job ETA values for every active repository.

    ``sample_window_seconds`` bounds the sampling window used by the
    per-project calculation (default 21600 seconds, i.e. six hours).
    """
    from treeherder.model.derived.jobs import JobsModel

    active_projects = Repository.objects.filter(
        active_status='active'
    ).values_list('name', flat=True)

    for project_name in active_projects:
        with JobsModel(project_name) as jm:
            jm.calculate_eta(sample_window_seconds, debug)
@task(name='populate-performance-series')
def populate_performance_series(project, series_type, series_data):
    """
    Store performance series data for a project.

    Every signature in ``series_data`` is written once per configured
    time range (settings.TREEHERDER_PERF_SERIES_TIME_RANGES).
    """
    from treeherder.model.derived.jobs import JobsModel

    with JobsModel(project) as jm:
        for time_range in settings.TREEHERDER_PERF_SERIES_TIME_RANGES:
            seconds = time_range['seconds']
            for signature, data in series_data.items():
                jm.store_performance_series(
                    seconds, series_type, signature, data
                )
@task(name='publish-job-action')
def publish_job_action(project, action, job_id, requester):
    """
    Generic task to issue pulse notifications when jobs actions occur
    (retrigger/cancel)

    :param project str: The name of the project this action was requested for.
    :param action str: The type of action performed (retrigger/cancel/etc..)
    :param job_id str: The job id the action was requested for.
    :param requester str: The email address associated with the request.
    """
    publisher = pulse_connection.get_publisher()
    if not publisher:
        # Pulse publishing is not configured; nothing to do.
        return

    from treeherder.model.derived.jobs import JobsModel

    with JobsModel(project) as jm:
        job = jm.get_job(job_id)[0]
        build_system_type = jm.get_job_reference_data(
            job['signature'])['build_system_type']
        publisher.job_action(
            version=1,
            build_system_type=build_system_type,
            project=project,
            action=action,
            job_guid=job['job_guid'],
            # Job id is included for convenience as you need it in some
            # cases instead of job_guid...
            job_id=job['id'],
            requester=requester
        )
@task(name='publish-resultset')
def publish_resultset(project, ids):
    """
    Publish a pulse message for each of the given result-set ids.

    :param project: name of the project the result-sets belong to.
    :param ids: result-set ids to look up and publish.
    """
    # If we don't have a publisher (because of missing configs), then we can't
    # publish any pulse messages. This is okay: local installs etc. don't
    # need to publish on pulse, and requiring a pulse user would add more
    # overhead to an already large development setup process.
    publisher = pulse_connection.get_publisher()
    if not publisher:
        return

    from treeherder.model.derived.jobs import JobsModel
    with JobsModel(project) as jm:
        # Publish messages with new result-sets
        for entry in jm.get_result_set_list_by_ids(ids):
            repository = jm.refdata_model.get_repository_info(entry['repository_id'])
            # NOTE(review): returning here aborts the whole task on one
            # failed repository lookup -- remaining entries are skipped
            # and the connection release below never runs.  Confirm this
            # is intended (a `continue` may have been meant).
            if repository is None:
                return
            entry['repository_url'] = repository['url']

            # Strip fields not meant for the published message:
            # 'revisions' (too large for a pulse message) and the
            # internal 'repository_id'.
            del entry['revisions']
            del entry['repository_id']

            # Required properties of the published message; the
            # result-set's 'revision_hash' identifier is expected to
            # already be present in `entry`.
            entry['version'] = 1
            entry['project'] = project

            # publish the data to pulse
            publisher.new_result_set(**entry)

        # Confirm-publish could not be made to work with kombu here, so
        # the connection is released explicitly: if the process context
        # is destroyed, messages should still get published -- assuming
        # nothing goes wrong, since no confirm channels are used.
        publisher.connection.release()