Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion servicex_app/servicex_app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ def approve_user_command(sub):
Bootstrap5(app)
CORS(app)

app.config["JWT_DECODE_ALGORITHMS"] = ["HS256", "RS256"]
JWTManager(app)

# setup logging

logstash_host = os.environ.get("LOGSTASH_HOST")
Expand Down
15 changes: 13 additions & 2 deletions servicex_app/servicex_app/resources/users/token_refresh.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,30 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from flask import current_app
from flask_jwt_extended import create_access_token, decode_token, get_jwt, jwt_required
from flask_restful import Resource
from servicex_app.models import UserModel


class TokenRefresh(Resource):
@jwt_required(refresh=True)
@jwt_required(refresh=True, optional=True)
def post(self):
if not current_app.config.get("ENABLE_AUTH"):
return {
"message": "Authentication is disabled on this instance",
"access_token": "authentication_disabled",
"auth_disabled": True,
}, 200

claims = get_jwt()
if not claims:
return {"message": "Missing refresh token"}, 401

user = UserModel.find_by_email(claims["sub"])
decoded = decode_token(user.refresh_token)
if not claims["jti"] == decoded["jti"]:
return {"message": "Invalid or outdated refresh token"}, 401
current_user = user.email
access_token = create_access_token(identity=current_user)
return {"access_token": access_token}
return {"access_token": access_token, "auth_disabled": False}, 200
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from flask.wrappers import Response
from flask_jwt_extended import create_refresh_token

from servicex_app_test.resource_test_base import ResourceTestBase
from servicex_app.models import UserModel


class TestTokenRefresh(ResourceTestBase):
"""Test suite for token refresh endpoint and JWT configuration."""

def test_jwt_decode_algorithms_configured(self, client):
"""Test that JWT_DECODE_ALGORITHMS is properly configured with HS256 and RS256."""
assert "JWT_DECODE_ALGORITHMS" in client.application.config
algorithms = client.application.config["JWT_DECODE_ALGORITHMS"]

assert "HS256" in algorithms, "HS256 algorithm should be configured"
assert "RS256" in algorithms, "RS256 algorithm should be configured"

def test_token_refresh_with_auth_disabled(self, client):
"""Test that token refresh endpoint returns graceful message when auth is disabled."""
response: Response = client.post("/token/refresh")

assert response.status_code == 200
assert "access_token" in response.json
assert response.json["access_token"] == "authentication_disabled"
assert "auth_disabled" in response.json
assert response.json["auth_disabled"] is True

def test_token_refresh_with_auth_enabled_requires_token(self):
"""Test that token refresh requires a valid token when auth is enabled."""
client = self._test_client(extra_config={"ENABLE_AUTH": True})

with client.application.app_context():
response: Response = client.post("/token/refresh")
assert response.status_code == 401

def test_token_refresh_with_auth_enabled_and_valid_token(self, mocker):
"""Test token refresh works with valid refresh token when auth is enabled."""
client = self._test_client(extra_config={"ENABLE_AUTH": True})

with client.application.app_context():
test_user = UserModel()
test_user.email = "testuser@example.com"
test_user.sub = "test-sub-123"
test_user.name = "Test User"
test_user.institution = "Test Institution"

refresh_token = create_refresh_token(identity=test_user.email)
test_user.refresh_token = refresh_token

mocker.patch(
"servicex_app.resources.users.token_refresh.UserModel.find_by_email",
return_value=test_user,
)

headers = {"Authorization": f"Bearer {refresh_token}"}
response: Response = client.post("/token/refresh", headers=headers)

assert response.status_code == 200
assert "access_token" in response.json
assert response.json["access_token"] != "authentication_disabled"
assert "auth_disabled" in response.json
assert response.json["auth_disabled"] is False

def test_token_refresh_with_mismatched_jti(self, mocker):
"""Test that token refresh fails when JTI doesn't match stored token."""
client = self._test_client(extra_config={"ENABLE_AUTH": True})

with client.application.app_context():
test_user = UserModel()
test_user.email = "testuser@example.com"
test_user.sub = "test-sub-123"
test_user.name = "Test User"
test_user.institution = "Test Institution"

old_refresh_token = create_refresh_token(identity=test_user.email)
new_refresh_token = create_refresh_token(identity=test_user.email)

test_user.refresh_token = new_refresh_token

mocker.patch(
"servicex_app.resources.users.token_refresh.UserModel.find_by_email",
return_value=test_user,
)

headers = {"Authorization": f"Bearer {old_refresh_token}"}
response: Response = client.post("/token/refresh", headers=headers)

assert response.status_code == 401
assert "Invalid or outdated refresh token" in response.json.get(
"message", ""
)