### COVID-19 Predictions

In [None]:
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import seaborn as sns 
import os

# Use a small default font for every figure drawn in this notebook.
plt.rc('font', size=6)

In [None]:
# Read the raw table and replace every missing value with 0 in one pass.
df = pd.read_csv('data/cases_and_deaths.csv').fillna(0)

# Parse the date column and store it as int64 so it can be used as a numeric axis.
parsed_dates = pd.to_datetime(df['date'])
df['date'] = parsed_dates.astype('int64')

df.head()

Let's take only Indonesia's data

In [None]:
# Keep only the rows whose location is Indonesia.
is_indonesia = df['location'] == 'Indonesia'
df = df.loc[is_indonesia]
df.head()

In [None]:
# Plot daily new cases against the (integer) date axis, labelling every 60th date.
fig = plt.figure(figsize=(4, 3))
tick_positions = df['date'][::60]
plt.xticks(tick_positions, rotation=90)
plt.plot(df['date'], df['new_cases'])

In [None]:
# Narrow the frame to the two columns used by the rest of the notebook.
kept_columns = ['date', 'new_cases']
clean_df = df.loc[:, kept_columns]
clean_df.head()

In [None]:
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.stattools import adfuller
from dateutil.parser import parse

In [None]:
# Split the new-cases series into trend, seasonal and residual parts
# using an additive model with a 30-observation period.
additive_decomposition = seasonal_decompose(
    clean_df['new_cases'],
    model='additive',
    period=30,
)

In [None]:
# Draw the decomposition panels, title the figure, and leave room for the title.
decomposition_fig = additive_decomposition.plot()
decomposition_fig.suptitle('Additive Decomposition')
plt.tight_layout(rect=[0, 0.03, 1, 0.95])

In [None]:
# Test for stationarity with the Augmented Dickey-Fuller (ADF) test.
# The result tuple is indexed as: [0] test statistic, [1] p-value,
# [2] number of lags used, [4] dict of critical values.

result = adfuller(clean_df['new_cases'], autolag='AIC')
print(f'ADF Statistic: {result[0]}')
print(f'n_lags: {result[2]}')
print(f'p-value: {result[1]}')
# Print the header once, then one line per significance level.
print('Critical Values:')
for key, value in result[4].items():
    print(f'   {key}, {value}')

The p-value is less than $0.05$, so we can reject the ADF null hypothesis of a unit root and treat the series as approximately stationary.

In [None]:
# Remove the trend component estimated by the additive decomposition.
detrended = clean_df['new_cases'].values - additive_decomposition.trend

In [None]:
# Side by side: original series (left) and detrended series (right).
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(6, 3))
original_series = clean_df['new_cases']
ax1.plot(original_series)
ax2.plot(detrended)

In [None]:
# The decomposition above is additive (observed = trend + seasonal + residual),
# so the seasonal component is removed by subtraction, not division.
deseasonalized = clean_df['new_cases'].values - additive_decomposition.seasonal

In [None]:
# Side by side: original series (left) and deseasonalized series (right).
fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(6, 3))
original_series = clean_df['new_cases']
ax1.plot(original_series)
ax2.plot(deseasonalized)

In [None]:
from statsmodels.tsa.stattools import acf, pacf
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

# Autocorrelation (left) and partial autocorrelation (right) for the first 50 lags.
fig, axes = plt.subplots(1, 2, figsize=(16, 3), dpi=100)
acf_axis, pacf_axis = axes[0], axes[1]
plot_acf(clean_df['new_cases'].tolist(), lags=50, ax=acf_axis)
plot_pacf(clean_df['new_cases'].tolist(), lags=50, ax=pacf_axis)

Creating the Model

In [None]:
from keras.models import Sequential
from keras.layers import LSTM
from keras.layers.core import Dense, Activation, Dropout
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.utils import shuffle

In [None]:
# Pull the new-cases values out as a float32 column vector of shape (n, 1).
new_cases_values = clean_df['new_cases'].values
data_raw = new_cases_values.astype('float32').reshape(-1, 1)

In [None]:
# Sanity check: one column, one row per observation.
data_raw.shape

In [None]:
# Scaler that maps values into [0, 1]; reused later to invert predictions.
scaler = MinMaxScaler(feature_range = (0, 1))

In [None]:
# Fit the scaler on the series and scale it in one step.
# NOTE(review): the scaler is fit on the whole series before the train/test
# split, so the rows later used for testing influence the scaling — confirm
# this is intended.
dataset = scaler.fit_transform(data_raw)

In [None]:
# The scaled array keeps the shape of data_raw.
dataset.shape

In [None]:
# Fraction of the series (taken from the start) used for training.
TRAIN_SIZE = 0.80

# Chronological split: the first block is training data, the remainder is test data.
train_size = int(len(dataset) * TRAIN_SIZE)
test_size = len(dataset) - train_size
train = dataset[:train_size, :]
test = dataset[train_size:, :]
print("Number of entries (training set, test set): " + str((train.shape[0], test.shape[0])))

In [None]:
def create_dataset(dataset, window_size = 1):
    """Turn a 2-D series into sliding-window inputs and next-step targets.

    Only column 0 of ``dataset`` is read. Sample ``i`` uses rows
    ``i .. i + window_size - 1`` as input and row ``i + window_size`` as target.

    Note: ``len(dataset) - window_size - 1`` samples are produced, so the last
    window that would still have a target is not emitted. Downstream plotting
    offsets rely on this count.

    Args:
        dataset: 2-D array indexed as ``dataset[row, 0]``.
        window_size: Number of consecutive rows per input sample.

    Returns:
        Tuple ``(inputs, targets)`` of NumPy arrays.
    """
    n_samples = len(dataset) - window_size - 1
    starts = range(n_samples)
    windows = [dataset[start:start + window_size, 0] for start in starts]
    targets = [dataset[start + window_size, 0] for start in starts]
    return np.array(windows), np.array(targets)

In [None]:
# Build one-step-ahead regression samples: each input is a run of
# `window_size` consecutive values and the target is the value that follows.
window_size = 60
train_X, train_Y = create_dataset(train, window_size)
test_X, test_Y = create_dataset(test, window_size)
print("Original training data shape:")
print(train_X.shape)


# Keras expects 3-D input; insert a length-1 middle axis so each sample is
# (1, window_size).
train_X = train_X.reshape((train_X.shape[0], 1, train_X.shape[1]))
test_X = test_X.reshape((test_X.shape[0], 1, test_X.shape[1]))
print("New training data shape:")
print(train_X.shape)

In [None]:
# Show the training inputs followed by the training targets.
for training_array in (train_X, train_Y):
    print(training_array)

In [None]:
# Start from an empty sequential model; layers are added in the next cell.
model = Sequential()

In [None]:
# Create the model inside this cell so the cell is safe to re-run: adding
# layers to an already-populated model would stack a second copy of each one.
model = Sequential()

# First LSTM layer: each sample is one timestep with `window_size` features.
# It returns sequences so the next LSTM layer receives 3-D input.
model.add(LSTM(input_shape=(1, window_size),
               units=window_size,
               return_sequences=True, activation='relu'))
model.add(Dropout(0.2))
model.add(LSTM(256))
model.add(Dropout(0.2))
# Single output unit squashed to (0, 1), matching the scaled targets.
model.add(Dense(1))
model.add(Activation("sigmoid"))
model.compile(loss="mse", optimizer="adam")
model.summary()


In [None]:
# Train on the windowed training set (3000 passes over the data).
model.fit(
    train_X,
    train_Y,
    epochs=3000,
    batch_size=32,
    verbose=2,
)

In [None]:
def predict_and_score(model, X, Y):
    """Predict on ``X`` and compute the RMSE against ``Y`` in original units.

    Uses the module-level ``scaler`` to map both the predictions and the
    targets back from the scaled range before scoring.

    Args:
        model: Fitted model exposing ``predict``.
        X: Model inputs.
        Y: Scaled target values, one per sample.

    Returns:
        Tuple ``(score, pred, pred_scaled)``: the RMSE, the predictions after
        inverse scaling, and the raw model predictions.
    """
    pred_scaled = model.predict(X)
    pred = scaler.inverse_transform(pred_scaled)
    # The scaler was fit on a single feature column, so pass the targets as
    # (n_samples, 1) instead of relying on broadcasting over a (1, n) row.
    orig_data = scaler.inverse_transform(np.asarray(Y).reshape(-1, 1))
    score = math.sqrt(mean_squared_error(orig_data[:, 0], pred[:, 0]))
    return(score, pred, pred_scaled)

# Score the model on both splits and keep the predictions for plotting.
train_outcome = predict_and_score(model, train_X, train_Y)
test_outcome = predict_and_score(model, test_X, test_Y)
rmse_train, train_predict, train_predict_scaled = train_outcome
rmse_test, test_predict, test_predict_scaled = test_outcome

print(f"Training RMSE: {rmse_train} RMSE")
print(f"Test RMSE: {rmse_test} RMSE")

test_predict.size

In [None]:
# NaN-filled canvases with the shape of the full series, so each set of
# predictions can be drawn at its own position on the shared day axis.
train_predict_plot = np.empty_like(dataset)
test_predict_plot = np.empty_like(dataset)
train_predict_plot[...] = np.nan
test_predict_plot[...] = np.nan

# Training predictions begin right after the first input window.
train_rows = slice(window_size, len(train_predict) + window_size)
train_predict_plot[train_rows, :] = train_predict

# Test predictions begin after the training split plus the test split's first
# window; the +1 / -1 match create_dataset yielding
# len - window_size - 1 samples per split.
test_rows = slice(len(train_predict) + (window_size * 2) + 1, len(dataset) - 1)
test_predict_plot[test_rows, :] = test_predict

plt.figure(figsize = (10, 5))
curves = (
    (scaler.inverse_transform(dataset), "True value"),
    (train_predict_plot, "Training set prediction"),
    (test_predict_plot, "Test set prediction"),
)
for curve_values, curve_label in curves:
    plt.plot(curve_values, label = curve_label)
plt.xlabel("Days")
plt.ylabel("New COVID Cases")
plt.title("Comparison true vs. predicted training / test")
plt.legend()
plt.show()

In [None]:
# Create Predictions

def predict(T, X):
    """Forecast ``T`` steps ahead by feeding each prediction back as input.

    Relies on the module-level ``model``, ``scaler`` and ``window_size``.

    Args:
        T: Number of steps to forecast.
        X: Array holding the most recent ``window_size`` scaled values.

    Returns:
        Tuple ``(predictions, predictions_scaled)``. Despite the names,
        ``predictions`` is a one-element list holding the raw (scaled) model
        outputs, and ``predictions_scaled`` is a ``(1, T)`` array of those
        outputs mapped back to original units via the scaler.
    """
    # Only the latest window is needed, so keep a rolling window rather than
    # accumulating every past window in a growing array.
    window = X.reshape(window_size)

    forecast = []

    for _ in range(T):
        step = model.predict(window.reshape(1, 1, window_size))
        forecast.append(step[0][0])
        # Drop the oldest value and append the newest prediction.
        window = np.concatenate((window[1:], step[0].reshape(1)), axis=0)

    predictions = [forecast]
    # The scaler was fit on one feature column: invert as (T, 1), then restore
    # the (1, T) layout callers index with [0].
    forecast_column = np.array(forecast).reshape(-1, 1)
    predictions_scaled = scaler.inverse_transform(forecast_column).reshape(1, -1)

    return predictions, predictions_scaled

    

In [None]:
# Forecast 5 steps past the last test window.
# Note the naming: `predictions` holds the model's raw (scaled) outputs, while
# `predictions_scaled` holds the values after the scaler's inverse transform.
predictions, predictions_scaled = predict(5, test_X[-1])

In [None]:
# Round each forecast (first row of the inverse-transformed output).
results = list(map(round, predictions_scaled[0]))

In [None]:
# Show the rounded forecasts.
print(results)