# Rotor Model Analysis



This Jupyter Notebook is designed to analyze the performance of a pre-trained rotor model using a Long Short-Term Memory (LSTM) neural network. The notebook includes the following steps:

1. **Importing Libraries and Setting Up Paths**:
    - Essential libraries such as `pandas`, `torch`, `numpy`, and others are imported.
    - Paths for project, data, models, and results are defined.

2. **Device Configuration**:
    - The notebook checks if a GPU is available and sets the device accordingly.

3. **Utility Functions and Dataset Class**:
    - Utility functions for data processing are imported.
    - A custom `PropellerDataset` class is defined to load and preprocess the rotor data.

4. **Model Definition and Loading**:
    - The LSTM model architecture is defined by extending a base class `LSTMNet_rotor`.
    - The trained model and scalers are loaded from the specified paths.

5. **Evaluation Functions**:
    - Functions to compute evaluation metrics such as Mean Absolute Percentage Error (MAPE) and Relative L2 Norm Error are defined.

6. **Testing and Evaluation**:
    - The notebook iterates through test cases, loads the test data, and makes predictions using the trained model.
    - Evaluation metrics are computed for each test case and results are plotted.
    - The overall metrics across all test cases are computed and displayed.

7. **Plotting and Saving Results**:
    - If enabled, the results are plotted and optionally saved as PDF files.
    - The evaluation results are saved to a CSV file if the option is enabled.

This notebook provides a comprehensive workflow for evaluating the rotor model's performance and visualizing the results.

NOTE - Change the `project_path` before running the notebook!!

In [None]:
import os
import pandas as pd
import torch
import numpy as np
from torch.utils.data import Dataset, DataLoader
from torch.utils.data import TensorDataset
import torch.nn.utils as nn_utils
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import MinMaxScaler, StandardScaler
# from torchviz import make_dot
from scipy.fftpack import fft, ifft
import sys

import matplotlib.pyplot as plt

import joblib

from sklearn.metrics import r2_score

In [None]:


project_path = "/mnt/e/eVTOL_model/eVTOL-VehicleModel/"     # path to the project
save_path = project_path + "result/rotor_model/"            # path to save the result   
model_path = project_path + "trained_models/"               # path to the saved model and scaler
data_path = project_path + "data/rotor_data/"                


PLOT_RESULT = True     # plot the result
SAVE_PLOT = False       # save the plot
SAVE_RESULT = False     # save the result

In [None]:
# Check if GPU is available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
# Add the path to the project to the system path
sys.path.append(project_path + '/src')


# Import necessary functions
from utility_functions import downsample_to_35
from utility_functions import organize_data

In [None]:
# Condition function to filter subdirectories
def subdir_condition(subdir_name):
    """
    Modify this function to apply a specific filtering logic.
    """
    return subdir_name.startswith('rotor_dataset')  # Change this condition as needed


class PropellerDataset(Dataset):
    # def __init__(self, root_dir, alpha, J, theta, yaw, tilt, subdir_condition=None):
    def __init__(self, root_dir, subdir_condition=None):
        """
        Args:
            root_dir (string): Root directory with subdirectories containing CSV files.
            alpha (float): Angle of attack.
            J (float): Advance ratio.
            theta (float): Pitch.
            yaw (float): Yaw.
            tilt (float): Tilt.
            subdir_condition (callable, optional): A function or condition to filter subdirectories by name.
        """
        self.root_dir = root_dir
        self.data = []
        self.targets = []
        self.time_data = []  # Store time data separately
        self.omega_data = []  # Store omega (RPM) separately
        self.ct_data = []
        self.cq_data = []
        self.fft_ct_r = []
        self.fft_ct_i = []
        self.fft_cq_r = []
        self.fft_cq_i = []
        self.subdir_condition = subdir_condition

        # Traverse the root directory to gather data
        self._load_data()

    def _load_data(self):
        """
        Helper function to read CSV files from each subdirectory and extract relevant columns.
        """
        # Iterate through each subdirectory in the root directory
        for subdir, _, files in os.walk(self.root_dir):
            subdir_name = os.path.basename(subdir)
            
            # Apply subdirectory name condition
            if self.subdir_condition and not self.subdir_condition(subdir_name):
                continue

            for file in files:
                if file.endswith("_convergence.csv"):
                    # Load the CSV file
                    csv_path = os.path.join(subdir, file)
                    df = pd.read_csv(csv_path)
                    # df = df[(df['ref age (deg)']) < 1800]
                    # print("Length:",len(df))
                    
                    # Extract necessary columns for input features
                    time = df['T'].values  # Time
                    omega = df['RPM_1'].values  # RPM
                    J = df['J'].values
                    AOA = df['AOA'].values
                    v_inf = df['v_inf'].values
                    pitch = df['Pitch (blade)'].values
                    tilt = df['Tilt'].values
                    yaw = df['Yaw'].values
                    ref_angle = df['ref age (deg)'].values

                    # Store time and omega in separate lists for easy access
                    self.time_data.append(time)
                    self.omega_data.append(omega)
                    
                    # Extract CT and CQ as output variables
                    ct = df['CT_1'].values  # Thrust coefficient (CT)
                    cq = df['CQ_1'].values  # Torque coefficient (CQ)

                    fft_ct = fft(ct)
                    fft_ct_real = np.real(fft_ct)
                    fft_ct_imag = np.imag(fft_ct)

                    fft_cq = fft(cq)
                    fft_cq_real = np.real(fft_cq)
                    fft_cq_imag = np.imag(fft_cq)

                    
                    # Store ct and cq in separate lists for easy access
                    self.ct_data.append(ct)
                    self.cq_data.append(cq)
                    self.fft_ct_r.append(fft_ct_real)
                    self.fft_ct_i.append(fft_ct_imag)
                    self.fft_cq_r.append(fft_cq_real)
                    self.fft_cq_i.append(fft_cq_imag)

                    # For each simulation, the input sequence is structured as (n_timesteps, n_features)
                    sequence_inputs = []
                    sequence_outputs = []
                    for i in range(len(time)):
                        # Each time step has time, omega, and predefined variables: alpha, J, theta, yaw, tilt
                        input_data = [
                            time[i],  omega[i], AOA[i], v_inf[i], (100*np.sin(omega[i]*time[i])), 
                            (100*np.cos(omega[i]*time[i])), J[i], pitch[i], tilt[i], yaw[i]
                        ]
                        output_data = [ct[i], cq[i]]
                        
                        
                        sequence_inputs.append(input_data)
                        sequence_outputs.append(output_data)

                    sequence_inputs = np.array([sequence_inputs], dtype=float)
                    sequence_outputs = np.array([sequence_outputs], dtype=float)

                    # Append input sequence (n_timesteps, num_features) and output (CT, CQ)
                    self.data.append(sequence_inputs)
                    self.targets.append(sequence_outputs)  # Append the whole CT and CQ sequences


    def __len__(self):
        """
        Returns the total number of sequences in the dataset.
        """
        return len(self.data)

    def __getitem__(self, idx):
        """
        Returns a single sequence and its targets.
        """
        print(f"Requested index: {idx}")
        print(f"Dataset length: {len(self.data)}")  # Check if idx exceeds this
        print(f"Targets length: {len(self.targets)}")
        
        inputs = self.data[idx]  # Input sequence: (n_timesteps, n_features)
        targets = self.targets[idx]  # Output: (n_timesteps, 2)
        
        # return inputs, targets
        return torch.tensor(inputs, dtype=torch.float32), torch.tensor(targets, dtype=torch.float32)

    def get_variable(self, variable_name):
        """
        Returns a list of arrays for the specified variable.
        Args:
            'time' - timesteps
            'omega' - rotational velocity [RPM]
            'CT' - Thrust coefficient
            'CQ' - Torque coefficient
            'fft_ct_r' - Thrust ccoefficient FFT - real part
            'fft_ct_i' - Thrust ccoefficient FFT - imag part
            'fft_cq_r' - Torque ccoefficient FFT - real part
            'fft_cq_i' - Torque ccoefficient FFT - imag part

        """
        if variable_name == 'time':
            return self.time_data  # Return all time steps for each simulation
        elif variable_name == 'omega':
            return self.omega_data  # Return all omega (RPM) values for each simulation
        elif variable_name == 'CT':
            return self.ct_data 
        elif variable_name == 'CQ':
            return self.cq_data
        elif variable_name == 'fft_ct':
            return self.fft_ct_r, self.fft_ct_i 
        elif variable_name == 'fft_cq':
            return self.fft_cq_r, self.fft_cq_i  
        else:
            raise ValueError(f"Variable {variable_name} not supported.")



In [None]:
from rotor_model import LSTMNet_rotor

input_size = 10
hidden_size = 50
output_size = 2
num_layers = 4

class PropModel(LSTMNet_rotor):
    def __init__(self):
        super(PropModel, self).__init__(input_size, hidden_size, output_size, num_layers)

root_rotorModelsTrained = model_path + '/models/rotor/'
root_rotorScalersTrained = model_path + '/scalers/rotor/'

propeller_model = PropModel()
propeller_model.load_state_dict(torch.load(root_rotorModelsTrained + '2025-03-06_scaled_H26FpropModel_lr0.0005_e2500_nL4_numNN50.pth'))
propeller_model.to(device)

# Load the scalers
input_scaler_rotor = joblib.load(root_rotorScalersTrained + '2025-03-06_scaled_H26F_ipScaler_lr0.0005_e2500_nL4_numNN50.pkl')
output_scaler_rotor = joblib.load(root_rotorScalersTrained + '2025-03-06_scaled_H26F_opScaler_lr0.0005_e2500_nL4_numNN50.pkl')

In [None]:
# Evaluation functions

def mape(y_true, y_pred):
    """Compute Mean Absolute Percentage Error (MAPE)"""
    mask = y_true != 0  # Avoid division by zero
    return np.mean(np.abs((y_true[mask] - y_pred[mask]) / y_true[mask])) * 100

def relative_l2_norm(y_true, y_pred):
    """Compute Relative L2 Norm Error (ε)"""
    mask = y_true != 0  # Avoid division by zero
    numerator = np.linalg.norm(y_pred[mask] - y_true[mask], ord=2)  # ||pred - true||_2
    denominator = np.linalg.norm(y_true[mask], ord=2)  # ||true||_2
    return (numerator / denominator) * 100

In [None]:



# Root directory where simulation subdirectories are stored
root_dir_test_base = data_path + "testing_data/"


# Initialize lists to store evaluation results
mape_ct_list, mape_cq_list = [], []
r2_ct_list, r2_cq_list = [], []
relative_l2_norm_ct_list, relative_l2_norm_cq_list = [], []
case_names = []

for simulation_case in os.listdir(root_dir_test_base):
    # Root directory where simulation subdirectories are stored
    root_dir_test_sim = root_dir_test_base+simulation_case
  

    # Create the dataset with a subdirectory condition
    dataset_test = PropellerDataset(root_dir_test_sim, subdir_condition=subdir_condition)
    inputs_test, outputs_test = dataset_test[0:]

    # Assuming your input tensor is named `input_tensor`
    input_tensor_test = inputs_test.squeeze(1)  # Remove the singleton dimension at index 1
    # print("Input shape:", input_tensor_test.shape)  # Should print: torch.Size([6, 145, 7])

    output_tensor_test = outputs_test.squeeze(1)
    # print("Output shape:",output_tensor_test.shape)

    inputs_test_reshaped = input_tensor_test.reshape(-1, input_size)

    test_inputs_normalized = input_scaler_rotor.transform(inputs_test_reshaped.reshape(-1, input_size)).reshape(input_tensor_test.shape)

    test_inputs_tensor = torch.tensor(test_inputs_normalized, dtype=torch.float32).to(device)

    # Make predictions using the trained model
    propeller_model.eval()  # Set the model to evaluation mode
    with torch.no_grad():
        predicted_outputs = propeller_model(test_inputs_tensor)


    # Convert the predictions back to numpy and inverse scale the outputs
    predicted_outputs = predicted_outputs.cpu().detach().numpy()  # Convert tensor to numpy array
    predicted_outputs_original_scale = output_scaler_rotor.inverse_transform(predicted_outputs.reshape(-1, output_size))

    # Reshape the predictions to match the original sequence structure if needed
    predicted_outputs_original_scale = predicted_outputs_original_scale.reshape(input_tensor_test.shape[0], input_tensor_test.shape[1], output_size)
    predicted_outputs_original_scale = predicted_outputs_original_scale[0]

    # print(predicted_outputs_original_scale.shape)

    # Model predicted values
    ct_test_NN = predicted_outputs_original_scale[:,0]
    cq_test_NN = predicted_outputs_original_scale[:,1]



    # Load timesteps, CT and CQ from FLOWUnsteady simualtions
    time_steps = dataset_test.get_variable('time')

    ct_test_flowuns = dataset_test.get_variable('CT')
    cq_test_flowuns = dataset_test.get_variable('CQ')

    fft_ct_fu_real, fft_ct_fu_imag = dataset_test.get_variable('fft_ct')
    fft_cq_fu_real, fft_cq_fu_imag = dataset_test.get_variable('fft_cq')


    mape_ct = mape(ct_test_flowuns[0], ct_test_NN)
    mape_cq = mape(cq_test_flowuns[0], cq_test_NN)
    r2_ct = r2_score(ct_test_flowuns[0], ct_test_NN)
    r2_cq = r2_score(cq_test_flowuns[0], cq_test_NN)
    relative_l2_norm_ct = relative_l2_norm(ct_test_flowuns[0], ct_test_NN)
    relative_l2_norm_cq = relative_l2_norm(cq_test_flowuns[0], cq_test_NN)
    


    # Store results
    mape_ct_list.append(mape_ct)
    mape_cq_list.append(mape_cq)
    r2_ct_list.append(r2_ct)
    r2_cq_list.append(r2_cq)
    relative_l2_norm_ct_list.append(relative_l2_norm_ct)
    relative_l2_norm_cq_list.append(relative_l2_norm_cq)
    case_names.append(simulation_case)

    # Print metrics for this simulation case
    print(f"Simulation Case: {simulation_case}")
    print(f"  MAPE Ct: {mape_ct:.2f}%, MAPE Cq: {mape_cq:.2f}%")
    print(f"  R² Ct: {r2_ct:.4f}, R² Cq: {r2_cq:.4f}")
    print(f"  ε Ct: {relative_l2_norm_ct:.2f}%, ε Cq: {relative_l2_norm_cq:.2f}%")



    # Plot the results

    
    if PLOT_RESULT:
        plt.figure()
        plt.figure(figsize=(15, 5))
        legend_labels = simulation_case.split('_')
        rpm_label = legend_labels[4]
        v_inf_label = legend_labels[6]
        tilt = legend_labels[8]
        # Plot for Thrust coefficient, $C_T$
        plt.subplot(1, 2, 1)
        plt.plot(time_steps[0], ct_test_NN, label='Predicted $C_T$', color='black', linestyle='--', linewidth=2)
        plt.plot(time_steps[0], ct_test_flowuns[0], label='Actual $C_T$', color='red', linestyle='-', linewidth=2)
        plt.xlabel('Time [s]', fontsize=16)
        plt.ylabel('Thrust coefficient, $C_T$', fontsize=16)
        plt.title(f'Thrust Coefficient Comparison', fontsize=20)
        plt.legend(loc='upper center', fontsize=16, title='RPM = {}, $V_{{\infty}}$ = ${} m/s$, Tilt = ${}^{{\circ}}$'.format(rpm_label, v_inf_label, tilt), fancybox=True, borderpad=1, title_fontsize='16', ncol=3)
        plt.ylim([min(ct_test_NN.min(), ct_test_flowuns[0].min()) - 0.005, max(ct_test_NN.max(), ct_test_flowuns[0].max()) + 0.01])
        plt.grid(True, linestyle='--', linewidth=0.5)
        
        # Plot for Torque coefficient, $C_Q$
        plt.subplot(1, 2, 2)
        plt.plot(time_steps[0], cq_test_NN, label='Predicted $C_Q$', color='black', linestyle='--', linewidth=2)
        plt.plot(time_steps[0], cq_test_flowuns[0], label='Actual $C_Q$', color='red', linestyle='-', linewidth=2)
        plt.xlabel('Time [s]', fontsize=16)
        plt.ylabel('Torque coefficient, $C_Q$', fontsize=16)
        plt.title(f'Torque Coefficient Comparison', fontsize=20)
        plt.legend(loc='upper center', fontsize=16, title='RPM = {}, $V_{{\infty}}$ = ${} m/s$, Tilt = ${}^{{\circ}}$'.format(rpm_label, v_inf_label, tilt), fancybox=True, borderpad=1, title_fontsize='16', ncol=3)
        plt.ylim([min(cq_test_NN.min(), cq_test_flowuns[0].min()) - 0.0002, max(cq_test_NN.max(), cq_test_flowuns[0].max()) + 0.0006])
        plt.grid(True, linestyle='--', linewidth=0.5)
        
        plt.tight_layout()
        if SAVE_PLOT:
            plt.savefig(f"{save_path}{simulation_case}_comparison.pdf", format="pdf", dpi=300, bbox_inches="tight")
        plt.show()


    # plt.show()

# Compute average metrics across all test cases
print("\nOverall Metrics Across All Test Cases:")
print(f"  Avg MAPE Ct: {np.mean(mape_ct_list):.2f}%, Avg MAPE Cq: {np.mean(mape_cq_list):.2f}%")
print(f"  Avg R² Ct: {np.mean(r2_ct_list):.4f}, Avg R² Cq: {np.mean(r2_cq_list):.4f}")
print(f"  Avg ε Ct: {np.mean(relative_l2_norm_ct_list):.2f}%, Avg ε Cq: {np.mean(relative_l2_norm_cq_list):.2f}%")

if SAVE_RESULT:
    # Convert results to a DataFrame
    results_df = pd.DataFrame({
        "Case": case_names,
        "Relative L2 Norm CT (%)": relative_l2_norm_ct_list,
        "Relative L2 Norm CQ (%)": relative_l2_norm_cq_list,
        "MAPE CT (%)": mape_ct_list,
        "MAPE CQ (%)": mape_cq_list,
        "R² CT": r2_ct_list,
        "R² CQ": r2_cq_list
    })

    # Append mean values to the DataFrame
    mean_row = pd.DataFrame({
        "Case": ["Mean"], 
        "Relative L2 Norm CT (%)": np.mean(relative_l2_norm_ct_list),
        "Relative L2 Norm CQ (%)": np.mean(relative_l2_norm_cq_list),
        "MAPE CT (%)": np.mean(mape_ct_list),
        "MAPE CQ (%)": np.mean(mape_cq_list),
        "R² CT": np.mean(r2_ct_list),
        "R² CQ": np.mean(r2_cq_list)
    })

    results_df = pd.concat([results_df, mean_row], ignore_index=True)

    # Define CSV file path
    csv_filename = os.path.join(save_path, "rotorModel_evaluation_results.csv")

    # Save to CSV
    results_df.to_csv(csv_filename, index=False)

    print(f"Results (including mean values) saved to {csv_filename}")