In [13]:
import torch
import cv2
import os
import glob
from torch.utils.data import Dataset
import random
import numpy as np
import json

from torchvision import transforms
from PIL import Image
from torch.utils.data import DataLoader
from torchvision.utils import save_image

class Gopro_Loader(Dataset):
    def __init__(self, data_path, transform=None):
        self.data_path = data_path
        self.transform = transform
        
        # 讀取所有影像的檔名
        self.imgs = glob.glob(os.path.join(data_path, 'blur/*.png'))

    def __getitem__(self, idx):
        # 讀取影像
        img_path = self.imgs[idx]
        label_path = img_path.replace("blur", "sharp")
        
        # 讀取標籤
        img = Image.open(img_path)
        label = Image.open(label_path)
        
        if self.transform:
            img = self.transform(img)
            label = self.transform(label)
        return img, label

    def __len__(self):
        return len(self.imgs)

In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class double_convolution(nn.Module):
    #(convolution => [BN] => ReLU) * 2
    def __init__(self, in_channels, out_channels):
        super(double_convolution,self).__init__()
        self.double_conv = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        return self.double_conv(x)
    

class Down(nn.Module):
    #Downscaling with maxpool then double conv

    def __init__(self, in_channels, out_channels):
        super(Down,self).__init__()
        self.maxpool_conv = nn.Sequential(
            nn.MaxPool2d(2),
            double_convolution(in_channels, out_channels)
        )

    def forward(self, x):
        return self.maxpool_conv(x)


class Up(nn.Module):
    #Upscaling then double conv
    def __init__(self, in_channels, out_channels):
        super(Up, self).__init__()
        self.up = nn.ConvTranspose2d(in_channels, out_channels, kernel_size=2, stride=2)
        self.conv = double_convolution(in_channels, out_channels)
        self.in_channels = in_channels

    def forward(self, x, x1):
        x = self.up(x)
        x1 = F.interpolate(x1, size=(x.size()[2], x.size()[3]))

        x = torch.cat([x1, x], dim=1)
        return self.conv(x)

In [4]:
class UNet(torch.nn.Module):
    def __init__(self):
        super(UNet, self).__init__()
        self.conv_block = double_convolution(3, 64)
        self.maxpool = nn.MaxPool2d(kernel_size=2)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        self.down4 = Down(512, 1024)
        self.up1 = Up(1024, 512)
        self.up2 = Up(512, 256)
        self.up3 = Up(256, 128)
        self.up4 = Up(128, 64)
        self.out = nn.Conv2d(64, 3, kernel_size=1, stride=1, padding=0)

        
    def forward(self, x):
#         print("x shape:", x.shape)
        
        x1 = self.conv_block(x)
#         print("x1 shape:", x1.shape)
        x2 = self.down1(x1)
#         print("x2 shape:", x2.shape)
        x3 = self.down2(x2)
#         print("x3 shape:", x3.shape)
        x4 = self.down3(x3)
#         print("x4 shape:", x4.shape)
        x5 = self.down4(x4)
#         print("x5 shape:", x5.shape)
        x = self.up1(x5, x4)
#         print("x shape:", x.shape)
        
        x = self.up2(x, x3)
#         print("x shape:", x.shape)
        x = self.up3(x, x2)
#         print("x shape:", x.shape)
        x = self.up4(x, x1)
#         print("x shape:", x.shape)
        x = self.out(x)
#         print("x shape:", x.shape)
        return x

In [5]:
model = UNet()
# 创建一个大小为 [1, 3, 256, 256] 的张量
input_tensor = torch.rand(1, 3, 368, 640)

# 将张量传递给模型进行前向传播
output_tensor = model(input_tensor)

In [10]:
import torch
import torch.nn as nn
import torch.optim as optim
from pytorch_msssim import ssim, ms_ssim, SSIM, MS_SSIM

def train_net(net, train_loader, val_loader, num_epochs=10, learning_rate=0.001, device='cpu', save_mode=''):
    # 将模型移动到指定设备
    net.to(device)
    
    # 定义损失函数和优化器
    criterion = MS_SSIM(data_range=1, size_average=True, channel=3).cuda() # channel=1 for grayscale images
    optimizer = optim.Adam(net.parameters(), lr=learning_rate)
    
    best_loss = float('inf')
    loss_record = []
    acc_record = []
    
    # 训练模型
    for epoch in range(num_epochs):
        net.train() # 将模型设为训练模式
        train_loss = 0.0
        
        for i, data in enumerate(train_loader):
            inputs, labels = data[0].to(device), data[1].to(device)
            
            # 将梯度清零
            optimizer.zero_grad()
            
            # 前向传播，计算预测结果和损失
            outputs = net(inputs)
            loss = 1 - criterion(outputs, labels)
            
            # 保存loss值最小的网络参数
            if loss < best_loss:
                best_loss = loss
                torch.save(net.state_dict(), f'{save_mode}/best_model.pth')
            
            # 反向传播，更新参数
            loss.backward()
            optimizer.step()
            
            # 统计损失
            train_loss += loss.item()
            loss_record.append(loss.item())
            
            acc = ms_ssim(labels, outputs, data_range=1)
            acc_record.append(acc.item())
            
        torch.save(net.state_dict(), f'{save_mode}/epoch_{epoch}_model.pth')
        
        # 在验证集上测试模型，并记录损失
        net.eval() # 将模型设为测试模式
        val_loss = 0.0
        with torch.no_grad():
            for i, data in enumerate(val_loader):
                inputs, labels = data[0].to(device), data[1].to(device)
                
                # 前向传播，计算预测结果和损失
                outputs = net(inputs)
                loss = criterion(outputs, labels)
                
                # 统计损失
                val_loss += loss.item()
                
        save_image(outputs, f'dataset_gopro/test/epoch_{epoch}.png')
        # 打印每个epoch的损失
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss/len(train_loader)}, Val Loss: {val_loss/len(val_loader)}")
        
    return acc_record, loss_record

def save_metrics_to_json(metrics, filepath):
    with open(filepath, "w") as f:
        json.dump(metrics, f)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

batch_size = 4
net = UNet()

if torch.cuda.device_count() > 1:
    print ("multi GPUs")
    net = nn.DataParallel(net,device_ids = [0, 1])

transform_train = transforms.Compose([
#     transforms.RandomCrop(size=((256,256))),
#     transforms.RandomVerticalFlip(p = 0.5),
    transforms.Resize((368, 640)),
    transforms.ToTensor(),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
])
# 指定训练集地址，开始训练
train_data_path = "dataset_gopro/train"
gopro_dataset = Gopro_Loader(data_path=train_data_path, transform=transform_train)
train_loader = torch.utils.data.DataLoader(
                                        dataset=gopro_dataset,
                                        batch_size=batch_size, 
                                        shuffle=True)

test_data_path = "dataset_gopro/eval"
gopro_test_dataset = Gopro_Loader(data_path=test_data_path, transform=transform_test)
val_loader = torch.utils.data.DataLoader(
                                        dataset=gopro_test_dataset,
                                        batch_size=1, 
                                        shuffle=True)

acc_record, loss_record = train_net(net, train_loader, val_loader, num_epochs=100, learning_rate=0.001, device=device, save_model='CNN_HW')

multi GPUs




Epoch 1/100, Train Loss: 0.10607190515632343, Val Loss: 0.8433551812171936
Epoch 2/100, Train Loss: 0.0864009564356911, Val Loss: 0.8483907395601272
Epoch 3/100, Train Loss: 0.08402821057455201, Val Loss: 0.8495020988583565
Epoch 4/100, Train Loss: 0.08305414730771224, Val Loss: 0.8482861316204071
Epoch 5/100, Train Loss: 0.08110552951879335, Val Loss: 0.8504818809032441
Epoch 6/100, Train Loss: 0.08000567369627537, Val Loss: 0.8516104653477669
Epoch 7/100, Train Loss: 0.07910770385937203, Val Loss: 0.8513755306601525
Epoch 8/100, Train Loss: 0.07779405806724568, Val Loss: 0.8539001449942589
Epoch 9/100, Train Loss: 0.0770530879051608, Val Loss: 0.8549953383207322
Epoch 10/100, Train Loss: 0.07714683768755183, Val Loss: 0.8544245940446854
Epoch 11/100, Train Loss: 0.07506853104232256, Val Loss: 0.8512030106782913
Epoch 12/100, Train Loss: 0.07420470782943497, Val Loss: 0.8560220447182655
Epoch 13/100, Train Loss: 0.07326092312758106, Val Loss: 0.8560174956917763
Epoch 14/100, Train Los

In [14]:

metrics = {"accuracy": acc_record, "loss": loss_record}
save_metrics_to_json(metrics, 'CNN_HW/metrics.json')

In [18]:
def test_model(net, device, epoch_num, test_loader):
    
    net.load_state_dict(torch.load(f'CNN_HW/{epoch_num}model.pth', map_location=device))
    # 测试模式
    net.eval()

    criterion = MS_SSIM(data_range=1, size_average=True, channel=3).cuda() # reuse the gaussian kernel with SSIM & MS_SSIM.

    count = 0
    for x, y in test_loader:
        x, y = x.to(device=device), y.to(device=device)
        with torch.no_grad():
            pred = net(x)
            save_image(pred, f'dataset_gopro/test/{epoch_num}{count}.png')
        count += 1

In [17]:
# 选择设备，有cuda用cuda，没有就用cpu
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

net = UNet()

if torch.cuda.device_count() > 1:
    print ("multi GPUs")
    net = nn.DataParallel(net,device_ids = [0, 1])
net.to(device=device)

# 加载模型参数
epoch_num = 'epoch_0_'

transform = transforms.Compose([
#     transforms.Resize((368, 640)),
    transforms.ToTensor(),
])
test_dataset = Gopro_Loader(data_path='dataset_gopro/test', transform=transform)
test_loader = DataLoader(dataset=test_dataset, batch_size=1, shuffle=False)

test_model(net, device, epoch_num, test_loader)

multi GPUs


KeyboardInterrupt: 

In [11]:


# 选择设备，有cuda用cuda，没有就用cpu
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 加载网络，图片三通道，分类为1。
net = UNet()
# 将网络拷贝到device中

if torch.cuda.device_count() > 1:
    print ("multi GPUs")
    net = nn.DataParallel(net,device_ids = [0, 1])
net.to(device=device)

# 加载模型参数
net.load_state_dict(torch.load('CNN_HW/epoch_0_model.pth', map_location=device))
# 测试模式
net.eval()

loss_record = []
acc_record = []
tmp_pred = torch.zeros(1)
tmp_y = torch.zeros(1)
tmp_x = torch.zeros(1)
tmp_acc = 0

transform = transforms.Compose([
#     transforms.Resize((368, 640)),
    transforms.ToTensor(),
])
test_dataset = Gopro_Loader(data_path='dataset_gopro/test', transform=transform)
test_loader = DataLoader(dataset=test_dataset, batch_size=1, shuffle=False)

criterion = MS_SSIM(data_range=1, size_average=True, channel=3).cuda() # reuse the gaussian kernel with SSIM & MS_SSIM.

count = 0
for x, y in test_loader:
    x, y = x.to(device=device), y.to(device=device)
    with torch.no_grad():
        pred = net(x)
        save_image(pred, f'dataset_gopro/test/{count}.png')
    count += 1


multi GPUs


KeyboardInterrupt: 

In [7]:
predii

tensor([[[[-0.0054, -0.1675, -0.1609,  ..., -0.1774, -0.0273,  0.0629],
          [-0.2080, -0.4057, -0.4016,  ..., -0.3591, -0.1425,  0.0419],
          [-0.1860, -0.3849, -0.3467,  ..., -0.2360, -0.0016,  0.1465],
          ...,
          [ 0.2849,  0.3475,  0.4042,  ...,  0.6581,  0.5788,  0.4309],
          [ 0.2858,  0.3381,  0.3899,  ...,  0.5763,  0.4839,  0.3844],
          [ 0.2304,  0.2728,  0.3027,  ...,  0.3843,  0.3445,  0.3091]],

         [[ 0.4628,  0.5244,  0.5164,  ...,  0.5609,  0.4948,  0.4322],
          [ 0.5276,  0.6672,  0.6536,  ...,  0.7334,  0.5765,  0.4565],
          [ 0.5093,  0.6151,  0.5571,  ...,  0.6167,  0.4612,  0.4102],
          ...,
          [ 0.3115,  0.2754,  0.2659,  ...,  0.1753,  0.1963,  0.2625],
          [ 0.3308,  0.2858,  0.2752,  ...,  0.1958,  0.2114,  0.2851],
          [ 0.3538,  0.3198,  0.2944,  ...,  0.2435,  0.2623,  0.3201]],

         [[ 0.4125,  0.4656,  0.4395,  ...,  0.4606,  0.4262,  0.4089],
          [ 0.4661,  0.5883,  

In [6]:
from torch.utils.data import DataLoader

# 选择设备，有cuda用cuda，没有就用cpu
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 加载网络，图片三通道，分类为1。
net = UNet()
# 将网络拷贝到device中
net.to(device=device)
# 加载模型参数
net.load_state_dict(torch.load('best_model.pth', map_location=device))
# 测试模式
net.eval()

tests_path = glob.glob('dataset_gopro/test/blur/*.png')
print (tests_path)

# 遍历所有图片
for test_path in tests_path:
    # 保存结果地址
    save_res_path = test_path.split('.')[0] + '_res.png'
    # 读取图片
    img = cv2.imread(test_path)
    print (img.shape)
    # 转为batch为1，通道为1，大小为512*512的数组
    img = cv2.resize(img, (256, 256), interpolation=cv2.INTER_AREA)
    img = img.reshape(1, 3, img.shape[0], img.shape[1])
    print (img.shape)
    # 转为tensor
    img_tensor = torch.from_numpy(img)
    
    # 将数据拷贝到device中
    image = img_tensor.to(device=device, dtype=torch.float32)
    # 使用网络参数，输出预测结果
    pred = net(image)
    print (pred.shape)
    
    pred = pred.detach().cpu().numpy()

    print (pred.shape)
    # 保存图片
    save_res_path = test_path.split('.')[0] + '_res.png'
    cv2.imwrite(save_res_path, pred[0].transpose(1, 2, 0).astype('uint8'))

['dataset_gopro/test/blur\\GOPR0871_11_01_000181.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000182.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000183.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000184.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000185.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000186.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000187.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000188.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000189.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000190.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000191.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000192.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000193.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000194.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000195.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000196.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000197.png', 'dataset_gopro/test/blur\\GOPR0871_11_01_000198.png', 'dataset_gopro/test/blur\\G

(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
torch.Size([1, 3, 256, 256])
(1, 3, 256, 256)
(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
torch.Size([1, 3, 256, 256])
(1, 3, 256, 256)
(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
torch.Size([1, 3, 256, 256])
(1, 3, 256, 256)
(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
torch.Size([1, 3, 256, 256])
(1, 3, 256, 256)
(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
torch.Size([1, 3, 256, 256])
(1, 3, 256, 256)
(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
torch.Size([1, 3, 256, 256])
(1, 3, 256, 256)
(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
torch.Size([1, 3, 256, 256])
(1, 3, 256, 256)
(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
torch.Size([1, 3, 256, 256])
(1, 3, 256, 256)
(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
to

(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
torch.Size([1, 3, 256, 256])
(1, 3, 256, 256)
(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
torch.Size([1, 3, 256, 256])
(1, 3, 256, 256)
(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
torch.Size([1, 3, 256, 256])
(1, 3, 256, 256)
(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
torch.Size([1, 3, 256, 256])
(1, 3, 256, 256)
(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
torch.Size([1, 3, 256, 256])
(1, 3, 256, 256)
(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
torch.Size([1, 3, 256, 256])
(1, 3, 256, 256)
(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
torch.Size([1, 3, 256, 256])
(1, 3, 256, 256)
(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
torch.Size([1, 3, 256, 256])
(1, 3, 256, 256)
(720, 1280, 3)
(1, 3, 256, 256)
x shape: torch.Size([1, 3, 256, 256])
to

KeyboardInterrupt: 