In [None]:
import os
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torchvision.models import resnet18
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from tqdm import tqdm

# Data locations: everything lives under a single data directory
data_dir = 'data'
train_csv, test_csv, img_dir = (
    os.path.join(data_dir, entry) for entry in ('train.csv', 'test.csv', 'images')
)

# Training data and labels: load the CSV and preview its first rows
train_df = pd.read_csv(train_csv)
print(train_df.head())

In [None]:
class LeafDataset(Dataset):
    """Dataset of images listed in a CSV file.

    The first CSV column holds each image's path relative to ``root_dir``.
    In training mode the ``label`` column is encoded to integer class ids and
    the encoding is exposed as ``label_mapping`` (original label -> class id).

    Args:
        csv_file: Path to the CSV file listing the images.
        root_dir: Directory that the image paths in the CSV are relative to.
        transform: Optional callable applied to every loaded PIL image.
        train: If True, items are ``(image, label_tensor)``; otherwise they
            are ``(image, relative_image_path)``.
    """

    def __init__(self, csv_file, root_dir, transform=None, train=True):
        self.data_frame = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform
        self.train = train

        if self.train:
            # Build the label mapping; class ids follow the order in which
            # labels first appear in the CSV.
            self.label_mapping = {label: idx for idx, label in enumerate(self.data_frame['label'].unique())}
            self.data_frame['label'] = self.data_frame['label'].map(self.label_mapping)
        else:
            # No ground truth is used in test mode.
            self.data_frame['label'] = None

    def __len__(self):
        """Return the number of rows (images) in the CSV."""
        return len(self.data_frame)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            ``(image, label)`` with ``label`` a ``torch.long`` scalar tensor in
            training mode, or ``(image, relative_image_path)`` in test mode.
        """
        relative_path = self.data_frame.iloc[idx, 0]
        img_name = os.path.join(self.root_dir, relative_path)

        # PIL opens files lazily. convert() forces the pixel data to be read
        # while the file is still open and guarantees a 3-channel image, so
        # grayscale / RGBA / palette files do not break channel-wise transforms.
        with Image.open(img_name) as img:
            image = img.convert('RGB')

        if self.transform:
            image = self.transform(image)

        if self.train:
            # Address the label by column name, consistent with __init__.
            label = int(self.data_frame['label'].iloc[idx])
            return image, torch.tensor(label, dtype=torch.long)
        return image, relative_path

# Preprocessing pipeline: resize to 224x224, convert to a tensor, then
# normalise each channel with the mean/std values commonly used for ImageNet.
_CHANNEL_MEAN = [0.485, 0.456, 0.406]
_CHANNEL_STD = [0.229, 0.224, 0.225]
_preprocessing_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_CHANNEL_MEAN, std=_CHANNEL_STD),
]
transform = transforms.Compose(_preprocessing_steps)

In [None]:
# Training dataset and loader (reshuffled every epoch). Paths come from the
# data_dir / train_csv / test_csv variables defined in the first cell instead
# of repeating the string literals.
train_dataset = LeafDataset(csv_file=train_csv, root_dir=data_dir, transform=transform, train=True)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Test dataset and loader (kept in CSV order)
test_dataset = LeafDataset(csv_file=test_csv, root_dir=data_dir, transform=transform, train=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [None]:
import torch
import torch.nn as nn

class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions, each followed by batch norm.

    The block computes ``relu(main(x) + shortcut(x))``. The shortcut is the
    identity unless the spatial stride or the channel count changes, in which
    case it is a 1x1 strided convolution followed by batch norm.

    Args:
        in_planes: Number of input channels.
        planes: Number of channels produced by each 3x3 convolution.
        stride: Stride of the first convolution (and of the projection shortcut).
    """

    # Ratio between the block's output channels and ``planes``.
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super().__init__()
        out_planes = self.expansion * planes

        # Main branch: conv -> bn -> relu -> conv -> bn
        self.conv1 = nn.Conv2d(
            in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False
        )
        self.bn1 = nn.BatchNorm2d(planes)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(
            planes, planes, kernel_size=3, stride=1, padding=1, bias=False
        )
        self.bn2 = nn.BatchNorm2d(planes)

        # Shortcut branch: identity when shapes already match, projection otherwise.
        if stride == 1 and in_planes == out_planes:
            self.shortcut = nn.Sequential()
        else:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )

    def forward(self, x):
        """Apply the residual block to ``x`` and return the activated sum."""
        main = self.conv1(x)
        main = self.bn1(main)
        main = self.relu(main)
        main = self.conv2(main)
        main = self.bn2(main)
        main += self.shortcut(x)
        return self.relu(main)


In [None]:
class ResNet(nn.Module):
    """Image classifier built from four stages of a configurable residual block.

    Layout: 7x7 stride-2 convolution on 3 input channels -> batch norm -> ReLU
    -> 3x3 stride-2 max pool -> four stages with 64, 128, 256 and 512 planes
    (stages 2-4 start with a stride-2 block) -> adaptive average pool to 1x1
    -> linear classifier.

    Args:
        block: Block factory called as ``block(in_planes, planes, stride)``;
            it must expose an ``expansion`` attribute.
        num_blocks: Sequence whose first four entries give the number of
            blocks in each stage.
        num_classes: Size of the output layer.
    """

    def __init__(self, block, num_blocks, num_classes=1000):
        super().__init__()
        # Channel count fed to the next block; _make_layer updates it as it goes.
        self.in_planes = 64

        # Stem
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages
        self.layer1 = self._make_layer(block, planes=64, num_blocks=num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, planes=128, num_blocks=num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, planes=256, num_blocks=num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, planes=512, num_blocks=num_blocks[3], stride=2)

        # Head
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Stack ``num_blocks`` blocks; only the first one uses ``stride``."""
        stage = []
        for block_stride in [stride] + [1] * (num_blocks - 1):
            stage.append(block(self.in_planes, planes, block_stride))
            # The next block consumes what this one produced.
            self.in_planes = planes * block.expansion
        return nn.Sequential(*stage)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for ``x``."""
        features = self.maxpool(self.relu(self.bn1(self.conv1(x))))
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            features = stage(features)
        pooled = torch.flatten(self.avgpool(features), 1)
        return self.fc(pooled)


In [None]:
def ResNet18(num_classes=176):
    """Build a ``ResNet`` with two ``BasicBlock`` units in each of its four stages.

    Args:
        num_classes: Size of the final classification layer (default 176).

    Returns:
        A newly constructed ``ResNet`` instance.
    """
    blocks_per_stage = [2, 2, 2, 2]
    return ResNet(BasicBlock, blocks_per_stage, num_classes=num_classes)

# Instantiate ResNet18 with one output per class found in the training labels
num_classes = len(train_dataset.label_mapping)
model = ResNet18(num_classes=num_classes)

# Move the model to its target device BEFORE building the optimizer, as the
# torch.optim documentation recommends, so the optimizer is constructed from
# the parameters that will actually be trained.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)


In [None]:
# Train the model
num_epochs = 10

# Create the checkpoint directory up front: a missing folder would otherwise
# only surface in torch.save, after every epoch has already been computed.
weights_path = 'weights/resnet18_scratch_weights.pth'
os.makedirs(os.path.dirname(weights_path), exist_ok=True)

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # loss is the batch mean; weight it by batch size so the epoch figure
        # is a per-sample average even when the last batch is smaller.
        running_loss += loss.item() * inputs.size(0)

    epoch_loss = running_loss / len(train_loader.dataset)
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')

# Save the trained weights
torch.save(model.state_dict(), weights_path)


In [None]:
# Load the saved model weights (when needed). map_location remaps tensors to
# the current device, so a checkpoint written on a GPU also loads on a
# CPU-only machine.
state_dict = torch.load('weights/resnet18_scratch_weights.pth', map_location=device)
model.load_state_dict(state_dict)
model = model.to(device)

# Run inference on the test set
model.eval()
predictions = []

with torch.no_grad():
    for inputs, file_paths in test_loader:
        inputs = inputs.to(device)
        # Index of the highest logit is the predicted class id.
        preds = model(inputs).argmax(dim=1)
        predictions.extend(zip(file_paths, preds.cpu().tolist()))

# Invert the label mapping: class id -> original label
reverse_label_mapping = {v: k for k, v in train_dataset.label_mapping.items()}

# Build the submission file from the predictions
submission = pd.DataFrame(predictions, columns=['image', 'label'])
submission['label'] = submission['label'].map(reverse_label_mapping)
submission.to_csv('submission_resnet18_scratch.csv', index=False)
