In [1]:
import warnings
warnings.filterwarnings("ignore")
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
#from pmdarima.arima.utils import ndiffs
from statsmodels.tsa.arima.model import ARIMA
%run ../data_prep/trackfuns.py
from sklearn.metrics import mean_squared_error
import math

  from pandas import (to_datetime, Int64Index, DatetimeIndex, Period,
  from pandas import (to_datetime, Int64Index, DatetimeIndex, Period,


In [7]:
# Set Arima model p and q values for p-d-q
pq = 80

# Set data file path
#file_name = r'/Users/beyzabozdag/Desktop/ANTteam/HRM_V1_ants.txt'
trial_name = 'T1'
file_name = r'~/Documents/MATLAB/clean_724000/HRM_' + trial_name + '_ants.txt'

# Set the number of ants to iterate over, if set to 0 or a number greater the total ant count, will iterate all ants.
ant_count = 50

# Set training points count
train_count = 800

# Set forecast/test points count
test_count = 100

# Set plots true/false
plots = False

In [8]:
# Load data file
df = pd.read_csv(file_name)

# Create a list of all ant ids which have more than the required points points.
ant_ids = []

all_ids = df.id.unique()

# All required points
required = train_count + test_count

for ant_id in all_ids:
    # checks if a single ant id has enough rows/points
    if df[df.id == ant_id].count()['x'] > required:
        ant_ids.append(ant_id)

In [9]:
# This is the function that will be used to translate turn angles (alpha) to xy coordinates.
def angles_to_xy(a, d = 2):
    x = []
    y = []
    x0 = 0
    y0 = 0
    ang_new=0
    x.append(x0)
    y.append(y0)
    for ai in a:
        ang_new = (ang_new+ai)%360
        xi = x0 - d*np.cos(np.radians(ang_new))
        yi = y0 - d*np.sin(np.radians(ang_new))
        x.append(xi)
        y.append(yi)
        x0 = xi
        y0 = yi
    return [x,y][::-1]

In [10]:
# Create 2 dataFrames with the forecasted and actual values of the turn angles to use them for histograms.
df_test = pd.DataFrame(columns=['id','t','x','y'])

# Copy test to create forecast
df_forecast = df_test.copy()

In [11]:
if ant_count == 0:
    ant_count = len(ant_ids)

# Loop over each ant get the forecast
for ant_id in ant_ids[0:ant_count]:
    
    temp = df[df.id == ant_id]
    start = temp.index[0]
    
    # Create Training and Test
    train = df.alpha[start : start + train_count]
    test = df.alpha[start + train_count : start + required]
    
    print(f"ANT ID: {ant_id} | model:{pq}-0-{pq}")
    model = ARIMA(train, order=(pq, 0, pq))  
    fitted = model.fit()
    
    # Forecast
    fc = fitted.forecast(test_count, alpha=0.05)  # 95% conf

    # Make as pandas series
    fc_series = pd.Series(fc, index=test.index)
    
    # Plot 
    if plots:
        plt.figure(figsize=(12,5), dpi=100)
        plt.xscale("linear")
        plt.plot(train, label='training')
        plt.plot(test, label='actual')
        plt.plot(fc_series, label='forecast')
        plt.title('Forecast vs Actuals')
        plt.legend(loc='upper left', fontsize=8)
        plt.show()

        test_x, test_y = angles_to_xy(test)
        plt.plot(test_x, test_y, label='training', color="tab:orange")

        forecast_x, forecast_y = angles_to_xy(fc_series)
        plt.plot(forecast_x, forecast_y, label='forecast', color="tab:green")

        plt.show()
    
    
    # Write to data frames
    
    # First create temporary dataframes for both the test and forecast data 
    df_t = pd.DataFrame(columns=['id','t','x','y'])
    df_t['t'] = temp.loc[start + train_count:start + required, 't']
    df_t.id = ant_id
    
    df_f = df_t.copy()

    # Add id, t, x, and y for test data
    test_x, test_y = angles_to_xy(test)
    df_t['x'] = test_x[:]
    df_t['y'] = test_y[:]

    # Add coord for forecast data
    forecast_x, forecast_y = angles_to_xy(fc_series)
    df_f['x'] = forecast_x[:]
    df_f['y'] = forecast_y[:]
    
    # Concatenate to the actual data frames
    df_test = pd.concat([df_test, df_t], ignore_index=True, sort=False)
    df_forecast = pd.concat([df_forecast, df_f], ignore_index=True, sort=False)
    


ANT ID: 1 | model:80-0-80
ANT ID: 2 | model:80-0-80
ANT ID: 3 | model:80-0-80
ANT ID: 5 | model:80-0-80
ANT ID: 6 | model:80-0-80
ANT ID: 8 | model:80-0-80
ANT ID: 11 | model:80-0-80
ANT ID: 12 | model:80-0-80
ANT ID: 13 | model:80-0-80
ANT ID: 14 | model:80-0-80
ANT ID: 16 | model:80-0-80
ANT ID: 17 | model:80-0-80
ANT ID: 18 | model:80-0-80
ANT ID: 19 | model:80-0-80
ANT ID: 22 | model:80-0-80
ANT ID: 24 | model:80-0-80
ANT ID: 25 | model:80-0-80
ANT ID: 26 | model:80-0-80
ANT ID: 27 | model:80-0-80
ANT ID: 28 | model:80-0-80
ANT ID: 30 | model:80-0-80
ANT ID: 31 | model:80-0-80
ANT ID: 32 | model:80-0-80
ANT ID: 35 | model:80-0-80
ANT ID: 36 | model:80-0-80
ANT ID: 37 | model:80-0-80
ANT ID: 38 | model:80-0-80
ANT ID: 39 | model:80-0-80
ANT ID: 40 | model:80-0-80
ANT ID: 42 | model:80-0-80
ANT ID: 43 | model:80-0-80
ANT ID: 44 | model:80-0-80
ANT ID: 45 | model:80-0-80
ANT ID: 47 | model:80-0-80
ANT ID: 48 | model:80-0-80
ANT ID: 49 | model:80-0-80
ANT ID: 50 | model:80-0-80
ANT ID:

In [13]:
# rite test to file
df_test.to_csv(trial_name + str(pq) + '_' + str(train_count) + '_' + str(test_count) + "test.txt")

In [15]:
# Write forecast to file
df_forecast.to_csv(trial_name + str(pq) + '_' + str(train_count) + '_' + str(test_count) + "forecast.txt")

In [52]:
# Display test (=ant) or forecast
df_test
df_forecast
xyDists = np.diff(df_test.loc[:,['x','y']],axis=0)
s = np.append(np.nan, np.sqrt(np.sum(xyDists**2,1).astype(float)))
s.size

5050