From 6917ff2813daa653406e0c55820bd593190e6d74 Mon Sep 17 00:00:00 2001 From: Ihor Bilous Date: Sun, 14 Sep 2025 16:32:48 +0300 Subject: [PATCH 1/3] Fix issue #26: Add MessagesApi, releated models, examples, tests --- examples/testing/messages.py | 82 +++ mailtrap/__init__.py | 1 + mailtrap/api/resources/messages.py | 152 +++++ mailtrap/api/testing.py | 5 + mailtrap/http.py | 16 +- mailtrap/models/messages.py | 142 ++++ tests/unit/api/testing/test_messages.py | 869 ++++++++++++++++++++++++ 7 files changed, 1264 insertions(+), 3 deletions(-) create mode 100644 examples/testing/messages.py create mode 100644 mailtrap/api/resources/messages.py create mode 100644 mailtrap/models/messages.py create mode 100644 tests/unit/api/testing/test_messages.py diff --git a/examples/testing/messages.py b/examples/testing/messages.py new file mode 100644 index 0000000..fd3597c --- /dev/null +++ b/examples/testing/messages.py @@ -0,0 +1,82 @@ +from typing import Optional + +import mailtrap as mt +from mailtrap.models.messages import AnalysisReport +from mailtrap.models.messages import EmailMessage +from mailtrap.models.messages import ForwardedMessage +from mailtrap.models.messages import SpamReport + +API_TOKEN = "YOU_API_TOKEN" +ACCOUNT_ID = "YOU_ACCOUNT_ID" +INBOX_ID = "YOUR_INBOX_ID" + +client = mt.MailtrapClient(token=API_TOKEN, account_id=ACCOUNT_ID) +messages_api = client.testing_api.messages + + +def get_message(inbox_id: int, message_id: int) -> EmailMessage: + return messages_api.show_message(inbox_id=inbox_id, message_id=message_id) + + +def update_message(inbox_id: int, message_id: int, is_read: bool) -> EmailMessage: + return messages_api.update( + inbox_id=inbox_id, + message_id=message_id, + message_params=mt.UpdateEmailMessageParams(is_read=is_read), + ) + + +def delete_message(inbox_id: int, message_id: int) -> EmailMessage: + return messages_api.delete(inbox_id=inbox_id, message_id=message_id) + + +def list_messages( + inbox_id: int, + search: Optional[str] = None, + last_id: Optional[int] = None, + page: Optional[int] = None, +) -> list[EmailMessage]: + return messages_api.get_list( + inbox_id=inbox_id, search=search, last_id=last_id, page=page + ) + + +def forward_message(inbox_id: int, message_id: int, email: str) -> ForwardedMessage: + return messages_api.forward(inbox_id=inbox_id, message_id=message_id, email=email) + + +def get_spam_report(inbox_id: int, message_id: str) -> SpamReport: + return messages_api.get_spam_report(inbox_id=inbox_id, message_id=message_id) + + +def get_html_analysis(inbox_id: int, message_id: str) -> AnalysisReport: + return messages_api.get_html_analysis(inbox_id=inbox_id, message_id=message_id) + + +def get_text_body(inbox_id: int, message_id: str) -> str: + return messages_api.get_text_body(inbox_id=inbox_id, message_id=message_id) + + +def get_raw_body(inbox_id: int, message_id: str) -> str: + return messages_api.get_raw_body(inbox_id=inbox_id, message_id=message_id) + + +def get_html_source(inbox_id: int, message_id: str) -> str: + return messages_api.get_html_source(inbox_id=inbox_id, message_id=message_id) + + +def get_html_body(inbox_id: int, message_id: str) -> str: + return messages_api.get_html_body(inbox_id=inbox_id, message_id=message_id) + + +def get_eml_body(inbox_id: int, message_id: str) -> str: + return messages_api.get_eml_body(inbox_id=inbox_id, message_id=message_id) + + +def get_mail_headers(inbox_id: int, message_id: str) -> str: + return messages_api.get_mail_headers(inbox_id=inbox_id, message_id=message_id) + + +if __name__ == "__main__": + messages = list_messages(inbox_id=INBOX_ID) + print(messages) diff --git a/mailtrap/__init__.py b/mailtrap/__init__.py index 6c3670a..b4844fd 100644 --- a/mailtrap/__init__.py +++ b/mailtrap/__init__.py @@ -18,6 +18,7 @@ from .models.mail import Disposition from .models.mail import Mail from .models.mail import MailFromTemplate +from .models.messages import UpdateEmailMessageParams from .models.projects import ProjectParams from .models.templates import CreateEmailTemplateParams from .models.templates import UpdateEmailTemplateParams diff --git a/mailtrap/api/resources/messages.py b/mailtrap/api/resources/messages.py new file mode 100644 index 0000000..1832f55 --- /dev/null +++ b/mailtrap/api/resources/messages.py @@ -0,0 +1,152 @@ +from typing import Any +from typing import Optional +from typing import cast + +from mailtrap.http import HttpClient +from mailtrap.models.messages import AnalysisReport +from mailtrap.models.messages import AnalysisReportResponse +from mailtrap.models.messages import EmailMessage +from mailtrap.models.messages import ForwardedMessage +from mailtrap.models.messages import SpamReport +from mailtrap.models.messages import UpdateEmailMessageParams + + +class MessagesApi: + def __init__(self, client: HttpClient, account_id: str) -> None: + self._account_id = account_id + self._client = client + + def show_message(self, inbox_id: int, message_id: int) -> EmailMessage: + """Get email message by ID.""" + response = self._client.get(self._api_path(inbox_id, message_id)) + return EmailMessage(**response) + + def update( + self, inbox_id: int, message_id: int, message_params: UpdateEmailMessageParams + ) -> EmailMessage: + """ + Update message attributes + (right now only the **is_read** attribute is available for modification). + """ + response = self._client.patch( + self._api_path(inbox_id, message_id), + json={"message": message_params.api_data}, + ) + return EmailMessage(**response) + + def delete(self, inbox_id: int, message_id: int) -> EmailMessage: + """Delete message from inbox.""" + response = self._client.delete(self._api_path(inbox_id, message_id)) + return EmailMessage(**response) + + def get_list( + self, + inbox_id: int, + search: Optional[str] = None, + last_id: Optional[int] = None, + page: Optional[int] = None, + ) -> list[EmailMessage]: + """ + Get messages from the inbox. + + The response contains up to 30 messages per request. You can use pagination + parameters (`last_id` or `page`) to retrieve additional results. + + Args: + inbox_id (int): ID of the inbox to retrieve messages from. + search (Optional[str]): + Search query string. Matches `subject`, `to_email`, and `to_name`. + Example: `"welcome"` + last_id (Optional[int]): + If specified, returns a page of records before the given `last_id`. + Overrides `page` if both are provided. + Must be `>= 1`. + Example: `123` + page (Optional[int]): + Page number for paginated results. + Ignored if `last_id` is also provided. + Must be `>= 1`. + Example: `5` + + Returns: + list[EmailMessage]: A list of email messages. + + Notes: + - Only one of `last_id` or `page` should typically be used. + - `last_id` has higher priority if both are provided. + - Each response contains at most 30 messages. + """ + params: dict[str, Any] = {} + if search: + params["search"] = search + if last_id: + params["last_id"] = last_id + if page: + params["page"] = page + + response = self._client.get(self._api_path(inbox_id), params=params) + return [EmailMessage(**message) for message in response] + + def forward(self, inbox_id: int, message_id: int, email: str) -> ForwardedMessage: + """ + Forward message to an email address. + The email address must be confirmed by the recipient in advance. + """ + response = self._client.post( + f"{self._api_path(inbox_id, message_id)}/forward", json={"email": email} + ) + return ForwardedMessage(**response) + + def get_spam_report(self, inbox_id: int, message_id: int) -> SpamReport: + """Get a brief spam report by message ID.""" + response = self._client.get(f"{self._api_path(inbox_id, message_id)}/spam_report") + return SpamReport(**response["report"]) + + def get_html_analysis(self, inbox_id: int, message_id: int) -> AnalysisReport: + """Get a brief HTML report by message ID.""" + response = self._client.get(f"{self._api_path(inbox_id, message_id)}/analyze") + return AnalysisReportResponse(**response).report + + def get_text_body(self, inbox_id: int, message_id: int) -> str: + """Get text email body, if it exists.""" + return cast( + str, self._client.get(f"{self._api_path(inbox_id, message_id)}/body.txt") + ) + + def get_raw_body(self, inbox_id: int, message_id: int) -> str: + """Get raw email body.""" + return cast( + str, self._client.get(f"{self._api_path(inbox_id, message_id)}/body.raw") + ) + + def get_html_source(self, inbox_id: int, message_id: int) -> str: + """Get HTML source of email.""" + return cast( + str, + self._client.get(f"{self._api_path(inbox_id, message_id)}/body.htmlsource"), + ) + + def get_html_body(self, inbox_id: int, message_id: int) -> str: + """Get formatted HTML email body. Not applicable for plain text emails.""" + return cast( + str, self._client.get(f"{self._api_path(inbox_id, message_id)}/body.html") + ) + + def get_eml_body(self, inbox_id: int, message_id: int) -> str: + """Get email message in .eml format.""" + return cast( + str, self._client.get(f"{self._api_path(inbox_id, message_id)}/body.eml") + ) + + def get_mail_headers(self, inbox_id: int, message_id: int) -> dict[str, Any]: + """Get mail headers of a message.""" + response = self._client.get( + f"{self._api_path(inbox_id, message_id)}/mail_headers" + ) + return cast(dict[str, Any], response["headers"]) + + def _api_path(self, inbox_id: int, message_id: Optional[int] = None) -> str: + path = f"/api/accounts/{self._account_id}/inboxes/{inbox_id}/messages" + if message_id: + return f"{path}/{message_id}" + return path diff --git a/mailtrap/api/testing.py b/mailtrap/api/testing.py index e545df4..c515e55 100644 --- a/mailtrap/api/testing.py +++ b/mailtrap/api/testing.py @@ -1,6 +1,7 @@ from typing import Optional from mailtrap.api.resources.inboxes import InboxesApi +from mailtrap.api.resources.messages import MessagesApi from mailtrap.api.resources.projects import ProjectsApi from mailtrap.http import HttpClient @@ -20,3 +21,7 @@ def projects(self) -> ProjectsApi: @property def inboxes(self) -> InboxesApi: return InboxesApi(account_id=self._account_id, client=self._client) + + @property + def messages(self) -> MessagesApi: + return MessagesApi(account_id=self._account_id, client=self._client) diff --git a/mailtrap/http.py b/mailtrap/http.py index 06524bd..90c80c8 100644 --- a/mailtrap/http.py +++ b/mailtrap/http.py @@ -2,6 +2,7 @@ from typing import NoReturn from typing import Optional +from requests import JSONDecodeError from requests import Response from requests import Session @@ -54,14 +55,23 @@ def _process_response(self, response: Response) -> Any: if not response.content.strip(): return None - return response.json() + try: + return response.json() + except (JSONDecodeError, ValueError): + return response.text def _handle_failed_response(self, response: Response) -> NoReturn: status_code = response.status_code + + if not response.content: + if status_code == 404: + raise APIError(status_code, errors=["Not Found"]) + raise APIError(status_code, errors=["Empty response body"]) + try: data = response.json() - except ValueError as exc: - raise APIError(status_code, errors=["Unknown Error"]) from exc + except (JSONDecodeError, ValueError) as exc: + raise APIError(status_code, errors=["Invalid JSON"]) from exc errors = self._extract_errors(data) diff --git a/mailtrap/models/messages.py b/mailtrap/models/messages.py new file mode 100644 index 0000000..6b6f6da --- /dev/null +++ b/mailtrap/models/messages.py @@ -0,0 +1,142 @@ +from datetime import datetime +from enum import Enum +from typing import Any +from typing import Optional +from typing import Union + +from pydantic import Field +from pydantic.dataclasses import dataclass + +from mailtrap.models.common import RequestParams + + +class BlacklistsResult(str, Enum): + SUCCESS = "success" + PENDING = "pending" + ERROR = "error" + + +@dataclass +class BlacklistsReport: + name: str + url: str + in_black_list: bool + + +@dataclass +class Blacklists: + result: BlacklistsResult + domain: str + ip: str + report: list[BlacklistsReport] + + +@dataclass +class SmtpData: + mail_from_addr: str + client_ip: str + + +@dataclass +class SmtpInformation: + ok: bool + data: Optional[SmtpData] = None + + +@dataclass +class EmailMessage: + id: int + inbox_id: int + subject: str + sent_at: datetime + from_email: str + from_name: str + to_email: str + to_name: str + email_size: int + is_read: bool + created_at: datetime + updated_at: datetime + html_body_size: int + text_body_size: int + human_size: str + html_path: str + txt_path: str + raw_path: str + download_path: str + html_source_path: str + blacklists_report_info: Union[bool, Blacklists] + smtp_information: SmtpInformation + + +@dataclass +class UpdateEmailMessageParams(RequestParams): + is_read: bool + + @property + def api_data(self) -> dict[str, Any]: + data = dict(super().api_data) + data["is_read"] = str(data["is_read"]).lower() + return data + + +@dataclass +class ForwardedMessage: + message: str + + +@dataclass +class SpamDetail: + pts: float = Field(alias="Pts") + rule_name: str = Field(alias="RuleName") + description: str = Field(alias="Description") + + +@dataclass +class SpamReport: + response_code: int = Field(alias="ResponseCode") + response_message: str = Field(alias="ResponseMessage") + response_version: str = Field(alias="ResponseVersion") + score: float = Field(alias="Score") + spam: bool = Field(alias="Spam") + threshold: int = Field(alias="Threshold") + details: list[SpamDetail] = Field(alias="Details") + + +@dataclass +class EmailClients: + desktop: list[str] + mobile: list[str] + web: list[str] + + +@dataclass +class ErrorItem: + error_line: int + rule_name: str + email_clients: EmailClients + + +class AnalysisReportStatus(str, Enum): + SUCCESS = "success" + ERROR = "error" + + +@dataclass +class AnalysisReport: + status: AnalysisReportStatus + + +@dataclass +class AnalysisReportError(AnalysisReport): + msg: str + + +@dataclass +class AnalysisReportSuccess(AnalysisReport): + errors: list[ErrorItem] + + +@dataclass +class AnalysisReportResponse: + report: Union[AnalysisReportError, AnalysisReportSuccess] diff --git a/tests/unit/api/testing/test_messages.py b/tests/unit/api/testing/test_messages.py new file mode 100644 index 0000000..e388341 --- /dev/null +++ b/tests/unit/api/testing/test_messages.py @@ -0,0 +1,869 @@ +from typing import Any + +import pytest +import responses + +from mailtrap.api.resources.messages import MessagesApi +from mailtrap.config import GENERAL_HOST +from mailtrap.exceptions import APIError +from mailtrap.http import HttpClient +from mailtrap.models.messages import EmailMessage +from mailtrap.models.messages import UpdateEmailMessageParams +from tests import conftest + +ACCOUNT_ID = "321" +INBOX_ID = 3538 +MESSAGE_ID = 2323 +ATTACHMENT_ID = 67 +BASE_MESSAGES_URL = ( + f"https://{GENERAL_HOST}/api/accounts/{ACCOUNT_ID}/inboxes/{INBOX_ID}/messages" +) + + +@pytest.fixture +def client() -> MessagesApi: + return MessagesApi(account_id=ACCOUNT_ID, client=HttpClient(GENERAL_HOST)) + + +@pytest.fixture +def sample_message_dict() -> dict[str, Any]: + return { + "id": MESSAGE_ID, + "inbox_id": INBOX_ID, + "subject": "Test email", + "sent_at": "2022-07-01T19:29:59.295Z", + "from_email": "john@mailtrap.io", + "from_name": "John", + "to_email": "mary@mailtrap.io", + "to_name": "Mary", + "email_size": 300, + "is_read": False, + "created_at": "2022-07-01T19:29:59.295Z", + "updated_at": "2022-07-01T19:29:59.295Z", + "html_body_size": 150, + "text_body_size": 100, + "human_size": "300 Bytes", + "html_path": ( + f"/api/accounts/{ACCOUNT_ID}" + f"/inboxes/{INBOX_ID}" + f"/messages/{MESSAGE_ID}/body.html" + ), + "txt_path": ( + f"/api/accounts/{ACCOUNT_ID}" + f"/inboxes/{INBOX_ID}" + f"/messages/{MESSAGE_ID}/body.txt" + ), + "raw_path": ( + f"/api/accounts/{ACCOUNT_ID}" + f"/inboxes/{INBOX_ID}" + f"/messages/{MESSAGE_ID}/body.raw" + ), + "download_path": ( + f"/api/accounts/{ACCOUNT_ID}" + f"/inboxes/{INBOX_ID}" + f"/messages/{MESSAGE_ID}/body.eml" + ), + "html_source_path": ( + f"/api/accounts/{ACCOUNT_ID}" + f"/inboxes/{INBOX_ID}" + f"/messages/{MESSAGE_ID}/body.htmlsource" + ), + "blacklists_report_info": False, + "smtp_information": { + "ok": True, + "data": {"mail_from_addr": "john@mailtrap.io", "client_ip": "193.62.62.184"}, + }, + } + + +@pytest.fixture +def sample_spam_report_dict() -> dict[str, Any]: + return { + "report": { + "ResponseCode": 2, + "ResponseMessage": "Not spam", + "ResponseVersion": "1.2", + "Score": 1.2, + "Spam": False, + "Threshold": 5, + "Details": [ + { + "Pts": 0, + "RuleName": "HTML_MESSAGE", + "Description": "BODY: HTML included in message", + } + ], + } + } + + +@pytest.fixture +def sample_html_analysis_dict() -> dict[str, Any]: + return { + "report": { + "status": "success", + "errors": [ + { + "error_line": 15, + "rule_name": "style", + "email_clients": { + "desktop": ["Notes 6 / 7"], + "mobile": ["Gmail"], + "web": [], + }, + } + ], + } + } + + +@pytest.fixture +def sample_forwarded_message_dict() -> dict[str, Any]: + return {"message": "Your email message has been successfully forwarded"} + + +@pytest.fixture +def sample_mail_headers_dict() -> dict[str, Any]: + return { + "headers": { + "bcc": "john_doe@example.com", + "cc": "john_doe@example.com", + "from": "john_doe@example.com", + "to": "john_doe@example.com", + "subject": "Your Example Order Confirmation", + } + } + + +class TestMessagesApi: + + @pytest.mark.parametrize( + "status_code,response_json,expected_error_message", + [ + ( + conftest.UNAUTHORIZED_STATUS_CODE, + conftest.UNAUTHORIZED_RESPONSE, + conftest.UNAUTHORIZED_ERROR_MESSAGE, + ), + ( + conftest.FORBIDDEN_STATUS_CODE, + conftest.FORBIDDEN_RESPONSE, + conftest.FORBIDDEN_ERROR_MESSAGE, + ), + ], + ) + @responses.activate + def test_get_list_should_raise_api_errors( + self, + client: MessagesApi, + status_code: int, + response_json: dict, + expected_error_message: str, + ) -> None: + responses.get( + BASE_MESSAGES_URL, + status=status_code, + json=response_json, + ) + + with pytest.raises(APIError) as exc_info: + client.get_list(INBOX_ID) + + assert expected_error_message in str(exc_info.value) + + @responses.activate + def test_get_list_should_return_message_list( + self, client: MessagesApi, sample_message_dict: dict + ) -> None: + responses.get( + BASE_MESSAGES_URL, + json=[sample_message_dict], + status=200, + ) + + messages = client.get_list(INBOX_ID) + + assert isinstance(messages, list) + assert all(isinstance(m, EmailMessage) for m in messages) + assert messages[0].id == MESSAGE_ID + + @responses.activate + def test_get_list_with_params_should_include_query_params( + self, client: MessagesApi, sample_message_dict: dict + ) -> None: + responses.get( + BASE_MESSAGES_URL, + json=[sample_message_dict], + status=200, + ) + + client.get_list(INBOX_ID, search="welcome", last_id=123, page=5) + + assert len(responses.calls) == 1 + request = responses.calls[0].request + assert "search=welcome" in request.url + assert "last_id=123" in request.url + assert "page=5" in request.url + + @pytest.mark.parametrize( + "status_code,response_json,expected_error_message", + [ + ( + conftest.UNAUTHORIZED_STATUS_CODE, + conftest.UNAUTHORIZED_RESPONSE, + conftest.UNAUTHORIZED_ERROR_MESSAGE, + ), + ( + conftest.FORBIDDEN_STATUS_CODE, + conftest.FORBIDDEN_RESPONSE, + conftest.FORBIDDEN_ERROR_MESSAGE, + ), + ( + conftest.NOT_FOUND_STATUS_CODE, + conftest.NOT_FOUND_RESPONSE, + conftest.NOT_FOUND_ERROR_MESSAGE, + ), + ], + ) + @responses.activate + def test_show_message_should_raise_api_errors( + self, + client: MessagesApi, + status_code: int, + response_json: dict, + expected_error_message: str, + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}" + responses.get( + url, + status=status_code, + json=response_json, + ) + + with pytest.raises(APIError) as exc_info: + client.show_message(INBOX_ID, MESSAGE_ID) + + assert expected_error_message in str(exc_info.value) + + @responses.activate + def test_show_message_should_return_single_message( + self, client: MessagesApi, sample_message_dict: dict + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}" + responses.get( + url, + json=sample_message_dict, + status=200, + ) + + message = client.show_message(INBOX_ID, MESSAGE_ID) + + assert isinstance(message, EmailMessage) + assert message.id == MESSAGE_ID + + @pytest.mark.parametrize( + "status_code,response_json,expected_error_message", + [ + ( + conftest.UNAUTHORIZED_STATUS_CODE, + conftest.UNAUTHORIZED_RESPONSE, + conftest.UNAUTHORIZED_ERROR_MESSAGE, + ), + ( + conftest.FORBIDDEN_STATUS_CODE, + conftest.FORBIDDEN_RESPONSE, + conftest.FORBIDDEN_ERROR_MESSAGE, + ), + ( + conftest.NOT_FOUND_STATUS_CODE, + conftest.NOT_FOUND_RESPONSE, + conftest.NOT_FOUND_ERROR_MESSAGE, + ), + ], + ) + @responses.activate + def test_update_should_raise_api_errors( + self, + client: MessagesApi, + status_code: int, + response_json: dict, + expected_error_message: str, + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}" + responses.patch( + url, + status=status_code, + json=response_json, + ) + + with pytest.raises(APIError) as exc_info: + client.update(INBOX_ID, MESSAGE_ID, UpdateEmailMessageParams(is_read=True)) + + assert expected_error_message in str(exc_info.value) + + @responses.activate + def test_update_should_return_updated_message( + self, client: MessagesApi, sample_message_dict: dict + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}" + updated_message_dict = sample_message_dict.copy() + updated_message_dict["is_read"] = True + + responses.patch( + url, + json=updated_message_dict, + status=200, + ) + + message = client.update( + INBOX_ID, MESSAGE_ID, UpdateEmailMessageParams(is_read=True) + ) + + assert isinstance(message, EmailMessage) + assert message.is_read is True + + @pytest.mark.parametrize( + "status_code,response_json,expected_error_message", + [ + ( + conftest.UNAUTHORIZED_STATUS_CODE, + conftest.UNAUTHORIZED_RESPONSE, + conftest.UNAUTHORIZED_ERROR_MESSAGE, + ), + ( + conftest.FORBIDDEN_STATUS_CODE, + conftest.FORBIDDEN_RESPONSE, + conftest.FORBIDDEN_ERROR_MESSAGE, + ), + ( + conftest.NOT_FOUND_STATUS_CODE, + conftest.NOT_FOUND_RESPONSE, + conftest.NOT_FOUND_ERROR_MESSAGE, + ), + ], + ) + @responses.activate + def test_delete_should_raise_api_errors( + self, + client: MessagesApi, + status_code: int, + response_json: dict, + expected_error_message: str, + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}" + responses.delete( + url, + status=status_code, + json=response_json, + ) + + with pytest.raises(APIError) as exc_info: + client.delete(INBOX_ID, MESSAGE_ID) + + assert expected_error_message in str(exc_info.value) + + @responses.activate + def test_delete_should_return_deleted_message( + self, client: MessagesApi, sample_message_dict: dict + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}" + responses.delete( + url, + json=sample_message_dict, + status=200, + ) + + result = client.delete(INBOX_ID, MESSAGE_ID) + + assert isinstance(result, EmailMessage) + assert result.id == MESSAGE_ID + + @pytest.mark.parametrize( + "status_code,response_json,expected_error_message", + [ + ( + conftest.UNAUTHORIZED_STATUS_CODE, + conftest.UNAUTHORIZED_RESPONSE, + conftest.UNAUTHORIZED_ERROR_MESSAGE, + ), + ( + conftest.FORBIDDEN_STATUS_CODE, + conftest.FORBIDDEN_RESPONSE, + conftest.FORBIDDEN_ERROR_MESSAGE, + ), + ( + conftest.NOT_FOUND_STATUS_CODE, + conftest.NOT_FOUND_RESPONSE, + conftest.NOT_FOUND_ERROR_MESSAGE, + ), + ], + ) + @responses.activate + def test_forward_should_raise_api_errors( + self, + client: MessagesApi, + status_code: int, + response_json: dict, + expected_error_message: str, + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/forward" + responses.post( + url, + status=status_code, + json=response_json, + ) + + with pytest.raises(APIError) as exc_info: + client.forward(INBOX_ID, MESSAGE_ID, "jack@mailtrap.io") + + assert expected_error_message in str(exc_info.value) + + @responses.activate + def test_forward_should_return_forwarded_message( + self, client: MessagesApi, sample_forwarded_message_dict: dict + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/forward" + responses.post( + url, + json=sample_forwarded_message_dict, + status=200, + ) + + result = client.forward(INBOX_ID, MESSAGE_ID, "jack@mailtrap.io") + + assert result.message == "Your email message has been successfully forwarded" + + @pytest.mark.parametrize( + "status_code,response_json,expected_error_message", + [ + ( + conftest.UNAUTHORIZED_STATUS_CODE, + conftest.UNAUTHORIZED_RESPONSE, + conftest.UNAUTHORIZED_ERROR_MESSAGE, + ), + ( + conftest.FORBIDDEN_STATUS_CODE, + conftest.FORBIDDEN_RESPONSE, + conftest.FORBIDDEN_ERROR_MESSAGE, + ), + ( + conftest.NOT_FOUND_STATUS_CODE, + conftest.NOT_FOUND_RESPONSE, + conftest.NOT_FOUND_ERROR_MESSAGE, + ), + ], + ) + @responses.activate + def test_get_spam_report_should_raise_api_errors( + self, + client: MessagesApi, + status_code: int, + response_json: dict, + expected_error_message: str, + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/spam_report" + responses.get( + url, + status=status_code, + json=response_json, + ) + + with pytest.raises(APIError) as exc_info: + client.get_spam_report(INBOX_ID, MESSAGE_ID) + + assert expected_error_message in str(exc_info.value) + + @responses.activate + def test_get_spam_report_should_return_spam_report( + self, client: MessagesApi, sample_spam_report_dict: dict + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/spam_report" + responses.get( + url, + json=sample_spam_report_dict, + status=200, + ) + + result = client.get_spam_report(INBOX_ID, MESSAGE_ID) + + assert result.response_code == 2 + assert result.spam is False + assert result.score == 1.2 + + @pytest.mark.parametrize( + "status_code,response_json,expected_error_message", + [ + ( + conftest.UNAUTHORIZED_STATUS_CODE, + conftest.UNAUTHORIZED_RESPONSE, + conftest.UNAUTHORIZED_ERROR_MESSAGE, + ), + ( + conftest.FORBIDDEN_STATUS_CODE, + conftest.FORBIDDEN_RESPONSE, + conftest.FORBIDDEN_ERROR_MESSAGE, + ), + ( + conftest.NOT_FOUND_STATUS_CODE, + conftest.NOT_FOUND_RESPONSE, + conftest.NOT_FOUND_ERROR_MESSAGE, + ), + ], + ) + @responses.activate + def test_get_html_analysis_should_raise_api_errors( + self, + client: MessagesApi, + status_code: int, + response_json: dict, + expected_error_message: str, + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/analyze" + responses.get( + url, + status=status_code, + json=response_json, + ) + + with pytest.raises(APIError) as exc_info: + client.get_html_analysis(INBOX_ID, MESSAGE_ID) + + assert expected_error_message in str(exc_info.value) + + @responses.activate + def test_get_html_analysis_should_return_analysis_report( + self, client: MessagesApi, sample_html_analysis_dict: dict + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/analyze" + responses.get( + url, + json=sample_html_analysis_dict, + status=200, + ) + + result = client.get_html_analysis(INBOX_ID, MESSAGE_ID) + + assert result.status.value == "success" + assert len(result.errors) == 1 + assert result.errors[0].error_line == 15 + + @pytest.mark.parametrize( + "status_code,response_body,expected_error_message,content_type", + [ + ( + conftest.UNAUTHORIZED_STATUS_CODE, + '{"error": "Incorrect API token"}', + conftest.UNAUTHORIZED_ERROR_MESSAGE, + "text/plain", + ), + ( + conftest.FORBIDDEN_STATUS_CODE, + '{"errors": "Inbox is not active or you have insufficient permissions"}', + "Inbox is not active or you have insufficient permissions", + "text/plain", + ), + ( + conftest.NOT_FOUND_STATUS_CODE, + "", + conftest.NOT_FOUND_ERROR_MESSAGE, + "text/plain", + ), + ], + ) + @responses.activate + def test_get_text_body_should_raise_api_errors( + self, + client: MessagesApi, + status_code: int, + response_body: str, + expected_error_message: str, + content_type: str, + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/body.txt" + responses.get( + url, + status=status_code, + body=response_body, + content_type=content_type, + ) + + with pytest.raises(APIError) as exc_info: + client.get_text_body(INBOX_ID, MESSAGE_ID) + + assert expected_error_message in str(exc_info.value) + + @responses.activate + def test_get_text_body_should_return_text_content(self, client: MessagesApi) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/body.txt" + text_content = "Congrats for sending test email with Mailtrap!" + responses.get( + url, + body=text_content, + status=200, + content_type="text/plain", + ) + + result = client.get_text_body(INBOX_ID, MESSAGE_ID) + + assert result == text_content + + @pytest.mark.parametrize( + "status_code,response_body,expected_error_message,content_type", + [ + ( + conftest.UNAUTHORIZED_STATUS_CODE, + '{"error": "Incorrect API token"}', + conftest.UNAUTHORIZED_ERROR_MESSAGE, + "application/json", + ), + ( + conftest.FORBIDDEN_STATUS_CODE, + '{"errors": "Inbox is not active or you have insufficient permissions"}', + "Inbox is not active or you have insufficient permissions", + "application/json", + ), + ( + conftest.NOT_FOUND_STATUS_CODE, + "", + conftest.NOT_FOUND_ERROR_MESSAGE, + "text/html", + ), + ], + ) + @responses.activate + def test_get_html_body_should_raise_api_errors( + self, + client: MessagesApi, + status_code: int, + response_body: str, + expected_error_message: str, + content_type: str, + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/body.html" + responses.get( + url, + status=status_code, + body=response_body, + content_type=content_type, + ) + + with pytest.raises(APIError) as exc_info: + client.get_html_body(INBOX_ID, MESSAGE_ID) + + assert expected_error_message in str(exc_info.value) + + @responses.activate + def test_get_html_body_should_return_html_content(self, client: MessagesApi) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/body.html" + html_content = "Test HTML content" + responses.get( + url, + body=html_content, + status=200, + content_type="text/html", + ) + + result = client.get_html_body(INBOX_ID, MESSAGE_ID) + + assert result == html_content + + @pytest.mark.parametrize( + "status_code,response_body,expected_error_message,content_type", + [ + ( + conftest.UNAUTHORIZED_STATUS_CODE, + '{"error": "Incorrect API token"}', + conftest.UNAUTHORIZED_ERROR_MESSAGE, + "text/plain", + ), + ( + conftest.FORBIDDEN_STATUS_CODE, + '{"errors": "Inbox is not active or you have insufficient permissions"}', + "Inbox is not active or you have insufficient permissions", + "text/plain", + ), + ( + conftest.NOT_FOUND_STATUS_CODE, + "", + conftest.NOT_FOUND_ERROR_MESSAGE, + "text/plain", + ), + ], + ) + @responses.activate + def test_get_raw_body_should_raise_api_errors( + self, + client: MessagesApi, + status_code: int, + response_body: str, + expected_error_message: str, + content_type: str, + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/body.raw" + responses.get( + url, + status=status_code, + body=response_body, + content_type=content_type, + ) + + with pytest.raises(APIError) as exc_info: + client.get_raw_body(INBOX_ID, MESSAGE_ID) + + assert expected_error_message in str(exc_info.value) + + @responses.activate + def test_get_raw_body_should_return_raw_content(self, client: MessagesApi) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/body.raw" + raw_content = ( + "From: test@example.com\nTo: recipient@example.com\n" + "Subject: Test\n\nBody content" + ) + responses.get( + url, + body=raw_content, + status=200, + content_type="text/plain", + ) + + result = client.get_raw_body(INBOX_ID, MESSAGE_ID) + + assert result == raw_content + + @pytest.mark.parametrize( + "status_code,response_body,expected_error_message,content_type", + [ + ( + conftest.UNAUTHORIZED_STATUS_CODE, + '{"error": "Incorrect API token"}', + conftest.UNAUTHORIZED_ERROR_MESSAGE, + "text/plain", + ), + ( + conftest.FORBIDDEN_STATUS_CODE, + '{"errors": "Inbox is not active or you have insufficient permissions"}', + "Inbox is not active or you have insufficient permissions", + "text/plain", + ), + ( + conftest.NOT_FOUND_STATUS_CODE, + "", + conftest.NOT_FOUND_ERROR_MESSAGE, + "text/html", + ), + ], + ) + @responses.activate + def test_get_html_source_should_raise_api_errors( + self, + client: MessagesApi, + status_code: int, + response_body: str, + expected_error_message: str, + content_type: str, + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/body.htmlsource" + responses.get( + url, + status=status_code, + body=response_body, + content_type=content_type, + ) + + with pytest.raises(APIError) as exc_info: + client.get_html_source(INBOX_ID, MESSAGE_ID) + + assert expected_error_message in str(exc_info.value) + + @responses.activate + def test_get_html_source_should_return_html_source(self, client: MessagesApi) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/body.htmlsource" + html_source = "Source" + responses.get( + url, + body=html_source, + status=200, + content_type="text/html", + ) + + result = client.get_html_source(INBOX_ID, MESSAGE_ID) + + assert result == html_source + + @pytest.mark.parametrize( + "status_code,response_body,expected_error_message,content_type", + [ + ( + conftest.UNAUTHORIZED_STATUS_CODE, + '{"error": "Incorrect API token"}', + conftest.UNAUTHORIZED_ERROR_MESSAGE, + "text/plain", + ), + ( + conftest.FORBIDDEN_STATUS_CODE, + '{"errors": "Inbox is not active or you have insufficient permissions"}', + "Inbox is not active or you have insufficient permissions", + "text/plain", + ), + ( + conftest.NOT_FOUND_STATUS_CODE, + "", + conftest.NOT_FOUND_ERROR_MESSAGE, + "text/plain", + ), + ], + ) + @responses.activate + def test_get_eml_body_should_raise_api_errors( + self, + client: MessagesApi, + status_code: int, + response_body: str, + expected_error_message: str, + content_type: str, + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/body.eml" + responses.get( + url, + status=status_code, + body=response_body, + content_type=content_type, + ) + + with pytest.raises(APIError) as exc_info: + client.get_eml_body(INBOX_ID, MESSAGE_ID) + + assert expected_error_message in str(exc_info.value) + + @responses.activate + def test_get_eml_body_should_return_eml_content(self, client: MessagesApi) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/body.eml" + eml_content = ( + "From: test@example.com\nTo: recipient@example.com\n" + "Subject: Test\n\nBody content" + ) + responses.get( + url, + body=eml_content, + status=200, + content_type="message/rfc822", + ) + + result = client.get_eml_body(INBOX_ID, MESSAGE_ID) + + assert result == eml_content + + @responses.activate + def test_get_mail_headers_should_return_headers( + self, client: MessagesApi, sample_mail_headers_dict: dict + ) -> None: + url = f"{BASE_MESSAGES_URL}/{MESSAGE_ID}/mail_headers" + responses.get( + url, + json=sample_mail_headers_dict, + status=200, + ) + + result = client.get_mail_headers(INBOX_ID, MESSAGE_ID) + + assert result == sample_mail_headers_dict["headers"] From c75cb4225af7da38c0dea73f27ed6e5e6ddf457c Mon Sep 17 00:00:00 2001 From: Ihor Bilous Date: Mon, 15 Sep 2025 02:55:53 +0300 Subject: [PATCH 2/3] Fix issue #26: Change json error import --- mailtrap/http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mailtrap/http.py b/mailtrap/http.py index 90c80c8..60b474d 100644 --- a/mailtrap/http.py +++ b/mailtrap/http.py @@ -1,8 +1,8 @@ +from json import JSONDecodeError from typing import Any from typing import NoReturn from typing import Optional -from requests import JSONDecodeError from requests import Response from requests import Session From 52f7e3c8243dfeae8372043a1a889551cf1a50a1 Mon Sep 17 00:00:00 2001 From: Ihor Bilous Date: Mon, 15 Sep 2025 23:42:36 +0300 Subject: [PATCH 3/3] Fix issue #26: Add tests for http client --- tests/unit/test_http.py | 104 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 tests/unit/test_http.py diff --git a/tests/unit/test_http.py b/tests/unit/test_http.py new file mode 100644 index 0000000..27d32e0 --- /dev/null +++ b/tests/unit/test_http.py @@ -0,0 +1,104 @@ +import json +from unittest.mock import Mock + +import pytest + +from mailtrap.exceptions import APIError +from mailtrap.exceptions import AuthorizationError +from mailtrap.http import HttpClient + + +class TestHttpClient: + + def test_extract_errors_with_singular_error_key(self) -> None: + data = {"error": "Simple error message"} + errors = HttpClient._extract_errors(data) + assert errors == ["Simple error message"] + + def test_extract_errors_with_string_value(self) -> None: + data = {"errors": "Error message"} + errors = HttpClient._extract_errors(data) + assert errors == ["Error message"] + + def test_extract_errors_with_string_list(self) -> None: + data = {"errors": ["Error 1", "Error 2"]} + errors = HttpClient._extract_errors(data) + assert errors == ["Error 1", "Error 2"] + + def test_extract_errors_with_nested_dict(self) -> None: + data = {"errors": {"field1": ["Error in field1"], "field2": "Error in field2"}} + errors = HttpClient._extract_errors(data) + assert "field1: Error in field1" in errors + assert "field2: Error in field2" in errors + + def test_extract_errors_with_unknown_format(self) -> None: + data = {"unknown_key": "Some error"} + errors = HttpClient._extract_errors(data) + assert errors == ["Unknown error"] + + def test_handle_failed_response_401_raises_authorization_error(self) -> None: + client = HttpClient("test.mailtrap.com") + + mock_response = Mock() + mock_response.status_code = 401 + mock_response.content = b'{"error": "Unauthorized"}' + mock_response.json.return_value = {"error": "Unauthorized"} + + with pytest.raises(AuthorizationError) as exc_info: + client._handle_failed_response(mock_response) + + assert "Unauthorized" in str(exc_info.value) + + def test_handle_failed_response_404_with_empty_content(self) -> None: + client = HttpClient("test.mailtrap.com") + + mock_response = Mock() + mock_response.status_code = 404 + mock_response.content = b"" + + with pytest.raises(APIError) as exc_info: + client._handle_failed_response(mock_response) + + assert exc_info.value.status == 404 + assert "Not Found" in exc_info.value.errors + + def test_handle_failed_response_with_empty_content(self) -> None: + client = HttpClient("test.mailtrap.com") + + mock_response = Mock() + mock_response.status_code = 500 + mock_response.content = b"" + + with pytest.raises(APIError) as exc_info: + client._handle_failed_response(mock_response) + + assert exc_info.value.status == 500 + assert "Empty response body" in exc_info.value.errors + + def test_handle_failed_response_invalid_json(self) -> None: + client = HttpClient("test.mailtrap.com") + + mock_response = Mock() + mock_response.status_code = 400 + mock_response.content = b"Invalid JSON content" + mock_response.json.side_effect = json.JSONDecodeError("Invalid JSON", "", 0) + + with pytest.raises(APIError) as exc_info: + client._handle_failed_response(mock_response) + + assert exc_info.value.status == 400 + assert "Invalid JSON" in exc_info.value.errors + + def test_handle_failed_response_generic_error(self) -> None: + client = HttpClient("test.mailtrap.com") + + mock_response = Mock() + mock_response.status_code = 500 + mock_response.content = b'{"error": "Internal server error"}' + mock_response.json.return_value = {"error": "Internal server error"} + + with pytest.raises(APIError) as exc_info: + client._handle_failed_response(mock_response) + + assert exc_info.value.status == 500 + assert "Internal server error" in exc_info.value.errors