# Initialize

In [19]:
import numpy as np
import random
from scipy.integrate import odeint
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from io import StringIO
class SEIRModel:
    def __init__(self, population, beta, sigma, gamma):
        self.population = population
        self.beta = beta
        self.sigma = sigma
        self.gamma = gamma
        self.s = 100 
        self.e = 10
        self.i = 0
        self.r = 0

    def step(self):
        new_infections = self.beta * self.s * self.i / self.population
        new_exposed = new_infections
        new_recovered = self.gamma * self.i
        self.s -= new_infections
        self.e += new_exposed
        self.i += new_infections - new_recovered
        self.r += new_recovered

    def get_infected(self):
        return int(self.i)

    def __call__(self, y, t, N, beta, sigma, gamma):
        S, E, I, R = y
        dSdt = -beta * S * I / N
        dEdt = beta * S * I / N - sigma * E
        dIdt = sigma * E - gamma * I
        dRdt = gamma * I
        return dSdt, dEdt, dIdt, dRdt
    
def load_df(data_name: str) -> pd.DataFrame:
    path = f"nb_datasets/{data_name}"

    df:pd.DataFrame = pd.read_csv(path)

    # ensure sort by date
    df['date'] = pd.to_datetime(df['date'], dayfirst=True, format='%d/%m/%Y')
    df_sorted: pd.DataFrame = df.sort_values(by='date')

    df_sorted = df.sort_values("barangay_Res")
    keep_cols = ["date", "barangay_Res", "count_Exposed", "count_Infectious", "count_Recovered"]
    main_df: pd.DataFrame = df_sorted[df_sorted.columns.intersection(keep_cols)]
    
    return main_df


In [20]:

# Input data as a string
data = """
Barangay	Population percentage (2020)	Population (2020)	Population (2015)	Change (2015‑2020)	Annual Population Growth Rate (2015‑2020)
Bagong Silang	1.76%	5,736	5,539	3.56%	0.74%
Calendola	1.16%	3,797	4,728	-19.69%	-4.51%
Chrysanthemum	3.81%	12,433	10,482	18.61%	3.66%
Cuyab	6.57%	21,422	21,731	-1.42%	-0.30%
Estrella	2.46%	8,025	7,478	7.31%	1.50%
Fatima	1.99%	6,491	8,704	-25.43%	-5.99%
G.S.I.S.	0.87%	2,828	2,867	-1.36%	-0.29%
Landayan	10.19%	33,235	31,407	5.82%	1.20%
Langgam	9.49%	30,946	29,625	4.46%	0.92%
Laram	2.00%	6,536	6,285	3.99%	0.83%
Magsaysay	3.92%	12,793	12,172	5.10%	1.05%
Maharlika	1.71%	5,580	6,343	-12.03%	-2.66%
Narra	0.70%	2,297	2,429	-5.43%	-1.17%
Nueva	1.31%	4,286	4,967	-13.71%	-3.06%
Pacita 1	6.93%	22,581	20,362	10.90%	2.20%
Pacita 2	3.68%	11,993	12,811	-6.39%	-1.38%
Poblacion	1.77%	5,771	5,890	-2.02%	-0.43%
Riverside	0.93%	3,028	3,226	-6.14%	-1.32%
Rosario	1.81%	5,911	5,613	5.31%	1.09%
Sampaguita Village	1.52%	4,941	5,733	-13.81%	-3.08%
San Antonio	18.21%	59,368	61,428	-3.35%	-0.72%
San Lorenzo Ruiz	1.78%	5,800	5,073	14.33%	2.86%
San Roque	2.20%	7,161	7,011	2.14%	0.45%
San Vicente	8.45%	27,561	26,129	5.48%	1.13%
Santo Nino	1.19%	3,892	4,172	-6.71%	-1.45%
United Bayanihan	1.65%	5,385	7,086	-24.01%	-5.61%
United Better Living	1.90%	6,204	6,518	-4.82%	-1.03%
"""

## Check if names are correct
# Create a DataFrame
df = pd.read_csv(StringIO(data), sep="\t")
df["Barangay"] = df["Barangay"].str.upper()

# Clean up the 'Population (2020)' column to remove commas and convert to integers
df["Population (2020)"] = df["Population (2020)"].str.replace(",", "").astype(int)

# Create a dictionary with Barangay as keys and Population (2020) as values
barangay_population_dict = df.set_index("Barangay")["Population (2020)"].to_dict()

# Print the dictionary
print(barangay_population_dict)


df = load_df("CITY_OF_SAN_PEDRO_processed.csv")

name_list = df["barangay_Res"].unique().tolist()

print(f"br_list = {len(barangay_population_dict)} \nbr_df = {len(name_list)}")

for b_name in barangay_population_dict:
    if b_name not in name_list:
        print(f"Wrong: {b_name}")
        

for brgy_name, brgy_pop in barangay_population_dict.items():
  print(f"{brgy_name}: {brgy_pop}")        

{'BAGONG SILANG': 5736, 'CALENDOLA': 3797, 'CHRYSANTHEMUM': 12433, 'CUYAB': 21422, 'ESTRELLA': 8025, 'FATIMA': 6491, 'G.S.I.S.': 2828, 'LANDAYAN': 33235, 'LANGGAM': 30946, 'LARAM': 6536, 'MAGSAYSAY': 12793, 'MAHARLIKA': 5580, 'NARRA': 2297, 'NUEVA': 4286, 'PACITA 1': 22581, 'PACITA 2': 11993, 'POBLACION': 5771, 'RIVERSIDE': 3028, 'ROSARIO': 5911, 'SAMPAGUITA VILLAGE': 4941, 'SAN ANTONIO': 59368, 'SAN LORENZO RUIZ': 5800, 'SAN ROQUE': 7161, 'SAN VICENTE': 27561, 'SANTO NINO': 3892, 'UNITED BAYANIHAN': 5385, 'UNITED BETTER LIVING': 6204}
br_list = 27 
br_df = 27
BAGONG SILANG: 5736
CALENDOLA: 3797
CHRYSANTHEMUM: 12433
CUYAB: 21422
ESTRELLA: 8025
FATIMA: 6491
G.S.I.S.: 2828
LANDAYAN: 33235
LANGGAM: 30946
LARAM: 6536
MAGSAYSAY: 12793
MAHARLIKA: 5580
NARRA: 2297
NUEVA: 4286
PACITA 1: 22581
PACITA 2: 11993
POBLACION: 5771
RIVERSIDE: 3028
ROSARIO: 5911
SAMPAGUITA VILLAGE: 4941
SAN ANTONIO: 59368
SAN LORENZO RUIZ: 5800
SAN ROQUE: 7161
SAN VICENTE: 27561
SANTO NINO: 3892
UNITED BAYANIHAN: 5385


# Sim Anneal Code

In [None]:
# Simulate SEIR model
def simulate_seir(beta, sigma, gamma, y0, t, n):
    N = n
    seir_model = SEIRModel(N, beta, sigma, gamma)
    
    # Solve ODEs for SEIR
    result = odeint(seir_model, y0, t, args=(N, beta, sigma, gamma))
    S, E, I, R = result.T
    
    return S, E, I, R
      

# Objective function
def objective_function(params, observed_data, initial_conditions, time_points, n):
    beta, sigma, gamma = params

    # Simulate the SEIR model with given parameters
    s, predicted_E, predicted_I, predicted_R = simulate_seir(beta, sigma, gamma, initial_conditions, time_points, n)
    
    # Split observed data into compartments
    observed_E, observed_I, observed_R = observed_data[:, 0], observed_data[:, 1], observed_data[:, 2]
    
    # Calculate metrics for each compartment
    metrics = {}
    r2_dict= {}
    mae_dict = {}
    rmse_dict = {}
    
    for compartment, observed, predicted in zip(
        ["E", "I", "R"],
        [observed_E, observed_I],
        [predicted_E, predicted_I]
        ):
        # [observed_E, observed_I, observed_R],
        # [predicted_E, predicted_I, predicted_R]
        # ):
    
        r2 = r2_score(observed, predicted)
        mae = mean_absolute_error(observed, predicted)
        rmse = np.sqrt(mean_squared_error(observed, predicted))

        # Normalize metrics
        r2_normalized = 1 - r2  # Invert R^2 for minimization
        mae_normalized = mae / np.max(observed)  # Scale by max value
        rmse_normalized = rmse / np.max(observed)  # Scale by max value

        # Weighted average for each compartment
        combined_metric = (0.2 * r2_normalized) + (0.4 * mae_normalized) + (0.4 * rmse_normalized)
        metrics[compartment] = combined_metric
        r2_dict[compartment] = r2
        mae_dict[compartment] = mae
        rmse_dict[compartment] = rmse

    # Combine all compartments (average or weighted sum)
    total_metric = np.mean(list(metrics.values()))  # Equal weight to all compartments
    total_r2 = np.mean(list(r2_dict.values()))
    total_mae = np.mean(list(mae_dict.values()))
    total_rmse = np.mean(list(rmse_dict.values()))
    
    return total_metric, total_r2, total_mae, total_rmse



# Simulated Annealing for SEIR parameter tuning
def simulated_annealing_seir(observed_data, initial_conditions, time_points, temp, cooling_rate, max_iter, n):
    # Initial parameter guesses
    current_params = [0.3, 0.2, 0.1]  # Initial guesses for beta, sigma, gamma
    current_cost, current_r2, current_mae, current_rmse = objective_function(current_params, observed_data, initial_conditions, time_points, n)
    best_params = current_params
    best_r2, best_mae, best_rmse = current_r2, current_mae, current_rmse
    best_cost = current_cost

    cost_list = []
    
    min_temp = 1e-5 
    initial_temp = temp
    
    for i in range(max_iter):
        # Generate new candidate parameters
        new_params = [param + random.uniform(-0.005, 0.005) for param in current_params]
        new_params = np.clip(new_params, 0, 1)  # Keep parameters in a valid range (0 to 1)        
        
        # Evaluate the new candidate solution
        new_cost, new_r2, new_mae, new_rmse = objective_function(new_params, observed_data, initial_conditions, time_points, n)

        # ADJUST BASED ON METRIC
        # Decide whether to accept the new solution
        if new_cost < current_cost or random.random() < np.exp((current_cost - new_cost) / temp):
            current_params = new_params
            current_cost = new_cost
            
            # Update the best solution found so far
            if new_cost < best_cost:
                best_params = new_params
                best_cost = new_cost
                best_r2, best_mae, best_rmse = new_r2, new_mae, new_rmse

        # log for visualizing progress
        cost_list.append(best_cost) 

        # Cool down the temperature
        #temp = max(temp * cooling_rate, min_temp)  # Exponential decrease
        #temp = temp - (initial_temp / max_iter)  # Linear decrease
        temp = initial_temp / (1 + cooling_rate * i)  # Logarithmic decrease

    return best_params, best_cost, cost_list, temp, best_r2, best_mae, best_rmse

# Run Sim Anneal

In [None]:
'''
- need every barangay name and population
- iterate through every barangay in san pedro
- store best params in dictionary
- show graphs of using optimized params for brgys
- show split for train and test

brgy_dict = { brgy_name: population } # done

params_dict = {
  "brgy1" : {
    "beta" : 0.217,
    "beta" : 0.217,
    "gamma": 0.217
  },
  "brgy2" : {
    "beta" : 0.217,
    "beta" : 0.217,
    "gamma": 0.217
  },
  "brgy3" : {
    "beta" : 0.217,
    "beta" : 0.217,
    "gamma": 0.217
  }

TODO: fix loop to call sim anneal per barangay with their pop
'''

def run_anneal(brgy_name, brgy_pop):
    # load data
    main_df: pd.DataFrame = load_df("CITY_OF_SAN_PEDRO_processed.csv")
    main_observed: pd.DataFrame = load_observed(main_df, brgy_name) # Change to check for other Brgys.

    # Initial parameters
    N = 33235
    I0 = train_data['count_Infectious'].iloc[0] # Initial infected from data
    E0 = train_data['count_Exposed'].iloc[0] # Initial exposed cases
    R0 = train_data['count_Recovered'].iloc[0] # Initial recovered
    S0 = N - I0 - E0 - R0  # Susceptible

    # split. 7 months training
    train_data = main_observed[(main_observed['date'] >= '01/01/2021') & (main_observed['date'] <= '07/01/2021')]
    test_data = main_observed[(main_observed['date'] > '07/02/2021') & (main_observed['date'] <= '12/31/2022')]

    df_observed_data = main_observed[(main_observed['date'] >= '01/01/2021') & (main_observed['date'] <= '12/31/2022')]
    
    best_params, best_cost, cost_list, final_temp, best_r2, best_mae, best_rmse = simulated_annealing_seir(observed_data, initial_conditions, time_points, temp, cooling_rate, max_iter, N)
    
    return 0


for brgy_name, brgy_pop in barangay_population_dict.items():
  
  run_anneal(brgy_name, brgy_pop)

main_df: pd.DataFrame = load_df("CITY_OF_SAN_PEDRO_processed.csv")



initial_conditions: tuple = [S0, E0, I0, R0]

# Simulated Annealing parameters
temp = 1500  
cooling_rate = 0.95
max_iter = 1500  

observed_data = train_data[["count_Exposed", "count_Infectious", "count_Recovered"]].to_numpy()

time_points = np.arange(train_data.shape[0]) # odeint expects an array (vector) of integers.

# Run the Simulated Annealing algorithm
best_params, best_cost, cost_list, final_temp, best_r2, best_mae, best_rmse = simulated_annealing_seir(observed_data, initial_conditions, time_points, temp, cooling_rate, max_iter, N)
o_beta, o_sigma, o_gamma = best_params

print(f"Final Temp = {final_temp} \nBest Cost = {best_cost:.3f} \n\nPrameters: \nbeta: {o_beta:.3f} \nsigma: {o_sigma:.3f} \ngamma:{o_gamma:.3f}\n")
print(f"Metrics: \nbest r2: {best_r2:.3f} \nbest mae: {best_mae:.3f} \nbest rmse: {best_rmse:.3f}")

# Plot cost progression each compartment
plt.figure(figsize=(12, 6))
plt.plot(cost_list, 'r--', label='Cost')
plt.legend()
plt.xlabel("Iteration")
plt.ylabel("Best Cost")
plt.title(f"Simulated Annealing: Cost Progression")
plt.grid(True)
plt.show()
