In [0]:
from scipy.optimize import dual_annealing, minimize
from sklearn.metrics import r2_score
from collections import namedtuple
from matplotlib import pyplot as plt

In [0]:
plt.rcParams['font.sans-serif'] = ['SimHei']
SEIR_PARAM = namedtuple('SEIRparm', ['beta_1', 'beta_2', 'beta_T', 'xi', 'sigma', 'gamma_I', 'gamma_T', 'mu_I', 'mu_T', 'theta_I'])

class SEIR(object):
    def __init__(self, P=None):
        self.P = P
    def _forward(self, S, E, I, T, D, R, param, max_iter):
        beta_1, beta_2, beta_T, xi, sigma, gamma_I, gamma_T, mu_I, mu_T, theta_I= param
        est = pd.DataFrame(columns=['S', 'E', 'I', 'T', 'D', 'R'])
        N = S + E + I + T + D + R
        for t in range(max_iter):
            S_ = S - (beta_1 * S * I)/N - (beta_2 * S * I)/N - (beta_T * S * T)/N + xi* R
            E_ = E + (beta_1 * S * I)/N + (beta_2 * S * I)/N + (beta_T * S * T)/N - sigma * E
            I_ = I + sigma*E - gamma_I *I - mu_I *I - theta_I * I
            T_ = T + theta_I * I - gamma_T *T - mu_T *T
            D_ = D + mu_T *T + mu_I *I
            R_ = R + gamma_I *I + gamma_T *T - xi* R
            S, E, I, T, D, R = S_, E_, I_, T_, D_, R_
            est.loc[t] = [S, E, I, T, D, R]
        return est
    def _loss(self, obs, est):
        assert len(obs) == len(est)
        loss = ((np.log2(obs + 1) - np.log2(est + 1)) ** 2).sum()
        self.lossing.append(loss)
        return loss
    def _optimize(self, param, s, e, i, t, d, c, obs):
        est = self._forward(s, e, i, t, d, c, param, len(obs))
        return self._loss(obs, np.array(est[['I', 'T', 'D', 'R']]))
    def fit(self, initS, initE, initI, initT, initD, initC, Y):
        self.lossing = []
        args = (initS, initE, initI, initT, initD, initC, np.array(Y))
        param = [(0, 1),]*10
        result = dual_annealing(self._optimize, param, args=args, seed=30, maxiter=8,x0=[0.155,0.03,0.05,0.001,1/5.2,1/12.39,1/9.0,0.0004,0.0004,0.02])['x']
        self.P = SEIR_PARAM(*result)
    def score(self, initS, initE, initI, initT, initD, initC, Y, plot=False):
        est = self.predict(initS, initE, initI, initT, initD, initC, len(Y))[['I', 'T', 'D', 'R']]
        loss = self._loss(np.array(Y[['cases','tests', 'deaths', 'recovered']]), np.array(est))
        est.columns = ['cases','tests', 'deaths', 'recovered']
        r1 = r2_score(Y['cases'], est['cases'])
        r2 = r2_score(Y['tests'], est['tests'])
        r3 = r2_score(Y['deaths'], est['deaths'])
        r4 = r2_score(Y['recovered'], est['recovered'])
        if plot:
            self.plot_predict(Y, est)
            #print(' - 平均潜伏期为：%.2f天' % (1.0 / self.P.beta))
            #print(' - 病毒再生基数：%.2f' % (self.P.alpha1 / self.P.beta + (self.P.alpha2 / self.P.sigma + self.P.alpha2 / self.P.gamma)/ 2))
            print(' - recoveredR2：%.4f' % r4)
            print(' - deathsR2：%.4f' % r3)
            print(' - testsR2：%.4f' % r2)
            print(' - receoveredR2：%.4f' % r1)

            print(' - model R2：%.4f' % ((r1 + r2 + r3 + r4) / 4))
            print(' - Loss：%.4f' % loss)
        return loss, (r1 + r2 + r3+ r4) / 4
    
    def plot_error(self):
        plt.plot(self.lossing, label=u'error')
        plt.legend()
        plt.show()
    
    def plot_predict(self, obs, est):
        for label, color in zip(obs.keys(), ['red', 'black', 'green','pink']):
            plt.plot(obs[label], color=color)
            plt.plot(est[label], color=color, alpha=0.6, label=label)
            plt.legend()
            plt.show()
            
    def predict(self, initS, initE, initI, initT, initD,initC, T):
        return self._forward(initS, initE, initI, initT, initD, initC, self.P, T)
  

In [0]:
from pandas import DataFrame
from sklearn.metrics import mean_squared_error
import numpy as np
import pandas as pd
from numpy import array


df_data = pd.read_csv("data.csv")
df_data.drop(columns='Unnamed: 0',axis=1,inplace=True)


In [0]:
def searchBestParam(seir):
    min_loss, max_r2, best_param, likeli_potential = float('inf'),0.0, None, 0
    for potential in range(0, 100000, 2500):
        seir.fit(60000000, potential, 132, 4324, 2, 1, df_data)
        loss, r2 = seir.score(60000000, potential, 132, 4324, 2, 1, Y=df_data)
        if loss < min_loss and r2 > max_r2:
            print('potential：%.4f | R2：%.4f | error： %.6f' % (potential, r2, loss))
            min_loss, max_r2, best_param, likeli_potential = loss, r2, seir.P, potential
    seir.P = best_param
    seir.score(60000000, potential, 132, 4324, 2, 1, df_data, plot=True)
    return seir, likeli_potential

seir, potentials = searchBestParam(SEIR())

def forcast(seir, T):
    predict = seir.predict(60000000, potential, 132, 4324, 2, 1, T)
    plt.plot(df_data['cases'], label='cases(real)', color='red')
    plt.plot(df_data['deaths'], label='deaths(real)', color='black')
    plt.plot(df_data['recovered'], label='recovered(real)', color='green')
    plt.plot(df_data['tests'], label='tests(real)', color='pink')
    # plt.plot(predict['S'], label='易感(预计)', color='blue', alpha=0.5)
    plt.plot(predict['E'], label='potential', color='orange', alpha=0.5)
    plt.plot(predict['I'], label='cases', color='red', alpha=0.5)
    plt.plot(predict['T'], label='tests', color='pink', alpha=0.5)
    plt.plot(predict['D'], label='Deaths', color='black', alpha=0.5)
    plt.plot(predict['R'], label='recovered', color='green', alpha=0.5)
    plt.legend()
    plt.show()
forcast(seir, 30)

  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  del sys.path[0]
  if sys.path[0] == '':
  if sys.path[0] == '':
  if sys.path[0] == '':
  del sys.path[0]
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]


potential：12500.0000 | R2：0.0398 | error： 229.509768


  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  
  from ipykernel import kernelapp as app
  if sys.path[0] == '':
  if sys.path[0] == '':
  del sys.path[0]
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  
  from ipykernel import kernelapp as app
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  
  from ipykernel import kernelapp as app


potential：22500.0000 | R2：0.1292 | error： 228.860495


  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  
  from ipykernel import kernelapp as app
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  
  from ipykernel import kernelapp as app
  if sys.path[0] == '':
  if sys.path[0] == '':
  del sys.path[0]
  del sys.path[0]
  
  from ipykernel import kernelapp as app
  if sys.path[0] == '':
  if sys.path[0] == '':
  del sys.path[0]
  del sys.path[0]
  
  from ipykernel import kernelapp as app
  if sys.path[0] == '':
  if sys.path[0] == '':
  del sys.path[0]
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  del sys.path[0]
  if sys.path[0] == '':
  

In [0]:
len(np.array(df_data[['cases','tests', 'deaths', 'recovered']]))

69

In [0]:
%debug


> [0;32m<ipython-input-5-b28299ad24f4>[0m(23)[0;36m_loss[0;34m()[0m
[0;32m     21 [0;31m    [0;32mdef[0m [0m_loss[0m[0;34m([0m[0mself[0m[0;34m,[0m [0mobs[0m[0;34m,[0m [0mest[0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m     22 [0;31m        [0;32massert[0m [0mlen[0m[0;34m([0m[0mobs[0m[0;34m)[0m [0;34m==[0m [0mlen[0m[0;34m([0m[0mest[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m---> 23 [0;31m        [0mloss[0m [0;34m=[0m [0;34m([0m[0;34m([0m[0mnp[0m[0;34m.[0m[0mlog2[0m[0;34m([0m[0mobs[0m [0;34m+[0m [0;36m1[0m[0;34m)[0m [0;34m-[0m [0mnp[0m[0;34m.[0m[0mlog2[0m[0;34m([0m[0mest[0m [0;34m+[0m [0;36m1[0m[0;34m)[0m[0;34m)[0m [0;34m**[0m [0;36m2[0m[0;34m)[0m[0;34m.[0m[0msum[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[0;32m     24 [0;31m        [0mself[0m[0;34m.[0m[0mlossing[0m[0;34m.[0m[0mappend[0m[0;34m([0m[0mloss[0m[0;34m)[0m[0;34m[0m[0;3

In [0]:
df_data=df_data[['cases','tests','deaths','recovered']]

In [0]:
def forcast(seir, T):
    predict = seir.predict(60000000, potential, 132, 4324, 2, 1, T)
    plt.plot(df_data['cases'], label='cases(real)', color='red')
    plt.plot(df_data['deaths'], label='deaths(real)', color='black')
    plt.plot(df_data['recovered'], label='recovered(real)', color='green')
    plt.plot(df_data['tests'], label='tests(real)', color='pink')
    # plt.plot(predict['S'], label='易感(预计)', color='blue', alpha=0.5)
    plt.plot(predict['E'], label='potential', color='orange', alpha=0.5)
    plt.plot(predict['I'], label='cases', color='red', alpha=0.5)
    plt.plot(predict['T'], label='tests', color='pink', alpha=0.5)
    plt.plot(predict['D'], label='Deaths', color='black', alpha=0.5)
    plt.plot(predict['R'], label='recovered', color='green', alpha=0.5)
    plt.legend()
    plt.show()
forcast(seir, 30)

NameError: ignored