In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torchvision

import PIL
from PIL import Image

import numpy as np
import matplotlib.pyplot as plt

import os
import re

from tqdm.auto import tqdm

In [None]:
import pandas as pd

In [None]:
from fold_actions import *

In [None]:
f_path = "data/fold_validation/"

In [None]:
IMAGE_SIZE = 28

In [None]:
USE_CUDA = True

In [None]:
class ClothImages(Dataset):
    def __init__(self, folder_path,  transforms=None, revert=False):
        self.revert = revert
        self.data_root_folder = folder_path
        self.image_list, self.action_list  = self.load_image_list(self.data_root_folder)
        self.transforms = transforms
        
    def load_image_list(self, folder_path):
        image_list = []
        action_list = None
        for dress_folder in os.listdir(folder_path):
            if dress_folder.endswith(".csv"):
                action_list = pd.read_csv(folder_path + dress_folder)
            else:
                dress_image_list = []
                image_folder = os.path.join(folder_path, dress_folder)
                order_image_list = os.listdir(image_folder)
                if self.revert:
                    order_image_list = order_image_list[::-1]
                for file_name in order_image_list:
                    #image_list.append(os.path.join(image_folder, file_name))
                    single_image_path = os.path.join(image_folder, file_name)
                    dress_image_list.append(single_image_path)

                image_list.append(dress_image_list)
        
        return image_list, action_list
    
    def __len__(self):
        return len(self.image_list)
    
    def __getitem__(self, index):
        img_list = [] 
        for dress_image_path in self.image_list[index]:
            #print(dress_image_path)
            # Open image
            img = PIL.Image.open(dress_image_path).convert("L")
            img = PIL.ImageOps.invert(img)
            #img = resize(img,(IMAGE_SIZE,IMAGE_SIZE))
            if self.transforms is not None:
                img = self.transforms(img)
            
            img_list.append(img)
            
        if self.transforms is None:
            return img_list
        
        img_tensor = torch.stack(img_list,dim=0)
        return img_tensor
        

In [None]:
# transforms = torchvision.transforms.Compose([
#     torchvision.transforms.Resize((IMAGE_SIZE,IMAGE_SIZE), interpolation= PIL.Image.NEAREST),
#     torchvision.transforms.RandomAffine(degrees = 90, translate = (0.2,0.2), scale = (0.6,1)),
#     #torchvision.transforms.ColorJitter(hue=.05, saturation=.05),
#     torchvision.transforms.RandomHorizontalFlip(),
#     torchvision.transforms.RandomVerticalFlip(),
#     # torchvision.transforms.RandomRotation(20, resample=PIL.Image.BILINEAR)
#     torchvision.transforms.ToTensor(),
# ])

In [None]:
ci = ClothImages(f_path, revert=False)

In [None]:
def conv3x3(in_channels, out_channels, **kwargs):
    # The convolutional layers (for feature extraction) use standard layers from
    # `torch.nn`, since they do not require adaptation.
    # See `examples/maml/model.py` for comparison.
    return nn.Sequential(
        nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, **kwargs),
        nn.BatchNorm2d(out_channels, momentum=1., track_running_stats=False),
        nn.ReLU(),
        nn.MaxPool2d(2)
    )

In [None]:
class ConvolutionalNeuralNetwork(nn.Module):
    def __init__(self, in_channels, out_features, hidden_size=64):
        super(ConvolutionalNeuralNetwork, self).__init__()
        self.in_channels = in_channels
        self.out_features = out_features
        self.hidden_size = hidden_size

        self.features = nn.Sequential(
            conv3x3(in_channels, hidden_size),
            conv3x3(hidden_size, hidden_size),
            conv3x3(hidden_size, hidden_size),
            conv3x3(hidden_size, hidden_size)
        )

        self.linear_aesthetics = nn.Sequential(
            nn.Linear(hidden_size, out_features),
            nn.Sigmoid(),
        )

    def forward(self, inputs, params=None):
        features = self.features(inputs)
        #print(features.shape)
        features = features.view((features.size(0), -1))
        #print(features.shape)
        scores = self.linear_aesthetics(features)
        return scores

In [None]:
model = ConvolutionalNeuralNetwork(1, 1, hidden_size=64)

In [None]:
if USE_CUDA and torch.cuda.is_available():
    model = model.cuda()

In [None]:
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [None]:
transforms_after = torchvision.transforms.Compose([
    torchvision.transforms.Resize((IMAGE_SIZE,IMAGE_SIZE),interpolation = PIL.Image.NEAREST),
    torchvision.transforms.RandomAffine(degrees = 90, translate = (0.2,0.2), scale = (0.6,1)),
    torchvision.transforms.ColorJitter(hue=.05, saturation=.05),
    torchvision.transforms.RandomHorizontalFlip(),
    torchvision.transforms.RandomVerticalFlip(),
    torchvision.transforms.RandomRotation(20, resample=PIL.Image.BILINEAR),
    torchvision.transforms.ToTensor(),
])


# Train

In [None]:
# def calculate_image_score(cloth_img: np.array, transform_img, model_img, use_cuda = False):
#     img = Image.fromarray(np.uint8(cloth_img * 255), 'L')
#     img_t = transform_img(img)
#     if use_cuda:
#         img_t = img_t.to("cuda")
#     img_t_score = model_img(img_t.unsqueeze(0))
    
#     return img_t_score

In [None]:
epochs = 100

In [None]:
for epoch in tqdm(range(epochs)):
    loss_epoch = []
    for i in range(len(ci.image_list)):
        for j in range(len(ci.image_list[i]) - 1):
            oriImage = read_cloth_image(ci.image_list[i][j], show_image=False)
            nextImage = read_cloth_image(ci.image_list[i][j + 1], show_image=False)

            curA, curS, curImage = fold_action_max(oriImage, model, transforms_after, use_cuda=USE_CUDA)
            
            # print(curImage.shape)
            expert_score = calculate_image_score(nextImage, transforms_after, model, USE_CUDA)
            sample_score = calculate_image_score(curImage, transforms_after, model, USE_CUDA)
            
            # print(expert_score)
            # print(sample_score)
            
            loss = - torch.log(0.0001 + expert_score) \
                    - torch.log(1.001 - sample_score)
            
            #loss = - expert_score + sample_score

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            loss_epoch.append(loss.item())
            
            #print(loss.item())
    print(np.mean(loss_epoch))

In [None]:
torch.save(model, "record/fold_gail.pth")

In [None]:
# img0 = read_cloth_image(ci.image_list[0][0])
# img1 = read_cloth_image(ci.image_list[0][1])

# oriImage = img0

# curA, curS, curImage = fold_action_max(oriImage, model, transforms_after)

# # calculate_image_score(img1, transforms_after, model)

# # calculate_image_score(curImage, transforms_after, model)

# loss = - calculate_image_score(img1, transforms_after, model) + calculate_image_score(curImage, transforms_after, model)

# optimizer.zero_grad()
# loss.backward()
# optimizer.step()

In [None]:
model2 = ConvolutionalNeuralNetwork(1, 1, hidden_size=64)

In [None]:
pretrained_dict = torch.load("10_29.pth")

model2_dict = model2.state_dict()
# 1. filter out unnecessary keys
pretrained_dict = {k: v for k, v in pretrained_dict.items() if k in model2_dict}
# 2. overwrite entries in the existing state dict
model2_dict.update(pretrained_dict) 
# 3. load the new state dict
model2.load_state_dict(model2_dict)

if torch.cuda.is_available():
    model2 = model2.cuda()

optimizer2 = torch.optim.Adam(model2.parameters(), lr=1e-3)


In [None]:
for epoch in range(epochs):
    loss_epoch = []
    for i in tqdm(range(len(ci.image_list))):
        for j in range(len(ci.image_list[i]) - 1):
            oriImage = read_cloth_image(ci.image_list[i][j], show_image=False)
            nextImage = read_cloth_image(ci.image_list[i][j + 1], show_image=False)

            curA, curS, curImage = fold_action_max(oriImage, model2, transforms_after, use_cuda=USE_CUDA)

            # print(curImage.shape)
            expert_score = calculate_image_score(nextImage, transforms_after, model2, USE_CUDA)
            sample_score = calculate_image_score(curImage, transforms_after, model2, USE_CUDA)
            
            # print(expert_score)
            # print(sample_score)
            
            loss = - torch.log(0.0001 + expert_score) \
                    - torch.log(1.001 - sample_score)

            optimizer2.zero_grad()
            loss.backward()
            optimizer2.step()
            
            loss_epoch.append(loss.item())
            
            #print(loss.item())
    print(np.mean(loss_epoch))

In [None]:
# torch.save(model2, "record/fold_gail_pre.pth")

# Test |

In [None]:
model = torch.load("record/fold_gail.pth")
model2 = torch.load("record/fold_gail_pre.pth")

In [None]:
model.eval()
model2.eval()

In [None]:
model = model.cpu()
model2 = model2.cpu()

In [None]:
transforms_test = transforms_after

In [None]:
transforms_test= torchvision.transforms.Compose([
    torchvision.transforms.Resize((IMAGE_SIZE,IMAGE_SIZE),interpolation = PIL.Image.NEAREST),
    torchvision.transforms.RandomAffine(degrees = 0, translate = (0.0,0.0), scale = (0.8,0.8)),
    torchvision.transforms.ToTensor(),
])

In [None]:
cloth_folder = "C:\\Users\\Yizhou Zhao\\Desktop\\AI\\validation\\"

In [None]:
os.listdir(cloth_folder)

In [None]:
for file_name in os.listdir(cloth_folder):
    if file_name.endswith(".png"):
        print("-----------\n",file_name)
        cloth_name = file_name

        cloth_file = cloth_folder + cloth_name

        cloth_img = read_cloth_image(cloth_file)

        #plt.imshow(cloth_img, cmap="gray")

        img = Image.fromarray(np.uint8(cloth_img * 255) , 'L')

        t3 = transforms_test(img)

        print(t3.shape)

        plt.imshow(t3[0].data.numpy(), cmap="gray")

        print(model(t3.unsqueeze(0)).item(), model2(t3.unsqueeze(0)).item())

 # Rank

In [None]:
for index in range(6):
    for jj in [0,1]:
        if jj == 0:
            q_model = model
        else:
            q_model = model2

        cloth_img = read_cloth_image(ci.image_list[index][0])

        step = 0

        save_fold_folder = "data/user_study_step_1/" + "exampleB" + str(index) + str(jj)

        if not os.path.exists(save_fold_folder):
            os.mkdir(save_fold_folder)

        while step < 6:
            cloth_img_I = Image.fromarray(np.uint8(cloth_img * 255) , 'L')
            cloth_img_I = PIL.ImageOps.invert(cloth_img_I)
            display(cloth_img_I)
            cloth_img_I.save(save_fold_folder + "/" + str(step) + ".png")
            score, path_image_list = fold_action_max_beam_search(cloth_img, q_model, transforms_test, steps = 1)
        #     for i in range(len(path_image_list)):
        #         plt.imshow(path_image_list[i],cmap="gray")
        #         plt.show()
            if np.sum((cloth_img - path_image_list[-1])**2) > 5:
                cloth_img = path_image_list[-1]
            elif np.sum((cloth_img - path_image_list[0])**2) > 5:
                cloth_img = path_image_list[0]
            else:
                break

            step += 1

            print(step)

In [None]:
index = 0

In [None]:
q_model = model

In [None]:
# for j in range(len(ci.image_list[index])):
#     cloth_img = read_cloth_image(ci.image_list[index][j])
#     print(calculate_image_score(cloth_img, transforms_test, model))
#     print(calculate_image_score(cloth_img, transforms_test, model2))

In [None]:
cloth_img = read_cloth_image(ci.image_list[5][0])

In [None]:
step = 0

In [None]:
save_fold_folder = "data/user_study/" + "example0b"

In [None]:
if not os.path.exists(save_fold_folder):
    os.mkdir(save_fold_folder)

In [None]:
while step < 6:
    cloth_img_I = Image.fromarray(np.uint8(cloth_img * 255) , 'L')
    cloth_img_I = PIL.ImageOps.invert(cloth_img_I)
    display(cloth_img_I)
    cloth_img_I.save(save_fold_folder + "/" + str(step) + ".png")
    score, path_image_list = fold_action_max_beam_search(cloth_img, q_model, transforms_test, steps = 2)
#     for i in range(len(path_image_list)):
#         plt.imshow(path_image_list[i],cmap="gray")
#         plt.show()
    if np.sum((cloth_img - path_image_list[-1])**2) > 5:
        cloth_img = path_image_list[-1]
    elif np.sum((cloth_img - path_image_list[0])**2) > 5:
        cloth_img = path_image_list[0]
    else:
        break
    
    step += 1
    
    print(step)

In [None]:
cloth_img_I = Image.fromarray(np.uint8(cloth_img * 255) , 'L')
cloth_img_I = PIL.ImageOps.invert(cloth_img_I)
display(cloth_img_I)

In [None]:
print(step)

In [None]:
cloth_img_I.save(save_fold_folder + "/" + str(step) + ".png")

In [None]:
score, path_image_list = fold_action_max_beam_search(cloth_img, model, transforms_test, steps = 2)

In [None]:
for i in range(len(path_image_list)):
    plt.imshow(path_image_list[i],cmap="gray")
    plt.show()

In [None]:
if np.sum((cloth_img - path_image_list[-1])**2) > 5:
    cloth_img = path_image_list[-1]
elif np.sum((cloth_img - path_image_list[0])**2) > 5:
    cloth_img = path_image_list[0]
else:
    print("no renew")

In [None]:
cloth_img = path_image_list[-1]

In [None]:
step += 1