In [5]:
import os
from torch.utils.data import Dataset
from torchvision import transforms
from PIL import Image
import random
import torch
import numpy as np


class SRDataset(Dataset):

    def __init__(self, data_path, crop_size, scaling_factor):
        # 参数：图片路径、裁剪尺寸、放大倍数
        self.data_path = data_path
        self.crop_size = int(crop_size)
        self.scaling_factor = int(scaling_factor)
        self.images_path = []


        # 读取图像路径
        for name in os.listdir(self.data_path):
            self.images_path.append(os.path.join(self.data_path, name))

        # 数据处理方式
        self.pre_trans = transforms.Compose([
            transforms.RandomCrop(self.crop_size),
            transforms.RandomHorizontalFlip(),
            transforms.RandomVerticalFlip(),
        ])

        self.input_transform = transforms.Compose([
            transforms.Resize(self.crop_size // self.scaling_factor),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5], std=[0.5]),
        ])

        self.target_transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5], std=[0.5]),
        ])

    def __getitem__(self, i):
        # 读取图像
        img = Image.open(self.images_path[i]).convert('RGB')
        img = self.pre_trans(img)

        lr_img = self.input_transform(img)
        hr_img = self.target_transform(img.copy())

        return lr_img, hr_img

    def __len__(self):
        return len(self.images_path)

In [6]:
import torch
import torch.nn as nn
from torch.autograd import Function

class LayerNormFunction(Function):

    @staticmethod
    def forward(ctx, x, weight, bias, eps):
        ctx.eps = eps
        N, C, H, W = x.shape
        mu = x.mean(dim=1, keepdim=True)
        var = ((x - mu) ** 2).mean(dim=1, keepdim=True)
        y = (x - mu) / torch.sqrt(var + eps)
        ctx.save_for_backward(y, var, weight)
        y = weight.view(1, C, 1, 1) * y + bias.view(1, C, 1, 1)
        return y

    @staticmethod
    def backward(ctx, grad_output):
        eps = ctx.eps
        N, C, H, W = grad_output.shape
        y, var, weight = ctx.saved_tensors
        g = grad_output * weight.view(1, C, 1, 1)
        mean_g = g.mean(dim=1, keepdim=True)
        mean_gy = (g * y).mean(dim=1, keepdim=True)
        gx = 1. / torch.sqrt(var + eps) * (g - y * mean_gy - mean_g)
        return gx, (grad_output * y).sum(dim=[0, 2, 3]), grad_output.sum(dim=[0, 2, 3]), None


class LayerNorm2d(nn.Module):
    def __init__(self, channels, eps=1e-6):
        super(LayerNorm2d, self).__init__()
        self.weight = nn.Parameter(torch.ones(channels))
        self.bias = nn.Parameter(torch.zeros(channels))
        self.eps = eps

    def forward(self, x):
        return LayerNormFunction.apply(x, self.weight, self.bias, self.eps)

class SimpleGate(nn.Module):
    def forward(self, x):
        x1, x2 = x.chunk(2, dim=1)
        return x1 * x2

# 特征通道数
n_feat = 256
kernel_size = 3

class SimpleGate(nn.Module):
    def forward(self, x):
        x1, x2 = x.chunk(2, dim=1)
        return x1 * x2

class NAFBlock(nn.Module):
    def __init__(self, c, DW_Expand=2, FFN_Expand=2, drop_out_rate=0.):
        super().__init__()
        dw_channel = c * DW_Expand
        self.conv1 = nn.Conv2d(in_channels=c, out_channels=dw_channel, kernel_size=1, padding=0, stride=1, groups=1, bias=True)
        self.conv2 = nn.Conv2d(in_channels=dw_channel, out_channels=dw_channel, kernel_size=3, padding=1, stride=1, groups=dw_channel,
                               bias=True)
        self.conv3 = nn.Conv2d(in_channels=dw_channel // 2, out_channels=c, kernel_size=1, padding=0, stride=1, groups=1, bias=True)
        
        # Simplified Channel Attention
        self.sca = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Conv2d(in_channels=dw_channel // 2, out_channels=dw_channel // 2, kernel_size=1, padding=0, stride=1,
                      groups=1, bias=True),
        )

        # SimpleGate
        self.sg = SimpleGate()

        ffn_channel = FFN_Expand * c
        self.conv4 = nn.Conv2d(in_channels=c, out_channels=ffn_channel, kernel_size=1, padding=0, stride=1, groups=1, bias=True)
        self.conv5 = nn.Conv2d(in_channels=ffn_channel // 2, out_channels=c, kernel_size=1, padding=0, stride=1, groups=1, bias=True)

        self.norm1 = LayerNorm2d(c)
        self.norm2 = LayerNorm2d(c)

        self.dropout1 = nn.Dropout(drop_out_rate) if drop_out_rate > 0. else nn.Identity()
        self.dropout2 = nn.Dropout(drop_out_rate) if drop_out_rate > 0. else nn.Identity()

        self.beta = nn.Parameter(torch.zeros((1, c, 1, 1)), requires_grad=True)
        self.gamma = nn.Parameter(torch.zeros((1, c, 1, 1)), requires_grad=True)

    def forward(self, inp):
        x = inp

        x = self.norm1(x)

        x = self.conv1(x)
        x = self.conv2(x)
        x = self.sg(x)
        x = x * self.sca(x)
        x = self.conv3(x)

        x = self.dropout1(x)

        y = inp + x * self.beta

        x = self.conv4(self.norm2(y))
        x = self.sg(x)
        x = self.conv5(x)

        x = self.dropout2(x)

        return y + x * self.gamma


class EDSR(nn.Module):
    def __init__(self):
        super(EDSR, self).__init__()

        in_ch = 3
        num_blocks = 32
        c = 256
        self.conv1 = nn.Conv2d(in_ch, n_feat, kernel_size, padding=1)

        # nafblock
        self.body = self.make_layer(NAFBlock, num_blocks, c)
        # 扩大
        self.conv_up = nn.Conv2d(n_feat, n_feat * 4, kernel_size, padding=1)
        self.conv_out = nn.Conv2d(n_feat, in_ch, kernel_size, padding=1)
        # resblock
        # self.body = self.make_layer(_Res_Block, num_blocks)

        # 上采样
        self.upsample = nn.Sequential(self.conv_up, nn.PixelShuffle(2))

    # 32个naf块
    def make_layer(self, block, layers, c):
        naf_block = []
        for _ in range(layers):
            naf_block.append(block(c))
        return nn.Sequential(*naf_block)

    def forward(self, x):
        
        out = self.conv1(x)
        out = self.body(out)
        out = self.upsample(out)
        out = self.conv_out(out)

        return out

In [None]:
import os
from math import log10
from torch.utils.data import DataLoader
import torch
import torch.nn as nn
import warnings

if __name__ == '__main__':
    warnings.filterwarnings("ignore", category=Warning)
    
    train_path = 'train/DIV2K_train_HR'
    test_path = 'DIV2K_test'

    crop_size = 96  # 、图像裁剪尺寸
    scaling_factor = 2  # 放大倍数

    # 学习参数
    checkpoint_dir = 'checkpoints'  # 模型保存路径及预训练模型路径
    # 定义保存的文件路径，包括文件名
    checkpoint_path = os.path.join(checkpoint_dir, 'edsr_NAF_1.pth')
    batch_size = 16
    start_epoch = 0 
    epochs = 5000
    workers = 4 
    lr = 1e-4 

     # 检查GPU是否可用
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f'Using device: {device}')

    # 先前的psnr
    pre_psnr = 0

    try:
        model = torch.load(checkpoint_path)
        model = model.to(device)
        print('加载先前模型成功')
    except:
        print('未加载原有模型训练')
        model = EDSR()
        model = model.to(device)

    # 初始化优化器
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.99)

    # 采用L1损失
    criterion = nn.L1Loss()

    train_dataset = SRDataset(train_path, crop_size, scaling_factor)
    test_dataset = SRDataset(test_path, crop_size, scaling_factor)

    train_loader = DataLoader(train_dataset,
                              batch_size=batch_size,
                              shuffle=True,
                              num_workers=workers,
                              )

    test_loader = DataLoader(test_dataset,
                             batch_size=batch_size,
                             shuffle=False,
                             num_workers=workers,
                             )

    for epoch in range(start_epoch, epochs + 1):

        model.train()  # 训练模式
        train_loss = 0
        n_iter_train = len(train_loader)
        train_psnr = 0
        # 按批处理
        for i, (lr_imgs, hr_imgs) in enumerate(train_loader):
            lr_imgs = lr_imgs.to(device)
            hr_imgs = hr_imgs.to(device)

            sr_imgs = model(lr_imgs)
            loss = criterion(sr_imgs, hr_imgs)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            psnr = 10 * log10(1 / loss.item())
            train_psnr += psnr

        epoch_loss_train = train_loss / n_iter_train
        train_psnr = train_psnr / n_iter_train

        print(f"Epoch {epoch}. Training loss: {epoch_loss_train} Train psnr {train_psnr} dB")

        model.eval()  # 测试模式
        test_loss = 0
        all_psnr = 0
        n_iter_test = len(test_loader)

        with torch.no_grad():
            for i, (lr_imgs, hr_imgs) in enumerate(test_loader):
                lr_imgs = lr_imgs.to(device)
                hr_imgs = hr_imgs.to(device)

                sr_imgs = model(lr_imgs)
                loss = criterion(sr_imgs, hr_imgs)

                psnr = 10 * log10(1 / loss.item())
                all_psnr += psnr
                test_loss += loss.item()

        epoch_loss_test = test_loss / n_iter_test
        epoch_psnr = all_psnr / n_iter_test

        print(f"Epoch {epoch}. Testing loss: {epoch_loss_test} Test psnr{epoch_psnr} dB")

        if epoch_psnr > pre_psnr:
            torch.save(model, checkpoint_path)
            pre_psnr = epoch_psnr
            print('模型更新成功')

        scheduler.step()

Using device: cuda
未加载原有模型训练
Epoch 0. Training loss: 0.03867540670264708 Train psnr 15.928738709123412 dB
Epoch 0. Testing loss: 0.015908514814717428 Test psnr18.008697173181258 dB
模型更新成功
Epoch 1. Training loss: 0.009657988892541382 Train psnr 20.371816356816158 dB
Epoch 1. Testing loss: 0.009120427803801638 Test psnr20.6491820647525 dB
模型更新成功
Epoch 2. Training loss: 0.0068338366282548295 Train psnr 21.919655623267765 dB
Epoch 2. Testing loss: 0.008024026440190417 Test psnr21.186508869185428 dB
模型更新成功
Epoch 3. Training loss: 0.005833972792811038 Train psnr 22.617893712662543 dB
Epoch 3. Testing loss: 0.007229125925472805 Test psnr21.553039817498618 dB
模型更新成功
Epoch 4. Training loss: 0.005696794590014115 Train psnr 22.696025414497264 dB
Epoch 4. Testing loss: 0.006495171426130193 Test psnr22.176533437532353 dB
模型更新成功
Epoch 5. Training loss: 0.005629008637920937 Train psnr 22.940318346511567 dB
Epoch 5. Testing loss: 0.006089778617024422 Test psnr22.412512902310308 dB
模型更新成功
Epoch 6. Trai

In [7]:
from PIL import Image
import torchvision.transforms as transforms
import torch
import numpy as np
import matplotlib.pyplot as plt

# 检查GPU是否可用
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')

imgO = Image.open('0811x2.png').convert("RGB")  # 测试图片的路径

transform = transforms.Compose([
    transforms.ToTensor(),
])

test_path='test'
ds=SRDataset(train_path,96,2)
l,h=ds[1]

# img = input_transform(imgO).unsqueeze(0).to(device)
img = transform(imgO).unsqueeze(0).to(device)  # 将图像移到GPU

# 导入模型
net = torch.load("checkpoints/edsr_NAF_1.pth").to(device)  # 将模型移到GPU
net.eval()  # 设置模型为评估模式

with torch.no_grad():  # 不计算梯度
    source = net(img)[0, :, :, :]

source = source.cpu().detach().numpy()  # 转为numpy并移到CPU
source = source.transpose((1, 2, 0))  # 切换形状
source = np.clip(source, 0, 1)  # 修正图片
img = Image.fromarray(np.uint8(source * 255))

plt.figure(figsize=(9, 9))
plt.subplot(1, 2, 1)
plt.imshow(imgO)
plt.title('input')
plt.subplot(1, 2, 2)
plt.imshow(img)
plt.title('output')
plt.show()

img.save('./result_NAF.png')


Using device: cuda


NameError: name 'train_path' is not defined