Skip to content

Commit

Permalink
Fix errors
Browse files Browse the repository at this point in the history
  • Loading branch information
tito97sp committed Mar 24, 2024
1 parent 46a5fd0 commit 47ac23a
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1158,15 +1158,13 @@ def update_params_for_auth(
)
try:
security_scheme_instance = self.configuration.security_scheme_info[security_scheme_component_name]
oauth_server_client_info = self.configuration.oauth_server_client_info
security_scheme_instance.apply_auth(
headers=headers,
resource_path=resource_path,
method=method,
body=body,
query_params_suffix=query_params_suffix,
scope_names=scope_names,
client_info=oauth_server_client_info
)
except KeyError as ex:
raise ex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oauth_server_client_info: OauthServerClientInfo = {}
) -> None:
pass

Expand All @@ -96,7 +96,7 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oauth_server_client_info: OauthServerClientInfo = {}
) -> None:
if self.in_location is ApiKeyInLocation.COOKIE:
headers.add('Cookie', self.api_key)
Expand Down Expand Up @@ -134,7 +134,7 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oauth_server_client_info: OauthServerClientInfo = {}
) -> None:
user_pass = f"{self.user_id}:{self.password}"
b64_user_pass = base64.b64encode(user_pass.encode(encoding=self.encoding))
Expand All @@ -156,7 +156,7 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oauth_server_client_info: OauthServerClientInfo = {}
) -> None:
headers.add('Authorization', f"Bearer {self.access_token}")

Expand All @@ -174,7 +174,7 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oauth_server_client_info: OauthServerClientInfo = {}
) -> None:
raise NotImplementedError("HTTPDigestSecurityScheme not yet implemented")

Expand All @@ -191,7 +191,7 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oauth_server_client_info: OauthServerClientInfo = {}
) -> None:
raise NotImplementedError("MutualTLSSecurityScheme not yet implemented")

Expand All @@ -209,7 +209,7 @@ def set_token(


@dataclasses.dataclass
class ImplicitOAuthFlow(__SecuritySchemeBase, OAuthFlowBase):
class ImplicitOAuthFlow(OAuthFlowBase):
authorization_url: parse.ParseResult
scopes: typing.Dict[str, str]
refresh_url: typing.Optional[str] = None
Expand All @@ -225,8 +225,8 @@ def apply_auth(
method: str,
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
client_info: OauthClientInfo,
scope_names: typing.Tuple[str, ...] = ()
) -> None:
"""
Not implemented because this flow requires the user to visit a webpage and grant access
Expand All @@ -236,7 +236,7 @@ def apply_auth(


@dataclasses.dataclass
class PasswordOauthFlow(__SecuritySchemeBase, OAuthFlowBase):
class PasswordOauthFlow(OAuthFlowBase):
token_url: parse.ParseResult
scopes: typing.Dict[str, str]
refresh_url: typing.Optional[str] = None
Expand All @@ -256,8 +256,8 @@ def apply_auth(
method: str,
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
client_info: OauthClientInfo,
scope_names: typing.Tuple[str, ...] = ()
) -> None:
token = self._scope_names_to_token.get(scope_names)
if token:
Expand All @@ -282,7 +282,7 @@ def apply_auth(


@dataclasses.dataclass
class ClientCredentialsOauthFlow(__SecuritySchemeBase, OAuthFlowBase):
class ClientCredentialsOauthFlow(OAuthFlowBase):
token_url: parse.ParseResult
scopes: typing.Dict[str, str]
refresh_url: typing.Optional[str] = None
Expand All @@ -300,8 +300,8 @@ def apply_auth(
method: str,
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
client_info: OauthClientInfo,
scope_names: typing.Tuple[str, ...] = ()
) -> None:
token = self._scope_names_to_token.get(scope_names)
if token:
Expand All @@ -325,7 +325,7 @@ def apply_auth(


@dataclasses.dataclass
class AuthorizationCodeOauthFlow(__SecuritySchemeBase, OAuthFlowBase):
class AuthorizationCodeOauthFlow(OAuthFlowBase):
authorization_url: parse.ParseResult
token_url: parse.ParseResult
scopes: typing.Dict[str, str]
Expand All @@ -342,8 +342,8 @@ def apply_auth(
method: str,
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
client_info: OauthClientInfo,
scope_names: typing.Tuple[str, ...] = ()
) -> None:
"""
Not implemented because this flow requires the user to visit a webpage and grant access
Expand Down Expand Up @@ -372,14 +372,14 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oath_server_client_info: OauthServerClientInfo = {}
) -> None:
if not self.flows:
raise exceptions.ApiValueError('flows are not defined and are required, define them')
if not scope_names:
raise exceptions.ApiValueError('scope_names are not defined and are required, define them')
if not client_info:
raise exceptions.ApiValueError('client_info is not defined and is required, define it')
if not oath_server_client_info:
raise exceptions.ApiValueError('oath_server_client_info is not defined and is required, define it')
chosen_flows = []
for flow in [self.flows.implicit, self.flows.password, self.flows.client_credentials, self.flows.authorization_code]:
if flow is None:
Expand All @@ -400,12 +400,12 @@ def apply_auth(
"flow may contain the scopes"
)
chosen_flow = chosen_flows[0]
if chosen_flow.auth_or_token_url.netloc not in client_info:
if chosen_flow.auth_or_token_url.netloc not in oath_server_client_info:
raise exceptions.ApiValueError(
f"client_info is missing info for oauth server "
f"oauth_server_client_info is missing info for oauth server "
"hostname={chosen_flow.auth_or_token_url.netloc}. Add it to you api_configuration"
)
client_info = client_info[chosen_flow.auth_or_token_url.netloc]
client_info = oath_server_client_info[chosen_flow.auth_or_token_url.netloc]
# note: scope input must be sorted tuple
chosen_flow.apply_auth(
headers=headers,
Expand All @@ -430,7 +430,7 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oauth_server_client_info: OauthServerClientInfo = {}
) -> None:
raise NotImplementedError("OpenIdConnectSecurityScheme not yet implemented")

Expand Down
2 changes: 0 additions & 2 deletions samples/client/petstore/python/src/petstore_api/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1160,15 +1160,13 @@ def update_params_for_auth(
)
try:
security_scheme_instance = self.configuration.security_scheme_info[security_scheme_component_name]
oauth_server_client_info = self.configuration.oauth_server_client_info
security_scheme_instance.apply_auth(
headers=headers,
resource_path=resource_path,
method=method,
body=body,
query_params_suffix=query_params_suffix,
scope_names=scope_names,
client_info=oauth_server_client_info
)
except KeyError as ex:
raise ex
Expand Down
52 changes: 26 additions & 26 deletions samples/client/petstore/python/src/petstore_api/security_schemes.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oauth_server_client_info: OauthServerClientInfo = {}
) -> None:
pass

Expand All @@ -97,7 +97,7 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oauth_server_client_info: OauthServerClientInfo = {}
) -> None:
if self.in_location is ApiKeyInLocation.COOKIE:
headers.add('Cookie', self.api_key)
Expand Down Expand Up @@ -135,7 +135,7 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oauth_server_client_info: OauthServerClientInfo = {}
) -> None:
user_pass = f"{self.user_id}:{self.password}"
b64_user_pass = base64.b64encode(user_pass.encode(encoding=self.encoding))
Expand All @@ -157,7 +157,7 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oauth_server_client_info: OauthServerClientInfo = {}
) -> None:
headers.add('Authorization', f"Bearer {self.access_token}")

Expand All @@ -176,7 +176,7 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oauth_server_client_info: OauthServerClientInfo = {}
) -> None:
auth_headers = self.signing_info.get_http_signature_headers(
resource_path, method, headers, body, query_params_suffix)
Expand All @@ -197,7 +197,7 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oauth_server_client_info: OauthServerClientInfo = {}
) -> None:
raise NotImplementedError("HTTPDigestSecurityScheme not yet implemented")

Expand All @@ -214,7 +214,7 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oauth_server_client_info: OauthServerClientInfo = {}
) -> None:
raise NotImplementedError("MutualTLSSecurityScheme not yet implemented")

Expand All @@ -232,7 +232,7 @@ def set_token(


@dataclasses.dataclass
class ImplicitOAuthFlow(__SecuritySchemeBase, OAuthFlowBase):
class ImplicitOAuthFlow(OAuthFlowBase):
authorization_url: parse.ParseResult
scopes: typing.Dict[str, str]
refresh_url: typing.Optional[str] = None
Expand All @@ -248,8 +248,8 @@ def apply_auth(
method: str,
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
client_info: OauthClientInfo,
scope_names: typing.Tuple[str, ...] = ()
) -> None:
"""
Not implemented because this flow requires the user to visit a webpage and grant access
Expand All @@ -259,7 +259,7 @@ def apply_auth(


@dataclasses.dataclass
class PasswordOauthFlow(__SecuritySchemeBase, OAuthFlowBase):
class PasswordOauthFlow(OAuthFlowBase):
token_url: parse.ParseResult
scopes: typing.Dict[str, str]
refresh_url: typing.Optional[str] = None
Expand All @@ -279,8 +279,8 @@ def apply_auth(
method: str,
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
client_info: OauthClientInfo,
scope_names: typing.Tuple[str, ...] = ()
) -> None:
token = self._scope_names_to_token.get(scope_names)
if token:
Expand All @@ -305,7 +305,7 @@ def apply_auth(


@dataclasses.dataclass
class ClientCredentialsOauthFlow(__SecuritySchemeBase, OAuthFlowBase):
class ClientCredentialsOauthFlow(OAuthFlowBase):
token_url: parse.ParseResult
scopes: typing.Dict[str, str]
refresh_url: typing.Optional[str] = None
Expand All @@ -323,8 +323,8 @@ def apply_auth(
method: str,
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
client_info: OauthClientInfo,
scope_names: typing.Tuple[str, ...] = ()
) -> None:
token = self._scope_names_to_token.get(scope_names)
if token:
Expand All @@ -348,7 +348,7 @@ def apply_auth(


@dataclasses.dataclass
class AuthorizationCodeOauthFlow(__SecuritySchemeBase, OAuthFlowBase):
class AuthorizationCodeOauthFlow(OAuthFlowBase):
authorization_url: parse.ParseResult
token_url: parse.ParseResult
scopes: typing.Dict[str, str]
Expand All @@ -365,8 +365,8 @@ def apply_auth(
method: str,
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
client_info: OauthClientInfo,
scope_names: typing.Tuple[str, ...] = ()
) -> None:
"""
Not implemented because this flow requires the user to visit a webpage and grant access
Expand Down Expand Up @@ -395,14 +395,14 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oath_server_client_info: OauthServerClientInfo = {}
) -> None:
if not self.flows:
raise exceptions.ApiValueError('flows are not defined and are required, define them')
if not scope_names:
raise exceptions.ApiValueError('scope_names are not defined and are required, define them')
if not client_info:
raise exceptions.ApiValueError('client_info is not defined and is required, define it')
if not oath_server_client_info:
raise exceptions.ApiValueError('oath_server_client_info is not defined and is required, define it')
chosen_flows = []
for flow in [self.flows.implicit, self.flows.password, self.flows.client_credentials, self.flows.authorization_code]:
if flow is None:
Expand All @@ -423,12 +423,12 @@ def apply_auth(
"flow may contain the scopes"
)
chosen_flow = chosen_flows[0]
if chosen_flow.auth_or_token_url.netloc not in client_info:
if chosen_flow.auth_or_token_url.netloc not in oath_server_client_info:
raise exceptions.ApiValueError(
f"client_info is missing info for oauth server "
f"oauth_server_client_info is missing info for oauth server "
"hostname={chosen_flow.auth_or_token_url.netloc}. Add it to you api_configuration"
)
client_info = client_info[chosen_flow.auth_or_token_url.netloc]
client_info = oath_server_client_info[chosen_flow.auth_or_token_url.netloc]
# note: scope input must be sorted tuple
chosen_flow.apply_auth(
headers=headers,
Expand All @@ -453,7 +453,7 @@ def apply_auth(
body: typing.Optional[typing.Union[str, bytes]],
query_params_suffix: typing.Optional[str],
scope_names: typing.Tuple[str, ...] = (),
client_info: OauthServerClientInfo = {}
oauth_server_client_info: OauthServerClientInfo = {}
) -> None:
raise NotImplementedError("OpenIdConnectSecurityScheme not yet implemented")

Expand Down

0 comments on commit 47ac23a

Please sign in to comment.