From 61659356cbf7c4d52a3099dbb46d89054c28a672 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carles=20Barrobe=CC=81s?= Date: Thu, 11 Apr 2024 15:24:37 +0200 Subject: [PATCH] Encapsulate Directory in class --- Makefile | 2 +- create_test_ldap_user.py | 14 ++- delete_ldap_user.py | 8 +- humans/directory.py | 122 ++++++++++++------------- humans/models.py | 44 +++++++-- humans/tests/test_ldap_integration.py | 38 ++++++-- shipanaro/admin.py | 21 ++++- shipanaro/tests/test_authentication.py | 2 +- 8 files changed, 157 insertions(+), 94 deletions(-) diff --git a/Makefile b/Makefile index 8ded70a..04c332d 100644 --- a/Makefile +++ b/Makefile @@ -9,7 +9,7 @@ test-user-delete: docker-compose run --entrypoint 'ldapdelete -x -H ldap://ldap -D "cn=admin,dc=pirata,dc=cat" -w admin "cn=tester,dc=pirata,dc=cat"' ldap || echo "User doesn't exist" test-user-create: test-user-delete - docker-compose run web ./create_ldap_user.py tester tester + docker-compose run web ./create_ldap_user.py tester tester --create-ou test-users-delete: make ldap-list | grep tester | grep sn | cut -d ' ' -f2 | xargs -n1 ./delete_ldap_user.py diff --git a/create_test_ldap_user.py b/create_test_ldap_user.py index 0073e87..48cbe43 100755 --- a/create_test_ldap_user.py +++ b/create_test_ldap_user.py @@ -19,6 +19,7 @@ class User: username = sys.argv[1] password = sys.argv[2] + create_ou = len(sys.argv) > 3 and sys.argv[3] == "--create-ou" user = User( username=username, @@ -28,11 +29,14 @@ class User: ) settings.configure() - from humans import directory + from humans.directory import Directory - conn = directory.connect() - ou, ou_attrs = directory.create_ou(conn, "afiliats") - user_dn, user_attrs = directory.create_user(conn, user) - directory.set_password(conn, user_dn, password) + directory = Directory() + + if create_ou: + ou, ou_attrs = directory.create_ou("afiliats") + + user_dn, user_attrs = directory.create_user(user) + directory.set_password(user_dn, password) print(f"{user_dn=}\n{password=}\n{user_attrs}") diff --git a/delete_ldap_user.py b/delete_ldap_user.py index d9e60ff..0bff8bf 100755 --- a/delete_ldap_user.py +++ b/delete_ldap_user.py @@ -10,7 +10,9 @@ username = sys.argv[1] settings.configure() - from humans import directory + from humans.directory import Directory - conn = directory.connect() - directory.delete_user(conn, username) + directory = Directory() + directory.delete_user(username) + + print(f"User '{username}' deleted from LDAP") diff --git a/humans/directory.py b/humans/directory.py index fc9c0a4..3886a69 100644 --- a/humans/directory.py +++ b/humans/directory.py @@ -2,8 +2,6 @@ import ldap from ldap import modlist -from ldap.ldapobject import LDAPObject - from .auth.hashers import make_ldap_password @@ -15,64 +13,62 @@ ORG_UNIT = f"ou=afiliats,{DOMAIN}" -def connect() -> LDAPObject: - ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) - connection = ldap.initialize(LDAP_URL) - connection.simple_bind_s(LDAP_BIND_DN, LDAP_BIND_PASS) - return connection - - -def create_user(connection, user): - name = user.username.encode("utf-8") - user_dn = f"uid={user.username},{ORG_UNIT}" - - user_attrs = {} - user_attrs["objectClass"] = [b"pilotPerson"] - user_attrs["cn"] = (user.first_name.encode("utf-8"),) - user_attrs["sn"] = (user.last_name.encode("utf-8"),) - user_attrs["sn"] = (name,) - user_attrs["uid"] = (name.lower(),) - if user.email: - user_attrs["mail"] = (user.email.encode("utf-8"),) - - user_ldif = modlist.addModlist(user_attrs) - result = connection.add_s(user_dn, user_ldif) - return user_dn, user_attrs - - -def create_ou(connection, name): - dn = f"ou={name},{DOMAIN}" - - attrs = {} - attrs["objectClass"] = [b"organizationalUnit"] - attrs["ou"] = (name.encode("utf-8"),) - - ldif = modlist.addModlist(attrs) - result = connection.add_s(dn, ldif) - return dn, attrs - - -def get_user(connection, username): - search_dn = f"uid={username},{ORG_UNIT}" - try: - result = connection.search_s(search_dn, ldap.SCOPE_BASE) - return result[0] - except: - return search_dn, None - - -def delete_user(connection, username): - user_dn = f"uid={username},{ORG_UNIT}" - connection.delete_s(user_dn) - - -def set_password(connection, user_dn, password): - password_value = make_ldap_password(password) - add_pass = [(ldap.MOD_REPLACE, "userpassword", [password_value])] - connection.modify_s(user_dn, add_pass) - - -# raises exception if credentials fail, else returns None -def check_credentials(connection, username, password): - user_dn = f"uid={username},{ORG_UNIT}" - connection.simple_bind_s(user_dn, password) +class Directory: + def __init__(self): + ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) + connection = ldap.initialize(LDAP_URL) + connection.simple_bind_s(LDAP_BIND_DN, LDAP_BIND_PASS) + self.connection = connection + + def create_user(self, user): + name = user.username.encode("utf-8") + user_dn = f"uid={user.username},{ORG_UNIT}" + + user_attrs = {} + user_attrs["objectClass"] = [b"pilotPerson"] + user_attrs["sn"] = (name,) + user_attrs["uid"] = (name.lower(),) + + if user.first_name: + user_attrs["cn"] = (user.first_name.encode("utf-8"),) + if user.last_name: + user_attrs["sn"] = (user.last_name.encode("utf-8"),) + if user.email: + user_attrs["mail"] = (user.email.encode("utf-8"),) + + user_ldif = modlist.addModlist(user_attrs) + result = self.connection.add_s(user_dn, user_ldif) + return user_dn, user_attrs + + def create_ou(self, name): + dn = f"ou={name},{DOMAIN}" + + attrs = {} + attrs["objectClass"] = [b"organizationalUnit"] + attrs["ou"] = (name.encode("utf-8"),) + + ldif = modlist.addModlist(attrs) + result = self.connection.add_s(dn, ldif) + return dn, attrs + + def get_user(self, username): + search_dn = f"uid={username},{ORG_UNIT}" + try: + result = self.connection.search_s(search_dn, ldap.SCOPE_BASE) + return result[0] + except: + return search_dn, None + + def delete_user(self, username): + user_dn = f"uid={username},{ORG_UNIT}" + self.connection.delete_s(user_dn) + + def set_password(self, user_dn, password): + password_value = make_ldap_password(password) + add_pass = [(ldap.MOD_REPLACE, "userpassword", [password_value])] + self.connection.modify_s(user_dn, add_pass) + + # raises exception if credentials fail, else returns None + def check_credentials(self, username, password): + user_dn = f"uid={username},{ORG_UNIT}" + self.connection.simple_bind_s(user_dn, password) diff --git a/humans/models.py b/humans/models.py index 3bbd279..e8df020 100644 --- a/humans/models.py +++ b/humans/models.py @@ -1,12 +1,12 @@ import logging -from django.db.models import Max, signals +from django.db.models import signals from django.dispatch import receiver from django.contrib.auth.models import AbstractUser from django_auth_ldap.backend import LDAPBackend, _LDAPUser -from . import directory +from .directory import Directory class User(AbstractUser): @@ -24,8 +24,8 @@ def save(self, *args, **kwargs): self.__ldap__set_password(password) def __ldap__save(self): - connection = directory.connect() - directory.create_user(connection, self) + directory = Directory() + directory.create_user(self) return def __ldap__set_password(self, password): @@ -34,7 +34,8 @@ def __ldap__set_password(self, password): logging.error(f"User {self.username} not found in LDAP") return conn = ldap_user.connection - directory.set_password(conn, ldap_user.dn, password) + directory = Directory() + directory.set_password(ldap_user.dn, password) def set_password(self, raw_password): self.set_unusable_password() @@ -44,9 +45,34 @@ class Meta: db_table = "auth_user" -@receiver(signals.pre_delete, sender=User, dispatch_uid="delete_user") -def delete_associated_ldap_user(sender, instance: User, **kwargs): +def delete_user_in_ldap(username): try: - directory.delete_user(directory.connect(), instance.username) + directory = Directory() + directory.delete_user(username) except Exception as e: - logging.exception(f"Cannot delete LDAP user {instance.username}", e) + logging.exception(f"Cannot delete LDAP user {username}", e) + + +@receiver(signals.pre_delete, sender=User, dispatch_uid="delete_user") +def delete_associated_ldap_user_when_deleted(sender, instance: User, **kwargs): + delete_user_in_ldap(instance.username) + + +@receiver(signals.post_save, sender=User, dispatch_uid="deactivate_user") +def delete_associated_ldap_user_when_set_inactive(sender, instance: User, **kwargs): + if not instance.is_active: + delete_user_in_ldap(instance.username) + + +@receiver(signals.post_save, sender=User, dispatch_uid="reactivate_user") +def create_associated_ldap_user_when_set_active( + sender, instance: User, created=False, **kwargs +): + if created: + return + + directory = Directory() + _, ldap_attrs = directory.get_user(instance.username) + + if instance.is_active and not ldap_attrs: + directory.create_user(instance) diff --git a/humans/tests/test_ldap_integration.py b/humans/tests/test_ldap_integration.py index 203740e..2044a65 100644 --- a/humans/tests/test_ldap_integration.py +++ b/humans/tests/test_ldap_integration.py @@ -6,7 +6,7 @@ from django.urls import reverse from django.utils.translation import gettext_lazy as _ -from humans import directory +from humans.directory import Directory User = get_user_model() @@ -18,9 +18,9 @@ class LDAPIntegrationTest(TestCase): def setUp(self): super().setUp() - self.connection = directory.connect() + self.directory = Directory() - def create_user(self): + def create_user(self, auto_cleanup=True): name = f"tester-{randint(0, 10000)}" self.user = User( username=name, @@ -29,7 +29,8 @@ def create_user(self): last_name="Pirata", ) self.user.save() - self.addCleanup(self.deleteLeftoverUser) + if auto_cleanup: + self.addCleanup(self.deleteLeftoverUser) return self.user def deleteLeftoverUser(self): @@ -39,7 +40,7 @@ def deleteLeftoverUser(self): def test_new_user_is_created_in_ldap_with_no_password(self): user = self.create_user() - uid, attrs = directory.get_user(self.connection, user.username) + uid, attrs = self.directory.get_user(user.username) self.assertEquals(uid, f"uid={user.username},ou=afiliats,dc=pirata,dc=cat") self.assertEquals(attrs["cn"][0].decode(), user.username) @@ -50,15 +51,36 @@ def test_save_password_sets_password_in_ldap(self): user.set_password(PASSWORD) user.save() - _, attrs = directory.get_user(self.connection, user.username) + _, attrs = self.directory.get_user(user.username) self.assertEquals(attrs["cn"][0].decode(), user.username) self.assertEquals(attrs["userPassword"][0].decode()[0:6], "{SSHA}") def test_deleted_user_is_removed_from_ldap(self): - user = self.create_user() + user = self.create_user(auto_cleanup=False) user.delete() - _, attrs = directory.get_user(self.connection, user.username) + _, attrs = self.directory.get_user(user.username) self.assertIsNone(attrs) + + def test_deactivated_user_is_removed_from_ldap(self): + user = self.create_user(auto_cleanup=False) + user.is_active = False + user.save() + + _, attrs = self.directory.get_user(user.username) + + self.assertIsNone(attrs) + + def test_reactivated_user_is_created_in_ldap(self): + user = self.create_user() + user.is_active = False + user.save() + + user.is_active = True + user.save() + + _, attrs = self.directory.get_user(user.username) + + self.assertEquals(attrs["cn"][0].decode(), user.username) diff --git a/shipanaro/admin.py b/shipanaro/admin.py index 199a5ec..2085923 100644 --- a/shipanaro/admin.py +++ b/shipanaro/admin.py @@ -1,5 +1,5 @@ from django.conf import settings -from django.contrib import admin +from django.contrib import admin, messages from django.contrib.admin import ModelAdmin from django.contrib.auth.admin import GroupAdmin, UserAdmin from django.utils.translation import gettext_lazy as _ @@ -49,18 +49,31 @@ class ShipanaroGroupAdmin(GroupAdmin, ShipanaroModelAdmin): @admin.action(description=_("Create user in LDAP")) def create_ldap_user(modeladmin, request, queryset): - from humans import directory + from humans.directory import Directory - conn = directory.connect() + directory = Directory() for member in queryset: - directory.create_user(conn, member.user) + try: + directory.create_user(member.user) + messages.add_message( + request, messages.INFO, f"User {member.user.username} created in LDAP" + ) + except Exception as e: + messages.add_message( + request, + messages.ERROR, + f"User {member.user.username} cannot be created in LDAP\n{e}", + ) @admin.action(description=_("Send password reset email")) def send_password_reset(modeladmin, request, queryset): for member in queryset: send_reset_password_email(member.user.email) + messages.add_message( + request, messages.INFO, f"Password reset email sent {member.user.email}" + ) class MembershipAdmin(ShipanaroModelAdmin): diff --git a/shipanaro/tests/test_authentication.py b/shipanaro/tests/test_authentication.py index 2280969..75b644b 100644 --- a/shipanaro/tests/test_authentication.py +++ b/shipanaro/tests/test_authentication.py @@ -94,7 +94,7 @@ def test_reset_password_process_sends_email(self): mail.outbox[0].body, ) - def test_deleted_user_no_cannot_log_in(self): + def test_deleted_user_cannot_log_in(self): # set a password first to ensure we have a user in LDAP that can log in self.user.set_password(PASSWORD) self.user.save()