# Tutorial 5: Managerial Compensation and Moral Hazard

**Econometrics II - Structural Estimation**

*Starter Version - Based on Margiotta & Miller (2000)*

---

This notebook implements the static principal–agent model with moral hazard. We derive the optimal contract, estimate parameters, compute the cost of moral hazard, and conduct counterfactual policy experiments.

**Topics:**

- Optimal contracting under moral hazard (Kuhn–Tucker conditions)

- Identification in principal-agent models

- Maximum likelihood estimation

- Cost decomposition: risk compensation, effort disutility, output loss

- Policy counterfactuals: risk aversion, monitoring, volatility

**Data:** Gayle, Golan, and Miller (2015) executive compensation dataset.

## 0. Quick instructions

1. **Data Setup**: Update `DATA_PATH` in the configuration cell to point to `tutorial_data.dta`

2. **Performance Variable**: We use `net_excess_ret` (net excess return) as the performance measure $x$

3. **Key Parameters**:

- $\gamma$ = risk aversion coefficient (> 0)

- $\alpha, \beta$ = disutility parameters (with $\alpha > \beta > 0$)

4. **Output Files**: Results are saved to CSV files in the current directory

5. **Runtime**: Full estimation may take several minutes depending on sample size

In [None]:
# Install required packages if needed
!pip install numpy pandas scipy matplotlib tabulate statsmodels

In [None]:
# ============================================================================
# IMPORTS
# ============================================================================

import numpy as np
import pandas as pd
from scipy import stats, optimize
from scipy.stats import gaussian_kde
from scipy.integrate import quad  # Added for numerical integration in g(x) computation
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import List, Optional, Dict, Tuple, Callable
from tabulate import tabulate

# Display options
pd.set_option('display.float_format', lambda x: f"{x:,.6f}")

In [None]:
# ============================================================================
# CONFIGURATION
# ============================================================================

# Data paths
DATA_PATH = './data/final_sample_full06112010.dta'

# Variable names from dataset
X_VAR = 'net_excess_ret'  # Performance measure (net excess return)
W_VAR = 'totalComp'       # Total compensation

# Wage scaling factor
WAGE_SCALE = 1      # Choose the right scaling for wages (1 = no scaling)

# Model hyperparameters
ETA_DEFAULT = 1.0  # IC multiplier (eta > 0)
N_BOOTSTRAP = 100   # Number of bootstrap samples for SEs


In [None]:
# ============================================================================
# RESULT CONTAINER (from Tutorial 2)
# ============================================================================

@dataclass
class MLEResult:
    """
    Container for MLE estimation results with enhanced functionality.

    Features:
    - Basic statistics (z-stat, CI, bias)
    - Printing methods (print, summary)
    - Data conversion (to_dict, to_dataframe)
    - Persistence (save, load)
    - Comparison methods
    """
    param_name: str
    estimate: float
    std_error: float
    true_value: Optional[float] = None
    sample_size: Optional[int] = None
    loglik: Optional[float] = None
    convergence: bool = True
    method: str = "Unknown"

    # ========================================================================
    # Basic Statistics
    # ========================================================================

    def z_stat(self) -> float:
        """Compute z-statistic"""
        return self.estimate / self.std_error if self.std_error > 0 else np.nan

    def ci_95(self) -> Tuple[float, float]:
        """Compute 95% confidence interval"""
        return (self.estimate - 1.96 * self.std_error,
                self.estimate + 1.96 * self.std_error)

    def ci(self, alpha: float = 0.05) -> Tuple[float, float]:
        """Compute confidence interval at any level"""
        from scipy.stats import norm
        z_crit = norm.ppf(1 - alpha/2)
        return (self.estimate - z_crit * self.std_error,
                self.estimate + z_crit * self.std_error)

    def bias(self) -> float:
        """Compute bias if true value known"""
        if self.true_value is not None:
            return self.estimate - self.true_value
        return np.nan

    def relative_bias(self) -> float:
        """Compute relative bias as percentage"""
        if self.true_value is not None and self.true_value != 0:
            return 100 * (self.estimate - self.true_value) / self.true_value
        return np.nan

    # ========================================================================
    # Printing Methods
    # ========================================================================

    def print(self, detailed: bool = False):
        """Print formatted summary of results"""
        print("=" * 70)
        print(f"MLE Estimation Result: {self.param_name}")
        print("=" * 70)
        print(f"Method:        {self.method}")
        if self.sample_size:
            print(f"Sample size:   N = {self.sample_size}")

        print()
        print("Point Estimate:")
        print(f"  {self.param_name} = {self.estimate:.6f}")
        print(f"  SE({self.param_name}) = {self.std_error:.6f}")
        print(f"  z-statistic = {self.z_stat():.4f}")

        print()
        print("Confidence Intervals:")
        ci_95 = self.ci_95()
        print(f"  95% CI: [{ci_95[0]:.6f}, {ci_95[1]:.6f}]")

        if self.true_value is not None:
            print()
            print("Comparison to True Value:")
            print(f"  True value:     {self.true_value:.6f}")
            print(f"  Bias:           {self.bias():.6f}")
            print(f"  |Bias|/SE:      {abs(self.bias())/self.std_error:.4f}")

        if detailed and self.loglik is not None:
            print()
            print(f"Log-likelihood: {self.loglik:.4f}")

        print("=" * 70)

    def summary(self):
        """Alias for print(detailed=True)"""
        self.print(detailed=True)

    # ========================================================================
    # Data Conversion Methods
    # ========================================================================

    def to_dict(self) -> dict:
        """Convert result to dictionary with computed statistics"""
        from dataclasses import asdict
        result_dict = asdict(self)
        result_dict['z_stat'] = self.z_stat()
        ci_lower, ci_upper = self.ci_95()
        result_dict['ci_95_lower'] = ci_lower
        result_dict['ci_95_upper'] = ci_upper
        if self.true_value is not None:
            result_dict['bias'] = self.bias()
            result_dict['relative_bias'] = self.relative_bias()
        return result_dict

    def to_dataframe(self):
        """Convert single result to one-row DataFrame"""
        return pd.DataFrame([self.to_dict()])

    @staticmethod
    def results_to_dataframe(results: List['MLEResult']):
        """Convert list of results to DataFrame"""
        if not results:
            return pd.DataFrame()
        return pd.DataFrame([r.to_dict() for r in results])

    # ========================================================================
    # Persistence Methods
    # ========================================================================

    def save(self, filepath: str):
        """Save result to JSON file"""
        import json
        with open(filepath, 'w') as f:
            json.dump(self.to_dict(), f, indent=2, default=str)

    @classmethod
    def load(cls, filepath: str) -> 'MLEResult':
        """Load result from JSON file"""
        import json
        with open(filepath, 'r') as f:
            data = json.load(f)
        # Remove computed fields
        for field in ['z_stat', 'ci_95_lower', 'ci_95_upper', 'bias', 'relative_bias']:
            data.pop(field, None)
        return cls(**data)

    # ========================================================================
    # String Representations
    # ========================================================================

    def __repr__(self):
        s = f"MLE Result for {self.param_name} ({self.method}):\n"
        s += f"  Estimate: {self.estimate:.5f} (SE: {self.std_error:.5f})\n"
        s += f"  95% CI: [{self.ci_95()[0]:.5f}, {self.ci_95()[1]:.5f}]\n"
        if self.true_value is not None:
            s += f"  True value: {self.true_value:.5f} (Bias: {self.bias():.5f})\n"
        return s

    def __str__(self):
        s = f"{self.param_name} = {self.estimate:.6f} ({self.std_error:.6f})"
        if self.true_value is not None:
            s += f" [true: {self.true_value:.6f}]"
        return s

# ============================================================================
# SIMPLIFIED MLE ESTIMATOR CLASS (from Tutorial 2)
# ============================================================================

class MLEEstimator:
    """
    General-purpose Maximum Likelihood Estimator with simplified implementation.

    Improvements over original:
    - Cleaner standard error computation with single fallback path
    - Better error handling
    - More consistent interface
    """

    def __init__(self, loglik_func: Optional[Callable] = None, param_name: str = "theta"):
        """
        Initialize MLE estimator.

        Parameters:
        -----------
        loglik_func : callable, optional
            Log-likelihood function with signature: loglik(theta, data, **kwargs)
            Should return NEGATIVE log-likelihood (for minimization)
        param_name : str
            Name of parameter being estimated
        """
        self.loglik_func = loglik_func
        self.param_name = param_name

    def _compute_standard_error(self, theta_hat: float, objective: Callable,
                                 optimizer_result) -> float:
        """
        Compute standard error using numerical Hessian with clean fallback.

        Parameters:
        -----------
        theta_hat : float
            Estimated parameter value
        objective : callable
            Objective function (negative log-likelihood)
        optimizer_result : OptimizeResult
            Result from scipy.optimize

        Returns:
        --------
        se : float
            Standard error (or np.nan if computation fails)
        """
        # Method 1: Numerical Hessian using centered finite differences
        try:
            # Step 1: Adaptive step size
            eps = 1e-5 * max(abs(theta_hat), 1.0)
            # Step 2: Three points for evaluation
            f_plus = objective([theta_hat + eps])
            f_minus = objective([theta_hat - eps])
            f_center = objective([theta_hat])
            # Step 3: Hessian
            hess = (f_plus - 2*f_center + f_minus) / (eps**2)
            # Step 4: Check whether positive definite
            if hess > 1e-10:
                return np.sqrt(1.0 / hess)
        except Exception:
            pass

        # Method 2: Fallback to optimizer's inverse Hessian (BFGS only)
        if hasattr(optimizer_result, 'hess_inv') and optimizer_result.hess_inv is not None:
            try:
                if isinstance(optimizer_result.hess_inv, np.ndarray):
                    var = (optimizer_result.hess_inv[0, 0] if optimizer_result.hess_inv.ndim > 1
                           else optimizer_result.hess_inv[0])
                    if var > 0:
                        return np.sqrt(var)
            except Exception:
                pass

        return np.nan

    def estimate(self, data: np.ndarray, initial_guess: float,
                 bounds: Optional[Tuple[float, float]] = None,
                 true_value: Optional[float] = None,
                 **kwargs) -> MLEResult:
        """
        Estimate parameter via numerical optimization.

        Parameters:
        -----------
        data : array-like
            Observed data
        initial_guess : float
            Starting value for optimization
        bounds : tuple, optional
            (min, max) bounds for parameter
        true_value : float, optional
            True parameter value (for simulation studies)
        **kwargs : dict
            Additional arguments passed to log-likelihood function

        Returns:
        --------
        result : MLEResult
            Estimation results with standard errors
        """
        if self.loglik_func is None:
            raise ValueError("Must provide log-likelihood function")

        # Define objective
        def objective(theta_vec):
            return self.loglik_func(theta_vec[0], data, **kwargs)

        # Optimize
        if bounds is not None:
            res = optimize.minimize(objective, [initial_guess], method='L-BFGS-B',
                             bounds=[bounds])
        else:
            res = optimize.minimize(objective, [initial_guess], method='BFGS')

        theta_hat = res.x[0]
        se = self._compute_standard_error(theta_hat, objective, res)

        return MLEResult(
            param_name=self.param_name,
            estimate=theta_hat,
            std_error=se,
            true_value=true_value,
            sample_size=len(data) if hasattr(data, '__len__') else None,
            loglik=-res.fun,
            convergence=res.success,
            method="Numerical (Hessian)"
        )

    def estimate_closed_form(self, data: np.ndarray,
                            estimator_func: Callable,
                            se_func: Callable,
                            true_value: Optional[float] = None,
                            **kwargs) -> MLEResult:
        """
        Estimate using closed-form formulas.

        Parameters:
        -----------
        data : array-like
            Observed data
        estimator_func : callable
            Function: estimator_func(data, **kwargs) -> float
        se_func : callable
            Function: se_func(estimate, data, **kwargs) -> float
        true_value : float, optional
            True parameter value
        **kwargs : dict
            Additional arguments

        Returns:
        --------
        result : MLEResult
        """
        theta_hat = estimator_func(data, **kwargs)
        se = se_func(theta_hat, data, **kwargs)

        return MLEResult(
            param_name=self.param_name,
            estimate=theta_hat,
            std_error=se,
            true_value=true_value,
            sample_size=len(data) if hasattr(data, '__len__') else None,
            method="Closed-form (Fisher Info)"
        )

# ============================================================================
# TABLE PRINTING UTILITIES
# ============================================================================

def print_mle_table(results: List[MLEResult], sample_sizes: List[int],
                   title: Optional[str] = None):
    """
    Print formatted table of MLE results.

    Parameters:
    -----------
    results : list of MLEResult
        Results for different sample sizes
    sample_sizes : list of int
        Sample sizes used
    title : str, optional
        Table title
    """
    if not results:
        return

    param_name = results[0].param_name
    true_val = results[0].true_value
    method = results[0].method

    # Create DataFrame
    df = pd.DataFrame({
        'N': sample_sizes,
        'Estimate': [r.estimate for r in results],
        'Std Error': [r.std_error for r in results],
        'z-stat': [r.z_stat() for r in results],
    })

    if true_val is not None:
        df['Bias'] = [r.bias() for r in results]
        df['|Bias|/SE'] = np.abs(df['Bias']) / df['Std Error']

    # Print
    print("=" * 85)
    if title:
        print(title)
    else:
        print(f"MLE ESTIMATION RESULTS: {param_name} ({method})")
    print("=" * 85)

    if true_val is not None:
        print(f"True value: {param_name} = {true_val:.4f}")
        print("-" * 85)

    print(df.to_string(index=False, float_format=lambda x: f'{x:.5f}'))
    print("=" * 85)

    if true_val is not None:
        print("\nNotes:")
        print("  - Estimates should converge to true value as N increases")
        print("  - Standard errors should decrease proportional to 1/sqrt(N)")
        print("  - |Bias|/SE should be small (< 0.1) for unbiased estimation")
        print("=" * 85)
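
The centered finite-difference Hessian used in `MLEEstimator._compute_standard_error` can be checked on a model with a known answer. The sketch below is a standalone illustration, not part of the tutorial classes. It assumes exponential data with rate `lam` (a hypothetical example, chosen because the MLE and its standard error have closed forms), and the helper names `exp_negloglik` and `numerical_se` are made up here.

```python
import numpy as np

def exp_negloglik(lam, x):
    """Negative log-likelihood of i.i.d. Exponential(rate=lam) data."""
    return -(x.size * np.log(lam) - lam * x.sum())

def numerical_se(objective, theta_hat):
    """Standard error from a centered second difference of the negative log-likelihood."""
    eps = 1e-5 * max(abs(theta_hat), 1.0)        # same step-size rule as the class above
    hess = (objective(theta_hat + eps)
            - 2.0 * objective(theta_hat)
            + objective(theta_hat - eps)) / eps**2
    return np.sqrt(1.0 / hess) if hess > 0 else np.nan

rng = np.random.default_rng(0)
x = rng.exponential(scale=0.5, size=1000)        # simulated data, true rate 2.0 (assumed)

lam_hat = 1.0 / x.mean()                         # closed-form MLE of the rate
se_numeric = numerical_se(lambda lam: exp_negloglik(lam, x), lam_hat)
se_analytic = lam_hat / np.sqrt(x.size)          # from Fisher information n / lam^2

print(f"lam_hat                = {lam_hat:.4f}")
print(f"SE (numerical Hessian) = {se_numeric:.5f}")
print(f"SE (analytical)        = {se_analytic:.5f}")
```

The two standard errors should agree to several digits; a large gap would suggest that the step size `eps` is poorly scaled for the objective.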

## 1. Data preparation

### 1.1 Load and prepare data

We use the Gayle, Golan, and Miller (2015) executive compensation dataset, which contains:

- **Performance measure** ($x$): `net_excess_ret` (net excess return)
- **Compensation** ($w$): `totalComp` (total compensation)
- **Sample**: Executive-year observations from publicly traded US firms

The data is already cleaned and includes various executive and firm characteristics.

In [None]:
pd.read_stata(DATA_PATH)

In [None]:
# ---------------------------------------------------------------------------
# 0. Load data and construct x, w
# ---------------------------------------------------------------------------

try:
    df = pd.read_stata(DATA_PATH)
except FileNotFoundError:
    raise FileNotFoundError(f'Could not find {DATA_PATH}. Update DATA_PATH to the correct location.')

# Drop rows with missing x or w
df = df.dropna(subset=[X_VAR, W_VAR])

# Define outcome and wage
df['x_outcome'] = df[X_VAR]
df['w_obs'] = df[W_VAR]

# Optionally down-sample for speed while testing
# df = df.sample(5000, random_state=0)

x_data = df['x_outcome'].to_numpy()
w_data = df['w_obs'].to_numpy() / WAGE_SCALE  # Scale wages to match model

# Check for cluster ID variable (for cluster bootstrap)
# Common ID columns: execid, exec_id, gvkey, firmid, id
CLUSTER_VAR = None
for col in ['execid', 'exec_id', 'gvkey', 'firmid', 'firm_id', 'id']:
    if col in df.columns:
        CLUSTER_VAR = col
        break

if CLUSTER_VAR is not None:
    cluster_ids = df[CLUSTER_VAR].to_numpy()
    print(f'Cluster variable: {CLUSTER_VAR} ({len(np.unique(cluster_ids))} unique clusters)')
else:
    cluster_ids = None
    print('No cluster variable found - using standard bootstrap')

print(f'Number of observations: {len(df)}')
print(f'Wage scaling factor: {WAGE_SCALE}')
print(f'Scaled wages - mean: {w_data.mean():.4f}, std: {w_data.std():.4f}')
df[[X_VAR, W_VAR, 'x_outcome', 'w_obs']].head()

## 1. Theoretical Framework

### 1.1. Setting

We work with the static principal–agent model from Margiotta & Miller (2000):

- **Agent** chooses unobservable effort $\ell\in\{0,1\}$ (working vs shirking).

- **Output** $X \in \mathbb{R}_+$ is observable and stochastic, distributed as:
$$
X \sim
\begin{cases}
f(x) & \text{if }\ell=1\text{ (working)},\\
f(x)g(x) & \text{if }\ell=0\text{ (shirking)}
\end{cases}
$$
where $f(x)$ is a density on $\mathbb{R}_+$ and $g(x) \geq 0$ is a likelihood ratio satisfying:
$$
\int g(x)f(x)\,dx = 1, \qquad \mathbb{E}[xg(x)] < \mathbb{E}[x]
$$
so expected output is lower when shirking.

- **Agent preferences** (CARA with risk aversion $\gamma > 0$):
$$
U(w,\ell) = -\exp(-\gamma w) \times
\begin{cases}
\alpha & \text{if }\ell=1\text{ (working)},\\
\beta & \text{if }\ell=0\text{ (shirking)},
\end{cases}
$$
where $\alpha > \beta > 0$ captures that shirking has lower disutility than working.

- **Outside option** normalized to utility $-1$ (rejecting the contract).

- **Principal** is risk-neutral and chooses wage schedule $w(x)$ to maximize expected profit:
$$
\max_{w(\cdot)} \mathbb{E}[x - w(x)]
$$
subject to two constraints:

**Individual Rationality (IR)**: Agent prefers working to outside option
$$
\alpha \, \mathbb{E}\big[\exp(-\gamma w(x))\big] \; \leq \; 1
$$

**Incentive Compatibility (IC)**: Working dominates shirking
$$
\alpha \, \mathbb{E}\big[\exp(-\gamma w(x))\big]
\; \leq \;
\beta \, \mathbb{E}\big[\exp(-\gamma w(x)) g(x)\big]
$$

where expectations are taken with respect to $f(x)$ (the working distribution).

**Note on constraint direction**: Utility is $-\exp(-\gamma w)$ times a positive constant, so a *lower* value of $\alpha\,\mathbb{E}[\exp(-\gamma w)]$ means *higher* utility. Hence IR requires $\alpha \mathbb{E}[\exp(-\gamma w)] \leq 1$ (utility from working $\geq$ the outside utility of $-1$), and IC requires that the utility from working be at least the utility from shirking.
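
As a short worked example of how IC restricts the contract, consider a constant wage $w(x) = \bar{w}$ (a hypothetical contract used only for illustration). Because $\mathbb{E}[g(x)] = \int g(x) f(x)\,dx = 1$, the IC constraint reduces to:

$$
\alpha \exp(-\gamma \bar{w}) \;\leq\; \beta \exp(-\gamma \bar{w})\,\mathbb{E}[g(x)] = \beta \exp(-\gamma \bar{w})
\quad\Longleftrightarrow\quad
\alpha \leq \beta,
$$

which contradicts $\alpha > \beta$. A flat wage therefore cannot induce effort: the wage must vary with $x$ for working to be incentive compatible.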


### 1.2. Optimal Contract (Question 1)

---

**YOUR ANSWER HERE**

---

## 2. Identification (Question 2)

From the data, we observe performance $x$ and wages $w(x)$ for a range of US companies. Our goal is to find parameters $(\alpha, \beta, \gamma, f(x), g(x))$ that rationalize the observed wage schedule as an optimal contract. In this section, we discuss the identification of these parameters from observed wages $w(x)$ and the role of knowing $\gamma$.

### 2.1. Non-identification Without Known $\gamma$

**Observables**: From data we observe pairs $(x_i, w_i)$ representing performance and wages.

**Unknowns**: We want to identify $(\gamma, \alpha, \beta, f(x), g(x))$.

**The problem**: The model is *underidentified* in the static cross-section because:


---

**YOUR ANSWER HERE**

---

### 2.2. Identification with Known $\gamma$

**Assumption**: $\gamma$ is known (e.g., from experimental evidence or calibration).

---

**YOUR ANSWER HERE**

---

## 3. Estimation

This section implements the estimation of the moral hazard model using a unified `MoralHazardModel` class. The class supports both parametric and non-parametric approaches for estimating key functions:

- $f(x)$: density of output under working technology

- $g(x)$: likelihood ratio (shirking density / working density)

- $v^*(x)$: optimal transformed contract

- $w^*(x)$: optimal wage schedule

We may impose parametric structure on $f(x)$ and/or $g(x)$. Depending on the specification, we estimate the following parameters:

| Config | f(x) Estimation | g(x) Estimation | Parameters Estimated |
|--------|-----------------|-----------------|---------------------|
| **1** | Parametric (Normal) | Parametric (exp) | μ_w, μ_s, σ, γ, α, β, ψ |
| **2** | Non-parametric (KDE) | Non-parametric (KDE ratio) | γ, α, β |

where:
- $\gamma$: risk aversion parameter

- $\alpha, \beta$: disutility parameters ($\alpha > \beta > 0$)

- $\mu_w, \mu_s, \sigma$: parameters of the parametric output distributions, $N(\mu_w, \sigma^2)$ under working, $f(x)$, and $N(\mu_s, \sigma^2)$ under shirking, $f(x)g(x)$ (both truncated at $\psi$)

- $\psi$: truncation point of output distribution under parametric $f(x)$

- $\sigma_\varepsilon$: measurement error in wages

The estimation proceeds through the following steps:

| Stage | Method | Input                   | Output                      |
|-------|--------|-------------------------|-----------------------------|
| **Data Setup** | `set_data(x, w)` | Raw performance & wages | Cleaned arrays              |
| **Step 1** | `step1_estimate_threshold()` | x data                  | Truncation point ψ̂         |
| **Step 2** | `step2_estimate_f()` | x data, ψ̂              | f(x) density or (μ̂_w, σ̂)  |
| **Step 3** | `step3_estimate_contract()` | x, w, g(x)              | (γ̂, α̂, β̂, μ̂_s, σ̂\_ε) |
| **Step 4** | `step4_correct_se()` | All estimates           | Bootstrap standard errors   |

---
#### Output Methods

| Method | Returns |
|--------|---------|
| `v_star(x)` | Optimal transformed contract |
| `w_star(x)` | Optimal wage schedule |
| `check_constraints()` | IR/IC verification dict |
| `compute_moral_hazard_costs()` | Δ₁, Δ₂, Δ₃ decomposition |
| `counterfactual(scenarios)` | Policy experiment DataFrame |
| `summary_table()` | Parameter estimates DataFrame |

---

### 3.1 Parametric Estimation

Following the assignment, we impose parametric structure on the densities:

**Working distribution**: Truncated normal with lower truncation at $\psi \geq 0$:
$$
X \mid \ell=1 \sim f_w(x) = \text{TN}(\mu_w, \sigma^2; \psi)
$$
where
$$
f_w(x) = \frac{1}{\sigma}\frac{\phi\left(\frac{x-\mu_w}{\sigma}\right)}{1 - \Phi\left(\frac{\psi-\mu_w}{\sigma}\right)}, \quad x \geq \psi
$$

**Shirking distribution**: Truncated normal with *different mean* but same variance and truncation:
$$
X \mid \ell=0 \sim f_s(x) = \text{TN}(\mu_s, \sigma^2; \psi)
$$
where $\mu_s < \mu_w$ (shirking reduces expected output).

**Likelihood ratio**:
$$
g(x) = \frac{f_s(x)}{f_w(x)} = \frac{\phi\left(\frac{x-\mu_s}{\sigma}\right)}{\phi\left(\frac{x-\mu_w}{\sigma}\right)} \times \frac{1 - \Phi\left(\frac{\psi-\mu_w}{\sigma}\right)}{1 - \Phi\left(\frac{\psi-\mu_s}{\sigma}\right)}
$$
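
The likelihood ratio above can be evaluated directly once the common $\phi$ terms are simplified: the ratio of the two normal kernels is $\exp\{(\mu_s - \mu_w)(2x - \mu_w - \mu_s)/(2\sigma^2)\}$. The following is a minimal sketch under assumed parameter values; the function name `likelihood_ratio` and the numbers are illustrative, not the tutorial's reference implementation. It also checks the two properties required of $g(x)$ in Section 1.1.

```python
import numpy as np
from scipy import stats
from scipy.integrate import quad

def likelihood_ratio(x, mu_w, mu_s, sigma, psi):
    """g(x) = f_s(x) / f_w(x) for truncated normals sharing sigma and psi."""
    x = np.asarray(x, dtype=float)
    # Ratio of the two normal kernels, written in logs (linear in x)
    log_kernel = (mu_s - mu_w) * (2.0 * x - mu_w - mu_s) / (2.0 * sigma**2)
    # Ratio of the truncation normalizers: [1 - Phi((psi-mu_w)/s)] / [1 - Phi((psi-mu_s)/s)]
    log_norm = (stats.norm.logsf((psi - mu_w) / sigma)
                - stats.norm.logsf((psi - mu_s) / sigma))
    return np.exp(log_kernel + log_norm)

# Hypothetical parameter values, for illustration only
mu_w, mu_s, sigma, psi = 1.0, 0.5, 1.0, 0.0

# Working density f_w; scipy's truncnorm takes bounds in standardized units
f_w = stats.truncnorm((psi - mu_w) / sigma, np.inf, loc=mu_w, scale=sigma)

# Check 1: E_f[g(x)] = 1
mass, _ = quad(lambda t: likelihood_ratio(t, mu_w, mu_s, sigma, psi) * f_w.pdf(t),
               psi, np.inf)
# Check 2: E_f[x g(x)] < E_f[x], i.e. shirking lowers expected output
mean_shirk, _ = quad(lambda t: t * likelihood_ratio(t, mu_w, mu_s, sigma, psi) * f_w.pdf(t),
                     psi, np.inf)
mean_work = f_w.mean()

print(f"E[g]   = {mass:.6f}")
print(f"E[x g] = {mean_shirk:.4f}  vs  E[x] = {mean_work:.4f}")
```

Working in logs avoids dividing two very small density values in the right tail.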

**Parameter vector**:
$$
\theta = (\mu_w, \sigma, \mu_s, \gamma, \alpha, \beta, \sigma_w)
$$
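where $\sigma_w$ (written $\sigma_\varepsilon$ in the parameter list at the start of Section 3 and stored as `sigma_eps_hat` in the code) is the standard deviation of the measurement error in observed wages: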
$$
w_i = w^*(x_i; \theta, \eta) + \varepsilon_i, \quad \varepsilon_i \sim N(0, \sigma_w^2)
$$

**Step 1 - Estimate Truncation Point**:
$$
\hat{\psi} = \min_i x_i
$$

**Step 2 - Estimate Working Distribution** $(\mu_w, \sigma)$:

Maximum likelihood for truncated normal:
$$
\max_{\mu_w, \sigma} \sum_{i=1}^N \log f_w(x_i; \mu_w, \sigma, \hat{\psi})
$$
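
Steps 1 and 2 can be tried out on simulated data before touching the real sample. The sketch below is one possible way to set this up, not the reference solution: the data are drawn from a truncated normal with assumed true values $(\mu_w, \sigma, \psi) = (1, 1, 0)$, $\sigma$ is parameterized as $\exp(\cdot)$ to keep it positive, and the function name `truncnorm_negloglik` is hypothetical.

```python
import numpy as np
from scipy import optimize, stats

def truncnorm_negloglik(params, x, psi):
    """Negative log-likelihood of TN(mu, sigma^2; psi), with sigma = exp(params[1])."""
    mu, log_sigma = params
    sigma = np.exp(log_sigma)                       # keeps sigma > 0 without bounds
    z = (x - mu) / sigma
    loglik = (stats.norm.logpdf(z).sum()            # sum of log phi(z_i)
              - x.size * log_sigma                  # - n log sigma
              - x.size * stats.norm.logsf((psi - mu) / sigma))  # - n log(1 - Phi(.))
    return -loglik

# Simulated data with assumed true values (mu_w, sigma, psi) = (1, 1, 0)
rng = np.random.default_rng(0)
x_sim = stats.truncnorm.rvs(-1.0, np.inf, loc=1.0, scale=1.0,
                            size=5000, random_state=rng)

# Step 1: truncation point as the sample minimum
psi_hat = x_sim.min()

# Step 2: maximize the truncated-normal likelihood given psi_hat
start = np.array([x_sim.mean(), np.log(x_sim.std())])
res = optimize.minimize(truncnorm_negloglik, start, args=(x_sim, psi_hat),
                        method="Nelder-Mead")
mu_hat, sigma_hat = res.x[0], np.exp(res.x[1])

print(f"psi_hat = {psi_hat:.4f}, mu_hat = {mu_hat:.4f}, sigma_hat = {sigma_hat:.4f}")
```

The sample mean and standard deviation are biased for $(\mu_w, \sigma)$ under truncation, so they serve only as starting values here.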

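**Step 3 - Estimate Contract Parameters** $(\gamma, \alpha, \beta, \mu_s, \sigma_w)$:

Nonlinear least squares on the wage equation, holding $(\hat{\mu}_w, \hat{\sigma}, \hat{\psi})$ fixed at their Step 1 and Step 2 values:
$$
\min_{\gamma, \alpha, \beta, \mu_s} \sum_{i=1}^N \left(w_i - w^*(x_i; \theta, \eta(\theta))\right)^2
$$

The sum of squares does not depend on $\sigma_w$, so $\hat{\sigma}_w$ is recovered afterwards as the standard deviation of the NLS residuals.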

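where $\eta(\theta)$ is solved numerically from the binding IC constraint (given the closed form of $v^*(x)$ listed in the class docstring in Section 3.3, the IR constraint $\alpha \mathbb{E}[v^*(x)] = 1$ then binds as well for $\eta > 0$):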
$$
\alpha \mathbb{E}[v^*(x)] = \beta \mathbb{E}[v^*(x)g(x)]
$$
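
The inner loop for $\eta$ is a one-dimensional root-finding problem. The sketch below illustrates one way to set it up, using the formula $v^*(x) = 1/[\alpha(1 + \eta(\alpha/\beta - g(x)))]$ from the class docstring in Section 3.3. All parameter values are hypothetical, expectations are approximated on a fixed grid rather than by quadrature, and the name `ic_gap` is made up for this example.

```python
import numpy as np
from scipy import optimize

# Hypothetical parameter values, for illustration only
alpha, beta, gamma = 1.2, 1.0, 0.5
mu_w, mu_s, sigma, psi = 1.0, 0.5, 1.0, 0.0

# Discretize the working density on a fine grid; p holds weights that sum to one
x = np.linspace(psi, psi + 10.0 * sigma, 20001)
f = np.exp(-0.5 * ((x - mu_w) / sigma) ** 2)
p = f / f.sum()

# Likelihood ratio of the two truncated normals, normalized so E_f[g] = 1 on the grid
g = np.exp((mu_s - mu_w) * (2.0 * x - mu_w - mu_s) / (2.0 * sigma**2))
g = g / np.sum(p * g)

ratio = alpha / beta

def ic_gap(eta):
    """Proportional to alpha*E[v*] - beta*E[v* g]; zero when IC binds."""
    denom = 1.0 + eta * (ratio - g)
    return np.sum(p * (ratio - g) / denom)

# v* must stay positive, which caps eta when max g exceeds alpha/beta
eta_max = 1.0 / (g.max() - ratio)
eta = optimize.brentq(ic_gap, 0.0, 0.999 * eta_max)

v_star = 1.0 / (alpha * (1.0 + eta * (ratio - g)))
w_star = -np.log(v_star) / gamma          # w* = (1/gamma) ln[alpha(1 + eta(alpha/beta - g))]

ir_lhs = alpha * np.sum(p * v_star)       # alpha E[v*]
ic_rhs = beta * np.sum(p * v_star * g)    # beta E[v* g]

print(f"eta = {eta:.4f}")
print(f"alpha E[v*] = {ir_lhs:.8f}, beta E[v* g] = {ic_rhs:.8f}")
```

With these values `ic_gap` is positive at $\eta = 0$ (it equals $\alpha/\beta - 1$) and strictly decreasing in $\eta$, so the bracket contains a single root. The resulting wage schedule `w_star` is increasing in $x$ because $g(x)$ is decreasing when $\mu_s < \mu_w$.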

**Step 4 - Standard Errors**:

Two approaches:
1. **Bootstrap**: Resample observations (or clusters) with replacement, re-estimate
2. **Murphy-Topel correction**: Analytical correction for two-step estimation
---
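
The bootstrap option in Step 4 can be sketched generically. The example below is an illustration under assumptions: it resamples whole clusters with replacement and applies an arbitrary estimator (here simply the sample mean on simulated data), and the function name `cluster_bootstrap_se` is hypothetical. In the tutorial, the estimator would be the full multi-step procedure.

```python
import numpy as np

def cluster_bootstrap_se(estimator, data, cluster_ids, n_boot=200, seed=0):
    """Bootstrap SE of estimator(data), resampling whole clusters with replacement."""
    rng = np.random.default_rng(seed)
    data = np.asarray(data)
    cluster_ids = np.asarray(cluster_ids)
    unique_ids = np.unique(cluster_ids)
    # Row indices belonging to each cluster, computed once
    rows_by_cluster = [np.flatnonzero(cluster_ids == c) for c in unique_ids]
    draws = np.empty(n_boot)
    for b in range(n_boot):
        picked = rng.integers(0, unique_ids.size, size=unique_ids.size)
        idx = np.concatenate([rows_by_cluster[k] for k in picked])
        draws[b] = estimator(data[idx])
    return draws.std(ddof=1)

# Simulated panel: 100 clusters, 4 observations each, with a shared cluster effect
rng = np.random.default_rng(1)
ids = np.repeat(np.arange(100), 4)
y = rng.normal(size=100)[ids] + rng.normal(size=ids.size)

se_cluster = cluster_bootstrap_se(np.mean, y, ids)
se_iid = cluster_bootstrap_se(np.mean, y, np.arange(y.size))  # each row its own cluster

print(f"Cluster bootstrap SE:  {se_cluster:.4f}")
print(f"Standard bootstrap SE: {se_iid:.4f}")
```

Passing a distinct id for every row reproduces the standard observation-level bootstrap, which matches the fallback used when no cluster variable is found in the data.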

### 3.2 Non-Parametric Alternative

For robustness, we can estimate $f(x)$ and/or $g(x)$ non-parametrically:

**Non-parametric $f(x)$**: Use kernel density estimation on observed $\{x_i\}$
$$
\hat{f}(x) = \frac{1}{Nh}\sum_{i=1}^N K\left(\frac{x - x_i}{h}\right)
$$
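
The kernel density estimator above corresponds to `scipy.stats.gaussian_kde` with a Gaussian kernel $K$. The following is a minimal sketch on simulated data standing in for the observed $\{x_i\}$; the variable names are illustrative. Note that an unmodified KDE ignores any truncation at $\psi$, so some estimated mass falls below the sample minimum.

```python
import numpy as np
from scipy.stats import gaussian_kde

rng = np.random.default_rng(0)
x_sim = rng.normal(loc=1.0, scale=1.0, size=2000)   # simulated stand-in for observed x_i

kde = gaussian_kde(x_sim, bw_method="scott")        # Scott's rule for the bandwidth
grid = np.linspace(x_sim.min(), x_sim.max(), 200)
f_hat = kde(grid)                                   # estimated density on the grid

h = kde.factor * x_sim.std(ddof=1)                  # implied bandwidth h in data units
total_mass = kde.integrate_box_1d(x_sim.min() - 10.0, x_sim.max() + 10.0)

print(f"bandwidth h = {h:.4f}")
print(f"integral of f_hat = {total_mass:.6f}")
```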

**Non-parametric $g(x)$**: Under the current guess for $\gamma$, compute weights $v_i = \exp(-\gamma w_i)$ and estimate $g(x)$ via a weighted KDE

**Trade-offs**:
- **Parametric**: Efficient if specification correct, but subject to misspecification
- **Non-parametric**: Robust to functional form, but requires larger sample and careful bandwidth selection

---

### 3.3 The MoralHazardModel Class

Below is the starter skeleton of the model class. Once the TODO methods are implemented, it provides:

- Flexible estimation methods (parametric/non-parametric)

- Automatic constraint checking

- Cost decomposition

- Counterfactual analysis


In [None]:
"""
Moral Hazard Model - Student Implementation

TODO: Implement the MoralHazardModel class following the lecture notes.

Estimation procedure:
1. Estimate truncation threshold ψ (minimum observed x)
2. Estimate (μ, σ) via LIML on truncated normal
3. Estimate (γ, α, β, μ_s) via NLS with inner loop for η

Key formulas:
- f(x|work): TN(μ, σ²; ψ) - truncated normal
- f(x|shirk): TN(μ_s, σ²; ψ) - truncated normal  
- g(x) = f(x|shirk) / f(x|work) - likelihood ratio
- v*(x) = 1 / [α(1 + η(α/β - g(x)))] - optimal contract
- w*(x) = (1/γ) ln[α(1 + η(α/β - g(x)))] - optimal wage
- IR: α E[v*] = 1 (binding)
- IC: α E[v*] = β E[v*g] (binding)
"""

from typing import Dict, Optional, Tuple
import numpy as np
from scipy import stats, optimize
from scipy.stats import gaussian_kde
import pandas as pd

class MoralHazardModel:
    """
    Sequential Moral Hazard Estimator
    
    Implements the three-step estimation procedure from lecture notes.
    Students should implement all methods marked with TODO.
    """
    
    def __init__(self, config: Optional[Dict] = None):
        """
        Initialize the model with optional configuration.
        
        Parameters:
        -----------
        config : dict, optional
            Configuration dictionary. Keys may include:
            - 'use_nonparam_f': Use KDE for f(x) (default: False)
            - 'use_nonparam_g': Use KDE for g(x) (default: False)
            - 'kde_bandwidth': Bandwidth for KDE (default: 'scott')
        """
        self.config = {
            'use_nonparam_f': False,
            'use_nonparam_g': False,
            'kde_bandwidth': 'scott',
        }
        if config is not None:
            self.config.update(config)
        
        # Data
        self.x = None  # Performance data
        self.w = None  # Wage data
        self.n = None  # Sample size
        
        # Step 1 results
        self.psi = None  # Truncation threshold
        
        # Step 2 results
        self.mu_hat = None    # Mean of f(x)
        self.sigma_hat = None # Std of f(x)
        
        # Step 3 results
        self.gamma_hat = None   # Risk aversion
        self.alpha_hat = None   # Utility parameter (work)
        self.beta_hat = None    # Utility parameter (shirk)
        self.mu_s_hat = None    # Mean under shirking
        self.eta_hat = None     # Lagrange multiplier (IC)
        self.sigma_eps_hat = None  # Residual std
    
    def set_data(self, x: np.ndarray, w: np.ndarray):
        """
        Load performance and wage data.
        
        Parameters:
        -----------
        x : np.ndarray
            Performance/output data (net excess returns)
        w : np.ndarray
            Wage/compensation data (total compensation)
        """
        self.x = np.asarray(x).flatten()
        self.w = np.asarray(w).flatten()
        self.n = len(self.x)
        
        print(f"Data loaded: n = {self.n}")
        print(f"  x: mean={np.mean(self.x):.4f}, std={np.std(self.x):.4f}")
        print(f"  w: mean={np.mean(self.w):.4f}, std={np.std(self.w):.4f}")
    
    # =========================================================================
    # STEP 1: Estimate truncation threshold
    # =========================================================================
    
    def step1_estimate_threshold(self) -> float:
        """
        Step 1: Estimate truncation threshold ψ.
        
        The truncation point is the minimum observed performance.
        
        Returns:
        --------
        psi : float
            Estimated truncation threshold
        """
        # TODO: Implement threshold estimation
        # Hint: Use minimum of observed x
        raise NotImplementedError("TODO: Implement step1_estimate_threshold")
    
    # =========================================================================
    # STEP 2: Estimate f(x) parameters
    # =========================================================================
    
    def step2_estimate_f(self) -> Tuple[float, float]:
        """
        Step 2: Estimate parameters of f(x).
        
        Parametric: Estimate (μ, σ) via LIML on truncated normal
        Non-parametric: Estimate f(x) via KDE
        
        Returns:
        --------
        mu_hat : float
            Estimated mean (None if non-parametric)
        sigma_hat : float
            Estimated std (None if non-parametric)
        """
        if self.config['use_nonparam_f']:
            return self._step2_nonparametric()
        else:
            return self._step2_parametric()
    
    def _step2_parametric(self) -> Tuple[float, float]:
        """
        Estimate (μ, σ) via Limited Information Maximum Likelihood (LIML)
        on truncated normal distribution.
        
        The log-likelihood for truncated normal TN(μ, σ²; ψ) is:
        ℓ(μ,σ) = Σ log φ((x_i - μ)/σ) - n log σ - n log(1 - Φ((ψ-μ)/σ))
        
        where φ is standard normal PDF and Φ is standard normal CDF.
        
        Returns:
        --------
        mu_hat, sigma_hat : floats
        """
        # TODO: Implement truncated normal MLE
        # Hint: Use scipy.optimize.minimize with negative log-likelihood
        raise NotImplementedError("TODO: Implement _step2_parametric")
    
    def _step2_nonparametric(self) -> Tuple[None, None]:
        """
        Estimate f(x) non-parametrically via KDE.
        
        Returns:
        --------
        None, None (KDE stored in self.f_kde)
        """
        # TODO: Implement KDE estimation
        # Hint: Use scipy.stats.gaussian_kde
        raise NotImplementedError("TODO: Implement _step2_nonparametric")
    
    # =========================================================================
    # STEP 3: Estimate contract parameters
    # =========================================================================
    
    def step3_estimate_contract(self,
                                gamma_init: float = 0.5,
                                alpha_init: float = 2.0,
                                beta_init: float = 1.0,
                                mu_s_init: Optional[float] = None) -> Dict:
        """
        Step 3: Estimate contract parameters via NLS.
        
        Estimate (γ, α, β) [and μ_s if parametric g] by minimizing:
        Σ (w_i - w*(x_i))²
        
        where w*(x) = (1/γ) ln[α(1 + η(α/β - g(x)))]
        
        η is solved from the binding IR constraint: α E[v*] = 1
        
        Parameters:
        -----------
        gamma_init : float
            Initial value for risk aversion
        alpha_init : float
            Initial value for α (disutility parameter when working)
        beta_init : float
            Initial value for β (disutility parameter when shirking)
        mu_s_init : float, optional
            Initial value for μ_s (only for parametric g)
        
        Returns:
        --------
        results : dict
            Dictionary containing estimated parameters
        """
        if self.config['use_nonparam_g']:
            return self._step3_nonparametric_g(gamma_init, alpha_init, beta_init)
        else:
            return self._step3_parametric_g(gamma_init, alpha_init, beta_init, mu_s_init)
    
    def _compute_g_parametric(self, x: np.ndarray, mu_s: float) -> np.ndarray:
        """
        Compute likelihood ratio g(x) = f_s(x)/f_w(x) parametrically.
        
        Both f_s and f_w are truncated normals with same σ but different means.
        
        Parameters:
        -----------
        x : np.ndarray
            Performance values
        mu_s : float
            Mean under shirking (μ_s < μ for effort to increase output)
        
        Returns:
        --------
        g : np.ndarray
            Likelihood ratio values
        """
        # TODO: Implement parametric g(x) computation
        # Hint: g(x) = f_s(x)/f_w(x) where both are truncated normals
        raise NotImplementedError("TODO: Implement _compute_g_parametric")
    
    def _solve_eta(self, alpha: float, beta: float, g_vals: np.ndarray) -> float:
        """
        Solve for η from binding IR constraint: α E[v*] = 1
        
        v*(x) = 1 / [α(1 + η(α/β - g(x)))]
        
        Parameters:
        -----------
        alpha, beta : floats
            Utility parameters
        g_vals : np.ndarray
            Likelihood ratio values
        
        Returns:
        --------
        eta : float
            Lagrange multiplier on IC constraint
        """
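        # TODO: Implement eta solver
        # Hint: Find η such that α * mean(v*) = 1
        # Note: η = 0 always satisfies this equation (then v* = 1/α), so search
        # for the strictly positive root, keeping 1 + η(α/β - g(x)) > 0 for all x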
        raise NotImplementedError("TODO: Implement _solve_eta")
    
    def _compute_wage_residuals(self, gamma: float, alpha: float, beta: float,
                                 eta: float, g_vals: np.ndarray) -> np.ndarray:
        """
        Compute wage residuals: w_i - w*(x_i)
        
        w*(x) = (1/γ) ln[α(1 + η(α/β - g(x)))]
        
        Returns:
        --------
        residuals : np.ndarray
        """
        # TODO: Implement wage residual computation
        raise NotImplementedError("TODO: Implement _compute_wage_residuals")
    
    def _step3_parametric_g(self, gamma_init: float, alpha_init: float,
                            beta_init: float, mu_s_init: Optional[float]) -> Dict:
        """
        Estimate (γ, α, β, μ_s) via NLS with parametric g(x).
        
        Minimizes Σ(w_i - w*(x_i))² over (γ, α, β, μ_s)
        with η solved from IR constraint in inner loop.
        
        Returns:
        --------
        results : dict
        """
        # TODO: Implement NLS estimation with parametric g
        raise NotImplementedError("TODO: Implement _step3_parametric_g")
    
    def _step3_nonparametric_g(self, gamma_init: float, alpha_init: float,
                                beta_init: float) -> Dict:
        """
        Estimate (γ, α, β) via NLS with non-parametric g(x).
        
        g(x) is estimated via KDE-based importance sampling.
        
        Returns:
        --------
        results : dict
        """
        # TODO: Implement NLS estimation with non-parametric g
        raise NotImplementedError("TODO: Implement _step3_nonparametric_g")
    
    # =========================================================================
    # Helper methods for g(x) and wage computation
    # =========================================================================
    
    def g(self, x: np.ndarray) -> np.ndarray:
        """
        Evaluate g(x) at given points using estimated parameters.
        """
        # TODO: Implement g(x) evaluation
        raise NotImplementedError("TODO: Implement g")
    
    def optimal_wage(self, x: np.ndarray) -> np.ndarray:
        """
        Compute optimal wage w*(x) at given points.
        """
        # TODO: Implement optimal wage computation
        raise NotImplementedError("TODO: Implement optimal_wage")
    
    # =========================================================================
    # Analysis methods
    # =========================================================================
    
    def compute_cost_decomposition(self) -> Dict:
        """
        Decompose the cost of moral hazard into three components:
        
        Δ₁: Risk premium (cost due to agent's risk aversion)
        Δ₂: Effort disutility compensation  
        Δ₃: Output loss from moral hazard
        
        Returns:
        --------
        costs : dict
            Dictionary with Δ₁, Δ₂, Δ₃, and total cost
        """
        # TODO: Implement cost decomposition
        raise NotImplementedError("TODO: Implement compute_cost_decomposition")
    
    def run_counterfactual(self, gamma_new: Optional[float] = None,
                           alpha_new: Optional[float] = None,
                           beta_new: Optional[float] = None) -> Dict:
        """
        Run counterfactual experiment with modified parameters.
        
        Parameters:
        -----------
        gamma_new : float, optional
            New risk aversion (default: use estimated)
        alpha_new : float, optional
            New α parameter (default: use estimated)
        beta_new : float, optional
            New β parameter (default: use estimated)
        
        Returns:
        --------
        results : dict
            Counterfactual analysis results
        """
        # TODO: Implement counterfactual analysis
        raise NotImplementedError("TODO: Implement run_counterfactual")
    
    def verify_constraints(self) -> Dict:
        """
        Verify that IR and IC constraints are satisfied.
        
        IR: α E[v*] = 1
        IC: α E[v*] = β E[v*g]
        
        Returns:
        --------
        verification : dict
            IR and IC constraint values and errors
        """
        # TODO: Implement constraint verification
        raise NotImplementedError("TODO: Implement verify_constraints")
    
    def summary(self):
        """Print estimation summary."""
        print("\n" + "="*60)
        print("MORAL HAZARD MODEL - ESTIMATION SUMMARY")
        print("="*60)
        
        print("\nStep 1: Truncation threshold")
        # Compare against None so that a legitimate threshold of 0.0 is still printed
        print(f"  ψ = {self.psi:.6f}" if self.psi is not None else "  Not estimated")
        
        print("\nStep 2: Distribution parameters")
        if self.mu_hat is not None:
            print(f"  μ = {self.mu_hat:.6f}")
            print(f"  σ = {self.sigma_hat:.6f}")
        else:
            print("  Non-parametric (KDE)")
        
        print("\nStep 3: Contract parameters")
        if self.gamma_hat is not None:
            print(f"  γ (risk aversion)    = {self.gamma_hat:.6f}")
            print(f"  α (work disutility)  = {self.alpha_hat:.6f}")
            print(f"  β (shirk disutility) = {self.beta_hat:.6f}")
            print(f"  η (IC multiplier)    = {self.eta_hat:.6f}")
            if self.mu_s_hat is not None:
                print(f"  μ_s (shirk mean)     = {self.mu_s_hat:.6f}")
        else:
            print("  Not estimated")


### 3.5 Test Your Implementation

After implementing the `MoralHazardModel` class, test it with the cells below.

**Tip:** Start by running on simulated data where you know the true parameters, then apply to real data.
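
One way to follow this tip is to generate data from the model itself. The sketch below is a minimal illustration, not the tutorial's reference solution. It assumes both densities are normals left-truncated at ψ with a common σ, and it adds mean-zero Gaussian noise to wages (an assumption made here so that NLS has an error term to fit). The helper names `log_g_truncnorm`, `solve_eta`, and `simulate_contract_data`, and all parameter values, are hypothetical.

```python
import numpy as np
from scipy import optimize, stats


def log_g_truncnorm(x, mu, mu_s, sigma, psi):
    """Log likelihood ratio log[f_s(x)/f_w(x)] for normals left-truncated at psi."""
    log_f_s = stats.norm.logpdf(x, mu_s, sigma) - stats.norm.logsf(psi, mu_s, sigma)
    log_f_w = stats.norm.logpdf(x, mu, sigma) - stats.norm.logsf(psi, mu, sigma)
    return log_f_s - log_f_w


def solve_eta(alpha, beta, g):
    """Positive root of mean[(g - a/b) / (1 + eta * (a/b - g))] = 0.

    Dividing the IR equation alpha * mean(v*) = 1 by eta removes the
    trivial root eta = 0 and leaves this binding-IC condition.
    """
    ratio = alpha / beta
    if g.max() <= ratio or g.mean() >= ratio:
        raise ValueError("no strictly positive root for this sample")
    eta_hi = (1.0 - 1e-9) / (g.max() - ratio)  # keeps 1 + eta*(a/b - g) > 0
    gap = lambda eta: np.mean((g - ratio) / (1.0 + eta * (ratio - g)))
    return optimize.brentq(gap, 0.0, eta_hi)


def simulate_contract_data(n, mu, mu_s, sigma, psi, gamma, alpha, beta,
                           noise_sd=0.05, seed=0):
    """Draw x under effort, then wages w = w*(x) + noise (noise is an assumption)."""
    rng = np.random.default_rng(seed)
    a = (psi - mu) / sigma  # truncnorm expects the bound in standardized units
    x = stats.truncnorm.rvs(a, np.inf, loc=mu, scale=sigma, size=n, random_state=rng)
    g = np.exp(log_g_truncnorm(x, mu, mu_s, sigma, psi))
    eta = solve_eta(alpha, beta, g)
    w_star = np.log(alpha * (1.0 + eta * (alpha / beta - g))) / gamma
    w = w_star + rng.normal(0.0, noise_sd, size=n)
    return x, w, eta


# Illustrative "true" parameters; eta_sim is the multiplier implied by the sample
x_sim, w_sim, eta_sim = simulate_contract_data(
    n=2000, mu=0.1, mu_s=-0.1, sigma=0.3, psi=-0.5,
    gamma=0.5, alpha=2.0, beta=1.0)
print(f"n={x_sim.size}, min(x)={x_sim.min():.3f}, eta={eta_sim:.4f}")
```

Passing `x_sim` and `w_sim` to `model.set_data` should then let you compare each estimate with the value used in the simulation. Setting `noise_sd=0.0` makes the simulated wages satisfy the IR constraint in the sample up to solver tolerance.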


In [None]:
# Test your implementation
# Uncomment and run after implementing the class

# model = MoralHazardModel()
# model.set_data(x_data, w_data)

# # Step 1
# psi = model.step1_estimate_threshold()
# print(f"Threshold: {psi:.4f}")

# # Step 2
# mu, sigma = model.step2_estimate_f()
# print(f"mu={mu:.4f}, sigma={sigma:.4f}")  # parametric f only; both are None with KDE

# # Step 3
# results = model.step3_estimate_contract()
# model.summary()


## 4. Results
### 4.1 Parametric Estimation
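
Step 2 of the parametric route fits a left-truncated normal by maximum likelihood. As a standalone illustration of that log-likelihood (not the class implementation), the sketch below treats the threshold ψ as known and optimizes over (μ, log σ) so that σ stays positive. The function names `truncnorm_negloglik` and `fit_truncnorm`, the choice of Nelder-Mead, and the demo values are assumptions.

```python
import numpy as np
from scipy import optimize, stats


def truncnorm_negloglik(params, x, psi):
    """Negative log-likelihood of a normal left-truncated at psi."""
    mu, log_sigma = params
    sigma = np.exp(log_sigma)
    # logpdf already contains the -log(sigma) term; logsf is log(1 - Phi((psi - mu)/sigma))
    ll = stats.norm.logpdf(x, loc=mu, scale=sigma) - stats.norm.logsf(psi, loc=mu, scale=sigma)
    return -np.sum(ll)


def fit_truncnorm(x, psi):
    """Return (mu_hat, sigma_hat), starting from the sample mean and std."""
    start = np.array([np.mean(x), np.log(np.std(x))])
    res = optimize.minimize(truncnorm_negloglik, start, args=(x, psi), method="Nelder-Mead")
    return float(res.x[0]), float(np.exp(res.x[1]))


# Demo on simulated data with known parameters
mu_true, sigma_true, psi = 0.1, 0.3, -0.2
a = (psi - mu_true) / sigma_true  # standardized lower bound
x = stats.truncnorm.rvs(a, np.inf, loc=mu_true, scale=sigma_true, size=5000, random_state=0)
mu_hat, sigma_hat = fit_truncnorm(x, psi)
print(f"mu_hat={mu_hat:.3f}, sigma_hat={sigma_hat:.3f}")
```

The sample mean and standard deviation are biased for (μ, σ) under truncation, so they serve here only as starting values.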

In [None]:
# TODO: Run your analysis here


### 4.2 Non-Parametric Estimation
We now estimate the model using non-parametric methods for both the working density $f(x)$ and the likelihood ratio $g(x)$.
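
For reference, a kernel density estimate of the working density can be built with `scipy.stats.gaussian_kde`. The sketch below is only an illustration: the sample is a placeholder drawn from an untruncated normal, and the variable names are assumptions.

```python
import numpy as np
from scipy.stats import gaussian_kde

rng = np.random.default_rng(0)
x = rng.normal(0.1, 0.3, size=2000)  # placeholder for the observed performance data

f_kde = gaussian_kde(x)  # bandwidth defaults to Scott's rule
grid = np.linspace(x.min(), x.max(), 200)
f_vals = f_kde(grid)  # estimated density on the grid

# Share of the estimated density that lies inside the observed range
mass = f_kde.integrate_box_1d(x.min(), x.max())
print(f"mass inside [min, max] = {mass:.4f}")
```

A Gaussian kernel places some mass below the smallest observation, so the estimate is typically less reliable near a truncation point.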

In [None]:
# TODO: Run your analysis here


### 4.3 Model Verification (IR and IC Constraints)

Verify that the estimated model satisfies the theoretical constraints from the principal-agent problem:

- **Individual Rationality (IR) Constraint:** $$\alpha E[v(X)] \leq 1$$ At the optimum, this constraint binds (holds with equality).
- **Incentive Compatibility (IC) Constraint:** $$\alpha E[v(X)] \leq \beta E[v(X)g(X)]$$ At the optimum, this constraint also binds.

We check these constraints using the observed contract $(x, w)$ data and estimated parameters.
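
As a minimal sketch of such a check, the code below replaces expectations with sample averages and uses $v = e^{-\gamma w}$, which is the relation implied by the wage formula $w^*(x) = \frac{1}{\gamma}\ln[\cdot]$. The function name `constraint_report` and the two-outcome toy example (for which η = 20/21 solves the binding constraints in closed form) are assumptions made for illustration.

```python
import numpy as np


def constraint_report(w, g, gamma, alpha, beta):
    """Sample analogues of the IR and IC constraints for wages w and ratios g."""
    v = np.exp(-gamma * w)              # v(x) = exp(-gamma * w(x))
    ir_lhs = alpha * v.mean()           # should equal 1 when IR binds
    ic_rhs = beta * (v * g).mean()      # should equal ir_lhs when IC binds
    return {"IR": ir_lhs, "IC_rhs": ic_rhs,
            "IR_error": ir_lhs - 1.0, "IC_error": ir_lhs - ic_rhs}


# Toy example: two equally likely outcomes with likelihood ratios 1.5 and 0.5
gamma, alpha, beta = 0.5, 1.2, 1.0
g = np.array([1.5, 0.5])
eta = 20.0 / 21.0                       # closed-form multiplier for this toy case
w = np.log(alpha * (1.0 + eta * (alpha / beta - g))) / gamma

report = constraint_report(w, g, gamma, alpha, beta)
print(report)
```

With real data the errors will not be exactly zero, because observed wages deviate from $w^*(x)$ and the parameters are estimated.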

---

In [None]:
# TODO: Run your analysis here


### 4.4 Cost of Moral Hazard Decomposition

Decompose the total cost of moral hazard into three components:
- **Δ₁: Risk Premium**: The extra expected wage cost from tying pay to noisy output (risk compensation). $$\Delta_1 = E[w^*(x)] - \frac{1}{\gamma}\ln(\alpha)$$
- **Δ₂: Effort Disutility**: The monetized disutility of inducing effort (compensation for working vs shirking), which is positive because $\alpha > \beta$. $$\Delta_2 = \frac{1}{\gamma}\ln\left(\frac{\alpha}{\beta}\right)$$
- **Δ₃: Output Loss**: The expected output loss if the agent were to shirk. $$\Delta_3 = E[x] - E[x \cdot g(x)]$$

**Total Cost:**
$$
\text{Total Cost} = \Delta_1 + \Delta_2 + \Delta_3
$$

This equals the profit loss compared to the first-best (full information) benchmark.
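
As a supplementary check (not part of the original derivation), substituting the wage formula used in `MoralHazardModel`, $w^*(x) = \frac{1}{\gamma}\ln\left[\alpha\left(1 + \eta\left(\frac{\alpha}{\beta} - g(x)\right)\right)\right]$, into the definition of Δ₁ gives

$$
\Delta_1 = E[w^*(x)] - \frac{1}{\gamma}\ln(\alpha) = \frac{1}{\gamma}\, E\left[\ln\left(1 + \eta\left(\frac{\alpha}{\beta} - g(x)\right)\right)\right]
$$

Writing $h(x) = 1 + \eta\left(\frac{\alpha}{\beta} - g(x)\right)$, the binding IR constraint $\alpha E[v^*] = 1$ is equivalent to $E[1/h(x)] = 1$, so Jensen's inequality yields

$$
E[\ln h(x)] = -E\left[\ln\frac{1}{h(x)}\right] \geq -\ln E\left[\frac{1}{h(x)}\right] = 0
$$

Hence $\Delta_1 \geq 0$, which is a useful sanity check on the computed risk premium.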

---

In [None]:
# TODO: Run your analysis here


## 5. Counterfactual Experiments

This section conducts policy experiments to understand how changes in the environment affect moral hazard costs.

We analyze how modifications to key parameters impact the three cost components (Δ₁, Δ₂, Δ₃) and total cost.

---

### 5.1 Policy Experiments

We evaluate four counterfactual scenarios:

**1. Increased Risk Aversion** ($\gamma \to 2\gamma$)
   - Agents become twice as risk-averse
   - Expected impact: Higher risk premium (Δ₁↑) as agents demand greater compensation for income volatility

**2. Lower Truncation Point** ($\psi \to \psi/2$)
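   - Lower threshold in the truncated normal distribution (halving lowers $\psi$ only when $\psi > 0$; if the estimated $\psi$ is negative, $\psi/2$ is a higher threshold)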
   - Expected impact: Changes the support and shape of f(x), affecting both the working and shirking distributions. May alter the likelihood ratio g(x) and increase total cost.

**3. Increased Output Volatility** ($\sigma \to 2\sigma$)
   - Doubled standard deviation introduces more noise into the performance measure
   - Expected impact: Higher risk premium (Δ₁↑) as agents bear greater income uncertainty

**4. Reduced Effort Cost Differential** ($\beta \to 1.5\beta$)
   - Shirking disutility moves closer to working disutility ($\beta \to \alpha$); this requires $1.5\beta < \alpha$ so that the ordering $\alpha > \beta$ is preserved
   - Expected impact: Lower incentive cost (Δ₂↓) as the utility gap between effort levels narrows, making effort easier to induce
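
The scenarios that change only γ or β can be sketched by re-solving for η and recomputing the three components. The code below is a toy illustration under stated assumptions: the four-point sample stands in for the data with equal weights, the helper names `solve_eta` and `cost_components` are hypothetical, and Δ₂ uses the sign convention $\frac{1}{\gamma}\ln(\alpha/\beta)$, which is positive when $\alpha > \beta$.

```python
import numpy as np
from scipy import optimize


def solve_eta(alpha, beta, g):
    """Positive multiplier at which the IC constraint binds in the sample."""
    ratio = alpha / beta
    eta_hi = (1.0 - 1e-9) / (g.max() - ratio)  # requires g.max() > alpha/beta
    gap = lambda eta: np.mean((g - ratio) / (1.0 + eta * (ratio - g)))
    return optimize.brentq(gap, 0.0, eta_hi)   # requires mean(g) < alpha/beta


def cost_components(gamma, alpha, beta, x, g):
    """Delta_1, Delta_2, Delta_3 with expectations replaced by sample means."""
    eta = solve_eta(alpha, beta, g)
    w_star = np.log(alpha * (1.0 + eta * (alpha / beta - g))) / gamma
    return {
        "eta": eta,
        "delta1": w_star.mean() - np.log(alpha) / gamma,
        "delta2": np.log(alpha / beta) / gamma,   # assumed sign convention
        "delta3": x.mean() - (x * g).mean(),
    }


# Toy sample: four equally weighted outcomes with mean(g) = 1
x = np.array([-0.3, 0.0, 0.2, 0.5])
g = np.array([3.0, 0.6, 0.3, 0.1])
base = dict(gamma=0.5, alpha=2.0, beta=1.0)

scenarios = {
    "baseline": base,
    "gamma x 2": {**base, "gamma": 2 * base["gamma"]},
    "beta x 1.5": {**base, "beta": 1.5 * base["beta"]},
}
results = {name: cost_components(x=x, g=g, **p) for name, p in scenarios.items()}
for name, r in results.items():
    print(name, {k: round(float(v), 4) for k, v in r.items()})
```

Two points are worth keeping in mind when interpreting the output. First, with α, β, and g held fixed, γ enters $w^*(x)$ only through the factor $1/\gamma$, so in this sketch doubling γ halves Δ₁ and Δ₂; compare this with the expected impact listed for scenario 1. Second, the ψ and σ scenarios change $f(x)$ and $g(x)$ themselves, so they require regenerating `x` and `g` before calling `cost_components`, which is not shown here.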

---

In [None]:
# TODO: Run your analysis here
