Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inject current request in security handlers #1883

Merged
merged 2 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions connexion/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
class AbstractSecurityHandler:

required_scopes_kw = "required_scopes"
request_kw = "request"
client = None
security_definition_key: str
"""The key which contains the value for the function name to resolve."""
Expand Down Expand Up @@ -106,12 +107,12 @@ def _get_function(
return default

def _generic_check(self, func, exception_msg):
need_to_add_required_scopes = self._need_to_add_scopes(func)

async def wrapper(request, *args, required_scopes=None):
kwargs = {}
if need_to_add_required_scopes:
if self._accepts_kwarg(func, self.required_scopes_kw):
kwargs[self.required_scopes_kw] = required_scopes
if self._accepts_kwarg(func, self.request_kw):
kwargs[self.request_kw] = request
token_info = func(*args, **kwargs)
while asyncio.iscoroutine(token_info):
token_info = await token_info
Expand Down Expand Up @@ -140,10 +141,11 @@ def get_auth_header_value(request):
raise OAuthProblem(detail="Invalid authorization header")
return auth_type.lower(), value

def _need_to_add_scopes(self, func):
@staticmethod
def _accepts_kwarg(func: t.Callable, keyword: str) -> bool:
"""Check if the function accepts the provided keyword argument."""
arguments, has_kwargs = inspect_function_arguments(func)
need_required_scopes = has_kwargs or self.required_scopes_kw in arguments
return need_required_scopes
return has_kwargs or keyword in arguments

def _resolve_func(self, security_scheme):
"""
Expand Down
5 changes: 5 additions & 0 deletions docs/security.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ The function should accept the following arguments:
- username
- password
- required_scopes (optional)
- request (optional)

You can find a `minimal Basic Auth example application`_ in Connexion's "examples" folder.

Expand All @@ -85,6 +86,7 @@ The function should accept the following arguments:

- token
- required_scopes (optional)
- request (optional)

You can find a `minimal Bearer example application`_ in Connexion's "examples" folder.

Expand All @@ -100,6 +102,7 @@ The function should accept the following arguments:

- apikey
- required_scopes (optional)
- request (optional)

You can find a `minimal API Key example application`_ in Connexion's "examples" folder.

Expand All @@ -115,6 +118,7 @@ The function should accept the following arguments:

- token
- required_scopes (optional)
- request (optional)

As alternative to an ``x-tokenInfoFunc`` definition, you can set an ``x-tokenInfoUrl`` definition or
``TOKENINFO_URL`` environment variable, and connexion will call the url instead of a local
Expand All @@ -132,6 +136,7 @@ The function should accept the following arguments:

- required_scopes
- token_scopes
- request (optional)

and return a boolean indicating if the validation was successful.

Expand Down
49 changes: 49 additions & 0 deletions tests/decorators/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,52 @@ def test_raise_most_specific(errors, most_specific):
security_handler_factory = SecurityHandlerFactory()
with pytest.raises(most_specific):
security_handler_factory._raise_most_specific(errors)


async def test_optional_kwargs_injected():
"""Test that optional keyword arguments 'required_scopes' and 'request' are injected when
defined as arguments in the user security function. This test uses the ApiKeySecurityHandler,
but the tested behavior is generic across handlers."""
security_handler_factory = ApiKeySecurityHandler()

request = ConnexionRequest(
scope={"type": "http", "headers": [[b"x-auth", b"foobar"]]}
)

def apikey_info_no_kwargs(key):
"""Will fail if additional keywords are injected."""
return {"sub": "no_kwargs"}

wrapped_func_no_kwargs = security_handler_factory._get_verify_func(
apikey_info_no_kwargs, "header", "X-Auth"
)
assert await wrapped_func_no_kwargs(request) == {"sub": "no_kwargs"}

def apikey_info_request(key, request):
"""Will fail if request is not injected."""
return {"sub": "request"}

wrapped_func_request = security_handler_factory._get_verify_func(
apikey_info_request, "header", "X-Auth"
)
assert await wrapped_func_request(request) == {"sub": "request"}

def apikey_info_scopes(key, required_scopes):
"""Will fail if required_scopes is not injected."""
return {"sub": "scopes"}

wrapped_func_scopes = security_handler_factory._get_verify_func(
apikey_info_scopes, "header", "X-Auth"
)
assert await wrapped_func_scopes(request) == {"sub": "scopes"}

def apikey_info_kwargs(key, **kwargs):
"""Will fail if request and required_scopes are not injected."""
assert "request" in kwargs
assert "required_scopes" in kwargs
return {"sub": "kwargs"}

wrapped_func_kwargs = security_handler_factory._get_verify_func(
apikey_info_kwargs, "header", "X-Auth"
)
assert await wrapped_func_kwargs(request) == {"sub": "kwargs"}
Loading