Permalink
Switch branches/tags
360_event_handler 512/UI_Menus 541/limit_audit_log 542/resolver_in_audit 543/deny-policy 543/deny_policy 594/registration 627/smartphone-API 638/ldap-machine-resolver 644/datepicker 648/U2F 654/multiple-challenge-response 655/configurable-schema-calls 670/user-cache 683/user-cache-cleanup 691/tokeninfo-api 692/token-janitor-filters 694/improve_get_token 696/pi-manage-crashes 703/validate-radiuscheck 711/federation 715/refresh-button 715/reload-with-broadcast 729/auth_cache 738/ldap-umlaute 753/non-ascii-otp-pin 754/unicode-issues-3 754/unicode-issues 771/multipleCR 787/hsm_reinit 797/2step 802/configurable-serverpool 817/nonexistent-resources 820/supply-keysize 827/optimize-policies 840/offline-otp-2 840/offline-otp 850/title-ng-idle 874/log-activated-policies 874/triggered_policy_in_audit_log 878/smpp_provider 914/migration 919/smtpserver-timeout 940/db-reencryption 992/tasks-docs 1037/shorten-audit-log 1066/token_import 1069/hsm-reinit 1072/audit_file 1093/pi-bootstrap 1096/challenge-text 1140/idea_1_sharing_engine_between_threads 1224/optimize_for_one_token 1239/audit-rotate 1253/cache_token_types 1268/retry-on-deadlock 1290/queue-poc 1322/token-janitor-chunksize 1335/importlib SAML UCS4.2 auth_failure_ui autofix/wrapped2_to3_fix-0 autofix/wrapped2_to3_fix-1 autofix/wrapped2_to3_fix branch-1.2 branch-1.4 branch-1.5 branch-2.0 branch-2.2 branch-2.3 branch-2.5 branch-2.6 branch-2.8 branch-2.10 branch-2.11 branch-2.12 branch-2.13 branch-2.16 branch-2.18 branch-2.20 branch-2.21 branch-2.22 branch-2.23 cko_offline debian-packaging example-kerberos-resolver fixing_circleci freeradius-docs master migration_run multiple-versions objectGUID performance python3_compat python3_migrate_base python3_migrate_unicode redis-cache test-build-issue-postgres travis u2f_182 umlaut_filename version2
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
2356 lines (1999 sloc) 83.1 KB
# -*- coding: utf-8 -*-
# privacyIDEA is a fork of LinOTP
#
# 2018-12-10 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Add Base58
# 2018-01-21 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Add tokenkind
# 2017-08-11 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Add auth_cache
# 2017-04-19 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Add support for multiple challenge response token
# 2016-08-31 Cornelius Kölbel <cornelius.koelbel@netknights.it>
# Reset failcounter of all user tokens.
# 2016-06-21 Cornelius Kölbel <cornelius@privacyidea.org>
# Add next pin change response
# 2016-06-13 Cornelius Kölbel <cornelius@privacyidea.org>
# Add otp length to detail response
# 2015-10-14 Cornelius Kölbel <cornelius@privacyidea.org>
# Add timelimit to user auth.
# 2015-08-31 Cornelius Kölbel <cornelius@privacyidea.org>
# Add check_realm_pass for 4-eyes policy
# 2015-03-20 Cornelius Kölbel, <cornelius@privacyidea.org>
# Add policy decorator for encryption
# 2015-03-15 Cornelius Kölbel, <cornelius@privacyidea.org>
# Add policy decorator for lost_token password
# 2014-12-08 Cornelius Kölbel, <cornelius@privacyidea.org>
# Rewrite the module for operation with flask
# assure >95% code coverage
# 2014-07-02 Cornelius Kölbel, <cornelius@privacyidea.org>
# remove references to machines, when a token is deleted
# 2014-05-08 Cornelius Kölbel, <cornelius@privacyidea.org>
#
# License: AGPLv3
# contact: http://www.privacyidea.org
#
# Copyright (C) 2010 - 2014 LSE Leading Security Experts GmbH
# License: AGPLv3
# contact: http://www.linotp.org
# http://www.lsexperts.de
# linotp@lsexperts.de
#
# This code is free software; you can redistribute it and/or
# modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE
# License as published by the Free Software Foundation; either
# version 3 of the License, or any later version.
#
# This code is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""
This module contains all top level token functions.
It depends on the models, lib.user and lib.tokenclass (which depends on the
tokenclass implementations like lib.tokens.hotptoken)
This is the middleware/glue between the HTTP API and the database
"""
import traceback
import string
import datetime
import binascii
import os
import logging
from six import string_types
from sqlalchemy import (and_, func)
from privacyidea.lib.error import (TokenAdminError,
ParameterError,
privacyIDEAError, ResourceNotFoundError)
from privacyidea.lib.decorators import (check_user_or_serial,
check_copy_serials)
from privacyidea.lib.tokenclass import TokenClass
from privacyidea.lib.utils import generate_password, is_true, BASE58
from privacyidea.lib.log import log_with
from privacyidea.models import (Token, Realm, TokenRealm, Challenge,
MachineToken, TokenInfo)
from privacyidea.lib.config import get_from_config
from privacyidea.lib.config import (get_token_class, get_token_prefix,
get_token_types,
get_inc_fail_count_on_false_pin)
from privacyidea.lib.user import get_user_info, User
from privacyidea.lib import _
from privacyidea.lib.realm import realm_is_defined
from privacyidea.lib.resolver import get_resolver_object
from privacyidea.lib.policy import ACTION, SCOPE
from privacyidea.lib.policydecorators import (libpolicy,
auth_user_does_not_exist,
auth_user_has_no_token,
auth_user_passthru,
auth_user_timelimit,
auth_lastauth,
auth_cache,
config_lost_token)
from privacyidea.lib.tokenclass import DATE_FORMAT
from privacyidea.lib.tokenclass import TOKENKIND
from dateutil.tz import tzlocal
log = logging.getLogger(__name__)
optional = True
required = False
ENCODING = "utf-8"
@log_with(log)
def create_tokenclass_object(db_token):
"""
(was createTokenClassObject)
create a token class object from a given type
If a tokenclass for this type does not exist,
the function returns None.
:param db_token: the database referenced token
:type db_token: database token object
:return: instance of the token class object
:rtype: tokenclass object
"""
# We use the tokentype from the database
tokentype = db_token.tokentype.lower()
token_object = None
token_class = get_token_class(tokentype)
if token_class:
try:
token_object = token_class(db_token)
except Exception as e: # pragma: no cover
raise TokenAdminError("create_tokenclass_object failed: {0!r}".format(e),
id=1609)
else:
log.error('type {0!r} not found in tokenclasses'.format(tokentype))
return token_object
def _create_token_query(tokentype=None, realm=None, assigned=None, user=None,
serial_exact=None, serial_wildcard=None, active=None, resolver=None,
rollout_state=None, description=None, revoked=None,
locked=None, userid=None, tokeninfo=None, maxfail=None):
"""
This function create the sql query for getting tokens. It is used by
get_tokens and get_tokens_paginate.
:return: An SQLAlchemy sql query
"""
sql_query = Token.query
if user is not None and not user.is_empty():
# extract the realm from the user object:
realm = user.realm
if tokentype is not None and tokentype.strip("*"):
# filter for type
if "*" in tokentype:
# match with "like"
sql_query = sql_query.filter(Token.tokentype.like(
tokentype.lower().replace("*", "%")))
else:
# exact match
sql_query = sql_query.filter(func.lower(Token.tokentype) ==
tokentype.lower())
if description is not None and description.strip("*"):
# filter for Description
if "*" in description:
# match with "like"
sql_query = sql_query.filter(func.lower(Token.description).like(
description.lower().replace("*", "%")))
else:
# exact match
sql_query = sql_query.filter(func.lower(Token.description) ==
description.lower())
if assigned is not None:
# filter if assigned or not
if assigned is False:
sql_query = sql_query.filter(Token.user_id == "")
elif assigned is True:
sql_query = sql_query.filter(Token.user_id != "")
else:
log.warning("assigned value not in [True, False] {0!r}".format(assigned))
if realm is not None:
# filter for the realm
sql_query = sql_query.filter(and_(func.lower(Realm.name) ==
realm.lower(),
TokenRealm.realm_id == Realm.id,
TokenRealm.token_id ==
Token.id)).distinct()
if resolver is not None and resolver.strip("*"):
# filter for given resolver
if "*" in resolver:
# match with "like"
sql_query = sql_query.filter(Token.resolver.like(resolver.replace(
"*", "%")))
else:
sql_query = sql_query.filter(Token.resolver == resolver)
if userid is not None and userid.strip("*"):
# filter for given userid
if "*" in userid:
# match with "like"
sql_query = sql_query.filter(Token.user_id.like(userid.replace(
"*", "%")))
else:
sql_query = sql_query.filter(Token.user_id == userid)
if serial_wildcard is not None and serial_wildcard.strip("*"):
# filter for serial
# match with "like"
sql_query = sql_query.filter(Token.serial.like(serial_wildcard.replace(
"*", "%")))
if serial_exact is not None:
# exact match for serial
sql_query = sql_query.filter(Token.serial == serial_exact)
if user is not None and not user.is_empty():
# filter for the rest of the user.
if user.resolver:
sql_query = sql_query.filter(Token.resolver == user.resolver)
(uid, _rtype, _resolver) = user.get_user_identifiers()
if uid:
if type(uid) == int:
uid = str(uid)
sql_query = sql_query.filter(Token.user_id == uid)
if active is not None:
# Filter active or inactive tokens
if active is True:
sql_query = sql_query.filter(Token.active == True)
else:
sql_query = sql_query.filter(Token.active == False)
if revoked is not None:
# Filter revoked or not revoked tokens
if revoked is True:
sql_query = sql_query.filter(Token.revoked == True)
else:
sql_query = sql_query.filter(Token.revoked == False)
if locked is not None:
# Filter revoked or not revoked tokens
if locked is True:
sql_query = sql_query.filter(Token.locked == True)
else:
sql_query = sql_query.filter(Token.locked == False)
if maxfail is not None:
# Filter tokens, that reached maxfail
if maxfail is True:
sql_query = sql_query.filter(Token.maxfail <= Token.failcount)
else:
sql_query = sql_query.filter(Token.maxfail > Token.failcount)
if rollout_state is not None:
# Filter for tokens with the given rollout state
sql_query = sql_query.filter(Token.rollout_state == rollout_state)
if tokeninfo is not None:
# Filter for tokens with token token.info.<key> and token.info.<value>
if len(tokeninfo) != 1:
raise privacyIDEAError("I can only create SQL filters from "
"tokeninfo of length 1.")
sql_query = sql_query.filter(TokenInfo.Key == list(tokeninfo)[0])
sql_query = sql_query.filter(TokenInfo.Value == list(tokeninfo.values())[0])
sql_query = sql_query.filter(TokenInfo.token_id == Token.id)
return sql_query
@log_with(log)
#@cache.memoize(10)
def get_tokens(tokentype=None, realm=None, assigned=None, user=None,
serial=None, serial_wildcard=None, active=None, resolver=None, rollout_state=None,
count=False, revoked=None, locked=None, tokeninfo=None,
maxfail=None, psize=None, page=1):
"""
(was getTokensOfType)
This function returns a list of token objects of a
* given type,
* of a realm
* or tokens with assignment or not
* for a certain serial number or
* for a User
E.g. thus you can get all assigned tokens of type totp.
:param tokentype: The type of the token. If None, all tokens are returned.
:type tokentype: basestring
:param realm: get tokens of a realm. If None, all tokens are returned.
:type realm: basestring
:param assigned: Get either assigned (True) or unassigned (False) tokens.
If None get all tokens.
:type assigned: bool
:param user: Filter for the Owner of the token
:type user: User Object
:param serial: The exact serial number of a token
:type serial: basestring
:param serial_wildcard: A wildcard to match token serials
:type serial_wildcard: basestring
:param active: Whether only active (True) or inactive (False) tokens
should be returned
:type active: bool
:param resolver: filter for the given resolver name
:type resolver: basestring
:param rollout_state: returns a list of the tokens in the certain rollout
state. Some tokens are not enrolled in a single step but in multiple
steps. These tokens are then identified by the DB-column rollout_state.
:param count: If set to True, only the number of the result and not the
list is returned.
:type count: bool
:param revoked: Only search for revoked tokens or only for not revoked
tokens
:type revoked: bool
:param locked: Only search for locked tokens or only for not locked tokens
:type locked: bool
:param tokeninfo: Return tokens with the given tokeninfo. The tokeninfo
is a key/value dictionary
:type tokeninfo: dict
:param maxfail: If only tokens should be returned, which failcounter
reached maxfail
:param psize: The number of returned tokens can be restricted by using the parameter psize.
Pagination then is used.
:type psize: int
:param page: If pagination is used, this is the page to get
:type page: int
:return: A list of tokenclasses (lib.tokenclass).
In case of pagination a tuple of count, prev, next, token_list
:rtype: list
"""
token_list = []
sql_query = _create_token_query(tokentype=tokentype, realm=realm,
assigned=assigned, user=user,
serial_exact=serial, serial_wildcard=serial_wildcard,
active=active, resolver=resolver,
rollout_state=rollout_state,
revoked=revoked, locked=locked,
tokeninfo=tokeninfo, maxfail=maxfail)
# Warning for unintentional exact serial matches
if serial is not None and "*" in serial:
log.info(u"Exact match on a serial containing a wildcard: {!r}".format(serial))
# Warning for unintentional wildcard serial matches
if serial_wildcard is not None and "*" not in serial_wildcard:
log.info(u"Wildcard match on serial without a wildcard: {!r}".format(serial_wildcard))
# Decide, what we are supposed to return
if count is True:
ret = sql_query.count()
elif psize:
# We return a paginated list of token objects.
pagination = sql_query.paginate(page, per_page=psize,
error_out=False)
count = pagination.total
prev = None
if pagination.has_prev:
prev = page - 1
next = None
if pagination.has_next:
next = page + 1
for token in pagination.items:
tokenobject = create_tokenclass_object(token)
if isinstance(tokenobject, TokenClass):
# A database token, that has a non existing type, will
# return None, and not a TokenClass. We do not want to
# add None to our list
token_list.append(tokenobject)
ret = count, prev, next, token_list
else:
# Return a simple, flat list of tokenobjects
for token in sql_query.all():
# the token is the database object, but we want an instance of the
# tokenclass!
tokenobject = create_tokenclass_object(token)
if isinstance(tokenobject, TokenClass):
# A database token, that has a non existing type, will
# return None, and not a TokenClass. We do not want to
# add None to our list
token_list.append(tokenobject)
ret = token_list
return ret
@log_with(log)
def get_tokens_paginate(tokentype=None, realm=None, assigned=None, user=None,
serial=None, active=None, resolver=None, rollout_state=None,
sortby=Token.serial, sortdir="asc", psize=15,
page=1, description=None, userid=None):
"""
This function is used to retrieve a token list, that can be displayed in
the Web UI. It supports pagination.
Each retrieved page will also contain a "next" and a "prev", indicating
the next or previous page. If either does not exist, it is None.
:param tokentype:
:param realm:
:param assigned: Returns assigned (True) or not assigned (False) tokens
:type assigned: bool
:param user: The user, whose token should be displayed
:type user: User object
:param serial: a pattern for matching the serial
:param active:
:param resolver: A resolver name, which may contain "*" for filtering.
:type resolver: basestring
:param userid: A userid, which may contain "*" for filtering.
:type userid: basestring
:param rollout_state:
:param sortby: Sort by a certain Token DB field. The default is
Token.serial. If a string like "serial" is provided, we try to convert
it to the DB column.
:type sortby: A Token column or a string.
:param sortdir: Can be "asc" (default) or "desc"
:type sortdir: basestring
:param psize: The size of the page
:type psize: int
:param page: The number of the page to view. Starts with 1 ;-)
:type page: int
:return: dict with tokens, prev, next and count
:rtype: dict
"""
sql_query = _create_token_query(tokentype=tokentype, realm=realm,
assigned=assigned, user=user,
serial_wildcard=serial, active=active,
resolver=resolver,
rollout_state=rollout_state,
description=description, userid=userid)
if isinstance(sortby, string_types):
# convert the string to a Token column
cols = Token.__table__.columns
sortby = cols.get(sortby)
if sortdir == "desc":
sql_query = sql_query.order_by(sortby.desc())
else:
sql_query = sql_query.order_by(sortby.asc())
pagination = sql_query.paginate(page, per_page=psize,
error_out=False)
tokens = pagination.items
prev = None
if pagination.has_prev:
prev = page-1
next = None
if pagination.has_next:
next = page + 1
token_list = []
for token in tokens:
tokenobject = create_tokenclass_object(token)
if isinstance(tokenobject, TokenClass):
token_dict = tokenobject.get_as_dict()
# add user information
# In certain cases the LDAP or SQL server might not be reachable.
# Then an exception is raised
token_dict["username"] = ""
token_dict["user_realm"] = ""
try:
userobject = tokenobject.user
if userobject:
token_dict["username"] = userobject.login
token_dict["user_realm"] = userobject.realm
token_dict["user_editable"] = get_resolver_object(
userobject.resolver).editable
except Exception as exx:
log.error("User information can not be retrieved: {0!s}".format(exx))
log.debug(traceback.format_exc())
token_dict["username"] = "**resolver error**"
token_list.append(token_dict)
ret = {"tokens": token_list,
"prev": prev,
"next": next,
"current": page,
"count": pagination.total}
return ret
def get_one_token(*args, **kwargs):
"""
Fetch exactly one token according to the given filter arguments, which are passed to
``get_tokens``. Raise ``ResourceNotFoundError`` if no token was found. Raise
``ParameterError`` if more than one token was found.
"""
result = get_tokens(*args, **kwargs)
if not result:
raise ResourceNotFoundError(u"The requested token could not be found.")
elif len(result) > 1:
raise ParameterError(u"More than one matching token were found.")
else:
return result[0]
def get_tokens_from_serial_or_user(serial, user, **kwargs):
"""
Fetch tokens, either by (exact) serial, or all tokens of a single user.
In case a serial number is given, check that exactly one token is returned
and raise a ResourceNotFoundError if that is not the case.
In case a user is given, the result can also be empty.
:param serial: exact serial number or None
:param user: a user object or None
:param kwargs: additional argumens to ``get_tokens``
:return: a (possibly empty) list of tokens
"""
if serial:
return [get_one_token(serial=serial, user=user, **kwargs)]
else:
return get_tokens(serial=serial, user=user, **kwargs)
@log_with(log)
def get_token_type(serial):
"""
Returns the tokentype of a given serial number. If the token does
not exist or can not be deterimined, an empty string is returned.
:param serial: the serial number of the to be searched token
:type serial: string
:return: tokentype
:rtype: string
"""
if serial and "*" in serial:
return ""
try:
return get_one_token(serial=serial).type
except ResourceNotFoundError:
return ""
@log_with(log)
def check_serial(serial):
"""
This checks, if the given serial number can be used for a new token.
it returns a tuple (result, new_serial)
result being True if the serial does not exist, yet.
new_serial is a suggestion for a new serial number, that does not
exist, yet.
:param serial: Seral number that is to be checked, if it can be used for
a new token.
:type serial: string
:result: bool and serial number
:rtype: tuple
"""
# serial does not exist, yet
result = True
new_serial = serial
i = 0
while get_tokens(serial=new_serial):
# as long as we find a token, modify the serial:
i += 1
result = False
new_serial = "{0!s}_{1:02d}".format(serial, i)
return result, new_serial
@log_with(log)
def get_num_tokens_in_realm(realm, active=True):
"""
This returns the number of tokens in one realm.
:param realm: The name of the realm
:type realm: basestring
:param active: If only active tokens should be taken into account
:type active: bool
:return: The number of tokens in the realm
:rtype: int
"""
return get_tokens(realm=realm, active=active, count=True)
@log_with(log)
def get_realms_of_token(serial, only_first_realm=False):
"""
This function returns a list of the realms of a token
:param serial: the exact serial number of the token
:type serial: basestring
:param only_first_realm: Wheather we should only return the first realm
:type only_first_realm: bool
:return: list of the realm names
:rtype: list
"""
if serial and "*" in serial:
return []
try:
tokenobject = get_one_token(serial=serial)
realms = tokenobject.get_realms()
except ResourceNotFoundError:
realms = []
if len(realms) > 1:
log.debug(
"Token {0!s} in more than one realm: {1!s}".format(serial, realms))
if only_first_realm:
if realms:
realms = realms[0]
else:
realms = None
return realms
@log_with(log)
def token_exist(serial):
"""
returns true if the token with the exact given serial number exists
:param serial: the serial number of the token
"""
if serial:
return get_tokens(serial=serial, count=True) > 0
else:
# If we have no serial we return false anyway!
return False
@log_with(log)
def get_token_owner(serial):
"""
returns the user object, to which the token is assigned.
the token is identified and retrieved by it's serial number
If the token has no owner, None is returned
Wildcards in the serial number are ignored. This raises
``ResourceNotFoundError`` if the token could not be found.
:param serial: serial number of the token
:type serial: basestring
:return: The owner of the token
:rtype: User object or None
"""
tokenobject = get_one_token(serial=serial)
return tokenobject.user
@log_with(log)
def is_token_owner(serial, user):
"""
Check if the given user is the owner of the token with the given serial
number
:param serial: The serial number of the token
:type serial: str
:param user: The user that needs to be checked
:type user: User object
:return: Return True or False
:rtype: bool
"""
ret = False
token_owner = get_token_owner(serial)
if token_owner is not None:
ret = token_owner == user
return ret
@log_with(log)
def get_tokens_in_resolver(resolver):
"""
Return a list of the token ojects, that contain this very resolver
:param resolver: The resolver, the tokens should be in
:type resolver: basestring
:return: list of tokens with this resolver
:rtype: list of token objects
"""
ret = get_tokens(resolver=resolver)
return ret
@log_with(log)
def get_tokenclass_info(tokentype, section=None):
"""
return the config definition of a dynamic token
:param tokentype: the tokentype of the token like "totp" or "hotp"
:type tokentype: basestring
:param section: subsection of the token definition - optional
:type section: basestring
:return: dict - if nothing found an empty dict
:rtype: dict
"""
res = {}
Tokenclass = get_token_class(tokentype)
if Tokenclass:
res = Tokenclass.get_class_info(section)
return res
@log_with(log)
def get_all_token_users():
"""
return a dictionary with all tokens, that are assigned to users.
This returns a dictionary with the key being the serial number of
the token and the user information as dict.
:return: dictionary of serial numbers
:rtype: dict
"""
tokens = {}
tokenobject_list = get_tokens(assigned=True)
for tokenobject in tokenobject_list:
user_info = {}
if tokenobject.token.user_id and tokenobject.token.resolver:
user_info = get_user_info(tokenobject.token.user_id,
tokenobject.token.resolver)
if tokenobject.token.user_id and len(user_info) == 0:
user_info['username'] = u'/:no user info:/'
if user_info:
tokens[tokenobject.token.serial] = user_info
return tokens
@log_with(log)
def get_otp(serial, current_time=None):
"""
This function returns the current OTP value for a given Token.
The tokentype needs to support this function.
if the token does not support getting the OTP value, a -2 is returned.
If the token could not be found, ResourceNotFoundError is raised.
:param serial: serial number of the token
:param current_time: a fake servertime for testing of TOTP token
:type current_time: datetime
:return: tuple with (result, pin, otpval, passw)
:rtype: tuple
"""
tokenobject = get_one_token(serial=serial)
return tokenobject.get_otp(current_time=current_time)
@log_with(log)
def get_multi_otp(serial, count=0, epoch_start=0, epoch_end=0,
curTime=None,
timestamp=None):
"""
This function returns a list of OTP values for the given Token.
Please note, that the tokentype needs to support this function.
:param serial: the serial number of the token
:type serial: basestring
:param count: number of the next otp values (to be used with event or
time based tokens)
:param epoch_start: unix time start date (used with time based tokens)
:param epoch_end: unix time end date (used with time based tokens)
:param curTime: Simulate the servertime
:type curTime: datetime
:param timestamp: Simulate the servertime (unix time in seconds)
:type timestamp: int
:return: dictionary of otp values
:rtype: dictionary
"""
ret = {"result": False}
tokenobject = get_one_token(serial=serial)
log.debug("getting multiple otp values for token {0!r}. curTime={1!r}".format(tokenobject, curTime))
res, error, otp_dict = tokenobject.\
get_multi_otp(count=count,
epoch_start=epoch_start,
epoch_end=epoch_end,
curTime=curTime,
timestamp=timestamp)
log.debug("received {0!r}, {1!r}, and {2!r} otp values".format(res, error,
len(otp_dict)))
if res is True:
ret = otp_dict
ret["result"] = True
else:
ret["error"] = error
return ret
@log_with(log)
def get_token_by_otp(token_list, otp="", window=10):
"""
search the token in the token_list, that creates the given OTP value.
The tokenobject_list would be created by get_tokens()
:param token_list: the list of token objects to be investigated
:type token_list: list of token objects
:param otp: the otp value, that needs to be found
:type otp: basestring
:param window: the window of search
:type window: int
:return: The token, that creates this OTP value
:rtype: Tokenobject
"""
result_token = None
result_list = []
for token in token_list:
log.debug("checking token {0!r}".format(token.get_serial()))
try:
r = token.check_otp_exist(otp=otp, window=window)
log.debug("result = {0:d}".format(int(r)))
if r >= 0:
result_list.append(token)
except Exception as err:
# A flaw in a single token should not stop privacyidea from finding
# the right token
log.warning("error in calculating OTP for token {0!s}: "
"{1!s}".format(token.token.serial, err))
if len(result_list) == 1:
result_token = result_list[0]
elif result_list:
raise TokenAdminError('multiple tokens are matching this OTP value!',
id=1200)
return result_token
@log_with(log)
def get_serial_by_otp(token_list, otp="", window=10):
"""
Returns the serial for a given OTP value
The tokenobject_list would be created by get_tokens()
:param token_list: the list of token objects to be investigated
:type token_list: list of token objects
:param otp: the otp value, that needs to be found
:param window: the window of search
:type window: int
:return: the serial for a given OTP value and the user
:rtype: basestring
"""
serial = None
token = get_token_by_otp(token_list, otp=otp, window=window)
if token is not None:
serial = token.get_serial()
return serial
@log_with(log)
def gen_serial(tokentype=None, prefix=None):
"""
generate a serial for a given tokentype
:param tokentype: the token type prefix is done by a lookup on the tokens
:param prefix: A prefix to the serial number
:return: serial number
:rtype: string
"""
serial_len = int(get_from_config("SerialLength") or 8)
def _gen_serial(_prefix, _tokennum):
h_serial = ''
num_str = '{:04d}'.format(_tokennum)
h_len = serial_len - len(num_str)
if h_len > 0:
h_serial = binascii.hexlify(os.urandom(h_len)).upper()[0:h_len]
return "{0!s}{1!s}{2!s}".format(_prefix, num_str, h_serial)
if not tokentype:
tokentype = 'PIUN'
if not prefix:
prefix = get_token_prefix(tokentype.lower(), tokentype.upper())
# now search the number of tokens of tokenytype in the token database
tokennum = Token.query.filter(Token.tokentype == u'' + tokentype).count()
# Now create the serial
serial = _gen_serial(prefix, tokennum)
# now test if serial already exists
while True:
numtokens = Token.query.filter(Token.serial == u'' + serial).count()
if numtokens == 0:
# ok, there is no such token, so we're done
break
serial = _gen_serial(prefix, tokennum + numtokens) # pragma: no cover
return serial
@log_with(log)
def import_token(serial, token_dict, default_hashlib=None, tokenrealms=None):
"""
This function is used during the import of a PSKC file.
:param serial: The serial number of the token
:param token_dict: A dictionary describing the token like:
{"type": ...,
"description": ...,
"otpkey": ...,
"counter: ...,
"timeShift": ...}
:param default_hashlib:
:return: the token object
"""
init_param = {'serial': serial,
'description': token_dict.get("description",
"imported")}
for p in ['type', 'otpkey', 'otplen', 'timeStep', 'hashlib', 'tans']:
if p in token_dict:
init_param[p] = token_dict[p]
user_obj = None
if token_dict.get("user"):
user_obj = User(token_dict.get("user").get("username"),
token_dict.get("user").get("realm"),
token_dict.get("user").get("resolver"))
if default_hashlib and default_hashlib != "auto":
init_param['hashlib'] = default_hashlib
# Imported tokens are usually hardware tokens
token = init_token(init_param, user=user_obj,
tokenrealms=tokenrealms,
tokenkind=TOKENKIND.HARDWARE)
if token_dict.get("counter"):
token.set_otp_count(token_dict.get("counter"))
if token_dict.get("timeShift"):
token.add_tokeninfo("timeShift", token_dict.get("timeShift"))
return token
@log_with(log)
def init_token(param, user=None, tokenrealms=None,
tokenkind=None):
"""
create a new token or update an existing token
:param param: initialization parameters like:
serial (optional)
type (optionl, default=hotp)
otpkey
:type param: dict
:param user: the token owner
:type user: User Object
:param tokenrealms: the realms, to which the token should belong
:type tokenrealms: list
:param tokenkind: The kind of the token, can be "software",
"hardware" or "virtual"
:return: token object or None
:rtype: TokenClass object
"""
db_token = None
tokenobject = None
tokentype = param.get("type") or "hotp"
serial = param.get("serial") or gen_serial(tokentype, param.get("prefix"))
realms = []
# unsupported tokentype
tokentypes = get_token_types()
if tokentype.lower() not in tokentypes:
log.error('type {0!r} not found in tokentypes: {1!r}'.format(tokentype, tokentypes))
raise TokenAdminError("init token failed: unknown token type {0!r}".format(tokentype), id=1610)
# Check, if a token with this serial already exist
# create a list of the found tokens
tokenobject_list = get_tokens(serial=serial)
token_count = len(tokenobject_list)
if token_count == 0:
# A token with the serial was not found, so we create a new one
db_token = Token(serial, tokentype=tokentype.lower())
else:
# The token already exist, so we update the token
db_token = tokenobject_list[0].token
# prevent from changing the token type
old_typ = db_token.tokentype
if old_typ.lower() != tokentype.lower():
msg = ('token %r already exist with type %r. '
'Can not initialize token with new type %r' % (serial,
old_typ,
tokentype))
log.error(msg)
raise TokenAdminError("initToken failed: {0!s}".format(msg))
# if there is a realm as parameter (and the realm is not empty), but no
# user, we assign the token to this realm.
if param.get("realm") and 'user' not in param:
realms.append(param.get("realm"))
# Assign the token to all tokenrealms
if tokenrealms and isinstance(tokenrealms, list):
realms.extend(tokenrealms)
# and to the user realm
if user and user.realm:
realms.append(user.realm)
if realms:
# We need to save the token to the DB, otherwise the Token
# has no id!
db_token.save()
db_token.set_realms(realms)
# the tokenclass object is created
tokenobject = create_tokenclass_object(db_token)
if token_count == 0:
# if this token is a newly created one, we have to setup the defaults,
# which later might be overwritten by the tokenobject.update(param)
tokenobject.set_defaults()
# Set the user of the token
if user is not None and user.login != "":
tokenobject.set_user(user)
upd_params = param
tokenobject.update(upd_params)
try:
# Save the token to the database
db_token.save()
except Exception as e: # pragma: no cover
log.error('token create failed!')
log.debug("{0!s}".format(traceback.format_exc()))
raise TokenAdminError("token create failed {0!r}".format(e), id=1112)
# We only set the tokenkind here, if it was explicitly set in the
# init_token call.
# In all other cases it is set in the update method of the tokenclass.
if tokenkind:
tokenobject.add_tokeninfo("tokenkind", tokenkind)
# Set the validity period
validity_period_start = param.get("validity_period_start")
validity_period_end = param.get("validity_period_end")
if validity_period_end:
set_validity_period_end(serial, user, validity_period_end)
if validity_period_start:
set_validity_period_start(serial, user, validity_period_start)
return tokenobject
@log_with(log)
@check_user_or_serial
def remove_token(serial=None, user=None):
"""
remove the token that matches the serial number or
all tokens of the given user and also remove the realm associations and
all its challenges
:param user: The user, who's tokens should be deleted.
:type user: User object
:param serial: The serial number of the token to delete (exact)
:type serial: basestring
:return: The number of deleted token
:rtype: int
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
token_count = len(tokenobject_list)
# Delete challenges of such a token
for tokenobject in tokenobject_list:
# delete the challenge
Challenge.query.filter(Challenge.serial == tokenobject.get_serial(
)).delete()
# due to legacy SQLAlchemy it could happen that the
# foreign key relation could not be deleted
# so we do this manualy
# delete references to client machines
MachineToken.query.filter(MachineToken.token_id ==
tokenobject.token.id).delete()
TokenRealm.query.filter(TokenRealm.token_id ==
tokenobject.token.id).delete()
tokenobject.token.delete()
return token_count
@log_with(log)
def set_realms(serial, realms=None, add=False):
"""
Set all realms of a token. This sets the realms new. I.e. it does not add
realms. So realms that are not contained in the list will not be assigned
to the token anymore.
If the token could not be found, a ResourceNotFoundError is raised.
Thus, setting realms=[] clears all realms assignments.
:param serial: the serial number of the token (exact)
:type serial: basestring
:param realms: A list of realm names
:type realms: list
:param add: if the realms should be added and not replaced
:type add: bool
"""
realms = realms or []
corrected_realms = []
# get rid of non-defined realms
for realm in realms:
if realm_is_defined(realm):
corrected_realms.append(realm)
tokenobject = get_one_token(serial=serial)
tokenobject.set_realms(corrected_realms, add=add)
tokenobject.save()
@log_with(log)
def set_defaults(serial):
"""
Set the default values for the token with the given serial number (exact)
:param serial: token serial
:type serial: basestring
:return: None
"""
db_token = get_one_token(serial=serial).token
db_token.otplen = int(get_from_config("DefaultOtpLen", 6))
db_token.count_window = int(get_from_config("DefaultCountWindow", 15))
db_token.maxfail = int(get_from_config("DefaultMaxFailCount", 15))
db_token.sync_window = int(get_from_config("DefaultSyncWindow", 1000))
db_token.tokentype = u"hotp"
db_token.save()
@log_with(log)
def assign_token(serial, user, pin=None, encrypt_pin=False, err_message=None):
"""
Assign token to a user.
If the PIN is given, the PIN is reset.
:param serial: The serial number of the token
:type serial: basestring
:param user: The user, to whom the token should be assigned.
:type user: User object
:param pin: The PIN for the newly assigned token.
:type pin: basestring
:param encrypt_pin: Whether the PIN should be stored in an encrypted way
:type encrypt_pin: bool
:param err_message: The error message, that is displayed in case the token is already assigned
:type err_message: basestring
"""
tokenobject = get_one_token(serial=serial)
# Check if the token already belongs to another user
old_user = tokenobject.user
if old_user:
log.warning("token already assigned to user: {0!r}".format(old_user))
err_message = err_message or "Token already assigned to user {0!r}".format(old_user)
raise TokenAdminError(err_message, id=1103)
tokenobject.set_user(user)
if pin is not None:
tokenobject.set_pin(pin, encrypt=encrypt_pin)
# reset the OtpFailCounter
tokenobject.set_failcount(0)
try:
tokenobject.save()
except Exception as e: # pragma: no cover
log.error('update Token DB failed')
raise TokenAdminError("Token assign failed for {0!r}/{1!s} : {2!r}".format(user, serial, e), id=1105)
log.debug("successfully assigned token with serial "
"{0!r} to user {1!r}".format(serial, user))
return True
@log_with(log)
@check_user_or_serial
def unassign_token(serial, user=None):
"""
unassign the user from the token, or all tokens of a user
:param serial: The serial number of the token to unassign (exact). Can be None
:param user: A user whose tokens should be unassigned
:return: number of unassigned tokens
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
tokenobject.token.user_id = ""
tokenobject.token.resolver = ""
tokenobject.token.resolver_type = ""
tokenobject.set_pin("")
tokenobject.set_failcount(0)
try:
tokenobject.save()
except Exception as e: # pragma: no cover
log.error('update token DB failed')
raise TokenAdminError("Token unassign failed for {0!r}/{1!r}: {2!r}".format(serial, user, e), id=1105)
log.debug("successfully unassigned token with serial {0!r}".format(tokenobject))
# TODO: test with more than 1 token
return len(tokenobject_list)
@log_with(log)
def resync_token(serial, otp1, otp2, options=None, user=None):
"""
Resyncronize the token of the given serial number and user by searching the
otp1 and otp2 in the future otp values.
:param serial: token serial number (exact)
:type serial: basestring
:param otp1: first OTP value
:type otp1: basestring
:param otp2: second OTP value, directly after the first
:type otp2: basestring
:param options: additional options like the servertime for TOTP token
:type options: dict
:return:
"""
ret = False
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
ret = tokenobject.resync(otp1, otp2, options)
tokenobject.save()
return ret
@log_with(log)
@check_user_or_serial
def reset_token(serial, user=None):
"""
Reset the failcounter of a single token, or of all tokens of one user.
:param serial: serial number (exact)
:param user:
:return: The number of tokens, that were resetted
:rtype: int
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
tokenobject.reset()
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
@check_user_or_serial
def set_pin(serial, pin, user=None, encrypt_pin=False):
"""
Set the token PIN of the token. This is the static part that can be used
to authenticate.
:param pin: The pin of the token
:type pin: basestring
:param user: If the user is specified, the pins for all tokens of this
user will be set
:type user: User object
:param serial: If the serial is specified, the PIN for this very token
will be set. (exact)
:return: The number of PINs set (usually 1)
:rtype: int
"""
if isinstance(user, string_types):
# check if by accident the wrong parameter (like PIN)
# is put into the user attribute
log.warning("Parameter user must not be a string: {0!r}".format(user))
raise ParameterError("Parameter user must not be a string: {0!r}".format(
user), id=1212)
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
tokenobject.set_pin(pin, encrypt=encrypt_pin)
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
def set_pin_user(serial, user_pin, user=None):
"""
This sets the user pin of a token. This just stores the information of
the user pin for (e.g. an eTokenNG, Smartcard) in the database
:param serial: The serial number of the token (exact)
:type serial: basestring
:param user_pin: The user PIN
:type user_pin: basestring
:return: The number of PINs set (usually 1)
:rtype: int
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
tokenobject.set_user_pin(user_pin)
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
def set_pin_so(serial, so_pin, user=None):
"""
Set the SO PIN of a smartcard. The SO Pin can be used to reset the
PIN of a smartcard. The SO PIN is stored in the database, so that it
could be used for automatic processes for User PIN resetting.
:param serial: The serial number of the token (exact)
:type serial: basestring
:param so_pin: The Security Officer PIN
:type so_ping: basestring
:return: The number of SO PINs set. (usually 1)
:rtype: int
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
tokenobject.set_so_pin(so_pin)
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
@check_user_or_serial
def revoke_token(serial, user=None):
"""
Revoke a token, or all tokens of a single user.
:param serial: The serial number of the token (exact)
:type serial: basestring
:param enable: False is the token should be disabled
:type enable: bool
:param user: all tokens of the user will be enabled or disabled
:type user: User object
:return: Number of tokens that were enabled/disabled
:rtype:
"""
tokenobject_list = get_tokens_from_serial_or_user(user=user, serial=serial)
for tokenobject in tokenobject_list:
tokenobject.revoke()
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
@check_user_or_serial
def enable_token(serial, enable=True, user=None):
"""
Enable or disable a token, or all tokens of a single user.
This can be checked with is_token_active.
Enabling an already active token will return 0.
:param serial: The serial number of the token
:type serial: basestring
:param enable: False is the token should be disabled
:type enable: bool
:param user: all tokens of the user will be enabled or disabled
:type user: User object
:return: Number of tokens that were enabled/disabled
:rtype:
"""
# We search for all matching tokens first, in case the user has
# provided a wrong serial number. Then we filter for the desired tokens.
tokenobject_list = get_tokens_from_serial_or_user(user=user, serial=serial)
count = 0
for tokenobject in tokenobject_list:
if tokenobject.is_active() == (not enable):
tokenobject.enable(enable)
tokenobject.save()
count += 1
return count
def is_token_active(serial):
"""
Return True if the token is active, otherwise false
Raise ResourceError if the token could not be found.
:param serial: The serial number of the token
:type serial: basestring
:return: True or False
:rtype: bool
"""
tokenobject = get_one_token(serial=serial)
return tokenobject.token.active
@log_with(log)
@check_user_or_serial
def set_otplen(serial, otplen=6, user=None):
"""
Set the otp length of the token defined by serial or for all tokens of
the user.
The OTP length is usually 6 or 8.
:param serial: The serial number of the token (exact)
:type serial: basestring
:param otplen: The length of the OTP value
:type otplen: int
:param user: The owner of the tokens
:type user: User object
:return: number of modified tokens
:rtype: int
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
tokenobject.set_otplen(otplen)
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
@check_user_or_serial
def set_hashlib(serial, hashlib="sha1", user=None):
"""
Set the hashlib in the tokeninfo.
Can be something like sha1, sha256...
:param serial: The serial number of the token (exact)
:type serial: basestring
:param hashlib: The hashlib of the token
:type hashlib: basestring
:param user: The User, for who's token the hashlib should be set
:type user: User object
:return: the number of token infos set
:rtype: int
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
tokenobject.set_hashlib(hashlib)
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
@check_user_or_serial
def set_count_auth(serial, count, user=None, max=False, success=False):
"""
The auth counters are stored in the token info database field.
There are different counters, that can be set
count_auth -> max=False, success=False
count_auth_max -> max=True, success=False
count_auth_success -> max=False, success=True
count_auth_success_max -> max=True, success=True
:param count: The counter value
:type count: int
:param user: The user owner of the tokens tokens to modify
:type user: User object
:param serial: The serial number of the one token to modifiy (exact)
:type serial: basestring
:param max: True, if either count_auth_max or count_auth_success_max are
to be modified
:type max: bool
:param success: True, if either count_auth_success or
count_auth_success_max are to be modified
:type success: bool
:return: number of modified tokens
:rtype: int
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
if max:
if success:
tokenobject.set_count_auth_success_max(count)
else:
tokenobject.set_count_auth_max(count)
else:
if success:
tokenobject.set_count_auth_success(count)
else:
tokenobject.set_count_auth(count)
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
@check_user_or_serial
def add_tokeninfo(serial, info, value=None,
value_type=None,
user=None):
"""
Sets a token info field in the database. The info is a dict for each
token of key/value pairs.
:param serial: The serial number of the token
:type serial: basestring
:param info: The key of the info in the dict
:param value: The value of the info
:param value_type: The type of the value. If set to "password" the value
is stored encrypted
:type value_type: basestring
:param user: The owner of the tokens, that should be modified
:type user: User object
:return: the number of modified tokens
:rtype: int
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
tokenobject.add_tokeninfo(info, value)
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
@check_user_or_serial
def delete_tokeninfo(serial, key, user=None):
"""
Delete a specific token info field in the database.
:param serial: The serial number of the token
:type serial: basestring
:param key: The key of the info in the dict
:param value: The value of the info
:param user: The owner of the tokens, that should be modified
:type user: User object
:return: the number of tokens matching the serial and user. This number also includes tokens that did not have
the token info *key* set in the first place!
:rtype: int
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
tokenobject.del_tokeninfo(key)
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
@check_user_or_serial
def set_validity_period_start(serial, user, start):
"""
Set the validity period for the given token.
:param serial: serial number (exact)
:param user:
:param start: Timestamp in the format DD/MM/YY HH:MM
:type start: basestring
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
tokenobject.set_validity_period_start(start)
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
@check_user_or_serial
def set_validity_period_end(serial, user, end):
"""
Set the validity period for the given token.
:param serial: serial number (exact)
:param user:
:param end: Timestamp in the format DD/MM/YY HH:MM
:type end: basestring
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
tokenobject.set_validity_period_end(end)
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
@check_user_or_serial
def set_sync_window(serial, syncwindow=1000, user=None):
"""
The sync window is the window that is used during resync of a token.
Such many OTP values are calculated ahead, to find the matching otp value
and counter.
:param serial: The serial number of the token (exact)
:type serial: basestring
:param syncwindow: The size of the sync window
:type syncwindow: int
:param user: The owner of the tokens, which should be modified
:type user: User object
:return: number of modified tokens
:rtype: int
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
tokenobject.set_sync_window(syncwindow)
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
@check_user_or_serial
def set_count_window(serial, countwindow=10, user=None):
"""
The count window is used during authentication to find the matching OTP
value. This sets the count window per token.
:param serial: The serial number of the token (exact)
:type serial: basestring
:param countwindow: the size of the window
:type countwindow: int
:param user: The owner of the tokens, which should be modified
:type user: User object
:return: number of modified tokens
:rtype: int
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
tokenobject.set_count_window(countwindow)
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
@check_user_or_serial
def set_description(serial, description, user=None):
"""
Set the description of a token
:param serial: The serial number of the token (exact)
:type serial: basestring
:param description: The description for the token
:type description: int
:param user: The owner of the tokens, which should be modified
:type user: User object
:return: number of modified tokens
:rtype: int
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
tokenobject.set_description(description)
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
@check_user_or_serial
def set_failcounter(serial, counter, user=None):
"""
Set the fail counter of a token.
:param serial: The serial number of the token (exact)
:param counter: THe counter to which the fail counter should be set
:param user: An optional user
:return: Number of tokens, where the fail counter was set.
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
tokenobject.set_failcount(counter)
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
@check_user_or_serial
def set_max_failcount(serial, maxfail, user=None):
"""
Set the maximum fail counts of tokens. This is the maximum number a
failed authentication is allowed.
:param serial: The serial number of the token (exact)
:type serial: basestring
:param maxfail: The maximum allowed failed authentications
:type maxfail: int
:param user: The owner of the tokens, which should be modified
:type user: User object
:return: number of modified tokens
:rtype: int
"""
tokenobject_list = get_tokens_from_serial_or_user(serial=serial, user=user)
for tokenobject in tokenobject_list:
tokenobject.set_maxfail(maxfail)
tokenobject.save()
return len(tokenobject_list)
@log_with(log)
@check_copy_serials
def copy_token_pin(serial_from, serial_to):
"""
This function copies the token PIN from one token to the other token.
This can be used for workflows like lost token.
In fact the PinHash and the PinSeed are transferred
:param serial_from: The token to copy from
:type serial_from: basestring
:param serial_to: The token to copy to
:type serial_to: basestring
:return: True. In case of an error raise an exception
:rtype: bool
"""
tokenobject_from = get_one_token(serial=serial_from)
tokenobject_to = get_one_token(serial=serial_to)
pinhash, seed = tokenobject_from.get_pin_hash_seed()
tokenobject_to.set_pin_hash_seed(pinhash, seed)
tokenobject_to.save()
return True
@check_copy_serials
def copy_token_user(serial_from, serial_to):
"""
This function copies the user from one token to the other token.
In fact the user_id, resolver and resolver type are transferred.
:param serial_from: The token to copy from
:type serial_from: basestring
:param serial_to: The token to copy to
:type serial_to: basestring
:return: True. In case of an error raise an exception
:rtype: bool
"""
tokenobject_from = get_one_token(serial=serial_from)
tokenobject_to = get_one_token(serial=serial_to)
user_id = tokenobject_from.token.user_id
resolver = tokenobject_from.token.resolver
resolver_type = tokenobject_from.token.resolver_type
tokenobject_to.set_user_identifiers(user_id, resolver,
resolver_type)
copy_token_realms(serial_from, serial_to)
tokenobject_to.save()
return True
@check_copy_serials
def copy_token_realms(serial_from, serial_to):
"""
Copy the realms of one token to the other token
:param serial_from: The token to copy from
:param serial_to: The token to copy to
:return: None
"""
tokenobject_from = get_one_token(serial=serial_from)
tokenobject_to = get_one_token(serial=serial_to)
realm_list = tokenobject_from.token.get_realms()
tokenobject_to.set_realms(realm_list)
@log_with(log)
@libpolicy(config_lost_token)
def lost_token(serial, new_serial=None, password=None,
validity=10, contents="8", pw_len=16, options=None):
"""
This is the workflow to handle a lost token.
The token <serial> is lost and will be disabled. A new token of type
password token will be created and assigned to the user.
The PIN of the lost token will be copied to the new token.
The new token will have a certain validity period.
:param serial: Token serial number
:param new_serial: new serial number
:param password: new password
:param validity: Number of days, the new token should be valid
:type validity: int
:param contents: The contents of the generated password. "C": upper case
characters, "c": lower case characters, "n": digits and "s": special
characters, "8": base58
:type contents: A string like "Ccn"
:param pw_len: The length of the generated password
:type pw_len: int
:param options: optional values for the decorator passed from the upper
API level
:type options: dict
:return: result dictionary
"""
res = {}
new_serial = new_serial or "lost{0!s}".format(serial)
user = get_token_owner(serial)
log.debug("doing lost token for serial {0!r} and user {1!r}".format(serial, user))
if user is None or user.is_empty():
err = "You can only define a lost token for an assigned token."
log.warning("{0!s}".format(err))
raise TokenAdminError(err, id=2012)
character_pool = "{0!s}{1!s}{2!s}".format(string.ascii_lowercase,
string.ascii_uppercase, string.digits)
if contents != "":
character_pool = ""
if "c" in contents:
character_pool += string.ascii_lowercase
if "C" in contents:
character_pool += string.ascii_uppercase
if "n" in contents:
character_pool += string.digits
if "s" in contents:
character_pool += "!#$%&()*+,-./:;<=>?@[]^_"
if "8" in contents:
character_pool += BASE58
if password is None:
password = generate_password(size=pw_len, characters=character_pool)
res['serial'] = new_serial
tokenobject = init_token({"otpkey": password, "serial": new_serial,
"type": "pw",
"description": "temporary replacement for {0!s}".format(
serial)})
res['init'] = tokenobject is not None
if res['init']:
res['user'] = copy_token_user(serial, new_serial)
res['pin'] = copy_token_pin(serial, new_serial)
# set validity period
end_date = (datetime.datetime.now(tzlocal())
+ datetime.timedelta(days=validity)).strftime(DATE_FORMAT)
tokenobject_list = get_tokens(serial=new_serial)
for tokenobject in tokenobject_list:
tokenobject.set_validity_period_end(end_date)
# fill results
res['valid_to'] = "xxxx"
res['password'] = password
res['end_date'] = end_date
# disable token
res['disable'] = enable_token(serial, enable=False)
return res
@log_with(log)
def check_realm_pass(realm, passw, options=None):
"""
This function checks, if the given passw matches any token in the given
realm. This can be used for the 4-eyes token.
Only tokens that are assigned are tested.
It returns the res True/False and a reply_dict, which contains the
serial number of the matching token.
:param realm: The realm of the user
:param passw: The password containing PIN+OTP
:param options: Additional options that are passed to the tokens
:type options: dict
:return: tuple of bool and dict
"""
res = False
reply_dict = {}
# since an attacker does not know, which token is tested, we restrict to
# only active tokens. He would not guess that the given OTP value is that
# of an inactive token.
tokenobject_list = get_tokens(realm=realm, assigned=True, active=True)
if not tokenobject_list:
res = False
reply_dict["message"] = "There is no active and assigned token in " \
"this realm"
else:
res, reply_dict = check_token_list(tokenobject_list, passw,
options=options)
return res, reply_dict
@log_with(log)
@libpolicy(auth_lastauth)
def check_serial_pass(serial, passw, options=None):
"""
This function checks the otp for a given serial
If the OTP matches, True is returned and the otp counter is increased.
The function tries to determine the user (token owner), to derive possible
additional policies from the user.
:param serial: The serial number of the token
:type serial: basestring
:param passw: The password usually consisting of pin + otp
:type passw: basestring
:param options: Additional options. Token specific.
:type options: dict
:return: tuple of result (True, False) and additional dict
:rtype: tuple
"""
reply_dict = {}
tokenobject = get_one_token(serial=serial)
res, reply_dict = check_token_list([tokenobject], passw,
user=tokenobject.user,
options=options)
return res, reply_dict
@log_with(log)
def check_otp(serial, otpval):
"""
This function checks the OTP for a given serial number
:param serial:
:param otpval:
:return:
"""
reply_dict = {}
tokenobject = get_one_token(serial=serial)
res = tokenobject.check_otp(otpval) >= 0
if not res:
reply_dict["message"] = "OTP verification failed."
return res, reply_dict
@libpolicy(auth_cache)
@libpolicy(auth_user_does_not_exist)
@libpolicy(auth_user_has_no_token)
@libpolicy(auth_user_timelimit)
@libpolicy(auth_lastauth)
@libpolicy(auth_user_passthru)
@log_with(log, hide_kwargs=["passw"])
def check_user_pass(user, passw, options=None):
"""
This function checks the otp for a given user.
It is called by the API /validate/check
If the OTP matches, True is returned and the otp counter is increased.
:param user: The user who is trying to authenticate
:type user: User object
:param passw: The password usually consisting of pin + otp
:type passw: basestring
:param options: Additional options. Token specific.
:type options: dict
:return: tuple of result (True, False) and additional dict
:rtype: tuple
"""
tokenobject_list = get_tokens(user=user)
reply_dict = {}
if not tokenobject_list:
# The user has no tokens assigned
res = False
reply_dict["message"] = "The user has no tokens assigned"
else:
tokenobject = tokenobject_list[0]
res, reply_dict = check_token_list(tokenobject_list, passw,
user=tokenobject.user,
options=options)
return res, reply_dict
@log_with(log)
def check_token_list(tokenobject_list, passw, user=None, options=None):
"""
this takes a list of token objects and tries to find the matching token
for the given passw. It also tests,
* if the token is active or
* the max fail count is reached,
* if the validity period is ok...
This function is called by check_serial_pass, check_user_pass and
check_yubikey_pass.
:param tokenobject_list: list of identified tokens
:param passw: the provided passw (mostly pin+otp)
:param user: the identified use - as class object
:param options: additional parameters, which are passed to the token
:return: tuple of success and optional response
:rtype: (bool, dict)
"""
res = False
reply_dict = {}
increase_auth_counters = not is_true(get_from_config(key="no_auth_counter"))
# add the user to the options, so that every token, that get passed the
# options can see the user
if options:
options = options.copy()
else:
options = {}
options.update({'user': user})
# if there has been one token in challenge mode, we only handle challenges
challenge_response_token_list = []
challenge_request_token_list = []
pin_matching_token_list = []
invalid_token_list = []
valid_token_list = []
# Remove locked tokens from tokenobject_list
if len(tokenobject_list) > 1:
for tokenobject in tokenobject_list:
if tokenobject.is_revoked():
tokenobject_list.remove(tokenobject)
if len(tokenobject_list) == 0:
# If there is no unlocked token left.
raise TokenAdminError(_("This action is not possible, since the "
"token is locked"), id=1007)
for tokenobject in tokenobject_list:
log.debug("Found user with loginId {0!r}: {1!r}".format(
tokenobject.user, tokenobject.get_serial()))
if tokenobject.is_challenge_response(passw, user=user, options=options):
# This is a challenge response
challenge_response_token_list.append(tokenobject)
elif tokenobject.is_challenge_request(passw, user=user,
options=options):
# This is a challenge request
challenge_request_token_list.append(tokenobject)
else:
# This is a normal authentication attempt
try:
pin_match, otp_count, repl = \
tokenobject.authenticate(passw, user, options=options)
except TokenAdminError as tae:
# Token is locked
pin_match = False
otp_count = -1
repl = {'message': tae.message}
repl = repl or {}
reply_dict.update(repl)
if otp_count >= 0:
# This is a successful authentication
valid_token_list.append(tokenobject)
elif pin_match:
# The PIN of the token matches
pin_matching_token_list.append(tokenobject)
else:
# Nothing matches at all
invalid_token_list.append(tokenobject)
"""
There might be
2 in pin_matching_token_list
0 in valid_token_list
10 in invalid_token_list
0 in challenge_token_list.
in this case, the failcounter of the 2 tokens in pin_matchting_token_list
needs to be increased. And return False
If there is
0 pin_matching
0 valid
10 invalid
0 challenge
AND incFailCountOnFalsePin is True, then the failcounter of the
10 invalid tokens need to be increased. And return False
If there is
X pin_matching
1+ valid
X invalid
0 challenge
Then the authentication with the valid tokens was successful and the
<count> of the valid tokens need to be increased to the new count.
"""
if valid_token_list:
# One ore more successfully authenticating tokens found
# We need to return success
message_list = ["matching {0:d} tokens".format(len(valid_token_list))]
# write serial numbers or something to audit log
for token_obj in valid_token_list:
if increase_auth_counters:
token_obj.inc_count_auth_success()
# Check if the max auth is succeeded
if token_obj.check_all(message_list):
# The token is active and the auth counters are ok.
res = True
if not reply_dict.get("type"):
reply_dict["type"] = token_obj.token.tokentype
if reply_dict["type"] != token_obj.token.tokentype:
reply_dict["type"] = "undetermined"
# reset the failcounter of valid token
try:
token_obj.reset()
except Exception as _e:
# In some cases (Registration Token) the token does not
# exist anymore. So this would bail an exception!
log.debug("registration token does not exist anymore and "
"cannot be reset.")
if len(valid_token_list) == 1:
# If only one token was found, we add the serial number,
# the token type and the OTP length
reply_dict["serial"] = valid_token_list[0].token.serial
reply_dict["type"] = valid_token_list[0].token.tokentype
reply_dict["otplen"] = valid_token_list[0].token.otplen
# If exist, add next pin and next password change
next_pin = valid_token_list[0].get_tokeninfo("next_pin_change")
if next_pin:
reply_dict["next_pin_change"] = next_pin
reply_dict["pin_change"] = valid_token_list[0].is_pin_change()
next_passw = valid_token_list[0].get_tokeninfo(
"next_password_change")
if next_passw:
reply_dict["next_password_change"] = next_passw
reply_dict["password_change"] = valid_token_list[
0].is_pin_change(password=True)
reply_dict["message"] = ", ".join(message_list)
# Check if we should reset ALL tokens of the user.
g = options.get("g")
if g:
clientip = options.get("clientip")
policy_object = g.policy_object
token_owner = valid_token_list[0].user
reset_all = policy_object.get_policies(
action=ACTION.RESETALLTOKENS,
scope=SCOPE.AUTH,
realm=token_owner.login if token_owner else None,
user=token_owner.realm if token_owner else None,
client=clientip, active=True,
audit_data=g.audit_object.audit_data)
if reset_all:
log.debug("Reset failcounter of all tokens of {0!s}".format(
token_owner))
for tok_obj_reset in tokenobject_list:
try:
tok_obj_reset.reset()
except Exception:
log.debug(
"registration token does not exist anymore and "
"cannot be reset.")
elif challenge_response_token_list:
# The RESPONSE for a previous request of a challenge response token was
# found.
for tokenobject in challenge_response_token_list:
if tokenobject.check_challenge_response(passw=passw,
options=options) >= 0:
reply_dict["serial"] = tokenobject.token.serial
if tokenobject.is_active():
# OTP matches
res = True
if increase_auth_counters:
tokenobject.inc_count_auth_success()
reply_dict["message"] = "Found matching challenge"
tokenobject.challenge_janitor()
# clean up all other challenges from other tokens. I.e.
# all challenges with this very transaction_id!
transaction_id = options.get("transaction_id") or \
options.get("state")
Challenge.query.filter(Challenge.transaction_id == u'' +
transaction_id).delete()
# Reset the fail counter of the challenge response token
tokenobject.reset()
# We have one successful authentication, so we bail out
break
else:
reply_dict["message"] = "Challenge matches, but token is inactive."
log.info("Received a valid response to a "
"challenge for inactive token {0!s}".format(tokenobject.token.serial))
elif challenge_request_token_list:
# This is the initial REQUEST of a challenge response token
active_challenge_token = [t for t in challenge_request_token_list
if t.token.active ]
if len(active_challenge_token) == 0:
reply_dict["message"] = "No active challenge response token found"
else:
reply_dict["multi_challenge"] = []
transaction_id = None
message_list = []
for token_obj in active_challenge_token:
# Check if the max auth is succeeded
if token_obj.check_all(message_list):
r_chal, message, transaction_id, attributes = \
token_obj.create_challenge(
transactionid=transaction_id, options=options)
# Add the reply to the response
message_list.append(message)
reply_dict["message"] = ", ".join(message_list)
if r_chal:
challenge_info = {}
challenge_info["transaction_id"] = transaction_id
challenge_info["attributes"] = attributes
challenge_info["serial"] = token_obj.token.serial
# If exist, add next pin and next password change
next_pin = challenge_request_token_list[0].get_tokeninfo(
"next_pin_change")
if next_pin:
challenge_info["next_pin_change"] = next_pin
challenge_info["pin_change"] = \
challenge_request_token_list[0].is_pin_change()
next_passw = challenge_request_token_list[0].get_tokeninfo(
"next_password_change")
if next_passw:
challenge_info["next_password_change"] = next_passw
challenge_info["password_change"] = \
challenge_request_token_list[0].is_pin_change(
password=True)
for k, v in challenge_info.items():
reply_dict[k] = v
reply_dict["multi_challenge"].append(challenge_info)
elif pin_matching_token_list:
# We did not find a valid token and no challenge.
# But there are tokens, with a matching pin.
# So we increase the failcounter. Return failure.
for tokenobject in pin_matching_token_list:
tokenobject.inc_failcount()
reply_dict["message"] = "wrong otp value"
if len(pin_matching_token_list) == 1:
# If there is only one pin matching token, we look if it was
# a previous OTP value
token = pin_matching_token_list[0]
_r, pin, otp = token.split_pin_pass(passw)
if token.is_previous_otp(otp):
reply_dict["message"] += ". previous otp used again"
if increase_auth_counters:
for token_obj in pin_matching_token_list:
token_obj.inc_count_auth()
# write the serial numbers to the audit log
if len(pin_matching_token_list) == 1:
reply_dict["serial"] = pin_matching_token_list[0].token.serial
reply_dict["type"] = pin_matching_token_list[0].token.tokentype
reply_dict["otplen"] = pin_matching_token_list[0].token.otplen
elif invalid_token_list:
# There were only tokens, that did not match the OTP value and
# not even the PIN.
# Depending of IncFailCountOnFalsePin, we increase the failcounter.
reply_dict["message"] = "wrong otp pin"
if get_inc_fail_count_on_false_pin():
for tokenobject in invalid_token_list:
tokenobject.inc_failcount()
if increase_auth_counters:
tokenobject.inc_count_auth()
return res, reply_dict
def get_dynamic_policy_definitions(scope=None):
"""
This returns the dynamic policy definitions that come with the new loaded
token classes.
:param scope: an optional scope parameter. Only return the policies of
this scope.
:return: The policy definition for the token or only for the scope.
"""
from privacyidea.lib.policy import SCOPE, MAIN_MENU, GROUP
pol = {SCOPE.ADMIN: {},
SCOPE.USER: {},
SCOPE.AUTH: {},
SCOPE.ENROLL: {},
SCOPE.AUTHZ: {}}
for ttype in get_token_types():
pol[SCOPE.ADMIN]["enroll{0!s}".format(ttype.upper())] \
= {'type': 'bool',
'desc': _(u"Admin is allowed to initalize {0!s} tokens.").format(ttype.upper()),
'mainmenu': [MAIN_MENU.TOKENS],
'group': GROUP.ENROLLMENT}
conf = get_tokenclass_info(ttype, section='user')
if 'enroll' in conf:
pol[SCOPE.USER]["enroll{0!s}".format(ttype.upper())] = {
'type': 'bool',
'desc': _(u"The user is allowed to enroll a {0!s} token.").format(ttype.upper()),
'mainmenu': [MAIN_MENU.TOKENS],
'group': GROUP.ENROLLMENT}
# now merge the dynamic Token policy definition
# into the global definitions
policy = get_tokenclass_info(ttype, section='policy')
# get all policy sections like: admin, user, enroll, auth, authz
pol_keys = list(pol)
for pol_section in policy.keys():
# if we have a dyn token definition of this section type
# add this to this section - and make sure, that it is
# then token type prefixed
if pol_section in pol_keys:
pol_entry = policy.get(pol_section)
for pol_def in pol_entry:
set_def = pol_def
if pol_def.startswith(ttype) is not True:
set_def = '{0!s}_{1!s}'.format(ttype, pol_def)
pol[pol_section][set_def] = pol_entry.get(pol_def)
# If the token class should provide specific PIN policies, now merge
# PIN policies
pin_scopes = get_tokenclass_info(ttype, section='pin_scopes') or []
for pin_scope in pin_scopes:
pol[pin_scope]['{0!s}_otp_pin_maxlength'.format(ttype.lower())] = {
'type': 'int',
'value': list(range(0, 32)),
"desc": _("Set the maximum allowed PIN length of the {0!s}"
" token.").format(ttype.upper()),
'group': GROUP.PIN
}
pol[pin_scope]['{0!s}_otp_pin_minlength'.format(ttype.lower())] = {
'type': 'int',
'value': list(range(0, 32)),
"desc": _("Set the minimum required PIN length of the {0!s}"
" token.").format(ttype.upper()),
'group': GROUP.PIN
}
pol[pin_scope]['{0!s}_otp_pin_contents'.format(ttype.lower())] = {
'type': 'str',
"desc": _("Specifiy the required PIN contents of the "
"{0!s} token. "
"(c)haracters, (n)umeric, "
"(s)pecial, (o)thers. [+/-]!").format(ttype.upper()),
'group': GROUP.PIN
}
# return sub section, if scope is defined
# make sure that scope is in the policy key
# e.g. scope='_' is undefined and would break
if scope and scope in pol:
pol = pol[scope]
return pol