/
reference_index.py
708 lines (593 loc) · 28.2 KB
/
reference_index.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
import uuid
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRel
from django.contrib.contenttypes.models import ContentType
from django.db import models
from django.utils.functional import cached_property
from django.utils.text import capfirst
from django.utils.translation import gettext_lazy as _
from modelcluster.fields import ParentalKey
from modelcluster.models import ClusterableModel, get_all_child_relations
from taggit.models import ItemBase
from wagtail.blocks import StreamBlock
from wagtail.fields import StreamField
class ReferenceGroups:
    """
    Groups records in a ReferenceIndex queryset by their source object.

    Args:
        qs: (QuerySet[ReferenceIndex]) A QuerySet on the ReferenceIndex model

    Yields:
        A tuple (source_object, references) for each source object that appears
        in the queryset. source_object is the model instance of the source object
        and references is a list of references that occur in the QuerySet from
        that source object.
    """

    def __init__(self, qs):
        # Ordering by the source key keeps all references from one source
        # object adjacent, which the grouping in _iter_raw_groups() relies on.
        self.qs = qs.order_by("base_content_type", "object_id")

    def _iter_raw_groups(self):
        """
        Yields ((base_content_type_id, object_id), references) for each run of
        consecutive rows that share a source key.

        The source objects themselves are not loaded here, so callers can
        select a subset of groups before paying for the per-object lookups.
        """
        current_key = None
        references = []

        for reference in self.qs:
            key = (reference.base_content_type_id, reference.object_id)
            if key != current_key:
                # A new source object starts here; flush the previous group.
                # ``references`` is only empty before the very first row.
                if references:
                    yield current_key, references
                    references = []
                current_key = key

            references.append(reference)

        if references:
            yield current_key, references

    @staticmethod
    def _load_group(raw_group):
        """
        Turns a ((base_content_type_id, object_id), references) pair into the
        public (source_object, references) tuple by fetching the source object.
        """
        (content_type_id, object_id), references = raw_group
        content_type = ContentType.objects.get_for_id(content_type_id)
        source_object = content_type.get_object_for_this_type(pk=object_id)
        return source_object, references

    def __iter__(self):
        for raw_group in self._iter_raw_groups():
            yield self._load_group(raw_group)

    def __len__(self):
        return self._count

    @cached_property
    def _count(self):
        # One group per distinct source key, so count those rather than rows.
        return self.qs.values("base_content_type", "object_id").distinct().count()

    @cached_property
    def is_protected(self):
        """
        True if any reference in the queryset has an on_delete of models.PROTECT.
        """
        return any(reference.on_delete == models.PROTECT for reference in self.qs)

    def count(self):
        """
        Returns the number of rows that will be returned by iterating this
        ReferenceGroups.

        Just calls len(self) internally, this method only exists to allow
        instances of this class to be used in a Paginator.
        """
        return len(self)

    def __getitem__(self, key):
        """
        Returns the group at an integer index, or a list of groups for a slice.

        Grouping is done on the raw rows first and only the selected groups
        have their source objects fetched, so slicing out one page does not
        trigger an object lookup for every group in the queryset.
        """
        selected = list(self._iter_raw_groups())[key]
        if isinstance(key, slice):
            return [self._load_group(raw_group) for raw_group in selected]
        return self._load_group(selected)
class ReferenceIndexQuerySet(models.QuerySet):
    """
    QuerySet for ReferenceIndex that adds grouping by source object.
    """

    def group_by_source_object(self):
        """
        Wraps this queryset in a ReferenceGroups object, which yields the
        references grouped by the source instance they were found on.
        """
        grouped = ReferenceGroups(self)
        return grouped
class ReferenceIndex(models.Model):
    """
    Records references between objects for quick retrieval of object usage.

    References are extracted from Foreign Keys, Chooser Blocks in StreamFields, and links in Rich Text Fields.
    This index allows us to efficiently find all of the references to a particular object from all of these sources.
    """

    # The object where the reference was extracted from

    # content_type represents the content type of the model that contains
    # the field where the reference came from. If the model sub-classes another
    # concrete model (such as Page), that concrete model will be set in
    # base_content_type, otherwise it would be the same as content_type
    content_type = models.ForeignKey(
        ContentType, on_delete=models.CASCADE, related_name="+"
    )
    base_content_type = models.ForeignKey(
        ContentType, on_delete=models.CASCADE, related_name="+"
    )
    object_id = models.CharField(
        max_length=255,
        verbose_name=_("object id"),
    )

    # The object that has been referenced

    # to_content_type is always the base content type of the referenced object
    to_content_type = models.ForeignKey(
        ContentType, on_delete=models.CASCADE, related_name="+"
    )
    to_object_id = models.CharField(
        max_length=255,
        verbose_name=_("object id"),
    )

    # The model_path is the path to the field on content_type where the reference was extracted from.
    # The content_path is the path to a specific block on the instance where the reference is extracted from.

    # These are dotted paths, always starting with a field or child relation name. If
    # the reference was extracted from an inline panel or streamfield, other components
    # of the path can be used to locate where the reference was extracted.
    #
    # For example, say we have a StreamField called 'body' which has a struct block type
    # called 'my_struct_block' that has a field called 'my_field'. If we extracted a
    # reference from that field, the model_path would be set to the following:
    #
    # 'body.my_struct_block.my_field'
    #
    # The content path would follow the same format, but anything repeatable would be replaced by an ID.
    # For example:
    #
    # 'body.bdc70d8b-e7a2-4c2a-bf43-2a3e3fcbbe86.my_field'
    #
    # We can use the model_path with the 'content_type' to find the original definition of
    # the field block and display information to the user about where the reference was
    # extracted from.
    #
    # We can use the content_path to link the user directly to the block/field that contains
    # the reference.
    model_path = models.TextField()
    content_path = models.TextField()

    # We need a separate hash field for content_path in order to use it in a unique key because
    # MySQL has a limit to the size of fields that are included in unique keys
    content_path_hash = models.UUIDField()

    objects = ReferenceIndexQuerySet.as_manager()

    # The index itself must never be treated as a source or target of references.
    wagtail_reference_index_ignore = True

    # The set of models that should have signals attached to watch for outbound references.
    # This includes those registered with `register_model`, as well as their child models
    # linked by a ParentalKey.
    tracked_models = set()

    # The set of models that can appear as the 'from' object in the reference index.
    # This only includes those registered with `register_model`, and NOT child models linked
    # by ParentalKey (object references on those are recorded under the parent).
    indexed_models = set()

    class Meta:
        unique_together = [
            (
                "base_content_type",
                "object_id",
                "to_content_type",
                "to_object_id",
                "content_path_hash",
            )
        ]

    @classmethod
    def _get_base_content_type(cls, model_or_object):
        """
        Returns the ContentType record that represents the base model of the
        given model or object.

        For a model that uses multi-table-inheritance, this returns the model
        that contains the primary key. For example, for any page object, this
        will return the content type of the Page model.
        """
        parents = model_or_object._meta.get_parent_list()
        # With no parents the model is its own base; otherwise the last entry
        # of the parent list is used as the base model.
        base = parents[-1] if parents else model_or_object
        return ContentType.objects.get_for_model(base, for_concrete_model=False)

    @staticmethod
    def _relation_is_excluded(field):
        """
        Returns True if the given many-to-one relation must not be recorded
        in the index.

        A relation is excluded when the field or its related model is flagged
        with ``wagtail_reference_index_ignore``, or when it is a ParentalKey
        or GenericRel.
        """
        if getattr(field, "wagtail_reference_index_ignore", False):
            return True

        if getattr(field.related_model, "wagtail_reference_index_ignore", False):
            return True

        return isinstance(field, (ParentalKey, GenericRel))

    @classmethod
    def model_is_indexable(cls, model, allow_child_models=False):
        """
        Returns True if the given model may have outbound references that we would be interested in recording in the index.

        Args:
            model (type): a Django model class
            allow_child_models (boolean): Child models are not indexable on their own. If you are looking at
                                          a child model from the perspective of indexing it through its parent,
                                          set this to True to disable checking for this. Default False.
        """
        if getattr(model, "wagtail_reference_index_ignore", False):
            return False

        fields = model._meta.get_fields()

        # Don't check any models that have a parental key, references from these will be collected from the parent
        if not allow_child_models and any(
            isinstance(field, ParentalKey) for field in fields
        ):
            return False

        for field in fields:
            if field.is_relation and field.many_to_one:
                if cls._relation_is_excluded(field):
                    continue

                return True

            if hasattr(field, "extract_references"):
                return True

        # A clusterable model is also indexable if any of its child models is.
        if issubclass(model, ClusterableModel):
            for child_relation in get_all_child_relations(model):
                if cls.model_is_indexable(
                    child_relation.related_model,
                    allow_child_models=True,
                ):
                    return True

        return False

    @classmethod
    def register_model(cls, model):
        """
        Registers the model for indexing.

        Does nothing if the model is already registered or is not indexable.
        """
        if model in cls.indexed_models:
            return

        if cls.model_is_indexable(model):
            cls.indexed_models.add(model)
            cls._register_as_tracked_model(model)

    @classmethod
    def _register_as_tracked_model(cls, model):
        """
        Add the model and all of its ParentalKey-linked children to the set of
        models to be tracked by signal handlers.
        """
        if model in cls.tracked_models:
            return

        # Imported here rather than at module level, as in the original code.
        from wagtail.signal_handlers import (
            connect_reference_index_signal_handlers_for_model,
        )

        cls.tracked_models.add(model)
        connect_reference_index_signal_handlers_for_model(model)

        for child_relation in get_all_child_relations(model):
            if cls.model_is_indexable(
                child_relation.related_model,
                allow_child_models=True,
            ):
                cls._register_as_tracked_model(child_relation.related_model)

    @classmethod
    def is_indexed(cls, model):
        """
        Returns True if the given model has been registered in indexed_models.
        """
        return model in cls.indexed_models

    @classmethod
    def _extract_references_from_object(cls, object):
        """
        Generator that scans the given object and yields any references it finds.

        Args:
            object (Model): an instance of a Django model to scan for references

        Yields:
            A tuple (content_type_id, object_id, model_path, content_path) for each
            reference found.

            content_type_id (int): The ID of the ContentType record representing
                                   the model of the referenced object

            object_id (str): The primary key of the referenced object, converted
                             to a string

            model_path (str): The path to the field on the model of the source
                              object where the reference was found

            content_path (str): The path to the piece of content on the source
                                object instance where the reference was found
        """
        # Extract references from fields
        for field in object._meta.get_fields():
            if field.is_relation and field.many_to_one:
                if cls._relation_is_excluded(field):
                    continue

                if isinstance(field, GenericForeignKey):
                    ct_field = object._meta.get_field(field.ct_field)
                    fk_field = object._meta.get_field(field.fk_field)

                    ct_value = ct_field.value_from_object(object)
                    fk_value = fk_field.value_from_object(object)

                    if ct_value is not None and fk_value is not None:
                        # The content type ID referenced by the GenericForeignKey might be a subclassed
                        # model, but the reference index requires us to index it under the base model's
                        # content type, as that's what will be used for lookups. So, we need to convert
                        # the content type back to a model class so that _get_base_content_type can
                        # select the appropriate superclass if necessary, before converting back to a
                        # content type.
                        model = ContentType.objects.get_for_id(ct_value).model_class()

                        # model_class() returns None for a stale content type whose model
                        # no longer exists; there is no base type to index it under.
                        if model is not None:
                            yield (
                                cls._get_base_content_type(model).id,
                                str(fk_value),
                                field.name,
                                field.name,
                            )

                    continue

                value = field.value_from_object(object)
                if value is not None:
                    yield (
                        cls._get_base_content_type(field.related_model).id,
                        str(value),
                        field.name,
                        field.name,
                    )

            if hasattr(field, "extract_references"):
                value = field.value_from_object(object)
                if value is not None:
                    for (
                        to_model,
                        to_object_id,
                        model_path,
                        content_path,
                    ) in field.extract_references(value):
                        yield (
                            cls._get_base_content_type(to_model).id,
                            # Stored in a CharField and compared against values read
                            # back from it, so always normalise to a string.
                            str(to_object_id),
                            f"{field.name}.{model_path}",
                            f"{field.name}.{content_path}",
                        )

        # Extract references from child relations
        if isinstance(object, ClusterableModel):
            for child_relation in get_all_child_relations(object):
                relation_name = child_relation.get_accessor_name()
                child_objects = getattr(object, relation_name).all()

                for child_object in child_objects:
                    for (
                        to_content_type_id,
                        to_object_id,
                        model_path,
                        content_path,
                    ) in cls._extract_references_from_object(child_object):
                        # Prefix the child's paths with the relation name; the content
                        # path additionally identifies the specific child by its pk.
                        yield (
                            to_content_type_id,
                            to_object_id,
                            f"{relation_name}.item.{model_path}",
                            f"{relation_name}.{str(child_object.pk)}.{content_path}",
                        )

    @classmethod
    def _get_content_path_hash(cls, content_path):
        """
        Returns a UUID for the given content path. Used to enforce uniqueness.

        Note: MySQL has a limit on the length of fields that are used in unique keys so
              we need a separate hash field to allow us to support long content paths.

        Args:
            content_path (str): The content path to get a hash for

        Returns:
            A UUID instance containing the hash of the given content path
        """
        # Fixed namespace so the same content path always hashes to the same UUID.
        return uuid.uuid5(
            uuid.UUID("bdc70d8b-e7a2-4c2a-bf43-2a3e3fcbbe86"), content_path
        )

    @classmethod
    def create_or_update_for_object(cls, object):
        """
        Creates or updates ReferenceIndex records for the given object.

        This method will extract any outbound references from the given object
        and insert/update them in the database.

        Note: This method must be called within a `django.db.transaction.atomic()` block.

        Args:
            object (Model): The model instance to create/update ReferenceIndex records for
        """
        # For the purpose of this method, a "reference record" is a tuple of
        # (to_content_type_id, to_object_id, model_path, content_path) - the properties that
        # uniquely define a reference

        # Extract new references and construct a set of reference records
        references = set(cls._extract_references_from_object(object))

        # Find content types for this model and all of its ancestor classes,
        # ordered from most to least specific
        content_types = [
            ContentType.objects.get_for_model(model_or_object, for_concrete_model=False)
            for model_or_object in ([object] + object._meta.get_parent_list())
        ]
        content_type = content_types[0]
        base_content_type = content_types[-1]
        known_content_type_ids = {ct.id for ct in content_types}

        # Find existing references in the database so we know what to add/delete.
        # Construct a dict mapping reference records to the (content_type_id, id) pair that the
        # existing database entry is found under
        existing_rows = cls.objects.filter(
            base_content_type=base_content_type, object_id=object.pk
        ).values_list(
            "id",
            "content_type_id",
            "to_content_type",
            "to_object_id",
            "model_path",
            "content_path",
        )
        existing_references = {
            (to_content_type_id, to_object_id, model_path, content_path): (
                content_type_id,
                reference_id,
            )
            for (
                reference_id,
                content_type_id,
                to_content_type_id,
                to_object_id,
                model_path,
                content_path,
            ) in existing_rows
        }

        # Construct the set of reference records that have been found on the object but are not
        # already present in the database
        new_references = references - set(existing_references)

        # Create database records for those reference records
        cls.objects.bulk_create(
            [
                cls(
                    content_type=content_type,
                    base_content_type=base_content_type,
                    object_id=object.pk,
                    to_content_type_id=to_content_type_id,
                    to_object_id=to_object_id,
                    model_path=model_path,
                    content_path=content_path,
                    content_path_hash=cls._get_content_path_hash(content_path),
                )
                for to_content_type_id, to_object_id, model_path, content_path in new_references
            ]
        )

        # Delete removed references
        deleted_reference_ids = []

        # Look at the reference record and the supporting content_type / id for each existing
        # reference in the database
        for reference_data, (
            content_type_id,
            reference_id,
        ) in existing_references.items():
            if reference_data in references:
                # Do not delete this reference, as it is still present in the new set
                continue

            if content_type_id not in known_content_type_ids:
                # The content type for the existing record does not match the current model or any
                # superclass. We can infer that the existing record is for a more specific subclass
                # than the one we're currently indexing - e.g. we are indexing <Page id=123> while
                # the existing reference was recorded against <BlogPage id=123>. In this case, do
                # not treat the missing reference as a deletion - it likely still exists, but on a
                # relation which can only be seen on the more specific model.
                continue

            # If we reach here, this is a legitimate deletion - add it to the list of IDs to delete
            deleted_reference_ids.append(reference_id)

        # Perform the deletion
        cls.objects.filter(id__in=deleted_reference_ids).delete()

    @classmethod
    def remove_for_object(cls, object):
        """
        Deletes all outbound references for the given object.

        Use this before deleting the object itself.

        Args:
            object (Model): The model instance to delete ReferenceIndex records for
        """
        base_content_type = cls._get_base_content_type(object)
        cls.objects.filter(
            base_content_type=base_content_type, object_id=object.pk
        ).delete()

    @classmethod
    def get_references_for_object(cls, object):
        """
        Returns all outbound references for the given object.

        Args:
            object (Model): The model instance to fetch ReferenceIndex records for

        Returns:
            A QuerySet of ReferenceIndex records
        """
        return cls.objects.filter(
            base_content_type=cls._get_base_content_type(object),
            object_id=object.pk,
        )

    @classmethod
    def get_references_to(cls, object):
        """
        Returns all inbound references for the given object.

        Args:
            object (Model): The model instance to fetch ReferenceIndex records for

        Returns:
            A QuerySet of ReferenceIndex records
        """
        return cls.objects.filter(
            to_content_type=cls._get_base_content_type(object),
            to_object_id=object.pk,
        )

    @classmethod
    def get_grouped_references_to(cls, object):
        """
        Returns all inbound references for the given object, grouped by the object
        they are found on.

        Args:
            object (Model): The model instance to fetch ReferenceIndex records for

        Returns:
            A ReferenceGroups object
        """
        return cls.get_references_to(object).group_by_source_object()

    @property
    def _content_type(self):
        # Accessing a ContentType from a ForeignKey does not make use of the
        # ContentType manager's cache, so we use this property to make use of
        # the cache.
        return ContentType.objects.get_for_id(self.content_type_id)

    @cached_property
    def model_name(self):
        """
        The model name of the object from which the reference was extracted.
        For most cases, this is also where the reference exists on the database
        (i.e. ``related_field_model_name``). However, for ClusterableModels, the
        reference is extracted from the parent model.

        Example:
        A relationship between a BlogPage, BlogPageGalleryImage, and Image
        is extracted from the BlogPage model, but the reference is stored on
        the BlogPageGalleryImage model.
        """
        return self._content_type.name

    @cached_property
    def related_field_model_name(self):
        """
        The model name where the reference exists on the database.
        """
        return self.related_field.model._meta.verbose_name

    @cached_property
    def on_delete(self):
        """
        The on_delete handler of the relation behind this reference, or
        models.SET_NULL when the source field does not define one.
        """
        try:
            return self.reverse_related_field.on_delete
        except AttributeError:
            # It might be a custom field/relation that doesn't have an on_delete attribute,
            # or other reference collected from extract_references(), e.g. StreamField.
            return models.SET_NULL

    @cached_property
    def source_field(self):
        """
        The field from which the reference was extracted.
        This may be a related field (e.g. ForeignKey), a reverse related field
        (e.g. ManyToOneRel), a StreamField, or any other field that defines
        extract_references().
        """
        # The first component of the model path is always the field name.
        field_name = self.model_path.split(".")[0]
        return self._content_type.model_class()._meta.get_field(field_name)

    @cached_property
    def related_field(self):
        # The field stored on the reference index can be a related field or a
        # reverse related field, depending on whether the reference was extracted
        # directly from a ForeignKey or through a parent ClusterableModel. This
        # property normalises to the related field.
        if isinstance(self.source_field, models.ForeignObjectRel):
            return self.source_field.remote_field
        return self.source_field

    @cached_property
    def reverse_related_field(self):
        # This property normalises to the reverse related field, which is where
        # the on_delete attribute is stored.
        return self.related_field.remote_field

    def describe_source_field(self):
        """
        Returns a string describing the field that this reference was extracted from.

        For StreamField, this returns the label of the block that contains the reference.
        For other fields, this returns the verbose name of the field.
        """
        field = self.source_field
        model_path_components = self.model_path.split(".")

        # ManyToOneRel (reverse accessor for ParentalKey) does not have a verbose name. So get the name of the child field instead
        if isinstance(field, models.ManyToOneRel):
            # The path has the form '<relation>.item.<child field>...'
            child_field = field.related_model._meta.get_field(model_path_components[2])
            return capfirst(child_field.verbose_name)

        elif isinstance(field, StreamField):
            label = f"{capfirst(field.verbose_name)}"
            block = field.stream_block
            block_idx = 1

            # Walk down through nested StreamBlocks, following the block names
            # in the model path and appending each block's label.
            while isinstance(block, StreamBlock):
                block = block.child_blocks[model_path_components[block_idx]]
                block_label = capfirst(block.label)
                label += f" → {block_label}"
                block_idx += 1

            return label

        else:
            try:
                field_name = field.verbose_name
            except AttributeError:
                # generate verbose name from field name in the same way that Django does:
                # https://github.com/django/django/blob/7b94847e384b1a8c05a7d4c8778958c0290bdf9a/django/db/models/fields/__init__.py#L858
                field_name = field.name.replace("_", " ")
            return capfirst(field_name)

    def describe_on_delete(self):
        """
        Returns a string describing the action that will be taken when the referenced object is deleted.
        """
        if self.on_delete == models.CASCADE:
            return _("the %(model_name)s will also be deleted") % {
                "model_name": self.related_field_model_name,
            }

        if self.on_delete == models.PROTECT:
            return _("prevents deletion")

        if self.on_delete == models.SET_DEFAULT:
            return _("will be set to the default %(model_name)s") % {
                "model_name": self.related_field_model_name,
            }

        if self.on_delete == models.DO_NOTHING:
            return _("will do nothing")

        # It's technically possible to know whether RESTRICT will prevent the
        # deletion or not, but the only way to reliably do so is to use Django's
        # internal Collector class, which is not publicly documented.
        # It also uses its own logic to find the references in real-time, which
        # may be slower than our ReferenceIndex. For now, we'll just say that
        # RESTRICT *may* prevent deletion, but we do not add any safe guards
        # around the possible exception.
        if self.on_delete == models.RESTRICT:
            return _("may prevent deletion")

        # SET is a function that returns the actual callable used for on_delete,
        # so we need to check for it by inspecting the deconstruct() result.
        if (
            hasattr(self.on_delete, "deconstruct")
            and self.on_delete.deconstruct()[0] == "django.db.models.SET"
        ):
            return _("will be set to a %(model_name)s specified by the system") % {
                "model_name": self.related_field_model_name,
            }

        # It's either models.SET_NULL or a custom value, but we cannot be sure what
        # will happen with the latter, so assume that the reference will be unset.
        return _("will unset the reference")
# Ignore relations formed by any django-taggit 'through' model, as this causes any tag attached to
# a tagged object to appear as a reference to that object. Ideally we would follow the reference to
# the Tag model so that we can use the references index to find uses of a tag, but doing that
# correctly will require support for ManyToMany relations with through models:
# https://github.com/wagtail/wagtail/issues/9629
# The flag is read via getattr(..., "wagtail_reference_index_ignore", False) on models and on
# the related models of relation fields when this module decides what to index.
ItemBase.wagtail_reference_index_ignore = True