forked from wagtail/wagtail
/
audit_logging.py
201 lines (170 loc) · 7.59 KB
/
audit_logging.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import datetime
from collections import defaultdict
import django_filters
from django import forms
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.db.models import IntegerField, Value
from django.utils.encoding import force_str
from django.utils.translation import gettext_lazy as _
from wagtail.admin.admin_url_finder import AdminURLFinder
from wagtail.admin.filters import (
ContentTypeFilter,
DateRangePickerWidget,
WagtailFilterSet,
)
from wagtail.coreutils import get_content_type_label
from wagtail.log_actions import registry as log_action_registry
from wagtail.models import PageLogEntry
from .base import ReportView
def get_users_for_filter(user):
    """
    Return the users referenced by log entries that ``user`` is allowed to view.

    Each registered log entry model is asked for the user IDs found in the entries
    viewable by ``user``. The result is a queryset on the active user model,
    restricted to those IDs and ordered by the model's ``USERNAME_FIELD``.
    """
    user_model = get_user_model()

    visible_user_ids = set()
    for entry_model in log_action_registry.get_log_entry_models():
        visible_entries = entry_model.objects.viewable_by_user(user)
        visible_user_ids.update(visible_entries.get_user_ids())

    matching_users = user_model.objects.filter(pk__in=visible_user_ids)
    return matching_users.order_by(user_model.USERNAME_FIELD)
def get_content_types_for_filter(user):
    """
    Return the content types referenced by log entries that ``user`` may view.

    Each registered log entry model contributes the content type IDs found in the
    entries viewable by ``user``. The result is a ``ContentType`` queryset limited
    to those IDs and ordered by the ``model`` field.
    """
    id_batches = [
        entry_model.objects.viewable_by_user(user).get_content_type_ids()
        for entry_model in log_action_registry.get_log_entry_models()
    ]
    # Merge the per-model batches into one de-duplicated set of IDs.
    visible_type_ids = set().union(*id_batches)
    return ContentType.objects.filter(pk__in=visible_type_ids).order_by("model")
class SiteHistoryReportFilterSet(WagtailFilterSet):
    """
    Filters offered by the site history report.

    The ``user`` and ``object_type`` choices are computed per request, so they only
    offer values taken from log entries the requesting user is allowed to view.
    """

    action = django_filters.ChoiceFilter(
        label=_("Action"),
        choices=log_action_registry.get_choices,
    )
    # Checkbox handled by filter_hide_commenting_actions() below.
    hide_commenting_actions = django_filters.BooleanFilter(
        label=_("Hide commenting actions"),
        method="filter_hide_commenting_actions",
        widget=forms.CheckboxInput,
    )
    timestamp = django_filters.DateFromToRangeFilter(
        label=_("Date"),
        widget=DateRangePickerWidget,
    )
    label = django_filters.CharFilter(
        label=_("Name"),
        lookup_expr="icontains",
    )
    user = django_filters.ModelChoiceFilter(
        label=_("User"),
        field_name="user",
        queryset=lambda request: get_users_for_filter(request.user),
    )
    # Content type filter handled by filter_object_type() below.
    object_type = ContentTypeFilter(
        label=_("Type"),
        method="filter_object_type",
        queryset=lambda request: get_content_types_for_filter(request.user),
    )

    def filter_hide_commenting_actions(self, queryset, name, value):
        """
        Drop entries whose action starts with ``wagtail.comments`` when ``value`` is truthy.

        With a falsy ``value`` the queryset is returned unchanged.
        """
        if not value:
            return queryset
        return queryset.exclude(action__startswith="wagtail.comments")

    def filter_object_type(self, queryset, name, value):
        """Delegate content type filtering to the queryset's ``filter_on_content_type``."""
        return queryset.filter_on_content_type(value)

    class Meta:
        model = PageLogEntry
        fields = [
            "object_type",
            "label",
            "action",
            "user",
            "timestamp",
            "hide_commenting_actions",
        ]
class LogEntriesView(ReportView):
    """
    "Site history" report: one listing built from every registered log entry model.

    Rows from the individual log models are merged with ``union()`` in
    ``get_filtered_queryset`` and turned back into model instances, page by page,
    in ``decorate_paginated_queryset``.
    """

    template_name = "wagtailadmin/reports/site_history.html"
    title = _("Site history")
    header_icon = "history"
    filterset_class = SiteHistoryReportFilterSet

    # Export column headings, keyed by exported field name.
    export_headings = {
        "object_id": _("ID"),
        "label": _("Name"),
        "content_type": _("Type"),
        "action": _("Action type"),
        "user_display_name": _("User"),
        "timestamp": _("Date/Time"),
    }
    # Fields written to the export.
    list_export = [
        "object_id",
        "label",
        "content_type",
        "action",
        "user_display_name",
        "timestamp",
    ]

    def __init__(self, **kwargs):
        """Register export preprocessors for the ``action`` and ``content_type`` fields."""
        super().__init__(**kwargs)

        # Copy first, so the mapping this attribute originally pointed at is left
        # untouched; the extended copy is stored on the instance.
        preprocess = self.custom_field_preprocess.copy()
        export_formats = (self.FORMAT_CSV, self.FORMAT_XLSX)
        preprocess["action"] = {
            export_format: self.get_action_label for export_format in export_formats
        }
        preprocess["content_type"] = {
            export_format: get_content_type_label for export_format in export_formats
        }
        self.custom_field_preprocess = preprocess

    def get_filename(self):
        """Return the export file name: ``audit-log-`` followed by today's date (YYYY-MM-DD)."""
        date_stamp = datetime.datetime.today().strftime("%Y-%m-%d")
        return f"audit-log-{date_stamp}"

    def get_filtered_queryset(self):
        """
        Build the combined, filtered queryset covering all registered log models.

        The usual pattern - get_queryset() returns a queryset which filter_queryset() then
        filters - does not work for this report: records come from several log models, and
        the subquery for each model has to be filtered on its own before the subqueries are
        joined with union().

        A union() of ordinary model querysets also yields a queryset based on the first
        model in the union, so rows belonging to the other model(s) would come back as the
        wrong type. values() querysets are used instead:

        1. For each model, build a values() queryset of pk and timestamp plus an annotation
           identifying the model, and pass it through filter_queryset
        2. union() those querysets and order the result by -timestamp
           (this is what get_filtered_queryset returns)
        3. Pagination is applied (done in MultipleObjectMixin.get_context_data)
        4. (In decorate_paginated_queryset:) for each model present in the result set, the
           model instances are looked up by ID and arranged into a final list in the same
           order as the query.

        Returns a ``(filters, queryset)`` tuple, where ``filters`` is the first item
        returned by the last filter_queryset() call.
        """
        combined = None
        filters = None

        # The registered log models are held in a list so that each has a stable index
        # number; that number is what tells the models apart in the combined rows.
        self.log_models = list(log_action_registry.get_log_entry_models())

        for model_index, model in enumerate(self.log_models):
            index_marker = Value(model_index, output_field=IntegerField())
            rows = model.objects.viewable_by_user(self.request.user)
            rows = rows.values("pk", "timestamp").annotate(log_model_index=index_marker)

            filters, rows = self.filter_queryset(rows)

            # clear any ordering on the subquery; ordering is applied to the union below
            rows = rows.order_by()

            combined = rows if combined is None else combined.union(rows)

        return filters, combined.order_by("-timestamp")

    def decorate_paginated_queryset(self, queryset):
        """
        Turn the rows of ``queryset`` into log entry instances, keeping the row order.

        Each row is read by its ``log_model_index`` and ``pk`` keys. Every returned log
        entry is given an ``edit_url`` attribute obtained from ``AdminURLFinder`` for the
        instance paired with that entry.
        """
        # collect the primary keys per log model index
        pks_per_model = {}
        for row in queryset:
            pks_per_model.setdefault(row["log_model_index"], []).append(row["pk"])

        url_finder = AdminURLFinder(self.request.user)

        # fetch the log entries of each model present, keyed by (model index, pk)
        entries_by_key = {}
        for model_index, pks in pks_per_model.items():
            manager = self.log_models[model_index].objects
            entries = manager.prefetch_related(
                "user__wagtail_userprofile", "content_type"
            ).filter(pk__in=pks)
            # with_instances() is iterated as (log entry, instance) pairs
            for entry, instance in entries.with_instances():
                entry.edit_url = url_finder.get_edit_url(instance)
                entries_by_key[(model_index, entry.pk)] = entry

        # hand the entries back in the order of the original rows
        return [
            entries_by_key[(row["log_model_index"], row["pk"])] for row in queryset
        ]

    def get_action_label(self, action):
        """Return the registry's label for ``action``, forced to a plain string."""
        label = log_action_registry.get_action_label(action)
        return force_str(label)