In [2]:
import pandas as pd
import numpy as np
import warnings
import matplotlib.pyplot as plt
import seaborn as sns

import statsmodels.api as sm
import scipy.stats as stats
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
import matplotlib.patches as mpatches
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from scipy.fft import fft, fftfreq, ifft, rfft, rfftfreq
from scipy import signal as sig
from cmath import phase


# Use seaborn's 'whitegrid' style for the figures drawn below.
sns.set_style('whitegrid')

import matplotlib as mpl
# Default figure resolution: 100 dpi.
mpl.rcParams['figure.dpi'] = 100
# Suppress every warning for the rest of the session. This also hides
# anything the model-fitting calls below would otherwise report.
warnings.filterwarnings("ignore")

In [3]:
# Directory that holds the Shannon alpha-diversity tables. Kept in one place
# so the location only has to be edited once when the notebook moves.
ALPHA_DIVERSITY_DIR = '/storage/zkarwowska/microbiome-dynamics-preprint/datasets/alpha_diversity'

# `data` is the table every cell below works on; going by the file names it
# is the male subject's series and `female_data` the female subject's.
data = pd.read_csv(f'{ALPHA_DIVERSITY_DIR}/male_shannon.csv')
female_data = pd.read_csv(f'{ALPHA_DIVERSITY_DIR}/female_shannon.csv')

### ARIMAX WITH FFT

In [4]:
def make_fft(amp, N):

    ''' Build a single Fourier regressor of length N.

    The returned series is cos(2*pi*t/amp) + sin(2*pi*t/amp) evaluated at
    t = 0, 1, ..., N-1.  Despite its name, `amp` is used as the period of
    the wave (the frequency is 1/amp), not as an amplitude.

    Parameters
    ----------
    amp : number
        Period of the wave, in samples. Must be non-zero.
    N : int
        Number of samples to generate.

    Returns
    -------
    numpy.ndarray
        One-dimensional array with N values.
    '''

    sample_spacing = 1/1
    frequency = 1/amp

    # N equally spaced sample positions: 0, 1, ..., N-1
    t = np.linspace(0, N*sample_spacing, N, endpoint=False)
    angle = frequency * 2.0*np.pi*t

    return np.cos(angle) + np.sin(angle)

In [5]:
def sliding_arima_with_fft(data, exog_fft, p, d, q,
                           train_fold_size=50, test_fold_size=30, plot=True):

    ''' Train an ARIMA model with Fourier terms in a sliding cv manner.

    For every fold start n in range(0, 350, 20) the model is fitted on
    data[n : n + train_fold_size] (with `exog_fft` as exogenous regressor)
    and used to forecast the following `test_fold_size` points.  Each fold is
    scored with Spearman's rho between observed and predicted values.

    Parameters
    ----------
    data : pandas.DataFrame
        Series to model; the mean of its first column is used for centring.
    exog_fft : array-like
        Exogenous regressor aligned with `data` (sliced with the same
        positions).
    p, d, q : int
        ARIMA order.
    train_fold_size : int, default 50
        Number of observations in each training window.
    test_fold_size : int, default 30
        Number of observations forecast after each training window.
    plot : bool, default True
        Draw observed (black) vs. forecast (red) for every fold.

    Returns
    -------
    pandas.DataFrame
        One row per evaluated fold with columns
        'train_score', 'test_score', 'p', 'q', 'd', 'folds'.

    Notes
    -----
    Fold starts whose train + test window would run past the end of `data`
    or `exog_fft` are skipped, so the result can have fewer rows than there
    are fold starts.
    '''

    # Mean of the first column over the WHOLE series: it is subtracted from
    # each training window and added back to the forecasts.
    m = data.mean().values[0]

    window = train_fold_size + test_fold_size
    n_available = min(len(data), len(exog_fft))

    train_rho = []; test_rho = []
    for n in range(0, 350, 20):

        # fold starts are increasing, so once one window no longer fits
        # none of the later ones will either
        if n + window > n_available:
            break

        split = n + train_fold_size

        # define train and test (train is centred, test is left as observed)
        train = data[n:split].values - m
        test = data[split:n + window].values

        # exogenous regressor for the same positions
        train_fft = exog_fft[n:split]
        test_fft = exog_fft[split:n + window]

        # train ARIMA model
        arima_model = ARIMA(train, order = (p, d, q), exog=train_fft)
        arima_model.initialize_approximate_diffuse()
        arima_model_fit = arima_model.fit(method_kwargs={"warn_convergence": False})

        # in-sample fit and out-of-sample forecast (mean added back)
        yhat_train = arima_model_fit.predict(exog=train_fft)
        yhat_test = arima_model_fit.predict(start=train_fold_size,
                                            end=window - 1,
                                            exog=test_fft)
        yhat_test = yhat_test + m

        if plot:
            plt.plot(test, 'k')
            plt.plot((yhat_test), 'r')
            plt.show()

        # Spearman's rho is rank based, so the constant shift by m does not
        # affect it; inputs are flattened so the single-column window and
        # the 1-D prediction are compared as plain vectors.
        train_rho.append(stats.spearmanr(np.ravel(train), np.ravel(yhat_train))[0])
        test_rho.append(stats.spearmanr(np.ravel(test), np.ravel(yhat_test))[0])

    df = pd.DataFrame({'train_score': train_rho, 'test_score': test_rho})
    df['p'] = p
    df['q'] = q
    df['d'] = d
    df['folds'] = range(len(df))

    return df

### ARIMA

In [None]:
def sliding_arima(data, 
                  p, 
                  d, 
                  q, 
                  train_fold_size, 
                  test_fold_size, 
                  detrend = False, 
                  plot=False):

    ''' Train an ARIMA model in a sliding cv manner.

    For every fold start n in range(0, 300, 10) an ARIMA(p, d, q) model is
    fitted on data[n : n + train_fold_size] and used to forecast the next
    `test_fold_size` points.  Each fold is scored with Spearman's rho between
    observed and predicted values.

    Parameters
    ----------
    data : pandas.DataFrame
        Series to model; the mean of its first column is used for centring.
    p, d, q : int
        ARIMA order.
    train_fold_size : int
        Number of observations in each training window.
    test_fold_size : int
        Number of observations forecast after each training window.
    detrend : bool, default False
        If true, subtract the mean of the whole series from the training
        window before fitting and add it back to the predictions.
    plot : bool, default False
        Draw observed (black) vs. forecast (red) for every fold.

    Returns
    -------
    pandas.DataFrame
        One row per evaluated fold with columns
        'train_score', 'test_score', 'p', 'q', 'd', 'folds'.

    Notes
    -----
    Fold starts whose train + test window would run past the end of `data`
    are skipped, so the result can have fewer rows than there are fold
    starts.
    '''

    # Mean of the first column over the WHOLE series (only used when detrending).
    m = data.mean().values[0]
    window = train_fold_size + test_fold_size

    train_rho = []; test_rho = []
    for n in range(0, 300, 10):

        # fold starts are increasing, so once one window no longer fits
        # none of the later ones will either
        if n + window > len(data):
            break

        # define train and test
        train = data[n:n+train_fold_size]
        test = data[n+train_fold_size:n+window]

        if detrend:
            train = train - m

        arima_model = ARIMA((train), order = (p, d, q))
        arima_model.initialize_approximate_diffuse()
        arima_model_fit = arima_model.fit(method_kwargs={"warn_convergence": False})

        # in-sample fit and out-of-sample forecast
        yhat_train = arima_model_fit.predict()
        yhat_test = arima_model_fit.predict(start=train_fold_size, end=window - 1)

        if detrend:
            # put the predictions back on the original scale
            yhat_train = yhat_train + m
            yhat_test = yhat_test + m

        # Spearman's rho is rank based, so comparing the centred training
        # window with the shifted prediction gives the same score; inputs
        # are flattened so the single-column window and the 1-D prediction
        # are compared as plain vectors.
        train_rho.append(stats.spearmanr(np.ravel(train), np.ravel(yhat_train))[0])
        test_rho.append(stats.spearmanr(np.ravel(test), np.ravel(yhat_test))[0])

        if plot:
            plt.figure(figsize = (5, 3))
            plt.plot(test.values, 'ko-')
            plt.plot(yhat_test.values, 'r')
            plt.show()

    df = pd.DataFrame({'train_score': train_rho, 'test_score': test_rho})
    df['p'] = p
    df['q'] = q
    df['d'] = d
    df['folds'] = range(len(df))

    return df

## test how prediction accuracy changes with train set size

In [None]:
# Training-window lengths to compare; the forecast horizon is fixed at 20.
train_sizes = [15, 30, 50, 70, 100, 120]

# One result frame per training-window length, tagged with that length.
DF = []
for t in train_sizes:
    
    pred_detrend_df = sliding_arima(data, 10, 0, 0, t, 20, True, False)
    pred_detrend_df['train_size'] = t
    DF.append(pred_detrend_df)
    
results_df = pd.concat(DF)
results_grouped_df = results_df.groupby(by = ['train_size']).mean().reset_index()

plt.figure(figsize = (7, 3))
sns.boxplot(data = results_df, x = 'train_size', y = 'test_score', linewidth=0.8 ,color = 'lightgrey')
sns.swarmplot(data = results_df, x = 'train_size', y = 'test_score', s = 3, color = 'black')
plt.title('prediction accuracy vs train size')
# x carries the train size, y the score (a second ylabel call would
# overwrite the first one and leave the x axis with the column name)
plt.xlabel('train size')
plt.ylabel('spearman rho')
plt.tight_layout()

# NOTE(review): `subject` is not defined in the cells visible here; it has to
# be set earlier in the notebook. The file name is specific to this
# experiment so it does not share a path with the forecast-horizon figure.
plt.savefig(f'fft_plots/{subject}_train_size_10_0_0.png', dpi=200)

## test how prediction accuracy changes with forecast horizon length

In [None]:
# Forecast-horizon lengths to compare; the training window is fixed at 50.
test_sizes = [5, 10, 15, 20, 30, 40, 50]

# One result frame per forecast horizon, tagged with that horizon.
DF = []
for t in test_sizes:
    
    pred_detrend_df = sliding_arima(data, 10, 0, 0, 50, t, True, False)
    pred_detrend_df['test_size'] = t
    DF.append(pred_detrend_df)
    
results_df = pd.concat(DF)
results_grouped_df = results_df.groupby(by = ['test_size']).mean().reset_index()

plt.figure(figsize = (7, 3))
sns.boxplot(data = results_df, x = 'test_size', y = 'test_score', linewidth=0.8 ,color = 'lightgrey')
sns.swarmplot(data = results_df, x = 'test_size', y = 'test_score', s = 3, color = 'black')
plt.title('prediction accuracy vs forecast horizon')
# x carries the horizon length, y the score (a second ylabel call would
# overwrite the first one and leave the x axis with the column name)
plt.xlabel('test size')
plt.ylabel('spearman rho')
plt.tight_layout()

# NOTE(review): `subject` is not defined in the cells visible here; it has to
# be set earlier in the notebook.
plt.savefig(f'fft_plots/{subject}_forecast_horizon_10_0_0.png', dpi=200)

## SARIMA

In [None]:
# Non-seasonal order (p, d, q) and seasonal order (P, D, Q, S).
p=13; d=0; q=0
P =1; Q=1; D=0; S = 12
# Mean of the first column over the whole series, used to centre each
# training window and to shift the forecasts back.
m = data.mean().values[0]

test_size = 35; train_size = 50

# Spearman's rho per fold (in-sample fit and out-of-sample forecast).
train_rho = []; test_rho = []
for n in range(0, 120, 40):

    train = data.iloc[n:(n+train_size)]
    test = data.iloc[(n+train_size):(n+train_size+test_size)]

    train = train - m

    arima_model = SARIMAX(train,  order = (p, d, q), seasonal_order=(P, D, Q, S))
    arima_model.initialize_approximate_diffuse() 
    # `method_kwargs` is an argument of ARIMA.fit; SARIMAX.fit takes
    # `warn_convergence` directly as a keyword.
    arima_model_fit = arima_model.fit(disp=False, warn_convergence=False)

    # in-sample fit and out-of-sample forecast (mean added back)
    yhat_train = arima_model_fit.predict()
    yhat_test = arima_model_fit.predict(start=train_size, end = (train_size + test_size)-1)
    yhat_test = yhat_test + m

    # rank correlation is unaffected by the constant shift by m
    train_rho.append(stats.spearmanr(np.ravel(train), np.ravel(yhat_train))[0])
    test_rho.append(stats.spearmanr(np.ravel(test), np.ravel(yhat_test))[0])
    
    plt.plot(test.values, 'k')
    plt.plot(yhat_test.values, 'r')
    plt.show()