# Development and Simulation of an Adaptive Clutch Modulator

Rishon John Moses Paul Sudhagar, Engineering Intern

In [16]:
from dataclasses import dataclass, field
import h5py
import os
import csv
import joblib
import optuna
import numpy as np
import pandas as pd
import xgboost as xgb
import matplotlib.pyplot as plt
import scienceplots
from scipy.interpolate import interp1d, CubicSpline
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from scipy.stats import skew, kurtosis
from scipy.signal import find_peaks

@dataclass
class Config:
    source_dir      : str   = 'data/amt_raw'
    temp_dir        : str   = 'data/amt_temp'
    output_dir      : str   = 'artifacts'
    scaler_file     : str   = 'scaler.joblib'
    model_file      : str   = 'model.joblib'
    med_curve_file  : str   = 'median_curves.csv'
    idl_curve_file  : str   = 'ideal_curves.csv'
    curve_stat_file : str   = 'curve_stats.csv'
    n_clusters      : int   = 5
    n_trials        : int   = 50
    time_step       : float = 0.1
    AMTgear_ratios  : dict  = field(default_factory=lambda:{1 : 6.696, 2 : 3.806, 3 : 2.289, 4 : 1.480, 5 : 1.000, 6 : 0.728, -1: 13.862, 0 : 0})
    k               : float = 0.107
    input_featureset: list  = field(default_factory=lambda:['EngTrq', 'EngSpd', 'tmpCltActTC', 'CurrGr', 'Calc_VehSpd'])
    time_col        : str   = 'EventTime'
    target_col      : str   = 'ClutchCval'
    identifier_col  : str   = 'EngagementEvents'
    jerk_weight     : float = 0.6
    wear_weight     : float = 0.4
    bite_point      : int   = 25
    release_point   : int   = 75
    engaged         : int   = 0
    disengaged      : int   = 100
    fallback        : int   = 2
    max_clutch_speed: float = 300.0 
    plot_style      : list  = field(default_factory=lambda:['science', 'no-latex'])

config = Config()

In [17]:
def mat_to_csv(mat_name: str, source_dir: str = 'data/amt_raw', temp_dir: str = 'data/amt_temp') -> None:
    '''
    A function to convert MATLAB .mat files to .csv files.

    Parameters:
        mat_name (str): The name of the .mat file to be converted.
        source_dir (str): The directory where the .mat files are located. Default is 'data/amt_raw'.
        temp_dir (str): The directory where the converted .csv files will be saved. Default is 'data/amt_temp'.

    Returns:
        None
    '''

    testdata={}
    mat_path = os.path.join(source_dir, mat_name)

    with h5py.File(mat_path, 'r') as f:
        for column in f:
            if f[column].shape[0] == 1:
                fil=np.array(f[column][()])
                testdata[f'{column}'] = fil[0]

    headers=testdata.keys()

    for i in range(0,12):
        tim=f't{i}'
        param_lst={}
        name_lst =[]
        if tim in headers:
            param_lst['Time']= testdata[tim]
            for name in headers:
                if name.__contains__(tim) and name != tim and name not in name_lst:
                    param_lst[name] = testdata[name]
                    param_lst= pd.DataFrame(param_lst)
                    test_dir = f'{temp_dir}/{mat_name.split(".")[0]}'
                    os.makedirs(test_dir, exist_ok=True)
                    param_lst.to_csv(os.path.join(test_dir, f'{name}.csv'), index=False)
                    name_lst.append(name)



source_dir = config.source_dir
os.makedirs(source_dir, exist_ok=True)

temp_dir = config.temp_dir
os.makedirs(temp_dir, exist_ok=True)

mat_files = os.listdir(source_dir)
mat_files = [f for f in mat_files if f.endswith('.mat')]

if len(mat_files) == 0:
    print(f'No valid .mat files found at {source_dir}.')
else:
    print(f'{len(mat_files)} file(s) loaded from {source_dir}')
    for mat_name in mat_files:
        mat_to_csv(mat_name, source_dir,temp_dir)

7 file(s) loaded from data/amt_raw


In [18]:
def time_align(test_name : str, source_dir : str, time_step: float = 0.01) -> pd.DataFrame:
    '''
    Aligns all vehicle parameters from a given test to a common time vector using linear interpolation.

    Parameters:
        test_name (str): Name of the test to be processed.
        source_dir (str): Directory containing the test CSV files.
        time_step (float): Time step for the common time vector in seconds.

    Returns:
        pd.DataFrame: A DataFrame containing the time-aligned vehicle parameters.
    '''
    source_dir = f'{source_dir}/{test_name}'
    os.makedirs(source_dir, exist_ok=True)

    params = os.listdir(source_dir)
    params = [f for f in params if f.endswith('.csv')]

    df_lst=list()
    start = float('inf')
    end = float('-inf')

    for param in params:
        df=pd.read_csv(os.path.join(source_dir, param))
        if df.empty:
            continue
        if 'Time' not in df.columns:
            continue

        df.sort_values(by='Time', inplace=True)
        start = min(start, df['Time'].min())
        end = max(end, df['Time'].max())
        df_lst.append(df)

    if start == float('inf') or end == float('-inf'):
        return
    
    time = np.arange(start, end, time_step)
    aligned_df = pd.DataFrame({'Time': time})

    for i, df in enumerate(df_lst):
        data_col = df.columns[-1]
        
        INTERP_FUNC = interp1d(df['Time'], df[data_col], kind='linear', 
                    bounds_error=False, fill_value='extrapolate')

        if data_col.__contains__('Calculated'):
            data_label = data_col.split('_')[1]
        elif data_col.__contains__('CAN1'):
            data_label = data_col.split('_')[6]
        elif data_col.__contains__('SpdR'):
            data_label = str(data_col.split('_')[3]+data_col.split('_')[4])
        elif data_col.__contains__('Clutch_'):
            data_label = str(data_col.split('_')[3]+data_col.split('_')[4])
        else:
            data_label = data_col.split('_')[3]
        
        aligned_df[f'{data_label}'] = INTERP_FUNC(time)
        aligned_df.dropna(inplace=True)

    if aligned_df.empty:
        return None
    
    aligned_df['Time']=aligned_df['Time'].round(decimals=3)

    return aligned_df

source_dir = config.temp_dir
os.makedirs(source_dir, exist_ok=True)

tests = os.listdir(source_dir)

time_step = config.time_step # seconds

test_data = dict()

for test_name in tests:
    df = time_align(test_name, source_dir, time_step)
    if df is not None:
        test_data[f'{test_name.split('.')[0]}_aligned'] = df


In [19]:
def calc_params(df: pd.DataFrame, gear_ratios: dict) -> pd.DataFrame:
    '''
    A function to calculate additional vehicle parameters such as slip power, output shaft speed, and vehicle speed.
    Parameters:
        df (pd.DataFrame): DataFrame containing the vehicle parameters.
        gear_ratios (dict): Dictionary mapping gear numbers to their respective gear ratios.
    Returns:
        pd.DataFrame: DataFrame with additional calculated parameters.
    '''
    col=df.columns
    if 'InShaftSpd' in col:
        df['CurrGr'] = df['CurrGr'].astype(int)
        df['Calc_SlipPower'] = (df['EngTrq']) * (df['EngSpd']-df['InShaftSpd']) * (2*np.pi/60)/ 1000
        df['Calc_OutShaftSpd'] = df['InShaftSpd'] / df['CurrGr'].map(gear_ratios)
        df['Calc_OutShaftSpd'] = df['Calc_OutShaftSpd'].replace([np.nan, np.inf, -np.inf],0)
        df['Calc_VehSpd'] = (3/25)*(np.pi)*(config.k)*df['Calc_OutShaftSpd'] # k is a constant that depends on tyre size and final drive ratio
    return df

tests = test_data.keys()
test_data_calc = dict()

for test_name in tests:
    df = test_data[test_name]
    df_calc = calc_params(df,config.AMTgear_ratios)
    test_data_calc[f'{test_name}_calc'] = df_calc

In [20]:
def find_curves(test_data_calc: dict) -> pd.DataFrame:
    '''
    A function to identify and extract clutch engagement curves from the test data.
    Parameters:
        test_data_calc (dict): Dictionary containing the test data DataFrames.
    Returns:
        pd.DataFrame: DataFrame containing all identified clutch engagement curves with event time.
    '''

    full_curve_lst = []
    offset = 0

    for test_name in test_data_calc:
            
        df = pd.DataFrame.copy(test_data_calc[test_name])

        if 'ClutchCval' not in df.columns:
            continue

        if 'ClutchStat' not in df.columns:
            curves = df[(df['ClutchCval'] > 10) & (df['ClutchCval'] < 85)].copy()
        else:
            df['ClutchStat'] = df['ClutchStat'].astype(int)
            engaging = df['ClutchStat'] == 1
            group = (engaging != engaging.shift()).cumsum()
            lengths = engaging.groupby(group).transform('sum')
            mask = (df['ClutchStat'] == 1) & (lengths >= 3)
            curves = df[mask].copy()
        
        if curves.empty:
            continue

        curves = curves.reset_index(drop=True)
        time_diff = curves['Time'].diff().fillna(0)
        
        curves['EngagementEvents'] = (time_diff > 0.015).cumsum() + offset
        curves['EventTime'] = curves.groupby('EngagementEvents')['Time'].transform(lambda x: round((x - x.iloc[0]),4))
        
        full_curve_lst.append(curves)
        
        offset = curves['EngagementEvents'].max() + 1
        
    return pd.concat(full_curve_lst, ignore_index=True) if full_curve_lst else pd.DataFrame()

df_split = find_curves(test_data_calc)

In [21]:

def create_splines(df: pd.DataFrame, identifier_col: str, target_col: str, time_col: str, points_per_curve: int = 100) -> dict:
    '''
    A function to create cubic splines for each clutch engagement event.
    Parameters:
        df (pd.DataFrame): DataFrame containing the clutch engagement events.
        identifier_col (str): Column name identifying each engagement event.
        target_col (str): Column name of the target variable for spline fitting.
        time_col (str): Column name of the time variable.
        points_per_curve (int): Number of points to generate for each spline curve.
    Returns:
        dict: A dictionary where keys are event identifiers and values are arrays of spline points.
    '''
    splines = {}
    events = df.groupby(identifier_col)

    for event_num, event in events:
        y = event[target_col].values
        x = event[time_col].values

        if len(x) < 4:
            print()
            continue

        try:
            spline = CubicSpline(x, y, bc_type='natural')
            x_new = np.linspace(0, x.max(), points_per_curve)
            y_new = spline(x_new)
            splines[event_num] = y_new
        except Exception as e:
            print(f'Error creating spline for event {event_num}: {e}')
            continue
    return splines

output_dir = config.output_dir
os.makedirs(output_dir, exist_ok=True)

splines = create_splines(df_split, config.identifier_col, config.target_col, config.time_col)

In [22]:
def aggregator(df: pd.DataFrame, identifier_col: str, input_featureset: list, time_col: str) -> pd.DataFrame:
    '''
    A function to aggregate event parameters for each clutch engagement event.
    Parameters:
        df (pd.DataFrame): DataFrame containing the clutch engagement events.
        identifier_col (str): Column name identifying each engagement event.
        input_featureset (list): List of feature column names to aggregate.
        time_col (str): Column name of the time variable.
    Returns:
        pd.DataFrame: A DataFrame containing aggregated event parameters.
    '''
    aggregations = {
        feature: ['mean', 'std', ('q25', lambda x: x.quantile(0.25)), ('q75', lambda x: x.quantile(0.75)), 'first']
        for feature in input_featureset
    }


    event_parameters_df = df.groupby(identifier_col).agg(aggregations)
    event_parameters_df.columns = ['_'.join(col).strip() for col in event_parameters_df.columns.values]

    event_durations = df.groupby(identifier_col)[time_col].max()
    event_parameters_df['Duration'] = event_durations

    return event_parameters_df

event_params = aggregator(df_split, config.identifier_col, config.input_featureset, config.time_col)

In [None]:
def extract_curve_features(y: np.ndarray) -> list:
    '''
    Extracts statistical and shape features from a spline curve.
    Parameters:
        y (np.ndarray): Array of spline points.
    Returns:
        list: A list of extracted features.
    '''
    return [
        np.mean(y), np.median(y), np.std(y), np.min(y), np.max(y),
        skew(y), kurtosis(y), y[0], y[-1], y[-1] - y[0],
        np.trapezoid(y), len(find_peaks(y)[0])
    ]


def cluster(splines: dict, max_k: int = 15) -> tuple:
    '''
    Clusters the spline curves using KMeans and determines the optimal number of clusters using silhouette score.
    Parameters:
        splines (dict): Dictionary of spline curves.
        max_k (int): Maximum number of clusters to test.
    Returns:
        tuple: A tuple containing the final cluster labels and the fitted scaler.
    '''
    event_ids = list(splines.keys())

    feature_list = [extract_curve_features(vec) for vec in splines.values()]

    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(feature_list)

    best_score = -1.0
    optimal_k = 2

    for k in range(2, max_k + 1):
        kmeans = KMeans(n_clusters=k, random_state=42, n_init=10).fit(features_scaled)
        score = silhouette_score(features_scaled, kmeans.labels_)
        if score > best_score:
            best_score = score
            optimal_k = k

    final_kmeans = KMeans(n_clusters=optimal_k, random_state=42, n_init=10).fit(features_scaled)
    final_label_map = dict(zip(event_ids, final_kmeans.labels_))

    return final_label_map, scaler

scaler_path = os.path.join(config.output_dir, 'scaler.joblib')
cluster_labels, scaler = cluster(splines)
joblib.dump(scaler, scaler_path)
print(f'Scaler saved to {scaler_path}')

In [None]:
def train_tune(X_train, y_train, X_val, y_val, n_trials=50) -> xgb.XGBClassifier:
    '''
    Trains and tunes an XGBoost classifier using Optuna for hyperparameter optimisation.
    Parameters:
        X_train (pd.DataFrame): Training feature set.
        y_train (pd.Series): Training target labels.
        X_val (pd.DataFrame): Validation feature set.
        y_val (pd.Series): Validation target labels.
        n_trials (int): Number of Optuna trials for hyperparameter optimization.
    Returns:
        xgb.XGBClassifier: The trained XGBoost classifier with the best hyperparameters.
    '''

    weights = compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
    sample_weights = y_train.map(dict(enumerate(weights))).values

    def objective(trial) -> float:
        '''
        An objective function for Optuna to optimize XGBoost hyperparameters.
        Parameters:
            trial (optuna.trial.Trial): An Optuna trial object.
        Returns:
            float: The accuracy score on the validation set.
        '''
        params = {
            'objective': 'multi:softprob',
            'eval_metric': 'mlogloss',
            'random_state': 42,
            'num_class': len(np.unique(y_train)),
            'n_estimators': trial.suggest_int('n_estimators', 100, 1000),
            'learning_rate': trial.suggest_float('learning_rate', 0.001, 0.3, log=True),
            'max_depth': trial.suggest_int('max_depth', 4, 12),
            'subsample': trial.suggest_float('subsample', 0.6, 1.0),
            'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
            'gamma': trial.suggest_float('gamma', 0, 5),
            'lambda': trial.suggest_float('lambda', 1e-8, 1.0, log=True),
            'alpha': trial.suggest_float('alpha', 1e-8, 1.0, log=True)
        }


        model = xgb.XGBClassifier(**params, use_label_encoder=False, early_stopping_rounds=15)
        model.fit(X_train, y_train, sample_weight=sample_weights, eval_set=[(X_val, y_val)], verbose=False)

        return accuracy_score(y_val, model.predict(X_val))

    study = optuna.create_study(direction='maximize', study_name='Clutch Clustering')
    study.optimize(objective, n_trials=n_trials)

    print(f'Best hyperparameters: {study.best_params}')

    final_model = xgb.XGBClassifier(**study.best_params).fit(X_train, y_train, sample_weight=sample_weights)

    return final_model

event_params['cluster_label'] = event_params.index.map(cluster_labels)
event_params.dropna(inplace=True)
event_params['cluster_label'] = event_params['cluster_label'].astype(int)

initial_condition_features = [col for col in event_params.columns if col.endswith('_first')]
X = event_params[initial_condition_features]
y = event_params['cluster_label']

X.columns = [col.replace('_first', '') for col in X.columns]

X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

final_model = train_tune(X_train, y_train, X_val, y_val, n_trials=config.n_trials)
model_save_path = os.path.join(output_dir,config.model_file)
joblib.dump(final_model, model_save_path)
print(f'Model saved to {model_save_path}')

In [None]:

def estimate_driveline_jerk(event_df: pd.DataFrame) -> float:
    """
    Estimates the total driveline jerk for a single engagement event based on
    the rate of change of transmitted torque. This is a more physically
    accurate measure of shift quality.
    Parameters:
        event_df: The full, un-aggregated DataFrame for a single event. 
                  It must contain 'EngTrq', 'ClutchCval', and 'EventTime'.
    Returns:
        float: The total integrated jerk for the event.
    """
    event_df = event_df.sort_values(by='EventTime')
    transmitted_torque = event_df['EngTrq'].values * (1 - event_df['ClutchCval'].values / 100.0)
    torque_derivative = np.gradient(transmitted_torque, event_df['EventTime'].values)
    total_jerk = np.sum(np.abs(torque_derivative))
    
    return total_jerk

def estimate_wear_from_event(event_df: pd.DataFrame) -> float:
    '''
    Estimates the total clutch wear for a single engagement event based on
    the slip power during the event.
    Parameters:
        event_df (pd.DataFrame): The full, un-aggregated DataFrame for a single event.
    Returns:
        float: The total estimated wear energy for the event in Joules.
    '''

    engine_speed = event_df['EngSpd'] * (np.pi / 30)
    input_shaft_speed = event_df['InShaftSpd'] * (np.pi / 30)
    slip_speed = np.abs(engine_speed - input_shaft_speed)

    power_loss = event_df['EngTrq'] * slip_speed
    total_energy = np.trapezoid(power_loss, x=event_df['EventTime'])

    return total_energy

event_params.dropna(inplace=True)
predictions = final_model.predict(event_params)
event_params['predicted_cluster'] = predictions

costs = []
for event_id in event_params.index:
    if event_id not in splines: continue
    curve = splines[event_id]

    event_df = df_split[df_split['EngagementEvents'] == event_id]
    wear_cost = estimate_wear_from_event(event_df)
    jerk_cost = estimate_driveline_jerk(event_df)
    costs.append({'event_id': event_id, 'jerk_cost': jerk_cost, 'wear_cost': wear_cost})
cost_df = pd.DataFrame(costs).set_index('event_id')

cost_df['jerk_norm'] = (cost_df['jerk_cost'] - cost_df['jerk_cost'].min()) / (cost_df['jerk_cost'].max() - cost_df['jerk_cost'].min())
cost_df['wear_norm'] = (cost_df['wear_cost'] - cost_df['wear_cost'].min()) / (cost_df['wear_cost'].max() - cost_df['wear_cost'].min())
cost_df['total_cost'] = (config.jerk_weight * cost_df['jerk_norm']) + (config.wear_weight * cost_df['wear_norm'])
event_params = event_params.join(cost_df)

median_curves = {}
ideal_curves = {}
cluster_statistics = {}

for cluster_id in sorted(event_params['predicted_cluster'].unique()):
    cluster_events = event_params[event_params['predicted_cluster'] == cluster_id]
    if len(cluster_events) < 1: continue
    cluster_vectors = [splines[i] for i in cluster_events.index if i in splines]
    if not cluster_vectors: continue
    median_curves[cluster_id] = np.median(cluster_vectors, axis=0)

    best_event_id = cluster_events['total_cost'].idxmin()
    ideal_curves[cluster_id] = splines[best_event_id]


    cluster_statistics[cluster_id] = {
        'member_ids': list(cluster_events.index),
        'mean_jerk': cluster_events['jerk_cost'].mean(),
        'mean_wear': cluster_events['wear_cost'].mean(),
        'min_cost_event_id': int(best_event_id)
    }


joblib.dump(median_curves, os.path.join(config.output_dir,config.med_curve_file))
joblib.dump(ideal_curves, os.path.join(config.output_dir,config.idl_curve_file))
joblib.dump(cluster_statistics, os.path.join(config.output_dir,config.curve_stat_file))

plt.style.use(config.plot_style)

plt.figure(figsize=(16, 9))
num_clusters = len(median_curves)
colors = plt.cm.viridis(np.linspace(0, 1, num_clusters))
for i, cluster_id in enumerate(median_curves.keys()):
    plt.plot(median_curves[cluster_id], color=colors[i], linestyle='--', label=f'Cluster {cluster_id} Median')
    plt.plot(ideal_curves[cluster_id], color=colors[i], label=f'Cluster {cluster_id} Ideal')
plt.title('Median (Typical) vs. Ideal (Low-Cost) Engagement Curves per Predicted Profile')
plt.xlabel('Normalized Time Points')
plt.ylabel('ClutchCval')
plt.legend(bbox_to_anchor=(1.02, 1), loc='upper left', borderaxespad=0.)
plt.grid(True)
plt.tight_layout()
plt.show()


In [None]:
#real-time controller simulation with rate limiting (accounts for actuator latency)


df = pd.read_csv(r'data\man_raw\gear_1_manual_params.csv')

df.rename(columns={
    'Clutch housing Temp': 'tmpCltActTC',
    'EngTrq_Cval_PT': 'EngTrq',
    'EngSpd_Cval_PT': 'EngSpd',
    'VehSpd_Cval_PT': 'Calc_VehSpd'
}, inplace=True)

if 'CurrGr' not in df.columns:
    df['CurrGr'] = 1

simulated_clutch_pos = []
in_modulation_zone = False
predicted_cluster_for_event = None
last_clutch_pos = config.engaged
last_time = df['Time'].iloc[0]

for idx, row in df.iterrows():
    # --- State & Time Update ---
    pedal_pos = row['Clutch_Pedal_Travel']
    current_time = row['Time']
    dt = current_time - last_time
    target_clutch_pos = config.engaged # Default command

    # --- Event Detection Logic ---
    if config.bite_point < pedal_pos < config.release_point and not in_modulation_zone:
        in_modulation_zone = True
        
        initial_conditions = {
            'EngTrq': row['EngTrq'],
            'EngSpd': row['EngSpd'],
            'tmpCltActTC': row['tmpCltActTC'],
            'CurrGr': row['CurrGr'],
            'Calc_VehSpd': row['Calc_VehSpd']
        }
        
        feature_df = pd.DataFrame([initial_conditions], columns=final_model.feature_names_in_)
        predicted_cluster_for_event = final_model.predict(feature_df)[0]

    if pedal_pos <= config.bite_point:
        target_clutch_pos = config.engaged
        in_modulation_zone = False
    elif pedal_pos >= config.release_point:
        target_clutch_pos = config.disengaged
        in_modulation_zone = False
    elif in_modulation_zone and predicted_cluster_for_event is not None:

        reference_curve = ideal_curves.get(predicted_cluster_for_event, median_curves.get(config.fallback))
        
        region_span = config.release_point - config.bite_point
        mod_progress = (pedal_pos - config.bite_point) / region_span
        
        curve_indices = np.linspace(0, 1, len(reference_curve))
        target_clutch_pos = np.interp(mod_progress, curve_indices, reference_curve)

    max_change = config.max_clutch_speed * dt

    if target_clutch_pos > last_clutch_pos:
        current_clutch_pos = min(target_clutch_pos, last_clutch_pos + max_change)
    else:
        current_clutch_pos = max(target_clutch_pos, last_clutch_pos - max_change)

    simulated_clutch_pos.append(current_clutch_pos)

    last_clutch_pos = current_clutch_pos
    last_time = current_time

plt.figure(figsize=(14, 7))
plt.plot(df['Time'], simulated_clutch_pos, label='Simulated Controller Output', linewidth=2.5, color='royalblue')
plt.plot(df['Time'], df['Clutch_Pedal_Travel'], '--', label='Driver Pedal Input', alpha=0.7, color='gray')
plt.xlabel('Time (s)')
plt.ylabel('Clutch Position (%)')
plt.title('Simulated Clutch Controller with Rate Limiting')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
