Skip to content

Commit

Permalink
Remove get_bearer_saml_assertion_gss_api()
Browse files Browse the repository at this point in the history
This change removes get_bearer_saml_assertion_gss_api() and therefore removes the need of a 3rd party dependency to pywin32 and the platform specific code in sso.py
  • Loading branch information
ddraganov committed Jan 12, 2024
1 parent 5349c65 commit 11dc306
Showing 1 changed file with 1 addition and 202 deletions.
203 changes: 1 addition & 202 deletions pyVim/sso.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#############################################################
# Copyright (c) 2012-2023 VMware, Inc.
# Copyright (c) 2012-2024 VMware, Inc.
# A python helper module to do SSO related operations.
#############################################################
__author__ = 'VMware, Inc.'
Expand All @@ -12,7 +12,6 @@
from html import escape
else:
from cgi import escape
import sys
import datetime
import base64
import hashlib
Expand Down Expand Up @@ -362,206 +361,6 @@ def get_bearer_saml_assertion(self,
{'saml2': "urn:oasis:names:tc:SAML:2.0:assertion"}),
pretty_print=False).decode(UTF_8)

def _get_gss_soap_response(self,
binary_token,
request_duration=60,
token_duration=600,
delegatable=False,
renewable=False,
ssl_context=None):
"""
Extracts the assertion from the Bearer Token received from the Security
Token Service using the binary token generated using either sspi or gss module.
@type binary_token: C{str}
@param binary_token: The security token in base64 encoded format
@type request_duration: C{long}
@param request_duration: The duration for which the request is valid. If
the STS receives this request after this
duration, it is assumed to have expired. The
duration is in seconds and the default is 60s.
@type token_duration: C{long}
@param token_duration: The duration for which the SAML token is issued
for. The duration is specified in seconds and
the default is 600s.
@type delegatable: C{boolean}
@param delegatable: Whether the generated token is delegatable or not
The default value is False
@type renewable: C{boolean}
@param renewable: Whether the generated token is renewable or not
The default value is False
@type ssl_context: C{ssl.SSLContext}
@param ssl_context: SSL context describing the various SSL options.
It is only supported in Python 2.7.9 or higher.
@rtype: C{str}
@return: The SAML assertion.
"""
request = SecurityTokenRequest(request_duration=request_duration,
token_duration=token_duration,
gss_binary_token=binary_token)
soap_message = request.construct_bearer_token_request_with_binary_token(
delegatable=delegatable, renewable=renewable)
return self.perform_request(soap_message, ssl_context=ssl_context)

def _get_bearer_saml_assertion_win(self,
request_duration=60,
token_duration=600,
delegatable=False,
renewable=False,
ssl_context=None):
"""
Extracts the assertion from the Bearer Token received from the Security
Token Service using the SSPI module.
@type request_duration: C{long}
@param request_duration: The duration for which the request is valid. If
the STS receives this request after this
duration, it is assumed to have expired. The
duration is in seconds and the default is 60s.
@type token_duration: C{long}
@param token_duration: The duration for which the SAML token is issued
for. The duration is specified in seconds and
the default is 600s.
@type delegatable: C{boolean}
@param delegatable: Whether the generated token is delegatable or not
The default value is False
@type renewable: C{boolean}
@param renewable: Whether the generated token is renewable or not
The default value is False
@type ssl_context: C{ssl.SSLContext}
@param ssl_context: SSL context describing the various SSL options.
It is only supported in Python 2.7.9 or higher.
@rtype: C{str}
@return: The SAML assertion.
"""
import sspi, win32api
spn = "sts/%s.com" % win32api.GetDomainName()
sspiclient = sspi.ClientAuth("Kerberos", targetspn=spn)
in_buf = None
err = True
# The following will keep running unless we receive a saml token or an error
while True:
err, out_buf = sspiclient.authorize(in_buf)
sectoken = base64.b64encode(out_buf[0].Buffer)
soap_response = self._get_gss_soap_response(
sectoken, request_duration, token_duration, delegatable,
renewable, ssl_context)
et = etree.fromstring(soap_response)
try:
# Check if we have received a challenge token from the server
element = _extract_element(
et, 'BinaryExchange',
{'ns': "http://docs.oasis-open.org/ws-sx/ws-trust/200512"})
negotiate_token = element.text
out_buf[0].Buffer = base64.b64decode(negotiate_token)
in_buf = out_buf
except KeyError:
# Response does not contain the negotiate token.
# It should contain SAML token then.
saml_token = etree.tostring(_extract_element(
et, 'Assertion',
{'saml2': "urn:oasis:names:tc:SAML:2.0:assertion"}),
pretty_print=False).decode(UTF_8)
break
return saml_token

def _get_bearer_saml_assertion_lin(self,
request_duration=60,
token_duration=600,
delegatable=False,
renewable=False):
"""
Extracts the assertion from the Bearer Token received from the Security
Token Service using kerberos.
@type request_duration: C{long}
@param request_duration: The duration for which the request is valid. If
the STS receives this request after this
duration, it is assumed to have expired. The
duration is in seconds and the default is 60s.
@type token_duration: C{long}
@param token_duration: The duration for which the SAML token is issued
for. The duration is specified in seconds and
the default is 600s.
@type delegatable: C{boolean}
@param delegatable: Whether the generated token is delegatable or not
The default value is False
@type renewable: C{boolean}
@param renewable: Whether the generated token is renewable or not
The default value is False
@rtype: C{str}
@return: The SAML assertion in Unicode.
"""
import kerberos, platform
service = 'host@%s' % platform.node()
_, context = kerberos.authGSSClientInit(service, 0)
challenge = ''
# The following will keep running unless we receive a saml token or an error
while True:
# Call GSS step
result = kerberos.authGSSClientStep(context, challenge)
if result < 0:
break
sectoken = kerberos.authGSSClientResponse(context)
soap_response = self._get_gss_soap_response(
sectoken, request_duration, token_duration, delegatable,
renewable)
et = etree.fromstring(soap_response)
try:
# Check if we have received a challenge token from the server
element = _extract_element(
et, 'BinaryExchange',
{'ns': "http://docs.oasis-open.org/ws-sx/ws-trust/200512"})
negotiate_token = element.text
challenge = negotiate_token
except KeyError:
# Response does not contain the negotiate token.
# It should contain SAML token then.
saml_token = etree.tostring(_extract_element(
et, 'Assertion',
{'saml2': "urn:oasis:names:tc:SAML:2.0:assertion"}),
pretty_print=False).decode(UTF_8)
break
return saml_token

def get_bearer_saml_assertion_gss_api(self,
request_duration=60,
token_duration=600,
delegatable=False,
renewable=False):
"""
Extracts the assertion from the Bearer Token received from the Security
Token Service using the GSS API.
@type request_duration: C{long}
@param request_duration: The duration for which the request is valid. If
the STS receives this request after this
duration, it is assumed to have expired. The
duration is in seconds and the default is 60s.
@type token_duration: C{long}
@param token_duration: The duration for which the SAML token is issued
for. The duration is specified in seconds and
the default is 600s.
@type delegatable: C{boolean}
@param delegatable: Whether the generated token is delegatable or not
The default value is False
@type renewable: C{boolean}
@param renewable: Whether the generated token is renewable or not
The default value is False
@rtype: C{str}
@return: The SAML assertion.
"""
if sys.platform == "win32":
saml_token = self._get_bearer_saml_assertion_win(
request_duration, token_duration, delegatable, renewable)
else:
raise Exception("Currently, not supported on this platform")
# TODO Remove this exception once SSO supports validation of tickets
# generated against host machines
# saml_token = self._get_bearer_saml_assertion_lin(request_duration, token_duration, delegatable)
return saml_token

def get_hok_saml_assertion(self,
public_key,
private_key,
Expand Down

0 comments on commit 11dc306

Please sign in to comment.