/
managers.py
executable file
·248 lines (179 loc) · 8.63 KB
/
managers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
from __future__ import unicode_literals
import copy
import datetime
from django.db import models
from django.utils.functional import curry
from django.utils.translation import ugettext_lazy as _
from django.conf import settings
from audit_log.models.fields import LastUserField
try:
from django.utils.timezone import now as datetime_now
assert datetime_now
except ImportError:
import datetime
datetime_now = datetime.datetime.now
class LogEntryObjectDescriptor(object):
    """
    Descriptor that rebuilds an instance of the wrapped model out of
    the attributes found on the object it is accessed through.
    """

    def __init__(self, model):
        self.model = model

    def __get__(self, instance, owner):
        # Only the column attributes of the wrapped model that the
        # accessing object actually carries are handed to the constructor.
        state = {}
        for model_field in self.model._meta.fields:
            attname = model_field.attname
            if hasattr(instance, attname):
                state[attname] = getattr(instance, attname)
        return self.model(**state)
class AuditLogManager(models.Manager):
    """
    Manager over audit log entries that may optionally be bound to a
    single model instance.

    When bound, queries are narrowed to the entries of that instance and
    change tracking can be switched on or off for it. Without an
    instance the tracking switches raise ``ValueError``.
    """

    def __init__(self, model, attname, instance=None):
        super(AuditLogManager, self).__init__()
        self.model = model
        self.instance = instance
        self.attname = attname
        # The on/off switch is a hidden attribute stored on the instance
        # itself. It starts out enabled; a value that is already there
        # is left alone.
        if instance is None:
            return
        flag = '__is_%s_enabled' % attname
        if not hasattr(instance, flag):
            setattr(instance, flag, True)

    def _flag_name(self):
        # Name of the hidden per-instance attribute holding the switch.
        return '__is_%s_enabled' % self.attname

    def _bound_instance(self):
        # Return the bound instance, refusing to work on the model class.
        if self.instance is None:
            raise ValueError("Tracking can only be enabled or disabled "
                             "per model instance, not on a model class")
        return self.instance

    def enable_tracking(self):
        """Turn change tracking on for the bound instance."""
        setattr(self._bound_instance(), self._flag_name(), True)

    def disable_tracking(self):
        """Turn change tracking off for the bound instance."""
        setattr(self._bound_instance(), self._flag_name(), False)

    def is_tracking_enabled(self):
        """Return the current state of the tracking switch."""
        return getattr(self._bound_instance(), self._flag_name())

    def get_queryset(self):
        """
        Return all log entries, or only those whose copy of the primary
        key matches the bound instance when there is one.
        """
        entries = super(AuditLogManager, self).get_queryset()
        if self.instance is not None:
            pk_name = self.instance._meta.pk.name
            entries = entries.filter(**{pk_name: self.instance.pk})
        return entries
class AuditLogDescriptor(object):
    """
    Descriptor that hands out a fresh manager on every attribute access,
    bound to the instance when accessed through one.
    """

    def __init__(self, model, manager_class, attname):
        self.model = model
        self.manager_class = manager_class
        self.attname = attname

    def __get__(self, instance, owner):
        # Class-level access builds the manager without an instance
        # argument; instance-level access appends the instance.
        manager_args = (self.model, self.attname)
        if instance is not None:
            manager_args += (instance,)
        return self.manager_class(*manager_args)
class AuditLog(object):
    """
    Audit trail for a Django model.

    Once the model class this object was contributed to is prepared, a
    companion ``<Model>AuditLogEntry`` model is generated, save/delete
    signal receivers are connected for the tracked model, and a
    descriptor returning ``manager_class`` instances is installed under
    the attribute name this object was given.
    """

    manager_class = AuditLogManager

    def __init__(self, exclude=None):
        """
        exclude -- names of fields of the tracked model that must not
        be copied into the audit log. Defaults to excluding nothing.
        """
        # A ``None`` sentinel avoids sharing a single list object between
        # every AuditLog created without an explicit ``exclude``.
        self._exclude = [] if exclude is None else exclude

    def contribute_to_class(self, cls, name):
        """
        Remember the attribute name given to this audit log on ``cls``
        and defer the remaining setup until ``cls`` has been prepared.
        """
        self.manager_name = name
        models.signals.class_prepared.connect(self.finalize, sender=cls)

    def create_log_entry(self, instance, action_type):
        """
        Create one log entry holding the current field values of
        ``instance`` together with ``action_type`` ('I', 'U' or 'D').
        """
        manager = getattr(instance, self.manager_name)
        attrs = {}
        for field in instance._meta.fields:
            # copy_fields() leaves out fields excluded by ``name``, so
            # those have no counterpart on the log entry model. For
            # relations ``name`` and ``attname`` differ ("owner" versus
            # "owner_id"); checking ``attname`` alone would still pass
            # such a value to create() as an unknown keyword argument.
            if field.name in self._exclude or field.attname in self._exclude:
                continue
            attrs[field.attname] = getattr(instance, field.attname)
        manager.create(action_type=action_type, **attrs)

    def post_save(self, instance, created, **kwargs):
        """Signal receiver logging an insert or update of ``instance``."""
        # ignore if it is disabled
        if getattr(instance, self.manager_name).is_tracking_enabled():
            self.create_log_entry(instance, 'I' if created else 'U')

    def post_delete(self, instance, **kwargs):
        """Signal receiver logging the deletion of ``instance``."""
        # ignore if it is disabled
        if getattr(instance, self.manager_name).is_tracking_enabled():
            self.create_log_entry(instance, 'D')

    def finalize(self, sender, **kwargs):
        """
        Build the log entry model for ``sender``, connect the save and
        delete receivers and install the manager descriptor on it.
        """
        log_entry_model = self.create_log_entry_model(sender)

        # weak=False: the receivers are bound methods of this object and
        # the signal keeps a strong reference to them.
        models.signals.post_save.connect(self.post_save, sender=sender, weak=False)
        models.signals.post_delete.connect(self.post_delete, sender=sender, weak=False)

        descriptor = AuditLogDescriptor(log_entry_model, self.manager_class, self.manager_name)
        setattr(sender, self.manager_name, descriptor)

    def copy_fields(self, model):
        """
        Creates copies of the fields we are keeping
        track of for the provided model, returning a
        dictionary mapping field name to a copied field object.
        """
        fields = {'__module__': model.__module__}

        for original in model._meta.fields:
            if original.name in self._exclude:
                continue

            field = copy.deepcopy(original)

            if isinstance(field, models.AutoField):
                # we replace the AutoField of the original model
                # with an IntegerField because a model can
                # have only one autofield.
                field.__class__ = models.IntegerField

            if field.primary_key:
                field.serialize = True

            # OneToOne fields should really be tracked
            # as ForeignKey fields
            if isinstance(field, models.OneToOneField):
                field.__class__ = models.ForeignKey

            if field.primary_key or field.unique:
                # unique fields of the original model
                # can not be guaranteed to be unique
                # in the audit log entry but they
                # should still be indexed for faster lookups.
                field.primary_key = False
                field._unique = False
                field.db_index = True

            # Prefix an explicit related_name so the copied relation gets
            # a reverse accessor name different from the original's.
            if field.rel and field.rel.related_name:
                field.rel.related_name = '_auditlog_%s' % field.rel.related_name

            fields[field.name] = field

        return fields

    def get_logging_fields(self, model):
        """
        Returns a dictionary mapping of the fields that are used for
        keeping the actual audit log entries.
        """
        rel_name = '_%s_audit_log_entry' % model._meta.object_name.lower()

        def entry_instance_to_unicode(log_entry):
            try:
                result = '%s: %s %s at %s' % (
                    model._meta.object_name,
                    log_entry.object_state,
                    log_entry.get_action_type_display().lower(),
                    log_entry.action_date,
                )
            except AttributeError:
                # Fall back to a description without the object state.
                result = '%s %s at %s' % (
                    model._meta.object_name,
                    log_entry.get_action_type_display().lower(),
                    log_entry.action_date,
                )
            return result

        action_user_field = LastUserField(related_name=rel_name, editable=False)

        # check if the manager has been attached to auth user model
        auth_user_model = getattr(settings, 'AUTH_USER_MODEL', 'auth.User')
        if [model._meta.app_label, model.__name__] == auth_user_model.split("."):
            action_user_field = LastUserField(related_name=rel_name, editable=False, to='self')

        return {
            'action_id': models.AutoField(primary_key=True),
            'action_date': models.DateTimeField(default=datetime_now, editable=False, blank=False),
            'action_user': action_user_field,
            'action_type': models.CharField(max_length=1, editable=False, choices=(
                ('I', _('Created')),
                ('U', _('Changed')),
                ('D', _('Deleted')),
            )),
            'object_state': LogEntryObjectDescriptor(model),
            '__unicode__': entry_instance_to_unicode,
        }

    def get_meta_options(self, model):
        """
        Returns a dictionary of Meta options for the
        audit log model.
        """
        result = {
            'ordering': ('-action_date',),
            'app_label': model._meta.app_label,
        }
        from django.db.models.options import DEFAULT_NAMES
        # Only set the option when this Django version knows about it.
        if 'default_permissions' in DEFAULT_NAMES:
            result.update({'default_permissions': ()})
        return result

    def create_log_entry_model(self, model):
        """
        Creates a log entry model that will be associated with
        the model provided.
        """
        attrs = self.copy_fields(model)
        attrs.update(self.get_logging_fields(model))
        attrs.update(Meta=type(str('Meta'), (), self.get_meta_options(model)))
        name = str('%sAuditLogEntry' % model._meta.object_name)
        return type(name, (models.Model,), attrs)