Skip to content

Commit

Permalink
Merge pull request #52 from docker-hub/bc
Browse files Browse the repository at this point in the history
Add helpers that do not assume how to retrieve `username`
  • Loading branch information
kencochrane committed Jan 25, 2016
2 parents a03726c + 831bb29 commit adc429a
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 24 deletions.
17 changes: 8 additions & 9 deletions defender/tasks.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
from . import config
from .data import store_login_attempt

# not sure how to get this to look better. ideally we want to dynamically
# apply the celery decorator based on the USE_CELERY setting.

if config.USE_CELERY:
from celery import shared_task
from celery import shared_task

@shared_task()
def add_login_attempt_task(user_agent, ip_address, username,
http_accept, path_info, login_valid):
""" Create a record for the login attempt """
store_login_attempt(user_agent, ip_address, username,
http_accept, path_info, login_valid)

@shared_task()
def add_login_attempt_task(user_agent, ip_address, username,
http_accept, path_info, login_valid):
""" Create a record for the login attempt """
store_login_attempt(user_agent, ip_address, username,
http_accept, path_info, login_valid)
18 changes: 18 additions & 0 deletions defender/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -686,3 +686,21 @@ def test_second_incr(self):
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)


class TestUtils(DefenderTestCase):
def test_username_blocking(self):
username = 'foo'
self.assertFalse(utils.is_user_already_locked(username))
utils.block_username(username)
self.assertTrue(utils.is_user_already_locked(username))
utils.unblock_username(username)
self.assertFalse(utils.is_user_already_locked(username))

def test_ip_address_blocking(self):
ip = '1.2.3.4'
self.assertFalse(utils.is_source_ip_already_locked(ip))
utils.block_ip(ip)
self.assertTrue(utils.is_source_ip_already_locked(ip))
utils.unblock_ip(ip)
self.assertFalse(utils.is_source_ip_already_locked(ip))
35 changes: 20 additions & 15 deletions defender/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,25 +265,30 @@ def lockout_response(request):
"Contact an admin to unlock your account.")


def is_already_locked(request):
""" Is this IP/username already locked? """
def is_user_already_locked(username):
"""Is this username already locked?"""
if username is None:
return False
return REDIS_SERVER.get(get_username_blocked_cache_key(username))

if not config.DISABLE_IP_LOCKOUT:
# ip blocked?
ip_address = get_ip(request)
ip_blocked = REDIS_SERVER.get(get_ip_blocked_cache_key(ip_address))
else:
# we disabled ip lockout, so it will never be blocked.
ip_blocked = False

# username blocked?
username = request.POST.get(config.USERNAME_FORM_FIELD, None)
user_blocked = REDIS_SERVER.get(get_username_blocked_cache_key(username))
def is_source_ip_already_locked(ip_address):
"""Is this IP already locked?"""
if ip_address is None:
return False
if config.DISABLE_IP_LOCKOUT:
return False
return REDIS_SERVER.get(get_ip_blocked_cache_key(ip_address))


def is_already_locked(request):
"""Parse the username & IP from the request, and see if it's already locked."""
user_blocked = is_user_already_locked(
request.POST.get(config.USERNAME_FORM_FIELD, None))
ip_blocked = is_source_ip_already_locked(get_ip(request))

if config.LOCKOUT_BY_IP_USERNAME:
LOG.info("Block by ip & username")
# if both this IP and this username are present the request is
# blocked
# if both this IP and this username are present the request is blocked
return ip_blocked and user_blocked

return ip_blocked or user_blocked
Expand Down

0 comments on commit adc429a

Please sign in to comment.