# Libraries

In [1]:
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn import linear_model
from sklearn.neighbors import KNeighborsRegressor
from sklearn.datasets import make_regression
from sklearn.ensemble import GradientBoostingRegressor
from xgboost import XGBRegressor
from sklearn.model_selection import train_test_split
import time
import os
from tqdm import tqdm
import matplotlib.pyplot as plt
plt.rcParams["figure.figsize"] = (15,10)

# Evaluation functions

In [2]:
def MAPE_MEAN(y_true,y_pred): #MAPE
    y_true = np.where(y_true != 0, y_true, y_true + 1e-5)
    return np.mean(np.abs((y_true - y_pred)/y_true))*100

In [3]:
def MAPE_MEDIAN(y_true,y_pred): #MdAPE
    y_true = np.where(y_true != 0, y_true, y_true + 1e-5)
    return np.median(np.abs((y_true - y_pred)/y_true))*100

In [4]:
def MAE_MEAN(y_true,y_pred): #MAE
    y_true = np.where(y_true != 0, y_true, y_true + 1e-5)
    return np.mean(np.abs((y_true - y_pred)))

In [5]:
def MAE_MEDIAN(y_true,y_pred): #MdAE
    y_true = np.where(y_true != 0, y_true, y_true + 1e-5)
    return np.median(np.abs((y_true - y_pred)))

In [6]:
METRICS = {
    "MAPE_MEAN":MAPE_MEAN,
    "MAPE_MEDIAN":MAPE_MEDIAN,
    "MAE_MEAN":MAE_MEAN,
    "MAE_MEDIAN":MAE_MEDIAN
}

In [7]:
def calc_results(vald_data, pred_data, METRIC):
    results = None
    for c in vald_data.columns.tolist():
        result_value = round(METRIC(vald_data[c], pred_data[c]),5)
        if results is None:
            results = pd.DataFrame([result_value,result_value], columns = [c])
        else:
            results[c] = result_value
    return results.head(1)

# Data preparation

In [8]:
# M (int): amount of magnets
def prepare_data(M): 
    base = f"./../data/I{M}/R1/I{M}_R1_G0.csv"
    df = None
    for r in range(1,2+1,1):
        for g in range(0,360,15):
            path = f"./../data/I{M}/R{r}/I{M}_R{r}_G{g}.csv"
            new_df = pd.read_csv(path)
            new_df["t"] = new_df["t"] - new_df["t"][0]
            df = new_df if df is None else pd.concat([df,new_df])

    df = df.drop("t2", axis = 1)
    df = df.dropna()
    return df

In [9]:
# prepare_data(2).to_csv("data_I2.csv", index = False)
# prepare_data(3).to_csv("data_I3.csv", index = False)
# prepare_data(4).to_csv("data_I4.csv", index = False)

# Train models

In [10]:
def train_predict(data, steps, test_size):
    pred_data = None
    vald_data = None
    models = list(regr.keys())
    for model in models:
        start_time = time.time()
#         print("    Model:",model)
        for var in "xyz":
#             print("        Variable:",var)
            y = data[var].shift(steps).fillna(0)
            x = data.values
            x_train, x_test, y_train, y_test = train_test_split(x, y, random_state = 0, test_size = test_size, shuffle = True)
            regr[model].fit(x_train, y_train)
            y_pred = regr[model].predict(x_test)
            if pred_data is None:
                pred_data = pd.DataFrame(y_pred, columns = [f"{model}-{var}"]) 
                vald_data = pd.DataFrame(y_test.values, columns = [f"{model}-{var}"])
            else:
                pred_data[f"{model}-{var}"] = y_pred
                vald_data[f"{model}-{var}"] = y_test.values
        end_time = time.time()
        time_lapsed = round(float(end_time - start_time),2)
#         print("            Time:",time_lapsed," s \n")
    
    return vald_data, pred_data

# Working area

In [11]:
def prediction_test(magnets, test_size, total_steps,base_path):
    results_table = None
    for steps in range(1,total_steps+1,1):
#         print("  STEPS:",steps)
#         data = prepare_data(magnets)
        data = pd.read_csv(f"data_I{magnets}.csv")
        vald_data, pred_data = train_predict(data = data, steps = steps, test_size = test_size)
        data_join = pd.concat([vald_data.add_suffix('_vald'), pred_data.add_suffix('_pred')], axis = 1)
        
        path = f"{base_path}/M({magnets})"
        if not os.path.isdir(path):
            os.mkdir(path)
            
        data_join.to_csv(f"{path}/M({magnets})_STEPS({steps}).csv")
            
    return results_table

In [12]:
def test_run(test_name, test_size, steps):
    base_path = f'./prediction_results/{test_name}' 
    os.mkdir(base_path)
    description = f"test_name:{test_name}\n{str(regr)}\ntest_size:{test_size}\nsteps:{steps}"
    with open(f'{base_path}/test_description.txt', 'w') as f:
        f.write(description)

    start_time = time.time()

    for M in tqdm(range(2,4+1,1)):
#         print(f"{M} MAGNETS")
        prediction_test(magnets = M,
                        test_size = test_size,
                        total_steps = steps,
                        base_path = base_path)

    end_time = time.time()
    time_lapsed = round(float(end_time - start_time),2)
#     print("Total time:",time_lapsed," s \n")

# Evaluation table

In [13]:
def evaluate_results(magnets, METRIC, steps, test_name):
    base_path = f'./prediction_results/{test_name}'
    results_table = None
    for steps in range(1,steps + 1,1):
        path = f"{base_path}/M({magnets})"
        data_join = pd.read_csv(f"{path}/M({magnets})_STEPS({steps}).csv")
        vald_columns = [s for s in data_join.columns.tolist() if "vald" in s]
        pred_columns = [s for s in data_join.columns.tolist() if "pred" in s]
        vald_data = data_join[vald_columns]
        pred_data = data_join[pred_columns]
        vald_data.columns = [w[:-5] for w in vald_data.columns]
        pred_data.columns = [w[:-5] for w in pred_data.columns]
        
        results = calc_results(vald_data, pred_data, METRIC)
        if results_table is None:
            results_table = results
        else:
            results_table = pd.concat([results_table, results])
            
    results_table = results_table.reset_index(drop = True)
    results_table.index +=1 
    results_table.index.name = 'INTERVALOS'
    results_table = results_table[["RFO-x", "LBR-x", "XGB-x", "KNN-x",
                                   "RFO-y", "LBR-y", "XGB-y", "KNN-y",
                                   "RFO-z", "LBR-z", "XGB-z", "KNN-z"]] 
    return results_table

# Run and Evaluation 

In [17]:
steps = 3

In [14]:
regr = {
        "RFO":RandomForestRegressor(n_estimators = 200, random_state=0),
        "LBR":linear_model.BayesianRidge(n_iter=600),
        "XGB": XGBRegressor(n_estimators=200),
        "KNN":KNeighborsRegressor(n_neighbors = 10)
       }

In [15]:
test_run(test_name = "test_A", test_size = 0.20, steps = 3)

100%|██████████| 3/3 [19:21<00:00, 387.06s/it]


In [19]:
evaluate_results(2, METRICS["MAE_MEDIAN"], steps, "test_A")

Unnamed: 0_level_0,RFO-x,LBR-x,XGB-x,KNN-x,RFO-y,LBR-y,XGB-y,KNN-y,RFO-z,LBR-z,XGB-z,KNN-z
INTERVALOS,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
1,0.00016,0.00016,0.00021,0.00257,7e-05,0.00012,0.00012,0.00087,0.00017,0.00021,0.00024,0.00812
2,0.00033,0.00107,0.00034,0.00266,0.0001,0.00014,0.00015,0.00089,0.00036,0.00115,0.0004,0.0086
3,0.00049,0.00303,0.00059,0.00143,0.00017,0.00026,0.00022,0.00086,0.0005,0.00349,0.00075,0.00911


In [20]:
evaluate_results(3, METRICS["MAE_MEDIAN"], steps, "test_A")

Unnamed: 0_level_0,RFO-x,LBR-x,XGB-x,KNN-x,RFO-y,LBR-y,XGB-y,KNN-y,RFO-z,LBR-z,XGB-z,KNN-z
INTERVALOS,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
1,0.00021,0.00019,0.00026,0.00569,0.0001,0.00015,0.00014,0.00072,0.00023,0.00066,0.00025,0.00547
2,0.00048,0.00091,0.00051,0.00647,0.00015,0.00019,0.00018,0.00077,0.0005,0.00178,0.00049,0.00636
3,0.00077,0.00268,0.00094,0.00693,0.00025,0.00035,0.00027,0.00078,0.00085,0.00467,0.00097,0.00679


In [21]:
evaluate_results(4, METRICS["MAE_MEDIAN"], steps, "test_A")

Unnamed: 0_level_0,RFO-x,LBR-x,XGB-x,KNN-x,RFO-y,LBR-y,XGB-y,KNN-y,RFO-z,LBR-z,XGB-z,KNN-z
INTERVALOS,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
1,0.00019,0.00017,0.00025,0.00391,9e-05,0.00014,0.00012,0.0006,0.00019,0.00017,0.00028,0.00383
2,0.00046,0.00072,0.00047,0.0048,0.00012,0.00019,0.00015,0.0007,0.00048,0.00068,0.0005,0.00474
3,0.00084,0.00202,0.00092,0.0051,0.00024,0.00035,0.00026,0.00074,0.00088,0.00191,0.00088,0.00503


# Graphs

### $x$ component

In [None]:
g1 = pd.read_csv("../data/I3/R1/I3_R1_G0.csv")
g1 = g1[g1["t"]<=18]
plt.plot(g1["t"],g1["x"]+0.005, label = "$R_1$") # vertical translation to set reference at 0

g2 = pd.read_csv("../data/I3/R2/I3_R2_G0.csv")
g2 = g2[g2["t"]<=18]
plt.plot(g2["t"],g2["x"]+0.005, label = "$R_2$") # vertical translation to set reference at 0

plt.grid()
plt.xlabel("\nTiempo (s)", fontsize = 15)
plt.ylabel("Componente $x$ (m)", fontsize = 15)
plt.legend(fontsize = 15)

plt.xticks(np.arange(0, 19, 1), fontsize = 12)
plt.yticks(np.arange(-0.05, 0.05, 0.01), fontsize = 12)

plt.title(" ")
plt.savefig('comp_x.pdf')

### $y$ component

In [None]:
g1 = pd.read_csv("../data/I3/R1/I3_R1_G0.csv")
g1 = g1[g1["t"]<=18]
plt.plot(g1["t"],g1["y"], label = "$R_1$")

g2 = pd.read_csv("../data/I3/R2/I3_R2_G0.csv")
g2 = g2[g2["t"]<=18]
plt.plot(g2["t"],g2["y"], label = "$R_2$")

plt.grid()
plt.xlabel("\nTiempo (s)", fontsize = 15)
plt.ylabel("Componente $y$ (m)", fontsize = 15)
plt.legend(fontsize = 15)

plt.xticks(np.arange(0, 19, 1), fontsize = 12)
plt.yticks(np.arange(-0.005, 0.006, 0.001), fontsize = 12)

plt.title(" ")
plt.savefig('comp_y.pdf')

### $z$ component

In [None]:
g1 = pd.read_csv("../data/I3/R1/I3_R1_G0.csv")
g1 = g1[g1["t"]<=18]
plt.plot(g1["t"],g1["z"]+0.005, label = "$R_1$") # vertical translation to set reference at 0

g2 = pd.read_csv("../data/I3/R2/I3_R2_G0.csv")
g2 = g2[g2["t"]<=18]
plt.plot(g2["t"],g2["z"]+0.005, label = "$R_2$") # vertical translation to set reference at 0

plt.grid()
plt.xlabel("\nTiempo (s)", fontsize = 15)
plt.ylabel("Componente $z$ (m)", fontsize = 15)
plt.legend(fontsize = 15)

plt.xticks(np.arange(0, 19, 1), fontsize = 12)
plt.yticks(np.arange(-0.05, 0.07, 0.01), fontsize = 12)

plt.title(" ")
plt.savefig('comp_z.pdf')