In [1]:
import asyncio
import aiohttp
from google.oauth2.credentials import Credentials

# Base URL of the Gmail v1 REST endpoints for the `me` user; request paths are
# expected to be appended to this prefix.
GMAIL_API = "https://gmail.googleapis.com/gmail/v1/users/me"

class AsyncGmailClient:
    """Async context manager pairing Gmail credentials with an aiohttp session.

    Credentials are loaded from an authorized-user token file when the object
    is constructed. The HTTP session is created on ``async with`` entry and
    closed on exit.

    Args:
        token_path: Path handed to ``Credentials.from_authorized_user_file``.
        for_del: When true, the ``https://mail.google.com/`` scope is requested
            in addition to ``gmail.modify`` (presumably required for deleting
            messages; confirm against the Gmail scope documentation).
        max_connections: Initial value of the semaphore kept on
            ``self.semaphore``.

    Attributes:
        creds: Credentials object built from ``token_path`` and the scopes.
        semaphore: ``asyncio.Semaphore`` available to callers for limiting
            concurrent work; nothing in this class acquires it.
        session: The open ``aiohttp.ClientSession`` inside the ``async with``
            block, ``None`` otherwise.
    """

    def __init__(self, token_path, for_del=False, max_connections=5):
        scopes = ['https://www.googleapis.com/auth/gmail.modify']
        if for_del:
            scopes.append('https://mail.google.com/')
        self.creds = Credentials.from_authorized_user_file(token_path, scopes)
        self.semaphore = asyncio.Semaphore(max_connections)
        self.session = None

    async def __aenter__(self):
        """Open the HTTP session (if none is open) and return the client."""
        # Re-entering while a session is still open must not overwrite it:
        # the previous session would never be closed.
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, *args):
        """Close the HTTP session, if one was opened, and forget it."""
        # Detach first so the attribute never points at a closed session, and
        # so exiting without a session (e.g. a failed or skipped entry) is a
        # no-op instead of an AttributeError on None.
        session, self.session = self.session, None
        if session is not None:
            await session.close()

In [4]:
import pandas as pd
from dags.utils.encode_utils import decode_zip
from dags.utils.payload_utils import decode_gmail_payload
# Input file for this cell (the path ends in .json.gz).
path_1 = "data/imp_22-08-2025-03-23.json.gz"

# decode_zip is a project helper; whatever it returns is passed straight to
# the DataFrame constructor.
decompressed_data_1 = decode_zip(path_1)
df = pd.DataFrame(decompressed_data_1)

# Decode every "Payload" entry into a Series, then assign the resulting
# columns as Date / Subject / Body.
# NOTE(review): assumes decode_gmail_payload yields exactly three values per
# row, in that order — confirm against the helper.
decoded_fields = df["Payload"].apply(
    lambda payload: pd.Series(decode_gmail_payload(payload))
)
df[["Date", "Subject", "Body"]] = decoded_fields

In [5]:
from dags.utils.encode_utils import Serialize, Deserialize 
# Number of rows to split into chunks.
r = len(df)

# NOTE: "Payload" is overwritten in place, so running this cell twice would
# serialize already-serialized values; re-run the loading cell first.
df["Payload"] = df["Payload"].apply(Serialize)

# Keep only the columns that go into the chunks, in this order. "Body" is
# deliberately excluded (the original selected it and dropped it right after).
df = df[["Id", "Date", "Subject", "Payload"]]

# At most 50 rows per chunk. The lower bound of 1 matters for an empty frame:
# a chunk size of 0 would be passed to range() as its step and raise ValueError.
chunk_size = max(1, min(50, r))

# Each chunk is a tuple of plain row tuples (no index, no namedtuple wrapper).
row_chunks: list[tuple] = [
    tuple(df.iloc[start:start + chunk_size].itertuples(index=False, name=None))
    for start in range(0, r, chunk_size)
]

# `chunks` holds the serialized form of the whole chunk list.
chunks: bytes = Serialize(row_chunks)

In [7]:
# Rebind `chunks` to the result of decoding its current (serialized) value.
# NOTE(review): the annotation assumes Deserialize restores a list of chunks,
# each a tuple of row tuples — confirm against the helper.
# Running this cell twice would pass already-deserialized data to Deserialize.
chunks: list[tuple[tuple, ...]] = Deserialize(chunks)