Skip to content

Commit

Permalink
Merge pull request from GHSA-45gq-q4xh-cp53
Browse files Browse the repository at this point in the history
Fix for username timing attack
  • Loading branch information
bartvanb committed Jan 18, 2024
2 parents eac19db + d07ffdd commit 389f416
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 51 deletions.
14 changes: 14 additions & 0 deletions docs/server/yaml/server_config.yaml
Expand Up @@ -157,3 +157,17 @@ ui:

# port at which the UI will be available on your local machine
port: 3456

# set password policies for the server
password_policy:
# maximum number of failed login attempts before the user is locked out for
# a certain amount of time. Default is 5.
max_failed_attempts: 5

# number of minutes the user is locked out after the maximum number of failed
# login attempts is reached. Default is 15.
inactivation_minutes: 15

# number of minutes to wait between emails that alert a user that someone is
# trying to log in to their account. Default is 60.
between_email_blocked_login_minutes: 60
5 changes: 5 additions & 0 deletions vantage6-server/vantage6/server/globals.py
Expand Up @@ -54,3 +54,8 @@
# pagination settings
DEFAULT_PAGE = 1
DEFAULT_PAGE_SIZE = 10

# default password policies
DEFAULT_MAX_FAILED_ATTEMPTS = 5
DEFAULT_INACTIVATION_MINUTES = 15
DEFAULT_BETWEEN_BLOCKED_LOGIN_EMAIL_MINUTES = 60
27 changes: 23 additions & 4 deletions vantage6-server/vantage6/server/model/user.py
Expand Up @@ -65,6 +65,7 @@ class User(Authenticatable):
failed_login_attempts = Column(Integer, default=0)
last_login_attempt = Column(DateTime)
otp_secret = Column(String(32))
last_email_failed_login_sent = Column(DateTime)

# relationships
organization = relationship("Organization", back_populates="users")
Expand Down Expand Up @@ -155,7 +156,7 @@ def check_password(self, pw: str) -> bool:
return False

def is_blocked(self, max_failed_attempts: int,
inactivation_in_minutes: int) -> tuple[bool, str | None]:
inactivation_in_minutes: int) -> tuple[bool, int | None]:
"""
Check if user can login or if they are temporarily blocked because they
entered a wrong password too often
Expand All @@ -164,15 +165,15 @@ def is_blocked(self, max_failed_attempts: int,
----------
max_failed_attempts: int
Maximum number of attempts to login before temporary deactivation
inactivation_minutes: int
inactivation_in_minutes: int
How many minutes an account is deactivated
Returns
-------
bool
Whether or not user is blocked temporarily
str | None
Message if user is blocked, else None
int | None
How many minutes user is still blocked for
"""
td_max_blocked = dt.timedelta(minutes=inactivation_in_minutes)
td_last_login = dt.datetime.now() - self.last_login_attempt \
Expand Down Expand Up @@ -213,6 +214,24 @@ def get_by_username(cls, username: str) -> User:
session.commit()
return result

@classmethod
def get_first_user(cls) -> User:
"""
Get a random user by their username.
This function is used to prevent an attacker from finding out which
usernames exist.
Returns
-------
User
A random user that is in the database
"""
session = DatabaseSessionManager.get_session()
result = session.query(cls).order_by(cls.id).first()
session.commit()
return result

@classmethod
def get_by_email(cls, email: str) -> User:
"""
Expand Down
196 changes: 149 additions & 47 deletions vantage6-server/vantage6/server/resource/common/auth_helper.py
@@ -1,14 +1,18 @@
import sys
import logging
import datetime as dt
import pyotp

from http import HTTPStatus
from flask import request, render_template
from flask import request, render_template, current_app, Flask
from flask_mail import Mail
from threading import Thread

from vantage6.common.globals import APPNAME, MAIN_VERSION_NAME
from vantage6.server.globals import (
DEFAULT_SUPPORT_EMAIL_ADDRESS, DEFAULT_EMAIL_FROM_ADDRESS
DEFAULT_SUPPORT_EMAIL_ADDRESS, DEFAULT_MAX_FAILED_ATTEMPTS,
DEFAULT_INACTIVATION_MINUTES, DEFAULT_BETWEEN_BLOCKED_LOGIN_EMAIL_MINUTES,
DEFAULT_EMAIL_FROM_ADDRESS
)
from vantage6.server.model.user import User

Expand Down Expand Up @@ -40,84 +44,182 @@ def user_login(
HTTPStatus:
Status code that the current request should return
"""
log.info(f"Trying to login '{username}'")
log.info("Trying to login '%s'", username)
failed_login_msg = "Failed to login"
if User.username_exists(username):
user = User.get_by_username(username)
password_policy = config.get("password_policy", {})
max_failed_attempts = password_policy.get('max_failed_attempts', 5)
inactivation_time = password_policy.get('inactivation_minutes', 15)

is_blocked, min_rem = user.is_blocked(max_failed_attempts,
inactivation_time)
if is_blocked:
notify_user_blocked(user, max_failed_attempts, min_rem, mail,
config)
return {"msg": failed_login_msg}, HTTPStatus.UNAUTHORIZED
elif user.check_password(password):
user.failed_login_attempts = 0
user.save()
return user, HTTPStatus.OK
else:
# update the number of failed login attempts
user.failed_login_attempts = 1 \
if (
not user.failed_login_attempts or
user.failed_login_attempts >= max_failed_attempts
) else user.failed_login_attempts + 1
user.last_login_attempt = dt.datetime.now()
user.save()

# check if username exists. If it does not, we continue anyway, to prevent
# that an attacker can find out which usernames exist via a timing attack.
# In that case, we fetch the first user as random user.
username_exists = User.username_exists(username)
random_username = User.get_first_user().username
user = User.get_by_username(username) if username_exists \
else User.get_by_username(random_username)

password_policy = config.get("password_policy", {})
max_failed_attempts = password_policy.get(
'max_failed_attempts', DEFAULT_MAX_FAILED_ATTEMPTS
)
inactivation_time = password_policy.get(
'inactivation_minutes', DEFAULT_INACTIVATION_MINUTES
)

is_blocked, min_rem = user.is_blocked(max_failed_attempts,
inactivation_time)

if user.check_password(password) and not is_blocked and username_exists:
# Note: above the username_exists is checked to prevent that an
# attacker happens to get the correct password for the random user
# that is returned when the username does not exist. Note also that
# the password is checked first to keep the timing equal for both.
user.failed_login_attempts = 0
user.save()
return user, HTTPStatus.OK

# Handle database updates required upon failed login in a separate thread
# to ensure similar response times
# pylint: disable=W0212
t1 = Thread(target=__handle_failed_login, args=(
current_app._get_current_object(), username_exists, username,
password_policy, is_blocked, min_rem, mail, config,
request.access_route[-1]
))
t1.start()

return {"msg": failed_login_msg}, HTTPStatus.UNAUTHORIZED


def notify_user_blocked(
user: User, max_n_attempts: int, min_rem: int, mail: Mail,
def __handle_failed_login(
app: Flask, user_exists: bool, username: str, password_policy: dict,
is_blocked: bool, min_rem: int, mail: Mail, config: dict, ip: str
) -> None:
"""
When a user login fails, this function is called to update the database
with the failed login attempt and send an email to the user if necessary.
Note that this function is called in a separate thread to keep response
times for login attempts similar in all cases. Therefore, this function
calls `sys.exit()` to terminate the thread.
Parameters
----------
app: flask.Flask
The current Flask app
user_exists: bool
Whether user exists or not
username: str
Username of the user that failed to login
password_policy: dict
Dictionary with password policy settings.
min_rem: int
Number of minutes remaining before the account is unlocked
mail: flask_mail.Mail
An instance of the Flask mail class. Used to send email to user in case
of too many failed login attempts.
config: dict
Dictionary with configuration settings
ip: str
IP address from where the login attempt was made
"""
if not user_exists:
sys.exit()
# get user object again (required because we are in a new thread)
user = User.get_by_username(username)

max_failed_attempts = password_policy.get(
'max_failed_attempts', DEFAULT_MAX_FAILED_ATTEMPTS
)

if is_blocked:
# alert the user via email that they are blocked
__notify_user_blocked(app, user, min_rem, mail, config, ip)
sys.exit()
elif (
not user.failed_login_attempts or
user.failed_login_attempts >= max_failed_attempts
):
# set failed login attempts to 1 if first failed login attempt or if
# user got unblocked after being blocked previously
user.failed_login_attempts = 1
else:
user.failed_login_attempts += 1
user.last_login_attempt = dt.datetime.now()
user.save()
sys.exit()


def __notify_user_blocked(
app: Flask, user: User, min_rem: int, mail: Mail, config: dict, ip: str
) -> None:
"""
Sends an email to the user when his or her account is locked
Sends an email to the user when their account is locked.
This function also checks that emails are not sent too often to the same
user.
Parameters
----------
app: flask.Flask
The current Flask app
user: :class:`~vantage6.server.model.user.User`
User who is temporarily blocked
max_n_attempts: int
Maximum number of failed login attempts before the account is locked
min_rem: int
Number of minutes remaining before the account is unlocked
mail: flask_mail.Mail
An instance of the Flask mail class. Used to send email to user in case
of too many failed login attempts.
config: dict
Dictionary with configuration settings
ip: str
IP address from where the login attempt was made
"""
if not user.email:
log.warning(f'User {user.username} is locked, but does not have'
'an email registered. So no message has been sent.')
log.info('User %s is locked. Sending them an email.', user.username)

log.info(f'User {user.username} is locked. Sending them an email.')
# check that email has not already been sent recently
password_policy = config.get("password_policy", {})
minutes_between_blocked_emails = password_policy.get(
'between_email_blocked_login_minutes',
DEFAULT_BETWEEN_BLOCKED_LOGIN_EMAIL_MINUTES
)
email_sent_recently = user.last_email_failed_login_sent and (
dt.datetime.now() < user.last_email_failed_login_sent +
dt.timedelta(minutes=minutes_between_blocked_emails)
)
if email_sent_recently:
return

# send email
smtp_settings = config.get("smtp", {})
email_from = smtp_settings.get("email_from", DEFAULT_EMAIL_FROM_ADDRESS)
support_email = config.get("support_email", DEFAULT_SUPPORT_EMAIL_ADDRESS)

max_failed_attempts = password_policy.get(
'max_failed_attempts', DEFAULT_MAX_FAILED_ATTEMPTS
)
template_vars = {
'firstname': user.firstname,
'number_of_allowed_attempts': max_n_attempts,
'ip': request.access_route[-1],
'firstname': user.firstname if user.firstname else user.username,
'number_of_allowed_attempts': max_failed_attempts,
'ip': ip,
'time': dt.datetime.now(dt.timezone.utc),
'time_remaining': min_rem,
'support_email': support_email,
}

mail.send_email(
"Failed login attempts on your vantage6 account",
sender=email_from,
recipients=[user.email],
text_body=render_template("mail/blocked_account.txt", **template_vars),
html_body=render_template("mail/blocked_account.html", **template_vars)
)
with app.app_context():
mail.send_email(
"Failed login attempts on your vantage6 account",
sender=email_from,
recipients=[user.email],
text_body=render_template(
"mail/blocked_account.txt", **template_vars
),
html_body=render_template(
"mail/blocked_account.html", **template_vars
)
)

# Update latest email sent timestamp
user.last_email_failed_login_sent = dt.datetime.now()
user.save()


def create_qr_uri(user: User) -> dict:
Expand Down

0 comments on commit 389f416

Please sign in to comment.