In [14]:
# 基础配置
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, random_split

plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
plt.style.use('seaborn-v0_8')

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [30]:
class ImageDatasetFromCSV(Dataset):
    def __init__(self, dataframe, labeled=True, image_shape=None, transform=None):
        """ 
        dataframe: 预先读取的 pandas DataFrame 
        labeled: 布尔值，指示数据集是否包含标签 
        image_shape: 图像的形状，默认为 None。如果提供，则应该是 (height, width, channels) 格式 
        transform: 可选的图像转换操作 
        """
        self.df = dataframe
        self.transform = transform
        self.labeled = labeled

        if labeled:
            # 如果有标签，分开提取图像和标签数据
            self.images = self.df.iloc[:, 1:].values.astype(np.uint8)  # 跳过第一列（标签）
            self.labels = self.df.iloc[:, 0].values
        else:
            # 如果没有标签，提取所有图像数据
            self.images = self.df.values.astype(np.uint8)
        
        if image_shape is None:
            # 自动推断图像形状，假设为正方形灰度图像
            num_pixels = self.images.shape[1]
            side_length = int(np.sqrt(num_pixels))
            self.image_shape = (side_length, side_length, 1)
        else:
            self.image_shape = image_shape

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        # 获取图像并按照指定的形状进行重塑
        image = self.images[idx].reshape(self.image_shape)
        
        if self.transform:
            image = self.transform(image)
        else:
            image = torch.tensor(image / 255.0, dtype=torch.float)
        
        if self.labeled:
            label = int(self.labels[idx])
            label = torch.tensor(label, dtype=torch.long)
            return image, label
        else:
            return image

In [16]:
image_shape = (28,28,1)
trans = transforms.ToTensor()
df_train = pd.read_csv("../data/train.csv")
data_train = ImageDatasetFromCSV(df_train, labeled=True,image_shape=image_shape,transform=trans)

In [17]:
# 假设你要将数据集 80% 用于训练，20% 用于验证
train_size = int(0.8 * len(data_train))
test_size = len(data_train) - train_size
train_data, test_data = random_split(data_train, [train_size, test_size])

In [18]:
batch_size = 32
train_iter = DataLoader(train_data, batch_size, shuffle=True)
test_iter = DataLoader(test_data, batch_size, shuffle=False)

In [19]:
for X,y in train_iter:
        print(X.shape, y.shape)
        break

torch.Size([32, 1, 28, 28]) torch.Size([32])


In [20]:
net = nn.Sequential(nn.Flatten(), nn.Linear(28*28,10))

def init_weights(m):
    if type(m) == nn.Linear:
        nn.init.normal_(m.weight, std=0.01)
        
net.apply(init_weights)

Sequential(
  (0): Flatten(start_dim=1, end_dim=-1)
  (1): Linear(in_features=784, out_features=10, bias=True)
)

In [21]:
loss_fn = nn.CrossEntropyLoss(reduction='none')

In [22]:
trainer = torch.optim.Adam(net.parameters(), lr=0.001)

In [23]:
# 假设 train_iter 和 test_iter 是 DataLoader 对象
num_epochs = 10
for epoch in range(num_epochs):
    net.train()  # 切换到训练模式
    running_loss = 0.0
    correct_train = 0
    total_train = 0
    
    # 训练阶段
    for image, label in train_iter:  # train_iter 是一个 DataLoader
        # 前向传播
        outputs = net(image)
        loss = loss_fn(outputs, label).mean()  # 将 l 求平均值或求和
        running_loss += loss.item()

        # 反向传播和优化
        trainer.zero_grad()
        loss.backward()
        trainer.step()

        # 计算训练准确率
        _, predicted = torch.max(outputs.data, 1)
        total_train += label.size(0)
        correct_train += (predicted == label).sum().item()

    train_loss = running_loss / len(train_iter)
    train_acc = correct_train / total_train
    
    # 验证阶段
    net.eval()  # 切换到评估模式
    correct_test = 0
    total_test = 0
    with torch.no_grad():  # 禁用梯度计算，因为这是验证阶段
        total_loss = 0
        for image, label in test_iter:  # test_iter 也是一个 DataLoader
            outputs = net(image)
            l = loss_fn(outputs, label).mean()
            total_loss += l.item()

            # 计算测试准确率
            _, predicted = torch.max(outputs.data, 1)
            total_test += label.size(0)
            correct_test += (predicted == label).sum().item()

    avg_loss = total_loss / len(test_iter)
    test_acc = correct_test / total_test

    # 打印每个 epoch 的训练损失、训练准确率和测试准确率
    print(f'Epoch {epoch+1}, Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, Test Loss: {avg_loss:.4f}, Test Acc: {test_acc:.4f}')

Epoch 1, Train Loss: 0.5542, Train Acc: 0.8693, Test Loss: 0.3589, Test Acc: 0.9052
Epoch 2, Train Loss: 0.3243, Train Acc: 0.9101, Test Loss: 0.3132, Test Acc: 0.9176
Epoch 3, Train Loss: 0.2927, Train Acc: 0.9172, Test Loss: 0.3012, Test Acc: 0.9207
Epoch 4, Train Loss: 0.2772, Train Acc: 0.9218, Test Loss: 0.2970, Test Acc: 0.9179
Epoch 5, Train Loss: 0.2665, Train Acc: 0.9242, Test Loss: 0.2888, Test Acc: 0.9217
Epoch 6, Train Loss: 0.2593, Train Acc: 0.9263, Test Loss: 0.2855, Test Acc: 0.9243
Epoch 7, Train Loss: 0.2546, Train Acc: 0.9278, Test Loss: 0.2856, Test Acc: 0.9258
Epoch 8, Train Loss: 0.2495, Train Acc: 0.9286, Test Loss: 0.2840, Test Acc: 0.9246
Epoch 9, Train Loss: 0.2457, Train Acc: 0.9309, Test Loss: 0.2829, Test Acc: 0.9250
Epoch 10, Train Loss: 0.2425, Train Acc: 0.9318, Test Loss: 0.2826, Test Acc: 0.9243


In [46]:
# 读取测试数据
df_test = pd.read_csv("../data/test.csv")
data_test = ImageDatasetFromCSV(df_test, labeled=False,image_shape=(28,28,1),transform=trans)
iter_test = DataLoader(data_test, batch_size, shuffle=False)

In [48]:
net.eval()  # 切换到评估模式

predictions = []
with torch.no_grad():  # 禁用梯度计算
    for images in iter_test:
        outputs = net(images)  # 前向传播
        _, predicted_classes = torch.max(outputs, 1)  # 获取每个样本的预测类别
        predictions.extend(predicted_classes.cpu().numpy())  # 保存预测结果

# 将预测结果转换为numpy数组
predictions = np.array(predictions)

In [51]:
def Kaggle_submission(predictions,sample_path,result_path):
    if sample_path:
        if result_path:
            data = pd.read_csv(sample_path)
            data.iloc[:,-1] = predictions
            print(data.head())
            data.to_csv(result_path,index=False)
            print("down")
        else:
            print("no result path")
    else:
        print("no sample path")

In [52]:
sample_path = '../data/sample_submission.csv'
result_path = '../result/simple softmax.csv'
Kaggle_submission(predictions,sample_path,result_path)

   ImageId  Label
0        1      2
1        2      0
2        3      9
3        4      7
4        5      3
down
