<a href="https://colab.research.google.com/github/sayakju/llm-practice/blob/main/cc_statement_analyzer_using_gpt.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install --quiet google-api-python-client pdfplumber PyPDF2 openai python-dotenv

In [None]:
import os
import base64
import pdfplumber
import PyPDF2
from google.colab import files, userdata
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from openai import OpenAI

# === Upload OAuth files ===
# Imported here so this cell is self-contained; getpass keeps the PDF password
# out of the notebook's visible (and saved) output.
from getpass import getpass

print("Please upload both 'token.json' and 'credentials.json'")
uploaded = files.upload()  # This will prompt file upload

# Save uploaded files to disk so later cells can open them by name.
for fn, content in uploaded.items():
    with open(fn, 'wb') as f:
        f.write(content)

# Confirm upload: both files must now exist in the working directory.
if not os.path.exists("token.json") or not os.path.exists("credentials.json"):
    raise FileNotFoundError("Both 'token.json' and 'credentials.json' must be uploaded.")
else:
    print("✅ Files uploaded successfully.")

# CONFIG
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
# Path of the authorized-user token file passed to
# Credentials.from_authorized_user_file in the next cell.
CREDENTIALS_PATH = 'token.json'
# Read without echo: input() would leave the password in the cell output.
PDF_PASSWORD = getpass("Enter your PDF password: ")

In [None]:
# Build the Gmail client: load the stored OAuth token for the configured
# scopes, then create a 'gmail' v1 service object bound to those credentials.
creds = Credentials.from_authorized_user_file(CREDENTIALS_PATH, scopes=SCOPES)
gmail_service = build(serviceName='gmail', version='v1', credentials=creds)

In [None]:
def search_emails(query):
    """Return every message stub matching a Gmail search query.

    Args:
        query: Gmail search expression, passed as the ``q`` parameter of
            ``users.messages.list`` for the authenticated user (``'me'``).

    Returns:
        A list of the entries found under the ``'messages'`` key of each
        response page (empty list when nothing matches). All result pages are
        followed, not just the first one.
    """
    messages_api = gmail_service.users().messages()
    request = messages_api.list(userId='me', q=query)

    found = []
    # list_next() returns None once the response carries no further page,
    # which ends the loop.
    while request is not None:
        response = request.execute()
        found.extend(response.get('messages', []))
        request = messages_api.list_next(request, response)
    return found

def get_email_attachments(msg_id):
    """Collect all PDF attachments of one Gmail message.

    The message is fetched in ``full`` format and its MIME part tree is walked
    recursively. A part counts as a PDF when its filename ends in ``.pdf``
    (case-insensitive).

    Args:
        msg_id: ID of the message to inspect for the authenticated user.

    Returns:
        A list of ``(filename, data)`` tuples where ``data`` is the decoded
        attachment content as bytes. Empty when the message has no PDF parts.
    """
    msg = gmail_service.users().messages().get(userId='me', id=msg_id, format='full').execute()
    attachments = []

    def extract(parts):
        for part in parts:
            filename = part.get("filename", "")
            body = part.get("body", {})
            if filename.lower().endswith(".pdf"):
                if "attachmentId" in body:
                    # Content is stored separately and must be fetched by ID.
                    att_id = body["attachmentId"]
                    attachment = gmail_service.users().messages().attachments().get(
                        userId='me', messageId=msg_id, id=att_id
                    ).execute()
                    attachments.append((filename, base64.urlsafe_b64decode(attachment['data'])))
                elif body.get("data"):
                    # Without an attachmentId the content is carried inline in
                    # the part body; previously such PDFs were dropped.
                    attachments.append((filename, base64.urlsafe_b64decode(body["data"])))
            # Multipart containers nest further parts; descend into them.
            if "parts" in part:
                extract(part['parts'])

    payload = msg.get("payload", {})
    if "parts" in payload:
        extract(payload['parts'])
    return attachments

In [None]:
from io import BytesIO

def extract_text_from_pdf(data):
    """Extract the text of every page of a (possibly encrypted) PDF.

    The PDF is read from memory, so no copy of the statement is left on disk.
    Encrypted files are decrypted with the module-level ``PDF_PASSWORD``.

    Args:
        data: Raw PDF content as bytes.

    Returns:
        The concatenated text of all pages (pages yielding no text contribute
        an empty string), or ``""`` if decryption or extraction fails; the
        failure is printed rather than raised.
    """
    try:
        reader = PyPDF2.PdfReader(BytesIO(data))
        # decrypt() reports a wrong password through a falsy return value,
        # so the result has to be checked explicitly.
        if reader.is_encrypted and not reader.decrypt(PDF_PASSWORD):
            raise ValueError("Incorrect PDF password.")
        return "".join(page.extract_text() or "" for page in reader.pages)
    except Exception as e:
        print("PDF decryption or extraction failed:", e)
        return ""

def extract_summary_page(pdf_bytes):
    """Return the text pdfplumber extracts from the first page of a PDF.

    The PDF is opened from the in-memory bytes; no password handling is done
    in this function.
    """
    stream = BytesIO(pdf_bytes)
    with pdfplumber.open(stream) as document:
        # Assuming first page contains total spend
        first_page = document.pages[0]
        return first_page.extract_text()

def extract_summary_page_from_protected_pdf(pdf_bytes):
    """Return the first-page text of a PDF that may be password protected.

    The PDF is opened with PyPDF2 and, if encrypted, decrypted with the
    module-level ``PDF_PASSWORD``. Its first page is rewritten into a new
    in-memory PDF, which pdfplumber then reads for text extraction.

    Args:
        pdf_bytes: Raw PDF content as bytes.

    Returns:
        Whatever ``extract_text()`` yields for the first page.

    Raises:
        ValueError: If the PDF is encrypted and cannot be decrypted with
            ``PDF_PASSWORD``.
    """
    # First decrypt with PyPDF2
    decrypted_stream = BytesIO()
    with BytesIO(pdf_bytes) as input_stream:
        reader = PyPDF2.PdfReader(input_stream)
        if reader.is_encrypted:
            try:
                decrypt_result = reader.decrypt(PDF_PASSWORD)
            except Exception as err:
                raise ValueError("Incorrect PDF password.") from err
            # A wrong password does not raise; decrypt() returns a falsy
            # "not decrypted" result instead, so check it explicitly.
            if not decrypt_result:
                raise ValueError("Incorrect PDF password.")
        writer = PyPDF2.PdfWriter()
        writer.add_page(reader.pages[0])
        writer.write(decrypted_stream)

    # Now extract text from decrypted page using pdfplumber
    decrypted_stream.seek(0)
    with pdfplumber.open(decrypted_stream) as pdf:
        return pdf.pages[0].extract_text()

In [None]:
def summarize_with_gpt4(text):
    """Ask the ``gpt-4`` chat model for the total spend found in statement text.

    An OpenAI client is created on every call using the ``OPENAI_API_KEY``
    value from Colab ``userdata``. The request uses temperature 0 and at most
    50 completion tokens.

    Args:
        text: Statement text appended verbatim to the user prompt.

    Returns:
        The message content of the first completion choice.
    """
    api_key = userdata.get('OPENAI_API_KEY')
    client = OpenAI(api_key=api_key)

    system_prompt = (
        "You are a financial assistant that extracts numeric data from anonymized credit card statement texts."
        " Do not retain or refer to personal data."
    )
    user_prompt = (
        "Extract only the *total spend amount* from the following credit card statement text.\n"
        "Return only the number with no extra text, no currency symbol, and no formatting.\n\n"
        f"{text}"
    )
    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    completion = client.chat.completions.create(
        model="gpt-4",
        messages=conversation,
        temperature=0,
        max_tokens=50,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content


In [None]:
def interpret_credit_card_statement(text):
    """Print a progress line and summarize the start of ``text`` via GPT-4.

    Only the first 4000 characters are passed to ``summarize_with_gpt4``; its
    result is returned unchanged.
    """
    print("Summarizing extracted text with GPT-4...")
    # Truncate to 4000 characters (GPT input limit)
    return summarize_with_gpt4(text[:4000])

In [None]:
from datetime import datetime, timedelta

# Change this to match your filename or date format in PDF
def extract_date_from_filename(filename):
    """Parse the statement date embedded in an attachment filename.

    The date is taken from the text after the last underscore, with a
    trailing ``.pdf`` extension removed regardless of case, and must be in
    ``DD-MM-YYYY`` form. Example: ``4375XXXXXXXXXX76_09-07-2025.PDF``.

    Args:
        filename: Attachment filename.

    Returns:
        The parsed ``datetime``, or ``None`` (after printing the reason) when
        the filename does not contain a date in the expected format.
    """
    try:
        date_str = filename.rsplit('_', 1)[-1]
        # Attachments are accepted with either ".PDF" or ".pdf", so the
        # extension must be stripped case-insensitively before parsing.
        if date_str.lower().endswith('.pdf'):
            date_str = date_str[:-len('.pdf')]
        return datetime.strptime(date_str, "%d-%m-%Y")
    except (AttributeError, TypeError, ValueError) as e:
        print(f"Could not parse date from {filename}: {e}")
        return None

# GPT-4 total spend extraction
def extract_total_spend(text):
    """Return the total spend for a statement as a float.

    The amount is obtained from ``summarize_with_gpt4`` and normalized before
    conversion: surrounding whitespace, a leading ``₹``/``$`` symbol and comma
    thousands separators are removed, so a reply such as ``"₹1,23,456.78"``
    still parses.

    Args:
        text: Statement text to send to the model.

    Returns:
        The parsed amount, or ``0.0`` (after printing the reason) if the model
        call fails or its reply is not a number.
    """
    try:
        total_str = summarize_with_gpt4(text)
        # The model is asked for a bare number but may still add formatting.
        cleaned = total_str.strip().lstrip("₹$").replace(",", "").strip()
        return float(cleaned)
    except Exception as e:
        # Best-effort boundary around a network call: report and count as 0.
        print(f"Failed to extract total spend: {e}")
        return 0.0


# Anniversary date from which yearly spend windows are counted
anniversary_start = datetime(2023, 11, 9)  # 09-Nov-2023
today = datetime.today()

# One bucket per anniversary year, walking forward a year at a time until the
# window that contains today has been added. A window is "ongoing" when today
# falls before the next anniversary, otherwise "complete".
buckets = []
current_start = anniversary_start
while current_start < today:
    next_start = current_start.replace(year=current_start.year + 1)
    is_ongoing = today < next_start
    buckets.append(dict(
        start=current_start,
        end=next_start - timedelta(days=1),  # last day before next anniversary
        spend=0.0,
        status="complete" if not is_ongoing else "ongoing",
    ))
    current_start = next_start

# Main processing: find statement emails, read each PDF attachment, and add
# its total spend to the anniversary bucket containing the statement date.
query = 'subject:"Your HDFC Bank - Infinia Credit Card Statement" has:attachment'
# query = 'subject:"ICICI Bank Credit Card Statement for the period" has:attachment'
messages = search_emails(query)
print(f"Found {len(messages)} email(s) matching the query.")

for msg in messages:
    print(f"Processing email ID: {msg['id']}")
    attachments = get_email_attachments(msg['id'])
    for fname, data in attachments:
        print(f"  - {fname}")
        statement_date = extract_date_from_filename(fname)
        if statement_date is None:
            print("    Skipping: Invalid or missing date in filename.")
            continue

        # Match to correct bucket (first bucket whose range contains the date)
        matched_bucket = next(
            (b for b in buckets if b["start"] <= statement_date <= b["end"]),
            None,
        )
        if matched_bucket is None:
            print("    Skipping: Statement outside all buckets.")
            continue

        # Extract and accumulate spend. A single unreadable or undecryptable
        # PDF is reported and skipped instead of aborting the whole run.
        try:
            text = extract_summary_page_from_protected_pdf(data)
        except Exception as e:
            print(f"    Skipping: Could not read PDF: {e}")
            continue
        if not text:
            print("    Skipping: No text extracted from first page.")
            continue

        spend = extract_total_spend(text)
        print(f"    Total Spend: ₹{spend}")
        matched_bucket["spend"] += spend

# Final summary output: one line per anniversary bucket, numbered from 1,
# showing its status, date range, and accumulated spend to two decimals.
print("\n📊 HDFC Infinia Spend Summary by Anniversary Year:")
for i, b in enumerate(buckets, 1):
    if b["status"] == "ongoing":
        status = "🟢 Ongoing"
    else:
        status = "✅ Completed"
    start_label = b['start'].strftime('%d-%b-%Y')
    end_label = b['end'].strftime('%d-%b-%Y')
    print(f"  {status} | Bucket {i}: {start_label} to {end_label} → ₹{b['spend']:.2f}")
