# Imports

In [None]:
# Standard library imports
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

# Third-party imports
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import platform

# Configuration
%matplotlib inline
sns.set_style("whitegrid")
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)

# Constants

In [None]:
# File paths
DATA_DIR = Path("data")
#INPUT_FILENAME = DATA_DIR / "PORTRAIT_last.csv"
#OUTPUT_FILENAME = DATA_DIR / "PORTRAIT_last_updated.csv"
INPUT_FILENAME = DATA_DIR / "phq.csv"
OUTPUT_FILENAME = DATA_DIR / "phq_updated.csv"

# Column names
RESPONDENT_ID = "respondent_id"
USER_CODE = "Código de usuario"
GENDER = (
    "¿Con qué género se identifica más usted? "
    "(Selecciona la opción que más te identifique)"
)

# Users to exclude
USERS_TO_REMOVE = {
    "1XIH2", "B1Q2C", "KY12C", "QO12D", "S1HA2",
    "XZ21K", "21WYJ", "B21DT",
}

# Duplicate users to remove (user_code: respondent_id)
DUPLICATES_TO_REMOVE = {
    "1H2GG": "118898041284",
    "IC21Y": "118919025758",
    "B2I1M": "118915917238",
    "1Y2ZF": "118877646327"
}

# Functions

In [None]:
def find_column_index(header_row: pd.Series, target_value: str) -> int:
    """Find index of target_value in header_row.

    Args:
        header_row: Pandas Series containing column headers
        target_value: String to find in header_row

    Returns:
        int: Index of the target value

    Raises:
        ValueError: If target_value is not found in header_row
    """
    try:
        return header_row.tolist().index(target_value)
    except ValueError as e:
        raise ValueError(
            f"Header value '{target_value}' not found in DataFrame header.\n"
            f"Available headers: {header_row.tolist()}"
        ) from e

def clean_user_code(df: pd.DataFrame, user_col_idx: int) -> None:
    """Clean and standardize user codes in the DataFrame.

    Args:
        df: Input DataFrame
        user_col_idx: Index of the user code column
    """
    df.iloc[3:, user_col_idx] = (
        df.iloc[3:, user_col_idx]
        .astype(str)
        .str.strip()
        .str.upper()
    )

def remove_duplicate_users(
    df: pd.DataFrame,
    user_col_idx: int,
    dupes: Dict[str, str]
) -> pd.DataFrame:
    """Remove duplicate users based on user code and respondent ID.

    Args:
        df: Input DataFrame
        user_col_idx: Index of the user code column
        dupes: Dictionary mapping user codes to respondent IDs to remove

    Returns:
        DataFrame with duplicates removed
    """
    mask = ~(
        (df.iloc[:, user_col_idx].isin(dupes.keys())) &
        (df[df.columns[0]].astype(str).isin(dupes.values()))
    )
    return df[mask].copy()

# Data loading

In [None]:
# Ensure data directory exists
DATA_DIR.mkdir(exist_ok=True)

# Read the raw data
print(f"Reading data from {INPUT_FILENAME}...")
df = pd.read_csv(INPUT_FILENAME, header=None)

# Add sequence numbers as first row
seq_df = pd.DataFrame([range(1, df.shape[1] + 1)])
df = pd.concat([seq_df, df], ignore_index=True)

# Display initial data info
print(f"Initial data shape: {df.shape}")
df.head(3)

# Data Cleaning

In [None]:
# Find column indices
try:
    user_col_idx = find_column_index(df.iloc[1], USER_CODE)
    print(f"Found columns: user_code at index {user_col_idx}")
except ValueError as e:
    print(f"Error finding required columns: {e}")
    raise

# Clean and standardize user codes
clean_user_code(df, user_col_idx)

# Remove duplicate users
df = remove_duplicate_users(df, user_col_idx, DUPLICATES_TO_REMOVE)

# Remove users in the exclusion list
df = df[~df.iloc[:, user_col_idx].isin(USERS_TO_REMOVE)]

# EXTRA CASES
# Change usrername 02E1T to O2E1T
df.iloc[3:, user_col_idx] = df.iloc[3:, user_col_idx].replace("02E1T", "O2E1T")

# Save processed data
df.to_csv(OUTPUT_FILENAME, index=False, header=False)
print(f"Processed data saved to {OUTPUT_FILENAME}")
print(f"Number of participants: {len(df) - 3}")

# Display the cleaned data
df.head()

# Data Analysis

# Read updated csv and initialize needed variables

In [None]:
# Load the updated CSV file
df = pd.read_csv(OUTPUT_FILENAME, header=None)
header_row = df.iloc[0]
subheader_row    = df.iloc[1]   # e.g. contains "Durante las últimas dos semanas…", “Código de usuario”, etc.
subsubheader_row = df.iloc[2]   # e.g. contains "Little interest…", “Ninguna de las anteriores”, etc.
user_col_index = find_column_index(subheader_row, "Código de usuario")

# Define all test configurations
TEST_CONFIG = {
    "PHQ": {
        "header": "Durante las últimas dos semanas ¿con qué frecuencia ha tenido molestias debido a los siguientes problemas?",
        "end_marker": "Si marcó cualquiera de los problemas, ¿qué tanta dificultad le han dado estos problemas para hacer su trabajo, encargarse de las tareas del hogar, o llevarse bien con otras personas?",
        "end_marker_row": "subheader",
        "end_adjustment": -1,  # Adjust end index by -1 for PHQ
        "range": (0, 3),
        "description": "Patient Health Questionnaire"
    },
    "BAI": {
        "header": "En el cuestionario hay una lista de síntomas comunes de la ansiedad. lea cada uno de los ítems atentamente, e indique cuanto le ha afectado en la última semana incluyendo hoy:",
        "end_marker": "Con sudores, frios o calientes.",
        "end_marker_row": "subsubheader",
        "range": (0, 3),
        "description": "Beck Anxiety Inventory"
    },
}

def find_test_indices(header_lookup, test_config):
    """Find start and end indices for a test section."""
    start_index = find_column_index(header_lookup, test_config["header"])

    # Determine which row to find the end marker in
    if test_config.get("end_marker_row") == "subsubheader":
        end_row = subsubheader_row
    else:
        end_row = subheader_row

    end_index = find_column_index(end_row, test_config["end_marker"])

    # Apply any adjustments to the end index
    if "end_adjustment" in test_config:
        end_index += test_config["end_adjustment"]

    return start_index, end_index

def score_survey(df: pd.DataFrame, column_start_idx: int, column_end_idx: int) -> pd.DataFrame:
    """Score the survey by adjusting each question score.

    Args:
        df: Input DataFrame containing survey data
        column_start_idx: Index of the first question column
        column_end_idx: Index of the last question column

    Returns:
        DataFrame with adjusted scores
    """
    # Ensure indices are within the range of DataFrame columns
    if column_start_idx < 0 or column_end_idx >= len(df.columns):
        raise IndexError("Column indices are out of range")

    # Adjust scores by subtracting 1 from each question column
    df.iloc[3:, column_start_idx:column_end_idx + 1] = (
        df.iloc[3:, column_start_idx:column_end_idx + 1].astype(int) - 1
    )

    return df

# Find all test indices
test_indices = {}
for test_name, config in TEST_CONFIG.items():
    try:
        start, end = find_test_indices(subheader_row, config)
        test_indices[f"{test_name.lower()}_start_index"] = start
        test_indices[f"{test_name.lower()}_end_index"] = end
        print(f"Found {config['description']} ({test_name}) at columns {start}-{end}")
        if test_name == "BAI":
            # Adjust BAI scores
            df = score_survey(df, int(start), int(end))
    except ValueError as e:
        print(f"Warning: Could not find {test_name} test: {e}")

# Add standard column indices
test_indices.update({
    "user_col_index": find_column_index(subheader_row, "Código de usuario"),
})

# Make all indices available as variables in the notebook
locals().update(test_indices)


In [None]:
def get_test_description(test_name, test_config, start_idx, end_idx):
    """Generate a detailed description of the test validation."""
    description = f"# Test {test_name}\n"
    description += f"- Description: {test_config.get('description', 'No description available')}\n"
    description += f"- Columns: {start_idx} to {end_idx}\n"
    description += f"- Valid range: {test_config.get('range', (0, 3))}\n"
    return description

def validate_test_responses(df, test_name, start_idx, end_idx, valid_range, start_row=3):
    """
    Validate that all responses for a test fall within the specified range.
    Returns: tuple of (passed, issues, detailed_report)
    """
    min_val, max_val = valid_range
    issues = []
    detailed_report = []
    passed = True

    # Add test header to detailed report
    detailed_report.append(f"# Test {test_name}")

    # Check if indices are valid
    if start_idx >= len(df.columns) or end_idx >= len(df.columns):
        error_msg = f"  - Error: Column indices out of bounds (max: {len(df.columns)-1})"
        issues.append(error_msg)
        detailed_report.append(error_msg)
        return False, issues, detailed_report

    # Add column range info
    detailed_report.append(f"- Validating columns {start_idx} to {end_idx} (range: {min_val}-{max_val})")

    # Get the range of columns to check
    cols_to_test = list(range(start_idx, end_idx + 1))
    valid_count = 0
    total_cells = 0

    for row_idx in range(start_row, len(df)):
        for col_idx in cols_to_test:
            total_cells += 1
            header_val = df.iloc[0, col_idx] if col_idx < len(df.iloc[0]) else "N/A"
            cell = df.iloc[row_idx, col_idx]

            # Check for missing values
            if pd.isna(cell):
                issue = f"  - Row {row_idx+1}, Column {col_idx+1} (header: {header_val}) is empty"
                issues.append(issue)
                detailed_report.append(issue)
                passed = False
                continue

            # Check if value is numeric and in range
            try:
                value = float(cell)
                if not (min_val <= value <= max_val):
                    issue = f"  - Row {row_idx+1}, Column {col_idx+1} (header: {header_val}): Value {value} out of range"
                    issues.append(issue)
                    detailed_report.append(issue)
                    passed = False
                else:
                    valid_count += 1
            except (ValueError, TypeError):
                issue = f"  - Row {row_idx+1}, Column {col_idx+1} (header: {header_val}): Non-numeric value: {cell}"
                issues.append(issue)
                detailed_report.append(issue)
                passed = False

    # Add summary to detailed report
    if passed:
        detailed_report.append(f"\n✓ All {valid_count} values are valid")
    else:
        detailed_report.append(f"\n✗ Found {len(issues)} issues in {total_cells} cells")

    return passed, issues, detailed_report

def validate_all_tests(df, test_configs, test_indices):
    """Validate all tests and return detailed reports."""
    results = {}

    for test_name, config in test_configs.items():
        if test_name == "ASSIST":
            continue  # Skip ASSIST as it has different validation rules

        test_key = test_name.lower()
        start_idx = test_indices.get(f"{test_key}_start_index")
        end_idx = test_indices.get(f"{test_key}_end_index")

        if start_idx is None or end_idx is None:
            print(f"Skipping {test_name}: Missing indices")
            continue

        print(f"\n=== Validating {test_name} ===")
        valid_range = config.get("range", (0, 3))

        # Get test description
        test_desc = get_test_description(test_name, config, start_idx, end_idx)
        print(test_desc)

        # Run validation
        passed, issues, detailed_report = validate_test_responses(
            df, test_name, start_idx, end_idx, valid_range
        )

        # Print detailed report
        print("\n".join(detailed_report))

        # Store results
        results[test_name] = {
            "passed": passed,
            "issues": issues,
            "num_issues": len(issues),
            "columns": f"{start_idx}-{end_idx}",
            "valid_range": valid_range,
            "report": detailed_report
        }

        status = "✓ PASSED" if passed else f"✗ FAILED ({len(issues)} issues)"
        print(f"\n{status}")

    return results

# Run the validation
print("=== Starting Validation ===\n")
validation_results = validate_all_tests(df, TEST_CONFIG, test_indices)

# Print final summary
print("\n=== Final Validation Summary ===")
print("PASSED TESTS:")
passed_tests = [name for name, result in validation_results.items() if result["passed"]]
if passed_tests:
    for name in passed_tests:
        print(f"✓ {name} (columns {validation_results[name]['columns']})")
else:
    print("No tests passed validation.")

print("\nFAILED TESTS:")
failed_tests = [name for name, result in validation_results.items() if not result["passed"]]
if failed_tests:
    for name in failed_tests:
        result = validation_results[name]
        print(f"✗ {name} (columns {result['columns']}): {result['num_issues']} issues")
else:
    print("All tests passed validation!")

# TOTAL RESULTS

In [None]:
def calculate_test_scores(row, start_idx, end_idx, test_name):
    """Helper function to calculate test scores with error handling."""
    try:
        scores = row[start_idx:end_idx + 1].astype(float)
        return int(scores.sum())
    except Exception as e:
        raise ValueError(f"Error processing {test_name} responses for user '{row[user_col_index]}': {e}")

def process_stai_scores(row, start_idx, end_idx, items_to_reverse, sex_col_index):
    """Process STAI scores with reverse scoring and classification."""
    try:
        scores = row[start_idx:end_idx + 1].astype(float).copy()

        # Apply reverse scoring
        for item in items_to_reverse:
            col_idx = start_idx + item
            if not pd.isna(row[col_idx]):
                scores.iloc[item] = 3 - int(scores.iloc[item])

        total = int(scores.sum())
        sex = row[sex_col_index]

        # Classify based on gender
        if sex == "3":
            classification = "Binary"
        elif (sex == "2" and total >= 29) or (sex == "1" and total >= 37):
            classification = "Severe"
        else:
            classification = "Low/Normal"

        return total, classification

    except Exception as e:
        raise ValueError(f"Error processing STAI responses for user '{row[user_col_index]}': {e}")

def process_bfi_scores(row, start_idx, subscales, reverse_items):
    """Process BFI scores with reverse scoring."""
    try:
        bfi_scores = {}
        for subscale, cols in subscales.items():
            scores = []
            for col in cols:
                try:
                    value = float(row[col])
                    # Reverse score if needed
                    if col - start_idx + 1 in reverse_items.get(subscale, []):
                        value = 6 - value
                    scores.append(value)
                except ValueError:
                    raise ValueError(f"Invalid BFI value in column {col}: {row[col]}")
            bfi_scores[subscale] = round(sum(scores) / len(scores), 2)
        return bfi_scores
    except Exception as e:
        raise ValueError(f"Error processing BFI responses: {e}")

def classify_phq(score):
    """Classify PHQ score into depression severity levels."""
    if score < 5: return "Low"
    if score < 10: return "Mild depression"
    if score < 15: return "Moderate depression"
    if score < 20: return "Moderately severe depression"
    return "Severe depression"

def classify_bai(score):
    """Classify BAI score into anxiety levels."""
    if score <= 21: return "Low anxiety"
    if score <= 35: return "Moderate anxiety"
    return "Potentially concerning levels of anxiety"

print("START")
# Process each user's responses
for idx in range(3, len(df)):
    row = df.iloc[idx]
    username = row[user_col_index]
    try:
        # Calculate test scores
        phq_total = calculate_test_scores(row, phq_start_index, phq_end_index, "PHQ")
        bai_total = calculate_test_scores(row, bai_start_index, bai_end_index, "BAI")

        # Classify scores
        classifications = {
            "phq": (phq_total, classify_phq(phq_total)),
            "bai": (bai_total, classify_bai(bai_total)),
        }

        # Update dataframe with results
        for prefix, (total, classification) in classifications.items():
            df.at[idx, f"{prefix}_total"] = total
            df.at[idx, f"{prefix}_classification"] = classification

        # Print report
        report = (
            f"{username}: PHQ Total = {phq_total} ({classifications['phq'][1]}), "
            f"BAI Total = {bai_total} ({classifications['bai'][1]}), "
        )
        print(report)

    except ValueError as e:
        print(f"Error processing {username}: {str(e)}")
        continue

In [None]:
responses_df = df[3:]

# Define output files
output_files = {
    'excel': 'results__prescreen.xlsx',
    'csv': 'results_prescreen.csv'
}

# Get responses and create results DataFrame
responses_df = df[3:]
results_cols = {
    'username': responses_df.iloc[:, user_col_index],
    **{col: responses_df[col] for col in [
        'phq_total', 'phq_classification',
        'bai_total', 'bai_classification',
    ]}
}
# Add any additional assist columns here if needed (example):
# "assist_column": responses_df["assist_column_name"]

results_df = pd.DataFrame(results_cols)

results_df.to_excel(output_files['excel'], index=False)
results_df.to_csv(output_files['csv'], index=False)

print(f"Results exported to {output_files['excel']} and {output_files['csv']}")
