Skip to content

Commit

Permalink
Set Axes request attributes in middleware
Browse files Browse the repository at this point in the history
Fixes #415

Signed-off-by: Aleksi Häkli <aleksi.hakli@iki.fi>
  • Loading branch information
aleksihakli committed Mar 3, 2019
1 parent e323b19 commit 3bece1a
Show file tree
Hide file tree
Showing 15 changed files with 174 additions and 170 deletions.
20 changes: 7 additions & 13 deletions axes/attempts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@

from django.contrib.auth import get_user_model
from django.db.models import QuerySet
from django.http import HttpRequest
from django.utils.timezone import datetime, now

from axes.conf import settings
from axes.request import AxesHttpRequest
from axes.models import AccessAttempt
from axes.helpers import (
get_client_ip_address,
get_client_username,
get_client_user_agent,
get_client_parameters,
get_cool_off,
)
Expand All @@ -29,21 +27,19 @@ def get_cool_off_threshold(attempt_time: datetime = None) -> datetime:
return attempt_time - get_cool_off()


def filter_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet:
def filter_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> QuerySet:
"""
Return a queryset of AccessAttempts that match the given request and credentials.
"""

username = get_client_username(request, credentials)
ip_address = get_client_ip_address(request)
user_agent = get_client_user_agent(request)

filter_kwargs = get_client_parameters(username, ip_address, user_agent)
filter_kwargs = get_client_parameters(username, request.axes_ip_address, request.axes_user_agent)

return AccessAttempt.objects.filter(**filter_kwargs)


def get_user_attempts(request: HttpRequest, credentials: dict = None, attempt_time: datetime = None) -> QuerySet:
def get_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> QuerySet:
"""
Get valid user attempts that match the given request and credentials.
"""
Expand All @@ -54,7 +50,7 @@ def get_user_attempts(request: HttpRequest, credentials: dict = None, attempt_ti
log.debug('AXES: Getting all access attempts from database because no AXES_COOLOFF_TIME is configured')
return attempts

threshold = get_cool_off_threshold(attempt_time)
threshold = get_cool_off_threshold(request.axes_attempt_time)
log.debug('AXES: Getting access attempts that are newer than %s', threshold)
return attempts.filter(attempt_time__gte=threshold)

Expand All @@ -74,7 +70,7 @@ def clean_expired_user_attempts(attempt_time: datetime = None) -> int:
return count


def reset_user_attempts(request: HttpRequest, credentials: dict = None) -> int:
def reset_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> int:
"""
Reset all user attempts that match the given request and credentials.
"""
Expand All @@ -87,9 +83,7 @@ def reset_user_attempts(request: HttpRequest, credentials: dict = None) -> int:
return count




def is_user_attempt_whitelisted(request: HttpRequest, credentials: dict = None) -> bool:
def is_user_attempt_whitelisted(request: AxesHttpRequest, credentials: dict = None) -> bool:
"""
Check if the given request or credentials refer to a whitelisted username.
Expand Down
3 changes: 2 additions & 1 deletion axes/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
from axes.exceptions import AxesBackendPermissionDenied, AxesBackendRequestParameterRequired
from axes.handlers.proxy import AxesProxyHandler
from axes.helpers import get_credentials, get_lockout_message
from axes.request import AxesHttpRequest


class AxesBackend(ModelBackend):
"""
Authentication backend that forbids login attempts for locked out users.
"""

def authenticate(self, request, username=None, password=None, **kwargs):
def authenticate(self, request: AxesHttpRequest, username: str = None, password: str = None, **kwargs):
"""
Check user lock out status and raises PermissionDenied if user is not allowed to log in.
Expand Down
3 changes: 0 additions & 3 deletions axes/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,6 @@ class AxesAppConf(AppConf):

IP_BLACKLIST = None

# if no attribute is set by your backend, a value is calculated dynamically with the ipware package
CLIENT_IP_ATTRIBUTE = 'axes_client_ip'

# message to show when locked out and have cooloff enabled
COOLOFF_MESSAGE = _('Account locked: too many login attempts. Please try again later')

Expand Down
4 changes: 2 additions & 2 deletions axes/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

from axes.handlers.proxy import AxesProxyHandler
from axes.helpers import get_lockout_response
from axes.request import AxesHttpRequest


def axes_dispatch(func):
def inner(request, *args, **kwargs):
def inner(request: AxesHttpRequest, *args, **kwargs):
if AxesProxyHandler.is_allowed(request):
return func(request, *args, **kwargs)

Expand All @@ -22,5 +23,4 @@ def inner(self, *args, **kwargs):

return get_lockout_response(self.request)


return inner
22 changes: 10 additions & 12 deletions axes/handlers/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
from django.http import HttpRequest
from django.utils.timezone import datetime

from axes.conf import settings
from axes.helpers import (
is_client_ip_address_blacklisted,
is_client_ip_address_whitelisted,
is_client_method_whitelisted,
)
from axes.request import AxesHttpRequest


class AxesBaseHandler: # pylint: disable=unused-argument
Expand All @@ -28,7 +26,7 @@ class AxesBaseHandler: # pylint: disable=unused-argument
and define the class to be used with ``settings.AXES_HANDLER = 'dotted.full.path.to.YourClass'``.
"""

def is_allowed(self, request: HttpRequest, credentials: dict = None) -> bool:
def is_allowed(self, request: AxesHttpRequest, credentials: dict = None) -> bool:
"""
Check if the user is allowed to access or use given functionality such as a login view or authentication.
Expand All @@ -54,17 +52,17 @@ def is_allowed(self, request: HttpRequest, credentials: dict = None) -> bool:

return True

def user_login_failed(self, sender, credentials: dict, request: HttpRequest = None, **kwargs):
def user_login_failed(self, sender, credentials: dict, request: AxesHttpRequest = None, **kwargs):
"""
Handle the Django user_login_failed authentication signal.
"""

def user_logged_in(self, sender, request: HttpRequest, user, **kwargs):
def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs):
"""
Handle the Django user_logged_in authentication signal.
"""

def user_logged_out(self, sender, request: HttpRequest, user, **kwargs):
def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs):
"""
Handle the Django user_logged_out authentication signal.
"""
Expand All @@ -79,7 +77,7 @@ def post_delete_access_attempt(self, instance, **kwargs):
Handle the Axes AccessAttempt object post delete signal.
"""

def is_blacklisted(self, request: HttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument
def is_blacklisted(self, request: AxesHttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument
"""
Check if the request or given credentials are blacklisted from access.
"""
Expand All @@ -89,7 +87,7 @@ def is_blacklisted(self, request: HttpRequest, credentials: dict = None) -> bool

return False

def is_whitelisted(self, request: HttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument
def is_whitelisted(self, request: AxesHttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument
"""
Check if the request or given credentials are whitelisted for access.
"""
Expand All @@ -102,17 +100,17 @@ def is_whitelisted(self, request: HttpRequest, credentials: dict = None) -> bool

return False

def is_locked(self, request: HttpRequest, credentials: dict = None, attempt_time: datetime = None) -> bool:
def is_locked(self, request: AxesHttpRequest, credentials: dict = None) -> bool:
"""
Check if the request or given credentials are locked.
"""

if settings.AXES_LOCK_OUT_AT_FAILURE:
return self.get_failures(request, credentials, attempt_time) >= settings.AXES_FAILURE_LIMIT
return self.get_failures(request, credentials) >= settings.AXES_FAILURE_LIMIT

return False

def get_failures(self, request: HttpRequest, credentials: dict = None, attempt_time: datetime = None) -> int:
def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int:
"""
Check the number of failures associated to the given request and credentials.
"""
Expand Down
35 changes: 15 additions & 20 deletions axes/handlers/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,15 @@

from axes.conf import settings
from axes.exceptions import AxesSignalPermissionDenied
from axes.request import AxesHttpRequest
from axes.handlers.base import AxesBaseHandler
from axes.signals import user_locked_out
from axes.helpers import (
get_cache,
get_cache_timeout,
get_client_cache_key,
get_client_ip_address,
get_client_path_info,
get_client_str,
get_client_username,
get_client_user_agent,
get_credentials,
)

Expand All @@ -28,11 +26,17 @@ def __init__(self):
self.cache = get_cache()
self.cache_timeout = get_cache_timeout()

def get_failures(self, request, credentials=None, attempt_time=None) -> int:
def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int:
cache_key = get_client_cache_key(request, credentials)
return self.cache.get(cache_key, default=0)

def user_login_failed(self, sender, credentials, request=None, **kwargs): # pylint: disable=too-many-locals
def user_login_failed(
self,
sender,
credentials: dict,
request: AxesHttpRequest = None,
**kwargs
): # pylint: disable=too-many-locals
"""
When user login fails, save attempt record in cache and lock user out if necessary.
Expand All @@ -44,10 +48,7 @@ def user_login_failed(self, sender, credentials, request=None, **kwargs): # pyl
return

username = get_client_username(request, credentials)
ip_address = get_client_ip_address(request)
user_agent = get_client_user_agent(request)
path_info = get_client_path_info(request)
client_str = get_client_str(username, ip_address, user_agent, path_info)
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)

if self.is_whitelisted(request, credentials):
log.info('AXES: Login failed from whitelisted client %s.', client_str)
Expand Down Expand Up @@ -78,22 +79,19 @@ def user_login_failed(self, sender, credentials, request=None, **kwargs): # pyl
'axes',
request=request,
username=username,
ip_address=ip_address,
ip_address=request.axes_ip_address,
)

raise AxesSignalPermissionDenied('Locked out due to repeated login failures.')

def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument
def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs): # pylint: disable=unused-argument
"""
When user logs in, update the AccessLog related to the user.
"""

username = user.get_username()
credentials = get_credentials(username)
ip_address = get_client_ip_address(request)
user_agent = get_client_user_agent(request)
path_info = get_client_path_info(request)
client_str = get_client_str(username, ip_address, user_agent, path_info)
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)

log.info('AXES: Successful login by %s.', client_str)

Expand All @@ -103,11 +101,8 @@ def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=un
self.cache.delete(cache_key)
log.info('AXES: Deleted %d failed login attempts by %s from cache.', failures_since_start, client_str)

def user_logged_out(self, sender, request, user, **kwargs):
def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs):
username = user.get_username()
ip_address = get_client_ip_address(request)
user_agent = get_client_user_agent(request)
path_info = get_client_path_info(request)
client_str = get_client_str(username, ip_address, user_agent, path_info)
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)

log.info('AXES: Successful logout by %s.', client_str)
Loading

0 comments on commit 3bece1a

Please sign in to comment.