In [24]:
import pandas as pd
import ollama
import re
import os
from datetime import datetime
import concurrent.futures
from tqdm.notebook import tqdm
import time
import random
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
import warnings
import logging
import hashlib  # Added for hash generation
import numpy as np
import concurrent.futures
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from functools import partial
import glob

import requests
warnings.filterwarnings('ignore')

In [25]:

def create_snowflake_connection():
    try:
        conn = snowflake.connector.connect(
            authenticator='externalbrowser',
            account='newrelicorg-dataos',
            # warehouse='APP_TABLEAU_PLATFORM_PROD',
            user='XCHENG',
        )
        print("Connected to Snowflake successfully!")
        return conn
    except Exception as e:
        print(f"Error connecting to Snowflake: {e}")
        return None

def execute_query(conn, query):
    try:
        cur = conn.cursor()
        cur.execute(query)
        results = cur.fetchall()
        columns = [desc[0] for desc in cur.description]
        df = pd.DataFrame(results, columns=columns)
        cur.close()
        return df
    except Exception as e:
        print(f"Error executing query: {e}")
        return None

conn = create_snowflake_connection()

2025-04-07 10:47:07,958 - INFO - Snowflake Connector for Python Version: 3.14.0, Python Version: 3.9.6, Platform: macOS-15.4-arm64-arm-64bit
2025-04-07 10:47:07,958 - INFO - Connecting to GLOBAL Snowflake domain
2025-04-07 10:47:07,959 - INFO - This connection is in OCSP Fail Open Mode. TLS Certificates would be checked for validity and revocation status. Any other Certificate Revocation related exceptions or OCSP Responder failures would be disregarded in favor of connectivity.


Initiating login request with your identity provider. A browser window should have opened for you to complete the login. If you can't see it, check existing browser windows, or your OS settings. Press CTRL+C to abort and try again...
Going to open: https://newrelic.okta.com/app/snowflake/exkugjs4xeGHw0Vo10x7/sso/saml?SAMLRequest=lVJdj9owEPwrkfuc2KEgqAWc4OhxUa8tOnKn0jeTbMDFsVOvQ6C%2Fvg4f1bXSndQ3az2zM7uzw5tDqYI9WJRGj0gcMRKAzkwu9WZEntK7cEACdELnQhkNI3IEJDfjIYpSVXxSu61%2BhJ81oAt8I428%2FRiR2mpuBErkWpSA3GV8Ofn8wDsR4wIRrPNy5ELJUXqtrXMVp7Rpmqh5Hxm7oR3GGGUfqEe1kHfkhUT1tkZljTOZUVfKwc%2F0ikRMWbeV8AivsLgQp1KfV%2FCWyvoMQn6fpotw8XWZkmByne7WaKxLsEuwe5nB0%2BPD2QB6BxoaC0pm3kKYCycMRqhNUyixg8yUVe1828i%2FaAE5VWYj%2FbKS2YhUO5lPj8lk3vs0LTar7cds5bJ8t5ZrterNi%2F7%2BW5z%2FMsW6kxYPM51kJHi%2BRttpo00Qa0h0G6jzJdbphawbsn4a93i3z9kgGvR730kw84FKLdyJ%2Ba%2FryOycOLkTVUX%2FGKdw2NWbH9g9wPy%2BYc8mZoc%2BRTS0DYycb4afHNjxf29iSF%2FSL%2Ff3xUeSzBbGNzgGd8aWwr2eWBzFp4rMw%2BIE5VAKqSZ5bgHRJ6eUaW4tCOfP3NkaCB2fVf8%2B9PFv&RelayState=ver

In [26]:
query_tasks = """
with churn_notes as (
    select *
    exclude (rn)
    from (select
        "rpm_account_id",
        "churn_risk_score",
        date_trunc('month', "close_date") close_month,
        replace(
            "summary",
            'Please refer to the Portfolio Churn Risk Review Dashboard for more details and comprehensive understanding of key reasons for this account to be flagged risk of churn',
            'Please refer to the Portfolio Churn Risk Review Dashboard for more details'
        ) "summary",
        row_number() over (partition by "rpm_account_id","close_date" order by "churn_risk_score" desc) as rn
    from SAGEMAKER_PRODUCTION.DEV_YAN.SLG_PTC_SUMMARY_NEW
    )
    where rn = 1
)
, churn AS (
    SELECT
        f.rpm_account_id AS subscription_account_id,
        f.close_month AS close_date,
        c."summary" AS churn_notes,
        row_number() over (partition by subscription_account_id order by close_date desc) n,
        MAX(churn_score) AS churn_risk_score,
        CASE
            WHEN churn_risk_score >= 0.75 THEN 'High'
            WHEN churn_risk_score < 0.75 AND churn_risk_score >= 0.5 THEN 'Medium'
            ELSE 'Low'
        END AS risk_indicator,
        max(close_date) over (partition by subscription_account_id) = close_date last_close_date
    FROM (
        select 
            "subscription_rpm_account_id" rpm_account_id,
            date_trunc('month', "close_week") close_month,
            max("churn_risk_score") churn_score
        from sagemaker_production.v3_ptc."weekly_ptc_d_summaries"
        group by 1,2 
        union all
        select
            "rpm_account_id" rpm_account_id,
            "close_month" close_month,
            max("score_mean") churn_score
        from SAGEMAKER_PRODUCTION.DEV_YAN.SLG_PTC_FORECAST
        group by 1,2     
    ) f
    left join churn_notes c on c."rpm_account_id" = f.rpm_account_id and c.close_month = f.close_month
    GROUP BY 1, 2, 3
)
, churned_accounts as (
    SELECT
        subscription_account_id,
        SFDC_ACCOUNT_ID AS SFDC_ID,
        SFDC_ACCOUNT_NAME AS name,
        act_acr,
        churn_indicator,
        contract_start_date,
        contract_end_date,
        buying_program,
        report_as_of_dt,
        date_trunc('month',report_as_of_dt) as report_month
    FROM reporting.consumption_metrics.consumption_daily_metrics
    where report_as_of_dt > '2024-04-01'
    and sales_hier_geo = 'AMER'
    qualify rank() over (partition by subscription_account_id, report_month order by report_as_of_dt desc) = 1
)
, churn_daily as (
    select 
        c.*
    ,   ca.* exclude(report_month, subscription_account_id)
    ,   first_value(act_acr) over (partition by ca.sfdc_id order by REPORT_AS_OF_DT) as first_acr
    from churn c
    inner join churned_accounts ca 
        on c.subscription_account_id = ca.subscription_account_id
        and c.close_date = ca.report_month
)
, risk_sum as (
    select 
        subscription_account_id
    ,   sfdc_id
    ,   sum(case when risk_indicator = 'High' then 1 else 0 end) as risk_sum
    ,   sum(case when last_close_date = true then 1 else 0 end) as closed
    from churn_daily
    where buying_program in ('Savings Plan', 'Volume Plan')
    group by all
    having risk_sum != 0 and closed != 0
)
, sfdc_filtered as (
    select 
       *
    from churn_daily
    where sfdc_id in (select distinct sfdc_id from risk_sum)
)

-- Get top 10 accounts with highest first_acr for each churn_indicator
, churned_sfdc as (
select 
    sfdc_id
,   first_acr
,   sum(case when churn_indicator = 'Y' then 1 else 0 end) as churn
from sfdc_filtered
group by all
)

, final_rank as (
select 
    *
from (select *, row_number() over (partition by churn order by first_acr desc) as acr_rank from churned_sfdc
    )
)
select 
   subject, 
   description,
   activity_date,
   account_id
from conformed.sfdc.stg_sfdc_task
where 1=1
    and activity_date > '2024-04-01'
    and account_id is not null 
    and account_id in (select distinct sfdc_id from final_rank)
    and subject is not null
    and description is not null
group by all
"""

In [27]:
query_churn = """
with churn_notes as (
    select *
exclude (rn)
from (select
    "rpm_account_id",
    "churn_risk_score",
    date_trunc('month', "close_date") close_month,
    replace(
        "summary",
        'Please refer to the Portfolio Churn Risk Review Dashboard for more details and comprehensive understanding of key reasons for this account to be flagged risk of churn',
        'Please refer to the Portfolio Churn Risk Review Dashboard for more details'
    ) "summary",
    row_number() over (partition by "rpm_account_id","close_date" order by "churn_risk_score" desc) as rn
from SAGEMAKER_PRODUCTION.DEV_YAN.SLG_PTC_SUMMARY_NEW
)
where rn = 1
)

, churn AS (
    SELECT
        f.rpm_account_id AS subscription_account_id,
        f.close_month AS close_date,
        c."summary" AS churn_notes,
        row_number() over (partition by subscription_account_id order by close_date desc) n,
        MAX(churn_score) AS churn_risk_score,
        CASE
            WHEN churn_risk_score >= 0.75 THEN 'High'
            WHEN churn_risk_score < 0.75 AND churn_risk_score >= 0.5 THEN 'Medium'
            ELSE 'Low'
        END AS risk_indicator,
        max(close_date) over (partition by subscription_account_id) = close_date last_close_date
    FROM (
        select 
            "subscription_rpm_account_id" rpm_account_id,
            date_trunc('month', "close_week") close_month,
            max("churn_risk_score") churn_score
        from sagemaker_production.v3_ptc."weekly_ptc_d_summaries"
        group by 1,2 
        union all
        select
            "rpm_account_id" rpm_account_id,
            "close_month" close_month,
            max("score_mean") churn_score
        from SAGEMAKER_PRODUCTION.DEV_YAN.SLG_PTC_FORECAST
        group by 1,2     
    ) f
		left join churn_notes c on c."rpm_account_id" = f.rpm_account_id and c.close_month = f.close_month
    GROUP BY 1, 2, 3
)

, churned_accounts as (
    SELECT
        subscription_account_id,
        SFDC_ACCOUNT_ID AS SFDC_ID,
        SFDC_ACCOUNT_NAME AS name,
        act_acr,
        churn_indicator,
        contract_start_date,
        contract_end_date,
        buying_program,
        report_as_of_dt,
        date_trunc('month',report_as_of_dt) as report_month
    FROM reporting.consumption_metrics.consumption_daily_metrics
    where report_as_of_dt > '2024-04-01'
    qualify rank() over (partition by subscription_account_id, report_month order by report_as_of_dt desc) = 1
)

select 
    c.*
,   ca.* exclude(report_month, subscription_account_id)
from churn c
inner join churned_accounts ca 
    on c.subscription_account_id = ca.subscription_account_id
    and c.close_date = ca.report_month
"""

In [28]:
df_task = execute_query(conn, query_tasks)
df_churn = execute_query(conn, query_churn)

In [29]:
tasks_df = df_task.copy()
churn_df = df_churn.copy()
tasks_df['ACTIVITY_DATE'] = pd.to_datetime(tasks_df['ACTIVITY_DATE'], errors='coerce')
churn_df['CLOSE_DATE'] = pd.to_datetime(churn_df['CLOSE_DATE'], errors='coerce')

In [70]:


# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Constants
BATCH_SIZE = 50  # Smaller batches for better distribution
NUM_THREADS = 10  # More threads for I/O bound tasks
LLM_MODEL = "llama3.2:3b"
CATEGORIES = [
    "Support Case Management",
    "Technical Discussion",
    "Billing and Invoicing", 
    "Contract and Renewal",
    "Meeting Scheduling",
    "Account Management",
    "Churn Risk Notification",
    "Event Management and Invitations",
    "Marketing Communications",
    "Gift Tracking",
    "Product Feedback and Strategy",
    "Brief Acknowledgments",
    "Call Logs",
    "Other"
]

# Global session for Ollama API
ollama_session = None

def create_ollama_session():
    """Create a session with connection pooling and retry strategy"""
    session = requests.Session()
    retries = Retry(total=3, backoff_factor=0.5, 
                   status_forcelist=[429, 500, 502, 503, 504],
                   allowed_methods=["POST"])
    adapter = HTTPAdapter(pool_connections=20, pool_maxsize=20, max_retries=retries)
    session.mount('http://', adapter)
    return session

def normalize_subject(subject):
    """Normalize email subject line for better matching"""
    if pd.isna(subject) or subject is None:
        return ""
    subject = str(subject).strip()
    old_subject = ""
    while old_subject != subject:
        old_subject = subject
        subject = re.sub(r'^(re|fwd|fw|forward):\s*', '', subject, flags=re.IGNORECASE)
    subject = re.sub(r'(email|mail):\s*[<>]+\s*', '', subject, flags=re.IGNORECASE)
    subject = re.sub(r'\[(inbox|outbox|sent|draft|spam|trash|folder)\]\s*-?\s*', '', subject, flags=re.IGNORECASE)
    subject = re.sub(r'\[external\]\s*', '', subject, flags=re.IGNORECASE)
    subject = re.sub(r'^[^a-zA-Z0-9]+', '', subject)
    subject = re.sub(r'\s+', ' ', subject).strip()
    return subject

def extract_top_response(text):
    """Extract top response from email chains"""
    if not isinstance(text, str):
        return ""
        
    # Look for common patterns that indicate the beginning of a previous email
    separators = [
        r"\nFrom:.*?\nSent:", 
        r"\nFrom:.*?\n",
        r"\nOn .*? wrote:",
        r"\n-+\s*Original Message\s*-+",
        r"\n_{3,}",
        r"\nOn .* at .*:"
    ]
        
    # Join all patterns with OR
    combined_pattern = '|'.join(f'({pattern})' for pattern in separators)
        
    # Find the first occurrence of any separator
    match = re.search(combined_pattern, text, re.IGNORECASE | re.DOTALL)
        
    if match:
        # Extract everything before the first separator
        first_part = text[:match.start()].strip()
        return first_part
        
    # If no separator found, return the whole text
    return text.strip()

def extract_sender(text):
    """Extract sender from emails"""
    if not isinstance(text, str):
        return None
        
    # Compile regex patterns for better performance
    email_pattern = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
    from_pattern = re.compile(r"From:\s*(.*?)(?:\[([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\]|<([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})>|([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}))")
        
    # Find the first section (before any "Original Message" text)
    separator_pos = text.find("Original Message")
    first_section = text[:separator_pos] if separator_pos > 0 else text
        
    # Try to find a sender from "From:" line
    from_match = from_pattern.search(first_section)
    if from_match:
        # Return the first non-None group containing an email
        for group in from_match.groups()[1:]:
            if group:
                return group.strip()
                
        # Check if the name part contains an email
        name_part = from_match.group(1) if from_match.group(1) else ""
        email_in_name = email_pattern.search(name_part)
        if email_in_name:
            return email_in_name.group(0).strip()
        
    # If no match in the "From:" line, search for any email address in the text
    email_match = email_pattern.search(first_section)
    if email_match:
        return email_match.group(0).strip()
        
    return None

def generate_hashkey(text):
    """Generate a hash key from text"""
    text = str(text).lower().strip()
    return hashlib.md5(text.encode()).hexdigest()[:10]

def generate_conversation_id(row):
    """Generate a conversation ID based on subject and account info"""
    subject = normalize_subject(row.get('SUBJECT', ""))
    account_id = str(row.get('ACCOUNT_ID', "unknown"))
    base_data = f"{account_id}-"
    
    # Try to extract organization name and ID from subject
    core_topic_match = re.search(r'([A-Za-z-]+)\s*[\(\[]?(\d+)[\)\]]?', subject)
    if core_topic_match:
        org_name = core_topic_match.group(1).strip()
        id_number = core_topic_match.group(2).strip()
        topic_key = f"{base_data}{org_name.lower()}-{id_number}"
        return generate_hashkey(topic_key)
    
    # Check for meeting patterns
    meeting_patterns = [
        r'(invitation|accepted|declined|tentative).*?(\w+\s*-\s*\w+)',
        r'(meeting|call|conference|sync).*?(\w+\s*-\s*\w+)',
        r'(zoom|teams|webex|google\s*meet).*?(\w+\s*-\s*\w+)'
    ]
    for pattern in meeting_patterns:
        meeting_match = re.search(pattern, subject, re.IGNORECASE)
        if meeting_match:
            meeting_name = meeting_match.group(2).strip().lower()
            return generate_hashkey(f"{base_data}meeting-{meeting_name}")
    
    # If subject exists, use it
    if subject:
        return generate_hashkey(f"{base_data}{subject.lower()}")
    
    # If TOP_RESPONSE exists, use first 50 chars
    if 'TOP_RESPONSE' in row and not pd.isna(row['TOP_RESPONSE']):
        text_for_hash = f"{base_data}{str(row['TOP_RESPONSE'])[:50].lower()}"
        return generate_hashkey(text_for_hash)
    
    # Final fallback - use activity date
    fallback_text = f"{base_data}{str(row.get('ACTIVITY_DATE', ''))}"
    return generate_hashkey(fallback_text)

def calculate_response_time(df):
    """Calculate response time between messages, calculating conversation position first"""
    df_new = df.copy()
    
    # Ensure RESPONSE_TIME column exists
    if 'RESPONSE_TIME' not in df_new.columns:
        df_new['RESPONSE_TIME'] = np.nan
    
    # Check if required columns exist for calculation
    required_cols = ['ACCOUNT_ID', 'CONVERSATION_ID', 'SENDER', 'ACTIVITY_DATE']
    if not all(col in df_new.columns for col in required_cols):
        logging.warning(f"Missing required columns for response time calculation. Have: {list(df_new.columns)}")
        return df_new
    
    # Sort by account, conversation, and time to establish the natural order
    df_new = df_new.sort_values(['ACCOUNT_ID', 'CONVERSATION_ID', 'ACTIVITY_DATE'])
    
    # Calculate the conversation position within each conversation
    df_new['CONVERSATION_POSITION'] = df_new.groupby(['ACCOUNT_ID', 'CONVERSATION_ID']).cumcount() + 1
    
    # Now calculate response times
    for (account_id, conv_id), group in df_new.groupby(['ACCOUNT_ID', 'CONVERSATION_ID']):
        if len(group) <= 1:
            continue
            
        # Track the last message time for customer senders
        customer_last_time = None
        
        # Process messages in chronological order
        for idx, row in group.iterrows():
            sender = row['SENDER']
            current_time = row['ACTIVITY_DATE']
            
            # Skip if missing data
            if pd.isna(sender) or pd.isna(current_time) or not isinstance(current_time, (pd.Timestamp, datetime)):
                continue
            
            is_newrelic = '@newrelic.com' in str(sender).lower()
            
            # Calculate response time only for NewRelic responses to customer messages
            if is_newrelic:  # This is a NewRelic sender
                if customer_last_time is not None and customer_last_time < current_time:
                    # This is a NewRelic response to a customer message
                    time_diff = (current_time - customer_last_time).total_seconds() / 3600
                    df_new.at[idx, 'RESPONSE_TIME'] = time_diff
            else:  # This is a customer sender
                customer_last_time = current_time
    
    return df_new

def api_call_with_backoff(prompt, retries=5):
    """Make API call with exponential backoff"""
    global ollama_session
    
    for attempt in range(retries):
        try:
            response = ollama_session.post(
                'http://localhost:11434/api/generate',
                json={
                    'model': LLM_MODEL,
                    'prompt': prompt,
                    'stream': False,
                    'temperature': 0.1
                },
                timeout=10 + attempt * 5  # Increasing timeout with each retry
            )
            
            if response.status_code == 200:
                return response.json()
                
        except Exception as e:
            logging.warning(f"API error attempt {attempt+1}/{retries}: {str(e)}")
            
        # Exponential backoff
        if attempt < retries - 1:
            sleep_time = 0.5 * (2 ** attempt)  # 0.5, 1, 2, 4, 8...
            time.sleep(sleep_time)
    
    return None  # All retries failed

def process_email(row_data):
    """Process a single email for categorization"""
    idx, row = row_data
    
    # Get the email content
    content = row['TOP_RESPONSE'] if 'TOP_RESPONSE' in row and not pd.isna(row['TOP_RESPONSE']) else ""
    subject = row['SUBJECT'] if 'SUBJECT' in row and not pd.isna(row['SUBJECT']) else ""
    
    # Clean content for better categorization
    if isinstance(content, str):
        lines = content.split('\n')
        clean_lines = []
        skip_line = False
        
        for line in lines:
            # Skip lines with common email metadata patterns
            if (re.match(r'^\s*(From|To|Cc|Bcc|Date|Sent|Subject|Reply-To):', line, re.IGNORECASE) or
                re.match(r'^\s*On .* wrote:', line, re.IGNORECASE) or
                re.match(r'^\s*-+\s*Original Message\s*-+', line, re.IGNORECASE) or
                re.match(r'^\s*-{3,}$', line)):
                skip_line = True
                continue
                
            # If we see a blank line after metadata, start including content again
            if skip_line and not line.strip():
                skip_line = False
                
            if not skip_line:
                clean_lines.append(line)
        
        # Join back into a clean text
        clean_content = '\n'.join(clean_lines).strip()
    else:
        clean_content = ""
        
    # Normalize the subject
    normalized_subject = normalize_subject(subject)
    
    # Create the prompt for the LLM
    prompt = f"""
    Task: You are categorizing business emails.
    
    Categories (select ONLY ONE from this list):
    Support Case Management, Technical Discussion, Billing and Invoicing, Contract and Renewal, Meeting Scheduling, Account Management, Churn Risk Notification, 
    Event Management and Invitations, Marketing Communications, Gift Tracking, Product Feedback and Strategy, Brief Acknowledgments, Call Logs, Other
    
    Email subject: {normalized_subject}
    
    Email content:
    {clean_content[:1000]}
    
    IMPORTANT: Output ONLY ONE category name from the list above.
    Do not output any other text, explanation, or create new categories.
    """
    
    # Make API call to Ollama with retries
    result = api_call_with_backoff(prompt)
    other_data = None
    
    if result:
        response_text = result.get('response', '').strip()
        
        # Normalize and match category
        response_lower = response_text.lower()
        
        # Check for exact matches first
        matched = False
        for category_name in CATEGORIES:
            if category_name.lower() == response_lower:
                category = category_name
                matched = True
                break
                
        # If no exact match, check for partial matches
        if not matched:
            for category_name in CATEGORIES:
                if category_name.lower() in response_lower:
                    category = category_name
                    matched = True
                    break
        
        # If still no match, default to "Other"
        if not matched:
            category = "Other"
            
        # Create data for "Other" category if matched
        if category == "Other":
            other_data = {
                'SUBJECT': subject,
                'TOP_RESPONSE': content[:500] if isinstance(content, str) else "",
                'LLM_RESPONSE': response_text
            }
    else:
        # API failed completely
        category = "Other"
        other_data = {
            'SUBJECT': subject,
            'TOP_RESPONSE': content[:500] if isinstance(content, str) else "",
            'LLM_RESPONSE': "API Error"
        }
    
    return idx, category, other_data

def process_batch(batch_df):
    """Process a batch of emails"""
    results = []
    others = []
    
    # Prepare row data tuples for processing
    row_data = list(batch_df.iterrows())
    
    for idx_data in tqdm(row_data, total=len(row_data), desc="Processing emails", leave=False):
        idx, category, other_data = process_email(idx_data)
        results.append((idx, category))
        if other_data:
            others.append(other_data)
    
    return results, others

def preprocess_emails(df):
    """Preprocess emails to extract TOP_RESPONSE and SENDER if needed"""
    df_new = df.copy()
    
    # Ensure required columns exist
    for col in ['TOP_RESPONSE', 'SENDER']:
        if col not in df_new.columns:
            df_new[col] = None
            logging.info(f"Created column '{col}' in the dataframe")
    
    # Process each row
    for idx, row in tqdm(df_new.iterrows(), total=len(df_new), desc="Preprocessing emails"):
        # Extract TOP_RESPONSE if not already set
        if pd.isna(df_new.at[idx, 'TOP_RESPONSE']) or df_new.at[idx, 'TOP_RESPONSE'] == '':
            if 'DESCRIPTION' in row and not pd.isna(row['DESCRIPTION']):
                content = row['DESCRIPTION']
                top_response = extract_top_response(content)
                df_new.at[idx, 'TOP_RESPONSE'] = top_response
        
        # Extract SENDER if not already set
        if pd.isna(df_new.at[idx, 'SENDER']) or df_new.at[idx, 'SENDER'] == '':
            if not pd.isna(df_new.at[idx, 'TOP_RESPONSE']):
                sender = extract_sender(df_new.at[idx, 'TOP_RESPONSE'])
                df_new.at[idx, 'SENDER'] = sender.lower() if sender else None
    
    return df_new

def process_with_threading(df_input, num_threads=None, batch_size=None):
    """
    Process emails with threading instead of multiprocessing
    
    Args:
        df_input: Input dataframe
        num_threads: Number of threads to use (None for auto)
        batch_size: Size of batches for processing (None for default)
        
    Returns:
        Processed dataframe
    """
    global ollama_session
    
    if ollama_session is None:
        ollama_session = create_ollama_session()
    
    df = df_input.copy()
    
    # Use default values if not specified
    if num_threads is None:
        num_threads = NUM_THREADS
    if batch_size is None:
        batch_size = BATCH_SIZE
        
    logging.info(f"Using {num_threads} threads and batch size of {batch_size}")
    
    # Initialize required columns if they don't exist
    for col in ['CONVERSATION_ID', 'CATEGORY']:
        if col not in df.columns:
            df[col] = None
            logging.info(f"Created column '{col}' in the dataframe")
    
    # Calculate number of batches
    num_batches = (len(df) + batch_size - 1) // batch_size
    batches = [df.iloc[i * batch_size:(i + 1) * batch_size] for i in range(num_batches)]
    logging.info(f"Split data into {num_batches} batches of size {batch_size}")
    
    all_results = []
    all_others = []
    
    # Process batches with ThreadPoolExecutor
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        # Submit all batches for processing
        future_to_batch = {executor.submit(process_batch, batch): i for i, batch in enumerate(batches)}
        
        # Process results as they complete
        for future in tqdm(concurrent.futures.as_completed(future_to_batch), 
                          total=len(batches), desc="Processing batches"):
            try:
                batch_results, batch_others = future.result()
                all_results.extend(batch_results)
                all_others.extend(batch_others)
            except Exception as e:
                logging.error(f"Error processing batch: {str(e)}")
    
    # Update the dataframe with the results
    for idx, category in all_results:
        df.at[idx, 'CATEGORY'] = category
    
    # Save "Other" categories
    if all_others:
        others_df = pd.DataFrame(all_others)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f'other_categories_{timestamp}.csv'
        others_df.to_csv(filename, index=False)
        logging.info(f"Saved {len(all_others)} 'Other' categorizations to {filename}")
    
    return df

def progressive_processing(df_input, chunk_size=5000):
    """Process large datasets in progressive chunks to avoid memory issues"""
    df = df_input.copy()
    total_rows = len(df)
    processed_dfs = []
    
    # Initialize required columns if they don't exist
    for col in ['CONVERSATION_ID', 'CATEGORY']:
        if col not in df.columns:
            df[col] = None
            logging.info(f"Created column '{col}' in the dataframe")
    
    # Add conversation IDs to all rows (do this once for efficiency)
    logging.info("Adding conversation IDs...")
    for idx, row in tqdm(df.iterrows(), total=len(df), desc="Generating conversation IDs"):
        df.at[idx, 'CONVERSATION_ID'] = generate_conversation_id(row)
    
    # Process in chunks
    for start_idx in range(0, total_rows, chunk_size):
        end_idx = min(start_idx + chunk_size, total_rows)
        logging.info(f"Processing chunk {start_idx+1}-{end_idx} of {total_rows}")
        
        chunk_df = df.iloc[start_idx:end_idx].copy()
        processed_chunk = process_with_threading(chunk_df)
        processed_dfs.append(processed_chunk)
        
        # Save intermediate results
        temp_df = pd.concat(processed_dfs, ignore_index=False)
        temp_filename = f'processed_emails_intermediate_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
        temp_df.to_csv(temp_filename, index=False)
        logging.info(f"Saved intermediate results to {temp_filename}")
    
    # Combine all processed chunks
    return pd.concat(processed_dfs, ignore_index=False)

def improved_email_processing(df_input):
    """
    Main function for improved email processing with optimizations for M1 Pro Max
    
    Args:
        df_input: Input dataframe with email data
        
    Returns:
        Processed dataframe with additional columns
    """
    try:
        # Initialize global Ollama session
        global ollama_session
        ollama_session = create_ollama_session()
        
        # Check original size
        original_row_count = len(df_input)
        logging.info(f"Original dataframe has {original_row_count} rows and columns: {list(df_input.columns)}")
        
        # Step 1: Preprocess to extract TOP_RESPONSE and SENDER
        logging.info("Step 1: Preprocessing emails...")
        df = preprocess_emails(df_input.copy())
        
        # Step 2: Process emails with progressive approach for large datasets
        # or direct threading approach for smaller datasets
        if original_row_count > 10000:
            logging.info(f"Large dataset detected ({original_row_count} rows). Using progressive processing...")
            chunk_size = 5000
            df = progressive_processing(df, chunk_size=chunk_size)
        else:
            # Add conversation IDs
            logging.info("Adding conversation IDs...")
            for idx, row in tqdm(df.iterrows(), total=len(df), desc="Generating conversation IDs"):
                df.at[idx, 'CONVERSATION_ID'] = generate_conversation_id(row)
            
            # Process with threading
            logging.info(f"Processing {len(df)} emails with threading...")
            df = process_with_threading(df, num_threads=NUM_THREADS, batch_size=BATCH_SIZE)
        
        # Step 3: Calculate response times
        logging.info("Calculating response times...")
        df = calculate_response_time(df)
        
        # Check if we lost any rows
        final_row_count = len(df)
        if final_row_count != original_row_count:
            logging.warning(f"Row count changed: {original_row_count} -> {final_row_count} ({final_row_count - original_row_count} difference)")
        
        # Save the results to a CSV file
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        result_filename = f'processed_emails_{timestamp}.csv'
        df.to_csv(result_filename, index=False)
        logging.info(f"Results saved to {result_filename}")
        
        return df
        
    except Exception as e:
        logging.error(f"Error in email processing: {str(e)}")
        import traceback
        logging.error(traceback.format_exc())
        return None

In [31]:
# def main(resume=False, processed_files=None):
#     """Main execution function"""
#     try:
#         # Access the tasks_df from the global scope
#         global tasks_df
#         if 'tasks_df' not in globals():
#             logging.error("tasks_df not found. Please load the dataframe before running.")
#             return None
        
#         # Option to resume processing
#         if resume:
#             logging.info("Resuming from interrupted processing...")
            
#             if processed_files is None:
#                 # Default files if none provided
#                 csv_path = './'
#                 processed_files = glob.glob(os.path.join(csv_path, 'processed_emails_intermediate_*.csv'))

            
#             # Import the function directly from the module - FIXED LINE
#             from filter_processed_rows import filter_processed_rows
            
#             # Now call the function directly - FIXED LINE
#             filtered_df = filter_processed_rows(tasks_df, processed_files)
            
#             if filtered_df is not None and len(filtered_df) > 0:
#                 # Update tasks_df to only contain unprocessed rows
#                 tasks_df = filtered_df
#                 logging.info(f"Updated tasks_df to contain only {len(filtered_df)} unprocessed rows")
                
#                 # Process the remaining rows
#                 processed_df = improved_email_processing(tasks_df)
#                 return processed_df
#             else:
#                 logging.info("No unprocessed rows to process or error in filtering")
#                 return None
        
#         # Otherwise, process all emails from scratch
#         processed_df = improved_email_processing(tasks_df)
        
#         # Replace the global tasks_df with the processed version
#         tasks_df = processed_df
        
#         return processed_df
#     except Exception as e:
#         logging.error(f"Error in main function: {str(e)}")
#         import traceback
#         logging.error(traceback.format_exc())
#         return None

In [32]:
main(resume=True)

2025-04-07 10:48:08,140 - INFO - Resuming from interrupted processing...
2025-04-07 10:48:08,142 - INFO - Loading processed data from ./processed_emails_intermediate_20250404_200232.csv
2025-04-07 10:48:08,956 - INFO - Loaded 20000 processed rows from ./processed_emails_intermediate_20250404_200232.csv
2025-04-07 10:48:08,957 - INFO - Loading processed data from ./processed_emails_intermediate_20250404_164724.csv
2025-04-07 10:48:09,821 - INFO - Loaded 20000 processed rows from ./processed_emails_intermediate_20250404_164724.csv
2025-04-07 10:48:09,822 - INFO - Loading processed data from ./processed_emails_intermediate_20250405_184509.csv
2025-04-07 10:48:10,641 - INFO - Loaded 20000 processed rows from ./processed_emails_intermediate_20250405_184509.csv
2025-04-07 10:48:10,642 - INFO - Loading processed data from ./processed_emails_intermediate_20250404_192927.csv
2025-04-07 10:48:11,291 - INFO - Loaded 15000 processed rows from ./processed_emails_intermediate_20250404_192927.csv
202

Preprocessing emails:   0%|          | 0/170 [00:00<?, ?it/s]

2025-04-07 10:48:34,876 - INFO - Adding conversation IDs...


Generating conversation IDs:   0%|          | 0/170 [00:00<?, ?it/s]

2025-04-07 10:48:34,893 - INFO - Processing 170 emails with threading...
2025-04-07 10:48:34,894 - INFO - Using 10 threads and batch size of 50
2025-04-07 10:48:34,894 - INFO - Created column 'CATEGORY' in the dataframe
2025-04-07 10:48:34,895 - INFO - Split data into 4 batches of size 50


Processing emails:   0%|          | 0/50 [00:00<?, ?it/s]

Processing emails:   0%|          | 0/20 [00:00<?, ?it/s]

Processing emails:   0%|          | 0/50 [00:00<?, ?it/s]

Processing emails:   0%|          | 0/50 [00:00<?, ?it/s]

Processing batches:   0%|          | 0/4 [00:00<?, ?it/s]

2025-04-07 10:49:32,991 - INFO - Calculating response times...
2025-04-07 10:49:33,014 - INFO - Results saved to processed_emails_20250407_104932.csv


Unnamed: 0,SUBJECT,DESCRIPTION,ACTIVITY_DATE,ACCOUNT_ID,TOP_RESPONSE,SENDER,CONVERSATION_ID,CATEGORY,RESPONSE_TIME
9916,Email: New Relic support case update: Case #00...,Additional To: jaiprasad.arsikere@alight.com\n...,2025-04-07,0011U00001S8qObQAJ,Additional To: jaiprasad.arsikere@alight.com\n...,jaiprasad.arsikere@alight.com,785d5e7a39,Support Case Management,
9917,Email: New Relic support case update: Case #00...,Additional To: felipe.braun@patientpoint.com\n...,2025-04-07,0011U00001S8a8tQAB,Additional To: felipe.braun@patientpoint.com\n...,felipe.braun@patientpoint.com,7926649e69,Support Case Management,
9918,Email: New Relic support case update: Case #00...,Additional To: harpreet_singh12@optum.com\nCC:...,2025-04-07,0011U00001S92XeQAJ,Additional To: harpreet_singh12@optum.com\nCC:...,harpreet_singh12@optum.com,b9787916e5,Support Case Management,
9919,Email: New Relic support case created: Case #0...,Additional To: akumar5@bjs.com\nCC: samyx@newr...,2025-04-07,0011U00001S91CEQAZ,Additional To: akumar5@bjs.com\nCC: samyx@newr...,akumar5@bjs.com,4de2ce8f35,Support Case Management,
10484,Email: Unlocking Infra & Logs in New Relic,https://app.salesloft.com/app/emails/detail/24...,2025-04-07,0011U00001mXXwSQAW,https://app.salesloft.com/app/emails/detail/24...,chris.lewicki@meridianlink.com,bc7730854a,Support Case Management,
...,...,...,...,...,...,...,...,...,...
96146,Email: How Other Teams Are Maximizing Infra & ...,https://app.salesloft.com/app/emails/detail/24...,2025-04-07,0011U00001S8xXvQAJ,https://app.salesloft.com/app/emails/detail/24...,jimmy.caroupapoulle@on24.com,9201a4f7a9,Product Feedback and Strategy,
96175,Email: RE: [EXTERNAL] - RE: New Relic support ...,Additional To: rmedida@opentext.com\nCC: mcast...,2025-04-07,0011U00001S8bvlQAB,Additional To: rmedida@opentext.com\nCC: mcast...,rmedida@opentext.com,563b0afab1,Account Management,
96359,Email: New Relic support case comment: Case #0...,Additional To: john.stauffer@usfoods.com\nCC: ...,2025-04-07,0011U00001S8XGAQA3,Additional To: john.stauffer@usfoods.com\nCC: ...,john.stauffer@usfoods.com,279f03981e,Support Case Management,
96360,Email: New Relic support case closed #00257871...,Additional To: john.stauffer@usfoods.com\nCC: ...,2025-04-07,0011U00001S8XGAQA3,Additional To: john.stauffer@usfoods.com\nCC: ...,john.stauffer@usfoods.com,279f03981e,Account Management,


In [33]:
# # Set up logging
# logging.basicConfig(
#     level=logging.INFO,
#     format='%(asctime)s - %(levelname)s - %(message)s'
# )

# # Constants
# BATCH_SIZE = 500
# LLM_MODEL = "llama3.2:3b"
# CATEGORIES = [
#     "Support Case Management",
#     "Technical Discussion",
#     "Billing and Invoicing", 
#     "Contract and Renewal",
#     "Meeting Scheduling",
#     "Account Management",
#     "Churn Risk Notification",
#     "Event Management and Invitations",
#     "Marketing Communications",
#     "Gift Tracking",
#     "Product Feedback and Strategy",
#     "Brief Acknowledgments",
#     "Call Logs",
#     "Other"
# ]

# def normalize_subject(subject):
#     """Normalize email subject line for better matching"""
#     if pd.isna(subject) or subject is None:
#         return ""
#     subject = str(subject).strip()
#     old_subject = ""
#     while old_subject != subject:
#         old_subject = subject
#         subject = re.sub(r'^(re|fwd|fw|forward):\s*', '', subject, flags=re.IGNORECASE)
#     subject = re.sub(r'(email|mail):\s*[<>]+\s*', '', subject, flags=re.IGNORECASE)
#     subject = re.sub(r'\[(inbox|outbox|sent|draft|spam|trash|folder)\]\s*-?\s*', '', subject, flags=re.IGNORECASE)
#     subject = re.sub(r'\[external\]\s*', '', subject, flags=re.IGNORECASE)
#     subject = re.sub(r'^[^a-zA-Z0-9]+', '', subject)
#     subject = re.sub(r'\s+', ' ', subject).strip()
#     return subject

# def extract_top_response(text):
#     """Extract top response from email chains"""
#     if not isinstance(text, str):
#         return ""
        
#     # Look for common patterns that indicate the beginning of a previous email
#     separators = [
#         r"\nFrom:.*?\nSent:", 
#         r"\nFrom:.*?\n",
#         r"\nOn .*? wrote:",
#         r"\n-+\s*Original Message\s*-+",
#         r"\n_{3,}",
#         r"\nOn .* at .*:"
#     ]
        
#     # Join all patterns with OR
#     combined_pattern = '|'.join(f'({pattern})' for pattern in separators)
        
#     # Find the first occurrence of any separator
#     match = re.search(combined_pattern, text, re.IGNORECASE | re.DOTALL)
        
#     if match:
#         # Extract everything before the first separator
#         first_part = text[:match.start()].strip()
#         return first_part
        
#     # If no separator found, return the whole text
#     return text.strip()

# def extract_sender(text):
#     """Extract sender from emails"""
#     if not isinstance(text, str):
#         return None
        
#     # Compile regex patterns for better performance
#     email_pattern = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
#     from_pattern = re.compile(r"From:\s*(.*?)(?:\[([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})\]|<([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})>|([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}))")
        
#     # Find the first section (before any "Original Message" text)
#     separator_pos = text.find("Original Message")
#     first_section = text[:separator_pos] if separator_pos > 0 else text
        
#     # Try to find a sender from "From:" line
#     from_match = from_pattern.search(first_section)
#     if from_match:
#         # Return the first non-None group containing an email
#         for group in from_match.groups()[1:]:
#             if group:
#                 return group.strip()
                
#         # Check if the name part contains an email
#         name_part = from_match.group(1) if from_match.group(1) else ""
#         email_in_name = email_pattern.search(name_part)
#         if email_in_name:
#             return email_in_name.group(0).strip()
        
#     # If no match in the "From:" line, search for any email address in the text
#     email_match = email_pattern.search(first_section)
#     if email_match:
#         return email_match.group(0).strip()
        
#     return None

# def generate_hashkey(text):
#     """Generate a hash key from text"""
#     text = str(text).lower().strip()
#     return hashlib.md5(text.encode()).hexdigest()[:10]

# def generate_conversation_id(row):
#     """Generate a conversation ID based on subject and account info"""
#     subject = normalize_subject(row.get('SUBJECT', ""))
#     account_id = str(row.get('ACCOUNT_ID', "unknown"))
#     base_data = f"{account_id}-"
    
#     # Try to extract organization name and ID from subject
#     core_topic_match = re.search(r'([A-Za-z-]+)\s*[\(\[]?(\d+)[\)\]]?', subject)
#     if core_topic_match:
#         org_name = core_topic_match.group(1).strip()
#         id_number = core_topic_match.group(2).strip()
#         topic_key = f"{base_data}{org_name.lower()}-{id_number}"
#         return generate_hashkey(topic_key)
    
#     # Check for meeting patterns
#     meeting_patterns = [
#         r'(invitation|accepted|declined|tentative).*?(\w+\s*-\s*\w+)',
#         r'(meeting|call|conference|sync).*?(\w+\s*-\s*\w+)',
#         r'(zoom|teams|webex|google\s*meet).*?(\w+\s*-\s*\w+)'
#     ]
#     for pattern in meeting_patterns:
#         meeting_match = re.search(pattern, subject, re.IGNORECASE)
#         if meeting_match:
#             meeting_name = meeting_match.group(2).strip().lower()
#             return generate_hashkey(f"{base_data}meeting-{meeting_name}")
    
#     # If subject exists, use it
#     if subject:
#         return generate_hashkey(f"{base_data}{subject.lower()}")
    
#     # If TOP_RESPONSE exists, use first 50 chars
#     if 'TOP_RESPONSE' in row and not pd.isna(row['TOP_RESPONSE']):
#         text_for_hash = f"{base_data}{str(row['TOP_RESPONSE'])[:50].lower()}"
#         return generate_hashkey(text_for_hash)
    
#     # Final fallback - use activity date
#     fallback_text = f"{base_data}{str(row.get('ACTIVITY_DATE', ''))}"
#     return generate_hashkey(fallback_text)

# def calculate_response_time(df):
#     """Calculate response time between messages with account context"""
#     df_new = df.copy()
    
#     # Ensure RESPONSE_TIME column exists
#     if 'RESPONSE_TIME' not in df_new.columns:
#         df_new['RESPONSE_TIME'] = np.nan
    
#     # Check if required columns exist for calculation
#     required_cols = ['ACCOUNT_ID', 'CONVERSATION_ID', 'CONVERSATION_POSITION', 'SENDER', 'ACTIVITY_DATE']
#     if not all(col in df_new.columns for col in required_cols):
#         logging.warning(f"Missing required columns for response time calculation. Have: {list(df_new.columns)}")
#         return df_new
    
#     df_new = df_new.sort_values(['ACCOUNT_ID', 'CONVERSATION_ID', 'CONVERSATION_POSITION'])
    
#     for (account_id, conv_id), group in df_new.groupby(['ACCOUNT_ID', 'CONVERSATION_ID']):
#         if len(group) <= 1:
#             continue
            
#         indices = group.index
#         positions = group['CONVERSATION_POSITION'].tolist()
#         senders = group['SENDER'].tolist()
#         dates = group['ACTIVITY_DATE'].tolist()
        
#         for i in range(1, len(positions)):
#             if pd.notna(senders[i]) and pd.notna(senders[i-1]) and senders[i] != senders[i-1]:
#                 # Check if dates are datetime objects before calculating difference
#                 if pd.notna(dates[i]) and pd.notna(dates[i-1]) and isinstance(dates[i], (pd.Timestamp, datetime)) and isinstance(dates[i-1], (pd.Timestamp, datetime)):
#                     time_diff = (dates[i] - dates[i-1]).total_seconds() / 3600
#                     idx = indices[i]
#                     df_new.at[idx, 'RESPONSE_TIME'] = time_diff
    
#     return df_new

# def categorize_email_with_llm(batch_df, batch_num=0):
#     """
#     Categorize a batch of emails using the LLM model
    
#     Args:
#         batch_df (pandas.DataFrame): Batch of emails to categorize
#         batch_num (int): Batch number for reporting
        
#     Returns:
#         pandas.DataFrame: Dataframe with categorized emails
#     """
#     result_df = batch_df.copy()
    
#     # Ensure CATEGORY column exists
#     if 'CATEGORY' not in result_df.columns:
#         result_df['CATEGORY'] = None
    
#     # Create a list to store emails categorized as "Other"
#     others = []
    
#     # Process each email in the batch
#     logging.info(f"Categorizing batch {batch_num+1} with {len(batch_df)} emails...")
#     for idx, row in tqdm(batch_df.iterrows(), total=len(batch_df)):
#         # Get the email content
#         content = row['TOP_RESPONSE'] if 'TOP_RESPONSE' in row and not pd.isna(row['TOP_RESPONSE']) else ""
#         subject = row['SUBJECT'] if 'SUBJECT' in row and not pd.isna(row['SUBJECT']) else ""
        
#         # Clean content for better categorization
#         if isinstance(content, str):
#             lines = content.split('\n')
#             clean_lines = []
#             skip_line = False
            
#             for line in lines:
#                 # Skip lines with common email metadata patterns
#                 if (re.match(r'^\s*(From|To|Cc|Bcc|Date|Sent|Subject|Reply-To):', line, re.IGNORECASE) or
#                     re.match(r'^\s*On .* wrote:', line, re.IGNORECASE) or
#                     re.match(r'^\s*-+\s*Original Message\s*-+', line, re.IGNORECASE) or
#                     re.match(r'^\s*-{3,}$', line)):
#                     skip_line = True
#                     continue
                    
#                 # If we see a blank line after metadata, start including content again
#                 if skip_line and not line.strip():
#                     skip_line = False
                    
#                 if not skip_line:
#                     clean_lines.append(line)
            
#             # Join back into a clean text
#             clean_content = '\n'.join(clean_lines).strip()
#         else:
#             clean_content = ""
            
#         # Normalize the subject
#         normalized_subject = normalize_subject(subject)
        
#         # Create the prompt for the LLM
#         prompt = f"""
#         Task: You are categorizing business emails.
        
#         Categories (select ONLY ONE from this list):
#         Support Case Management, Technical Discussion, Billing and Invoicing, Contract and Renewal, Meeting Scheduling, Account Management, Churn Risk Notification, 
#         Event Management and Invitations, Marketing Communications, Gift Tracking, Product Feedback and Strategy, Brief Acknowledgments, Call Logs, Other
        
#         Email subject: {normalized_subject}
        
#         Email content:
#         {clean_content[:1000]}
        
#         IMPORTANT: Output ONLY ONE category name from the list above.
#         Do not output any other text, explanation, or create new categories.
#         """
        
#         # Make API call to Ollama with retries
#         max_retries = 3
#         retry_delay = 2  # seconds
#         category = None
        
#         for attempt in range(max_retries):
#             try:
#                 response = requests.post(
#                     'http://localhost:11434/api/generate',
#                     json={
#                         'model': LLM_MODEL,
#                         'prompt': prompt,
#                         'stream': False,
#                         'temperature': 0.1
#                     },
#                     timeout=30
#                 )
                
#                 if response.status_code == 200:
#                     result = response.json()
#                     response_text = result.get('response', '').strip()
                    
#                     # Normalize and match category
#                     response_lower = response_text.lower()
                    
#                     # Check for exact matches first
#                     matched = False
#                     for category_name in CATEGORIES:
#                         if category_name.lower() == response_lower:
#                             category = category_name
#                             matched = True
#                             break
                            
#                     # If no exact match, check for partial matches
#                     if not matched:
#                         for category_name in CATEGORIES:
#                             if category_name.lower() in response_lower:
#                                 category = category_name
#                                 matched = True
#                                 break
                    
#                     # If still no match, default to "Other"
#                     if not matched:
#                         category = "Other"
#                         logging.info(f"LLM response '{response_text}' didn't match any category, defaulting to 'Other'")
                    
#                     # Add to "Other" list ONLY if categorized as "Other"
#                     if category == "Other":
#                         others.append({
#                             'SUBJECT': subject,
#                             'TOP_RESPONSE': content[:500] if isinstance(content, str) else "",
#                             'LLM_RESPONSE': response_text
#                         })
                    
#                     break  # Success, exit retry loop
#                 else:
#                     logging.warning(f"API error: {response.status_code}, attempt {attempt+1}/{max_retries}")
#                     if attempt < max_retries - 1:
#                         time.sleep(retry_delay)
#             except Exception as e:
#                 logging.warning(f"Error calling Ollama API: {str(e)}, attempt {attempt+1}/{max_retries}")
#                 if attempt < max_retries - 1:
#                     time.sleep(retry_delay)
        
#         # If all retries failed, default to "Other"
#         if category is None:
#             category = "Other"
#             logging.warning(f"All API calls failed for email with subject '{subject[:30]}...', defaulting to 'Other'")
            
#             # Add to "Other" list
#             others.append({
#                 'SUBJECT': subject,
#                 'TOP_RESPONSE': content[:500] if isinstance(content, str) else "",
#                 'LLM_RESPONSE': "API Error"
#             })
        
#         # Set the category in the result dataframe
#         result_df.at[idx, 'CATEGORY'] = category
    
#     # Save "Other" categories for this batch
#     if others:
#         others_df = pd.DataFrame(others)
#         timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
#         filename = f'other_categories_batch_{batch_num+1}_{timestamp}.csv'
#         others_df.to_csv(filename, index=False)
#         logging.info(f"Saved {len(others)} 'Other' categorizations to {filename}")
    
#     return result_df

# def preprocess_emails(df):
#     """Preprocess emails to extract TOP_RESPONSE and SENDER if needed"""
#     df_new = df.copy()
    
#     # Ensure required columns exist
#     for col in ['TOP_RESPONSE', 'SENDER']:
#         if col not in df_new.columns:
#             df_new[col] = None
#             logging.info(f"Created column '{col}' in the dataframe")
    
#     # Process each row
#     for idx, row in tqdm(df_new.iterrows(), total=len(df_new)):
#         # Extract TOP_RESPONSE if not already set
#         if pd.isna(df_new.at[idx, 'TOP_RESPONSE']) or df_new.at[idx, 'TOP_RESPONSE'] == '':
#             if 'DESCRIPTION' in row and not pd.isna(row['DESCRIPTION']):
#                 content = row['DESCRIPTION']
#                 top_response = extract_top_response(content)
#                 df_new.at[idx, 'TOP_RESPONSE'] = top_response
        
#         # Extract SENDER if not already set
#         if pd.isna(df_new.at[idx, 'SENDER']) or df_new.at[idx, 'SENDER'] == '':
#             if not pd.isna(df_new.at[idx, 'TOP_RESPONSE']):
#                 sender = extract_sender(df_new.at[idx, 'TOP_RESPONSE'])
#                 df_new.at[idx, 'SENDER'] = sender.lower() if sender else None
    
#     return df_new

# def process_emails_in_batches(df_input, num_batches=5):
#     """
#     Process emails in batches, categorizing only a specified number of batches with LLM
    
#     Args:
#         df_input (pandas.DataFrame): Input dataframe with email data
#         num_batches (int): Number of batches to process with LLM
        
#     Returns:
#         pandas.DataFrame: Processed dataframe with additional columns
#     """
#     df = df_input.copy()
    
#     # Step 1: Preprocess to extract TOP_RESPONSE and SENDER
#     logging.info("Step 1: Preprocessing emails...")
#     df = preprocess_emails(df)
    
#     # Step 2: Add CONVERSATION_ID
#     logging.info("Step 2: Adding conversation IDs...")
    
#     # Initialize required columns if they don't exist
#     for col in ['CONVERSATION_ID', 'CATEGORY']:
#         if col not in df.columns:
#             df[col] = None
#             logging.info(f"Created column '{col}' in the dataframe")
    
#     # Add conversation ID
#     for idx, row in tqdm(df.iterrows(), total=len(df), desc="Generating conversation IDs"):
#         df.at[idx, 'CONVERSATION_ID'] = generate_conversation_id(row)
    
#     # Step 3: Split into batches randomly
#     logging.info(f"Step 3: Splitting into batches for categorization ({num_batches} batches will be processed)...")
    
#     # Shuffle and split
#     df_shuffled = df.sample(frac=1, random_state=42)  # Shuffle with fixed random seed
#     batches = [df_shuffled.iloc[i:i+BATCH_SIZE] for i in range(0, len(df_shuffled), BATCH_SIZE)]
    
#     # Step 4: Process specified number of batches
#     logging.info(f"Step 4: Processing {min(num_batches, len(batches))} batches with LLM...")
    
#     for i in range(min(num_batches, len(batches))):
#         batch_df = batches[i]
        
#         # Categorize emails in the batch
#         categorized_batch = categorize_email_with_llm(batch_df, i)
#         # We don't update the main dataframe since we're only interested in the "Other" files
    
#     # Report completed
#     logging.info(f"Processing complete! Processed {min(num_batches, len(batches))} batches.")
    
#     # We don't need to return the dataframe since we're only saving the "Other" files
#     return None

# # %%
# def main():
#     """Main execution function"""
#     try:
#         # Access the tasks_df from the global scope
#         global tasks_df
#         if 'tasks_df' not in globals():
#             logging.error("tasks_df not found. Please load the dataframe before running.")
#             return None
        
#         # Display basic information about the dataframe
#         logging.info(f"Processing dataframe with {len(tasks_df)} rows and columns: {list(tasks_df.columns)}")
        
#         # Process the emails (we don't need to store the return value)
#         process_emails_in_batches(tasks_df, num_batches=5)
        
#         # No need to save the main results file
#         logging.info("Processing complete. 'Other' category files have been saved.")
        
#         return None
#     except Exception as e:
#         logging.error(f"Error in main function: {str(e)}")
#         import traceback
#         logging.error(traceback.format_exc())
#         return None

# # %%
# main()

In [35]:
import pandas as pd
import os
import glob

# Define the path where your CSV files are located
# Replace with your actual path if different
csv_path = './'  # Current directory

# Use glob to get all CSV files in the directory
all_csv_files = glob.glob(os.path.join(csv_path, 'processed_emails_intermediate_*.csv'))

# Print the files that will be combined
print(f"Found {len(all_csv_files)} CSV files to combine:")
for file in all_csv_files:
    print(f"  - {os.path.basename(file)}")

# Function to combine all CSV files
def combine_csv_files(file_list):
    # Create an empty list to store individual dataframes
    df_list = []
    
    # Loop through all files and read them into dataframes
    for file in file_list:
        try:
            df = pd.read_csv(file)
            # Add source filename as a column (optional)
            df['source_file'] = os.path.basename(file)
            df_list.append(df)
            print(f"Successfully read {os.path.basename(file)} with {len(df)} rows")
        except Exception as e:
            print(f"Error reading {file}: {str(e)}")
    
    # Combine all dataframes
    if df_list:
        combined_df = pd.concat(df_list, ignore_index=True)
        print(f"\nCombined dataframe has {len(combined_df)} rows and {len(combined_df.columns)} columns")
        return combined_df
    else:
        print("No dataframes to combine")
        return None

# Combine the CSV files
combined_df = combine_csv_files(all_csv_files)

# Save the combined dataframe to a new CSV file
if combined_df is not None:
    output_file = 'combined_processed_emails.csv'
    combined_df.to_csv(output_file, index=False)
    print(f"\nCombined data saved to {output_file}")
    
    # Display a summary of the combined data
    print("\nCombined DataFrame Summary:")
    print(combined_df.info())
    print("\nFirst few rows:")
    print(combined_df.head())

Found 20 CSV files to combine:
  - processed_emails_intermediate_20250404_200232.csv
  - processed_emails_intermediate_20250404_164724.csv
  - processed_emails_intermediate_20250405_184509.csv
  - processed_emails_intermediate_20250404_192927.csv
  - processed_emails_intermediate_20250405_192634.csv
  - processed_emails_intermediate_20250404_172119.csv
  - processed_emails_intermediate_20250404_182228.csv
  - processed_emails_intermediate_20250405_181607.csv
  - processed_emails_intermediate_20250404_120711.csv
  - processed_emails_intermediate_20250404_211011.csv
  - processed_emails_intermediate_20250404_185557.csv
  - processed_emails_intermediate_20250405_174703.csv
  - processed_emails_intermediate_20250405_191407.csv
  - processed_emails_intermediate_20250404_203552.csv
  - processed_emails_intermediate_20250405_171748.csv
  - processed_emails_intermediate_20250404_124134.csv
  - processed_emails_intermediate_20250404_150529.csv
  - processed_emails_intermediate_20250404_161337.c

In [38]:
combined_df = pd.read_csv('combined_processed_emails.csv')


Unnamed: 0,SUBJECT,DESCRIPTION,ACTIVITY_DATE,ACCOUNT_ID,TOP_RESPONSE,SENDER,CONVERSATION_ID,CATEGORY,source_file
0,Email: New Relic support case comment: Case #0...,Additional To: john.stauffer@usfoods.com\nCC: ...,2025-04-04,0011U00001S8XGAQA3,Additional To: john.stauffer@usfoods.com\nCC: ...,john.stauffer@usfoods.com,279f03981e,Support Case Management,processed_emails_intermediate_20250404_200232.csv
1,Email: New Relic support case comment: Case #0...,Additional To: john.stauffer@usfoods.com\nCC: ...,2025-04-04,0011U00001S8XGAQA3,Additional To: john.stauffer@usfoods.com\nCC: ...,john.stauffer@usfoods.com,c78b829888,Support Case Management,processed_emails_intermediate_20250404_200232.csv
2,Email: New Relic support case comment: Case #0...,Additional To: jesus.garcia@sopristec.com\nCC:...,2025-04-04,0011U00001aUhSFQA0,Additional To: jesus.garcia@sopristec.com\nCC:...,jesus.garcia@sopristec.com,80c283aa97,Support Case Management,processed_emails_intermediate_20250404_200232.csv
3,Call: Left Voicemail | No Answer,"N/A, Left Vm \n\nRecording: https://recordings...",2025-04-04,0011U00001S8nshQAB,"N/A, Left Vm \n\nRecording: https://recordings...",,a3bb9fb841,Call Logs,processed_emails_intermediate_20250404_200232.csv
4,Email: Re: New Relic support case created: Cas...,Additional To: serena.xu@pantheon.io\nCC: \nBC...,2025-04-04,0011U00001S8iBfQAJ,Additional To: serena.xu@pantheon.io\nCC: \nBC...,serena.xu@pantheon.io,f170d0ee68,Support Case Management,processed_emails_intermediate_20250404_200232.csv


In [39]:
combined_df.drop(columns=['CONVERSATION_ID', 'source_file'], inplace=True)
# %%

In [44]:
# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def generate_hash_key(row, key_columns):
    """Generate a unique hash key from multiple columns"""
    values = []
    for col in key_columns:
        if col in row and not pd.isna(row[col]):
            # Handle different data types appropriately
            if hasattr(row[col], 'strftime'):  # For datetime objects
                values.append(str(row[col].strftime('%Y-%m-%d %H:%M:%S.%f')))
            else:
                values.append(str(row[col]))
        else:
            values.append('NULL')
    
    # Join all values and hash
    combined = '|'.join(values)
    return hashlib.md5(combined.encode()).hexdigest()

def deduplicate_df(df, output_file):
    """Deduplicate a dataframe based on key columns"""
    # Make a copy to avoid modifying the original
    df = df.copy()
    original_count = len(df)
    logging.info(f"Working with DataFrame containing {original_count} rows")
    
    # Convert date columns to datetime if needed
    date_columns = ['ACTIVITY_DATE']
    for col in date_columns:
        if col in df.columns:
            try:
                df[col] = pd.to_datetime(df[col])
                logging.info(f"Converted {col} to datetime")
            except Exception as e:
                logging.warning(f"Could not convert {col} to datetime: {str(e)}")
    
    # Define key columns for deduplication
    # Adjust these columns based on what uniquely identifies a row in your data
    key_columns = ['ACCOUNT_ID', 'ACTIVITY_DATE', 'SUBJECT', 'DESCRIPTION']
    
    # Verify key columns exist
    missing_columns = [col for col in key_columns if col not in df.columns]
    if missing_columns:
        logging.error(f"Missing key columns: {missing_columns}")
        # Try to use alternative columns
        available_columns = list(df.columns)
        logging.info(f"Available columns: {available_columns}")
        # Adjust key_columns to use only available columns
        key_columns = [col for col in key_columns if col in df.columns]
        if len(key_columns) == 0:
            logging.error("No key columns available for deduplication")
            return None
        logging.info(f"Using alternative key columns: {key_columns}")
    
    # Generate hash keys for each row
    logging.info("Generating hash keys...")
    hash_keys = []
    for _, row in tqdm(df.iterrows(), total=len(df), desc="Generating hash keys"):
        hash_key = generate_hash_key(row, key_columns)
        hash_keys.append(hash_key)
    
    df['hash_key'] = hash_keys
    
    # Count occurrences of each hash key
    hash_counts = df['hash_key'].value_counts()
    duplicate_keys = hash_counts[hash_counts > 1].index.tolist()
    
    if duplicate_keys:
        logging.info(f"Found {len(duplicate_keys)} duplicate keys affecting {sum(hash_counts[duplicate_keys] - 1)} rows")
        # Display a few examples of duplicates
        for key in duplicate_keys[:3]:
            dupe_rows = df[df['hash_key'] == key]
            logging.info(f"Example duplicate (key {key}):")
            for col in key_columns:
                if col in dupe_rows.columns:
                    values = dupe_rows[col].unique()
                    logging.info(f"  {col}: {values}")
    
    # Remove duplicates
    df_deduped = df.drop_duplicates(subset=['hash_key'])
    
    # Log results
    deduped_count = len(df_deduped)
    logging.info(f"After deduplication: {deduped_count} rows (removed {original_count - deduped_count} duplicates)")
    
    # Remove hash key column before saving
    df_deduped = df_deduped.drop(columns=['hash_key'])
    
    # Save deduplicated data
    df_deduped.to_csv(output_file, index=False)
    logging.info(f"Deduplicated data saved to {output_file}")
    
    return df_deduped



In [None]:
# Then use it like this:
output_file = "deduplicated_df.csv"
deduplicated_df = deduplicate_df(combined_df, output_file)

In [73]:
dedup_df = pd.read_csv('deduplicated_df.csv')
dedup_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 97186 entries, 0 to 97185
Data columns (total 7 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   SUBJECT        97185 non-null  object
 1   DESCRIPTION    97184 non-null  object
 2   ACTIVITY_DATE  97185 non-null  object
 3   ACCOUNT_ID     97185 non-null  object
 4   TOP_RESPONSE   97184 non-null  object
 5   SENDER         92328 non-null  object
 6   CATEGORY       97183 non-null  object
dtypes: object(7)
memory usage: 5.2+ MB


In [74]:
dedup_df_copy = dedup_df.copy()

In [76]:
def apply_to_dataframe(df):
    """Apply all conversation analysis functions to the dataframe"""
    # Generate conversation IDs
    df['CONVERSATION_ID'] = df.apply(generate_conversation_id, axis=1)
    
    # Create a copy to avoid SettingWithCopyWarning
    df = df.copy()
    
    # Convert ACTIVITY_DATE to datetime if it's not already, with error handling
    try:
        # First check if there are problematic values and print them for debugging
        if df['ACTIVITY_DATE'].dtype == 'object':  # If it's a string or mixed type
            non_date_values = df[~df['ACTIVITY_DATE'].astype(str).str.match(r'^\d{4}-\d{2}-\d{2}.*$')]
            if len(non_date_values) > 0:
                print(f"Found {len(non_date_values)} rows with problematic date values:")
                print(non_date_values['ACTIVITY_DATE'].head())
                
                # Replace problematic values with NaT (Not a Time)
                df['ACTIVITY_DATE'] = pd.to_datetime(df['ACTIVITY_DATE'], errors='coerce')
            else:
                df['ACTIVITY_DATE'] = pd.to_datetime(df['ACTIVITY_DATE'])
        else:
            # Already datetime or easily convertible
            df['ACTIVITY_DATE'] = pd.to_datetime(df['ACTIVITY_DATE'])
    except Exception as e:
        print(f"Error converting dates: {e}")
        # Fallback: coerce invalid dates to NaT
        df['ACTIVITY_DATE'] = pd.to_datetime(df['ACTIVITY_DATE'], errors='coerce')
    
    # Drop rows with invalid dates
    invalid_dates = df['ACTIVITY_DATE'].isna()
    if invalid_dates.any():
        print(f"Dropping {invalid_dates.sum()} rows with invalid dates")
        df = df.dropna(subset=['ACTIVITY_DATE'])
    
    # Sort by account, conversation and date
    df = df.sort_values(['ACCOUNT_ID', 'CONVERSATION_ID', 'ACTIVITY_DATE'])
    
    # Initialize or reset conversation positions
    df['CONVERSATION_POSITION'] = 0
    
    # Set conversation positions
    position_counter = {}
    for idx, row in df.iterrows():
        key = (row['ACCOUNT_ID'], row['CONVERSATION_ID'])
        if key not in position_counter:
            position_counter[key] = 1  # Start from 1 instead of 0
        else:
            position_counter[key] += 1
        df.at[idx, 'CONVERSATION_POSITION'] = position_counter[key]
    
    # Calculate response times
    df = calculate_response_time(df)
    
    return df

In [77]:

# Then run your original code
dedup_resp_df = apply_to_dataframe(dedup_df_copy)
dedup_resp_df.info()

Found 4 rows with problematic date values:
79082             NaN
79083     sin embargo
79084      905cbc369c
87185     sin embargo
Name: ACTIVITY_DATE, dtype: object
Dropping 4 rows with invalid dates
<class 'pandas.core.frame.DataFrame'>
Index: 97182 entries, 90197 to 33742
Data columns (total 10 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   SUBJECT                97182 non-null  object        
 1   DESCRIPTION            97182 non-null  object        
 2   ACTIVITY_DATE          97182 non-null  datetime64[ns]
 3   ACCOUNT_ID             97182 non-null  object        
 4   TOP_RESPONSE           97182 non-null  object        
 5   SENDER                 92326 non-null  object        
 6   CATEGORY               97181 non-null  object        
 7   CONVERSATION_ID        97182 non-null  object        
 8   CONVERSATION_POSITION  97182 non-null  int64         
 9   RESPONSE_TIME          4945 non-null 

In [78]:
dedup_resp_df.to_csv('processed_emails_final.csv', index=False)