Skip to content

Commit

Permalink
skip attempt tests during execution to avoid import order issues
Browse files Browse the repository at this point in the history
  • Loading branch information
guruofgentoo committed Aug 12, 2022
1 parent b781f66 commit 8ea6f57
Showing 1 changed file with 31 additions and 18 deletions.
49 changes: 31 additions & 18 deletions keg_auth/testing.py
Expand Up @@ -17,16 +17,18 @@

from keg_auth.libs.authenticators import AttemptLimitMixin

try:
has_attempt_model = flask.current_app.auth_manager.entity_registry.is_registered('attempt')
except RuntimeError as exc:
if 'application context' not in str(exc):
raise
has_attempt_model = False

has_attempt_skip_reason = 'no attempt model registered in entity registry'


def has_attempt_model():
try:
return flask.current_app.auth_manager.entity_registry.is_registered('attempt')
except RuntimeError as exc:
if 'application context' not in str(exc):
raise
return False


class AuthAttemptTests(object):
"""Tests to verify that automated attempt logging/blocking works as intended. These
tests are included in the AuthTests class and are intended to be used in target
Expand Down Expand Up @@ -77,7 +79,6 @@ def do_login_test(self, username, login_time, flashes, password='badpass',
resp = self.do_login(client or self.client, username, password, submit_status)
assert resp.flashes == flashes

@pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason)
@pytest.mark.parametrize('limit, timespan, lockout', [
(3, 3600, 7200),
(3, 7200, 300),
Expand All @@ -97,6 +98,8 @@ def test_login_attempts_blocked(self, limit, timespan, lockout, create_user, vie
limit. Login attempts after the lockout period has passed (since the failed attempt
that caused the lockout) should not be blocked.
'''
if not has_attempt_model():
pytest.skip(has_attempt_skip_reason)
config_key = 'LOGIN_' if view_config else ''
with mock.patch.dict('flask.current_app.config', {
f'KEGAUTH_{config_key}ATTEMPT_LIMIT': limit,
Expand Down Expand Up @@ -170,7 +173,6 @@ def do_test(login_time, flashes, password='badpass', submit_status=200):
assert_attempt_count(limit + 1, fail_count)
assert_attempt_count(limit + 1, limit + 1, is_during_lockout=True)

@pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason)
@mock.patch.dict('flask.current_app.config', {
'KEGAUTH_ATTEMPT_LIMIT_ENABLED': False,
'KEGAUTH_LOGIN_ATTEMPT_LIMIT': 3,
Expand All @@ -181,6 +183,8 @@ def test_login_attempts_not_blocked(self):
'''
Test that we do not block any attempts with missing attempt entity.
'''
if not has_attempt_model():
pytest.skip(has_attempt_skip_reason)
user = self.user_ent.testing_create(email='foo@bar.com', password='pass')
assert self.attempt_ent.query.count() == 0

Expand Down Expand Up @@ -208,7 +212,6 @@ def test_login_attempts_blocked_but_not_configured(self, _):
with pytest.raises(Exception, match=r'.*attempt entity is not registered.*'):
self.do_login(self.client, user.email, 'badpass', 200)

@pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason)
@pytest.mark.parametrize('limit, timespan, lockout', [
(3, 3600, 7200),
(3, 7200, 300),
Expand All @@ -219,6 +222,8 @@ def test_successful_login_resets_attempt_counter(self, limit, timespan, lockout)
Test that several failed logins before a successful login do not count
towards the attempt lockout counter.
'''
if not has_attempt_model():
pytest.skip(has_attempt_skip_reason)
with mock.patch.dict('flask.current_app.config', {
'KEGAUTH_LOGIN_ATTEMPT_LIMIT': limit,
'KEGAUTH_LOGIN_ATTEMPT_TIMESPAN': timespan,
Expand Down Expand Up @@ -253,14 +258,15 @@ def do_test(login_time, flashes, password='badpass', submit_status=200):
# after the successful attempt.
do_test(login_time + timedelta(seconds=limit + 1), self.login_lockout_flashes)

@pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason)
@pytest.mark.parametrize('limit, timespan, lockout', [
(3, 3600, 7200),
])
def test_login_attempts_blocked_by_ip(self, limit, timespan, lockout):
'''
Test that login attempts get blocked for an IP address
'''
if not has_attempt_model():
pytest.skip(has_attempt_skip_reason)
client = flask_webtest.TestApp(flask.current_app,
extra_environ={'REMOTE_ADDR': '192.168.0.111'})

Expand Down Expand Up @@ -322,7 +328,6 @@ def do_forgot_test(self, username, forgot_time, flashes, submit_status=200):
resp = self.do_forgot(self.client, username, submit_status)
assert resp.flashes == flashes

@pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason)
@pytest.mark.parametrize('limit, timespan, lockout', [
(3, 3600, 7200),
(3, 7200, 300),
Expand All @@ -334,6 +339,8 @@ def test_forgot_attempts_blocked(self, limit, timespan, lockout):
limit. forgot attempts after the lockout period has passed (since the failed attempt
that caused the lockout) should not be blocked.
'''
if not has_attempt_model():
pytest.skip(has_attempt_skip_reason)
with mock.patch.dict('flask.current_app.config', {
'KEGAUTH_FORGOT_ATTEMPT_LIMIT': limit,
'KEGAUTH_FORGOT_ATTEMPT_TIMESPAN': timespan,
Expand Down Expand Up @@ -402,7 +409,6 @@ def do_test(forgot_time, flashes, submit_status=200):
assert_attempt_count(limit + 1, fail_count)
assert_attempt_count(limit + 1, limit + 1, is_during_lockout=True)

@pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason)
@mock.patch.dict('flask.current_app.config', {
'KEGAUTH_ATTEMPT_LIMIT_ENABLED': False,
'KEGAUTH_FORGOT_ATTEMPT_LIMIT': 3,
Expand All @@ -413,6 +419,8 @@ def test_forgot_attempts_not_blocked(self):
'''
Test that we do not block any attempts with missing attempt entity.
'''
if not has_attempt_model():
pytest.skip(has_attempt_skip_reason)
assert self.attempt_ent.query.count() == 0

def do_test(attempt_count, flashes, submit_status=200):
Expand All @@ -424,7 +432,6 @@ def do_test(attempt_count, flashes, submit_status=200):
do_test(0, self.forgot_invalid_flashes)
do_test(0, self.forgot_invalid_flashes)

@pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason)
@pytest.mark.parametrize('limit, timespan, lockout', [
(3, 3600, 7200),
(3, 7200, 300),
Expand All @@ -435,6 +442,8 @@ def test_successful_forgot_resets_attempt_counter(self, limit, timespan, lockout
Test that several failed forgots before a successful forgot do not count
towards the attempt lockout counter.
'''
if not has_attempt_model():
pytest.skip(has_attempt_skip_reason)
with mock.patch.dict('flask.current_app.config', {
'KEGAUTH_FORGOT_ATTEMPT_LIMIT': limit,
'KEGAUTH_FORGOT_ATTEMPT_TIMESPAN': timespan,
Expand Down Expand Up @@ -483,7 +492,6 @@ def do_reset_test(self, user, reset_time, flashes, submit_status=200):
resp = resp.form.submit(status=submit_status)
assert resp.flashes == flashes

@pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason)
@pytest.mark.parametrize('limit, timespan, lockout', [
(3, 3600, 7200),
(3, 7200, 300),
Expand All @@ -495,6 +503,8 @@ def test_reset_pw_attempts_blocked(self, limit, timespan, lockout):
limit. Login attempts after the lockout period has passed (since the failed attempt
that caused the lockout) should not be blocked.
'''
if not has_attempt_model():
pytest.skip(has_attempt_skip_reason)
with mock.patch.dict('flask.current_app.config', {
'KEGAUTH_RESET_ATTEMPT_LIMIT': limit,
'KEGAUTH_RESET_ATTEMPT_TIMESPAN': timespan,
Expand Down Expand Up @@ -557,14 +567,15 @@ def do_test(reset_time, flashes, submit_status=200):
assert_attempt_count(limit + 1, 0)
assert_attempt_count(limit + 1, limit + 1, is_during_lockout=True)

@pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason)
@mock.patch.dict('flask.current_app.config', {
'KEGAUTH_ATTEMPT_LIMIT_ENABLED': False,
'KEGAUTH_RESET_ATTEMPT_LIMIT': 2,
'KEGAUTH_RESET_ATTEMPT_TIMESPAN': 3600,
'KEGAUTH_RESET_ATTEMPT_LOCKOUT': 7200,
})
def test_reset_pw_attempts_not_blocked(self):
if not has_attempt_model():
pytest.skip(has_attempt_skip_reason)
user = self.user_ent.testing_create()
assert self.attempt_ent.query.count() == 0

Expand All @@ -576,17 +587,19 @@ def do_test(reset_time, flashes, submit_status=200):
do_test(arrow.utcnow(), self.reset_success_flashes, 302)
do_test(arrow.utcnow(), self.reset_success_flashes, 302)

@pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason)
@mock.patch('keg_auth.libs.authenticators.AttemptLimitMixin.get_request_remote_addr',
return_value='12.12.12.12')
def test_logs_attempt_source_ip(self, m_get_remote_addr):
if not has_attempt_model():
pytest.skip(has_attempt_skip_reason)
user = self.user_ent.testing_create(email='foo@bar.com', password='pass')
self.do_login(self.client, user.email, 'pass', 302)

assert self.attempt_ent.query.one().source_ip == m_get_remote_addr.return_value

@pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason)
def test_get_request_remote_addr(self):
if not has_attempt_model():
pytest.skip(has_attempt_skip_reason)
with current_app.test_request_context(environ_base={'REMOTE_ADDR': '12.12.12.12'}):
assert AttemptLimitMixin.get_request_remote_addr() == '12.12.12.12'

Expand Down

0 comments on commit 8ea6f57

Please sign in to comment.