-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
10 changed files
with
201 additions
and
38 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
from clients.teams import TeamsClient | ||
|
||
__all__ = [ | ||
"TeamsClient", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
import os | ||
from keycloak.authorization import Authorization | ||
|
||
import requests | ||
|
||
|
||
class TeamsClient: | ||
@staticmethod | ||
def has_member(access_token: str, team_id: str, user_id: str): | ||
r = requests.get( | ||
url=f"{os.environ['TEAMS_API_URL']}/teams/{team_id}/members/{user_id}", | ||
headers={"Authorization": f"Bearer {access_token}"}, | ||
) | ||
r.raise_for_status() | ||
return r.status_code == 200 | ||
|
||
@staticmethod | ||
def has_role(access_token: str, team_id: str, role: str): | ||
r = requests.get( | ||
url=f"{os.environ['TEAMS_API_URL']}/teams/{team_id}/roles", | ||
headers={"Authorization": f"Bearer {access_token}"}, | ||
) | ||
r.raise_for_status() | ||
return role in r.json() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
|
||
|
||
class MaskinportenClientIn(BaseModel): | ||
team_id: str | ||
name: str | ||
description: str | ||
scopes: list[str] | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
import os | ||
|
||
from fastapi import Depends, status | ||
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer | ||
from keycloak.keycloak_openid import KeycloakOpenID | ||
from requests.exceptions import HTTPError | ||
|
||
from clients import TeamsClient | ||
from models import MaskinportenClientIn | ||
from resources.errors import ErrorResponse | ||
|
||
|
||
http_bearer = HTTPBearer(scheme_name="Keycloak token") | ||
|
||
|
||
class ServiceClient: | ||
keycloak: KeycloakOpenID | ||
|
||
def __init__(self): | ||
self.keycloak = KeycloakOpenID( | ||
server_url=f"{os.environ['KEYCLOAK_SERVER']}/auth/", | ||
realm_name=os.environ["KEYCLOAK_REALM"], | ||
client_id=os.environ["CLIENT_ID"], | ||
client_secret_key=os.environ["CLIENT_SECRET"], | ||
) | ||
|
||
@property | ||
def authorization_header(self): | ||
response = self.keycloak.token(grant_type=["client_credentials"]) | ||
access_token = f"{response['token_type']} {response['access_token']}" | ||
return {"Authorization": access_token} | ||
|
||
|
||
class Auth: | ||
principal_id: str | ||
bearer_token: str | ||
service_client: ServiceClient | ||
|
||
def __init__( | ||
self, | ||
authorization: HTTPAuthorizationCredentials = Depends(http_bearer), | ||
service_client: ServiceClient = Depends(), | ||
): | ||
introspected = service_client.keycloak.introspect(authorization.credentials) | ||
|
||
if not introspected["active"]: | ||
raise ErrorResponse(status.HTTP_401_UNAUTHORIZED, "Invalid access token") | ||
|
||
self.principal_id = introspected["username"] | ||
self.bearer_token = authorization.credentials | ||
self.service_client = service_client | ||
|
||
|
||
def is_team_member( | ||
body: MaskinportenClientIn, | ||
auth: Auth = Depends(), | ||
): | ||
"""Pass through without exception if user is a team member.""" | ||
try: | ||
if not TeamsClient.has_member( | ||
auth.bearer_token, body.team_id, auth.principal_id | ||
): | ||
raise ErrorResponse(status.HTTP_403_FORBIDDEN, "Forbidden") | ||
except HTTPError as e: | ||
if e.response.status_code == 404: | ||
raise ErrorResponse( | ||
status.HTTP_400_BAD_REQUEST, | ||
"User is not a member of specified team", | ||
) | ||
raise ErrorResponse(status.HTTP_500_INTERNAL_SERVER_ERROR, "Server error") | ||
|
||
|
||
def has_team_role(role: str): | ||
def _verify_team_role( | ||
body: MaskinportenClientIn, | ||
auth: Auth = Depends(), | ||
): | ||
"""Pass through without exception if specified team is assigned `role`.""" | ||
try: | ||
if not TeamsClient.has_role(auth.bearer_token, body.team_id, role): | ||
raise ErrorResponse( | ||
status.HTTP_403_FORBIDDEN, | ||
f"Team is not assigned role {role}", | ||
) | ||
except HTTPError as e: | ||
if e.response.status_code == 404: | ||
raise ErrorResponse(status.HTTP_400_BAD_REQUEST, "Team does not exist") | ||
raise ErrorResponse(status.HTTP_500_INTERNAL_SERVER_ERROR, "Server error") | ||
|
||
return _verify_team_role |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
from typing import Optional, TypedDict | ||
|
||
from pydantic import BaseModel | ||
|
||
|
||
class ErrorResponse(Exception): | ||
def __init__(self, status_code: int, message: Optional[str] = None): | ||
self.status_code = status_code | ||
self.message = message | ||
|
||
|
||
class Message(BaseModel): | ||
message: Optional[str] | ||
|
||
|
||
def error_message_models(*status_codes) -> dict: | ||
return {code: {"model": Message} for code in status_codes} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters