<a href="https://colab.research.google.com/github/laurencoetzee001/Beads_Co-detect/blob/main/expert_edgecase_codebook__integrated.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
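
The run output further down reports an estimated cost of $12.21 for 1,453 texts. That figure follows from constants hard-coded in `run_expert_bead_coding()`: an assumed average of 800 input tokens and 400 output tokens per text, priced at $0.003 and $0.015 per 1,000 tokens. A minimal sketch of the same arithmetic, pulled out into a hypothetical helper (`estimate_cost` and the two price constants are illustrative names, not part of the notebook):

```python
# Prices in USD per 1,000 tokens, as hard-coded in the notebook's cost formula.
INPUT_PRICE_PER_1K = 0.003
OUTPUT_PRICE_PER_1K = 0.015


def estimate_cost(n_rows, input_tokens_per_row=800, output_tokens_per_row=400):
    """Estimate the API cost in USD for coding n_rows texts.

    The per-row token counts are the notebook's assumed averages,
    not measured values.
    """
    input_cost = n_rows * input_tokens_per_row * INPUT_PRICE_PER_1K / 1000
    output_cost = n_rows * output_tokens_per_row * OUTPUT_PRICE_PER_1K / 1000
    return input_cost + output_cost


print(f"${estimate_cost(1453):.2f}")  # prints $12.21
```

Because the token counts are assumed averages, the actual totals written to `cost_log.txt` during a run are the more reliable figure.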

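The codebook in the cell below restricts several variables to fixed codes (for example, `1a_physical_function` is `2` or `NA`) and states consistency rules that tie `4a_exchange` to the other exchange variables. The debug response for row 0 in the output nevertheless contains free-text values such as `"jewelry"`. The following sketch shows one way parsed responses could be checked against those rules. `ALLOWED_CODES` and `check_coded_entry` are hypothetical names introduced here, and the allowed sets are transcribed from the codebook's variable list.

```python
# Allowed codes per variable, transcribed from the codebook's variable list.
# Free-text variables (descriptions, names, notes) are deliberately left out.
ALLOWED_CODES = {
    "read_entry": {"0", "1", "2", "NaN"},
    "4a_exchange": {"xo", "no", "NA", "NaN"},
    "2_nature_of_exchange": {"1", "2", "3", "4", "NA"},
    "3_between_groups": {"1", "2", "3", "4", "NA"},
    "1a_physical_function": {"2", "NA"},
    "1b_trade_function": {"2", "NA"},
    "1c_social_function": {"3", "NA"},
    "7_market_town": {"0", "1", "NA"},
    "11_units_of_measurement": {"1", "2", "3", "4", "NA"},
}


def check_coded_entry(coded):
    """Return a list of human-readable problems found in one coded entry."""
    problems = []

    # Value check: compare as strings so that 1 and "1" are treated alike.
    for var, allowed in ALLOWED_CODES.items():
        if var in coded and str(coded[var]) not in allowed:
            problems.append(f"{var}: unexpected value {coded[var]!r}")

    # Consistency rule: no exchange means the exchange variables must be "NA".
    if coded.get("4a_exchange") == "no":
        for var in ("2_nature_of_exchange", "3_between_groups"):
            if str(coded.get(var, "NA")) != "NA":
                problems.append(f"{var}: must be 'NA' when 4a_exchange is 'no'")

    # Consistency rule: an exchange should name the beads that were exchanged.
    if coded.get("4a_exchange") == "xo":
        if str(coded.get("4b_beads_exchanged", "NA")) in {"NA", "NaN", ""}:
            problems.append("4b_beads_exchanged: expected content when 4a_exchange is 'xo'")

    return problems


# The fields visible in the row 0 debug output of this notebook.
row0 = {
    "read_entry": 1,
    "1a_physical_function": "jewelry",
    "1b_trade_function": "marketplace_goods",
    "1c_social_function": "personal_adornment",
    "4a_exchange": "no",
    "2_nature_of_exchange": "NA",
    "3_between_groups": "NA",
    "4b_beads_exchanged": "no",
}
for problem in check_coded_entry(row0):
    print(problem)
```

For row 0 this flags the three function variables. Entries with a non-empty problem list could be logged or queued for manual review rather than written straight to the final spreadsheet.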
In [4]:
# Expert-Validated Bead Exchange Coding System
# Based on proven working approach with enhanced codebook integration
# For Google Colab - December 2025

# =============================================================================
# SETUP AND INSTALLATION
# =============================================================================

import subprocess
import sys

# Install dependencies if needed
def install(package):
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])

for pkg in ["anthropic", "openpyxl", "xlrd", "pandas"]:
    try:
        __import__(pkg)  # the import name matches the pip name for every package listed
    except ImportError:
        install(pkg)

import pandas as pd
from anthropic import Anthropic
import json
import datetime
import os
import time
from google.colab import drive, files

# Mount Google Drive
drive.mount('/content/drive')

# =============================================================================
# CONFIGURATION
# =============================================================================

timestamp_str = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
GDRIVE_DIR = f"/content/drive/MyDrive/bead_annotation/session_{timestamp_str}"
os.makedirs(GDRIVE_DIR, exist_ok=True)

# Settings
MODEL_NAME = "claude-3-5-sonnet-20241022"  # Using the more capable model
SAVE_EVERY = 25  # Save progress every 25 rows
LOG_FILE = "annotation_log.txt"
MAX_RETRIES = 3

print("🔬 EXPERT-VALIDATED BEAD EXCHANGE CODING SYSTEM")
print("=" * 60)
print(f"📁 Session directory: {GDRIVE_DIR}")

# =============================================================================
# EXPERT-VALIDATED CODEBOOK (BUILT-IN)
# =============================================================================

EXPERT_CODEBOOK = """
# Expert-Validated Bead Exchange Codebook v4.0

## CRITICAL INSTRUCTIONS FOR AI
**PRIORITY**: Follow ALL decision rules strictly. When uncertain, be CONSERVATIVE and require explicit evidence.
**KEY PRINCIPLE**: Code only what is explicitly stated. Do NOT infer, assume, or interpret beyond what is written.

## MANDATORY DECISION FLOWCHART
1. **Read Entry Assessment** (read_entry)
2. **Exchange Occurrence Check** (4a_exchange) - CRITICAL DECISION POINT
3. **If exchange="no"**: Set exchange variables to "NA" BUT capture contextual details
4. **If exchange="xo"**: Code all exchange variables based on explicit evidence
5. **Function coding**: Can be done regardless of exchange status

## CRITICAL EXCHANGE DETERMINATION (4a_exchange)
**EXPERT-ENHANCED CRITERIA FOR "xo":**
1. **Explicit transaction verbs**: "traded", "sold", "bought", "exchanged", "gave for", "received for", "purchased", "offered presents"
2. **Clear parties**: Identifiable giver AND receiver
3. **Completed action**: Past tense indicating transaction happened
4. **Specific items**: What was given AND what was received
5. **INTANGIBLE GOODS INCLUDED**: Knowledge, secrets, services count

**❌ NEVER code "xo" for:**
- **Historical generalizations**: "when trading first commenced, natives sold ivory for beads"
- **Observational descriptions**: "showed me his beads", "displayed the beads"
- **Wearing/using descriptions**: "wore beads", "adorned with beads"
- **Price lists without transactions**: "beads cost £2"

## EXPERT EDGE CASE DECISIONS:
- **Historical patterns** → 4a_exchange="no" but capture context in notes
- **Gift-giving** → 4a_exchange="xo", 2_nature_of_exchange="4" (social)
- **Intangible goods** → Valid exchanges (secrets, knowledge, services)
- **Observational contexts** → Function coding only, no exchange

## ALL VARIABLES TO CODE:
1. read_entry [0/1/2/NaN] - 0=not about beads, 1=about beads, 2=typos, NaN=corrupted
2. 4a_exchange [xo/no/NA/NaN] - xo=exchange occurred, no=no exchange
3. 2_nature_of_exchange [1/2/3/4/NA] - 1=consensual, 2=conflictual, 3=competitive, 4=social
4. 3_between_groups [1/2/3/4/NA] - 1=local/traveler, 2=inter-ethnic, 3=intra-ethnic, 4=travelers
5. 4b_beads_exchanged [description or NA]
6. 4c_exchanged_item [description or NA]
7. 1a_physical_function [2/NA] - 2=aesthetic function
8. 1b_trade_function [2/NA] - 2=currency function
9. 1c_social_function [3/NA] - 3=ceremonial function
10. 6_bead_ethnic_group [groups or NA]
11. 7_market_town [0/1/NA] - 0=no, 1=yes
12. 8_location_name [location or NA]
13. 9_place_of_manufacture [origin or NA]
14. 10a_size, 10b_color, 10c_shape, 10d_type [characteristics or NA]
15. 11_units_of_measurement [1/2/3/4/NA] - 1=string, 2=plaited, 3=jewelry, 4=other
16. 12_local_name [name or NA]
17. 13_notes [context or NA]

**CONSISTENCY RULES:**
- If 4a_exchange="no" → 2_nature_of_exchange and 3_between_groups MUST="NA"
- If 4a_exchange="xo" → 4b_beads_exchanged should have content
- Use "NA" when variable doesn't apply, "NaN" only for corrupted text
"""

# =============================================================================
# FILE LOADING FUNCTIONS
# =============================================================================

def load_excel_file():
    """Load Excel file with text data"""
    print("📊 Upload your Excel file with bead exchange data")
    print("Expected: Column 'text_page_gp' with text to be coded")

    uploaded = files.upload()

    if not uploaded:
        print("❌ No file uploaded")
        return None

    filename = list(uploaded.keys())[0]

    try:
        # Try different loading methods
        if filename.endswith('.xlsx'):
            df = pd.read_excel(filename, engine="openpyxl")
        elif filename.endswith('.xls'):
            df = pd.read_excel(filename, engine="xlrd")
        else:
            df = pd.read_csv(filename)

        print(f"✅ Successfully loaded: {filename}")
        print(f"📊 Shape: {df.shape[0]:,} rows × {df.shape[1]} columns")

        # Check for text column
        if 'text_page_gp' not in df.columns:
            print(f"❌ Column 'text_page_gp' not found")
            print(f"📋 Available columns: {list(df.columns)}")

            # Auto-detect text column
            text_cols = [col for col in df.columns if 'text' in col.lower()]
            if text_cols:
                suggested_col = text_cols[0]
                use_col = input(f"Use '{suggested_col}' as text column? (y/n): ")
                if use_col.lower() == 'y':
                    df = df.rename(columns={suggested_col: 'text_page_gp'})
                    print(f"✅ Using '{suggested_col}' as text column")
                else:
                    return None
            else:
                return None

        # Basic stats
        valid_texts = df['text_page_gp'].notna().sum()
        print(f"📝 Valid text entries: {valid_texts:,}")

        # Show sample
        if valid_texts > 0:
            sample_text = df['text_page_gp'].dropna().iloc[0]
            preview = str(sample_text)[:150]
            print(f"📋 Sample text: {preview}{'...' if len(str(sample_text)) > 150 else ''}")

        return df

    except Exception as e:
        print(f"❌ Error loading file: {e}")
        return None

# =============================================================================
# API AND CODING FUNCTIONS
# =============================================================================

def setup_anthropic_client():
    """Setup Anthropic API client"""
    print("🔑 Setting up Anthropic API client...")

    try:
        from google.colab import userdata
        api_key = userdata.get('ANTHROPIC_API_KEY')
        print("✅ API key loaded from Colab secrets")
    except Exception as e:
        print("❌ API key not found in secrets")
        print("💡 Add 'ANTHROPIC_API_KEY' to Colab secrets or enter manually")
        api_key = input("Enter your Anthropic API key: ").strip()

    if not api_key:
        raise ValueError("No API key provided")

    client = Anthropic(api_key=api_key)

    # Test connection
    try:
        test_response = client.messages.create(
            model=MODEL_NAME,
            max_tokens=10,
            messages=[{"role": "user", "content": "Test"}]
        )
        print("✅ API connection successful!")
        return client
    except Exception as e:
        print(f"❌ API connection failed: {e}")
        raise

def construct_expert_prompt(entry_text):
    """Construct prompt with proven high-agreement codebook"""
    return f"""You are an expert historian using a PROVEN codebook that achieved 55% human-AI agreement. You must respond with ONLY valid JSON - no explanatory text.

CODEBOOK:
{EXPERT_CODEBOOK}

TEXT TO ANALYZE:
"{entry_text}"

CRITICAL RULES:
1. Conservative approach: Code "xo" ONLY with explicit verbs + parties + completed action + specific items
2. Historical generalizations → "no" but capture context
3. Gift-giving → "xo" with nature="4"
4. Intangible goods (knowledge, secrets) → valid exchanges
5. Use "NA" when variables don't apply
6. Include ALL 24 variables

RESPOND WITH PURE JSON ONLY (no text before or after):
{{
  "read_entry": 1,
  "1a_physical_function": "NA",
  "1b_trade_function": "NA",
  "1c_social_function": "NA",
  "4a_exchange": "no",
  "2_nature_of_exchange": "NA",
  "3_between_groups": "NA",
  "4b_beads_exchanged": "NA",
  "4c_exchanged_item": "NA",
  "5a_related_raw_materials": "NA",
  "5b_related_jewelry_fashion": "NA",
  "5c_related_consumables": "NA",
  "5d_related_decoratives": "NA",
  "6_bead_ethnic_group": "NA",
  "7_market_town": 0,
  "8_location_name": "NA",
  "9_place_of_manufacture": "NA",
  "10a_size": "NA",
  "10b_color": "NA",
  "10c_shape": "NA",
  "10d_type": "NA",
  "11_units_of_measurement": "NA",
  "12_local_name": "NA",
  "13_notes": "NA"
}}

IMPORTANT: Replace values but NO explanatory text. Start response with {{ and end with }}."""

def validate_response(parsed_json):
    """Validate that all required variables are present"""
    required_variables = [
        # Basic assessment
        'read_entry',

        # Functions
        '1a_physical_function', '1b_trade_function', '1c_social_function',

        # Exchange analysis
        '4a_exchange', '2_nature_of_exchange', '3_between_groups',
        '4b_beads_exchanged', '4c_exchanged_item',

        # Related items
        '5a_related_raw_materials', '5b_related_jewelry_fashion',
        '5c_related_consumables', '5d_related_decoratives',

        # Context and location
        '6_bead_ethnic_group', '7_market_town', '8_location_name', '9_place_of_manufacture',

        # Bead characteristics
        '10a_size', '10b_color', '10c_shape', '10d_type',

        # Measurement and names
        '11_units_of_measurement', '12_local_name',

        # Additional context
        '13_notes'
    ]

    missing_vars = [var for var in required_variables if var not in parsed_json]

    if missing_vars:
        # Add missing variables as "NaN"
        for var in missing_vars:
            parsed_json[var] = "NaN"
        return parsed_json, missing_vars

    return parsed_json, []

def process_single_entry(client, entry_text, row_num, log_file):
    """Process a single text entry with retries"""

    for attempt in range(MAX_RETRIES):
        try:
            prompt = construct_expert_prompt(entry_text)

            response = client.messages.create(
                model=MODEL_NAME,
                max_tokens=1500,
                messages=[{"role": "user", "content": prompt}]
            )

            response_text = response.content[0].text.strip()

            # Debug output for first few entries
            if row_num < 3:
                print(f"🔍 DEBUG Row {row_num} response: {response_text[:300]}...")

            # Parse JSON
            try:
                parsed_json = json.loads(response_text)

                # Validate all variables are present
                parsed_json, missing_vars = validate_response(parsed_json)

                if missing_vars:
                    print(f"Row {row_num}: ⚠️ Added missing vars: {missing_vars}")
                    log_file.write(f"Row {row_num}: Added missing variables: {missing_vars}\n")

                # Add metadata
                parsed_json['_meta'] = {
                    'row_num': row_num,
                    'input_tokens': response.usage.input_tokens,
                    'output_tokens': response.usage.output_tokens,
                    'attempt': attempt + 1,
                    'timestamp': datetime.datetime.now().isoformat(),
                    'missing_vars_added': len(missing_vars)
                }

                status = "✅ SUCCESS" if not missing_vars else f"✅ SUCCESS (+{len(missing_vars)} vars)"
                print(f"Row {row_num}: {status} (attempt {attempt + 1})")
                return parsed_json, response.usage.input_tokens, response.usage.output_tokens

            except json.JSONDecodeError as e:
                print(f"Row {row_num}: ❌ JSON parse error (attempt {attempt + 1}): {str(e)[:50]}")
                log_file.write(f"Row {row_num} attempt {attempt + 1}: JSON error - {e}\n")

                if attempt < MAX_RETRIES - 1:
                    time.sleep(2)
                else:
                    # Return raw response on final failure
                    return {
                        'error': 'JSON parse failed',
                        'raw_response': response_text,
                        'row_num': row_num
                    }, response.usage.input_tokens, response.usage.output_tokens

        except Exception as e:
            print(f"Row {row_num}: ❌ API error (attempt {attempt + 1}): {str(e)[:50]}")
            log_file.write(f"Row {row_num} attempt {attempt + 1}: API error - {e}\n")

            if attempt < MAX_RETRIES - 1:
                time.sleep(5)

    # Complete failure
    return {
        'error': 'All attempts failed',
        'row_num': row_num
    }, 0, 0

# =============================================================================
# MAIN PROCESSING FUNCTION
# =============================================================================

def run_expert_bead_coding():
    """Main function to run expert-validated bead coding"""

    # Step 1: Load Excel file
    print("\n📊 Step 1: Load Excel Data")
    df = load_excel_file()
    if df is None:
        print("❌ Failed to load data")
        return

    # Step 2: Setup API client
    print("\n🔑 Step 2: Setup Anthropic API")
    client = setup_anthropic_client()

    # Step 3: Determine processing range
    print(f"\n🎯 Step 3: Configure Processing")
    total_rows = len(df)
    process_all = input(f"Process all {total_rows:,} rows? (y/n): ")

    if process_all.lower() != 'y':
        try:
            start_row = int(input("Start row (0-based index): ") or 0)
            # Default to the last valid index (capped at 2000 rows); the slice below is inclusive
            end_row = int(input(f"End row (max {total_rows-1}): ") or min(2000, total_rows) - 1)
            df_subset = df.iloc[start_row:end_row+1].copy()
        except ValueError:
            print("❌ Invalid row numbers")
            return
    else:
        # Default to first 2000 for safety
        max_process = min(2000, total_rows)
        confirm = input(f"Process first {max_process:,} rows? (y/n): ")
        if confirm.lower() != 'y':
            return
        df_subset = df.iloc[:max_process].copy()

    rows_to_process = len(df_subset)

    # Cost estimation
    avg_tokens_per_text = 800
    estimated_cost = (rows_to_process * avg_tokens_per_text * 0.003 / 1000) + (rows_to_process * 400 * 0.015 / 1000)

    print(f"\n💰 Cost Estimation:")
    print(f"   Texts to process: {rows_to_process:,}")
    print(f"   Estimated cost: ${estimated_cost:.2f}")
    print(f"   Estimated time: {rows_to_process * 3 / 60:.1f} minutes")

    proceed = input("Proceed with processing? (y/n): ")
    if proceed.lower() != 'y':
        print("❌ Processing cancelled")
        return

    # Step 4: Check for existing progress
    print(f"\n🔄 Step 4: Process with Auto-Save")

    responses = []
    total_input_tokens = 0
    total_output_tokens = 0
    start_index = 0

    # Check for resume
    try:
        index_path = os.path.join(GDRIVE_DIR, 'last_index.txt')
        if os.path.exists(index_path):
            with open(index_path, 'r') as idx_file:
                start_index = int(idx_file.read().strip())
            resume = input(f"🔄 Previous session detected at row {start_index}. Resume? (y/n): ").strip().lower()
            if resume != 'y':
                print("🔁 Starting fresh from row 0")
                start_index = 0
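    except (OSError, ValueError):
        # Unreadable or malformed checkpoint file: fall back to a fresh start
        start_index = 0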

    # Process entries
    log_path = os.path.join(GDRIVE_DIR, LOG_FILE)

    with open(log_path, "a", encoding="utf-8") as log:
        log.write(f"\n=== Expert coding session started at {datetime.datetime.now().isoformat()} ===\n")

        start_time = time.time()

        # Pad with None for rows skipped on resume so responses stay aligned with df_subset
        responses.extend([None] * (start_index - len(responses)))

        # pos is the 0-based position within df_subset; i is the original index label
        for pos, (i, row) in enumerate(df_subset.iloc[start_index:].iterrows(), start=start_index):
            raw_text = row.get("text_page_gp", "")
            # Treat missing values as empty instead of sending the string "nan" to the API
            entry_text = "" if pd.isna(raw_text) else str(raw_text)

            if not entry_text.strip():
                responses.append(None)
                print(f"Row {i}: ⚠️ EMPTY TEXT")
            else:
                result, input_tokens, output_tokens = process_single_entry(client, entry_text, i, log)
                responses.append(result)
                total_input_tokens += input_tokens
                total_output_tokens += output_tokens

            # Auto-save after every SAVE_EVERY processed rows
            if (pos + 1) % SAVE_EVERY == 0:
                # Save intermediate results; dicts are serialized to JSON strings
                # because Excel cells cannot hold Python dicts
                df_partial = df_subset.iloc[:pos + 1].copy()
                df_partial["LLM_output"] = [
                    json.dumps(r, ensure_ascii=False) if r is not None else None
                    for r in responses
                ]
                df_partial.to_excel(os.path.join(GDRIVE_DIR, "intermediate_output.xlsx"), index=False)

                # Save the position of the next unprocessed row
                with open(os.path.join(GDRIVE_DIR, 'last_index.txt'), 'w') as idx_file:
                    idx_file.write(str(pos + 1))

                # Save cost summary
                cost_so_far = total_input_tokens * 0.003 / 1000 + total_output_tokens * 0.015 / 1000
                with open(os.path.join(GDRIVE_DIR, 'cost_log.txt'), 'a') as cost_log:
                    cost_log.write(f"Row {i}: ${cost_so_far:.4f} ({total_input_tokens} in, {total_output_tokens} out)\n")

                elapsed_time = time.time() - start_time
                rate = (pos - start_index + 1) / elapsed_time * 60  # rows per minute
                remaining = (rows_to_process - pos - 1) / rate if rate > 0 else 0  # minutes

                print(f"💾 Progress saved at row {i} | Rate: {rate:.1f}/min | ETA: {remaining:.1f}min | Cost: ${cost_so_far:.2f}")

        log.write(f"=== Coding completed at {datetime.datetime.now().isoformat()} ===\n")

    # Step 5: Final processing and save
    print(f"\n💾 Step 5: Final Save and Analysis")

    # Create final dataframe
    df_final = df_subset.copy()

    # Expand JSON responses into columns
    try:
        # Filter out None responses for normalization
        valid_responses = [r for r in responses if r is not None]
        if valid_responses:
            output_df = pd.json_normalize(valid_responses)

            # Ensure we have the right number of rows
            if len(output_df) != len(responses):
                # Handle mixed valid/None responses
                expanded_data = []
                for response in responses:
                    if response is not None:
                        expanded_data.append(response)
                    else:
                        expanded_data.append({})  # Empty dict for None responses
                output_df = pd.json_normalize(expanded_data)

            # Merge with original data
            final_df = pd.concat([df_final.reset_index(drop=True), output_df.reset_index(drop=True)], axis=1)
        else:
            final_df = df_final
            print("⚠️ No valid responses to expand")
    except Exception as e:
        print(f"⚠️ Error expanding JSON responses: {e}")
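        # Fall back to one JSON string per row; Excel cells cannot hold Python dicts
        df_final["LLM_output"] = [json.dumps(r) if r is not None else None for r in responses]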
        final_df = df_final

    # Save final results
    final_path = os.path.join(GDRIVE_DIR, "coded_entries_final.xlsx")
    final_df.to_excel(final_path, index=False)

    # Save summary
    total_cost = total_input_tokens * 0.003 / 1000 + total_output_tokens * 0.015 / 1000
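    # Failed rows carry an 'error' key; checking keys avoids false matches on the word "error" inside coded text
    successful_codes = len([r for r in responses if isinstance(r, dict) and 'error' not in r])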

    print(f"\n✅ CODING COMPLETE!")
    print(f"📊 Total processed: {len(responses):,}")
    print(f"✅ Successful codings: {successful_codes:,}")
    print(f"📈 Success rate: {successful_codes/max(len(responses), 1)*100:.1f}%")
    print(f"💰 Total cost: ${total_cost:.2f}")
    print(f"📁 Results saved to: {final_path}")

    # Reset progress
    try:
        os.remove(os.path.join(GDRIVE_DIR, 'last_index.txt'))
    except OSError:
        pass

    return final_df

# =============================================================================
# INITIALIZATION AND EXECUTION
# =============================================================================

print("✅ EXPERT-VALIDATED BEAD CODING SYSTEM INITIALIZED!")
print(f"🎯 Features:")
print(f"   ✅ Expert edge case decisions built-in")
print(f"   ✅ Proven Excel loading and JSON parsing")
print(f"   ✅ Auto-save every {SAVE_EVERY} entries")
print(f"   ✅ Resume functionality")
print(f"   ✅ Cost tracking")
print(f"   ✅ Comprehensive error handling")

print(f"\n🚀 QUICK START:")
print(f"   run_expert_bead_coding()")

print(f"\n📋 REQUIREMENTS:")
print(f"   1. Add 'ANTHROPIC_API_KEY' to Colab secrets")
print(f"   2. Excel file with 'text_page_gp' column")
print(f"   3. Sufficient API credits")

# Auto-execute if user wants
auto_run = input("\nRun coding process now? (y/n): ").strip().lower()
if auto_run == 'y':
    run_expert_bead_coding()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
🔬 EXPERT-VALIDATED BEAD EXCHANGE CODING SYSTEM
📁 Session directory: /content/drive/MyDrive/bead_annotation/session_2025-08-12_14-35-02
✅ EXPERT-VALIDATED BEAD CODING SYSTEM INITIALIZED!
🎯 Features:
   ✅ Expert edge case decisions built-in
   ✅ Proven Excel loading and JSON parsing
   ✅ Auto-save every 25 entries
   ✅ Resume functionality
   ✅ Cost tracking
   ✅ Comprehensive error handling

🚀 QUICK START:
   run_expert_bead_coding()

📋 REQUIREMENTS:
   1. Add 'ANTHROPIC_API_KEY' to Colab secrets
   2. Excel file with 'text_page_gp' column
   3. Sufficient API credits

📊 Step 1: Load Excel Data
📊 Upload your Excel file with bead exchange data
Expected: Column 'text_page_gp' with text to be coded


Saving Munashe_Cleaned.xlsx to Munashe_Cleaned (2).xlsx
✅ Successfully loaded: Munashe_Cleaned (2).xlsx
📊 Shape: 1,453 rows × 23 columns
📝 Valid text entries: 1,453
📋 Sample text: SOKU 177 | ance of a clown. There were commonly two stiff plaits hanging down, one in front of each ear, and from the extremities of these strings of ...

🔑 Step 2: Setup Anthropic API
🔑 Setting up Anthropic API client...
❌ API key not found in secrets
💡 Add 'ANTHROPIC_API_KEY' to Colab secrets or enter manually
✅ API connection successful!

🎯 Step 3: Configure Processing

💰 Cost Estimation:
   Texts to process: 1,453
   Estimated cost: $12.21
   Estimated time: 72.7 minutes

🔄 Step 4: Process with Auto-Save
🔍 DEBUG Row 0 response: {
  "read_entry": 1,
  "1a_physical_function": "jewelry",
  "1b_trade_function": "marketplace_goods",
  "1c_social_function": "personal_adornment",
  "4a_exchange": "no",
  "2_nature_of_exchange": "NA",
  "3_between_groups": "NA",
  "4b_beads_exchanged": "no",
  "4c_exchanged_item"