Skip to content

Commit

Permalink
Add presenter/view of two-factor verification (login process)
Browse files Browse the repository at this point in the history
Fixes: pypa#996
  • Loading branch information
Sparkycz authored and woodruffw committed Mar 11, 2019
1 parent 83cdd95 commit 32686b5
Show file tree
Hide file tree
Showing 10 changed files with 365 additions and 52 deletions.
1 change: 1 addition & 0 deletions dev/environment
Expand Up @@ -35,5 +35,6 @@ STATUSPAGE_URL=https://2p66nmmycsj3.statuspage.io

TOKEN_PASSWORD_SECRET="an insecure password reset secret key"
TOKEN_EMAIL_SECRET="an insecure email verification secret key"
TOKEN_TWO_FACTOR_SECRET="an insecure two-factor auth secret key"

WAREHOUSE_LEGACY_DOMAIN=pypi.python.org
26 changes: 26 additions & 0 deletions tests/unit/accounts/test_forms.py
Expand Up @@ -553,3 +553,29 @@ def test_password_breached(self):
"This password has appeared in a breach or has otherwise been "
"compromised and cannot be used."
)


class TestTwoFactorForm:
def test_creation(self):
user_service = pretend.stub()
form = forms.TwoFactorForm(user_service=user_service)

assert form.user_service is user_service

def test_opt_secret_exists(self):
form = forms.TwoFactorForm(
data={
"otp_secret": "",
},
user_service=pretend.stub()
)
assert not form.validate()
assert form.otp_secret.errors.pop() == "This field is required."

form = forms.TwoFactorForm(
data={
"otp_secret": "otp_code",
},
user_service=pretend.stub()
)
assert form.validate()
189 changes: 188 additions & 1 deletion tests/unit/accounts/test_views.py
Expand Up @@ -157,6 +157,7 @@ def test_post_validate_redirects(
user_service = pretend.stub(
find_userid=pretend.call_recorder(lambda username: user_id),
update_user=pretend.call_recorder(lambda *a, **kw: None),
has_two_factor=lambda userid: False,
)
breach_service = pretend.stub(check_password=lambda password, tags=None: False)

Expand Down Expand Up @@ -220,6 +221,9 @@ def test_post_validate_redirects(

assert remember.calls == [pretend.call(pyramid_request, str(user_id))]
assert pyramid_request.session.invalidate.calls == [pretend.call()]
assert pyramid_request.find_service.calls == [
pretend.call(IUserService, context=None),
]
assert pyramid_request.session.new_csrf_token.calls == [pretend.call()]

@pytest.mark.parametrize(
Expand All @@ -234,6 +238,7 @@ def test_post_validate_no_redirects(
user_service = pretend.stub(
find_userid=pretend.call_recorder(lambda username: 1),
update_user=lambda *a, **k: None,
has_two_factor=lambda userid: False,
)
breach_service = pretend.stub(check_password=lambda password, tags=None: False)

Expand Down Expand Up @@ -264,6 +269,189 @@ def test_redirect_authenticated_user(self):
assert isinstance(result, HTTPSeeOther)
assert result.headers["Location"] == "/the-redirect"

@pytest.mark.parametrize("redirect_url", ["test_redirect_url", None])
def test_two_factor_auth(self, pyramid_request, redirect_url, token_service):
user_service = pretend.stub(
find_userid=pretend.call_recorder(lambda username: 1),
update_user=lambda *a, **k: None,
has_two_factor=lambda userid: True,
)

token_service = pretend.stub(
dumps=pretend.call_recorder(lambda token_data: "test-secure-token")
)
pyramid_request.find_service = lambda interface, **kwargs: {
IUserService: user_service,
ITokenService: token_service,
}[interface]

pyramid_request.method = "POST"
if redirect_url:
pyramid_request.POST["next"] = redirect_url

form_obj = pretend.stub(
validate=pretend.call_recorder(lambda: True),
username=pretend.stub(data="theuser"),
)
form_class = pretend.call_recorder(lambda d, user_service: form_obj)
pyramid_request.route_path = pretend.call_recorder(
lambda a: "/account/two-factor"
)
result = views.login(pyramid_request, _form_class=form_class)

token_expected_data = {"userid": 1}
if redirect_url:
token_expected_data['redirect_to'] = redirect_url
assert token_service.dumps.calls == [pretend.call(token_expected_data)]

assert isinstance(result, HTTPSeeOther)
assert result.headerlist == [
('Content-Type', 'text/html; charset=UTF-8'),
('Content-Length', '0'),
('Location', '/account/two-factor'),
('Set-Cookie', "{}={}; Path=/".
format(views.TWO_FACTOR_COOKIE_KEY, "test-secure-token"))]


class TestTwoFactor:
@pytest.mark.parametrize("redirect_url", [None, "/foo/bar/", "/wat/"])
def test_get_returns_form(self, pyramid_request, redirect_url):
user_service = pretend.stub(
find_userid=pretend.call_recorder(lambda username: 1),
update_user=lambda *a, **k: None,
send_otp_secret=pretend.call_recorder(lambda userid: None),
)

token_data = {"userid": 1}
if redirect_url:
token_data['redirect_to'] = redirect_url

token_service = pretend.stub(
loads=pretend.call_recorder(lambda token: token_data)
)
pyramid_request.find_service = lambda interface, **kwargs: {
ITokenService: token_service,
IUserService: user_service,
}[interface]

form_obj = pretend.stub()
form_class = pretend.call_recorder(lambda d, user_service: form_obj)

result = views.two_factor(pyramid_request, _form_class=form_class)

assert result == {
"form": form_obj,
}
assert user_service.send_otp_secret.calls == [
pretend.call(1)
]
assert form_class.calls == [
pretend.call(pyramid_request.POST, user_service=user_service)
]

@pytest.mark.parametrize("redirect_url", ["test_redirect_url", None])
def test_two_factor_auth(self, monkeypatch, pyramid_request, redirect_url,
token_service):
remember = pretend.call_recorder(lambda request, user_id: [("foo", "bar")])
monkeypatch.setattr(views, "remember", remember)
user_service = pretend.stub(
find_userid=pretend.call_recorder(lambda username: 1),
update_user=lambda *a, **k: None,
send_otp_secret=lambda userid: None,
check_otp_secret=lambda userid, otp_token: True,
)

new_session = {}

token_data = {"userid": str(1)}
if redirect_url:
token_data['redirect_to'] = redirect_url

token_service = pretend.stub(
loads=pretend.call_recorder(lambda token: token_data)
)
pyramid_request.find_service = lambda interface, **kwargs: {
IUserService: user_service,
ITokenService: token_service,
}[interface]

pyramid_request.method = "POST"
pyramid_request.session = pretend.stub(
items=lambda: [("a", "b"), ("foo", "bar")],
update=new_session.update,
invalidate=pretend.call_recorder(lambda: None),
new_csrf_token=pretend.call_recorder(lambda: None),
)

pyramid_request.set_property(
lambda r: str(uuid.uuid4()),
name="unauthenticated_userid",
)

form_obj = pretend.stub(
validate=pretend.call_recorder(lambda: True),
otp_secret=pretend.stub(data="test-otp-secret"),
)
form_class = pretend.call_recorder(lambda d, user_service: form_obj)
pyramid_request.route_path = pretend.call_recorder(
lambda a: "/account/two-factor"
)
pyramid_request.cookies[views.TWO_FACTOR_COOKIE_KEY] = "test-secure-token"
result = views.two_factor(pyramid_request, _form_class=form_class)

token_expected_data = {"userid": str(1)}
if redirect_url:
token_expected_data['redirect_to'] = redirect_url
assert token_service.loads.calls == [pretend.call("test-secure-token")]

assert isinstance(result, HTTPSeeOther)

assert remember.calls == [pretend.call(pyramid_request, str(1))]
assert pyramid_request.session.invalidate.calls == [pretend.call()]
assert pyramid_request.session.new_csrf_token.calls == [pretend.call()]

@pytest.mark.parametrize("redirect_url", ["test_redirect_url", None])
def test_two_factor_auth_invalid(self, pyramid_request, redirect_url,
token_service):
user_service = pretend.stub(
find_userid=pretend.call_recorder(lambda username: 1),
update_user=lambda *a, **k: None,
send_otp_secret=lambda userid: None,
check_otp_secret=lambda userid, otp_token: False,
)

token_data = {"userid": str(1)}
if redirect_url:
token_data['redirect_to'] = redirect_url

token_service = pretend.stub(loads=pretend.call_recorder(
lambda token: token_data)
)
pyramid_request.find_service = lambda interface, **kwargs: {
IUserService: user_service,
ITokenService: token_service,
}[interface]

pyramid_request.method = "POST"

form_obj = pretend.stub(
validate=pretend.call_recorder(lambda: True),
otp_secret=pretend.stub(data="test-otp-secret"),
)
form_class = pretend.call_recorder(lambda d, user_service: form_obj)
pyramid_request.route_path = pretend.call_recorder(
lambda a: "/account/two-factor"
)
pyramid_request.cookies[views.TWO_FACTOR_COOKIE_KEY] = "test-secure-token"
result = views.two_factor(pyramid_request, _form_class=form_class)

token_expected_data = {"userid": str(1)}
if redirect_url:
token_expected_data['redirect_to'] = redirect_url
assert token_service.loads.calls == [pretend.call("test-secure-token")]

assert isinstance(result, HTTPSeeOther)


class TestLogout:
@pytest.mark.parametrize("next_url", [None, "/foo/bar/", "/wat/"])
Expand Down Expand Up @@ -729,7 +917,6 @@ def test_reset_password(self, db_request, user_service, token_service):
pretend.call(IUserService, context=None),
pretend.call(IPasswordBreachedService, context=None),
pretend.call(ITokenService, name="password"),
pretend.call(IUserService, context=None),
]

@pytest.mark.parametrize(
Expand Down
3 changes: 3 additions & 0 deletions warehouse/accounts/__init__.py
Expand Up @@ -125,6 +125,9 @@ def includeme(config):
config.register_service_factory(
TokenServiceFactory(name="email"), ITokenService, name="email"
)
config.register_service_factory(
TokenServiceFactory(name="two_factor"), ITokenService, name="two_factor"
)

# Register our password breach detection service.
breached_pw_class = config.maybe_dotted(
Expand Down
6 changes: 3 additions & 3 deletions warehouse/accounts/forms.py
Expand Up @@ -32,9 +32,9 @@ def validate_username(self, field):
raise wtforms.validators.ValidationError("No user found with that username")


class OtpCodeMixin:
class OtpSecretMixin:

otp_code = wtforms.StringField(validators=[wtforms.validators.DataRequired()])
otp_secret = wtforms.StringField(validators=[wtforms.validators.DataRequired()])


class NewUsernameMixin:
Expand Down Expand Up @@ -224,7 +224,7 @@ def validate_password(self, field):
)


class TwoFactorForm(OtpCodeMixin, forms.Form):
class TwoFactorForm(OtpSecretMixin, forms.Form):
def __init__(self, *args, user_service, **kwargs):
super().__init__(*args, **kwargs)
self.user_service = user_service
Expand Down
24 changes: 21 additions & 3 deletions warehouse/accounts/interfaces.py
Expand Up @@ -19,16 +19,19 @@ def __init__(self, *args, resets_in, **kwargs):

return super().__init__(*args, **kwargs)

class TokenException(Exception):
pass


class TokenExpired(Exception):
class TokenExpired(TokenException):
pass


class TokenInvalid(Exception):
class TokenInvalid(TokenException):
pass


class TokenMissing(Exception):
class TokenMissing(TokenException):
pass


Expand Down Expand Up @@ -97,6 +100,21 @@ def is_disabled(user_id):
(IsDisabled: bool, Reason: Optional[DisableReason])
"""

def has_two_factor(user_id):
"""
Returns True if the user has two factor authentication.
"""

def send_otp_secret(user_id):
"""
Sends two factor authentication OTP code to user
"""

def check_otp_secret(user_id, otp_secret):
"""
Returns True if the given OTP code is valid.
"""


class ITokenService(Interface):
def dumps(data):
Expand Down
18 changes: 18 additions & 0 deletions warehouse/accounts/services.py
Expand Up @@ -226,6 +226,24 @@ def is_disabled(self, user_id):
else:
return (True, user.disabled_for)

def has_two_factor(self, user_id):
"""
Returns True if the user has two factor authentication.
"""
return True

def send_otp_secret(self, user_id):
"""
Sends two factor authentication OTP code to user
"""
pass

def check_otp_secret(self, user_id, otp_secret):
"""
Returns True if the given OTP code is valid.
"""
return True


@implementer(ITokenService)
class TokenService:
Expand Down

0 comments on commit 32686b5

Please sign in to comment.