Skip to content

Commit

Permalink
Merge pull request #197 from patrikspiess/Support-for-HTTP-PATCH-and-…
Browse files Browse the repository at this point in the history
…DELETE-method-in-FortiClientEMS-#196

Support for http patch and delete method in forti client ems #196
  • Loading branch information
lucmurer committed Mar 5, 2024
2 parents 920ca1c + a790df9 commit 59135c0
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 30 deletions.
2 changes: 2 additions & 0 deletions WHATSNEW.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
### Added

- Option `--raw` for `fmg get devices`
- Support for HTTP `PATCH` and `DELETE` method in FortiClientEMS
- Option `--smtp` for `fgt config check`


### Changed

- `fmg get devices` also shows ha nodes if device is a cluster
Expand Down
57 changes: 41 additions & 16 deletions fotoobo/fortinet/forticlientems.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""
FortiClient EMS Class
"""

import logging
import pickle
import re
from pathlib import Path
from typing import Any, Dict, Optional

Expand All @@ -20,6 +22,8 @@ class FortiClientEMS(Fortinet):
Represents one FortiClient EMS (digital twin)
"""

ALLOWED_HTTP_METHODS = ["DELETE", "GET", "PATCH", "POST"]

def __init__(
self,
hostname: str,
Expand All @@ -41,9 +45,9 @@ def __init__(
super().__init__(hostname, **kwargs)
self.api_url = f"https://{self.hostname}:{self.https_port}/api/v1"
self.cookie_path = cookie_path
self.password = password
self.username = username
self.type = "forticlientems"
self.password: str = password
self.username: str = username
self.type: str = "forticlientems"

def api( # pylint: disable=too-many-arguments
self,
Expand All @@ -68,6 +72,9 @@ def api( # pylint: disable=too-many-arguments
Returns:
Response from the request
"""
if not headers:
headers = self.session.headers # type: ignore

return super().api(
method, url, payload=payload, params=params, timeout=timeout, headers=headers
)
Expand Down Expand Up @@ -109,15 +116,19 @@ def login(self) -> int:
"""
status = 401
cookie = Path(self.cookie_path).expanduser() / f"{self.hostname}.cookie"
csrf = Path(self.cookie_path).expanduser() / f"{self.hostname}.csrf"

if self.cookie_path:
log.debug("Searching cookie in '%s'", cookie)
log.debug("Searching cookie and csrf token in '%s'", cookie.parents[0])

if cookie.is_file():
log.debug("Cookie exists. Skipping login")
if cookie.is_file() and csrf.is_file():
log.debug("Cookie and csrf token both exist")
with cookie.open("rb") as cookie_file:
self.session.cookies.update(pickle.load(cookie_file)) # type: ignore

self.session.headers["Referer"] = f"https://{self.hostname}"
self.session.headers["X-CSRFToken"] = csrf.read_text()

try:
response = self.api("get", "/system/serial_number")
if (
Expand All @@ -135,22 +146,36 @@ def login(self) -> int:
status = err.code

else:
log.debug("No cookie found for '%s'", self.hostname)
log.debug("No cookie or csrf token found for '%s'", self.hostname)

if status == 401:
log.debug("Login to '%s'", self.hostname)
payload = {"name": self.username, "password": self.password}
response = self.api("post", "/auth/signin", payload=payload)

if response.status_code == 200 and self.cookie_path:
log.debug("Saving cookie for '%s'", self.hostname)

try:
with cookie.open("wb") as cookie_file:
pickle.dump(self.session.cookies, cookie_file)

except FileNotFoundError:
log.warning("Unable to save cookie file '%s'", str(cookie.resolve()))
if response.status_code == 200:
self.session.headers["Referer"] = f"https://{self.hostname}"
if match := re.match(r"csrftoken=(\S+);", response.headers["Set-Cookie"]):
csrf_token = match.group(1)
self.session.headers["X-CSRFToken"] = csrf_token

if self.cookie_path:
log.debug("Saving cookie for '%s'", self.hostname)
try:
with cookie.open("wb") as cookie_file:
pickle.dump(self.session.cookies, cookie_file)

except FileNotFoundError as exc:
log.debug(exc)
log.warning("Unable to save cookie file '%s'", str(cookie.resolve()))

log.debug("Saving csrf token for '%s'", self.hostname)
try:
csrf.write_text(csrf_token)

except NameError as exc:
log.debug(exc)
log.warning("Unable to save csrf token file '%s'", str(csrf.resolve()))

status = response.status_code

Expand Down
5 changes: 4 additions & 1 deletion fotoobo/fortinet/fortinet.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ class Fortinet(ABC):
defined here with the abstractmethod decorator.
"""

# Use the ALLOWED_HTTP_METHODS class constant to define the supported HTTP methods. By default
# we should support GET and POST but you may override this list of supported methods in every
# subclass. Treat this setting as a constant which must not be redefined during runtime.
ALLOWED_HTTP_METHODS = ["GET", "POST"]

def __init__(self, hostname: str, **kwargs: Any) -> None:
Expand Down Expand Up @@ -94,7 +97,7 @@ def api( # pylint: disable=too-many-arguments
timeout = timeout or self.timeout
start = time()

if method.upper() not in Fortinet.ALLOWED_HTTP_METHODS:
if method.upper() not in self.ALLOWED_HTTP_METHODS:
error = f"HTTP method '{method.upper()}' is not implemented"
log.error(error)
raise NotImplementedError(error)
Expand Down
1 change: 1 addition & 0 deletions tests/data/ems_dummy.csrf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
dummy_csrf_token_from_cache
Binary file removed tests/data/host.cookie
Binary file not shown.
62 changes: 49 additions & 13 deletions tests/fortinet/test_forticlientems.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"""
Test the FortiClient EMS class
"""
from unittest.mock import MagicMock
from pathlib import Path
from unittest.mock import ANY, MagicMock

import pytest
import requests
Expand All @@ -23,13 +24,17 @@ def test_login_without_cookie(monkeypatch: MonkeyPatch) -> None:
"fotoobo.fortinet.forticlientems.requests.Session.post",
MagicMock(
return_value=ResponseMock(
json={"result": {"retval": 1, "message": "Login successful."}}, status=200
headers={"Set-Cookie": "csrftoken=dummy_csrf_token;"},
json={"result": {"retval": 1, "message": "Login successful."}},
status=200,
)
),
)
ems = FortiClientEMS("host", "dummy_user", "dummy_pass", ssl_verify=False)
assert ems.api_url == "https://host:443/api/v1"
ems = FortiClientEMS("ems_dummy", "dummy_user", "dummy_pass", ssl_verify=False)
assert ems.api_url == "https://ems_dummy:443/api/v1"
assert ems.login() == 200
assert ems.session.headers["Referer"] == "https://ems_dummy"
assert ems.session.headers["X-CSRFToken"] == "dummy_csrf_token"

@staticmethod
def test_login_with_valid_cookie(monkeypatch: MonkeyPatch) -> None:
Expand All @@ -42,12 +47,16 @@ def test_login_with_valid_cookie(monkeypatch: MonkeyPatch) -> None:
)
),
)
ems = FortiClientEMS("host", "dummy_user", "dummy_pass", "tests/data/", ssl_verify=False)
assert ems.api_url == "https://host:443/api/v1"
ems = FortiClientEMS(
"ems_dummy", "dummy_user", "dummy_pass", "tests/data/", ssl_verify=False
)
assert ems.api_url == "https://ems_dummy:443/api/v1"
assert ems.login() == 200
assert ems.session.headers["Referer"] == "https://ems_dummy"
assert ems.session.headers["X-CSRFToken"] == "dummy_csrf_token_from_cache\n"

@staticmethod
def test_login_with_invalid_cookie(monkeypatch: MonkeyPatch) -> None:
def test_login_with_invalid_cookie(monkeypatch: MonkeyPatch, temp_dir: Path) -> None:
"""Test the login to a FortiClient EMS with no session cookie given"""
monkeypatch.setattr(
"fotoobo.fortinet.forticlientems.requests.Session.get",
Expand All @@ -67,33 +76,60 @@ def test_login_with_invalid_cookie(monkeypatch: MonkeyPatch) -> None:
"fotoobo.fortinet.forticlientems.requests.Session.post",
MagicMock(
return_value=ResponseMock(
headers={"Set-Cookie": "csrftoken=dummy_csrf_token;"},
json={
"result": {"retval": 1, "message": "Login successful."},
},
status=200,
),
),
)
ems = FortiClientEMS("host", "dummy_user", "dummy_pass", "tests/data/", ssl_verify=False)
source = Path("tests/data/ems_dummy.cookie")
destination = Path(temp_dir / "ems_dummy.cookie")
destination.write_bytes(source.read_bytes())
source = Path("tests/data/ems_dummy.csrf")
destination = Path(temp_dir / "ems_dummy.csrf")
destination.write_bytes(source.read_bytes())
ems = FortiClientEMS("host", "dummy_user", "dummy_pass", temp_dir, ssl_verify=False)
assert ems.api_url == "https://host:443/api/v1"
assert ems.login() == 200

@staticmethod
def test_login_with_invalid_cookie_path(temp_dir: str, monkeypatch: MonkeyPatch) -> None:
def test_login_with_invalid_cookie_path(temp_dir: Path, monkeypatch: MonkeyPatch) -> None:
"""Test the login to a FortiClient EMS with an invalid cookie path"""
monkeypatch.setattr(
"fotoobo.fortinet.fortinet.requests.Session.post",
MagicMock(
return_value=ResponseMock(
headers={"Set-Cookie": "csrftoken=dummy_csrf_token;"},
json={
"result": {"retval": 1, "message": "Login successful."},
},
status=200,
)
),
)
ems = FortiClientEMS("host", "dummy_user", "dummy_pass", temp_dir, ssl_verify=False)
assert ems.api_url == "https://host:443/api/v1"
ems = FortiClientEMS("host_1", "dummy_user", "dummy_pass", temp_dir, ssl_verify=False)
assert ems.api_url == "https://host_1:443/api/v1"
assert ems.login() == 200

@staticmethod
def test_login_with_csrf_token_not_found(temp_dir: Path, monkeypatch: MonkeyPatch) -> None:
"""Test the login to a FortiClient EMS when the csrf token was not found in the headers"""
monkeypatch.setattr(
"fotoobo.fortinet.fortinet.requests.Session.post",
MagicMock(
return_value=ResponseMock(
headers={"Set-Cookie": "csrftoken_missing=dummy_csrf_token;"},
json={
"result": {"retval": 1, "message": "Login successful."},
},
status=200,
)
),
)
ems = FortiClientEMS("host_2", "dummy_user", "dummy_pass", temp_dir, ssl_verify=False)
assert ems.api_url == "https://host_2:443/api/v1"
assert ems.login() == 200

@staticmethod
Expand Down Expand Up @@ -143,7 +179,7 @@ def test_get_version_ok(monkeypatch: MonkeyPatch) -> None:
response = ems.get_version()
requests.Session.get.assert_called_with(
"https://host:443/api/v1/system/consts/get?system_update_time=1",
headers=None,
headers=ANY,
json=None,
params=None,
timeout=3,
Expand All @@ -167,7 +203,7 @@ def test_get_version_invalid(monkeypatch: MonkeyPatch) -> None:
assert "Did not find any FortiClient EMS version number in response" in str(err.value)
requests.Session.get.assert_called_with(
"https://host:443/api/v1/system/consts/get?system_update_time=1",
headers=None,
headers=ANY,
json=None,
params=None,
timeout=3,
Expand Down
1 change: 1 addition & 0 deletions tests/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def __init__(self, **kwargs: Any) -> None:
status (int, optional): http status code. Defaults to 444 (No Response)
"""
self.text = kwargs.get("text", "")
self.headers = kwargs.get("headers", [])
self.json = MagicMock(return_value=kwargs.get("json", None))
self.status_code = kwargs.get("status", 444)
self.ok = kwargs.get("ok", True)
Expand Down

0 comments on commit 59135c0

Please sign in to comment.