In [4]:
from pyro_mcmc_sampler import PyroMCMCSampler

ModuleNotFoundError: No module named 'pyro'

In [2]:
sampler = MCMCSampler()

In [3]:
sampler.run_tests()

INFO:mcmc_sampler:
🧪 Running Tests on MCMC Sampler with Empirical Samples...

INFO:mcmc_sampler:🔹 Running Test: Normal Distribution Samples
INFO:mcmc_sampler:📊 Sample distribution set with 10000 points.
INFO:mcmc_sampler:   📊 Sample Distribution Set with 10000 points
INFO:mcmc_sampler:   🔧 Force Non-Negative: False
INFO:mcmc_sampler:   ✅ Completed in 4.416 sec
INFO:mcmc_sampler:   🔍 MAP Estimate: 0.056927
INFO:mcmc_sampler:   🔍 Max Value: 3.984919
INFO:mcmc_sampler:   🔍 HDI 50%: [0.000000, 0.549489]
INFO:mcmc_sampler:   🔍 HDI 95%: [0.000000, 1.990308]
INFO:mcmc_sampler:   🔍 HDI 99%: [0.000000, 2.837897]
INFO:mcmc_sampler:🔹 Running Test: Poisson Count Data
INFO:mcmc_sampler:📊 Sample distribution set with 10000 points.
INFO:mcmc_sampler:   📊 Sample Distribution Set with 10000 points
INFO:mcmc_sampler:   🔧 Force Non-Negative: True
INFO:mcmc_sampler:   ✅ Completed in 3.540 sec
INFO:mcmc_sampler:   🔍 MAP Estimate: 3.939578
INFO:mcmc_sampler:   🔍 Max Value: 12.291484
INFO:mcmc_sampler:   🔍 H

In [None]:
import numpy as np
import logging
from typing import Callable, List, Dict, Tuple, Any
import math
import time

class MCMCSampler:
    def __init__(self, log_posterior: Callable[[float], float], proposal_std: float = 0.5, 
                 force_non_negative: bool = True, random_seed: int = None):
        """
        MCMC Sampler with Metropolis-Hastings algorithm.

        Parameters:
        - log_posterior: Callable function that computes the log posterior probability.
        - proposal_std: Standard deviation for the proposal distribution.
        - force_non_negative: If True, ensures all samples are >= 0 (useful for count data).
        - random_seed: Optional seed for reproducibility.
        """

        self.logger = logging.getLogger(__name__)  # Class-specific logger
        logging.basicConfig(level=logging.INFO)  # Configure logging format
        #self.device = device if device else ("cuda" if torch.cuda.is_available() else "cpu")
        #self.logger.info(f"Using device: {self.device}")

        self.log_posterior = log_posterior
        self.proposal_std = proposal_std
        self.force_non_negative = force_non_negative
        if random_seed is not None:
            np.random.seed(random_seed)
    
    def metropolis_hastings(self, initial_value: float, num_samples: int = 10000, 
                            burn_in: int = 1000, thin: int = 1) -> np.ndarray:
        """
        Runs the Metropolis-Hastings MCMC algorithm.

        Parameters:
        - initial_value: Starting point for the chain.
        - num_samples: Number of samples to collect (after burn-in).
        - burn_in: Number of initial samples to discard.
        - thin: Keeps every `thin`-th sample to reduce autocorrelation.

        Returns:
        - Array of sampled values.
        """
        samples = np.empty(num_samples + burn_in)
        samples[0] = max(0, initial_value) if self.force_non_negative else initial_value
        current = samples[0]
        current_log_post = self.log_posterior(current)

        for i in range(1, num_samples + burn_in):
            proposal = np.random.normal(current, self.proposal_std)
            if self.force_non_negative:
                proposal = max(0, proposal)  # Ensures only non-negative proposals if required
            
            proposal_log_post = self.log_posterior(proposal)
            log_acceptance_ratio = proposal_log_post - current_log_post

            if np.log(np.random.rand()) < log_acceptance_ratio:
                current = proposal
                current_log_post = proposal_log_post

            samples[i] = current

        return np.maximum(samples[burn_in:][::thin], 0) if self.force_non_negative else samples[burn_in:][::thin]

    @staticmethod
    def compute_hdi(samples: np.ndarray, credible_interval: float = 0.95) -> Tuple[float, float]:
        """
        Computes the Highest Density Interval (HDI).

        Parameters:
        - samples: The MCMC samples.
        - credible_interval: The probability mass contained in the HDI.

        Returns:
        - Lower and upper bounds of the HDI.
        """
        samples = np.sort(samples)
        n = len(samples)
        ci_index = int(np.floor(credible_interval * n))
        intervals = np.array([samples[i + ci_index] - samples[i] for i in range(n - ci_index)])
        min_index = np.argmin(intervals)
        return float(samples[min_index]), float(samples[min_index + ci_index])

    @staticmethod
    def compute_map(samples: np.ndarray) -> float:
        """
        Computes the Maximum A Posteriori (MAP) estimate.

        Parameters:
        - samples: The MCMC samples.

        Returns:
        - MAP estimate (mode of the posterior).
        """
        hist, bin_edges = np.histogram(samples, bins="auto", density=True)  # Adaptive bins
        max_bin_index = np.argmax(hist)
        return (bin_edges[max_bin_index] + bin_edges[max_bin_index + 1]) / 2  

    def compute_summary(self, samples: np.ndarray) -> Dict[str, Any]:
        """
        Computes MAP, Max value, and HDI intervals in a structured format.

        Parameters:
        - samples: The MCMC samples.

        Returns:
        - Dictionary containing structured results for DataFrame conversion.
        """
        map_estimate = self.compute_map(samples)
        max_value = float(np.max(samples))

        hdi_50 = self.compute_hdi(samples, 0.50)
        hdi_95 = self.compute_hdi(samples, 0.95)
        hdi_99 = self.compute_hdi(samples, 0.99)

        return {
            "map": map_estimate,
            "max": max_value,
            "hdi50lb": hdi_50[0], "hdi50ub": hdi_50[1],
            "hdi95lb": hdi_95[0], "hdi95ub": hdi_95[1],
            "hdi99lb": hdi_99[0], "hdi99ub": hdi_99[1]
        }
    
    def run_tests(self):
        """
        Runs a battery of tests to validate correctness, efficiency, and robustness of MCMC sampling.
        """
        self.logger.info("\n🧪 Running Tests on MCMC Sampler...\n")

        test_cases = [
            {"name": "Basic Normal Distribution", "log_posterior": lambda x: -0.5 * x**2, "initial_value": 0.0, "force_non_negative": False},
            {"name": "Poisson Count Data", "log_posterior": lambda x: -np.inf if x < 0 else x * np.log(5) - 5 - math.lgamma(x + 1), "initial_value": 5, "force_non_negative": True},
            {"name": "Extreme Peak Distribution", "log_posterior": lambda x: -1e6 * (x - 100) ** 2, "initial_value": 100, "force_non_negative": False},
            {"name": "Wide Proposal Step", "log_posterior": lambda x: -0.5 * x**2, "initial_value": 0.0, "force_non_negative": False},
            {"name": "Small Proposal Step", "log_posterior": lambda x: -0.5 * x**2, "initial_value": 0.0, "force_non_negative": False},
            {"name": "Floating-Point Precision Test", "log_posterior": lambda x: -0.5 * (x / 1e-5) ** 2, "initial_value": 1e-5, "force_non_negative": False},
            {"name": "Extreme Negative Log Posterior", "log_posterior": lambda x: -1e8 * x**2, "initial_value": 10, "force_non_negative": False},
        ]

        for test in test_cases:
            self.logger.info(f"🔹 Running Test: {test['name']}")

            sampler = MCMCSampler(
                log_posterior=test["log_posterior"], 
                proposal_std=1.0 if "Wide" not in test["name"] else 5.0,
                force_non_negative=test["force_non_negative"]
            )

            # Run sampling
            start_time = time.time()
            samples = sampler.metropolis_hastings(initial_value=test["initial_value"], num_samples=5000)
            end_time = time.time()

            summary = sampler.compute_summary(samples)

            self.logger.info(f"   ✅ Completed in {end_time - start_time:.3f} sec")
            self.logger.info(f"   🔍 MAP Estimate: {summary['map']:.6f}")
            self.logger.info(f"   🔍 Max Value: {summary['max']:.6f}")
            self.logger.info(f"   🔍 HDI 50%: [{summary['hdi50lb']:.6f}, {summary['hdi50ub']:.6f}]")
            self.logger.info(f"   🔍 HDI 95%: [{summary['hdi95lb']:.6f}, {summary['hdi95ub']:.6f}]")
            self.logger.info(f"   🔍 HDI 99%: [{summary['hdi99lb']:.6f}, {summary['hdi99ub']:.6f}]")

            # **Validation Checks**
            assert summary["map"] is not None, "❌ MAP estimate is missing!"
            assert summary["max"] >= min(samples), "❌ Max value incorrect!"
            if test["force_non_negative"]:
                assert np.all(samples >= 0), "❌ Negative values found in non-negative sampling!"

        self.logger.info("\n✅ All MCMC Sampler Tests Passed Successfully!\n")


In [None]:
# ✅ **Define an Example Log-Posterior Function**
def normal_log_posterior(x):
    """Example: Standard normal log posterior."""
    return -0.5 * x**2

In [None]:
if __name__ == "__main__":

    sampler = MCMCSampler(log_posterior=normal_log_posterior, proposal_std=1.0, force_non_negative=False)
    
    # ✅ **Run Automated Tests**
    sampler.run_tests()

In [None]:
# Example Usage:
if __name__ == "__main__":
    import pandas as pd  # Required for DataFrame integration

    sampler = MCMCSampler(log_posterior, proposal_std=1.0, force_non_negative=True, random_seed=42)

    # Simulating multiple distributions
    results = []
    for initial_value in [3, 5, 10]:  # Different starting points
        samples = sampler.metropolis_hastings(initial_value=initial_value, num_samples=10000)
        summary = sampler.compute_summary(samples)
        results.append(summary)

    # Convert results to DataFrame
    df_results = pd.DataFrame(results)
    print(df_results)


In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import scipy.stats as stats

In [None]:
import numpy as np
import logging
import math
from typing import Callable, List, Dict, Tuple

class MCMCSampler:
    def __init__(self, log_posterior: Callable[[float], float], proposal_std: float = 0.5, random_seed: int = None):
        self.log_posterior = log_posterior
        self.proposal_std = proposal_std
        if random_seed is not None:
            np.random.seed(random_seed)
    
    def metropolis_hastings(self, initial_value: float, num_samples: int = 10000, burn_in: int = 1000, thin: int = 1) -> np.ndarray:
        samples = np.empty(num_samples + burn_in)
        samples[0] = max(0, initial_value)  # Ensure initial sample is non-negative
        current = samples[0]
        current_log_post = self.log_posterior(current)

        for i in range(1, num_samples + burn_in):
            # Ensure non-negative proposals
            proposal = max(0, np.random.normal(current, self.proposal_std))  
            proposal_log_post = self.log_posterior(proposal)
            log_acceptance_ratio = proposal_log_post - current_log_post

            if np.log(np.random.rand()) < log_acceptance_ratio:
                current = proposal
                current_log_post = proposal_log_post

            samples[i] = current

        return np.maximum(samples[burn_in:][::thin], 0)  # Ensure final outputs are non-negative

    @staticmethod
    def compute_hdi(samples: np.ndarray, credible_interval: float = 0.95) -> Tuple[int, int]:
        samples = np.sort(samples)
        n = len(samples)
        ci_index = int(np.floor(credible_interval * n))
        intervals = np.array([samples[i + ci_index] - samples[i] for i in range(n - ci_index)])
        min_index = np.argmin(intervals)
        return int(round(samples[min_index])), int(round(samples[min_index + ci_index]))  # Ensure integer output

    @staticmethod
    def compute_map(samples: np.ndarray) -> int:
        hist, bin_edges = np.histogram(samples, bins=np.arange(0, np.max(samples) + 1), density=True)  # Integer bins
        max_bin_index = np.argmax(hist)
        return int(bin_edges[max_bin_index])  # Return integer MAP

    def compute_summary(self, samples: np.ndarray, credible_intervals: List[float] = [0.5, 0.75, 0.95]) -> Dict[str, any]:
        map_estimate = self.compute_map(samples)
        hdi_intervals = {ci: self.compute_hdi(samples, ci) for ci in credible_intervals}
        max_value = int(np.max(samples))

        return {
            "MAP": map_estimate,
            "HDIs": hdi_intervals,
            "Max Value": max_value
        }

# Example Usage:
if __name__ == "__main__":
    def log_posterior(x):
        """Example: Poisson log-posterior (approx) for count data"""
        lambda_param = 5  # Mean count rate
        if x < 0:
            return -np.inf  # Log-prob of negative counts is undefined
        return x * np.log(lambda_param) - lambda_param - math.lgamma(x + 1)

    sampler = MCMCSampler(log_posterior, proposal_std=1.0, random_seed=42)
    samples = sampler.metropolis_hastings(initial_value=5, num_samples=10000)
    summary = sampler.compute_summary(samples)
    print(summary)


In [None]:
class MCMCSampler:
    def __init__(self, log_posterior, proposal_std=0.5):
        self.log_posterior = log_posterior
        self.proposal_std = proposal_std

    def metropolis_hastings(self, initial_value, num_samples=10000, burn_in=1000, thin=1):
        samples = np.empty(num_samples + burn_in)
        samples[0] = initial_value
        current = initial_value
        current_log_post = self.log_posterior(current)

        for i in range(1, num_samples + burn_in):
            proposal = np.abs(np.random.normal(current, self.proposal_std))  # Keep values positive
            proposal_log_post = self.log_posterior(proposal)
            log_acceptance_ratio = proposal_log_post - current_log_post

            if np.log(np.random.rand()) < log_acceptance_ratio:
                current = proposal
                current_log_post = proposal_log_post

            samples[i] = current

        return samples[burn_in:][::thin]  # Discard burn-in and thin chain

    @staticmethod
    def compute_hdi(samples, credible_interval=0.95):
        samples = np.sort(np.asarray(samples))
        ci_index = max(1, int(np.floor(credible_interval * len(samples))))

        intervals_min = samples[:len(samples) - ci_index]
        intervals_max = samples[ci_index:]
        interval_widths = intervals_max - intervals_min

        min_index = np.argmin(interval_widths)
        return intervals_min[min_index], intervals_max[min_index]

    @staticmethod
    def compute_map(samples):
        """Compute the Maximum A Posteriori (MAP) estimate (highest density point)."""
        hist, bin_edges = np.histogram(samples, bins=50, density=True)  # Histogram density
        max_bin_index = np.argmax(hist)  # Find bin with max density
        map_estimate = (bin_edges[max_bin_index] + bin_edges[max_bin_index + 1]) / 2  # Midpoint of bin
        return map_estimate

    def compute_summary(self, samples, credible_intervals=[0.5, 0.75, 0.95]):
        """Compute MAP, HDIs, and max value."""
        map_estimate = self.compute_map(samples)
        hdi_intervals = {ci: self.compute_hdi(samples, ci) for ci in credible_intervals}
        max_value = np.max(samples)

        return {
            "MAP": map_estimate,
            "HDIs": hdi_intervals,
            "Max Value": max_value
        }


In [None]:
import numpy as np
import logging
from typing import Callable, List, Dict, Tuple

class MCMCSampler:
    def __init__(self, log_posterior: Callable[[float], float], proposal_std: float = 0.5, random_seed: int = None):
        self.log_posterior = log_posterior
        self.proposal_std = proposal_std
        if random_seed is not None:
            np.random.seed(random_seed)
    
    def metropolis_hastings(self, initial_value: float, num_samples: int = 10000, burn_in: int = 1000, thin: int = 1) -> np.ndarray:
        samples = np.empty(num_samples + burn_in)
        samples[0] = initial_value
        current = initial_value
        current_log_post = self.log_posterior(current)

        for i in range(1, num_samples + burn_in):
            proposal = np.random.normal(current, self.proposal_std)  # Removed np.abs() to avoid bias
            proposal_log_post = self.log_posterior(proposal)
            log_acceptance_ratio = proposal_log_post - current_log_post

            if np.log(np.random.rand()) < log_acceptance_ratio:
                current = proposal
                current_log_post = proposal_log_post

            samples[i] = current

        return samples[burn_in:][::thin]  

    @staticmethod
    def compute_hdi(samples: np.ndarray, credible_interval: float = 0.95) -> Tuple[float, float]:
        samples = np.sort(samples)
        n = len(samples)
        ci_index = int(np.floor(credible_interval * n))
        intervals = np.array([samples[i + ci_index] - samples[i] for i in range(n - ci_index)])
        min_index = np.argmin(intervals)
        return samples[min_index], samples[min_index + ci_index]

    @staticmethod
    def compute_map(samples: np.ndarray) -> float:
        hist, bin_edges = np.histogram(samples, bins="auto", density=True)  # Adaptive bins
        max_bin_index = np.argmax(hist)
        return (bin_edges[max_bin_index] + bin_edges[max_bin_index + 1]) / 2  

    def compute_summary(self, samples: np.ndarray, credible_intervals: List[float] = [0.5, 0.75, 0.95]) -> Dict[str, any]:
        map_estimate = self.compute_map(samples)
        hdi_intervals = {ci: self.compute_hdi(samples, ci) for ci in credible_intervals}
        max_value = np.max(samples)

        return {
            "MAP": map_estimate,
            "HDIs": hdi_intervals,
            "Max Value": max_value
        }

# Example Usage:
if __name__ == "__main__":
    def log_posterior(x):
        return -0.5 * (x ** 2)  # Example: standard normal log-posterior

    sampler = MCMCSampler(log_posterior, proposal_std=1.0, random_seed=42)
    samples = sampler.metropolis_hastings(initial_value=0.0, num_samples=10000)
    summary = sampler.compute_summary(samples)
    print(summary)


In [None]:
# Define Zero-Inflated Right-Skewed Log-Posterior
def log_posterior(theta):
    p = 0.3  # Probability of zero inflation
    alpha, beta = 2, 1  # Gamma distribution shape and scale

    data = np.array([0, 0, 0, 1.2, 0.5, 3.0, 7.5, 2.3, 8.1, 0])  # Example data with zeros
    log_likelihood = np.sum(
        np.log(p * (data == 0) + (1 - p) * stats.gamma.pdf(data, a=alpha, scale=1/beta) * (data > 0))
    )

    log_prior = stats.gamma.logpdf(theta, a=alpha, scale=1/beta)  # Prior for theta
    return log_likelihood + log_prior  # Posterior ∝ Likelihood * Prior

# Run MCMC Sampling
sampler = MCMCSampler(log_posterior, proposal_std=0.5)
samples = sampler.metropolis_hastings(initial_value=1.0, num_samples=20000)

# Compute Summary Stats
summary = sampler.compute_summary(samples, credible_intervals=[0.5, 0.75, 0.95])

# Extract values
map_estimate = summary["MAP"]
hdi_intervals = summary["HDIs"]
max_value = summary["Max Value"]


In [None]:

def plot_posterior(samples, hdi_intervals, map_estimate, max_value, color_scheme="gray"):
    """
    Plots a posterior distribution with HDIs, MAP estimate, and max sampled value
    using a clean, scientific, and Tufte-inspired aesthetic.

    Args:
        samples (array-like): Posterior samples.
        hdi_intervals (dict): Dictionary of HDI intervals {credible_level: (low, high)}.
        map_estimate (float): Maximum a posteriori (MAP) estimate.
        max_value (float): Maximum sampled value.
        color_scheme (str): "gray" for shades of gray, "blue" for shades of blue.
    """
    # Set up the figure
    fig, ax = plt.subplots(figsize=(10, 5))

    # Choose colors based on the scheme
    if color_scheme == "gray":
        hdi_colors = ["#D9D9D9", "#A6A6A6", "#737373"]  # Light, Medium, Dark Gray
    elif color_scheme == "blue":
        hdi_colors = ["#C6DBEF", "#6BAED6", "#2171B5"]  # Light, Medium, Dark Blue
    else:
        raise ValueError("Invalid color_scheme. Use 'gray' or 'blue'.")

    # Plot smooth density (KDE) instead of histogram
    sns.kdeplot(samples, color="black", linewidth=1.5, fill=True, alpha=0.2, label="Posterior Density")

    # HDI shading for multiple credible intervals (largest first)
    for idx, (ci, (hdi_low, hdi_high)) in enumerate(sorted(hdi_intervals.items(), reverse=True)):
        label = f"{int(ci*100)}% HDI"
        color = hdi_colors[idx % len(hdi_colors)]  # Cycle through the three colors
        ax.axvspan(hdi_low, hdi_high, color=color, alpha=0.5, label=label)

    # MAP Estimate (Highest Density Point)
    ax.axvline(map_estimate, color="black", linestyle="-", linewidth=1.5, label="MAP Estimate")
    ax.text(map_estimate, plt.ylim()[1] * 0.05, "MAP", ha="center", fontsize=11, color="black")

    # Max Value (Extreme Sample)
    ax.axvline(max_value, color="red", linestyle=":", linewidth=1.2, label="Max Sampled Value")
    ax.text(max_value, plt.ylim()[1] * 0.07, "Max", ha="left", fontsize=11, color="red")

    # Minimalist aesthetics
    sns.despine()  # Remove spines for cleaner look
    ax.set_yticks([])  # Remove y-axis ticks
    ax.set_xlabel(r"Predicted Fatalities (ln)", fontsize=14)  # LaTeX-style label
    ax.set_ylabel("")  # No y-label for cleaner aesthetics

    # Add legend with all HDIs, MAP, and Max
    ax.legend(loc="upper right", frameon=False, fontsize=11)

    # Show the plot
    plt.show()

# Example Usage:
plot_posterior(samples, hdi_intervals, map_estimate, max_value, color_scheme="blue")
