In [1]:
import pandas as pd
import os
import email
import email.policy
from pydantic import BaseModel, ValidationError
import requests
import json
import numpy as np
import pytesseract
from PIL import Image
import cv2
from tqdm import tqdm
try:
    # pdf2image might not be installed in all environments
    from pdf2image import convert_from_bytes
    HAVE_PDF2IMAGE = True
except ImportError:
    print("pdf2image not installed. PDF OCR will fail.")
    HAVE_PDF2IMAGE = False
from sklearn.metrics import classification_report
from typing import Any

In [2]:
def extract_email_data(eml_file_path: str) -> dict:
    """
    Parse a .eml file, return a dictionary containing:
      {
        "from": str or None,
        "to": str or None,
        "subject": str or None,
        "body": str or None,      # Prefer plain text if available
        "attachments": list of {
            "filename": str,
            "content_type": str,
            "data": bytes
        }
      }

    Args:
        eml_file_path (str): Full path to the .eml file.

    Returns:
        dict: A dictionary with the extracted data.
    """
    # 1) Read the raw .eml text
    with open(eml_file_path, "r", encoding="utf-8", errors="replace") as f:
        raw_email = f.read()

    # 2) Parse into an EmailMessage (new-style) using a modern policy
    msg = email.message_from_string(raw_email, policy=email.policy.default)

    # 3) Extract top-level headers
    from_ = msg.get("From", "")
    to_ = msg.get("To", "")
    subject = msg.get("Subject", "")

    # 4) Walk the parts to find the best body and attachments
    body_text = None
    attachments = []

    for part in msg.walk():
        # If it's a container (multipart/*), skip it—we want actual payload parts
        if part.get_content_maintype() == "multipart":
            continue

        # Check for attachments
        filename = part.get_filename()
        if filename:
            # It's an attachment
            attach_data = part.get_payload(decode=True)
            attachments.append({
                "filename": filename,
                "content_type": part.get_content_type(),
                "data": attach_data
            })
        else:
            # Potentially a text part (plain or html or something else)
            ctype = part.get_content_type()
            if ctype == "text/plain":
                # If we haven't chosen a body yet, or we prefer plain text,
                # decode it here
                if body_text is None:
                    payload = part.get_payload(decode=True)
                    charset = part.get_content_charset() or "utf-8"
                    body_text = payload.decode(charset, errors="replace")

            elif ctype == "text/html":
                # Optionally handle HTML if no plain text was found
                if body_text is None:
                    payload = part.get_payload(decode=True)
                    charset = part.get_content_charset() or "utf-8"
                    body_text = payload.decode(charset, errors="replace")

    # Build the result
    result = {
        "from": from_,
        "to": to_,
        "subject": subject,
        "body": body_text,
        "attachments": attachments
    }
    return result


In [4]:
example_email = extract_email_data("./emails_attachments/email_pdf_text_1.eml")

In [5]:
final_df = pd.DataFrame([example_email])

# LLM Code

In [7]:
prompt = """Consider yourself as a customer bank request classification expert who can classify any kind of customer bank request emails. You need to classify them into below request and subrequest categories {
  "Adjustment": {
    "General Adjustment": "For modifications in payment terms or amounts.",
    "subrequest": {
      "Rate Correction Adjustment": "Adjustments targeting specific rate errors.",
      "Data Correction": "Fixes for data entry mistakes."
    }
  },
  "AU Transfer": {
    "Standard AU Transfer": "Routine authorized transfers.",
    "subrequest": {
      "Intra-AU Reallocation": "Reassigning funds within segments under the same authorization.",
      "Cross-Entity Transfer": "Transfers between subsidiaries/legal entities within the same group.",
      "Split Transfer Request": "Single transfer split among multiple destination accounts."
    }
  },
  "Closing Notice": {
    "subrequest": {
      "Reallocation Fees": "",
      "Reallocation Principal": "",
      "Amendment Fees": "",
      "Partial Settlement Notice": "For partial settlements requiring separate tracking.",
      "Overpayment Notification": "To handle overpayments distinctly.",
      "Early Closure Notification": "Facility closed before scheduled date.",
      "Deferred or Delayed Closure": "Postponed closure due to issues.",
      "Document Submission Requirement": "Closure pending due to missing documentation."
    }
  },
  "Commitment Change": {
    "subrequest": {
      "Cashless Roll": "",
      "Decrease": "",
      "Increase": "",
      "Drawdown Revision": "Adjustments to scheduled drawdown amounts.",
      "Borrowing Base Recalculation": "Triggered by changes in collateral value.",
      "Commitment Reaffirmation": "Reaffirming existing commitment without altering financials.",
      "Term Extension": "Extending the facility term.",
      "Covenant Update": "Changes in covenants like financial ratios."
    }
  },
  "Fee Payment": {
    "subrequest": {
      "Ongoing Fee": "",
      "Letter of Credit Fee": "",
      "Retroactive Fee Correction": "Adjusting fees from previous periods.",
      "Fee Allocation Across Accounts": "Splitting fees across accounts.",
      "Fee Waiver Request": "Request to waive fees.",
      "Fee Reversal": "Reversing erroneously charged fees."
    }
  },
  "Money Movement Inbound": {
    "subrequest": {
      "Principal": "",
      "Interest": "",
      "Principal + Interest": "",
      "Principal + Interest + Fee": "",
      "Consolidated Payment Notification": "Aggregating multiple incoming payments.",
      "Segregated or Partitioned Payment": "Earmarking received funds for different purposes.",
      "Currency Conversion Inbound": "Inbound payment received in foreign currency.",
      "Escrow Payment": "Funds directed into escrow.",
      "Partial Payment": "Portion of expected payment received."
    }
  },
  "Money Movement Outbound": {
    "subrequest": {
      "Timebound": "",
      "Foreign Currency": "",
      "Failed Transfer Resolution": "Corrective action for failed outbound transfers.",
      "Reversal and Reissue": "Reversing and correctly reissuing outbound payments.",
      "Scheduled Outbound Payment Notification": "Future-dated outbound payment setup.",
      "Automated Reversal": "Automated error-triggered reversal process.",
      "Fee Refund": "Returning fees as part of outbound transaction."
    }
  },
  "Additional Process/Operational Exceptions": {
    "subrequest": {
      "System-Generated Error Correction": "Correcting system-generated errors.",
      "Manual Intervention Alert": "Request for manual review due to unexpected data."
    }
  },
  "Collateral Management": {
    "subrequest": {
      "Collateral Revaluation Request": "Request for updated or additional collateral.",
      "Collateral Deficiency Notification": "Insufficient collateral notification."
    }
  },
  "Documentation & Compliance": {
    "subrequest": {
      "Missing Documentation Follow-up": "Follow-up for unreceived documentation.",
      "Digital Signature Verification Issue": "Invalid digital signature flag."
    }
  }
}


Output format:{"request_type":, "subrequest_type":}. Please don't provide anything other than the json"""

In [6]:
class ExtractCorrectAnswer(BaseModel):
    request_type: str
    subrequest_type: str


    
    

In [8]:
def find_benefits(prompt: str, data_dict: str) -> dict:
    ollama_api_url = "http://localhost:11434/api/chat"

    # Ensure the prompt is structured correctly
    full_prompt = f"{prompt}\n\n email: {data_dict}"

    # Request payload for Ollama
    payload = {
        "model": "llama3.1",
        "messages": [{"role": "user", "content": full_prompt}],
        "options": {
            "seed": 101,
            "temperature": 0
        },
        "stream": False
    }

    try:


        error = True

        while error == True:
        
            response = requests.post(ollama_api_url, json=payload)
            response.raise_for_status()

            # Extract response JSON correctly
            response_json = response.json()
            model_response = response_json.get("message", {}).get("content", "")

            
            # model_response = model_response.split("</think>")[-1][9:-3]
            
            print(model_response)

            # Convert and validate JSON response
            parsed_response = json.loads(model_response)  # Extract valid JSON
            validated_response = ExtractCorrectAnswer(**parsed_response)
            
            return validated_response.model_dump()  # Return as dictionary
        
        

    except (json.JSONDecodeError, ValidationError, ValueError) as e:
        print(f"Error parsing model response: {e}")
        return  {"request_type": "manual", "subrequest_type":None}
    except requests.exceptions.RequestException as re:
        print(f"Request Error: {re}")
        return {"request_type": "manual", "subrequest_type":None}

    except Exception as e:
        print(f"Unexpected Error: {e}")
        return {"request_type": "manual", "subrequest_type":None}


In [9]:
def clean_text(text):
    if isinstance(text, str):
        return text.encode("utf-8", errors="ignore").decode("utf-8", errors="ignore")
    return text

final_df['subject'] = final_df['subject'].apply(clean_text)
final_df['body'] = final_df['body'].apply(clean_text)
final_df['attachments'] = final_df['attachments'].apply(clean_text)


In [10]:
# Force all fields to string type and clean them
final_df['subject'] = final_df['subject'].astype(str).str.encode('utf-8', errors='ignore').str.decode('utf-8', errors='ignore')
final_df['body'] = final_df['body'].astype(str).str.encode('utf-8', errors='ignore').str.decode('utf-8', errors='ignore')
final_df['attachments'] = final_df['attachments'].astype(str).str.encode('utf-8', errors='ignore').str.decode('utf-8', errors='ignore')


In [11]:
final_df_json = json.loads(final_df[['subject', 'body', 'attachments']].to_json(orient="records"))

In [12]:

def preprocess_image_for_ocr(pil_img: Image.Image) -> np.ndarray:
    """
    Convert a PIL Image to an OpenCV (NumPy) image, apply grayscale
    and adaptive threshold for better OCR on noisy/uneven scans.
    """
    img = np.array(pil_img)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    thresh = cv2.adaptiveThreshold(
        gray,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY,
        31, 2
    )
    return thresh


def run_ocr_on_pil(pil_img: Image.Image) -> str:
    """
    Apply OpenCV preprocessing to a PIL Image,
    then run Tesseract with custom config.
    """
    processed = preprocess_image_for_ocr(pil_img)
    config_str = r"--oem 3 --psm 6"
    pil_processed = Image.fromarray(processed)
    text = pytesseract.image_to_string(pil_processed, config=config_str)
    return text


def ocr_pdf(pdf_data: bytes) -> str:
    """
    Convert PDF bytes to images (pdf2image), then
    do OCR on each page. Returns full text.
    """
    if not HAVE_PDF2IMAGE:
        return "[Error: pdf2image not installed, cannot OCR PDF]"
    
    pages = convert_from_bytes(pdf_data)
    all_texts = []
    for page_img in pages:
        page_text = run_ocr_on_pil(page_img)
        all_texts.append(page_text)
    return "\n".join(all_texts)


def ocr_image(img_data: bytes) -> str:
    """
    Load the raw image (PNG/JPG/etc.) into PIL,
    do OpenCV-based preprocessing, and run Tesseract.
    """
    with io.BytesIO(img_data) as buf:
        pil_img = Image.open(buf).convert("RGB")
        text = run_ocr_on_pil(pil_img)
    return text





In [13]:
from multiprocessing import Pool, cpu_count
from tqdm import tqdm


def process_jso(args):
    """
    Processes a single jso dictionary.

    Args:
        args (tuple): Typically (jso, prompt)

    Returns:
        The output from find_benefits(...) for this jso
    """
    jso, prompt = args

    temp_json = eval(jso["attachments"])

    if temp_json:
        text = ""
        for index, attachment in enumerate(temp_json):
            attachment_text = ocr_pdf(attachment["data"]) 
            text += f"attachement_{index}{attachment_text}"


        final_text_sent_model = text + jso["subject"] + jso["body"]

    else:
        final_text_sent_model = jso["subject"] + jso["body"]


    output = find_benefits(prompt, final_text_sent_model)
    return output


def parallel_process(final_df_json, prompt):
    """
    Runs 'process_jso' in parallel over final_df_json,
    preserving order. Returns a list of results in the
    same order as final_df_json.
    """

    args_list = [(jso, prompt) for jso in final_df_json]


    with Pool(processes=cpu_count()) as pool:

        results = list(tqdm(pool.map(process_jso, args_list),
                            total=len(args_list),
                            desc="Processing JSON items"))

    return results




 

In [14]:
outputs = parallel_process(final_df_json, prompt)
print("All done!")


{"request_type": "Adjustment", "subrequest_type": "General Adjustment"}


Processing JSON items: 100%|██████████| 1/1 [00:00<00:00, 4718.00it/s]

All done!





# Related Data

In [15]:
class ExtractCorrectAnswer(BaseModel):

    account_number: str| None = None
    principle_amount:float| None = None
    interest:float| None = None
    fees:float| None = None
    escalation:str| None = None
    appreciation:str| None = None


    
    

In [16]:
prompt = """Consider yourself as a customer bank request classification expert who can classify any kind of customer bank request emails. You need to  identify account_number, principle_amount, interest, fees, escalation, appreciation. If any of these are not present return null. Also, just give the json output and nothing else.

.Output should only be a json with all these keys present in json. If any of the keys doesn't have value give null as value { "account_number":float, "principle_amount":float, "interest":float, "fees":float , "escalation":str ,"appreciation": str} Please don't provide anything other than the json"""

In [17]:
def find_attributes(prompt: str, data_dict: str) -> dict:
    ollama_api_url = "http://localhost:11434/api/chat"

    # Ensure the prompt is structured correctly
    full_prompt = f"{prompt}\n\n email: {data_dict}"

    # Request payload for Ollama
    payload = {
        "model": "llama3.1",
        "messages": [{"role": "user", "content": full_prompt}],
        "options": {
            "seed": 101,
            "temperature": 0
        },
        "stream": False
    }

    try:


        error = True

        while error == True:
        
            response = requests.post(ollama_api_url, json=payload)
            response.raise_for_status()

            # Extract response JSON correctly
            response_json = response.json()
            model_response = response_json.get("message", {}).get("content", "")

            
            # model_response = model_response.split("</think>")[-1][9:-3]
            
            print(model_response)

            # Convert and validate JSON response
            parsed_response = json.loads(model_response)  # Extract valid JSON
            validated_response = ExtractCorrectAnswer(**parsed_response)
            
            return validated_response.model_dump()  # Return as dictionary
        
        

    except (json.JSONDecodeError, ValidationError, ValueError) as e:
        print(f"Error parsing model response: {e}")
        return  {"request_type": "manual", "subrequest_type":None}
    except requests.exceptions.RequestException as re:
        print(f"Request Error: {re}")
        return {"request_type": "manual", "subrequest_type":None}

    except Exception as e:
        print(f"Unexpected Error: {e}")
        return {"request_type": "manual", "subrequest_type":None}


In [18]:
from multiprocessing import Pool, cpu_count
from tqdm import tqdm


def process_jso(args):
    """
    Processes a single jso dictionary.

    Args:
        args (tuple): Typically (jso, prompt)

    Returns:
        The output from find_benefits(...) for this jso
    """
    jso, prompt = args

    temp_json = eval(jso["attachments"])

    if temp_json:
        text = ""
        for index, attachment in enumerate(temp_json):
            attachment_text = ocr_pdf(attachment["data"]) 
            text += f"attachement_{index}{attachment_text}"


        final_text_sent_model = text + jso["subject"] + jso["body"]

    else:
        final_text_sent_model = jso["subject"] + jso["body"]


    output = find_attributes(prompt, final_text_sent_model)
    return output


def parallel_process(final_df_json, prompt):
    """
    Runs 'process_jso' in parallel over final_df_json,
    preserving order. Returns a list of results in the
    same order as final_df_json.
    """

    args_list = [(jso, prompt) for jso in final_df_json]


    with Pool(processes=cpu_count()) as pool:

        results = list(tqdm(pool.map(process_jso, args_list),
                            total=len(args_list),
                            desc="Processing JSON items"))

    return results




 

In [19]:
outputs_attributes = parallel_process(final_df_json, prompt)
print("All done!")


{
  "account_number": null,
  "principle_amount": null,
  "interest": null,
  "fees": null,
  "escalation": "Payment term adjustment",
  "appreciation": null
}


Processing JSON items: 100%|██████████| 1/1 [00:00<00:00, 8355.19it/s]

All done!



