# LLM Inference Pipeline

This notebook implements the LLM inference pipeline described in `pipeline_spec.md`.

Sections:
- Setup & installs
- Parse and validate spec
- Tokenizer & Model loader
- Prompt construction
- Dataset loading & sampling
- Batched inference & streaming generator
- Extraction, metrics, and plotting
- Demo runs and simple tests


In [1]:
# Setup: Install dependencies and import packages

# If running in binder or a fresh environment you may need to install packages.
# Use `%pip install` to ensure installs are available in the notebook kernel.

# Uncomment and run if you need to install packages
# %pip install -q transformers accelerate torch pandas matplotlib seaborn datasets evaluate tqdm regex sentencepiece

import os
import json
import re
from dataclasses import dataclass, asdict
from typing import List, Optional, Dict, Any, Tuple
import logging
import random
from pathlib import Path

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.auto import tqdm

# Try importing transformers and torch and give friendly guidance if missing
try:
    import torch
    from transformers import (
        AutoTokenizer,
        AutoModelForCausalLM,
        pipeline,
        logging as hf_logging,
    )
except Exception as e:
    raise RuntimeError(
        "Transformers or Torch not available in the kernel. Please run the install cell: `%pip install transformers torch`"
    ) from e

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("llm_pipeline")
hf_logging.set_verbosity_error()

# Device detection
DEVICE = (
    "cuda"
    if torch.cuda.is_available()
    else "mps" if torch.mps.is_available() else "cpu"
)
logger.info(f"Using device: {DEVICE}")

# Reproducible seeds
DEFAULT_SEED = 42
random.seed(DEFAULT_SEED)
torch.manual_seed(DEFAULT_SEED)

# Default paths / constants
PROJECT_ROOT = Path.cwd()
DATA_DIR = PROJECT_ROOT / "datasets"
PIPELINE_SPEC = PROJECT_ROOT / "pipeline_spec.md"
DEFAULT_MODEL = (
    "Qwen/Qwen2.5-0.5B-Instruct"  # per spec; we will fallback if unavailable
)

INFO:llm_pipeline:Using device: mps


In [2]:
# Load and parse pipeline_spec.md

print(f"Reading spec from: {PIPELINE_SPEC}")
try:
    with open(PIPELINE_SPEC, "r") as f:
        spec_text = f.read()
        print(spec_text)
except Exception as e:
    logger.warning(
        "Could not read pipeline_spec.md; proceeding with default inferred spec."
    )
    spec_text = ""


# Quick parser: extract main required fields
@dataclass
class PipelineSpec:
    default_model: str = DEFAULT_MODEL
    tokenizer_context_length: int = 2048
    classification_system_prompt: str = (
        "You are a news trustworthiness classifier.\n"
        "Your task is to classify news articles as either trustworthy or untrustworthy.\n"
        "Your answer should consist of exactly one token: 0 if the article is untrustworthy and 1 if it is trustworthy.\n\n"
    )
    user_template: str = "ARTICLE: {article}\n\nCLASSIFICATION (0 or 1):"
    # more fields could be added


pipeline_spec = PipelineSpec()
print("Parsed pipeline spec:", pipeline_spec)

Reading spec from: /Users/paultalma/Documents/UCLA/Work/Courses/2025-2026/cs_263/project_repo/pipeline_spec.md
# pipeline spec

- load dataset
    - place dataset into memory
- sample k from dataset
    - selects k data points from the dataset
    - k should be a variable parameter
    - should be able to select whether the sampling is random or from the first k
    - random sampling should be seeded
- make prompts
    - prompts should be structured as follows:
        - system: "You are a news trustworthiness classifier.\n"
          "Your task is to classify news articles as either trustworthy or untrustworthy.\n"
          "Your answer should consist of exactly one token: 0 if the article is untrustworthy and 1 if it is trustworthy.\n\n"
        - user: "ARTICLE: {article}\n\n"
          "CLASSIFICATION (0 or 1):"

- load model
    - this should be modular, to make it easy to switch between models
    - default model should be "Qwen/Qwen2.5-0.5B-Instruct"
- load tokenizer
    - the 

In [3]:
# Utility helpers: logger, retry/backoff, cache

from functools import wraps
import time


def retry_backoff(retries: int = 3, initial_delay: float = 0.5, factor: float = 2.0):
    def decorator(fn):
        @wraps(fn)
        def inner(*args, **kwargs):
            delay = initial_delay
            for i in range(retries):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    logger.warning(f"Call failed (attempt {i+1}/{retries}): {e}")
                    if i == retries - 1:
                        raise
                    time.sleep(delay)
                    delay *= factor

        return inner

    return decorator


class SimpleCache:
    def __init__(self):
        self._cache = {}

    def get(self, key):
        return self._cache.get(key)

    def set(self, key, value):
        self._cache[key] = value


cache = SimpleCache()


# Small structured logger wrapper
def log_info(msg: str):
    logger.info(msg)

In [14]:
# Tokenizer setup and verification


def load_tokenizer(model_name: str, max_length: Optional[int] = None):
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
        if tokenizer.pad_token is None:
            # Ensure pad token exists for batching
            tokenizer.add_special_tokens({"pad_token": "<|pad|>"})
        if max_length is not None:
            tokenizer.model_max_length = max_length
        logger.info(
            f"Loaded tokenizer for: {model_name} with max_length={tokenizer.model_max_length}"
        )
        return tokenizer
    except Exception as e:
        logger.warning(f"Failed to load tokenizer for {model_name}: {e}")
        raise


# Round-trip test helper


def tokenizer_round_trip_test(tokenizer):
    txt = "This is a short test sentence."
    encoded = tokenizer(txt)
    decoded = tokenizer.decode(encoded.input_ids)
    assert isinstance(decoded, str)
    assert txt == decoded
    logger.info("Tokenizer round-trip test passed.")


# Example default tokenizer load (with safe fallback)
try:
    default_tokenizer = load_tokenizer(
        pipeline_spec.default_model, max_length=pipeline_spec.tokenizer_context_length
    )
except Exception:
    logger.info("Falling back to 'gpt2' tokenizer for demo purposes.")
    default_tokenizer = load_tokenizer("gpt2", max_length=1024)

# run simple test
tokenizer_round_trip_test(default_tokenizer)

INFO:llm_pipeline:Loaded tokenizer for: Qwen/Qwen2.5-0.5B-Instruct with max_length=2048
INFO:llm_pipeline:Tokenizer round-trip test passed.


In [5]:
# Model loader: device placement, caching, and smoke tests


@retry_backoff(retries=2)
def load_model(
    model_name: str, device: str = DEVICE, dtype: Optional[torch.dtype] = None
):
    """Load a causal LM model with graceful fallback.

    Returns: (model, tokenizer)
    """
    # Check cache
    cache_key = f"model::{model_name}::device::{device}"
    cached = cache.get(cache_key)
    if cached is not None:
        logger.info(f"Using cached model for {model_name}")
        return cached

    try:
        model = AutoModelForCausalLM.from_pretrained(model_name)
        tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
        # Add pad token if missing
        if tokenizer.pad_token is None:
            tokenizer.add_special_tokens({"pad_token": "<|pad|>"})
            model.resize_token_embeddings(len(tokenizer))
        model.to(device)
        model.eval()
        cache.set(cache_key, (model, tokenizer))
        logger.info(f"Loaded model {model_name} to {device}")
        return model, tokenizer
    except Exception as e:
        logger.warning(f"Failed to load model {model_name}: {e}")
        raise


# Safe loader that falls back to a small model


def load_model_safe(
    preferred: str = pipeline_spec.default_model, fallback: str = "gpt2"
) -> Tuple[Any, Any]:
    try:
        return load_model(preferred)
    except Exception:
        logger.info(f"Falling back to {fallback}")
        return load_model(fallback)


# Try a smoke load of a small model for the demo
smoke_model_name = "gpt2"
smoke_model, smoke_tokenizer = load_model_safe(smoke_model_name)
logger.info("Smoke model loaded for demo runs.")

INFO:llm_pipeline:Loaded model gpt2 to mps
INFO:llm_pipeline:Smoke model loaded for demo runs.


In [6]:
# Prompt templates and preprocessing

SYSTEM_PROMPT = pipeline_spec.classification_system_prompt
USER_TEMPLATE = pipeline_spec.user_template


def make_prompt(
    article: str, system: str = SYSTEM_PROMPT, user_template: str = USER_TEMPLATE
) -> str:
    """Create a single-string prompt combining system and user prompts.
    Many HF models accept a single string; some support role-based chat models instead.
    """
    user_text = user_template.format(article=article)
    prompt = system + "\n" + user_text
    return prompt


# Example
print(make_prompt("This is a short example article.")[:500])

You are a news trustworthiness classifier.
Your task is to classify news articles as either trustworthy or untrustworthy.
Your answer should consist of exactly one token: 0 if the article is untrustworthy and 1 if it is trustworthy.


ARTICLE: This is a short example article.

CLASSIFICATION (0 or 1):


In [7]:
# Dataset loader, sampling, and a small demo dataset


def load_dataset(
    data_dir: Path = DATA_DIR, max_rows: Optional[int] = None
) -> pd.DataFrame:
    """Load dataset by combining `fake.csv` (label 0) and `true.csv` (label 1)."""
    fake_path = data_dir / "fake.csv"
    true_path = data_dir / "true.csv"

    if fake_path.exists() and true_path.exists():
        try:
            fake = pd.read_csv(fake_path)
            true = pd.read_csv(true_path)

            # Try to find an `article` column; fallback to first column
            def get_text_col(df):
                for c in ["article", "text", "content"]:
                    if c in df.columns:
                        return df[c].astype(str)
                return df.iloc[:, 0].astype(str)

            fake_text = get_text_col(fake)
            true_text = get_text_col(true)
            fake_df = pd.DataFrame({"article": fake_text, "label": 0})
            true_df = pd.DataFrame({"article": true_text, "label": 1})
            df = pd.concat([fake_df, true_df], ignore_index=True)
            if max_rows is not None:
                df = df.sample(n=min(len(df), max_rows), random_state=DEFAULT_SEED)
            return df
        except Exception as e:
            logger.warning(f"Failed to read CSVs: {e}")

    # Fallback synthetic dataset
    logger.info("Using synthetic demo dataset (small).")
    demo = [
        (
            "Government releases new public health guidelines that are consistent with prior research and expert consensus.",
            1,
        ),
        (
            "Aliens landed on the White House lawn last night, multiple sources confirm",
            0,
        ),
        (
            "Scientific team publishes peer-reviewed article showing new vaccine efficacy.",
            1,
        ),
        (
            "Miracle cure for diabetes discovered in backyard herb; no clinical trials yet",
            0,
        ),
    ]
    df = pd.DataFrame(demo, columns=["article", "label"])
    if max_rows:
        df = df.head(max_rows)
    return df


def sample_k(
    df: pd.DataFrame, k: int, mode: str = "random", seed: Optional[int] = None
) -> pd.DataFrame:
    if seed is None:
        seed = DEFAULT_SEED
    if mode == "random":
        return df.sample(n=min(k, len(df)), random_state=seed).reset_index(drop=True)
    elif mode == "first":
        return df.head(k).reset_index(drop=True)
    else:
        raise ValueError("mode must be 'random' or 'first'")


# Quick demo load
dataset = load_dataset(max_rows=200)
dataset.head()

Unnamed: 0,article,label
22216,"21st Century Wire says Ben Stein, reputable pr...",0
27917,WASHINGTON (Reuters) - U.S. President Donald T...,1
25007,(Reuters) - Puerto Rico Governor Ricardo Rosse...,1
1377,"On Monday, Donald Trump once again embarrassed...",0
32476,"GLASGOW, Scotland (Reuters) - Most U.S. presid...",1


In [8]:
# Batched synchronous generation and streaming demo

from typing import Iterable


def generate_batch(
    model,
    tokenizer,
    prompts: List[str],
    device: str = DEVICE,
    max_new_tokens: int = 32,
    temperature: float = 0.0,
    do_sample: bool = False,
    batch_size: int = 4,
) -> List[str]:
    outputs = []
    model_device = next(model.parameters()).device
    for i in range(0, len(prompts), batch_size):
        batch_prompts = prompts[i : i + batch_size]
        enc = tokenizer(
            batch_prompts, return_tensors="pt", padding=True, truncation=True
        )
        input_ids = enc.input_ids.to(model_device)
        attention_mask = enc.attention_mask.to(model_device)
        with torch.no_grad():
            generated_ids = model.generate(
                input_ids=input_ids,
                attention_mask=attention_mask,
                max_new_tokens=max_new_tokens,
                do_sample=do_sample,
                temperature=temperature,
                pad_token_id=tokenizer.pad_token_id,
            )
        # For each generated sequence, decode only the generated portion
        for ids, prompt in zip(generated_ids, batch_prompts):
            text = tokenizer.decode(ids, skip_special_tokens=True)
            # Remove the prompt prefix if present
            if prompt.strip() and text.startswith(prompt.strip()):
                text = text[len(prompt.strip()) :].strip()
            outputs.append(text)
    return outputs


def streaming_infer_simple(model, tokenizer, prompt: str, max_new_tokens: int = 20):
    """A simple (inefficient) streaming generator for demo: generates 1 token at a time by increasing max_new_tokens."""
    partial = ""
    for step in range(1, max_new_tokens + 1):
        out = generate_batch(model, tokenizer, [prompt], max_new_tokens=step)[0]
        if out == partial:
            # no new tokens
            continue
        partial = out
        yield partial


# Extraction of classification label


def extract_classification(text: str) -> Optional[int]:
    if not isinstance(text, str):
        return None
    # Try to find an explicit 0/1 token
    m = re.search(r"\b([01])\b", text)
    if m:
        return int(m.group(1))
    # Try to find words
    txt = text.lower()
    if "untrust" in txt or "fake" in txt or "not" in txt and "trust" in txt:
        return 0
    if "trust" in txt or "true" in txt or "reliable" in txt:
        return 1
    return None


# Demo of extraction
tests = ["0", "1", "The answer is 0.", "Trustworthy", "This seems fake."]
for t in tests:
    print(t, "->", extract_classification(t))

0 -> 0
1 -> 1
The answer is 0. -> 0
Trustworthy -> 1
This seems fake. -> 0


In [9]:
# Metrics and plotting

from sklearn.metrics import accuracy_score, confusion_matrix


def compute_metrics(y_true: List[int], y_pred: List[Optional[int]]) -> Dict[str, Any]:
    # Convert None to a special class or treat as incorrect
    y_pred_clean = [(p if p is not None else -1) for p in y_pred]
    # For accuracy, treat None as incorrect
    valid_mask = [p in (0, 1) for p in y_pred]
    acc = accuracy_score(y_true, [p if p in (0, 1) else -1 for p in y_pred_clean])
    cm = confusion_matrix(
        y_true, [p if p in (0, 1) else 2 for p in y_pred_clean], labels=[0, 1, 2]
    )
    return {"accuracy": acc, "confusion_matrix": cm}


def plot_metric_comparison(metrics_by_model: Dict[str, Dict[str, Any]]):
    df = pd.DataFrame(
        [
            {"model": m, "accuracy": metrics["accuracy"]}
            for m, metrics in metrics_by_model.items()
        ]
    )
    sns.barplot(data=df, x="model", y="accuracy")
    plt.ylim(0, 1)
    plt.title("Model accuracy comparison")


# Example usage with dummy preds
print(compute_metrics([1, 0, 1, 0], [1, 0, None, 0]))

{'accuracy': 0.75, 'confusion_matrix': array([[2, 0, 0],
       [0, 1, 1],
       [0, 0, 0]])}


In [10]:
# Full pipeline runner for a model name (small, non-optimized demo)


def run_pipeline_on_dataset(
    model_name: str,
    df: pd.DataFrame,
    k: int = 20,
    sample_mode: str = "random",
    seed: Optional[int] = None,
):
    model, tokenizer = load_model_safe(model_name)
    samples = sample_k(df, k=k, mode=sample_mode, seed=seed)
    prompts = [make_prompt(a) for a in samples.article.tolist()]
    outputs = generate_batch(model, tokenizer, prompts, batch_size=4)
    preds = [extract_classification(o) for o in outputs]
    metrics = compute_metrics(samples.label.tolist(), preds)
    return {"model": model_name, "metrics": metrics, "preds": preds, "samples": samples}


# Run on smoke model for a small k
demo_res = run_pipeline_on_dataset("gpt2", dataset, k=8)
print(demo_res["metrics"])

INFO:llm_pipeline:Using cached model for gpt2


{'accuracy': 0.5, 'confusion_matrix': array([[0, 2, 0],
       [0, 4, 2],
       [0, 0, 0]])}


In [11]:
# Unit tests (pytest style) - small subset

# Note: Running pytest in-notebook is possible via `!pytest -q` but here we provide small asserts for demo.


def _test_tokenizer_roundtrip():
    tok = default_tokenizer
    txt = "Round trip test"
    enc = tok(txt)
    dec = tok.decode(enc.input_ids)
    assert isinstance(dec, str)


def _test_extract_classification():
    assert extract_classification("0") == 0
    assert extract_classification("1") == 1
    assert extract_classification("This is fake news") == 0
    assert extract_classification("Reliable and trustworthy") == 1


_test_tokenizer_roundtrip()
_test_extract_classification()
print("Basic tests passed.")

Basic tests passed.
