Skip to content

Commit

Permalink
Encapsulate Directory in class
Browse files Browse the repository at this point in the history
  • Loading branch information
txels committed Apr 11, 2024
1 parent 53bb665 commit 6165935
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 94 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ test-user-delete:
docker-compose run --entrypoint 'ldapdelete -x -H ldap://ldap -D "cn=admin,dc=pirata,dc=cat" -w admin "cn=tester,dc=pirata,dc=cat"' ldap || echo "User doesn't exist"

test-user-create: test-user-delete
docker-compose run web ./create_ldap_user.py tester tester
docker-compose run web ./create_ldap_user.py tester tester --create-ou

test-users-delete:
make ldap-list | grep tester | grep sn | cut -d ' ' -f2 | xargs -n1 ./delete_ldap_user.py
Expand Down
14 changes: 9 additions & 5 deletions create_test_ldap_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class User:

username = sys.argv[1]
password = sys.argv[2]
create_ou = len(sys.argv) > 3 and sys.argv[3] == "--create-ou"

user = User(
username=username,
Expand All @@ -28,11 +29,14 @@ class User:
)

settings.configure()
from humans import directory
from humans.directory import Directory

conn = directory.connect()
ou, ou_attrs = directory.create_ou(conn, "afiliats")
user_dn, user_attrs = directory.create_user(conn, user)
directory.set_password(conn, user_dn, password)
directory = Directory()

if create_ou:
ou, ou_attrs = directory.create_ou("afiliats")

user_dn, user_attrs = directory.create_user(user)
directory.set_password(user_dn, password)

print(f"{user_dn=}\n{password=}\n{user_attrs}")
8 changes: 5 additions & 3 deletions delete_ldap_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
username = sys.argv[1]

settings.configure()
from humans import directory
from humans.directory import Directory

conn = directory.connect()
directory.delete_user(conn, username)
directory = Directory()
directory.delete_user(username)

print(f"User '{username}' deleted from LDAP")
122 changes: 59 additions & 63 deletions humans/directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import ldap
from ldap import modlist
from ldap.ldapobject import LDAPObject


from .auth.hashers import make_ldap_password

Expand All @@ -15,64 +13,62 @@
ORG_UNIT = f"ou=afiliats,{DOMAIN}"


def connect() -> LDAPObject:
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
connection = ldap.initialize(LDAP_URL)
connection.simple_bind_s(LDAP_BIND_DN, LDAP_BIND_PASS)
return connection


def create_user(connection, user):
name = user.username.encode("utf-8")
user_dn = f"uid={user.username},{ORG_UNIT}"

user_attrs = {}
user_attrs["objectClass"] = [b"pilotPerson"]
user_attrs["cn"] = (user.first_name.encode("utf-8"),)
user_attrs["sn"] = (user.last_name.encode("utf-8"),)
user_attrs["sn"] = (name,)
user_attrs["uid"] = (name.lower(),)
if user.email:
user_attrs["mail"] = (user.email.encode("utf-8"),)

user_ldif = modlist.addModlist(user_attrs)
result = connection.add_s(user_dn, user_ldif)
return user_dn, user_attrs


def create_ou(connection, name):
dn = f"ou={name},{DOMAIN}"

attrs = {}
attrs["objectClass"] = [b"organizationalUnit"]
attrs["ou"] = (name.encode("utf-8"),)

ldif = modlist.addModlist(attrs)
result = connection.add_s(dn, ldif)
return dn, attrs


def get_user(connection, username):
search_dn = f"uid={username},{ORG_UNIT}"
try:
result = connection.search_s(search_dn, ldap.SCOPE_BASE)
return result[0]
except:
return search_dn, None


def delete_user(connection, username):
user_dn = f"uid={username},{ORG_UNIT}"
connection.delete_s(user_dn)


def set_password(connection, user_dn, password):
password_value = make_ldap_password(password)
add_pass = [(ldap.MOD_REPLACE, "userpassword", [password_value])]
connection.modify_s(user_dn, add_pass)


# raises exception if credentials fail, else returns None
def check_credentials(connection, username, password):
user_dn = f"uid={username},{ORG_UNIT}"
connection.simple_bind_s(user_dn, password)
class Directory:
def __init__(self):
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
connection = ldap.initialize(LDAP_URL)
connection.simple_bind_s(LDAP_BIND_DN, LDAP_BIND_PASS)
self.connection = connection

def create_user(self, user):
name = user.username.encode("utf-8")
user_dn = f"uid={user.username},{ORG_UNIT}"

user_attrs = {}
user_attrs["objectClass"] = [b"pilotPerson"]
user_attrs["sn"] = (name,)
user_attrs["uid"] = (name.lower(),)

if user.first_name:
user_attrs["cn"] = (user.first_name.encode("utf-8"),)
if user.last_name:
user_attrs["sn"] = (user.last_name.encode("utf-8"),)
if user.email:
user_attrs["mail"] = (user.email.encode("utf-8"),)

user_ldif = modlist.addModlist(user_attrs)
result = self.connection.add_s(user_dn, user_ldif)
return user_dn, user_attrs

def create_ou(self, name):
dn = f"ou={name},{DOMAIN}"

attrs = {}
attrs["objectClass"] = [b"organizationalUnit"]
attrs["ou"] = (name.encode("utf-8"),)

ldif = modlist.addModlist(attrs)
result = self.connection.add_s(dn, ldif)
return dn, attrs

def get_user(self, username):
search_dn = f"uid={username},{ORG_UNIT}"
try:
result = self.connection.search_s(search_dn, ldap.SCOPE_BASE)
return result[0]
except:
return search_dn, None

def delete_user(self, username):
user_dn = f"uid={username},{ORG_UNIT}"
self.connection.delete_s(user_dn)

def set_password(self, user_dn, password):
password_value = make_ldap_password(password)
add_pass = [(ldap.MOD_REPLACE, "userpassword", [password_value])]
self.connection.modify_s(user_dn, add_pass)

# raises exception if credentials fail, else returns None
def check_credentials(self, username, password):
user_dn = f"uid={username},{ORG_UNIT}"
self.connection.simple_bind_s(user_dn, password)
44 changes: 35 additions & 9 deletions humans/models.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import logging

from django.db.models import Max, signals
from django.db.models import signals
from django.dispatch import receiver
from django.contrib.auth.models import AbstractUser

from django_auth_ldap.backend import LDAPBackend, _LDAPUser

from . import directory
from .directory import Directory


class User(AbstractUser):
Expand All @@ -24,8 +24,8 @@ def save(self, *args, **kwargs):
self.__ldap__set_password(password)

def __ldap__save(self):
connection = directory.connect()
directory.create_user(connection, self)
directory = Directory()
directory.create_user(self)
return

def __ldap__set_password(self, password):
Expand All @@ -34,7 +34,8 @@ def __ldap__set_password(self, password):
logging.error(f"User {self.username} not found in LDAP")
return
conn = ldap_user.connection
directory.set_password(conn, ldap_user.dn, password)
directory = Directory()
directory.set_password(ldap_user.dn, password)

def set_password(self, raw_password):
self.set_unusable_password()
Expand All @@ -44,9 +45,34 @@ class Meta:
db_table = "auth_user"


@receiver(signals.pre_delete, sender=User, dispatch_uid="delete_user")
def delete_associated_ldap_user(sender, instance: User, **kwargs):
def delete_user_in_ldap(username):
try:
directory.delete_user(directory.connect(), instance.username)
directory = Directory()
directory.delete_user(username)
except Exception as e:
logging.exception(f"Cannot delete LDAP user {instance.username}", e)
logging.exception(f"Cannot delete LDAP user {username}", e)


@receiver(signals.pre_delete, sender=User, dispatch_uid="delete_user")
def delete_associated_ldap_user_when_deleted(sender, instance: User, **kwargs):
delete_user_in_ldap(instance.username)


@receiver(signals.post_save, sender=User, dispatch_uid="deactivate_user")
def delete_associated_ldap_user_when_set_inactive(sender, instance: User, **kwargs):
if not instance.is_active:
delete_user_in_ldap(instance.username)


@receiver(signals.post_save, sender=User, dispatch_uid="reactivate_user")
def create_associated_ldap_user_when_set_active(
sender, instance: User, created=False, **kwargs
):
if created:
return

directory = Directory()
_, ldap_attrs = directory.get_user(instance.username)

if instance.is_active and not ldap_attrs:
directory.create_user(instance)
38 changes: 30 additions & 8 deletions humans/tests/test_ldap_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from django.urls import reverse
from django.utils.translation import gettext_lazy as _

from humans import directory
from humans.directory import Directory

User = get_user_model()

Expand All @@ -18,9 +18,9 @@ class LDAPIntegrationTest(TestCase):

def setUp(self):
super().setUp()
self.connection = directory.connect()
self.directory = Directory()

def create_user(self):
def create_user(self, auto_cleanup=True):
name = f"tester-{randint(0, 10000)}"
self.user = User(
username=name,
Expand All @@ -29,7 +29,8 @@ def create_user(self):
last_name="Pirata",
)
self.user.save()
self.addCleanup(self.deleteLeftoverUser)
if auto_cleanup:
self.addCleanup(self.deleteLeftoverUser)
return self.user

def deleteLeftoverUser(self):
Expand All @@ -39,7 +40,7 @@ def deleteLeftoverUser(self):
def test_new_user_is_created_in_ldap_with_no_password(self):
user = self.create_user()

uid, attrs = directory.get_user(self.connection, user.username)
uid, attrs = self.directory.get_user(user.username)

self.assertEquals(uid, f"uid={user.username},ou=afiliats,dc=pirata,dc=cat")
self.assertEquals(attrs["cn"][0].decode(), user.username)
Expand All @@ -50,15 +51,36 @@ def test_save_password_sets_password_in_ldap(self):
user.set_password(PASSWORD)
user.save()

_, attrs = directory.get_user(self.connection, user.username)
_, attrs = self.directory.get_user(user.username)

self.assertEquals(attrs["cn"][0].decode(), user.username)
self.assertEquals(attrs["userPassword"][0].decode()[0:6], "{SSHA}")

def test_deleted_user_is_removed_from_ldap(self):
user = self.create_user()
user = self.create_user(auto_cleanup=False)
user.delete()

_, attrs = directory.get_user(self.connection, user.username)
_, attrs = self.directory.get_user(user.username)

self.assertIsNone(attrs)

def test_deactivated_user_is_removed_from_ldap(self):
user = self.create_user(auto_cleanup=False)
user.is_active = False
user.save()

_, attrs = self.directory.get_user(user.username)

self.assertIsNone(attrs)

def test_reactivated_user_is_created_in_ldap(self):
user = self.create_user()
user.is_active = False
user.save()

user.is_active = True
user.save()

_, attrs = self.directory.get_user(user.username)

self.assertEquals(attrs["cn"][0].decode(), user.username)
21 changes: 17 additions & 4 deletions shipanaro/admin.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from django.conf import settings
from django.contrib import admin
from django.contrib import admin, messages
from django.contrib.admin import ModelAdmin
from django.contrib.auth.admin import GroupAdmin, UserAdmin
from django.utils.translation import gettext_lazy as _
Expand Down Expand Up @@ -49,18 +49,31 @@ class ShipanaroGroupAdmin(GroupAdmin, ShipanaroModelAdmin):

@admin.action(description=_("Create user in LDAP"))
def create_ldap_user(modeladmin, request, queryset):
from humans import directory
from humans.directory import Directory

conn = directory.connect()
directory = Directory()

for member in queryset:
directory.create_user(conn, member.user)
try:
directory.create_user(member.user)
messages.add_message(
request, messages.INFO, f"User {member.user.username} created in LDAP"
)
except Exception as e:
messages.add_message(
request,
messages.ERROR,
f"User {member.user.username} cannot be created in LDAP\n{e}",
)


@admin.action(description=_("Send password reset email"))
def send_password_reset(modeladmin, request, queryset):
for member in queryset:
send_reset_password_email(member.user.email)
messages.add_message(
request, messages.INFO, f"Password reset email sent {member.user.email}"
)


class MembershipAdmin(ShipanaroModelAdmin):
Expand Down
2 changes: 1 addition & 1 deletion shipanaro/tests/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_reset_password_process_sends_email(self):
mail.outbox[0].body,
)

def test_deleted_user_no_cannot_log_in(self):
def test_deleted_user_cannot_log_in(self):
# set a password first to ensure we have a user in LDAP that can log in
self.user.set_password(PASSWORD)
self.user.save()
Expand Down

0 comments on commit 6165935

Please sign in to comment.