forked from learningequality/kolibri
-
Notifications
You must be signed in to change notification settings - Fork 0
/
models.py
1562 lines (1273 loc) · 59.1 KB
/
models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
We have four main abstractions: Users, Collections, Memberships, and Roles.
Users represent people, like students in a school, teachers for a classroom, or volunteers setting up informal
installations. A ``FacilityUser`` belongs to a particular facility, and has permissions only with respect to other data
that is associated with that facility. ``FacilityUser`` accounts (like other facility data) may be synced across multiple
devices.
Collections form a hierarchy, with Collections able to belong to other Collections. Collections are subdivided
into several pre-defined levels (``Facility`` > ``Classroom`` > ``LearnerGroup``).
A ``FacilityUser`` (but not a ``DeviceOwner``) can be marked as a member of a ``Collection`` through a ``Membership``
object. Being a member of a Collection also means being a member of all the Collections above that Collection in the
hierarchy.
Another way in which a ``FacilityUser`` can be associated with a particular ``Collection`` is through a ``Role``
object, which grants the user a role with respect to the ``Collection`` and all the collections below it. A ``Role``
object also stores the "kind" of the role (currently, one of "admin" or "coach"), which affects what permissions the
user gains through the ``Role``.
"""
from __future__ import absolute_import
from __future__ import print_function
from __future__ import unicode_literals
import logging
from threading import local
import six
from django.contrib.auth.models import AbstractBaseUser
from django.contrib.auth.models import AnonymousUser
from django.contrib.auth.models import UserManager
from django.core import validators
from django.core.exceptions import ObjectDoesNotExist
from django.core.exceptions import ValidationError
from django.db import models
from django.db import transaction
from django.db.models.query import Q
from django.db.utils import IntegrityError
from django.utils.encoding import python_2_unicode_compatible
from morango.models import Certificate
from morango.models import SyncableModel
from morango.models import SyncableModelManager
from mptt.models import TreeForeignKey
from .constants import collection_kinds
from .constants import facility_presets
from .constants import morango_sync
from .constants import role_kinds
from .constants import user_kinds
from .errors import IncompatibleDeviceSettingError
from .errors import InvalidCollectionHierarchy
from .errors import InvalidMembershipError
from .errors import InvalidRoleKind
from .errors import UserDoesNotHaveRoleError
from .errors import UserIsNotFacilityUser
from .errors import UserIsNotMemberError
from .permissions.auth import AllCanReadFacilityDataset
from .permissions.auth import AnyUserCanReadFacilities
from .permissions.auth import CoachesCanManageGroupsForTheirClasses
from .permissions.auth import CoachesCanManageMembershipsForTheirGroups
from .permissions.auth import CollectionSpecificRoleBasedPermissions
from .permissions.auth import FacilityAdminCanEditForOwnFacilityDataset
from .permissions.base import BasePermissions
from .permissions.base import RoleBasedPermissions
from .permissions.general import IsAdminForOwnFacility
from .permissions.general import IsOwn
from .permissions.general import IsSelf
from kolibri.core.auth.constants.demographics import choices as GENDER_CHOICES
from kolibri.core.auth.constants.demographics import DEFERRED
from kolibri.core.auth.constants.morango_sync import ScopeDefinitions
from kolibri.core.device.utils import DeviceNotProvisioned
from kolibri.core.device.utils import get_device_setting
from kolibri.core.device.utils import set_device_settings
from kolibri.core.errors import KolibriValidationError
from kolibri.core.fields import DateTimeTzField
from kolibri.utils.time_utils import local_now
logger = logging.getLogger(__name__)
class DatasetCache(local):
    """
    Thread-local, opt-in cache mapping lookup keys to dataset IDs.

    The cache only stores and returns values while it is active. It starts out
    inactive, and is switched on either explicitly through ``activate`` /
    ``deactivate`` or by using the instance as a context manager. Deactivating
    always throws away everything that was cached.
    """

    def __init__(self):
        # start out inactive, with an empty cache
        self.deactivate()

    def __enter__(self):
        self.activate()
        # hand back the cache itself so ``with dataset_cache as cache:`` binds a usable object
        return self

    def __exit__(self, type, value, traceback):
        # nothing is returned, so exceptions raised inside the ``with`` body propagate
        self.deactivate()

    def activate(self):
        """Start honouring ``get`` and ``set`` calls."""
        self._active = True

    def deactivate(self):
        """Stop caching and discard any stored entries."""
        self._active = False
        self.clear()

    def clear(self):
        """Drop all stored entries without changing the active state."""
        self._cache = {}

    def get(self, key):
        """
        :param key: the cache key to look up
        :return: the cached dataset ID, or ``None`` if the cache is inactive or the key is unknown
        """
        if self._active:
            return self._cache.get(key)
        return None

    def set(self, key, dataset_id):
        """
        Store ``dataset_id`` under ``key``; a no-op while the cache is inactive.

        :return: always ``None``
        """
        if self._active:
            self._cache[key] = dataset_id
        return None


dataset_cache = DatasetCache()
def _has_permissions_class(obj):
return hasattr(obj, "permissions") and isinstance(obj.permissions, BasePermissions)
class FacilityDataSyncableModel(SyncableModel):
    """
    Abstract syncable base model whose Morango profile is the facility data profile.
    """

    class Meta:
        abstract = True

    morango_profile = morango_sync.PROFILE_FACILITY_DATA
@python_2_unicode_compatible
class FacilityDataset(FacilityDataSyncableModel):
    """
    ``FacilityDataset`` stores high-level metadata and settings for a particular ``Facility``.
    It is also the model that all models storing facility data (data that is associated with a
    particular facility, and that inherits from ``AbstractFacilityDataModel``) foreign key onto,
    to indicate that they belong to this particular ``Facility``.
    """

    permissions = (
        AllCanReadFacilityDataset() | FacilityAdminCanEditForOwnFacilityDataset()
    )

    # Morango syncing settings
    morango_model_name = "facilitydataset"

    description = models.TextField(blank=True)
    location = models.CharField(max_length=200, blank=True)
    preset = models.CharField(
        max_length=50,
        choices=facility_presets.choices,
        default=facility_presets.default,
    )

    # Facility specific configuration settings
    learner_can_edit_username = models.BooleanField(default=True)
    learner_can_edit_name = models.BooleanField(default=True)
    learner_can_edit_password = models.BooleanField(default=True)
    learner_can_sign_up = models.BooleanField(default=True)
    learner_can_delete_account = models.BooleanField(default=True)
    learner_can_login_with_no_password = models.BooleanField(default=False)
    show_download_button_in_learn = models.BooleanField(default=True)
    registered = models.BooleanField(default=False)

    def __str__(self):
        facility_collections = self.collection_set.filter(
            kind=collection_kinds.FACILITY
        )
        if not facility_collections:
            return "FacilityDataset (no associated Facility)"
        # re-fetch through the Facility manager so the Facility's own string form is used
        facility = Facility.objects.get(id=facility_collections[0].id)
        return "FacilityDataset for {}".format(facility)

    def save(self, *args, **kwargs):
        # refuse to persist a combination of settings that contradict each other
        self.ensure_compatibility()
        super(FacilityDataset, self).save(*args, **kwargs)

    def ensure_compatibility(self, *args, **kwargs):
        """
        Raise ``IncompatibleDeviceSettingError`` if learners may both log in without a password
        and edit their password.
        """
        passwordless_login = self.learner_can_login_with_no_password
        password_editable = self.learner_can_edit_password
        if not (passwordless_login and password_editable):
            return
        raise IncompatibleDeviceSettingError(
            "Device Settings [learner_can_login_with_no_password={}] & [learner_can_edit_password={}] "
            "values incompatible together.".format(
                passwordless_login, password_editable
            )
        )

    def calculate_source_id(self):
        # reuse the existing source ID when there is one
        if self._morango_source_id:
            return self._morango_source_id
        # otherwise generate a new root certificate and adopt its ID as the source ID
        root_certificate = Certificate.generate_root_certificate(
            ScopeDefinitions.FULL_FACILITY
        )
        self._morango_source_id = root_certificate.id
        return self._morango_source_id

    @staticmethod
    def compute_namespaced_id(partition_value, source_id_value, model_name):
        # the source_id doubles as the ID of the FacilityDataset; any other model name is rejected
        if model_name == FacilityDataset.morango_model_name:
            return source_id_value
        raise AssertionError

    def calculate_partition(self):
        partition_template = "{id}:allusers-ro"
        return partition_template.format(id=self.ID_PLACEHOLDER)

    def get_root_certificate(self):
        """Return the ``Certificate`` whose ID equals this dataset's ID."""
        return Certificate.objects.get(id=self.id)

    def get_owned_certificates(self):
        """
        Return all certificates in this dataset's root certificate tree for which a private key
        is stored.
        """
        root_tree_id = self.get_root_certificate().tree_id
        certificates_in_tree = Certificate.objects.filter(tree_id=root_tree_id)
        return certificates_in_tree.exclude(_private_key=None)

    def reset_to_default_settings(self, preset=None):
        """
        Apply every setting defined by a preset to this dataset and save it.

        :param preset: name of the preset to apply; the dataset's current preset is used when
            this is not given
        """
        from kolibri.core.auth.constants.facility_presets import mappings

        preset_name = preset or self.preset
        for setting_name, setting_value in mappings[preset_name].items():
            setattr(self, setting_name, setting_value)
        self.save()
class AbstractFacilityDataModel(FacilityDataSyncableModel):
    """
    Base model for Kolibri "Facility Data", which is data that is specific to a particular ``Facility``,
    such as ``FacilityUsers``, ``Collections``, and other data associated with those users and collections.
    """

    dataset = models.ForeignKey(FacilityDataset, on_delete=models.CASCADE)

    class Meta:
        abstract = True

    @classmethod
    def get_related_dataset_cache_key(cls, id, db_table):
        """
        Build the key under which a related object's dataset ID is kept in ``dataset_cache``.

        :param id: the ID of the related object
        :param db_table: the database table name of the related object's model
        :return: a string combining both values
        """
        return "{id}_{db_table}_dataset".format(id=id, db_table=db_table)

    def cached_related_dataset_lookup(self, related_obj_name):
        """
        Attempt to get the dataset_id either from the cache or the actual related obj instance.

        :param related_obj_name: string representing the name of the related object on this model
        :return: the dataset_id associated with the related obj
        :raises ValidationError: if the related object does not exist
        """
        field = self._meta.get_field(related_obj_name)
        key = self.get_related_dataset_cache_key(
            getattr(self, field.attname), field.related_model._meta.db_table
        )
        dataset_id = dataset_cache.get(key)
        if dataset_id is None:
            # cache miss (or cache inactive): load the related object and remember its dataset
            try:
                dataset_id = getattr(self, related_obj_name).dataset_id
            except ObjectDoesNotExist as e:
                raise ValidationError(e)
            dataset_cache.set(key, dataset_id)
        return dataset_id

    def calculate_source_id(self):
        # by default, we'll use randomly generated source IDs; this can be overridden as desired
        return None

    def clean_fields(self, *args, **kwargs):
        # ensure that we have, or can infer, a dataset for the model instance
        if not self.dataset_id:
            self.ensure_dataset(validating=True)
        super(AbstractFacilityDataModel, self).clean_fields(*args, **kwargs)

    def full_clean(self, *args, **kwargs):
        """
        Run Django's ``full_clean`` with the model's ``FIELDS_TO_EXCLUDE_FROM_VALIDATION``
        (if defined) appended to whatever ``exclude`` the caller supplied.

        ``exclude`` may be omitted, ``None``, or any iterable of field names, and may be given
        either by keyword or as the first positional argument.
        """
        always_excluded = list(
            getattr(self, "FIELDS_TO_EXCLUDE_FROM_VALIDATION", None) or []
        )
        if args:
            # ``exclude`` was passed positionally; merge into it rather than also adding a
            # keyword, which would be a duplicate argument
            merged = list(args[0] or []) + always_excluded
            args = (merged,) + tuple(args[1:])
        else:
            # ``or []`` covers an explicit ``exclude=None``, which cannot be concatenated
            kwargs["exclude"] = list(kwargs.get("exclude") or []) + always_excluded
        super(AbstractFacilityDataModel, self).full_clean(*args, **kwargs)

    def pre_save(self):
        # before saving, ensure we have a dataset, and convert any validation errors into integrity
        # errors, since by this point the `clean_fields` method should already have prevented
        # this situation from arising
        try:
            self.ensure_dataset()
        except KolibriValidationError as e:
            raise IntegrityError(str(e))

    def save(self, *args, **kwargs):
        self.pre_save()
        super(AbstractFacilityDataModel, self).save(*args, **kwargs)

    def ensure_dataset(self, *args, **kwargs):
        """
        If no dataset has yet been specified, try to infer it. If a dataset has already been specified, to prevent
        inconsistencies, make sure it matches the inferred dataset, otherwise raise a ``KolibriValidationError``.
        If we have no dataset and it can't be inferred, we raise a ``KolibriValidationError`` exception as well.

        All arguments are forwarded unchanged to ``infer_dataset``.
        """
        inferred_dataset_id = self.infer_dataset(*args, **kwargs)
        if self.dataset_id:
            # make sure currently stored dataset matches inferred dataset, if any
            if inferred_dataset_id and inferred_dataset_id != self.dataset_id:
                raise KolibriValidationError(
                    "This model is not associated with the correct FacilityDataset."
                )
        else:
            # use the inferred dataset, if there is one, otherwise throw an error
            if inferred_dataset_id:
                self.dataset_id = inferred_dataset_id
            else:
                raise KolibriValidationError(
                    "FacilityDataset ('dataset') not provided, and could not be inferred."
                )

    def infer_dataset(self, *args, **kwargs):
        """
        This method is used by `ensure_dataset` to "infer" which dataset should be associated with this instance.
        It should be overridden in any subclass of ``AbstractFacilityDataModel``, to define a model-specific inference.
        """
        raise NotImplementedError(
            "Subclasses of AbstractFacilityDataModel must override the `infer_dataset` method."
        )
class KolibriAbstractBaseUser(AbstractBaseUser):
    """
    Our custom user type, derived from ``AbstractBaseUser`` as described in the Django docs.
    Draws liberally from ``django.contrib.auth.AbstractUser``, except we exclude some fields
    we don't care about, like email.

    This model is an abstract model, and is inherited by ``FacilityUser``.
    """

    class Meta:
        abstract = True

    USERNAME_FIELD = "username"

    # NOTE(review): the help text says "Letters and digits only" while the validator message
    # also allows underscores; the regex rejects whitespace and the listed punctuation only.
    username = models.CharField(
        "username",
        max_length=30,
        help_text="Required. 30 characters or fewer. Letters and digits only",
        validators=[
            validators.RegexValidator(
                r'[\s`~!@#$%^&*()\-+={}\[\]\|\\\/:;"\'<>,\.\?]',
                "Enter a valid username. This value can contain only letters, numbers, and underscores.",
                inverse_match=True,
            )
        ],
    )
    full_name = models.CharField("full name", max_length=120, blank=True)
    date_joined = DateTimeTzField("date joined", default=local_now, editable=False)

    # class-level defaults for flags read elsewhere; subclasses may override them
    is_staff = False
    is_superuser = False
    is_facility_user = False
    can_manage_content = False

    def get_short_name(self):
        """Return the part of ``full_name`` before the first space."""
        return self.full_name.split(" ", 1)[0]

    @property
    def session_data(self):
        """
        Data that is added to the session data at login and during session updates.
        """
        raise NotImplementedError(
            "Subclasses of KolibriAbstractBaseUser must override the `session_data` property."
        )

    def is_member_of(self, coll):
        """
        Determine whether this user is a member of the specified ``Collection``.

        :param coll: The ``Collection`` for which we are checking this user's membership.
        :return: ``True`` if this user is a member of the specified ``Collection``, otherwise False.
        :rtype: bool
        """
        raise NotImplementedError(
            "Subclasses of KolibriAbstractBaseUser must override the `is_member_of` method."
        )

    def has_role_for_user(self, kinds, user):
        """
        Determine whether this user has (at least one of) the specified role kind(s) in relation to the specified user.

        :param user: The user that is the target of the role (for which this user has the roles).
        :param kinds: The kind (or kinds) of role to check for, as a string or iterable.
        :type kinds: string from ``kolibri.core.auth.constants.role_kinds.*``
        :return: ``True`` if this user has the specified role kind with respect to the target user, otherwise ``False``.
        :rtype: bool
        """
        raise NotImplementedError(
            "Subclasses of KolibriAbstractBaseUser must override the `has_role_for_user` method."
        )

    def has_role_for_collection(self, kinds, coll):
        """
        Determine whether this user has (at least one of) the specified role kind(s) in relation to the specified ``Collection``.

        :param kinds: The kind (or kinds) of role to check for, as a string or iterable.
        :type kinds: string from kolibri.core.auth.constants.role_kinds.*
        :param coll: The target ``Collection`` for which this user has the roles.
        :return: ``True`` if this user has the specified role kind with respect to the target ``Collection``, otherwise ``False``.
        :rtype: bool
        """
        raise NotImplementedError(
            "Subclasses of KolibriAbstractBaseUser must override the `has_role_for_collection` method."
        )

    def can_create_instance(self, obj):
        """
        Checks whether this user (self) has permission to create a particular model instance (obj).

        This method should be overridden by classes that inherit from ``KolibriAbstractBaseUser``.

        In general, unless an instance has already been initialized, this method should not be called directly;
        instead, it should be preferred to call ``can_create``.

        :param obj: An (unsaved) instance of a Django model, to check permissions for.
        :return: ``True`` if this user should have permission to create the object, otherwise ``False``.
        :rtype: bool
        """
        raise NotImplementedError(
            "Subclasses of KolibriAbstractBaseUser must override the `can_create_instance` method."
        )

    def can_create(self, Model, data):
        """
        Checks whether this user (self) has permission to create an instance of Model with the specified attributes (data).

        This method defers to the ``can_create_instance`` method, and in most cases should not itself be overridden.

        :param Model: A subclass of ``django.db.models.Model``
        :param data: A ``dict`` of data to be used in creating an instance of the Model
        :return: ``True`` if this user should have permission to create an instance of Model with the specified data, else ``False``.
        :rtype: bool
        """
        try:
            instance = Model(**data)
            instance.clean_fields(
                exclude=getattr(Model, "FIELDS_TO_EXCLUDE_FROM_VALIDATION", None)
            )
            instance.clean()
        except TypeError as e:
            logger.error(
                "TypeError while validating model before checking permissions: {}".format(
                    e.args
                )
            )
            # if the data provided does not fit the Model, don't continue checking
            return False
        except ValidationError as e:
            logger.error(e)
            return False  # if the data does not validate, don't continue checking
        # now that we have an instance, defer to the permission-checking method that works with instances
        return self.can_create_instance(instance)

    def can_read(self, obj):
        """
        Checks whether this user (self) has permission to read a particular model instance (obj).

        This method should be overridden by classes that inherit from ``KolibriAbstractBaseUser``.

        :param obj: An instance of a Django model, to check permissions for.
        :return: ``True`` if this user should have permission to read the object, otherwise ``False``.
        :rtype: bool
        """
        raise NotImplementedError(
            "Subclasses of KolibriAbstractBaseUser must override the `can_read` method."
        )

    def can_update(self, obj):
        """
        Checks whether this user (self) has permission to update a particular model instance (obj).

        This method should be overridden by classes that inherit from KolibriAbstractBaseUser.

        :param obj: An instance of a Django model, to check permissions for.
        :return: ``True`` if this user should have permission to update the object, otherwise ``False``.
        :rtype: bool
        """
        raise NotImplementedError(
            "Subclasses of KolibriAbstractBaseUser must override the `can_update` method."
        )

    def can_delete(self, obj):
        """
        Checks whether this user (self) has permission to delete a particular model instance (obj).

        This method should be overridden by classes that inherit from KolibriAbstractBaseUser.

        :param obj: An instance of a Django model, to check permissions for.
        :return: ``True`` if this user should have permission to delete the object, otherwise ``False``.
        :rtype: bool
        """
        raise NotImplementedError(
            "Subclasses of KolibriAbstractBaseUser must override the `can_delete` method."
        )

    def has_role_for(self, kinds, obj):
        """
        Helper function that defers to ``has_role_for_user`` or ``has_role_for_collection`` based on the type of object passed in.

        :raises ValueError: if ``obj`` is neither a ``KolibriAbstractBaseUser`` nor a ``Collection``
        """
        if isinstance(obj, KolibriAbstractBaseUser):
            return self.has_role_for_user(kinds, obj)
        elif isinstance(obj, Collection):
            return self.has_role_for_collection(kinds, obj)
        else:
            raise ValueError(
                "The `obj` argument to `has_role_for` must be either an instance of KolibriAbstractBaseUser or Collection."
            )

    def filter_readable(self, queryset):
        """
        Filters a queryset down to only the elements that this user should have permission to read.

        :param queryset: A ``QuerySet`` instance that the filtering should be applied to.
        :return: Filtered ``QuerySet`` including only elements that are readable by this user.
        """
        # the message names this method (it previously named `can_delete` by mistake)
        raise NotImplementedError(
            "Subclasses of KolibriAbstractBaseUser must override the `filter_readable` method."
        )
class KolibriAnonymousUser(AnonymousUser, KolibriAbstractBaseUser):
    """
    Custom anonymous user that also exposes the same interface as KolibriAbstractBaseUser, for consistency.

    An anonymous user is never a member of a collection and never holds a role. Object-level
    permissions are still consulted, in case a permissions class grants something to
    anonymous users.
    """

    class Meta:
        abstract = True

    @property
    def session_data(self):
        default_facility = Facility.get_default_facility()
        return {
            "username": "",
            "full_name": "",
            "user_id": None,
            # fall back to None when the default facility has no ``id`` attribute
            "facility_id": getattr(default_facility, "id", None),
            "kind": [user_kinds.ANONYMOUS],
        }

    def is_member_of(self, coll):
        return False

    def has_role_for_user(self, kinds, user):
        return False

    def has_role_for_collection(self, kinds, coll):
        return False

    def can_create_instance(self, obj):
        # without a permissions class on the object there is nothing that could grant access
        if not _has_permissions_class(obj):
            return False
        return obj.permissions.user_can_create_object(self, obj)

    def can_read(self, obj):
        if not _has_permissions_class(obj):
            return False
        return obj.permissions.user_can_read_object(self, obj)

    def can_update(self, obj):
        if not _has_permissions_class(obj):
            return False
        return obj.permissions.user_can_update_object(self, obj)

    def can_delete(self, obj):
        if not _has_permissions_class(obj):
            return False
        return obj.permissions.user_can_delete_object(self, obj)

    def filter_readable(self, queryset):
        model = queryset.model
        if not _has_permissions_class(model):
            return queryset.none()
        readable_filter = model.permissions.readable_by_user_filter(self)
        return queryset.filter(readable_filter).distinct()
class FacilityUserModelManager(SyncableModelManager, UserManager):
    """
    Manager for ``FacilityUser`` with helpers for creating regular users and superusers.
    """

    def create_user(self, username, email=None, password=None, **extra_fields):
        """
        Creates and saves a User with the given username.

        The default facility is used when ``extra_fields`` contains no ``facility``. The
        ``email`` argument is accepted but not used.

        :raises ValueError: if ``username`` is empty
        :raises ValidationError: if the facility already has a user with the same username
            (compared case-insensitively), or if model validation fails
        """
        if not username:
            raise ValueError("The given username must be set")
        if "facility" not in extra_fields:
            extra_fields["facility"] = Facility.get_default_facility()
        same_name_users = self.filter(
            username__iexact=username, facility=extra_fields["facility"]
        )
        if same_name_users.exists():
            raise ValidationError("An account with that username already exists")
        new_user = self.model(username=username, password=password, **extra_fields)
        new_user.full_clean()
        # replace the raw password assigned above with its hashed form before saving
        new_user.set_password(password)
        new_user.save(using=self._db)
        return new_user

    def create_superuser(self, username, password, facility=None, full_name=None):
        """
        Create a ``FacilityUser`` that is an admin of its facility and a superuser on this device.

        :param facility: the facility to create the account in; the default facility when ``None``
        :param full_name: the user's full name; the username is used when this is falsy
        :raises ValidationError: if the facility already has a user with the same username
            (compared case-insensitively), or if model validation fails
        """
        # import here to avoid circularity
        from kolibri.core.device.models import DevicePermissions

        if facility is None:
            facility = Facility.get_default_facility()

        same_name_users = self.filter(username__iexact=username, facility=facility)
        if same_name_users.exists():
            raise ValidationError("An account with that username already exists")

        # gender and birth_year are set to DEFERRED, since superusers do not
        # need to provide this and are not nudged to update profile on Learn page
        account_fields = dict(
            full_name=full_name or username,
            username=username,
            password=password,
            facility=facility,
            gender=DEFERRED,
            birth_year=DEFERRED,
        )
        superuser = FacilityUser(**account_fields)
        superuser.full_clean()
        superuser.set_password(password)
        superuser.save()

        # make the user a facility admin
        facility.add_role(superuser, role_kinds.ADMIN)

        # make the user into a superuser on this device
        DevicePermissions.objects.create(
            user=superuser, is_superuser=True, can_manage_content=True
        )
        return superuser
def validate_birth_year(value):
    """
    Validate a ``birth_year`` value.

    The strings "NOT_SPECIFIED" and "DEFERRED" are accepted as-is. Anything else must be
    convertible to an integer between 1900 and 3000 inclusive.

    :param value: the birth year to validate
    :return: ``None`` when the value is acceptable
    :raises ValidationError: if the value cannot be converted to an integer, or the year is
        before 1900 or after 3000
    """
    if value in ("NOT_SPECIFIED", "DEFERRED"):
        return

    error = ""
    try:
        # convert once and reuse for both range checks
        year = int(value)
    except (TypeError, ValueError):
        # TypeError covers inputs such as None, which int() rejects with a different
        # exception type than a malformed string
        error = "{value} is not a valid value for birth_year".format(value=value)
    else:
        if year < 1900:
            error = (
                "Birth year {value} is invalid, as it is prior to the year 1900".format(
                    value=value
                )
            )
        elif year > 3000:
            error = (
                "Birth year {value} is invalid, as it is after the year 3000".format(
                    value=value
                )
            )

    if error:
        raise ValidationError(error)
# the first element of every entry in ``role_kinds.choices``
role_kinds_set = set(choice[0] for choice in role_kinds.choices)


def validate_role_kinds(kinds):
    """
    Normalize ``kinds`` to a set and check that every member is a known role kind.

    :param kinds: a single role kind as a string, or an iterable of role kinds
    :return: the role kinds as a ``set``
    :raises TypeError: if ``kinds`` is neither a string nor something ``set()`` accepts
    :raises InvalidRoleKind: if any entry is not in ``role_kinds_set``
    """
    if isinstance(kinds, six.string_types):
        # a bare string is one kind, not an iterable of characters
        kinds = {kinds}
    else:
        try:
            kinds = set(kinds)
        except TypeError:
            raise TypeError(
                "kinds argument must be a string or an iterable coerceable to a set"
            )
    if role_kinds_set.issuperset(kinds):
        return kinds
    raise InvalidRoleKind("kinds argument must only contain valid role kind names")
@python_2_unicode_compatible
class FacilityUser(KolibriAbstractBaseUser, AbstractFacilityDataModel):
"""
``FacilityUser`` is the fundamental object of the auth app. These users represent the main users, and can be associated
with a hierarchy of ``Collections`` through ``Memberships`` and ``Roles``, which then serve to help determine permissions.
"""
# Morango syncing settings
morango_model_name = "facilityuser"
# FacilityUser can be read and written by itself
own = IsSelf()
# FacilityUser can be read and written by a facility admin
admin = IsAdminForOwnFacility()
# FacilityUser can be read by admin or coach, and updated by admin, but not created/deleted by non-facility admin
role = RoleBasedPermissions(
target_field=".",
can_be_created_by=(), # we can't check creation permissions by role, as user doesn't exist yet
can_be_read_by=(role_kinds.ADMIN, role_kinds.COACH),
can_be_updated_by=(role_kinds.ADMIN,),
can_be_deleted_by=(), # don't want a classroom admin deleting a user completely, just removing them from the class
collection_field="memberships__collection",
)
permissions = own | admin | role
objects = FacilityUserModelManager()
facility = models.ForeignKey("Facility", on_delete=models.CASCADE)
is_facility_user = True
gender = models.CharField(
max_length=16, choices=GENDER_CHOICES, default="", blank=True
)
birth_year = models.CharField(
max_length=16, default="", validators=[validate_birth_year], blank=True
)
id_number = models.CharField(max_length=64, default="", blank=True)
@classmethod
def deserialize(cls, dict_model):
    """
    Deserialize a ``FacilityUser``, substituting "NOT_SPECIFIED" for a missing or blank password.

    :param dict_model: the serialized fields; updated in place when the password is blank
    """
    # a missing key and a falsy stored value are both treated as an empty password
    stored_password = dict_model.get("password", "") or ""
    if len(stored_password) < 1:
        dict_model["password"] = "NOT_SPECIFIED"
    return super(FacilityUser, cls).deserialize(dict_model)
def calculate_partition(self):
    """Return the partition string built from the dataset ID and the ID placeholder."""
    partition_template = "{dataset_id}:user-ro:{user_id}"
    return partition_template.format(
        dataset_id=self.dataset_id, user_id=self.ID_PLACEHOLDER
    )
def infer_dataset(self, *args, **kwargs):
    """Infer this user's dataset ID from its ``facility`` relation (cached when possible)."""
    facility_dataset_id = self.cached_related_dataset_lookup("facility")
    return facility_dataset_id
def get_permission(self, permission):
    """
    Look up a device permission for this user.

    :param permission: name of the attribute to read from the user's ``devicepermissions``
    :return: the ``is_superuser`` value if it is truthy, otherwise the named permission's
        value; ``False`` if ``ObjectDoesNotExist`` is raised during the lookup
    """
    try:
        device_permissions = self.devicepermissions
        return device_permissions.is_superuser or getattr(
            device_permissions, permission
        )
    except ObjectDoesNotExist:
        return False
def has_morango_certificate_scope_permission(self, scope_definition_id, scope_params):
    """
    Decide whether this user may obtain a Morango certificate for the requested scope.

    :param scope_definition_id: the scope definition being requested
    :param scope_params: dict of scope parameters; ``dataset_id`` and (for single-user
        scopes) ``user_id`` are read from it
    :return: ``True`` if the user has permission for the scope, otherwise ``False``
    :rtype: bool
    """
    # superusers of a device always have permission to sync
    if self.is_superuser:
        return True

    # requests for another facility's dataset are always refused
    if scope_params.get("dataset_id") != self.dataset_id:
        return False

    # full-facility syncing requires being an admin of the facility
    if scope_definition_id == ScopeDefinitions.FULL_FACILITY:
        return self.has_role_for_collection(role_kinds.ADMIN, self.facility)

    # any scope other than single-user is not granted
    if scope_definition_id != ScopeDefinitions.SINGLE_USER:
        return False

    # single-user syncing: allowed for the target user itself, or for an admin of that user
    target_user = FacilityUser.objects.get(id=scope_params.get("user_id"))
    return bool(
        self == target_user or self.has_role_for_user(role_kinds.ADMIN, target_user)
    )
@property
def session_data(self):
    """
    Session payload for this user: identity fields, facility, content-management flag, and
    the list of user kinds (distinct role kinds, prefixed with the superuser kind for
    superusers, or just the learner kind when the list would otherwise be empty).
    """
    distinct_role_kinds = self.roles.values_list("kind", flat=True).distinct()
    kinds = list(distinct_role_kinds)
    if self.is_superuser:
        kinds = [user_kinds.SUPERUSER] + kinds
    if not kinds:
        kinds = [user_kinds.LEARNER]
    return {
        "username": self.username,
        "full_name": self.full_name,
        "user_id": self.id,
        "kind": kinds,
        "can_manage_content": self.can_manage_content,
        "facility_id": self.facility_id,
    }
@property
def can_manage_content(self):
    """The value of ``get_permission`` for the ``can_manage_content`` device permission."""
    permission_name = "can_manage_content"
    return self.get_permission(permission_name)
@property
def is_superuser(self):
    """The value of ``get_permission`` for the ``is_superuser`` device permission."""
    permission_name = "is_superuser"
    return self.get_permission(permission_name)
@property
def is_staff(self):
    """Staff status mirrors superuser status."""
    superuser_flag = self.is_superuser
    return superuser_flag
def is_member_of(self, coll):
    """
    Determine whether this user is a member of ``coll``.

    Collections in a different dataset never match. For a facility the user's own facility
    ID is compared; for any other kind a ``Membership`` row is looked up.

    :rtype: bool
    """
    if self.dataset_id != coll.dataset_id:
        return False
    if coll.kind != collection_kinds.FACILITY:
        return Membership.objects.filter(user=self, collection=coll).exists()
    return self.facility_id == coll.id
def has_role_for_user(self, kinds, user):
    """
    Determine whether this user holds at least one of ``kinds`` with respect to ``user``.

    :param kinds: a role kind string or an iterable of them (checked by ``validate_role_kinds``)
    :param user: the target user
    :rtype: bool
    """
    kinds = validate_role_kinds(kinds)

    # a superuser has admin role for all users on the device
    if self.is_superuser:
        return role_kinds.ADMIN in kinds

    if not kinds:
        return False

    # targets without a dataset, or in a different dataset, can never match
    if not hasattr(user, "dataset_id") or self.dataset_id != user.dataset_id:
        return False

    # a role on the target's facility ...
    role_on_facility = Q(user=self, collection_id=user.facility_id, kind__in=kinds)
    # ... or on any collection the target is a member of
    target_collection_ids = user.memberships.all().values_list(
        "collection_id", flat=True
    )
    role_on_membership = Q(
        user=self, collection_id__in=target_collection_ids, kind__in=kinds
    )
    return Role.objects.filter(role_on_facility | role_on_membership).exists()
def has_role_for_collection(self, kinds, coll):
    """
    Determine whether this user holds at least one of ``kinds`` with respect to ``coll``.

    :param kinds: a role kind string or an iterable of them (checked by ``validate_role_kinds``)
    :param coll: the target ``Collection``
    :rtype: bool
    """
    kinds = validate_role_kinds(kinds)

    # a superuser has admin role for all collections on the device
    if self.is_superuser:
        return role_kinds.ADMIN in kinds

    if not kinds:
        return False

    if self.dataset_id != coll.dataset_id:
        return False

    # for learner groups and ad hoc learner groups, the role is looked up on the parent collection
    group_kinds = (
        collection_kinds.LEARNERGROUP,
        collection_kinds.ADHOCLEARNERSGROUP,
    )
    target_collection_id = coll.parent_id if coll.kind in group_kinds else coll.id

    role_on_facility = Q(user=self, collection_id=self.facility_id, kind__in=kinds)
    role_on_collection = Q(
        user=self, collection_id=target_collection_id, kind__in=kinds
    )
    return Role.objects.filter(role_on_facility | role_on_collection).exists()
def can_create_instance(self, obj):
    """
    Tell whether this user may create the given object.

    :param obj: The (unsaved) object the user wants to create.
    :return: ``True`` for a superuser; otherwise the verdict of the object's
        permissions class, or ``False`` if the object has none.
    """
    if self.is_superuser:
        return True
    # without a permissions class there is nothing that could grant access
    if not _has_permissions_class(obj):
        return False
    # a FacilityUser's permissions are determined through the object's permission class
    return obj.permissions.user_can_create_object(self, obj)
def can_read(self, obj):
    """
    Tell whether this user may read the given object.

    :param obj: The object the user wants to read.
    :return: ``True`` for a superuser; otherwise the verdict of the object's
        permissions class, or ``False`` if the object has none.
    """
    if self.is_superuser:
        return True
    # without a permissions class there is nothing that could grant access
    if not _has_permissions_class(obj):
        return False
    # a FacilityUser's permissions are determined through the object's permission class
    return obj.permissions.user_can_read_object(self, obj)
def can_update(self, obj):
    """
    Tell whether this user may update the given object.

    :param obj: The object the user wants to update.
    :return: ``True`` for a superuser updating anything other than their own
        device permissions; otherwise the verdict of the object's permissions
        class, or ``False`` if the object has none.
    """
    # Superusers cannot update their own permissions, because the only thing they could do is make
    # themselves not super; we all saw what happened in Superman 2, no red kryptonite here!
    if self.is_superuser and obj != self.devicepermissions:
        return True
    # without a permissions class there is nothing that could grant access
    if not _has_permissions_class(obj):
        return False
    # a FacilityUser's permissions are determined through the object's permission class
    return obj.permissions.user_can_update_object(self, obj)
def can_delete(self, obj):
    """
    Tell whether this user may delete the given object.

    :param obj: The object the user wants to delete.
    :return: ``False`` when the object is this user; ``True`` for a superuser
        deleting anything other than their own device permissions; otherwise
        the verdict of the object's permissions class, or ``False`` if the
        object has none.
    """
    # Users cannot delete themselves
    if self == obj:
        return False
    # Superusers cannot delete their own permissions either: that would only make them
    # not super; we all saw what happened in Superman 2, no red kryptonite here!
    if self.is_superuser and obj != self.devicepermissions:
        return True
    # without a permissions class there is nothing that could grant access
    if not _has_permissions_class(obj):
        return False
    # a FacilityUser's permissions are determined through the object's permission class
    return obj.permissions.user_can_delete_object(self, obj)
def filter_readable(self, queryset):
    """
    Restrict a queryset to the objects this user is allowed to read.

    :param queryset: The queryset to restrict.
    :return: The queryset itself for a superuser; the queryset filtered by the
        model's ``readable_by_user_filter`` (with duplicates removed) when the
        model has a permissions class; an empty queryset otherwise.
    """
    if self.is_superuser:
        return queryset
    model = queryset.model
    if not _has_permissions_class(model):
        # no permissions class means nothing is readable
        return queryset.none()
    readable_filter = model.permissions.readable_by_user_filter(self)
    return queryset.filter(readable_filter).distinct()
def __str__(self):
return '"{user}"@"{facility}"'.format(
user=self.full_name or self.username, facility=self.facility
)
def has_perm(self, perm, obj=None):
    """
    Tell whether this user holds the given permission.

    Only superuser status is consulted, which ensures the superuser has full
    access to the Django admin; ``perm`` and ``obj`` are not inspected.

    :param perm: The permission being asked about (unused).
    :param obj: Optional object the permission relates to (unused).
    :return: ``True`` for a superuser, ``False`` for everyone else.
    """
    # always answer with a real boolean rather than falling through to None
    return bool(self.is_superuser)
def has_perms(self, perm_list, obj=None):
    """
    Tell whether this user holds every permission in the given list.

    Only superuser status is consulted, which ensures the superuser has full
    access to the Django admin; ``perm_list`` and ``obj`` are not inspected.

    :param perm_list: The permissions being asked about (unused).
    :param obj: Optional object the permissions relate to (unused).
    :return: ``True`` for a superuser, ``False`` for everyone else.
    """
    # always answer with a real boolean rather than falling through to None
    return bool(self.is_superuser)
def has_module_perms(self, app_label):
    """
    Tell whether this user holds any permission in the given app.

    Only superuser status is consulted, which ensures the superuser has full
    access to the Django admin; ``app_label`` is not inspected.

    :param app_label: The label of the app being asked about (unused).
    :return: ``True`` for a superuser, ``False`` for everyone else.
    """
    # always answer with a real boolean rather than falling through to None
    return bool(self.is_superuser)
@python_2_unicode_compatible
class Collection(AbstractFacilityDataModel):
"""
``Collections`` are hierarchical groups of ``FacilityUsers``, used for grouping users and making decisions about permissions.
``FacilityUsers`` can have roles for one or more ``Collections``, by way of obtaining ``Roles`` associated with those ``Collections``.
``Collections`` can belong to other ``Collections``, and user membership in a ``Collection`` is conferred through ``Memberships``.
``Collections`` are subdivided into several pre-defined levels.
"""
# Morango syncing settings
morango_model_name = None
# Collection can be read by anybody from the facility; writing is only allowed by an admin for the collection.
# Furthermore, no FacilityUser can create or delete a Facility. Permission to create a collection is governed
# by roles in relation to the new collection's parent collection (see CollectionSpecificRoleBasedPermissions).
permissions = (
CollectionSpecificRoleBasedPermissions()
| AnyUserCanReadFacilities()
| CoachesCanManageGroupsForTheirClasses()
)
_KIND = None # Should be overridden in subclasses to specify what "kind" they are
name = models.CharField(max_length=100)
parent = models.ForeignKey(
"self", null=True, blank=True, related_name="children", db_index=True
)
kind = models.CharField(max_length=20, choices=collection_kinds.choices)
def __init__(self, *args, **kwargs):
    """
    Initialise the collection, forcing the ``kind`` keyword argument to the
    class-level ``_KIND`` whenever a subclass has set one.
    """
    fixed_kind = self._KIND
    if fixed_kind:
        # subclasses with a fixed kind override whatever the caller supplied
        kwargs["kind"] = fixed_kind
    super(Collection, self).__init__(*args, **kwargs)
def calculate_partition(self):
    """
    Build the partition string for this collection: its dataset id followed
    by the ``:allusers-ro`` suffix.
    """
    template = "{dataset_id}:allusers-ro"
    return template.format(dataset_id=self.dataset_id)
def clean_fields(self, *args, **kwargs):
    """
    Make sure ``kind`` is set from the subclass before delegating field
    cleaning to the parent class.
    """
    # kind must be in place before the parent class cleans the fields
    self._ensure_kind()
    super(Collection, self).clean_fields(*args, **kwargs)
def save(self, *args, **kwargs):
    """
    Make sure ``kind`` is set from the subclass before delegating the save
    to the parent class.
    """
    # kind must be in place before the parent class saves the instance
    self._ensure_kind()
    super(Collection, self).save(*args, **kwargs)
def _ensure_kind(self):
"""
Make sure the "kind" is set correctly on the model, corresponding to the appropriate subclass of ``Collection``.
"""
if self._KIND:
self.kind = self._KIND
def get_members(self):
    """
    Returns the users who are members of this collection: everyone in the
    dataset for a facility, otherwise the users with a membership in it.
    """
    if self.kind != collection_kinds.FACILITY:
        return FacilityUser.objects.filter(memberships__collection=self)
    # FacilityUser is always a member of her own facility
    return FacilityUser.objects.filter(dataset=self.dataset)
def get_coaches(self):
    """
    Returns users who have the coach role for this immediate collection.
    """
    # narrow to this collection's dataset first, then to coach roles on the collection
    dataset_users = FacilityUser.objects.filter(dataset_id=self.dataset_id)
    return dataset_users.filter(
        roles__kind=role_kinds.COACH, roles__collection=self
    )
def get_admins(self):
    """
    Returns users who have the admin role for this immediate collection.
    """
    # narrow to this collection's dataset first, then to admin roles on the collection
    dataset_users = FacilityUser.objects.filter(dataset_id=self.dataset_id)
    return dataset_users.filter(
        roles__kind=role_kinds.ADMIN, roles__collection=self
    )
def add_role(self, user, role_kind):
"""
Create a ``Role`` associating the provided user with this collection, with the specified kind of role.
If the Role object already exists, just return that, without changing anything.
:param user: The ``FacilityUser`` to associate with this ``Collection``.
:param role_kind: The kind of role to give the user with respect to this ``Collection``.
:return: The ``Role`` object (possibly new) that associates the user with the ``Collection``.
"""