In [1]:
import pandas as pd
import numpy as np
import scipy.signal
import matplotlib.pyplot as plt
import seaborn as sns
import datetime
import os

%matplotlib inline
plt.rcParams['font.sans-serif']=['SimHei'] 
plt.rcParams['axes.unicode_minus']=False 


def readDeathData(country):
    """
    Liest Daten ueber die Anzahl der Tote aus
    :param country: Land
    :return: ausgelesene Daten
    """
    death_data = pd.read_csv('csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_deaths_global.csv')
    data = death_data[(death_data['Country/Region'] == country)].iloc[:,4:]
    return data.sum()

def readConfirmedData(country):
    """
    liest die Daten ueber die Anzahl der Infizierten ein
    :param country: Land
    :return: ausgelesene Daten
    """
    recover_data = pd.read_csv('csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv')
    data = recover_data[(recover_data['Country/Region'] == country)].iloc[:,4:]
    return data.sum()

def readRecoveredData(country):
    """
    liest die Daten ueber die Anzahl der Genesenen ein
    :param country: Land
    :return: ausgelesene Daten
    """
    recover_data = pd.read_csv('csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_recovered_global.csv')
    data = recover_data[(recover_data['Country/Region'] == country)].iloc[:,4:]
    return data.sum()

In [2]:
#Populationsstand verschiedener Laender
popu_dict = {'Germany':82927922, 'Italy':60482200}

def load_IR(name, N, t,T=None):
    """
    Laedt die Daten der Faelle, Genesenen und Toten
    :param name: Das zu betrachtende Land
    :param N: Gesamtpopulation
    :param t: Startzeitpunkt
    :param T: Endzeitpunkt (optional)
    :return: Aktive Infizierte, Genesen, Tote, gesamte Faelle
    """
    #T=163

    R = readRecoveredData(name)
    D = readDeathData(name)
    C = readConfirmedData(name)
    R = R.tolist()[t:T]
    D = D.tolist()[t:T]
    C = C.tolist()[t:T]
    R = np.array(R)
    D = np.array(D)
    C = np.array(C)
    I = C - R - D
    return I,R,D,C

def SSE(I_t, I):
    """
    Summe des quadratischen Fehlers
    :param I_t: modellierte Infizierte
    :param I: Infizierte aus Daten
    :return: Summe des quadratischen Fehlers
    """
    sse = (I_t[:len(I)] - I)**2
    return sse.sum()

In [3]:
def RungeKutta(beta, gamma, N, S_0, I_0, R_0, t):
    """
    Runge-Kutta-Verfahren zum loesen des SIR-Modells
    :param beta: Kontaktrate
    :param gamma: Genseungsrate
    :param N: Gesamtpopulation
    :param S_0: Anfangswert des Anfaelligen
    :param I_0: Anfangswert des Infizierten
    :param R_0: Anfangswert der Toten/Genesenen
    :param t: Startzeitpunkt
    :return: geloestes SIR-Modell ab t mit S_t, I_t, R_t
    """
    days = 200
    X = np.arange(t, days)
    I_t = np.zeros(days)
    S_t = np.zeros(days)
    R_t = np.zeros(days)
    I_t[0] = I_0
    S_t[0] = S_0
    R_t[0] = R_0
    h = 1  
    for i in range(1, days): 
        k11 = -beta * S_t[i-1] * I_t[i-1] / N
        k21 = beta * S_t[i-1] * I_t[i-1] / N - gamma * I_t[i-1]
        k31 = gamma * I_t[i-1]

        k12 = -beta * (S_t[i-1] + h / 2 * k11) * (I_t[i-1] + h / 2 * k21) / N
        k22 = beta * (S_t[i-1] + h / 2 * k11) * (I_t[i-1] + h / 2 * k21) / N - gamma * (I_t[i-1] + h / 2 * k21)
        k32 = gamma * (I_t[i-1] + h / 2 * k21)

        k13 = -beta * (S_t[i-1] + h / 2 * k12) * (I_t[i-1] + h / 2 * k22) / N
        k23 = beta * (S_t[i-1] + h / 2 * k12) * (I_t[i-1] + h / 2 * k22) / N - gamma * (I_t[i-1] + h / 2 * k22)
        k33 = gamma * (I_t[i-1] + h / 2 * k22)

        k14 = -beta * (S_t[i-1] + h * k13) * (I_t[i-1] + h * k23) / N
        k24 = beta * (S_t[i-1] + h * k13) * (I_t[i-1] + h * k23) / N - gamma * (I_t[i-1] + h * k23)
        k34 = gamma * (I_t[i-1] + h * k23)

        S_t[i] = S_t[i-1] + h / 6 * (k11 + 2 * k12 + 2 * k13 + k14)
        I_t[i] = I_t[i-1] + h / 6 * (k21 + 2 * k22 + 2 * k23 + k24)
        R_t[i] = R_t[i-1] + h / 6 * (k31 + 2 * k32 + 2 * k33 + k34)
    return S_t[t:],I_t[t:],R_t[t:]

def findOptimalParameterWithRatio(I, R, N, beta_ini,gamma_ini):
    """
    Suche nach der optimalen Kontakt- und Genesungsrate
    :param I: akute Infizierte
    :param R: Tote/Genesene
    :param N: Gesamtpopulation
    :param beta_ini: Anfangswert fuer die Kontakrate
    :param gamma_ini: Anfangswert fuer die Genesesungsrate
    :return: optimales beta und gamma, kleinste quadratische Fehler, alles Ergebnisse
    """
    S_0 = N - I[0] - R[0]
    I_0 = I[0]
    R_0 = R[0]
    t = 0
    step = 0.01 #Schrittgroesse
    result = []
    beta_0,gamma_0 = beta_ini, gamma_ini
    minSSe = float("inf")
    beta = beta_ini - 1 
    gamma = gamma_ini - 1
    if beta == 0: beta = 0.01
    if gamma == 0: gamma = 0.01
    result_list = []
    I_opt = I
    while beta < beta_0:
        gamma = gamma_0 - 1
        if gamma == 0: gamma = 0.01
        while gamma < beta:
            if beta == 0 or gamma == 0 or beta == gamma: continue
            S_t,I_t,R_t = RungeKutta(beta, gamma, N, S_0, I_0, R_0, t)
            sse = SSE(I_t,I)
            result_list.append((beta,gamma,sse))
            if sse < minSSe:
                minSSe = sse
                result = (beta, gamma)
                I_opt = I_t[:len(I)]
            gamma = gamma + step
        beta = beta + step
    return result[0], result[1], minSSe, result_list

def findOptimalParameterWithRatio2(I, R, N, beta_ini,gamma_ini):
    """
    Suche nach der optimalen Kontakt- und Genesungsrate
    :param I: akute Infizierte
    :param R: Tote/Genesene
    :param N: Gesamtpopulation
    :param beta_ini: Anfangswert fuer die Kontakrate
    :param gamma_ini: Anfangswert fuer die Genesesungsrate
    :return: optimales beta und gamma, kleinste quadratische Fehler, alles Ergebnisse
    """
    S_0 = N - I[0] - R[0]
    I_0 = I[0]
    R_0 = R[0]
    t = 0
    step = 0.01 #Schrittgroesse
    result = []
    beta_0,gamma_0 = beta_ini, gamma_ini
    minSSe = float("inf")
    beta = beta_ini - 1 
    gamma = gamma_ini - 1
    if beta == 0: beta = 0.01
    if gamma == 0: gamma = 0.01
    result_list = []
    I_opt = I
    while gamma < gamma_0:
        beta = beta_0 - 1
        if beta == 0: beta = 0.01
        while beta < gamma:
            if beta == 0 or gamma == 0 or beta == gamma: continue
            S_t,I_t,R_t = RungeKutta(beta, gamma, N, S_0, I_0, R_0, t)
            sse = SSE(I_t,I)
            result_list.append((beta,gamma,sse))
            if sse < minSSe:
                minSSe = sse
                result = (beta, gamma)
                I_opt = I_t[:len(I)]
            beta = beta + step
        gamma = gamma + step
    return result[0], result[1], minSSe, result_list

def search(I,R,N,range1,range2):
    """
    Auswertung der Parameter und Suche nach den optimalen
    :param I: akute Infizierte (in einem bestimmten Zeitfenster)
    :param R: Genesene/Tote (in einem bestimmten Zeitfenster)
    :param N: Gesamtpopulation
    :param range1: Intervallgrenze zur Parametersuche
    :param range2: Intervallgrenze zur Parametersuche
    :return: alle Ergebnisse, optimale Parameter, modelliertes I
    """
    minSSE = float('inf')
    result = []
    result_all = []
    result_opt = []
    for i in range(range1,range2):
        beta,gamma,sse,result_list = findOptimalParameterWithRatio(I,R,N,i,i)

        beta2,gamma2,sse2,result_list2 = findOptimalParameterWithRatio2(I,R,N,i,i)
        if sse2 < sse:
            beta = beta2
            gamma = gamma2
            sse = sse2
            result_list = result_list2
   
        result_all = result_all + result_list
        result_opt.append((beta,gamma,sse))
        if sse < minSSE:
            result = (beta,gamma)
            minSSE = sse
    print("best param: ", result, "minSSE: ",minSSE)
    S_t,I_t,R_t = RungeKutta(result[0], result[1], N, N - I[0] - R[0], I[0], R[0], 0)
    return result_all,result_opt,I_t
    
        
def create_assist_date(datestart = None,dateend = None):
    """
    Erstellt eine Liste von Daten
    :param datestart: Startdatum
    :param dateend: Enddatum
    :return: Liste von Daten zwischen datestart und dateend
    """
    if datestart is None:
        datestart = '2016-01-01'
    if dateend is None:
        dateend = datetime.datetime.now().strftime('%Y-%m-%d')

    datestart=datetime.datetime.strptime(datestart,'%Y-%m-%d')
    dateend=datetime.datetime.strptime(dateend,'%Y-%m-%d')
    date_list = [datestart.strftime('%Y-%m-%d')]
    while datestart<dateend:
        datestart+=datetime.timedelta(days=+1)
        date_list.append(datestart.strftime('%Y-%m-%d'))
    return date_list

def predictPlot(I, I_t, t=None):
    """
    Plot von I und I_t
    :param I: gemessenen Infizierten
    :param I_t: modellierte oder vorhergesagt Infizierte
    :param t: Start ab wann vorhergesagt wird
    """
    x_test  =pd.date_range(start='1/22/2020', end='10/1/2023', freq='D')
    X = np.arange(0, len(I_t))
    ax  = plt.figure(figsize=(14, 8))
    plt.grid(b=True, which='major', color='#666666', linestyle='-')
    plt.minorticks_on()
    plt.grid(b=True, which='minor', color='#999999', linestyle='-', alpha=0.2)
    sns.lineplot(x_test[:len(I)], I, label = 'echte Daten')
    sns.set_style('whitegrid')
    if t is not None: #Wenn Vorhersage getroffen, Vorhersage gruen plotten
        sns.lineplot(x_test[0:t],I_t[0:t],label="Modelliert")
        sns.lineplot(x_test[t:len(I_t)], I_t[t:], label='Vorhersage')
    else:
        sns.lineplot(x_test[:len(I_t)],I_t,label="Modelliert")
    plt.xlabel('Zeit')
    plt.ylabel('Anzahl aktiver Infizierten')
    plt.title('SIR Modell')

def save_result(i,j, result_all, result_opt, I_t, country_name,size):
    """
    Speichert die optimalen Parameter und modellierten Infizierten
    """
    dirs = '\\data\\'
    dir_path = os.path.dirname(os.path.abspath('__file__')) + dirs
    dir_path = dir_path+country_name+'\\'+str(size)
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)
    df_all = pd.DataFrame(result_all,columns=['beta', 'gamma', 'sse'])
    df_all.to_csv(dir_path + '/'+str(i) + '_'+str(j)+'_all.csv')
    df_opt = pd.DataFrame(result_opt,columns=['beta', 'gamma', 'sse'])
    df_opt.to_csv(dir_path + '/'+str(i) + '_'+str(j)+'_opt.csv')
    df_I = pd.DataFrame(I_t,columns=['I_t'])
    df_I.to_csv(dir_path+ '/'+str(i) + '_'+str(j)+'_It.csv')

def oneDayError(I,I_p):
    """
    Vorhersage Fehler
    :param I: gemessene Daten
    :param I_p: modellierte oder vorhergesagte Daten
    :return: Liste der Fehler
    """
    error = ((I_p[:len(I)] - I)/I)
    X = np.arange(0, len(error))
    x_test  =pd.date_range(start='1/22/2020', end='10/1/2023', freq='D')
    ax  = plt.figure(figsize=(14, 8))
    sns.set_style('whitegrid')
    sns.lineplot(x_test[:len(error)],error, label="$Error_{w}$")
    plt.grid(b=True, which='major', color='#666666', linestyle='-')
    plt.minorticks_on()
    plt.grid(b=True, which='minor', color='#999999', linestyle='-', alpha=0.2)
    plt.xlabel('Zeit')
    plt.ylabel('Groesse des Fehlers')
    plt.title('Vorhersagefehler')
    plt.savefig('error.png')
    return error
    
def read_I_t(I,country, size):
    """
    Liest die bereits modellierten Werte von I_t ein
    :param I: gemessene I
    :param country: Land
    :param size: Groesse des Zeitfenster
    :return: Liste der gespeicherten, modellierten I
    """
    I_t_list = []
    I_t_all = []
    I_t_p = list(I[0:size])

    for i in range(0, len(I)- size +1 ):
        if i == len(I) - size:
            j = len(I) - 1
        else:
            j = i + size - 1
        file_name = str(i) + '_'+str(j)+'_It.csv'
        a = pd.read_csv('data/'+country+'/'+str(size)+'/'+file_name,engine='python')
        I_t = list(a['I_t'])
        I_t_list.append(I_t)

        if i == len(I) - size:
            I_t_all = I_t_all + I_t
            I_t_p = I_t_p + I_t[size:]
        else:
            I_t_all = I_t_all + I_t[:1]
            I_t_p.append(I_t[size])
    return I_t_p,I_t_all

        
def read_opt(I,country, size):
    """
    liest die optimalen Parameter
    :param I: gemessene I
    :param country: Land
    :param size: Groesse des Zeitfenster
    :return: Liste von optimalen beta und gamma
    """
    beta_list = []
    gamma_list = []
    for i in range(0, len(I)- size+1):
        if i == len(I) - size:
            j = len(I) - 1
        else:
            j = i + size-1
        file_name = str(i) + '_'+str(j)+'_opt.csv'
        a = pd.read_csv('data/'+country+'/'+str(size)+'/'+file_name,engine='python')
        #opt = a.iloc[a['sse'].idxmin(),][1:3].tolist()
        opt = a.iloc[0,1:3].tolist()
        beta_list.append(opt[0])
        gamma_list.append(opt[1])
    beta_list = np.array(beta_list,dtype="float64")
    gamma_list = np.array(gamma_list,dtype="float64")
    return beta_list,gamma_list

def plot(data,label=None):
    X = np.arange(0, len(data))
    ax  = plt.figure(figsize=(13, 8))
    sns.lineplot(X[:len(data)],data,label=label)

def parameterPlot(beta, gamma):
    """
    Berechnet aus beta und gamme die exp. Wachstumsrate und die Basisreproduktionszahl
    :param beta: Kontakrate
    :param gamma: Genesungsrate
    :return: exp. Wachstumsrate und Basisreproduktionszahl
    """
    beta = beta.tolist()
    gamma = gamma.tolist()
    r =[]
    ex = []
    for i in range(0, len(beta)-1):
        r.append(beta[i]/gamma[i])
        ex.append(beta[i]-gamma[i])

    return r, ex

In [4]:
#Parameter evaluation
def predictI_bySize(country, t, size):
    """
    Modelliert anhand echter Daten die Infiziertenzahl
    :param country: Land
    :param t: Startzeitpunkt
    :param size: Zeitfenster
    """
    I,R,D,C = load_IR(country, popu_dict[country], t)
    I_t_p = list(I[0:size])
    i = 0 
    j = size - 1
    for i in range(0, len(I)- size + 1):
        if i == len(I) - size:
            j = len(I) - 1
        else:
            j = i + size - 1 
        result_all,result_opt,I_t = search(I[i:j],R[i:j],popu_dict[country],1,2)
        save_result(i,j, result_all, result_opt, I_t, country,size)
        if i == len(I) - size:
            I_t_p = I_t_p + list(I_t[size:])
        else:
            I_t_p.append(I_t[size])
    predictPlot(I,I_t_p)

In [5]:
def modelGermany(country, size, T, start):
    """
    Plottet einen modellierten Zeitraum im Vergleich zu den echten Daten und printed den Vorhersagefehler
    :param country: Land
    :param size: Zeitfenster
    :param T: Endzeitpunkt
    :param start: Startzeitpunkt
    """
    I,R,D,C = load_IR(country,popu_dict[country],0, T)
    I_t_p,I_t_all = read_I_t(I,country,size)
    predictPlot(I[start:T],I_t_p[start:T])
    dirs = '\\data\\'
    dir_path = os.path.dirname(os.path.abspath('__file__')) + dirs
    dir_path = dir_path+country+'\\'+'Picture'+'\\'
    plt.savefig(dir_path + 'modelGermany.jpg')
    ax  = plt.figure(figsize=(14, 8))

    dirs = '\\data\\'
    dir_path = os.path.dirname(os.path.abspath('__file__')) + dirs
    dir_path = dir_path+country+'\\'
    date_X = create_assist_date("2020-1-22", "2021-10-01")
    data = []
    for i in range(start,T):
        if i < len(I):
            data.append((date_X[i], I[i], I_t_p[i]))
        else:
            data.append((date_X[i], I_t_p[i]))
    df = pd.DataFrame(data)
    df.columns=['date','I', 'I_t']
    df.to_csv(dir_path +'evaluated.csv')
    er2 = oneDayError(I[start:T], I_t_p[start:T])
    print(len(er2))
    error = 0
    for i in range(0, len(er2)):
        if not np.isnan(er2[i]):
            error = error + abs(er2[i])
    print((1/(len(er2)))*error)


In [6]:
def plotParam(country, size, t):
    """
    Plotte Parameter gamma und beta, Basisreproduktionszahl und exp. Wachstumsrate, geglaettet
    :param country: Land
    :param size: Zeitfenster
    :param t: Endzeitpunkt
    """
    I,R,D,C = load_IR(country,popu_dict[country],0)
    beta_list,gamma_list = read_opt(I,country, size)
    x  =pd.date_range(start='1/22/2020', end='10/1/2023', freq='D')
    r0,dif = parameterPlot(beta_list,gamma_list)

    #Plotte Kontakt- und Genesungsrate in abh. zur Zeit
    ax = plt.figure(figsize=(14,8))
    b = plt.scatter(range(0, t), beta_list[:t], marker= 'x', color = 'c')
    g = plt.scatter(range(0,t), gamma_list[:t],marker='x', color = 'm')
    plt.grid(b=True, which='major', color='#666666', linestyle='-')
    plt.minorticks_on()
    plt.grid(b=True, which='minor', color='#999999', linestyle='-', alpha=0.2)
    plt.xlabel('Zeit')
    plt.ylabel('Kontakt- und Genesungsrate')
    plt.title('Kontakt- und Genesungsrate in abh. zu der Zeit')
    plt.legend((b, g), ('Beta', 'Gamma'))
    #Bild abspeichern
    dirs = '\\data\\'
    dir_path = os.path.dirname(os.path.abspath('__file__')) + dirs
    dir_path = dir_path+country+'\\'+'Picture'+'\\'
    plt.savefig(dir_path + 'beta_gamma.png')

    #Plot Basisreproduktionszahl und exponentielle Wchstumsrate
    #in einem Plot mit zwei Achsen
    fig, ax1 = plt.subplots(figsize=(14,8))
    ax1.set_xlabel('Zeit')
    ax1.set_ylabel('Basisreproduktionszahl', color='m')
    ax1.plot(x[:t], r0[:t], color='m')
    ax1.tick_params(axis='y', labelcolor='m')

    ax2 = ax1.twinx()

    color = 'tab:blue'
    ax2.set_ylabel('exponentielle Wachstumsrate', color=color)
    ax2.tick_params(axis='y', labelcolor=color)
    ax2.plot(x[:t], dif[:t], color=color)
    plt.savefig(dir_path + 'dif_exp.png')

    fig.tight_layout()
    plt.show()

    #Glaetten der exponentielle Wachstumsrate
    smoothday = 15
    plt.figure(figsize=(14,8))
    difhat = scipy.signal.savgol_filter(dif, smoothday, 2)
    plt.plot(x[:t],dif[:t], label = 'modellierte Ex(t)')
    plt.plot(x[:t],difhat[:t], color='red', label='geglaettete Ex(t)')
    plt.xlabel('Zeit')
    plt.grid(b=True, which='major', color='#666666', linestyle='-')
    plt.minorticks_on()
    plt.grid(b=True, which='minor', color='#999999', linestyle='-', alpha=0.2)
    plt.legend()
    plt.savefig(dir_path + 'smoothExp.png')

    #Speichern
    dirs = '\\data\\'
    dir_path2 = os.path.dirname(os.path.abspath('__file__')) + dirs
    dir_path2 = dir_path2+country+'\\'
    date_X = create_assist_date("2020-1-22", "2022-10-01")
    data = []
    for i in range(0, len(difhat)):
        if i < len(difhat):
            data.append((date_X[i], difhat[i]))

    df = pd.DataFrame(data)
    df.columns=['date','difhat']
    df.to_csv(dir_path2 +'smoothDif%i.csv' %smoothday)

    #Glaetten der Basisreproduktionszahl
    plt.figure(figsize=(14,8))
    r0hat = scipy.signal.savgol_filter(r0, smoothday, 2)
    plt.plot(x[:t],r0[:t], label='modelliertes $R_{0}(t)$')
    plt.plot(x[:t], r0hat[:t], color='red', label= 'geglaetettes $R_{0}(t)$')
    plt.legend()
    plt.xlabel('Zeit')
    plt.grid(b=True, which='major', color='#666666', linestyle='-')
    plt.minorticks_on()
    plt.grid(b=True, which='minor', color='#999999', linestyle='-', alpha=0.2)
    plt.savefig(dir_path + 'smoothR.png')
    date_X = create_assist_date("2020-1-22", "2022-10-01")
    data = []
    for i in range(0, len(r0hat)):
        if i < len(r0hat):
            data.append((date_X[i], r0hat[i]))

    df = pd.DataFrame(data)
    df.columns=['date','r0hat']
    df.to_csv(dir_path2 +'smoothr0%i.csv' %smoothday)

In [7]:
""" Auswertung """
country = 'Germany'
#predictI_bySize(country, 0, 5)
#modelGermany(country, 5, 472, 0)
#plotParam(country, 5, 337)

