diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 84347b2..ade64cd 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -1,4 +1,4 @@ -name: tests +name: Code Tests 🚨 run-name: testing workflow invoked by ${{ github.actor }} on: [push, pull_request] diff --git a/CHANGELOG.md b/CHANGELOG.md index f07487c..86d3915 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,12 @@ For unreleased changes see [WHATSNEW.md](WHATSNEW.md) # [Released] +## [2.0.1] - 2023-11-29 + +### Added + +- Option ssl_verify for Vault service + ## [2.0.0] - 2023-11-28 ### Added diff --git a/WHATSNEW.md b/WHATSNEW.md index 6b5dca8..5468bc7 100644 --- a/WHATSNEW.md +++ b/WHATSNEW.md @@ -1,6 +1,6 @@ -### Added -### Changed +## [2.0.1] - 2023-11-29 -### Removed +### Added +- Option ssl_verify for Vault service diff --git a/docs/source/usage/configuration.rst b/docs/source/usage/configuration.rst index e3d99a9..b3c56a2 100644 --- a/docs/source/usage/configuration.rst +++ b/docs/source/usage/configuration.rst @@ -231,6 +231,12 @@ Instead of storing the secret_id in the yaml configuration file you may use the variable ``FOTOOBO_VAULT_SECRET_ID`` to store that value. The environment variable takes precedence over the configuration file should both be defined. +ssl_verify (optional, default: True) +"""""""""""""""""""""""""""""""""""" + +Check host SSL certificate (true) or not (false). You can also provide a path to a custom CA +certificate or CA bundle. Please be aware that disabling SSL certificate verification is a security +risk and should not be used in a production environment. token_file (optional) """"""""""""""""""""" diff --git a/fotoobo.yaml.sample b/fotoobo.yaml.sample index 1dc9116..cf18dac 100644 --- a/fotoobo.yaml.sample +++ b/fotoobo.yaml.sample @@ -56,10 +56,11 @@ logging: # Instead of storing credentials in the inventory file you may use VAULT as a placeholder. All asset # attributes that are VAULT will be retreived from the Hashicorp Vault service specified here. # The Hashicorp Vault service will use approle login to get the data. -# This section is optional and may pose a security issue. In future versions we may also read the -# role_id and secret_id from the system environment. +# We may also read the role_id and secret_id from the system environment. This makes it a little bit +# more secure. vault: url: https://vault.local + ssl_verify: false namespace: vault_namespace data_path: /v1/kv/data/fotoobo role_id: ... diff --git a/fotoobo/__init__.py b/fotoobo/__init__.py index e9b3a96..64bb28c 100644 --- a/fotoobo/__init__.py +++ b/fotoobo/__init__.py @@ -8,4 +8,4 @@ from .fortinet import FortiAnalyzer, FortiClientEMS, FortiGate, FortiManager __all__ = ["FortiAnalyzer", "FortiClientEMS", "FortiGate", "FortiManager"] -__version__: str = "2.0.0" +__version__: str = "2.0.1" diff --git a/fotoobo/fortinet/fortinet.py b/fotoobo/fortinet/fortinet.py index 3a1479d..efb1c71 100644 --- a/fotoobo/fortinet/fortinet.py +++ b/fotoobo/fortinet/fortinet.py @@ -5,13 +5,12 @@ import logging from abc import ABC, abstractmethod from time import time -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Union import requests import urllib3 from fotoobo.exceptions import APIError, GeneralError -from fotoobo.helpers.config import config log = logging.getLogger("fotoobo") @@ -41,7 +40,7 @@ def __init__(self, hostname: str, **kwargs: Any) -> None: If you need to connect to your Fortinet device through a proxy server you can set it here as as string. If needed you may append the proxy server port with a column to the proxy server. e.g. "proxy.local:8000". - ssl_verify (bool): enable/disable SSL certificate check + ssl_verify (bool | str): enable/disable SSL certificate check When ssl_verify is enabled you have to install a trusted SSL certificate onto the device you wish to connect to. If you set ssl_verify to false it will also disable the warnings in urllib3. This prevents unwanted SSL warnings to be @@ -58,9 +57,9 @@ def __init__(self, hostname: str, **kwargs: Any) -> None: if proxy := kwargs.get("proxy", ""): self.session.proxies = {"http": f"{proxy}", "https": f"{proxy}"} - self.ssl_verify: bool = kwargs.get("ssl_verify", config.ssl_verify) + self.ssl_verify: Union[bool, str] = kwargs.get("ssl_verify", True) if not self.ssl_verify: - urllib3.disable_warnings() + urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning) self.timeout = kwargs.get("timeout", 3) self.type: str = "" @@ -156,7 +155,7 @@ def api( # pylint: disable=too-many-arguments try: response.raise_for_status() - except (requests.exceptions.HTTPError) as err: + except requests.exceptions.HTTPError as err: raise APIError(err) from err return response diff --git a/fotoobo/helpers/config.py b/fotoobo/helpers/config.py index d7cc4e9..4e889b8 100644 --- a/fotoobo/helpers/config.py +++ b/fotoobo/helpers/config.py @@ -29,7 +29,6 @@ class Config: audit_logging: Union[Dict[str, Any], None] = None no_logo: bool = False cli_info: Dict[str, Any] = field(default_factory=dict) - ssl_verify: Union[str, bool] = True vault: Dict[str, str] = field(default_factory=dict) def load_configuration(self, config_file: Union[Path, None]) -> None: @@ -77,7 +76,6 @@ def load_configuration(self, config_file: Union[Path, None]) -> None: self.logging = loaded_config.get("logging", {}) self.audit_logging = loaded_config.get("audit_logging", {}) self.no_logo = loaded_config.get("no_logo", self.no_logo) - self.ssl_verify = loaded_config.get("ssl_verify", self.ssl_verify) self.vault = loaded_config.get("vault", {}) if self.vault: diff --git a/fotoobo/helpers/vault.py b/fotoobo/helpers/vault.py index c093d9a..3dc35a5 100644 --- a/fotoobo/helpers/vault.py +++ b/fotoobo/helpers/vault.py @@ -8,9 +8,10 @@ import logging from pathlib import Path -from typing import Any, Optional +from typing import Any, Optional, Union import requests +import urllib3 from fotoobo.exceptions.exceptions import GeneralError @@ -31,6 +32,7 @@ def __init__( # pylint: disable=too-many-arguments data_path: str, role_id: str, secret_id: str, + ssl_verify: Union[bool, str] = True, token_file: Optional[str] = None, token_ttl_limit: int = 0, ) -> None: @@ -42,6 +44,11 @@ def __init__( # pylint: disable=too-many-arguments data_path: The path where the vault data for fotoobo is stored role_id: The approle role_id secret_id: The approle secret_id + ssl_verify: Enable/disable SSL certificate check + When ssl_verify is enabled you have to install a trusted SSL certificate onto + the device you wish to connect to. If you set ssl_verify to false it will also + disable the warnings in urllib3. This prevents unwanted SSL warnings to be + logged. token_file: The file to store the access token to. If no file is given the token is not loaded or stored to a file and every execution will issue a new token. @@ -57,10 +64,15 @@ def __init__( # pylint: disable=too-many-arguments self.token_file: Optional[Path] = None self.token_ttl_limit: int = token_ttl_limit self.url: str = url.strip("/") + self.ssl_verify: Union[bool, str] = ssl_verify + if not self.ssl_verify: + log.warning("SSL verify for vault service is disabled which may be a security issue") + urllib3.disable_warnings(category=urllib3.exceptions.InsecureRequestWarning) log.debug("vault_client_url: '%s'", self.url) + log.debug("vault_client_ssl_verify: '%s'", self.ssl_verify) log.debug("vault_client_namespace: '%s'", self.namespace) - log.debug("vault_data_path: '%s'", self.data_path) + log.debug("vault_client_data_path: '%s'", self.data_path) log.debug("vault_client_role_id: '%s...%s'", self.role_id[:4], self.role_id[-5:-1]) log.debug("vault_client_secret_id: '%s...%s'", self.secret_id[:4], self.secret_id[-5:-1]) log.debug("vault_client_token_ttl_limit: '%s'", self.token_ttl_limit) @@ -93,7 +105,7 @@ def get_data(self, timeout: int = 3) -> Any: "X-Vault-Token": self.token, "X-Vault-Namespace": self.namespace, } - response = requests.get(url=url, headers=headers, timeout=timeout) + response = requests.get(url=url, headers=headers, timeout=timeout, verify=self.ssl_verify) if response.ok: log.debug("Response status_code is '%s'", response.status_code) @@ -117,17 +129,19 @@ def get_token(self, timeout: int = 3) -> bool: url = f"{self.url}/v1/auth/approle/login" log.debug("Get new token from '%s'", url) data = {"role_id": self.role_id, "secret_id": self.secret_id} - response = requests.post(url, data=data, timeout=timeout) - log.debug("Response status_code is '%s'", response.status_code) - if response.ok: - self.token = response.json()["auth"]["client_token"] - if self.token_file: - self.save_token() + self.token = "" + try: + response = requests.post(url, data=data, timeout=timeout, verify=self.ssl_verify) + log.debug("Response status_code is '%s'", response.status_code) + if response.ok: + self.token = response.json()["auth"]["client_token"] + if self.token_file: + self.save_token() - else: - self.token = "" + except (requests.exceptions.SSLError, requests.exceptions.ConnectionError) as err: + log.error("Request Error: %s", str(err)) - return response.ok + return bool(self.token) def load_token(self) -> bool: """Load the token from a file @@ -175,17 +189,24 @@ def validate_token(self, timeout: int = 3) -> bool: url = f"{self.url}/v1/auth/token/lookup-self" log.debug("Check if vault token still is valid") headers = {"X-Vault-Token": self.token} - response = requests.get(url=url, headers=headers, timeout=timeout) - log.debug("Response status_code is '%s'", response.status_code) - if response.ok: - log.debug("Vault token is valid for '%s' seconds", response.json()["data"]["ttl"]) - log.debug("vault_client_token: '%s...%s'", self.token[:8], self.token[-5:-1]) - if response.json()["data"]["ttl"] < self.token_ttl_limit: - log.debug("Invalidate token due to ttl limit") + try: + response = requests.get( + url=url, headers=headers, timeout=timeout, verify=self.ssl_verify + ) + log.debug("Response status_code is '%s'", response.status_code) + if response.ok: + log.debug("Vault token is valid for '%s' seconds", response.json()["data"]["ttl"]) + log.debug("vault_client_token: '%s...%s'", self.token[:8], self.token[-5:-1]) + if response.json()["data"]["ttl"] < self.token_ttl_limit: + log.debug("Invalidate token due to ttl limit") + self.token = "" + + else: + log.debug("Token is not valid") self.token = "" - else: - log.debug("Token is not valid") + except (requests.exceptions.SSLError, requests.exceptions.ConnectionError) as err: self.token = "" + log.error("Request Error: %s", str(err)) return bool(self.token) diff --git a/pyproject.toml b/pyproject.toml index 89360f3..053d995 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "fotoobo" -version = "2.0.0" +version = "2.0.1" description = "The awesome Fortinet Toolbox" authors = ["Patrik Spiess ", "Lukas Murer-Jäckle "] readme = "README.md" diff --git a/tests/helpers/test_vault.py b/tests/helpers/test_vault.py index 360621f..5bb162d 100644 --- a/tests/helpers/test_vault.py +++ b/tests/helpers/test_vault.py @@ -7,6 +7,7 @@ from unittest.mock import MagicMock import pytest +import requests from _pytest.monkeypatch import MonkeyPatch from fotoobo.exceptions.exceptions import GeneralError @@ -30,6 +31,7 @@ def test_init(token_file: str, monkeypatch: MonkeyPatch) -> None: monkeypatch.setattr("fotoobo.helpers.vault.Client.load_token", MagicMock(result=True)) client = Client( url="dummy_url", + ssl_verify=False, namespace="dummy_namespace", data_path="dummy_data_path", role_id="dummy_role_id", @@ -127,6 +129,34 @@ def test_validate_token(response: ResponseMock, valid: bool, monkeypatch: Monkey assert client.validate_token() == valid assert bool(client.token) == valid + @staticmethod + @pytest.mark.parametrize( + "mock", + ( + pytest.param( + MagicMock(side_effect=requests.exceptions.SSLError("dummy")), + id="SSLError", + ), + pytest.param( + MagicMock(side_effect=requests.exceptions.ConnectionError("dummy")), + id="ConnectionError", + ), + ), + ) + def test_validate_token_exception(mock: ResponseMock, monkeypatch: MonkeyPatch) -> None: + """Test the Client validate_token with exception""" + monkeypatch.setattr("fotoobo.helpers.vault.requests.get", mock) + client = Client( + url="https://dummy_url", + namespace="dummy_namespace", + data_path="dummy_data_path", + role_id="dummy_role_id", + secret_id="dummy_secret_id", + ) + client.token = "dummy_token" + assert client.validate_token() is False + assert client.token == "" + @staticmethod @pytest.mark.parametrize( "response, token", @@ -165,6 +195,34 @@ def test_get_token( if token: assert token_file.is_file() + @staticmethod + @pytest.mark.parametrize( + "mock", + ( + pytest.param( + MagicMock(side_effect=requests.exceptions.SSLError("dummy")), + id="SSLError", + ), + pytest.param( + MagicMock(side_effect=requests.exceptions.ConnectionError("dummy")), + id="ConnectionError", + ), + ), + ) + def test_get_token_exception(mock: ResponseMock, monkeypatch: MonkeyPatch) -> None: + """Test the Client get_token with exception""" + monkeypatch.setattr("fotoobo.helpers.vault.requests.post", mock) + client = Client( + url="https://dummy_url", + namespace="dummy_namespace", + data_path="dummy_data_path", + role_id="dummy_role_id", + secret_id="dummy_secret_id", + ) + client.token = "dummy_token" + assert client.get_token() is False + assert client.token == "" + @staticmethod @pytest.mark.parametrize( "response, data",