Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate two-factor auth into registration and confirmation. #225

Merged
merged 1 commit into from
Dec 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions flask_security/twofactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,20 +140,18 @@ def complete_two_factor_process(user, primary_method, is_changing):
"""clean session according to process (login or changing two-factor method)
and perform action accordingly
"""

# only update primary_method and DB if necessary
if user.tf_primary_method != primary_method:
user.tf_primary_method = primary_method
_datastore.put(user)

# if we are changing two-factor method
if is_changing:
# only update primary_method and DB if necessary
if user.tf_primary_method != primary_method:
user.tf_primary_method = primary_method
_datastore.put(user)

# TODO Flashing shouldn't occur here - should be at view level to can
# make sure not to do it for json requests.
completion_message = "TWO_FACTOR_CHANGE_METHOD_SUCCESSFUL"
tf_profile_changed.send(
app._get_current_object(), user=user, method=primary_method
)

# if we are logging in for the first time
else:
completion_message = "TWO_FACTOR_LOGIN_SUCCESSFUL"
Expand Down
17 changes: 12 additions & 5 deletions flask_security/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ def login():
form = form_class(request.form, meta=_suppress_form_csrf())

if form.validate_on_submit():
if config_value("TWO_FACTOR") is True and (
config_value("TWO_FACTOR_REQUIRED") is True
if config_value("TWO_FACTOR") and (
config_value("TWO_FACTOR_REQUIRED")
or (form.user.tf_totp_secret and form.user.tf_primary_method)
):
return _two_factor_login(form)
Expand Down Expand Up @@ -274,6 +274,8 @@ def register():
form.user = user

if not _security.confirmable or _security.login_without_confirmation:
if config_value("TWO_FACTOR") and config_value("TWO_FACTOR_REQUIRED"):
return _two_factor_login(form)
after_this_request(_commit)
login_user(user)
did_login = True
Expand Down Expand Up @@ -425,18 +427,22 @@ def confirm_email(token):
get_url(_security.confirm_error_view) or url_for("send_confirmation")
)

confirm_user(user)
after_this_request(_commit)

if user != current_user:
logout_user()
if config_value("AUTO_LOGIN_AFTER_CONFIRM"):
# N.B. this is a (small) security risk if email went to wrong place.
# and you have the LOGIN_WITH_CONFIRMATION flag since in that case
# you can be logged in and doing stuff - but another person could
# get the email.
if config_value("TWO_FACTOR") and config_value("TWO_FACTOR_REQUIRED"):
form = _security.login_form(MultiDict([]), meta=_suppress_form_csrf())
form.user = user
return _two_factor_login(form)
login_user(user)

confirm_user(user)
after_this_request(_commit)

m, c = get_message("EMAIL_CONFIRMED")
if _security.redirect_behavior == "spa":
return redirect(
Expand Down Expand Up @@ -636,6 +642,7 @@ def _two_factor_login(form):
""" Helper for two-factor authentication login

This is called only when login/password have already been validated.
This can be from login, register, and/or confirm.

The result of this is either sending a 2FA token OR starting setup for new user.
In either case we do NOT log in user, so we must store some info in session to
Expand Down
6 changes: 5 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,12 @@ def index():
return render_template("index.html", content="Home Page")

@app.route("/profile")
@login_required
@auth_required()
def profile():
if hasattr(app, "security"):
if app.security._want_json(flask_request):
return jsonify(message="profile")

return render_template("index.html", content="Profile Page")

@app.route("/post_login")
Expand Down
51 changes: 51 additions & 0 deletions tests/test_confirmable.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Confirmable tests
"""

import json
import time

import pytest
Expand Down Expand Up @@ -386,3 +387,53 @@ def test_spa_get_bad_token(app, client, get_message):
msg = get_message("INVALID_CONFIRMATION_TOKEN")
assert msg == qparams["error"].encode("utf-8")
assert len(flashes) == 1


@pytest.mark.two_factor()
@pytest.mark.registerable()
@pytest.mark.settings(two_factor_required=True)
def test_two_factor(app, client):
""" If two-factor is enabled, the confirm shouldn't login, but start the
2-factor setup.
"""
with capture_registrations() as registrations:
data = dict(email="mary@lp.com", password="password", next="")
client.post("/register", data=data, follow_redirects=True)

# make sure not logged in
response = client.get("/profile")
assert response.status_code == 302
assert "/login?next=%2Fprofile" in response.location

token = registrations[0]["confirm_token"]
response = client.get("/confirm/" + token, follow_redirects=False)
assert "tf-setup" in response.location


@pytest.mark.two_factor()
@pytest.mark.registerable()
@pytest.mark.settings(two_factor_required=True)
def test_two_factor_json(app, client, get_message):
with capture_registrations() as registrations:
data = dict(email="dude@lp.com", password="password")
response = client.post(
"/register", content_type="application/json", data=json.dumps(data)
)
assert response.headers["content-type"] == "application/json"
assert response.jdata["meta"]["code"] == 200
assert len(response.jdata["response"]) == 2
assert all(k in response.jdata["response"] for k in ["csrf_token", "user"])

# make sure not logged in
response = client.get("/profile", headers={"accept": "application/json"})
assert response.status_code == 401
assert response.jdata["response"]["error"].encode("utf-8") == get_message(
"UNAUTHENTICATED"
)

token = registrations[0]["confirm_token"]
response = client.get("/confirm/" + token, headers={"Accept": "application/json"})

assert response.status_code == 200
assert response.jdata["response"]["tf_required"]
assert response.jdata["response"]["tf_state"] == "setup_from_login"
38 changes: 38 additions & 0 deletions tests/test_registerable.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

Registerable tests
"""
import json

import pytest
from flask import Flask
Expand Down Expand Up @@ -71,6 +72,8 @@ def on_user_registerd(app, user, confirm_token):
)
assert response.headers["content-type"] == "application/json"
assert response.jdata["meta"]["code"] == 200
assert len(response.jdata["response"]) == 2
assert all(k in response.jdata["response"] for k in ["csrf_token", "user"])

logout(client)

Expand Down Expand Up @@ -122,3 +125,38 @@ def test_disable_register_emails(client, app):
with app.mail.record_messages() as outbox:
client.post("/register", data=data, follow_redirects=True)
assert len(outbox) == 0


@pytest.mark.two_factor()
@pytest.mark.settings(two_factor_required=True)
def test_two_factor(app, client):
""" If two-factor is enabled, the register shouldn't login, but start the
2-factor setup.
"""
data = dict(email="dude@lp.com", password="password", password_confirm="password")
response = client.post("/register", data=data, follow_redirects=False)
assert "tf-setup" in response.location

# make sure not logged in
response = client.get("/profile")
assert response.status_code == 302
assert "/login?next=%2Fprofile" in response.location


@pytest.mark.two_factor()
@pytest.mark.settings(two_factor_required=True)
def test_two_factor_json(app, client, get_message):
data = dict(email="dude@lp.com", password="password", password_confirm="password")
response = client.post(
"/register", content_type="application/json", data=json.dumps(data)
)
assert response.status_code == 200
assert response.jdata["response"]["tf_required"]
assert response.jdata["response"]["tf_state"] == "setup_from_login"

# make sure not logged in
response = client.get("/profile", headers={"accept": "application/json"})
assert response.status_code == 401
assert response.jdata["response"]["error"].encode("utf-8") == get_message(
"UNAUTHENTICATED"
)
69 changes: 38 additions & 31 deletions tests/test_two_factor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from flask_security.twofactor import get_totp_uri
from utils import authenticate, get_session, logout
from flask_principal import identity_changed
from flask_security.utils import SmsSenderBaseClass, SmsSenderFactory
from flask_security.utils import SmsSenderBaseClass, SmsSenderFactory, capture_flashes

pytestmark = pytest.mark.two_factor()

Expand Down Expand Up @@ -61,16 +61,6 @@ def send(self, msg):
self.count += 1


def assert_flashes(client, expected_message, expected_category="message"):
with client.session_transaction() as session:
try:
category, message = session["_flashes"][0]
except KeyError:
raise AssertionError("nothing flashed")
assert expected_message in message
assert expected_category == category


def two_factor_authenticate(client, validate=True):
""" Login/Authenticate using two factor.
This is the equivalent of utils:authenticate
Expand All @@ -90,15 +80,26 @@ def two_factor_authenticate(client, validate=True):
assert response.status_code == 200


def tf_in_session(session):
return any(
k in session
for k in ["tf_state", "tf_primary_method", "tf_user_id", "tf_confirmed"]
)


@pytest.mark.settings(two_factor_required=True)
def test_two_factor_two_factor_setup_anonymous(app, client):
def test_two_factor_two_factor_setup_anonymous(app, client, get_message):

# trying to pick method without doing earlier stage
data = dict(setup="mail")
response = client.post("/tf-setup", data=data)
assert response.status_code == 302
flash_message = "You currently do not have permissions to access this page"
assert_flashes(client, flash_message, expected_category="error")

with capture_flashes() as flashes:
response = client.post("/tf-setup", data=data)
assert response.status_code == 302
assert flashes[0]["category"] == "error"
assert flashes[0]["message"].encode("utf-8") == get_message(
"TWO_FACTOR_PERMISSION_DENIED"
)


@pytest.mark.settings(two_factor_required=True)
Expand Down Expand Up @@ -288,12 +289,8 @@ def test_two_factor_flag(app, client):
assert b"Your token has been confirmed" in response.data

response = logout(client)
session = get_session(response)
# Verify that logout clears session info
assert not any(
k in session
for k in ["tf_state", "tf_user_id", "tf_primary_method", "tf_confirmed"]
)
assert not tf_in_session(get_session(response))

# Test two-factor authentication first login
data = dict(email="matt@lp.com", password="password")
Expand Down Expand Up @@ -326,6 +323,7 @@ def test_two_factor_flag(app, client):

response = client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
assert b"Your token has been confirmed" in response.data
assert not tf_in_session(get_session(response))

logout(client)

Expand Down Expand Up @@ -500,9 +498,7 @@ def on_identity_changed(app, identity):

# Upon completion, session cookie shouldnt have any two factor stuff in it.
session = get_session(response)
assert not any(
k in session for k in ["tf_state", "tf_primary_method", "tf_user_id"]
)
assert not tf_in_session(session)

# Log out
logout(client)
Expand Down Expand Up @@ -553,27 +549,38 @@ def on_identity_changed(app, identity):
def test_datastore(app, client):
# Test that user record is properly set after proper 2FA setup.
sms_sender = SmsSenderFactory.createSender("test")
json_data = '{"email": "gal@lp.com", "password": "password"}'
data = dict(email="gene@lp.com", password="password")
response = client.post(
"/login", data=json_data, headers={"Content-Type": "application/json"}
"/login", data=json.dumps(data), headers={"Content-Type": "application/json"}
)
assert b'"code": 200' in response.data
assert response.jdata["meta"]["code"] == 200
session = get_session(response)
assert session["tf_state"] == "setup_from_login"

# setup
data = dict(setup="sms", phone="+111111111111")
response = client.post(
"/tf-setup", data=json.dumps(data), headers={"Content-Type": "application/json"}
)

assert sms_sender.get_count() == 1
session = get_session(response)
assert session["tf_state"] == "ready"
assert session["tf_state"] == "validating_profile"
assert session["tf_primary_method"] == "sms"

code = sms_sender.messages[0].split()[-1]

# submit right token and show appropriate response
# submit token and show appropriate response
response = client.post("/tf-validate", data=dict(code=code), follow_redirects=True)
assert b"Your token has been confirmed" in response.data
session = get_session(response)
# Verify that successful login clears session info
assert not any(k in session for k in ["tf_state", "tf_user_id"])
assert not tf_in_session(session)

with app.app_context():
user = app.security.datastore.find_user(email="gal@lp.com")
user = app.security.datastore.find_user(email="gene@lp.com")
assert user.tf_primary_method == "sms"
assert user.tf_phone_number == "+111111111111"
assert "enckey" in user.tf_totp_secret


Expand Down
6 changes: 6 additions & 0 deletions tests/view_scaffold.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def create_app():
app.config["SECURITY_HASHING_SCHEMES"] = ["hex_md5"]
app.config["SECURITY_DEPRECATED_HASHING_SCHEMES"] = []

# Turn on all features (except passwordless since that removes normal login)
for opt in [
"changeable",
"recoverable",
Expand All @@ -74,7 +75,12 @@ def create_app():
app.config["SECURITY_" + opt.upper()] = True

if os.environ.get("SETTINGS"):
# Load settings from a file pointed to by SETTINGS
app.config.from_envvar("SETTINGS")
# Allow any SECURITY_ config to be set in environment.
for ev in os.environ:
if ev.startswith("SECURITY_"):
app.config[ev] = os.environ.get(ev)
mail = Mail(app)

app.json_encoder = JSONEncoder
Expand Down