# 🎯 RLHF in Healthcare: Hands-On Practice

## Table of Contents
1. [Preference Data Creation and Analysis](#practice-1-preference-data-creation-and-analysis)
2. [Bradley-Terry Reward Model Training](#practice-2-bradley-terry-reward-model-training)
3. [Simple Reward Model Implementation](#practice-3-simple-reward-model-implementation)
4. [KL Divergence Calculation](#practice-4-kl-divergence-calculation)
5. [Safety Constraint Implementation](#practice-5-safety-constraint-implementation)
6. [Performance Monitoring Dashboard](#practice-6-performance-monitoring-dashboard)
7. [A/B Testing Simulation](#practice-7-ab-testing-simulation)
8. [Complete Mini RLHF Pipeline](#practice-8-complete-mini-rlhf-pipeline)

## Installing and Importing Essential Libraries

In [None]:
# Import essential libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix
from scipy.special import expit  # sigmoid function
from scipy.stats import entropy
import warnings
warnings.filterwarnings('ignore')

# Visualization settings
plt.rcParams['figure.figsize'] = (10, 6)
plt.rcParams['font.size'] = 12
sns.set_style('whitegrid')

print("✅ All libraries loaded successfully!")
print("📚 Ready for RLHF practice in healthcare AI")

---
## Practice 1: Preference Data Creation and Analysis

### 🎯 Learning Objectives
- Understand the structure of preference datasets
- Create synthetic medical preference data
- Analyze preference agreement patterns

### 📖 Key Concepts
**Preference Dataset:** Contains pairs of model outputs with expert rankings
- Input: Clinical query or task description
- Output A and Output B: Two candidate responses
- Preference Label: Which output is better (A > B or B > A)
- Confidence Score: Strength of preference (optional)

In [None]:
# 1.1 Create synthetic preference dataset
def create_preference_dataset(n_samples=100):
    """Generate synthetic medical preference data"""
    np.random.seed(42)
    
    # Sample clinical scenarios
    scenarios = [
        "Patient with fever and cough",
        "Elderly patient with chest pain",
        "Child with abdominal pain",
        "Patient with persistent headache",
        "Diabetic patient with foot wound"
    ]
    
    data = []
    for i in range(n_samples):
        scenario = np.random.choice(scenarios)
        
        # Simulate quality scores for two outputs
        quality_A = np.random.uniform(0.4, 0.9)
        quality_B = np.random.uniform(0.4, 0.9)
        
        # Determine preference based on quality
        preference = "A" if quality_A > quality_B else "B"
        
        # Confidence based on quality difference
        confidence = abs(quality_A - quality_B)
        
        data.append({
            'id': i,
            'scenario': scenario,
            'quality_A': quality_A,
            'quality_B': quality_B,
            'preference': preference,
            'confidence': confidence
        })
    
    df = pd.DataFrame(data)
    
    print("=" * 60)
    print("Preference Dataset Created")
    print("=" * 60)
    print(f"Total samples: {len(df)}")
    print(f"\nPreference distribution:")
    print(df['preference'].value_counts())
    print(f"\nAverage confidence: {df['confidence'].mean():.3f}")
    print(f"\nFirst 5 samples:")
    print(df[['scenario', 'quality_A', 'quality_B', 'preference', 'confidence']].head())
    
    return df

preference_df = create_preference_dataset()

In [None]:
# 1.2 Visualize preference distribution
def visualize_preferences(df):
    """Visualize preference patterns"""
    fig, axes = plt.subplots(1, 3, figsize=(15, 4))
    
    # Preference distribution
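    # Fix the bar order to A, B so the tick labels set below always match the bars
    df['preference'].value_counts().reindex(['A', 'B'], fill_value=0).plot(kind='bar', ax=axes[0], color=['#1E64C8', '#6bcf7f'])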
    axes[0].set_title('Preference Distribution')
    axes[0].set_xlabel('Preferred Output')
    axes[0].set_ylabel('Count')
    axes[0].set_xticklabels(['A', 'B'], rotation=0)
    
    # Confidence distribution
    axes[1].hist(df['confidence'], bins=20, color='#1E64C8', alpha=0.7, edgecolor='black')
    axes[1].set_title('Confidence Score Distribution')
    axes[1].set_xlabel('Confidence')
    axes[1].set_ylabel('Frequency')
    
    # Quality scatter, one series per preference so the legend entries are accurate
    for label, color in [('A', '#1E64C8'), ('B', '#6bcf7f')]:
        subset = df[df['preference'] == label]
        axes[2].scatter(subset['quality_A'], subset['quality_B'], c=color,
                        alpha=0.6, s=50, label=f'Prefer {label}')
    axes[2].plot([0.4, 0.9], [0.4, 0.9], 'r--', label='Equal Quality')
    axes[2].set_title('Quality Comparison')
    axes[2].set_xlabel('Quality A')
    axes[2].set_ylabel('Quality B')
    axes[2].legend()
    axes[2].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    print("\n📊 Visualization complete!")

visualize_preferences(preference_df)
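
To look at agreement patterns, one option is to compare the labels two annotators give to the same pairs. The sketch below is a minimal pure-Python version of Cohen's kappa, which corrects raw agreement for agreement expected by chance. The two label lists are hypothetical and are not part of the synthetic dataset above.

```python
from collections import Counter

def cohen_kappa(labels_1, labels_2):
    """Cohen's kappa for two annotators labelling the same items."""
    if len(labels_1) != len(labels_2) or not labels_1:
        raise ValueError("label lists must be non-empty and equally long")
    n = len(labels_1)
    # Fraction of items on which the annotators agree
    observed = sum(a == b for a, b in zip(labels_1, labels_2)) / n
    # Agreement expected by chance from each annotator's label frequencies
    counts_1, counts_2 = Counter(labels_1), Counter(labels_2)
    expected = sum(counts_1[k] * counts_2[k] for k in counts_1) / (n * n)
    if expected == 1.0:
        return 1.0  # both annotators used one identical label throughout
    return (observed - expected) / (1 - expected)

# Hypothetical labels from two annotators on the same eight pairs
annotator_1 = ["A", "A", "B", "B", "A", "B", "A", "B"]
annotator_2 = ["A", "A", "B", "A", "A", "B", "B", "B"]

raw_agreement = sum(a == b for a, b in zip(annotator_1, annotator_2)) / len(annotator_1)
kappa = cohen_kappa(annotator_1, annotator_2)
print(f"Raw agreement: {raw_agreement:.2f}, Cohen's kappa: {kappa:.2f}")
```

A kappa near 0 suggests agreement no better than chance, while values near 1 suggest consistent labelling.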

---
## Practice 2: Bradley-Terry Reward Model Training

### 🎯 Learning Objectives
- Understand the Bradley-Terry model formula
- Implement preference probability calculation
- Train a simple reward model

### 📖 Key Concepts
**Bradley-Terry Model:**
$$P(A > B) = \sigma(r(A) - r(B)) = \frac{1}{1 + e^{-(r(A) - r(B))}}$$

Where:
- $r(A)$ = reward score for output A
- $r(B)$ = reward score for output B
- $\sigma$ = sigmoid function
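
The formula above also gives a training signal: a reward model is commonly fit by minimizing the negative log-likelihood of the observed preferences. The sketch below uses only the standard library; the function names `bt_probability` and `bt_negative_log_likelihood` are illustrative and do not appear elsewhere in this notebook.

```python
import math

def bt_probability(r_a, r_b):
    """P(A > B) under Bradley-Terry: sigmoid of the reward difference."""
    return 1.0 / (1.0 + math.exp(-(r_a - r_b)))

def bt_negative_log_likelihood(r_a, r_b, a_preferred):
    """Loss for one labelled pair: -log of the probability of the observed label."""
    p_a = bt_probability(r_a, r_b)
    return -math.log(p_a if a_preferred else 1.0 - p_a)

p_example = bt_probability(0.8, 0.3)
loss_if_a_preferred = bt_negative_log_likelihood(0.8, 0.3, a_preferred=True)
loss_if_b_preferred = bt_negative_log_likelihood(0.8, 0.3, a_preferred=False)

print(f"P(A > B) = {p_example:.4f}")
print(f"Loss when the expert prefers A: {loss_if_a_preferred:.4f}")
print(f"Loss when the expert prefers B: {loss_if_b_preferred:.4f}")
```

The loss is smaller when the rewards already rank the preferred output higher, which is what gradient-based training exploits.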

In [None]:
# 2.1 Bradley-Terry model implementation
def bradley_terry_probability(r_A, r_B):
    """
    Calculate probability that A is preferred over B
    P(A > B) = sigmoid(r(A) - r(B))
    """
    return expit(r_A - r_B)  # expit is the sigmoid function

def demonstrate_bradley_terry():
    """Demonstrate Bradley-Terry calculations"""
    print("Bradley-Terry Model Demonstration")
    print("=" * 60)
    
    # Example scenarios
    scenarios = [
        (0.8, 0.3, "A much better than B"),
        (0.6, 0.5, "A slightly better than B"),
        (0.5, 0.5, "A and B equal"),
        (0.3, 0.8, "B much better than A")
    ]
    
    results = []
    for r_A, r_B, description in scenarios:
        prob_A_wins = bradley_terry_probability(r_A, r_B)
        results.append({
            'r(A)': r_A,
            'r(B)': r_B,
            'Δr': r_A - r_B,
            'P(A>B)': prob_A_wins,
            'Description': description
        })
    
    results_df = pd.DataFrame(results)
    print(results_df.to_string(index=False))
    
    # Visualization
    fig, ax = plt.subplots(figsize=(10, 6))
    
    delta_r = np.linspace(-3, 3, 100)
    prob = expit(delta_r)
    
    ax.plot(delta_r, prob, 'b-', linewidth=2, label='P(A>B) = σ(r(A) - r(B))')
    ax.axhline(y=0.5, color='r', linestyle='--', alpha=0.5, label='Equal preference')
    ax.axvline(x=0, color='gray', linestyle='--', alpha=0.3)
    
    # Mark example points
    for _, row in results_df.iterrows():
        ax.plot(row['Δr'], row['P(A>B)'], 'ro', markersize=8)
        ax.annotate(f"Δr={row['Δr']:.1f}", 
                   xy=(row['Δr'], row['P(A>B)']), 
                   xytext=(10, 10), 
                   textcoords='offset points',
                   fontsize=9)
    
    ax.set_xlabel('Reward Difference: r(A) - r(B)', fontsize=12)
    ax.set_ylabel('Probability P(A > B)', fontsize=12)
    ax.set_title('Bradley-Terry Model: Sigmoid Function', fontsize=14, fontweight='bold')
    ax.legend(fontsize=11)
    ax.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    print("\n✅ Key insight: Higher reward difference → Stronger preference probability")

demonstrate_bradley_terry()

---
## Practice 3: Simple Reward Model Implementation

### 🎯 Learning Objectives
- Build a simple reward model from preference data
- Calculate reward scores for medical outputs
- Evaluate model performance

In [None]:
# 3.1 Simple reward model training
class SimpleRewardModel:
    """A simple reward model based on preference learning"""
    
    def __init__(self):
        self.rewards = {}
    
    def train(self, preference_df):
        """Train reward model from preference data"""
        print("Training Simple Reward Model...")
        print("=" * 60)
        
        # Use quality scores as proxy for rewards
        # In real RLHF, these would be learned from a neural network
        all_scenarios = preference_df['scenario'].unique()
        
        for scenario in all_scenarios:
            scenario_data = preference_df[preference_df['scenario'] == scenario]
            avg_quality_A = scenario_data['quality_A'].mean()
            avg_quality_B = scenario_data['quality_B'].mean()
            
            self.rewards[scenario] = {
                'base_reward_A': avg_quality_A,
                'base_reward_B': avg_quality_B
            }
        
        print(f"Trained on {len(all_scenarios)} scenarios")
        print(f"Total preference pairs: {len(preference_df)}")
        
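    def predict_preference(self, scenario, quality_A, quality_B):
        """Predict which output is preferred"""
        # The quality scores act directly as rewards for every scenario;
        # the per-scenario averages stored by train() are not used here.
        # Because the labels were derived from these same scores,
        # evaluation accuracy is trivially high.
        r_A = quality_A
        r_B = quality_B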
        
        # Bradley-Terry probability
        prob_A_preferred = bradley_terry_probability(r_A, r_B)
        
        return {
            'prob_A': prob_A_preferred,
            'prob_B': 1 - prob_A_preferred,
            'predicted': 'A' if prob_A_preferred > 0.5 else 'B',
            'confidence': abs(prob_A_preferred - 0.5) * 2
        }
    
    def evaluate(self, test_df):
        """Evaluate reward model on test data"""
        predictions = []
        
        for _, row in test_df.iterrows():
            pred = self.predict_preference(
                row['scenario'], 
                row['quality_A'], 
                row['quality_B']
            )
            predictions.append(pred['predicted'])
        
        accuracy = accuracy_score(test_df['preference'], predictions)
        
        print("\n" + "=" * 60)
        print("Model Evaluation Results")
        print("=" * 60)
        print(f"Accuracy: {accuracy:.2%}")
        print(f"\nConfusion Matrix:")
        cm = confusion_matrix(test_df['preference'], predictions)
        print(cm)
        
        return accuracy

# Train and evaluate
train_df, test_df = train_test_split(preference_df, test_size=0.2, random_state=42)

reward_model = SimpleRewardModel()
reward_model.train(train_df)
accuracy = reward_model.evaluate(test_df)

print("\n✅ Reward model training complete!")
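
The class above reuses the simulated quality scores as rewards rather than learning them. As a contrast, the sketch below fits one scalar reward per response by gradient ascent on the Bradley-Terry log-likelihood, using only the standard library. The response names `x`, `y`, `z`, the comparison counts, and the hyperparameters are all hypothetical choices for illustration.

```python
import math

def sigmoid(value):
    return 1.0 / (1.0 + math.exp(-value))

def fit_bradley_terry(comparisons, n_epochs=500, lr=0.05):
    """Fit one reward per item from (winner, loser) pairs by batch gradient ascent."""
    rewards = {item: 0.0 for pair in comparisons for item in pair}
    for _ in range(n_epochs):
        grads = {item: 0.0 for item in rewards}
        for winner, loser in comparisons:
            # d/dr_winner of log sigmoid(r_w - r_l) is 1 - sigmoid(r_w - r_l);
            # the derivative with respect to r_loser is the negative of that.
            g = 1.0 - sigmoid(rewards[winner] - rewards[loser])
            grads[winner] += g
            grads[loser] -= g
        for item in rewards:
            rewards[item] += lr * grads[item]
    return rewards

# Hypothetical expert comparisons: x usually beats y, y usually beats z
comparisons = (
    [("x", "y")] * 3 + [("y", "x")] * 1 +
    [("y", "z")] * 3 + [("z", "y")] * 1 +
    [("x", "z")] * 4
)

learned = fit_bradley_terry(comparisons)
for item in sorted(learned, key=learned.get, reverse=True):
    print(f"{item}: reward = {learned[item]:+.3f}")
```

Only reward differences matter under Bradley-Terry, so the absolute values are identified only up to a shared offset. In a full RLHF setup a neural network would produce the rewards from the response text instead of a lookup table.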

---
## Practice 4: KL Divergence Calculation

### 🎯 Learning Objectives
- Understand KL divergence as a measure of distribution difference
- Calculate KL divergence between policy and reference
- Visualize the effect of KL penalty

### 📖 Key Concepts
**KL Divergence:** Measures how far the optimized policy $\pi_{\theta}$ has moved from the reference policy $\pi_{ref}$. The expectation is taken over actions sampled from $\pi_{\theta}$.

$$KL(\pi_{\theta} || \pi_{ref}) = \mathbb{E}_{a \sim \pi_{\theta}(\cdot|s)}[\log(\pi_{\theta}(a|s)) - \log(\pi_{ref}(a|s))]$$
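
Because the definition is an expectation over actions drawn from the optimized policy, it can be estimated from samples when the full distribution is too large to enumerate. The sketch below compares the exact sum with a sample-based estimate for a three-action example; the sample size and seed are arbitrary assumptions.

```python
import math
import random

def exact_kl(p, q):
    """KL(p || q) for discrete distributions given as lists of probabilities."""
    return sum(pi * math.log(pi / qi) for pi, qi in zip(p, q) if pi > 0)

def sampled_kl(p, q, n_samples=20000, seed=0):
    """Monte Carlo estimate: average log-ratio over actions sampled from p."""
    rng = random.Random(seed)
    actions = rng.choices(range(len(p)), weights=p, k=n_samples)
    return sum(math.log(p[a]) - math.log(q[a]) for a in actions) / n_samples

pi_theta = [0.7, 0.2, 0.1]  # optimized policy
pi_ref = [0.6, 0.3, 0.1]    # reference policy

kl_exact = exact_kl(pi_theta, pi_ref)
kl_estimate = sampled_kl(pi_theta, pi_ref)
print(f"Exact KL:   {kl_exact:.4f}")
print(f"Sampled KL: {kl_estimate:.4f}")
```

The estimate fluctuates around the exact value and tightens as the number of samples grows.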

In [None]:
# 4.1 KL divergence calculation
def calculate_kl_divergence(p, q):
    """
    Calculate KL divergence between two probability distributions
    KL(P || Q) = sum(P(x) * log(P(x) / Q(x)))
    """
    # Ensure distributions are normalized
    p = np.array(p) / np.sum(p)
    q = np.array(q) / np.sum(q)
    
    # Add small epsilon to avoid log(0)
    epsilon = 1e-10
    p = p + epsilon
    q = q + epsilon
    
    kl = np.sum(p * np.log(p / q))
    return kl

def demonstrate_kl_divergence():
    """Demonstrate KL divergence with examples"""
    print("KL Divergence Demonstration")
    print("=" * 60)
    
    # Reference policy (original model)
    pi_ref = np.array([0.6, 0.3, 0.1])  # Probabilities for 3 actions
    
    # Different optimized policies
    scenarios = [
        (np.array([0.6, 0.3, 0.1]), "No change (identical)"),
        (np.array([0.65, 0.25, 0.1]), "Slight deviation"),
        (np.array([0.7, 0.2, 0.1]), "Moderate deviation"),
        (np.array([0.8, 0.15, 0.05]), "Large deviation"),
        (np.array([0.1, 0.3, 0.6]), "Very large deviation")
    ]
    
    results = []
    for pi_theta, description in scenarios:
        kl = calculate_kl_divergence(pi_theta, pi_ref)
        results.append({
            'Policy': description,
            'Distribution': str(pi_theta),
            'KL Divergence': kl
        })
    
    results_df = pd.DataFrame(results)
    print(results_df.to_string(index=False))
    
    # Visualization
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    # Bar plot of KL values
    colors = ['green', 'yellow', 'orange', 'red', 'darkred']
    axes[0].barh(range(len(results)), [r['KL Divergence'] for r in results], color=colors)
    axes[0].set_yticks(range(len(results)))
    axes[0].set_yticklabels([r['Policy'] for r in results])
    axes[0].set_xlabel('KL Divergence')
    axes[0].set_title('KL Divergence from Reference Policy')
    axes[0].grid(True, alpha=0.3)
    
    # Distribution comparison
    x = np.arange(3)
    width = 0.15
    
    axes[1].bar(x - 2*width, pi_ref, width, label='Reference', color='blue', alpha=0.7)
    for i, (pi_theta, desc) in enumerate(scenarios[1:4], 1):  # Show first 3 deviations
        axes[1].bar(x + (i-1)*width, pi_theta, width, label=desc, alpha=0.7)
    
    axes[1].set_xlabel('Action')
    axes[1].set_ylabel('Probability')
    axes[1].set_title('Policy Distribution Comparison')
    axes[1].set_xticks(x)
    axes[1].set_xticklabels(['Action 1', 'Action 2', 'Action 3'])
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    print("\n✅ Key insight: Higher KL divergence → More deviation from original model")
    print("⚠️  In medical AI: Keep KL low to preserve base knowledge!")

demonstrate_kl_divergence()

In [None]:
# 4.2 Effect of beta (KL penalty coefficient)
def demonstrate_kl_penalty():
    """Show how different beta values affect optimization"""
    print("\nKL Penalty Effect (β coefficient)")
    print("=" * 60)
    
    # Simulate reward and KL values
    reward = 5.0  # Base reward
    kl_values = np.linspace(0, 2, 50)
    beta_values = [0.01, 0.05, 0.1, 0.2]  # Different penalty strengths
    
    plt.figure(figsize=(10, 6))
    
    for beta in beta_values:
        penalized_reward = reward - beta * kl_values
        plt.plot(kl_values, penalized_reward, label=f'β = {beta}', linewidth=2)
    
    plt.axhline(y=0, color='red', linestyle='--', alpha=0.5, label='Zero reward')
    plt.xlabel('KL Divergence', fontsize=12)
    plt.ylabel('Penalized Reward', fontsize=12)
    plt.title('Effect of KL Penalty: Reward - β × KL', fontsize=14, fontweight='bold')
    plt.legend(fontsize=11)
    plt.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    print("\n📊 Interpretation:")
    print("  • Low β (0.01): More freedom to optimize, risk of forgetting")
    print("  • High β (0.1-0.2): Strong constraint, minimal deviation")
    print("  • Medical AI: Typical range β ∈ [0.01, 0.1]")

demonstrate_kl_penalty()
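
For a small discrete action set, the policy that maximizes expected reward minus β times the KL divergence has a known closed form: it is proportional to the reference probability times exp(reward / β). The sketch below uses that result to show how β controls the distance from the reference policy. The reward values and the β grid are hypothetical.

```python
import math

def kl_regularized_optimum(pi_ref, rewards, beta):
    """Maximizer of E[r] - beta * KL(pi || pi_ref) over a discrete action set."""
    max_logit = max(r / beta for r in rewards)  # shift to keep exp() stable
    weights = [p * math.exp(r / beta - max_logit) for p, r in zip(pi_ref, rewards)]
    total = sum(weights)
    return [w / total for w in weights]

def kl(p, q):
    return sum(pi * math.log(pi / qi) for pi, qi in zip(p, q) if pi > 0)

pi_ref = [0.6, 0.3, 0.1]    # reference policy over three actions
rewards = [0.2, 0.5, 0.9]   # hypothetical reward per action

kl_by_beta = {}
for beta in (0.05, 0.5, 5.0):
    pi_star = kl_regularized_optimum(pi_ref, rewards, beta)
    kl_by_beta[beta] = kl(pi_star, pi_ref)
    formatted = ", ".join(f"{p:.3f}" for p in pi_star)
    print(f"beta={beta:<4} policy=[{formatted}]  KL={kl_by_beta[beta]:.4f}")
```

With a small β the policy concentrates on the highest-reward action, and with a large β it stays close to the reference.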

---
## Practice 5: Safety Constraint Implementation

### 🎯 Learning Objectives
- Implement hard safety constraints for medical AI
- Build a constraint checker system
- Simulate constraint violation scenarios

In [None]:
# 5.1 Medical safety constraint checker
class MedicalSafetyChecker:
    """Enforce safety constraints for medical AI outputs"""
    
    def __init__(self):
        # Define contraindications
        self.contraindications = {
            'aspirin': ['bleeding_disorder', 'ulcer'],
            'ace_inhibitor': ['pregnancy', 'angioedema_history'],
            'beta_blocker': ['asthma', 'heart_block']
        }
        
        # Define dosage limits (mg/day)
        self.dosage_limits = {
            'aspirin': (81, 325),
            'ace_inhibitor': (2.5, 40),
            'beta_blocker': (25, 200)
        }
    
    def check_contraindication(self, medication, patient_conditions):
        """Check if medication is contraindicated"""
        if medication not in self.contraindications:
            return True, "No known contraindications"
        
        contraindicated_conditions = self.contraindications[medication]
        violations = [c for c in patient_conditions if c in contraindicated_conditions]
        
        if violations:
            return False, f"Contraindicated: {', '.join(violations)}"
        return True, "Safe"
    
    def check_dosage(self, medication, dosage):
        """Check if dosage is within safe limits"""
        if medication not in self.dosage_limits:
            return True, "No dosage limit defined"
        
        min_dose, max_dose = self.dosage_limits[medication]
        
        if dosage < min_dose:
            return False, f"Dose too low (min: {min_dose} mg)"
        elif dosage > max_dose:
            return False, f"Dose too high (max: {max_dose} mg)"
        else:
            return True, "Dosage within safe range"
    
    def validate_recommendation(self, medication, dosage, patient_conditions):
        """Comprehensive safety check"""
        results = {
            'medication': medication,
            'dosage': dosage,
            'patient_conditions': patient_conditions,
            'checks': []
        }
        
        # Check contraindication
        contra_safe, contra_msg = self.check_contraindication(medication, patient_conditions)
        results['checks'].append(('Contraindication', contra_safe, contra_msg))
        
        # Check dosage
        dose_safe, dose_msg = self.check_dosage(medication, dosage)
        results['checks'].append(('Dosage', dose_safe, dose_msg))
        
        # Overall safety
        results['safe'] = contra_safe and dose_safe
        
        return results

# Demonstrate safety checking
def demonstrate_safety_constraints():
    print("Medical Safety Constraint Checking")
    print("=" * 60)
    
    checker = MedicalSafetyChecker()
    
    # Test cases
    test_cases = [
        ('aspirin', 100, ['hypertension']),
        ('aspirin', 100, ['bleeding_disorder']),
        ('aspirin', 500, ['hypertension']),
        ('beta_blocker', 50, ['hypertension']),
        ('beta_blocker', 50, ['asthma'])
    ]
    
    for medication, dosage, conditions in test_cases:
        result = checker.validate_recommendation(medication, dosage, conditions)
        
        print(f"\n{'='*60}")
        print(f"Medication: {medication}")
        print(f"Dosage: {dosage} mg")
        print(f"Patient conditions: {', '.join(conditions)}")
        print(f"\nSafety Checks:")
        
        for check_name, is_safe, message in result['checks']:
            status = "✅ PASS" if is_safe else "❌ FAIL"
            print(f"  {check_name}: {status} - {message}")
        
        overall = "✅ SAFE TO RECOMMEND" if result['safe'] else "⛔ BLOCKED - UNSAFE"
        print(f"\nOverall: {overall}")
    
    print(f"\n{'='*60}")
    print("\n💡 Key insight: Hard constraints prevent harmful outputs regardless of reward")

demonstrate_safety_constraints()
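
One way to make a constraint "hard" is to filter candidates before the reward is consulted, so an unsafe option can never win on reward alone. The sketch below is self-contained and does not use `MedicalSafetyChecker`; it copies two contraindication entries from the class above, and the candidate rewards are hypothetical.

```python
# Subset of the contraindication table used in the checker above
CONTRAINDICATIONS = {
    "aspirin": {"bleeding_disorder", "ulcer"},
    "beta_blocker": {"asthma", "heart_block"},
}

def is_safe(medication, patient_conditions):
    """True when none of the patient's conditions contraindicate the medication."""
    blocked = CONTRAINDICATIONS.get(medication, set())
    return not (blocked & set(patient_conditions))

def select_recommendation(candidates, patient_conditions):
    """Pick the highest-reward candidate among those that pass the safety filter.

    candidates: list of (medication, reward) tuples.
    Returns None when every candidate is blocked.
    """
    safe = [c for c in candidates if is_safe(c[0], patient_conditions)]
    if not safe:
        return None
    return max(safe, key=lambda c: c[1])

# Hypothetical reward scores: aspirin scores higher but is contraindicated
candidates = [("aspirin", 0.9), ("beta_blocker", 0.7)]
choice = select_recommendation(candidates, ["bleeding_disorder"])
no_choice = select_recommendation(candidates, ["bleeding_disorder", "asthma"])
print(f"Selected: {choice}")
print(f"All blocked: {no_choice}")
```

Returning `None` when everything is blocked leaves the decision to a clinician instead of falling back to an unsafe option.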

---
## Practice 6: Performance Monitoring Dashboard

### 🎯 Learning Objectives
- Create a simple monitoring system
- Track key performance indicators (KPIs)
- Visualize performance trends

In [None]:
# 6.1 Simulate monitoring data
def generate_monitoring_data(n_days=30):
    """Generate synthetic monitoring data"""
    np.random.seed(42)
    
    data = []
    for day in range(n_days):
        # Simulate gradual improvement with some noise
        base_accuracy = 0.75 + (day / n_days) * 0.1
        accuracy = base_accuracy + np.random.normal(0, 0.02)
        
        base_safety = 0.95 + (day / n_days) * 0.03
        safety_score = min(base_safety + np.random.normal(0, 0.01), 0.99)
        
        response_time = 0.5 + np.random.exponential(0.2)
        
        satisfaction = 0.7 + (day / n_days) * 0.15 + np.random.normal(0, 0.03)
        
        data.append({
            'day': day + 1,
            'accuracy': np.clip(accuracy, 0, 1),
            'safety_score': np.clip(safety_score, 0, 1),
            'response_time': response_time,
            'user_satisfaction': np.clip(satisfaction, 0, 1),
            'queries_processed': np.random.randint(800, 1200)
        })
    
    return pd.DataFrame(data)

def create_monitoring_dashboard(df):
    """Create performance monitoring dashboard"""
    print("Performance Monitoring Dashboard")
    print("=" * 60)
    
    # Current metrics
    latest = df.iloc[-1]
    print(f"\n📊 Current Metrics (Day {int(latest['day'])})")
    print(f"  Accuracy: {latest['accuracy']:.2%}")
    print(f"  Safety Score: {latest['safety_score']:.2%}")
    print(f"  Avg Response Time: {latest['response_time']:.2f}s")
    print(f"  User Satisfaction: {latest['user_satisfaction']:.2%}")
    print(f"  Queries Processed: {int(latest['queries_processed'])}")
    
    # Trends
    print(f"\n📈 Trends (Last 7 days vs Previous 7 days)")
    recent_7 = df.iloc[-7:]
    previous_7 = df.iloc[-14:-7]
    
    metrics = ['accuracy', 'safety_score', 'user_satisfaction']
    for metric in metrics:
        recent_avg = recent_7[metric].mean()
        previous_avg = previous_7[metric].mean()
        change = recent_avg - previous_avg
        arrow = "↗" if change > 0 else "↘"
        print(f"  {metric.replace('_', ' ').title()}: {recent_avg:.2%} {arrow} ({change:+.2%})")
    
    # Visualization
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    
    # Accuracy over time
    axes[0, 0].plot(df['day'], df['accuracy'], 'b-', linewidth=2)
    axes[0, 0].axhline(y=0.8, color='green', linestyle='--', alpha=0.5, label='Target')
    axes[0, 0].set_xlabel('Day')
    axes[0, 0].set_ylabel('Accuracy')
    axes[0, 0].set_title('Model Accuracy Trend')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)
    
    # Safety score
    axes[0, 1].plot(df['day'], df['safety_score'], 'g-', linewidth=2)
    axes[0, 1].axhline(y=0.95, color='red', linestyle='--', alpha=0.5, label='Threshold')
    axes[0, 1].set_xlabel('Day')
    axes[0, 1].set_ylabel('Safety Score')
    axes[0, 1].set_title('Safety Score Trend')
    axes[0, 1].legend()
    axes[0, 1].grid(True, alpha=0.3)
    
    # Response time distribution
    axes[1, 0].hist(df['response_time'], bins=20, color='orange', alpha=0.7, edgecolor='black')
    axes[1, 0].axvline(x=df['response_time'].mean(), color='red', linestyle='--', 
                       linewidth=2, label=f"Mean: {df['response_time'].mean():.2f}s")
    axes[1, 0].set_xlabel('Response Time (s)')
    axes[1, 0].set_ylabel('Frequency')
    axes[1, 0].set_title('Response Time Distribution')
    axes[1, 0].legend()
    axes[1, 0].grid(True, alpha=0.3)
    
    # User satisfaction
    axes[1, 1].plot(df['day'], df['user_satisfaction'], 'purple', linewidth=2)
    axes[1, 1].fill_between(df['day'], df['user_satisfaction'], alpha=0.3, color='purple')
    axes[1, 1].set_xlabel('Day')
    axes[1, 1].set_ylabel('Satisfaction Score')
    axes[1, 1].set_title('User Satisfaction Trend')
    axes[1, 1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    print("\n‚úÖ Dashboard generated successfully!")

monitoring_df = generate_monitoring_data()
create_monitoring_dashboard(monitoring_df)

---
## Practice 7: A/B Testing Simulation

### 🎯 Learning Objectives
- Design and run A/B tests for model comparison
- Perform statistical significance testing
- Make data-driven deployment decisions

In [None]:
# 7.1 A/B testing simulation
def simulate_ab_test(n_samples=1000, effect_size=0.05):
    """Simulate A/B test between two models"""
    np.random.seed(42)
    
    print("A/B Testing: Model A (Control) vs Model B (Treatment)")
    print("=" * 60)
    
    # Model A (control - baseline)
    accuracy_A = 0.80
    results_A = np.random.binomial(1, accuracy_A, n_samples)
    
    # Model B (treatment - RLHF optimized)
    accuracy_B = accuracy_A + effect_size
    results_B = np.random.binomial(1, accuracy_B, n_samples)
    
    # Calculate metrics
    mean_A = results_A.mean()
    mean_B = results_B.mean()
    
    print(f"\n📊 Results Summary:")
    print(f"  Model A (Control):   {mean_A:.2%} accuracy ({results_A.sum()}/{n_samples})")
    print(f"  Model B (Treatment): {mean_B:.2%} accuracy ({results_B.sum()}/{n_samples})")
    print(f"  Absolute Difference: {mean_B - mean_A:+.2%}")
    print(f"  Relative Improvement: {(mean_B - mean_A) / mean_A:+.2%}")
    
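    # Statistical test: chi-square test of independence on the 2x2 table
    # (chi2_contingency applies Yates' continuity correction by default for 2x2 tables)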
    from scipy.stats import chi2_contingency
    
    contingency_table = np.array([
        [results_A.sum(), n_samples - results_A.sum()],
        [results_B.sum(), n_samples - results_B.sum()]
    ])
    
    chi2, p_value, dof, expected = chi2_contingency(contingency_table)
    
    print(f"\n📈 Statistical Test:")
    print(f"  Chi-square statistic: {chi2:.4f}")
    print(f"  P-value: {p_value:.4f}")
    
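    alpha = 0.05
    if p_value < alpha and mean_B > mean_A:
        print(f"  ✅ Result: STATISTICALLY SIGNIFICANT (p < {alpha})")
        print(f"  Decision: LAUNCH Model B")
    elif p_value < alpha:
        # The chi-square test is two-sided, so a significant result can also mean B is worse
        print(f"  ⚠️  Result: STATISTICALLY SIGNIFICANT (p < {alpha}), but Model B is worse")
        print(f"  Decision: Keep Model A")
    else:
        print(f"  ⚠️  Result: NOT statistically significant (p ≥ {alpha})")
        print(f"  Decision: Continue with Model A or collect more data")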
    
    # Visualization
    fig, axes = plt.subplots(1, 2, figsize=(14, 5))
    
    # Bar comparison
    models = ['Model A\n(Control)', 'Model B\n(RLHF)']
    accuracies = [mean_A, mean_B]
    colors = ['#1E64C8', '#6bcf7f']
    
    bars = axes[0].bar(models, accuracies, color=colors, alpha=0.7, edgecolor='black')
    axes[0].set_ylabel('Accuracy')
    axes[0].set_title('Model Comparison')
    axes[0].set_ylim([0.75, 0.90])
    axes[0].grid(True, alpha=0.3, axis='y')
    
    # Add value labels on bars
    for bar, acc in zip(bars, accuracies):
        height = bar.get_height()
        axes[0].text(bar.get_x() + bar.get_width()/2., height,
                    f'{acc:.2%}', ha='center', va='bottom', fontweight='bold')
    
    # Confidence intervals (bootstrap)
    n_bootstrap = 1000
    bootstrap_A = [np.random.choice(results_A, size=n_samples, replace=True).mean() 
                   for _ in range(n_bootstrap)]
    bootstrap_B = [np.random.choice(results_B, size=n_samples, replace=True).mean() 
                   for _ in range(n_bootstrap)]
    
    axes[1].hist(bootstrap_A, bins=30, alpha=0.5, label='Model A', color='#1E64C8')
    axes[1].hist(bootstrap_B, bins=30, alpha=0.5, label='Model B', color='#6bcf7f')
    axes[1].axvline(mean_A, color='#1E64C8', linestyle='--', linewidth=2)
    axes[1].axvline(mean_B, color='#6bcf7f', linestyle='--', linewidth=2)
    axes[1].set_xlabel('Accuracy')
    axes[1].set_ylabel('Frequency')
    axes[1].set_title('Bootstrap Distribution')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    print("\n✅ A/B test complete!")
    return mean_A, mean_B, p_value

# Run simulation
acc_A, acc_B, p_val = simulate_ab_test(n_samples=1000, effect_size=0.05)

---
## Practice 8: Complete Mini RLHF Pipeline

### 🎯 Learning Objectives
- Integrate all components into a complete pipeline
- Run end-to-end RLHF simulation
- Understand the full workflow

In [None]:
# 8.1 Complete RLHF Pipeline
class MiniRLHFPipeline:
    """Complete mini RLHF pipeline for medical AI"""
    
    def __init__(self, beta=0.05):
        self.beta = beta  # KL penalty coefficient
        self.reward_model = None
        self.safety_checker = MedicalSafetyChecker()
        self.training_history = []
    
    def step1_collect_preferences(self, n_samples=200):
        """Step 1: Collect preference data"""
        print("\n" + "="*60)
        print("STEP 1: Collecting Preference Data")
        print("="*60)
        
        self.preference_data = create_preference_dataset(n_samples)
        return self.preference_data
    
    def step2_train_reward_model(self):
        """Step 2: Train reward model"""
        print("\n" + "="*60)
        print("STEP 2: Training Reward Model")
        print("="*60)
        
        # Fixed seed so the split matches across runs, as in Practice 3
        train_df, test_df = train_test_split(self.preference_data, test_size=0.2, random_state=42)
        
        self.reward_model = SimpleRewardModel()
        self.reward_model.train(train_df)
        accuracy = self.reward_model.evaluate(test_df)
        
        return accuracy
    
    def step3_policy_optimization(self, n_iterations=5):
        """Step 3: Optimize policy with PPO (simulated)"""
        print("\n" + "="*60)
        print("STEP 3: Policy Optimization (Simulated)")
        print("="*60)
        
        print(f"Running {n_iterations} optimization iterations...")
        print(f"KL penalty coefficient (β): {self.beta}\n")
        
        for iteration in range(n_iterations):
            # Simulate reward and KL divergence
            base_reward = 5.0 + iteration * 0.5  # Improving reward
            kl_divergence = 0.1 + iteration * 0.05  # Increasing KL
            
            # Apply KL penalty
            penalized_reward = base_reward - self.beta * kl_divergence
            
            # Simulate safety score (decreases if too much optimization)
            safety_score = max(0.95 - iteration * 0.01, 0.90)
            
            self.training_history.append({
                'iteration': iteration + 1,
                'base_reward': base_reward,
                'kl_divergence': kl_divergence,
                'penalized_reward': penalized_reward,
                'safety_score': safety_score
            })
            
            print(f"Iteration {iteration+1}: "
                  f"Reward={base_reward:.2f}, "
                  f"KL={kl_divergence:.3f}, "
                  f"Penalized={penalized_reward:.2f}, "
                  f"Safety={safety_score:.2%}")
        
        print("\n✅ Policy optimization complete!")
    
    def step4_safety_validation(self):
        """Step 4: Validate safety constraints"""
        print("\n" + "="*60)
        print("STEP 4: Safety Validation")
        print("="*60)
        
        test_cases = [
            ('aspirin', 150, ['hypertension']),
            ('beta_blocker', 75, ['diabetes'])
        ]
        
        passed = 0
        for medication, dosage, conditions in test_cases:
            result = self.safety_checker.validate_recommendation(
                medication, dosage, conditions
            )
            if result['safe']:
                passed += 1
                print(f"✅ {medication} ({dosage}mg): SAFE")
            else:
                print(f"❌ {medication} ({dosage}mg): UNSAFE")
        
        print(f"\nSafety validation: {passed}/{len(test_cases)} passed")
    
    def step5_visualize_results(self):
        """Step 5: Visualize training results"""
        print("\n" + "="*60)
        print("STEP 5: Results Visualization")
        print("="*60)
        
        history_df = pd.DataFrame(self.training_history)
        
        fig, axes = plt.subplots(2, 2, figsize=(14, 10))
        
        # Reward over iterations
        axes[0, 0].plot(history_df['iteration'], history_df['base_reward'], 
                       'b-o', label='Base Reward', linewidth=2)
        axes[0, 0].plot(history_df['iteration'], history_df['penalized_reward'], 
                       'r-s', label='Penalized Reward', linewidth=2)
        axes[0, 0].set_xlabel('Iteration')
        axes[0, 0].set_ylabel('Reward')
        axes[0, 0].set_title('Reward Optimization Progress')
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)
        
        # KL divergence
        axes[0, 1].plot(history_df['iteration'], history_df['kl_divergence'], 
                       'g-^', linewidth=2)
        axes[0, 1].axhline(y=0.1, color='orange', linestyle='--', 
                          label='Target KL', alpha=0.7)
        axes[0, 1].set_xlabel('Iteration')
        axes[0, 1].set_ylabel('KL Divergence')
        axes[0, 1].set_title('KL Divergence from Base Model')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)
        
        # Safety score
        axes[1, 0].plot(history_df['iteration'], history_df['safety_score'], 
                       'purple', linewidth=2, marker='d')
        axes[1, 0].axhline(y=0.95, color='red', linestyle='--', 
                          label='Safety Threshold', alpha=0.7)
        axes[1, 0].set_xlabel('Iteration')
        axes[1, 0].set_ylabel('Safety Score')
        axes[1, 0].set_title('Safety Score Monitoring')
        axes[1, 0].legend()
        axes[1, 0].grid(True, alpha=0.3)
        
        # Summary metrics
        axes[1, 1].axis('off')
        summary_text = f"""
        RLHF Pipeline Summary
        {'='*40}
        
        Final Metrics:
          • Base Reward: {history_df['base_reward'].iloc[-1]:.2f}
          • KL Divergence: {history_df['kl_divergence'].iloc[-1]:.3f}
          • Penalized Reward: {history_df['penalized_reward'].iloc[-1]:.2f}
          • Safety Score: {history_df['safety_score'].iloc[-1]:.2%}
        
        Change from First Iteration:
          • Reward: {history_df['base_reward'].iloc[-1] - history_df['base_reward'].iloc[0]:+.2f}
          • KL: {history_df['kl_divergence'].iloc[-1] - history_df['kl_divergence'].iloc[0]:+.3f}
        
        β coefficient: {self.beta}
        
        Status: ✅ Training Complete
        """
        axes[1, 1].text(0.1, 0.5, summary_text, 
                       fontsize=11, family='monospace',
                       verticalalignment='center')
        
        plt.tight_layout()
        plt.show()
        
        print("\n✅ All visualizations complete!")
    
    def run_complete_pipeline(self):
        """Run the complete RLHF pipeline"""
        print("\n" + "#"*60)
        print("#" + " "*58 + "#")
        # center() pads the title to the 58-character interior so the banner's right edge lines up
        print("#" + "COMPLETE RLHF PIPELINE FOR MEDICAL AI".center(58) + "#")
        print("#" + " "*58 + "#")
        print("#"*60)
        
        # Run all steps
        self.step1_collect_preferences()
        self.step2_train_reward_model()
        self.step3_policy_optimization()
        self.step4_safety_validation()
        self.step5_visualize_results()
        
        print("\n" + "#"*60)
        print("#" + " "*58 + "#")
        print("#" + " "*15 + "🎉 PIPELINE COMPLETE! 🎉" + " "*16 + "#")
        print("#" + " "*58 + "#")
        print("#"*60)

# Run the complete pipeline
pipeline = MiniRLHFPipeline(beta=0.05)
pipeline.run_complete_pipeline()

---
## 🎯 Practice Complete!

### Summary of What We Learned:

1. **Preference Data Creation**: Understanding how expert preferences are structured and collected
2. **Bradley-Terry Model**: Mathematical foundation for converting rewards to preference probabilities
3. **Reward Model Training**: Building models that learn from expert feedback
4. **KL Divergence Control**: Preventing excessive deviation from base model knowledge
5. **Safety Constraints**: Implementing hard constraints to prevent harmful outputs
6. **Performance Monitoring**: Real-time tracking of model performance in deployment
7. **A/B Testing**: Statistical validation before deploying new models
8. **Complete RLHF Pipeline**: End-to-end integration of all components
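
As a compact recap of items 2 and 4, the sketch below shows the Bradley-Terry preference probability and a KL-penalized reward. It assumes the common form `penalized = base - beta * KL`; the function names `bradley_terry_prob` and `kl_penalized_reward` are illustrative and may not match the exact code used in the practices above.

```python
import numpy as np
from scipy.special import expit  # numerically stable sigmoid


def bradley_terry_prob(reward_a, reward_b):
    """Bradley-Terry: P(A preferred over B) = sigmoid(r_A - r_B)."""
    return expit(np.asarray(reward_a, dtype=float) - np.asarray(reward_b, dtype=float))


def kl_penalized_reward(base_reward, kl_divergence, beta=0.05):
    """Assumed penalty form: subtract beta * KL from the reward-model score."""
    return base_reward - beta * kl_divergence


# Equal rewards give no preference either way.
print(bradley_terry_prob(1.0, 1.0))   # 0.5
# A larger beta makes the same KL divergence cost more reward.
print(kl_penalized_reward(1.0, 0.2, beta=0.05))
print(kl_penalized_reward(1.0, 0.2, beta=0.5))
```

Only the difference between the two rewards matters in the Bradley-Terry model, so adding a constant to both scores leaves the preference probability unchanged.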

### Key Insights:

- **RLHF is iterative**: Continuous improvement through feedback loops
- **Safety first**: Medical AI requires multiple layers of safety constraints
- **Balance is crucial**: Trade-off between optimization and preserving base knowledge
- **Monitoring is essential**: Continuous performance tracking prevents degradation
- **Statistical rigor**: A/B testing ensures decisions are data-driven
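
To make the "statistical rigor" point concrete, here is a minimal sketch of a two-sided, pooled two-proportion z-test for comparing success rates between a control model (A) and a candidate model (B). This is one standard choice and not necessarily the test used in Practice 7; the function name `two_proportion_ztest` and the example counts are hypothetical.

```python
import numpy as np
from scipy.stats import norm


def two_proportion_ztest(successes_a, n_a, successes_b, n_b):
    """Two-sided pooled z-test for H0: both models have the same success rate."""
    p_a = successes_a / n_a
    p_b = successes_b / n_b
    # Pooled success rate under the null hypothesis
    p_pool = (successes_a + successes_b) / (n_a + n_b)
    se = np.sqrt(p_pool * (1 - p_pool) * (1 / n_a + 1 / n_b))
    z = (p_b - p_a) / se
    p_value = 2 * norm.sf(abs(z))  # two-sided tail probability
    return z, p_value


# Hypothetical counts: 40/100 acceptable answers for A, 60/100 for B
z, p_value = two_proportion_ztest(40, 100, 60, 100)
print(f"z = {z:.3f}, p = {p_value:.4f}")
```

A small p-value only says the difference is unlikely under the null hypothesis; in a medical setting the safety checks still have to pass independently of any reward improvement.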

### Next Steps:

1. Implement with real medical datasets
2. Use transformer-based reward models
3. Integrate with production LLMs
4. Explore DPO as an alternative to PPO
5. Add more sophisticated safety layers
6. Implement multi-objective optimization
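
For step 4, the following is a minimal NumPy sketch of the Direct Preference Optimization (DPO) loss, which trains directly on preference pairs without a separate reward model. It assumes you already have summed log-probabilities of the chosen and rejected responses under both the policy and a frozen reference model; the function name `dpo_loss` and the toy numbers are illustrative only.

```python
import numpy as np


def dpo_loss(policy_logp_chosen, policy_logp_rejected,
             ref_logp_chosen, ref_logp_rejected, beta=0.1):
    """Mean DPO loss: -log sigmoid(beta * (chosen log-ratio - rejected log-ratio))."""
    chosen_logratio = np.asarray(policy_logp_chosen, dtype=float) - np.asarray(ref_logp_chosen, dtype=float)
    rejected_logratio = np.asarray(policy_logp_rejected, dtype=float) - np.asarray(ref_logp_rejected, dtype=float)
    logits = beta * (chosen_logratio - rejected_logratio)
    # -log(sigmoid(x)) = log(1 + exp(-x)), computed stably with logaddexp
    return float(np.mean(np.logaddexp(0.0, -logits)))


# Toy log-probabilities (hypothetical values, not from a real model)
ref_chosen, ref_rejected = [-1.0], [-2.0]

# Policy identical to the reference: loss equals log(2)
print(dpo_loss(ref_chosen, ref_rejected, ref_chosen, ref_rejected))

# Policy that shifts probability toward the chosen response: lower loss
print(dpo_loss([-0.5], [-3.0], ref_chosen, ref_rejected))
```

Here `beta` plays a role similar to the KL coefficient in the PPO-style pipeline: larger values penalize drifting away from the reference model more strongly.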

### 📚 Additional Resources:

- **Papers**: "Training Language Models to Follow Instructions with Human Feedback" (OpenAI, 2022)
- **Libraries**: HuggingFace TRL, DeepSpeed
- **Frameworks**: PyTorch, TensorFlow

---

**🎓 Congratulations on completing the RLHF hands-on practice!**

**Contact:**
- Ho-min Park
- homin.park@ghent.ac.kr
- powersimmani@gmail.com