From 8ea6f57b46996e0cab2642ab9d5fbf1d0578b1b6 Mon Sep 17 00:00:00 2001 From: Matt Lewellyn Date: Fri, 12 Aug 2022 11:53:51 -0400 Subject: [PATCH] skip attempt tests during execution to avoid import order issues --- keg_auth/testing.py | 49 ++++++++++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/keg_auth/testing.py b/keg_auth/testing.py index 174febf..3e290ff 100644 --- a/keg_auth/testing.py +++ b/keg_auth/testing.py @@ -17,16 +17,18 @@ from keg_auth.libs.authenticators import AttemptLimitMixin -try: - has_attempt_model = flask.current_app.auth_manager.entity_registry.is_registered('attempt') -except RuntimeError as exc: - if 'application context' not in str(exc): - raise - has_attempt_model = False - has_attempt_skip_reason = 'no attempt model registered in entity registry' +def has_attempt_model(): + try: + return flask.current_app.auth_manager.entity_registry.is_registered('attempt') + except RuntimeError as exc: + if 'application context' not in str(exc): + raise + return False + + class AuthAttemptTests(object): """Tests to verify that automated attempt logging/blocking works as intended. These tests are included in the AuthTests class and are intended to be used in target @@ -77,7 +79,6 @@ def do_login_test(self, username, login_time, flashes, password='badpass', resp = self.do_login(client or self.client, username, password, submit_status) assert resp.flashes == flashes - @pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason) @pytest.mark.parametrize('limit, timespan, lockout', [ (3, 3600, 7200), (3, 7200, 300), @@ -97,6 +98,8 @@ def test_login_attempts_blocked(self, limit, timespan, lockout, create_user, vie limit. Login attempts after the lockout period has passed (since the failed attempt that caused the lockout) should not be blocked. ''' + if not has_attempt_model(): + pytest.skip(has_attempt_skip_reason) config_key = 'LOGIN_' if view_config else '' with mock.patch.dict('flask.current_app.config', { f'KEGAUTH_{config_key}ATTEMPT_LIMIT': limit, @@ -170,7 +173,6 @@ def do_test(login_time, flashes, password='badpass', submit_status=200): assert_attempt_count(limit + 1, fail_count) assert_attempt_count(limit + 1, limit + 1, is_during_lockout=True) - @pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason) @mock.patch.dict('flask.current_app.config', { 'KEGAUTH_ATTEMPT_LIMIT_ENABLED': False, 'KEGAUTH_LOGIN_ATTEMPT_LIMIT': 3, @@ -181,6 +183,8 @@ def test_login_attempts_not_blocked(self): ''' Test that we do not block any attempts with missing attempt entity. ''' + if not has_attempt_model(): + pytest.skip(has_attempt_skip_reason) user = self.user_ent.testing_create(email='foo@bar.com', password='pass') assert self.attempt_ent.query.count() == 0 @@ -208,7 +212,6 @@ def test_login_attempts_blocked_but_not_configured(self, _): with pytest.raises(Exception, match=r'.*attempt entity is not registered.*'): self.do_login(self.client, user.email, 'badpass', 200) - @pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason) @pytest.mark.parametrize('limit, timespan, lockout', [ (3, 3600, 7200), (3, 7200, 300), @@ -219,6 +222,8 @@ def test_successful_login_resets_attempt_counter(self, limit, timespan, lockout) Test that several failed logins before a successful login do not count towards the attempt lockout counter. ''' + if not has_attempt_model(): + pytest.skip(has_attempt_skip_reason) with mock.patch.dict('flask.current_app.config', { 'KEGAUTH_LOGIN_ATTEMPT_LIMIT': limit, 'KEGAUTH_LOGIN_ATTEMPT_TIMESPAN': timespan, @@ -253,7 +258,6 @@ def do_test(login_time, flashes, password='badpass', submit_status=200): # after the successful attempt. do_test(login_time + timedelta(seconds=limit + 1), self.login_lockout_flashes) - @pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason) @pytest.mark.parametrize('limit, timespan, lockout', [ (3, 3600, 7200), ]) @@ -261,6 +265,8 @@ def test_login_attempts_blocked_by_ip(self, limit, timespan, lockout): ''' Test that login attempts get blocked for an IP address ''' + if not has_attempt_model(): + pytest.skip(has_attempt_skip_reason) client = flask_webtest.TestApp(flask.current_app, extra_environ={'REMOTE_ADDR': '192.168.0.111'}) @@ -322,7 +328,6 @@ def do_forgot_test(self, username, forgot_time, flashes, submit_status=200): resp = self.do_forgot(self.client, username, submit_status) assert resp.flashes == flashes - @pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason) @pytest.mark.parametrize('limit, timespan, lockout', [ (3, 3600, 7200), (3, 7200, 300), @@ -334,6 +339,8 @@ def test_forgot_attempts_blocked(self, limit, timespan, lockout): limit. forgot attempts after the lockout period has passed (since the failed attempt that caused the lockout) should not be blocked. ''' + if not has_attempt_model(): + pytest.skip(has_attempt_skip_reason) with mock.patch.dict('flask.current_app.config', { 'KEGAUTH_FORGOT_ATTEMPT_LIMIT': limit, 'KEGAUTH_FORGOT_ATTEMPT_TIMESPAN': timespan, @@ -402,7 +409,6 @@ def do_test(forgot_time, flashes, submit_status=200): assert_attempt_count(limit + 1, fail_count) assert_attempt_count(limit + 1, limit + 1, is_during_lockout=True) - @pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason) @mock.patch.dict('flask.current_app.config', { 'KEGAUTH_ATTEMPT_LIMIT_ENABLED': False, 'KEGAUTH_FORGOT_ATTEMPT_LIMIT': 3, @@ -413,6 +419,8 @@ def test_forgot_attempts_not_blocked(self): ''' Test that we do not block any attempts with missing attempt entity. ''' + if not has_attempt_model(): + pytest.skip(has_attempt_skip_reason) assert self.attempt_ent.query.count() == 0 def do_test(attempt_count, flashes, submit_status=200): @@ -424,7 +432,6 @@ def do_test(attempt_count, flashes, submit_status=200): do_test(0, self.forgot_invalid_flashes) do_test(0, self.forgot_invalid_flashes) - @pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason) @pytest.mark.parametrize('limit, timespan, lockout', [ (3, 3600, 7200), (3, 7200, 300), @@ -435,6 +442,8 @@ def test_successful_forgot_resets_attempt_counter(self, limit, timespan, lockout Test that several failed forgots before a successful forgot do not count towards the attempt lockout counter. ''' + if not has_attempt_model(): + pytest.skip(has_attempt_skip_reason) with mock.patch.dict('flask.current_app.config', { 'KEGAUTH_FORGOT_ATTEMPT_LIMIT': limit, 'KEGAUTH_FORGOT_ATTEMPT_TIMESPAN': timespan, @@ -483,7 +492,6 @@ def do_reset_test(self, user, reset_time, flashes, submit_status=200): resp = resp.form.submit(status=submit_status) assert resp.flashes == flashes - @pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason) @pytest.mark.parametrize('limit, timespan, lockout', [ (3, 3600, 7200), (3, 7200, 300), @@ -495,6 +503,8 @@ def test_reset_pw_attempts_blocked(self, limit, timespan, lockout): limit. Login attempts after the lockout period has passed (since the failed attempt that caused the lockout) should not be blocked. ''' + if not has_attempt_model(): + pytest.skip(has_attempt_skip_reason) with mock.patch.dict('flask.current_app.config', { 'KEGAUTH_RESET_ATTEMPT_LIMIT': limit, 'KEGAUTH_RESET_ATTEMPT_TIMESPAN': timespan, @@ -557,7 +567,6 @@ def do_test(reset_time, flashes, submit_status=200): assert_attempt_count(limit + 1, 0) assert_attempt_count(limit + 1, limit + 1, is_during_lockout=True) - @pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason) @mock.patch.dict('flask.current_app.config', { 'KEGAUTH_ATTEMPT_LIMIT_ENABLED': False, 'KEGAUTH_RESET_ATTEMPT_LIMIT': 2, @@ -565,6 +574,8 @@ def do_test(reset_time, flashes, submit_status=200): 'KEGAUTH_RESET_ATTEMPT_LOCKOUT': 7200, }) def test_reset_pw_attempts_not_blocked(self): + if not has_attempt_model(): + pytest.skip(has_attempt_skip_reason) user = self.user_ent.testing_create() assert self.attempt_ent.query.count() == 0 @@ -576,17 +587,19 @@ def do_test(reset_time, flashes, submit_status=200): do_test(arrow.utcnow(), self.reset_success_flashes, 302) do_test(arrow.utcnow(), self.reset_success_flashes, 302) - @pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason) @mock.patch('keg_auth.libs.authenticators.AttemptLimitMixin.get_request_remote_addr', return_value='12.12.12.12') def test_logs_attempt_source_ip(self, m_get_remote_addr): + if not has_attempt_model(): + pytest.skip(has_attempt_skip_reason) user = self.user_ent.testing_create(email='foo@bar.com', password='pass') self.do_login(self.client, user.email, 'pass', 302) assert self.attempt_ent.query.one().source_ip == m_get_remote_addr.return_value - @pytest.mark.skipif(not has_attempt_model, reason=has_attempt_skip_reason) def test_get_request_remote_addr(self): + if not has_attempt_model(): + pytest.skip(has_attempt_skip_reason) with current_app.test_request_context(environ_base={'REMOTE_ADDR': '12.12.12.12'}): assert AttemptLimitMixin.get_request_remote_addr() == '12.12.12.12'