-
-
Notifications
You must be signed in to change notification settings - Fork 142
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow decoration of functions beyond the admin login #86
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
[run] | ||
omit = *_settings.py, defender/*migrations/*, defender/exampleapp/* | ||
omit = *_settings.py, defender/*migrations/*, defender/exampleapp/*, *test.py, | ||
*__init__.py, *tests.py, *urls.py |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,38 +1,54 @@ | ||
from . import utils | ||
|
||
import functools | ||
|
||
def watch_login(func): | ||
|
||
def watch_login(status_code=302, msg=''): | ||
""" | ||
Used to decorate the django.contrib.admin.site.login method. | ||
Used to decorate the django.contrib.admin.site.login method or | ||
any other function you want to protect by brute forcing. | ||
To make it work on normal functions just pass the status code that should | ||
indicate a failure and/or a string that will be checked within the | ||
response body. | ||
""" | ||
|
||
def decorated_login(request, *args, **kwargs): | ||
# if the request is currently under lockout, do not proceed to the | ||
# login function, go directly to lockout url, do not pass go, do not | ||
# collect messages about this login attempt | ||
if utils.is_already_locked(request): | ||
return utils.lockout_response(request) | ||
|
||
# call the login function | ||
response = func(request, *args, **kwargs) | ||
|
||
if request.method == 'POST': | ||
# see if the login was successful | ||
login_unsuccessful = ( | ||
response and | ||
not response.has_header('location') and | ||
response.status_code != 302 | ||
) | ||
|
||
# ideally make this background task, but to keep simple, keeping | ||
# it inline for now. | ||
utils.add_login_attempt_to_db(request, not login_unsuccessful) | ||
|
||
if utils.check_request(request, login_unsuccessful): | ||
return response | ||
|
||
return utils.lockout_response(request) | ||
|
||
return response | ||
|
||
def decorated_login(func): | ||
@functools.wraps(func) | ||
def wrapper(request, *args, **kwargs): | ||
# if the request is currently under lockout, do not proceed to the | ||
# login function, go directly to lockout url, do not pass go, do not | ||
# collect messages about this login attempt | ||
if utils.is_already_locked(request): | ||
return utils.lockout_response(request) | ||
|
||
# call the login function | ||
response = func(request, *args, **kwargs) | ||
|
||
if request.method == 'POST': | ||
# see if the login was successful | ||
if status_code == 302: # standard Django login view | ||
login_unsuccessful = ( | ||
response and | ||
not response.has_header('location') and | ||
response.status_code != status_code | ||
) | ||
else: | ||
# If msg is not passed the last condition will be evaluated | ||
# always to True so the first 2 will decide the result. | ||
login_unsuccessful = ( | ||
response and response.status_code == status_code | ||
and msg in response.content.decode('utf-8') | ||
) | ||
|
||
# ideally make this background task, but to keep simple, keeping | ||
# it inline for now. | ||
utils.add_login_attempt_to_db(request, not login_unsuccessful) | ||
|
||
if utils.check_request(request, login_unsuccessful): | ||
return response | ||
|
||
return utils.lockout_response(request) | ||
|
||
return response | ||
|
||
return wrapper | ||
return decorated_login |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,20 +3,25 @@ | |
import time | ||
from distutils.version import StrictVersion | ||
|
||
from mock import patch | ||
# Python 3 has mock in the stdlib | ||
try: | ||
from mock import patch | ||
except ImportError: | ||
from unittest.mock import patch | ||
|
||
from django import get_version | ||
from django.contrib.auth.models import User | ||
from django.contrib.auth.models import AnonymousUser | ||
from django.contrib.sessions.backends.db import SessionStore | ||
from django.core.urlresolvers import reverse | ||
from django.http import HttpRequest | ||
from django.http import HttpRequest, HttpResponse | ||
from django.test.client import RequestFactory | ||
from redis.client import Redis | ||
|
||
from . import utils | ||
from . import config | ||
from .connection import parse_redis_url, get_redis_connection | ||
from .decorators import watch_login | ||
from .models import AccessAttempt | ||
from .test import DefenderTestCase, DefenderTransactionTestCase | ||
|
||
|
@@ -687,7 +692,7 @@ def test_disable_username_lockout(self): | |
|
||
# try logging in with the same username, but different IPs. | ||
# we shouldn't be locked. | ||
for i in range(0, config.FAILURE_LIMIT+10): | ||
for i in range(0, config.FAILURE_LIMIT + 10): | ||
ip = '74.125.126.{0}'.format(i) | ||
response = self._login(username=username, remote_addr=ip) | ||
# Check if we are in the same login page | ||
|
@@ -727,6 +732,92 @@ def test_disable_username_lockout(self): | |
data_out = utils.get_blocked_usernames() | ||
self.assertEqual(data_out, []) | ||
|
||
@patch('defender.config.BEHIND_REVERSE_PROXY', True) | ||
@patch('defender.config.FAILURE_LIMIT', 3) | ||
def test_login_blocked_for_non_standard_login_views_without_msg(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add docstrings to the test methods explaining what each test does? Not all tests have them now, and I have been meaning to fix that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, no problem There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thx |
||
""" | ||
Check that a view wich returns the expected status code is causing | ||
the user to be locked out when we do not expect a specific message | ||
to be returned. | ||
""" | ||
|
||
@watch_login(status_code=401) | ||
def fake_api_401_login_view_without_msg(request): | ||
return HttpResponse(status=401) | ||
|
||
request_factory = RequestFactory() | ||
request = request_factory.post('api/login') | ||
request.user = AnonymousUser() | ||
request.session = SessionStore() | ||
|
||
request.META['HTTP_X_FORWARDED_FOR'] = '192.168.24.24' | ||
|
||
for _ in range(3): | ||
fake_api_401_login_view_without_msg(request) | ||
|
||
data_out = utils.get_blocked_ips() | ||
self.assertEqual(data_out, []) | ||
|
||
fake_api_401_login_view_without_msg(request) | ||
|
||
data_out = utils.get_blocked_ips() | ||
self.assertEqual(data_out, ['192.168.24.24']) | ||
|
||
@patch('defender.config.BEHIND_REVERSE_PROXY', True) | ||
@patch('defender.config.FAILURE_LIMIT', 3) | ||
def test_login_blocked_for_non_standard_login_views_with_msg(self): | ||
""" | ||
Check that a view wich returns the expected status code and the | ||
expected message is causing the IP to be locked out. | ||
""" | ||
@watch_login(status_code=401, msg='Invalid credentials') | ||
def fake_api_401_login_view_without_msg(request): | ||
return HttpResponse('Sorry, Invalid credentials', | ||
status=401) | ||
|
||
request_factory = RequestFactory() | ||
request = request_factory.post('api/login') | ||
request.user = AnonymousUser() | ||
request.session = SessionStore() | ||
|
||
request.META['HTTP_X_FORWARDED_FOR'] = '192.168.24.24' | ||
|
||
for _ in range(3): | ||
fake_api_401_login_view_without_msg(request) | ||
|
||
data_out = utils.get_blocked_ips() | ||
self.assertEqual(data_out, []) | ||
|
||
fake_api_401_login_view_without_msg(request) | ||
|
||
data_out = utils.get_blocked_ips() | ||
self.assertEqual(data_out, ['192.168.24.24']) | ||
|
||
@patch('defender.config.BEHIND_REVERSE_PROXY', True) | ||
@patch('defender.config.FAILURE_LIMIT', 3) | ||
def test_login_non_blocked_for_non_standard_login_views_different_msg(self): | ||
""" | ||
Check that a view wich returns the expected status code but not the | ||
expected message is not causing the IP to be locked out. | ||
""" | ||
@watch_login(status_code=401, msg='Invalid credentials') | ||
def fake_api_401_login_view_without_msg(request): | ||
return HttpResponse('Ups, wrong credentials', | ||
status=401) | ||
|
||
request_factory = RequestFactory() | ||
request = request_factory.post('api/login') | ||
request.user = AnonymousUser() | ||
request.session = SessionStore() | ||
|
||
request.META['HTTP_X_FORWARDED_FOR'] = '192.168.24.24' | ||
|
||
for _ in range(4): | ||
fake_api_401_login_view_without_msg(request) | ||
|
||
data_out = utils.get_blocked_ips() | ||
self.assertEqual(data_out, []) | ||
|
||
|
||
class DefenderTestCaseTest(DefenderTestCase): | ||
"""Make sure that we're cleaning the cache between tests""" | ||
|
@@ -759,6 +850,7 @@ def test_second_incr(self): | |
|
||
|
||
class TestUtils(DefenderTestCase): | ||
|
||
def test_username_blocking(self): | ||
username = 'foo' | ||
self.assertFalse(utils.is_user_already_locked(username)) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is that correct? I'm no expert when it comes to decorators so might be correct, it just looks weird with
watch_login()(auth_views.login)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That looks weird but it's correct. The old decorator couldn't accept arguments and had a deep of 2. I had to add one more level in order to accept arguments so the first thing the new decorator expects now are the arguments (or the defaults) while here a function was passed in first instance. Does it make sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, thank you.