In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import networkx as nx
import random
from tqdm import tqdm
from scipy.stats import uniform, norm, multivariate_normal

In [None]:
class NetworkSEIR_tuned:
    """Network-based SEIR model parameter estimation class"""
    def __init__(self, observed_data, network_params=None, fixed_alpha=0.2, fixed_gamma=0.1):
        """Initialization with fixed parameters"""
        self.observed_data = observed_data
        self.fixed_alpha = fixed_alpha
        self.fixed_gamma = fixed_gamma
        # Default network parameters
        if network_params is None:
            network_params = {
                'n_nodes': 1000,
                'network_type': 'barabasi_albert',
                'network_params': {'m': 3}
            }
        self.network_params = network_params
        self.hm_results = None
        print(f"Fixed parameters: alpha = {self.fixed_alpha}, gamma = {self.fixed_gamma}")
        print("Calibrating parameters: tau (transmission rate), rho (initial infection fraction)")
    
    def generate_network(self):
        """Generate network based on specified parameters"""
        n_nodes = self.network_params['n_nodes']
        network_type = self.network_params['network_type']
        net_params = self.network_params['network_params']
        # In case we want to use different network types
        if network_type == 'barabasi_albert':
            return nx.barabasi_albert_graph(n_nodes, net_params['m'])
        elif network_type == 'erdos_renyi':
            return nx.erdos_renyi_graph(n_nodes, net_params['p'])
        elif network_type == 'watts_strogatz':
            return nx.watts_strogatz_graph(n_nodes, net_params['k'], net_params['p'])
        elif network_type == 'complete':
            return nx.complete_graph(n_nodes)
        elif network_type == 'regular':
            return nx.random_regular_graph(net_params['d'], n_nodes)
        else:
            raise ValueError(f"Unknown network type: {network_type}")
    
    def SEIR_network(self, G, tau, alpha, gamma, rho, tmax):
        """SEIR network simulation"""
        # Initialize states: S=0 (Susceptible), E=1 (Exposed), I=2 (Infected), R=3 (Recovered)
        for node in G.nodes():
            G.nodes[node]['state'] = 0  # Start all nodes as susceptible
        initial_infected = int(rho * len(G.nodes()))
        initial_infected_nodes = random.sample(list(G.nodes()), initial_infected)
        for node in initial_infected_nodes:
            G.nodes[node]['state'] = 2
        susceptible_count = []
        exposed_count = []
        infected_count = []
        recovered_count = []
        for day in range(tmax + 1):
            new_states = {}
            # Сurrent states
            susceptible = sum(1 for n in G.nodes if G.nodes[n]['state'] == 0)
            exposed = sum(1 for n in G.nodes if G.nodes[n]['state'] == 1)
            infected = sum(1 for n in G.nodes if G.nodes[n]['state'] == 2)
            recovered = sum(1 for n in G.nodes if G.nodes[n]['state'] == 3)
            susceptible_count.append(susceptible)
            exposed_count.append(exposed)
            infected_count.append(infected)
            recovered_count.append(recovered)
            for node in G.nodes():
                if G.nodes[node]['state'] == 2:  # Infected node
                    for neighbor in G.neighbors(node):
                        if G.nodes[neighbor]['state'] == 0:  # Susceptible neighbor
                            if random.random() < tau:
                                new_states[neighbor] = 1  # Expose neighbor
                    if random.random() < gamma:
                        new_states[node] = 3  # Recover
                elif G.nodes[node]['state'] == 1:  # Exposed node
                    if random.random() < alpha:
                        new_states[node] = 2  # Become infected
            # State transitions
            for node, new_state in new_states.items():
                G.nodes[node]['state'] = new_state
        return [index for index in range(tmax + 1)], susceptible_count, exposed_count, infected_count, recovered_count
    
    def simulator_function(self, tau, rho):
        """Run SEIR network simulation with given tau, rho and fixed alpha, gamma"""
        try:
            G = self.generate_network()
            tmax = len(self.observed_data) - 1
            # Use fixed alpha and gamma
            days, S, E, I, R = self.SEIR_network(G, tau, self.fixed_alpha, self.fixed_gamma, rho, tmax)
            sim_results = pd.DataFrame({
                'day': days,
                'S': S,
                'E': E,
                'I': I,
                'R': R
            })
            return sim_results
        except Exception as e:
            print(f"Simulation error: {e}")
            return None
    
    def calculate_distance(self, sim_data):
        """Calculate distance between simulated and observed data"""
        if sim_data is None:
            return np.inf
        try:
            min_len = min(len(self.observed_data), len(sim_data))
            obs = self.observed_data['I'].values[:min_len]  # Focus on infected compartment
            sim = sim_data['I'].values[:min_len]
            # Mean squared error
            distance = np.mean((obs - sim)**2)
            return distance
        except Exception as e:
            print(f"Error calculating distance: {e}")
            return np.inf
    
    def history_matching(self, prior_ranges, n_samples=100, epsilon=1000, adaptive=False, accept_ratio=0.2):
        """History matching to find plausible parameter regions for tau and rho only"""
        print(f"Running history matching with {n_samples} samples...")
        print(f"Calibrating: tau, rho | Fixed: alpha={self.fixed_alpha}, gamma={self.fixed_gamma}")
        # Generate samples from prior ranges (only tau and rho)
        samples = []
        for _ in range(n_samples):
            sample = {}
            for param, (min_val, max_val) in prior_ranges.items():
                if param in ['tau', 'rho']:  # Only sample tau and rho
                    sample[param] = uniform.rvs(loc=min_val, scale=max_val-min_val)
            samples.append(sample)
        # Run simulations and calculate distances
        results = []
        for sample in tqdm(samples):
            sim_data = self.simulator_function(
                sample["tau"], sample["rho"]
            )
            distance = self.calculate_distance(sim_data)
            # Trajectory data
            result_dict = {
                "tau": sample["tau"],
                "rho": sample["rho"],
                "alpha": self.fixed_alpha,
                "gamma": self.fixed_gamma,
                "distance": distance
            }
            # Trajectory to results dictionary
            if sim_data is not None:
                result_dict["trajectory"] = sim_data["I"].copy()
            
            results.append(result_dict)
        results_df = pd.DataFrame(results)
        # Filter results
        if adaptive:  # By acceptance ratio
            n_accept = max(1, int(len(results_df) * accept_ratio))
            accepted = results_df.nsmallest(n_accept, "distance")
        else:  # Fixed threshold
            accepted = results_df[results_df["distance"] < epsilon]
        print(f"Accepted {len(accepted)} parameter sets")
        # Store results for other methods to use
        self.hm_results = accepted
        return accepted
    
    def rejection_abc(self, n_samples=100, epsilon=1e-5, adaptive=True, accept_ratio=0.01):
        """ABC rejection sampling based on history matching results for tau and rho"""
        if self.hm_results is None or len(self.hm_results) < 1:
            raise ValueError("One must run history_matching before rejection_abc and obtain at least one parameter set.") 
        print(f"Running ABC rejection with {n_samples} samples from History Matching...") 
        # Use history matching results to define parameter ranges (only tau and rho)
        param_bounds = {}
        for param in ['tau', 'rho']:
            if param in self.hm_results.columns:
                param_bounds[param] = (self.hm_results[param].min(), self.hm_results[param].max())
        # Sample from history matching parameter space
        samples = []
        for _ in range(n_samples):
            # Randomly select a parameter set from history matching results
            hm_idx = np.random.randint(0, len(self.hm_results))
            hm_sample = self.hm_results.iloc[hm_idx]
            # Add small perturbation to create a new sample
            sample = {}
            for param in param_bounds.keys():
                perturb = uniform.rvs(loc=-0.02, scale=0.04)  # +- 0.02
                param_min, param_max = param_bounds[param]
                sample[param] = np.clip(hm_sample[param] + perturb, param_min, param_max)
            samples.append(sample)
        # Run simulations and calculate distances
        results = []
        for sample in tqdm(samples):
            sim_data = self.simulator_function(
                sample["tau"], sample["rho"]
            )
            distance = self.calculate_distance(sim_data)
            result_dict = {
                "tau": sample["tau"],
                "rho": sample["rho"],
                "alpha": self.fixed_alpha,
                "gamma": self.fixed_gamma,
                "distance": distance
            }
            if sim_data is not None:
                result_dict["trajectory"] = sim_data["I"].copy()  
            results.append(result_dict)  
        results_df = pd.DataFrame(results) 
        if adaptive:  # By acceptance ratio
            n_accept = max(1, int(len(results_df) * accept_ratio))
            accepted = results_df.nsmallest(n_accept, "distance")
        else:  # Fixed threshold
            accepted = results_df[results_df["distance"] < epsilon]
            if len(accepted) == 0:
                print(f"No samples accepted at epsilon = {epsilon}. Taking {accept_ratio} of the best samples.")
                accepted = results_df.nsmallest(max(1, int(n_samples * 0.1)), "distance")
        print(f"Accepted {len(accepted)} parameter sets from {n_samples} samples")
        return accepted
    
    def annealing_abc(self, n_samples=100, initial_epsilon=1e-3, final_epsilon=1e-5, cooling_steps=3, adaptive=True, accept_ratio=0.01):
        """ABC simulated annealing using history matching results for tau and rho"""
        if self.hm_results is None:
            raise ValueError("Must run history_matching before annealing_abc")
        print(f"Running ABC annealing with {cooling_steps} cooling steps...")
        # Epsilon values for each step
        epsilons = np.geomspace(initial_epsilon, final_epsilon, cooling_steps)
        # Parameter bounds from history matching (only tau and rho)
        param_bounds = {}
        for param in ['tau', 'rho']:
            if param in self.hm_results.columns:
                param_bounds[param] = (self.hm_results[param].min(), self.hm_results[param].max())
        # Initial samples from history matching results
        current_samples = []
        for _ in range(n_samples):
            hm_idx = np.random.randint(0, len(self.hm_results))
            hm_sample = self.hm_results.iloc[hm_idx]
            sample = {}
            for param in param_bounds.keys():
                sample[param] = hm_sample[param]
            current_samples.append(sample)
        # Annealing process
        for step, epsilon in enumerate(epsilons):
            print(f"Annealing step {step+1}/{cooling_steps}, epsilon = {epsilon:.6f}")  
            # Evaluate current samples
            results = []
            for sample in tqdm(current_samples):
                sim_data = self.simulator_function(
                    sample["tau"], sample["rho"]
                )
                distance = self.calculate_distance(sim_data)
                result_dict = {
                    "tau": sample["tau"],
                    "rho": sample["rho"],
                    "alpha": self.fixed_alpha,
                    "gamma": self.fixed_gamma,
                    "distance": distance
                }
                if sim_data is not None:
                    result_dict["trajectory"] = sim_data["I"].copy()
                results.append(result_dict)
            # Filter accepted samples
            results_df = pd.DataFrame(results)
            if adaptive:  # By acceptance ratio
                n_accept = max(1, int(len(results_df) * accept_ratio))
                accepted = results_df.nsmallest(n_accept, "distance")
            else:  # Fixed threshold
                accepted = results_df[results_df["distance"] < epsilon]
                if len(accepted) == 0:
                    print(f"No samples accepted at epsilon = {epsilon}. Taking {accept_ratio} of the best samples.")
                    accepted = results_df.nsmallest(max(1, int(n_samples * 0.1)), "distance")
            # Generate new samples for next iteration
            if step < cooling_steps - 1:
                # Calculate means and variances of accepted parameters
                new_samples = []
                for _ in range(n_samples):
                    sample = {}
                    for param in param_bounds.keys():
                        param_mean = accepted[param].mean()
                        param_var = accepted[param].var()
                        
                        if np.isnan(param_var) or param_var <= 1e-6:
                            param_var = 1e-4
                        try:
                            new_value = norm.rvs(loc=param_mean, scale=np.sqrt(param_var))
                        except ValueError:
                            # Fallback
                            perturb_scale = 0.05
                            param_min, param_max = param_bounds[param]
                            new_value = param_mean + np.random.uniform(-perturb_scale, perturb_scale) * (param_max - param_min)
                        # Ensure within bounds
                        param_min, param_max = param_bounds[param]
                        sample[param] = max(param_min, min(new_value, param_max))
                    new_samples.append(sample)
                current_samples = new_samples
        return accepted
    
    def smc_abc(self, n_particles=100, n_populations=3, initial_epsilon=1e-3, final_epsilon=1e-5, adaptive=True, accept_ratio=0.01):
        """ABC Sequential Monte Carlo using history matching results for tau and rho"""
        if self.hm_results is None:
            raise ValueError("One must run history_matching before smc_abc")
        print(f"Running ABC-SMC with {n_populations} populations...")
        # Epsilon sequence
        epsilons = np.geomspace(initial_epsilon, final_epsilon, n_populations)
        # Parameter bounds from history matching (only tau and rho)
        param_bounds = {}
        for param in ['tau', 'rho']:
            if param in self.hm_results.columns:
                param_bounds[param] = (self.hm_results[param].min(), self.hm_results[param].max())
        # First population from history matching results
        particles = []
        for _ in range(n_particles):
            hm_idx = np.random.randint(0, len(self.hm_results))
            hm_sample = self.hm_results.iloc[hm_idx]
            
            particle = {}
            for param in param_bounds.keys():
                particle[param] = hm_sample[param]
            particles.append(particle) 
        # Equal weights for first population
        weights = np.ones(n_particles) / n_particles
        # SMC process
        for t in range(n_populations):
            epsilon = epsilons[t]
            print(f"SMC Population {t+1}/{n_populations}, epsilon = {epsilon:.6f}")
            # Evaluate particles and calculate distances
            distances = []
            trajectories = []
            for particle in tqdm(particles):
                sim_data = self.simulator_function(
                    particle["tau"], particle["rho"]
                )
                distance = self.calculate_distance(sim_data)
                distances.append(distance)
                if sim_data is not None:
                    trajectories.append(sim_data["I"].copy())
                else:
                    trajectories.append(None)
            # Update weights based on epsilon
            new_weights = np.zeros(n_particles)
            for i, distance in enumerate(distances):
                if distance < epsilon:
                    new_weights[i] = weights[i]
            # Normalize weights
            if np.sum(new_weights) > 0:
                new_weights = new_weights / np.sum(new_weights)
            else:
                print(f"No particles accepted at epsilon = {epsilon}. Taking best particles.")
                sorted_indices = np.argsort(distances)
                for i in range(max(1, int(n_particles * 0.1))):
                    new_weights[sorted_indices[i]] = 1.0
                new_weights = new_weights / np.sum(new_weights)
            # Calculate effective sample size
            ESS = 1.0 / np.sum(new_weights**2)
            print(f"Effective sample size: {ESS:.2f}")
            # Resample if needed
            if ESS < n_particles / 2 or t == n_populations - 1:
                indices = np.random.choice(n_particles, size=n_particles, p=new_weights)
                resampled_particles = [particles[i] for i in indices]
                resampled_trajectories = [trajectories[i] for i in indices]
                particles = resampled_particles
                trajectories = resampled_trajectories
                weights = np.ones(n_particles) / n_particles
            else:
                weights = new_weights
            # If not final iteration, perturb particles
            if t < n_populations - 1:
                # Calculate kernel covariance (only for tau and rho)
                param_values = []
                for param in param_bounds.keys():
                    param_values.append([p[param] for p in particles])
                
                params = np.array(param_values).T
                cov = np.cov(params.T) + np.eye(len(param_bounds)) * 1e-6
                # Perturb particles
                new_particles = []
                for i, particle in enumerate(particles):
                    accepted = False
                    attempts = 0
                    while not accepted and attempts < 100:
                        attempts += 1
                        # Multivariate normal perturbation
                        perturbation = multivariate_normal.rvs(mean=np.zeros(len(param_bounds)), cov=cov)
                        new_particle = {}
                        valid = True
                        for j, param in enumerate(param_bounds.keys()):
                            new_value = particle[param] + perturbation[j]
                            param_min, param_max = param_bounds[param]
                            if param_min <= new_value <= param_max:
                                new_particle[param] = new_value
                            else:
                                valid = False
                                break
                        if valid:
                            accepted = True
                            new_particles.append(new_particle)
                    if not accepted:
                        new_particles.append(particle)
                particles = new_particles
        # Return final particles and weights
        final_results = []
        for i, particle in enumerate(particles):
            final_results.append({
                "tau": particle["tau"],
                "rho": particle["rho"],
                "alpha": self.fixed_alpha,
                "gamma": self.fixed_gamma,
                "weight": weights[i],
                "distance": distances[i],
                "trajectory": trajectories[i]
            })
        return pd.DataFrame(final_results)
    
    def plot_results(self, results_df, method_name="ABC", n_trajectories=5):
        """Plot parameter posterior for tau and rho with time series comparison"""
        if len(results_df) == 0:
            fig, ax = plt.subplots(figsize=(10, 6))
            ax.text(0.5, 0.5, f"No accepted parameter sets for {method_name}", 
                   horizontalalignment='center', verticalalignment='center')
            return fig
        # Debug information
        print(f"Plotting {len(results_df)} parameter sets for {method_name}")
        print("Parameter ranges:")
        for param in ['tau', 'rho']:
            if param in results_df.columns:
                print(f"  {param}: {results_df[param].min():.6f} - {results_df[param].max():.6f}")
        print(f"Fixed: alpha = {self.fixed_alpha}, gamma = {self.fixed_gamma}")
        fig = plt.figure(figsize=(15, 10))
        # 1. 2D Parameter posterior plot (tau vs rho)
        ax1 = fig.add_subplot(2, 2, 1)
        # Small jitter to prevent overlapping points
        jitter_amount = 0.001
        tau_jitter = results_df["tau"] + np.random.normal(0, jitter_amount, len(results_df))
        rho_jitter = results_df["rho"] + np.random.normal(0, jitter_amount/10, len(results_df))
        if 'weight' in results_df.columns:
            scatter = ax1.scatter(tau_jitter, rho_jitter,
                                s=results_df["weight"]*200 + 20, # Marker size based on weights
                                c=results_df["distance"], # Color based on distance
                                cmap='viridis_r', alpha=0.7, edgecolors='black', linewidth=0.5)
            cbar = fig.colorbar(scatter, ax=ax1)
            cbar.set_label('Distance')
        else:
            scatter = ax1.scatter(tau_jitter, rho_jitter,
                                c=results_df["distance"], cmap='viridis_r', alpha=0.7, s=80,
                                edgecolors='black', linewidth=0.5)
            cbar = fig.colorbar(scatter, ax=ax1)
            cbar.set_label('Distance')
        ax1.set_xlabel('Tau (transmission rate)', fontsize=12)
        ax1.set_ylabel('Rho (initial infection fraction)', fontsize=12)
        ax1.set_title(f'Parameter Posterior - {method_name}\n({len(results_df)} points)\nFixed: α={self.fixed_alpha}, γ={self.fixed_gamma}', fontsize=12)
        ax1.grid(True, alpha=0.3, linestyle='--', linewidth=0.8)
        # 2. Tau distribution
        ax2 = fig.add_subplot(2, 2, 2)
        bins = max(15, min(30, len(results_df)//2))
        if 'weight' in results_df.columns:
            ax2.hist(results_df["tau"], bins=bins, alpha=0.7, 
                    weights=results_df["weight"], density=True, 
                    edgecolor='black', color='blue', label='Tau')
        else:
            ax2.hist(results_df["tau"], bins=bins, alpha=0.7, 
                    density=True, edgecolor='black', color='blue', label='Tau')
        ax2.set_title(f"Tau distribution\n({len(results_df)} samples)")
        ax2.set_xlabel("Tau (transmission rate)")
        ax2.set_ylabel("Density")
        ax2.grid(True, alpha=0.3, linestyle=':', linewidth=0.8)
        # 3. Rho distribution
        ax3 = fig.add_subplot(2, 2, 3)
        if 'weight' in results_df.columns:
            ax3.hist(results_df["rho"], bins=bins, alpha=0.7, 
                    weights=results_df["weight"], density=True, 
                    edgecolor='black', color='red', label='Rho')
        else:
            ax3.hist(results_df["rho"], bins=bins, alpha=0.7, 
                    density=True, edgecolor='black', color='red', label='Rho')
        ax3.set_title(f"Rho distribution\n({len(results_df)} samples)")
        ax3.set_xlabel("Rho (initial infection fraction)")
        ax3.set_ylabel("Density")
        ax3.grid(True, alpha=0.3, linestyle=':', linewidth=0.8)
        # 4. Time series comparison
        ax4 = fig.add_subplot(2, 2, 4)
        n_plot = min(n_trajectories, len(results_df))
        if n_plot > 0:
            if 'weight' in results_df.columns:
                sorted_indices = results_df['weight'].nlargest(n_plot).index
            else:
                sorted_indices = results_df['distance'].nsmallest(n_plot).index
            for i, idx in enumerate(sorted_indices):
                traj = results_df.loc[idx].get("trajectory")
                if traj is not None:
                    alpha_val = 0.8 if i < 3 else 0.4
                    ax4.plot(traj, alpha=alpha_val, label=f"Sim {i+1}", linewidth=1.5)

        ax4.plot(self.observed_data["I"], color="black", linestyle="--", 
                linewidth=3, label="Observed")
        ax4.set_title("Time series comparison")
        ax4.set_xlabel("Time")
        ax4.set_ylabel("Infected")
        ax4.legend()
        ax4.grid(True, alpha=0.3, linestyle=':', linewidth=0.8)
        plt.tight_layout()
        return fig

def generate_synthetic_seir_data(tau=0.3, alpha=0.2, gamma=0.1, rho=0.01, tmax=99, network_params=None):
    """Generate synthetic SEIR epidemic data with known parameters"""
    if network_params is None:
        network_params = {
            'n_nodes': 1000,
            'network_type': 'barabasi_albert',
            'network_params': {'m': 3}
        }
    # Create dummy instance to generate network
    dummy_seir = NetworkSEIR_tuned(pd.DataFrame({'I': [0]}), network_params, fixed_alpha=alpha, fixed_gamma=gamma)
    G = dummy_seir.generate_network()
    days, S, E, I, R = dummy_seir.SEIR_network(G, tau, alpha, gamma, rho, tmax)
    data = pd.DataFrame({
        'day': days,
        'S': S,
        'E': E,
        'I': I,
        'R': R
    })
    return data

In [3]:
# Define parameter ranges for SEIR model
prior_ranges = {
        "tau": (0.1, 0.9), # Transmission rate
        "rho": (0.001, 0.999) # Initial infection fraction
    }

In [4]:
# Generate synthetic data with known parameters
true_tau = 0.3
true_alpha = 0.2
true_gamma = 0.1
true_rho = 0.01

In [5]:
network_params = {
        'n_nodes': 1000,
        'network_type': 'barabasi_albert',
        'network_params': {'m': 3}
    }

In [None]:
observed_data = generate_synthetic_seir_data(
        tau=true_tau, alpha=true_alpha, gamma=true_gamma, 
        rho=true_rho, network_params=network_params
    )

In [None]:
# Create NetworkSEIR instance
seir_abc = NetworkSEIR_tuned(observed_data, network_params, fixed_alpha=true_alpha, fixed_gamma=true_gamma)

In [None]:
# Run history matching
print("Running history matching...")
hm_results = seir_abc.history_matching(prior_ranges, n_samples=1000, epsilon=5000)

In [None]:
print(len(hm_results))
seir_abc.plot_results(hm_results, "History Matching")

In [None]:
print("Running ABC rejection...")
rejection_results = seir_abc.rejection_abc(n_samples=50, adaptive=True, accept_ratio=0.1)

In [None]:
print(len(rejection_results))
seir_abc.plot_results(rejection_results, "ABC Rejection", len(rejection_results))

In [None]:
print("Running ABC annealing...")
annealing_results = seir_abc.annealing_abc(n_samples=50, cooling_steps=3, adaptive=True)

In [None]:
print(len(annealing_results))
seir_abc.plot_results(annealing_results, "ABC Annealing", len(annealing_results))

In [None]:
print("Running ABC SMC...")
smc_results = seir_abc.smc_abc(n_particles=50, n_populations=3, adaptive=True)

In [None]:
print(len(smc_results))
seir_abc.plot_results(smc_results, "ABC SMC", len(smc_results))