/
signal_processor.py
284 lines (226 loc) · 10.5 KB
/
signal_processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
"""The signal processor for Review Board search."""
import logging
import threading
from functools import partial
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
from django.db.models.signals import post_delete, post_save, m2m_changed
from djblets.siteconfig.models import SiteConfiguration
from haystack.signals import BaseSignalProcessor
from reviewboard.accounts.models import Profile
from reviewboard.reviews.models import Group, ReviewRequest
from reviewboard.reviews.signals import review_request_published
from reviewboard.search import search_backend_registry
logger = logging.getLogger(__name__)
class SignalProcessor(BaseSignalProcessor):
    """Listens for signals and updates the search index.

    This will listen for any signals that would affect the search index, and
    invokes a suitable Haystack callback to immediately update the data stored
    in the index.

    This only updates the search index if:

    1) Search is enabled.
    2) The current search engine backend supports on-the-fly indexing.
    """

    #: Signals that trigger a (re-)index of an object, as
    #: ``(model_class, signal, instance_kwarg_name)`` tuples. The third
    #: element names the signal keyword argument that carries the instance.
    save_signals = [
        (ReviewRequest, review_request_published, 'review_request'),
        (User, post_save, 'instance'),
        (Profile, post_save, 'instance'),
    ]

    #: Signals that trigger removal of an object from the index, as
    #: ``(model_class, signal)`` tuples.
    delete_signals = [
        (ReviewRequest, post_delete),
        (User, post_delete),
    ]

    def __init__(self, *args, **kwargs):
        """Initialize the signal processor.

        Args:
            *args (tuple):
                Positional arguments to pass to the parent constructor.

            **kwargs (dict):
                Keyword arguments to pass to the parent constructor.
        """
        self.is_setup = False

        self._can_process_signals = False
        self._handlers = {}

        # Thread-local state used to remember a group's membership between
        # the pre_clear and post_clear phases of an m2m_changed signal. See
        # _handle_group_m2m_changed for details.
        self._pending_user_changes = threading.local()

        # NOTE: All state above must be initialized before calling the
        # parent constructor, which may invoke setup().
        super(SignalProcessor, self).__init__(*args, **kwargs)

    @property
    def can_process_signals(self):
        """Whether the signal processor can currently process signals.

        Signal processing is possible once a
        :py:class:`~djblets.siteconfig.models.SiteConfiguration` can be
        loaded. Once that succeeds, the result is cached and not re-checked.
        """
        if not self._can_process_signals:
            try:
                SiteConfiguration.objects.get_current()
                self._can_process_signals = True
            except ObjectDoesNotExist:
                pass

        return self._can_process_signals

    def setup(self):
        """Register the signal handlers for this processor."""
        # We define this here instead of at the class level because we cannot
        # reference class members during the class' definition.
        m2m_changed_signals = [
            (Group.users.through, self._handle_group_m2m_changed),
        ]

        if not self.is_setup:
            for cls, signal, instance_kwarg in self.save_signals:
                handler = partial(self.check_handle_save,
                                  instance_kwarg=instance_kwarg)
                self._handlers[(cls, signal)] = handler

            for cls, signal in self.delete_signals:
                self._handlers[(cls, signal)] = self.check_handle_delete

            for cls, handler in m2m_changed_signals:
                self._handlers[(cls, m2m_changed)] = handler

            for (cls, signal), handler in self._handlers.items():
                signal.connect(handler, sender=cls)

            self.is_setup = True

    def teardown(self):
        """Unregister all signal handlers for this processor."""
        if self.is_setup:
            for (cls, signal), handler in self._handlers.items():
                signal.disconnect(handler, sender=cls)

            self.is_setup = False

    def check_handle_save(self, instance_kwarg, **kwargs):
        """Conditionally update the search index when an object is updated.

        The index is only updated if signals can be processed, a search
        backend is configured, and on-the-fly indexing is enabled.

        Args:
            instance_kwarg (unicode):
                The name of the instance parameter.

            **kwargs (dict):
                Signal arguments. These will be passed to
                :py:meth:`handle_save`.
        """
        if not self.can_process_signals:
            return

        instance = kwargs.pop(instance_kwarg)
        backend = search_backend_registry.current_backend

        if backend and search_backend_registry.on_the_fly_indexing_enabled:
            if isinstance(instance, Profile):
                # When we save a Profile, we want to update the User index.
                kwargs['sender'] = User
                instance = instance.user

            self.handle_save(instance=instance, **kwargs)

    def check_handle_delete(self, **kwargs):
        """Conditionally update the search index when an object is deleted.

        The index is only updated if signals can be processed, a search
        backend is configured, and on-the-fly indexing is enabled.

        Args:
            **kwargs (dict):
                Signal arguments. These will be passed to
                :py:meth:`handle_delete`.
        """
        if not self.can_process_signals:
            return

        backend = search_backend_registry.current_backend

        if backend and search_backend_registry.on_the_fly_indexing_enabled:
            self.handle_delete(**kwargs)

    def handle_save(self, **kwargs):
        """Update the search index when an object is updated.

        If there's any error writing to the search backend, the error will
        be caught and logged.

        Args:
            **kwargs (dict):
                Signal arguments. These will be passed to the parent's
                :py:meth:`handle_save`.
        """
        try:
            super(SignalProcessor, self).handle_save(**kwargs)
        except Exception as e:
            logger.error('Error updating the search index. Check to '
                         'make sure the search backend is running and '
                         'configured correctly, and then rebuild the search '
                         'index. Error: %s',
                         e)

    def handle_delete(self, **kwargs):
        """Update the search index when an object is deleted.

        If there's any error writing to the search backend, the error will
        be caught and logged.

        Args:
            **kwargs (dict):
                Signal arguments. These will be passed to the parent's
                :py:meth:`handle_delete`.
        """
        try:
            super(SignalProcessor, self).handle_delete(**kwargs)
        except Exception as e:
            logger.error('Error updating the search index. Check to '
                         'make sure the search backend is running and '
                         'configured correctly, and then rebuild the search '
                         'index. Error: %s',
                         e)

    def _handle_group_m2m_changed(self, instance, action, pk_set, reverse,
                                  **kwargs):
        """Handle a Group.users relation changing.

        When the :py:attr:`Group.users
        <reviewboard.reviews.models.group.Group.users>` field changes, we
        don't get a corresponding
        :py:data:`~django.db.models.signals.post_save` signal (because the
        related model wasn't saved). Instead, we will get multiple
        :py:data:`~django.db.models.signals.m2m_changed` signals that
        indicate how the relation is changing. This method will handle those
        signals and call the correct save method so that they can be
        re-indexed.

        Args:
            instance (django.contrib.auth.models.User or reviewboard.reviews.models.group.Group):
                The model that updated.

            action (unicode):
                The update action. This will be one of:

                * ``'pre_add'``
                * ``'post_add'``
                * ``'pre_remove'``
                * ``'post_remove'``
                * ``'pre_clear'``
                * ``'post_clear'``

            pk_set (set of int):
                The primary keys of the related objects that changed.

                When the action is ``'pre_clear'`` or ``'post_clear'``,
                this argument will be an empty set.

            reverse (bool):
                Whether or not the reverse relation was modified. If
                true, this indicates that ``instance`` is a
                :py:class:`~django.contrib.auth.models.User` object and
                ``pk_set`` is the set of primary keys of the added or removed
                groups.

                When this argument is false, ``instance`` is a
                :py:class:`~reviewboard.reviews.models.group.Group`
                object and ``pk_set`` is the set of primary keys of the added
                or removed users.

            **kwargs (dict):
                Additional keyword arguments.
        """
        backend = search_backend_registry.current_backend

        if not (backend and
                search_backend_registry.on_the_fly_indexing_enabled):
            return

        if not hasattr(self._pending_user_changes, 'data'):
            self._pending_user_changes.data = {}

        if action in ('post_add', 'post_remove'):
            if reverse:
                # When using the reverse relation, the instance is the User and
                # the pk_set is the PKs of the groups being added or removed.
                users = [instance]
            else:
                # Otherwise the instance is the Group and the pk_set is the set
                # of User primary keys.
                users = User.objects.filter(pk__in=pk_set)

            for user in users:
                self.handle_save(instance=user, instance_kwarg='instance',
                                 sender=User)
        elif action == 'pre_clear':
            # When ``reverse`` is ``True``, a User is having their groups
            # cleared so we don't need to worry about storing any state in the
            # pre_clear phase.
            #
            # Otherwise, a ReviewGroup is having their users cleared. In both
            # the pre_clear and post_clear phases, the ``pk_set`` argument will
            # be empty, so we cache the PKs of the current members of the
            # groups so we know to reindex them.
            if not reverse:
                self._pending_user_changes.data[instance.pk] = list(
                    instance.users.values_list('pk', flat=True))
        elif action == 'post_clear':
            if reverse:
                # When ``reverse`` is ``True``, we just have to reindex a
                # single user.
                self.handle_save(instance=instance, instance_kwarg='instance',
                                 sender=User)
            else:
                # Here, we are reindexing every user that got removed from the
                # group via clearing.
                pks = self._pending_user_changes.data.pop(instance.pk)

                for user in User.objects.filter(pk__in=pks):
                    self.handle_save(instance=user, instance_kwarg='instance',
                                     sender=User)