Skip to content
Permalink
Browse files

Merge pull request #1829 from privacyidea/1825/enforce_uid_type_str

The getUserId() function returns only strings
  • Loading branch information...
fredreichbier committed Aug 29, 2019
2 parents 2c90807 + 61743ea commit f1543ced95df9dd7214dbd2499e3f57bd3990f98
@@ -60,12 +60,13 @@
import yaml
import threading
import functools
import six

from .UserIdResolver import UserIdResolver

import ldap3
from ldap3 import MODIFY_REPLACE, MODIFY_ADD, MODIFY_DELETE
from ldap3 import Server, Tls, Connection
from ldap3 import Tls
from ldap3.core.exceptions import LDAPOperationResult
from ldap3.core.results import RESULT_SIZE_LIMIT_EXCEEDED
import ssl
@@ -76,18 +77,16 @@
from passlib.hash import ldap_salted_sha1
import hashlib
import binascii
from privacyidea.lib.crypto import urandom, geturandom
from privacyidea.lib.utils import is_true
from privacyidea.lib.framework import get_app_local_store
import datetime

from privacyidea.lib import _
from privacyidea.lib.utils import to_utf8, to_unicode
from privacyidea.lib.utils import to_utf8, to_unicode, convert_column_to_unicode
from privacyidea.lib.error import privacyIDEAError
import uuid
from ldap3.utils.conv import escape_bytes
from operator import itemgetter
from six import string_types

CACHE = {}

@@ -419,7 +418,7 @@ def _get_uid(entry, uidtype):
# we strip the curly braces from objectGUID values.
# If we are using ldap3 <= 2.4.1, there are no curly braces and we leave the value unchanged.
uid = uid.strip("{").strip("}")
return uid
return convert_column_to_unicode(uid)

def _trim_user_id(self, userId):
"""
@@ -539,7 +538,7 @@ def _ldap_attributes_to_user_object(self, attributes):
if ldap_k == map_v:
if ldap_k == "objectGUID":
# An objectGUID should be no list, since it is unique
if isinstance(ldap_v, string_types):
if isinstance(ldap_v, six.string_types):
ret[map_k] = ldap_v.strip("{").strip("}")
else:
raise Exception("The LDAP returns an objectGUID, that is no string: {0!s}".format(type(ldap_v)))
@@ -571,8 +570,9 @@ def getUserId(self, LoginName):
resolve the loginname to the userid.
:param LoginName: The login name from the credentials
:type LoginName: string
:type LoginName: str
:return: UserId as found for the LoginName
:rtype: str
"""
userid = ""
self._bind()
@@ -630,7 +630,7 @@ def getUserId(self, LoginName):

return userid

def getUserList(self, searchDict):
def getUserList(self, searchDict=None):
"""
:param searchDict: A dictionary with search parameters
:type searchDict: dict
@@ -48,7 +48,7 @@
import codecs
import six

from privacyidea.lib.utils import to_bytes
from privacyidea.lib.utils import to_bytes, convert_column_to_unicode
from .UserIdResolver import UserIdResolver

log = logging.getLogger(__name__)
@@ -253,7 +253,7 @@ def getUsername(self, userId):
:param userid: The userid in this resolver
:type userid: string
:return: username
:rtype: string
:rtype: str
'''
fields = self.descDict.get(userId)
index = self.sF["username"]
@@ -265,11 +265,12 @@ def getUserId(self, LoginName):
:param LoginName: the login of the user (as unicode)
:return: the userId
:rtype: str
"""
# We do not encode the LoginName anymore, as we are
# storing unicode in nameDict now.
if LoginName in self.nameDict:
return self.nameDict[LoginName]
return convert_column_to_unicode(self.nameDict[LoginName])
else:
return ""

@@ -293,7 +294,7 @@ def getSearchFields(self, searchDict=None):

return self.searchFields

def getUserList(self, searchDict):
def getUserList(self, searchDict=None):
"""
get a list of all users matching the search criteria of the searchdict
@@ -45,7 +45,7 @@
import requests
import base64
from six.moves.urllib.parse import urlencode
from privacyidea.lib.utils import to_bytes, to_unicode
from privacyidea.lib.utils import to_bytes, to_unicode, convert_column_to_unicode

log = logging.getLogger(__name__)

@@ -124,6 +124,7 @@ def getUsername(self, userid):
def getUserId(self, loginName):
"""
returns the uid for a given loginname/username
:rtype: str
"""
#res = {}
#if self.access_token:
@@ -133,7 +134,7 @@ def getUserId(self, loginName):
# ("userName", loginName)})
#return res.get("Resources", [{}])[0].get("externalId")
# It seems that the userName is the userId
return loginName
return convert_column_to_unicode(loginName)

def getUserList(self, searchDict=None):
"""
@@ -39,14 +39,15 @@

from sqlalchemy import and_
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import Integer, cast, String
from sqlalchemy.orm import sessionmaker, scoped_session

import traceback
import hashlib
from privacyidea.lib.pooling import get_engine
from privacyidea.lib.lifecycle import register_finalizer
from privacyidea.lib.utils import (is_true, censor_connect_string, to_utf8)
from privacyidea.lib.utils import (is_true, censor_connect_string,
convert_column_to_unicode)
from passlib.context import CryptContext
from base64 import b64decode, b64encode
from passlib.utils import h64
@@ -356,9 +357,11 @@ def getUserInfo(self, userId):
def _get_userid_filter(self, userId):
column = getattr(self.TABLE, self.map.get("userid"))
if isinstance(column.type, Integer):
return column == userId
# since our user ID is usually a string we need to cast
return column == int(userId)
else:
return column.like(userId)
# otherwise we cast the column to string (in case of postgres UUIDs)
return cast(column, String).like(userId)

def getUsername(self, userId):
"""
@@ -378,6 +381,7 @@ def getUserId(self, LoginName):
:param LoginName: The login name from the credentials
:type LoginName: string
:return: UserId as found for the LoginName
:rtype: str
"""
userid = ""

@@ -395,7 +399,7 @@ def getUserId(self, LoginName):
raise Exception("More than one user with loginname"
" %s found!" % LoginName)
user = self._get_user_from_mapped_object(r)
userid = user["id"]
userid = convert_column_to_unicode(user["id"])
except Exception as exx: # pragma: no cover
log.error("Could not get the userinformation: {0!r}".format(exx))

@@ -128,7 +128,7 @@ def getUserId(self, loginName):
:param loginName: The login name of the user
:type loginName: sting
:return: The ID of the user
:rtype: string or int
:rtype: str
"""
return "dummy_user_id"

@@ -57,7 +57,7 @@
import inspect
from collections import namedtuple, Sequence, Sized
from functools import update_wrapper
from privacyidea.lib.utils import to_bytes
from privacyidea.lib.utils import to_bytes, to_unicode

DIRECTORY = "tests/testdata/tmp_directory"

@@ -357,7 +357,7 @@ def _match_equal_to(search_base, attribute, value, candidates):
regex = "^{0}$".format(regex)

for entry in candidates:
dn = entry.get("dn")
dn = to_unicode(entry.get("dn"))

if attribute not in entry.get("attributes") or not dn.endswith(search_base):
continue
@@ -663,7 +663,7 @@ def search(self, search_base=None, search_scope=None,
if isinstance(search_filter, bytes):
# We need to convert to unicode otherwise pyparsing will not
# find the u"ö"
search_filter = search_filter.decode("utf-8")
search_filter = to_unicode(search_filter)
expr = Connection._parse_filter()
s_filter = expr.parseString(search_filter).asList()[0]
except pyparsing.ParseBaseException as exx:
@@ -756,7 +756,7 @@ def _on_Connection(self, server, user, password,
if authentication == ldap3.ANONYMOUS and user == "":
correct_password = True
for entry in self.directory:
if entry.get("dn") == user:
if to_unicode(entry.get("dn")) == user:
pw = entry.get("attributes").get("userPassword")
# password can be unicode
if to_bytes(pw) == to_bytes(password):
@@ -182,7 +182,7 @@ def test_01_sqlite_resolver(self):

user = "cornelius"
user_id = y.getUserId(user)
self.assertTrue(user_id == 3, user_id)
self.assertEqual(user_id, '3', user_id)

rid = y.getResolverId()
self.assertTrue(rid.startswith("sql."))
@@ -296,8 +296,9 @@ def test_05_add_user_update_delete(self):
stored_password = y.TABLE.filter_by(username="achmed").first().password
self.assertTrue(stored_password.startswith("{SSHA256}"), stored_password)

# we assume here the uid is of type int
uid = y.getUserId("achmed")
self.assertTrue(uid > self.num_users)
self.assertGreater(int(uid), self.num_users)

r = y.update_user(uid, {"username": "achmed2",
"password": "test"})
@@ -1627,7 +1628,7 @@ def test_25_LDAP_DN_with_utf8(self):

user = u"kölbel".encode('utf8')
user_id = y.getUserId(user)
self.assertEqual(user_id, "cn=kölbel,ou=example,o=test")
self.assertEqual(user_id, u"cn=kölbel,ou=example,o=test")

rid = y.getResolverId()
self.assertTrue(rid == "035fbc6272907bc79a2c036b5bf9665ca921d558", rid)
@@ -1681,7 +1682,7 @@ def test_26_LDAP_DN_with_unicode(self):

user = u"kölbel"
user_id = y.getUserId(user)
self.assertEqual(user_id, "cn=kölbel,ou=example,o=test")
self.assertEqual(user_id, u"cn=kölbel,ou=example,o=test")

rid = y.getResolverId()
self.assertTrue(rid == "035fbc6272907bc79a2c036b5bf9665ca921d558", rid)
@@ -1734,11 +1735,11 @@ def test_27_LDAP_multiple_loginnames(self):

user = u"kölbel"
user_id = y.getUserId(user)
self.assertEqual(user_id, "cn=kölbel,ou=example,o=test")
self.assertEqual(user_id, u"cn=kölbel,ou=example,o=test")

username = "cko@o"
user_id = y.getUserId(username)
self.assertEqual(user_id, "cn=kölbel,ou=example,o=test")
self.assertEqual(user_id, u"cn=kölbel,ou=example,o=test")

@ldap3mock.activate
def test_28_LDAP_multivalues(self):
@@ -1766,7 +1767,7 @@ def test_28_LDAP_multivalues(self):

user = u"kölbel"
user_id = y.getUserId(user)
self.assertEqual(user_id, "cn=kölbel,ou=example,o=test")
self.assertEqual(user_id, u"cn=kölbel,ou=example,o=test")
info = y.getUserInfo(user_id)
self.assertTrue("value1" in info.get("piAttr"))
self.assertTrue("value2" in info.get("piAttr"))
@@ -440,7 +440,7 @@ def test_12_multiple_resolvers(self):
# Now, query the user and populate the cache
self.assertEqual(UserCache.query.count(), 0)
user1 = User('wordpressuser', self.sql_realm)
self.assertEqual(user1.uid, 6)
self.assertEqual(user1.uid, '6')
# Assert it was found in reso_b (as it does not have a phone number)!
self.assertEqual(user1.resolver, 'reso_b')
self.assertEqual(UserCache.query.filter(UserCache.username == 'wordpressuser',
@@ -458,7 +458,7 @@ def test_12_multiple_resolvers(self):
'reso_b')
# Now, it should be located in reso_a!
user2 = User('wordpressuser', self.sql_realm)
self.assertEqual(user2.uid, 6)
self.assertEqual(user2.uid, '6')
self.assertEqual(user2.resolver, 'reso_a')
# ... but the cache still contains entries for both!
resolver_query = UserCache.query.filter(UserCache.username == 'wordpressuser',

0 comments on commit f1543ce

Please sign in to comment.
You can’t perform that action at this time.