-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
♻️ Extract AuthRegistry; move bulk of request and response processing…
… to OperationModel.
- Loading branch information
Raphael Krupinski
committed
Jun 1, 2024
1 parent
fed7e51
commit 53c8f27
Showing
9 changed files
with
315 additions
and
298 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
from collections.abc import Iterable, Mapping, MutableMapping | ||
from typing import Optional | ||
|
||
import httpx | ||
|
||
from .._httpx import AuthType | ||
from ..types_ import MultiAuth, SecurityRequirements | ||
|
||
|
||
class AuthRegistry: | ||
def __init__(self, security: Optional[Iterable[SecurityRequirements]]): | ||
# Every Auth instance the user code authenticated with | ||
self._auth: MutableMapping[str, httpx.Auth] = {} | ||
|
||
# (Multi)Auth instance for every operation and the client | ||
self._auth_cache: MutableMapping[str, httpx.Auth] = {} | ||
|
||
# Client-wide security requirements | ||
self._security = security | ||
|
||
def resolve_auth(self, name: str, security: Optional[Iterable[SecurityRequirements]]) -> AuthType: | ||
if security: | ||
sec_name = name | ||
sec_source = security | ||
elif self._security: | ||
sec_name = '*' | ||
sec_source = self._security | ||
else: | ||
sec_name = None | ||
sec_source = None | ||
|
||
if sec_source: | ||
assert sec_name | ||
if sec_name not in self._auth_cache: | ||
auth = self._mk_auth(sec_source) | ||
self._auth_cache[sec_name] = auth | ||
else: | ||
auth = self._auth_cache[sec_name] | ||
return auth | ||
else: | ||
return None | ||
|
||
def _mk_auth(self, security: Iterable[SecurityRequirements]) -> httpx.Auth: | ||
security = list(security) | ||
assert security | ||
last_error: Optional[Exception] = None | ||
for requirements in security: | ||
try: | ||
auth = _build_auth(self._auth, requirements) | ||
break | ||
except ValueError as ve: | ||
last_error = ve | ||
continue | ||
else: | ||
assert last_error | ||
# due to asserts and break above, we never enter here, unless ValueError was raised | ||
raise last_error # noqa | ||
return auth | ||
|
||
def authenticate(self, auth_models: Mapping[str, httpx.Auth]) -> None: | ||
self._auth.update(auth_models) | ||
self._auth_cache.clear() | ||
|
||
def deauthenticate(self, sec_names: Iterable[str]) -> None: | ||
if sec_names: | ||
for sec_name in sec_names: | ||
del self._auth[sec_name] | ||
else: | ||
self._auth.clear() | ||
self._auth_cache.clear() | ||
|
||
|
||
def _build_auth(schemes: Mapping[str, httpx.Auth], requirements: SecurityRequirements) -> httpx.Auth: | ||
auth_flows = [] | ||
for scheme, scopes in requirements.items(): | ||
auth_flow = schemes.get(scheme) | ||
if not auth_flow: | ||
raise ValueError('Not authenticated', scheme) | ||
auth_flows.append(auth_flow) | ||
return MultiAuth(*auth_flows) |
Oops, something went wrong.