# Classification Tasks with Kinematic Time Series from Head Pose Estimation from HMD

---
# Time Series Classification

See `01_...ipynb` for the details of the pipeline.

In [None]:
# Add files to sys.path
from pathlib import Path
import sys,os
# Resolve the folder that contains this script/notebook so that the local
# "kinemats" folder can be added to the import path.
this_path = None
try:    # For .py: `__file__` is defined when the code runs as a script/module
    this_path = str(os.path.dirname(os.path.abspath(__file__)))
except NameError:  # For .ipynb: `__file__` is not defined inside a notebook kernel
    # Only NameError is caught so that unrelated failures are not silently hidden
    this_path = str(Path().absolute())+"/"
print("File Path:", this_path)
sys.path.append(os.path.join(this_path, "kinemats"))

# Import classes
import time
import utils  # Utils for generation of files and paths
import quaternion_math

from plotter.ts_visualization import *
import ts_processing
import ts_classification

# Import data science libs
import numpy as np
import pandas as pd

import matplotlib
#matplotlib.rcParams['text.usetex'] = True
#%matplotlib inline
import matplotlib.pyplot as plt

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import cross_validate, StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import sktime.utils.data_processing
# from sktime.classification.shapelet_based import MrSEQLClassifier
# from sktime.classification.interval_based import SupervisedTimeSeriesForest
# from sktime.classification.dictionary_based import TemporalDictionaryEnsemble
from sktime.classification.distance_based import KNeighborsTimeSeriesClassifier
from sktime.classification.shapelet_based import ROCKETClassifier
from sktime.transformations.panel.rocket import MiniRocketMultivariate
from sklearn.linear_model import SGDClassifier  # Used to train the transformed features from MiniRocket

---
# SETUP

In [None]:
# CONSTANTS
import experiment_config
from experiment_config import Datasets, DataRepresentation, Classifiers

### SPECIFIC CONSTANTS

# Subfolder that collects every file generated by this notebook
NOTEBOOK_SUBFOLDER_NAME = '3_TimeSeriesClassifiers/'

# Filenames of the dataset files used by this script: the shared prefix followed
# by the string form of each data representation
FILENAME_DATASET_QUATERNION, FILENAME_DATASET_EULER, FILENAME_DATASET_YAW = (
    str(experiment_config.PREFIX_DATASET + str(representation))
    for representation in (
        DataRepresentation.Quaternion,
        DataRepresentation.Euler,
        DataRepresentation.Yaw,
    )
)

#### NOTE: This dictionary is only a placeholder here; it is reassigned later in
#### the code, once the datasets have been loaded.
DICT_DATA = dict.fromkeys(
    [
        DataRepresentation.Quaternion,
        DataRepresentation.Euler,
        DataRepresentation.Yaw,
        DataRepresentation.All,
    ]
)
# Maps each data representation to its position in DICT_DATA, so that it can be
# stored as a number in the numpy array of results
DICT_DATA_TO_NUM = dict(zip(DICT_DATA, range(len(DICT_DATA))))

# Column names of each data representation
HEADER_QUAT = ["qw", "qi", "qj", "qk"]
HEADER_EULER = ["yaw", "pitch", "roll"]
HEADER_YAW = HEADER_EULER[:1]   # independent copy holding only "yaw"
HEADER_ALL = [*HEADER_QUAT, *HEADER_EULER]
print(f"Example header {HEADER_ALL}")


#### Classification methods to apply.
# Classifiers are registered one by one; the insertion order is the order in
# which they are iterated when the results are generated.
DICT_CLASSIFIERS = {}

# k-nearest neighbours on the time series, using DTW as distance
DICT_CLASSIFIERS[Classifiers.KNN] = KNeighborsTimeSeriesClassifier(
    n_neighbors=experiment_config.KNN_TS_N_NEIGH,
    distance="dtw",
    distance_params={"w": experiment_config.KNN_TS_DTW_WARPING_WINDOW},
)

# Disabled classifiers, kept for reference:
# DICT_CLASSIFIERS[Classifiers.MrSEQL] = MrSEQLClassifier()
# DICT_CLASSIFIERS[Classifiers.TDE] = TemporalDictionaryEnsemble(
#     time_limit=experiment_config.TDE_MAX_TIME,
#     max_ensemble_size=experiment_config.TDE_MAX_ENSEMBLE_SIZE,
#     randomly_selected_params=experiment_config.TDE_MAX_SELECTED_PARAMS,
#     random_state=experiment_config.MC_RANDOM_SEED)
# DICT_CLASSIFIERS[Classifiers.STSF] = SupervisedTimeSeriesForest(
#     n_estimators=experiment_config.STSF_N_ESTIMATORS,
#     n_jobs=experiment_config.N_JOBS_PARALLEL,
#     random_state=experiment_config.MC_RANDOM_SEED)

DICT_CLASSIFIERS[Classifiers.ROCKET] = ROCKETClassifier(
    num_kernels=experiment_config.ROCKET_N_KERNELS,
    n_jobs=experiment_config.N_JOBS_PARALLEL,
    random_state=experiment_config.MC_RANDOM_SEED,
)

# NOTE: this entry is only a transformer; its output is classified by
# MINIROCKET_LINEAR_MODEL (defined right below).
DICT_CLASSIFIERS[Classifiers.MiniRocket] = MiniRocketMultivariate(
    num_features=experiment_config.MINIROCKET_N_KERNELS,
    max_dilations_per_kernel=experiment_config.MINIROCKET_MAX_DILATIONS,
    random_state=experiment_config.MC_RANDOM_SEED,
)

# Linear model to be trained on the features produced by the MiniRocket transformer.
# NOTE(review): newer scikit-learn releases renamed loss="log" to "log_loss" —
# confirm against the pinned scikit-learn version before upgrading.
MINIROCKET_LINEAR_MODEL = SGDClassifier(
    loss="log",
    n_jobs=experiment_config.N_JOBS_PARALLEL,
    random_state=experiment_config.MC_RANDOM_SEED,
)

## K-Fold partition
# Number of folds for cross-validation
N_SPLITS_CV = experiment_config.CV_NUM_FOLDS
# Shuffled, stratified folds; the fixed seed keeps the partition reproducible
strat_KFold = StratifiedKFold(
    n_splits=N_SPLITS_CV,
    shuffle=True,
    random_state=experiment_config.MC_RANDOM_SEED,
)

# Scoring parameters: https://scikit-learn.org/stable/modules/model_evaluation.html#scoring-parameter
# Accuracy plus the macro-averaged variants of precision, recall and F1
SCORING_METRICS = ["accuracy"] + [f"{name}_macro" for name in ("precision", "recall", "f1")]

---
# UTILITY FUNCTIONS

Generate paths to write output files

In [None]:
# Subfolder named after the dataset currently in use (with trailing slash)
STR_DATASET = f"{experiment_config.DATASET_MAIN!s}/"


def gen_path_plot(filename):
    """Generate the full path for a PLOT file just by specifying its name.

    Delegates to `utils.generate_complete_path`, placing the file in the plot
    folder under `<dataset>/<notebook subfolder>` with the configured image
    format.
    """
    plot_subfolders = STR_DATASET + NOTEBOOK_SUBFOLDER_NAME
    return utils.generate_complete_path(
        filename,
        main_folder=experiment_config.PLOT_FOLDER,
        subfolders=plot_subfolders,
        file_extension=experiment_config.IMG_FORMAT,
        save_files=experiment_config.EXPORT_PLOTS,
    )

def gen_path_temp(filename, subfolders="", extension=experiment_config.TEMP_FORMAT):
    """Generate the full path for a TEMP file just by specifying its name.

    Delegates to `utils.generate_complete_path`, placing the file in the temp
    folder under `<dataset>/<subfolders>` with the given extension.
    """
    temp_subfolders = STR_DATASET + subfolders
    return utils.generate_complete_path(
        filename,
        main_folder=experiment_config.TEMP_FOLDER,
        subfolders=temp_subfolders,
        file_extension=extension,
    )

def gen_path_results(filename, subfolders="", extension=""):
    """Generate the full path for a RESULTS file (like a pandas DataFrame).

    Delegates to `utils.generate_complete_path`, placing the file in the
    results folder under `<dataset>/<notebook subfolder>/<subfolders>` with the
    given extension.
    """
    results_subfolders = STR_DATASET + NOTEBOOK_SUBFOLDER_NAME + subfolders
    return utils.generate_complete_path(
        filename,
        main_folder=experiment_config.RESULTS_FOLDER,
        subfolders=results_subfolders,
        file_extension=extension,
    )

# DATASETS: Load and preprocess

If the files do not exist, generate them by running `01_...ipynb`.

In [None]:
print("\t>>>LOADING DATASETS")
# Placeholders; both are assigned once the data files have been read
dataset = classes = None

# Coordinate reference system. All datasets should be transformed to match this
# axis order: (instance, time, dimensions).
AXIS_INSTANCE, AXIS_TIME, AXIS_DIMENSIONS = range(3)

In [None]:
# Load the previously processed datasets, one binary file per data representation
dataset_quaternion, dataset_euler, dataset_yaw = (
    utils.load_binaryfile_npy(gen_path_temp(dataset_filename))
    for dataset_filename in (
        FILENAME_DATASET_QUATERNION,
        FILENAME_DATASET_EULER,
        FILENAME_DATASET_YAW,
    )
)

# "All" representation: quaternion and Euler stacked along the dimensions axis
dataset_all = np.concatenate((dataset_quaternion, dataset_euler), axis=AXIS_DIMENSIONS)

# Variable used to calculate general stats
dataset = dataset_quaternion

In [None]:
# The IMT and Tsinghua datasets are loaded with exactly the same steps, so a
# single branch handles both instead of two copy-pasted ones.
if experiment_config.DATASET_MAIN in (Datasets.IMT, Datasets.Tsinghua):

    # Data for combined time series to cluster
    labels_filename = experiment_config.DATASET_LABELS # Cluster index TRUE_LABEL
    timestamps_filename = experiment_config.DATASET_TIMESTAMPS # Timestamps
    labels = pd.read_csv(labels_filename)
    timestamps = np.loadtxt(timestamps_filename)

    # The target column is chosen in the experiment configuration. Alternatives
    # previously hard-coded here were the video labels ("videoId") and the user
    # watching the videos ("user").
    classes = labels[experiment_config.CLASS_COLUMN_NAME].to_numpy(dtype=np.int32)
else:
    # Fail here with a clear message; otherwise the following cells break with
    # an unrelated NameError because `timestamps` was never defined.
    raise ValueError(f"Unsupported dataset: {experiment_config.DATASET_MAIN}")

## Summary dataset

In [None]:
# Number of distinct class labels
num_classes = np.unique(classes).size

# A 2-D dataset gets a trailing axis of size 1 so that it is always 3-D
if dataset.ndim == 2:
    dataset = dataset[:, :, np.newaxis]

# Sizes of the first three axes of the dataset
num_ts, length_ts, num_dims = dataset.shape[0], dataset.shape[1], dataset.shape[2]

print("Timestamps:", type(timestamps), timestamps.shape)
print("Dataset", type(dataset), dataset.shape)
print("Classes", type(classes), classes.shape)
print(f"num_classes={num_classes}")
print(f"num_ts={num_ts}")
print(f"length_ts={length_ts}")
print(f"num_dims={num_dims}")

## `Summary`

Up to this point, the head movements are stored in these data representations:
- Quaternion (`dataset_quaternion`)
- Euler Angles (`dataset_euler`)
- Spherical Angles (`dataset_spherical`)
- Rotation around Z-axis (`dataset_yaw`)

## `NOTE:` REDEFINITION OF DATASETS
`DICT_DATA` is redefined here, now that the datasets have been properly loaded/created.

In [None]:
def _to_nested(array_3d, column_names):
    """Convert a 3-D numpy dataset into an sktime nested DataFrame.

    The time and dimensions axes are swapped before the conversion.
    """
    swapped = np.swapaxes(array_3d, AXIS_TIME, AXIS_DIMENSIONS)
    return sktime.utils.data_processing.from_3d_numpy_to_nested(swapped, column_names=column_names)


# Transform every data representation into the sktime-compatible format
df_quaternion = _to_nested(dataset_quaternion, HEADER_QUAT)
df_euler = _to_nested(dataset_euler, HEADER_EULER)
df_yaw = _to_nested(dataset_yaw, HEADER_YAW)
df_all = _to_nested(dataset_all, HEADER_ALL)

# REDEFINE DICT WITH CORRESPONDING DATASETS
DICT_DATA = dict(
    zip(
        (DataRepresentation.Quaternion, DataRepresentation.Euler, DataRepresentation.Yaw, DataRepresentation.All),
        (df_quaternion, df_euler, df_yaw, df_all),
    )
)

# Class labels
y = classes

# Disabled hold-out experiment, kept for reference:
# X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(X_quaternion, y, random_state=experiment_config.MC_RANDOM_SEED)
# clf = MrSEQLClassifier()
# clf.fit(X_train, y_train)
# clf.score(X_test, y_test)

---
# STATE-OF-THE-ART (SOTA) TIME-SERIES CLASSIFIERS

- Mr-SEQL
- STSF: Univariate `Not used`
- TDE
- ROCKET
- MiniRocket

In [None]:
# iter_datarep = DataRepresentation.Euler # Edit here!
# iter_data = DICT_DATA[iter_datarep]
# iter_classifier = Classifiers.ROCKET # Edit here!
# iter_model = DICT_CLASSIFIERS[iter_classifier]

In [None]:
# ### Test CV iterator

# # The key strings are based on what the cross_validate() function generates automatically in 02_FeatureBasedClass.ipynb
# cv_results = {
#     "fit_time": [],
#     "score_time": [],
#     "test_accuracy": [],
#     "test_precision_macro": [],
#     "test_recall_macro": [],
#     "test_f1_macro": []
# }

# for f, (train_idx, test_idx) in enumerate(strat_KFold.split(iter_data, y)):
#     print(f"Fold: {f} > \t train,test > \t{train_idx.shape, test_idx.shape}")
#     # if(f==N_SPLITS_CV-1):
#         # print(f"{train_idx,test_idx}")

#     # Separate training and testing sets
#     X_train = iter_data.iloc[train_idx,:]
#     X_test = iter_data.iloc[test_idx,:]
#     y_train = y[train_idx]
#     y_test = y[test_idx]

#     # Fitting
#     t0 = time.time()
#     iter_model.fit(X_train,y_train) # Train model!
#     t1 = (time.time() - t0)
#     cv_results["fit_time"].append(t1)

#     # Predicting
#     t0 = time.time()
#     y_pred = iter_model.predict(X_test) # Test model!
#     t1 = (time.time() - t0)
#     cv_results["score_time"].append(t1)

#     # Scores
#     cv_results["test_accuracy"].append(         accuracy_score(y_test, y_pred) )
#     cv_results["test_precision_macro"].append(  precision_score(y_test, y_pred, average="macro") )
#     cv_results["test_recall_macro"].append(     recall_score(y_test, y_pred, average="macro") )
#     cv_results["test_f1_macro"].append(         f1_score(y_test, y_pred, average="macro") )

# # Add more information to CV results
# cv_results[experiment_config.COLUMNS_LABELS[0]] = [ str(iter_datarep) ] * N_SPLITS_CV # Datarep
# cv_results[experiment_config.COLUMNS_LABELS[1]] = [ str(iter_classifier) ] * N_SPLITS_CV # Classifier
# cv_results[experiment_config.COLUMNS_LABELS[2]] = np.arange(N_SPLITS_CV) # Fold
# # for k,v in cv_results.items():
#     # print(f"{k}: \t{v}")

# # From dict to pandas DataFrame
# iteration_results = pd.DataFrame(cv_results.copy(), columns=sorted(cv_results.keys()))

In [None]:
# Contains the dataframe with the compiled classification results. It stays None
# until the results are loaded from disk or generated further below.
classif_results = None

## Time-series Classification

In [None]:
print("\t>>>LOADING/CREATING CLASSIFICATION RESULTS")
# CSV file that caches the per-fold cross-validation results of every
# (data representation, classifier) combination
classification_results_filename = gen_path_results(experiment_config.RESULTS_FILENAME, extension=".csv")

### INPUTS / OUTPUTS
input_files = [classification_results_filename]

RELOAD_TRIES = experiment_config.RELOAD_TRIES
# Load-or-create loop, executed at most RELOAD_TRIES times:
#   - if the CSV can be read, the results are loaded and the loop stops;
#   - if the CSV is missing, the results are computed and written, and the next
#     attempt reloads them from disk. When no attempt is left, the original
#     FileNotFoundError is re-raised (even though the file has just been saved).
for tries in range(RELOAD_TRIES):
    try:
        ### LOAD FILE
        print(f"Trying {tries+1}/{RELOAD_TRIES} to load files: {input_files}")

        classif_results = pd.read_csv(input_files[0]) # pd.DataFrame
        print(f"File {input_files[0]} was successfully loaded")

    except FileNotFoundError as e:
        ### CREATE FILE
        print(f"File not found. Creating again! {e}")

        idx = 1 # Iterator for the progress messages
        total_iter = len(DICT_DATA) * len(DICT_CLASSIFIERS) # How many total iterations are going to be conducted
        print(f"Iterating {total_iter} times")

        # One DataFrame per (dataset, classifier) pair. They are concatenated once
        # at the end: `DataFrame.append` no longer exists in pandas >= 2.0 and
        # copies the whole frame on every call.
        fold_frames = []

        ### Iterate over datasets and classifiers

        # For each dataset
        for iter_datarep, iter_data in DICT_DATA.items():
            # Apply each classifier model
            for iter_classifier, iter_model in DICT_CLASSIFIERS.items():

                # Print messages
                if idx%experiment_config.DISPLAY_ITER_STEP==0:
                    print(f" | Iteration {idx}/{total_iter} \t> dataset: {str(iter_datarep)} \tclassifier: {str(iter_classifier)}")
                idx = idx + 1

                ## NOTE! MiniRocket transforms the time series using random convolutional kernels, and then trains a LINEAR CLASSIFIER.
                #       As suggested in the paper, for >10k training samples, it is recommended to use a Logistic Regression trained
                #       using stochastic gradient descent
                is_minirocket = iter_classifier is Classifiers.MiniRocket

                ### Cross validation
                # The key strings are based on what the cross_validate() function generates automatically in 02_FeatureBasedClass.ipynb
                cv_results = {
                    "fit_time": [],
                    "score_time": [],
                    "test_accuracy": [],
                    "test_precision_macro": [],
                    "test_recall_macro": [],
                    "test_f1_macro": []
                }

                for f, (train_idx, test_idx) in enumerate(strat_KFold.split(iter_data, y)):
                    print(f"Fold: {f} > \t train,test > \t{train_idx.shape, test_idx.shape}")

                    # Separate training and testing sets
                    X_train = iter_data.iloc[train_idx,:]
                    X_test = iter_data.iloc[test_idx,:]
                    y_train = y[train_idx]
                    y_test = y[test_idx]

                    #### Fitting (timed)
                    t0 = time.time()
                    if is_minirocket:
                        # `iter_model` contains just the transformer: fit it, replace the
                        # time series with the transformed features, then fit the linear model
                        iter_model.fit(X_train)
                        X_train = iter_model.transform(X_train)
                        MINIROCKET_LINEAR_MODEL.fit(X_train, y_train)
                    else:
                        iter_model.fit(X_train,y_train) # Train model!
                    t1 = (time.time() - t0)
                    cv_results["fit_time"].append(t1)

                    #### Predicting (timed)
                    t0 = time.time()
                    if is_minirocket:
                        X_test = iter_model.transform(X_test)       # Transform test time-series
                        y_pred = MINIROCKET_LINEAR_MODEL.predict(X_test) # Test model!
                    else:
                        y_pred = iter_model.predict(X_test) # Test model!
                    t1 = (time.time() - t0)
                    cv_results["score_time"].append(t1)

                    #### Scoring
                    cv_results["test_accuracy"].append(         accuracy_score(y_test, y_pred) )
                    cv_results["test_precision_macro"].append(  precision_score(y_test, y_pred, average="macro") )
                    cv_results["test_recall_macro"].append(     recall_score(y_test, y_pred, average="macro") )
                    cv_results["test_f1_macro"].append(         f1_score(y_test, y_pred, average="macro") )
                # } END: Cross-validation

                # Label every fold with the combination that produced it
                # (this section used to be duplicated verbatim; once is enough)
                cv_results[experiment_config.COLUMNS_LABELS[0]] = [ str(iter_datarep) ] * N_SPLITS_CV # Datarep
                cv_results[experiment_config.COLUMNS_LABELS[1]] = [ str(iter_classifier) ] * N_SPLITS_CV # Classifier
                cv_results[experiment_config.COLUMNS_LABELS[2]] = np.arange(N_SPLITS_CV) # Fold

                # From dict to pandas DataFrame
                iteration_results = pd.DataFrame(cv_results.copy(), columns=sorted(cv_results.keys()))
                fold_frames.append(iteration_results)

                # END: Classifiers

            # } END: Dataset
            print(f"\t\t>> FINISHED (dataset)")
        # } END: Loop done

        # Compile all the iterations into a single DataFrame. It is rebuilt from
        # scratch, so stale results from a previous run are never mixed in.
        classif_results = pd.concat(fold_frames, ignore_index=True)

        # Save files
        classif_results.to_csv(input_files[0], index=False)

        ### ---- CONTROL RETRIES
        if tries+1 < RELOAD_TRIES:
            continue    # Next attempt reloads the file that was just written
        else:
            raise       # No attempts left: propagate the FileNotFoundError
    break   # Reached only when the file was loaded successfully

## PLOT

In [None]:
# ## Data preprocessing for plotting
# # Take only accuracy for class1 over all runs and delete irrelevant columns
# plotsdataset = classif_results[( classif_results['classLabel']==1 )] #& (classif_results['distMetric'] == DICT_DISTMETRIC_TO_NUM[metric_txt]) )]   # Filter for plotting
# plotsdataset.drop(['distMetric','mcIter','classLabel','precision','recall'], axis=1, inplace=True)
# plotsdataset.tail()

# ## Plot
# if(experiment_config.SHOW_PLOTS): plot_violin_mc(plotsdataset, x_colname="dataRep", y_colname="accuracy", hue_colname="classifier",\
#                 suptitle=f'Classification accuracy over {MC_ITERATIONS} Monte-Carlo simulations',\
#                 title = f"State of the Art - Classifiers",\
#                 x_ticklabels = experiment_config.HEADERS_DATASETS, \
#                 y_lim=[0,1], \
#                 n_rows=1, n_cols=1, figsize=(8,6), \
#                 save_path=gen_path_plot(f"accuracies_SoA"),
#                 boxplot_instead_violin=True)

## EOF

In [None]:
# Final marker: it is only printed if the statements above did not raise
print(">> FINISHED WITHOUT ERRORS!!")