-
Notifications
You must be signed in to change notification settings - Fork 45
/
models.py
2052 lines (1634 loc) · 90.5 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Defines the database models for recipes and recipe types"""
from __future__ import unicode_literals
import copy
import logging
from collections import namedtuple
import django.contrib.postgres.fields
from django.db import connection, models, transaction
from django.db.models import Q
from django.utils.timezone import now
from data.data.data import Data
from data.data.json.data_v1 import convert_data_to_v1_json
from data.data.json.data_v6 import convert_data_to_v6_json, DataV6
from data.interface.interface import Interface
from data.interface.parameter import FileParameter
from job.models import Job, JobType
from messaging.manager import CommandMessageManager
from recipe.configuration.json.recipe_config_v6 import convert_config_to_v6_json, RecipeConfigurationV6
from recipe.definition.definition import RecipeDefinition
from recipe.definition.json.definition_v6 import convert_recipe_definition_to_v6_json, RecipeDefinitionV6
from recipe.definition.node import JobNodeDefinition, RecipeNodeDefinition
from recipe.diff.diff import RecipeDiff
from recipe.diff.json.diff_v6 import convert_recipe_diff_to_v6_json
from recipe.exceptions import CreateRecipeError, ReprocessError, SupersedeError, InactiveRecipeType
from recipe.instance.recipe import RecipeInstance
from recipe.instance.json.recipe_v6 import convert_recipe_to_v6_json, RecipeInstanceV6
from storage.models import ScaleFile, Workspace
from trigger.configuration.exceptions import InvalidTriggerType
from trigger.models import TriggerRule
from util import rest as rest_utils
from util.validation import ValidationWarning
logger = logging.getLogger(__name__)

# Named tuple record types
RecipeNodeCopy = namedtuple('RecipeNodeCopy', 'superseded_recipe_id recipe_id node_names')
RecipeNodeOutput = namedtuple('RecipeNodeOutput', 'node_name node_type id output_data')
RecipeTypeValidation = namedtuple('RecipeTypeValidation', 'is_valid errors warnings diff')
ForcedNodesValidation = namedtuple('ForcedNodesValidation', 'is_valid errors warnings forced_nodes')

# Upper bound on how many RecipeInputFile models are created in one batch
INPUT_FILE_BATCH_SIZE = 500

# IMPORTANT NOTE: Locking order
# Row locks obtained via select_for_update() must always be taken in the model order below. Following this order
# prevents deadlocks and keeps the queries efficient.
# When editing a job/recipe type: RecipeType, JobType, TriggerRule
class RecipeManager(models.Manager):
    """Provides additional methods for handling recipes
    """

    def complete_recipes(self, recipe_ids, when):
        """Marks the recipes with the given IDs as being completed. Recipes that are already marked as completed are
        not updated again.

        :param recipe_ids: The recipe IDs
        :type recipe_ids: list
        :param when: The time that the recipes were completed
        :type when: :class:`datetime.datetime`
        """

        qry = self.filter(id__in=recipe_ids, is_completed=False)
        qry.update(is_completed=True, completed=when, last_modified=now())

    def create_recipe_v6(self, recipe_type_rev, event_id=None, ingest_id=None, input_data=None, root_recipe_id=None,
                         recipe_id=None, recipe_config=None, batch_id=None, superseded_recipe=None,
                         copy_superseded_input=False):
        """Creates a new recipe for the given recipe type revision and returns the (unsaved) recipe model

        :param recipe_type_rev: The recipe type revision (with populated recipe_type model) of the recipe to create
        :type recipe_type_rev: :class:`recipe.models.RecipeTypeRevision`
        :param event_id: The event ID that triggered the creation of this recipe
        :type event_id: int
        :param ingest_id: The ingest event ID that triggered the creation of this recipe
        :type ingest_id: int
        :param input_data: The recipe's input data, possibly None
        :type input_data: :class:`data.data.data.Data`
        :param root_recipe_id: The ID of the root recipe that contains this sub-recipe, possibly None
        :type root_recipe_id: int
        :param recipe_id: The ID of the original recipe that created this sub-recipe, possibly None
        :type recipe_id: int
        :param recipe_config: The configuration for running this recipe, possibly None
        :type recipe_config: :class:`recipe.configuration.configuration.RecipeConfiguration`
        :param batch_id: The ID of the batch that contains this recipe, possibly None
        :type batch_id: int
        :param superseded_recipe: The recipe that the created recipe is superseding, possibly None
        :type superseded_recipe: :class:`recipe.models.Recipe`
        :param copy_superseded_input: Whether to copy the input data from the superseded recipe. When True, this
            replaces any given input_data.
        :type copy_superseded_input: bool
        :returns: The new recipe model
        :rtype: :class:`recipe.models.Recipe`

        :raises :class:`data.data.exceptions.InvalidData`: If the input data is invalid
        :raises :class:`recipe.exceptions.InactiveRecipeType`: If the recipe type is inactive
        """

        if not recipe_type_rev.recipe_type.is_active:
            raise InactiveRecipeType("Recipe Type %s is inactive" % recipe_type_rev.recipe_type.name)

        recipe = Recipe()
        recipe.recipe_type = recipe_type_rev.recipe_type
        recipe.recipe_type_rev = recipe_type_rev
        recipe.event_id = event_id
        recipe.ingest_event_id = ingest_id
        # With no explicit root, the root of a sub-recipe defaults to the recipe that created it
        recipe.root_recipe_id = root_recipe_id if root_recipe_id else recipe_id
        recipe.recipe_id = recipe_id
        recipe.batch_id = batch_id

        if recipe_config:
            recipe.configuration = convert_config_to_v6_json(recipe_config).get_dict()

        if superseded_recipe:
            # Every recipe in a supersede chain points at the first recipe of that chain
            root_id = superseded_recipe.root_superseded_recipe_id
            if not root_id:
                root_id = superseded_recipe.id
            recipe.root_superseded_recipe_id = root_id
            recipe.superseded_recipe = superseded_recipe

            # Fall back to the superseded recipe's configuration when none was provided. This is done before the
            # legacy workspace handling below; otherwise it would overwrite (and lose) the copied workspace.
            if not recipe_config and superseded_recipe.configuration:
                recipe.configuration = superseded_recipe.configuration

            if copy_superseded_input:
                if 'workspace_id' in superseded_recipe.input:
                    self._copy_legacy_workspace(recipe, superseded_recipe)
                input_data = superseded_recipe.get_input_data()

        if input_data:
            input_data.validate(recipe_type_rev.get_input_interface())
            recipe.input = convert_data_to_v6_json(input_data).get_dict()

        return recipe

    @staticmethod
    def _copy_legacy_workspace(recipe, superseded_recipe):
        """Sets the workspace ID found in the superseded recipe's legacy (v1) input as the default output workspace in
        the new recipe's configuration, so that the workspace is not lost. If the workspace does not exist, the error
        is logged and the configuration is left unchanged.

        :param recipe: The (unsaved) new recipe model whose configuration is updated in place
        :type recipe: :class:`recipe.models.Recipe`
        :param superseded_recipe: The recipe being superseded; its input must contain a 'workspace_id' key
        :type superseded_recipe: :class:`recipe.models.Recipe`
        """

        # TODO: Remove when legacy recipes go away
        workspace_id = superseded_recipe.input['workspace_id']
        try:
            workspace = Workspace.objects.get(pk=workspace_id)
        except Workspace.DoesNotExist:
            logger.exception('Could not copy workspace from superseded recipe. Workspace does not exist: %d', workspace_id)
            return

        # Work on a copy so the superseded recipe's configuration dict is never mutated through the shared reference
        config = RecipeConfigurationV6(copy.deepcopy(recipe.configuration)).get_configuration()
        config.default_output_workspace = workspace.name
        recipe.configuration = convert_config_to_v6_json(config).get_dict()

    def get_locked_recipe(self, recipe_id):
        """Locks and returns the recipe model for the given ID with no related fields. Caller must be within an atomic
        transaction.

        :param recipe_id: The recipe ID
        :type recipe_id: int
        :returns: The recipe model, or None if no recipe has the given ID
        :rtype: :class:`recipe.models.Recipe`
        """

        locked_recipes = self.get_locked_recipes([recipe_id])
        return locked_recipes[0] if locked_recipes else None

    def get_locked_recipes(self, recipe_ids):
        """Locks and returns the recipe models for the given IDs with no related fields. Caller must be within an atomic
        transaction.

        :param recipe_ids: The recipe IDs
        :type recipe_ids: list
        :returns: The recipe models
        :rtype: list
        """

        # Recipe models are always locked in order of ascending ID to prevent deadlocks
        return list(self.select_for_update().filter(id__in=recipe_ids).order_by('id').iterator())

    def get_locked_recipes_from_root(self, root_recipe_ids):
        """Locks and returns the latest (non-superseded) recipe model for each recipe family with the given root recipe
        IDs. The returned models have no related fields populated. Caller must be within an atomic transaction.

        :param root_recipe_ids: The root recipe IDs
        :type root_recipe_ids: list
        :returns: The recipe models
        :rtype: list
        """

        root_recipe_ids = set(root_recipe_ids)  # Ensure no duplicates
        qry = self.select_for_update()
        # A family is the root recipe itself plus every recipe whose supersede chain starts at that root
        qry = qry.filter(models.Q(id__in=root_recipe_ids) | models.Q(root_superseded_recipe_id__in=root_recipe_ids))
        qry = qry.filter(is_superseded=False)
        # Recipe models are always locked in order of ascending ID to prevent deadlocks
        return list(qry.order_by('id').iterator())

    # TODO: remove this once database calls are no longer done in the post-task and this is not needed
    def get_recipe_for_job(self, job_id):
        """Returns the original recipe node for the job with the given ID (returns None if the job is not in a recipe).
        The returned model will have its related recipe, recipe__recipe_type and recipe__recipe_type_rev models
        populated. If the job exists in multiple recipes due to superseding, the original (first) recipe node is
        returned.

        :param job_id: The job ID
        :type job_id: int
        :returns: The recipe node model with related recipe_type and recipe_type_rev, possibly None
        :rtype: :class:`recipe.models.RecipeNode`
        """

        recipe_job_qry = RecipeNode.objects.select_related('recipe__recipe_type', 'recipe__recipe_type_rev')
        try:
            recipe_job = recipe_job_qry.get(job_id=job_id, is_original=True)
        except RecipeNode.DoesNotExist:
            return None
        return recipe_job

    def get_recipe_ids_for_jobs(self, job_ids):
        """Returns the IDs of all recipes that contain the jobs with the given IDs. This will include superseded
        recipes.

        :param job_ids: The job IDs
        :type job_ids: list
        :returns: The unique recipe IDs, in no particular order
        :rtype: list
        """

        qry = RecipeNode.objects.filter(job_id__in=job_ids).values_list('recipe_id', flat=True)
        return list(set(qry))

    def get_recipe_ids_for_sub_recipes(self, sub_recipe_ids):
        """Returns the IDs of all recipes that contain the sub-recipes with the given IDs. This will include superseded
        recipes.

        :param sub_recipe_ids: The sub-recipe IDs
        :type sub_recipe_ids: list
        :returns: The unique recipe IDs, in no particular order
        :rtype: list
        """

        qry = RecipeNode.objects.filter(sub_recipe_id__in=sub_recipe_ids).values_list('recipe_id', flat=True)
        return list(set(qry))

    def get_recipe_instance(self, recipe_id):
        """Returns the recipe instance for the given recipe ID

        :param recipe_id: The recipe ID
        :type recipe_id: int
        :returns: The recipe instance
        :rtype: :class:`recipe.instance.recipe.RecipeInstance`

        :raises :class:`recipe.models.Recipe.DoesNotExist`: If no recipe has the given ID
        """

        recipe = Recipe.objects.select_related('recipe_type_rev').get(id=recipe_id)
        recipe_nodes = RecipeNode.objects.get_recipe_nodes(recipe_id)
        return RecipeInstance(recipe.recipe_type_rev.get_definition(), recipe, recipe_nodes)

    def get_recipe_instance_from_root(self, root_recipe_id):
        """Returns the non-superseded recipe instance for the given root recipe ID

        :param root_recipe_id: The root recipe ID
        :type root_recipe_id: int
        :returns: The recipe instance
        :rtype: :class:`recipe.instance.recipe.RecipeInstance`

        :raises :class:`recipe.models.Recipe.DoesNotExist`: If the root recipe family has no non-superseded recipe
        """

        qry = self.select_related('recipe_type_rev')
        qry = qry.filter(models.Q(id=root_recipe_id) | models.Q(root_superseded_recipe_id=root_recipe_id))
        # If several recipes are non-superseded, the most recently created one is used
        recipe = qry.filter(is_superseded=False).order_by('-created').first()
        if recipe is None:
            raise Recipe.DoesNotExist('No non-superseded recipe found for root recipe ID %s' % root_recipe_id)
        recipe_nodes = RecipeNode.objects.get_recipe_nodes(recipe.id)
        return RecipeInstance(recipe.recipe_type_rev.get_definition(), recipe, recipe_nodes)

    def get_recipe_with_interfaces(self, recipe_id):
        """Gets the recipe model for the given ID with related recipe_type_rev and recipe__recipe_type_rev models

        :param recipe_id: The recipe ID
        :type recipe_id: int
        :returns: The recipe model with related recipe_type_rev and recipe__recipe_type_rev models
        :rtype: :class:`recipe.models.Recipe`
        """

        return self.select_related('recipe_type_rev', 'recipe__recipe_type_rev').get(id=recipe_id)

    def get_recipes_v6(self, started=None, ended=None, source_started=None, source_ended=None,
                       source_sensor_classes=None, source_sensors=None, source_collections=None,
                       source_tasks=None, ids=None, type_ids=None, type_names=None, batch_ids=None,
                       is_superseded=None, is_completed=None, order=None):
        """Returns a query set of recipes matching the given filters. Filters that are None (or empty) are not applied.

        :param started: Query recipes last modified at or after this time.
        :type started: :class:`datetime.datetime`
        :param ended: Query recipes last modified at or before this time.
        :type ended: :class:`datetime.datetime`
        :param source_started: Query recipes whose source data started at or after this time.
        :type source_started: :class:`datetime.datetime`
        :param source_ended: Query recipes whose source data ended at or before this time.
        :type source_ended: :class:`datetime.datetime`
        :param source_sensor_classes: Query recipes with any of the given source sensor classes.
        :type source_sensor_classes: list
        :param source_sensors: Query recipes with any of the given source sensors.
        :type source_sensors: list
        :param source_collections: Query recipes with any of the given source collections.
        :type source_collections: list
        :param source_tasks: Query recipes with any of the given source tasks.
        :type source_tasks: list
        :param ids: Query recipes associated with the given identifiers.
        :type ids: [int]
        :param type_ids: Query recipes of the type associated with the identifiers.
        :type type_ids: [int]
        :param type_names: Query recipes of the type associated with the name.
        :type type_names: [string]
        :param batch_ids: Query recipes associated with batches with the given identifiers.
        :type batch_ids: list[int]
        :param is_superseded: Query recipes that match the is_superseded flag.
        :type is_superseded: bool
        :param is_completed: Query recipes that match the is_completed flag.
        :type is_completed: bool
        :param order: A list of fields to control the sort order. Defaults to ascending last_modified.
        :type order: [string]
        :returns: The recipes that match the given filters.
        :rtype: :class:`django.db.models.QuerySet` of :class:`recipe.models.Recipe`
        """

        # Fetch a list of recipes
        recipes = Recipe.objects.all()
        recipes = recipes.select_related('recipe_type', 'recipe_type_rev', 'event', 'batch')
        # Definitions can be large JSON documents and are not needed here, so they are not loaded
        recipes = recipes.defer('recipe_type__definition', 'recipe_type_rev__recipe_type',
                                'recipe_type_rev__definition')

        # Apply time range filtering
        if started:
            recipes = recipes.filter(last_modified__gte=started)
        if ended:
            recipes = recipes.filter(last_modified__lte=ended)

        if source_started:
            recipes = recipes.filter(source_started__gte=source_started)
        if source_ended:
            recipes = recipes.filter(source_ended__lte=source_ended)
        if source_sensor_classes:
            recipes = recipes.filter(source_sensor_class__in=source_sensor_classes)
        if source_sensors:
            recipes = recipes.filter(source_sensor__in=source_sensors)
        if source_collections:
            recipes = recipes.filter(source_collection__in=source_collections)
        if source_tasks:
            recipes = recipes.filter(source_task__in=source_tasks)

        if ids:
            recipes = recipes.filter(id__in=ids)

        # Apply type filtering
        if type_ids:
            recipes = recipes.filter(recipe_type_id__in=type_ids)
        if type_names:
            recipes = recipes.filter(recipe_type__name__in=type_names)

        # Apply batch filtering
        if batch_ids:
            recipes = recipes.filter(batch_id__in=batch_ids)

        # Apply additional filters
        if is_superseded is not None:
            recipes = recipes.filter(is_superseded=is_superseded)
        if is_completed is not None:
            recipes = recipes.filter(is_completed=is_completed)

        # Apply sorting
        if order:
            recipes = recipes.order_by(*order)
        else:
            recipes = recipes.order_by('last_modified')

        return recipes

    def get_details(self, recipe_id):
        """Gets the details for a given recipe. The returned model additionally has job_types, sub_recipe_types and
        super_recipe_types attributes that hold query sets of the related recipe type links.

        :param recipe_id: The unique identifier of the recipe to fetch.
        :type recipe_id: int
        :returns: A recipe with additional information.
        :rtype: :class:`recipe.models.Recipe`

        :raises :class:`recipe.models.Recipe.DoesNotExist`: If no recipe has the given ID
        """

        # Attempt to fetch the requested recipe
        recipe = Recipe.objects.select_related(
            'recipe_type', 'recipe_type_rev', 'event', 'batch', 'root_superseded_recipe',
            'root_superseded_recipe__recipe_type', 'superseded_recipe', 'superseded_recipe__recipe_type',
            'superseded_by_recipe', 'superseded_by_recipe__recipe_type'
        ).get(pk=recipe_id)

        # Update the recipe with job types and sub recipes. The raw recipe_type_id column is used so that no extra
        # query is needed to resolve the foreign key.
        jt_ids = RecipeTypeJobLink.objects.get_job_type_ids([recipe.recipe_type_id])
        recipe.job_types = JobType.objects.filter(id__in=jt_ids)
        sub_ids = RecipeTypeSubLink.objects.get_sub_recipe_type_ids([recipe.recipe_type_id])
        recipe.sub_recipe_types = RecipeType.objects.filter(id__in=sub_ids)
        super_ids = RecipeTypeSubLink.objects.get_recipe_type_ids([recipe.recipe_type_id])
        recipe.super_recipe_types = RecipeType.objects.filter(id__in=super_ids)

        return recipe

    def process_recipe_input(self, recipe):
        """Processes the input data for the given recipe to populate its input file models and input meta-data fields.
        The caller must have obtained a model lock on the given recipe model. Does nothing if the recipe's
        input_file_size is already set.

        :param recipe: The locked recipe model
        :type recipe: :class:`recipe.models.Recipe`
        """

        if recipe.input_file_size is not None:
            return  # Recipe has already had its input processed

        # Create RecipeInputFile models in batches
        all_file_ids = set()
        input_file_models = []
        for file_value in recipe.get_input_data().values.values():
            if file_value.param_type != FileParameter.PARAM_TYPE:
                continue
            for file_id in file_value.file_ids:
                all_file_ids.add(file_id)
                recipe_input_file = RecipeInputFile()
                recipe_input_file.recipe_id = recipe.id
                recipe_input_file.input_file_id = file_id
                recipe_input_file.recipe_input = file_value.name
                input_file_models.append(recipe_input_file)
                if len(input_file_models) >= INPUT_FILE_BATCH_SIZE:
                    RecipeInputFile.objects.bulk_create(input_file_models)
                    input_file_models = []

        # Finish creating any remaining RecipeInputFile models
        if input_file_models:
            RecipeInputFile.objects.bulk_create(input_file_models)

        if not all_file_ids:
            # If there are no input files, just zero out the file size and skip input meta-data fields
            self.filter(id=recipe.id).update(input_file_size=0.0)
            return

        # Set input meta-data fields on the recipe
        # Total input file size is in MiB rounded up to the nearest whole MiB
        qry = 'UPDATE recipe r SET input_file_size = CEILING(s.total_file_size / (1024.0 * 1024.0)), '
        qry += 'source_started = s.source_started, source_ended = s.source_ended, last_modified = %s, '
        qry += 'source_sensor_class = s.source_sensor_class, source_sensor = s.source_sensor, '
        qry += 'source_collection = s.source_collection, source_task = s.source_task FROM ('
        qry += 'SELECT rif.recipe_id, MIN(f.source_started) AS source_started, MAX(f.source_ended) AS source_ended, '
        qry += 'COALESCE(SUM(f.file_size), 0.0) AS total_file_size, '
        qry += 'MAX(f.source_sensor_class) AS source_sensor_class, '
        qry += 'MAX(f.source_sensor) AS source_sensor, '
        qry += 'MAX(f.source_collection) AS source_collection, '
        qry += 'MAX(f.source_task) AS source_task '
        qry += 'FROM scale_file f JOIN recipe_input_file rif ON f.id = rif.input_file_id '
        qry += 'WHERE rif.recipe_id = %s GROUP BY rif.recipe_id) s '
        qry += 'WHERE r.id = s.recipe_id'
        with connection.cursor() as cursor:
            cursor.execute(qry, [now(), recipe.id])

    def set_recipe_input_data_v6(self, recipe, input_data):
        """Sets the given input data as a v6 JSON for the given recipe. The recipe model must have its related
        recipe_type_rev model populated.

        :param recipe: The recipe model with related recipe_type_rev model
        :type recipe: :class:`recipe.models.Recipe`
        :param input_data: The input data for the recipe
        :type input_data: :class:`data.data.data.Data`

        :raises :class:`data.data.exceptions.InvalidData`: If the data is invalid
        """

        recipe_definition = recipe.recipe_type_rev.get_definition()
        input_data.validate(recipe_definition.input_interface)

        input_dict = convert_data_to_v6_json(input_data).get_dict()
        self.filter(id=recipe.id).update(input=input_dict)

    def supersede_recipes(self, recipe_ids, when):
        """Updates the given recipes to be superseded. Recipes that are already superseded are not updated again.

        :param recipe_ids: The recipe IDs to supersede
        :type recipe_ids: list
        :param when: The time that the recipes were superseded
        :type when: :class:`datetime.datetime`
        """

        qry = self.filter(id__in=recipe_ids, is_superseded=False)
        qry.update(is_superseded=True, superseded=when, last_modified=now())

    def update_recipe_metrics(self, recipe_ids):
        """Updates the metrics for the recipes with the given IDs. Job counts include the jobs directly in each recipe
        plus the stored job counts of its sub-recipes.

        :param recipe_ids: The recipe IDs
        :type recipe_ids: list
        """

        if not recipe_ids:
            return

        qry = 'UPDATE recipe r SET jobs_total = s.jobs_total, jobs_pending = s.jobs_pending, '
        qry += 'jobs_blocked = s.jobs_blocked, jobs_queued = s.jobs_queued, jobs_running = s.jobs_running, '
        qry += 'jobs_failed = s.jobs_failed, jobs_completed = s.jobs_completed, jobs_canceled = s.jobs_canceled, '
        qry += 'sub_recipes_total = s.sub_recipes_total, sub_recipes_completed = s.sub_recipes_completed, '
        qry += 'last_modified = %s FROM ('
        qry += 'SELECT rn.recipe_id, COUNT(j.id) + COALESCE(SUM(r.jobs_total), 0) AS jobs_total, '
        qry += 'COUNT(j.id) FILTER(WHERE status = \'PENDING\') + COALESCE(SUM(r.jobs_pending), 0) AS jobs_pending, '
        qry += 'COUNT(j.id) FILTER(WHERE status = \'BLOCKED\') + COALESCE(SUM(r.jobs_blocked), 0) AS jobs_blocked, '
        qry += 'COUNT(j.id) FILTER(WHERE status = \'QUEUED\') + COALESCE(SUM(r.jobs_queued), 0) AS jobs_queued, '
        qry += 'COUNT(j.id) FILTER(WHERE status = \'RUNNING\') + COALESCE(SUM(r.jobs_running), 0) AS jobs_running, '
        qry += 'COUNT(j.id) FILTER(WHERE status = \'FAILED\') + COALESCE(SUM(r.jobs_failed), 0) AS jobs_failed, '
        qry += 'COUNT(j.id) FILTER(WHERE status = \'COMPLETED\') '
        qry += '+ COALESCE(SUM(r.jobs_completed), 0) AS jobs_completed, '
        qry += 'COUNT(j.id) FILTER(WHERE status = \'CANCELED\') + COALESCE(SUM(r.jobs_canceled), 0) AS jobs_canceled, '
        qry += 'COUNT(r.id) + COALESCE(SUM(r.sub_recipes_total), 0) AS sub_recipes_total, '
        qry += 'COUNT(r.id) FILTER(WHERE r.is_completed) '
        qry += '+ COALESCE(SUM(r.sub_recipes_completed), 0) AS sub_recipes_completed '
        qry += 'FROM recipe_node rn LEFT OUTER JOIN job j ON rn.job_id = j.id '
        qry += 'LEFT OUTER JOIN recipe r ON rn.sub_recipe_id = r.id WHERE rn.recipe_id IN %s GROUP BY rn.recipe_id) s '
        qry += 'WHERE r.id = s.recipe_id'
        with connection.cursor() as cursor:
            cursor.execute(qry, [now(), tuple(recipe_ids)])
class Recipe(models.Model):
"""Represents a recipe to be run on the cluster. A model lock must be obtained using select_for_update() on any
recipe model before adding new jobs to it or superseding it.
:keyword recipe_type: The type of this recipe
:type recipe_type: :class:`django.db.models.ForeignKey`
:keyword recipe_type_rev: The revision of the recipe type when this recipe was created
:type recipe_type_rev: :class:`django.db.models.ForeignKey`
:keyword event: The event that triggered the creation of this recipe
:type event: :class:`django.db.models.ForeignKey`
:keyword ingest_event: The ingest event that triggered the creation of this recipe
:type ingest_event: :class:`django.db.models.ForeignKey`
:keyword root_recipe: The root recipe that contains this recipe
:type root_recipe: :class:`django.db.models.ForeignKey`
:keyword recipe: The original recipe that created this recipe
:type recipe: :class:`django.db.models.ForeignKey`
:keyword batch: The batch that contains this recipe
:type batch: :class:`django.db.models.ForeignKey`
:keyword is_superseded: Whether this recipe has been superseded and is obsolete. This may be true while
superseded_by_recipe (the reverse relationship of superseded_recipe) is null, indicating that this recipe is
obsolete, but there is no new recipe that has directly taken its place.
:type is_superseded: :class:`django.db.models.BooleanField`
:keyword root_superseded_recipe: The first recipe in the chain of superseded recipes. This field will be null for
the first recipe in the chain (i.e. recipes that have a null superseded_recipe field).
:type root_superseded_recipe: :class:`django.db.models.ForeignKey`
:keyword superseded_recipe: The recipe that was directly superseded by this recipe. The reverse relationship can be
accessed using 'superseded_by_recipe'.
:type superseded_recipe: :class:`django.db.models.ForeignKey`
:keyword input: JSON description defining the input for this recipe
:type input: :class:`django.contrib.postgres.fields.JSONField`
:keyword input_file_size: The total size in MiB for all input files in this recipe
:type input_file_size: :class:`django.db.models.FloatField`
:keyword configuration: JSON describing the overriding recipe configuration for this recipe instance
:type configuration: :class:`django.contrib.postgres.fields.JSONField`
:keyword source_started: The start time of the source data for this recipe
:type source_started: :class:`django.db.models.DateTimeField`
:keyword source_ended: The end time of the source data for this recipe
:type source_ended: :class:`django.db.models.DateTimeField`
:keyword source_sensor_class: The class of sensor used to produce the source file for this recipe
:type source_sensor_class: :class:`django.db.models.CharField`
:keyword source_sensor: The specific identifier of the sensor used to produce the source file for this recipe
:type source_sensor: :class:`django.db.models.CharField`
:keyword source_collection: The collection of the source file for this recipe
:type source_collection: :class:`django.db.models.CharField`
:keyword source_task: The task that produced the source file for this recipe
:type source_task: :class:`django.db.models.CharField`
:keyword jobs_total: The total count of all jobs within this recipe
:type jobs_total: :class:`django.db.models.IntegerField`
:keyword jobs_pending: The count of all PENDING jobs within this recipe
:type jobs_pending: :class:`django.db.models.IntegerField`
:keyword jobs_blocked: The count of all BLOCKED jobs within this recipe
:type jobs_blocked: :class:`django.db.models.IntegerField`
:keyword jobs_queued: The count of all QUEUED jobs within this recipe
:type jobs_queued: :class:`django.db.models.IntegerField`
:keyword jobs_running: The count of all RUNNING jobs within this recipe
:type jobs_running: :class:`django.db.models.IntegerField`
:keyword jobs_failed: The count of all FAILED jobs within this recipe
:type jobs_failed: :class:`django.db.models.IntegerField`
:keyword jobs_completed: The count of all COMPLETED within this recipe
:type jobs_completed: :class:`django.db.models.IntegerField`
:keyword jobs_canceled: The count of all CANCELED jobs within this recipe
:type jobs_canceled: :class:`django.db.models.IntegerField`
:keyword sub_recipes_total: The total count for all sub-recipes within this recipe
:type sub_recipes_total: :class:`django.db.models.IntegerField`
:keyword sub_recipes_completed: The count for all completed sub-recipes within this recipe
:type sub_recipes_completed: :class:`django.db.models.IntegerField`
:keyword is_completed: Whether this recipe has completed all of its jobs
:type is_completed: :class:`django.db.models.BooleanField`
:keyword created: When the recipe was created
:type created: :class:`django.db.models.DateTimeField`
:keyword completed: When every job in the recipe was completed successfully
:type completed: :class:`django.db.models.DateTimeField`
:keyword superseded: When this recipe was superseded
:type superseded: :class:`django.db.models.DateTimeField`
:keyword last_modified: When the recipe was last modified
:type last_modified: :class:`django.db.models.DateTimeField`
"""
recipe_type = models.ForeignKey('recipe.RecipeType', on_delete=models.PROTECT)
recipe_type_rev = models.ForeignKey('recipe.RecipeTypeRevision', on_delete=models.PROTECT)
# TODO remove when triggers are removed for v6
event = models.ForeignKey('trigger.TriggerEvent', blank=True, null=True, on_delete=models.PROTECT)
ingest_event = models.ForeignKey('ingest.IngestEvent', blank=True, null=True, on_delete=models.PROTECT)
root_recipe = models.ForeignKey('recipe.Recipe', related_name='sub_recipes_for_root', blank=True, null=True,
on_delete=models.PROTECT)
recipe = models.ForeignKey('recipe.Recipe', related_name='sub_recipes', blank=True, null=True,
on_delete=models.PROTECT)
batch = models.ForeignKey('batch.Batch', related_name='recipes_for_batch', blank=True, null=True,
on_delete=models.PROTECT)
is_superseded = models.BooleanField(default=False)
root_superseded_recipe = models.ForeignKey('recipe.Recipe', related_name='superseded_by_recipes', blank=True,
null=True, on_delete=models.PROTECT)
superseded_recipe = models.OneToOneField('recipe.Recipe', related_name='superseded_by_recipe', blank=True,
null=True, on_delete=models.PROTECT)
input = django.contrib.postgres.fields.JSONField(default=dict)
input_file_size = models.FloatField(blank=True, null=True)
configuration = django.contrib.postgres.fields.JSONField(blank=True, null=True)
# Supplemental sensor metadata fields
source_started = models.DateTimeField(blank=True, null=True, db_index=True)
source_ended = models.DateTimeField(blank=True, null=True, db_index=True)
source_sensor_class = models.TextField(blank=True, null=True, db_index=True)
source_sensor = models.TextField(blank=True, null=True, db_index=True)
source_collection = models.TextField(blank=True, null=True, db_index=True)
source_task = models.TextField(blank=True, null=True, db_index=True)
# Metrics fields
jobs_total = models.IntegerField(default=0)
jobs_pending = models.IntegerField(default=0)
jobs_blocked = models.IntegerField(default=0)
jobs_queued = models.IntegerField(default=0)
jobs_running = models.IntegerField(default=0)
jobs_failed = models.IntegerField(default=0)
jobs_completed = models.IntegerField(default=0)
jobs_canceled = models.IntegerField(default=0)
sub_recipes_total = models.IntegerField(default=0)
sub_recipes_completed = models.IntegerField(default=0)
is_completed = models.BooleanField(default=False)
created = models.DateTimeField(auto_now_add=True)
completed = models.DateTimeField(blank=True, null=True)
superseded = models.DateTimeField(blank=True, null=True)
last_modified = models.DateTimeField(auto_now=True)
objects = RecipeManager()
def get_input_data(self):
    """Parses the stored ``input`` JSON of this recipe into a data object.

    The JSON is wrapped in :class:`DataV6` with validation turned off (``do_validate=False``).

    :returns: The input data for this recipe
    :rtype: :class:`data.data.data.Data`
    """
    parsed = DataV6(data=self.input, do_validate=False)
    return parsed.get_data()
def get_definition(self):
    """Looks up the recipe definition via this recipe's type revision.

    :returns: The definition for this recipe, as provided by ``recipe_type_rev.get_definition()``
    :rtype: :class:`recipe.definition.definition.RecipeDefinition`
    """
    revision = self.recipe_type_rev
    return revision.get_definition()
def get_v6_input_data_json(self):
    """Serializes this recipe's input data to v6 JSON, without the schema version.

    :returns: The v6 JSON input data dict for this recipe (version key stripped)
    :rtype: dict
    """
    data_json = convert_data_to_v6_json(self.get_input_data())
    return rest_utils.strip_schema_version(data_json.get_dict())
def get_v6_recipe_instance_json(self):
    """Serializes the full recipe instance (looked up by this recipe's ID) to v6 JSON.

    :returns: The v6 JSON instance details dict for this recipe, with the schema version stripped
    :rtype: dict
    """
    recipe_instance = Recipe.objects.get_recipe_instance(self.id)
    instance_json = convert_recipe_to_v6_json(recipe_instance)
    return rest_utils.strip_schema_version(instance_json.get_dict())
def has_input(self):
    """Indicates whether this recipe has its input

    An empty or null ``input`` value counts as "no input".

    :returns: True if the recipe has its input, false otherwise.
    :rtype: bool
    """
    return bool(self.input)
class Meta(object):
    """Database options for the recipe model: table name and a composite index."""

    # Composite index over (last_modified, recipe_type)
    index_together = ['last_modified', 'recipe_type']
    db_table = 'recipe'
class RecipeConditionManager(models.Manager):
    """Provides additional methods for handling recipe conditions
    """

    def create_condition(self, recipe_id, root_recipe_id=None, batch_id=None):
        """Creates a new condition for the given recipe and returns the (unsaved) condition model

        :param recipe_id: The ID of the original recipe that created this condition (required: the condition's
            recipe and root_recipe foreign keys are not nullable)
        :type recipe_id: int
        :param root_recipe_id: The ID of the root recipe that contains this condition, possibly None. When falsy,
            recipe_id is used as the root recipe ID.
        :type root_recipe_id: int
        :param batch_id: The ID of the batch that contains this condition, possibly None
        :type batch_id: int
        :returns: The new (unsaved) condition model
        :rtype: :class:`recipe.models.RecipeCondition`
        """

        condition = RecipeCondition()
        # root_recipe is non-nullable, so fall back to the creating recipe when no root is given
        condition.root_recipe_id = root_recipe_id if root_recipe_id else recipe_id
        condition.recipe_id = recipe_id
        condition.batch_id = batch_id
        return condition

    def get_condition_with_interfaces(self, condition_id):
        """Gets the condition model for the given ID with related recipe__recipe_type_rev model

        :param condition_id: The condition ID
        :type condition_id: int
        :returns: The condition model with related recipe__recipe_type_rev model
        :rtype: :class:`recipe.models.RecipeCondition`
        :raises :class:`recipe.models.RecipeCondition.DoesNotExist`: If no condition has the given ID
        """

        return self.select_related('recipe__recipe_type_rev').get(id=condition_id)

    def set_processed(self, condition_id, is_accepted):
        """Sets the condition with the given ID as being processed, recording the current time as processed

        :param condition_id: The condition ID
        :type condition_id: int
        :param is_accepted: Whether the condition was accepted
        :type is_accepted: bool
        """

        # Single UPDATE query; note that QuerySet.update() does not refresh the auto_now last_modified field
        self.filter(id=condition_id).update(is_processed=True, is_accepted=is_accepted, processed=now())

    def set_condition_data_v6(self, condition, data, node_name):
        """Sets the given data as a v6 JSON for the given condition. The condition model must have its related
        recipe__recipe_type_rev model populated.

        The data is validated against the input interface of the named condition node in the recipe definition
        before it is stored.

        :param condition: The condition model with related recipe__recipe_type_rev model
        :type condition: :class:`recipe.models.RecipeCondition`
        :param data: The data for the condition
        :type data: :class:`data.data.data.Data`
        :param node_name: The name of the condition node in the recipe
        :type node_name: string

        :raises :class:`data.data.exceptions.InvalidData`: If the data is invalid
        """

        recipe_definition = condition.recipe.recipe_type_rev.get_definition()
        condition_interface = recipe_definition.graph[node_name].input_interface
        data.validate(condition_interface)

        data_dict = convert_data_to_v6_json(data).get_dict()
        self.filter(id=condition.id).update(data=data_dict)
class RecipeCondition(models.Model):
    """Represents a conditional decision within a recipe. If the condition is accepted then the dependent nodes will be
    created and processed, while if the condition is not accepted the dependent nodes will never be created.

    :keyword root_recipe: The root recipe that contains this condition
    :type root_recipe: :class:`django.db.models.ForeignKey`
    :keyword recipe: The original recipe that created this condition
    :type recipe: :class:`django.db.models.ForeignKey`
    :keyword batch: The batch that contains this condition
    :type batch: :class:`django.db.models.ForeignKey`

    :keyword data: JSON description defining the data processed by this condition
    :type data: :class:`django.contrib.postgres.fields.JSONField`
    :keyword is_processed: Whether the condition has been processed
    :type is_processed: :class:`django.db.models.BooleanField`
    :keyword is_accepted: Whether the condition has been accepted
    :type is_accepted: :class:`django.db.models.BooleanField`

    :keyword created: When this condition was created
    :type created: :class:`django.db.models.DateTimeField`
    :keyword processed: When this condition was processed
    :type processed: :class:`django.db.models.DateTimeField`
    :keyword last_modified: When the condition was last modified
    :type last_modified: :class:`django.db.models.DateTimeField`
    """

    root_recipe = models.ForeignKey('recipe.Recipe', related_name='conditions_for_root_recipe',
                                    on_delete=models.PROTECT)
    recipe = models.ForeignKey('recipe.Recipe', related_name='conditions_for_recipe', on_delete=models.PROTECT)
    batch = models.ForeignKey('batch.Batch', related_name='conditions_for_batch', blank=True, null=True,
                              on_delete=models.PROTECT)

    data = django.contrib.postgres.fields.JSONField(blank=True, null=True)
    is_processed = models.BooleanField(default=False)
    is_accepted = models.BooleanField(default=False)

    created = models.DateTimeField(auto_now_add=True)
    processed = models.DateTimeField(blank=True, null=True)
    last_modified = models.DateTimeField(auto_now=True)

    objects = RecipeConditionManager()

    def get_data(self):
        """Returns the data for this condition, parsed from the stored v6 JSON without validation

        :returns: The data for this condition
        :rtype: :class:`data.data.data.Data`
        """

        return DataV6(data=self.data, do_validate=False).get_data()

    def has_data(self):
        """Indicates whether this condition has its data

        A null or empty ``data`` value counts as "no data".

        :returns: True if the condition has its data, false otherwise.
        :rtype: bool
        """

        return bool(self.data)

    class Meta(object):
        """meta information for the db"""
        db_table = 'recipe_condition'
class RecipeInputFileManager(models.Manager):
    """Provides additional methods for handling RecipeInputFiles"""

    def get_recipe_input_files_v6(self, recipe_id, started=None, ended=None, time_field=None, file_name=None,
                                  recipe_input=None):
        """Returns a query for the input files of the given recipe, filtered on the given fields.

        :param recipe_id: The recipe ID
        :type recipe_id: int
        :param started: Query Scale files whose selected time field is at or after this time.
        :type started: :class:`datetime.datetime`
        :param ended: Query Scale files whose selected time field is at or before this time.
        :type ended: :class:`datetime.datetime`
        :param time_field: The time field to use for filtering: 'source' (source_started/source_ended), 'data'
            (data_started/data_ended), or anything else, including None (last_modified).
        :type time_field: string
        :param file_name: Query Scale files with the given file name.
        :type file_name: str
        :param recipe_input: The name of the recipe input that the file was passed into
        :type recipe_input: str
        :returns: The Scale file query, ordered by last_modified
        :rtype: :class:`django.db.models.QuerySet`
        """

        files = ScaleFile.objects.filter_files_v6(started=started, ended=ended, time_field=time_field,
                                                  file_name=file_name)

        # Both link lookups must be in ONE filter() call so they constrain the same RecipeInputFile row. Chaining
        # separate filter() calls across this multi-valued relation joins it twice, which would also match a file
        # linked to this recipe under one input and to a different recipe under the requested input name.
        link_filters = {'recipeinputfile__recipe': recipe_id}
        if recipe_input:
            link_filters['recipeinputfile__recipe_input'] = recipe_input
        files = files.filter(**link_filters)

        # Apply time range filtering on the selected time field.
        # NOTE(review): filter_files_v6 also receives started/ended/time_field and may already apply this range;
        # it is kept here so the range is enforced regardless -- confirm and drop if redundant.
        if time_field == 'source':
            started_lookup, ended_lookup = 'source_started__gte', 'source_ended__lte'
        elif time_field == 'data':
            started_lookup, ended_lookup = 'data_started__gte', 'data_ended__lte'
        else:
            started_lookup, ended_lookup = 'last_modified__gte', 'last_modified__lte'
        if started:
            files = files.filter(**{started_lookup: started})
        if ended:
            files = files.filter(**{ended_lookup: ended})

        return files.order_by('last_modified')
class RecipeInputFile(models.Model):
    """Links a recipe and its input files together. A file can be used as input to multiple recipes and a recipe can
    accept multiple input files. This model is useful for determining relevant recipes to run during re-processing.

    :keyword recipe: The recipe that the input file is linked to
    :type recipe: :class:`django.db.models.ForeignKey`
    :keyword input_file: The input file that the recipe is linked to
    :type input_file: :class:`django.db.models.ForeignKey`
    :keyword recipe_input: The name of the recipe input parameter, possibly None
    :type recipe_input: :class:`django.db.models.CharField`
    :keyword created: When this recipe/input file link was created
    :type created: :class:`django.db.models.DateTimeField`
    """

    recipe = models.ForeignKey('recipe.Recipe', on_delete=models.PROTECT)
    input_file = models.ForeignKey('storage.ScaleFile', on_delete=models.PROTECT)
    recipe_input = models.CharField(blank=True, null=True, max_length=250)
    created = models.DateTimeField(auto_now_add=True)

    objects = RecipeInputFileManager()

    class Meta(object):
        """meta information for the db"""
        db_table = 'recipe_input_file'
class RecipeNodeManager(models.Manager):
"""Provides additional methods for handling jobs linked to a recipe
"""
def copy_recipe_nodes(self, recipe_copies):
    """Copies the given nodes from the superseded recipes to the new recipes

    Builds one ``INSERT INTO recipe_node ... SELECT ... UNION ALL SELECT ...`` statement with a SELECT per copy.
    Each copied row keeps its node name and condition/job/sub-recipe links, is pointed at the new recipe, and is
    inserted with is_original = false. Does nothing if recipe_copies is empty.

    :param recipe_copies: A list of RecipeNodeCopy tuples, each with superseded_recipe_id, recipe_id and node_names
        (when node_names is empty, every node of the superseded recipe is copied)
    :type recipe_copies: list
    """

    if not recipe_copies:
        return

    sub_queries = []
    params = []
    for recipe_copy in recipe_copies:
        # IDs go through %d, which only accepts numbers, so they cannot inject SQL text
        sub_qry = 'SELECT node_name, false, %d, condition_id, job_id, sub_recipe_id '
        sub_qry += 'FROM recipe_node WHERE recipe_id = %d'
        sub_qry = sub_qry % (recipe_copy.recipe_id, recipe_copy.superseded_recipe_id)
        if recipe_copy.node_names:
            # Node names are bound as query parameters instead of being quoted into the SQL, so a name containing a
            # quote can neither break nor inject into the statement
            node_names = list(recipe_copy.node_names)
            placeholders = ', '.join(['%s'] * len(node_names))
            sub_qry += ' AND node_name IN (' + placeholders + ')'
            params.extend(node_names)
        sub_queries.append(sub_qry)

    # Concatenate (rather than %-format) so the %s placeholders are left for the database driver
    qry = 'INSERT INTO recipe_node (node_name, is_original, recipe_id, condition_id, job_id, sub_recipe_id) '
    qry += ' UNION ALL '.join(sub_queries)

    with connection.cursor() as cursor:
        # Without any node-name parameters, execute exactly as before (no parameter interpolation)
        cursor.execute(qry, params or None)
def create_recipe_condition_nodes(self, recipe_id, conditions):
    """Builds one unsaved RecipeNode per condition, all attached to the given recipe.

    :param recipe_id: The recipe ID
    :type recipe_id: int
    :param conditions: A dict of condition models stored by node name
    :type conditions: dict
    :returns: The list of (unsaved) recipe_node models, in the dict's iteration order
    :rtype: list
    """

    def _condition_node(name, condition_model):
        node = RecipeNode()
        node.recipe_id = recipe_id
        node.node_name = name
        node.condition = condition_model
        return node

    return [_condition_node(name, condition_model) for name, condition_model in conditions.items()]
def create_recipe_job_nodes(self, recipe_id, recipe_jobs):
    """Builds one unsaved RecipeNode per job, all attached to the given recipe.

    :param recipe_id: The recipe ID
    :type recipe_id: int
    :param recipe_jobs: A dict of job models stored by node name
    :type recipe_jobs: dict
    :returns: The list of (unsaved) recipe_node models, in the dict's iteration order
    :rtype: list
    """

    def _job_node(entry):
        name, job_model = entry
        node = RecipeNode()
        node.recipe_id = recipe_id
        node.node_name = name
        node.job = job_model
        return node

    return list(map(_job_node, recipe_jobs.items()))
def create_subrecipe_nodes(self, recipe_id, sub_recipes):
"""Creates and returns the recipe node models (unsaved) for the given recipe and sub-recipes
:param recipe_id: The recipe ID
:type recipe_id: int