In [1]:
%load_ext autoreload
%autoreload 2

from common import add_workspace_to_path

# Project helper from `common`; presumably puts the workspace root on sys.path
# so the `cloud.*` import in the next cell resolves — confirm in common.py.
add_workspace_to_path()

In [3]:
from cloud.functions.infrastructure.google.helper import load_google_credentials

# Load Google credentials via the project helper; they are handed to
# GmailClient further down in the notebook.
credentials = load_google_credentials()

In [5]:
from datetime import datetime


class GmailQueryBuilder:
    """Fluent builder for Gmail search query strings.

    Every filter method appends one search term and returns the builder
    itself so calls can be chained; ``build`` joins the collected terms
    with single spaces.
    """

    def __init__(self):
        # Search terms, kept in the order they were added.
        self._query_parts = []

    def _add(self, term):
        """Record one search term and hand back the builder for chaining."""
        self._query_parts.append(term)
        return self

    def from_email(self, email):
        """Append a ``from:`` term for ``email``."""
        return self._add(f"from:{email}")

    def subject(self, text, exact=True):
        """Append a ``subject:`` term.

        With ``exact`` true (the default) ``text`` is wrapped in double
        quotes; otherwise it is appended unquoted.
        """
        term = f'subject:"{text}"' if exact else f"subject:{text}"
        return self._add(term)

    def after_date(self, date):
        """Append an ``after:`` term expressed as an integer Unix timestamp.

        ``date`` may be a ``datetime`` (converted via ``timestamp()``) or any
        value accepted by ``int()``.
        """
        seconds = date.timestamp() if isinstance(date, datetime) else date
        return self._add(f"after:{int(seconds)}")

    def build(self):
        """Return all collected terms joined by single spaces."""
        return " ".join(self._query_parts)

In [6]:
from datetime import datetime, timezone, timedelta

# Only consider mail from the last 48 hours, measured from "now" in UTC.
lookback = timedelta(hours=48)
time_threshold = datetime.now(timezone.utc) - lookback

# Search for mails from order-update@amazon.de whose subject contains the
# quoted phrase and that arrived after the threshold.
query = (
    GmailQueryBuilder()
    .from_email("order-update@amazon.de")
    .subject("Ihr Paket kann bei DHL", exact=True)
    .after_date(time_threshold)
    .build()
)

In [None]:
from pydantic import BaseModel


class MessageId(BaseModel):
    """Identifier pair of one Gmail message: its own id and its thread's id."""

    id: str
    thread_id: str

    @staticmethod
    def from_response(response: dict) -> "MessageId":
        """Build a ``MessageId`` from a dict carrying ``id`` and ``threadId`` keys.

        Raises:
            KeyError: If either key is missing from ``response``.
        """
        message_id = response["id"]
        thread_id = response["threadId"]
        return MessageId(id=message_id, thread_id=thread_id)

In [46]:
import base64

def _extract_email_body(payload: dict) -> str:
    def _decode(inner: dict) -> str:
        return base64.urlsafe_b64decode(inner["body"]["data"].encode("utf-8")).decode("utf-8")

    if "parts" in payload:
        for part in payload["parts"]:
            if part["mimeType"] == "text/html":
                return _decode(part)
    else:
        return _decode(payload)

    return ""

class FullMailResponse(BaseModel):
    """One fully fetched Gmail message with its body already decoded."""

    id: str
    threadId: str
    labelIds: list[str]
    snippet: str
    # Decoded body text produced by _extract_email_body, not the raw payload dict.
    payload: str
    sizeEstimate: int
    historyId: str
    internalDate: str

    @staticmethod
    def from_response(response: dict) -> "FullMailResponse":
        """Build a ``FullMailResponse`` from a message response dict.

        All fields are copied from ``response`` unchanged except ``payload``,
        which is replaced by the text returned from ``_extract_email_body``.

        Raises:
            KeyError: If any expected key is missing from ``response``.
        """
        fields = {
            "id": response["id"],
            "threadId": response["threadId"],
            "labelIds": response["labelIds"],
            "snippet": response["snippet"],
            "historyId": response["historyId"],
            "internalDate": response["internalDate"],
            "payload": _extract_email_body(response["payload"]),
            "sizeEstimate": response["sizeEstimate"],
        }
        return FullMailResponse(**fields)

In [None]:
from googleapiclient.discovery import build
from google.oauth2.credentials import Credentials

class GmailClient:
    """Small wrapper around the Gmail v1 API for listing and fetching messages.

    All requests are issued for the authenticated user (``userId="me"``).
    """

    def __init__(self, credentials: Credentials):
        """Create the Gmail v1 service object from ``credentials``."""
        self._service = build(
            "gmail", "v1", credentials=credentials, cache_discovery=False
        )

    def _fetch_message_ids(self, query: str) -> list[MessageId]:
        """Fetch the ids of all messages matching ``query``.

        The list endpoint is paginated; every page is followed so that the
        result is not silently cut off after the first page.

        Args:
            query: Gmail search query string.

        Returns:
            One ``MessageId`` per matching message, in the order returned by
            the API; empty when nothing matches.
        """
        messages_api = self._service.users().messages()
        message_ids: list[MessageId] = []

        request = messages_api.list(userId="me", q=query)
        while request is not None:
            response = request.execute()
            # A page without matches has no "messages" key at all.
            message_ids.extend(
                MessageId.from_response(msg) for msg in response.get("messages", [])
            )
            # list_next returns None once there is no further page.
            request = messages_api.list_next(
                previous_request=request, previous_response=response
            )
        return message_ids

    def fetch_mails(self, query: str) -> list[FullMailResponse]:
        """Fetch the full messages matching ``query``.

        Issues one ``get`` request (``format="full"``) per matching message id.

        Args:
            query: Gmail search query string.

        Returns:
            The parsed messages, in the same order as the matching ids.
        """
        emails = []
        for msg_id in self._fetch_message_ids(query):
            message = (
                self._service.users()
                .messages()
                .get(userId="me", id=msg_id.id, format="full")
                .execute()
            )
            emails.append(FullMailResponse.from_response(message))
        return emails
         

In [None]:
# Gmail client authenticated with the credentials loaded earlier.
s = GmailClient(credentials)

In [49]:
# Inspect the id/thread-id pairs matching the query (calls the client's
# private helper directly, for exploration only).
ids = s._fetch_message_ids(query)
ids

[MessageId(id='19a594b35638b2ec', thread_id='19a5949022f29cdd'),
 MessageId(id='19a5949022f29cdd', thread_id='19a5949022f29cdd'),
 MessageId(id='19a54932933bd33d', thread_id='19a54932933bd33d')]

In [50]:
# Fetch the complete messages for the same query. NOTE(review): the rendered
# output contains real mail content (recipient name, pickup details) — clear it
# before sharing or committing the notebook.
s.fetch_mails(query)

[FullMailResponse(id='19a594b35638b2ec', threadId='19a5949022f29cdd', labelIds=['CATEGORY_UPDATES', 'INBOX'], snippet='Amazon.de Guten Tag Oliver Rüger, Ihr Paket mit 1 Artikel kann bei DHL Packstation 158 abgeholt werden. Sie erhalten den erforderlichen Abholcode in einer separaten Nachricht direkt vom Transporteur.', payload=' <!DOCTYPE html>\r\n    <html xmlns="https://www.w3.org/1999/xhtml" dir="ltr">\r\n <head> \r\n  <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" /> \r\n  <style type="text/css">\r\n\r\nhtml, div, span, applet, object, iframe,\r\nh1, h2, h3, h4, h5, h6, p, blockquote, pre,\r\na, abbr, acronym, address, big, cite, code,\r\ndel, dfn, em, font, img, ins, kbd, q, s, samp,\r\nsmall, strike, strong, sub, sup, tt, var,\r\ndl, dt, dd, ol, ul, li,\r\nfieldset, form, label, legend,\r\ntable, caption, tbody, tfoot, thead, tr, th, td {\r\n\tmargin: 0;\r\n\tpadding: 0;\r\n\tborder: 0;\r\n\toutline: 0;\r\n\tfont-weight: inherit;\r\n\tfont-style: inherit;\r\n\