Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

first commit

  • Loading branch information...
commit c60a82e150382c1e1ac4092ed034a3ea0de5eab7 0 parents
@winjer winjer authored
0  CHANGES
No changes.
0  README.rst
No changes.
15 django_ldap_pixiedust/__init__.py
@@ -0,0 +1,15 @@
+""" LDAP Pixiedust
+
+Sprinkles magic pixie dust over your Django/LDAP integration.
+
+In particular it provides:
+
+ * A backend that can authenticate by e-mail, but otherwise play nicely with
+ * standard Django a set of hooks that will magically synchronise changes to users, groups and profiles back to your LDAP database
+
+"""
+
+from .backend import EmailLoginBackend
+from .ldap import LDAPConnectionMixin
+from . import sync
+
86 django_ldap_pixiedust/backend.py
@@ -0,0 +1,86 @@
+
+from __future__ import absolute_import
+
+from django.contrib.auth import models as auth_models
+from django.contrib.auth.models import User, Group
+from django.utils.encoding import smart_str
+from django.utils.hashcompat import sha_constructor, md5_constructor
+from django_auth_ldap.backend import LDAPBackend, _LDAPUser
+from collections import defaultdict
+from django.db import models
+
+import uuid
+import logging
+
+from .user import SynchronisingUserAdapter
+from .utils import django_password_to_ldap
+from .ldap import LDAPConnectionMixin
+from . import settings
+
+logger = logging.getLogger('django_ldap_pixiedust')
+
+class DecoupledUserClassMixin(object):
+ """ Make it easy to use a different _LDAPUser class from within the backend. """
+ userclass = _LDAPUser
+ def authenticate(self, username, password):
+ ldap_user = self.userclass(self, username=username)
+ user = ldap_user.authenticate(password)
+ return user
+
+ def get_user(self, user_id):
+ user = None
+ try:
+ user = User.objects.get(pk=user_id)
+ self.userclass(self, user=user) # sets user.ldap_user
+ except User.DoesNotExist:
+ pass
+ return user
+
+if settings.ldap_settings.LDAP_PIXIEDUST_COMPATIBLE_PASSWORDS:
+ # monkeypatch django.contrib.auth to alter the digest mechanism
+ # look away now
+ def get_hexdigest(algorithm, salt, raw_password):
+
+ """ replacement hashing mechanism that hashes password first, instead
+ of salt first. This means that generated passwords are valid for LDAP
+ as well as Django.
+
+ Only supports SHA1. """
+
+ raw_password, salt = smart_str(raw_password), smart_str(salt)
+ if algorithm == 'crypt':
+ try:
+ import crypt
+ except ImportError:
+ raise ValueError('"crypt" password algorithm not supported in this environment')
+ return crypt.crypt(raw_password, salt)
+
+ if algorithm == 'md5':
+ return md5_constructor(salt + raw_password).hexdigest()
+ elif algorithm == 'sha1':
+ # in django by default this is salt + raw_password
+ return sha_constructor(raw_password + salt).hexdigest()
+ raise ValueError("Got unknown password algorithm type in password.")
+ auth_models.get_hexdigest = get_hexdigest
+
+class EmailLoginBackend(LDAPConnectionMixin, LDAPBackend):
+
+ """ Locates the user by email address, even though their dn is determined by a unique username. """
+
+ def authenticate(self, username, password):
+ """ This actually takes the email address as the username, but
+ assembles a user based on the correct DN by searching first using the
+ login search configuration, then delegating user creation back to the
+ usual mechanism. """
+
+ search = settings.ldap_settings.LDAP_PIXIEDUST_LOGIN_SEARCH
+ results = search.execute(self._get_connection(), {"user": username})
+ if results is not None and len(results) == 1:
+ id_attr = settings.ldap_settings.LDAP_PIXIEDUST_USERNAME_DN_ATTRIBUTE
+ user_dn, user_attrs = results[0]
+ if id_attr in user_attrs:
+ real_username = user_attrs[id_attr][0]
+ return super(EmailLoginBackend, self).authenticate(real_username, password)
+ else:
+ raise ValueError("LDAP User %r does not have username attribute %r" % (user_dn, id_attr))
+
57 django_ldap_pixiedust/ldap.py
@@ -0,0 +1,57 @@
+
+from __future__ import absolute_import
+
+from . import settings
+import ldap
+
+class LDAPConnectionMixin(object):
+
+ """ Based on equivalent code in django-auth-ldap. """
+
+ def _bind(self):
+ """
+ Binds to the LDAP server with AUTH_LDAP_BIND_DN and
+ AUTH_LDAP_BIND_PASSWORD.
+ """
+ self._bind_as(settings.ldap_settings.AUTH_LDAP_BIND_DN,
+ settings.ldap_settings.AUTH_LDAP_BIND_PASSWORD)
+
+ self._connection_bound = True
+
+ def _bind_as(self, bind_dn, bind_password):
+ """
+ Binds to the LDAP server with the given credentials. This does not trap
+ exceptions.
+
+ If successful, we set self._connection_bound to False under the
+ assumption that we're not binding as the default user. Callers can set
+ it to True as appropriate.
+ """
+ self._get_connection().simple_bind_s(bind_dn.encode('utf-8'),
+ bind_password.encode('utf-8'))
+
+ self._connection_bound = False
+
+ # TODO: this is pretty fugly. a bunch of this stuff should be in
+ # __init__ really, but I've delayed refactoring to stop this getting
+ # confused with the historical connection stuff from django_auth_ldap
+ def _get_connection(self):
+ """
+ Returns our cached LDAPObject, which may or may not be bound.
+ """
+ if not hasattr(self, 'ldap'):
+ self.ldap = ldap
+ if not hasattr(self, '_connection'):
+ self._connection = None
+ if self._connection is None:
+ self._connection = self.ldap.initialize(settings.ldap_settings.AUTH_LDAP_SERVER_URI)
+
+ for opt, value in settings.ldap_settings.AUTH_LDAP_CONNECTION_OPTIONS.iteritems():
+ self._connection.set_option(opt, value)
+
+ if settings.ldap_settings.AUTH_LDAP_START_TLS:
+ #logger.debug("Initiating TLS")
+ self._connection.start_tls_s()
+
+ return self._connection
+
0  django_ldap_pixiedust/management/__init__.py
No changes.
0  django_ldap_pixiedust/management/commands/__init__.py
No changes.
33 django_ldap_pixiedust/management/commands/ldapsync.py
@@ -0,0 +1,33 @@
+
+""" Synchronise the Django user, profile and group tables against LDAP. """
+
+from django.core.management.base import BaseCommand
+from optparse import make_option
+import ldap
+from django_ldap_pixiedust import settings
+from django_ldap_pixiedust.ldap import LDAPConnectionMixin
+from django_ldap_pixiedust.user import SynchronisingUserAdapter
+from django_auth_ldap.backend import LDAPBackend, _LDAPUser
+from django.contrib.auth.models import User
+from django_ldap_pixiedust import sync
+
+class Command(BaseCommand):
+
+ help = "example help"
+
+ option_list = BaseCommand.option_list + (
+ make_option("--verbose", action="store_true", default=False),
+ )
+
+ def handle(self, *args, **options):
+ for arg in args:
+ handler = getattr(self, "handle_" + arg)
+ handler(**options)
+
+ def handle_fromldap(self, **options):
+ synchroniser = sync.LDAPSynchroniser()
+ synchroniser.synchronise_from_ldap()
+
+ def handle_toldap(self, **options):
+ synchroniser = sync.LDAPSynchroniser()
+ synchroniser.synchronise_to_ldap()
12 django_ldap_pixiedust/models.py
@@ -0,0 +1,12 @@
+from django.db import models
+
+
+class PixieDustTestProfile(models.Model):
+ """
+ A user profile model for use by unit tests. This has nothing to do with the
+ authentication backend itself.
+ """
+ user = models.OneToOneField('auth.User')
+ location = models.CharField(max_length=100)
+ is_special = models.BooleanField(default=False)
+ populated = models.BooleanField(default=False)
24 django_ldap_pixiedust/settings.py
@@ -0,0 +1,24 @@
+from django_auth_ldap.backend import LDAPSettings as BaseLDAPSettings
+
+class LDAPSettings(BaseLDAPSettings):
+ pass
+
+LDAPSettings.defaults.update({
+ 'LDAP_PIXIEDUST_SYNC_USER': False,
+ 'LDAP_PIXIEDUST_SYNC_PROFILE': False,
+ 'LDAP_PIXIEDUST_SYNC_GROUPS': False,
+ 'LDAP_PIXIEDUST_LOGIN_SEARCH': None,
+ 'LDAP_PIXIEDUST_GROUP_DN_TEMPLATE': '',
+ 'LDAP_PIXIEDUST_COMPATIBLE_PASSWORDS': False,
+ 'LDAP_PIXIEDUST_USER_OBJECTCLASSES': [
+ 'organizationalPerson',
+ 'inetOrgPerson',
+ ],
+ 'LDAP_PIXIEDUST_USERNAME_DN_ATTRIBUTE': 'uid',
+ 'LDAP_PIXIEDUST_DEFAULT_ATTR_MAP': {},
+ 'AUTH_PROFILE_MODULE': '',
+ 'LDAP_PIXIEDUST_ALL_USERS': None,
+ 'LDAP_PIXIEDUST_GROUP_OBJECTCLASS': 'groupOfNames',
+ })
+
+ldap_settings = LDAPSettings()
110 django_ldap_pixiedust/sync.py
@@ -0,0 +1,110 @@
+from django.contrib.auth.models import Group, User
+from django_auth_ldap.backend import LDAPBackend, _LDAPUser
+from django.db.models.signals import post_save, post_delete, m2m_changed
+
+from . import backend, ldap, settings
+from .user import SynchronisingUserAdapter
+
+import logging
+
+logger = logging.getLogger("django_ldap_pixiedust")
+
+def group_sync_handler(sender, instance, action, pk_set, **kwargs):
+ if sender == User.groups.through:
+ user = LDAPBackend().get_user(instance.id)
+ sync = SynchronisingUserAdapter(user)
+ if pk_set is not None:
+ groups = [Group.objects.get(pk=x) for x in pk_set]
+ if action == 'post_clear':
+ sync.clear_groups()
+ elif action == 'post_add':
+ sync.group_add(groups)
+ elif action == 'post_remove':
+ sync.group_remove(groups)
+
+def profile_sync_handler(sender, instance, created, **kwargs):
+ from django.db import models
+ app_label, model_name = settings.ldap_settings.AUTH_PROFILE_MODULE.split('.')
+ profile_model = models.get_model(app_label, model_name)
+
+ # we're getting the octopus profile, not the pixiedust profile
+ if sender == profile_model:
+ user = LDAPBackend().get_user(instance.id)
+ sync = SynchronisingUserAdapter(user)
+ sync.synchronise_profile(created)
+
+def user_sync_handler(sender, **kwargs):
+ instance = kwargs.pop('instance', None)
+ created = kwargs.pop('created', None)
+ backend = LDAPBackend()
+ if sender == User:
+ user = backend.get_user(instance.id)
+ sync = SynchronisingUserAdapter(user)
+ sync.synchronise(created)
+
+def activate(sync_user, sync_groups, sync_profile):
+ if sync_groups:
+ logger.warning("Group changes will be synchronised to LDAP")
+ m2m_changed.connect(group_sync_handler)
+ else:
+ m2m_changed.disconnect(group_sync_handler)
+
+ if sync_user:
+ logger.warning("User changes will be synchronised to LDAP")
+ post_save.connect(user_sync_handler)
+ else:
+ post_save.disconnect(user_sync_handler)
+
+ if sync_profile:
+ logger.warning("User profile changes will be synchronised to LDAP")
+ post_save.connect(profile_sync_handler)
+ else:
+ post_save.disconnect(profile_sync_handler)
+
+def deactivate():
+ activate(False, False, False)
+
+def activate_fromsettings():
+ activate(settings.ldap_settings.LDAP_PIXIEDUST_SYNC_USER,
+ settings.ldap_settings.LDAP_PIXIEDUST_SYNC_GROUPS,
+ settings.ldap_settings.LDAP_PIXIEDUST_SYNC_PROFILE
+ )
+
+# this is our default
+activate_fromsettings()
+
+class LDAPSynchroniser(ldap.LDAPConnectionMixin):
+
+ """ This provides complete database synchronisation - either from or to
+ the LDAP server. This will synchronise every user and group. It's called
+ by the "ldapsync" management command. """
+
+ def __init__(self):
+ self.conn = self._get_connection()
+ self.backend = LDAPBackend()
+
+ def ldap_users(self):
+ search = settings.ldap_settings.LDAP_PIXIEDUST_ALL_USERS
+ for dn, attrs in search.execute(self.conn):
+ user_id = attrs[settings.ldap_settings.LDAP_PIXIEDUST_USERNAME_DN_ATTRIBUTE][0]
+ yield _LDAPUser(self.backend, username=user_id)
+
+ def model_users(self):
+ return User.objects.all()
+
+ def synchronise_from_ldap(self):
+ deactivate()
+ for user in self.ldap_users():
+ logger.debug("Synchronising %r" % repr(user))
+ user._get_or_create_user()
+ activate_fromsettings()
+
+ def synchronise_to_ldap(self):
+ for user in self.model_users():
+ ldap_user = self.backend.get_user(user.id)
+ sync = SynchronisingUserAdapter(ldap_user)
+ logger.debug("Synchronising %r" % repr(user))
+ sync.synchronise_groups() # must happen first, because it clears groups!
+ sync.synchronise()
+ sync.synchronise_profile()
+
3  django_ldap_pixiedust/tests/__init__.py
@@ -0,0 +1,3 @@
+from test_user import *
+from test_backend import *
+from test_sync import *
90 django_ldap_pixiedust/tests/fixtures.py
@@ -0,0 +1,90 @@
+from django_auth_ldap.tests import TestSettings, MockLDAP as BaseMockLDAP
+from django.test import TestCase
+from django_auth_ldap.config import _LDAPConfig
+import logging
+
+from django_ldap_pixiedust import settings
+
+alice = ("uid=alice,ou=users,dc=test", {
+ 'uid': ['alice'],
+ 'objectClass': ['organizationalPerson', 'inetOrgPerson'],
+ 'mail': ['alice@example.com'],
+ 'userPassword': ['password1'],
+ })
+
+bob = ("uid=bob,ou=users,dc=test", {
+ 'uid': ['bob'],
+ 'objectClass': ['organizationalPerson', 'inetOrgPerson'],
+ 'mail': ['bob@example.com'],
+ 'userPassword': ['password2'],
+ })
+
+class MockLDAP(BaseMockLDAP):
+
+ class OBJECT_CLASS_VIOLATION(Exception): pass
+
+ MOD_ADD = 0
+ MOD_DELETE = 1
+ MOD_REPLACE = 2
+
+ def add_s(self, dn, attrs):
+ self._record_call('add_s', {
+ 'who': dn,
+ 'attrs': attrs,
+ })
+ attrs = tuple((x, tuple(y)) for x,y in attrs)
+ value = self._get_return_value('add_s', (dn, attrs))
+ return value
+
+ def modify_s(self, dn, attrs):
+
+ def _tuplify(operation, attribute, values):
+ if operation == 1:
+ return (operation, attribute, values)
+ else:
+ return (operation, attribute, tuple(values))
+ self._record_call('modify_s', {
+ 'who': dn,
+ 'attrs': attrs,
+ })
+ attrs = tuple(_tuplify(x,y,z) for (x,y,z) in attrs)
+ value = self._get_return_value('modify_s', (dn, attrs))
+ return value
+
+
+mock_ldap = MockLDAP({
+ alice[0]: alice[1],
+ bob[0]: bob[1],
+})
+
+
+
+
+class LDAPTest(TestCase):
+
+ def setUp(self):
+ self.configure_logger()
+ self.mock_ldap = mock_ldap
+ self.mock_ldap.reset()
+ _LDAPConfig.ldap = mock_ldap
+
+ logging_configured = False
+ @classmethod
+ def configure_logger(cls):
+ if not cls.logging_configured:
+ logger = logging.getLogger('django_auth_ldap')
+ formatter = logging.Formatter("LDAP auth - %(levelname)s - %(message)s")
+ handler = logging.StreamHandler()
+
+ handler.setLevel(logging.DEBUG)
+ handler.setFormatter(formatter)
+ logger.addHandler(handler)
+
+ logger.setLevel(logging.CRITICAL)
+
+ cls.logging_configured = True
+
+ def _init_settings(self, **kw):
+ from django_auth_ldap import backend
+ backend.ldap_settings = settings.ldap_settings = TestSettings(**kw)
+ from django_ldap_pixiedust import sync
65 django_ldap_pixiedust/tests/test_backend.py
@@ -0,0 +1,65 @@
+
+from django_auth_ldap.config import LDAPSearch
+from django_auth_ldap.backend import LDAPBackend, _LDAPUser
+
+from django_ldap_pixiedust import settings
+from django_ldap_pixiedust import backend
+from django_ldap_pixiedust import sync
+
+from .fixtures import mock_ldap, LDAPTest, TestSettings
+from . import fixtures
+
+class TestLDAPUser(_LDAPUser):
+ pass
+
+class DecoupledBackend(backend.DecoupledUserClassMixin, LDAPBackend):
+ userclass = TestLDAPUser
+ ldap = mock_ldap
+
+class TestDecoupledUserClassMixin(LDAPTest):
+
+ def setUp(self):
+ LDAPTest.setUp(self)
+ self.backend = DecoupledBackend()
+
+ def test_authenticate(self):
+ sync.activate(sync_user=False, sync_groups=False, sync_profile=False)
+ self._init_settings(
+ AUTH_LDAP_USER_DN_TEMPLATE = "uid=%(user)s,ou=users,dc=test",
+ AUTH_LDAP_USER_ATTR_MAP = { 'email': 'mail'},
+ )
+ self.mock_ldap.set_return_value('simple_bind_s',
+ ('uid=bob,ou=users,dc=test', 'password2'), None)
+ user = self.backend.authenticate('bob', 'password2')
+ self.failUnless(isinstance(user.ldap_user, TestLDAPUser))
+
+
+backend.EmailLoginBackend.ldap = mock_ldap
+
+class TestEmailLogin(LDAPTest):
+
+ def setUp(self):
+ LDAPTest.setUp(self)
+ self.backend = backend.EmailLoginBackend()
+
+ def test_authenticate(self):
+ sync.activate(sync_user=False, sync_groups=False, sync_profile=False)
+ self._init_settings(
+ LDAP_PIXIEDUST_LOGIN_SEARCH=LDAPSearch("ou=users,dc=test",
+ self.mock_ldap.SCOPE_ONELEVEL,
+ '(mail=%(user)s)'),
+ AUTH_LDAP_USER_DN_TEMPLATE = "uid=%(user)s,ou=users,dc=test",
+ AUTH_LDAP_USER_ATTR_MAP = { 'email': 'mail'},
+ )
+ self.mock_ldap.set_return_value('search_s',
+ ('ou=users,dc=test',
+ self.mock_ldap.SCOPE_ONELEVEL,
+ "(mail=bob@example.com)", None, 0),
+ [fixtures.bob])
+ self.mock_ldap.set_return_value('simple_bind_s',
+ ('uid=bob,ou=users,dc=test', 'password2'), None)
+ user = self.backend.authenticate('bob@example.com', 'password2')
+ self.assertEqual(user.username, u'bob')
+ self.assertEqual(user.email, u'bob@example.com')
+
+
105 django_ldap_pixiedust/tests/test_sync.py
@@ -0,0 +1,105 @@
+
+from django_ldap_pixiedust import sync
+from django.test import TestCase
+from django_auth_ldap.tests import TestSettings
+
+from django.contrib.auth.models import User, Group
+
+from fixtures import mock_ldap
+from django_auth_ldap import backend
+
+backend.LDAPBackend.ldap = mock_ldap
+
+from django_ldap_pixiedust import settings
+
+class TestUserSync(TestCase):
+
+ def setUp(self):
+ settings.ldap_settings = TestSettings(
+ AUTH_PROFILE_MODULE='django_ldap_pixiedust.PixieDustTestProfile',
+ AUTH_LDAP_USER_ATTR_MAP = {
+ 'email': 'mail',
+ },
+ AUTH_LDAP_PROFILE_ATTR_MAP = {
+ 'location': 'location',
+ },
+ )
+ sync.activate(True, True, True)
+
+ def tearDown(self):
+ sync.deactivate()
+
+ def test_create_new_user(self):
+ mock_ldap.reset()
+ u = User(username="fred",
+ email="foo@example.com",
+ password="sha1$SALT$4242",
+ )
+ u.save()
+ self.assertEqual(mock_ldap.ldap_methods_called(), [
+ 'initialize',
+ 'simple_bind_s',
+ 'search_s',
+ 'add_s',
+ ])
+ self.assertEqual(mock_ldap.ldap_methods_called_with_arguments()[3][1], {
+ 'who': 'uid=fred,ou=users,dc=test',
+ 'attrs': [
+ ('objectClass', ['organizationalPerson', 'inetOrgPerson']),
+ ('mail', ['foo@example.com']),
+ ('userPassword', ['{SSHA}QkJTQUxU']),
+ ]})
+
+ def test_create_existing_user(self):
+ mock_ldap.reset()
+ u = User(username="bob",
+ email="foo@example.com",
+ password="sha1$SALT$4242",
+ )
+ u.save()
+ self.assertEqual(mock_ldap.ldap_methods_called(), [
+ 'initialize',
+ 'simple_bind_s',
+ 'search_s',
+ 'search_s',
+ 'modify_s',
+ ])
+
+ def test_update_user(self):
+ u = User(username="bill",
+ email="foo@example.com",
+ password="sha1$SALT$4242",
+ )
+ u.save()
+ mock_ldap.reset()
+ mock_ldap.set_return_value('search_s',
+ ('uid=bill,ou=users,dc=test',
+ 0,
+ '(objectClass=*)',
+ None,
+ 0), [('uid=bill,ou=users,dc=test',[
+ ('objectClass', ['organizationalPerson', 'inetOrgPerson']),
+ ('mail', ['foo@example.com']),
+ ('userPassword', ['{SSHA}QkJTQUxU']),
+ ])])
+
+ u.email = 'bar@example.com'
+ u.save()
+ self.assertEqual(mock_ldap.ldap_methods_called(), ['initialize', 'simple_bind_s', 'search_s', 'search_s', 'modify_s'])
+ self.assertEqual(mock_ldap.ldap_methods_called_with_arguments()[4][1], {
+ 'who': 'uid=bill,ou=users,dc=test',
+ 'attrs': [(1, 'mail', None), (0, 'mail', ['bar@example.com'])]
+ })
+
+ def test_update_user_profile(self):
+ u = User(username="bill",
+ email="foo@example.com",
+ password="sha1$SALT$4242",
+ )
+ u.save()
+ profile = u.get_profile()
+ profile.location = "London"
+ profile.save()
+
+
+
101 django_ldap_pixiedust/tests/test_user.py
@@ -0,0 +1,101 @@
+# -*- coding: UTF-8 -*-
+
+from .fixtures import mock_ldap, LDAPTest, TestSettings
+from django_ldap_pixiedust import settings
+from django_ldap_pixiedust import backend
+from django_ldap_pixiedust.user import generate_attrs, SynchronisingUserAdapter
+from django_auth_ldap.config import LDAPSearch
+from django_auth_ldap.backend import _LDAPUser, LDAPBackend
+from django.test import TestCase
+from django.contrib.auth.models import User
+
+from . import fixtures
+
+class Vanilla:
+ def __init__(self, **kwargs):
+ for k, v in kwargs.items():
+ setattr(self, k, v)
+
+
+class TestGenerateAttrs(TestCase):
+
+ def test_empty(self):
+ attrs = generate_attrs(Vanilla(), {})
+ self.failUnlessEqual(attrs, {})
+
+ def test_simple(self):
+ attrs = generate_attrs(Vanilla(foo='x',bar='y'),
+ {'foo': 'ldap_foo',
+ 'bar': 'ldap_bar'})
+ self.failUnlessEqual(dict(attrs),
+ {'ldap_foo': ['x'],
+ 'ldap_bar': ['y']})
+
+ def test_int(self):
+ attrs = generate_attrs(Vanilla(bar=10),
+ {'bar': 'ldap_bar'})
+ self.failUnlessEqual(dict(attrs), {'ldap_bar': ['10']})
+
+ def test_unicode(self):
+ attrs = generate_attrs(Vanilla(bar=u"£"), {'bar': 'ldap_bar'})
+ self.failUnlessEqual(dict(attrs), {'ldap_bar': [unicode.encode(u"£", "UTF-8")]})
+
+
+ def test_empty_string(self):
+ attrs = generate_attrs(Vanilla(bar=''), {'bar': 'ldap_bar'})
+ self.failUnlessEqual(dict(attrs), {})
+
+ def test_zero(self):
+ attrs = generate_attrs(Vanilla(bar=0), {'bar': 'ldap_bar'})
+ self.failUnlessEqual(dict(attrs), {})
+
+
+class TestUser(Vanilla):
+
+ def __init__(self, username, **kw):
+ Vanilla.__init__(self, username=username, **kw)
+ self.ldap_user = TestLDAPUser(LDAPBackend(), user=self)
+
+class TestLDAPUser(_LDAPUser):
+ def __init__(self, backend, username=None, user=None):
+ _LDAPUser.__init__(self, backend, username, user)
+ self.ldap = mock_ldap
+
+
+class TestSynchronisingUserAdapter(TestCase):
+
+ def test_dn(self):
+ user = TestUser("bill")
+ fixture = SynchronisingUserAdapter(user)
+ self.assertEqual(fixture.dn, "uid=bill,ou=users,dc=test")
+
+ def test_user_attrs(self):
+ settings.ldap_settings = TestSettings(
+ AUTH_LDAP_USER_ATTR_MAP = {
+ 'foo': 'ldap_foo',
+ 'bar': 'ldap_bar',
+ },
+ LDAP_PIXIEDUST_DEFAULT_ATTR_MAP = {
+ 'frob': 'nicate'
+ })
+ user = TestUser(username="bill", foo='x', bar='y', password='sha1$SALT$4242')
+ fixture = SynchronisingUserAdapter(user)
+ self.failUnlessEqual(fixture.user_attrs(), {
+ 'ldap_foo': ['x'],
+ 'ldap_bar': ['y'],
+ 'frob': ['nicate'],
+ 'objectClass': ['organizationalPerson', 'inetOrgPerson'],
+ 'userPassword': ['{SSHA}QkJTQUxU'],
+ })
+
+ def test_reset_ldap_password(self):
+ user = TestUser("bill")
+ mock_ldap.reset()
+ fixture = SynchronisingUserAdapter(user)
+ fixture.reset_ldap_password()
+ self.assertEqual(mock_ldap.ldap_methods_called(), ['initialize', 'simple_bind_s', 'modify_s'])
+ self.assertEqual(mock_ldap.ldap_methods_called_with_arguments()[2][1], {
+ 'who': 'uid=bill,ou=users,dc=test',
+ 'attrs': [(2, 'userPassword', '{SSHA}!')],
+ })
+
234 django_ldap_pixiedust/user.py
@@ -0,0 +1,234 @@
+
+from __future__ import absolute_import
+
+from collections import defaultdict
+from django_auth_ldap.backend import _LDAPUser
+from ldap import modlist, SCOPE_BASE, NO_SUCH_OBJECT, MOD_ADD, MOD_DELETE, TYPE_OR_VALUE_EXISTS, OBJECT_CLASS_VIOLATION
+import logging
+
+from . import settings
+from .utils import django_password_to_ldap
+
+logger = logging.getLogger("django_ldap_pixiedust")
+
+class DjangoLDAPPixieDustError(Exception):
+ """ An exception that should never occur. """
+
+def generate_attrs(instance, attr_map):
+ """ This will NOT generate attributes for empty strings or zeros. This is
+ probably a bug. """
+ attrs = defaultdict(lambda: [])
+ for django_name, ldap_name in attr_map.items():
+ value = getattr(instance, django_name, None)
+ if not value:
+ continue # TODO: something cleverer
+ elif type(value) == type(u''):
+ attrs[ldap_name].append(unicode.encode(value, "UTF-8"))
+ else:
+ attrs[ldap_name].append(str(value))
+ return attrs
+
+class SynchronisingUserAdapter(object):
+
+ """ This adapts an _LDAPUser object that has been initialised with a correct User object. The layers and layers of wrapping allow this to sit on top of the existing code, but is kind of confusing.
+
+ The recommended incantation to go from a Django User object to a correctly set up adapter is::
+
+ ldap_user = backend.get_user(user.id)
+ sync = SynchronisingUserAdapter(ldap_user)
+
+ """
+
+ group_type = settings.ldap_settings.AUTH_LDAP_GROUP_TYPE
+ group_template = settings.ldap_settings.LDAP_PIXIEDUST_GROUP_DN_TEMPLATE
+
+ def __init__(self, original):
+ self.original = original
+ self.bound = False
+
+ def reset_ldap_password(self):
+ """ Set the LDAP Password to an impossible value """
+ self.connection.modify_s(self.dn, [(self.ldap.MOD_REPLACE,
+ 'userPassword',
+ '{SSHA}!')])
+
+ @property
+ def dn(self):
+ dn = self.original.ldap_user.dn
+ if type(dn) == type(u''):
+ return unicode.encode(dn, 'UTF-8')
+ else:
+ return dn
+
+ @property
+ def connection(self):
+ conn = self.original.ldap_user._get_connection()
+ if not self.bound:
+ self.original.ldap_user._bind()
+ self.bound = True
+ return conn
+
+ @property
+ def ldap(self):
+ return self.original.ldap_user.ldap
+
+ def user_attrs(self):
+ """ Create a set of user attributes based on the state of the django user object. """
+ attrs = generate_attrs(self.original, settings.ldap_settings.AUTH_LDAP_USER_ATTR_MAP)
+ for name, value in settings.ldap_settings.LDAP_PIXIEDUST_DEFAULT_ATTR_MAP.items():
+ if not attrs[name]:
+ attrs[name].append(value)
+ attrs['objectClass'] = settings.ldap_settings.LDAP_PIXIEDUST_USER_OBJECTCLASSES
+ if self.original.password and self.original.password != '!':
+ attrs['userPassword'].append(django_password_to_ldap(self.original.password))
+ return dict(attrs)
+
+ def profile_attrs(self):
+ attrs = generate_attrs(self.original.get_profile(), settings.ldap_settings.AUTH_LDAP_PROFILE_ATTR_MAP)
+ return dict(attrs)
+
+ def create_ldap_user(self):
+ new_attrs = self.user_attrs()
+ attrs = modlist.addModlist(new_attrs)
+ self.connection.add_s(self.dn, attrs)
+ self.update_flagged_groups()
+
+ def get_ldap_attrs(self):
+ results = self.connection.search_s(self.dn, SCOPE_BASE)
+ if results and len(results) == 1:
+ return dict(results[0][1])
+
+ def update_ldap_user(self):
+ """ Only updates those attributes the user object controls. """
+ old_attrs = self.get_ldap_attrs()
+ new_attrs = old_attrs.copy()
+ new_attrs.update(self.user_attrs())
+ attrs = modlist.modifyModlist(old_attrs, new_attrs)
+ self.connection.modify_s(self.dn, attrs)
+ self.update_flagged_groups()
+
+ def update_ldap_profile(self):
+ """ Only updates those attributes the user profile controls. """
+ old_attrs = self.get_ldap_attrs()
+ new_attrs = old_attrs.copy()
+ new_attrs.update(self.profile_attrs())
+ attrs = modlist.modifyModlist(old_attrs, new_attrs)
+ self.connection.modify_s(self.dn, attrs)
+ self.update_profile_flagged_groups()
+
+ def synchronise(self, created=False):
+ # ignoring created since it seems to be unreliable
+ if not self.ldap_user_exists():
+ self.create_ldap_user()
+ else:
+ self.update_ldap_user()
+
+ def synchronise_profile(self, created=False):
+ if not self.ldap_user_exists():
+ raise DjangoLDAPPixieDustError("Profile requested to be synchronised for a non-existent user")
+ self.update_ldap_profile()
+
+ def synchronise_groups(self):
+ self.clear_groups()
+ self.group_add(self.original.groups.iterator())
+
+ def ldap_user_exists(self):
+ try:
+ results = self.connection.search_s(self.original.ldap_user.dn, SCOPE_BASE)
+ if results is None:
+ return False
+ return True
+ except self.original.ldap_user.ldap.NO_SUCH_OBJECT:
+ return False
+
+ def flagged_groups(self):
+ """ A list of DNs for groups that are managed by flags on the user or profile objects. These should not be cleared by clear_groups! """
+
+ d = settings.ldap_settings.AUTH_LDAP_USER_FLAGS_BY_GROUP.values() + \
+ settings.ldap_settings.AUTH_LDAP_PROFILE_FLAGS_BY_GROUP.values()
+ return set(d)
+
+ def clear_groups(self):
+ logger.debug("Clearing %r from all groups" % (self.dn,))
+ group = self.group_type
+ flagged = self.flagged_groups()
+ # TODO ensure query does not return all members
+ for group_dn, group_attrs in settings.ldap_settings.AUTH_LDAP_GROUP_SEARCH.execute(self.connection):
+ if group_dn not in flagged and \
+ group.is_member(self.original.ldap_user, group_dn):
+ try:
+ self.connection.modify_s(group_dn, [(self.original.ldap_user.ldap.MOD_DELETE,
+ self.group_type.member_attr,
+ [self.dn])])
+ except OBJECT_CLASS_VIOLATION, e:
+ logger.warning("Unable to remove %r from %r because it is the last remaining member." % (self.original, group_dn))
+
+ def _group_dn(self, group):
+ return self.group_template % {'group': group.name}
+
+ def group_exists(self, group_dn):
+ try:
+ self.connection.search_s(group_dn, self.ldap.SCOPE_BASE)
+ return True
+ except self.ldap.NO_SUCH_OBJECT:
+ return False
+
+ def group_add(self, groups):
+ for g in groups:
+ group_dn = self._group_dn(g)
+ if not self.group_exists(group_dn):
+ self.group_create(g)
+ else:
+ self._group_add(group_dn)
+
+ def group_create(self, group):
+ """ Create an LDAP group with this user as the only member """
+ group_dn = self._group_dn(group)
+ attrs = {
+ self.group_type.name_attr: [group.name.encode('utf-8')],
+ 'objectClass': [settings.ldap_settings.LDAP_PIXIEDUST_GROUP_OBJECTCLASS],
+ self.group_type.member_attr: [self.dn],
+ }
+ self.connection.add_s(group_dn, modlist.addModlist(attrs))
+
+ def _group_add(self, group_dn):
+ """ Add this user to the specified LDAP group. """
+ try:
+ logger.debug("Adding %r to %r" % (self.dn, group_dn))
+ self.connection.modify_s(group_dn, [(self.ldap.MOD_ADD,
+ self.group_type.member_attr,
+ self.dn)])
+ except self.ldap.TYPE_OR_VALUE_EXISTS:
+ pass
+
+ def group_remove(self, groups):
+ for g in groups:
+ group_dn = self.group_template % {'group': g.name}
+ self._group_remove(group_dn)
+
+ def _group_remove(self, group_dn):
+ try:
+ logger.debug("Removing %r from %r" % (self.dn, group_dn))
+ self.connection.modify_s(group_dn, [(self.ldap.MOD_DELETE,
+ self.group_type.member_attr,
+ self.dn)])
+ except self.ldap.NO_SUCH_ATTRIBUTE:
+ pass
+
+ def update_flagged_groups(self):
+ """ Update any group membership that's taken from the
+ AUTH_LDAP_USER_FLAGS_BY_GROUP parameter. """
+
+ self._update_flagged_groups(self.original, settings.ldap_settings.AUTH_LDAP_USER_FLAGS_BY_GROUP)
+
+ def update_profile_flagged_groups(self):
+ profile = self.original.get_profile()
+ self._update_flagged_groups(profile, settings.ldap_settings.AUTH_LDAP_PROFILE_FLAGS_BY_GROUP)
+
+ def _update_flagged_groups(self, instance, flag_map):
+ for flag, group_dn in flag_map.items():
+ value = getattr(instance, flag)
+ if value:
+ self._group_add(group_dn)
+ else:
+ self._group_remove(group_dn)
33 django_ldap_pixiedust/utils.py
@@ -0,0 +1,33 @@
+
+from django.contrib.auth.models import User
+from django_auth_ldap.backend import LDAPBackend
+
+from base64 import encodestring
+import uuid
+
+#http://www.openldap.org/faq/data/cache/347.html
+def django_password_to_ldap(django_password):
+ """ Django and LDAP use different mechanisms for encoding passwords. """
+ scheme, salt, hexpasswd = django_password.split("$")
+ if scheme != 'sha1':
+ raise KeyError("scheme %r is not supported by django-ldap-pixiedust" % scheme)
+ passwd = hexpasswd.decode('hex')
+ return "{SSHA}" + encodestring(passwd + str(salt)).rstrip()
+
+def new_uid():
+ """ A utility class to generate UIDs for your new users, if you are using email address for logins. """
+ return str(uuid.uuid1())[:30]
+
+def reset_ldap_password(username):
+ """ Set the user's ldap password to something that can never be entered,
+ effectively locking the account. We do not sync these passwords from
+ django, because django_auth_ldap sets all new accounts to these
+ passwords. """
+
+ from django_ldap_pixiedust.user import SynchronisingUserAdapter
+ backend = LDAPBackend()
+ user = User.objects.get(username=username)
+ ldap_user = backend.get_user(user.id)
+ sync = SynchronisingUserAdapter(ldap_user)
+ sync.reset_ldap_password()
+
35 setup.py
@@ -0,0 +1,35 @@
+from setuptools import setup, find_packages
+import sys, os
+
+version = '1.0.0'
+
+def read(*rnames):
+ return open(os.path.join(os.path.dirname(__file__), *rnames)).read()
+
+long_description = (
+ read('README.rst')
+ + '\n' +
+ read('CHANGES')
+ )
+
+
+setup(name='django-ldap-pixiedust',
+ version=version,
+ description="Makes django_auth_ldap more sprinkly",
+ long_description=long_description,
+ classifiers=[], # Get strings from http://pypi.python.org/pypi?%3Aaction=list_classifiers
+ keywords='',
+ author='Doug Winter',
+ author_email='doug.winter@isotoma.com',
+ url='',
+ license='',
+ packages=find_packages(exclude=['ez_setup', 'examples', 'tests']),
+ include_package_data=True,
+ zip_safe=False,
+ install_requires=[
+ 'django-auth-ldap',
+ ],
+ entry_points="""
+ # -*- Entry points: -*-
+ """,
+ )
Please sign in to comment.
Something went wrong with that request. Please try again.