# 1. Dataset

In [1]:
import pandas as pd
import numpy as np
from datetime import datetime

def get_large_dataset():
    ''' Get COVID confirmed case for all countries

    '''
    # get large data frame
    df_full=pd.read_csv('../data/processed/COVID_final_set.csv',sep=';')  
    df_full.reset_index(drop=True)

    country_list = df_full.country.unique()
    
    # convert date to datetime format
    t_idx = [datetime.strptime(date,"%Y-%m-%d") for date in df_full.date] 
    t_str = [each.strftime('%Y-%m-%d') for each in t_idx] 
    df_full['date'] = t_idx
    
    # featch confirmed cases of all countries
    df = df_full.drop(['state'], axis=1).groupby(['country', 'date'])['confirmed'].sum()
    df_confirmed = pd.DataFrame()
    df_confirmed['date'] = df['Canada'].index
    for each in country_list:
        df_confirmed[each] = df[each].values
    
    return df_confirmed

import requests
from bs4 import BeautifulSoup

def world_population():
    
    # get large data frame
    df_full=pd.read_csv('../data/processed/COVID_final_set.csv',sep=';')  
    df_full.reset_index(drop=True)

    country_list = df_full.country.unique()
    
    page = requests.get("https://www.worldometers.info/coronavirus/")    # get webpage
    soup = BeautifulSoup(page.content, 'html.parser')                    # get page content 
    
    # scrap table data from page content into a list 
    html_table= soup.find('table')                 # find the table in the page content
    all_rows= html_table.find_all('tr')            # filn rows in table data

    final_data_list= []
    for pos,rows in enumerate(all_rows):
        col_list= [each_col.get_text(strip= True) for each_col in rows.find_all('td')]     # td for row element
        final_data_list.append(col_list)

    # convert list into DataFrame with proper labling
    pd_daily=pd.DataFrame(final_data_list)
    
    df_population = pd.DataFrame()
    df_population['population'] = pd_daily[14][9:223]    # get only population column 
    df_population['country'] = pd_daily[1][9:223]        # respective country names
    
    # convert number seperator
    df_population['population'] = df_population.apply(lambda x: x.str.replace(',',''))
    df_population = df_population.reset_index(drop=True)
    # convert string to number
    df_population['population']  = pd.to_numeric(df_population['population'], errors='coerce')
    
    # some country names are different in Jhon Hopkins dataset and Worldometer data, therefore we have to plausiblise it
    df_population['country'] = df_population['country'].replace('S. Korea', 'Korea, South')
    df_population['country'] = df_population['country'].replace('USA', 'US')
    df_population['country'] = df_population['country'].replace('Taiwan', 'Taiwan*')
    df_population['country'] = df_population['country'].replace('UAE', 'United Arab Emirates')
    df_population['country'] = df_population['country'].replace('UK', 'United Kingdom')
    
    # plausiblize data of unknown country
    pop = {}
    for each in country_list:
        try:
            pop[each] = np.floor(df_population['population'][np.where(df_population['country']==each)[0][0]])
        except:
            if each=='China':
                pop[each] = 14e7
            else:
                pop[each] = 5000000 # randowm number for the unkonwn country
                
    df_population = pd.DataFrame([pop]).T.rename(columns={0:'population'})
    
    return df_population, country_list


if __name__ == '__main__':
    df_confirmed = get_large_dataset()
    df_population, country_list = world_population()

# 2. SIR model

In [2]:
import pandas as pd
import numpy as np

from scipy import optimize
from scipy.integrate import odeint

class SIR_Model():
    '''This class is programmed for SIR model of epidemiology
       Args:
       -------
       df: pd.DataFrame of large dataset
       country: select country
       population: total population of selected country
       percentage: percentage of total population which is susceptable
    '''
    
    def __init__(self, df, country, population, percentage=5):
        
        self.df = df
        self.country = country
        self.population = population
        self.percentage = percentage
        
        self._get_SIR_initials()
        
        
    def _calculate_susceptible(self):
        '''Calculation of total susceptable based on selected percentage'''
        self.N0 = (self.percentage/100)*self.population # max susceptible population, 10% of pupulation as default
    
    
    def _get_index(self, percentage):
        '''Day of initially infected population
        '''
        self._calculate_susceptible()
        self.idx_I0 = np.where(self.df[self.country] > self.N0*(percentage/100))[0][0]
        
        
    def _initial_infected(self, percentage=0.05):
        '''Initially infected population based on percentage.
           Args:
           ----
           percentage: user specified percentage
           Initially infected = Susceptable population * percentage(user-specified)
        '''
        self._get_index(percentage)
        self.ydata = np.array(self.df[self.country][self.idx_I0:])
        
        
    def _set_time(self):
        '''Set time period based on initially infected index
        '''
        self._initial_infected()
        self.t = np.arange(len(self.ydata))
        
    
    def _get_SIR_initials(self, R0=0):
        '''Set up initial values for SIR model.
           Recovery index is intially set to zero.
        '''
        self._set_time()
        self.I0 = self.ydata[0]
        self.S0 = self.N0-self.I0
        self.R0 = R0
        
        self.SIR = np.array([self.S0, self.I0, self.R0])
        
        
    def calculate_SIR(self, SIR, t, beta, gamma):
        ''' Simple SIR model
            S: susceptible population
            I: infected people
            R: recovered people
            beta: infection rate
            gamma: recovery rate
            t: time-step --> required for solving differential equation

            Overall condition is that the sum of changes (differnces) sum up to 0
            dS+dI+dR=0
            S+I+R= N (constant size of population)
        '''

        S,I,R = SIR
        dS_dt = -beta*S*I/self.N0          
        dI_dt = beta*S*I/self.N0 - gamma*I
        dR_dt = gamma*I

        return dS_dt, dI_dt, dR_dt
    
    def fit_odeint(self, x, beta, gamma):
        ''' Helper function for the integration
        '''
        self._get_SIR_initials()
        return odeint(self.calculate_SIR, (self.S0, self.I0, self.R0), self.t, args=(beta, gamma))[:,1]
    
    def fitted_curve(self, printout=True):
        '''Fitting of curve by using optimize.curve_fit form scipy libaray.
        '''
        self.popt, self.pcov = optimize.curve_fit(self.fit_odeint, self.t, self.ydata)
        self.perr = np.sqrt(np.diag(self.pcov))
        if printout:
            print('standard deviation errors : ',str(self.perr), ' start infect:',self.ydata[0])
            print("Optimal parameters: beta =", self.popt[0], " and gamma = ", self.popt[1])
        
        self.fitted = self.fit_odeint(self.t, *self.popt)
        # get the final fitted curve
        return self.fitted
    
def get_optimum_beta_gamma(df, country, susceptable_perc=5, period='default'):
    
    # get world population
    # plausiblization for dashboard
    try:
        df_population = pd.read_csv('../data/processed/world_population.csv',sep=';', index_col=0)
        population = df_population.T[country].values[0]
    except:
        df_population = pd.read_csv('data/processed/world_population.csv',sep=';', index_col=0)
        population = df_population.T[country].values[0]
    
    if period != 'default':
        # set periods
        periods = []
        periods.append([39,70])

        for i in np.arange(70,len(df)-1,period)[:-1]:
            periods.append([i, i+period])

        periods.append([np.arange(70,len(df)-1,period)[-1],len(df)-1])
        
        names = ['Period '+ str(n) for n in range(len(periods))]
        time_period = [str(df_confirmed.date[p[0]])[:10]+' to '+str(df_confirmed.date[p[1]])[:10] for p in periods]
        
    else:
        # rather than using fixed periods, we will use following periods for better approximation
        periods = [[39,70], [70,80], [80,100], [100,130], [130,180], [180,len(df)-1]]
        time_period = ['March 2020         ', 
                       '1-10th April 2020  ', 
                       '10-30th April 2020 ', 
                       'May 2020           ', 
                       'June-July 2020     ', 
                       'From August 2020   ']
        names = ['Virus spreaded          ', 
                 'People awared           ', 
                 'People take precautions ',
                 'Start recovering        ', 
                 'Constant spread         ', 
                 'Second wave             ']
    
    # fit curve
    fit_line = np.array([])
    dyn_beta = []
    dyn_gamma = []
    dyn_R0 = []
    summary = []
    for n, element in enumerate(periods):
        try:
            OBJ_SIR = SIR_Model(df[element[0]:element[1]], country= country, population = population, percentage=susceptable_perc)
            fit_line = np.concatenate([fit_line, OBJ_SIR.fitted_curve(printout=False)])
            dyn_beta.append(OBJ_SIR.popt[0])
            dyn_gamma.append(OBJ_SIR.popt[1])
            dyn_R0.append(OBJ_SIR.popt[0]/OBJ_SIR.popt[1])
        except:
            periods = periods[n+1:]
            dyn_beta.append(np.nan)
            dyn_gamma.append(np.nan)
            dyn_R0.append(np.nan)
            
        summary.append({'Time period':time_period[n], 
                        'Actions': names[n], 
                        'Beta': abs(round(dyn_beta[n],3)), 
                        'Gamma': abs(np.round(dyn_gamma[n],3)),
                        'R0': abs(np.round(dyn_R0[n],3))})
    
    # get strating point
    idx = SIR_Model(df, country= country, population = population).idx_I0
    
    return fit_line, idx, pd.DataFrame(summary)

if __name__ == '__main__':
    fit_line, idx, summary  = get_optimum_beta_gamma(df_confirmed, country='Germany', susceptable_perc=5)
    print(summary)

           Time period                   Actions   Beta  Gamma     R0
0  March 2020           Virus spreaded            1.250  1.029  1.215
1  1-10th April 2020    People awared             0.471  0.392  1.202
2  10-30th April 2020   People take precautions   0.154  0.171  0.902
3  May 2020             Start recovering          0.057  0.049  1.159
4  June-July 2020       Constant spread           0.001  0.003  0.328
5  From August 2020     Second wave               0.000  0.004  0.004


# 3. SIR Dashboard

In [3]:
import dash
dash.__version__
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output,State
import random

import plotly.graph_objects as go
import plotly


def generate_table(dataframe, max_rows=10):
    '''Given dataframe, return template generated using Dash components
    '''
    return html.Table(
        # Header
        [html.Tr([html.Th(col) for col in dataframe.columns])] +

        # Body
        [html.Tr([
            html.Td(dataframe.iloc[i][col]) for col in dataframe.columns
        ]) for i in range(min(len(dataframe), max_rows))]
    )

## list of hex color codes 
color_list = []
for i in range(int((df_confirmed.shape[1]-1)/2)):
    random_color = '#%02x%02x%02x' % (random.randint(0, 255),random.randint(0, 255), random.randint(0, 255))
    color_list.append(random_color)

fig = go.Figure()

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']
app = dash.Dash(__name__, external_stylesheets=external_stylesheets)

app.layout = html.Div([
    
    html.H1('SIR Model (Susceptible, Infectious, or Recovered)', style={'text-align': 'center',
                                                                        'color': 'white',
                                                                        'padding': 10,
                                                                        'background-color': '#25383C',}),
    
    dcc.Markdown(''' Simulation can be carried out at different time periods e.g, 7 days, 15 days. 
    Based on that, curve fitting algorithm from scipy calculates beta and gamma values.

    Note: Initially, it is assumed that 5% of total population of selected country is under threat of virus and simulation 
    begins once 0.005% (applicable for all countries) of susceptible population is infected.  ''',
                 style={'text-align': 'center',
                        'padding': 1,
                        'background-color': '#FAFFFF',}),
    
    dcc.Markdown(''' ''',
                 style={'text-align': 'center',
                        'padding': 10,}),
    
    html.Div([
        
        html.Div([  
            
            html.Div([
               dcc.Markdown('''Country:''')
            ],
                style={'width': '45%', 'display': 'inline-block', 'padding-left': 10,}),

            html.Div([
                dcc.Markdown('''Susceptible population out of total population:''')
            ],
                style={'width': '40%', 'float': 'right', 'display': 'inline-block',})
        ],
        style={'width': '60%', 'display': 'inline-block'}
        ),
        
        html.Div([dcc.Markdown('''Period :''', style={'width': '15%', 'float': 'right'})],
                 style={'width': '40%','float': 'right', 'display': 'inline-block',}
        ),
    ]),
    
    html.Div([
        
        html.Div([

            html.Div([
                dcc.Dropdown(
                    id='country_drop_down',
                    options=[{'label': each,'value':each} for each in country_list],
                    value=['Germany'], # Which is pre-selected
                    multi=True,
                    style={'padding-left': 10}
                ),
                dcc.RadioItems(
                    id='yaxis-type',
                    options=[{'label': i, 'value': i} for i in ['Log', 'Linear']],
                    value='Log',
                    labelStyle={'display': 'inline-block'},
                    style={'padding': 10}
                )
            ],
                style={'width': '45%', 'display': 'inline-block'}),

            html.Div([
                dcc.Dropdown(
                    id='susceptible_population_percentage',
                    options=[{'label': str(number)+'%', 'value': number} for number in range(1,6)],
                    value=5, # Which is pre-selected
                    multi=False,
                    
                ),
            ],
                style={'width': '40%', 'float': 'right', 'display': 'inline-block'})
        ],
        style={'width': '60%', 'display': 'inline-block'}

        ),
        
        html.Div([
            dcc.Dropdown(
                id='period-type',
                options=[{'label': str(each)+' days', 'value': each} for each in ['default', 10, 15, 20, 25, 30]],
                value='default', # Which is pre-selected
                multi=False,
                style={'width': '90%', 'float': 'left'}
            ),
        ],
            style={'width': '15%','float': 'right', 'display': 'inline-block'}
        ),
    ]),
    
    html.Div([  
        
        html.Div([
            dcc.Graph(figure=fig, id='SIR',)],
                style={'width': '60%', 'display': 'inline-block', 'padding-left': 10}),

        html.Div([
            dcc.Markdown('''Beta: infection rate | Gamma: recovery rate | R0: beta/gamma 
                      
                    ''', style={'padding-top': 10}),
            
            html.Table(id='result-summary'),
            dcc.Markdown('''Note: Table will show data of lastly selected country only.''', style={'padding-top': 15}),

            ],
                
                style={'width': '37%', 'float': 'right', 'display': 'inline-block'})
    ]),

])


@app.callback(
    [
        Output(component_id='SIR', component_property='figure'),
        Output(component_id='result-summary', component_property='children'),
    ],
    [
        Input(component_id='country_drop_down', component_property='value'),
        Input(component_id='period-type', component_property='value'),
        Input(component_id='susceptible_population_percentage', component_property='value'),
        Input(component_id='yaxis-type', component_property='value')
    ]
)
def update_figure(country_list, period, susceptable_perc, yaxis):
    
    traces = []
    for pos, each in enumerate(country_list):

        traces.append(dict(x=df_confirmed.date,
                                y=df_confirmed[each],
                                mode='lines',
                                opacity=0.9,
                                name=each,
                                line = dict(color = color_list[pos])
                        )
                )
        fit_line, idx, summary = get_optimum_beta_gamma(df_confirmed, each, susceptable_perc=susceptable_perc,
                                                        period=period)
        traces.append(dict(x=df_confirmed.date[idx:],
                                    y=fit_line,
                                    mode='markers+lines',
                                    opacity=0.9,
                                    name=each+'_simulated',
                                    line = dict(color = color_list[pos])
                            )
                    )
        
    return {
            'data': traces,
            'layout': dict (
                autosize=True,
                #width=900,
                #height=700,
                xaxis={'title':'Timeline',
                        'tickangle':-25,
                        'nticks':20,
                        'tickfont': 18,
                        'titlefont': 20 
                      },

                yaxis={'type': 'linear' if yaxis == 'Linear' else 'log', 
                       'title':'Number of infected people',
                       'tickfont': 18, 
                       'titlefont': 22 
                      },
                legend=dict(orientation="h",
                            yanchor="bottom",
                            y=1.02,
                            xanchor="right",
                            x=1)

            )
            
}, generate_table(summary)

if __name__ == '__main__':
    app.run_server(debug=True, 
                   use_reloader=False,
                   host='127.0.0.2',
                   port=8050)

Running on http://127.0.0.2:8050/
Debugger PIN: 634-203-831
 * Serving Flask app "__main__" (lazy loading)
 * Environment: production
   Use a production WSGI server instead.
 * Debug mode: on



Covariance of the parameters could not be estimated


Excess work done on this call (perhaps wrong Dfun type). Run with full_output = 1 to get quantitative information.

