diff --git a/CHANGELOG.md b/CHANGELOG.md index fee354d..fed943d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,12 @@ # Changelog -## 0.10.3 - TBD +## 0.11.0 - TBD * Support input password string encoded with the `surrogatepass` error option * This allows the caller to provide a password for a gMSA or machine account that could contain invalid surrogate pairs for both NTLM and Kerberos auth. * Stop using deprecated `datetime.dateime.utcnow()` for CredSSP acceptor context +* Treat an empty string as a valid password, `None` is kept as use the cached credential +* Improve the exception shown when no password was provided and no cached credential was available ## 0.10.2 - 2023-10-04 diff --git a/src/spnego/_ntlm.py b/src/spnego/_ntlm.py index 99d5ed3..0d85231 100644 --- a/src/spnego/_ntlm.py +++ b/src/spnego/_ntlm.py @@ -135,7 +135,9 @@ def _get_credential( https://asecuritysite.com/encryption/lmhash """ if not store: - raise OperationNotAvailableError(context_msg="Retrieving NTLM store without NTLM_USER_FILE set to a filepath") + raise OperationNotAvailableError( + context_msg="No username or password was specified and the credential cache did not exist or contained no credentials" + ) domain = domain or "" @@ -178,7 +180,7 @@ def store_lines( else: raise SpnegoError( ErrorCode.failure, - context_msg="Failed to find any matching credential in " "NTLM_USER_FILE credential store.", + context_msg="Failed to find any matching credential in NTLM_USER_FILE credential store.", ) @@ -306,7 +308,7 @@ def __init__( # Make sure that the credential file is set and exists if not _get_credential_file(): raise OperationNotAvailableError( - context_msg="Retrieving NTLM store without NTLM_USER_FILE set to a " "filepath" + context_msg="NTLM acceptor requires NTLM credential cache to be provided through the env var NTLM_USER_FILE set to a filepath" ) self._temp_negotiate: typing.Optional[Negotiate] = None diff --git a/src/spnego/_version.py b/src/spnego/_version.py index 300b5f3..a7962ed 100644 --- a/src/spnego/_version.py +++ b/src/spnego/_version.py @@ -1,4 +1,4 @@ # Copyright: (c) 2020, Jordan Borean (@jborean93) # MIT License (see LICENSE or https://opensource.org/licenses/MIT) -__version__ = "0.10.3" +__version__ = "0.11.0" diff --git a/tests/integration/inventory.yml b/tests/integration/inventory.yml index 3ca3d37..ba870f2 100644 --- a/tests/integration/inventory.yml +++ b/tests/integration/inventory.yml @@ -27,6 +27,8 @@ all: - C:\Program Files (x86)\Python310-32 - C:\Program Files\Python311 - C:\Program Files (x86)\Python311-32 + - C:\Program Files\Python312 + - C:\Program Files (x86)\Python312-32 python_venv_path: C:\temp\venv krb_provider: SSPI @@ -65,3 +67,4 @@ all: domain_username: spnego domain_password: Password01 domain_upn: '{{ domain_username }}@{{ domain_name | upper }}' + gmsa_username: MyGMSA diff --git a/tests/integration/main.yml b/tests/integration/main.yml index 889ecd3..265fc9f 100644 --- a/tests/integration/main.yml +++ b/tests/integration/main.yml @@ -4,18 +4,18 @@ tasks: - name: create cert output folder - file: + ansible.builtin.file: path: '{{ playbook_dir }}/cert_setup' state: directory - name: create generate_cert script - template: + ansible.builtin.template: src: generate_cert.sh.tmpl dest: '{{ playbook_dir }}/cert_setup/generate_cert.sh' mode: '700' - name: generate CA and LDAPS certificates - shell: ./generate_cert.sh password + ansible.builtin.shell: ./generate_cert.sh password args: creates: '{{ playbook_dir }}/cert_setup/complete.txt' chdir: '{{ playbook_dir }}/cert_setup' @@ -42,21 +42,21 @@ register: network_connection_name_raw - name: fail if we didn't get a network connection name - fail: + ansible.builtin.fail: msg: Failed to get the Windows network connection name when: network_connection_name_raw.output | count != 1 - name: set fact of network connection name - set_fact: + ansible.builtin.set_fact: network_connection_name: '{{ network_connection_name_raw.output[0] }}' - name: copy CA certificate - win_copy: + ansible.windows.win_copy: src: '{{ playbook_dir }}/cert_setup/ca.pem' dest: C:\Windows\TEMP\ca.pem - name: import CA certificate to trusted root CA - win_certificate_store: + ansible.windows.win_certificate_store: path: C:\Windows\TEMP\ca.pem state: present store_location: LocalMachine @@ -67,22 +67,18 @@ gather_facts: no tasks: - name: set the DNS for the specified adapter to localhost - win_dns_client: + ansible.windows.win_dns_client: adapter_name: '{{ network_connection_name }}' ipv4_addresses: 127.0.0.1 - name: ensure domain exists and DC is promoted as a domain controller - win_domain: + microsoft.ad.domain: dns_domain_name: '{{ domain_name }}' safe_mode_password: '{{ domain_password }}' - register: domain_setup_res - - - name: reboot DC if required after install - win_reboot: - when: domain_setup_res.reboot_required + reboot: true - name: create domain username - win_domain_user: + microsoft.ad.user: name: '{{ domain_username }}' upn: '{{ domain_upn }}' description: '{{ domain_username }} Domain Account' @@ -90,11 +86,12 @@ password_never_expires: yes update_password: on_create groups: - - Domain Admins + add: + - Domain Admins state: present - name: test out domain user that was created - win_whoami: + ansible.windows.win_whoami: register: become_res failed_when: become_res.upn != domain_upn become: yes @@ -104,12 +101,12 @@ ansible_become_pass: '{{ domain_password }}' - name: copy LDAPS certificate - win_copy: + ansible.windows.win_copy: src: '{{ playbook_dir }}/cert_setup/DC01.pfx' dest: C:\Windows\TEMP\ldaps.pfx - name: import LDAPS certificate - win_certificate_store: + ansible.windows.win_certificate_store: path: C:\Windows\TEMP\ldaps.pfx password: password key_exportable: no @@ -133,48 +130,93 @@ ansible_become_user: '{{ domain_upn }}' ansible_become_pass: '{{ domain_password }}' + - name: create group which can access gMSA + microsoft.ad.group: + name: gMSAUsers + scope: global + state: present + members: + add: + - '{{ domain_username }}' + + - name: create gMSA account + ansible.windows.win_powershell: + script: | + param([string]$UserName, [string]$Realm) + $ErrorActionPreference = 'Stop' + $Ansible.Changed = $false + + if (-not (Get-KdsRootKey -ErrorAction SilentlyContinue)) { + $null = Add-KdsRootKey -EffectiveTime ((Get-Date).AddHours(-10)) + $Ansible.Changed = $true + } + + try { + $null = Get-ADServiceAccount -Identity $UserName + } + catch { + $accountParams = @{ + Name = $UserName + DNSHostName = "$UserName.$Realm" + KerberosEncryptionType = 'AES128,AES256' + OtherAttributes = @{ + userPrincipalName = "${UserName}`$@${Realm}" + } + PrincipalsAllowedToRetrieveManagedPassword = 'gMSAUsers' + } + New-ADServiceAccount @accountParams + $Ansible.Changed = $true + } + parameters: + UserName: '{{ gmsa_username }}' + Realm: '{{ domain_name }}' + become: true + become_method: runas + become_user: SYSTEM + - name: join Windows host to domain hosts: win_children gather_facts: no tasks: - name: set the DNS for the private adapter to point to the DC - win_dns_client: + ansible.windows.win_dns_client: adapter_names: '{{ network_connection_name }}' ipv4_addresses: '{{ hostvars[groups["win_controller"][0]]["ansible_host"] }}' - name: join host to domain - win_domain_membership: + microsoft.ad.membership: dns_domain_name: '{{ domain_name }}' domain_admin_user: '{{ domain_upn }}' domain_admin_password: '{{ domain_password }}' state: domain - register: domain_join_result + reboot: true - name: trust hosts for delegation in AD - ansible.windows.win_powershell: - parameters: - ComputerName: '{{ inventory_hostname }}' - script: | - param($ComputerName) - - $ErrorActionPreference = 'Stop' - $Ansible.Changed = $false - + microsoft.ad.computer: + identity: SERVER2022$ # We only want to have this host with delegation for testing + trusted_for_delegation: yes + register: delegation_res + run_once: true + delegate_to: '{{ groups["win_controller"][0] }}' - $actual = (Get-ADComputer -Identity $ComputerName -Property TrustedForDelegation).TrustedForDelegation - if (-not $actual) { - Set-ADComputer -Identity $ComputerName -TrustedForDelegation $true - $Ansible.Changed = $true - } - when: inventory_hostname == 'SERVER2022' # We only want to have this hosted with delegation for testing + - name: ensure the host can install the gMSA account + microsoft.ad.group: + identity: gMSAUsers + members: + add: + - '{{ inventory_hostname }}$' + state: present + register: gmsa_group_join delegate_to: '{{ groups["win_controller"][0] }}' - - name: reboot host to finalise domain join - win_reboot: - when: domain_join_result.reboot_required + - name: reboot after joining host to gMSA group or changing delegation settings + ansible.windows.win_reboot: + when: >- + gmsa_group_join is changed or + (delegation_res is changed and inventory_hostname == "SERVER2022") - name: test out domain user logon - win_whoami: + ansible.windows.win_whoami: register: become_res failed_when: become_res.upn != domain_upn become: yes @@ -183,6 +225,12 @@ ansible_become_user: '{{ domain_upn }}' ansible_become_pass: '{{ domain_password }}' + - name: add gMSA to Administrators group + ansible.windows.win_group_membership: + name: Administrators + members: + - '{{ gmsa_username }}$@{{ domain_name }}' + # Use the following to get a snaphot of programs installed and their product_ids # 'SOFTWARE', 'SOFTWARE\Wow6432Node' | ForEach-Object { # $getParams = @{ @@ -201,7 +249,7 @@ gather_facts: no tasks: - name: install Python interpreters - win_package: + ansible.windows.win_package: path: '{{ item.url }}' arguments: '{{ item.arguments }}' product_id: '{{ item.product_id }}' @@ -219,78 +267,84 @@ - url: https://www.python.org/ftp/python/3.9.13/python-3.9.13-amd64.exe product_id: '{90A30DAB-6FD8-4CF8-BB8B-C0DB21C69F20}' arguments: /quiet InstallAllUsers=1 Shortcuts=0 - - url: https://www.python.org/ftp/python/3.10.9/python-3.10.9.exe - product_id: '{335CD0FB-50DC-44D2-80E3-39749356F8D6}' + - url: https://www.python.org/ftp/python/3.10.11/python-3.10.11.exe + product_id: '{2627E7A3-6630-4858-8151-D91D1AF62F8E}' + arguments: /quiet InstallAllUsers=1 Shortcuts=0 + - url: https://www.python.org/ftp/python/3.10.11/python-3.10.11-amd64.exe + product_id: '{6532871D-1F76-408C-ABD0-63C732137351}' + arguments: /quiet InstallAllUsers=1 Shortcuts=0 + - url: https://www.python.org/ftp/python/3.11.9/python-3.11.9.exe + product_id: '{89D284CB-6250-4C7A-88DD-56A7CE162ACD}' arguments: /quiet InstallAllUsers=1 Shortcuts=0 - - url: https://www.python.org/ftp/python/3.10.9/python-3.10.9-amd64.exe - product_id: '{0CBB496F-1D15-42F1-AA45-C01C95196EC8}' + - url: https://www.python.org/ftp/python/3.11.9/python-3.11.9-amd64.exe + product_id: '{9AFDC691-40E5-4B15-835F-9A524AC4672C}' arguments: /quiet InstallAllUsers=1 Shortcuts=0 - - url: https://www.python.org/ftp/python/3.11.1/python-3.11.1.exe - product_id: '{E5CB3216-2C88-4E4B-ADCA-56E9BAEE7404}' + - url: https://www.python.org/ftp/python/3.12.4/python-3.12.4.exe + product_id: '{104F0229-E76E-4C6B-B532-E55DE73A723E}' arguments: /quiet InstallAllUsers=1 Shortcuts=0 - - url: https://www.python.org/ftp/python/3.11.1/python-3.11.1-amd64.exe - product_id: '{21EEFB31-6A96-4CAE-9A3B-B7FD6374C155}' + - url: https://www.python.org/ftp/python/3.12.4/python-3.12.4-amd64.exe + product_id: '{62DD7DAF-6279-46FA-A06B-C4A541244045}' arguments: /quiet InstallAllUsers=1 Shortcuts=0 - name: ensure virtualenv package is installed for each Python install - win_command: '"{{ item }}\python.exe" -m pip install virtualenv' + ansible.windows.win_command: '"{{ item }}\python.exe" -m pip install virtualenv' args: creates: '{{ item }}\Scripts\virtualenv.exe' with_items: '{{ python_interpreters }}' - name: create virtualenv for each Python install - win_command: '"{{ item }}\python.exe" -m virtualenv "{{ python_venv_path }}\{{ item | win_basename }}"' + ansible.windows.win_command: '"{{ item }}\python.exe" -m virtualenv "{{ python_venv_path }}\{{ item | win_basename }}"' args: creates: '{{ python_venv_path }}\{{ item | win_basename }}' with_items: '{{ python_interpreters }}' - name: copy across wheel artifacts - win_copy: + ansible.windows.win_copy: src: artifact.zip dest: C:\temp\wheels.zip - name: ensure wheel dir exists - win_file: + ansible.windows.win_file: path: C:\temp\wheels state: directory - name: extract wheel from archive - win_unzip: + community.windows.win_unzip: src: C:\temp\wheels.zip dest: C:\temp\wheels - name: get pyspnego artifact sdist filename - win_find: + ansible.windows.win_find: paths: C:\temp\wheels patterns: 'pyspnego-*.tar.gz' use_regex: false register: spnego_sdist_file - name: verify sdist was found - assert: + ansible.builtin.assert: that: - spnego_sdist_file.files | count == 1 - name: get pyspnego artifact version - set_fact: + ansible.builtin.set_fact: spnego_version: >- {{ spnego_sdist_file.files[0].filename | regex_replace('pyspnego-(?P.*)\.tar\.gz', '\g') }} - name: install pyspnego into virtualenv - win_command: >- + ansible.windows.win_command: >- "{{ python_venv_path }}\{{ item | win_basename }}\Scripts\python.exe" -m pip install pyspnego=={{ spnego_version }} pytest requests - https://github.com/jborean93/sansldap/archive/174408ab40e42f9a2d34bc493027e43cc5d31715.zip + sansldap --find-links=C:/temp/wheels args: creates: '{{ python_venv_path }}\{{ item | win_basename }}\Lib\site-packages\spnego' with_items: '{{ python_interpreters }}' - name: template out test integration file - win_template: + ansible.windows.win_template: src: test_integration.py.tmpl dest: C:\temp\test_integration.py block_start_string: '{!!' @@ -327,7 +381,7 @@ } - name: allow SMB traffic in - win_firewall_rule: + community.windows.win_firewall_rule: name: File and Printer Sharing (SMB-In) state: present enabled: yes @@ -344,7 +398,7 @@ tasks: - name: install base packages - yum: + ansible.builtin.dnf: name: - dnsmasq - epel-release @@ -356,18 +410,18 @@ state: present - name: install kerberos packages - yum: + ansible.builtin.dnf: name: '{{ krb_packages }}' state: present - name: ensure virtualenv is installed on base Python interpreters - pip: + ansible.builtin.pip: name: - virtualenv executable: /usr/bin/pip3.9 - name: setup NetworkManager to use dnsmasq - copy: + ansible.builtin.copy: dest: /etc/NetworkManager/conf.d/dns.conf content: | [main] @@ -375,25 +429,26 @@ notify: restart NetworkManager.service - name: set dnsmasq to forward requests for domain to DC - copy: + ansible.builtin.copy: dest: /etc/NetworkManager/dnsmasq.d/{{ domain_name }} content: server=/{{ domain_name }}/{{ hostvars[groups['win_controller'][0]]["ansible_host"] }} notify: restart NetworkManager.service - name: template krb5.conf file - template: + ansible.builtin.template: src: krb5.conf.tmpl dest: /etc/krb5.conf - name: create AD principal for Linux keytabs - win_domain_user: + microsoft.ad.user: name: '{{ inventory_hostname }}_{{ item }}' description: Kerberos principal for {{ inventory_hostname }} {{ item }} keytab password: '{{ domain_password }}' password_never_expires: yes update_password: on_create attributes: - msDS-SupportedEncryptionTypes: 16 # AES256_CTS_HMAC_SHA1_96 + set: + msDS-SupportedEncryptionTypes: 16 # AES256_CTS_HMAC_SHA1_96 state: present become: no delegate_to: DC01 @@ -402,7 +457,7 @@ - cifs - name: create keytab for Linux hosts - win_command: >- + ansible.windows.win_command: >- ktpass.exe -out C:\temp\{{ inventory_hostname }}-{{ item }}.keytab -princ {{ item }}/{{ inventory_hostname }}.{{ domain_name }}@{{ domain_name | upper }} @@ -420,7 +475,7 @@ - cifs - name: fetch the keytab - fetch: + ansible.builtin.fetch: src: C:\temp\{{ inventory_hostname }}-{{ item }}.keytab dest: '{{ inventory_hostname }}-{{ item }}.keytab' flat: yes @@ -431,7 +486,7 @@ - cifs - name: copy keytabs to host - copy: + ansible.builtin.copy: src: '{{ inventory_hostname }}-{{ item }}.keytab' dest: /etc/{{ item }}.keytab with_items: @@ -439,7 +494,7 @@ - cifs - name: create user keytab - MIT - command: ktutil + ansible.builtin.command: ktutil args: chdir: ~/ creates: ~/user.keytab @@ -448,7 +503,7 @@ when: krb_provider == 'MIT' - name: create user keytab - Heimdal - command: >- + ansible.builtin.command: >- ktutil --keytab=user.keytab add @@ -463,29 +518,29 @@ when: krb_provider == 'Heimdal' - name: copy across CA cert - copy: + ansible.builtin.copy: src: cert_setup/ca.pem dest: /etc/pki/ca-trust/source/anchors/pyspnego.pem register: ca_cert_copy - name: register CA cert - command: update-ca-trust + ansible.builtin.command: update-ca-trust when: ca_cert_copy is changed - name: ensure wheel dir exists - file: + ansible.builtin.file: path: ~/wheels state: directory become: no - name: extract wheel artifacts - unarchive: + ansible.builtin.unarchive: src: artifact.zip dest: ~/wheels become: no - name: get pyspnego artifact sdist filename - find: + ansible.builtin.find: paths: ~/wheels patterns: 'pyspnego-*.tar.gz' recurse: no @@ -494,31 +549,31 @@ register: spnego_sdist_file - name: verify sdist was found - assert: + ansible.builtin.assert: that: - spnego_sdist_file.files | count == 1 - name: get pyspnego artifact version - set_fact: + ansible.builtin.set_fact: spnego_version: >- {{ spnego_sdist_file.files[0].path | basename | regex_replace('pyspnego-(?P.*)\.tar\.gz', '\g') }} - name: create a virtualenv for each Python interpeter - pip: + ansible.builtin.pip: name: - pytest - pytest-forked - requests - - https://github.com/jborean93/sansldap/archive/174408ab40e42f9a2d34bc493027e43cc5d31715.zip + - sansldap - pyspnego[kerberos] == {{ spnego_version }} virtualenv: '{{ python_venv_path }}/{{ item | basename }}' virtualenv_python: '{{ item }}' - extra_args: --find-links file:///{{ spnego_sdist_file.files[0].path | dirname }} + extra_args: --find-links file://{{ spnego_sdist_file.files[0].path | dirname }} become: no with_items: '{{ python_interpreters }}' - name: template out test integration file - template: + ansible.builtin.template: src: test_integration.py.tmpl dest: ~/test_integration.py block_start_string: '{!!' diff --git a/tests/integration/templates/test_integration.py.tmpl b/tests/integration/templates/test_integration.py.tmpl index ad0424d..f06b789 100644 --- a/tests/integration/templates/test_integration.py.tmpl +++ b/tests/integration/templates/test_integration.py.tmpl @@ -6,6 +6,7 @@ from __future__ import annotations import base64 import collections import dataclasses +import datetime import getpass import io import os @@ -36,6 +37,7 @@ from xml.etree import ElementTree as ET USERNAME = '{{ domain_upn }}' PASSWORD = '{{ domain_password }}' +GMSA_USERNAME = '{{ gmsa_username }}$@{{ domain_name | upper }}' HOSTNAME = socket.gethostname() HOST_FQDN = '%s.{{ domain_name }}' % HOSTNAME WIN_DC = '{{ groups['win_controller'][0] | lower }}.{{ domain_name | lower }}' @@ -142,6 +144,67 @@ class HTTPWinRMAuth(requests.auth.AuthBase): return response +@dataclasses.dataclass(frozen=True) +class MSDSManagedPassword: + # https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-adts/a9019740-3d73-46ef-a9ae-3ea8eb86ac2e + version: int + current_password: bytes + previous_password: bytes + query_password_interval: datetime.timedelta + unchanged_password_interval: datetime.timedelta + + @classmethod + def unpack( + cls, + data: bytearray | bytes | memoryview, + ) -> MSDSManagedPassword: + view = memoryview(data) + + version = int.from_bytes(view[:2], byteorder="little", signed=False) + # reserved = int.from_bytes(view[2:4], byteorder="little", signed=False) + # length = int.from_bytes(view[4:8], byteorder="little", signed=False) + current_password_offset = int.from_bytes(view[8:10], byteorder="little", signed=False) + previous_password_offset = int.from_bytes(view[10:12], byteorder="little", signed=False) + query_password_offset = int.from_bytes(view[12:14], byteorder="little", signed=False) + unchanged_interval_offset = int.from_bytes(view[14:16], byteorder="little", signed=False) + + current_password = MSDSManagedPassword._get_null_terminated_string(view, current_password_offset) + previous_password = MSDSManagedPassword._get_null_terminated_string(view, previous_password_offset) + + query_password_interval = int.from_bytes( + view[query_password_offset : query_password_offset + 8], + byteorder="little", + signed=False, + ) + unchanged_password_interval = int.from_bytes( + view[unchanged_interval_offset : unchanged_interval_offset + 8], + byteorder="little", + signed=False, + ) + + return MSDSManagedPassword( + version=version, + current_password=current_password, + previous_password=previous_password, + query_password_interval=datetime.timedelta(microseconds=query_password_interval / 10), + unchanged_password_interval=datetime.timedelta(microseconds=unchanged_password_interval / 10), + ) + + @classmethod + def _get_null_terminated_string( + cls, + view: memoryview, + offset: int, + ) -> bytes: + if offset == 0: + return b"" + + data = view[offset:].tobytes() + b_data = data[: data.index(b"\x00\x00")] + + return b_data + + @dataclasses.dataclass(frozen=True) class SecTrailer: type: int @@ -1737,3 +1800,107 @@ def test_dce_rpc_auth(protocol: str, auth_level: str) -> None: auth_protocol=auth_kwargs["protocol"], ) assert isinstance(res, GroupKeyEnvelope) + + +@pytest.mark.parametrize('protocol', ["kerberos", "negotiate", "negotiate-ntlm", "ntlm"]) +def test_gmsa_password_encoding(protocol: str) -> None: + domain_parts = WIN_DC.split(".", 2) + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) as sock: + sock.settimeout(10) + sock.connect((WIN_DC, 389)) + + auth_kwargs = { + "hostname": WIN_DC, + "protocol": "negotiate", + "service": "ldap", + } + + if os.name != 'nt' or IS_SYSTEM: + c = spnego.client( + USERNAME, + PASSWORD, + **auth_kwargs, + ) + + else: + c = spnego.client(**auth_kwargs) + + ldap = sansldap.LDAPClient() + + in_token = None + while not c.complete: + token = c.step(in_token=in_token) + if not token: + break + + ldap.bind_sasl("GSS-SPNEGO", cred=token) + sock.sendall(ldap.data_to_send()) + + msg = ldap.receive(sock.recv(4096))[0] + assert isinstance(msg, sansldap.BindResponse) + if msg.result.result_code not in [ + sansldap.LDAPResultCode.SUCCESS, + sansldap.LDAPResultCode.SASL_BIND_IN_PROGRESS + ]: + raise Exception(f"Failed to bind: {msg.result.diagnostics_message}") + + in_token = msg.server_sasl_creds + + if msg.result.result_code != sansldap.LDAPResultCode.SUCCESS: + raise Exception(f"Failed to bind {msg.result.result_code.name}: {msg.result.diagnostics_message}") + + # Get gMSA msDS-ManagedPassword value + gmsa_user_split = GMSA_USERNAME.split("@", 1)[0] + ldap.search_request( + base_object=f"DC={domain_parts[1]},DC={domain_parts[2]}", + scope=sansldap.SearchScope.SUBTREE, + dereferencing_policy=sansldap.DereferencingPolicy.NEVER, + size_limit=0, + time_limit=0, + types_only=False, + filter=sansldap.LDAPFilter.from_string(f"(sAMAccountName={gmsa_user_split})"), + attributes=["msDS-ManagedPassword"], + controls=None + ) + + out_msg = c.wrap(ldap.data_to_send()).data + sock.sendall(len(out_msg).to_bytes(4, byteorder="big") + out_msg) + + managed_password_value = None + search_continue = True + while search_continue: + in_msg = sock.recv(4096) + msgs = ldap.receive(c.unwrap(in_msg[4:]).data) + + for msg in msgs: + if isinstance(msg, sansldap.SearchResultEntry): + if msg.object_name.lower().startswith(f"cn={gmsa_user_split.lower()[:-1]}"): + managed_password_value = MSDSManagedPassword.unpack(msg.attributes[0].values[0]) + + elif isinstance(msg, sansldap.SearchResultDone): + if msg.result.result_code != sansldap.LDAPResultCode.SUCCESS: + raise Exception(f"Received error response: {msg.result}") + + search_continue = False + + assert managed_password_value + + context_kwargs = { + 'service': 'http', + 'username': GMSA_USERNAME, + 'password': managed_password_value.current_password.decode("utf-16-le", errors="surrogatepass"), + } + + if protocol == "negotiate-ntlm": + context_kwargs["hostname"] = WIN_SERVER_TRUSTED_IP + context_kwargs["protocol"] = "negotiate" + else: + context_kwargs["hostname"] = WIN_SERVER_TRUSTED + context_kwargs["protocol"] = protocol + + c = spnego.client(**context_kwargs) + rc, stdout, stderr = winrm_run(c, 'Negotiate', WIN_SERVER_TRUSTED, 'whoami.exe', []) + assert rc == 0 + assert stdout.strip().lower() == f"{domain_parts[1].lower()}\\{gmsa_user_split.lower()}" + assert stderr == '' diff --git a/tests/test_ntlm.py b/tests/test_ntlm.py index 88fb2f4..0061614 100644 --- a/tests/test_ntlm.py +++ b/tests/test_ntlm.py @@ -168,12 +168,20 @@ def test_invalid_lm_compat_level(level, monkeypatch): ntlm.NTLMProxy("user", "pass") -@pytest.mark.parametrize("usage", ["initiate", "accept"]) -def test_context_no_store(usage): +def test_context_no_store_initiate(): with pytest.raises( - OperationNotAvailableError, match="Retrieving NTLM store without NTLM_USER_FILE set to a " "filepath" + OperationNotAvailableError, + match="No username or password was specified and the credential cache did not exist or contained no credentials", ): - ntlm.NTLMProxy(CredentialCache(), usage=usage) + ntlm.NTLMProxy(CredentialCache(), usage="initiate") + + +def test_context_no_store_accept(): + with pytest.raises( + OperationNotAvailableError, + match="NTLM acceptor requires NTLM credential cache to be provided through the env var NTLM_USER_FILE set to a filepath", + ): + ntlm.NTLMProxy(CredentialCache(), usage="accept") def test_iov_available():