diff --git a/pyVim/sso.py b/pyVim/sso.py index 0809bbd2..b5ed1dd6 100644 --- a/pyVim/sso.py +++ b/pyVim/sso.py @@ -1,5 +1,5 @@ ############################################################# -# Copyright (c) 2012-2023 VMware, Inc. +# Copyright (c) 2012-2024 VMware, Inc. # A python helper module to do SSO related operations. ############################################################# __author__ = 'VMware, Inc.' @@ -12,7 +12,6 @@ from html import escape else: from cgi import escape -import sys import datetime import base64 import hashlib @@ -362,206 +361,6 @@ def get_bearer_saml_assertion(self, {'saml2': "urn:oasis:names:tc:SAML:2.0:assertion"}), pretty_print=False).decode(UTF_8) - def _get_gss_soap_response(self, - binary_token, - request_duration=60, - token_duration=600, - delegatable=False, - renewable=False, - ssl_context=None): - """ - Extracts the assertion from the Bearer Token received from the Security - Token Service using the binary token generated using either sspi or gss module. - - @type binary_token: C{str} - @param binary_token: The security token in base64 encoded format - - @type request_duration: C{long} - @param request_duration: The duration for which the request is valid. If - the STS receives this request after this - duration, it is assumed to have expired. The - duration is in seconds and the default is 60s. - @type token_duration: C{long} - @param token_duration: The duration for which the SAML token is issued - for. The duration is specified in seconds and - the default is 600s. - @type delegatable: C{boolean} - @param delegatable: Whether the generated token is delegatable or not - The default value is False - @type renewable: C{boolean} - @param renewable: Whether the generated token is renewable or not - The default value is False - @type ssl_context: C{ssl.SSLContext} - @param ssl_context: SSL context describing the various SSL options. - It is only supported in Python 2.7.9 or higher. - @rtype: C{str} - @return: The SAML assertion. - """ - request = SecurityTokenRequest(request_duration=request_duration, - token_duration=token_duration, - gss_binary_token=binary_token) - soap_message = request.construct_bearer_token_request_with_binary_token( - delegatable=delegatable, renewable=renewable) - return self.perform_request(soap_message, ssl_context=ssl_context) - - def _get_bearer_saml_assertion_win(self, - request_duration=60, - token_duration=600, - delegatable=False, - renewable=False, - ssl_context=None): - """ - Extracts the assertion from the Bearer Token received from the Security - Token Service using the SSPI module. - - @type request_duration: C{long} - @param request_duration: The duration for which the request is valid. If - the STS receives this request after this - duration, it is assumed to have expired. The - duration is in seconds and the default is 60s. - @type token_duration: C{long} - @param token_duration: The duration for which the SAML token is issued - for. The duration is specified in seconds and - the default is 600s. - @type delegatable: C{boolean} - @param delegatable: Whether the generated token is delegatable or not - The default value is False - @type renewable: C{boolean} - @param renewable: Whether the generated token is renewable or not - The default value is False - @type ssl_context: C{ssl.SSLContext} - @param ssl_context: SSL context describing the various SSL options. - It is only supported in Python 2.7.9 or higher. - @rtype: C{str} - @return: The SAML assertion. - """ - import sspi, win32api - spn = "sts/%s.com" % win32api.GetDomainName() - sspiclient = sspi.ClientAuth("Kerberos", targetspn=spn) - in_buf = None - err = True - # The following will keep running unless we receive a saml token or an error - while True: - err, out_buf = sspiclient.authorize(in_buf) - sectoken = base64.b64encode(out_buf[0].Buffer) - soap_response = self._get_gss_soap_response( - sectoken, request_duration, token_duration, delegatable, - renewable, ssl_context) - et = etree.fromstring(soap_response) - try: - # Check if we have received a challenge token from the server - element = _extract_element( - et, 'BinaryExchange', - {'ns': "http://docs.oasis-open.org/ws-sx/ws-trust/200512"}) - negotiate_token = element.text - out_buf[0].Buffer = base64.b64decode(negotiate_token) - in_buf = out_buf - except KeyError: - # Response does not contain the negotiate token. - # It should contain SAML token then. - saml_token = etree.tostring(_extract_element( - et, 'Assertion', - {'saml2': "urn:oasis:names:tc:SAML:2.0:assertion"}), - pretty_print=False).decode(UTF_8) - break - return saml_token - - def _get_bearer_saml_assertion_lin(self, - request_duration=60, - token_duration=600, - delegatable=False, - renewable=False): - """ - Extracts the assertion from the Bearer Token received from the Security - Token Service using kerberos. - - @type request_duration: C{long} - @param request_duration: The duration for which the request is valid. If - the STS receives this request after this - duration, it is assumed to have expired. The - duration is in seconds and the default is 60s. - @type token_duration: C{long} - @param token_duration: The duration for which the SAML token is issued - for. The duration is specified in seconds and - the default is 600s. - @type delegatable: C{boolean} - @param delegatable: Whether the generated token is delegatable or not - The default value is False - @type renewable: C{boolean} - @param renewable: Whether the generated token is renewable or not - The default value is False - @rtype: C{str} - @return: The SAML assertion in Unicode. - """ - import kerberos, platform - service = 'host@%s' % platform.node() - _, context = kerberos.authGSSClientInit(service, 0) - challenge = '' - # The following will keep running unless we receive a saml token or an error - while True: - # Call GSS step - result = kerberos.authGSSClientStep(context, challenge) - if result < 0: - break - sectoken = kerberos.authGSSClientResponse(context) - soap_response = self._get_gss_soap_response( - sectoken, request_duration, token_duration, delegatable, - renewable) - et = etree.fromstring(soap_response) - try: - # Check if we have received a challenge token from the server - element = _extract_element( - et, 'BinaryExchange', - {'ns': "http://docs.oasis-open.org/ws-sx/ws-trust/200512"}) - negotiate_token = element.text - challenge = negotiate_token - except KeyError: - # Response does not contain the negotiate token. - # It should contain SAML token then. - saml_token = etree.tostring(_extract_element( - et, 'Assertion', - {'saml2': "urn:oasis:names:tc:SAML:2.0:assertion"}), - pretty_print=False).decode(UTF_8) - break - return saml_token - - def get_bearer_saml_assertion_gss_api(self, - request_duration=60, - token_duration=600, - delegatable=False, - renewable=False): - """ - Extracts the assertion from the Bearer Token received from the Security - Token Service using the GSS API. - - @type request_duration: C{long} - @param request_duration: The duration for which the request is valid. If - the STS receives this request after this - duration, it is assumed to have expired. The - duration is in seconds and the default is 60s. - @type token_duration: C{long} - @param token_duration: The duration for which the SAML token is issued - for. The duration is specified in seconds and - the default is 600s. - @type delegatable: C{boolean} - @param delegatable: Whether the generated token is delegatable or not - The default value is False - @type renewable: C{boolean} - @param renewable: Whether the generated token is renewable or not - The default value is False - @rtype: C{str} - @return: The SAML assertion. - """ - if sys.platform == "win32": - saml_token = self._get_bearer_saml_assertion_win( - request_duration, token_duration, delegatable, renewable) - else: - raise Exception("Currently, not supported on this platform") - # TODO Remove this exception once SSO supports validation of tickets - # generated against host machines - # saml_token = self._get_bearer_saml_assertion_lin(request_duration, token_duration, delegatable) - return saml_token - def get_hok_saml_assertion(self, public_key, private_key,