
# 03 - Perspective Scoring

Attach Perspective API scores to the imitation outputs and flatten them into per-attribute columns for downstream analysis notebooks.



**Goals**
- Load the imitation bundle and reuse any existing Perspective runs.
- Provide a resumable Perspective API scorer with retry/backoff and a dry-run mode.
- Flatten scores into analysis-friendly columns for later topic/safety notebooks.


In [None]:
from pathlib import Path
import os
import pickle
import time
from typing import Callable, Dict, List, Optional, Sequence

import pandas as pd
from googleapiclient import discovery
from googleapiclient.errors import HttpError

from utils.data_io import load_df_list_pickle, flatten_conversation_bundles, describe_bundle

In [None]:
# Paths and run toggles
PROJECT_ROOT = Path.cwd()
ASSETS_PROCESSED = PROJECT_ROOT.joinpath("assets", "processed")

# Input bundle and derived artifacts. RESUME_PERSPECTIVE and OUTPUT_PATH point at
# the same pickle: whatever a full run writes becomes the resume source next time.
IMITATION_PATH = ASSETS_PROCESSED / "combat_threads_with_imitation.pkl"
OUTPUT_PATH = ASSETS_PROCESSED / "combat_threads_with_perspective.pkl"
RESUME_PERSPECTIVE = OUTPUT_PATH
FLAT_SCORES_PATH = ASSETS_PROCESSED / "combat_threads_with_perspective_scores.parquet"

# Live scoring needs PERSPECTIVE_API_KEY in the environment and DRY_RUN = False.
PERSPECTIVE_API_KEY = os.getenv("PERSPECTIVE_API_KEY")
DRY_RUN = True  # set to False to hit Perspective; requires PERSPECTIVE_API_KEY
BATCH_LIMIT = 2  # smoke-test size; set to None for full run
RATE_LIMIT_SECONDS = 1.0  # base delay for the 429 backoff in score_with_backoff

PERSPECTIVE_DISCOVERY_URL = "https://commentanalyzer.googleapis.com/$discovery/rest?version=v1alpha1"
# Attributes requested for every comment (kept in alphabetical order).
PERSPECTIVE_ATTRIBUTES = [
    "AFFINITY_EXPERIMENTAL",
    "COMPASSION_EXPERIMENTAL",
    "CURIOSITY_EXPERIMENTAL",
    "IDENTITY_ATTACK",
    "IDENTITY_ATTACK_EXPERIMENTAL",
    "INSULT",
    "INSULT_EXPERIMENTAL",
    "NUANCE_EXPERIMENTAL",
    "PERSONAL_STORY_EXPERIMENTAL",
    "PROFANITY",
    "PROFANITY_EXPERIMENTAL",
    "REASONING_EXPERIMENTAL",
    "RESPECT_EXPERIMENTAL",
    "SEVERE_TOXICITY",
    "SEVERE_TOXICITY_EXPERIMENTAL",
    "SEXUALLY_EXPLICIT",
    "THREAT",
    "THREAT_EXPERIMENTAL",
    "TOXICITY",
    "TOXICITY_EXPERIMENTAL",
]

ASSETS_PROCESSED, OUTPUT_PATH


### Asset manifest
List which assets are needed, the optional resume file, and where derived artifacts land.


In [None]:
# One row per asset: its role in this notebook, where it lives, and why it matters.
manifest = [
    {"role": role, "path": path, "note": note}
    for role, path, note in (
        (
            "input",
            IMITATION_PATH,
            "Imitation output with imm_1 + imm_1_check (processed bundle).",
        ),
        (
            "resume_optional",
            RESUME_PERSPECTIVE,
            "Existing Perspective run to reuse or compare.",
        ),
        (
            "output",
            OUTPUT_PATH,
            "Bundle with Perspective attributeScores attached.",
        ),
        (
            "artifact_optional",
            FLAT_SCORES_PATH,
            "Optional flattened scores for quick analysis joins.",
        ),
    )
]
manifest_df = pd.DataFrame(manifest)
# Flag which assets are already on disk.
manifest_df["exists"] = [Path(p).exists() for p in manifest_df["path"]]
manifest_df


### Inspect source bundle
Confirm column layout and reuse any previous Perspective run when present.


In [None]:
# Load the imitation bundle, plus the previous Perspective run if one is on disk.
imitation_bundle = load_df_list_pickle(IMITATION_PATH)
existing_perspective = None
if RESUME_PERSPECTIVE.exists():
    existing_perspective = load_df_list_pickle(RESUME_PERSPECTIVE)

print("imitation bundle:", describe_bundle(imitation_bundle))
if existing_perspective is not None:
    print("existing perspective:", describe_bundle(existing_perspective))

# Preview only the first conversation of each bundle to check the column layout.
preview = flatten_conversation_bundles(imitation_bundle[:1])
display(preview.head())
if existing_perspective is not None:
    existing_preview = flatten_conversation_bundles(existing_perspective[:1])
    display(existing_preview.head())


### Perspective client and scoring helpers
Keeps retries/backoff in one place and defaults to a dry-run stub unless you provide `PERSPECTIVE_API_KEY` and flip `DRY_RUN` to `False`.


In [None]:

def build_perspective_client(api_key: Optional[str]):
    """Build a googleapiclient service for the Perspective ``commentanalyzer`` v1alpha1 API.

    Args:
        api_key: Perspective API key; ``None`` or an empty string is rejected.

    Raises:
        RuntimeError: if *api_key* is missing, before any network call is made.
    """
    if api_key:
        service_options = {
            "developerKey": api_key,
            "discoveryServiceUrl": PERSPECTIVE_DISCOVERY_URL,
            # Perspective is not in the bundled discovery documents, so fetch it live.
            "static_discovery": False,
        }
        return discovery.build("commentanalyzer", "v1alpha1", **service_options)
    raise RuntimeError("Set PERSPECTIVE_API_KEY before running live requests.")


def dummy_score(text: str, attributes: Sequence[str] = PERSPECTIVE_ATTRIBUTES) -> Dict[str, dict]:
    """Return a zero-valued stand-in for a Perspective ``attributeScores`` payload.

    Used in dry-run mode. Every requested attribute gets a 0.0 ``PROBABILITY``
    summary score and a single span running from 0 to ``len(text)``, so it carries
    the same ``summaryScore`` / ``spanScores`` keys that later cells read.
    """
    text_length = len(text)
    payload: Dict[str, dict] = {}
    for name in attributes:
        # Fresh dicts per attribute so no two entries share mutable state.
        whole_text_span = {
            "begin": 0,
            "end": text_length,
            "score": {"value": 0.0, "type": "PROBABILITY"},
        }
        payload[name] = {
            "summaryScore": {"value": 0.0, "type": "PROBABILITY"},
            "spanScores": [whole_text_span],
        }
    return payload


def score_with_backoff(
    text: str,
    client,
    attributes: Sequence[str] = PERSPECTIVE_ATTRIBUTES,
    max_retries: int = 5,
    pause: float = RATE_LIMIT_SECONDS,
    dry_run: bool = DRY_RUN,
    retry_statuses: Sequence[int] = (429, 500, 502, 503, 504),
) -> Optional[Dict[str, dict]]:
    """Score *text* with Perspective, retrying transient HTTP errors with exponential backoff.

    Args:
        text: Comment text to score. Empty text is never sent and yields ``None``.
        client: Service returned by ``build_perspective_client``; unused when ``dry_run``.
        attributes: Perspective attribute names to request.
        max_retries: Total number of request attempts.
        pause: Base delay in seconds; after failed attempt ``k`` the call sleeps
            ``pause * 2**k`` before trying again (no sleep after the last attempt).
        dry_run: When True, return ``dummy_score(text)`` without any network call.
        retry_statuses: HTTP status codes treated as transient and retried.

    Returns:
        The ``attributeScores`` mapping from the API response. Returns ``None``
        when *text* is empty or when every attempt failed with a retryable status.
        A ``None`` result is not a dict, so ``attach_perspective`` will try that
        row again on a later run.

    Raises:
        HttpError: for any status not in *retry_statuses* (e.g. 400 or 403).
    """
    if not text:
        return None
    if dry_run:
        return dummy_score(text, attributes=attributes)

    request_body = {
        "comment": {"text": text},
        "requestedAttributes": {attr: {} for attr in attributes},
    }
    last_error: Optional[HttpError] = None
    for attempt in range(max_retries):
        try:
            response = client.comments().analyze(body=request_body).execute()
        except HttpError as exc:  # noqa: PERF203 (want explicit HttpError handling)
            if exc.resp.status not in retry_statuses:
                raise
            last_error = exc
            # Back off exponentially; there is nothing to wait for after the final attempt.
            if attempt < max_retries - 1:
                time.sleep(pause * (2 ** attempt))
            continue
        return response.get("attributeScores")
    # Retries exhausted: report it and leave the row unscored instead of aborting the
    # whole batch (all in-memory results would otherwise be lost).
    print(f"Failed to score after retries: {last_error}")
    return None


def make_score_fn(dry_run: bool = DRY_RUN, api_key: Optional[str] = PERSPECTIVE_API_KEY):
    """Return a one-argument ``text -> attributeScores`` callable for the bundle helpers.

    When ``dry_run`` is False the Perspective client is built here, once. A missing
    API key therefore fails immediately rather than partway through a batch. In
    dry-run mode no client is created.
    """
    if dry_run:
        client = None
    else:
        client = build_perspective_client(api_key)

    def score_text(text: str) -> Optional[Dict[str, dict]]:
        return score_with_backoff(text, client=client, dry_run=dry_run)

    return score_text



### Attach scores to the bundle
Reuse prior Perspective results when available and keep the list-of-DataFrames shape intact.


In [None]:
def merge_existing_perspective(frame: pd.DataFrame, existing: Optional[pd.DataFrame]) -> pd.DataFrame:
    """Return a copy of *frame* with earlier ``perspective`` values from *existing* filled in.

    Rows are matched by index label (left join), not by text. Where both frames have
    a value, the one from *existing* wins. Where *existing* has nothing (missing
    row or NA), *frame*'s own ``perspective`` value is kept. *frame* is never mutated.
    """
    result = frame.copy()
    if existing is None or "perspective" not in existing:
        return result

    result = result.join(existing[["perspective"]], how="left", rsuffix="_existing")
    carried = "perspective_existing"
    # The suffix only appears when *frame* already had its own "perspective" column;
    # otherwise the joined column is simply named "perspective".
    if carried not in result.columns:
        return result

    result["perspective"] = result[carried].combine_first(result["perspective"])
    return result.drop(columns=[carried])


def _text_for_scoring(value) -> str:
    """Turn a ``text`` cell into the string handed to the scorer; missing values become ``""``."""
    # str(None) / str(nan) would otherwise be scored as the literal words "None" / "nan".
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return ""
    return str(value)


def attach_perspective(
    frame: pd.DataFrame,
    score_fn: Callable[[str], Optional[dict]],
    existing: Optional[pd.DataFrame] = None,
) -> pd.DataFrame:
    """Return a copy of *frame* with a ``perspective`` column of attribute scores.

    Earlier results are first pulled in from *existing* with
    ``merge_existing_perspective``. Rows whose ``perspective`` value is already a
    dict are kept untouched. Every other row is scored by calling *score_fn* on
    its ``text`` cell; missing text (None/NaN) is passed as ``""``.

    Args:
        frame: One conversation frame; must have a ``text`` column.
        score_fn: Callable mapping text to an attribute-score dict (or ``None``).
        existing: Previously scored frame whose index lines up with *frame*.

    Returns:
        A new DataFrame; *frame* itself is not modified.
    """
    merged = merge_existing_perspective(frame, existing)
    previous = merged["perspective"] if "perspective" in merged.columns else [None] * len(merged)
    texts = merged["text"]

    # A plain loop instead of DataFrame.apply(axis=1): apply on an empty frame returns
    # a DataFrame, which cannot be assigned to a single column.
    scores: List[Optional[dict]] = []
    for prior, text in zip(previous, texts):
        if isinstance(prior, dict):
            scores.append(prior)  # already scored; never re-query
        else:
            scores.append(score_fn(_text_for_scoring(text)))

    merged["perspective"] = pd.Series(scores, index=merged.index, dtype=object)
    return merged


def add_perspective_to_bundle(
    bundle: Sequence,
    score_fn: Callable[[str], Optional[dict]],
    existing_bundle: Optional[Sequence] = None,
    limit: Optional[int] = None,
):
    """Score the first *limit* conversations of *bundle* while keeping its nesting.

    Each conversation is either a single DataFrame or an iterable of DataFrames,
    and the returned list mirrors that structure. *existing_bundle* is matched
    positionally: conversation index first, then frame index within a list or
    tuple. Only DataFrame entries are used as prior results; anything else is
    ignored. ``limit=None`` processes the whole bundle.
    """

    def _prior_frame(candidates, position):
        # Only a DataFrame at the same position can supply earlier scores.
        if position < len(candidates) and isinstance(candidates[position], pd.DataFrame):
            return candidates[position]
        return None

    count = len(bundle) if limit is None else min(limit, len(bundle))
    previous = [] if existing_bundle is None else existing_bundle
    results: List = []
    for idx in range(count):
        convo = bundle[idx]
        prior = previous[idx] if idx < len(previous) else None

        if isinstance(convo, pd.DataFrame):
            prior_df = prior if isinstance(prior, pd.DataFrame) else None
            results.append(attach_perspective(convo, score_fn, prior_df))
            continue

        prior_frames = prior if isinstance(prior, (list, tuple)) else []
        results.append(
            [
                attach_perspective(frame, score_fn, _prior_frame(prior_frames, j))
                for j, frame in enumerate(convo)
            ]
        )
    return results


### Smoke test on a small slice
Keeps runtime short; flip `DRY_RUN = False` and `BATCH_LIMIT = None` for a full API-backed run.


In [None]:
# Score only the first BATCH_LIMIT conversations (dry-run stub unless DRY_RUN is False),
# reusing any scores already present in the previous run.
score_fn = make_score_fn(dry_run=DRY_RUN, api_key=PERSPECTIVE_API_KEY)
scored_subset = add_perspective_to_bundle(
    imitation_bundle,
    score_fn,
    existing_bundle=existing_perspective,
    limit=BATCH_LIMIT,
)

subset_flat = flatten_conversation_bundles(scored_subset)
subset_flat.head()


### Full run (long; writes `OUTPUT_PATH`)
Uncomment to score all conversations. Respects `DRY_RUN` and overwrites `OUTPUT_PATH` with the newest run. `OUTPUT_PATH` is the same file as `RESUME_PERSPECTIVE`, so this replaces the resume source.


In [None]:
# Full run: scores every conversation (no `limit`), keeping rows that already hold
# dict scores in existing_perspective. Results are written once, after the whole
# bundle has been scored.
# NOTE(review): OUTPUT_PATH is the same file as RESUME_PERSPECTIVE, and opening it
# with "wb" truncates it immediately. An interrupted dump could therefore destroy
# the resume source; consider writing to a temporary file and renaming it into place.
# full_score_fn = make_score_fn(dry_run=DRY_RUN, api_key=PERSPECTIVE_API_KEY)
# scored_bundle = add_perspective_to_bundle(
#     imitation_bundle,
#     score_fn=full_score_fn,
#     existing_bundle=existing_perspective,
# )
# with OUTPUT_PATH.open("wb") as fp:
#     pickle.dump(scored_bundle, fp)
# OUTPUT_PATH


### Sanity checks and flat score view
Inspect the saved bundle at `OUTPUT_PATH` if it exists (otherwise the smoke-test subset), and derive per-attribute score columns for quick joins.


In [None]:
# Pick what to inspect, in order: the saved run on disk, the resumed bundle loaded
# earlier, and finally the in-memory smoke-test subset.
if OUTPUT_PATH.exists():
    bundle_for_checks = load_df_list_pickle(OUTPUT_PATH)
else:
    bundle_for_checks = existing_perspective if existing_perspective is not None else scored_subset

checks_flat = flatten_conversation_bundles(bundle_for_checks)
print("rows", len(checks_flat))
# Count how many rows hold a dict of scores versus None or other placeholders.
perspective_types = checks_flat["perspective"].map(lambda value: type(value).__name__)
print(perspective_types.value_counts().head())


def extract_summary_scores(
    entry: Optional[dict],
    attributes: Optional[Sequence[str]] = None,
) -> Dict[str, Optional[float]]:
    """Flatten one ``attributeScores`` payload into ``persp_<attr>`` summary-score columns.

    Args:
        entry: The ``perspective`` cell for one row. Anything that is not a dict
            (``None``, NaN, ...) yields all-``None`` columns.
        attributes: Attribute names to extract. ``None`` means
            ``PERSPECTIVE_ATTRIBUTES``; it is looked up at call time so notebook
            edits to that constant are picked up without re-running this cell.

    Returns:
        A dict with ``persp_<attr lowercased>`` -> summary score (or ``None`` when
        absent), plus ``persp_span_end``: the ``end`` offset of the first span
        of the first requested attribute, or ``None``.
    """
    if attributes is None:
        attributes = PERSPECTIVE_ATTRIBUTES
    payload = entry if isinstance(entry, dict) else {}

    scores: Dict[str, Optional[float]] = {}
    for attr in attributes:
        # `or {}` also covers attribute entries that are present but None.
        summary = (payload.get(attr) or {}).get("summaryScore") or {}
        scores[f"persp_{attr.lower()}"] = summary.get("value")

    span_end = None
    if attributes:  # an empty attribute list would otherwise raise IndexError
        spans = (payload.get(attributes[0]) or {}).get("spanScores") or []
        if spans:
            span_end = spans[0].get("end")
    scores["persp_span_end"] = span_end
    return scores

# One column per attribute. The score frame reuses checks_flat's index because
# pd.concat(axis=1) aligns on index labels: a fresh 0..n-1 index would misalign
# rows (or add NaN rows) whenever checks_flat's index is not exactly that range.
score_frame = pd.DataFrame(
    checks_flat["perspective"].apply(extract_summary_scores).tolist(),
    index=checks_flat.index,
)
flat_with_scores = pd.concat([checks_flat.drop(columns=["perspective"]), score_frame], axis=1)
flat_with_scores.head()

# Optionally persist the flattened scores for downstream notebooks
# flat_with_scores.to_parquet(FLAT_SCORES_PATH, index=False)
# FLAT_SCORES_PATH