# PoC - Portfolio Health Report

In [1]:
# Imports
import os, re, json
import pandas as pd
import glob
import time
from natsort import natsorted
from tqdm.notebook import tqdm

try:
    import openai
except ImportError:
    openai = None

try:
    import google.generativeai as genai
except ImportError:
    genai = None

# Gemini is used for this project because it offers free API credits.
# The import above falls back to `genai = None` when the package is missing;
# fail here with an actionable message instead of an opaque AttributeError
# on None.
if genai is None:
    raise ImportError(
        "google-generativeai is not installed; run `pip install google-generativeai`."
    )
genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))

In [2]:
# Constants
# Matches one email header line (applied to stripped lines), e.g.
# "Subject: Budget update": group 1 is the header name, group 2 its value.
# Only these five header names are recognised, and matching is case-sensitive.
HEADER_PATTERN = re.compile(r"^(From|To|Date|Subject|Cc):\s*(.*)")
# A line starting with this text separates messages inside a thread file.
FORWARD_MARKER = "--- Forwarded Message ---"
# Folder (relative to the working directory) holding Colleagues.txt and the
# email*.txt thread files.
BASE_DATA_PATH = "../AI_Developer"
# Markdown report written by the main cell.
OUTPUT_FILE = "Portfolio Health Report.md"


### Data Ingestion & Preprocessing

In [3]:
def parse_colleagues(path):
    """Read the colleagues file into a DataFrame, one row per (person, email).

    Each non-empty line is expected as ``Role: Name (email)``; the header line
    ``Characters:`` is skipped and malformed lines are reported and skipped.
    Entries sharing the same name (e.g. one person with several emails) get
    the same ``person_id`` ("person-N", numbered by first appearance).

    Args:
        path: Path to the colleagues text file.

    Returns:
        DataFrame with columns person_id, role, name, email. The columns are
        always present, even when nothing parsed, so downstream column lookups
        such as ``colleagues["name"]`` do not raise KeyError.
    """
    colleagues = []
    name_to_id = {}

    # utf-8-sig transparently drops a leading BOM, which would otherwise end
    # up glued to the first role/header on the first line.
    with open(path, "r", encoding="utf-8-sig") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line == "Characters:":
                continue

            # Expected format: [Role]: [Name] ([Email])
            try:
                role, rest = line.split(":", 1)
                name, email = rest.strip().split("(")
            except ValueError as e:
                # Wrong number of ':' / '(' separators on this line.
                print("Failed parsing line:", line, e)
                continue

            name = name.strip()
            # First time a name is seen it gets the next free id; repeats reuse it.
            person_id = name_to_id.setdefault(name, f"person-{len(name_to_id)}")
            colleagues.append({
                "person_id": person_id,
                "role": role.strip(),
                "name": name,
                "email": email.strip(")").strip(),
            })

    return pd.DataFrame(colleagues, columns=["person_id", "role", "name", "email"])

In [4]:
def get_anonymous_people(people_field, colleagues):
    """Replace the people in an email header value with anonymous IDs.

    ``people_field`` is a comma-separated list such as
    ``"Péter Kovács (peter@x.com), Anna (anna@x.com)"``. Each entry is looked
    up in ``colleagues`` by name and, failing that, by email address
    (case-insensitive), and rendered as ``"<person_id> <role>"``. Entries not
    found become ``"external external"``. Empty entries (e.g. produced by a
    trailing comma) are skipped.

    Args:
        people_field: Raw header value; returned unchanged if empty or None.
        colleagues: DataFrame with person_id, role, name and email columns.

    Returns:
        Comma-separated anonymised entries, e.g. ``"person-0 PM, person-3 Dev"``.
    """
    if not people_field:
        return people_field

    # Hoisted so the email fallback does not re-lowercase the column per entry.
    lowered_emails = colleagues["email"].astype(str).str.lower()

    results = []
    for part in people_field.split(","):
        part = part.strip()
        if not part:
            continue  # trailing or doubled comma

        if "(" in part:
            # "Name (email)"
            name, _, rest = part.partition("(")
            email = rest.split(")", 1)[0]
        elif "@" in part:
            # "Name email@x", "Name <email@x>" or a bare "email@x"
            tokens = part.split()
            name = " ".join(tokens[:-1])
            email = tokens[-1].strip("<>")
        else:
            name, email = part, ""
        name, email = name.strip(), email.strip()

        match = colleagues[colleagues["name"] == name]
        if match.empty and email:
            # Fall back to the address, e.g. when only an email was written.
            match = colleagues[lowered_emails == email.lower()]

        if match.empty:
            results.append("external external")
        else:
            row = match.iloc[0]
            results.append(f"{row['person_id']} {row['role']}")

    return ", ".join(results)


def parse_email_file(path, thread_id, colleagues):
    """Parse a raw email thread in a .txt file into structured emails.

    A new email starts at a ``FORWARD_MARKER`` line, or at a header line that
    follows non-blank body text (messages in a reply chain that are not
    separated by the marker). Lines matching ``HEADER_PATTERN`` are headers;
    every other line is body text. From/To/Cc values are anonymised with
    ``get_anonymous_people``.

    Args:
        path: Path to the thread file.
        thread_id: Identifier stored on every email (e.g. "thread-1").
        colleagues: DataFrame from ``parse_colleagues`` used for anonymisation.

    Returns:
        List of dicts with keys thread_id, message_id ("message-N", counted
        per file), From, To, Cc, Date, Subject and Body. An email is only
        emitted once it has both headers and body lines.
    """
    # utf-8-sig drops a leading BOM, which would otherwise stop the first
    # header line from matching HEADER_PATTERN.
    with open(path, "r", encoding="utf-8-sig") as f:
        lines = f.readlines()

    emails = []
    current_headers = {}
    current_body = []
    message_id = 0

    def commit_email():
        # Emit the email collected so far (if complete) and reset the buffers.
        nonlocal current_headers, current_body, message_id
        if current_headers and current_body:
            from_field = get_anonymous_people(current_headers.get("From", ""), colleagues)
            to_field = get_anonymous_people(current_headers.get("To", ""), colleagues)
            cc_field = get_anonymous_people(current_headers.get("Cc", ""), colleagues)

            emails.append({
                "thread_id": thread_id,
                "message_id": f"message-{message_id}",
                "From": from_field,
                "To": to_field,
                "Cc": cc_field,
                "Date": current_headers.get("Date", ""),
                "Subject": current_headers.get("Subject", ""),
                "Body": "\n".join(current_body).strip()
            })
            current_headers = {}
            current_body = []
            message_id += 1

    # True once non-blank body text has followed the current header block.
    body_since_header = False

    for line in lines:
        stripped = line.strip()

        # Detect forwarded email marker
        if stripped.startswith(FORWARD_MARKER):
            commit_email()  # close out the current email
            body_since_header = False
            continue

        header_match = HEADER_PATTERN.match(stripped)
        if header_match:
            # A header after real body text means the next message has begun
            # even without a forward marker; without this, consecutive
            # messages were merged and their headers overwritten.
            if current_headers and body_since_header:
                commit_email()
            key, value = header_match.groups()
            current_headers[key] = value.strip()
            body_since_header = False
        else:
            current_body.append(line.rstrip())
            if current_headers and stripped:
                body_since_header = True

    # Final commit for the last email
    commit_email()

    return emails

### Classification

In [5]:
def format_thread(emails):
    """Render parsed emails as the plain-text thread embedded in the LLM prompts.

    Each email becomes one record of comma-separated ``Field: value`` pairs
    ending in ``Message: <body>``; records are separated by a blank line.
    The Subject field is now followed by ", " like every other field, so a
    subject cannot run into the "Message:" label.

    Args:
        emails: Dicts as produced by ``parse_email_file``.

    Returns:
        The formatted thread text ("" for no emails).
    """
    return "\n\n".join(
        f"Thread ID: {email['thread_id']}, "
        f"Message ID: {email['message_id']}, "
        f"From: {email['From']}, "
        f"To: {email['To']}, "
        f"Cc: {email['Cc']}, "
        f"Date: {email['Date']}, "
        f"Subject: {email['Subject']}, "
        f"Message: {email['Body']}"
        for email in emails
    )

def extract_json(text):
    """Extract the first JSON array/object embedded in model output.

    The output may wrap the JSON in a Markdown code fence or surround it with
    prose. Every '[' or '{' is tried in order as a start position, and the
    first one that decodes as complete JSON wins. This tolerates stray
    brackets elsewhere in the text. The previous greedy regex spanned from
    the first bracket to the last one and silently failed in that case.

    Args:
        text: Raw model response (may be None or empty).

    Returns:
        The decoded list/dict, or [] when no valid JSON value is found.
    """
    if not text:
        return []

    decoder = json.JSONDecoder()
    for start in re.finditer(r"[\[{]", text):
        try:
            value, _ = decoder.raw_decode(text, start.start())
        except json.JSONDecodeError:
            continue
        return value
    return []


def _generate_text(model_client, prompt):
    """Send *prompt* to *model_client* and return the response text ("" if none).

    Supports a Gemini ``GenerativeModel`` (has ``generate_content``) and, as
    the alternative backend, an OpenAI v1 client (``chat.completions.create``).
    """
    if hasattr(model_client, "generate_content"):
        resp = model_client.generate_content(prompt)
        try:
            return resp.text
        except ValueError:
            # Gemini's `.text` accessor raises ValueError when the candidate
            # has no text part (e.g. the response was blocked).
            print("Model returned no text:", getattr(resp, "prompt_feedback", None))
            return ""

    resp = model_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
    )
    return resp.choices[0].message.content or ""


def _as_issue_list(parsed):
    """Normalise parsed model output to a list of issue dicts."""
    if isinstance(parsed, dict):
        parsed = [parsed]  # a single issue returned without the enclosing list
    if not isinstance(parsed, list):
        return []
    return [item for item in parsed if isinstance(item, dict)]


def _is_verified(value):
    """Return True only for an explicit affirmative ``verified`` value."""
    # A plain truthiness test would accept the string "False".
    if isinstance(value, str):
        return value.strip().lower() == "true"
    return value is True


def classify_thread(emails, model_client):
    """Flag unresolved attention items in one email thread.

    Two model calls are made:
    1. The thread is classified into JSON issue objects.
    2. A "referee" pass checks each issue against the thread and marks it
       ``verified``. Only verified issues are kept.
    The referee call is skipped when step 1 flags nothing.

    Args:
        emails: Parsed emails of one thread (see ``parse_email_file``).
        model_client: Gemini ``GenerativeModel`` or OpenAI v1 client.

    Returns:
        List of verified issue dicts (possibly empty). Issues are dropped when
        the referee output cannot be parsed.
    """
    thread_text = format_thread(emails)

    classification_prompt = f"""
    You are an assistant that analyzes the full email thread and flags if the thread contains any attention flags that haven't been resolved.

    Here are the attention flags:
    - Unresolved High-Priority Issues:
        - Crucial issue that pose a threat to the system
        - Issue unresolved for a long period of time
        - Issue that were identified by high role members
    - Emerging Risks or Blockers:
        - Issue which indicate no clear path to resolution
        - Issue missing ownership

    For each flagged issue output JSON object:
    {{
        "title": str,
        "attention_flag": str,
        "priority": "low" | "medium" | "high" | null,
        "owner": str | null,
        "days_since_last_update": int,
        "evidence_quote": str,
        "evidence_location": {{"thread_id": str, "message_id": str}},
        "confidence": 0..1
    }}

    - Use ONLY the provided text.
    - If uncertain, set fields to null, never invent.
    - Consider the entire conversation to decide if an issue is unresolved.
    - If no issues are found, return an empty list [].

    Thread: "{thread_text}"
    """

    response = _generate_text(model_client, classification_prompt)
    flagged_issues = _as_issue_list(extract_json(response))

    # Nothing to verify: skip the referee round-trip.
    if not flagged_issues:
        return []

    # Referee step: Verify flagged issues.
    # JSON booleans are lowercase; asking for Python-style True/False invites
    # output that json.loads rejects, which would silently drop every issue.
    referee_prompt = f"""
    Given the thread of emails and flagged issues (with evidence and location) verify for each issue:
        1. The evidence_quote is word for word in the thread.
        2. The flagged issue has not been resolved within the thread.
        3. The output format follows the expect format.

    Expected format:
    {{
        "title": str,
        "attention_flag": str,
        "priority": "low" | "medium" | "high" | null,
        "owner": str | null,
        "days_since_last_update": int,
        "evidence_quote": str,
        "evidence_location": {{"thread_id": str, "message_id": str}},
        "confidence": 0..1
    }}

    Return the same list of issues, but add a field {{"verified": true | false}} to each issue.

    Thread: "{thread_text}"
    Flagged issues: "{json.dumps(flagged_issues, ensure_ascii=False)}"
    """

    referee_response = _generate_text(model_client, referee_prompt)
    verified_issues = _as_issue_list(extract_json(referee_response))
    if not verified_issues:
        # The referee was asked to echo a non-empty list; nothing parseable back
        # means a format problem. Show the referee's output, not the classifier's.
        print("Referee output format error")
        print(referee_response)
        return []

    return [issue for issue in verified_issues if _is_verified(issue.get("verified"))]


### Post-Processing

In [6]:
# Leading anonymised id in an owner string, e.g. "person-3" in "person-3 PM".
_PERSON_ID_RE = re.compile(r"person-\d+")


def resolve_owner_name(issues, colleagues):
    """Replace anonymised owner IDs with the colleague's real name, in place.

    An owner such as ``"person-0 PM"`` or ``"person-0,"`` is replaced by the
    name registered for ``person-0``. Owners that are not strings, do not
    start with a person id, or have an unknown id are left unchanged.

    Args:
        issues: List of issue dicts; their "owner" values are mutated.
        colleagues: DataFrame with person_id and name columns.

    Returns:
        The same ``issues`` list, for convenience.
    """
    id_to_name = None  # built lazily: only needed if some owner is an id
    for issue in issues:
        owner = issue.get("owner")
        # The model may emit null or a non-string owner; leave those alone.
        if not isinstance(owner, str):
            continue
        match = _PERSON_ID_RE.match(owner.strip())
        if not match:
            continue
        if id_to_name is None:
            # All rows of one person share a name, so duplicate ids are harmless.
            id_to_name = dict(zip(colleagues["person_id"], colleagues["name"]))
        name = id_to_name.get(match.group(0))
        if name is not None:
            issue["owner"] = name
    return issues

def write_report_md(attention_flags, output_path):
    """Write the attention flags to a Markdown report file.

    Each issue becomes an "## Issue N" section with one bullet per field,
    followed by a horizontal rule. Missing or null fields are rendered as
    "N/A", and line breaks inside values are collapsed so that every value
    stays inside its bullet. An empty list produces a report stating that
    nothing was detected.

    Args:
        attention_flags: List of issue dicts (see ``classify_thread``).
        output_path: Destination file; overwritten, written as UTF-8.
    """

    def show(value):
        # The model emits JSON null for unknown fields; show "N/A", not "None".
        if value is None:
            return "N/A"
        return " ".join(str(value).splitlines())

    with open(output_path, "w", encoding="utf-8") as f:
        f.write("# Portfolio Health Report\n\n")

        if not attention_flags:
            f.write("No urgent blockers or emerging risks detected.\n")
            return

        for idx, issue in enumerate(attention_flags, start=1):
            loc = issue.get("evidence_location")
            if not isinstance(loc, dict):
                loc = {}  # null or malformed location from the model
            f.write(
                f"## Issue {idx}\n"
                f"- **Title:** {show(issue.get('title'))}\n"
                f"- **Attention Flag:** {show(issue.get('attention_flag'))}\n"
                f"- **Priority:** {show(issue.get('priority'))}\n"
                f"- **Owner:** {show(issue.get('owner'))}\n"
                f"- **Days Since Last Update:** {show(issue.get('days_since_last_update'))}\n"
                f"- **Evidence Quote:** {show(issue.get('evidence_quote'))}\n"
                f"- **Location:** {show(loc.get('thread_id'))}, {show(loc.get('message_id'))}\n"
                f"- **Confidence:** {show(issue.get('confidence'))}\n"
                "\n---\n\n"
            )


### Main

In [7]:
colleagues = parse_colleagues(f"{BASE_DATA_PATH}/Colleagues.txt")

model = genai.GenerativeModel("gemini-2.5-flash")
attention_flags = []

thread_files = natsorted(glob.glob(f"{BASE_DATA_PATH}/email*.txt"))
if not thread_files:
    # Otherwise a wrong BASE_DATA_PATH silently yields a "no risks" report.
    raise FileNotFoundError(f"No email*.txt files found in {BASE_DATA_PATH!r}")

# Threads are processed independently, so this loop could be parallelised
# (e.g. with Spark or a thread pool) if the API rate limit allows it.
for thread_id, file in enumerate(tqdm(thread_files), start=1):
    emails = parse_email_file(file, f"thread-{thread_id}", colleagues)
    attention_flags.extend(classify_thread(emails, model))
    if thread_id < len(thread_files):
        # Pause between threads (each makes up to two API calls); presumably
        # to stay under the free-tier rate limit — no need after the last one.
        time.sleep(15)

resolve_owner_name(attention_flags, colleagues)

write_report_md(attention_flags, OUTPUT_FILE)

  0%|          | 0/18 [00:00<?, ?it/s]