In [1]:
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.varmax import VARMAX
from statsmodels.tsa.stattools import adfuller as ADF
from tqdm import tqdm

In [None]:
# Base directory that every data/output path below is built from.
# `__file__` does not exist inside a Jupyter kernel, so fall back to the
# current working directory there.  `abspath` avoids an empty dirname (which
# would turn `path + '/data/...'` into a path under the filesystem root) when
# the code is run as a script from its own directory.
try:
    path = os.path.dirname(os.path.abspath(__file__))
except NameError:
    path = os.getcwd()

In [3]:
# Print NumPy floats with three decimals.
np.set_printoptions(formatter={'float': lambda x: "{0:0.3f}".format(x)})

# Matplotlib 3.6 renamed its bundled seaborn style sheets to "seaborn-v0_8-*"
# and later releases dropped the old names; use whichever name this
# installation provides so the cell runs on both old and new versions.
for _style in ('whitegrid', 'poster', 'dark-palette'):
    _renamed = 'seaborn-v0_8-' + _style
    plt.style.use(_renamed if _renamed in plt.style.available else 'seaborn-' + _style)

# Set fonts AFTER applying the style sheets: a style sheet may carry its own
# font settings (the seaborn whitegrid sheet appears to set `font.family`),
# which would otherwise override the choice made here.
plt.rcParams["font.family"] = "Times New Roman"
plt.rcParams["mathtext.fontset"] = "cm"

In [4]:
def num_chamber(list_x, list_y):
    """Map each (x, y) position to a chamber label in 1..5.

    Parameters
    ----------
    list_x, list_y : objects exposing ``.values`` (e.g. pandas Series)
        Coordinates, read element by element; ``list_x`` determines how many
        positions are classified.

    Returns
    -------
    list of int
        One label per position:
        5 when ``y > 178`` or when both ``y <= 0`` and ``x <= 0``;
        otherwise 1 / 2 / 3 for ``y <= 46`` / ``92`` / ``138``; otherwise 4.
    """
    xs = list_x.values
    ys = list_y.values

    def _chamber_of(x, y):
        # The two "label 5" regions are checked first, in this order.
        if y > 178:
            return 5
        if (y <= 0) & (x <= 0):
            return 5
        # Remaining labels are bands of y with inclusive upper bounds.
        for upper, label in ((46, 1), (92, 2), (138, 3)):
            if y <= upper:
                return label
        return 4

    return [_chamber_of(xs[i], ys[i]) for i in range(len(xs))]

In [5]:
def plot_time_series(ts_1, ts_label_1, ts_2, ts_label_2, title, path):
    """Plot two equally long series on one figure, save it, and show it.

    Parameters
    ----------
    ts_1, ts_2 : sequences of equal length
        ``ts_2`` is drawn first in red, ``ts_1`` on top of it in green; both
        are plotted against their positions 0..len-1.
    ts_label_1, ts_label_2 : str
        Legend entries for ``ts_1`` and ``ts_2``.
    title : str
        Figure title.
    path : str or path-like
        Destination handed to ``savefig``.  When it is a filesystem path, its
        parent directory is created if it does not exist yet.

    Raises
    ------
    AssertionError
        If the two series differ in length.
    """
    assert len(ts_1) == len(ts_2)
    xs = list(range(0, len(ts_1)))

    plt.rcParams['savefig.dpi'] = 300
    plt.rcParams['figure.dpi'] = 300

    # savefig does not create missing directories; do it here so a fresh
    # checkout does not fail on the first plot.
    if isinstance(path, (str, os.PathLike)):
        out_dir = os.path.dirname(path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

    # Use an explicit figure instead of the implicit pyplot state so nothing
    # from a previous call can end up on this plot.
    fig, ax = plt.subplots()
    ax.plot(xs, ts_2, c='red', label=ts_label_2, lw=1)
    ax.plot(xs, ts_1, c='green', label=ts_label_1, lw=1)

    ax.set_title(title)
    ax.legend(loc='upper left')
    fig.savefig(path)
    plt.show()
    # This function is called once per ant inside loops; close the figure so
    # open figures do not pile up in memory.
    plt.close(fig)

In [7]:
def ARIMA_model_loop(data, coef, times):
    """Walk-forward, one-step-ahead ARIMA forecast over the tail of ``data``.

    The first ``int(len(data) * coef)`` observations form the training part.
    For every later position ``t`` an ARIMA(p, times, q) model is refitted on
    ``data[:t]`` and its one-step forecast is recorded.

    Parameters
    ----------
    data : pandas.Series
        The series to forecast.
    coef : float
        Fraction of ``data`` used as the initial training part.
    times : int
        Differencing order ``d`` of the ARIMA model.

    Returns
    -------
    pandas.DataFrame
        The training part followed by the one-step forecasts, concatenated
        row-wise with a fresh 0..n-1 index.

    Raises
    ------
    ValueError
        If ``order`` could not fit any candidate model.
    """
    split = int(len(data) * coef)
    train = data[:split]
    test = data[split:]

    # Select (p, q) on the training part only: searching on the full series
    # would let the hold-out observations influence the model being evaluated.
    par = order(train, times)
    if len(par) == 0:
        raise ValueError("order() found no ARIMA candidate that could be fitted")

    # `order` returns DataFrame rows, so par['p'] is a Series.  Passing a
    # Series as the AR/MA order can be read by statsmodels as an explicit list
    # of lags rather than "all lags up to p"; extract plain integers.
    p = int(par['p'].iloc[0])
    q = int(par['q'].iloc[0])

    predictions = list()
    for t in tqdm(range(len(test))):
        current_t = t + split
        model_fit = ARIMA(data[:current_t], order=(p, times, q)).fit()
        predictions.append(model_fit.forecast().iloc[0])

    predictions = pd.DataFrame(predictions)
    predictions = pd.concat([train, predictions], axis=0)
    predictions.reset_index(inplace=True, drop=True)

    return predictions

def adf_test(ts, signif=0.05):
    """Count how many differencing passes are needed before the ADF p-value
    is no longer greater than ``signif``.

    ``ts`` is tested as given first (count 0); after every test it is
    replaced by ``ts.diff().dropna()`` and tested again until the p-value
    stops exceeding ``signif``.

    Returns the number of differencing passes applied before the passing
    test.  If ``signif`` is not below 1 no test is run at all and -1 is
    returned.
    """
    # The starting p-value is 1, so the loop body runs only if 1 > signif.
    if not 1 > signif:
        return -1

    n_diffs = 0
    series = ts
    while True:
        p_value = ADF(series)[1]
        # Difference right after testing, before checking the p-value.
        series = series.diff().dropna()
        if not p_value > signif:
            return n_diffs
        n_diffs += 1
        
def order(train, times):
    """Grid-search ARIMA(p, times, q) on ``train`` and return the best by BIC.

    Tries p in 1..5 and q in 0..4.  Candidates that fail to fit are kept in
    the score table with a NaN BIC so they can never be selected.

    Parameters
    ----------
    train : series accepted by ``ARIMA``
        Data the candidates are fitted on.
    times : int
        Differencing order ``d`` shared by all candidates.

    Returns
    -------
    pandas.DataFrame
        The row(s) of the score table (columns ``bic``, ``p``, ``q``) whose
        BIC equals the minimum; empty if no candidate could be fitted.
    """
    # One flat progress bar instead of a nested bar per p value.
    candidates = [(p, q) for p in range(1, 6) for q in range(5)]

    rows = []
    for p, q in tqdm(candidates):
        try:
            bic = ARIMA(train, order=(p, times, q)).fit().bic
        except Exception:
            # A failed fit only disqualifies this candidate.  Catching
            # Exception (not a bare except) keeps Ctrl-C / kernel interrupt
            # working during the long search.
            bic = np.nan
        rows.append([bic, p, q])

    scores = pd.DataFrame(rows, columns=['bic', 'p', 'q'])
    return scores[scores['bic'] == scores['bic'].min()]

## ARIMA one-step forecasting

In [None]:
# Walk-forward ARIMA forecasts of location_x and location_y for every ant.
# For each ant: pick the differencing order with the ADF test, forecast both
# coordinates one step at a time, convert the positions to chamber labels,
# and write one CSV plus two comparison plots.
data_df = pd.read_csv(path+'/data/location_in_mm.csv')
coef = 0.5  # fraction of each track used as the initial training part

# Keep the CSVs and the plots in the same directory under `path`, and make
# sure it exists before anything is written to it.
out_dir = os.path.join(path, 'pred_loop')
os.makedirs(out_dir, exist_ok=True)

colony_list = np.unique(data_df['colony_id'].to_numpy())
num_ant = 0  # running index over all ants; used in file and column names

for colony_id in colony_list:
    df_colonies = data_df[data_df['colony_id'] == colony_id]

    ant_list = np.unique(df_colonies['ant_id'].to_numpy())

    for ant_id in ant_list:

        data = df_colonies[df_colonies['ant_id'] == ant_id]

        times_x = adf_test(data['location_x'])
        times_y = adf_test(data['location_y'])

        pred_x = ARIMA_model_loop(data['location_x'], coef, times_x)
        pred_y = ARIMA_model_loop(data['location_y'], coef, times_y)

        prediction_c = num_chamber(pred_x, pred_y)
        prediction_c = pd.DataFrame(prediction_c, columns = ['%i'%num_ant])
        prediction_c.to_csv(os.path.join(out_dir, 'prediction_%i.csv'%num_ant), index = False)

        # These are ARIMA forecasts; label them as such.
        plot_time_series(ts_1=pred_x, ts_label_1='ARIMA Model', ts_2=data['location_x'], ts_label_2='True data', title='ARIMA predictions vs. truth of x', path = os.path.join(out_dir, '%s_x.png'%num_ant))
        plot_time_series(ts_1=pred_y, ts_label_1='ARIMA Model', ts_2=data['location_y'], ts_label_2='True data', title='ARIMA predictions vs. truth of y', path = os.path.join(out_dir, '%s_y.png'%num_ant))

        num_ant += 1

## VARMA multi-step forecasting (fixed order)

In [None]:
# Multi-step VARMA(4, 3) forecast for every ant: fit once on the first half
# of the track, forecast the whole second half in one go, convert the
# positions to chamber labels, and collect one label column per ant.
data_df = pd.read_csv(path+'/data/location_in_mm.csv')
chamber_columns = []  # one Series of chamber labels per ant

colony_list = np.unique(data_df['colony_id'].to_numpy())
num_ant = 0  # running index over all ants; used in file and column names

for colony_id in colony_list:
    df_colonies = data_df[data_df['colony_id'] == colony_id]

    ant_list = np.unique(df_colonies['ant_id'].to_numpy())

    for ant_id in ant_list:

        data = df_colonies[df_colonies['ant_id'] == ant_id]
        data = data[['location_x', 'location_y']].reset_index(drop = True)

        # splitting into the train and validation halves
        split = int(0.5*(len(data)))
        train = data[:split]
        valid = data[split:]

        model = VARMAX(train, order = (4, 3))
        fitted_model = model.fit()
        prediction = fitted_model.forecast(len(valid)).reset_index(drop=True)
        # ignore_index gives the combined track a clean 0..n-1 index instead
        # of repeating the training index for the forecast rows.
        prediction = pd.concat([train, prediction], axis = 0, ignore_index = True)

        prediction_c = num_chamber(prediction['location_x'], prediction['location_y'])

        # Assigning columns into an initially empty frame aligns every later
        # ant to the first ant's index and silently truncates longer tracks;
        # concatenating Series keeps every row (shorter tracks are NaN-padded).
        chamber_columns.append(pd.Series(prediction_c, name = '%i'%num_ant))
        pred = pd.concat(chamber_columns, axis = 1)

        # The original built this path as path+'insect/...', without a
        # separator, which points outside `path`.
        plot_dir = os.path.join(path, 'insect', '%i'%num_ant)
        os.makedirs(plot_dir, exist_ok = True)

        plot_time_series(ts_1 = prediction['location_x'], ts_label_1 = 'VARMA Model', ts_2 = data['location_x'], ts_label_2 = 'True data', title = 'VARMA predictions vs. ground truth of location x', path = os.path.join(plot_dir, 'x1.png'))
        plot_time_series(ts_1 = prediction['location_y'], ts_label_1 = 'VARMA Model', ts_2 = data['location_y'], ts_label_2 = 'True data', title = 'VARMA predictions vs. ground truth of location y', path = os.path.join(plot_dir, 'y1.png'))

        # Rewritten after every ant so partial results survive an interrupted run.
        pred.to_csv(path+'/data/pred_noloop.csv', index = False)

        num_ant += 1

## VARMA one-step forecasting

In [None]:
# One-step-ahead VARMA(1, 0) forecasts for every ant, refitted at each step
# on a window of the most recent observations.
# The input lives under /data/ like in the other cells.
data_df = pd.read_csv(path+'/data/location_in_mm.csv')

out_dir = os.path.join(path, 'pred_loop')
os.makedirs(out_dir, exist_ok = True)

colony_list = np.unique(data_df['colony_id'].to_numpy())
num_ant = 0  # running index over all ants; used in file and column names

for colony_id in colony_list:
    df_colonies = data_df[data_df['colony_id'] == colony_id]

    ant_list = np.unique(df_colonies['ant_id'].to_numpy())

    for ant_id in ant_list:

        # Everything below must run once per ant.  In the original it sat
        # outside this loop, so only the last ant of each colony was forecast.
        data = df_colonies[df_colonies['ant_id'] == ant_id]
        # Model only the two coordinates, not id or other columns.
        data = data[['location_x', 'location_y']].reset_index(drop = True)

        split = int(0.5*(len(data)))
        train = data[:split]
        valid = data[split:]

        start_t = len(train)
        forecasts = []
        for t_i in tqdm(range(len(valid))):
            current_t = t_i + start_t
            # Sliding window of constant length `start_t`, kept from the
            # original.  NOTE(review): the ARIMA loop uses an expanding
            # window (data[:current_t]) -- confirm which one is intended.
            model = VARMAX(data[t_i:current_t], order = (1, 0))
            fitted_model = model.fit()
            forecasts.append(fitted_model.forecast(1).reset_index(drop=True))

        # DataFrame.append was removed in pandas 2.0; collect the one-row
        # forecasts and concatenate once.
        if forecasts:
            pred = pd.concat(forecasts, ignore_index = True)
        else:
            pred = pd.DataFrame(columns = ['location_x', 'location_y'])

        prediction_c = num_chamber(pred['location_x'], pred['location_y'])
        prediction_c = pd.DataFrame(prediction_c, columns = ['%i'%num_ant])
        # NOTE(review): same file names as the ARIMA one-step cell writes, so
        # whichever cell runs last overwrites the other's output.
        prediction_c.to_csv(path+'/pred_loop/prediction_%i.csv'%num_ant, index = False)

        # Without this increment every ant would overwrite prediction_0.csv.
        num_ant += 1


## VARMA multi-step forecasting

In [None]:
# Multi-step VARMA forecast with the (p, q) order chosen per ant by BIC.
# For each ant: fit every candidate order on the first half of the track,
# keep the fit with the lowest BIC, forecast the second half in one go, and
# convert the positions to chamber labels.
data_df = pd.read_csv(path+'/data/location_in_mm.csv')

plot_dir = os.path.join(path, 'pred_varma_noloop')
os.makedirs(plot_dir, exist_ok = True)

# Collected here rather than relying on a `pred` left over from an earlier cell.
chamber_columns = []

colony_list = np.unique(data_df['colony_id'].to_numpy())
num_ant = 0  # running index over all ants; used in file and column names

for colony_id in colony_list:
    df_colonies = data_df[data_df['colony_id'] == colony_id]

    ant_list = np.unique(df_colonies['ant_id'].to_numpy())

    for ant_id in ant_list:

        ant_df = df_colonies[df_colonies['ant_id'] == ant_id].reset_index(drop = True)
        # Model only the two coordinates, not id or other columns.
        data = ant_df[['location_x', 'location_y']]

        # creating the train and validation set
        split = int(0.5*(len(data)))
        train = data[:split]
        valid = data[split:]

        # BIC grid search.  The best fit is kept so the winning model does
        # not have to be fitted a second time.
        best_bic, best_pq, best_fit = np.inf, None, None
        candidates = [(p, q) for p in range(5) for q in range(5)]
        for p, q in tqdm(candidates):
            try:
                candidate_fit = VARMAX(train, order = (p, q)).fit()
            except Exception:
                # Orders that statsmodels rejects or cannot fit are skipped.
                # Catching Exception (not a bare except) keeps kernel
                # interrupts working during the search.
                continue
            if candidate_fit.bic < best_bic:
                best_bic, best_pq, best_fit = candidate_fit.bic, (p, q), candidate_fit

        if best_fit is None:
            raise RuntimeError('no VARMA order could be fitted for ant %i'%num_ant)
        # Named best_pq, not `order`, so the order() helper defined earlier in
        # the notebook is not overwritten.
        print('ant %i: selected VARMA order %s (BIC %.3f)'%(num_ant, best_pq, best_bic))

        prediction = best_fit.forecast(len(valid)).reset_index(drop=True)
        prediction = pd.concat([train, prediction], axis = 0, ignore_index = True)

        prediction_c = pd.Series(num_chamber(prediction['location_x'], prediction['location_y']), name = '%i'%num_ant)
        chamber_columns.append(prediction_c)
        pred = pd.concat(chamber_columns, axis = 1)

        plot_time_series(ts_1 = prediction['location_x'], ts_label_1 = 'VARMA Model', ts_2 = data['location_x'], ts_label_2 = 'True data', title = 'VARMA predictions vs. truth of location x', path = os.path.join(plot_dir, 'x_%i.png'%num_ant))
        plot_time_series(ts_1 = prediction['location_y'], ts_label_1 = 'VARMA Model', ts_2 = data['location_y'], ts_label_2 = 'True data', title = 'VARMA predictions vs. truth of location y', path = os.path.join(plot_dir, 'y_%i.png'%num_ant))
        # The original read the truth from an undefined `df`.
        # NOTE(review): assumes the CSV carries a per-row 'chamber' column for
        # the ground truth -- confirm; the plot is skipped when it is absent.
        if 'chamber' in ant_df.columns:
            plot_time_series(ts_1 = prediction_c, ts_label_1 = 'VARMA Model', ts_2 = ant_df['chamber'], ts_label_2 = 'True data', title = 'VARMA predictions vs. truth of chamber number', path = os.path.join(plot_dir, 'chamber_%i.png'%num_ant))

        # Rewritten after every ant so partial results survive an interrupted run.
        pred.to_csv(path+'/pred_VARMA_noloop.csv', index = False)

        # Without this increment every ant would reuse column '0' and the
        # *_0.png file names.
        num_ant += 1