Skip to content

Commit

Permalink
more refactoring and unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kencochrane committed Jan 1, 2015
1 parent bd96989 commit a8e721d
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 120 deletions.
14 changes: 6 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ to improve the login.
requirements
============

- django >= 1.6 (may work is previous versions, but not officially supported)
- django: 1.4.x, 1.6.x, 1.7.x
- redis


Expand Down Expand Up @@ -113,8 +113,8 @@ Cache backend:
cache keys:
-----------

- prefix:failed:ip:[ip] (count, expires)
- prefix:failed:username:[username] (count, expires)
- prefix:failed:ip:[ip] (count, TTL)
- prefix:failed:username:[username] (count, TTL)
- prefix:blocked:ip:[ip] (true, TTL)
- prefix:blocked:username:[username] (true, TTL)

Expand Down Expand Up @@ -211,11 +211,9 @@ reverse proxy IP address Default: ``HTTP_X_FORWARDED_FOR``
Default: ``defender``
* ``DEFENDER_LOCKOUT_URL``: The URL you want to redirect to if someone is
locked out.

* ``REDIS_HOST``: the host name for your redis server
* ``REDIS_PORT``: the host port for your redis server
* ``REDIS_PASSWORD``: the password for your redis server
* ``REDIS_DB``: the db number for your redis server
* ``DEFENDER_REDIS_URL``: the redis url for defender.
Default: ``redis://localhost:6379/0``
(Example with password: ``redis://:mypassword@localhost:6379/0``)


Running Tests
Expand Down
45 changes: 45 additions & 0 deletions defender/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from django.conf import settings
from django.utils.translation import ugettext_lazy


def get_setting(variable, default=None):
""" get the 'variable' from settings if not there use the
provided default """
return getattr(settings, variable, default)

# redis server host
DEFENDER_REDIS_URL = get_setting('DEFENDER_REDIS_URL')

MOCK_REDIS = get_setting('DEFENDER_MOCK_REDIS', False)

# see if the user has overridden the failure limit
FAILURE_LIMIT = get_setting('DEFENDER_LOGIN_FAILURE_LIMIT', 3)

USE_USER_AGENT = get_setting('DEFENDER_USE_USER_AGENT', False)

# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = get_setting('DEFENDER_USERNAME_FORM_FIELD', 'username')

# see if the django app is sitting behind a reverse proxy
BEHIND_REVERSE_PROXY = get_setting('DEFENDER_BEHIND_REVERSE_PROXY', False)

# the prefix for these keys in your cache.
CACHE_PREFIX = get_setting('DEFENDER_CACHE_PREFIX', 'defender')

# if the django app is behind a reverse proxy, look for the
# ip address using this HTTP header value
REVERSE_PROXY_HEADER = get_setting('DEFENDER_REVERSE_PROXY_HEADER',
'HTTP_X_FORWARDED_FOR')

# how long to wait before the bad login attempt gets forgotten. in seconds.
COOLOFF_TIME = get_setting('DEFENDER_COOLOFF_TIME', 300) # seconds

LOCKOUT_TEMPLATE = get_setting('DEFENDER_LOCKOUT_TEMPLATE')

ERROR_MESSAGE = ugettext_lazy("Please enter a correct username and password. "
"Note that both fields are case-sensitive.")

# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = get_setting('DEFENDER_USERNAME_FORM_FIELD', 'username')

LOCKOUT_URL = get_setting('DEFENDER_LOCKOUT_URL')
52 changes: 52 additions & 0 deletions defender/connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import redis
try:
import urlparse
except ImportError:
import urllib.parse as urlparse

from . import config

# Register database schemes in URLs.
urlparse.uses_netloc.append("redis")


def get_redis_connection():
""" Get the redis connection """
redis_config = parse_redis_url(config.DEFENDER_REDIS_URL)
return redis.StrictRedis(
host=redis_config.get('HOST'),
port=redis_config.get('PORT'),
db=redis_config.get('DB'),
password=redis_config.get('PASSWORD'))


def parse_redis_url(url):
"""Parses a redis URL."""

# create config with some sane defaults
config = {
"DB": 0,
"PASSWORD": None,
"HOST": "localhost",
"PORT": 6379,
}

if not url:
return config

url = urlparse.urlparse(url)
print url
# Remove query strings.
path = url.path[1:]
path = path.split('?', 2)[0]

if path:
config.update({"DB": int(path)})
if url.password:
config.update({"PASSWORD": url.password})
if url.hostname:
config.update({"HOST": url.hostname})
if url.port:
config.update({"PORT": int(url.port)})

return config
9 changes: 3 additions & 6 deletions defender/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@

SITE_ID = 1

REDIS_HOST = 'localhost'
REDIS_PORT = '1234'
REDIS_PASSWORD = 'mypassword'
REDIS_DB = 1

MIDDLEWARE_CLASSES = (
'django.middleware.common.CommonMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
Expand All @@ -38,4 +33,6 @@

DEFENDER_LOGIN_FAILURE_LIMIT = 10
DEFENDER_COOLOFF_TIME = 2
MOCK_REDIS = True
DEFENDER_REDIS_URL = None
# use mock redis in unit tests locally.
DEFENDER_MOCK_REDIS = True
103 changes: 79 additions & 24 deletions defender/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
from django.core.urlresolvers import reverse
from django.conf import settings

from .connection import parse_redis_url
from . import utils
from . import config

if settings.MOCK_REDIS:
if config.MOCK_REDIS:
redis_client = mockredis.mock_strict_redis_client()
else:
from .utils import redis_server
redis_client = redis_server
from .connection import get_redis_connection
redis_client = get_redis_connection()

# Django >= 1.7 compatibility
try:
Expand All @@ -40,7 +42,7 @@ def _get_random_str(self):

return ''.join(random.choice(chars) for x in range(20))

@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def _login(self, is_valid=False, user_agent='test-browser'):
""" Login a user. A valid credential is used when is_valid is True,
otherwise it will use a random string to make a failed login.
Expand All @@ -55,7 +57,7 @@ def _login(self, is_valid=False, user_agent='test-browser'):

return response

@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def setUp(self):
""" Create a valid user for login
"""
Expand All @@ -69,12 +71,12 @@ def tearDown(self):
""" clean up the db """
redis_client.flushdb()

@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def test_failure_limit_once(self):
""" Tests the login lock trying to login one more time
than failure limit
"""
for i in range(0, utils.FAILURE_LIMIT):
for i in range(0, config.FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
Expand All @@ -88,12 +90,12 @@ def test_failure_limit_once(self):
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, self.LOCKED_MESSAGE)

@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def test_failure_limit_many(self):
""" Tests the login lock trying to login a lot of times more
than failure limit
"""
for i in range(0, utils.FAILURE_LIMIT):
for i in range(0, config.FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
Expand All @@ -109,53 +111,53 @@ def test_failure_limit_many(self):
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, self.LOCKED_MESSAGE)

@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def test_valid_login(self):
""" Tests a valid login for a real username
"""
response = self._login(is_valid=True)
self.assertNotContains(response, LOGIN_FORM_KEY, status_code=302)

@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def test_cooling_off(self):
""" Tests if the cooling time allows a user to login
"""
self.test_failure_limit_once()
# Wait for the cooling off period
time.sleep(utils.COOLOFF_TIME)
time.sleep(config.COOLOFF_TIME)

if settings.MOCK_REDIS:
if config.MOCK_REDIS:
# mock redis require that we expire on our own
redis_client.do_expire()
# It should be possible to login again, make sure it is.
self.test_valid_login()

@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def test_cooling_off_for_trusted_user(self):
""" Test the cooling time for a trusted user
"""
# Try the cooling off time
self.test_cooling_off()

@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def test_long_user_agent_valid(self):
""" Tests if can handle a long user agent
"""
long_user_agent = 'ie6' * 1024
response = self._login(is_valid=True, user_agent=long_user_agent)
self.assertNotContains(response, LOGIN_FORM_KEY, status_code=302)

@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def test_long_user_agent_not_valid(self):
""" Tests if can handle a long user agent with failure
"""
long_user_agent = 'ie6' * 1024
for i in range(0, utils.FAILURE_LIMIT + 1):
for i in range(0, config.FAILURE_LIMIT + 1):
response = self._login(user_agent=long_user_agent)

self.assertContains(response, self.LOCKED_MESSAGE)

@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def test_reset_ip(self):
""" Tests if can reset an ip address
"""
Expand All @@ -168,13 +170,13 @@ def test_reset_ip(self):
# Make a login attempt again
self.test_valid_login()

@patch('defender.utils.LOCKOUT_URL', 'http://localhost/othe/login/')
@patch('defender.utils.redis_server', redis_client)
@patch('defender.config.LOCKOUT_URL', 'http://localhost/othe/login/')
@patch('defender.connection.get_redis_connection', redis_client)
def test_failed_login_redirect_to_URL(self):
""" Test to make sure that after lockout we send to the correct
redirect URL """

for i in range(0, utils.FAILURE_LIMIT):
for i in range(0, config.FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
Expand All @@ -190,13 +192,13 @@ def test_failed_login_redirect_to_URL(self):
self.assertEquals(response.status_code, 302)
self.assertEquals(response['Location'], 'http://localhost/othe/login/')

@patch('defender.utils.LOCKOUT_URL', '/o/login/')
@patch('defender.utils.redis_server', redis_client)
@patch('defender.config.LOCKOUT_URL', '/o/login/')
@patch('defender.connection.get_redis_connection', redis_client)
def test_failed_login_redirect_to_URL_local(self):
""" Test to make sure that after lockout we send to the correct
redirect URL """

for i in range(0, utils.FAILURE_LIMIT):
for i in range(0, config.FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
Expand All @@ -222,3 +224,56 @@ def test_is_valid_ip(self):
self.assertEquals(utils.is_valid_ip('fish'), False)
self.assertEquals(utils.is_valid_ip(None), False)
self.assertEquals(utils.is_valid_ip(''), False)

def test_parse_redis_url(self):
""" """
# full regular
conf = parse_redis_url("redis://user:password@localhost2:1234/2")
self.assertEquals(conf.get('HOST'), 'localhost2')
self.assertEquals(conf.get('DB'), 2)
self.assertEquals(conf.get('PASSWORD'), 'password')
self.assertEquals(conf.get('PORT'), 1234)

# full non local
conf = parse_redis_url("redis://user:pass@www.localhost.com:1234/2")
self.assertEquals(conf.get('HOST'), 'www.localhost.com')
self.assertEquals(conf.get('DB'), 2)
self.assertEquals(conf.get('PASSWORD'), 'pass')
self.assertEquals(conf.get('PORT'), 1234)

# no user name
conf = parse_redis_url("redis://password@localhost2:1234/2")
print conf
self.assertEquals(conf.get('HOST'), 'localhost2')
self.assertEquals(conf.get('DB'), 2)
self.assertEquals(conf.get('PASSWORD'), None)
self.assertEquals(conf.get('PORT'), 1234)

# no user name 2 with colon
conf = parse_redis_url("redis://:password@localhost2:1234/2")
print conf
self.assertEquals(conf.get('HOST'), 'localhost2')
self.assertEquals(conf.get('DB'), 2)
self.assertEquals(conf.get('PASSWORD'), 'password')
self.assertEquals(conf.get('PORT'), 1234)

# Empty
conf = parse_redis_url(None)
self.assertEquals(conf.get('HOST'), 'localhost')
self.assertEquals(conf.get('DB'), 0)
self.assertEquals(conf.get('PASSWORD'), None)
self.assertEquals(conf.get('PORT'), 6379)

# no db
conf = parse_redis_url("redis://:password@localhost2:1234")
self.assertEquals(conf.get('HOST'), 'localhost2')
self.assertEquals(conf.get('DB'), 0)
self.assertEquals(conf.get('PASSWORD'), 'password')
self.assertEquals(conf.get('PORT'), 1234)

# no password
conf = parse_redis_url("redis://localhost2:1234/0")
self.assertEquals(conf.get('HOST'), 'localhost2')
self.assertEquals(conf.get('DB'), 0)
self.assertEquals(conf.get('PASSWORD'), None)
self.assertEquals(conf.get('PORT'), 1234)
9 changes: 3 additions & 6 deletions defender/travis_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@

SITE_ID = 1

REDIS_HOST = 'localhost'
REDIS_PORT = '6379'
REDIS_PASSWORD = None
REDIS_DB = 1

MIDDLEWARE_CLASSES = (
'django.middleware.common.CommonMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
Expand All @@ -39,4 +34,6 @@

DEFENDER_LOGIN_FAILURE_LIMIT = 10
DEFENDER_COOLOFF_TIME = 2
MOCK_REDIS = False
DEFENDER_REDIS_URL = "redis://localhost:6379/1"
# don't use mock redis in unit tests, we will use real redis on travis.
DEFENDER_MOCK_REDIS = False
Loading

0 comments on commit a8e721d

Please sign in to comment.