In [1]:
from firefly_optimizer import *
from plot_results import *
from transition_probability_estimation import *
from Data_synthesize import *
from rw_data_processing import *
from scipy.optimize import leastsq
from scipy.optimize import curve_fit
import random
import seaborn as sns
import queue
import scipy.stats as stats
import networkx as nx
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import powerlaw
import time
from datetime import datetime
from collections import Counter
from tqdm import tqdm
%load_ext autoreload
%autoreload 
plt.style.use(r"./rw_visualization.mplstyle")


# Load data

In [None]:
# Silence FutureWarning messages for the rest of the session.
from warnings import simplefilter
simplefilter(action='ignore', category=FutureWarning)

# Default seaborn colour palette. seaborn is imported as `sns` in the first
# cell, so the bare name `seaborn` is not bound by this notebook's explicit imports.
current_palette = sns.color_palette()
current_palette


In [None]:
# Folder that holds the synthetic simulation results analysed in this notebook.
data_path = Path('./synthetic_data_results_spread_Taiwan_weight')

# Case-count threshold handed to the loader (presumably simulations with fewer
# cases are dropped -- confirm in load_synthetic_data). 0 is used here.
# A previously used alternative was min_case_num = 7*52, i.e. 364 cases: at least
# one case per day over 52 weeks for a run to count as an outbreak.
min_case_num = 0

(
    demographic_data_list_all,
    social_data_list_all,
    course_of_disease_data_list_all,
    contact_data_list_all,
    case_edge_list_all,
) = load_synthetic_data(
    data_path,
    monte_carlo_number=100,
    return_len='all',
    memory_limit=1e9,
    min_case_num=min_case_num,
)

In [None]:
# Report how many simulations were loaded and the size of the first few.
n_simulations = len(demographic_data_list_all)
print('Number of simulations:', n_simulations)

# Preview at most three simulations so the cell does not raise IndexError
# when fewer than three were loaded.
n_preview = min(3, n_simulations)
preview_parts = [
    f'Simulation {k + 1}: {len(demographic_data_list_all[k])}'
    for k in range(n_preview)
]
# Only show the trailing ellipsis when some simulations are not listed.
if n_simulations > n_preview:
    preview_parts.append('...')
print(f"Number of synthetic cases in each simulation: {', '.join(preview_parts)}")

In [None]:
# Number of synthetic cases per simulation, keyed by 'seed <index>'.
# Iterate over what was actually loaded instead of assuming exactly 100 runs.
all_len_dist = {
    f'seed {i}': len(cases)
    for i, cases in enumerate(demographic_data_list_all)
}
print(all_len_dist)

if all_len_dist:
    # max() returns the first key with the largest value, matching a
    # first-occurrence lookup on ties.
    max_seed = max(all_len_dist, key=all_len_dist.get)
    max_len = all_len_dist[max_seed]
    print(f'Maximum length in the distribution found in seed: {max_seed}, with length: {max_len}')
else:
    print('No simulations were loaded.')


# Map the synthetic data to epidemic time series

- In the following code, we use a single simulation as an example (by default, the one with the largest number of synthetic cases). Users can try a different simulation by setting another index in the `seed` variable.

In [25]:
# Index of the simulation to analyse; 81 was picked as the run with the
# biggest number of synthetic cases.
seed = 81

# Pull the records of the selected simulation out of each loaded collection.
(
    demographic_data_list,
    course_of_disease_data_list,
    contact_data_list,
    case_edge_list,
) = (
    demographic_data_list_all[seed],
    course_of_disease_data_list_all[seed],
    contact_data_list_all[seed],
    case_edge_list_all[seed],
)

In [None]:
# Length of the observation window in days (52 weeks).
time_limit = 7*52

# Turn the per-case disease courses into daily population-level series.
# NOTE(review): population_size here (23008366) differs from the 23215015 used
# in the transmission-rate cells of this notebook -- confirm which is intended.
(
    daily_susceptible_population,
    daily_infected_cases,
    daily_contagious_cases,
    daily_symptomatic_cases,
    daily_confirmed_cases,
    daily_tested_cases,
    daily_suspected_cases,
    daily_isolation_cases,
    daily_critically_ill_cases,
    daily_recovered_cases,
    daily_death_cases,
) = transform_course_object_to_population_data(
    course_of_disease_data_list,
    contact_data_list,
    time_limit=time_limit,
    population_size=23008366,
)
# The isolation series doubles as the hospitalization series.
daily_hospitalization = daily_isolation_cases

# Every series is drawn against days 0..time_limit.
day_axis = range(time_limit+1)

# Figure 1: daily infected cases.
plt.figure()
plt.plot(day_axis, daily_infected_cases, label='Infected')
plt.xlabel('Day')
plt.ylabel('Daily number of cases')
plt.legend(loc='best')

# Figure 2: cumulative counts per disease state.
cumulative_state_curves = [
    (daily_infected_cases, 'Infected'),
    (daily_contagious_cases, 'Contagious'),
    (daily_symptomatic_cases, 'Symptomatic'),
    (daily_confirmed_cases, 'Confirmed'),
    (daily_isolation_cases, 'Isolated/hospitalized'),
    (daily_critically_ill_cases, 'Critically ill'),
    (daily_recovered_cases, 'Recovered'),
    (daily_death_cases, 'Death'),
]
plt.figure()
for daily_series, curve_label in cumulative_state_curves:
    plt.plot(day_axis, np.cumsum(daily_series), label=curve_label)
plt.xlabel('Day')
plt.ylabel('Cumulative number of cases')
plt.legend(loc='best')

# Figure 3: cumulative suspected cases and tests.
cumulative_testing_curves = [
    (daily_suspected_cases, 'Suspected'),
    (daily_tested_cases, 'Tests'),
]
plt.figure()
for daily_series, curve_label in cumulative_testing_curves:
    plt.plot(day_axis, np.cumsum(daily_series), label=curve_label)
plt.xlabel('Day')
plt.ylabel('Cumulative number of cases')
plt.legend(loc='best')

# Figure 4: susceptible population over time.
plt.figure()
plt.plot(day_axis, daily_susceptible_population)
plt.xlabel('Day')
plt.ylabel('Susceptible population')

## Doubling time

In [33]:
def exponential_growth(t, N0, growth_rate):
    """
    Evaluate the exponential growth curve N(t) = N0 * exp(growth_rate * t).

    Parameters:
    t : array-like
        Time points at which the curve is evaluated
    N0 : float
        Value of the curve at t = 0
    growth_rate : float
        Exponential rate per unit of t

    Returns:
    The curve value(s) at t
    """
    exponent = growth_rate * t
    growth_factor = np.exp(exponent)
    return N0 * growth_factor

def calculate_doubling_time(daily_infected_cases):
    """
    Estimate the doubling time by fitting an exponential growth model.

    The series is fitted with `exponential_growth` over the time points
    0, 1, ..., len(daily_infected_cases) - 1 using `scipy.optimize.curve_fit`,
    with both parameters constrained to be non-negative.

    Parameters:
    daily_infected_cases : array-like
        Daily number of infected cases (one value per day)

    Returns:
    doubling_time : float or None
        Estimated doubling time in days, ln(2) / growth_rate. It is np.inf
        when the fitted growth rate is zero, and None when no fit is possible.
    params : numpy.ndarray or None
        Fitted parameters (N0, growth_rate), or None when no fit is possible.
    """
    cases = np.asarray(daily_infected_cases, dtype=float)

    # An empty or single-point series cannot determine a growth rate; report
    # it the same way as a failed fit instead of raising IndexError.
    if cases.size < 2:
        print("Error: Need at least two data points to fit the exponential growth model")
        return None, None

    # Create time array (assuming daily data)
    t = np.arange(cases.size)

    try:
        params, _ = curve_fit(
            exponential_growth,
            t,
            cases,
            p0=[cases[0], 0.1],  # Initial guess
            bounds=([0, 0], [np.inf, np.inf])
        )
    except (RuntimeError, ValueError):
        # RuntimeError: the optimiser did not converge.
        # ValueError: invalid input such as NaN/inf values or an infeasible p0.
        print("Error: Could not fit the exponential growth model to the data")
        return None, None

    _, growth_rate = params

    # For exponential growth, doubling time = ln(2)/growth_rate. The lower
    # bound allows growth_rate == 0, in which case the series never doubles.
    if growth_rate <= 0:
        doubling_time = np.inf
    else:
        doubling_time = np.log(2) / growth_rate

    return doubling_time, params


In [None]:
# Fit the exponential model to the daily infected cases and compare it to the data.
doubling_time, params = calculate_doubling_time(daily_infected_cases)

t = np.arange(len(daily_infected_cases))
plt.plot(t, daily_infected_cases, label='Actual')

# calculate_doubling_time returns (None, None) when the fit fails; unpacking
# `*params` would then raise TypeError, so only draw the curve on success.
if params is None:
    print('Doubling time: not available (exponential fit failed)')
else:
    print(f'Doubling time: {doubling_time} days')
    plt.plot(t, exponential_growth(t, *params), label='Fitted', linestyle='--')

plt.xlabel('Day')
plt.ylabel('Daily number of cases')
plt.legend(loc='best')
plt.show()

# Transmission rate

In [35]:
# Derive daily S, I, R and D numbers from the daily case series.
# NOTE(review): population_size is 23215015 here but 23008366 in the earlier
# transform_course_object_to_population_data call -- confirm which is intended.
(
    daily_number_of_susceptible,
    daily_number_of_infection,
    daily_number_of_recovery,
    daily_number_of_death,
) = transform_population_data_to_sird_number(
    daily_infected_cases,
    daily_recovered_cases,
    daily_death_cases,
    population_size=23215015,
)

In [36]:
# Daily transmission rate: beta = N * (-dS) / (S * I), with N = 23215015.
# NOTE(review): S and I are taken at the end of each interval ([1:]); a forward
# difference would pair -dS with the values at the start ([:-1]) -- confirm.
daily_drop_in_susceptibles = -np.diff(daily_number_of_susceptible)
susceptible_times_infected = daily_number_of_susceptible[1:]*daily_number_of_infection[1:]
beta = 23215015*daily_drop_in_susceptibles/susceptible_times_infected

In [None]:
# Show the daily transmission-rate series, then print its NaN-ignoring mean over
# three selections: all days, non-zero days only, and days from index 80 onwards.
plt.plot(beta)

beta_selections = (
    ('Transmission rate: ', beta),
    ('Transmission rate without considering zeros: ', beta[beta != 0]),
    ('Transmission rate with considering only the last days: ', beta[80:]),
)
for selection_label, beta_subset in beta_selections:
    print(selection_label, np.nanmean(beta_subset))

In [None]:
# Reference: "Methods for estimating disease transmission rates: Evaluating the
# precision of Poisson regression and two novel methods"
# Both estimators are evaluated at a single day, `index`.
# NOTE(review): an earlier comment said "day 30" while index is 60 -- confirm
# which day is intended.
index = 60

# Quantities shared by both estimators.
confirmed_next_day = daily_confirmed_cases[index+1]
infected_today = daily_number_of_infection[index]
susceptible_today = daily_number_of_susceptible[index]

# Poisson Method 1
beta_poisson_1 = -np.log(1-confirmed_next_day/infected_today)/(susceptible_today/23215015)
print(beta_poisson_1)

# Poisson Method 2
beta_poisson_2 = -np.log(1-confirmed_next_day*(1/infected_today+1/susceptible_today))
print(beta_poisson_2)

# Serial interval and generation time

In [None]:
# Generation times computed from the disease courses and the case edge list.
generation_time_array = generate_generation_time(course_of_disease_data_list, case_edge_list)

# Histogram with labelled axes, consistent with the other histogram cells.
plt.hist(generation_time_array, bins=100)
plt.xlabel('Generation time (days)')
plt.ylabel('Frequency')
print(f"Average generation time: {np.nanmean(generation_time_array)} days")

In [None]:
# Serial intervals computed from the disease courses and the case edge list.
serial_interval_array = generate_serial_interval(course_of_disease_data_list, case_edge_list)

# Histogram with labelled axes, consistent with the other histogram cells.
plt.hist(serial_interval_array, bins=100)
plt.xlabel('Serial interval (days)')
plt.ylabel('Frequency')
print(f"Average serial interval: {np.nanmean(serial_interval_array)} days")

# State transition time

## Symptom to hospitalization

In [41]:
# For every case: (infection day + monitor/isolation period) minus
# (infection day + incubation period).
time_from_symptom_to_hospitalization = []
for course in course_of_disease_data_list:
    infection_day = course['infection_day']
    symptom_t = infection_day + course['incubation_period']
    hospitalization_t = infection_day + course['monitor_isolation_period']
    time_from_symptom_to_hospitalization.append(hospitalization_t - symptom_t)

In [None]:
# Histogram with labelled axes, consistent with the other histogram cells.
plt.hist(time_from_symptom_to_hospitalization, bins=100)
plt.xlabel('Days from symptom onset to hospitalization')
plt.ylabel('Frequency')
print(f"Average time from symptom to hospitalization: {np.nanmean(time_from_symptom_to_hospitalization):.1f} days")

## Hospital length of stay

In [43]:
# Hospital stay per case: from the start of hospitalization to recovery, or to
# death when the case has no recovery date (NaN).
time_hospital_stay = []
for course in course_of_disease_data_list:
    hospitalization_t = course['infection_day'] + course['monitor_isolation_period']
    recovery_t = course['date_of_recovery']
    end_of_stay_t = course['date_of_death'] if np.isnan(recovery_t) else recovery_t
    time_hospital_stay.append(end_of_stay_t - hospitalization_t)

In [None]:
# Distribution of hospital stays and its NaN-ignoring mean.
mean_hospital_stay = np.nanmean(time_hospital_stay)

plt.hist(time_hospital_stay, bins=100)
plt.ylabel('Frequency')
plt.xlabel('Days of hospital stay')
print(f"Average hospital length of stay: {mean_hospital_stay:.1f} days")

## ICU length of stay

In [45]:
# ICU stay per case: from the critically-ill date to recovery, or to death when
# the case has no recovery date (NaN). A NaN critically-ill date yields a NaN entry.
time_icu_stay = []
for course in course_of_disease_data_list:
    icu_t = course['date_of_critically_ill']
    recovery_t = course['date_of_recovery']
    end_of_stay_t = course['date_of_death'] if np.isnan(recovery_t) else recovery_t
    time_icu_stay.append(end_of_stay_t - icu_t)

In [None]:
# Distribution of ICU stays and its NaN-ignoring mean.
mean_icu_stay = np.nanmean(time_icu_stay)

plt.hist(time_icu_stay, bins=100)
plt.ylabel('Frequency')
plt.xlabel('Days of icu stay')
print(f"Average icu length of stay: {mean_icu_stay:.1f} days")

# Rate

NOTE: In the current version of the data synthesis algorithm, the hospitalization rate is 1 (i.e., every case is hospitalized).

In [None]:
# Mortality rate per age = deaths at that age / cases at that age.
n_cases = len(course_of_disease_data_list)
ages = [demographic_data_list[i]['age'] for i in range(n_cases)]

# Keep at least the bins for ages 0..99, but grow them when an older case
# exists so that indexing by age cannot raise IndexError.
n_age_bins = max(100, max(ages, default=-1) + 1)

age_number_list = np.zeros(n_age_bins)   # cases per age
age_death_counts = np.zeros(n_age_bins)  # deaths per age
for age, course in zip(ages, course_of_disease_data_list):
    age_number_list[age] += 1
    # A non-NaN date_of_death marks a case that died.
    if not np.isnan(course['date_of_death']):
        age_death_counts[age] += 1

# Ages without any case stay NaN (as a 0/0 division would give) without
# triggering a RuntimeWarning.
observed_ages = age_number_list > 0
age_dependent_mortality_rate = np.full(n_age_bins, np.nan)
age_dependent_mortality_rate[observed_ages] = (
    age_death_counts[observed_ages] / age_number_list[observed_ages]
)

In [None]:
# Bar chart of the mortality rate per age; the x-axis is limited to -1..100.
age_axis = range(len(age_dependent_mortality_rate))
plt.bar(age_axis, age_dependent_mortality_rate)
plt.title('Age-Dependent Mortality Rate')
plt.xlabel('Age')
plt.ylabel('Mortality Rate')
plt.xlim([-1, 100])
plt.show()

# Proportion

In [None]:
# Overall shares of asymptomatic, mild and severe cases.
# - asymptomatic: incubation_period is NaN
# - severe: date_of_critically_ill is set (not NaN)
# - mild: every case that is not severe (so it also includes asymptomatic cases)
total_cases = len(course_of_disease_data_list)

asymptomatic_counts = sum(
    1 for course in course_of_disease_data_list
    if np.isnan(course['incubation_period'])
)
severe_counts = sum(
    1 for course in course_of_disease_data_list
    if not np.isnan(course['date_of_critically_ill'])
)
mild_counts = total_cases - severe_counts

proportion_of_asymptomatic_cases = asymptomatic_counts/total_cases
proportion_of_mild_cases = mild_counts/total_cases
proportion_of_severe_cases = severe_counts/total_cases

print(f"Proportion of asymptomatic cases: {proportion_of_asymptomatic_cases:.2f}")
print(f"Proportion of mild cases: {proportion_of_mild_cases:.2f}")
print(f"Proportion of severe cases: {proportion_of_severe_cases:.2f}")

In [None]:
# Per-age shares of asymptomatic, mild and severe cases, each drawn as a bar chart.
n_cases = len(course_of_disease_data_list)
ages = [demographic_data_list[i]['age'] for i in range(n_cases)]

# Keep at least the bins for ages 0..99, but grow them when an older case
# exists so that indexing by age cannot raise IndexError.
n_age_bins = max(100, max(ages, default=-1) + 1)

age_number_list = np.zeros(n_age_bins)
age_asymptomatic_counts = np.zeros(n_age_bins)
age_mild_counts = np.zeros(n_age_bins)
age_severe_counts = np.zeros(n_age_bins)
for age, course in zip(ages, course_of_disease_data_list):
    age_number_list[age] += 1
    # Asymptomatic: no incubation period recorded (NaN).
    if np.isnan(course['incubation_period']):
        age_asymptomatic_counts[age] += 1
    # Mild vs. severe: split on whether a critically-ill date is recorded.
    if np.isnan(course['date_of_critically_ill']):
        age_mild_counts[age] += 1
    else:
        age_severe_counts[age] += 1

observed_ages = age_number_list > 0


def _share_per_age(counts):
    """Divide per-age counts by the cases per age; NaN where an age has no case."""
    share = np.full(n_age_bins, np.nan)
    share[observed_ages] = counts[observed_ages] / age_number_list[observed_ages]
    return share


# The *_counts names are kept for compatibility; from here on they hold proportions.
age_asymptomatic_counts = _share_per_age(age_asymptomatic_counts)
age_mild_counts = _share_per_age(age_mild_counts)
age_severe_counts = _share_per_age(age_severe_counts)

age_proportion_plots = [
    (age_asymptomatic_counts, 'Proportion of asymptomatic cases',
     'Age-Dependent Proportion of Asymptomatic Cases'),
    (age_mild_counts, 'Proportion of mild cases',
     'Age-Dependent Proportion of Mild Cases'),
    (age_severe_counts, 'Proportion of severe cases',
     'Age-Dependent Proportion of Severe Cases'),
]
for proportions, y_label, plot_title in age_proportion_plots:
    plt.bar(range(len(proportions)), proportions)
    plt.xlabel('Age')
    plt.ylabel(y_label)
    plt.title(plot_title)
    plt.xlim([-1, 100])
    plt.show()

# R0

In [51]:
# Make the parent directory importable so that modules located there can be imported.
# `sys` and `Path` are imported here explicitly because the notebook's import
# cell does not import them by name.
import sys
from pathlib import Path

code_path = Path('../').resolve()
# Guard against duplicate entries when this cell is executed more than once.
if str(code_path) not in sys.path:
    sys.path.append(str(code_path))

In [53]:
from R0_network import R0_average_effective_contact

In [None]:
# Estimate R0 for the selected simulation from its contact records, using
# R0_average_effective_contact imported from R0_network.
# NOTE(review): judging by the helper's name, this is presumably the average
# number of effective contacts per case -- confirm against R0_network.
R0 = R0_average_effective_contact(contact_data_list)
print(f"R0: {R0}")

# Test

In [None]:
# Consider only the negative tests: per case, the sum of the
# 'negative_test_status' entries divided by their number.
# NOTE(review): presumably each entry flags one negative test result -- confirm
# the meaning of 'negative_test_status' in the data synthesis code.
test_accuracy_list = [
    np.sum(course['negative_test_status'])/len(course['negative_test_status'])
    for course in course_of_disease_data_list
]

print(f"Average test accuracy: {np.mean(test_accuracy_list):.2f}")

In [None]:
# The positive test accuracy is 1.
# Combine the positive test information into the calculation of the test
# accuracy: one per-case negative-test ratio plus one entry of 1 per case.
negative_test_accuracy = [
    np.sum(course['negative_test_status'])/len(course['negative_test_status'])
    for course in course_of_disease_data_list
]
positive_test_accuracy = [1]*len(course_of_disease_data_list)
test_accuracy_list = negative_test_accuracy + positive_test_accuracy

print(f"Average test accuracy: {np.mean(test_accuracy_list):.2f}")