In [1]:
import numpy as np
from MLP.model import MLP
import pandas as pd
from MLP.layers import SoftmaxCrossEntropy
from MLP.layers import Adam
from MLP.layers import MiniBatchFit
from MLP.layers import SGDMomentum

# Load the .npy dataset. The directory is named once so it can be changed in one place.
DATA_DIR = 'Assignment1-Dataset'
X_train = np.load(f'{DATA_DIR}/train_data.npy')
X_test = np.load(f'{DATA_DIR}/test_data.npy')
# Labels are flattened into 1-D vectors.
Y_train = np.load(f'{DATA_DIR}/train_label.npy').flatten()
Y_test = np.load(f'{DATA_DIR}/test_label.npy').flatten()


In [2]:
# Sanity check: shapes of the train/test features and labels, in that order
tuple(arr.shape for arr in (X_train, Y_train, X_test, Y_test))

((50000, 128), (50000,), (10000, 128), (10000,))

In [3]:
# Collect the distinct class labels that occur in the training labels
classes = np.unique(Y_train)
classes

array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], dtype=uint8)

In [4]:
def stratified_train_test_split(X, y, test_size=0.2, random_seed=None):
    """
    Stratified train/test split implemented with NumPy only.

    Each class contributes ``int(n_class * test_size)`` samples (truncated) to
    the test subset and the rest to the train subset, so per-class proportions
    are kept up to that truncation.

    :param X: feature array of shape (N, d); indexed along its first axis
    :param y: label array of shape (N,)
    :param test_size: fraction of every class assigned to the test subset,
                      must lie in [0.0, 1.0]
    :param random_seed: if not None, passed to ``np.random.seed``; note that
                        this reseeds NumPy's global legacy RNG, so later
                        ``np.random`` calls are affected as well
    :return: (X_train, X_test, y_train, y_test)
    :raises ValueError: if ``test_size`` is outside [0.0, 1.0] or X and y
                        do not have the same number of samples
    """
    # A negative fraction would slice from the end of each class and a value
    # above 1 would silently empty the train subset, so reject both up front.
    if not 0.0 <= test_size <= 1.0:
        raise ValueError(f"test_size must be within [0.0, 1.0], got {test_size!r}.")

    n_rows = np.shape(X)[0]
    n_labels = np.shape(y)[0]
    if n_rows != n_labels:
        raise ValueError(
            f"X and y must have the same number of samples, got {n_rows} and {n_labels}."
        )

    if random_seed is not None:
        np.random.seed(random_seed)

    # All distinct labels; each one is split independently below.
    unique_labels = np.unique(y)

    train_parts = []
    test_parts = []

    for label in unique_labels:
        # Indices of every sample that belongs to this class
        label_indices = np.where(y == label)[0]

        # Shuffle within the class so the original ordering has no influence
        np.random.shuffle(label_indices)

        # Number of samples of this class that go to the test subset
        test_count = int(len(label_indices) * test_size)

        # The first test_count shuffled indices go to test, the rest to train
        test_parts.append(label_indices[:test_count])
        train_parts.append(label_indices[test_count:])

    # np.concatenate rejects an empty list, which happens when y has no
    # samples at all; fall back to an empty index array in that case.
    if train_parts:
        train_indices = np.concatenate(train_parts)
        test_indices = np.concatenate(test_parts)
    else:
        train_indices = np.empty(0, dtype=np.intp)
        test_indices = np.empty(0, dtype=np.intp)

    # Shuffle once more so samples are not grouped by class in the output
    np.random.shuffle(train_indices)
    np.random.shuffle(test_indices)

    # Slice features and labels with the same index arrays to keep them aligned
    X_train, X_test = X[train_indices], X[test_indices]
    y_train, y_test = y[train_indices], y[test_indices]

    return X_train, X_test, y_train, y_test

In [5]:
def classification_report_numpy(y_true, y_pred, average='macro'):
    """
    Compute multi-class precision, recall and F1 using NumPy only.

    A confusion matrix (rows: true labels, columns: predicted labels) is built
    over the sorted union of the labels found in ``y_true`` and ``y_pred``, so
    labels do not need to be consecutive integers starting at 0.

    :param y_true: ground-truth labels, shape (N,)
    :param y_pred: predicted labels, shape (N,)
    :param average: averaging mode, either 'macro' or 'micro'
    :return: (precision, recall, f1)
             - average='macro': unweighted mean of the per-class scores
             - average='micro': scores from the globally pooled TP / FP / FN
             Any ratio with a zero denominator is reported as 0.0, and empty
             input yields (0.0, 0.0, 0.0).
    :raises ValueError: if ``average`` is unsupported or the two label
                        vectors differ in length
    """
    # Validate the mode first so a typo fails before any work is done.
    if average not in ('macro', 'micro'):
        raise ValueError("Unsupported average type. Use 'macro' or 'micro'.")

    y_true = np.asarray(y_true).ravel()
    y_pred = np.asarray(y_pred).ravel()

    # Pairing the vectors with zip() would silently drop the surplus entries
    # of the longer one, so a length mismatch is treated as an error.
    if y_true.shape[0] != y_pred.shape[0]:
        raise ValueError(
            f"y_true and y_pred must have the same length, "
            f"got {y_true.shape[0]} and {y_pred.shape[0]}."
        )

    # Nothing to score; avoid taking the mean of an empty list (NaN + warning).
    if y_true.size == 0:
        return 0.0, 0.0, 0.0

    # Sorted union of all labels; position in this array is the matrix index.
    classes = np.unique(np.concatenate((y_true, y_pred)))
    num_classes = len(classes)

    # Every label is present in `classes`, so searchsorted returns its exact index.
    true_idx = np.searchsorted(classes, y_true)
    pred_idx = np.searchsorted(classes, y_pred)

    # Confusion matrix: rows are true labels, columns are predicted labels.
    # np.add.at accumulates correctly when the same (row, col) pair repeats.
    conf_mat = np.zeros((num_classes, num_classes), dtype=np.int64)
    np.add.at(conf_mat, (true_idx, pred_idx), 1)

    tp = np.diag(conf_mat)
    fp = conf_mat.sum(axis=0) - tp  # column sum minus the diagonal entry
    fn = conf_mat.sum(axis=1) - tp  # row sum minus the diagonal entry

    if average == 'macro':
        # Per-class scores; entries with a zero denominator stay at 0.0.
        predicted_total = tp + fp
        actual_total = tp + fn
        precisions = np.divide(tp, predicted_total,
                               out=np.zeros(num_classes, dtype=np.float64),
                               where=predicted_total != 0)
        recalls = np.divide(tp, actual_total,
                            out=np.zeros(num_classes, dtype=np.float64),
                            where=actual_total != 0)
        pr_sum = precisions + recalls
        f1s = np.divide(2 * precisions * recalls, pr_sum,
                        out=np.zeros(num_classes, dtype=np.float64),
                        where=pr_sum != 0)

        return np.mean(precisions), np.mean(recalls), np.mean(f1s)

    # average == 'micro': pool the counts over all classes before dividing.
    TP_total = tp.sum()
    FP_total = fp.sum()
    FN_total = fn.sum()

    precision_micro = TP_total / (TP_total + FP_total) if (TP_total + FP_total) != 0 else 0.0
    recall_micro = TP_total / (TP_total + FN_total) if (TP_total + FN_total) != 0 else 0.0
    f1_micro = 2 * precision_micro * recall_micro / (precision_micro + recall_micro) \
        if (precision_micro + recall_micro) != 0 else 0.0

    return precision_micro, recall_micro, f1_micro

In [6]:
# Carve a stratified 20% validation split out of the training data (seed fixed at 42).
# NOTE(review): X_train / Y_train are rebound to the reduced subset here, so re-running
# this cell without reloading the data would split the already-split arrays again.
X_train, X_val, Y_train, Y_val = stratified_train_test_split(X_train, Y_train, test_size=0.2, random_seed=42)
# NOTE(review): this displays the test-set shapes, not the new X_val / Y_val shapes — confirm that is intended.
X_train.shape, Y_train.shape, X_test.shape, Y_test.shape

((40000, 128), (40000,), (10000, 128), (10000,))

In [7]:
# Per-class sample counts for the training labels and the test labels
pd.Series(Y_train).value_counts(), pd.Series(Y_test).value_counts()


(6    4000
 7    4000
 8    4000
 3    4000
 2    4000
 1    4000
 4    4000
 9    4000
 0    4000
 5    4000
 Name: count, dtype: int64,
 3    1000
 8    1000
 0    1000
 6    1000
 1    1000
 9    1000
 5    1000
 7    1000
 4    1000
 2    1000
 Name: count, dtype: int64)

In [8]:


# Hyperparameters
input_dim = 128  # dimensionality of the input data
hidden_dims = [64, 32]  # hidden layer sizes
output_dim = 10  # number of output classes (e.g. 10-way classification)
learning_rate = 0.001
momentum = 0.9
num_epochs = 200
batch_size = 100



In [9]:
# Optimizer instance; the same object is passed to both the model and the fit wrapper below
optimizer1=Adam()
# Build the MLP model
model_Adam = MLP(input_dim, hidden_dims, output_dim, dropout_rate=0.2, weight_decay=1e-4, optimizer=optimizer1)
# NOTE(review): no loss object is constructed in this cell; presumably MiniBatchFit
# handles the loss internally — confirm against MLP.layers.


Adam_Fit = MiniBatchFit(model_Adam, optimizer1,
             X_train, Y_train, 
             output_dim, 
             num_epochs=num_epochs, 
             batch_size=batch_size, 
             learning_rate=learning_rate)

# Train the model
Adam_Fit.fit()


Epoch 1/200, Loss: 2.0218
Epoch 2/200, Loss: 1.8389
Epoch 3/200, Loss: 1.7927
Epoch 4/200, Loss: 1.7667
Epoch 5/200, Loss: 1.7467
Epoch 6/200, Loss: 1.7297
Epoch 7/200, Loss: 1.7168
Epoch 8/200, Loss: 1.7045
Epoch 9/200, Loss: 1.6943
Epoch 10/200, Loss: 1.6850
Epoch 11/200, Loss: 1.6719
Epoch 12/200, Loss: 1.6684
Epoch 13/200, Loss: 1.6582
Epoch 14/200, Loss: 1.6497
Epoch 15/200, Loss: 1.6416
Epoch 16/200, Loss: 1.6337
Epoch 17/200, Loss: 1.6320
Epoch 18/200, Loss: 1.6239
Epoch 19/200, Loss: 1.6219
Epoch 20/200, Loss: 1.6188
Epoch 21/200, Loss: 1.6131
Epoch 22/200, Loss: 1.6102
Epoch 23/200, Loss: 1.6014
Epoch 24/200, Loss: 1.6001
Epoch 25/200, Loss: 1.5924
Epoch 26/200, Loss: 1.5881
Epoch 27/200, Loss: 1.5825
Epoch 28/200, Loss: 1.5771
Epoch 29/200, Loss: 1.5776
Epoch 30/200, Loss: 1.5744
Epoch 31/200, Loss: 1.5718
Epoch 32/200, Loss: 1.5614
Epoch 33/200, Loss: 1.5630
Epoch 34/200, Loss: 1.5552
Epoch 35/200, Loss: 1.5557
Epoch 36/200, Loss: 1.5503
Epoch 37/200, Loss: 1.5491
Epoch 38/2

In [10]:
# Evaluate the Adam-trained model on the validation split.
# NOTE(review): despite its name, predictions_train_Adam holds predictions for X_val.
# The recorded output shows predict_and_evaluate labelling its accuracy line
# "Test Accuracy" even for this validation call.
predictions_train_Adam = model_Adam.predict_and_evaluate(X_val, Y_val)
# Macro-averaged precision / recall / F1 against the validation labels
precision_val,recall_val,f1_val = classification_report_numpy(Y_val, predictions_train_Adam, average='macro')
print(f"Validation F1 Score: {f1_val:.4f}")
print(f"Validation Recall: {recall_val:.4f}")
print(f"Validation Precision: {precision_val:.4f}")

 Test Accuracy: 50.02%
Validation F1 Score: 0.4867
Validation Recall: 0.5002
Validation Precision: 0.5061


In [11]:
# Evaluate the Adam-trained model on the test set
predictions_test_Adam = model_Adam.predict_and_evaluate(X_test, Y_test)
# Macro-averaged precision / recall / F1 against the test labels
precision_test,recall_test,f1_test = classification_report_numpy(Y_test, predictions_test_Adam, average='macro')
print(f"Test F1 Score: {f1_test:.4f}")
print(f"Test Recall: {recall_test:.4f}")
print(f"Test Precision: {precision_test:.4f}")

 Test Accuracy: 50.79%
Test F1 Score: 0.4929
Test Recall: 0.5079
Test Precision: 0.5137


In [12]:
# Use the momentum coefficient from the hyperparameter cell instead of repeating the literal,
# so changing the hyperparameter actually changes this run.
optimizer2 = SGDMomentum(momentum=momentum)

# Build the MLP model with the same architecture and regularisation settings as the Adam run
model_SGDM = MLP(input_dim, hidden_dims, output_dim, dropout_rate=0.2, weight_decay=1e-4, optimizer=optimizer2)

SGDM_Fit = MiniBatchFit(model_SGDM, optimizer2,
             X_train, Y_train,
             output_dim,
             num_epochs=num_epochs,
             batch_size=batch_size,
             learning_rate=learning_rate)

# Train the model
SGDM_Fit.fit()

Epoch 1/200, Loss: 2.3012
Epoch 2/200, Loss: 2.1222
Epoch 3/200, Loss: 2.0414
Epoch 4/200, Loss: 1.9899
Epoch 5/200, Loss: 1.9534
Epoch 6/200, Loss: 1.9255
Epoch 7/200, Loss: 1.9004
Epoch 8/200, Loss: 1.8772
Epoch 9/200, Loss: 1.8576
Epoch 10/200, Loss: 1.8422
Epoch 11/200, Loss: 1.8291
Epoch 12/200, Loss: 1.8174
Epoch 13/200, Loss: 1.8044
Epoch 14/200, Loss: 1.7960
Epoch 15/200, Loss: 1.7861
Epoch 16/200, Loss: 1.7710
Epoch 17/200, Loss: 1.7616
Epoch 18/200, Loss: 1.7575
Epoch 19/200, Loss: 1.7480
Epoch 20/200, Loss: 1.7475
Epoch 21/200, Loss: 1.7360
Epoch 22/200, Loss: 1.7333
Epoch 23/200, Loss: 1.7248
Epoch 24/200, Loss: 1.7270
Epoch 25/200, Loss: 1.7130
Epoch 26/200, Loss: 1.7073
Epoch 27/200, Loss: 1.7058
Epoch 28/200, Loss: 1.6966
Epoch 29/200, Loss: 1.6970
Epoch 30/200, Loss: 1.6908
Epoch 31/200, Loss: 1.6840
Epoch 32/200, Loss: 1.6827
Epoch 33/200, Loss: 1.6764
Epoch 34/200, Loss: 1.6735
Epoch 35/200, Loss: 1.6719
Epoch 36/200, Loss: 1.6677
Epoch 37/200, Loss: 1.6616
Epoch 38/2

In [13]:

# Evaluate the SGD-with-momentum model on the validation split.
# NOTE(review): despite its name, predictions_train_SGDM holds predictions for X_val.
# The recorded output shows predict_and_evaluate labelling its accuracy line
# "Test Accuracy" even for this validation call.
predictions_train_SGDM = model_SGDM.predict_and_evaluate(X_val, Y_val)
# Macro-averaged precision / recall / F1 against the validation labels
precision_val,recall_val,f1_val = classification_report_numpy(Y_val, predictions_train_SGDM, average='macro')
print(f"Validation F1 Score: {f1_val:.4f}")
print(f"Validation Recall: {recall_val:.4f}")
print(f"Validation Precision: {precision_val:.4f}")

 Test Accuracy: 49.07%
Validation F1 Score: 0.4784
Validation Recall: 0.4907
Validation Precision: 0.4950


In [14]:

# Evaluate the SGD-with-momentum model on the test set
predictions_test_SGDM = model_SGDM.predict_and_evaluate(X_test, Y_test)
# Macro-averaged precision / recall / F1 against the test labels
precision_test,recall_test,f1_test = classification_report_numpy(Y_test, predictions_test_SGDM, average='macro')
print(f"Test F1 Score: {f1_test:.4f}")
print(f"Test Recall: {recall_test:.4f}")
print(f"Test Precision: {precision_test:.4f}")

 Test Accuracy: 48.70%
Test F1 Score: 0.4734
Test Recall: 0.4870
Test Precision: 0.4910
