## Prerequisites to run:
- clone https://github.com/crowdbotp/OpenTraj into the current folder

In [8]:
import os, yaml
import sys
sys.path.insert(0, os.path.abspath(os.getcwd()) + '/OpenTraj/opentraj') # Anaconda python can't find the toolkit path without this for some reason

from toolkit.loaders.loader_edinburgh import load_edinburgh
from matplotlib import pyplot as plt
import numpy as np

# Tests with OpenTraj Edinburgh dataset
https://github.com/crowdbotp/OpenTraj/tree/master/datasets/Edinburgh

In [None]:
opentraj_root = './OpenTraj/'
selected_day = '01Sep' # 3 days of data in total, ['01Jul', '01Aug', '01Sep']
edinburgh_path = os.path.join(opentraj_root, 'datasets/Edinburgh/annotations', 'tracks.%s.txt' % selected_day)
traj_dataset = load_edinburgh(edinburgh_path, title="Edinburgh", 
                              use_kalman=False, scene_id=selected_day, sampling_rate=4)

## Dataset overview

In [None]:
data = traj_dataset.data

In [None]:
data.head()

In [None]:
0.618650/(15.543484 - 15.268529)

In [None]:
max(data["agent_id"])

In [None]:
for i in range(1000):
    plt.plot(data.loc[data["agent_id"]==i, ["pos_x"]],data.loc[data["agent_id"]==i, ["pos_y"]])
plt.show()

In [None]:
def generate_data(data, begin_idx, agent_id, num_steps=5):
    sample_x = data["pos_x"].loc[data["agent_id"]==agent_id][begin_idx:begin_idx+num_steps].values
    sample_y = data["pos_y"].loc[data["agent_id"]==agent_id][begin_idx:begin_idx+num_steps].values

    test_x = data["pos_x"].loc[data["agent_id"]==agent_id][begin_idx+num_steps:begin_idx+2*num_steps].values
    test_y = data["pos_y"].loc[data["agent_id"]==agent_id][begin_idx+num_steps:begin_idx+2*num_steps].values
    
    return sample_x, sample_y, test_x, test_y

## Testing with numpy polyfit

### Single test

In [None]:
sample_x, sample_y, test_x, test_y = generate_data(data, 0, agent_id=10, num_steps=5)

polynomialOrder = 2
fittedParameters = np.polyfit(sample_x, sample_y, polynomialOrder)

In [None]:
modelPredictions = np.polyval(fittedParameters, sample_x)

In [None]:
plt.plot(test_x, modelPredictions, label="fitted polynomial")
plt.plot(test_x, test_y, label="test data")
plt.legend()

### Many tests

In [None]:
# Baseline tests
def one_run(data, start_idx, agent_id, step, plot_data=False):
    # Generate 'seen' and 'unseen' data
    sample_x, sample_y, test_x, test_y = generate_data(data, start_idx, agent_id, step)
    
    if len(sample_x) + len(sample_y) + len(test_x) + len(test_y) < 4*(step-start_idx):
        return None, None # return None if same amount of data is not available for each slice
    
    
    pred_x_list = []
    pred_y_list = []
    for i in range(10):
        noisy_sample_x = sample_x + np.random.normal(0,0.1,len(sample_x))
        noisy_sample_y = sample_y + np.random.normal(0,0.1,len(sample_y))
        
        # Fit a polynomial function to the data
        time_range = range(len(sample_x))
        fitted_x = np.polyfit(time_range, noisy_sample_x, 2)
        fitted_y = np.polyfit(time_range, noisy_sample_y, 2)

        time_range_pred = range(len(sample_x), 2*len(sample_x))
        pred_x = np.polyval(fitted_x, time_range_pred)
        pred_y = np.polyval(fitted_y, time_range_pred)
        
        pred_x_list.append(pred_x)
        pred_y_list.append(pred_y)
    
    if (plot_data):
        plt.axes().set_aspect('equal')
        for i in range(10):
            plt.plot(np.append(sample_x,pred_x_list[i]), np.append(sample_y,pred_y_list[i]), label="fitted polynomial")
        plt.plot(np.append(sample_x,test_x), np.append(sample_y,test_y), label="actual data")
        plt.scatter(sample_x[0], sample_y[0])
        
        #plt.xlim(np.mean(test_x)-2, np.mean(test_x)+2)
        #plt.ylim(np.mean(test_y)-2, np.mean(test_y)+2)
        #plt.legend()
        plt.show()
    
    # Error calculation
    '''
    absError = modelPredictions - test_y
    SE = np.square(absError)
    MSE = np.mean(SE)
    RMSE = np.sqrt(MSE)
    
    distance_y = modelPredictions[-1], test_y[-1]
    '''
    return RMSE, None

step = 5
all_rmse = []
all_distance = []
for i in range(0, 1000):
    run_rmse, run_final_distance = one_run(data, start_idx=0, agent_id=i, step=step)
    if run_rmse != None:
        all_rmse.append(run_rmse)
        all_distance.append(run_final_distance)
'''
#print(all_rmse)
plt.plot(all_rmse)
plt.title("RMSE")
plt.ylim(0, 500)
plt.show()

plt.plot(all_distance)
plt.title("Final distance")
plt.ylim(-100,100)
plt.show()'''

In [None]:
#data["pos_x"].loc[data["agent_id"]==9][0:300]

In [None]:
print(np.append(sample_x, test_x))

In [None]:
# Checking the fitted polynomial for some data
for i in range(10, 30):
    run_rmse = one_run(data, start_idx=0, agent_id=i, step=step, plot_data=True)

## Testing scipy.optimize.curve_fit
source: https://stackoverflow.com/questions/59391249/python-scipy-optimise-curve-fit-gives-linear-fit

In [None]:
from scipy.optimize import curve_fit
from scipy.optimize import differential_evolution
import warnings

def hyperbolic_func(x, a, b, c):
    return  a * x / (b + x) + c * x

agent_id=100 # Does not find optimal parameters often (e.g. finds them with id=100, but not with id=10)
begin_idx=0
num_steps=5

sample_x = data["pos_x"].loc[data["agent_id"]==agent_id][begin_idx:begin_idx+num_steps].values
sample_y = data["pos_y"].loc[data["agent_id"]==agent_id][begin_idx:begin_idx+num_steps].values

test_x = data["pos_x"].loc[data["agent_id"]==agent_id][begin_idx+num_steps:begin_idx+2*num_steps].values
test_y = data["pos_y"].loc[data["agent_id"]==agent_id][begin_idx+num_steps:begin_idx+2*num_steps].values


fittedParameters, pcov = curve_fit(hyperbolic_func, sample_x, sample_y)
modelPredictions = hyperbolic_func(test_x, *fittedParameters) 

In [None]:
plt.plot(test_x, test_y, label="actual")
plt.plot(test_x, modelPredictions, label="fitted")
plt.legend()

## Testing with np.polyfit and np.poly1d
source: https://www.analyticsvidhya.com/blog/2018/03/introduction-regression-splines-python-codes/

In [None]:
# Data
agent_id=1
begin_idx=0
num_steps=5

sample_x = data["pos_x"].loc[data["agent_id"]==agent_id][begin_idx:begin_idx+num_steps].values
sample_y = data["pos_y"].loc[data["agent_id"]==agent_id][begin_idx:begin_idx+num_steps].values

test_x = data["pos_x"].loc[data["agent_id"]==agent_id][begin_idx+num_steps:begin_idx+2*num_steps].values
test_y = data["pos_y"].loc[data["agent_id"]==agent_id][begin_idx+num_steps:begin_idx+2*num_steps].values

# Generating weights for polynomial function with degree =2
weights = np.polyfit(sample_x, sample_y, 2)

# Generating model with the given weights
model = np.poly1d(weights)

# Prediction on validation set
pred = model(test_x)
plt.scatter(test_x, test_y, facecolor='None', edgecolor='k', alpha=0.3)
plt.plot(test_x, pred)
plt.show()


## Bezier curve fitting
source: https://stackoverflow.com/questions/12643079/b%C3%A9zier-curve-fitting-with-scipy

In [None]:
import numpy as np
from scipy.special import comb

def bernstein_poly(i, n, t):
    """
     The Bernstein polynomial of n, i as a function of t
    """

    return comb(n, i) * ( t**(n-i) ) * (1 - t)**i


def bezier_curve(x_data, y_data, nTimes=1000):
    n_points = len(x_data)

    t = np.linspace(0.0, 1.0, nTimes)

    polynomial_array = np.array([ bernstein_poly(i, n_points-1, t) for i in range(0, n_points)   ])

    xvals = np.dot(x_data, polynomial_array)
    yvals = np.dot(y_data, polynomial_array)

    return polynomial_array, xvals, yvals

In [None]:
# Data
agent_id=1
begin_idx=0
num_steps=5

sample_x = data["pos_x"].loc[data["agent_id"]==agent_id][begin_idx:begin_idx+num_steps].values
sample_y = data["pos_y"].loc[data["agent_id"]==agent_id][begin_idx:begin_idx+num_steps].values

test_x = data["pos_x"].loc[data["agent_id"]==agent_id][begin_idx+num_steps:begin_idx+2*num_steps].values
test_y = data["pos_y"].loc[data["agent_id"]==agent_id][begin_idx+num_steps:begin_idx+2*num_steps].values

poly_array, b_x, b_y = bezier_curve(sample_x, sample_y)

In [None]:
plt.plot(b_x, b_y)
plt.plot(sample_x, sample_y)
plt.show()