# Data Loading and Class Labeling

In [2]:
import pandas as pd
import glob
import os

# Data directories
expert_dir = './trajectories/expert'
novice_dir = './trajectories/novice'

# Function to load data and label
def load_data(directory, label):
    data_list = []
    for filepath in glob.glob(os.path.join(directory, "*.csv")):
        df = pd.read_csv(filepath)
        df['label'] = label  # Class label
        data_list.append(df)
    return data_list

# Load expert and novice data
expert_data = load_data(expert_dir, label="expert")
novice_data = load_data(novice_dir, label="novice")

# Combine lists into a single DataFrame and review the structure
all_data = expert_data + novice_data
print("Expert and novice data loaded.")
print(f"Total time series: {len(all_data)}")
print("Data example:", all_data[0].head())


Expert and novice data loaded.
Total time series: 14
Data example:                  Timestamp  PositionX  PositionY  PositionZ  RotationX  \
0  2023-05-17 15:12:37.254   0.066467  -0.148628   0.171193   0.126154   
1  2023-05-17 15:12:37.263   0.099649  -0.153283   0.165442   0.031201   
2  2023-05-17 15:12:37.269   0.099649  -0.153283   0.165442   0.031201   
3  2023-05-17 15:12:37.274   0.099649  -0.153283   0.165442   0.031201   
4  2023-05-17 15:12:37.317   0.099225  -0.153320   0.165391   0.032077   

   RotationY  RotationZ  RotationW   label  
0   0.984342  -0.056134  -0.109569  expert  
1   0.998373  -0.041187  -0.024101  expert  
2   0.998373  -0.041187  -0.024101  expert  
3   0.998373  -0.041187  -0.024101  expert  
4   0.998298  -0.041397  -0.025646  expert  


# Converting Timestamp to Relative Time in Milliseconds

In [3]:
# Function to convert timestamp to relative time in milliseconds
def convert_to_relative_time(df):
    df['Timestamp'] = pd.to_datetime(df['Timestamp'], format="%Y-%m-%d %H:%M:%S.%f")
    df['time'] = (df['Timestamp'] - df['Timestamp'].iloc[0]).dt.total_seconds() * 1000  # Milliseconds
    return df.drop(columns=['Timestamp'])  # Remove original column

# Apply the function to each series in the list
all_data = [convert_to_relative_time(df) for df in all_data]

# Review the structure of the data
print("Data after converting Timestamp to relative time:")
print(all_data[0].head())


Data after converting Timestamp to relative time:
   PositionX  PositionY  PositionZ  RotationX  RotationY  RotationZ  \
0   0.066467  -0.148628   0.171193   0.126154   0.984342  -0.056134   
1   0.099649  -0.153283   0.165442   0.031201   0.998373  -0.041187   
2   0.099649  -0.153283   0.165442   0.031201   0.998373  -0.041187   
3   0.099649  -0.153283   0.165442   0.031201   0.998373  -0.041187   
4   0.099225  -0.153320   0.165391   0.032077   0.998298  -0.041397   

   RotationW   label  time  
0  -0.109569  expert   0.0  
1  -0.024101  expert   9.0  
2  -0.024101  expert  15.0  
3  -0.024101  expert  20.0  
4  -0.025646  expert  63.0  


# 3. Interpolation to Match the Length of Time Series.

### We calculate the average length of the experts' series and use this length as a reference to interpolate all the series.

In [4]:
import numpy as np
from scipy.interpolate import interp1d
import pandas as pd

# Calculate the average length of the expert series
mean_length = int(np.mean([len(df) for df in expert_data]))

# Function to interpolate to a standard length
def interpolate_series(df, target_length):
    interpolated_df = pd.DataFrame()
    common_time = np.linspace(0, df['time'].iloc[-1], target_length)
    interpolated_df['time'] = common_time
    for col in df.columns:
        if col != 'time' and col != 'label':  # Exclude the label
            f = interp1d(df['time'], df[col], kind='linear', fill_value="extrapolate")
            interpolated_df[col] = f(common_time)
    interpolated_df['label'] = df['label'].iloc[0]  # Keep the original label
    return interpolated_df

# Apply interpolation to all series
all_data_interpolated = [interpolate_series(df, mean_length) for df in all_data]

# Review the structure after interpolation
print("Data after interpolation:")
print(all_data_interpolated[0].head())


Data after interpolation:
         time  PositionX  PositionY  PositionZ  RotationX  RotationY  \
0    0.000000   0.066467  -0.148628   0.171193   0.126154   0.984342   
1   91.172662   0.099089  -0.153292   0.165439   0.032355   0.998269   
2  182.345324   0.098723  -0.153169   0.165550   0.033184   0.998201   
3  273.517986   0.098242  -0.152974   0.165602   0.034438   0.998114   
4  364.690647   0.097511  -0.152724   0.165596   0.036225   0.997953   

   RotationZ  RotationW   label  
0  -0.056134  -0.109569  expert  
1  -0.041596  -0.026100  expert  
2  -0.041964  -0.027074  expert  
3  -0.042095  -0.028392  expert  
4  -0.042292  -0.031440  expert  


# 4. Preparing Data for Training (Concatenation and Division of Variables)

### We concatenate all the interpolated series and split into features (X) and labels (y).

In [5]:
# Concatenate all series into a single DataFrame
data_for_classification = pd.concat(all_data_interpolated, ignore_index=True)

# Separate features and labels
X = data_for_classification.drop(columns=['label', 'time'])  # Features (without label or time)
y = data_for_classification['label']  # Labels

print("Data prepared for classification:")
print("Features (X):", X.shape)
print("Labels (y):", y.shape)


Data prepared for classification:
Features (X): (3906, 7)
Labels (y): (3906,)


# 5. Training and Evaluation with Random Forest

### Here we will use train_test_split to split the dataset and test a RandomForestClassifier model.

In [6]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report

# Split into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)

# Train a Random Forest classifier
model = RandomForestClassifier(random_state=42)
model.fit(X_train, y_train)

# Make predictions and evaluate performance
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
report = classification_report(y_test, y_pred)

print("Model accuracy:", accuracy)
print("Classification report:\n", report)



Model accuracy: 0.9991467576791809
Classification report:
               precision    recall  f1-score   support

      expert       1.00      1.00      1.00       837
      novice       1.00      1.00      1.00       335

    accuracy                           1.00      1172
   macro avg       1.00      1.00      1.00      1172
weighted avg       1.00      1.00      1.00      1172



# 6. Save the Trained Model

In [7]:
import joblib

# Path to save the trained model
model_filename = './models/random_forest_model.joblib'

# Save the trained model
joblib.dump(model, model_filename)
print(f"Model saved at {model_filename}")



Model saved at ./models/random_forest_model.joblib


In [8]:
# Load the saved model
loaded_model = joblib.load(model_filename)
print("Model loaded successfully.")

# Classification function for new time series
def classify_series(new_series, model, target_length):
    # Convert Timestamp to relative time in milliseconds
    new_series = convert_to_relative_time(new_series)
    # Interpolate the series to the same number of points as the model
    interpolated_series = interpolate_series(new_series, target_length)
    # Extract features (excluding 'time' and 'label' columns if present)
    X_new = interpolated_series.drop(columns=['time'], errors='ignore')
    # Classify with the loaded model
    prediction = model.predict(X_new)
    return prediction


Model loaded successfully.


In [9]:
# Function to interpolate to a standard length
def interpolate_series(df, target_length):
    interpolated_df = pd.DataFrame()
    common_time = np.linspace(0, df['time'].iloc[-1], target_length)
    interpolated_df['time'] = common_time
    for col in df.columns:
        if col != 'time' and col != 'label':  # Exclude label if present
            f = interp1d(df['time'], df[col], kind='linear', fill_value="extrapolate")
            interpolated_df[col] = f(common_time)
    # Only add 'label' if it exists in the original DataFrame
    if 'label' in df.columns:
        interpolated_df['label'] = df['label'].iloc[0]
    return interpolated_df


In [11]:
import glob
import pandas as pd

# Classification function for a complete series (simplified to return a single prediction)
def classify_series(new_series, model, target_length):
    # Convert Timestamp to relative time in milliseconds
    new_series = convert_to_relative_time(new_series)
    # Interpolate the series to the same number of points as the model
    interpolated_series = interpolate_series(new_series, target_length)
    
    # Select only the first record for classification
    X_new = interpolated_series.drop(columns=['time'], errors='ignore').iloc[0].values.reshape(1, -1)
    
    # Classify with the loaded model
    prediction = model.predict(X_new)
    return prediction[0]  # Return a single label

# Path to the directory with .csv files
csv_directory = './trajectories/test_serie/'

# Load and classify each .csv file in the directory
for csv_file in glob.glob(f"{csv_directory}/*.csv"):
    new_series_df = pd.read_csv(csv_file)
    result = classify_series(new_series_df, loaded_model, mean_length)
    print(f"Classification of the series in {csv_file}:", result)



Classification of the series in ./trajectories/test_serie/positions01.csv: novice




In [12]:
# Function to extract summary statistics from each series in the dataset
def extract_features(data_list):
    features = []
    labels = []
    for df in data_list:
        feature_vector = []
        for col in df.columns:
            if col != 'time' and col != 'label':  # Exclude 'time' and 'label'
                feature_vector.extend([
                    df[col].mean(),
                    df[col].std(),
                    df[col].min(),
                    df[col].max(),
                    df[col].median(),
                ])
        features.append(feature_vector)
        labels.append(df['label'].iloc[0])  # Get the label for the entire series
    return np.array(features), np.array(labels)

# Extract features and labels from the training dataset
X_train, y_train = extract_features(all_data_interpolated)

# Train the model with the summary statistics feature vector
from sklearn.ensemble import RandomForestClassifier
model = RandomForestClassifier(random_state=42)
model.fit(X_train, y_train)

# Save the trained model
import joblib
model_filename = './models/random_forest_model.joblib'  # Define the model file path
joblib.dump(model, model_filename)
print(f"Model saved at {model_filename}")



Model saved at ./models/random_forest_model.joblib


# Giving a grade to the user's performance

In [15]:
import numpy as np
import glob
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib

# Function to extract summary statistics from each series in the dataset
def extract_features(data_list):
    features = []
    labels = []
    for df in data_list:
        feature_vector = []
        for col in df.columns:
            if col != 'time' and col != 'label':  # Exclude 'time' and 'label'
                feature_vector.extend([
                    df[col].mean(),
                    df[col].std(),
                    df[col].min(),
                    df[col].max(),
                    df[col].median(),
                ])
        features.append(feature_vector)
        labels.append(df['label'].iloc[0])  # Get the label for the entire series
    return np.array(features), np.array(labels)

# Assuming `all_data_interpolated` is a list of interpolated DataFrames
X_train, y_train = extract_features(all_data_interpolated)

# Train the model with summary statistics
model = RandomForestClassifier(random_state=42)
model.fit(X_train, y_train)

# Save the model
model_filename = './models/random_forest_model.joblib'
joblib.dump(model, model_filename)
print(f"Model saved at {model_filename}")

# Classification function with a score scale based on probability
def classify_series_with_score(new_series, model, target_length):
    # Convert Timestamp to relative time in milliseconds
    new_series = convert_to_relative_time(new_series)
    # Interpolate the series to the same number of points as the model
    interpolated_series = interpolate_series(new_series, target_length)
    
    # Calculate summary statistics to reduce the series to a single feature vector
    feature_vector = []
    for col in interpolated_series.columns:
        if col != 'time' and col != 'label':  # Exclude 'time' and 'label'
            feature_vector.extend([
                interpolated_series[col].mean(),
                interpolated_series[col].std(),
                interpolated_series[col].min(),
                interpolated_series[col].max(),
                interpolated_series[col].median(),
            ])
    
    # Ensure the feature vector has the correct format
    X_new = np.array(feature_vector).reshape(1, -1)
    
    # Get the probability of belonging to the "expert" class
    proba = model.predict_proba(X_new)[0]
    proba_expert = proba[0]  # Assuming index 0 is "expert" and index 1 is "novice"
    
    # Convert the probability to a 1 to 7 scale
    if proba_expert >= 0.85:
        score = 7
    elif proba_expert >= 0.70:
        score = 6
    elif proba_expert >= 0.55:
        score = 5
    elif proba_expert >= 0.40:
        score = 4
    elif proba_expert >= 0.25:
        score = 3
    elif proba_expert >= 0.10:
        score = 2
    else:
        score = 1

    # Assign a label based on the score
    if score >= 6.0:
        label = "expert"
    elif score >= 4.0:
        label = "intermediate"
    else:
        label = "novice"
    
    return label, score, proba_expert





Model saved at ./models/random_forest_model.joblib


### Next, and as an example, we consider three users who generate the records: novice -> positions01.csv, expert-> positions08.csv, expert-> positions02.csv

In [18]:
# Directory with .csv files
csv_directory = './trajectories/test_serie/'

# Load and classify each .csv file in the directory
for csv_file in glob.glob(f"{csv_directory}/*.csv"):
    new_series_df = pd.read_csv(csv_file)
    label, score, proba_expert = classify_series_with_score(new_series_df, model, mean_length)
    print(f"Classification for {csv_file}:")
    print(f"Label: {label}")
    print(f"Score on a scale of 1 to 7: {score}")
    print(f"Probability of being an expert: {proba_expert:.2f}\n")

Classification for ./trajectories/test_serie/positions08.csv:
Label: expert
Score on a scale of 1 to 7: 7
Probability of being an expert: 0.94

Classification for ./trajectories/test_serie/positions01.csv:
Label: novice
Score on a scale of 1 to 7: 2
Probability of being an expert: 0.15

Classification for ./trajectories/test_serie/positions02.csv:
Label: expert
Score on a scale of 1 to 7: 7
Probability of being an expert: 0.96

