-
Notifications
You must be signed in to change notification settings - Fork 192
/
models.py
399 lines (349 loc) · 14.1 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
from copy import copy
from typing import List, NamedTuple, Sequence, Union
import swapper
from django.db import models
from django.utils.translation import gettext_lazy as _
from firebase_admin import messaging
from firebase_admin.exceptions import FirebaseError, InvalidArgumentError
from fcm_django.settings import FCM_DJANGO_SETTINGS as SETTINGS
# Set by Firebase. Adjust when they adjust; developers can override too if we don't
# upgrade package in time via a monkeypatch.
MAX_MESSAGES_PER_BATCH = 500
class Device(models.Model):
id = models.AutoField(
verbose_name="ID",
primary_key=True,
auto_created=True,
)
name = models.CharField(
max_length=255, verbose_name=_("Name"), blank=True, null=True
)
active = models.BooleanField(
verbose_name=_("Is active"),
default=True,
help_text=_("Inactive devices will not be sent notifications"),
)
user = models.ForeignKey(
SETTINGS["USER_MODEL"],
blank=True,
null=True,
on_delete=models.CASCADE,
related_query_name=_("fcmdevice"),
)
date_created = models.DateTimeField(
verbose_name=_("Creation date"), auto_now_add=True, null=True
)
class Meta:
abstract = True
def __str__(self):
return (
self.name
or (getattr(self, "device_id") or "")
or f"{self.__class__.__name__} for {self.user or 'unknown user'}"
)
class _FCMDeviceManager(models.Manager):
def get_queryset(self):
return FCMDeviceQuerySet(self.model)
# Error codes: https://firebase.google.com/docs/reference/fcm/rest/v1/ErrorCode
fcm_error_list = [
messaging.UnregisteredError,
messaging.SenderIdMismatchError,
InvalidArgumentError,
]
fcm_error_list_str = [x.code for x in fcm_error_list]
def _validate_exception_for_deactivation(exc: Union[FirebaseError]) -> bool:
if not exc:
return False
exc_type = type(exc)
if exc_type == str:
return exc in fcm_error_list_str
return (
exc_type == InvalidArgumentError and exc.cause == "Invalid registration"
) or (exc_type in fcm_error_list)
class FirebaseResponseDict(NamedTuple):
# All errors are stored rather than raised in BatchResponse.exceptions
# or TopicManagementResponse.errors
response: Union[messaging.BatchResponse, messaging.TopicManagementResponse]
registration_ids_sent: List[str]
deactivated_registration_ids: List[str]
class FCMDeviceQuerySet(models.query.QuerySet):
@staticmethod
def _prepare_message(message: messaging.Message, token: str):
message.token = token
return copy(message)
@staticmethod
def get_default_send_message_response() -> FirebaseResponseDict:
return FirebaseResponseDict(
response=messaging.BatchResponse([]),
registration_ids_sent=[],
deactivated_registration_ids=[],
)
def get_registration_ids(
self,
skip_registration_id_lookup: bool = False,
additional_registration_ids: Sequence[str] = None,
) -> List[str]:
"""
Uses the current filtering/QuerySet chain to get registration IDs
:param skip_registration_id_lookup: skips the QuerySet lookup and solely uses
the list of IDs from additional_registration_ids
:param additional_registration_ids: specific registration_ids to add to the
QuerySet lookup
:returns a list of registration IDs
"""
registration_ids = (
list(additional_registration_ids) if additional_registration_ids else []
)
if not skip_registration_id_lookup:
registration_ids.extend(
self.filter(active=True).values_list("registration_id", flat=True)
)
return registration_ids
def send_message(
self,
message: messaging.Message,
skip_registration_id_lookup: bool = False,
additional_registration_ids: Sequence[str] = None,
app: "firebase_admin.App" = SETTINGS["DEFAULT_FIREBASE_APP"],
**more_send_message_kwargs,
) -> FirebaseResponseDict:
"""
Send notification of single message for all active devices in
queryset and deactivate if DELETE_INACTIVE_DEVICES setting is set to True.
Bulk sends using firebase.messaging.send_each. For every 500 messages, we send a
single HTTP request to Firebase (the 500 is set by the firebase-sdk).
:param message: firebase.messaging.Message. If `message` includes a token/id, it
will be overridden.
:param skip_registration_id_lookup: skips the QuerySet lookup and solely uses
the list of IDs from additional_registration_ids
:param additional_registration_ids: specific registration_ids to add to the
:param app: firebase_admin.App. Specify a specific app to use
QuerySet lookup
:param more_send_message_kwargs: Parameters for firebase.messaging.send_each()
- dry_run: bool. Whether to actually send the notification to the device
If there are any new parameters, you can still specify them here.
:raises FirebaseError
:returns FirebaseResponseDict
"""
registration_ids = self.get_registration_ids(
skip_registration_id_lookup,
additional_registration_ids,
)
if not registration_ids:
return self.get_default_send_message_response()
responses: List[messaging.SendResponse] = []
for i in range(0, len(registration_ids), MAX_MESSAGES_PER_BATCH):
messages = [
self._prepare_message(message, token)
for token in registration_ids[i : i + MAX_MESSAGES_PER_BATCH]
]
responses.extend(
messaging.send_each(
messages, app=app, **more_send_message_kwargs
).responses
)
return FirebaseResponseDict(
response=messaging.BatchResponse(responses),
registration_ids_sent=registration_ids,
deactivated_registration_ids=self.deactivate_devices_with_error_results(
registration_ids, responses
),
)
def deactivate_devices_with_error_results(
self,
registration_ids: List[str],
results: List[Union[messaging.SendResponse, messaging.ErrorInfo]],
) -> List[str]:
if not results:
return []
if isinstance(results[0], messaging.SendResponse):
deactivated_ids = [
token
for item, token in zip(results, registration_ids)
if _validate_exception_for_deactivation(item.exception)
]
else:
deactivated_ids = [
registration_ids[x.index]
for x in results
if _validate_exception_for_deactivation(x.reason)
]
self.filter(registration_id__in=deactivated_ids).update(active=False)
self._delete_inactive_devices_if_requested(deactivated_ids)
return deactivated_ids
def _delete_inactive_devices_if_requested(self, registration_ids: List[str]):
if SETTINGS["DELETE_INACTIVE_DEVICES"]:
self.filter(registration_id__in=registration_ids).delete()
@staticmethod
def get_default_topic_response() -> FirebaseResponseDict:
return FirebaseResponseDict(
response=messaging.TopicManagementResponse({"results": []}),
registration_ids_sent=[],
deactivated_registration_ids=[],
)
def handle_topic_subscription(
self,
should_subscribe: bool,
topic: str,
skip_registration_id_lookup: bool = False,
additional_registration_ids: Sequence[str] = None,
app: "firebase_admin.App" = SETTINGS["DEFAULT_FIREBASE_APP"],
**more_subscribe_kwargs,
) -> FirebaseResponseDict:
"""
Subscribes or Unsubscribes filtered and/or given tokens/registration_ids
to given topic.
:param should_subscribe: whether to have these users subscribe (True) or
unsubscribe to a topic (False).
:param topic: Name of the topic to subscribe to. May contain the ``/topics/``
prefix.
:param skip_registration_id_lookup: skips the QuerySet lookup and solely uses
the list of IDs from additional_registration_ids
:param additional_registration_ids: specific registration_ids to add to the
:param app: firebase_admin.App. Specify a specific app to use
QuerySet lookup
:param more_subscribe_kwargs: Parameters for
``firebase.messaging.subscribe_to_topic()``
If there are any new parameters, you can still specify them here.
:raises FirebaseError
:returns FirebaseResponseDict
"""
registration_ids = self.get_registration_ids(
skip_registration_id_lookup,
additional_registration_ids,
)
if not registration_ids:
return self.get_default_topic_response()
response = (
messaging.subscribe_to_topic
if should_subscribe
else messaging.unsubscribe_from_topic
)(registration_ids, topic, app=app, **more_subscribe_kwargs)
return FirebaseResponseDict(
response=response,
registration_ids_sent=registration_ids,
deactivated_registration_ids=self.deactivate_devices_with_error_results(
registration_ids, response.errors
),
)
FCMDeviceManager = _FCMDeviceManager.from_queryset(FCMDeviceQuerySet)
class DeviceType(models.TextChoices):
IOS = "ios", "ios"
ANDROID = "android", "android"
WEB = "web", "web"
class AbstractFCMDevice(Device):
device_id = models.CharField(
verbose_name=_("Device ID"),
blank=True,
null=True,
db_index=True,
help_text=_("Unique device identifier"),
max_length=255,
)
registration_id = models.TextField(
verbose_name=_("Registration token"),
unique=not SETTINGS["MYSQL_COMPATIBILITY"],
)
type = models.CharField(choices=DeviceType.choices, max_length=10)
objects: "FCMDeviceQuerySet" = FCMDeviceManager()
class Meta:
abstract = True
verbose_name = _("FCM device")
indexes = [
models.Index(fields=["registration_id", "user"]),
]
def send_message(
self,
message: messaging.Message,
app: "firebase_admin.App" = SETTINGS["DEFAULT_FIREBASE_APP"],
**more_send_message_kwargs,
) -> messaging.SendResponse:
"""
Send single message. The message's token should be blank (and will be
overridden if not). Responds with message ID string.
:param message: firebase.messaging.Message. If `message` includes a token/id, it
will be overridden.
:param app: firebase_admin.App. Specify a specific app to use
:param more_send_message_kwargs: Parameters for firebase.messaging.send_each()
- dry_run: bool. Whether to actually send the notification to the device
If there are any new parameters, you can still specify them here.
:raises FirebaseError
:returns messaging.SendResponse or FirebaseError if the device was
deactivated due to an error.
"""
if not self.active:
return messaging.SendResponse(
None,
None,
)
message.token = self.registration_id
try:
return messaging.SendResponse(
{"name": messaging.send(message, app=app, **more_send_message_kwargs)},
None,
)
except FirebaseError as e:
self.deactivate_devices_with_error_result(self.registration_id, e)
raise
def handle_topic_subscription(
self,
should_subscribe: bool,
topic: str,
app: "firebase_admin.App" = SETTINGS["DEFAULT_FIREBASE_APP"],
**more_subscribe_kwargs,
) -> FirebaseResponseDict:
"""
Subscribes or Unsubscribes based on instance's registration_id
:param should_subscribe: whether to have these users subscribe (True) or
unsubscribe to a topic (False).
:param topic: Name of the topic to subscribe to. May contain the ``/topics/``
prefix.
:param app: firebase_admin.App. Specify a specific app to use
:param more_subscribe_kwargs: Parameters for
``firebase.messaging.subscribe_to_topic()``
If there are any new parameters, you can still specify them here.
:raises FirebaseError
:returns FirebaseResponseDict
"""
_r_ids = [self.registration_id]
response = (
messaging.subscribe_to_topic
if should_subscribe
else messaging.unsubscribe_from_topic
)(_r_ids, topic, app=app, **more_subscribe_kwargs)
return FirebaseResponseDict(
response=response,
registration_ids_sent=_r_ids,
deactivated_registration_ids=type(
self
).objects.deactivate_devices_with_error_results(_r_ids, response.errors),
)
@classmethod
def deactivate_devices_with_error_result(
cls, registration_id, firebase_exc, name=None
) -> List[str]:
return cls.objects.deactivate_devices_with_error_results(
[registration_id], [messaging.SendResponse({"name": name}, firebase_exc)]
)
@staticmethod
def send_topic_message(
message: messaging.Message,
topic_name: str,
app: "firebase_admin.App" = SETTINGS["DEFAULT_FIREBASE_APP"],
**more_send_message_kwargs,
) -> messaging.SendResponse:
message.topic = topic_name
return messaging.SendResponse(
{"name": messaging.send(message, app=app, **more_send_message_kwargs)},
None,
)
class FCMDevice(AbstractFCMDevice):
class Meta:
verbose_name = _("FCM device")
verbose_name_plural = _("FCM devices")
if not SETTINGS["MYSQL_COMPATIBILITY"]:
indexes = [
models.Index(fields=["registration_id", "user"]),
]
app_label = "fcm_django"
swappable = swapper.swappable_setting("fcm_django", "fcmdevice")