Skip to content

Commit

Permalink
Merge pull request #27 from sdelements/auth_throttle_ajax
Browse files Browse the repository at this point in the history
Auth throttle ajax
  • Loading branch information
funkaoshi committed Nov 26, 2013
2 parents d827d11 + eb052fa commit 8c9770f
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 81 deletions.
192 changes: 128 additions & 64 deletions security/auth_throttling/__init__.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
# Copyright (c) 2011, SD Elements. See ../LICENSE.txt for details.

import json
import hashlib
import logging
from math import ceil
import re
import time # Monkeypatched by the tests.

from django.contrib.auth import REDIRECT_FIELD_NAME
from django.contrib.auth.forms import AuthenticationForm
from django.contrib.sites.models import get_current_site
from django.core.cache import cache
from django.forms import ValidationError
from django.core.exceptions import ImproperlyConfigured
from django.http import HttpResponse
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.utils.translation import ugettext as _
from django.views.decorators.csrf import csrf_protect

from django.conf import settings
from security.middleware import BaseMiddleware

logger = logging.getLogger(__name__)


class HttpResponseTooManyRequests(HttpResponse):
status_code = 429


def delay_message(remainder):
"""
A natural-language description of a delay period.
Expand Down Expand Up @@ -60,11 +66,12 @@ def increment_counters(**counters):
cache.set_many(existing)


def attempt_count(attempt_type, id):
def _extract_username(request):
"""
Only used by tests.
Look for the "username" in a request. If there is no valid username we
will simply be throttling on IP alone.
"""
return cache.get(_key(attempt_type, id), (0,))[0]
return request.POST.get("username", "notfound").lower()


def register_authentication_attempt(request):
Expand All @@ -73,35 +80,63 @@ def register_authentication_attempt(request):
authentication middleware. Adjusts the throttling counters based on whether
it succeeded or failed.
"""
(reset_counters if request.user.is_authenticated() else increment_counters
)(username=request.POST["username"], ip=request.META["REMOTE_ADDR"])
username = _extract_username(request)
ip = request.META["REMOTE_ADDR"]
if request.user.is_authenticated():
reset_counters(username=username, ip=ip)
else:
increment_counters(username=username, ip=ip)


def default_delay_function(account_attempt_count, ip_attempt_count):
"""
We throttle based on how many times we have seen a request from a
particular IP or username. This function will delay the third attempt on an
account for five seconds, and double that delay on every additional
failure, to a maximum of twenty-four hours. We do NOT delay based on IP.
Popular opinion is that IP based throttling doesn't belong in the
application layer.
"""
if account_attempt_count < 3:
return (0, 0)

twentyfour_hours = 60 * 60 * 24
account_delay = min(5 * 2 ** (account_attempt_count - 3), twentyfour_hours)

return (account_delay, 0)


def throttling_delay(username, ip, delay_function=default_delay_function):
"""
Return the greater of the delay periods called for by the username and
the IP of this login request.
"""
t = time.time()
acc_n, acc_t = cache.get(_key("username", username), (0, t))
ip_n, ip_t = cache.get(_key("ip", ip), (0, t))
acc_delay, ip_delay = delay_function(acc_n, ip_n)
return max(acc_t + acc_delay - t, ip_t + ip_delay - t)


def attempt_count(attempt_type, id):
"""
Only used by tests.
"""
return cache.get(_key(attempt_type, id), (0,))[0]


class _ThrottlingForm(AuthenticationForm):
def __init__(self, throttling_delay, *args, **kwargs):
def __init__(self, delay, *args, **kwargs):
super(_ThrottlingForm, self).__init__(*args, **kwargs)
self._errors = {"__all__":
self.error_class(["Due to the failure of previous "
"attempts, your login request "
"has been denied as a security "
"precaution. Please try again "
"in at least %s. " %
delay_message(throttling_delay)
])}


class Middleware:
"""
Performs authentication throttling by username and IP. Expects a settings
dict named AUTHENTICATION_THROTTLING with at least two elements,
LOGIN_URLS_WITH_TEMPLATES and DELAY_FUNCTION. The former is a list of
pairs of URL and template path. The latter is a function of two arguments,
called when a request is being considered for throttling: The first
argument is the number of failed attempts that have been made on the
username being supplied since the last success, and the second argument is
the number of attempts from the IP. The function should return a pair: The
number of seconds to delay the next attempt on that username, and the
number of seconds to delay the next attempt from that IP.
message = ("Due to the failure of previous attempts, your login "
"request has been denied as a security precaution. Please "
"try again in at least %s." % delay_message(delay))
self._errors = {"__all__": self.error_class([message])}


class Middleware(BaseMiddleware):
"""
Performs authentication throttling by username and IP.
Any POST request to one of the supplied login URLs is assumed to be a login
attempt. The Django cache is used to store failure counts and timestamps:
Expand All @@ -110,50 +145,75 @@ class Middleware:
that URL, with an error informing the user of the situation. Otherwise,
the request is allowed to continue, and a response handler checks
request.user to determine whether the login attempt succeeded.
There is only one required setting, AUTHENTICATION_THROTTLING, which
should contain the following information:
LOGIN_URLS_WITH_TEMPLATES - a list of pairs of URL and django
template paths. If the supplied template
in LOGIN_URLS_WITH_TEMPLATES is None we
simply return a HTTP 429 error.
DELAY_FUNCTION - a function of two arguments, called when a
request is being considered for throttling:
The first argument is the number of failed
attempts that have been made on the
username being supplied since the last
success, and the second argument is the
number of attempts from the IP. The
function should return a pair: The number
of seconds to delay the next attempt on
that username, and the number of seconds to
delay the next attempt from that IP.
REDIRECT_FIELD_NAME - used to override the default
REDIRECT_FIELD_NAME.
LOGIN_URLS_WITH_TEMPLATES is required. The other parameters are optional.
"""

def __init__(self):
REQUIRED_SETTINGS = ("AUTHENTICATION_THROTTLING",)

def load_setting(self, setting, value):
"""
Looks for a valid configuration in settings.AUTHENTICATION_THROTTLING.
If such is not found, the handlers are not installed.
"""
value = value or {}

try:
config = settings.AUTHENTICATION_THROTTLING
self.delay_function = config["DELAY_FUNCTION"]
self.logins = list(config["LOGIN_URLS_WITH_TEMPLATES"])
self.redirect_field_name = config.get("REDIRECT_FIELD_NAME",
REDIRECT_FIELD_NAME)
# TODO: Test the validity of the list items?
self.process_request = self._process_request_if_configured
self.process_response = self._process_response_if_configured
except:
logger.error("Bad AUTHENTICATION_THROTTLING dictionary. "
"AuthenticationThrottlingMiddleware disabled.")

def _throttling_delay(self, request):
"""
Return the greater of the delay periods called for by the username and
the IP of this login request.
"""
t = time.time()
acc_n, acc_t = cache.get(_key("username", request.POST["username"]),
(0, t))
ip_n, ip_t = cache.get(_key("ip", request.META["REMOTE_ADDR"]), (0, t))
acc_delay, ip_delay = self.delay_function(acc_n, ip_n)
return max(acc_t + acc_delay - t, ip_t + ip_delay - t)

def _process_request_if_configured(self, request):
self.logins = list(value["LOGIN_URLS_WITH_TEMPLATES"])
except KeyError:
raise ImproperlyConfigured(
"Bad AUTHENTICATION_THROTTLING dictionary. "
"AuthenticationThrottlingMiddleware disabled."
)

self.delay_function = value.get("DELAY_FUNCTION", default_delay_function)
self.redirect_field_name = value.get("REDIRECT_FIELD_NAME", REDIRECT_FIELD_NAME)

def process_request(self, request):
"""
Block the request if it is a login attempt to which a throttling delay
is applicable.
is applicable. We don't process requests that are not PUTs or POSTs.
"""
if request.method != "POST": return
if not (request.method == "POST" or request.method == "PUT"): return

for url, template_name in self.logins:
if request.path[1:] != url: continue
delay = self._throttling_delay(request)

username = _extract_username(request)
ip = request.META["REMOTE_ADDR"]

delay = throttling_delay(username, ip, self.delay_function)

if delay <= 0:
request.META["login_request_permitted"] = True
return
# else: throttle the request

if not template_name:
# we simply return HTTP 429 Too Many Requests
return HttpResponseTooManyRequests()

# update the login form to indicate the throttling error, which
# will be displayed to the user.
form = _ThrottlingForm(delay, request)
redirect_url = request.REQUEST.get(self.redirect_field_name, "")
current_site = get_current_site(request)
Expand All @@ -172,9 +232,13 @@ def _process_request_if_configured(self, request):
RequestContext(request))
)(request)

def _process_response_if_configured(self, request, response):
def process_response(self, request, response):
if request.META.get("login_request_permitted", False):
register_authentication_attempt(request)
return response

__all__ = [delay_message, increment_counters, attempt_count, reset_counters]

__all__ = [
delay_message, increment_counters, reset_counters, attempt_count,
default_delay_function, throttling_delay
]
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
long_description=readme,
maintainer="SD Elements",
maintainer_email="django-security@sdelements.com",
version="0.1.18b",
version="0.1.19b",
packages=["security", "security.migrations", "security.auth_throttling"],
url='https://github.com/sdelements/django-security',
classifiers=[
Expand Down

0 comments on commit 8c9770f

Please sign in to comment.