In [1]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Dataset
import pandas as pd
import numpy as np
import cv2
import os

In [2]:
data_dir = r'data/data'
csv_dir = r'/Users/jiayi/Workspace/pythonPrograms/ml/cn_nums/chinese_mnist.csv'
df = pd.read_csv(csv_dir)
df

Unnamed: 0,suite_id,sample_id,code,value,character
0,1,1,10,9,九
1,1,10,10,9,九
2,1,2,10,9,九
3,1,3,10,9,九
4,1,4,10,9,九
...,...,...,...,...,...
14995,99,5,9,8,八
14996,99,6,9,8,八
14997,99,7,9,8,八
14998,99,8,9,8,八


In [37]:


char_groups = df.groupby("character")

test_df = pd.DataFrame()
train_df = pd.DataFrame()

train_ratio = 0.8
random_seed = 42
for _, df in char_groups:
    train = df.sample(n = int(len(df) * train_ratio), random_state=random_seed)
    test = df.drop(train.index)

    test_df = pd.concat([test_df, test], ignore_index=True)
    train_df = pd.concat([train_df, train], ignore_index=True)

print(train_df.info())
print(train_df.head(5))

if not os.path.exists('train.csv'):
    train_df.to_csv('train.csv')

if not os.path.exists('test.csv'):
    test_df.to_csv('test.csv')

# image_paths = []
# for index, row in train_df.iterrows():
#     filename = "_".join(["input", str(row['suite_id']), str(row['sample_id']), str(row['code'])])
#     filename += ".jpg"
#     image_paths.append(os.path.join(data_dir, filename))



<class 'pandas.core.frame.DataFrame'>
RangeIndex: 12000 entries, 0 to 11999
Data columns (total 5 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   suite_id   12000 non-null  int64 
 1   sample_id  12000 non-null  int64 
 2   code       12000 non-null  int64 
 3   value      12000 non-null  int64 
 4   character  12000 non-null  object
dtypes: int64(4), object(1)
memory usage: 468.9+ KB
None
   suite_id  sample_id  code  value character
0        56         10     2      1         一
1        75          7     2      1         一
2        76          1     2      1         一
3        69          1     2      1         一
4        46         10     2      1         一


In [38]:
class ChineseMinistDataset(Dataset):

    def __init__(self, file_dir, transform=None):
        self.df = pd.read_csv(file_dir)
        self.transform = transform
        self.data_dir = r'data/data'

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):

        # 生成图片的名字和读取的地址
        file_name = "_".join(["input"] + [str(self.df.iloc[index, i]) for i in range(1,4)]) + ".jpg"
        img_path = os.path.join(self.data_dir, file_name)
        # 读取图片
        img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
        img = img.astype(np.float32) / 255.0 # 归一化
        # feature = img.flatten()
        feature = img

        if self.transform is not None:
            feature = self.transform(feature)

        # 生成标签
        label = int(self.df.iloc[index]['value'])

        return feature, label

# 定义 transform
transform = transforms.Compose([
    transforms.ToTensor(),  # 自动将numpy数组转换为 [C, H, W]
    transforms.Normalize(mean=[0.5], std=[0.5]),
    transforms.RandomRotation(10),
    transforms.RandomHorizontalFlip(p=0.5)
])

# 定义数据集
test_dataset = ChineseMinistDataset('test.csv', transform=transform)
train_dataset = ChineseMinistDataset('train.csv', transform=transform)

batch_size = 128
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [39]:
import torch.nn as nn

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.model = nn.Sequential(
            # 输入图片大小: 64x64
            nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=1),  # 输出大小: 64x64
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 输出大小: 32x32

            # 输入图片大小: 32x32
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1),  # 输出大小: 32x32
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 输出大小: 16x16

            # 输入图片大小: 16x16
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1),  # 输出大小: 16x16
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),  # 输出大小: 8x8

            nn.Flatten(),  # 展平操作
            nn.Linear(in_features=8 * 8 * 64, out_features=128),  # 输入特征数: 8*8*64
            nn.ReLU(),
            nn.Linear(in_features=128, out_features=10),  # 假设有 10 个类别
            nn.Softmax(dim=1)
        )

    def forward(self, input):
        output = self.model(input)
        return output

In [40]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(device)
net = Net()
print(net.to(device=device))

lossF = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)

num_epochs = 25
for epoch in range(num_epochs):
    net.train()  # 设置模型为训练模式
    running_loss = 0.0  # 用于记录每个 epoch 的总损失

    for batch_idx, (features, labels) in enumerate(train_loader):
        # 将数据移动到设备上
        features, labels = features.to(device), labels.to(device)

        # 前向传播
        outputs = net(features)
        loss = lossF(outputs, labels)

        # 反向传播和优化
        optimizer.zero_grad()  # 清空梯度
        loss.backward()  # 反向传播
        optimizer.step()  # 更新参数

        # 打印损失
        running_loss += loss.item()
        if (batch_idx + 1) % 100 == 0:  # 每 100 个 batch 打印一次损失
            print(f"Epoch [{epoch + 1}/{num_epochs}], Batch [{batch_idx + 1}/{len(train_loader)}], Loss: {running_loss / 100:.4f}")
            running_loss = 0.0

print("训练完成！")

mps
Net(
  (model): Sequential(
    (0): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU()
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (9): Flatten(start_dim=1, end_dim=-1)
    (10): Linear(in_features=4096, out_features=128, bias=True)
    (11): ReLU()
    (12): Linear(in_features=128, out_features=10, bias=True)
    (13): Softmax(dim=1)
  )
)
训练完成！


In [46]:
net.eval()  # 关闭 BatchNorm/Dropout
train_correct = 0
with torch.no_grad():  # 禁用梯度计算
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = net(inputs)
        _, predicted = torch.max(outputs, 1)
        train_correct += (predicted == labels).sum().item()

train_acc = 100 * train_correct / len(train_dataset)
print(f"Epoch {epoch+1}, Train Accuracy: {train_acc:.2f}%")
net.train()  # 恢复训练模式

Epoch 25, Train Accuracy: 58.36%


Net(
  (model): Sequential(
    (0): Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU()
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (9): Flatten(start_dim=1, end_dim=-1)
    (10): Linear(in_features=4096, out_features=128, bias=True)
    (11): ReLU()
    (12): Linear(in_features=128, out_features=10, bias=True)
    (13): Softmax(dim=1)
  )
)

In [45]:
torch.save(net.state_dict(), "./model.pkl")

In [None]:
from torch.utils.data import Dataset

class MyDataset(Dataset):

    def __init__(self):
        pass

    def __len__(self):
        pass

    def __getitem__(self, index):
        pass
    
