# Thinking XGBoost: Multi-Stage Reasoning for Fraud Detection

This notebook demonstrates how to make small models "think" by applying LLM-inspired reasoning patterns to XGBoost.

## Key Concepts
- **Multi-step logic**: Decompose fraud detection into specialized reasoning heads
- **Self-correction**: A critic model that predicts when the aggregated prediction is likely wrong and triggers re-evaluation
- **Explainability**: Full reasoning trace for every prediction

## Architecture
```
Input Features
     │
     ▼
┌─────────────────┐
│  Stage 1: XGB   │ → Per-dimension risk scores (amount, velocity, location...)
│  Reasoning Heads│
└─────────────────┘
     │
     ▼
┌─────────────────┐
│  Stage 2: Hybrid│ → Blended prediction (weighted avg + XGBoost)
│  Aggregator     │
└─────────────────┘
     │
     ▼
┌─────────────────┐
│  Stage 3: XGB   │ → "Should I reconsider?" (error detection)
│  Critic         │
└─────────────────┘
     │
     ▼
┌─────────────────┐
│  Stage 4: XGB   │ → Refined prediction (only if critic flags)
│  Refiner        │
└─────────────────┘
     │
     ▼
Final Decision + Reasoning Trace
```

---
## 1. Setup

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, cross_val_predict
from sklearn.metrics import classification_report, roc_auc_score, f1_score
import xgboost as xgb
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional
import warnings
warnings.filterwarnings('ignore')

# Local imports
from data_generator import get_feature_groups, get_all_features

# Constants
RANDOM_STATE = 42
TEST_SIZE = 0.2

print(f"XGBoost version: {xgb.__version__}")

---
## 2. Load Dataset

In [None]:
# Load data
df = pd.read_csv('fraud_dataset.csv')
FEATURE_GROUPS = get_feature_groups()
ALL_FEATURES = get_all_features()

print(f"Dataset shape: {df.shape}")
print(f"Fraud rate: {df['is_fraud'].mean():.2%}")
print(f"\nFeature groups: {list(FEATURE_GROUPS.keys())}")
df.head()

In [None]:
# Train/test split
X = df[ALL_FEATURES]
y = df['is_fraud']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE, stratify=y
)

SCALE_WEIGHT = (y_train == 0).sum() / (y_train == 1).sum()

print(f"Train: {len(X_train):,} samples ({y_train.mean():.2%} fraud)")
print(f"Test:  {len(X_test):,} samples ({y_test.mean():.2%} fraud)")
print(f"Scale weight: {SCALE_WEIGHT:.1f}")
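
As a minimal sketch of what `SCALE_WEIGHT` represents, the toy example below computes the same negatives-to-positives ratio on made-up labels. The array `y_toy` and its 95/5 split are assumptions for illustration, not the fraud rate of `fraud_dataset.csv`.

```python
import numpy as np

# Hypothetical toy labels: 95 legitimate (0) and 5 fraudulent (1) transactions.
y_toy = np.array([0] * 95 + [1] * 5)

# Same formula as SCALE_WEIGHT: count of negatives divided by count of positives.
toy_scale_weight = (y_toy == 0).sum() / (y_toy == 1).sum()

print(f"Toy scale weight: {toy_scale_weight:.1f}")  # Toy scale weight: 19.0
```

Passing this ratio as `scale_pos_weight` makes each fraud example count roughly as much as 19 legitimate ones in this toy case.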

---
## 3. Baseline Model

In [None]:
# Standard XGBoost baseline
baseline = xgb.XGBClassifier(
    n_estimators=100, max_depth=6, learning_rate=0.1,
    scale_pos_weight=SCALE_WEIGHT, random_state=RANDOM_STATE, eval_metric='auc'
)
baseline.fit(X_train, y_train)
baseline_preds = baseline.predict_proba(X_test)[:, 1]

print("=== BASELINE: Standard XGBoost ===")
print(f"ROC-AUC: {roc_auc_score(y_test, baseline_preds):.4f}")
print("\nClassification Report:")
print(classification_report(y_test, (baseline_preds > 0.5).astype(int), target_names=['Legit', 'Fraud']))

---
## 4. Stage 1: Reasoning Heads

Train specialized models for each fraud dimension. Each head focuses on one aspect:
- **Amount head**: Transaction amount patterns
- **Velocity head**: Transaction frequency
- **Merchant head**: Merchant risk factors
- **Location head**: Geographic signals
- **Device head**: Device/channel risk
- **Time head**: Temporal patterns

In [None]:
@dataclass
class ReasoningHead:
    """A specialized model for one fraud dimension."""
    name: str
    features: List[str]
    model: Optional[xgb.XGBClassifier] = None
    weight: float = 1.0
    
    def train(self, X: pd.DataFrame, y: pd.Series, scale_weight: float):
        self.model = xgb.XGBClassifier(
            n_estimators=60, max_depth=5, learning_rate=0.1,
            scale_pos_weight=scale_weight, random_state=RANDOM_STATE, eval_metric='auc'
        )
        self.model.fit(X[self.features], y)
        # Use the in-sample (training) ROC-AUC as the head weight. It is optimistic,
        # but the weights are normalized across heads after training.
        self.weight = roc_auc_score(y, self.predict_proba(X))
        return self
    
    def predict_proba(self, X: pd.DataFrame) -> np.ndarray:
        return self.model.predict_proba(X[self.features])[:, 1]
    
    def get_reasoning(self, X: pd.DataFrame) -> pd.DataFrame:
        p = self.predict_proba(X)
        return pd.DataFrame({
            f'head_{self.name}_score': p,
            f'head_{self.name}_signal': (p > 0.5).astype(int)
        })

In [None]:
# Train reasoning heads
reasoning_heads = {}
print("Training Stage 1: Reasoning Heads")
print("-" * 50)

for name, features in FEATURE_GROUPS.items():
    head = ReasoningHead(name, features)
    head.train(X_train, y_train, SCALE_WEIGHT)
    reasoning_heads[name] = head
    
    test_auc = roc_auc_score(y_test, head.predict_proba(X_test))
    print(f"  {name:12s}: AUC={test_auc:.4f}, weight={head.weight:.3f}")

# Normalize weights
total_weight = sum(h.weight for h in reasoning_heads.values())
for h in reasoning_heads.values():
    h.weight /= total_weight

print("\nNormalized weights:")
for name, head in reasoning_heads.items():
    print(f"  {name:12s}: {head.weight:.3f}")
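
To make the normalization step concrete, here is a small worked example with made-up AUC values. The three numbers are assumptions chosen for easy arithmetic, not results from this notebook.

```python
# Hypothetical training AUCs for three heads (not results from this notebook).
raw_weights = {"amount": 0.80, "velocity": 0.70, "location": 0.50}

# Divide each weight by the total so the weights sum to 1.
total = sum(raw_weights.values())
normalized = {name: w / total for name, w in raw_weights.items()}

for name, w in normalized.items():
    print(f"{name:10s}: {w:.3f}")
# amount    : 0.400
# velocity  : 0.350
# location  : 0.250
```

One consequence worth noting: a head with an AUC of 0.50, which is chance level, still receives a quarter of the weight in this toy case. Subtracting 0.5 from each AUC before normalizing would be one possible alternative, although the notebook does not do this.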

In [None]:
# Helper function to get all head outputs
def get_stage1_outputs(X: pd.DataFrame, heads: Dict[str, ReasoningHead]) -> pd.DataFrame:
    return pd.concat([h.get_reasoning(X) for h in heads.values()], axis=1)

# Generate stage 1 outputs
stage1_train = get_stage1_outputs(X_train, reasoning_heads)
stage1_test = get_stage1_outputs(X_test, reasoning_heads)

print(f"Stage 1 output columns: {list(stage1_train.columns)}")
stage1_test.head()

---
## 5. Stage 2: Hybrid Aggregator

Combines head outputs using a **hybrid approach**:
- 60% weighted average (interpretable, faithful)
- 40% XGBoost with interactions (powerful)
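
The blend can be illustrated with a small numeric sketch. The head scores, head weights, and XGBoost outputs below are made-up values; only the 0.6 ratio comes from the notebook.

```python
import numpy as np

blend_ratio = 0.6  # share given to the weighted average

# Hypothetical scores from three heads for two transactions (rows).
head_scores = np.array([
    [0.9, 0.2, 0.4],
    [0.1, 0.1, 0.3],
])
# Hypothetical normalized head weights (they sum to 1).
head_weights = np.array([0.5, 0.3, 0.2])

# Weighted average: one dot product per transaction.
wav = head_scores @ head_weights          # [0.59, 0.14]

# Hypothetical output of the XGBoost aggregator for the same two rows.
xgb_pred = np.array([0.80, 0.05])

# Blend: 60% weighted average, 40% XGBoost.
blended = blend_ratio * wav + (1 - blend_ratio) * xgb_pred
print(np.round(blended, 3))               # [0.674 0.104]
```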

In [None]:
BLEND_RATIO = 0.6  # 60% weighted avg, 40% XGBoost

def weighted_average(s1: pd.DataFrame, heads: Dict[str, ReasoningHead]) -> np.ndarray:
    """Direct weighted average of head scores."""
    result = np.zeros(len(s1))
    for name, head in heads.items():
        result += head.weight * s1[f'head_{name}_score'].values
    return result

def build_aggregator_features(s1: pd.DataFrame) -> pd.DataFrame:
    """Build XGBoost aggregator input with pairwise interactions."""
    X_agg = s1.copy()
    score_cols = [c for c in s1.columns if '_score' in c]
    for i, h1 in enumerate(score_cols):
        for h2 in score_cols[i+1:]:
            X_agg[f'{h1}_{h2}_int'] = s1[h1] * s1[h2]
    return X_agg
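
As a rough check on how wide the aggregator input becomes, the sketch below reproduces the column naming of `build_aggregator_features` with `itertools.combinations`. It assumes six heads named after the list in Section 4; the actual keys of `FEATURE_GROUPS` may differ.

```python
from itertools import combinations

# Assumed head names, taken from the Section 4 list (hypothetical keys).
head_names = ["amount", "velocity", "merchant", "location", "device", "time"]
score_cols = [f"head_{n}_score" for n in head_names]

# combinations() visits pairs in the same order as the nested loop above.
interaction_cols = [f"{a}_{b}_int" for a, b in combinations(score_cols, 2)]

# Each head contributes a score and a signal column, plus the interactions.
n_total = 2 * len(head_names) + len(interaction_cols)

print(len(interaction_cols))  # 15
print(n_total)                # 27
print(interaction_cols[0])    # head_amount_score_head_velocity_score_int
```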

In [None]:
# Compute weighted average predictions
wav_train = weighted_average(stage1_train, reasoning_heads)
wav_test = weighted_average(stage1_test, reasoning_heads)

# Build features for XGBoost aggregator
X_agg_train = build_aggregator_features(stage1_train)
X_agg_test = build_aggregator_features(stage1_test)

# Train XGBoost aggregator
xgb_aggregator = xgb.XGBClassifier(
    n_estimators=30, max_depth=4, learning_rate=0.1,
    scale_pos_weight=SCALE_WEIGHT, random_state=RANDOM_STATE, eval_metric='auc'
)
xgb_aggregator.fit(X_agg_train, y_train.reset_index(drop=True))

xgb_train = xgb_aggregator.predict_proba(X_agg_train)[:, 1]
xgb_test = xgb_aggregator.predict_proba(X_agg_test)[:, 1]

# Blend predictions
agg_train = BLEND_RATIO * wav_train + (1 - BLEND_RATIO) * xgb_train
agg_test = BLEND_RATIO * wav_test + (1 - BLEND_RATIO) * xgb_test

print("=== Stage 2: Hybrid Aggregator ===")
print(f"Weighted Avg AUC:    {roc_auc_score(y_test, wav_test):.4f}")
print(f"XGBoost AUC:         {roc_auc_score(y_test, xgb_test):.4f}")
print(f"Blended AUC:         {roc_auc_score(y_test, agg_test):.4f}")

---
## 6. Stage 3: Critic (Self-Correction Gate)

The critic learns to predict **when the aggregator will be wrong**.
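
The critic's training target is a binary "was the aggregator wrong?" label. A minimal sketch of how such labels are built, using made-up labels and scores:

```python
import numpy as np

# Hypothetical ground truth and out-of-fold blended scores for five transactions.
y_true = np.array([0, 0, 1, 1, 0])
oof_pred = np.array([0.10, 0.70, 0.40, 0.90, 0.20])

# Threshold the scores at 0.5, then mark every disagreement with the truth.
hard_pred = (oof_pred > 0.5).astype(int)            # [0, 1, 0, 1, 0]
error_labels = (hard_pred != y_true).astype(int)    # [0, 1, 1, 0, 0]

print(error_labels.tolist())  # [0, 1, 1, 0, 0]
```

Here the second transaction is a false positive and the third a false negative, so both become positive examples for the critic.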

In [None]:
# Get cross-validation errors (more realistic than train errors)
cv_xgb = cross_val_predict(
    # Same hyperparameters as xgb_aggregator, so the CV errors reflect that model
    xgb.XGBClassifier(n_estimators=30, max_depth=4, learning_rate=0.1,
                      scale_pos_weight=SCALE_WEIGHT,
                      random_state=RANDOM_STATE, eval_metric='auc'),
    X_agg_train, y_train.reset_index(drop=True), cv=5, method='predict_proba'
)[:, 1]

cv_blend = BLEND_RATIO * wav_train + (1 - BLEND_RATIO) * cv_xgb
aggregator_errors = ((cv_blend > 0.5).astype(int) != y_train.reset_index(drop=True).values).astype(int)

print(f"Aggregator CV errors: {aggregator_errors.sum()} ({aggregator_errors.mean():.2%})")

In [None]:
def build_critic_features(s1: pd.DataFrame, wav: np.ndarray, 
                          xgb_pred: np.ndarray, blend: np.ndarray,
                          heads: Dict[str, ReasoningHead]) -> pd.DataFrame:
    """Build critic input features."""
    d = pd.DataFrame()
    d['blend_pred'] = blend
    d['wav_pred'] = wav
    d['xgb_pred'] = xgb_pred
    d['wav_xgb_diff'] = np.abs(wav - xgb_pred)  # Disagreement between methods
    d['blend_conf'] = np.abs(blend - 0.5) * 2
    
    for name in heads.keys():
        score = s1[f'head_{name}_score'].values
        d[f'{name}_score'] = score
        d[f'{name}_vs_blend'] = np.abs(score - blend)
    
    score_cols = [c for c in s1.columns if '_score' in c]
    d['head_std'] = s1[score_cols].std(axis=1).values
    d['head_range'] = s1[score_cols].max(axis=1).values - s1[score_cols].min(axis=1).values
    
    return d
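
Three of the critic features are simple uncertainty measures. The sketch below computes them on made-up numbers with NumPy alone; `ddof=1` is used because that is the default of pandas `DataFrame.std`, which `build_critic_features` relies on.

```python
import numpy as np

# Hypothetical blended predictions for three transactions.
blend = np.array([0.50, 0.75, 0.02])

# Confidence: 0 at the decision boundary (0.5), 1 at the extremes (0 or 1).
blend_conf = np.abs(blend - 0.5) * 2                 # [0.0, 0.5, 0.96]

# Hypothetical scores from three heads for the same transactions (rows).
head_scores = np.array([
    [0.50, 0.50, 0.50],   # heads agree exactly
    [0.90, 0.60, 0.75],   # heads disagree
    [0.00, 0.10, 0.02],   # heads roughly agree on "legitimate"
])

# Disagreement between heads: sample standard deviation and range per row.
head_std = head_scores.std(axis=1, ddof=1)
head_range = head_scores.max(axis=1) - head_scores.min(axis=1)

print(np.round(blend_conf, 2))   # [0.   0.5  0.96]
print(np.round(head_range, 2))   # [0.  0.3 0.1]
```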

In [None]:
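# Build critic features. The training features use the out-of-fold predictions
# (cv_xgb, cv_blend) so they describe the same predictions that aggregator_errors labels.
critic_train_feats = build_critic_features(stage1_train, wav_train, cv_xgb, cv_blend, reasoning_heads)
critic_test_feats = build_critic_features(stage1_test, wav_test, xgb_test, agg_test, reasoning_heads)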

# Train critic
critic = xgb.XGBClassifier(
    n_estimators=100, max_depth=5, learning_rate=0.05,
    random_state=RANDOM_STATE, eval_metric='auc'
)
critic.fit(critic_train_feats, aggregator_errors)

# Find the threshold that maximizes F1 on the critic's own training scores (in-sample, so optimistic)
critic_train_scores = critic.predict_proba(critic_train_feats)[:, 1]
best_threshold, best_f1 = 0.3, 0
for t in np.arange(0.1, 0.7, 0.02):
    flags = (critic_train_scores > t).astype(int)
    if flags.sum() > 0:
        f1 = f1_score(aggregator_errors, flags, zero_division=0)
        if f1 > best_f1:
            best_f1, best_threshold = f1, t

CRITIC_THRESHOLD = best_threshold
critic_test_scores = critic.predict_proba(critic_test_feats)[:, 1]

print("=== Stage 3: Critic ===")
print(f"Critic F1 (error detection, in-sample): {best_f1:.4f}")
print(f"Optimal threshold: {CRITIC_THRESHOLD:.2f}")
print(f"Test samples flagged: {(critic_test_scores > CRITIC_THRESHOLD).sum()} / {len(critic_test_scores)}")
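
The threshold sweep can be traced by hand on a tiny example. The sketch below uses made-up critic scores and a small local `f1_binary` helper (a hypothetical stand-in for `sklearn.metrics.f1_score`) so that it runs with NumPy alone.

```python
import numpy as np

def f1_binary(y_true: np.ndarray, y_flag: np.ndarray) -> float:
    """F1 for the positive class; returns 0.0 when there are no positives at all."""
    tp = int(((y_true == 1) & (y_flag == 1)).sum())
    fp = int(((y_true == 0) & (y_flag == 1)).sum())
    fn = int(((y_true == 1) & (y_flag == 0)).sum())
    denom = 2 * tp + fp + fn
    return 2 * tp / denom if denom else 0.0

# Hypothetical error labels and critic scores for six transactions.
errors = np.array([0, 0, 1, 0, 1, 1])
scores = np.array([0.05, 0.25, 0.35, 0.45, 0.65, 0.85])

best_t, best_f1 = None, -1.0
for t in (0.1, 0.2, 0.3, 0.4, 0.5, 0.6):
    flags = (scores > t).astype(int)
    f1 = f1_binary(errors, flags)
    if f1 > best_f1:          # strict ">" keeps the lowest threshold on ties
        best_t, best_f1 = t, f1

print(best_t, round(best_f1, 3))  # 0.3 0.857
```

At 0.3 the critic flags all three true errors and one correct prediction, which gives F1 = 6/7. Because the notebook selects the threshold on training scores, a held-out split would likely give a more conservative estimate.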

---
## 7. Stage 4: Refiner

A larger model (deeper trees, more estimators) that sees the original features plus the Stage 1 outputs and re-evaluates only the cases flagged by the critic.
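
The selective override itself is a boolean-mask assignment. A minimal sketch with made-up scores (the 0.43 threshold is only an example value):

```python
import numpy as np

# Hypothetical Stage 2, Stage 4, and critic outputs for four transactions.
agg_preds = np.array([0.20, 0.55, 0.90, 0.48])
ref_preds = np.array([0.10, 0.80, 0.95, 0.30])
critic_scores = np.array([0.05, 0.70, 0.10, 0.60])
critic_threshold = 0.43  # example value only

# Flag rows where the critic expects the aggregator to be wrong.
needs_refinement = critic_scores > critic_threshold   # [F, T, F, T]

# Keep the aggregator's prediction unless the row is flagged.
final_preds = agg_preds.copy()
final_preds[needs_refinement] = ref_preds[needs_refinement]

print(final_preds.tolist())  # [0.2, 0.8, 0.9, 0.3]
```

Unflagged rows keep the cheaper, more interpretable Stage 2 score, so the refiner only influences the cases the critic distrusts.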

In [None]:
# Combine original features with stage 1 outputs for refiner
X_ref_train = pd.concat([X_train.reset_index(drop=True), stage1_train], axis=1)
X_ref_test = pd.concat([X_test.reset_index(drop=True), stage1_test], axis=1)

# Train refiner with emphasis on hard cases
refiner = xgb.XGBClassifier(
    n_estimators=180, max_depth=8, learning_rate=0.03,
    scale_pos_weight=SCALE_WEIGHT, random_state=RANDOM_STATE, eval_metric='auc'
)

# Weight errors more heavily
sample_weights = np.where(aggregator_errors == 1, 8.0, 1.0)
refiner.fit(X_ref_train, y_train.reset_index(drop=True), sample_weight=sample_weights)

refiner_test = refiner.predict_proba(X_ref_test)[:, 1]

print("=== Stage 4: Refiner ===")
print(f"Refiner AUC: {roc_auc_score(y_test, refiner_test):.4f}")

---
## 8. Complete Thinking Pipeline

In [None]:
@dataclass
class ThinkingXGBoostPipeline:
    """Complete 4-stage thinking pipeline with hybrid aggregation."""
    reasoning_heads: Dict[str, ReasoningHead]
    xgb_aggregator: xgb.XGBClassifier
    critic: xgb.XGBClassifier
    refiner: xgb.XGBClassifier
    critic_threshold: float = 0.43
    blend_ratio: float = 0.6
    
    def predict_with_reasoning(self, X: pd.DataFrame) -> Tuple[np.ndarray, pd.DataFrame]:
        """Run full pipeline with reasoning trace."""
        # Stage 1: Reasoning heads
        s1 = get_stage1_outputs(X, self.reasoning_heads)
        
        # Stage 2: Hybrid aggregation
        wav = weighted_average(s1, self.reasoning_heads)
        X_agg = build_aggregator_features(s1)
        xgb_pred = self.xgb_aggregator.predict_proba(X_agg)[:, 1]
        agg_preds = self.blend_ratio * wav + (1 - self.blend_ratio) * xgb_pred
        
        # Stage 3: Critic
        critic_feats = build_critic_features(s1, wav, xgb_pred, agg_preds, self.reasoning_heads)
        critic_scores = self.critic.predict_proba(critic_feats)[:, 1]
        needs_refinement = critic_scores > self.critic_threshold
        
        # Stage 4: Selective refinement
        final_preds = agg_preds.copy()
        if needs_refinement.any():
            X_ref = pd.concat([X.reset_index(drop=True), s1], axis=1)
            ref_preds = self.refiner.predict_proba(X_ref)[:, 1]
            final_preds[needs_refinement] = ref_preds[needs_refinement]
        
        # Build reasoning trace
        trace = s1.copy()
        trace['weighted_avg'] = wav
        trace['xgb_pred'] = xgb_pred
        trace['aggregator_pred'] = agg_preds
        trace['critic_score'] = critic_scores
        trace['needs_refinement'] = needs_refinement.astype(int)
        trace['final_pred'] = final_preds
        trace['decision'] = (final_preds > 0.5).astype(int)
        
        return final_preds, trace
    
    def explain(self, X_single: pd.DataFrame, idx: int = 0) -> str:
        """Generate human-readable explanation."""
        _, trace = self.predict_with_reasoning(X_single)
        row = trace.iloc[idx]
        
        lines = ["<REASONING>"]
        for name in self.reasoning_heads.keys():
            score = row[f'head_{name}_score']
            lines.append(f"  {name:12s} risk: {score:.3f}")
        lines.append("  " + "-"*20)
        lines.append(f"  Weighted avg:     {row['weighted_avg']:.3f}")
        lines.append(f"  XGBoost pred:     {row['xgb_pred']:.3f}")
        lines.append(f"  Blended:          {row['aggregator_pred']:.3f}")
        lines.append(f"  Critic score:     {row['critic_score']:.3f}")
        if row['needs_refinement']:
            lines.append("  [!] REFINEMENT TRIGGERED")
        lines.append("</REASONING>")
        lines.append("")
        lines.append("<SOLUTION>")
        decision = "FRAUD" if row['decision'] == 1 else "LEGITIMATE"
        lines.append(f"  Probability: {row['final_pred']:.3f}")
        lines.append(f"  Decision:    {decision}")
        lines.append("</SOLUTION>")
        
        return "\n".join(lines)

In [None]:
# Create pipeline
pipeline = ThinkingXGBoostPipeline(
    reasoning_heads=reasoning_heads,
    xgb_aggregator=xgb_aggregator,
    critic=critic,
    refiner=refiner,
    critic_threshold=CRITIC_THRESHOLD,
    blend_ratio=BLEND_RATIO
)

# Run on test set
final_preds, reasoning_trace = pipeline.predict_with_reasoning(X_test)

print("=" * 50)
print("THINKING XGBOOST PIPELINE - RESULTS")
print("=" * 50)
print(f"\nROC-AUC: {roc_auc_score(y_test, final_preds):.4f}")
print(f"Samples refined: {reasoning_trace['needs_refinement'].sum()} / {len(reasoning_trace)}")
print("\nClassification Report:")
print(classification_report(y_test, (final_preds > 0.5).astype(int), target_names=['Legit', 'Fraud']))

---
## 9. Comparison: Baseline vs Thinking Pipeline

In [None]:
print("=" * 50)
print("COMPARISON")
print("=" * 50)
print(f"\nBaseline XGBoost ROC-AUC:  {roc_auc_score(y_test, baseline_preds):.4f}")
print(f"Thinking Pipeline ROC-AUC: {roc_auc_score(y_test, final_preds):.4f}")

delta = roc_auc_score(y_test, final_preds) - roc_auc_score(y_test, baseline_preds)
print(f"\nDifference: {delta:+.4f}")

print("\n" + "-"*50)
print("KEY ADVANTAGE: Explainable reasoning trace!")
print("-"*50)

---
## 10. Explainable Reasoning Traces

The key differentiator: we can show **why** the model made each decision.

In [None]:
print("=== Example Reasoning Traces ===")

# Show fraud and legitimate examples
fraud_idx = y_test[y_test == 1].index[0]
legit_idx = y_test[y_test == 0].index[0]

print("\n--- FRAUD CASE (Actual: FRAUD) ---")
print(pipeline.explain(X_test.loc[[fraud_idx]]))

print("\n--- LEGITIMATE CASE (Actual: LEGIT) ---")
print(pipeline.explain(X_test.loc[[legit_idx]]))

In [None]:
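# Show a case where refinement was triggered.
# reasoning_trace has a fresh 0..n-1 index, so these are positions into X_test / y_test (hence .iloc below).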
refined_indices = reasoning_trace[reasoning_trace['needs_refinement'] == 1].index

if len(refined_indices) > 0:
    print("\n--- REFINED CASE (Self-Correction Triggered) ---")
    idx = refined_indices[0]
    actual = "FRAUD" if y_test.iloc[idx] == 1 else "LEGIT"
    print(f"Actual: {actual}")
    print(pipeline.explain(X_test.iloc[[idx]]))

---
## 11. Reasoning Quality Score (RQS) Evaluation

In [None]:
from rqs_evaluator import RQSEvaluator

evaluator = RQSEvaluator()
rqs_result = evaluator.evaluate(pipeline, X_test, y_test, n_perturbations=100)

print(rqs_result)

In [None]:
# Target comparison
print("Target Analysis:")
print("-" * 50)
targets = {
    'Decomposability': (rqs_result.Decomposability, 0.70, '>'),
    'Self-Correction': (rqs_result.Self_Correction_F1, 0.30, '>'),
    'Coherence': (rqs_result.Reasoning_Coherence, 0.50, '>'),
    'Faithfulness': (rqs_result.Explanation_Faithfulness, 0.60, '>'),
    'Graceful Degradation': (rqs_result.Graceful_Degradation, 0.50, '<'),
}

for name, (value, target, direction) in targets.items():
    # direction is '>' (higher is better) or '<' (lower is better)
    met = value > target if direction == '>' else value < target
    status = "✓" if met else "✗"
    print(f"  {name:20s}: {value:.3f} ({direction}{target}) {status}")

---
## 12. Summary

We built a **Thinking XGBoost** pipeline that:

1. **Decomposes** fraud detection into specialized reasoning heads
2. **Aggregates** using a hybrid weighted average + XGBoost approach
3. **Self-corrects** by flagging likely aggregator errors with a critic model
4. **Refines** flagged cases with a stronger model
5. **Explains** every prediction with a full reasoning trace

### Key Contributions
- Novel application of LLM "thinking" patterns to gradient boosting
- Formal **Reasoning Quality Score (RQS)** framework for evaluating small model reasoning
- Practical self-correction mechanism that adds a per-prediction reasoning trace; its ROC-AUC relative to the baseline is measured in Section 9