In [1]:
# %% Import necessary libraries
import pandas as pd
import numpy as np
import pickle
import matplotlib.pyplot as plt
from scipy import stats
import tensorflow as tf  # Only using TensorFlow 2.x
import seaborn as sns
from pylab import rcParams
from sklearn import metrics
from sklearn.model_selection import train_test_split
%matplotlib inline
sns.set(style='whitegrid', palette='muted', font_scale=1.5)
rcParams['figure.figsize'] = 14, 8
RANDOM_SEED = 42

# Ensure eager execution is enabled
print("Eager execution:", tf.executing_eagerly())

X_train = pd.read_csv("./x_train.csv").values
X_test = pd.read_csv("./x_test.csv").values
y_train = pd.read_csv("./y_train.csv").values
y_test = pd.read_csv("./y_test.csv").values
# %% Print the shape of training and testing data
print(f"{X_train.shape}, {X_test.shape}, {y_train.shape}, {y_test.shape}")

# %% Define the number of hidden units, time steps, and features
N_HIDDEN_UNITS = 64
N_TIME_STEPS = 200
N_FEATURES = 3



# %% Reshape the training and testing data to match the LSTM input shape
X_train = X_train.reshape(len(X_train), N_TIME_STEPS, N_FEATURES)
X_test = X_test.reshape(len(X_test), N_TIME_STEPS, N_FEATURES)

# %% Fill missing values and encode labels
from sklearn.preprocessing import LabelEncoder

label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train.ravel())
y_test_encoded = label_encoder.transform(y_test.ravel())

# Update N_CLASSES
N_CLASSES = len(label_encoder.classes_)

# Ensure labels are in integer type
y_train_encoded = y_train_encoded.astype('int32')
y_test_encoded = y_test_encoded.astype('int32')


2024-12-12 17:02:14.755150: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-12-12 17:02:14.771336: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-12-12 17:02:14.790918: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-12-12 17:02:14.796746: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-12-12 17:02:14.811655: I tensorflow/core/platform/cpu_feature_guar

Eager execution: True
(451162, 600), (112790, 600), (451162, 1), (112790, 1)


In [2]:
# %% Import necessary libraries for decision tree and random forest
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt

# %% Reshape the training and testing data for traditional machine learning
# Reshape X_train and X_test to 2D arrays: (samples, time_steps * N_FEATURES)
X_train_flattened = X_train.reshape(len(X_train), -1)
X_test_flattened = X_test.reshape(len(X_test), -1)


In [None]:
# %% Import necessary libraries for decision tree and random forest
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt

# %% Reshape the training and testing data for traditional machine learning
# Reshape X_train and X_test to 2D arrays: (samples, time_steps * N_FEATURES)
X_train_flattened = X_train.reshape(len(X_train), -1)
X_test_flattened = X_test.reshape(len(X_test), -1)

# %% Train a Decision Tree Classifier
decision_tree_model = DecisionTreeClassifier(random_state=RANDOM_SEED)
decision_tree_model.fit(X_train_flattened, y_train_encoded)

# %% Make predictions with Decision Tree model
y_pred_dt = decision_tree_model.predict(X_test_flattened)

# %% Evaluate Decision Tree model
accuracy_dt = accuracy_score(y_test_encoded, y_pred_dt)
print("Decision Tree Classifier Accuracy:", accuracy_dt)
print(classification_report(y_test_encoded, y_pred_dt, target_names=label_encoder.classes_))
conf_matrix_dt = confusion_matrix(y_test_encoded, y_pred_dt)
plt.figure(figsize=(10, 7))
sns.heatmap(conf_matrix_dt, annot=True, fmt='d', cmap='Blues', xticklabels=label_encoder.classes_, yticklabels=label_encoder.classes_)
plt.title("Confusion Matrix - Decision Tree")
plt.xlabel("Predicted Labels")
plt.ylabel("True Labels")
plt.show()



In [None]:
import joblib

# 保存决策树模型
joblib.dump(decision_tree_model, 'decision_tree_model.pkl')




In [None]:
import joblib

# 保存决策树模型
joblib.dump(decision_tree_model, 'decision_tree_model.pkl')

import joblib
from sklearn.metrics import accuracy_score

# 加载模型
loaded_dt_model = joblib.load('decision_tree_model.pkl')

# 检查模型结构（可选）
print(loaded_dt_model)

# 验证加载是否成功：使用测试集进行预测
try:
    y_pred_loaded_dt = loaded_dt_model.predict(X_test_flattened)  # 使用测试集
    print("Model loaded and predictions made successfully.")
except Exception as e:
    print("Error during model loading or prediction:", e)

# 如果有真实标签 y_test_encoded，可以计算准确率验证模型效果
accuracy_loaded_model = accuracy_score(y_test_encoded, y_pred_loaded_dt)
print(f"Loaded Model Accuracy: {accuracy_loaded_model:.4f}")


In [None]:

# %% Train a Random Forest Classifier
random_forest_model = RandomForestClassifier(n_estimators=100, random_state=RANDOM_SEED)
random_forest_model.fit(X_train_flattened, y_train_encoded)

# %% Make predictions with Random Forest model
y_pred_rf = random_forest_model.predict(X_test_flattened)

# %% Evaluate Random Forest model
accuracy_rf = accuracy_score(y_test_encoded, y_pred_rf)
print("Random Forest Classifier Accuracy:", accuracy_rf)
print(classification_report(y_test_encoded, y_pred_rf, target_names=label_encoder.classes_))
conf_matrix_rf = confusion_matrix(y_test_encoded, y_pred_rf)
plt.figure(figsize=(10, 7))
sns.heatmap(conf_matrix_rf, annot=True, fmt='d', cmap='Blues', xticklabels=label_encoder.classes_, yticklabels=label_encoder.classes_)
plt.title("Confusion Matrix - Random Forest")
plt.xlabel("Predicted Labels")
plt.ylabel("True Labels")
plt.show()


In [None]:
import joblib

# 保存模型
joblib.dump(decision_tree_model, 'random_forest_model.pkl')

import joblib
from sklearn.metrics import accuracy_score

# 加载模型
loaded_dt_model = joblib.load('random_forest_model.pkl')

# 检查模型结构（可选）
print(loaded_dt_model)

# 验证加载是否成功：使用测试集进行预测
try:
    y_pred_loaded_dt = loaded_dt_model.predict(X_test_flattened)  # 使用测试集
    print("Model loaded and predictions made successfully.")
except Exception as e:
    print("Error during model loading or prediction:", e)

# 如果有真实标签 y_test_encoded，可以计算准确率验证模型效果
accuracy_loaded_model = accuracy_score(y_test_encoded, y_pred_loaded_dt)
print(f"Loaded Model Accuracy: {accuracy_loaded_model:.4f}")


In [None]:
import xgboost as xgb
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# %% Train an XGBoost Classifier
xgboost_model = xgb.XGBClassifier(n_estimators=100, random_state=RANDOM_SEED, use_label_encoder=False, eval_metric='logloss')
xgboost_model.fit(X_train_flattened, y_train_encoded)

# %% Make predictions with XGBoost model
y_pred_xgb = xgboost_model.predict(X_test_flattened)

# %% Evaluate XGBoost model
accuracy_xgb = accuracy_score(y_test_encoded, y_pred_xgb)
print("XGBoost Classifier Accuracy:", accuracy_xgb)
print(classification_report(y_test_encoded, y_pred_xgb, target_names=label_encoder.classes_))

# Confusion Matrix
conf_matrix_xgb = confusion_matrix(y_test_encoded, y_pred_xgb)
plt.figure(figsize=(10, 7))
sns.heatmap(conf_matrix_xgb, annot=True, fmt='d', cmap='Blues', xticklabels=label_encoder.classes_, yticklabels=label_encoder.classes_)
plt.title("Confusion Matrix - XGBoost")
plt.xlabel("Predicted Labels")
plt.ylabel("True Labels")
plt.show()


In [None]:
import joblib

# 保存模型
joblib.dump(decision_tree_model, 'xgboost_model.pkl')

import joblib
from sklearn.metrics import accuracy_score

# 加载模型
loaded_dt_model = joblib.load('xgboost_model.pkl')

# 检查模型结构（可选）
print(loaded_dt_model)

# 验证加载是否成功：使用测试集进行预测
try:
    y_pred_loaded_dt = loaded_dt_model.predict(X_test_flattened)  # 使用测试集
    print("Model loaded and predictions made successfully.")
except Exception as e:
    print("Error during model loading or prediction:", e)

# 如果有真实标签 y_test_encoded，可以计算准确率验证模型效果
accuracy_loaded_model = accuracy_score(y_test_encoded, y_pred_loaded_dt)
print(f"Loaded Model Accuracy: {accuracy_loaded_model:.4f}")


In [None]:
import lightgbm as lgb
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# %% Train a LightGBM Classifier
lightgbm_model = lgb.LGBMClassifier(n_estimators=100, random_state=RANDOM_SEED)
lightgbm_model.fit(X_train_flattened, y_train_encoded)

# %% Make predictions with LightGBM model
y_pred_lgb = lightgbm_model.predict(X_test_flattened)

# %% Evaluate LightGBM model
accuracy_lgb = accuracy_score(y_test_encoded, y_pred_lgb)
print("LightGBM Classifier Accuracy:", accuracy_lgb)
print(classification_report(y_test_encoded, y_pred_lgb, target_names=label_encoder.classes_))

# Confusion Matrix
conf_matrix_lgb = confusion_matrix(y_test_encoded, y_pred_lgb)
plt.figure(figsize=(10, 7))
sns.heatmap(conf_matrix_lgb, annot=True, fmt='d', cmap='Blues', xticklabels=label_encoder.classes_, yticklabels=label_encoder.classes_)
plt.title("Confusion Matrix - LightGBM")
plt.xlabel("Predicted Labels")
plt.ylabel("True Labels")
plt.show()

In [None]:
import joblib

# 保存模型
joblib.dump(decision_tree_model, 'lightgbm_model.pkl')

import joblib
from sklearn.metrics import accuracy_score

# 加载模型
loaded_dt_model = joblib.load('lightgbm_model.pkl')

# 检查模型结构（可选）
print(loaded_dt_model)

# 验证加载是否成功：使用测试集进行预测
try:
    y_pred_loaded_dt = loaded_dt_model.predict(X_test_flattened)  # 使用测试集
    print("Model loaded and predictions made successfully.")
except Exception as e:
    print("Error during model loading or prediction:", e)

# 如果有真实标签 y_test_encoded，可以计算准确率验证模型效果
accuracy_loaded_model = accuracy_score(y_test_encoded, y_pred_loaded_dt)
print(f"Loaded Model Accuracy: {accuracy_loaded_model:.4f}")


In [None]:
#测试模型
# 假设已经训练好的 XGBoost 模型
from sklearn.preprocessing import LabelEncoder

# 准备测试数据（已加载 X_test_flattened 和 y_test_encoded）
label_encoder = LabelEncoder()
label_encoder.fit(y_train)  # 假设 y_train 是训练时的标签
y_test_encoded = label_encoder.transform(y_test)  # 确保测试标签被编码

X_train_flattened = X_train.reshape(len(X_train), -1)
X_test_flattened = X_test.reshape(len(X_test), -1)

# 调用函数进行评估
accuracy = evaluate_model(
    model=decision_tree_model,                # 已训练的 XGBoost 模型
    X_test=X_test_flattened,            # 测试集特征
    y_test=y_test,                      # 测试集标签（未编码）
    label_encoder=label_encoder,        # 标签编码器
    history=None                        # XGBoost 无训练历史，传 None
)

# 输出最终的测试集准确率
print(f"Final Test Accuracy with XGBoost: {accuracy:.4f}")



In [None]:


# %% Define the custom LSTM model class
class CustomLSTMModel(tf.keras.Model):
    def __init__(self, N_FEATURES, N_HIDDEN_UNITS, N_CLASSES, **kwargs):
        super(CustomLSTMModel, self).__init__(**kwargs)
        # Initialize weights and biases
        self.N_FEATURES = N_FEATURES
        self.N_HIDDEN_UNITS = N_HIDDEN_UNITS
        self.N_CLASSES = N_CLASSES
        
        self.W_hidden = tf.Variable(tf.random.normal([N_FEATURES, N_HIDDEN_UNITS]), trainable=True, name="W_hidden")
        self.b_hidden = tf.Variable(tf.random.normal([N_HIDDEN_UNITS], mean=1.0), trainable=True, name="b_hidden")
        
        self.W_output = tf.Variable(tf.random.normal([N_HIDDEN_UNITS, N_CLASSES]), trainable=True, name="W_output")
        self.b_output = tf.Variable(tf.random.normal([N_CLASSES]), trainable=True, name="b_output")
        
        # Define two LSTM layers
        self.lstm_layers = [tf.keras.layers.LSTMCell(N_HIDDEN_UNITS) for _ in range(2)]
        self.rnn = tf.keras.layers.RNN(self.lstm_layers, return_sequences=True, return_state=True)
    
    def call(self, inputs):
        batch_size = tf.shape(inputs)[0]
        time_steps = tf.shape(inputs)[1]
        features = tf.shape(inputs)[2]
        
        X = tf.transpose(inputs, [1, 0, 2])  # (time_steps, batch_size, features)
        X = tf.reshape(X, [-1, features])    # (time_steps * batch_size, features)
        
        # Compute hidden layer
        hidden = tf.nn.relu(tf.matmul(X, self.W_hidden) + self.b_hidden)  # (time_steps * batch_size, hidden_units)
        hidden = tf.reshape(hidden, [time_steps, batch_size, self.N_HIDDEN_UNITS])  # (time_steps, batch_size, hidden_units)
        hidden = tf.transpose(hidden, [1, 0, 2])  # (batch_size, time_steps, hidden_units)
        
        # Pass through LSTM layers
        outputs, *states = self.rnn(hidden)  # outputs: (batch_size, time_steps, hidden_units)
        
        # Get output from the last time step
        lstm_last_output = outputs[:, -1, :]  # (batch_size, hidden_units)
        
        # Compute the final output
        logits = tf.matmul(lstm_last_output, self.W_output) + self.b_output  # (batch_size, num_classes)
        
        return logits

    

   

    def get_config(self):
    # 不调用 super().get_config()，直接返回当前模型的配置即可
        config = {
            "N_FEATURES": self.N_FEATURES,
            "N_HIDDEN_UNITS": self.N_HIDDEN_UNITS,
            "N_CLASSES": self.N_CLASSES
        }
        return config

    @classmethod
    def from_config(cls, config):
        # 从配置字典中恢复模型实例
        return cls(
            N_FEATURES=config['N_FEATURES'],
            N_HIDDEN_UNITS=config['N_HIDDEN_UNITS'],
            N_CLASSES=config['N_CLASSES']
        )




In [None]:
# %% Instantiate the model
model = CustomLSTMModel(N_FEATURES, N_HIDDEN_UNITS, N_CLASSES)

# Compile the model
model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

# %% Print the data shape to ensure everything matches
print(f"X_train shape: {X_train.shape}")
print(f"y_train_encoded shape: {y_train_encoded.shape}")
print(f"Unique labels: {np.unique(y_train_encoded)}")
print(f"N_CLASSES: {N_CLASSES}")

# %% Train the model
history = model.fit(X_train, y_train_encoded, batch_size=64, epochs=10, validation_data=(X_test, y_test_encoded), verbose=1)

# %% Evaluate the model
loss, accuracy = model.evaluate(X_test, y_test_encoded)
print(f"Test Accuracy: {accuracy}")

# %% Freeze the layers (if needed)
for layer in model.layers:
    if isinstance(layer, tf.keras.layers.LSTM):
        layer.trainable = False

# Recompile the model after freezing layers
model.compile(optimizer='adam',
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

# %% Confusion Matrix
from sklearn.metrics import confusion_matrix
import seaborn as sns

# Predict the labels for the test set
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)  # Convert probabilities to class labels

# Compute confusion matrix
cm = confusion_matrix(y_test_encoded, y_pred_classes)

# Plot the confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=np.unique(y_test_encoded), yticklabels=np.unique(y_test_encoded))
plt.title('Confusion Matrix')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()

# %% Visualize training and validation metrics
import matplotlib.pyplot as plt

# Plot accuracy and loss for training and validation
plt.figure(figsize=(12, 6))

# Plot accuracy
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Accuracy vs Epochs')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

# Plot loss
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Loss vs Epochs')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()

plt.tight_layout()
plt.show()


In [None]:
accuracy_lstm = accuracy_score(y_test_encoded, y_pred_classes)
print("LSTM Accuracy:", accuracy_lstm)
print(classification_report(y_test_encoded,y_pred_classes, target_names=label_encoder.classes_))

In [None]:
# 保存模型
model.save('my_custom_lstm_model.keras')

In [None]:
from keras.models import load_model

# 自定义对象字典
custom_objects = {'CustomLSTMModel': CustomLSTMModel}

# 读取模型
loaded_model = load_model('my_custom_lstm_model.keras', custom_objects=custom_objects)

# 验证模型加载成功
print(loaded_model.summary())


In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report

# 全局参数
N_TIME_STEPS = 200  # 时间窗口大小
N_FEATURES = 3  # 每个时间步的特征数量
OUTPUT_DIR = "./experiment_results"  # 保存结果的目录

# 创建结果目录
os.makedirs(OUTPUT_DIR, exist_ok=True)

# 数据集路径
datasets = [
    ("devicenexus4_1_x.csv", "devicenexus4_1_y.csv"),
    ("devicenexus4_2_x.csv", "devicenexus4_2_y.csv"),
    ("devices3_1_x.csv", "devices3_1_y.csv"),
    ("devices3_2_x.csv", "devices3_2_y.csv"),
    ("devices3mini_1_x.csv", "devices3mini_1_y.csv"),
    ("devices3mini_2_x.csv", "devices3mini_2_y.csv"),
    ("devicesamsungold_1_x.csv", "devicesamsungold_1_y.csv"),
    ("devicesamsungold_2_x.csv", "devicesamsungold_2_y.csv"),
]

# 单个数据集评估函数
def evaluate_dataset(model, file_x, file_y, label_encoder, output_dir):
    """
    评估单个数据集并保存结果到文件。

    Parameters:
    - model: 已训练好的模型
    - file_x: 特征文件路径
    - file_y: 标签文件路径
    - label_encoder: 标签编码器
    - output_dir: 保存结果的目录
    """
    # 加载数据
    X_test = pd.read_csv(file_x).values
    y_test = pd.read_csv(file_y).values
    X_test = X_test.reshape(len(X_test), N_TIME_STEPS, N_FEATURES)  # 保持时间步特征维度

    # 编码标签
    y_test_encoded = label_encoder.transform(y_test.ravel())

    # 模型预测
    y_pred = model.predict(X_test)  # 无需展平
    y_pred_classes = np.argmax(y_pred, axis=1)

    # 计算准确率
    accuracy = accuracy_score(y_test_encoded, y_pred_classes)

    # 分类报告
    report = classification_report(y_test_encoded, y_pred_classes, target_names=label_encoder.classes_)
    print(f"Classification Report for {file_x}:\n{report}")

    # 保存分类报告到文本文件
    report_file = os.path.join(output_dir, f"{os.path.basename(file_x)}_report.txt")
    with open(report_file, "w") as f:
        f.write(f"Classification Report for {file_x}\n")
        f.write(report)
        f.write(f"\nAccuracy: {accuracy:.4f}\n")

    # 混淆矩阵
    cm = confusion_matrix(y_test_encoded, y_pred_classes)

    # 绘制混淆矩阵并保存为图片
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=label_encoder.classes_,
                yticklabels=label_encoder.classes_)
    plt.title(f"Confusion Matrix for {file_x}")
    plt.xlabel("Predicted Labels")
    plt.ylabel("True Labels")
    plt.tight_layout()
    cm_image_file = os.path.join(output_dir, f"{os.path.basename(file_x)}_confusion_matrix.png")
    plt.savefig(cm_image_file)
    plt.close()

    return accuracy

In [None]:
# 假设已训练的模型和训练集标签
  # 替换为您的已训练模型
y_train = pd.read_csv("y_train.csv").values  # 替换为您的训练标签文件

# 初始化标签编码器
label_encoder = LabelEncoder()
label_encoder.fit(y_train.ravel())  # 使用训练集标签 fit 编码器

# 保存实验结果汇总
summary_file = os.path.join(OUTPUT_DIR, "experiment_summary.txt")
results = {}
with open(summary_file, "w") as summary:
    for file_x, file_y in datasets:
        print(f"Evaluating dataset: {file_x}")
        accuracy = evaluate_dataset(model, file_x, file_y, label_encoder, OUTPUT_DIR)
        results[file_x] = accuracy
        summary.write(f"{file_x}: {accuracy:.4f}\n")

    # 打印总结
print("\nSummary of Experiment Results:")
for dataset, accuracy in results.items():
    print(f"{dataset}: {accuracy:.4f}")

这段代码展示了一个 **自定义 LSTM 模型** 的实现，并且在模型保存和加载的过程中使用了一些 Keras 框架的高级特性。为了更好地理解其使用的方法，我们可以从几个角度来分析：**模型设计**、**模型保存和加载**、以及 **自定义层的序列化与反序列化**。

### 1. **模型设计：使用自定义层**

这段代码定义了一个自定义的 LSTM 模型，继承自 `tf.keras.Model`，并实现了以下内容：

- **自定义初始化方法（`__init__`）**：
  在 `__init__` 方法中，定义了网络的核心结构，包括：
  - **权重和偏置**：使用 `tf.Variable` 初始化了两组参数 `W_hidden` 和 `b_hidden`，用于隐藏层的线性变换；`W_output` 和 `b_output` 用于输出层的线性变换。这些权重参数的初始化是随机的，并且设置了 `trainable=True`，表示这些参数将在训练过程中优化。
  
  - **LSTM 层**：通过定义 `tf.keras.layers.LSTMCell` 创建了两个 LSTM 单元，并将它们组合成一个 RNN 层。LSTM 单元通常用于处理时序数据，因为它们能够在序列的每个时间步捕捉数据的上下文信息。这个模型在 `call()` 方法中处理输入数据并通过 RNN 层进行处理。

  - **传递 `trainable` 参数**：在 `__init__` 方法中，参数（如权重矩阵）被标记为 `trainable=True`，表示它们将在训练过程中进行更新。

### 2. **`call()` 方法：定义前向传播逻辑**

`call()` 方法定义了如何通过模型的网络结构进行前向传播：

- **输入数据的维度转换**：输入数据首先被转换为 `[time_steps, batch_size, features]` 的形状，并进行重塑和转置。`X = tf.transpose(inputs, [1, 0, 2])` 将数据的时间步转置到第一个维度。
  
- **计算隐藏层输出**：计算隐藏层的激活输出 `hidden`，并通过 `relu` 激活函数进行非线性变换。`hidden` 是通过线性变换计算得到的，并且通过 `tf.matmul()` 计算隐藏状态。

- **LSTM 层输出**：`self.rnn(hidden)` 将数据传递给 LSTM 层，得到输出和隐状态。这是一个深度 RNN（即多层 RNN），每层都是一个 LSTM 单元。

- **输出计算**：从 LSTM 的最后一个时间步获取输出，并通过权重矩阵 `W_output` 和偏置 `b_output` 计算最终的 logits 输出，用于分类。

### 3. **自定义层的序列化与反序列化**

这段代码特别关注了如何保存和加载模型时，能够正确地处理自定义层和自定义参数。这是通过重写 `get_config()` 和 `from_config()` 方法来实现的。

#### 3.1 `get_config()` 方法

`get_config()` 方法是 Keras 模型的标准方法，用于返回模型的配置字典。该字典包含了模型的所有超参数，以便能够正确地将模型保存和加载。

- 在 `get_config()` 方法中，我们首先调用 `super(CustomLSTMModel, self).get_config()` 获取父类的配置（即 `tf.keras.Model` 的配置），然后通过 `config.update({...})` 更新模型的自定义配置（如 `N_FEATURES`, `N_HIDDEN_UNITS`, `N_CLASSES`）。这确保了这些重要的超参数也被包含在内。

#### 3.2 `from_config()` 方法

`from_config()` 是一个类方法（即通过 `cls` 引用类本身），它负责根据 `get_config()` 返回的配置字典重新构建模型实例。

- 在 `from_config()` 方法中，我们从配置字典中提取出模型需要的超参数（`N_FEATURES`, `N_HIDDEN_UNITS`, `N_CLASSES`），然后用这些参数创建一个新的模型实例。
  
  通过这种方式，在模型加载时，Keras 会使用这个方法来恢复模型的结构，而不仅仅是加载模型的权重。

### 4. **Keras 序列化机制：保存和加载模型**

Keras 提供了强大的序列化和反序列化功能，使得你可以保存整个模型（包括网络结构和权重）并在稍后加载。保存和加载模型时，Keras 使用了以下几个关键步骤：

- **保存模型**：`model.save('path_to_model.h5')` 保存了模型的结构和权重。这里 `.h5` 格式是 HDF5 格式，常用于存储深度学习模型。
  
- **加载模型时的自定义对象处理**：加载时，如果模型中包含了自定义层（如 `CustomLSTMModel`），需要通过 `custom_objects` 参数将这些自定义类传递给 `load_model()`，这样 Keras 就能够识别并正确加载这些自定义层。`load_model('path_to_model.h5', custom_objects={'CustomLSTMModel': CustomLSTMModel})` 会使用 `CustomLSTMModel` 类来反序列化模型。

#### 3.3 `trainable` 参数的处理

在反序列化模型时，Keras 会将配置字典中的 `trainable` 参数传递给模型的层。由于我们在 `CustomLSTMModel` 中自定义了权重（`W_hidden`，`b_hidden`等）并指定了 `trainable=True`，这些权重在保存时会被标记为可训练。当加载模型时，`trainable` 的配置确保了这些参数依然是可训练的。

### 5. **总结：使用的方法**

在这段代码中，使用了以下主要的方法和设计模式：

1. **自定义模型层**：通过继承 `tf.keras.Model` 创建了一个自定义的 LSTM 模型，手动定义了前向传播的逻辑。
   
2. **权重初始化**：使用 `tf.Variable` 手动初始化模型的权重和偏置，这些变量被设置为 `trainable=True`，使得它们能够在训练过程中更新。

3. **模型序列化与反序列化**：
   - `get_config()` 和 `from_config()` 方法用于保存和加载自定义模型层。
   - 通过 `custom_objects` 传递自定义层，确保加载模型时可以正确反序列化自定义层和超参数。

4. **Keras 保存与加载机制**：利用 `model.save()` 和 `load_model()` 实现模型的持久化和恢复。

通过这些方法，代码实现了一个自定义的深度学习模型，并支持模型的保存和加载，确保自定义层能够被正确处理，同时保留了训练过程中的可训练参数。

In [None]:

# %% Save the model after training
model.save('trained_lstm_model.h5')  # This saves the entire model to a file


# Load the saved model
from keras.models import load_model

# Load the model
loaded_model = load_model('trained_lstm_model.h5')
print("Model loaded from 'trained_lstm_model.h5'")

# %% Evaluate the loaded model on a new test set (X_test_new, y_test_new)
# Make sure X_test_new and y_test_new are preprocessed similarly to your original data

loss, accuracy = loaded_model.evaluate(X_test_new, y_test_new)
print(f"New Test Accuracy: {accuracy}")

# %% Confusion Matrix on the new test set
y_pred_new = loaded_model.predict(X_test_new)
y_pred_classes_new = np.argmax(y_pred_new, axis=1)  # Convert probabilities to class labels

# Compute confusion matrix for the new test set
cm_new = confusion_matrix(y_test_new, y_pred_classes_new)

# Plot the confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm_new, annot=True, fmt='d', cmap='Blues', xticklabels=np.unique(y_test_new), yticklabels=np.unique(y_test_new))
plt.title('Confusion Matrix on New Test Set')
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()
