Use the OpenAI API (GPT-4o) to perform sentiment analysis on return comments

In [1]:
import pandas as pd
import duckdb
from typing import Optional, Tuple
import openpyxl

Fetch data, load into duckdb table

For now, from CSV

In [2]:
def import_data(
        fname: str,
        clear=True,
        db_path='temp_db',
        tname='staging_table',
        ftype=None,
        ):
    """
    Load a CSV, Parquet, or Excel file into a DuckDB table.

    A unique, 1-based ``row_id`` column is appended to every record.

    Args:
        fname: Path of the file to import.
        clear: If True, replace an existing table; otherwise create the table
            only if it does not exist yet.
        db_path: DuckDB database file to connect to.
        tname: Name of the destination table. It is interpolated into the SQL
            as-is, so it must be a trusted identifier.
        ftype: 'csv', 'parquet' or 'excel'. Inferred from the file extension
            when None.

    Returns:
        None. A confirmation message is printed on success.

    Raises:
        ValueError: If the file type is unsupported, or if the CSV cannot be
            read by any of the three parser configurations.
    """
    import os

    # Validate the inputs before opening a connection so that bad arguments
    # never leave a database handle behind.
    if ftype is None:
        ext = os.path.splitext(fname)[1].lower()
        if ext == '.csv':
            ftype = 'csv'
        elif ext == '.parquet':
            ftype = 'parquet'
        elif ext in ('.xlsx', '.xls'):
            ftype = 'excel'
        else:
            raise ValueError("Unsupported file extension.")
    if ftype not in ('csv', 'parquet', 'excel'):
        raise ValueError("Unsupported file type.")

    mode = 'OR REPLACE TABLE' if clear else 'TABLE IF NOT EXISTS'
    # Double any single quote so the path cannot terminate the SQL string literal.
    sql_fname = fname.replace("'", "''")

    con = duckdb.connect(db_path)
    try:
        if ftype == 'csv':
            # Progressively more lenient parser settings, tried in order.
            csv_option_sets = (
                "escape='\\', encoding='utf-8', header=True",
                "escape='\\', encoding='utf-8', header=True, strict_mode=false",
                "escape='\\', encoding='utf-8', header=True, strict_mode=false, ignore_errors=true",
            )
            retry_notices = (
                "First attempt failed, trying with strict_mode=false",
                "Second attempt failed, trying with ignore_errors=true",
            )
            last_attempt = len(csv_option_sets) - 1
            for attempt, options in enumerate(csv_option_sets):
                try:
                    con.execute(f"""
                        CREATE {mode} {tname} AS
                        SELECT *, ROW_NUMBER() OVER () AS row_id
                        FROM read_csv_auto('{sql_fname}', {options})
                    """)
                    break
                except Exception as exc:
                    if attempt == last_attempt:
                        raise ValueError(
                            f"Failed to import CSV after multiple attempts: {str(exc)}"
                        ) from exc
                    print(f"{retry_notices[attempt]}: {str(exc)}")
        elif ftype == 'parquet':
            con.execute(f"""
                CREATE {mode} {tname} AS
                SELECT *, ROW_NUMBER() OVER () AS row_id FROM read_parquet('{sql_fname}')
            """)
        else:  # excel
            df = pd.read_excel(fname)
            # Add row_id column to DataFrame
            df['row_id'] = range(1, len(df) + 1)
            con.register('temp_excel_df', df)
            try:
                # Use the same create mode as the other branches so that
                # clear=False does not fail when the table already exists.
                con.execute(f"CREATE {mode} {tname} AS SELECT * FROM temp_excel_df")
            finally:
                con.unregister('temp_excel_df')
    finally:
        # Always release the database handle, on success and on failure alike.
        con.close()

    print(f"Import completed: {fname} into {tname} at {db_path}")


In [42]:
# Define important parameters
# NOTE(review): absolute, machine-specific path; other environments will need to change it.
file_path = r'C:\Code\URBN\review_analyzer\data\IID_RR_GROSS.csv'
# Name of the DuckDB table that holds the imported data
tname = 'iid_return_reasons'
# DuckDB database file used throughout the notebook
db_path='iid_return_comment'
# Name of the unique row identifier column (added by import_data)
row_id = "row_id"
# Define the column with returns comments
comment_column = "RETURN_COMMENT"
# Notebook-level connection reused by the cells below
con = duckdb.connect(db_path)


In [4]:
# Load the source file into table `tname`; clear=True replaces any existing table.
import_data(file_path, db_path=db_path,tname=tname, clear=True)


First attempt failed, trying with strict_mode=false: Invalid Input Error: The CSV Parser state machine reached an invalid state.
This can happen when is not possible to parse your CSV File with the given options, or the CSV File is not RFC 4180 compliant 
Possible fixes:
* Disable the parser's strict mode (strict_mode=false) to allow reading rows that do not comply with the CSV standard.

  file = C:\Code\URBN\review_analyzer\data\IID_RR_GROSS.csv
  delimiter = | (Auto-Detected)
  quote = ' (Auto-Detected)
  escape = \ (Set By User)
  new_line = \r\n (Auto-Detected)
  header = true (Set By User)
  skip_rows = 0 (Auto-Detected)
  comment = (empty) (Auto-Detected)
  strict_mode = true (Auto-Detected)
  date_format =  (Auto-Detected)
  timestamp_format =  (Auto-Detected)
  null_padding = 0
  sample_size = 20480
  ignore_errors = false
  all_varchar = 0


Second attempt failed, trying with ignore_errors=true: Conversion Error: CSV Error on Line: 190281
Original Line: 
AN19985240,"016I99AFE

In [5]:
# (Re)bind the notebook-level `con` to the database at `db_path`.
con = duckdb.connect(db_path)


In [43]:
# Quick sanity check of the imported table.
# Only 5 rows are fetched, so the statistics below summarise those 5 rows, not the full table.
describe = con.execute(f"SELECT * FROM {tname} limit 5 ").fetchdf()
print(describe.describe())

                       ORDER_DATE           SKU  SALES_QTY  RETURN_QTY  \
count                           5  5.000000e+00   5.000000    5.000000   
mean   2023-03-15 22:51:41.400000  7.563743e+07   1.200000    0.200000   
min           2022-06-22 18:17:34  6.690194e+07   1.000000    0.000000   
25%           2022-07-19 13:26:01  6.718854e+07   1.000000    0.000000   
50%           2022-09-11 01:10:32  6.910816e+07   1.000000    0.000000   
75%           2022-10-30 13:57:12  7.857367e+07   1.000000    0.000000   
max           2025-06-20 19:27:08  9.641482e+07   2.000000    1.000000   
std                           NaN  1.255633e+07   0.447214    0.447214   

           GROSS          RETURN_DATE  MAX(RR.F_ID)  \
count    5.00000                    1           1.0   
mean    69.43800  2022-07-26 19:46:08          17.0   
min      9.99000  2022-07-26 19:46:08          17.0   
25%     27.20000  2022-07-26 19:46:08          17.0   
50%     64.00000  2022-07-26 19:46:08          17.0   
75%

Fetch sample or full data into DataFrame

In [7]:
def fetch_return_comments(con, tname, is_sample=False, comment_column='RETURN COMMENT', row_id_column='row_id', sample_size=500):
    """
    Extract non-empty return comments from a DuckDB table together with a row id
    that can be used to join analysis results back to the source rows.

    Args:
        con: DuckDB connection.
        tname: Table name (interpolated as-is; may be schema-qualified).
        is_sample: If True, return a random sample instead of every comment.
        comment_column: Column containing return comments.
        row_id_column: Column to use for joining results back to main dataset.
        sample_size: Number of rows drawn when ``is_sample`` is True.

    Returns:
        DataFrame with the columns ``row_id`` and ``comment``.

    Raises:
        ValueError: If ``is_sample`` is True and ``sample_size`` is not positive.
    """
    def quote_identifier(name):
        # Double embedded double quotes so the name stays a single SQL identifier.
        return '"' + str(name).replace('"', '""') + '"'

    comment_ident = quote_identifier(comment_column)
    row_id_ident = quote_identifier(row_id_column)

    if is_sample:
        sample_size = int(sample_size)
        if sample_size <= 0:
            raise ValueError("sample_size must be a positive integer")
        sample_clause = f"ORDER BY RANDOM() LIMIT {sample_size}"
    else:
        sample_clause = ""

    # Keep only rows whose comment is neither NULL nor blank.
    query = f"""
    SELECT
        {row_id_ident} AS row_id,
        {comment_ident} AS comment
    FROM {tname}
    WHERE {comment_ident} IS NOT NULL AND TRIM({comment_ident}) != ''
    {sample_clause}
    """

    result = con.execute(query).df()
    print(f"Extracted {len(result)} comments from {tname}")

    return result


In [None]:
# (Re)bind the notebook-level `con` to the database at `db_path` before fetching comments.
con = duckdb.connect(db_path)


Fetch `comments_df`, which contains only the `row_id` and the comment text. After analysis, the results are joined back to the base data on `row_id`.

In [8]:


# SAMPLE - Fetch a sample of comments and row_id only (for analysis and joining)
comments_df = fetch_return_comments(con, tname, is_sample=True, comment_column=comment_column, row_id_column=row_id)

# Full set (uses the notebook-level `row_id` variable, as in the sample call above)
# comments_df = fetch_return_comments(con, tname, is_sample=False, comment_column=comment_column, row_id_column=row_id)

# Show counts
print(f"Comments sample rows: {len(comments_df)}")

# Display the first few rows of comments (each preview is truncated to 100 characters)
if len(comments_df) > 0:
    print("\nSample of comments:")
    for i, comment in enumerate(comments_df['comment'].head(3)):
        print(f"{i+1}: {comment[:100]}...")
else:
    print("WARNING: No valid comments found in the sample")

comments_df.head()


Extracted 500 comments from iid_return_reasons
Comments sample rows: 500

Sample of comments:
1: Didnt look like they did online...
2: Too large....
3: Too large...


Unnamed: 0,row_id,comment
0,714228,Didnt look like they did online
1,1514206,Too large.
2,2006660,Too large
3,1528173,My top has a hole on the side under the arm af...
4,1744900,You sent the mini instead of the full-length I...


Fetch DS OpenAI API Key

In [9]:
from google.cloud import secretmanager
from google.cloud import storage

def access_secret(secret_path):
    """Fetch a secret value from GCP Secret Manager.

    Requires GCP application-default credentials
    (in bash: gcloud auth application-default login).

    Args:
        secret_path: Full resource name of the secret version to read.

    Returns:
        The secret payload decoded as a UTF-8 string.
    """
    secret_client = secretmanager.SecretManagerServiceClient()
    secret_version = secret_client.access_secret_version(name=secret_path)
    return secret_version.payload.data.decode("UTF-8")


In [10]:

import openai
# Retrieve the OpenAI API key from GCP Secret Manager and build the API client.
secret = access_secret("projects/572292574132/secrets/openai_monday_status_alerts/versions/latest")
client = openai.OpenAI(api_key=secret)

# Read the customer sentiment prompt text into `prompt`.
# NOTE(review): the handle_sentiment_analysis calls in this notebook do not pass `prompt`;
# confirm whether this variable is still needed.
with open('prompts/customer_sentiment_prompt.txt', 'r', encoding='utf-8') as f:
    prompt = f.read()

In [11]:
import json
import logging
import openai
import re
from dotenv import load_dotenv
import os


def ai_analyze_comments(client, prompt: str, df: pd.DataFrame, debug: bool = True,
                        model: str = "gpt-4o", temperature: float = 0.1) -> str:
    """
    Send `prompt` plus the JSON (records) version of `df` to the chat
    completions endpoint and return the model's reply with surrounding
    whitespace removed.

    Args:
        client: OpenAI client exposing ``chat.completions.create``.
        prompt: Instruction text sent as the first user message.
        df: Records to analyse; serialised with ``to_json(orient="records")``.
        debug: If True, print the prompt and the raw model response.
        model: Chat model name.
        temperature: Sampling temperature passed to the API.

    Returns:
        The stripped text content of the first choice.

    Raises:
        ValueError: If the response has no choices or no text content.
    """
    df_json = df.to_json(orient="records")
    if debug:
        print("Prompt sent to model:\n", prompt)

    messages = [
    {"role": "system",
    "content": 
        "You are an expert linguistic analyst specializing in extracting and scoring themes from customer return comments. "
        "You always return your output as a single JSON array of objects, one per input record, using exactly the keys and structure specified in the user's instructions. "
        "Do not include any explanations, extra text, or formatting outside the required JSON array. "
        "Be precise, consistent, and strictly follow the output schema and scoring rules provided."
        },
        {"role": "user", "content": prompt},
        {"role": "user", "content": df_json}
    ]

    resp = client.chat.completions.create(
        model=model,
        messages=messages,
        temperature=temperature,
    )

    if not resp.choices:
        raise ValueError("Empty response from OpenAI")

    # `content` can be None (for example on a refusal), so normalise it before
    # stripping; otherwise the empty-response check below is never reached.
    content = (resp.choices[0].message.content or "").strip()

    if debug:
        print("Raw response from OpenAI:\n", content)
    if not content:
        raise ValueError("Empty response from OpenAI")

    return content


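Functions to run both Product and Customer analysis on the same data set. Results are written to CSV files (one per batch, plus combined files); combining them into one .xlsx with two tabs is not implemented yet.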

In [12]:
import re

def prepare_data_for_analysis(text):
    """Sanitize a comment before it is sent for analysis.

    Control characters are replaced with spaces, runs of whitespace are
    collapsed to a single space, and leading/trailing whitespace is removed.
    Non-string input yields an empty string.
    """
    if isinstance(text, str):
        # Replace control characters first so they collapse with other whitespace.
        without_controls = re.sub(r'[\x00-\x1F\x7F]', ' ', text)
        return re.sub(r'\s+', ' ', without_controls).strip()
    return ""

def strip_code_block(text):
    """Return the response text without a surrounding Markdown code fence.

    If the whole (stripped) text is wrapped in a ``` or ```json fence, the
    fenced content is returned; otherwise the stripped text is returned as is.
    """
    stripped = text.strip()
    fenced = re.match(r"^```(?:json)?\s*([\s\S]*?)\s*```$", stripped)
    return fenced.group(1).strip() if fenced else stripped


In [13]:
def _load_prompt_text(prompt: str) -> str:
    """Return the prompt text, reading it from disk when `prompt` is a file path."""
    import os

    if os.path.isfile(prompt):
        with open(prompt, 'r', encoding='utf-8') as f:
            return f.read()
    # A single-line value ending in .txt that is not on disk is almost certainly
    # a wrong path; fail loudly instead of sending the path itself to the model.
    if '\n' not in prompt and prompt.strip().lower().endswith('.txt'):
        raise FileNotFoundError(f"Prompt file not found: {prompt}")
    return prompt


def _run_batched_analysis(label, comments_df, client, prompt_text, batch_size, debug, csv_export):
    """Analyse `comments_df` in batches for one analysis type and return the combined results."""
    from io import StringIO
    import traceback

    batch_frames = []
    for start in range(0, len(comments_df), batch_size):
        batch_num = start // batch_size + 1
        batch = comments_df.iloc[start:start + batch_size]
        print(f"Processing {label} batch {batch_num} with {len(batch)} records...")
        print(f"Batch {batch_num} - DataFrame sent to OpenAI:")
        print(batch)
        raw_result = ai_analyze_comments(
            client=client,
            prompt=prompt_text,
            df=batch,
            debug=debug
        )
        # Logging before parsing
        print(f"Batch {batch_num} - Raw OpenAI response type: {type(raw_result)}")
        print(f"Batch {batch_num} - Raw OpenAI response length: {len(raw_result) if isinstance(raw_result, str) else 'N/A'}")
        print(f"Batch {batch_num} - Raw OpenAI response:\n{raw_result}")
        try:
            cleaned_result = strip_code_block(raw_result)
            batch_result_df = pd.read_json(StringIO(cleaned_result), orient="records")
        except Exception as e:
            print(f"Exception for {label} batch {batch_num}: {e}")
            traceback.print_exc()
            print(f"Failed JSON string for {label} batch {batch_num}:\n{raw_result}")
            # Keep the unparseable payload on disk so the failure can be inspected.
            with open(f"failed_{label}_batch_{batch_num}.json", "w", encoding="utf-8") as f:
                f.write(raw_result)
            raise
        batch_frames.append(batch_result_df)
        if csv_export:
            batch_csv = f"{label}_batch{batch_num}.csv"
            batch_result_df.to_csv(batch_csv, index=False)
            print(f"Saved {label} batch {batch_num} to {batch_csv}")

    combined = pd.concat(batch_frames, ignore_index=True) if batch_frames else pd.DataFrame()
    if csv_export:
        combined_csv = f"{label}_all_batches_combined.csv"
        combined.to_csv(combined_csv, index=False)
        print(f"Saved combined {label} results to {combined_csv}")
    return combined


def handle_sentiment_analysis(
    comments_df,
    client=None,
    product_prompt: str = 'prompts/product_prompt.txt',
    customer_prompt: str = 'prompts/customer_sentiment_prompt.txt',
    batch_size: int = 750,
    debug: bool = False,
    excel_export: bool = False,
    csv_export: bool = True,
    id_column: str = 'row_id',
    db_path: str = 'iid_return_comment.db',
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Run both Product and Customer analysis on the same data set in batches.

    Args:
        comments_df: DataFrame of comments; a 'comment' column, if present, is
            sanitized before batching. The input frame is not modified.
        client: OpenAI client passed through to ``ai_analyze_comments``.
        product_prompt: Path of the product prompt file, or the prompt text itself.
        customer_prompt: Path of the customer prompt file, or the prompt text itself.
        batch_size: Number of records sent to the model per request.
        debug: Forwarded to ``ai_analyze_comments``.
        excel_export: Accepted for backward compatibility; currently unused.
        csv_export: If True, write one CSV per batch, one combined CSV per
            analysis, and ``combined_product_customer_analysis.csv``. If False,
            no CSV files are written.
        id_column: Accepted for backward compatibility; currently unused.
        db_path: Accepted for backward compatibility; currently unused.

    Returns:
        Tuple ``(product_df, customer_df)`` with the combined batch results.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
        FileNotFoundError: If a prompt looks like a .txt path but does not exist.
        Exception: Re-raised when a model response cannot be parsed as JSON;
            the raw response is first saved to ``failed_<type>_batch_<n>.json``.
    """
    from datetime import datetime

    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    # The prompt arguments default to file paths; the model must receive the
    # file contents, not the path string.
    product_prompt_text = _load_prompt_text(product_prompt)
    customer_prompt_text = _load_prompt_text(customer_prompt)

    print(f"Starting analysis at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Total records to process: {len(comments_df)}")

    # Clean and sanitize comments before batching (on a copy, so the caller's frame is untouched)
    comments_df = comments_df.copy()
    if 'comment' in comments_df.columns:
        comments_df['comment'] = comments_df['comment'].apply(prepare_data_for_analysis)

    # PRODUCT FEEDBACK ANALYSIS
    print("\n===== PRODUCT FEEDBACK ANALYSIS =====")
    product_df = _run_batched_analysis(
        'product', comments_df, client, product_prompt_text, batch_size, debug, csv_export
    )

    # CUSTOMER FEEDBACK ANALYSIS
    print("\n===== CUSTOMER FEEDBACK ANALYSIS =====")
    customer_df = _run_batched_analysis(
        'customer', comments_df, client, customer_prompt_text, batch_size, debug, csv_export
    )

    if csv_export:
        # Create a combined CSV of both analyses (product rows followed by customer rows)
        combined_df = pd.concat([product_df, customer_df], ignore_index=True)
        combined_df.to_csv("combined_product_customer_analysis.csv", index=False)
        print("Saved combined product and customer results to combined_product_customer_analysis.csv")

    return product_df, customer_df

Run both Customer and Product Analysis

In [14]:
# Run both analyses on the sampled comments, 10 comments per request.
# The prompts are not passed, so the function's default prompt arguments are used.
# NOTE(review): db_path here ('iid_return_comment.db') differs from the notebook-level
# db_path ('iid_return_comment'); confirm which one is intended.
analyzed_product, analyzed_customer = handle_sentiment_analysis(
    comments_df,
    client=client,
    batch_size= 10,
    debug= False,
    excel_export= False,
    csv_export= True,
    id_column= 'row_id',
    db_path= 'iid_return_comment.db',
)

Starting analysis at 2025-07-11 13:04:07
Total records to process: 500

===== PRODUCT FEEDBACK ANALYSIS =====
Processing product batch 1 with 10 records...
Batch 1 - DataFrame sent to OpenAI:
    row_id                                            comment
0   714228                    Didnt look like they did online
1  1514206                                         Too large.
2  2006660                                          Too large
3  1528173  My top has a hole on the side under the arm af...
4  1744900  You sent the mini instead of the full-length I...
5   527388  One glass arrived broken with a small crack on...
6  1149131  Pillow are made poorly and are ripping at the ...
7  1359233                                  cx wants a refund
8  1378363                                  It is too small .
9   495076               Colors are muted compared to picture
Batch 1 - Raw OpenAI response type: <class 'str'>
Batch 1 - Raw OpenAI response length: 1686
Batch 1 - Raw OpenAI response:
``

In [38]:
# Close the current connection (if any) and reconnect to the database.
# NOTE(review): raises NameError if `con` has not been defined yet in this kernel.
if con:con.close()
con = duckdb.connect('iid_return_comment')

In [39]:
def save_sentiment_to_duckdb(
    con, analyzed_product_df, analyzed_customer_df, tname, replace=True
):
    """
    Combine product and customer sentiment results and join them back to the base table.

    Result columns are suffixed with ``_p`` (product) and ``_c`` (customer),
    merged on ``row_id`` with an outer join, and stored as
    ``{tname}_sentiment_combined``. That table is then left-joined to ``tname``
    and stored as ``{tname}_with_sentiment``. The join key appears once in the
    result (the copy from the sentiment side is excluded).

    Args:
        con: Open DuckDB connection.
        analyzed_product_df: Product analysis results; must contain ``row_id``.
        analyzed_customer_df: Customer analysis results; must contain ``row_id``.
        tname: Name of the base table (interpolated into SQL as-is).
        replace: If True, replace existing result tables; otherwise create
            them only if they do not exist yet.

    Raises:
        ValueError: If either result frame has no ``row_id`` column.
    """
    import pandas as pd

    for label, frame in (("product", analyzed_product_df), ("customer", analyzed_customer_df)):
        if 'row_id' not in frame.columns:
            raise ValueError(f"{label} sentiment results are missing the 'row_id' column")

    # Rename columns for product and customer (returns new frames; inputs are untouched)
    def rename_cols(df, suffix):
        return df.rename(
            columns={col: f"{col}_{suffix}" for col in df.columns if col != 'row_id'}
        )

    product_df = rename_cols(analyzed_product_df, 'p')
    customer_df = rename_cols(analyzed_customer_df, 'c')
    # Merge product and customer on row_id
    combined_df = pd.merge(product_df, customer_df, on='row_id', how='outer')

    combined_table_name = f"{tname}_sentiment_combined"
    joined_table_name = f"{tname}_with_sentiment"
    create_clause = "CREATE OR REPLACE TABLE" if replace else "CREATE TABLE IF NOT EXISTS"

    # Register the DataFrame under a name that cannot collide with the table
    # being created, and always unregister it, even if the CREATE fails.
    staging_view = "_sentiment_combined_staging"
    con.register(staging_view, combined_df)
    try:
        con.execute(f"{create_clause} {combined_table_name} AS SELECT * FROM {staging_view}")
    finally:
        con.unregister(staging_view)

    # Join back to base data inside DuckDB, so the full base table is never
    # materialised in pandas just to be written back.
    con.execute(f"""
        {create_clause} {joined_table_name} AS
        SELECT base.*, sent.* EXCLUDE (row_id)
        FROM {tname} AS base
        LEFT JOIN {combined_table_name} AS sent
        ON base.row_id = sent.row_id
    """)
    print(f"Joined product and customer sentiment results saved to DuckDB as '{joined_table_name}'.")

# Example usage: persist the analysis results from the run above, replacing existing result tables.
save_sentiment_to_duckdb(con, analyzed_product, analyzed_customer, tname, replace=True)

Joined product and customer sentiment results saved to DuckDB as 'iid_return_reasons_with_sentiment'.


In [40]:
# Table names to inspect. t1 and t2 are only used by the commented-out queries below.
t1 = "joined_product_sentiment_results"
t2 = "joined_customer_sentiment_results"
t3 = "iid_return_reasons_with_sentiment"

# q1 = con.execute(f"SELECT * FROM {t1}").fetchdf()
# q2 = con.execute(f"SELECT * FROM {t2}").fetchdf()
# Loads the entire joined table into memory before summarising it.
q3 = con.execute(f"SELECT * FROM {t3}").fetchdf()

print(q3.describe())


                       ORDER_DATE           SKU     SALES_QTY    RETURN_QTY  \
count                     2029137  2.029137e+06  2.029137e+06  2.029137e+06   
mean   2023-12-23 12:58:47.112833  8.094631e+07  1.049283e+00  2.207889e-01   
min           2022-06-01 00:03:45  1.631000e+04  1.000000e+00  0.000000e+00   
25%           2023-03-26 15:21:01  7.658606e+07  1.000000e+00  0.000000e+00   
50%           2023-12-08 21:04:46  8.368451e+07  1.000000e+00  0.000000e+00   
75%           2024-10-10 13:17:16  9.003153e+07  1.000000e+00  0.000000e+00   
max           2025-07-11 03:50:12  1.039728e+08  1.499400e+02  2.800000e+01   
std                           NaN  1.385072e+07  4.214671e-01  4.298578e-01   

              GROSS                 RETURN_DATE  MAX(RR.F_ID)  \
count  2.029137e+06                      441225      440529.0   
mean   8.329977e+01  2024-04-04 12:39:01.102808     10.823628   
min    0.000000e+00         2022-06-03 10:46:29           2.0   
25%    3.200000e+01         

In [35]:
# List the columns of q1 and q2, then print their summary statistics.
# NOTE(review): the only visible assignments of q1 and q2 are commented out in the cell above,
# so this cell appears to rely on leftover kernel state and would fail on a fresh kernel.
for col in q1.columns:
    print(f"Column: {col}")
print(f"\n\n starting q2 \n\n")
for col in q2.columns:
    print(f"Column: {col}")

print(q1.describe())
print(q2.describe())

Column: SALES_ORDER_NO
Column: IID
Column: ORDER_DATE
Column: SKU
Column: SALES_QTY
Column: RETURN_QTY
Column: GROSS
Column: UNITS_RETURNED_FLAG
Column: RETURN_DATE
Column: RETURN_NO
Column: RETURN_COMMENT
Column: RETURN_REASON
Column: ORDERLINK
Column: MAX(RR.F_ID)
Column: MAX(RR.F_NAME)
Column: MAX(RR.Q_D2C_RTRN_REASONCODE_ID)
Column: MAX(RR.Q_D2C_RET_REASON_NAME)
Column: MAX(RR.Q_D2C_RET_REASON)
Column: MAX(RR.Q_D2C_RET_REASON_GROUP_NAME0)
Column: Q_CLS_ID
Column: Q_SKU_DESC
Column: Q_GMM_ID
Column: Q_SKU_ID
Column: CLASS_
Column: DIVISION_
Column: BRAND_
Column: Q_CLR_DNUM
Column: Q_CLR_DESC
Column: VENDOR_STYLE
Column: SIZE_
Column: row_id
Column: row_id_1
Column: RETURN_COMMENT_1
Column: Theme 1
Column: Sentiment 1
Column: Theme 2
Column: Sentiment 2
Column: Theme 3
Column: Sentiment 3
Column: Theme 4
Column: Sentiment 4
Column: Pos_mean
Column: Neg_mean
Column: Total_sentiment


 starting q2 


Column: SALES_ORDER_NO
Column: IID
Column: ORDER_DATE
Column: SKU
Column: SALES_QTY
C

In [44]:
# Close the notebook-level DuckDB connection.
con.close()

In [17]:
# Summary statistics of the product analysis results returned by handle_sentiment_analysis.
analyzed_product.describe()

Unnamed: 0,row_id,score
count,500.0,20.0
mean,1019215.0,1.79
std,571542.8,1.135504
min,786.0,0.2
25%,552200.8,0.9
50%,1021444.0,1.0
75%,1511778.0,3.0
max,2026881.0,3.0


Test: Run a single comment through the analysis and inspect what is sent to ChatGPT

Test: Run a single comment through the full handle_sentiment_analysis pipeline

Run the single comment through the complete analysis pipeline to validate end-to-end processing.

In [None]:
# Select one comment from comments_df
single_comment_df = comments_df.head(1)
print("Single comment DataFrame:")
print(single_comment_df)

# Run handle_sentiment_analysis on the single comment
# debug=True makes ai_analyze_comments print the prompt and the raw model response.
# NOTE(review): csv_export=False is passed here; confirm handle_sentiment_analysis honors it,
# otherwise this smoke test overwrites the CSV files produced by the full run.
print("\n--- Running handle_sentiment_analysis on single comment ---")
single_product_df, single_customer_df = handle_sentiment_analysis(
    single_comment_df,
    client=client,
    batch_size=1,
    debug=True,
    excel_export=False,
    csv_export=False,
    id_column='row_id',
    db_path='iid_return_comment.db',
)

print("\n--- Product Analysis DataFrame ---")
print(single_product_df)

print("\n--- Customer Analysis DataFrame ---")
print(single_customer_df)

# Joining Analysis CSV with Base DuckDB Table

The script below demonstrates how to join the combined product and customer analysis CSV with the original data from the DuckDB table and save the result as a new CSV file.

In [None]:
# Join the combined analysis CSV with the original DuckDB table
import pandas as pd
import duckdb

# Define parameters
# NOTE: these assignments overwrite the notebook-level tname, db_path, row_id and con.
csv_path = "combined_product_customer_analysis.csv"
output_path = "sample_joined_analysis_GROSS.csv"
tname = 'iid_return_reasons'
db_path = 'iid_return_comment'
row_id = "row_id"

print("NOTE: This script will only keep rows where sentiment analysis was completed")

# Read the CSV file
print(f"Reading analysis data from {csv_path}...")
analysis_df = pd.read_csv(csv_path)
print(f"Loaded {len(analysis_df)} records from analysis CSV")

# Connect to DuckDB
print(f"Connecting to DuckDB database: {db_path}")
con = duckdb.connect(db_path)
try:
    # Read the base data from DuckDB
    print(f"Reading base data from {tname}...")
    base_df = con.execute(f"SELECT * FROM {tname}").fetchdf()
finally:
    # The connection is only needed for the read above; closing it here
    # guarantees it is released even if a later step fails.
    con.close()
print(f"Loaded {len(base_df)} records from DuckDB table")

# Check data types of row_id in both DataFrames
print(f"Data type of row_id in base_df: {base_df[row_id].dtype}")
print(f"Data type of row_id in analysis_df: {analysis_df[row_id].dtype}")

# Convert row_id to the same type (string) in both DataFrames to ensure compatibility
# NOTE(review): if the CSV row_id column was read as float (e.g. because of missing values),
# its string form ends in '.0' and will not match the base ids; check the dtypes printed above.
base_df[row_id] = base_df[row_id].astype(str)
analysis_df[row_id] = analysis_df[row_id].astype(str)

print(f"After conversion - Data type of row_id in base_df: {base_df[row_id].dtype}")
print(f"After conversion - Data type of row_id in analysis_df: {analysis_df[row_id].dtype}")

# Join the DataFrames
print("Joining datasets on row_id...")
joined_df = pd.merge(
    base_df, 
    analysis_df,
    on=row_id, 
    how='inner'  # Changed from 'left' to 'inner' to keep only matching rows
)

# Remove rows where sentiment analysis data is missing
# Get all columns from analysis_df except row_id
analysis_columns = [c for c in analysis_df.columns if c != row_id]

if analysis_columns:
    # Keep only rows where at least one analysis column has a non-null value
    print("Filtering to keep only rows with completed sentiment analysis...")
    joined_df = joined_df.dropna(subset=analysis_columns, how='all')

# Count records with analysis
print(f"Result: {joined_df.shape[0]} total records with completed sentiment analysis")

# Save the result to CSV
print(f"Saving joined data to {output_path}...")
joined_df.to_csv(output_path, index=False)

print(f"Join completed successfully. Output saved to: {output_path}")