Skip to content

Commit

Permalink
NEF verifies single jwt token - either from NEF or CAPIF
Browse files Browse the repository at this point in the history
  • Loading branch information
JFrgs committed May 31, 2023
1 parent cca77cc commit 16483f5
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 71 deletions.
2 changes: 2 additions & 0 deletions backend/app/app/api/api_v1/endpoints/monitoringevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ def create_subscription(
ccf_logs(http_request, json_response, "service_monitoring_event.json", token_payload.get("sub"))
except TypeError as error:
logging.error(f"Error: {error}")
except AttributeError as error:
logging.error(f"Error: {error}")

return http_response
elif (item_in.monitoringType == "LOSS_OF_CONNECTIVITY" or item_in.monitoringType == "UE_REACHABILITY") and item_in.maximumNumberOfReports == 1:
Expand Down
54 changes: 27 additions & 27 deletions backend/app/app/api/deps.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Generator, Dict
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt
from pydantic import ValidationError
from sqlalchemy.orm import Session
Expand All @@ -9,7 +10,7 @@
from app.db.session import SessionLocal
from app.core.config import settings

reusable_oauth2 = security.OAuth2TwoTokensBearer(
reusable_oauth2 = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/login/access-token"
)

Expand All @@ -21,18 +22,13 @@ def get_db() -> Generator:
finally:
db.close() #The code following the yield statement is executed after the response has been delivered

def verify_with_public_key(tokens: Dict[str, str] = Depends(reusable_oauth2)):
def verify_with_public_key(token: str = Depends(reusable_oauth2)):
'''
Enable verification of a token generated by CAPIF for an API Invoker.
If env variable USE_PUBLIC_KEY_VERIFICATION is set to False, the verification is ommited.
'''
if settings.USE_PUBLIC_KEY_VERIFICATION == True:
try:
if "capif_token" in tokens:
token = tokens.get("capif_token")
else:
token = tokens.get("token")

payload = jwt.decode(
token, security.extract_public_key('/app/app/core/certificates/capif_cert_server.pem'), algorithms=[security.ALGORITHM[1]]
)
Expand All @@ -46,27 +42,31 @@ def verify_with_public_key(tokens: Dict[str, str] = Depends(reusable_oauth2)):
return None

def get_current_user(
db: Session = Depends(get_db), tokens: Dict[str, str] = Depends(reusable_oauth2)
db: Session = Depends(get_db), token: str = Depends(reusable_oauth2)
) -> models.User:
try:
if "nef_token" in tokens:
token = tokens.get("nef_token")
else:
token = tokens.get("token")

payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[security.ALGORITHM[0]]
)
token_data = schemas.TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials - Token from NEF",
)
user = crud.user.get(db, id=token_data.sub)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
'''
Enable verification of a token generated by CAPIF for an API Invoker.
If env variable USE_PUBLIC_KEY_VERIFICATION is set to False, NEF verifies the access token acquired from NEF.
Else NEF vefirication token is ommited and the function returns the default user
'''
if settings.USE_PUBLIC_KEY_VERIFICATION == False:
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[security.ALGORITHM[0]]
)
token_data = schemas.TokenPayload(**payload)
except (jwt.JWTError, ValidationError):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials - Token from NEF",
)
user = crud.user.get(db, id=token_data.sub)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
else:
user = crud.user.get_by_email(db, email="admin@my-email.com")
return user


def get_current_active_user(
Expand Down
88 changes: 44 additions & 44 deletions backend/app/app/core/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,61 +3,61 @@
from OpenSSL import crypto
from jose import jwt
from passlib.context import CryptContext
from typing import Optional, Dict, Tuple
from fastapi import HTTPException, Request, status
from fastapi.security import OAuth2
from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
from fastapi.security.utils import get_authorization_scheme_param
# from typing import Optional, Dict, Tuple
# from fastapi import HTTPException, Request, status
# from fastapi.security import OAuth2
# from fastapi.openapi.models import OAuthFlows as OAuthFlowsModel
# from fastapi.security.utils import get_authorization_scheme_param
from app.core.config import settings

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


ALGORITHM = ("HS256", "RS256")

class OAuth2TwoTokensBearer(OAuth2):
'''
Override OAuth2 class based on FastAPI's OAuth2PasswordBearer to support two tokens bearer to authorise either NEF or CAPIF jtw tokens
# class OAuth2TwoTokensBearer(OAuth2):
# '''
# Override OAuth2 class based on FastAPI's OAuth2PasswordBearer to support two tokens bearer to authorise either NEF or CAPIF jtw tokens

This implementation takes the Authorization header and splits the token parameter into two tokens, assuming they are separated by a comma. It returns a tuple containing the two tokens.
'''
def __init__(
self,
tokenUrl: str,
scheme_name: Optional[str] = None,
scopes: Optional[Dict[str, str]] = None,
description: Optional[str] = None,
auto_error: bool = True,
):
if not scopes:
scopes = {}
flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes})
super().__init__(
flows=flows,
scheme_name=scheme_name,
description=description,
auto_error=auto_error,
)
# This implementation takes the Authorization header and splits the token parameter into two tokens, assuming they are separated by a comma. It returns a tuple containing the two tokens.
# '''
# def __init__(
# self,
# tokenUrl: str,
# scheme_name: Optional[str] = None,
# scopes: Optional[Dict[str, str]] = None,
# description: Optional[str] = None,
# auto_error: bool = True,
# ):
# if not scopes:
# scopes = {}
# flows = OAuthFlowsModel(password={"tokenUrl": tokenUrl, "scopes": scopes})
# super().__init__(
# flows=flows,
# scheme_name=scheme_name,
# description=description,
# auto_error=auto_error,
# )

async def __call__(self, request: Request) -> Optional[Tuple[str, str]]:
authorization: str = request.headers.get("Authorization")
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":
if self.auto_error:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
else:
return None
# async def __call__(self, request: Request) -> Optional[Tuple[str, str]]:
# authorization: str = request.headers.get("Authorization")
# scheme, param = get_authorization_scheme_param(authorization)
# if not authorization or scheme.lower() != "bearer":
# if self.auto_error:
# raise HTTPException(
# status_code=status.HTTP_401_UNAUTHORIZED,
# detail="Not authenticated",
# headers={"WWW-Authenticate": "Bearer"},
# )
# else:
# return None

try:
nef_token, capif_token = param.split(',')
except ValueError as ex:
return {"token" : param}
# try:
# nef_token, capif_token = param.split(',')
# except ValueError as ex:
# return {"token" : param}

return {"nef_token" : nef_token, "capif_token" : capif_token}
# return {"nef_token" : nef_token, "capif_token" : capif_token}


def create_access_token(
Expand Down

0 comments on commit 16483f5

Please sign in to comment.