From 5ae920edc88e0f832db16a9ce615ec89cc2b77fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20Schoentgen?= Date: Fri, 23 Nov 2018 18:13:17 +0100 Subject: [PATCH] NXDRIVE-404: Handle custom SSL certificates A message box will display the custom certificate and ask the user what to do next. If it is decided to bypass SSL verifications, then it will apply everytime for the impacted account. Note that we do not check for bad certificates *after* the account creation as it is unlikely to change over the time. It is more important to test at the beginning. If such scenario happens, the user will have to tweak SSL behavior with ca-bundle or ssl-no-verify options. --- docs/changes/4.0.2.md | 6 +- nxdrive/client/remote_client.py | 1 - nxdrive/data/i18n/i18n.json | 9 +++ nxdrive/engine/engine.py | 8 +++ nxdrive/gui/api.py | 83 +++++++++++++++----------- nxdrive/gui/application.py | 96 +++++++++++++++++++++++++++++- nxdrive/utils.py | 53 +++++++++++++++-- tests/conftest.py | 23 +++++--- tests/test_utils.py | 101 ++++++++++++++++++++++++++++++++ 9 files changed, 330 insertions(+), 50 deletions(-) diff --git a/docs/changes/4.0.2.md b/docs/changes/4.0.2.md index 49c7e706e3..0df965d613 100644 --- a/docs/changes/4.0.2.md +++ b/docs/changes/4.0.2.md @@ -8,11 +8,13 @@ Changes in command line arguments: ## Core +- [NXDRIVE-404](https://jira.nuxeo.com/browse/NXDRIVE-404): Handle custom SSL certificates - [NXDRIVE-1385](https://jira.nuxeo.com/browse/NXDRIVE-1385): Missing translations -- [NXDRIVE-1433](https://jira.nuxeo.com/browse/NXDRIVE-1433): Handle custom SSL certificates +- [NXDRIVE-1433](https://jira.nuxeo.com/browse/NXDRIVE-1433): Add the ca-bundle option to allow setting a custom SSL certificate ## Technical Changes +- Added `Application.accept_unofficial_ssl_cert()` - Moved `CliHandler.get_value()` to utils.py::`get_value()` - Added `DarwinIntegration.register_contextual_menu()` - Changed `DriveSystrayIcon.context_menu` property to `DriveSystrayIcon.get_context_menu()` function @@ -21,3 +23,5 @@ Changes in command line arguments: - Added `WindowsIntegration.register_contextual_menu_entry()` - Added `WindowsIntegration.unregister_contextual_menu()` - Added osi/windows/registry.py +- Added utils.py::`get_certificate_details()` +- Added utils.py::`retrieve_ssl_certificate()` diff --git a/nxdrive/client/remote_client.py b/nxdrive/client/remote_client.py index d32676a8ea..ff4cc98aab 100644 --- a/nxdrive/client/remote_client.py +++ b/nxdrive/client/remote_client.py @@ -58,7 +58,6 @@ def __init__( **kwargs: Any, ) -> None: auth = TokenAuth(token) if token else (user_id, password) - kwargs["verify"] = Options.ca_bundle or not Options.ssl_no_verify self.kwargs = kwargs super().__init__( diff --git a/nxdrive/data/i18n/i18n.json b/nxdrive/data/i18n/i18n.json index c694e73b79..782b2cd812 100644 --- a/nxdrive/data/i18n/i18n.json +++ b/nxdrive/data/i18n/i18n.json @@ -206,6 +206,15 @@ "SHOW_ACTIVITY_WINDOW": "Show activity window", "SHOW_FILE_STATUS": "Show file status", "SOURCE_LINK": "See the source", + "SSL_CERTIFICATE": "Certificate", + "SSL_HOSTNAME_ERROR": "The host name did not match any of the valid hosts for this certificate.", + "SSL_UNTRUSTED_CERT_TITLE": "Untrusted Certificate", + "SSL_CANNOT_CONNECT": "Cannot connect securely to %1", + "SSL_DATE_FROM": "Effective Date:", + "SSL_DATE_EXPIRATION": "Expiration Date:", + "SSL_ISSUER": "Issuer", + "SSL_SERIAL_NUMBER": "Serial Number:", + "SSL_TRUST_ANYWAY": "Trust this certificate anyway", "SUSPEND": "Suspend", "SYNCHRONIZATION_COMPLETED": "Synchronization complete", "SYNCHRONIZATION_IN_PROGRESS": "Synchronization in progress", diff --git a/nxdrive/engine/engine.py b/nxdrive/engine/engine.py index 746e88fe43..fdaded752a 100644 --- a/nxdrive/engine/engine.py +++ b/nxdrive/engine/engine.py @@ -418,6 +418,7 @@ def _load_configuration(self) -> None: self.remote_user = self._dao.get_config("remote_user") self._remote_password = self._dao.get_config("remote_password") self._remote_token = self._dao.get_config("remote_token") + self._ssl_verify = self._dao.get_config("ssl_verify", default=True) if self._remote_password is None and self._remote_token is None: self.set_invalid_credentials( reason="found no password nor token in engine configuration" @@ -694,6 +695,7 @@ def init_remote(self) -> Remote: "check_suspended": self.suspend_client, "dao": self._dao, "proxy": self.manager.proxy, + "verify": self._ssl_verify, } return self.remote_cls(*args, **kwargs) @@ -706,6 +708,11 @@ def bind(self, binder: Binder) -> None: self._remote_token = binder.token self._web_authentication = self._remote_token is not None + # Persist the user preference about the SSL behavior. + # It can be tweaked via ca-bundle or ssl-no-verify options. But also + # from the ponctual bypass-ssl window prompted at the account creation. + self._ssl_verify = Options.ca_bundle or not Options.ssl_no_verify + self.remote = self.init_remote() if check_fs: @@ -733,6 +740,7 @@ def bind(self, binder: Binder) -> None: self._dao.update_config("remote_user", self.remote_user) self._dao.update_config("remote_password", self._remote_password) self._dao.update_config("remote_token", self._remote_token) + self._dao.update_config("ssl_verify", self._ssl_verify) # Check for the root # If the top level state for the server binding doesn't exist, diff --git a/nxdrive/gui/api.py b/nxdrive/gui/api.py index 7b31bde18b..88a7d59640 100644 --- a/nxdrive/gui/api.py +++ b/nxdrive/gui/api.py @@ -11,6 +11,7 @@ import requests from dateutil.tz import tzlocal from nuxeo.exceptions import HTTPError, Unauthorized +from requests.exceptions import SSLError from PyQt5.QtCore import QObject, pyqtSignal, pyqtSlot from PyQt5.QtWidgets import QMessageBox @@ -488,8 +489,25 @@ def web_update_token(self, uid: str) -> None: ) self.setMessage.emit("CONNECTION_UNKNOWN", "error") + def _guess_server_url(self, server_url: str) -> str: + """Handle invalide SSL certificates when guessing the server URL.""" + try: + return guess_server_url(server_url, proxy=self._manager.proxy) + except SSLError as exc: + log.critical( + f"{exc}. Use 'ca-bundle' (or 'ssl-no-verify') option to tune SSL behavior." + ) + if "CERTIFICATE_VERIFY_FAILED" in str(exc): + parts = urlsplit(server_url) + hostname = parts.netloc or parts.path + if self.application.accept_unofficial_ssl_cert(hostname): + Options.ca_bundle = None + Options.ssl_no_verify = True + return self._guess_server_url(server_url) + return "" + def _get_authentication_url(self, server_url: str) -> str: - url = guess_server_url(server_url, proxy=self._manager.proxy) + url = self._guess_server_url(server_url) if not url: raise ValueError("No URL found for Nuxeo server") @@ -584,7 +602,7 @@ def bind_server( name: str = None, **kwargs: Any, ) -> None: - server_url = guess_server_url(url, proxy=self._manager.proxy) + server_url = self._guess_server_url(url) if not server_url: self.setMessage.emit("CONNECTION_ERROR", "error") return @@ -637,7 +655,7 @@ def bind_server( @pyqtSlot(str, str) def web_authentication(self, server_url: str, local_folder: str) -> None: # Handle the server URL - url = guess_server_url(server_url, proxy=self._manager.proxy) + url = self._guess_server_url(server_url) if not url: self.setMessage.emit("CONNECTION_ERROR", "error") return @@ -657,32 +675,35 @@ def web_authentication(self, server_url: str, local_folder: str) -> None: # Connect to startup page status = self._connect_startup_page(server_url) + callback_params = { + "local_folder": local_folder, + "server_url": server_url, + "engine_type": engine_type, + } + url = self._get_authentication_url(server_url) + # Server will send a 401 in case of anonymous user configuration # Should maybe only check for 404 if status < 400 or status in (401, 500, 503): # Page exists, let's open authentication dialog - callback_params = { - "local_folder": local_folder, - "server_url": server_url, - "engine_type": engine_type, - } - url = self._get_authentication_url(server_url) log.debug( f"Web authentication is available on server {server_url}, " f"opening login window with URL {url}" ) - self.openAuthenticationDialog.emit(url, callback_params) - return else: # Startup page is not available log.debug( f"Web authentication not available on server {server_url}, " "falling back on basic authentication" ) - # We might have to downgrade because the - # browser login is not available. - self._manager.updater._force_downgrade() - return + if Options.is_frozen: + # We might have to downgrade because the + # browser login is not available. + self._manager.updater._force_downgrade() + return + + self.openAuthenticationDialog.emit(url, callback_params) + return except FolderAlreadyUsed: error = "FOLDER_USED" except StartupPageConnectionError: @@ -696,7 +717,7 @@ def web_authentication(self, server_url: str, local_folder: str) -> None: def _connect_startup_page(self, server_url: str) -> int: # Take into account URL parameters - parts = urlsplit(guess_server_url(server_url, proxy=self._manager.proxy) or "") + parts = urlsplit(self._guess_server_url(server_url)) url = urlunsplit( ( parts.scheme, @@ -706,38 +727,34 @@ def _connect_startup_page(self, server_url: str) -> int: parts.fragment, ) ) + headers = { + "X-Application-Name": APP_NAME, + "X-Device-Id": self._manager.device_id, + "X-Client-Version": self._manager.version, + "User-Agent": f"{APP_NAME}/{self._manager.version}", + } + log.debug( + f"Proxy configuration for startup page connection: {self._manager.proxy}" + ) try: - log.debug( - f"Proxy configuration for startup page connection: {self._manager.proxy}" - ) - headers = { - "X-Application-Name": APP_NAME, - "X-Device-Id": self._manager.device_id, - "X-Client-Version": self._manager.version, - "User-Agent": f"{APP_NAME}/{self._manager.version}", - } - timeout = STARTUP_PAGE_CONNECTION_TIMEOUT with requests.get( url, headers=headers, proxies=self._manager.proxy.settings(url=url), - timeout=timeout, + timeout=STARTUP_PAGE_CONNECTION_TIMEOUT, verify=Options.ca_bundle or not Options.ssl_no_verify, ) as resp: status = resp.status_code - except OSError as exc: - # OSError: Could not find a suitable TLS CA certificate bundle, invalid path: ... - log.critical(f"{exc}. Ensure the 'ca_bundle' option is correct.") - raise StartupPageConnectionError() except: log.exception( f"Error while trying to connect to {APP_NAME}" f" startup page with URL {url}" ) raise StartupPageConnectionError() - log.debug(f"Status code for {url} = {status}") - return status + else: + log.debug(f"Status code for {url} = {status}") + return status @pyqtSlot(str, str, result=bool) def set_server_ui(self, uid: str, server_ui: str) -> bool: diff --git a/nxdrive/gui/application.py b/nxdrive/gui/application.py index 73992f4d98..88e3f114f1 100644 --- a/nxdrive/gui/application.py +++ b/nxdrive/gui/application.py @@ -15,6 +15,7 @@ from PyQt5.QtQuick import QQuickView, QQuickWindow from PyQt5.QtWidgets import ( QApplication, + QCheckBox, QDialog, QDialogButtonBox, QMessageBox, @@ -555,7 +556,10 @@ def auth() -> None: user = str(username.text()) pwd = str(password.text()) nuxeo = Nuxeo( - host=url, auth=(user, pwd), proxies=self.manager.proxy.settings(url=url) + host=url, + auth=(user, pwd), + proxies=self.manager.proxy.settings(url=url), + verify=Options.ca_bundle or not Options.ssl_no_verify, ) try: token = nuxeo.client.request_auth_token( @@ -826,6 +830,96 @@ def show_release_notes(self, version: str) -> None: dialog.setLayout(layout) dialog.exec_() + def accept_unofficial_ssl_cert(self, hostname: str) -> bool: + """Ask the user to bypass the SSL certificate verification.""" + from ..utils import get_certificate_details + + def signature(sig: str) -> str: + """ + Format the certificate signature. + + >>> signature("0F4019D1E6C52EF9A3A929B6D5613816") + 0f:40:19:d1:e6:c5:2e:f9:a3:a9:29:b6:d5:61:38:16 + + """ + from textwrap import wrap + + return str.lower(":".join(wrap(sig, 2))) + + cert = get_certificate_details(hostname=hostname) + if not cert: + return False + + subject = [ + f"
  • {details[0][0]}: {details[0][1]}
  • " + for details in sorted(cert["subject"]) + ] + issuer = [ + f"
  • {details[0][0]}: {details[0][1]}
  • " + for details in sorted(cert["issuer"]) + ] + urls = [ + f"
  • {details}
  • " + for details in cert["caIssuers"] + ] + sig = f"{signature(cert['serialNumber'])}" + message = f""" +

    {Translator.get("SSL_CANNOT_CONNECT", [hostname])}

    +

    {Translator.get("SSL_HOSTNAME_ERROR")}

    + +

    {Translator.get("SSL_CERTIFICATE")}

    + + +

    {Translator.get("SSL_ISSUER")}

    + + +

    {Translator.get("URL")}

    + +""" + + dialog = QDialog() + dialog.setWindowTitle(Translator.get("SSL_UNTRUSTED_CERT_TITLE")) + dialog.setWindowIcon(QIcon(self.get_window_icon())) + dialog.resize(600, 650) + + notes = QTextEdit() + notes.setReadOnly(True) + notes.setHtml(message) + + continue_with_bad_ssl_cert = False + + def accept() -> None: + nonlocal continue_with_bad_ssl_cert + continue_with_bad_ssl_cert = True + dialog.accept() + + buttons = QDialogButtonBox() + buttons.setStandardButtons(QDialogButtonBox.Ok | QDialogButtonBox.Cancel) + buttons.button(QDialogButtonBox.Ok).setEnabled(False) + buttons.accepted.connect(accept) + buttons.rejected.connect(dialog.close) + + def bypass_triggered(state: int) -> None: + """Enable the OK button only when the checkbox is checked.""" + buttons.button(QDialogButtonBox.Ok).setEnabled(bool(state)) + + bypass = QCheckBox(Translator.get("SSL_TRUST_ANYWAY")) + bypass.stateChanged.connect(bypass_triggered) + + layout = QVBoxLayout() + layout.addWidget(notes) + layout.addWidget(bypass) + layout.addWidget(buttons) + dialog.setLayout(layout) + dialog.exec_() + + return continue_with_bad_ssl_cert + def show_metadata(self, file_path: str) -> None: self.manager.ctx_edit_metadata(file_path) diff --git a/nxdrive/utils.py b/nxdrive/utils.py index da695a644c..5830fdd630 100644 --- a/nxdrive/utils.py +++ b/nxdrive/utils.py @@ -27,6 +27,7 @@ "find_resource", "force_decode", "force_encode", + "get_certificate_details", "get_device", "guess_server_url", "if_frozen", @@ -37,6 +38,7 @@ "parse_edit_protocol", "parse_protocol_url", "path_join", + "retrieve_ssl_certificate", "safe_filename", "safe_long_path", "set_path_readonly", @@ -475,6 +477,46 @@ def force_encode(data: Union[bytes, str]) -> bytes: return data +def retrieve_ssl_certificate(hostname: str, port: int = 443) -> str: + """Retreive the SSL certificate from a given hostname.""" + + import ssl + + with ssl.create_connection((hostname, port)) as conn: # type: ignore + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + with context.wrap_socket(conn, server_hostname=hostname) as sock: + cert_data: bytes = sock.getpeercert(binary_form=True) # type: ignore + return ssl.DER_cert_to_PEM_cert(cert_data) + + +def get_certificate_details(hostname: str = "", cert_data: str = "") -> Dict[str, Any]: + """ + Get SSL certificate details from a given certificate content or hostname. + + Note: This function uses a undocumented method of the _ssl module. + It is continuously tested in our CI to ensure it still + available after any Python upgrade. + Certified working as of Python 3.6.7. + """ + + import ssl + + cert_file = "c.crt" + + try: + certificate = cert_data or retrieve_ssl_certificate(hostname) + with open(cert_file, "w") as f: + f.write(certificate) + try: + # Taken from https://stackoverflow.com/a/50072461/1117028 + return ssl._ssl._test_decode_cert(cert_file) # type: ignore + finally: + os.remove(cert_file) + except: + log.exception("Error while retreiving the SSL certificate") + return {} + + def encrypt( plaintext: Union[bytes, str], secret: Union[bytes, str], lazy: bool = True ) -> bytes: @@ -529,7 +571,7 @@ def guess_server_url( login_page: str = Options.startup_page, proxy: "Proxy" = None, timeout: int = 5, -) -> Optional[str]: +) -> str: """ Guess the complete server URL given an URL (either an IP address, a simple domain name or an already complete URL). @@ -543,6 +585,8 @@ def guess_server_url( import requests import rfc3987 + from requests.exceptions import SSLError + parts = urlsplit(url) # IP address or domain name only @@ -611,16 +655,15 @@ def guess_server_url( if exc.response.status_code == 401: # When there is only Web-UI installed, the code is 401. return new_url + except SSLError: + raise except (ValueError, requests.RequestException): pass - except OSError as exc: - # OSError: Could not find a suitable TLS CA certificate bundle, invalid path: ... - log.critical(f"{exc}. Ensure the 'ca_bundle' option is correct.") except Exception: log.exception("Unhandled error") if not url.lower().startswith("http"): - return None + return "" return url diff --git a/tests/conftest.py b/tests/conftest.py index 82a9e40cf1..8ea2f07826 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -24,15 +24,20 @@ def pytest_namespace(): user = os.getenv("NXDRIVE_TEST_USER", "Administrator") version = nxdrive.__version__ - root_remote = DocRemote( - nuxeo_url, - user, - "nxdrive-test-administrator-device", - version, - password=password, - base_folder="/", - timeout=60, - ) + try: + root_remote = DocRemote( + nuxeo_url, + user, + "nxdrive-test-administrator-device", + version, + password=password, + base_folder="/", + timeout=60, + ) + except: + # When testing locally a function that does not need to communicate with the + # server we can skip this object. To be reviewed with the tests refactoring. + root_remote = None return { "nuxeo_url": nuxeo_url, diff --git a/tests/test_utils.py b/tests/test_utils.py index 4a56c592dd..fd929d9fa8 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -44,6 +44,107 @@ def test_generated_tempory_file(name, state): assert nxdrive.utils.is_generated_tmp_file(name) == state +def test_get_certificate_details_from_file(): + cert_data = """ +-----BEGIN CERTIFICATE----- +MIIG8DCCBdigAwIBAgIQD0AZ0ebFLvmjqSm21WE4FjANBgkqhkiG9w0BAQsFADBk +MQswCQYDVQQGEwJOTDEWMBQGA1UECBMNTm9vcmQtSG9sbGFuZDESMBAGA1UEBxMJ +QW1zdGVyZGFtMQ8wDQYDVQQKEwZURVJFTkExGDAWBgNVBAMTD1RFUkVOQSBTU0wg +Q0EgMzAeFw0xODA3MjAwMDAwMDBaFw0yMDA3MjQxMjAwMDBaMIGZMQswCQYDVQQG +EwJGUjEWMBQGA1UECBMNSWxlLWRlLUZyYW5jZTEaMBgGA1UEBxMRU2FpbnQgRGVu +aXMgQ2VkZXgxHDAaBgNVBAoME1VuaXZlcnNpdMOpIFBhcmlzIDgxGTAXBgNVBAsT +EERTSSBQb2xlIFdFQi1FTlQxHTAbBgNVBAMTFG51eGVvLnVuaXYtcGFyaXM4LmZy +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAosA++LzWOIa8cSaH2Cmk +C4nir+Vmv2XQuMVp8AollXJKeiWTeulttfYU2txC7qDsjpXsSqfkvDQbCfUB25Ty +Y3ze9eh8pXzK5qwYFXIeDZIlVTquEZAA/F5bRnZ6HsaTBI0Gjq/BXiOlykvExVdP +1JK1E7j8pkUD4hygyhKPx95IVgQS5EgXWuCJnHJs/T6VRfYFaOix4yfJG9MOgb4D +3pkWh13WOcwJUQ1M5469e2JweW7jZsW6Oe1cfBR1VgvlRD7fSJDRwCj7MRqOfK5k +LC9so8o+9zUXHcWLk6WuBiKxX4xtr1waqViJxfn2/BUedg0J0juzoE87fZR52hJI +TwIDAQABo4IDZjCCA2IwHwYDVR0jBBgwFoAUZ/2IIBQnmMcJ0iUZu+lREWN1UGIw +HQYDVR0OBBYEFN4UHBjiYDWj5091Qd/fgITrtwGgMDYGA1UdEQQvMC2CFG51eGVv +LnVuaXYtcGFyaXM4LmZyghVtb29kbGUudW5pdi1wYXJpczguZnIwDgYDVR0PAQH/ +BAQDAgWgMB0GA1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjBrBgNVHR8EZDBi +MC+gLaArhilodHRwOi8vY3JsMy5kaWdpY2VydC5jb20vVEVSRU5BU1NMQ0EzLmNy +bDAvoC2gK4YpaHR0cDovL2NybDQuZGlnaWNlcnQuY29tL1RFUkVOQVNTTENBMy5j +cmwwTAYDVR0gBEUwQzA3BglghkgBhv1sAQEwKjAoBggrBgEFBQcCARYcaHR0cHM6 +Ly93d3cuZGlnaWNlcnQuY29tL0NQUzAIBgZngQwBAgIwbgYIKwYBBQUHAQEEYjBg +MCQGCCsGAQUFBzABhhhodHRwOi8vb2NzcC5kaWdpY2VydC5jb20wOAYIKwYBBQUH +MAKGLGh0dHA6Ly9jYWNlcnRzLmRpZ2ljZXJ0LmNvbS9URVJFTkFTU0xDQTMuY3J0 +MAwGA1UdEwEB/wQCMAAwggF+BgorBgEEAdZ5AgQCBIIBbgSCAWoBaAB2AKS5CZC0 +GFgUh7sTosxncAo8NZgE+RvfuON3zQ7IDdwQAAABZLh+KWMAAAQDAEcwRQIgPrGk +CO4wULGkZOaipluKHKgVX231md0r65CLxvgKGHoCIQD2oxZfAb7XDqTK9jgs42fo +UQra7C3P9QFjRncCwk3LrQB2AId1v+dZfPiMQ5lfvfNu/1aNR1Y2/0q1YMG06v9e +oIMPAAABZLh+KjQAAAQDAEcwRQIgSrjUujzDVEnVdxenp1ucQpJH6ofa4t+jVfYB +mmDjf6ICIQCsv+Gg67zSdNqcGCPSgLfI88bgYNDK0eZK55uk01E40wB2ALvZ37wf +inG1k5Qjl6qSe0c4V5UKq1LoGpCWZDaOHtGFAAABZLh+KYgAAAQDAEcwRQIgE0Fa +/7qoHixhfjjIN2ZsU8Y0AZFAkOuS0cGGGkKp9xkCIQDIGYAdx5qAdTBOFIL5NAr6 +Y7TIycq4avd3Fu1E86HpFTANBgkqhkiG9w0BAQsFAAOCAQEAllmQTDhGDhN8d/uX +E7oOkZknAogXttMXkksDjB7rN0BATV1ufWDbjShGQuoIYmtQYVddf77p5kNk48vT +BuM90iblou8PbFEdTIqLTHLs/+Df8a6wTEFDma3icvNKqeZWfylNLJUErtWILaWN +LBkdHkz68Cr7lhTW91XEbDGK9/IYu6YdWqoAS4bXks/vKJOEaQr2NN+QNDjzR9wG +sIOkgLQ2Kt4lWCaF7xEaOP2e/if3Ebm0alNx1lwUxn00LEm1VyKBlepV+XmsDivB +JROlHjdA3/jyqD4WuFzzXzWwF6zta0l/hqF9BtGmQRR9qXC2+fsQ4mhUK8C9vhCH +7fV0vw== +-----END CERTIFICATE----- +""" + cert_details_expected = { + "subject": ( + (("countryName", "FR"),), + (("stateOrProvinceName", "Ile-de-France"),), + (("localityName", "Saint Denis Cedex"),), + (("organizationName", "Université Paris 8"),), + (("organizationalUnitName", "DSI Pole WEB-ENT"),), + (("commonName", "nuxeo.univ-paris8.fr"),), + ), + "issuer": ( + (("countryName", "NL"),), + (("stateOrProvinceName", "Noord-Holland"),), + (("localityName", "Amsterdam"),), + (("organizationName", "TERENA"),), + (("commonName", "TERENA SSL CA 3"),), + ), + "version": 3, + "serialNumber": "0F4019D1E6C52EF9A3A929B6D5613816", + "notBefore": "Jul 20 00:00:00 2018 GMT", + "notAfter": "Jul 24 12:00:00 2020 GMT", + "subjectAltName": ( + ("DNS", "nuxeo.univ-paris8.fr"), + ("DNS", "moodle.univ-paris8.fr"), + ), + "OCSP": ("http://ocsp.digicert.com",), + "caIssuers": ("http://cacerts.digicert.com/TERENASSLCA3.crt",), + "crlDistributionPoints": ( + "http://crl3.digicert.com/TERENASSLCA3.crl", + "http://crl4.digicert.com/TERENASSLCA3.crl", + ), + } + cert_details = nxdrive.utils.get_certificate_details(cert_data=cert_data) + assert cert_details == cert_details_expected + + +def test_get_certificate_details_from_hostname(): + cert_details = nxdrive.utils.get_certificate_details(hostname="example.org") + for key in { + "caIssuers", + "issuer", + "notAfter", + "notBefore", + "serialNumber", + "subject", + }: + assert key in cert_details + + +def test_retrieve_ssl_certificate_unknown(): + from ssl import SSLError + + func = nxdrive.utils.get_certificate_details + assert func(hostname="example42.org") == {} + + with pytest.raises(SSLError): + nxdrive.utils.retrieve_ssl_certificate("example.org", port=80) + + @pytest.mark.parametrize( "raw_value, expected_value", [