In [1]:
import datetime
import shutil
import time
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from PIL import Image
from sklearn.metrics import confusion_matrix
from torch import nn
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from torchvision.models import resnext50_32x4d, resnet50
from tqdm import tqdm

In [2]:
# Hyperparameters
initial_lr = 0.001
num_epochs = 20
batch_size = 64

# Data
# NOTE(review): absolute, machine-specific dataset root; adjust it for the local machine.
# base_path = Path(r'D:\UAV_DATA_NEW\dataset\all')
base_path = Path(r'C:\Users\xianyu\GraduationProject\UAV_YUNNAN_DATA\last_labels')
train_img_path = base_path / 'train'
test_img_path = base_path / 'test'

# Prefer the first GPU, but fall back to the CPU so that the notebook still runs
# on machines without CUDA instead of failing at the first `.to(device)` call.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [3]:
# Count the entries in each training class folder ('01' .. '28').
counts = [
    sum(1 for _ in (train_img_path / f'{class_id:02d}').iterdir())
    for class_id in range(1, 29)
]

count = sum(counts)
# Inverse-frequency class weights: total number of samples divided by the per-class count.
weights = (count / torch.tensor(counts)).to(device)


class weighted_MSELoss_number(nn.Module):
    """Class-weighted squared error computed on rounded class numbers.

    Predictions and targets are rounded to the nearest integer, the squared
    difference of every sample is multiplied by the weight of its target class,
    and the sum over the batch is divided by the sum of ALL class weights
    (not by the weights of the samples present in the batch).

    Args:
        class_weights: optional 1-D tensor (or sequence) holding one weight per
            class, where index ``k - 1`` is the weight of class number ``k``.
            When omitted, the notebook-level ``weights`` tensor moved to the
            notebook-level ``device`` is used.
    """

    def __init__(self, class_weights=None):
        super().__init__()
        if class_weights is None:
            # Default: the inverse-frequency weights computed at notebook level.
            class_weights = weights.to(device)
        class_weights = torch.as_tensor(class_weights, dtype=torch.float32)
        # Buffers follow the module through `.to(...)` / `.cuda()` calls.
        self.register_buffer('weights', class_weights)
        self.register_buffer('weights_sum', class_weights.sum())

    def forward(self, inputs, targets):
        """Return the weighted squared error of the rounded predictions.

        Args:
            inputs: predicted class numbers (1-based, continuous values).
            targets: ground-truth class numbers (1-based), same shape as ``inputs``.

        Returns:
            A scalar tensor with the loss.
        """
        targets = targets.round()
        # `torch.round` has a zero gradient almost everywhere, which would stop
        # every gradient from reaching the model. Straight-through estimator:
        # the forward value is exactly `inputs.round()` (the second term is 0),
        # while the backward pass treats the rounding as the identity.
        rounded = inputs.round().detach() + (inputs - inputs.detach())
        difference = (rounded - targets).float()
        # Targets are 1-based class numbers, the weight table is 0-based.
        weight = self.weights[targets.long() - 1]
        return (weight * difference * difference).sum() / self.weights_sum


# Show the per-class weights, then the raw per-class sample counts.
print(weights, counts, sep='\n')

tensor([ 19.4901,   5.1632,   4.5628,   7.5851,  12.7957,  15.8652,  20.6526,
         28.0286,  32.1639,  54.0000,  63.2903,  66.8864,  94.9355, 130.8000,
         89.1818,  96.4918, 133.7727, 267.5455, 189.8710, 235.4400, 189.8710,
        140.1429, 183.9375, 280.2857, 196.2000, 267.5455, 267.5455, 255.9131],
       device='cuda:0')
[302, 1140, 1290, 776, 460, 371, 285, 210, 183, 109, 93, 88, 62, 45, 66, 61, 44, 22, 31, 25, 31, 42, 32, 21, 30, 22, 22, 23]


In [4]:
class Trainer:
    """Train, validate, evaluate and run inference with a single-output regression model.

    The dataset labels are the zero-based ``ImageFolder`` class indices. Both the
    labels and the model outputs are shifted by +1 before the loss and the metrics
    are computed, so everything is expressed in 1-based class numbers.

    The class relies on notebook-level names: ``device``, ``initial_lr``,
    ``num_epochs``, ``batch_size``, ``train_img_path``, ``test_img_path`` and
    ``weighted_MSELoss_number``.
    """

    def __init__(self, model) -> None:
        """Move ``model`` to the device, build optimizer/loss/scheduler and load the data."""
        self.model = model.to(device)
        self.optimizer = Adam(self.model.parameters(), lr=initial_lr)
        self.loss_func = weighted_MSELoss_number().to(device)
        # Multiply the learning rate by 0.4 every 7 scheduler steps (one step per epoch).
        self.scheduler = StepLR(self.optimizer, step_size=7, gamma=0.4)

        self.total_time = 0
        self.train_loader = None
        self.test_loader = None
        self.labels = None
        self.output_dir = None

        self.create_output_dir()
        self.load_data()

    def create_output_dir(self):
        """Create ``<cwd>/../output/run/<timestamp>`` and store it in ``self.output_dir``."""
        # String form of the current time, used as the directory name.
        current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        self.output_dir = Path.cwd().parent / 'output' / 'run' / current_time
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def load_data(self):
        """Build the train/test data loaders and the ordered list of class names."""
        # Pinned host memory is only useful when batches are copied to a CUDA device.
        pin_memory = device.type == 'cuda'

        # Training set: grayscale conversion plus random flips and rotations.
        transform_train = transforms.Compose([
            transforms.Grayscale(1),
            transforms.RandomHorizontalFlip(),  # random horizontal flip
            transforms.RandomVerticalFlip(),  # random vertical flip
            transforms.RandomRotation(360),  # random rotation
            transforms.ToTensor()
        ])
        train_data = datasets.ImageFolder(root=train_img_path.as_posix(), transform=transform_train)
        self.train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, pin_memory=pin_memory)

        # Test set: grayscale conversion only, fixed order.
        transform_test = transforms.Compose([transforms.Grayscale(1), transforms.ToTensor()])
        test_data = datasets.ImageFolder(root=test_img_path.as_posix(), transform=transform_test)
        self.test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, pin_memory=pin_memory)

        # Class names ordered by their ImageFolder index.
        self.labels = [name for name, _ in sorted(test_data.class_to_idx.items(), key=lambda item: item[1])]

    def _forward_batch(self, inputs, labels):
        """Run the model on one batch and return ``(outputs, labels)`` as 1-based class numbers."""
        inputs = inputs.to(device)
        labels = labels.to(device).float().view(-1, 1) + 1
        outputs = self.model(inputs) + 1
        return outputs, labels

    def train(self):
        """Run one training epoch and step the LR scheduler.

        Returns:
            The sum of ``batch loss * batch size`` divided by the dataset size.
        """
        self.model.train()
        running_loss = 0
        for inputs, labels in self.train_loader:
            self.optimizer.zero_grad()
            outputs, labels = self._forward_batch(inputs, labels)
            loss = self.loss_func(outputs, labels)
            loss.backward()
            self.optimizer.step()
            running_loss += loss.item() * inputs.size(0)
        self.scheduler.step()
        return running_loss / len(self.train_loader.dataset)

    def validate(self):
        """Compute the loss on the test loader without gradients.

        Returns:
            The sum of ``batch loss * batch size`` divided by the dataset size.
        """
        self.model.eval()
        running_loss = 0
        with torch.no_grad():
            for inputs, labels in self.test_loader:
                outputs, labels = self._forward_batch(inputs, labels)
                loss = self.loss_func(outputs, labels)
                running_loss += loss.item() * inputs.size(0)

        return running_loss / len(self.test_loader.dataset)

    def run(self):
        """Train for ``num_epochs`` epochs, logging to ``log.txt``, then save and evaluate the model."""
        with open(self.output_dir / 'log.txt', 'w') as log:
            log.write(f'Number of epochs: {num_epochs}\n')
            log.write(f'Initial learning rate: {initial_lr}\n')
            log.write(f'Batch size: {batch_size}\n')
            log.write('Optimizer: Adam\n')
            log.write('Scheduler: StepLR\n')
            log.write(f'Loss function: {self.loss_func}\n')
            log.write('Model: resnet50\n')
            log.write(f'Device: {device}\n\n')

            # Iterating over the bar advances it once per finished epoch.
            with tqdm(range(num_epochs), desc='Progress') as tbar:
                for epoch in tbar:
                    epoch_start = time.time()
                    train_loss = self.train()
                    valid_loss = self.validate()
                    time_elapsed = time.time() - epoch_start
                    current_lr = self.optimizer.param_groups[0]['lr']

                    log.write(f'Epoch {epoch+1:02d}/{num_epochs}, ')
                    log.write(f'Training Loss: {train_loss:.4f}, ')
                    log.write(f'Validation Loss: {valid_loss:.4f}, ')
                    log.write(f'lr:{current_lr}, ')
                    log.write(f'Time: {time_elapsed:.2f}s\n')
                    # Flush so the log can be followed while training is running.
                    log.flush()

            torch.save(self.model, self.output_dir / 'model.pth')
            overall_acc, average_loss = self.eval_accuracy()
            log.write(f'Overall accuracy: {overall_acc:.4f}\n')
            log.write(f'Average loss: {average_loss:.4f}\n')

    def eval_accuracy(self):
        """Evaluate the model on the test loader.

        Returns:
            ``(overall_acc, average_loss)``: ``overall_acc`` is the mean absolute
            error between the predictions clamped to [1, 28] and the class
            numbers (lower is better, despite the name); ``average_loss`` is the
            mean batch loss. Both are weighted by batch size, so a smaller last
            batch does not bias either value.
        """
        abs_errors = []
        losses = []
        batch_sizes = []

        self.model.eval()
        with torch.no_grad():
            for inputs, labels in self.test_loader:
                outputs, labels = self._forward_batch(inputs, labels)
                # Keep the predictions inside the valid class-number range.
                outputs = outputs.clamp(min=1, max=28)

                abs_errors.append((outputs - labels).abs().mean().item())
                losses.append(self.loss_func(outputs, labels).item())
                batch_sizes.append(inputs.size(0))

        overall_acc = np.average(abs_errors, weights=batch_sizes)
        average_loss = np.average(losses, weights=batch_sizes)
        return overall_acc, average_loss

    def predict(self):
        """Predict a class number for every image in the prediction folder.

        Each file is copied to the output folder as ``<predicted class>_<original name>``.
        """
        pred_path = Path(r'C:\Users\xianyu\GraduationProject\UAV_YUNNAN_DATA\last_labels\pred\pred')
        dst_path = Path(r'C:\Users\xianyu\GraduationProject\UAV_YUNNAN_DATA\last_labels\output')
        # `shutil.copy` does not create missing parent folders.
        dst_path.mkdir(parents=True, exist_ok=True)

        self.model.eval()
        with torch.no_grad():
            for pic_path in pred_path.iterdir():
                if not pic_path.is_file():
                    continue

                # Same preprocessing as the test transform: single-channel
                # grayscale scaled to [0, 1]. The context manager closes the
                # file even when decoding fails.
                with Image.open(str(pic_path)) as pic:
                    gray = pic.convert('L')
                    pic_tensor = torch.from_numpy(np.array(gray, dtype=np.float32)) / 255

                # Add the batch and channel dimensions expected by the model.
                batch = pic_tensor.unsqueeze(0).unsqueeze(0).to(device)
                predicted = round(self.model(batch).item()) + 1

                shutil.copy(pic_path, dst_path / f'{predicted}_{pic_path.name}')

In [5]:
# The task is regression, so the final layer has a single output unit.
model = resnet50(weights=None, num_classes=1)
# Replace the first convolution so the network takes one input channel instead of three.
model.conv1 = nn.Conv2d(
    in_channels=1,
    out_channels=64,
    kernel_size=7,
    stride=2,
    padding=3,
    bias=False,
)

trainer = Trainer(model)
trainer.run()

Progress:  20%|██        | 4/20 [01:48<07:13, 27.10s/it]

In [None]:
# Accuracy evaluation: confusion matrix of the rounded predictions on the test set.
trainer.model.eval()

num_classes = len(trainer.labels)
label_batches = []
prediction_batches = []

with torch.no_grad():
    for inputs, labels in trainer.test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device).float().view(-1, 1) + 1
        outputs = trainer.model(inputs) + 1
        # A confusion matrix needs discrete classes: round to the nearest class
        # number and keep the result inside the valid range.
        outputs = outputs.round().clamp(min=1, max=num_classes)

        label_batches.append(labels.cpu())
        prediction_batches.append(outputs.cpu())

# Concatenate once instead of re-allocating the full tensor on every batch.
all_labels = torch.cat(label_batches, dim=0).view(-1).long()
all_predictions = torch.cat(prediction_batches, dim=0).view(-1).long()

# Build the confusion matrix; passing `labels` fixes its size to
# num_classes x num_classes even when some classes never occur.
cm = confusion_matrix(
    all_labels.numpy(),
    all_predictions.numpy(),
    labels=list(range(1, num_classes + 1)),
)

# Draw the confusion matrix (rows: true class, columns: predicted class).
fig, ax = plt.subplots(figsize=(10, 10), dpi=300)
ax.matshow(cm, cmap=plt.cm.Reds)

for row in range(num_classes):
    for col in range(num_classes):
        ax.annotate(str(cm[row, col]), xy=(col, row), horizontalalignment='center', verticalalignment='center')

ax.set_xlabel('Predicted label')
ax.set_ylabel('True label')
ax.set_xticks(range(num_classes))
ax.set_xticklabels(trainer.labels)
ax.set_yticks(range(num_classes))
ax.set_yticklabels(trainer.labels)
fig.tight_layout()

# Save before closing; a closed figure is neither displayed nor recoverable.
pic_path = trainer.output_dir / 'confusion_matrix.png'
fig.savefig(pic_path.as_posix())
plt.close(fig)