# **Neural Networks for SIR models**
# **Keras -- Practical Session 2**
## **Modelos Epidemiológicos, 2020**

In [None]:
import numpy as np
import scipy as sp
import scipy.stats as st
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import statsmodels.api as sm
from time import time
import tqdm

from scipy.integrate import solve_ivp
from scipy.integrate import odeint

from itertools import product

from tensorflow.keras.models import Model, load_model, save_model
from tensorflow.keras.layers import Input, Dense, Dropout, Activation, Concatenate
from tensorflow.keras import optimizers
from tensorflow.keras.utils import plot_model, model_to_dot

from tensorflow.keras.callbacks import Callback, EarlyStopping, ModelCheckpoint
import tensorflow.keras.backend as K

#need to install pydot and graphviz libraries
# to do: open your terminal and access your keras environment
# execute the following command
#    conda install -c anaconda pydot graphviz

## Create synthetic data

In [None]:
def sirN(y, t, beta, gamma, N):
    S,I,R = y
    dSdt  = -beta*I*S / N
    dIdt  = beta*I*S / N - gamma*I
    dRdt  = gamma*I
    return [dSdt, dIdt, dRdt]

In [None]:
Nmin = 1
Nmax = 1
N = 789

h = 1
Pop = np.linspace(Nmin,Nmax,1+int((Nmax-Nmin)/h))
print(Pop)

In [None]:
bmin = 1.5
bmax = 2.0 
hb = 0.005
beta = np.linspace(bmin, bmax, 1+int((bmax-bmin)/hb))
print(beta)

In [None]:
gmin = 0.75
gmax = 1.25
hg = 0.005
gamma = np.linspace(gmin, gmax, 1+int((gmax-gmin)/hg))
print(gamma)

In [None]:
x = product(beta, gamma, Pop)
#print(x)

In [None]:
T = 25.
tspan = [0., T]
t = np.linspace(0, T, 501)

print(t)

In [None]:
# storage
sol = []

for i in x:  
    # parameters
    bet = i[0]
    gam = i[1]
    n = i[2]

    S0     = (N-1.)/N   # n-1.
    I0     = 1./N       # 1.
    R0     = 0.
    params = (bet, gam, n)
    y0     = [S0, I0, R0]

    # solve SIR model
    #yt = solve_ivp(sir, tspan, y0, t_eval=teval, args=params)
    yt = odeint(sirN, y0, t, args=params)

    # store solution
    sol.append(np.hstack([bet, gam, n, S0, I0, R0, t.ravel(), (yt.T).ravel()]))

print(t.shape)
print(yt.shape)

In [None]:
sol = np.array(sol).astype(np.float32)
print(sol.shape)

In [None]:
print(sol.min(), sol.max())
#print(sol.min(), sol.max())

## Data

In [None]:
#parametros
M = 789.
beta  = 1.89
gamma = 0.48
S0    = (M-1.)/M
I0    = 1./M
R0    = 0.
y0    = np.array([S0, I0, R0])
params = (beta, gamma, 1.)

val_sol = odeint(sirN, y0, t, args=params)
St = val_sol[:,0]
It = val_sol[:,1]
Rt = val_sol[:,2]

In [None]:
print(St.shape, It.shape, Rt.shape)

In [None]:
plt.figure(figsize=(6,4))
plt.plot(St, label=r'$S(t)$')
plt.plot(It, label=r'$I(t)$')
plt.plot(Rt, label=r'$R(t)$')
plt.xlabel('t')
plt.ylabel('Solution')
plt.title(r'SIR model')
plt.legend()
plt.show()

In [None]:
def ReLU(x):
    y = x.copy()
    y[y<0] = 0.
    return y

def doubleReLU(x):
    y = x.copy()
    y[y<0] = 0.
    y[y>1] = 1.
    return y

In [None]:
# add noise
St_noise = doubleReLU(st.poisson.rvs(100.*St)/100.)
It_noise = doubleReLU(st.poisson.rvs(100.*It)/100.)
Rt_noise = doubleReLU(st.poisson.rvs(100.*Rt)/100.)

In [None]:
plt.figure(figsize=(15,4))
plt.subplot(1,3,1)
plt.plot(t, St, label='Real')
plt.plot(t, St_noise, 'r.', label='Observed')
plt.xlabel('t')
plt.ylabel('Susceptible')
plt.title(r'Susceptible $S(t)$, real and observed')
plt.subplot(1,3,2)
plt.plot(t, It, label='Real')
plt.plot(t, It_noise, 'r.', label='Observed')
plt.xlabel('t')
plt.ylabel('Infected')
plt.title(r'Infected $I(t)$, real and observed')
plt.subplot(1,3,3)
plt.plot(t, Rt, label='Real')
plt.plot(t, Rt_noise, 'r.', label='Observed')
plt.xlabel('t')
plt.ylabel('Recovered')
plt.title(r'Recovered $R(t)$, real and observed')
plt.legend()
plt.show()

### Training and Test data

In [None]:
# set start and end interval for training data
tmin_tr = 0
tmax_tr = 161

# set start and end interval for testing data
tmin_ts = 161
tmax_ts = 401

S_train = sol[:,-1503:-1503+tmax_tr:]
I_train = sol[:,-1002:-1002+tmax_tr:]
R_train = sol[:,-501:-501+tmax_tr:]

S_test = sol[:,-1503+tmax_tr:-1503+tmax_ts]
I_test = sol[:,-1002+tmax_tr:-1002+tmax_ts]
R_test = sol[:,-501+tmax_tr:-501+tmax_ts]

# full interval (train + test)
S_full = sol[:,-1503:-1503+tmax_ts]
I_full = sol[:,-1002:-1002+tmax_ts]
R_full = sol[:,-501:-501+tmax_ts]

In [None]:
print(S_train.shape, I_train.shape, R_train.shape)
print(S_test.shape, I_test.shape, R_test.shape)
print(S_full.shape, I_full.shape, R_full.shape)

In [None]:
print(I_train.min(), I_train.max())
print(I_test.min(), I_test.max())

In [None]:
input_shape = I_train.shape[1:]
print(input_shape)

output_shape = I_full.shape[1:]
print(output_shape)

In [None]:
# our train and test data are both concatenations of S(t), I(t), R(t) series
X_train = [S_train, I_train, R_train]
Y_train = np.hstack([S_full, I_full, R_full])
print(Y_train.shape)

## Train neural network

In [None]:
def SIR_Block(D, bname=''):
    X = Dense(128, activation='relu', name=bname+'dense1')(D)
    X = Dropout(0.25, name=bname+'dropout1')(X)
    X = Dense(128, activation='relu', name=bname+'dense2')(X)
    X = Dropout(0.25, name=bname+'dropout2')(X)
    X = Dense(64, activation='relu', name=bname+'dense3')(X)
    X = Dropout(0.25, name=bname+'dropout3')(X)
    X = Dense(64, activation='relu', name=bname+'dense4')(X)
    X = Dropout(0.25, name=bname+'dropout4')(X)
    X = Dense(output_shape[0], activation=None, name=bname+'output')(X)
    return X

In [None]:
def Model_3(input_shape, output_shape):
    S = Input(shape=input_shape, name='input_S')
    I = Input(shape=input_shape, name='input_I')
    R = Input(shape=input_shape, name='input_R')
    
    XS = SIR_Block(S, bname='S_')
    XI = SIR_Block(I, bname='I_')
    XR = SIR_Block(R, bname='R_')
    X = Concatenate(name='cat')([XS,XI,XR])
    
    model = Model([S,I,R], X, name='SIR-Model_3')
    
    return model

In [None]:
if 'model' in globals():
    del model
    model = None
    
model = Model_3(input_shape, output_shape)

In [None]:
model.summary()

In [None]:
# create figure of model  (uncomment after installing pydot and graphviz)
plot_model(model, to_file='model_3.png', show_shapes=True, show_layer_names=True)

### Customize loss function

In [None]:
def loss_SIR(y_actual, y_pred):
    S_actual = y_actual[:,:401]
    I_actual = y_actual[:,401:802]
    R_actual = y_actual[:,802:]
    S_pred = y_pred[:,:401]
    I_pred = y_pred[:,401:802]
    R_pred = y_pred[:,802:]
    
    w = np.array([1.,2.,1.])
    L = w[0]*K.mean(K.square(S_actual - S_pred)) + w[1]*K.mean(K.square(I_actual - I_pred)) + w[2]*K.mean(K.square(R_actual - R_pred))
    return L

In [None]:
# settings
alpha = 5e-5
decay = 1e-5
pat = 20

# define optimizer
opt = optimizers.Adam(learning_rate=alpha, decay=decay)

# compile model
model.compile(optimizer=opt, loss=loss_SIR, metrics=['mae'])

# early stopping settings
path = ''
modelname = 'SIR-Model3_01.h5'
callbacks_list = [EarlyStopping(monitor = 'val_loss', patience=pat),
                  ModelCheckpoint(filepath=path+modelname, monitor='val_loss', save_best_only=True)]

In [None]:
# training model
history = model.fit(x=X_train, y=Y_train, epochs=300, batch_size=64, callbacks=callbacks_list, validation_split=0.16666,
                    shuffle=True, verbose=1)

#history = model.fit(x=I_train, y=I_full, epochs=200, batch_size=64, validation_split=0.16666, shuffle=True)

In [None]:
history_dict = history.history
history_dict.keys()

In [None]:
# plot training history

loss = history.history['loss']
val_loss = history.history['val_loss']
metr = history.history['mae']
val_metr = history.history['val_mae']

start = 20
epochs = range(start, len(loss))

# figure
plt.figure(figsize=(10,5))
plt.subplot(1,2,1)
plt.plot(epochs, loss[start:], 'b', label='Training loss')
plt.plot(epochs, val_loss[start:], 'g', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.subplot(1,2,2)
plt.plot(epochs, metr[start:], 'r', label='Training mae')
plt.plot(epochs, val_metr[start:], 'g', label='Validation mae')
plt.title('Training and validation metrics')
plt.xlabel('Epochs')
plt.ylabel('MAE')
plt.legend()
plt.show()

## Prediction

In [None]:
# downloading best saved model

#if 'model' in globals():
#    del model
#    model = None
#    
#model = load_model('SIR-Model3_01.h5')

In [None]:
X_test = [St_noise[:161].reshape(1,-1), It_noise[:161].reshape(1,-1), Rt_noise[:161].reshape(1,-1)]
#print(X_test.shape)

prediction = model.predict(X_test).ravel()

In [None]:
S_pred = prediction[:401]
I_pred = prediction[401:802]
R_pred = prediction[802:]

In [None]:
plt.figure(figsize=(6,4))
plt.plot(t[:tmax_tr], It_noise[:tmax_tr], 'b.', label='Observed data')
#plt.plot(t[:tmax_tr], I_pred[:tmax_tr], 'g-', label='Real data')
plt.plot(t[tmax_tr:tmax_ts], It_noise[tmax_tr:tmax_ts], 'r.', label='Expected')
plt.plot(t[:tmax_ts], I_pred.ravel(), 'k-', label='Prediction')
plt.xlabel('t')
plt.ylabel('Infected')
plt.title(r'Infected $I(t)$, real and observed')
plt.legend()
plt.show()

In [None]:
plt.figure(figsize=(15,4))
plt.subplot(1,3,1)
plt.plot(t[:tmax_tr], St_noise[:tmax_tr], 'b.', label='Observed data')
plt.plot(t[:tmax_ts], St[:tmax_ts], 'g-', label='Real data')
plt.plot(t[tmax_tr:tmax_ts], St_noise[tmax_tr:tmax_ts], 'r.', label='Expected')
plt.plot(t[:tmax_ts], S_pred.ravel(), 'k-', label='Prediction')
plt.xlabel('t')
plt.ylabel('Susceptible')
plt.title(r'Susceptible $S(t)$, real and observed')
plt.subplot(1,3,2)
plt.plot(t[:tmax_tr], It_noise[:tmax_tr], 'b.', label='Observed data')
plt.plot(t[:tmax_ts], It[:tmax_ts], 'g-', label='Real data')
plt.plot(t[tmax_tr:tmax_ts], It_noise[tmax_tr:tmax_ts], 'r.', label='Expected')
plt.plot(t[:tmax_ts], I_pred.ravel(), 'k-', label='Prediction')
plt.xlabel('t')
plt.ylabel('Infected')
plt.title(r'Infected $I(t)$, real and observed')
plt.subplot(1,3,3)
plt.plot(t[:tmax_tr], Rt_noise[:tmax_tr], 'b.', label='Observed data')
plt.plot(t[:tmax_ts], Rt[:tmax_ts], 'g-', label='Real data')
plt.plot(t[tmax_tr:tmax_ts], Rt_noise[tmax_tr:tmax_ts], 'r.', label='Expected')
plt.plot(t[:tmax_ts], R_pred.ravel(), 'k-', label='Prediction')
plt.xlabel('t')
plt.ylabel('Recovered')
plt.title(r'Recovered $R(t)$, real and observed')
plt.legend()
plt.show()