Skip to content

Commit

Permalink
Cleanup the code to remove lint warnings (#87)
Browse files Browse the repository at this point in the history
* Cleanup the code to remove lint warnings

Signed-off-by: Ken Cochrane <kencochrane@gmail.com>

* Fixed typo

Signed-off-by: Ken Cochrane <kencochrane@gmail.com>
  • Loading branch information
kencochrane committed Jun 28, 2017
1 parent b985d17 commit 4d9adc3
Show file tree
Hide file tree
Showing 13 changed files with 66 additions and 38 deletions.
1 change: 1 addition & 0 deletions defender/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@


class AccessAttemptAdmin(admin.ModelAdmin):
""" Access attempt admin config """
list_display = (
'attempt_time',
'ip_address',
Expand Down
4 changes: 3 additions & 1 deletion defender/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ def get_setting(variable, default=None):
provided default """
return getattr(settings, variable, default)


# redis server host
DEFENDER_REDIS_URL = get_setting('DEFENDER_REDIS_URL')

Expand All @@ -29,7 +30,8 @@ def get_setting(variable, default=None):

# If this is True, usernames will not get locked when
# there are too many login attempts.
DISABLE_USERNAME_LOCKOUT = get_setting('DEFENDER_DISABLE_USERNAME_LOCKOUT', False)
DISABLE_USERNAME_LOCKOUT = get_setting(
'DEFENDER_DISABLE_USERNAME_LOCKOUT', False)

# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = get_setting('DEFENDER_USERNAME_FORM_FIELD', 'username')
Expand Down
6 changes: 4 additions & 2 deletions defender/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@


MOCKED_REDIS = mockredis.mock_strict_redis_client()
INVALID_CACHE_ERROR_MSG = 'The cache {} was not found on the django cache settings.'
INVALID_CACHE_ERROR_MSG = 'The cache {} was not found on the django cache' \
' settings.'


def get_redis_connection():
Expand All @@ -25,7 +26,8 @@ def get_redis_connection():
try:
cache = caches[config.DEFENDER_REDIS_NAME]
except InvalidCacheBackendError:
raise KeyError(INVALID_CACHE_ERROR_MSG.format(config.DEFENDER_REDIS_NAME))
raise KeyError(INVALID_CACHE_ERROR_MSG.format(
config.DEFENDER_REDIS_NAME))
# every redis backend implement it own way to get the low level client
try:
# redis_cache.RedisCache case (django-redis-cache package)
Expand Down
6 changes: 2 additions & 4 deletions defender/exampleapp/settings.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
from celery import Celery

PROJECT_DIR = lambda base: os.path.abspath(
os.path.join(os.path.dirname(__file__), base).replace('\\', '/'))

Expand Down Expand Up @@ -64,10 +66,6 @@
BROKER_BACKEND = 'memory'
BROKER_URL = 'memory://'

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'defender.exampleapp.settings')

Expand Down
1 change: 1 addition & 0 deletions defender/management/commands/cleanup_django_defender.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@


class Command(BaseCommand):
""" clean up management command """
help = "Cleans up django-defender AccessAttempt table"

def handle(self, **options):
Expand Down
1 change: 1 addition & 0 deletions defender/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@


class FailedLoginMiddleware(object):
""" Failed login middleware """
patched = False

def __init__(self, *args, **kwargs):
Expand Down
1 change: 1 addition & 0 deletions defender/migrations/0001_initial.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@


class Migration(migrations.Migration):
""" Initial migrations """

dependencies = [
]
Expand Down
1 change: 1 addition & 0 deletions defender/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

@python_2_unicode_compatible
class AccessAttempt(models.Model):
""" Access Attempt log """
user_agent = models.CharField(
max_length=255,
)
Expand Down
5 changes: 3 additions & 2 deletions defender/south_migrations/0001_initial.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@


class Migration(SchemaMigration):
"""Initial Migration for Defender"""

def forwards(self, orm):
# Adding model 'AccessAttempt'
""" Adding model 'AccessAttempt' """
db.create_table(u'defender_accessattempt', (
(u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('user_agent', self.gf('django.db.models.fields.CharField')(max_length=255)),
Expand Down Expand Up @@ -41,4 +42,4 @@ def backwards(self, orm):
}
}

complete_apps = ['defender']
complete_apps = ['defender']
5 changes: 1 addition & 4 deletions defender/test_settings.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from celery import Celery

DATABASES = {
'default': {
Expand Down Expand Up @@ -62,10 +63,6 @@
BROKER_BACKEND = 'memory'
BROKER_URL = 'memory://'

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'defender.test_settings')

Expand Down
58 changes: 40 additions & 18 deletions defender/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ def test_reset_ip(self):
self.test_valid_login()

@patch('defender.config.LOCKOUT_URL', 'http://localhost/othe/login/')
def test_failed_login_redirect_to_URL(self):
def test_failed_login_redirect_to_url(self):
""" Test to make sure that after lockout we send to the correct
redirect URL """

Expand All @@ -311,7 +311,7 @@ def test_failed_login_redirect_to_URL(self):
self.assertEqual(response['Location'], 'http://localhost/othe/login/')

@patch('defender.config.LOCKOUT_URL', '/o/login/')
def test_failed_login_redirect_to_URL_local(self):
def test_failed_login_redirect_to_url_local(self):
""" Test to make sure that after lockout we send to the correct
redirect URL """

Expand Down Expand Up @@ -361,6 +361,7 @@ def test_failed_login_redirect_to_template(self):

@patch('defender.config.COOLOFF_TIME', 0)
def test_failed_login_no_cooloff(self):
""" failed login no cooloff """
for i in range(0, config.FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
Expand All @@ -376,7 +377,7 @@ def test_failed_login_no_cooloff(self):
self.assertContains(response, self.PERMANENT_LOCKED_MESSAGE)

def test_login_attempt_model(self):
""" test the login model"""
""" test the login model """

response = self._login()
self.assertContains(response, LOGIN_FORM_KEY)
Expand Down Expand Up @@ -458,15 +459,18 @@ def test_parse_redis_url(self):

@patch('defender.config.DEFENDER_REDIS_NAME', 'default')
def test_get_redis_connection_django_conf(self):
""" get the redis connection """
redis_client = get_redis_connection()
self.assertIsInstance(redis_client, Redis)

@patch('defender.config.DEFENDER_REDIS_NAME', 'bad-key')
def test_get_redis_connection_django_conf_wrong_key(self):
""" see if we get the correct error """
error_msg = 'The cache bad-key was not found on the django cache settings.'
self.assertRaisesMessage(KeyError, error_msg, get_redis_connection)

def test_get_ip_address_from_request(self):
""" get ip from request, make sure it is correct """
req = HttpRequest()
req.META['REMOTE_ADDR'] = '1.2.3.4'
ip = utils.get_ip_address_from_request(req)
Expand Down Expand Up @@ -494,6 +498,7 @@ def test_get_ip_address_from_request(self):
@patch('defender.config.BEHIND_REVERSE_PROXY', True)
@patch('defender.config.REVERSE_PROXY_HEADER', 'HTTP_X_PROXIED')
def test_get_ip_reverse_proxy_custom_header(self):
""" make sure the ip is correct behind reverse proxy """
req = HttpRequest()
req.META['HTTP_X_PROXIED'] = '1.2.3.4'
self.assertEqual(utils.get_ip(req), '1.2.3.4')
Expand All @@ -509,6 +514,7 @@ def test_get_ip_reverse_proxy_custom_header(self):
@patch('defender.config.BEHIND_REVERSE_PROXY', True)
@patch('defender.config.REVERSE_PROXY_HEADER', 'HTTP_X_REAL_IP')
def test_get_user_attempts(self):
""" Get the user attempts make sure they are correct """
ip_attempts = random.randint(3, 12)
username_attempts = random.randint(3, 12)
for i in range(0, ip_attempts):
Expand Down Expand Up @@ -574,7 +580,7 @@ def test_decorator_middleware(self):
self.assertContains(response, self.LOCKED_MESSAGE)

def test_get_view(self):
""" Check that the decorator doesn't tamper with GET requests"""
""" Check that the decorator doesn't tamper with GET requests """
for i in range(0, config.FAILURE_LIMIT):
response = self.client.get(ADMIN_LOGIN_URL)
# Check if we are in the same login page
Expand All @@ -584,7 +590,7 @@ def test_get_view(self):

@patch('defender.config.USE_CELERY', True)
def test_use_celery(self):
""" Check that use celery works"""
""" Check that use celery works """

self.assertEqual(AccessAttempt.objects.count(), 0)

Expand All @@ -598,12 +604,14 @@ def test_use_celery(self):
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE)

self.assertEqual(AccessAttempt.objects.count(), config.FAILURE_LIMIT + 1)
self.assertEqual(AccessAttempt.objects.count(),
config.FAILURE_LIMIT + 1)
self.assertIsNotNone(str(AccessAttempt.objects.all()[0]))

@patch('defender.config.LOCKOUT_BY_IP_USERNAME', True)
def test_lockout_by_ip_and_username(self):
"""Check that lockout still works when locking out by IP and Username combined"""
""" Check that lockout still works when locking out by
IP and Username combined """

username = 'testy'

Expand All @@ -621,19 +629,21 @@ def test_lockout_by_ip_and_username(self):
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, LOGIN_FORM_KEY)

# We shouldn't get a lockout message when attempting to use a different username
# We shouldn't get a lockout message when attempting to use a
# different username
response = self._login()
self.assertContains(response, LOGIN_FORM_KEY)

# We shouldn't get a lockout message when attempting to use a different ip address
# We shouldn't get a lockout message when attempting to use a
# different ip address
ip = '74.125.239.60'
response = self._login(username=VALID_USERNAME, remote_addr=ip)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)

@patch('defender.config.DISABLE_IP_LOCKOUT', True)
def test_disable_ip_lockout(self):
"""Check that lockout still works when we disable IP Lock out"""
""" Check that lockout still works when we disable IP Lock out """

username = 'testy'

Expand Down Expand Up @@ -663,11 +673,13 @@ def test_disable_ip_lockout(self):
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, LOGIN_FORM_KEY)

# We shouldn't get a lockout message when attempting to use a different username
# We shouldn't get a lockout message when attempting to use a
# different username
response = self._login()
self.assertContains(response, LOGIN_FORM_KEY)

# We shouldn't get a lockout message when attempting to use a different ip address
# We shouldn't get a lockout message when attempting to use a
# different ip address
second_ip = '74.125.239.99'
response = self._login(username=VALID_USERNAME, remote_addr=second_ip)
# Check if we are in the same login page
Expand All @@ -686,7 +698,7 @@ def test_disable_ip_lockout(self):

@patch('defender.config.DISABLE_USERNAME_LOCKOUT', True)
def test_disable_username_lockout(self):
"""Check lockouting still works when we disable username lockout"""
""" Check lockouting still works when we disable username lockout """

username = 'testy'

Expand Down Expand Up @@ -714,8 +726,8 @@ def test_disable_username_lockout(self):
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, LOGIN_FORM_KEY)

# We shouldn't get a lockout message when attempting to use a different ip address
# to be sure that username is not blocked.
# We shouldn't get a lockout message when attempting to use a
# different ip address to be sure that username is not blocked.
second_ip = '74.125.127.2'
response = self._login(username=username, remote_addr=second_ip)
# Check if we are in the same login page
Expand All @@ -736,13 +748,14 @@ def test_disable_username_lockout(self):
@patch('defender.config.FAILURE_LIMIT', 3)
def test_login_blocked_for_non_standard_login_views_without_msg(self):
"""
Check that a view wich returns the expected status code is causing
Check that a view wich returns the expected status code is causing
the user to be locked out when we do not expect a specific message
to be returned.
"""

@watch_login(status_code=401)
def fake_api_401_login_view_without_msg(request):
""" Fake the api login with 401 """
return HttpResponse(status=401)

request_factory = RequestFactory()
Expand Down Expand Up @@ -772,6 +785,7 @@ def test_login_blocked_for_non_standard_login_views_with_msg(self):
"""
@watch_login(status_code=401, msg='Invalid credentials')
def fake_api_401_login_view_without_msg(request):
""" Fake the api login with 401 """
return HttpResponse('Sorry, Invalid credentials',
status=401)

Expand Down Expand Up @@ -802,6 +816,7 @@ def test_login_non_blocked_for_non_standard_login_views_different_msg(self):
"""
@watch_login(status_code=401, msg='Invalid credentials')
def fake_api_401_login_view_without_msg(request):
""" Fake the api login with 401 """
return HttpResponse('Ups, wrong credentials',
status=401)

Expand All @@ -820,38 +835,44 @@ def fake_api_401_login_view_without_msg(request):


class DefenderTestCaseTest(DefenderTestCase):
"""Make sure that we're cleaning the cache between tests"""
""" Make sure that we're cleaning the cache between tests """
key = 'test_key'

def test_first_incr(self):
""" first increment """
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)

def test_second_incr(self):
""" second increment """
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)


class DefenderTransactionTestCaseTest(DefenderTransactionTestCase):
"""Make sure that we're cleaning the cache between tests"""
""" Make sure that we're cleaning the cache between tests """
key = 'test_key'

def test_first_incr(self):
""" first increment """
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)

def test_second_incr(self):
""" second increment """
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)


class TestUtils(DefenderTestCase):
""" Unit tests for util methods """

def test_username_blocking(self):
""" test username blocking """
username = 'foo'
self.assertFalse(utils.is_user_already_locked(username))
utils.block_username(username)
Expand All @@ -860,6 +881,7 @@ def test_username_blocking(self):
self.assertFalse(utils.is_user_already_locked(username))

def test_ip_address_blocking(self):
""" ip address blocking """
ip = '1.2.3.4'
self.assertFalse(utils.is_source_ip_already_locked(ip))
utils.block_ip(ip)
Expand Down
Loading

0 comments on commit 4d9adc3

Please sign in to comment.