Use the OpenAI chat completions API (default model: gpt-4o) to perform sentiment analysis on return comments

In [None]:
import pandas as pd
import duckdb
from typing import Optional, Tuple
import openpyxl

## Define key variables
- file_path - path of the file to import (xlsx, csv, or parquet)
- tname - DuckDB table name to import into
- db_path - DuckDB database location; created if not present
- row_id - name of the unique identifier column created during import (the source is assumed to have no unique ID)
- comment_column - name of the column containing the comment to be analyzed
- con - DuckDB connection opened with duckdb.connect(db_path)

In [None]:
# Define important parameters
# NOTE: file_path is an absolute, machine-specific Windows path; change it before running elsewhere.
file_path = r'C:\Code\URBN\review_analyzer\data\IID_RR_GROSS.csv'
# Destination table and database file used by every later cell
tname = 'iid_return_reasonsv3'
db_path='iid_return_commentv3'
# Name of the identifier column that import_data adds to each record
row_id = "row_id"
# Define the column with returns comments
comment_column = "RETURN_COMMENT"
# Shared connection: import_data uses it as a global and the last cell closes it
con = duckdb.connect(db_path)


CSV File Processing

Sets SQL mode based on clear parameter (OR REPLACE TABLE or TABLE IF NOT EXISTS)
Implements a three-attempt strategy with progressively more lenient options:
First Attempt:

Uses standard options with CSV auto-detection
Adds a row_id column using ROW_NUMBER() window function
Second Attempt (if first fails):

Adds strict_mode=false to handle unterminated quotes
Prints failure message from first attempt
Third Attempt (if second fails):

Adds ignore_errors=true (on top of strict_mode=false) so rows that cannot be parsed are skipped; if this attempt also fails, the connection is closed and a ValueError is raised

In [None]:
def import_data(
        fname: str,
        clear=True,
        db_path='db_path',
        tname='staging_table',
        ftype=None,
        ):
    """
    Load a CSV, Parquet, or Excel file into a DuckDB table.

    The table is written through the module-level ``con`` connection and gets
    an extra ``row_id`` column that numbers the records starting at 1.

    Args:
        fname: Path of the file to import.
        clear: If True the table is replaced (``CREATE OR REPLACE TABLE``);
            otherwise it is only created when it does not exist yet
            (``CREATE TABLE IF NOT EXISTS``). Applies to all three file types.
        db_path: Only echoed in the confirmation message; the data goes to
            whatever database ``con`` is connected to.
        tname: Name of the destination table.
        ftype: 'csv', 'parquet' or 'excel'. Inferred from the file extension
            when None.

    Returns:
        None. A confirmation line is printed on success.

    Raises:
        ValueError: Unsupported extension / file type, or the CSV could not be
            read by any of the three attempts. ``con`` is closed before the
            error is raised, so reconnect before retrying.
    """
    import os

    # Infer file type if not provided
    if ftype is None:
        ext = os.path.splitext(fname)[1].lower()
        ftype = {
            '.csv': 'csv',
            '.parquet': 'parquet',
            '.xlsx': 'excel',
            '.xls': 'excel',
        }.get(ext)
        if ftype is None:
            con.close()
            raise ValueError("Unsupported file extension.")

    mode = 'OR REPLACE TABLE' if clear else 'TABLE IF NOT EXISTS'
    # Double embedded single quotes so the path stays one SQL string literal
    sql_fname = str(fname).replace("'", "''")

    if ftype == 'csv':
        base_options = "escape='\\', encoding='utf-8', header=True"
        # Progressively more lenient reader options; the message is printed
        # when that attempt fails and another one follows.
        attempts = (
            (base_options,
             "First attempt failed, trying with strict_mode=false"),
            (base_options + ", strict_mode=false",
             "Second attempt failed, trying with ignore_errors=true"),
            (base_options + ", strict_mode=false, ignore_errors=true",
             None),
        )
        for options, retry_message in attempts:
            try:
                con.execute(f"""
                    CREATE {mode} {tname} AS
                    SELECT *, ROW_NUMBER() OVER () AS row_id
                    FROM read_csv_auto('{sql_fname}', {options})
                """)
                break
            except Exception as exc:
                if retry_message is None:
                    # Last attempt: give up
                    con.close()
                    raise ValueError(
                        f"Failed to import CSV after multiple attempts: {str(exc)}"
                    ) from exc
                print(f"{retry_message}: {str(exc)}")
    elif ftype == 'parquet':
        con.execute(f"""
            CREATE {mode} {tname} AS
            SELECT *, ROW_NUMBER() OVER () AS row_id FROM read_parquet('{sql_fname}')
        """)
    elif ftype == 'excel':
        df = pd.read_excel(fname)
        # Add row_id column to DataFrame
        df['row_id'] = range(1, len(df) + 1)
        con.register('temp_excel_df', df)
        try:
            # Same create mode as CSV/Parquet, so clear=False no longer fails
            # when the table already exists.
            con.execute(f"CREATE {mode} {tname} AS SELECT * FROM temp_excel_df")
        finally:
            # Release the temporary view even if the create fails
            con.unregister('temp_excel_df')
    else:
        con.close()
        raise ValueError("Unsupported file type.")

    print(f"Import completed: {fname} into {tname} at {db_path}")


In [None]:
# Load the source file into DuckDB. clear=True replaces an existing table,
# so skip this cell when the data is already loaded.
import_data(
    file_path,
    clear=True,
    db_path=db_path,
    tname=tname,
)


Pull data from DuckDB into a DataFrame
- is_sample=True fetches a random sample of 500 rows; is_sample=False fetches every non-empty comment

In [None]:
def fetch_return_comments(con, tname, is_sample=False, comment_column='RETURN COMMENT', row_id='row_id', sample_size=500):
    """
    Extract the non-empty return comments from a DuckDB table.

    Args:
        con: DuckDB connection.
        tname: Table name (interpolated into the query as-is).
        is_sample: If True, return a random sample instead of every row.
        comment_column: Column containing the return comments.
        row_id: Identifier column used later to join results back.
        sample_size: Number of rows drawn when ``is_sample`` is True.

    Returns:
        DataFrame with the columns ``row_id`` and ``comment``.

    Raises:
        ValueError: ``is_sample`` is True and ``sample_size`` is below 1.
    """
    def quote_ident(name):
        # Double embedded quotes so the name stays a single quoted identifier
        return '"' + str(name).replace('"', '""') + '"'

    comment_ident = quote_ident(comment_column)
    id_ident = quote_ident(row_id)

    # Filter for non-empty comments
    comment_filter = f"WHERE {comment_ident} IS NOT NULL AND TRIM({comment_ident}) != ''"

    sample_query = ""
    if is_sample:
        limit = int(sample_size)
        if limit < 1:
            raise ValueError("sample_size must be at least 1")
        sample_query = f"ORDER BY RANDOM() LIMIT {limit}"

    # Select the identifier and the comment under fixed output names
    query = f"""
    SELECT
        {id_ident} AS row_id,
        {comment_ident} AS comment
    FROM {tname}
    {comment_filter}
    {sample_query}
    """

    result = con.execute(query).df()
    print(f"Extracted {len(result)} comments from {tname}")

    return result


Set the client from the API key used for OpenAI

In [None]:
# create_secret comes from the project-local `apikey` module (not shown in this notebook).
# NOTE(review): presumably it returns an OpenAI client built from a stored key; confirm.
from apikey import create_secret

client = create_secret()


Sends the client, prompt, and DataFrame to OpenAI for analysis.
- client: OpenAI client object (must provide chat.completions.create)
- prompt: text prompt for OpenAI
- df: DataFrame of data (sent as JSON records)
- debug: verbose logging for send/receive
- gpt_model: ChatGPT model to use
- sys_prompt: system message to send prior to the user messages and data

In [None]:
import json
import logging
import openai
import re
import os


def ai_analyze_comments(client,sys_prompt: str, prompt: str, df: pd.DataFrame, debug: bool = True, gpt_model="gpt-4o") -> str:
    """
    Send `prompt` plus the JSON version of `df` to the chat model and return
    the stripped text of its reply.

    Args:
        client: Object exposing ``chat.completions.create``.
        sys_prompt: System message for the conversation. When it is None or
            blank, the built-in analyst instructions below are used instead.
        prompt: Instructions sent as the first user message.
        df: Data sent as the second user message (``to_json(orient="records")``).
        debug: Print the prompt and the raw reply.
        gpt_model: Model name passed to the API.

    Returns:
        The reply text with surrounding whitespace removed.

    Raises:
        ValueError: The reply has no choices or no text content.
    """
    default_system_message = (
        "You are an expert linguistic analyst specializing in extracting and scoring themes from customer return comments. "
        "You always return your output exactly in the structure specified in the user's instructions. "
        "Be precise, consistent, and strictly follow the output schema and scoring rules provided."
    )
    # Honour the caller's system prompt; it used to be accepted but ignored
    if isinstance(sys_prompt, str) and sys_prompt.strip():
        system_message = sys_prompt
    else:
        system_message = default_system_message

    df_json = df.to_json(orient="records")
    if debug:
        print("Prompt sent to model:\n", prompt)

    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": prompt},
        {"role": "user", "content": df_json},
    ]

    resp = client.chat.completions.create(
        model=gpt_model,
        messages=messages,
        temperature=0.1,
        max_tokens=15000,
    )

    if not resp.choices:
        raise ValueError("Empty response from OpenAI")

    # content may be None when the model returns no text; treat it as empty
    raw_content = resp.choices[0].message.content
    content = raw_content.strip() if raw_content else ""

    if debug:
        print("Raw response from OpenAI:\n", content)
    if not content:
        raise ValueError("Empty response from OpenAI")

    return content


Functions to clean data of characters that can impact ChatGPT's ability to parse correctly.

In [None]:
import re

def prepare_data_for_analysis(text):
    """Flatten a raw comment into one clean line of text.

    Control characters (0x00-0x1F and 0x7F) become spaces, runs of whitespace
    collapse to a single space, and the ends are trimmed. Anything that is not
    a string yields "".
    """
    if isinstance(text, str):
        without_controls = re.sub(r'[\x00-\x1F\x7F]', ' ', text)
        return re.sub(r'\s+', ' ', without_controls).strip()
    return ""

def strip_code_block(text):
    """Return the payload of a Markdown-fenced reply, or the trimmed text itself.

    When the whole (trimmed) text is a single fenced block, optionally tagged
    ``json``, only the content between the fences is returned.
    """
    code_block_pattern = r"^```(?:json)?\s*([\s\S]*?)\s*```$"
    trimmed = text.strip()
    fenced = re.match(code_block_pattern, trimmed)
    return fenced.group(1).strip() if fenced else trimmed


Fetch comments_df, which holds only the row_id and the comment. After analysis, the results are rejoined to the main data on row_id.

In [None]:
# Fetch row_id + comment only (used for analysis and for joining back later).
# For a random sample instead of the full set, pass is_sample=True below.
comments_df = fetch_return_comments(
    con,
    tname,
    is_sample=False,
    comment_column=comment_column,
    row_id=row_id,
)

# Row count of what was fetched
print(f"Comments sample rows: {len(comments_df)}")

# Preview the first three comments, truncated to 100 characters each
if len(comments_df) == 0:
    print("WARNING: No valid comments found in the sample")
else:
    print("\nSample of comments:")
    for position, text in enumerate(comments_df['comment'].head(3), start=1):
        print(f"{position}: {text[:100]}...")

comments_df.head()


Hardcode batching if needed for testing.

In [None]:
# Hard-coded, contiguous test batches. iloc's stop index is exclusive, so each
# slice starts exactly where the previous one stopped. The earlier 21/76
# starts silently skipped rows 20 and 75.
batch1 = comments_df.iloc[:20]
print(f"confirming batch size for batch 1 : {len(batch1)}")
batch2 = comments_df.iloc[20:75]
print(f"confirming batch size for batch 2 : {len(batch2)}")
batch3 = comments_df.iloc[75:275]
print(f"confirming batch size for batch 3 : {len(batch3)}")

Function to run the pipeline on the extracted data
- Prepares the DataFrame by stripping special characters from the comments
- Sets and manages batching as per batch_size
- Runs the product analysis using product_prompt, then runs again with customer_sentiment_prompt
- For each batch:
  - sends the cleaned batch to OpenAI
  - unnests the "themes" JSON, which contains 1-4 themes as per the current prompts
  - catches batches whose response cannot be parsed and saves the raw response to failed_{type}_batch_{n}.json (an automatic re-run of failed batches is not implemented in the function below)
  - saves each batch as a .csv named {type}_batch{n}.csv
- Combines all product batches into one combined .csv
- Runs the same steps for customer sentiment
- Combines all customer sentiment batches into a single .csv
- Returns two DataFrames, one for each analysis type, each keyed by row_id


In [None]:
def _resolve_prompt_text(prompt):
    """Return the file's contents when `prompt` names an existing file, else `prompt` unchanged."""
    import os

    if isinstance(prompt, str) and os.path.isfile(prompt):
        with open(prompt, encoding="utf-8") as handle:
            return handle.read()
    return prompt


def _themes_to_columns(parsed_json):
    """Build a DataFrame from parsed model output, spreading nested 'themes' into Theme_n / Sentiment_n columns."""
    import pandas as pd

    has_nested_themes = (
        isinstance(parsed_json, list)
        and len(parsed_json) > 0
        and isinstance(parsed_json[0], dict)
        and 'themes' in parsed_json[0]
    )
    if not has_nested_themes:
        return pd.DataFrame(parsed_json)

    rows = []
    for record in parsed_json:
        # Everything except the nested list is copied as-is
        row = {key: value for key, value in record.items() if key != 'themes'}
        for position, theme_data in enumerate(record.get('themes') or [], 1):
            row[f'Theme_{position}'] = theme_data.get('theme', '')
            row[f'Sentiment_{position}'] = theme_data.get('sentiment', 0)
        rows.append(row)
    return pd.DataFrame(rows)


def _run_analysis_pass(label, prompt_text, comments_df, client, sys_prompt, batch_size, debug):
    """Send every batch through one prompt, save per-batch and combined CSVs, return the combined frame."""
    import json
    import pandas as pd

    print(f"\n===== {label.upper()} FEEDBACK ANALYSIS =====")
    print(f"using {label} prompt: {prompt_text}")

    batch_frames = []
    for start in range(0, len(comments_df), batch_size):
        batch_num = start // batch_size + 1
        batch = comments_df.iloc[start:start + batch_size]
        print(f"Processing {label} batch {batch_num} with {len(batch)} records...")
        if debug:
            print(f"Batch {batch_num} - DataFrame sent to OpenAI:")
            print(batch)

        # sys_prompt is a required argument of ai_analyze_comments
        raw_result = ai_analyze_comments(
            client=client,
            sys_prompt=sys_prompt,
            prompt=prompt_text,
            df=batch,
            debug=debug,
        )
        print(f"Batch {batch_num} - Raw OpenAI response type: {type(raw_result)}")
        print(f"Batch {batch_num} - Raw OpenAI response length: {len(raw_result) if isinstance(raw_result, str) else 'N/A'}")

        try:
            result_df = _themes_to_columns(json.loads(strip_code_block(raw_result)))
        except Exception as e:
            kind = "JSONDecodeError" if isinstance(e, json.JSONDecodeError) else "Exception"
            print(f"{kind} for {label} batch {batch_num}: {e}")
            # Keep the raw reply so the batch can be inspected or re-run by hand
            with open(f"failed_{label}_batch_{batch_num}.json", "w", encoding="utf-8") as f:
                f.write(str(raw_result))
            result_df = pd.DataFrame()

        batch_frames.append(result_df)
        batch_csv = f"{label}_batch{batch_num}.csv"
        result_df.to_csv(batch_csv, index=False)
        print(f"Saved {label} batch {batch_num} to {batch_csv}")

    combined = pd.concat(batch_frames, ignore_index=True) if batch_frames else pd.DataFrame()
    combined_csv = f"{label}_all_batches_combined.csv"
    combined.to_csv(combined_csv, index=False)
    print(f"Saved combined {label} results to {combined_csv}")
    return combined


def handle_sentiment_analysis(
    comments_df,
    client=None,
    product_prompt: str = 'prompts/product_prompt.txt',
    customer_prompt: str = 'prompts/customer_sentiment_prompt.txt',
    batch_size: int = 500,
    debug: bool = False,
    sys_prompt: Optional[str] = None,
    id_column: str = 'row_id',
    db_path: Optional[str] = None,
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Run the product analysis and then the customer analysis on the same
    comments, batch by batch.

    Each batch result is written to ``{type}_batch{n}.csv`` and each pass to
    ``{type}_all_batches_combined.csv`` in the working directory. A batch whose
    reply cannot be parsed contributes an empty frame, and its raw reply is
    saved to ``failed_{type}_batch_{n}.json``.

    Args:
        comments_df: Frame to analyse; a ``comment`` column, when present, is
            cleaned with ``prepare_data_for_analysis`` on a copy.
        client: Client object forwarded to ``ai_analyze_comments``.
        product_prompt: Prompt text, or the path of a file containing it.
        customer_prompt: Prompt text, or the path of a file containing it.
        batch_size: Rows per request; must be at least 1.
        debug: Also print each batch frame and enable debug output in
            ``ai_analyze_comments``.
        sys_prompt: System message forwarded to ``ai_analyze_comments``.
        id_column: Accepted so existing calls that pass it keep working;
            not used by this function.
        db_path: Accepted so existing calls that pass it keep working;
            not used by this function.

    Returns:
        ``(product_df, customer_df)``, the combined results of each pass.

    Raises:
        ValueError: ``batch_size`` is below 1.
    """
    from datetime import datetime

    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    print(f"Starting analysis at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Total records to process: {len(comments_df)}")

    # Clean and sanitize comments before batching (on a copy)
    comments_df = comments_df.copy()
    if 'comment' in comments_df.columns:
        comments_df['comment'] = comments_df['comment'].apply(prepare_data_for_analysis)

    product_df = _run_analysis_pass(
        'product', _resolve_prompt_text(product_prompt),
        comments_df, client, sys_prompt, batch_size, debug,
    )
    customer_df = _run_analysis_pass(
        'customer', _resolve_prompt_text(customer_prompt),
        comments_df, client, sys_prompt, batch_size, debug,
    )

    return product_df, customer_df

Run both Customer and Product Analysis

CHECK BATCH #!

In [None]:
from pathlib import Path

# Build the prompt paths with `/` so they resolve on any OS, not only Windows
prompt_dir = Path("prompts")
cust_prompt = (prompt_dir / "customer_sentiment_prompt.txt").read_text(encoding="utf8")
prod_prompt = (prompt_dir / "product_prompt.txt").read_text(encoding="utf8")
sys_prompt = (prompt_dir / "system_prompt.txt").read_text(encoding="utf8")

# NOTE(review): handle_sentiment_analysis must accept sys_prompt, id_column and
# db_path for this call to succeed; confirm the definition in use does.
analyzed_product, analyzed_customer = handle_sentiment_analysis(
    batch2, # Make sure correct batch is passed!
    client=client,
    sys_prompt=sys_prompt,
    batch_size= 25,
    debug= False,
    id_column= 'row_id',
    db_path= db_path,
    product_prompt=prod_prompt,
    customer_prompt=cust_prompt
)

Creates a new duckdb table to store analyzed data

Expects customer and Product data to be combined

In [None]:
def create_completed_analytics_table(con):
    """Create the completed_analytics table on `con` unless it already exists.

    One row per row_id: product columns carry the suffix _p, customer columns
    the suffix _c, and processed_timestamp defaults to the insertion time.
    """
    ddl = """
    CREATE TABLE IF NOT EXISTS completed_analytics (
    row_id INTEGER,
    -- Product analytics
    RETURN_COMMENT_p VARCHAR,
    "Theme 1_p" VARCHAR,
    "Sentiment 1_p" DOUBLE,
    "Theme 2_p" VARCHAR,
    "Sentiment 2_p" DOUBLE,
    "Theme 3_p" VARCHAR,
    "Sentiment 3_p" DOUBLE,
    "Theme 4_p" VARCHAR,
    "Sentiment 4_p" DOUBLE,
    "Pos_mean_p" DOUBLE,
    "Neg_mean_p" DOUBLE,
    "Total_sentiment_p" DOUBLE,
    -- Customer analytics
    RETURN_COMMENT_c VARCHAR,
    "Theme 1_c" VARCHAR,
    "Sentiment 1_c" DOUBLE,
    "Theme 2_c" VARCHAR,
    "Sentiment 2_c" DOUBLE,
    "Theme 3_c" VARCHAR,
    "Sentiment 3_c" DOUBLE,
    "Theme 4_c" VARCHAR,
    "Sentiment 4_c" DOUBLE,
    "Pos_mean_c" DOUBLE,
    "Neg_mean_c" DOUBLE,
    "Total_sentiment_c" DOUBLE,
    processed_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
    con.execute(ddl)

Combines product_df and customer_df into new table

In [None]:
def save_sentiment_only_to_duckdb(con, analyzed_product_df, analyzed_customer_df, tname):
    """
    Merge the product and customer results on row_id and append the rows that
    are not yet in completed_analytics.

    Args:
        con: DuckDB connection.
        analyzed_product_df: Product results; its columns get the suffix _p.
        analyzed_customer_df: Customer results; its columns get the suffix _c.
        tname: Not used by this function; kept so existing calls keep working.

    Returns:
        The merged frame (outer join on row_id) with the suffixed column names.

    Notes:
        - A frame without a row_id column (for example an empty result because
          every batch failed) is treated as having no rows.
        - Flattened names such as Theme_1 / Sentiment_1 are mapped to the
          table's "Theme 1" / "Sentiment 1" naming before the suffix is added.
        - Table columns missing from the merged frame are inserted as NULL.
    """
    import re

    # Un-suffixed result columns of completed_analytics, in table order
    metric_columns = ['RETURN_COMMENT']
    for n in range(1, 5):
        metric_columns += [f'Theme {n}', f'Sentiment {n}']
    metric_columns += ['Pos_mean', 'Neg_mean', 'Total_sentiment']
    expected = (
        ['row_id']
        + [f"{name}_p" for name in metric_columns]
        + [f"{name}_c" for name in metric_columns]
    )

    # Rename columns for product and customer
    def rename_cols(df, suffix):
        if df is None or 'row_id' not in df.columns:
            return None  # nothing that can be joined
        rename_map = {}
        for col in df.columns:
            if col != 'row_id':  # Keep row_id as is for joining
                normalised = re.sub(r'^(Theme|Sentiment)_(\d+)$', r'\1 \2', str(col))
                rename_map[col] = f"{normalised}_{suffix}"
        return df.rename(columns=rename_map)

    usable = [
        frame
        for frame in (rename_cols(analyzed_product_df, 'p'), rename_cols(analyzed_customer_df, 'c'))
        if frame is not None
    ]

    # Merge product and customer on row_id
    if len(usable) == 2:
        combined_df = pd.merge(usable[0], usable[1], on='row_id', how='outer')
    elif len(usable) == 1:
        combined_df = usable[0]
    else:
        combined_df = pd.DataFrame({'row_id': []})

    # Frame handed to DuckDB: exactly the table's columns, missing ones as NULL
    insert_df = combined_df.copy()
    for col in expected:
        if col not in insert_df.columns:
            is_text = col.startswith(('RETURN_COMMENT', 'Theme'))
            insert_df[col] = pd.Series(
                [None] * len(insert_df),
                index=insert_df.index,
                dtype=object if is_text else 'float64',
            )
    insert_df = insert_df[expected]

    # Ensure table is created before inserting data
    create_completed_analytics_table(con)

    target_columns = ", ".join(f'"{col}"' for col in expected)
    source_columns = ", ".join(f'br."{col}"' for col in expected)

    con.register('batch_results', insert_df)
    try:
        con.execute(f"""
        INSERT INTO completed_analytics ({target_columns})
        SELECT {source_columns}
        FROM batch_results br
        WHERE NOT EXISTS (
            SELECT 1 FROM completed_analytics ca
            WHERE ca.row_id = br.row_id
        )
        """)
    finally:
        # Release the temporary view whether or not the insert succeeded
        con.unregister('batch_results')

    # Confirm the insertion
    count = con.execute("SELECT COUNT(*) FROM completed_analytics").fetchone()[0]
    print(f"Total records in completed_analytics: {count}, current batch started with: {len(combined_df)}")

    return combined_df

In [None]:
# Merge the product and customer results on row_id and append new rows to completed_analytics
combined_customer_prod_df = save_sentiment_only_to_duckdb(con, analyzed_product, analyzed_customer, tname)

In [None]:
# Release the DuckDB connection opened in the parameters cell
con.close()