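This notebook evaluates every saved BiLSTM-attention checkpoint on the test set. For each checkpoint it reports the accuracy and the total misclassification cost, and saves a confusion-matrix plot.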

In [1]:
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder, StandardScaler
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import Sequence
from sklearn.metrics import f1_score
from tensorflow.keras.layers import Layer, Dense
import os
import matplotlib.pyplot as plt
import seaborn as sns
import joblib
from sklearn.metrics import confusion_matrix

# Show DataFrames without column/row truncation and with a 500-character console width
for option_name, option_value in (
    ('display.max_columns', None),
    ('display.max_rows', None),
    ('display.width', 500),
):
    pd.set_option(option_name, option_value)

In [2]:
def custom_cost_metric(y_true, y_pred):
    """Return the mean misclassification cost of a batch.

    Both arguments are reduced with ``argmax`` over axis 1, so each row is
    treated as a vector of per-class scores (presumably one-hot targets and
    raw model outputs -- confirm against the training code). Every
    (true class, predicted class) pair is priced with a fixed 5x5 table and
    the prices are averaged over the batch.
    """
    # Rows are indexed by the true class, columns by the predicted class;
    # the diagonal (correct prediction) costs nothing.
    penalty_table = tf.constant([
        [0, 7, 8, 9, 10],
        [200, 0, 7, 8, 9],
        [300, 200, 0, 7, 8],
        [400, 300, 200, 0, 7],
        [500, 400, 300, 200, 0]
    ], dtype=tf.float32)

    # Predictions are passed through softmax before the class is picked
    predicted_idx = tf.argmax(tf.nn.softmax(y_pred), axis=1)
    actual_idx = tf.argmax(y_true, axis=1)

    # One (row, column) index pair per sample, then a table lookup
    lookup_pairs = tf.stack([actual_idx, predicted_idx], axis=1)
    per_sample_cost = tf.gather_nd(penalty_table, lookup_pairs)
    return tf.reduce_mean(per_sample_cost)

In [3]:
class DynamicPaddingGenerator(Sequence):
    """Keras ``Sequence`` yielding batches of per-vehicle time series.

    Each batch is padded (at the end) only up to the longest series inside
    that batch, so padding length varies from batch to batch.

    Args:
        groups: Iterable of ``(vehicle_id, DataFrame)`` pairs, e.g. the result
            of ``DataFrame.groupby('vehicle_id')``. It is materialised into a
            list on construction.
        batch_size: Number of vehicles per batch (the last batch may be smaller).
        label_encoder: Fitted encoder; its ``transform`` maps a raw label to a
            class index.
        scaler: Fitted scaler; its ``transform`` is applied to every series.
        is_training: When True, features are columns ``2:-3`` of each group
            and the label is read from the group's ``class_labels`` column;
            the sample order is reshuffled after every epoch. When False,
            features are columns ``2:`` and the label is looked up by
            ``vehicle_id`` in the label table.
        labels: Optional DataFrame with ``vehicle_id`` and ``class_label``
            columns, used when ``is_training`` is False. When omitted, the
            notebook-level ``test_labels`` frame is used, as before.
        num_classes: Width of the one-hot label vectors (default 5).
        **kwargs: Forwarded unchanged to ``Sequence.__init__``.
    """

    def __init__(self, groups, batch_size, label_encoder, scaler, is_training=True,
                 labels=None, num_classes=5, **kwargs):
        super().__init__(**kwargs)
        self.groups = list(groups)
        self.batch_size = batch_size
        self.is_training = is_training
        self.label_encoder = label_encoder
        self.scaler = scaler  # Use pretrained scaler
        self._labels = labels
        self.num_classes = num_classes
        self.n_samples = len(self.groups)
        self.indexes = np.arange(self.n_samples)

    def __len__(self):
        """Number of batches per epoch, counting a final partial batch."""
        return int(np.ceil(self.n_samples / self.batch_size))

    def on_epoch_end(self):
        """Reshuffle the sample order between epochs (training mode only)."""
        if self.is_training:
            np.random.shuffle(self.indexes)

    def _lookup_label(self, vehicle_id):
        """Return the ``class_label`` recorded for ``vehicle_id`` in the label table.

        Raises:
            IndexError: If the label table has no row for ``vehicle_id``.
        """
        # Resolve the table at call time: an explicitly supplied frame wins,
        # otherwise fall back to the notebook-level ``test_labels`` so that
        # callers that never passed ``labels`` keep working.
        label_table = self._labels if self._labels is not None else test_labels
        matches = label_table.loc[label_table['vehicle_id'] == vehicle_id, 'class_label'].values
        if len(matches) == 0:
            # Same exception type the bare ``.values[0]`` used to raise, but
            # with a message that says which vehicle is missing.
            raise IndexError(f"No class_label found for vehicle_id {vehicle_id!r}")
        return matches[0]

    def __getitem__(self, idx):
        """Build batch ``idx`` as ``(X, y)`` NumPy arrays.

        ``X`` stacks the scaled, end-padded series of the batch; ``y`` stacks
        the matching one-hot label vectors.
        """
        batch_indexes = self.indexes[idx * self.batch_size:(idx + 1) * self.batch_size]
        batch_groups = [self.groups[i] for i in batch_indexes]
        # Pad only up to the longest series in this batch
        max_length = max(len(group) for _, group in batch_groups)

        X_batch = []
        y_batch = []

        for vehicle_id, group in batch_groups:
            ordered = group.sort_values('time_step')
            if self.is_training:
                # Skip the first two and the last three columns
                # (presumably identifiers and label columns -- confirm).
                time_series = ordered.iloc[:, 2:-3].values
                label = group['class_labels'].iloc[0]
            else:
                time_series = ordered.iloc[:, 2:].values
                label = self._lookup_label(vehicle_id)

            time_series = self.scaler.transform(time_series)
            padded_series = pad_sequences([time_series], maxlen=max_length, padding='post', dtype='float32')[0]

            X_batch.append(padded_series)
            encoded_label = tf.keras.utils.to_categorical(
                self.label_encoder.transform([label])[0],
                num_classes=self.num_classes
            )
            y_batch.append(encoded_label)

        return np.array(X_batch), np.array(y_batch)

In [4]:
def plot_confusion_matrix(true_labels, pred_labels, label_encoder, model_name):
    """Draw an annotated confusion-matrix heatmap on the current matplotlib figure.

    Args:
        true_labels: Encoded class indices of the ground truth, i.e. positions
            into ``label_encoder.classes_``.
        pred_labels: Encoded class indices predicted by the model.
        label_encoder: Fitted encoder whose ``classes_`` supply the tick labels.
        model_name: Text appended to the plot title.

    The figure is neither created, saved nor closed here; the caller does that.
    """
    class_names = label_encoder.classes_
    # Force one row/column per encoder class. Without ``labels`` the matrix
    # only covers classes that actually occur in the inputs, so a class that
    # is absent from both arrays would shrink the matrix and shift the tick
    # labels onto the wrong rows/columns.
    matrix = confusion_matrix(
        true_labels,
        pred_labels,
        labels=list(range(len(class_names)))
    )
    sns.heatmap(
        matrix,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=class_names,
        yticklabels=class_names
    )
    plt.title(f'Confusion Matrix - {model_name}')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.tight_layout()

In [5]:
def calculate_cost(true_labels, predicted_labels):
    """Sum the misclassification cost over all (true, predicted) label pairs.

    Args:
        true_labels: Sequence or array of true class indices (0-4).
        predicted_labels: Sequence or array of predicted class indices (0-4),
            with the same shape as ``true_labels``.

    Returns:
        The total cost as an integer (0 for empty input).

    Raises:
        ValueError: If the two inputs do not have the same shape.
    """
    # Rows are indexed by the true class, columns by the predicted class
    cost_matrix = np.array([
        [0, 7, 8, 9, 10],
        [200, 0, 7, 8, 9],
        [300, 200, 0, 7, 8],
        [400, 300, 200, 0, 7],
        [500, 400, 300, 200, 0]
    ])
    true_idx = np.asarray(true_labels)
    pred_idx = np.asarray(predicted_labels)
    # zip() would silently drop the unmatched tail and report a too-low cost
    if true_idx.shape != pred_idx.shape:
        raise ValueError(
            f"true_labels and predicted_labels must have the same shape, "
            f"got {true_idx.shape} and {pred_idx.shape}"
        )
    if true_idx.size == 0:
        return 0
    # Look up every pair in one indexing operation instead of a Python loop
    return cost_matrix[true_idx, pred_idx].sum()

In [6]:
# Directory holding the saved checkpoints and the preprocessing artefacts
model_dir = "trained_model_bilstm_attention_deneme"

# Test-set readouts and the per-vehicle labels they are scored against
test_operational = pd.read_csv('selected_and_filled_test_operational_readouts.csv')
test_labels = pd.read_csv('test_labels.csv')

# Restore the label encoder and scaler stored alongside the models
le, scaler = (
    joblib.load(os.path.join(model_dir, artefact_name))
    for artefact_name in ('label_encoder.joblib', 'standard_scaler.joblib')
)

In [7]:
# Create test generator with the loaded scaler
batch_size = 64
test_generator = DynamicPaddingGenerator(
    test_operational.groupby('vehicle_id'),
    batch_size,
    le,
    scaler,
    is_training=False
)

# Initialize results list
results = []

# Evaluate all models in directory.
# os.listdir returns entries in arbitrary order, so sort the file names to
# keep the evaluation order (and the row order of the results) deterministic.
model_files = sorted(f for f in os.listdir(model_dir) if f.endswith('.keras'))

# Create directory for confusion matrix plots
plot_dir = os.path.join(model_dir, 'evaluation_plots')
os.makedirs(plot_dir, exist_ok=True)

In [8]:
# Self-Attention Layer for model loading
class SelfAttention(Layer):
    """Scaled dot-product self-attention over the time axis of a 3-D input.

    The input is projected to queries and keys of width ``attention_units``
    and to values of the input's own feature width; the attended values are
    then passed through one more Dense layer of the same width, so the output
    has the same shape as the input.

    This class is registered as a custom object when the saved ``.keras``
    models are loaded.

    NOTE(review): the attribute names and the order in which the Dense
    sub-layers are created may matter for restoring saved weights -- confirm
    before renaming or reordering anything here.

    Args:
        attention_units: Width of the query/key projections (default 128).
        return_attention: When True, ``call`` also returns the attention weights.
        **kwargs: Forwarded to ``Layer.__init__``.
    """

    def __init__(self, attention_units=128, return_attention=False, **kwargs):
        # Both settings are stored before the base-class initialiser runs
        self.attention_units = attention_units
        self.return_attention = return_attention
        super(SelfAttention, self).__init__(**kwargs)
        
    def build(self, input_shape):
        """Create the four Dense projections; expects ``input_shape`` of rank 3."""
        # Axis 1 is taken as the time dimension, axis 2 as the feature dimension
        self.time_steps = input_shape[1]
        self.input_dim = input_shape[2]
        
        # NOTE(review): the Dense sub-layers are only instantiated here, not
        # explicitly built -- verify that weights are restored when loading.
        self.query_dense = Dense(self.attention_units)
        self.key_dense = Dense(self.attention_units)
        self.value_dense = Dense(self.input_dim)  
        
        self.context_dense = Dense(self.input_dim)
        
        super(SelfAttention, self).build(input_shape)
    
    def call(self, inputs):
        """Apply self-attention to ``inputs``.

        Returns the attended output, or ``(output, attention_weights)`` when
        ``return_attention`` is True. No mask is applied in this method, so
        every time step (including any padded ones) takes part in the softmax.
        """
        query = self.query_dense(inputs)  
        key = self.key_dense(inputs)      
        value = self.value_dense(inputs)  
        
        # Pairwise query-key similarity between time steps, scaled by
        # sqrt(attention_units)
        score = tf.matmul(query, key, transpose_b=True)  
        score = score / tf.math.sqrt(tf.cast(self.attention_units, tf.float32))
        
        # Normalise over the last axis (the key positions)
        attention_weights = tf.nn.softmax(score, axis=-1)  
        
        # Weighted sum of the value vectors for every query position
        context = tf.matmul(attention_weights, value) 
        
        output = self.context_dense(context)  
        
        if self.return_attention:
            return output, attention_weights
        return output
    
    def compute_output_shape(self, input_shape):
        """Output shape: same as the input, plus (batch, time, time) for the weights if requested."""
        if self.return_attention:
            return [(input_shape[0], input_shape[1], self.input_dim), 
                    (input_shape[0], input_shape[1], input_shape[1])]
        return (input_shape[0], input_shape[1], self.input_dim)
    
    def get_config(self):
        """Return the base config extended with this layer's two constructor arguments."""
        config = super(SelfAttention, self).get_config()
        config.update({
            'attention_units': self.attention_units,
            'return_attention': self.return_attention
        })
        return config

In [9]:
# Evaluate every checkpoint on the test set: record accuracy and total
# misclassification cost and save one confusion-matrix image per model.

# Reset the list here so that re-running this cell on its own does not
# append a second copy of every row to results left over from an earlier run.
results = []

for model_file in model_files:
    try:
        print(f"Evaluating {model_file}...")
        model_path = os.path.join(model_dir, model_file)
        # custom_objects maps serialized names to the classes/functions
        # themselves, so pass the SelfAttention class rather than an instance.
        model = load_model(
            model_path,
            custom_objects={
                'SelfAttention': SelfAttention,
                'custom_cost_metric': custom_cost_metric,
            },
        )

        all_predictions = []
        all_true_labels = []

        for i in range(len(test_generator)):
            X_test, y_test = test_generator[i]
            batch_predictions = model.predict(X_test, verbose=0)
            batch_predictions = tf.nn.softmax(batch_predictions)  # Apply softmax to logits
            all_predictions.extend(np.argmax(batch_predictions, axis=1))
            all_true_labels.extend(np.argmax(y_test, axis=1))

        all_predictions = np.array(all_predictions)
        all_true_labels = np.array(all_true_labels)

        accuracy = np.mean(all_predictions == all_true_labels)
        total_cost = calculate_cost(all_true_labels, all_predictions)

        plt.figure(figsize=(10, 8))
        try:
            plot_confusion_matrix(all_true_labels, all_predictions, le, model_file)
            plt.savefig(os.path.join(plot_dir, f'confusion_matrix_{model_file}.png'))
        finally:
            # Close the figure even if plotting or saving fails, so a failing
            # model does not leave an open figure behind for the next one.
            plt.close()

        results.append({
            'model_name': model_file,
            'accuracy': accuracy,
            'total_cost': total_cost
        })

    except Exception as e:
        # Best effort: report the failure and continue with the next model
        print(f"Error evaluating {model_file}: {str(e)}")

Evaluating bilstm_attention_model_epoch_01.keras...
Evaluating bilstm_attention_model_epoch_02.keras...
Evaluating bilstm_attention_model_epoch_03.keras...
Evaluating bilstm_attention_model_epoch_04.keras...
Evaluating bilstm_attention_model_epoch_05.keras...
Evaluating bilstm_attention_model_epoch_06.keras...
Evaluating bilstm_attention_model_epoch_07.keras...
Evaluating bilstm_attention_model_epoch_08.keras...
Evaluating bilstm_attention_model_epoch_09.keras...
Evaluating bilstm_attention_model_epoch_10.keras...
Evaluating bilstm_attention_model_epoch_11.keras...
Evaluating bilstm_attention_model_epoch_12.keras...
Evaluating bilstm_attention_model_epoch_13.keras...
Evaluating bilstm_attention_model_epoch_14.keras...
Evaluating bilstm_attention_model_epoch_15.keras...
Evaluating bilstm_attention_model_epoch_16.keras...
Evaluating bilstm_attention_model_epoch_17.keras...
Evaluating bilstm_attention_model_epoch_18.keras...
Evaluating bilstm_attention_model_epoch_19.keras...
Evaluating b

In [10]:
# Collect the per-model metrics into a table ordered by training epoch;
# the epoch number is parsed from the checkpoint file name.
results_df = (
    pd.DataFrame(results)
    .assign(
        epoch=lambda frame: frame['model_name']
        .str.extract(r'epoch_(\d+)', expand=False)
        .astype(float)
    )
    .sort_values('epoch')
)

print("\nModel Evaluation Results:")
print(results_df)


Model Evaluation Results:
                               model_name  accuracy  total_cost  epoch
0   bilstm_attention_model_epoch_01.keras  0.013479       62785    1.0
1   bilstm_attention_model_epoch_02.keras  0.224975       46137    2.0
2   bilstm_attention_model_epoch_03.keras  0.268583       45552    3.0
3   bilstm_attention_model_epoch_04.keras  0.314371       45332    4.0
4   bilstm_attention_model_epoch_05.keras  0.265213       46120    5.0
5   bilstm_attention_model_epoch_06.keras  0.408722       42951    6.0
6   bilstm_attention_model_epoch_07.keras  0.234291       44592    7.0
7   bilstm_attention_model_epoch_08.keras  0.287611       46807    8.0
8   bilstm_attention_model_epoch_09.keras  0.188107       46021    9.0
9   bilstm_attention_model_epoch_10.keras  0.260258       44760   10.0
10  bilstm_attention_model_epoch_11.keras  0.181368       44295   11.0
11  bilstm_attention_model_epoch_12.keras  0.293558       45993   12.0
12  bilstm_attention_model_epoch_13.keras  0.18830

In [11]:
# Five checkpoints with the highest test accuracy
top_by_accuracy = results_df.nlargest(n=5, columns='accuracy')
print("\nBest Models by Accuracy:")
print(top_by_accuracy.loc[:, ['model_name', 'accuracy', 'total_cost']])


Best Models by Accuracy:
                               model_name  accuracy  total_cost
5   bilstm_attention_model_epoch_06.keras  0.408722       42951
20  bilstm_attention_model_epoch_57.keras  0.404163       47126
16  bilstm_attention_model_epoch_17.keras  0.329039       45167
18  bilstm_attention_model_epoch_19.keras  0.323092       44941
3   bilstm_attention_model_epoch_04.keras  0.314371       45332


In [12]:
# Five checkpoints with the lowest total misclassification cost
lowest_by_cost = results_df.nsmallest(n=5, columns='total_cost')
print("\nBest Models by Total Cost (lowest):")
print(lowest_by_cost.loc[:, ['model_name', 'accuracy', 'total_cost']])


Best Models by Total Cost (lowest):
                               model_name  accuracy  total_cost
5   bilstm_attention_model_epoch_06.keras  0.408722       42951
12  bilstm_attention_model_epoch_13.keras  0.188305       43900
13  bilstm_attention_model_epoch_14.keras  0.280476       44288
10  bilstm_attention_model_epoch_11.keras  0.181368       44295
6   bilstm_attention_model_epoch_07.keras  0.234291       44592


In [13]:
# Plot accuracy and total cost against the training epoch, one panel each,
# and write the figure to the evaluation plot directory.
fig, (accuracy_ax, cost_ax) = plt.subplots(2, 1, figsize=(15, 10))

for panel, column, panel_title, y_label in (
    (accuracy_ax, 'accuracy', 'Accuracy vs Epoch', 'Accuracy'),
    (cost_ax, 'total_cost', 'Total Cost vs Epoch', 'Total Cost'),
):
    panel.plot(results_df['epoch'], results_df[column])
    panel.set_title(panel_title)
    panel.set_xlabel('Epoch')
    panel.set_ylabel(y_label)
    panel.grid(True)

fig.tight_layout()
fig.savefig(os.path.join(plot_dir, 'metrics_over_epochs.png'))
plt.close(fig)

In [14]:
# Write the evaluation table next to the checkpoints, without the row index
results_csv_path = os.path.join(model_dir, 'model_evaluation_results.csv')
results_df.to_csv(results_csv_path, index=False)