-
Notifications
You must be signed in to change notification settings - Fork 45
/
0015_auto_20170731_1527.py
216 lines (168 loc) · 8.09 KB
/
0015_auto_20170731_1527.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# -*- coding: utf-8 -*-
# Generated by Django 1.11 on 2017-07-31 19:27
from __future__ import unicode_literals
import math
from django.db import migrations
from django.utils.timezone import now
from job.configuration.configurators import QueuedExecutionConfigurator
from job.configuration.data.job_data import JobData
from job.configuration.interface.job_interface import JobInterface
from job.configuration.json.job.job_config import JobConfiguration
from node.resources.json.resources import Resources
from node.resources.node_resources import NodeResources
from node.resources.resource import Cpus, Disk, Mem
# Floor values (in MiB) applied to the calculated memory and output-disk
# requirements in job_get_resources() below
MIN_MEM = 128.0
MIN_DISK = 0.0
def get_job_data(self):
    """Return the data for this job.

    :returns: The data for this job
    :rtype: :class:`job.configuration.data.job_data.JobData`
    """
    data_dict = self.data
    return JobData(data_dict)
def job_get_job_interface(self):
    """Return the interface for this job.

    The interface comes from the job type revision this job was queued against.

    :returns: The interface for this job
    :rtype: :class:`job.configuration.interface.job_interface.JobInterface`
    """
    interface_dict = self.job_type_rev.interface
    return JobInterface(interface_dict)
def job_get_resources(self):
    """Return the resources required for this job.

    Memory and output disk space are computed as linear functions of the
    input disk size (multiplier * input_disk + constant), rounded up to a
    whole MiB and floored at MIN_MEM / MIN_DISK respectively.

    :returns: The required resources
    :rtype: :class:`node.resources.node_resources.NodeResources`
    """
    job_type = self.job_type
    resources = job_type.get_resources()

    # Input disk may be unset; treat any falsy value as zero for the formulas below
    input_disk = self.disk_in_required or 0.0

    # Memory required in MiB, rounded up to the nearest whole MiB
    mem_mb = long(math.ceil(job_type.mem_mult_required * input_disk + job_type.mem_const_required))
    memory_required = max(mem_mb, MIN_MEM)

    # Output space required in MiB, rounded up to the nearest whole MiB
    out_mb = long(math.ceil(job_type.disk_out_mult_required * input_disk + job_type.disk_out_const_required))
    disk_out_required = max(out_mb, MIN_DISK)

    # Total disk = output space plus the input files themselves
    resources.add(NodeResources([Mem(memory_required), Disk(disk_out_required + input_disk)]))
    return resources
def get_job_configuration(self):
    """Return the default job configuration for this job type.

    :returns: The default job configuration for this job type
    :rtype: :class:`job.configuration.json.job.job_config.JobConfiguration`
    """
    config_dict = self.configuration
    return JobConfiguration(config_dict)
def job_type_get_job_interface(self):
    """Return the interface for running jobs of this type.

    :returns: The job interface for this type
    :rtype: :class:`job.configuration.interface.job_interface.JobInterface`
    """
    interface_dict = self.interface
    return JobInterface(interface_dict)
def job_type_get_resources(self):
    """Return the resources required for jobs of this type.

    Starts from the type's custom resources, drops the standard resources,
    and adds back CPUs with a floor of 0.25.

    :returns: The required resources
    :rtype: :class:`node.resources.node_resources.NodeResources`
    """
    resources = Resources(self.custom_resources).get_node_resources()
    # Standard resources are computed elsewhere; keep only custom resources here
    for standard_name in ('cpus', 'mem', 'disk'):
        resources.remove_resource(standard_name)
    # Enforce a minimum of 0.25 CPUs
    resources.add(NodeResources([Cpus(max(self.cpus_required, 0.25))]))
    return resources
def get_secrets_key(self):
    """Return the reference key for job type secrets stored in the secrets backend.

    The key is ``<name>-<version>`` with every ``.`` replaced by ``_``.

    :returns: The job_type name and version concatenated
    :rtype: str
    """
    raw_key = '%s-%s' % (self.name, self.version)
    return raw_key.replace('.', '_')
class Migration(migrations.Migration):
    """One-time data migration that rebuilds the new queue table from all
    QUEUED jobs and then deletes the now-obsolete QUEUED job execution models.
    """

    dependencies = [
        ('job', '0029_auto_20170707_1034'),
        ('product', '0010_auto_20170727_1349'),
        ('storage', '0008_auto_20170609_1443'),
        ('queue', '0014_queue'),
    ]

    def populate_queue(apps, schema_editor):
        # NOTE(review): imported but never referenced below - presumably kept
        # for an import side effect; confirm before removing
        from job.configuration.json.execution.exe_config import ExecutionConfiguration
        # Go through all of the queued job models and re-populate the queue table
        when_queued = now()
        # Historical model classes as they exist at this point in the migration graph
        Job = apps.get_model('job', 'Job')
        JobType = apps.get_model('job', 'JobType')
        JobExecution = apps.get_model('job', 'JobExecution')
        FileAncestryLink = apps.get_model('product', 'FileAncestryLink')
        Queue = apps.get_model('queue', 'Queue')
        ScaleFile = apps.get_model('storage', 'ScaleFile')
        Workspace = apps.get_model('storage', 'Workspace')
        # Attach needed methods to Job model (historical models have no custom methods)
        Job.get_job_data = get_job_data
        Job.get_job_interface = job_get_job_interface
        Job.get_resources = job_get_resources
        # Attach needed methods to JobType model
        JobType.get_job_configuration = get_job_configuration
        JobType.get_job_interface = job_type_get_job_interface
        JobType.get_resources = job_type_get_resources
        JobType.get_secrets_key = get_secrets_key
        total_count = Job.objects.filter(status='QUEUED').count()
        print 'Populating new queue table for %s queued jobs' % str(total_count)
        done_count = 0
        batch_size = 1000
        # Process the queued jobs in id-ordered batches of batch_size
        while done_count < total_count:
            percent = (float(done_count) / float(total_count)) * 100.00
            print 'Completed %s of %s jobs (%f%%)' % (done_count, total_count, percent)
            batch_end = done_count + batch_size
            job_qry = Job.objects.filter(status='QUEUED').select_related('job_type', 'job_type_rev')
            job_qry = job_qry.order_by('id')[done_count:batch_end]
            # Query for all input files referenced by this batch's job data
            input_files = {}
            input_file_ids = set()
            for job in job_qry:
                input_file_ids.update(job.get_job_data().get_input_file_ids())
            if input_file_ids:
                qry = ScaleFile.objects.select_related('workspace').filter(id__in=input_file_ids)
                for input_file in qry.only('id', 'file_type', 'file_path', 'is_deleted', 'workspace__name').iterator():
                    input_files[input_file.id] = input_file
            # Bulk create queue models
            queues = []
            configurator = QueuedExecutionConfigurator(input_files)
            for job in job_qry:
                config = configurator.configure_queued_job(job)
                queue = Queue()
                queue.job_type = job.job_type
                queue.job = job
                queue.exe_num = job.num_exes
                queue.input_file_size = job.disk_in_required if job.disk_in_required else 0.0
                queue.is_canceled = False
                queue.priority = job.priority
                queue.timeout = job.timeout
                queue.interface = job.get_job_interface().get_dict()
                queue.configuration = config.get_dict()
                queue.resources = job.get_resources().get_json().get_dict()
                queue.queued = when_queued
                queues.append(queue)
            # NOTE(review): an empty batch ends the whole loop early, not just
            # this iteration; appears intentional as a stale-count guard - confirm
            if not queues:
                return []
            Queue.objects.bulk_create(queues)
            done_count += batch_size
        print 'All %s jobs completed' % str(total_count)
        # Second pass: detach file ancestry links from QUEUED job executions,
        # also in id-ordered batches, then delete those executions
        total_count = JobExecution.objects.filter(status='QUEUED').count()
        print 'Updating file ancestry links for %s queued job executions' % str(total_count)
        done_count = 0
        batch_size = 1000
        while done_count < total_count:
            percent = (float(done_count) / float(total_count)) * 100.00
            print 'Completed %s of %s queued job executions (%f%%)' % (done_count, total_count, percent)
            batch_end = done_count + batch_size
            job_exe_qry = JobExecution.objects.filter(status='QUEUED').defer('configuration', 'resources')
            job_exe_qry = job_exe_qry.order_by('id')[done_count:batch_end]
            job_exe_ids = []
            for job_exe in job_exe_qry:
                job_exe_ids.append(job_exe.id)
            # Null out the FK so the links survive the execution deletion below
            FileAncestryLink.objects.filter(job_exe_id__in=job_exe_ids).update(job_exe_id=None)
            done_count += batch_size
        print 'All file ancestry links for %s queued job executions completed' % str(total_count)
        print 'Deleting %s queued job executions...' % str(total_count)
        JobExecution.objects.filter(status='QUEUED').delete()

    operations = [
        migrations.RunPython(populate_queue),
    ]