# Judge Labeling - LLM-as-Judge for Alignment Scores

This notebook implements the **Judge** component of the VIF pipeline. It takes journal entries (from synthetic personas or real users) and labels them with per-dimension alignment scores using an LLM.

## Overview

The Judge assigns a **3-point categorical alignment score** for each of the 10 Schwartz value dimensions:
- **-1 (Misaligned):** Entry actively conflicts with this value
- **0 (Neutral):** Entry is irrelevant to the value or maintains status quo  
- **+1 (Aligned):** Entry actively supports this value

These labels serve as ground-truth training targets for the Critic MLP.


In [24]:
import asyncio
import json
import os
import yaml
import polars as pl
from pathlib import Path
from dotenv import load_dotenv
from jinja2 import Template
from openai import AsyncOpenAI
from pydantic import BaseModel, Field
from typing import Literal

# Pull settings from a .env file into the process environment
load_dotenv()

# Warn when the OpenAI key is missing or empty
if not os.environ.get("OPENAI_API_KEY"):
    print("WARNING: OPENAI_API_KEY not found in environment variables.")


In [25]:
# Configuration Loading
# Each config file is looked up relative to the current directory first;
# when it is not there, the same relative path under the parent directory
# is used instead.
CONFIG_PATH = Path("config/synthetic_data.yaml")
CONFIG_PATH = CONFIG_PATH if CONFIG_PATH.exists() else Path("..") / CONFIG_PATH

SCHWARTZ_VALUES_PATH = Path("config/schwartz_values.yaml")
SCHWARTZ_VALUES_PATH = (
    SCHWARTZ_VALUES_PATH
    if SCHWARTZ_VALUES_PATH.exists()
    else Path("..") / SCHWARTZ_VALUES_PATH
)


def load_config(path: str | Path) -> dict:
    """Read a YAML file and return its parsed content.

    Args:
        path: Location of the YAML file to read.

    Returns:
        The object produced by ``yaml.safe_load`` for the file.

    Raises:
        OSError: If the file cannot be opened.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's locale
    # encoding, so non-ASCII text in the config reads the same everywhere.
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f)


# Parse both YAML files once so later cells can use the results directly
config = load_config(CONFIG_PATH)
schwartz_config = load_config(SCHWARTZ_VALUES_PATH)

# Canonical ordering of the Schwartz values (must match across all components)
SCHWARTZ_VALUE_ORDER = [
    "Self-Direction", "Stimulation", "Hedonism", "Achievement", "Power",
    "Security", "Conformity", "Tradition", "Benevolence", "Universalism",
]

# Report what was loaded: one line for the configs, one for the value list
print(
    "Configs loaded successfully.",
    f"Schwartz Values ({len(SCHWARTZ_VALUE_ORDER)}): {SCHWARTZ_VALUE_ORDER}",
    sep="\n",
)


Configs loaded successfully.
Schwartz Values (10): ['Self-Direction', 'Stimulation', 'Hedonism', 'Achievement', 'Power', 'Security', 'Conformity', 'Tradition', 'Benevolence', 'Universalism']


## Data Models

Defining structured outputs for the Judge's alignment labels.


In [26]:
def _score_field(value_name: str):
    """Return the Field definition shared by every alignment score."""
    return Field(
        ge=-1,
        le=1,
        description=f"{value_name} alignment: -1=misaligned, 0=neutral, +1=aligned",
    )


class AlignmentLabel(BaseModel):
    """Judge output: one alignment score per Schwartz value dimension."""

    # Every score is an integer constrained to the range -1..1
    self_direction: int = _score_field("Self-Direction")
    stimulation: int = _score_field("Stimulation")
    hedonism: int = _score_field("Hedonism")
    achievement: int = _score_field("Achievement")
    power: int = _score_field("Power")
    security: int = _score_field("Security")
    conformity: int = _score_field("Conformity")
    tradition: int = _score_field("Tradition")
    benevolence: int = _score_field("Benevolence")
    universalism: int = _score_field("Universalism")

    def to_vector(self) -> list[int]:
        """Return the ten scores as a list ordered like SCHWARTZ_VALUE_ORDER."""
        ordered_fields = (
            "self_direction",
            "stimulation",
            "hedonism",
            "achievement",
            "power",
            "security",
            "conformity",
            "tradition",
            "benevolence",
            "universalism",
        )
        return [getattr(self, field_name) for field_name in ordered_fields]


# JSON schema for strict mode
# Every score field shares the same integer constraint, so the property map
# and the "required" list are both derived from one ordered tuple of names.
_ALIGNMENT_FIELD_NAMES = (
    "self_direction",
    "stimulation",
    "hedonism",
    "achievement",
    "power",
    "security",
    "conformity",
    "tradition",
    "benevolence",
    "universalism",
)

ALIGNMENT_LABEL_SCHEMA = {
    "type": "object",
    "additionalProperties": False,
    "properties": {
        field_name: {"type": "integer", "minimum": -1, "maximum": 1}
        for field_name in _ALIGNMENT_FIELD_NAMES
    },
    "required": list(_ALIGNMENT_FIELD_NAMES),
}

# Wrapper that names the schema and flags it as strict
ALIGNMENT_LABEL_RESPONSE_FORMAT = dict(
    type="json_schema",
    name="AlignmentLabel",
    schema=ALIGNMENT_LABEL_SCHEMA,
    strict=True,
)


In [27]:
def build_value_rubric_context(schwartz_config: dict) -> str:
    """Assemble the rubric text that describes each Schwartz value to the Judge.

    One markdown section is produced for every name in SCHWARTZ_VALUE_ORDER
    that is present under ``schwartz_config["values"]``; names missing from
    the config are skipped. A section shows the value's core motivation, its
    first three behavioral manifestations as "aligned" bullets, and a fixed
    list of generic "misaligned" bullets.

    Args:
        schwartz_config: Config dict with a "values" mapping whose entries
            provide "core_motivation" and "behavioral_manifestations".

    Returns:
        All sections joined with newlines.
    """
    sections = []

    for value_name in SCHWARTZ_VALUE_ORDER:
        if value_name in schwartz_config["values"]:
            spec = schwartz_config["values"][value_name]
            motivation = spec["core_motivation"].strip()
            # Only the first three manifestations are shown
            aligned_bullets = "\n".join(
                f"- {behavior}" for behavior in spec["behavioral_manifestations"][:3]
            )

            sections.append(f"""
### {value_name}
**Core Motivation:** {motivation}

**Key Behaviors (Aligned):**
{aligned_bullets}

**Key Behaviors (Misaligned):**
- Acting against the core motivation
- Neglecting or undermining this value
- Making choices that conflict with this value's principles
""")

    return "\n".join(sections)


# Smoke-test the rubric builder and preview the beginning of its output
test_rubric = build_value_rubric_context(schwartz_config)
print("Sample rubric context (first 500 chars):")
if len(test_rubric) > 500:
    # Long rubric: show the first 500 characters followed by an ellipsis
    print(test_rubric[:500] + "...")
else:
    print(test_rubric)


Sample rubric context (first 500 chars):

### Self-Direction
**Core Motivation:** The fundamental drive to think for oneself, make one's own choices, and resist external control. Self-Direction-oriented individuals feel most alive when they are authoring their own path, even if that path is harder or less conventional.

**Key Behaviors (Aligned):**
- Resists being told what to do; bristles at micromanagement or rigid hierarchies
- Seeks out problems that require novel solutions rather than following established procedures
- Makes caree...


In [28]:
# Jinja2 prompt sent to the Judge model. Placeholders filled at render time:
# core_values, value_rubric, persona_name, persona_age, persona_profession,
# persona_culture, persona_bio, entry_date and entry_content.
# The em dash in guideline 3 was previously stored as mis-decoded bytes
# ("â€”"), which ended up verbatim in the prompt text; it is restored here.
judge_prompt_template = Template("""
You are a Judge evaluating a journal entry's alignment with Schwartz's Theory of Basic Human Values.

## Your Task

Evaluate how the journal entry aligns with each of the 10 Schwartz value dimensions. For each dimension, assign one of three scores:

- **-1 (Misaligned):** The entry actively conflicts with this value. The person's behavior, choices, or expressed attitudes undermine or go against this value's core motivation.
- **0 (Neutral):** The entry is irrelevant to this value, or the person's behavior maintains the status quo without clearly supporting or undermining it.
- **+1 (Aligned):** The entry actively supports this value. The person's behavior, choices, or expressed attitudes demonstrate or advance this value's core motivation.

## Important Guidelines

1. **Consider the entry holistically:** A single entry may impact multiple values simultaneously. For example, working late might align with Achievement but misalign with Hedonism (rest/pleasure).

2. **Look for concrete behaviors:** Base scores on what the person actually did, said, or chose, not just abstract statements about values.

3. **Profile context matters:** Consider the persona's stated core values ({{ core_values }}) when evaluating alignment, but don't let it bias you—evaluate what the entry actually shows.

4. **Be strict with -1 and +1:** Reserve these for clear cases. Use 0 when the entry doesn't clearly relate to a value or maintains neutral status.

5. **Trade-offs are normal:** It's common for an entry to align with some values while misaligning with others. This is expected and realistic.

## Schwartz Value Rubrics

{{ value_rubric }}

## Journal Entry to Evaluate

**Persona Context:**
- Name: {{ persona_name }}
- Age: {{ persona_age }}
- Profession: {{ persona_profession }}
- Culture: {{ persona_culture }}
- Core Values (from profile): {{ core_values }}
- Bio: {{ persona_bio }}

**Journal Entry:**
Date: {{ entry_date }}
Content: {{ entry_content }}

## Output

Return a JSON object with integer scores (-1, 0, or +1) for each value dimension:
{
  "self_direction": <integer>,
  "stimulation": <integer>,
  "hedonism": <integer>,
  "achievement": <integer>,
  "power": <integer>,
  "security": <integer>,
  "conformity": <integer>,
  "tradition": <integer>,
  "benevolence": <integer>,
  "universalism": <integer>
}
""")


## LLM Client Setup

This section uses the OpenAI API with GPT models. You can switch to other models as needed.


In [29]:
# Async OpenAI client shared by every request made in this notebook
client = AsyncOpenAI()

# Model used for judging; alternatives include gpt-4o, gpt-4-turbo, etc.
MODEL_NAME = "gpt-4o-mini"

# Allowed reasoning-effort levels (relevant to GPT-5 models only)
ReasoningEffort = Literal["minimal", "low", "medium", "high"]

# Effort level attached to requests when a GPT-5 model is selected
DEFAULT_REASONING_EFFORT: ReasoningEffort = "high"


async def generate_completion(
    prompt: str,
    response_format: dict | None = None,
) -> str | None:
    """Send a single-turn prompt to the configured model and return its text.

    Models whose name starts with "gpt-5" are called through the Responses
    API (with DEFAULT_REASONING_EFFORT); every other model goes through the
    Chat Completions API.

    Args:
        prompt: Text sent as the only user message.
        response_format: Optional structured-output spec written in the
            Responses API layout (type / name / schema / strict). It is
            re-nested under "json_schema" for Chat Completions requests.

    Returns:
        The model's text output, or None if any exception was raised while
        making the request (the error is printed).
    """
    user_turn = [{"role": "user", "content": prompt}]

    try:
        if not MODEL_NAME.startswith("gpt-5"):
            # Standard Chat API
            request = {"model": MODEL_NAME, "messages": user_turn}
            if response_format:
                # Chat API expects the schema details nested under "json_schema"
                request["response_format"] = {
                    "type": response_format.get("type", "json_schema"),
                    "json_schema": {
                        "name": response_format.get("name", "Response"),
                        "schema": response_format.get("schema", {}),
                        "strict": response_format.get("strict", False),
                    },
                }
            chat_response = await client.chat.completions.create(**request)
            return chat_response.choices[0].message.content

        # GPT-5 models use the Responses API
        request = {
            "model": MODEL_NAME,
            "input": user_turn,
            "reasoning": {"effort": DEFAULT_REASONING_EFFORT},
        }
        if response_format:
            request["text"] = {"format": response_format}
        api_response = await client.responses.create(**request)
        return api_response.output_text

    except Exception as exc:
        print(f"Error generating completion: {exc}")
        return None


In [30]:
async def judge_entry(
    entry_text: str,
    entry_date: str,
    persona_name: str,
    persona_age: str,
    persona_profession: str,
    persona_culture: str,
    persona_core_values: list[str],
    persona_bio: str,
    schwartz_config: dict,
    max_attempts: int = 2,
) -> tuple[AlignmentLabel | None, str]:
    """Judge a single journal entry and return alignment scores.

    The prompt is rendered once; the model is then queried up to
    ``max_attempts`` times until a response parses into an AlignmentLabel.

    Args:
        entry_text: The journal entry content
        entry_date: Date of the entry
        persona_name: Name of the persona
        persona_age: Age of the persona
        persona_profession: Profession of the persona
        persona_culture: Cultural background
        persona_core_values: List of Schwartz values from persona profile
        persona_bio: Persona biography
        schwartz_config: Schwartz values configuration
        max_attempts: Maximum number of model calls made for this entry

    Returns:
        Tuple of (AlignmentLabel, prompt used) on success, or
        (None, prompt used) when no attempt produced a valid label.
    """
    value_rubric = build_value_rubric_context(schwartz_config)

    prompt = judge_prompt_template.render(
        entry_content=entry_text,
        entry_date=entry_date,
        persona_name=persona_name,
        persona_age=persona_age,
        persona_profession=persona_profession,
        persona_culture=persona_culture,
        core_values=", ".join(persona_core_values),
        persona_bio=persona_bio,
        value_rubric=value_rubric,
    )

    for attempt in range(max_attempts):
        raw_json = await generate_completion(
            prompt, response_format=ALIGNMENT_LABEL_RESPONSE_FORMAT
        )

        if not raw_json:
            # No usable completion (None or empty text); use the next attempt
            continue

        try:
            return AlignmentLabel(**json.loads(raw_json)), prompt
        except (ValueError, TypeError) as e:
            # json.JSONDecodeError and pydantic's ValidationError both derive
            # from ValueError; TypeError covers a payload that is valid JSON
            # but not an object that can be unpacked as keyword arguments.
            print(f"Attempt {attempt + 1} failed to parse: {e}")

    # Every attempt either returned nothing or failed validation
    return None, prompt


## Data Loading

Load personas and journal entries. This assumes your data was generated by `journal_gen.ipynb` or has been saved elsewhere.

You can either:
1. Load from saved JSON/Parquet files
2. Import directly from the generator notebook
3. Load from a database

For now, we'll show a simple example structure.


In [31]:
# Example data structure - replace with your actual data loading
# This matches the output structure from journal_gen.ipynb

from dataclasses import dataclass
from typing import Optional

@dataclass
class PersonaData:
    """Profile of one persona whose journal entries are judged."""

    persona_id: int  # identifier that EntryData.persona_id refers to
    name: str
    age: str  # kept as text, e.g. "31"
    profession: str
    culture: str
    core_values: list[str]  # Schwartz value names, e.g. ["Benevolence", "Universalism"]
    bio: str

@dataclass
class EntryData:
    """A single journal entry belonging to one persona."""

    persona_id: int  # matches PersonaData.persona_id of the author
    t_index: int  # entry index for this persona (0 in the notebook's example)
    date: str  # date as text, e.g. "2023-10-27"
    content: str  # the journal text itself

@dataclass
class JudgeResult:
    """Outcome of judging one journal entry.

    ``alignment_label`` is None when judging raised an exception (``error``
    then holds its message) or when no attempt produced a valid label.
    """

    persona_id: int
    t_index: int
    # Optional because failed judgments are stored with no label
    alignment_label: Optional[AlignmentLabel]
    prompt: str  # prompt sent to the model; "" when judging raised
    error: Optional[str] = None


# Example: Load data from a JSON file or database
# For now, we'll create a simple example structure
# Replace this with your actual data loading logic

def load_personas_and_entries(data_path: str | Path) -> tuple[list[PersonaData], list[EntryData]]:
    """Placeholder loader: currently ignores ``data_path`` and returns two empty lists.

    Replace the body with your actual data loading implementation.

    Example of the structure a real data file could have::

        {
          "personas": [
            {
              "persona_id": 1,
              "name": "Yuna Park",
              "age": "31",
              "profession": "Parent (Stay-at-home)",
              "culture": "East Asian",
              "core_values": ["Benevolence", "Universalism"],
              "bio": "..."
            }
          ],
          "entries": [
            {
              "persona_id": 1,
              "t_index": 0,
              "date": "2023-10-27",
              "content": "..."
            }
          ]
        }

    Returns:
        Tuple of (personas, entries).
    """
    # TODO: Implement actual data loading
    personas: list[PersonaData] = []
    entries: list[EntryData] = []
    return personas, entries


print(
    "Data loading functions defined. "
    "Implement load_personas_and_entries() with your data source."
)


Data loading functions defined. Implement load_personas_and_entries() with your data source.


In [32]:
async def judge_entry_batch(
    personas: list[PersonaData],
    entries: list[EntryData],
    schwartz_config: dict,
) -> list[JudgeResult]:
    """Run the Judge over many entries concurrently.

    Entries whose ``persona_id`` has no matching persona are skipped and do
    not appear in the output.

    Args:
        personas: Personas available for lookup by ``persona_id``
        entries: Journal entries to judge
        schwartz_config: Schwartz values configuration

    Returns:
        One JudgeResult per judged entry, in the order the entries were given.
        A result whose judging raised carries the exception text in ``error``.
    """
    personas_by_id = {p.persona_id: p for p in personas}

    # Keep the identifying keys and the coroutines in two parallel lists
    entry_keys = []
    pending = []
    for entry in entries:
        author = personas_by_id.get(entry.persona_id)
        if author:
            entry_keys.append((entry.persona_id, entry.t_index))
            pending.append(
                judge_entry(
                    entry_text=entry.content,
                    entry_date=entry.date,
                    persona_name=author.name,
                    persona_age=author.age,
                    persona_profession=author.profession,
                    persona_culture=author.culture,
                    persona_core_values=author.core_values,
                    persona_bio=author.bio,
                    schwartz_config=schwartz_config,
                )
            )

    # Exceptions come back as values rather than aborting the whole batch
    outcomes = await asyncio.gather(*pending, return_exceptions=True)

    judge_results = []
    for (persona_id, t_index), outcome in zip(entry_keys, outcomes):
        if not isinstance(outcome, Exception):
            label, prompt = outcome
            failure = None
        else:
            label, prompt, failure = None, "", str(outcome)

        judge_results.append(
            JudgeResult(
                persona_id=persona_id,
                t_index=t_index,
                alignment_label=label,  # type: ignore
                prompt=prompt,
                error=failure,
            )
        )

    return judge_results


## Execution Example

This cell shows how to run the Judge on your data. Replace the example data with your actual personas and entries.


In [33]:
# Example: Create sample data for testing
# Replace this with your actual data loading

example_personas = [
    PersonaData(
        persona_id=1,
        name="Yuna Park",
        age="31",
        profession="Parent (Stay-at-home)",
        culture="East Asian",
        core_values=["Benevolence", "Universalism"],
        bio="Yuna left her preschool job after her daughter was born and now spends her days managing naps and pediatrician appointments."
    )
]

example_entries = [
    EntryData(
        persona_id=1,
        t_index=0,
        date="2023-10-27",
        content="The food pantry shift ran late again. I'm exhausted but it felt good to help. My partner is frustrated I keep saying yes to these commitments."
    )
]

# Run the Judge
print(f"Judging {len(example_entries)} entries using model: {MODEL_NAME}")
print(f"Personas: {len(example_personas)}")

results = await judge_entry_batch(
    personas=example_personas,
    entries=example_entries,
    schwartz_config=schwartz_config,
)

# Display results
for result in results:
    print(f"\n{'='*80}")
    print(f"Persona {result.persona_id}, Entry {result.t_index}")
    print(f"{'='*80}")
    
    if result.error:
        print(f"ERROR: {result.error}")
    elif result.alignment_label:
        print("Alignment Scores:")
        label = result.alignment_label
        for i, value_name in enumerate(SCHWARTZ_VALUE_ORDER):
            score = label.to_vector()[i]
            score_str = {1: "+1 (Aligned)", 0: " 0 (Neutral)", -1: "-1 (Misaligned)"}[score]
            print(f"  {value_name:20s}: {score_str}")
        
        print(f"\nVector representation: {label.to_vector()}")
    else:
        print("No label generated")


Judging 1 entries using model: gpt-4o-mini
Personas: 1

Persona 1, Entry 0
Alignment Scores:
  Self-Direction      :  0 (Neutral)
  Stimulation         :  0 (Neutral)
  Hedonism            : -1 (Misaligned)
  Achievement         :  0 (Neutral)
  Power               :  0 (Neutral)
  Security            :  0 (Neutral)
  Conformity          : -1 (Misaligned)
  Tradition           :  0 (Neutral)
  Benevolence         : +1 (Aligned)
  Universalism        : +1 (Aligned)

Vector representation: [0, 0, -1, 0, 0, 0, -1, 0, 1, 1]


## Save Results

Save the Judge labels to a format suitable for training the Critic (e.g., Parquet, JSON, or database).


In [34]:
def save_judge_labels(results: list[JudgeResult], output_path: str | Path) -> None:
    """Save Judge labels to a Parquet file for downstream processing.

    This creates a table matching the JudgeLabel schema from VIF_05.
    Results that carry an error or have no label are left out. Each row holds
    ``persona_id``, ``t_index``, the full ``alignment_vector`` and one
    ``alignment_<value>`` column per Schwartz value.

    Args:
        results: Judge results to persist.
        output_path: Destination Parquet file. Missing parent directories
            are created.

    Returns:
        None. Nothing is written when no result has a usable label.
    """
    rows = []
    for result in results:
        if result.error or not result.alignment_label:
            continue

        alignment_vector = result.alignment_label.to_vector()

        row = {
            "persona_id": result.persona_id,
            "t_index": result.t_index,
            "alignment_vector": alignment_vector,  # List of 10 integers
        }

        # Also include individual scores for easier inspection,
        # e.g. "Self-Direction" becomes the column "alignment_self_direction"
        for value_name, score in zip(SCHWARTZ_VALUE_ORDER, alignment_vector):
            row[f"alignment_{value_name.lower().replace('-', '_')}"] = score

        rows.append(row)

    if not rows:
        print("No valid labels to save")
        return

    # Make sure the target directory exists before writing the file
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)

    df = pl.DataFrame(rows)
    df.write_parquet(output_path)
    print(f"Saved {len(rows)} Judge labels to {output_path}")
    print("\nSchema:")
    print(df.schema)
    print("\nSample rows:")
    print(df.head())


# Example: Save results
# output_path = Path("data/judge_labels.parquet")
# save_judge_labels(results, output_path)
