## DIP

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
from skimage.metrics import structural_similarity as compare_ssim

In [None]:
# Define the Hourglass model and helper functions
class Hourglass(nn.Module):
    """Encoder-decoder CNN with three narrow skip connections.

    The encoder is six stride-2 5x5 convolutions (2 -> 8 -> 16 -> 32 -> 64 ->
    128 -> 256 channels), each followed by batch norm and LeakyReLU. The
    decoder is six stride-2 4x4 transposed convolutions back up to a
    3-channel output squashed by a sigmoid. Encoder levels 3, 4 and 5 each
    feed a 4-channel skip tensor that is concatenated onto the decoder
    output of the same level before batch norm.

    Every encoder stage halves the spatial size (rounding up) and every
    decoder stage doubles it, so height and width should be multiples of 64
    for the skip tensors to line up with the upsampled ones.
    """

    def __init__(self):
        super().__init__()

        self.leaky_relu = nn.LeakyReLU()

        # Channel count entering level 1 followed by the width of levels 1..6.
        widths = (2, 8, 16, 32, 64, 128, 256)
        skip_levels = (3, 4, 5)
        skip_channels = 4

        # Encoder. Layers are registered in the order d_conv_k, d_bn_k,
        # [s_conv_k] so state-dict keys and seeded initialisation stay stable.
        for level in range(1, 7):
            c_in, c_out = widths[level - 1], widths[level]
            setattr(self, f'd_conv_{level}',
                    nn.Conv2d(c_in, c_out, kernel_size=5, stride=2, padding=2))
            setattr(self, f'd_bn_{level}', nn.BatchNorm2d(c_out))
            if level in skip_levels:
                setattr(self, f's_conv_{level}',
                        nn.Conv2d(c_out, skip_channels, kernel_size=5, stride=1, padding=2))

        # Decoder. On skip levels the transposed conv emits `skip_channels`
        # fewer maps so that the concatenation restores the full level width.
        for level in range(5, 0, -1):
            reserved = skip_channels if level in skip_levels else 0
            setattr(self, f'u_deconv_{level}',
                    nn.ConvTranspose2d(widths[level + 1], widths[level] - reserved,
                                       kernel_size=4, stride=2, padding=1))
            setattr(self, f'u_bn_{level}', nn.BatchNorm2d(widths[level]))

        self.out_deconv = nn.ConvTranspose2d(widths[1], 3, kernel_size=4, stride=2, padding=1)
        self.out_bn = nn.BatchNorm2d(3)

    def _encode_stage(self, x, conv, bn):
        """Apply one conv -> batch norm -> LeakyReLU stage."""
        return self.leaky_relu(bn(conv(x)))

    def _decode_stage(self, x, deconv, bn, skip=None):
        """Upsample, optionally append `skip` along channels, then batch norm -> LeakyReLU."""
        x = deconv(x)
        if skip is not None:
            x = torch.cat([x, skip], 1)
        return self.leaky_relu(bn(x))

    def forward(self, noise):
        """Map a (N, 2, H, W) tensor to a (N, 3, H, W) tensor with values in (0, 1)."""
        enc_1 = self._encode_stage(noise, self.d_conv_1, self.d_bn_1)
        enc_2 = self._encode_stage(enc_1, self.d_conv_2, self.d_bn_2)
        enc_3 = self._encode_stage(enc_2, self.d_conv_3, self.d_bn_3)
        enc_4 = self._encode_stage(enc_3, self.d_conv_4, self.d_bn_4)
        enc_5 = self._encode_stage(enc_4, self.d_conv_5, self.d_bn_5)
        enc_6 = self._encode_stage(enc_5, self.d_conv_6, self.d_bn_6)

        # Skip branches read the activated encoder features of their level.
        side_3 = self.s_conv_3(enc_3)
        side_4 = self.s_conv_4(enc_4)
        side_5 = self.s_conv_5(enc_5)

        dec_5 = self._decode_stage(enc_6, self.u_deconv_5, self.u_bn_5, side_5)
        dec_4 = self._decode_stage(dec_5, self.u_deconv_4, self.u_bn_4, side_4)
        dec_3 = self._decode_stage(dec_4, self.u_deconv_3, self.u_bn_3, side_3)
        dec_2 = self._decode_stage(dec_3, self.u_deconv_2, self.u_bn_2)
        dec_1 = self._decode_stage(dec_2, self.u_deconv_1, self.u_bn_1)

        return torch.sigmoid(self.out_bn(self.out_deconv(dec_1)))

In [None]:
def pixel_thanos(img, p=0.5):
    """Randomly blank out pixels of an image batch and report which ones survived.

    A single random (H, W) pattern is drawn and shared by every batch
    element and every channel.

    Args:
        img: Tensor indexed as (batch, channel, height, width). It is left
            untouched; the masked result is a copy.
        p: Probability, strictly between 0 and 1, that a pixel is zeroed.

    Returns:
        A ``(masked, mask)`` tuple. ``masked`` is a copy of ``img`` with the
        dropped pixels set to 0. ``mask`` is a boolean tensor of shape
        (1, C, H, W), with C taken from ``img``, that is True where a pixel
        was kept.

    Raises:
        AssertionError: If ``p`` is not inside the open interval (0, 1).
    """
    # Raised explicitly so the check survives `python -O`, while keeping the
    # exception type the previous `assert` produced.
    if not 0 < p < 1:
        raise AssertionError('The probability value should lie in (0, 1)')

    # Derive "kept" and "dropped" from one comparison so they are exact
    # complements, even for a draw that equals p.
    keep = torch.rand(img.shape[2], img.shape[3]) >= p

    masked = img.clone()  # do not overwrite the caller's tensor
    masked[:, :, ~keep] = 0

    # Repeat over the actual channel count instead of assuming RGB.
    mask = keep.unsqueeze(0).repeat(1, img.shape[1], 1, 1)
    return masked, mask

In [None]:
def add_noise(image, beta_schedule, t):
    """Blend ``image`` with fresh Gaussian noise using entry ``t`` of a beta schedule.

    With ``alpha_t = 1 - beta_schedule[t]`` the result is
    ``sqrt(alpha_t) * image + sqrt(1 - alpha_t) * noise``.

    Args:
        image: Tensor to corrupt.
        beta_schedule: Indexable collection of betas; the selected entry is
            passed to ``torch.sqrt``, so it has to be a tensor.
        t: Index into ``beta_schedule``.

    Returns:
        A ``(noisy_image, noise)`` tuple, where ``noise`` is the standard
        normal sample that was mixed in.
    """
    gaussian = torch.randn_like(image)
    alpha = 1 - beta_schedule[t]
    signal_scale = torch.sqrt(alpha)
    noise_scale = torch.sqrt(1 - alpha)
    return signal_scale * image + noise_scale * gaussian, gaussian

In [None]:
def initialize_model_and_optimizer(lr=1e-2, device='mps'):
    """Create a fresh Hourglass on ``device`` plus an Adam optimiser for it.

    Args:
        lr: Learning rate handed to Adam.
        device: Device the model is moved to.

    Returns:
        A ``(model, optimizer)`` tuple.
    """
    net = Hourglass().to(device)
    adam = optim.Adam(net.parameters(), lr=lr)
    return net, adam

In [None]:
def train_model(n_iter, model, optimizer, x, mask, z, n):
    """Fit ``model`` so that ``model(z)`` reproduces ``x`` on the masked-in pixels.

    Each iteration minimises the MSE between ``x`` and ``model(z) * mask``
    and records the SSIM between ``x * mask`` and ``model(z) * mask`` for the
    first batch element. Both numbers describe the output computed before
    that iteration's optimiser step.

    Args:
        n_iter: Number of optimisation steps.
        model: Network invoked as ``model(z)``.
        optimizer: Optimiser that updates ``model``.
        x: Target tensor, batch-first and channels-second.
        mask: Tensor multiplied into the output. ``~mask`` is also taken, so
            a boolean mask such as the one returned by ``pixel_thanos`` is
            expected.
        z: Fixed network input.
        n: Snapshot interval. A composite image is stored after the first
            iteration and then after every ``n``-th iteration.

    Returns:
        A ``(losses, images, ssim_scores)`` tuple. ``losses`` and
        ``ssim_scores`` hold one float per iteration. ``images`` holds the
        uint8 HWC snapshots.
    """
    mse = nn.MSELoss()
    losses = []
    ssim_scores = []
    images = []

    for i in range(n_iter):
        optimizer.zero_grad()

        y = model(z)
        loss = mse(x, y * mask)
        losses.append(loss.item())
        loss.backward()
        optimizer.step()

        with torch.no_grad():
            y_np = (y * mask)[0].cpu().detach().permute(1, 2, 0).numpy()
            x_np = (x * mask)[0].cpu().detach().permute(1, 2, 0).numpy()
            ssim_scores.append(
                compare_ssim(x_np, y_np, win_size=7, channel_axis=2, data_range=1.0)
            )

        # Snapshot cadence is governed by `n` alone. The earlier extra
        # `(i + 1) % 25` gate dropped snapshots whenever `n` was not a
        # multiple of 25 and built a composite that was usually discarded.
        if i == 0 or (i + 1) % n == 0:
            with torch.no_grad():
                # `x` where mask is True, network output everywhere else.
                composite = x + y * (~mask)
                # Clamp before the uint8 cast: targets outside [0, 1]
                # (e.g. unclipped noisy images) would otherwise wrap around.
                frame = composite[0].detach().cpu().clamp(0, 1).permute(1, 2, 0) * 255
                images.append(np.array(frame, np.uint8))

        if (i + 1) % 100 == 0:
            print('Iteration: {} Loss: {:.07f} SSIM: {:.07f}'.format(i + 1, losses[-1], ssim_scores[-1]))

    return losses, images, ssim_scores


In [None]:
def load_and_preprocess_image(image_path):
    """Load an image file as a batch of one 3-channel 512x512 tensor.

    Args:
        image_path: Path (or file object) accepted by ``PIL.Image.open``.

    Returns:
        Tensor of shape (1, 3, 512, 512) produced by torchvision's
        ``ToTensor``.
    """
    with Image.open(image_path) as source:
        # Converting to RGB guarantees exactly three channels for grayscale,
        # palette, LA and CMYK files too; previously only a 4-channel result
        # was handled, by slicing off the last channel. The resize still
        # happens first, as before, so RGB and RGBA inputs go through the
        # same resampling step they always did.
        image = source.resize((512, 512)).convert('RGB')
    return transforms.ToTensor()(image).unsqueeze(0)

In [None]:
def generate_noisy_images(input_tensor, beta_schedules, schedule_names, num_images=10):
    """Plot and collect noisy versions of one image for several beta schedules.

    For each schedule, ``num_images`` evenly spaced timesteps are taken and
    ``add_noise`` is applied to the original image at each of them. Noise is
    not accumulated from one column to the next. The figure shows one row per
    schedule with values clipped to [0, 1]; the returned tensors are not
    clipped.

    Args:
        input_tensor: Image batch; its first element is what gets displayed.
        beta_schedules: Sequence of beta schedules, one per row.
        schedule_names: Names matching ``beta_schedules`` by position; used
            as row titles and as keys of the result.
        num_images: Number of timesteps (columns) per schedule.

    Returns:
        Dict mapping each schedule name to its list of noisy tensors, ordered
        by increasing timestep.

    Note:
        The working copy is moved to the module-level ``device``.
    """
    noisy_images_dict = {name: [] for name in schedule_names}

    # squeeze=False keeps `axs` two-dimensional even with a single schedule
    # or a single column, so `axs[row, col]` indexing always works.
    _, axs = plt.subplots(len(beta_schedules), num_images, figsize=(15, 10), squeeze=False)

    for row_idx, betas in enumerate(beta_schedules):
        name = schedule_names[row_idx]
        # Step through this schedule's own length so schedules of different
        # sizes are each sampled across their full range.
        stepsize = len(betas) // num_images
        image = input_tensor.clone().to(device)
        for col_idx in range(num_images):
            noisy_image, _ = add_noise(image, betas, col_idx * stepsize)

            ax = axs[row_idx, col_idx]
            clipped = torch.clamp(noisy_image, 0, 1)
            ax.imshow(clipped[0].cpu().permute(1, 2, 0).numpy())
            ax.axis('off')
            if col_idx == 0:
                ax.set_title(name)

            noisy_images_dict[name].append(noisy_image)

    plt.tight_layout()
    plt.show()

    return noisy_images_dict

In [None]:
def _show_image_row(images, titles):
    """Display ``images`` side by side in one row, each under its entry of ``titles``."""
    if not images:
        return
    n_cols = len(images)
    # squeeze=False: always a 2-D axes array, even for a single image.
    _, axes = plt.subplots(1, n_cols, figsize=(n_cols * 5, 5), squeeze=False)
    for ax, img, title in zip(axes[0], images, titles):
        ax.imshow(img.astype(np.uint8))
        ax.set_title(title)
        ax.axis('off')
    plt.tight_layout()
    plt.show()


def _train_on_noisy_sequence(model, optimizer, noisy_images, z):
    """Train ``model`` on each image of ``noisy_images`` in reverse list order.

    Returns the concatenated per-iteration losses and, for every image, the
    last snapshot produced by ``train_model``.
    """
    all_losses = []
    final_images = []
    for noisy_image in reversed(noisy_images):
        x, mask = pixel_thanos(noisy_image, 0.8)
        mask = mask.to(device)
        x = x.to(device)
        losses, images, _ = train_model(n_iter2, model, optimizer, x, mask, z, 200)
        all_losses.extend(losses)
        final_images.append(images[-1])
    return all_losses, final_images


def train_and_visualize(image_path):
    """Run the whole experiment for one image and plot the results.

    Steps:
      1. Load the image and plot noisy versions of it under four beta
         schedules (linear, cosine, quadratic, sigmoid).
      2. Train model 1 on the image with 80% of its pixels dropped and show
         the stored snapshots.
      3. For each schedule, train a separate model on that schedule's noisy
         images (last timestep first), each with 80% of pixels dropped, and
         show the final snapshot per image.
      4. Plot all loss curves on one axis.

    Reads the module-level settings ``lr``, ``device``, ``n_iter1``,
    ``n_iter2`` and ``num_images``.

    Args:
        image_path: Path of the image to process.

    Returns:
        None. All output is plotted or printed.
    """
    # Load and preprocess the image.
    input_tensor = load_and_preprocess_image(image_path)

    # Set up the beta schedules.
    T = 1000
    schedules = {
        'Linear': torch.linspace(0.0001, 1.0, T),
        'Cosine': 1.0 * (1 - torch.cos(torch.linspace(0, np.pi / 2, T))),
        'Quadratic': torch.linspace(0.0001, 1.0, T) ** 2,
        'Sigmoid': torch.sigmoid(torch.linspace(-6, 6, T)) * 1.0,
    }
    schedule_names = list(schedules)
    beta_schedules = list(schedules.values())

    # Generate the noisy images. `num_images` is passed through so the number
    # of images per schedule always matches the number of plot columns.
    noisy_images_dict = generate_noisy_images(
        input_tensor, beta_schedules, schedule_names, num_images
    )

    # Initialise all models and optimisers up front, in a fixed order
    # (model 1 first, then one per schedule).
    model1, optimizer1 = initialize_model_and_optimizer(lr, device)
    schedule_models = [initialize_model_and_optimizer(lr, device) for _ in schedule_names]

    # Train model 1 on the original image with 80% of the pixels removed.
    x, mask = pixel_thanos(input_tensor, 0.8)
    z1 = torch.empty((1, 2, 512, 512)).normal_().to(device)
    mask = mask.to(device)
    x = x.to(device)
    losses1, images1, _ = train_model(n_iter1, model1, optimizer1, x, mask, z1, 200)

    # Show model 1's snapshots. train_model stores the first snapshot after
    # iteration 1 and the following ones every 200 iterations, so snapshot k
    # (k >= 1) belongs to iteration 200 * k.
    shown = images1[:num_images]
    _show_image_row(
        shown,
        [f'Iteration {1 if idx == 0 else 200 * idx} - Original' for idx in range(len(shown))],
    )

    # Train models 2-5, one per schedule, and show their results.
    loss_curves = [('Model 1', 'skyblue', losses1)]
    curve_colors = ('pink', 'orange', 'green', 'red')
    for number, (name, (model, optimizer), color) in enumerate(
        zip(schedule_names, schedule_models, curve_colors), start=2
    ):
        losses, finals = _train_on_noisy_sequence(model, optimizer, noisy_images_dict[name], z1)
        _show_image_row(
            finals,
            [f'Processed Image {idx + 1} - {name}' for idx in range(len(finals))],
        )
        loss_curves.append((f'Model {number} ({name})', color, losses))

    # Plot the loss curves.
    _, ax = plt.subplots(figsize=(12, 6))
    for label, color, losses in loss_curves:
        ax.plot(losses, label=label, color=color)
    ax.set_title('Losses for All Models')
    ax.set_xlabel('Iteration')
    ax.set_ylabel('Loss')
    ax.legend()
    plt.tight_layout()
    plt.show()

In [None]:
# Experiment settings
n_iter1 = 2000    # iterations for model 1 (trained on the original image)
n_iter2 = 200     # iterations per noisy image for models 2-5
num_images = 10   # images shown per row of results
lr = 1e-2         # Adam learning rate

# Use Apple's Metal backend when it is available, otherwise the CPU.
if torch.backends.mps.is_available():
    device = 'mps'
else:
    device = 'cpu'

In [None]:
# Run the full experiment on 'loopy.jpeg'.
image_path = 'loopy.jpeg' 
train_and_visualize(image_path)

In [None]:
# Re-run model 1's training at notebook level.
# NOTE(review): model1, optimizer1, x, mask and z1 are only assigned as locals
# inside train_and_visualize in the visible notebook, so this cell presumably
# raises NameError unless they were defined in an earlier interactive session.
# Confirm before relying on it.
losses1, images1, ssim1 = train_model(n_iter1, model1, optimizer1, x, mask, z1, 200)

In [None]:
# Run the full experiment on 'cat.jpeg'.
image_path = 'cat.jpeg' 
train_and_visualize(image_path)

In [None]:
# Run the full experiment on 'a.jpeg'.
image_path = 'a.jpeg' 
train_and_visualize(image_path)

In [None]:
# IPython shell escape: write the installed package versions to requirements.txt.
!pip freeze > requirements.txt