In [None]:
import random
import warnings
from statistics import NormalDist

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from keras.layers import LSTM, Dense
from keras.models import Sequential
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from statsmodels.tsa.statespace.sarimax import SARIMAX

# Seed every generator the pipeline can draw from. Seeding NumPy alone leaves
# the Keras/TensorFlow side (weight initialisation, etc.) unseeded, so two
# runs of this notebook would train different models.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
tf.random.set_seed(SEED)

df = pd.read_excel('../data/state_month_overdose.xlsx')

# Cells marked 'Suppressed' are counted as 0 deaths; everything else is cast to int.
# NOTE(review): presumably suppressed cells are small masked counts rather than
# true zeros, in which case this understates the totals -- confirm with the data source.
df['Deaths'] = df['Deaths'].apply(lambda x: 0 if x == 'Suppressed' else int(x))

df['Month'] = pd.to_datetime(df['Month'])
# Parsed for validation only: the aggregation below keeps just Month and Deaths.
df['Month Code'] = pd.to_datetime(df['Month Code'])

# Collapse all rows to a single total per month (sorted by Month, fresh 0..n-1 index).
df = df.groupby(['Month']).agg({'Deaths': 'sum'}).reset_index()

# NOTE(review): both filters are inclusive at 2019-01-01, so January 2019 is in
# `train` AND `test` (the model is fitted on the first test month). The later
# cells rely on this overlap positionally (the forecast is seeded from
# trainX[-1]), so the boundary must be changed together with them, not here alone.
train = df[df['Month'] <= '2019-01-01']
test = df[(df['Month'] >= '2019-01-01') & (df['Month'] <= '2019-12-01')]
testog = test  # keeps the test rows' original df index labels
test = test.reset_index(drop=True)



# Sliding-window dataset builder (default lookback of 3 months)
def create_dataset(dataset, look_back=3):
    """Split a sequence into (window, next value) pairs for supervised learning.

    Parameters
    ----------
    dataset : array-like
        Ordered observations. A pandas Series/DataFrame, NumPy array or plain
        list are all accepted; pandas index labels are ignored (windows are
        positional).
    look_back : int, default 3
        Number of consecutive observations in each input window.

    Returns
    -------
    (dataX, dataY) : tuple of np.ndarray
        ``dataX[i]`` holds observations ``i .. i+look_back-1`` and ``dataY[i]``
        is observation ``i+look_back``. When ``dataset`` has no more than
        ``look_back`` observations both arrays are empty.

    Raises
    ------
    ValueError
        If ``look_back`` is smaller than 1.
    """
    if look_back < 1:
        raise ValueError(f"look_back must be >= 1, got {look_back}")

    # Work on a plain array so any array-like input is supported, not only
    # objects that expose `.iloc`.
    values = np.asarray(dataset)
    n_samples = len(values) - look_back
    if n_samples <= 0:
        return np.array([]), np.array([])

    dataX = np.stack([values[i:i + look_back] for i in range(n_samples)])
    dataY = values[look_back:].copy()  # copy so callers never alias `dataset`
    return dataX, dataY

look_back = 3

# Put the final `look_back` training rows in front of the test rows so the
# first test month already has a full input window.
extended_test = pd.concat([train.tail(look_back), test])

# Turn both series into (window, next value) pairs
trainX, trainY = create_dataset(train['Deaths'], look_back)
testX, testY = create_dataset(extended_test['Deaths'], look_back)

# The LSTM layer takes 3-D input: (samples, time_steps, features)
trainX = trainX.reshape(len(trainX), look_back, 1)
testX = testX.reshape(len(testX), look_back, 1)

# One 50-unit LSTM layer feeding a single Dense output unit,
# trained one sample at a time on mean squared error.
model = Sequential([
    LSTM(50, activation='relu', input_shape=(look_back, 1)),
    Dense(1),
])
model.compile(optimizer='adam', loss='mean_squared_error')
model.fit(trainX, trainY, batch_size=1, epochs=100, verbose=1)

# Recursive multi-step forecaster for a `look_back`-long window
def generate_forecast(model, initial_sequence, num_predictions=12, look_back=3):
    """Roll the model forward, feeding each prediction back in as input.

    Parameters
    ----------
    model : object with ``predict(array)``
        Called with a ``(1, look_back, 1)`` array; element ``[0][0]`` of its
        result is taken as the next value.
    initial_sequence : np.ndarray, shape (1, look_back, 1)
        Starting window. It is not modified.
    num_predictions : int, default 12
        How many steps to forecast.
    look_back : int, default 3
        Window length; must match ``initial_sequence``.

    Returns
    -------
    np.ndarray
        The ``num_predictions`` forecasts, in order.
    """
    window = initial_sequence
    forecast = []
    for _step in range(num_predictions):
        predicted = model.predict(window)[0][0]
        forecast.append(predicted)

        # Slide the window: drop the oldest value, append the new prediction
        shifted = np.concatenate([window[0, 1:], [[predicted]]], axis=0)
        window = shifted.reshape((1, look_back, 1))

    return np.array(forecast)

# Seed the recursive forecast with the `look_back` observations that come
# immediately before the first test month. (trainX[-1] is the window whose
# target is the *last training row*, so it only lines up with the test period
# while train and test share their boundary month; selecting by date does not
# depend on that coincidence.)
first_test_month = test['Month'].iloc[0]
history = df.loc[df['Month'] < first_test_month, 'Deaths'].iloc[-look_back:]
initial_sequence = history.to_numpy().reshape((1, look_back, 1))

# Out-of-sample LSTM forecast (one value per test month) and in-sample fit
testPredict = generate_forecast(model, initial_sequence, num_predictions=testY.shape[0], look_back=look_back)
trainPredict = model.predict(trainX)

# Flatten predictions for visualization and evaluation
testPredictlst = testPredict.flatten().tolist()
trainPredictlst = trainPredict.flatten().tolist()

# Attach the predictions to df by row label instead of by list position, so
# the column is correct whatever len(df) is:
#   * the first `look_back` training rows have no input window -> NaN
#   * rows after the test window have no forecast              -> NaN
#   * a month present in both train and test keeps the out-of-sample forecast
lstm_predictions = pd.Series(np.nan, index=df.index, dtype=float)
lstm_predictions.loc[train.index[look_back:]] = trainPredictlst
lstm_predictions.loc[testog.index] = testPredictlst
df['LSTM Predictions'] = lstm_predictions

# Seasonal ARIMA baseline fitted on the same training rows
sarima_model = SARIMAX(train['Deaths'], order=(1, 1, 1), seasonal_order=(1, 1, 1, 12),
                enforce_stationarity=False,
                enforce_invertibility=False)
sarima_result = sarima_model.fit(disp=False)
# In-sample one-step predictions followed by out-of-sample forecasts
sarima_predictions = sarima_result.predict(start=0, end=len(train) + len(test) - 1, dynamic=False)

# Series assignment aligns on the index, so extra/missing rows are harmless
df['SARIMA Predictions'] = sarima_predictions

# NOTE(review): '/tables/...' is an absolute path at the filesystem root, while
# the input is read from the relative '../data/'. Looks like '../tables/' was
# intended -- confirm before changing, since other tooling may read this file.
df.to_csv(f'/tables/{look_back}month_predictionresults_batch_1_loss_mse.csv')

# Only keep rows that can have an LSTM prediction: the first `look_back` rows
# have no complete input window. Built without inplace mutation of a slice.
plottable = df.iloc[look_back:].set_index('Month')

In [None]:
# df[(df['Month'] >= '2019-01-01') & (df['Month'] <= '2019-12-01')]

In [None]:
# Display the monthly totals together with the 'LSTM Predictions' and
# 'SARIMA Predictions' columns added in the modelling cell.
df

In [None]:
# LSTM training inputs, shaped (samples, look_back, 1).
trainX

In [None]:
# Number of test targets, followed by the targets themselves
# (built by create_dataset from extended_test['Deaths']).
print(len(testY))
testY

In [None]:
# Training targets: the observation that follows each window in trainX.
trainY

In [None]:
# Number of recursive LSTM forecasts, followed by the forecast values.
print(len(testPredict))
testPredict

In [None]:
# LSTM forecasts limited to the first len(testY) entries, i.e. one per test target.
testPredict[:len(testY)]

In [None]:
# Sanity check: size of the SARIMA slice that corresponds to trainY.
# trainY[i] is training row i + look_back, so the matching in-sample
# predictions start at position `look_back` (not 1, which is only right for
# look_back == 1).
len(sarima_predictions[look_back:look_back + len(trainPredict)])

In [None]:
# SARIMA predictions for the test months. `testog` keeps the test rows'
# original positions in df, and sarima_predictions is laid out on those same
# positions, so the slice starts at the first test row. (Starting at
# len(trainPredict) == len(train) - look_back lands before the test window
# whenever look_back > 1.)
sarima_predictions.iloc[testog.index[0]:testog.index[0] + len(testY)]

In [None]:
# In-sample LSTM predictions: the output of model.predict(trainX).
trainPredict

In [None]:
# LSTM root mean squared error (train = in-sample fit, test = recursive forecast)
print('LSTM')
lstmTrainScore = np.sqrt(mean_squared_error(trainY, trainPredict))
print('Train Score: %.2f RMSE' % (lstmTrainScore))
# testPredict is generated with num_predictions == len(testY), so no trimming is needed
lstmTestScore = np.sqrt(mean_squared_error(testY, testPredict))
print('Test Score: %.2f RMSE' % (lstmTestScore))

# SARIMA error metrics, scored on exactly the same months as the LSTM:
#   * trainY[i] is training row i + look_back -> predictions from position look_back
#   * testY covers the rows in `testog`       -> predictions from the first test row
# (Fixed offsets of 1 / len(trainPredict) are only aligned when look_back == 1.)
print('SARIMA')
sarimaTrainPredict = sarima_predictions.iloc[look_back:look_back + len(trainY)]
test_start = testog.index[0]
sarimaTestSlice = sarima_predictions.iloc[test_start:test_start + len(testY)]

sarimaTrainScore = np.sqrt(mean_squared_error(trainY, sarimaTrainPredict))
print('Train Score: %.2f RMSE' % (sarimaTrainScore))
sarimaTestScore = np.sqrt(mean_squared_error(testY, sarimaTestSlice))
print('Test Score: %.2f RMSE' % (sarimaTestScore))

# Actual series against both models' predictions
plt.figure(figsize=(10, 6))
plt.plot(plottable.index, plottable['Deaths'], label='Actual Data', color='blue')
plt.plot(plottable.index, plottable['LSTM Predictions'], label='LSTM Predictions', color='red')
plt.plot(plottable.index, plottable['SARIMA Predictions'], label='SARIMA Predictions', color='green')
plt.title('Deaths: Actual vs LSTM vs SARIMA Predictions (Without Scaling)')
plt.xlabel('Date')
plt.ylabel('Deaths')
plt.legend()
plt.show()

In [None]:
# Re-display the test targets for reference.
testY

In [None]:
def calculate_confidence_intervals(predictions, alpha=0.05):
    """Return a constant-width band around each prediction.

    The half-width is ``z * std(predictions) / sqrt(n)``, where ``z`` is the
    two-sided standard-normal quantile for ``alpha`` and ``std`` is the
    population standard deviation of the predictions themselves. The band
    therefore reflects the spread of the forecast values, not a model-based
    prediction interval.

    Parameters
    ----------
    predictions : np.ndarray, pd.Series, list or tuple
        Predicted values. Lists/tuples are converted to a float array; arrays
        and Series are used as-is (a Series keeps its index in the result).
    alpha : float, default 0.05
        Significance level; the band has nominal coverage ``1 - alpha``.

    Returns
    -------
    (lower_bound, upper_bound)
        Same type and length as ``predictions``.

    Raises
    ------
    ValueError
        If ``alpha`` is not strictly between 0 and 1.
    """
    if not 0 < alpha < 1:
        raise ValueError(f"alpha must be in (0, 1), got {alpha}")

    if isinstance(predictions, (list, tuple)):
        predictions = np.asarray(predictions, dtype=float)

    # Nothing to bound; also avoids NaN warnings from std/sqrt on empty input
    if len(predictions) == 0:
        return predictions.copy(), predictions.copy()

    # Two-sided z-score for the requested level (about 1.96 when alpha=0.05)
    z_score = NormalDist().inv_cdf(1 - alpha / 2)
    std_pred = np.std(predictions)
    margin_of_error = z_score * (std_pred / np.sqrt(len(predictions)))

    lower_bound = predictions - margin_of_error
    upper_bound = predictions + margin_of_error

    return lower_bound, upper_bound

# SARIMA predictions for the same months the LSTM forecast covers. `testog`
# still carries the test rows' original df index labels, so it selects exactly
# the test window; filtering on Month > '2020-01-01' would pick rows outside
# the 2019 test period and make the two interval sets incomparable.
sarimaTestPredict = df.loc[testog.index, 'SARIMA Predictions']
# Calculate confidence intervals
lower_bound_test, upper_bound_test = calculate_confidence_intervals(testPredict)
lower_bound_sarima, upper_bound_sarima = calculate_confidence_intervals(sarimaTestPredict)

In [None]:
# Inspect the SARIMA predictions selected for the interval comparison.
sarimaTestPredict

In [None]:
# Inspect the LSTM forecasts used for the interval comparison.
testPredict

In [None]:
def calculate_overlap(lower1, upper1, lower2, upper2):
    """Percentage of positions at which two sets of intervals intersect.

    Interval ``i`` of the first set is ``[lower1[i], upper1[i]]`` and is
    compared with ``[lower2[i], upper2[i]]``; touching end points count as an
    overlap. Comparisons involving NaN are False, so such pairs never count.

    Parameters
    ----------
    lower1, upper1, lower2, upper2 : sized iterables of numbers
        Interval bounds, expected to have equal length.

    Returns
    -------
    float
        ``100 * overlapping pairs / compared pairs``; ``0.0`` when there is
        nothing to compare.

    Warns
    -----
    UserWarning
        If the inputs differ in length. Only the first ``min(len)`` pairs are
        compared, and the percentage is taken over those pairs.
    """
    lengths = {len(lower1), len(upper1), len(lower2), len(upper2)}
    if len(lengths) > 1:
        warnings.warn(
            f"calculate_overlap received inputs of different lengths {sorted(lengths)}; "
            "only the common leading pairs are compared"
        )

    # zip() stops at the shortest input, so that is the number of pairs compared
    n_pairs = min(lengths)
    if n_pairs == 0:
        return 0.0

    overlap_count = sum(
        1
        for l1, u1, l2, u2 in zip(lower1, upper1, lower2, upper2)
        if u1 >= l2 and l1 <= u2
    )

    # Calculate percent overlap
    return (overlap_count / n_pairs) * 100

# Calculate percent overlap: the share of positions at which the LSTM band
# and the SARIMA band intersect.
percent_overlap = calculate_overlap(lower_bound_test, upper_bound_test, lower_bound_sarima, upper_bound_sarima)

print(f'Percent Overlap: {percent_overlap:.2f}%')

In [None]:
# Display the frame used for plotting (indexed by Month).
plottable