# Data alignment - merge features and labels on 'image_id'

In [3]:
import pandas as pd
import pickle

def load_labels_from_pickle(pickle_file):
    """Build an ``image_id -> label`` mapping from a pickled container.

    Args:
        pickle_file: Path of the pickle file to read. The unpickled object
            is indexed with the keys ``'image_id'`` and ``'label'``.

    Returns:
        dict: Each image id paired with the label at the same position.
        If an image id occurs more than once, the last label wins.

    Note:
        ``pickle.load`` can execute arbitrary code; only open trusted files.
    """
    with open(pickle_file, 'rb') as handle:
        payload = pickle.load(handle)

    # Pair the two parallel sequences position by position.
    return {
        image_id: label
        for image_id, label in zip(payload['image_id'], payload['label'])
    }

# Load the pickle file that holds the image_id -> label mapping.
pickle_path = 'D:\\CAPSTONE5703_Classifier\\DATASET_C01\\train.pkl'
labels_dict = load_labels_from_pickle(pickle_path)

# Load the CSV file that holds the feature columns.
features_path = 'D:\\CAPSTONE5703_Classifier\\DATASET_C01\\Fused.csv'
features_df = pd.read_csv(features_path)

# The CSV is expected to carry 'image_id' as its last column. Fail fast
# instead of printing and continuing: otherwise merged_df is never defined
# and later cells die with an unrelated NameError.
if features_df.columns[-1] != 'image_id':
    raise ValueError("Error: 'image_id' must be the last column in the CSV file.")

# Turn the mapping into a two-column frame so it can be joined.
labels_df = pd.DataFrame(list(labels_dict.items()), columns=['image_id', 'label'])

# Index both frames by 'image_id' (reassignment rather than inplace=True).
features_df = features_df.set_index('image_id')
labels_df = labels_df.set_index('image_id')

# Inner join: keep only image ids present in both the features and the labels.
merged_df = pd.merge(features_df, labels_df, left_index=True, right_index=True, how='inner')

# Bring 'image_id' back as a regular column for export / further processing.
merged_df = merged_df.reset_index()

# Quick look at the merged data.
print(merged_df.head())

# Optionally persist the merged DataFrame.
# merged_df.to_csv('D:\\CAPSTONE5703_CNN\\merged_fused features.csv', index=False)


ModuleNotFoundError: No module named 'torch'

# Split into training and test sets

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Separate the target column from the feature columns.
# 'image_id' is dropped together with 'label' so it is not used as a feature.
y = merged_df['label']
X = merged_df.drop(columns=['label', 'image_id'])

# Uncomment to inspect the dtype of every feature column.
# print(X.dtypes)

# Hold out 20% of the rows as a test set; the fixed seed keeps the split
# reproducible across runs.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)
# X_train / X_test: feature rows of the training and test sets.
# y_train / y_test: the matching labels of the training and test sets.

# PCA 

In [None]:
import numpy as np
from sklearn.decomposition import PCA

# Reduce dimensionality with PCA. The projection is fitted on the training
# split only and then reused for the test split.
# NOTE(review): StandardScaler is imported in the previous cell but never
# applied, so PCA sees the raw feature scales -- confirm this is intended.
pca = PCA(n_components=0.95)  # keep enough components to explain 95% of the variance
X_train_pca = pca.fit_transform(X_train)  # training set after PCA
X_test_pca = pca.transform(X_test)  # test set after PCA

# Report the reduced dimensionality of both splits (they must agree).
n_train_dims = X_train_pca.shape[1]
n_test_dims = X_test_pca.shape[1]
print("New training dimensions:", n_train_dims)
print("New testing dimensions:", n_test_dims)
# print("Explained variance ratio:", pca.explained_variance_ratio_)


New training dimensions: 193
New testing dimensions: 193


## CNN

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from skorch import NeuralNetClassifier
from skorch.callbacks import EarlyStopping
from sklearn.metrics import classification_report, accuracy_score
from sklearn.model_selection import StratifiedKFold, RandomizedSearchCV

# Assumes X_train_pca and X_test_pca already exist as the 2-D PCA outputs
# (n_samples, n_components) produced by the PCA cell.

# Add a trailing singleton axis -> (n_samples, n_components, 1) and cast to
# float32. PCA returns float64 arrays, while torch layers hold float32
# parameters by default; feeding float64 data raises a dtype mismatch in the
# first convolution.
# NOTE(review): nn.Conv1d consumes (batch, channels, length); confirm the
# model places the singleton axis in the channel position before convolving.
X_train_pca = X_train_pca.reshape(X_train_pca.shape[0], X_train_pca.shape[1], 1).astype(np.float32)
X_test_pca = X_test_pca.reshape(X_test_pca.shape[0], X_test_pca.shape[1], 1).astype(np.float32)

# 定义3层卷积的1D CNN模型
class CNN1D(nn.Module):
    """Three-stage 1D CNN (conv -> ReLU -> max-pool, three times) with a two-layer head.

    Every sample is treated as one single-channel sequence of ``num_features``
    values. The network returns raw class logits, which is what
    ``nn.CrossEntropyLoss`` expects; no sigmoid/softmax is applied here.

    Args:
        num_features: Number of values per sample (sequence length).
        num_filters: Output channels of the first convolution; the second and
            third convolutions use 2x and 4x this number.
        kernel_size: Kernel width shared by all three convolutions.
        pool_size: Window (and stride) of the max-pool applied after each convolution.
        dense_units: Width of the hidden fully connected layer.
        num_classes: Number of output logits. Defaults to 2 -- this assumes a
            binary task; set it to the number of distinct labels otherwise.

    Raises:
        ValueError: If the sequence shrinks to zero length before the dense
            layers for the given ``num_features`` / ``kernel_size`` / ``pool_size``.
    """

    def __init__(self, num_features, num_filters=32, kernel_size=3, pool_size=2, dense_units=128,
                 num_classes=2):
        super(CNN1D, self).__init__()
        self.conv1 = nn.Conv1d(1, num_filters, kernel_size)
        self.pool = nn.MaxPool1d(pool_size)
        self.conv2 = nn.Conv1d(num_filters, num_filters*2, kernel_size)
        self.conv3 = nn.Conv1d(num_filters*2, num_filters*4, kernel_size)

        # Track the sequence length through the three conv+pool stages.
        # An unpadded convolution removes (kernel_size - 1) positions and the
        # pool floor-divides by pool_size. The floor must be applied per
        # stage; a single closed-form division at the end over- or
        # under-counts and makes fc1 mismatch the real activation size.
        seq_len = num_features
        for _ in range(3):
            seq_len = (seq_len - (kernel_size - 1)) // pool_size
            if seq_len <= 0:
                raise ValueError(
                    "num_features=%d is too small for three conv/pool stages "
                    "with kernel_size=%d and pool_size=%d"
                    % (num_features, kernel_size, pool_size)
                )

        self.fc1 = nn.Linear(num_filters*4 * seq_len, dense_units)
        self.fc2 = nn.Linear(dense_units, num_classes)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return logits of shape ``(batch, num_classes)``.

        ``x`` may be ``(batch, num_features)``, ``(batch, num_features, 1)`` or
        ``(batch, 1, num_features)``; it is viewed as one channel per sample.
        """
        # Conv1d needs (batch, channels, length) with channels == 1.
        x = x.reshape(x.shape[0], 1, -1)
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = self.pool(self.relu(self.conv3(x)))
        # Flatten (channels, length) into one feature vector per sample.
        x = x.flatten(1)
        x = self.relu(self.fc1(x))
        return self.fc2(x)

# Wrap the module in a skorch NeuralNetClassifier so it can be tuned with
# scikit-learn's search utilities.
# NOTE(review): nn.CrossEntropyLoss expects integer class indices as targets;
# confirm that the 'label' column is integer-typed.
net = NeuralNetClassifier(
    module=CNN1D,
    module__num_features=X_train_pca.shape[1],
    max_epochs=10,
    lr=0.01,
    optimizer=optim.Adam,
    optimizer__weight_decay=0.0001,
    criterion=nn.CrossEntropyLoss,
    batch_size=64,
    iterator_train__shuffle=True,
    # EarlyStopping is imported by name; the bare `skorch` module name is
    # never imported in this notebook.
    callbacks=[EarlyStopping(patience=5)],
    # device='cuda'  # use 'cuda' when a GPU is available
)

# Hyper-parameter candidates (defined by hand).
param_dist = {
    'optimizer': [optim.Adam, optim.SGD, optim.RMSprop],
    'module__num_filters': [32, 64, 128],
    'module__kernel_size': [3, 5, 7],
    'module__pool_size': [2, 3],
    'module__dense_units': [64, 128, 256],
    'lr': [0.001, 0.005, 0.01]  # hand-picked learning rates
}

# Stratified, shuffled and seeded 5-fold cross-validation.
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

# Randomized search. The splitter defined above is actually passed in
# (instead of the bare integer 5) and the sampling of candidates is seeded so
# repeated runs draw the same 40 configurations.
random_search = RandomizedSearchCV(
    net,
    param_distributions=param_dist,
    cv=cv,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1,
    n_iter=40,
    random_state=42,
)
random_search.fit(X_train_pca, y_train)  # fit on the PCA-reduced training data

# Report the best hyper-parameters and their cross-validated score.
print("Best parameters: ", random_search.best_params_)
print("Best score: ", random_search.best_score_)

# With the default refit=True the search has already retrained the best
# configuration on the full training set, so best_estimator_ is ready to use
# and no second fit is needed.
best_model = random_search.best_estimator_

# Evaluate the best model on the held-out test set.
predictions = best_model.predict(X_test_pca)
print("Accuracy on test set: ", accuracy_score(y_test, predictions))
print("\nClassification Report:\n", classification_report(y_test, predictions))


ModuleNotFoundError: No module named 'skorch.history'