# In [4]:
from bounds import bounds
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import ast

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Masking, LSTM, Dense, Dropout
from tensorflow.keras.preprocessing.sequence import pad_sequences

#%% Data Loading and Preprocessing
# Both production-line sheets live in the same workbook; Line2 is read first
# so that its rows come first in the combined frame.
file_name = "DataOn2025Jan08.xlsx"
df1, df2 = (
    pd.read_excel(file_name, sheet_name=sheet)
    for sheet in ("NES170K07Line2", "NES170K07Line1")
)

# Lower / upper t5 limits for the "170K" entry of the bounds table
t5_lb, t5_ub = bounds["170K"][0], bounds["170K"][1]

df = pd.concat([df1, df2], ignore_index=True)
print("Data shape:", df.shape)

def safe_literal_eval(value):
    """
    Parse a string-encoded Python literal, treating bare ``nan`` tokens as None.

    Args:
        value: Usually a string such as ``"[(0, 1.2), (1, nan)]"``. Non-string
            values (for example a float NaN from an empty cell) are passed to
            ``ast.literal_eval`` unchanged and end up as None.

    Returns:
        The evaluated literal, or None if the value cannot be evaluated.
    """
    import re  # local import: keeps this function self-contained

    if isinstance(value, str):
        # Replace only whole ``nan`` tokens; a plain substring replace would
        # also rewrite text that merely contains those letters.
        value = re.sub(r"\bnan\b", "None", value.strip())
    try:
        return ast.literal_eval(value)
    except (ValueError, SyntaxError, TypeError):
        # TypeError covers literals such as a dict with an unhashable key
        return None  # Return None if it cannot be evaluated

def organized_data(df, t5_lb, t5_ub):
    """
    Build a per-batch dictionary of torque curves and t5 class labels.

    Rows are skipped when ``t5`` is missing, when either torque column cannot
    be parsed, or when the parsed S1 / S2 series are empty, malformed or of
    different lengths. A row is only stored after it passed all checks, so a
    bad row never overwrites a previously stored batch.

    Args:
        df: DataFrame with the columns "batch_number", "t5", "MDRTorqueS1"
            and "MDRTorqueS2"; the torque columns hold string-encoded
            sequences of (time, value) pairs.
        t5_lb: Lower t5 bound; a t5 below it is labelled "low".
        t5_ub: Upper t5 bound; a t5 above it is labelled "high".

    Returns:
        dict mapping batch number to
        ``{"MDR": DataFrame(time, S1, S2), "t5": t5, "class": label}`` where
        label is "low", "normal" or "high". When a batch number occurs in
        several valid rows, the last one wins.
    """
    data = {}
    for _, row in df.iterrows():
        t5 = row["t5"]
        if pd.isna(t5):  # Skip if t5 is NaN
            continue

        t_S1 = safe_literal_eval(row["MDRTorqueS1"])
        t_S2 = safe_literal_eval(row["MDRTorqueS2"])
        if not t_S1 or not t_S2:
            # Skip entry if sequences are missing or empty
            continue

        try:
            _, S1 = zip(*t_S1)
            # The time axis is taken from the S2 series; both channels are
            # presumably sampled at the same instants (TODO confirm).
            t_vals, S2 = zip(*t_S2)
        except (TypeError, ValueError):
            # Parsed literal is not a sequence of (time, value) pairs
            continue
        if len(S1) != len(S2):
            # Channels of different length cannot share one table
            continue

        MDR = pd.DataFrame({
            "time": list(t_vals),
            "S1": list(S1),
            "S2": list(S2)
        })

        # Interpolate interior gaps, then copy the nearest valid value into
        # anything still missing. bfill()/ffill() replace the deprecated
        # fillna(method=...) spelling.
        MDR = MDR.interpolate(method="linear", limit_direction="both")
        MDR = MDR.bfill().ffill()

        # Assign classification label based on t5 bounds
        if t5 < t5_lb:
            label = "low"
        elif t5 > t5_ub:
            label = "high"
        else:
            label = "normal"

        data[row["batch_number"]] = {
            "MDR": MDR,
            "t5": t5,
            "class": label
        }

    return data

# Build the per-batch dictionary from the combined sheets and the t5 bounds
data = organized_data(df=df, t5_lb=t5_lb, t5_ub=t5_ub)

def iPlotCooperStandard(data, ID):
    """
    Plot the S1 and S2 curves of one batch against time.

    Args:
        data: Mapping of batch id to an entry whose "MDR" table has the
            columns "time", "S1" and "S2".
        ID: Key of the batch to plot.
    """
    curves = data[ID]["MDR"]
    time_axis, first_signal, second_signal = (
        curves[column] for column in ("time", "S1", "S2")
    )

    plt.figure(figsize=(8, 5))
    plt.plot(time_axis, first_signal, color='blue', label=r'$S_1$')
    plt.plot(time_axis, second_signal, color='red', label=r'$S_2$')
    plt.xlabel('time')
    plt.legend()

    # Drop the top and right frame lines for a cleaner look
    for side in ('top', 'right'):
        plt.gca().spines[side].set_visible(False)
    plt.show()

# Plot the first batch in the dictionary as a quick visual check
ID = list(data)[0]
iPlotCooperStandard(data=data, ID=ID)

#%% Filtering Data Based on Sequence Length
# Row count of every batch's MDR table, in dictionary order
lens = [entry["MDR"].shape[0] for entry in data.values()]
max_len = max(lens)

# One point per batch: position in the dictionary vs. number of rows
plt.scatter(x=np.arange(len(lens)), y=lens)
plt.xlabel("Sample Index")
plt.ylabel("Sequence Length")
plt.show()

def len_condition(data, len_threshold):
    """
    Return the batches whose MDR table has at least ``len_threshold`` rows.

    The input mapping is left untouched; the result is a new dict that
    references the same entry objects in their original order.
    """
    kept = {}
    for batch_id, entry in data.items():
        if entry["MDR"].shape[0] >= len_threshold:
            kept[batch_id] = entry
    return kept

# Keep only batches with at least 290 recorded rows
data = len_condition(data, len_threshold=290)

# Report how many of the remaining batches fall into each class
print(f'# low: {sum(1 for entry in data.values() if entry["class"] == "low")}')
print(f'# high: {sum(1 for entry in data.values() if entry["class"] == "high")}')
print(f'# normal: {sum(1 for entry in data.values() if entry["class"] == "normal")}')

#%% Prepare Sequences and Targets for Classification
def prepare_sequences(data_dict, max_len=max_len):
    """
    Process data dictionary into normalized, zero-padded sequences and
    classification targets.

    Each feature is standardized with statistics computed from real
    (unpadded) time steps only, and padding is applied afterwards. Padded
    time steps therefore stay exactly 0 in every feature, which is what a
    downstream ``Masking(mask_value=0.)`` layer needs in order to skip them,
    and the padding no longer distorts the fitted mean and variance.

    Note:
        The scalers are fitted on every sequence passed in. Pass only the
        training batches if statistics of held-out data must not leak into
        the normalization.

    Args:
        data_dict: Mapping of batch id to an entry holding an "MDR" DataFrame
            (columns "S1" and "S2") and a "class" label of "low", "normal"
            or "high".
        max_len: Number of time steps per output sequence. Longer sequences
            are truncated at the end, shorter ones are zero-padded at the
            end. The default is the module-level ``max_len`` as it was when
            this function was defined.
    Returns:
        X: Normalized and padded sequences (n_samples, max_len, num_features)
        y: Array of class labels (as integers: low=0, normal=1, high=2)
        scalers: List of fitted StandardScalers for each feature
    Raises:
        ValueError: If ``data_dict`` holds no non-empty sequence.
        KeyError: If an entry has a class label outside the mapping above.
    """
    # Map string labels to integer codes
    class_map = {"low": 0, "normal": 1, "high": 2}

    sequences = []
    targets = []
    for entry in data_dict.values():
        # Using features S1 and S2 only (exclude time)
        seq = entry["MDR"][['S1', 'S2']].values.astype('float32')
        if len(seq) == 0:
            # Filter out empty sequences if any
            continue
        # Truncate up front (same as truncating='post') so the scalers only
        # see time steps that actually end up in X.
        sequences.append(seq[:max_len])
        targets.append(class_map[entry["class"]])

    if not sequences:
        raise ValueError("prepare_sequences: no non-empty sequences to process")

    # Fit one scaler per feature on the observed time steps of all batches
    observed = np.concatenate(sequences, axis=0)
    scalers = [
        StandardScaler().fit(observed[:, feature_idx].reshape(-1, 1))
        for feature_idx in range(observed.shape[1])
    ]

    # Normalize every sequence feature by feature, still unpadded
    normalized = []
    for seq in sequences:
        columns = [
            scaler.transform(seq[:, feature_idx].reshape(-1, 1))
            for feature_idx, scaler in enumerate(scalers)
        ]
        normalized.append(np.concatenate(columns, axis=1))

    # Pad last, so that padding is the literal mask value 0
    X = pad_sequences(
        normalized,
        maxlen=max_len,
        dtype='float32',
        padding='post',
        truncating='post',
        value=0.0
    )
    y = np.array(targets)
    return X, y, scalers

X, y, scalers = prepare_sequences(data_dict=data)

# Hold out 20% of the samples for testing; the seed is fixed for
# reproducibility. NOTE: the split is not stratified by class.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    random_state=42,
    test_size=0.2,
)

#%% Define the LSTM Model for Classification
def create_lstm_model(input_shape):
    """
    Build and compile the two-layer LSTM classifier.

    Args:
        input_shape: Shape of one sample, passed to the Masking layer as its
            ``input_shape``.

    Returns:
        A compiled Sequential model with a 3-unit softmax output.
    """
    layer_stack = [
        # Time steps equal to 0. in every feature are treated as padding
        Masking(mask_value=0., input_shape=input_shape),
        LSTM(32, return_sequences=True),
        Dropout(0.3),
        LSTM(16),
        Dropout(0.2),
        Dense(16, activation='relu'),
        # 3 output classes: low, normal, high
        Dense(3, activation='softmax'),
    ]
    classifier = Sequential(layer_stack)

    # The sparse loss is used because the targets are integer labels
    classifier.compile(
        loss='sparse_categorical_crossentropy',
        optimizer='adam',
        metrics=['accuracy'],
    )
    return classifier

model = create_lstm_model((X_train.shape[1], X_train.shape[2]))
model.summary()

from sklearn.utils import class_weight
import numpy as np

# Compute class weights based on the training labels
classes = np.unique(y_train)
class_weights = class_weight.compute_class_weight(
    class_weight='balanced',
    classes=classes,
    y=y_train
)
# Key every weight by its actual class label. Keying by position would
# assign weights to the wrong labels whenever a class is missing from
# y_train (e.g. labels {1, 2} would have been keyed as {0, 1}).
class_weight_dict = {int(label): weight for label, weight in zip(classes, class_weights)}
print(class_weight_dict)

# Then pass the computed class_weight_dict to model.fit:
history = model.fit(
    X_train, y_train,
    epochs=10,
    validation_split=0.2,
    class_weight=class_weight_dict,
    verbose=1
)

#%% Evaluate the Model
# The model is compiled with a single 'accuracy' metric, so evaluate()
# yields the loss followed by the accuracy on the held-out split.
test_loss, test_accuracy = model.evaluate(x=X_test, y=y_test)
print(f"Test Accuracy: {test_accuracy:.4f}")

#%% Compute and Display the Confusion Matrix
from sklearn.metrics import confusion_matrix
import seaborn as sns

# Get predictions (as probabilities) and convert to class labels
y_pred_probs = model.predict(X_test)
y_pred = np.argmax(y_pred_probs, axis=1)

# Define class labels for display (order matches the mapping: 0: low, 1: normal, 2: high)
labels = ["low", "normal", "high"]

# Create confusion matrix. The label set is pinned so the matrix is always
# 3x3 and lines up with the tick labels, even when a class occurs in neither
# y_test nor y_pred.
cm = confusion_matrix(y_test, y_pred, labels=[0, 1, 2])
print("Confusion Matrix:\n", cm)

plt.figure(figsize=(6, 6))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=labels, yticklabels=labels)
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix")
plt.show()