In [1]:
# ==========================================
# Cell 1: Verify Pipeline Directory Structure
#
# Imports shared config, which auto-creates all directories on import.
# This cell serves as a sanity check before running the pipeline.
# ==========================================

import os
from config import DIRS

# List every configured pipeline directory and confirm that it really exists.
# Printing the paths alone proves nothing, so each one is checked on disk and
# the cell fails loudly before any later pipeline cell can run against a
# missing folder.
print("Pipeline directories:")
missing_dirs = []
for key, path in DIRS.items():
    if os.path.isdir(path):
        print(f"  [{key:>12}] {path}/")
    else:
        # Keep the same line layout and flag the problem next to the path.
        print(f"  [{key:>12}] {path}/  <-- MISSING")
        missing_dirs.append(path)

if missing_dirs:
    raise FileNotFoundError(
        f"{len(missing_dirs)} of {len(DIRS)} pipeline directories are missing: "
        + ", ".join(str(path) for path in missing_dirs)
    )
print(f"\nAll {len(DIRS)} directories ready.")

Pipeline directories:
  [         raw] ./01_raw_inbox/
  [     staging] ./02_staging/
  [      review] ./03_manual_review/
  [        gold] ./04_gold_standard/
  [test_lockbox] ./05_test_lockbox/
  [      models] ./06_models/
  [   workspace] ./07_daily_workspace/
  [     reports] ./08_client_reports/
  [    baseline] ./09_internal_baseline/

All 9 directories ready.


In [2]:
# ==========================================
# Cell 2: Cold-Start Ingest — GPT Labeling & Conflict Detection
#
# Reads raw data, labels each row with GPT-4o-mini, then splits:
#   - Match  (GPT == existing sentiment) → 04_gold_standard (auto-approved)
#   - Conflict (GPT != existing)         → 03_manual_review (needs human)
#   - Full baseline                      → 09_internal_baseline (internal ref)
#
# GPT API failures are counted and reported — rows that fail
# default to "neutral" but a warning is printed at the end.
# ==========================================

import os
import datetime
import uuid

import pandas as pd
from dotenv import load_dotenv
from openai import OpenAI
from tqdm import tqdm
from config import DIRS

# Pull variables from a local .env file into the process environment before
# anything below reads them.
load_dotenv()

# Date stamp (YYYYMMDD) used as the prefix of every file written by this cell.
TODAY = f"{datetime.datetime.now():%Y%m%d}"

# -- Config --
PROJECT_NAME = "cold_start"      # sub-folder of the raw inbox that holds the input
TARGET_FILENAME = "raw.csv"      # file to ingest from that sub-folder
BATCH_LABEL = "cold_start"       # tag stored on every row and used in output filenames

INPUT_PATH = f"{DIRS['raw']}/{PROJECT_NAME}/{TARGET_FILENAME}"

# The API key comes from the environment; it is never written in the notebook.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Stop early with an actionable message when the input file is not in place.
if not os.path.exists(INPUT_PATH):
    raise FileNotFoundError(
        f"Input not found: {INPUT_PATH}. "
        f"Create {DIRS['raw']}/{PROJECT_NAME}/ and add {TARGET_FILENAME}."
    )

print(f"Reading [{PROJECT_NAME}]: {TARGET_FILENAME} ...")
df = pd.read_csv(INPUT_PATH)

# "text" is read by the labeling step and "sentiment" by the conflict check,
# both later in this cell. Validate them now so a wrong schema fails here
# instead of after every row has already been sent to the API.
missing_columns = [col for col in ("text", "sentiment") if col not in df.columns]
if missing_columns:
    raise ValueError(
        f"{INPUT_PATH} is missing required column(s) {missing_columns}; "
        f"found columns: {list(df.columns)}"
    )

# Add stable row IDs and batch tag for traceability
df["global_uuid"] = [str(uuid.uuid4()) for _ in range(len(df))]
df["batch_source"] = BATCH_LABEL

# -- GPT Labeling --
# State shared with get_gpt_score: the set of labels it may return, and a
# counter of failed calls that is reset every time this cell runs.
VALID_LABELS = {"positive", "negative", "neutral"}
_gpt_error_count = 0

print("Labeling with GPT-4o-mini ...")
# Registers the progress_apply method on pandas objects.
tqdm.pandas()

def get_gpt_score(text):
    """Label ``text`` as "positive", "negative" or "neutral" using GPT-4o-mini.

    Args:
        text: The text to classify. ``None``, a float NaN (a missing cell) or a
            blank string is labeled "neutral" without calling the API.

    Returns:
        A lowercase label from ``VALID_LABELS``. A reply that does not reduce
        to one of those labels, or a call that raises, yields "neutral".

    Side effects:
        Every exception increments the module-level ``_gpt_error_count``; the
        first three are also printed. Exceptions are never propagated.

    NOTE(review): the "neutral" fallback is indistinguishable from a real
    "neutral" answer, so a failed call on a row whose existing sentiment is
    "neutral" is treated as a match by check_conflict and auto-approved.
    """
    global _gpt_error_count

    # Nothing to analyze: skip the API call instead of sending "Text: nan".
    is_missing = text is None or (isinstance(text, float) and text != text)
    if is_missing or (isinstance(text, str) and not text.strip()):
        return "neutral"

    try:
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are an expert sentiment analyst. "
                        "Analyze the text for subtle nuances (e.g., sarcasm, nostalgia, mixed feelings).\n\n"
                        "MANDATORY MAPPING RULES:\n"
                        "1. Sarcasm / Complaint / Regret -> 'Negative'\n"
                        "2. Praise / Nostalgia / Excitement -> 'Positive'\n"
                        "3. Facts / Questions / Mixed / Unclear -> 'Neutral'\n\n"
                        "OUTPUT FORMAT: Strictly ONE word only from: [Positive, Negative, Neutral]."
                    )
                },
                {"role": "user", "content": f"Text: {text}"}
            ],
            temperature=0,
        )
        reply = response.choices[0].message.content
        # Keep letters only, so decorated replies such as "'Positive'",
        # "Negative." or "**Neutral**" still map to their label instead of
        # silently becoming "neutral". A None reply raises here and is counted
        # as an error by the handler below.
        label = "".join(ch for ch in reply.lower() if ch.isalpha())
        return label if label in VALID_LABELS else "neutral"
    except Exception as e:
        _gpt_error_count += 1
        # Print only the first few failures so a dead API key does not flood
        # the output with one line per row.
        if _gpt_error_count <= 3:
            print(f"  [GPT ERROR #{_gpt_error_count}] {type(e).__name__}: {e}")
        return "neutral"

# Label a copy so the frame read from disk (`df`) is left as loaded.
df_run = df.copy()
df_run["gpt_label"] = df_run["text"].progress_apply(get_gpt_score)
df_run["gpt_label"] = df_run["gpt_label"].str.lower()

# Failed calls were replaced by a default label, so tell the user how many
# rows are affected before they trust the split below.
if _gpt_error_count > 0:
    print(
        f"\n*** WARNING: {_gpt_error_count}/{len(df_run)} GPT calls failed "
        f"(defaulted to 'neutral'). Check API key / quota. ***\n"
    )

# -- Split by agreement: match → gold, conflict → review --
def check_conflict(row):
    """Compare a row's existing sentiment with its GPT label.

    Both values are converted to text, lowercased and stripped of surrounding
    whitespace before the comparison.

    Returns:
        "auto_pass" when the two normalized values are equal, otherwise
        "needs_review".
    """
    existing, predicted = (
        str(row[column]).lower().strip() for column in ("sentiment", "gpt_label")
    )
    if existing != predicted:
        return "needs_review"
    return "auto_pass"

# Tag every row, then carve the frame into the two routing buckets.
df_run["status"] = df_run.apply(check_conflict, axis=1)
_status = df_run["status"]
df_pass = df_run[_status == "auto_pass"]
df_review = df_run[_status == "needs_review"]

# -- Write outputs --
# One timestamp shared by every gold row written in this run.
now = datetime.datetime.now().isoformat()

# Gold: agreed rows reduced to the standard schema
# (text, label, gold_origin, created_at).
gold_pass = (
    df_pass[['text', 'gpt_label']]
    .rename(columns={'gpt_label': 'label'})
    .assign(gold_origin='cold_start_auto', created_at=now)
)
gold_pass.to_csv(f"{DIRS['gold']}/{TODAY}_{BATCH_LABEL}_auto.csv", index=False)

# Review: disagreeing rows keep every column so a human sees the full context.
df_review.to_csv(f"{DIRS['review']}/{TODAY}_{BATCH_LABEL}_review.csv", index=False)

# Baseline: the complete labeled frame, kept for internal reference only.
baseline_path = f"{DIRS['baseline']}/{TODAY}_{BATCH_LABEL}_{len(df_run)}rows.csv"
df_run.to_csv(baseline_path, index=False)

# Run summary; the error line appears only when at least one call failed.
_divider = "=" * 30
print(_divider)
print(f"Auto-pass  -> gold:     {len(df_pass)}")
print(f"Conflict   -> review:   {len(df_review)}")
print(f"Baseline   -> internal: {len(df_run)} rows")
if _gpt_error_count > 0:
    print(f"GPT errors (defaulted): {_gpt_error_count}")
print(_divider)

Reading [cold_start]: raw.csv ...
Labeling with GPT-4o-mini ...


100%|██████████| 982/982 [09:20<00:00,  1.75it/s]

Auto-pass  -> gold:     531
Conflict   -> review:   451
Baseline   -> internal: 982 rows



