In [None]:
import pandas as pd
import numpy as np
import statsmodels.api as smf
from statsmodels.regression.quantile_regression import QuantReg
import matplotlib.pyplot as plt


In [None]:
def load_and_prepare_data(state):

    # Load data
    if(state=='US'):
        data = pd.read_csv('./data/ILI_national_2002_2024.csv')
        data = ILI_df[['date','year','week','weighted_ili']]
        data = data.rename(columns={'weighted_ili':'US'})
    else:
        data = pd.read_csv('./data/ILI_states_2010_2024.csv')

    # data['week'] = data['date'].dt.isocalendar().week.astype('int32')  # Get week of the year
    # data = data[data.week!=53] #ignore week 53 for now
    data['date'] = pd.to_datetime(data['date'])
    data = data[(data.date<'2020-06-28') | (data.date>='2022-07-01')] #remove covid years
    data = data[['date','week',state]] #    data = data.drop(columns=['year'])  # data = data.drop(columns=['DC']) 
    data_long = data.melt(id_vars=['date', 'week'], var_name='location', value_name='count')
    # data_long = pd.get_dummies(data_long, columns=['location'], dtype='int32', drop_first=True)
    return data_long

In [None]:
def generate_features(data, num_weeks, weeks_ahead):
    # Create lagged features for the number of specified weeks
    for i in range(1,num_weeks+1):
        data[f'lag_{i}'] = data['count'].shift(i)

    data['future_count'] = data['count'].shift(-weeks_ahead)
    # Drop rows with NaN values that result from lagging
    data = data.dropna()
    return data

In [None]:
def fit_quantile_regression(data, predictors, quantiles):
    
    np.asarray(data[predictors])
    results = {}
    # Iterate over each quantile
    for q in quantiles:
        # Fit the quantile regression model
        mod = QuantReg(data['future_count'], data[predictors])
        res = mod.fit(q=q)
        results[q] = res
    return results

In [None]:
def calculate_wis(model_results, data, predictors, quantiles):
   
    # Generate quantile pairs
    n = len(quantiles)
    quantile_pairs = [(quantiles[i], quantiles[n-i-1]) for i in range(n//2)]

    # Calculate the observed and predicted values
    obs = data['future_count']

    # Calculate Interval Scores (IS) for specified pairs of quantiles
    wis = 0
    for (q_lower, q_upper) in quantile_pairs:
        p = q_upper - q_lower
        alpha = 1 - p

        # Retrieve predictions for the upper and lower quantiles
        L = model_results[q_lower].predict(data[predictors])
        U = model_results[q_upper].predict(data[predictors])

        # Interval Score calculation
        IS = (U - L) + (2 / alpha) * ((L - obs) * (obs < L) + (obs - U) * (obs > U))
        
        # Weight for each interval score, using alpha/2 as described
        wis += (alpha / 2) * IS.mean()  # mean of IS over all observations

    # Evaluate median accuracy separately if it is a distinct quantile
    if 0.5 in quantiles:
        
        median_predictions = model_results[0.5].predict(data[predictors])
        median_error = abs(median_predictions - obs).mean()
        wis += median_error

    K = len(quantile_pairs)
    wis = wis/(K+0.5)
    return wis

In [None]:
def calculate_q_loss(model_results, data, predictors, quantiles):

    # Calculate the observed and predicted values
    obs = data['future_count']

    # Calculate Interval Scores (IS) for specified pairs of quantiles
    q_loss = 0
    for q in quantiles:
        # Retrieve predictions for q
        pred = model_results[q].predict(data[predictors])
        # Calculate residuals
        err = obs - pred
        # Calculate quantile loss
        loss = np.maximum(q * err, (q - 1) * err)
        q_loss += loss.mean()
    
    return q_loss

In [None]:
def calculate_coverage(model_results, data, predictors, lower_quantile, upper_quantile):
    # Predict using the specified quantiles
    lower_predictions = model_results[lower_quantile].predict(data[predictors])
    upper_predictions = model_results[upper_quantile].predict(data[predictors])
    
    # Calculate whether each actual value is within the predicted interval
    within_interval = (data['future_count'] >= lower_predictions) & (data['future_count'] <= upper_predictions)
    
    # Calculate coverage percentage
    coverage_percentage = within_interval.mean() * 100  # Convert proportion to percentage
    return coverage_percentage

In [None]:
def plot_quantile_regression(model_results, data, predictors, quantiles, state, 
                             weeks_ahead, wis_score, q_loss, colors=None):
    
    plt.figure(figsize=(10, 4))
    dates = data['date']
    obs = data['future_count']
    
    for i, q in enumerate(quantiles):
        predictions = model_results[q].predict(data[predictors])
        if(colors is None):
            plt.plot(dates, predictions, label=f'Quantile {q}',alpha=0.75)
        else:
            plt.plot(dates, predictions, label=f'Quantile {q}',alpha=0.75, color=colors[i])

    plt.plot(dates, obs, 'o--', color='black', markersize=3, alpha=0.75, label='Observed')
    # plt.scatter(time, obs, s=2, color='blue', label='Observed')
    plt.title('state={}, horizon={} weeks (WIS={}, Q-loss={})'.format(state, weeks_ahead,wis_score,q_loss))    
    plt.xlabel('week')
    plt.ylabel('ILI')
    plt.legend(fontsize=10)
    plt.show()

In [None]:
num_weeks = 4  # Number of weeks to use for prediction

weeks_ahead = 1  # Number of weeks ahead you want to predict

quantiles = [0.010, 0.025, 0.050, 0.100, 0.150, 0.200, 0.250, 0.300, 0.350,
             0.400, 0.450, 0.500, 0.550, 0.600, 0.650, 0.700, 0.750, 0.800,
             0.850, 0.900, 0.950, 0.975, 0.990]

# Load data
state = 'US'
data = load_and_prepare_data(state)

# Prepare the data with lagged weeks
data = generate_features(data, num_weeks, weeks_ahead)

# Debug: Check data types
# print(data.dtypes)
# print(data.head())

# Split data into training and testing sets
train_data = data[data.date>='2022-07-01'] #data[data.date<'2019-07-01'] #data[data.date<'2016-10-15'] #
test_data = data[data.date>='2022-07-01'] #data[data.date>='2019-07-01'] #data[data.date>='2016-10-15'] #

predictors = ([f'lag_{i}' for i in range(1, num_weeks + 1)] + ['week'])# + 
              #[col for col in train_data.columns if 'location_' in col])
 
# Fit the model
model_results = fit_quantile_regression(train_data, predictors, quantiles)

 # Calculate WIS
# wis_score_train = np.round(calculate_wis(model_results, train_data, predictors, quantiles),3)
# print(f'Weighted Interval Score - train (length={len(train_data)}): {wis_score_train}')
wis_score_test = np.round(calculate_wis(model_results, test_data, predictors, quantiles),3)
print(f'Weighted Interval Score - test (length={len(test_data)}): {wis_score_test}')
q_loss_test = np.round(calculate_q_loss(model_results, test_data, predictors, quantiles),3)
print(f'Quantile loss - test (length={len(test_data)}): {q_loss_test}')

In [None]:
lower_quantile = 0.25 #0.025
upper_quantile = 0.75 #0.975
coverage_percentage = calculate_coverage(model_results, test_data, predictors, lower_quantile, upper_quantile)
print(f'Coverage of actuals within the {lower_quantile * 100}% to {upper_quantile * 100}% interval: {coverage_percentage:.2f}%')

In [None]:
# Select a subset or a specific location to plot, or adjust the data selection as needed
quantiles_show = [0.025, 0.50, 0.975]
# specific_data = test_data[test_data['location'] == 'NY']
#specific_data = test_data[test_data['location_US'] == 1]
# plot_quantile_regression(model_results, specific_data, predictors, quantiles_show)
plot_quantile_regression(model_results, test_data, predictors, quantiles_show, state, weeks_ahead, wis_score_test, q_loss_test, colors=['blue','green','red'])

In [None]:
# Print the summary of the models for each quantile
# for q, result in model_results.items():
#     print(f'Quantile: {q}')
#     print(result.summary())
print(model_results[0.5].summary())