In [50]:
import glob
import cv2
import numpy as np
import torch
from torch.utils.data import Dataset, IterableDataset, DataLoader
from torchvision import transforms

from torch.utils import data

import torch
from torch.utils.data import Dataset
import numpy as np
from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

In [51]:
class FaceDataset(Dataset):
    """Image/value regression dataset driven by a comma-separated annotation file.

    Layout expected under ``root_dir``:

    * ``Data/Image/<image name>`` - the image files
    * ``Data/Annotation/<set><component>`` - one ``<image name>,<value>`` pair per line

    Args:
        root_dir: folder that contains the ``Data`` directory (a trailing
            separator is accepted but not required).
        set: name of the split; concatenated with ``component`` to form the
            annotation file name.
        component: name of the annotated quantity (second half of the
            annotation file name).
        transform: callable applied to the loaded RGB ``PIL.Image``; it must
            return a ``torch.Tensor``.
    """

    def __init__(self, root_dir, set, component, transform):
        self.root_dir = root_dir
        self.set = set
        self.component = component
        self.transform = transform

        self.total_imgs, self.total_values = self.get_data_list()
        # ``np.float`` was removed in NumPy 1.24; it was an alias of the builtin
        # ``float`` (i.e. float64), so ``np.float64`` keeps the same dtype.
        self.total_values = np.array(self.total_values).astype(np.float64)
        self.gt_mean = np.mean(self.total_values)

    def __len__(self):
        """Number of annotated images."""
        return len(self.total_imgs)

    def __getitem__(self, idx):
        """Return ``(image, value)`` for sample ``idx``.

        The image is transformed and then standardised to zero mean / unit
        (population) standard deviation over all of its elements.

        Raises:
            RuntimeError: if the image file cannot be opened or decoded.
        """
        img_loc = self.total_imgs[idx]
        try:
            # ``convert`` forces the pixel data to be read, so the file can be
            # closed as soon as the ``with`` block ends.
            with open(img_loc, 'rb') as img_file:
                image = Image.open(img_file).convert("RGB")
        except Exception as err:
            # Previously the error was only printed and execution continued
            # with ``image`` unbound; fail with the offending path instead.
            raise RuntimeError('{} was not loaded'.format(img_loc)) from err

        # Normalize the image data
        image = self.transform(image)
        # unbiased=False matches the population std that np.std computes.
        std = image.std(unbiased=False)
        # Guard against constant images, which would otherwise yield NaNs.
        image = (image - image.mean()) / std.clamp_min(1e-8)

        value = self.total_values[idx]

        return image, value

    def get_data_list(self):
        """Parse the annotation file.

        Returns:
            ``(image_list, value_list)``: full image paths and the raw
            (string) values, in file order. Blank lines are skipped.

        Raises:
            ValueError: if a non-blank line has fewer than two comma-separated fields.
        """
        # Local import: ``os`` is not imported at the top of the notebook.
        import os

        image_list, value_list = [], []
        # os.path.join yields the same backslash paths on Windows and also works
        # elsewhere or when root_dir has no trailing separator.
        img_dir = os.path.join(self.root_dir, "Data", "Image")
        anno_path = os.path.join(
            self.root_dir, "Data", "Annotation", self.set + self.component
        )

        print('Anno file: ', anno_path)

        with open(anno_path, 'r') as anno_file:
            for line_no, line in enumerate(anno_file, start=1):
                stripped = line.strip()
                if not stripped:
                    continue
                fields = stripped.split(',')
                if len(fields) < 2:
                    raise ValueError(
                        '{}:{}: expected "<image>,<value>", got {!r}'.format(
                            anno_path, line_no, stripped
                        )
                    )
                image_list.append(os.path.join(img_dir, fields[0]))
                value_list.append(fields[1])

        print(len(image_list), len(value_list))

        return image_list, value_list

In [62]:
class FaceDataLoader():
    """Factory that builds a ``DataLoader`` over a ``FaceDataset``.

    Args:
        datadir: root folder handed to ``FaceDataset``.
        set: split name handed to ``FaceDataset``.
        component: annotation component handed to ``FaceDataset``.
        batch_size: samples per batch.
        n_workers: number of loader worker processes.
        shuffle: whether to reshuffle the data every epoch.
    """

    def __init__(self, datadir, set, component, batch_size, n_workers, shuffle):
        self.datadir = datadir
        self.set = set
        self.component = component
        self.batch_size = batch_size
        self.n_workers = n_workers
        self.shuffle = shuffle
        # Pad 10 px on the left and right (none on top/bottom), then convert to a tensor.
        self.trans = transforms.Compose([
            transforms.Pad((10, 0, 10, 0)),
            transforms.ToTensor()
        ])

    def get_data_loader(self):
        """Create the dataset and wrap it in a ``DataLoader``.

        Incomplete final batches are dropped (``drop_last=True``).

        Returns:
            torch.utils.data.DataLoader yielding ``(image, value)`` batches.
        """
        dataset = FaceDataset(self.datadir, self.set, self.component, self.trans)
        # Use the directly imported DataLoader rather than ``data.DataLoader``:
        # the module alias ``data`` is a notebook global that other cells rebind
        # (e.g. as a loop variable), which would break this method on re-run.
        data_loader = DataLoader(
            dataset,
            batch_size=self.batch_size,
            shuffle=self.shuffle,
            num_workers=self.n_workers,
            drop_last=True,
            # Pinned memory only helps host-to-GPU copies; skip it on CPU-only machines.
            pin_memory=torch.cuda.is_available()
        )

        print('\ndata len:', len(dataset), '\n')
        return data_loader

In [57]:
import torch
from torch import tensor
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl
from torchvision import models, transforms
from sklearn.metrics import r2_score
import numpy as np

class CNNmodel(pl.LightningModule):
    """VGG16 encoder with two heads: a scalar regressor and an image decoder.

    ``forward`` returns ``(input, predicted value, reconstructed image)``.
    Training combines an asymmetric squared error on the value with a
    sum-reduced MSE between the reconstruction and the resized input image.

    Args:
        batch_size: kept for backward compatibility and stored on the module.
            The step methods size their tensors from the batch they actually
            receive, so this no longer has to match the data loader.
    """

    def __init__(self, batch_size=6):
        super().__init__()

        self.batch_size = batch_size

        # Filled by test_step: one (1,)-shaped numpy array per sample.
        self.predictions, self.ground_truths = [],[]

        # Drop the last two children of torchvision's VGG16 (pooling and
        # classifier stages), keeping the convolutional feature extractor.
        self.encoder = torch.nn.Sequential(
            *list(models.vgg16(pretrained=True).children())[:-2],
        )

        self.valueDecoder = torch.nn.Sequential(
            nn.Flatten(),
            # Fully connected layer; expects a 512 x 4 x 4 feature map.
            nn.Linear(512*4*4, 4608),
            nn.Linear(4608, 2048),
            nn.LeakyReLU(0.2),
            nn.Linear(2048, 512),
            nn.LeakyReLU(0.2),
            nn.Linear(512, 32),
            nn.LeakyReLU(0.2),
            nn.Linear(32, 1),
            # output: 1
        )

        # Each layer (kernel 3, stride 2, no padding) maps a side of n to 2n + 1,
        # so the four layers map n to 16n + 15.
        # NOTE(review): the original note said "output = 3 x 95 x 95", which
        # corresponds to a 5 x 5 feature map, while valueDecoder assumes 4 x 4
        # (giving 79 x 79) - confirm the intended input resolution.
        self.faceDecoder = torch.nn.Sequential(
            nn.ConvTranspose2d(512, 3, kernel_size=3, stride=2),
            nn.ConvTranspose2d(3, 3, kernel_size=3, stride=2),
            nn.ConvTranspose2d(3, 3, kernel_size=3, stride=2),
            nn.ConvTranspose2d(3, 3, kernel_size=3, stride=2,),
        )

        # self.weights_init(self.encoder)
        # self.weights_init(self.hm3d_xy_Decoder)
        # self.weights_init(self.hm3d_yz_Decoder)
        # self.weights_init(self.hm3d_xz_Decoder)

    def init_layer(self, layer, classname):
        """Initialise one layer according to its class name.

        Conv*/Linear weights get Xavier-uniform init, BatchNorm gets weight 1 /
        bias 0, and 'Bottle*' blocks get Xavier-uniform on conv1..conv3.
        Layers whose name matches none of these are left untouched.
        """
        if classname.find('Conv') != -1:
            nn.init.xavier_uniform_(layer.weight)
        elif classname.find('Linear') != -1:
            nn.init.xavier_uniform_(layer.weight)
        elif classname.find('BatchNorm') != -1:
            layer.weight.data.fill_(1)
            layer.bias.data.zero_()
        elif classname.find('Bottle') != -1:
            nn.init.xavier_uniform_(layer.conv1.weight)
            nn.init.xavier_uniform_(layer.conv2.weight)
            nn.init.xavier_uniform_(layer.conv3.weight)

    def weights_init(self, model, is_resnet=False):
        """Apply ``init_layer`` to every layer of an iterable ``model``.

        With ``is_resnet=True`` layers whose class name contains 'Seq' are
        descended into one level and their sub-layers initialised instead.
        """
        if is_resnet:
            for layer in model:
                classname = layer.__class__.__name__
                if 'Seq' in classname:
                    for subLayer in layer:
                        subclassname = subLayer.__class__.__name__
                        self.init_layer(subLayer, subclassname)
                else:
                    self.init_layer(layer, classname)
        else:
            for layer in model:
                classname = layer.__class__.__name__
                self.init_layer(layer, classname)

    def forward(self, x):
        """Encode ``x`` and run both heads.

        Returns:
            ``(x, x_valout, x_faceout)``: the unchanged input, the value head
            output, and the face decoder output.
        """
        x_enout = self.encoder(x)
        x_valout = self.valueDecoder(x_enout)
        x_faceout = self.faceDecoder(x_enout)

        return x, x_valout, x_faceout

    @staticmethod
    def _as_column(value, like):
        """Return ``value`` as an (N, 1) tensor with the dtype and device of ``like``."""
        return torch.as_tensor(value).to(device=like.device, dtype=like.dtype).reshape(-1, 1)

    def training_step(self, batch, batch_idx):
        """Compute and log the weighted value + reconstruction loss for one batch."""
        image, value = batch
        x, x_val, x_face = self(image)
        # Size the target from the real batch rather than self.batch_size, and
        # follow the prediction's device instead of hard-coding .cuda().
        tensor_value = self._as_column(value, x_val)

        lambda_value = 10
        lambda_face = 0.001

        loss_value = self.cust_loss(x_val, tensor_value) * lambda_value

        new_gt = self.resize_gt(image, x_face.size())
        loss_face = F.mse_loss(x_face, new_gt, reduction='sum') * lambda_face

        loss = loss_value + loss_face

        self.log_dict({
            'L': loss,
            'L_v': loss_value,
            'L_face': loss_face,
        }, prog_bar=True)

        return loss

    def validation_step(self, batch, batch_idx):
        """Log the sum-reduced MSE of the value head (scaled by 10) as ``val_loss``."""
        image, value = batch
        x, x_val, x_face = self(image)
        tensor_value = self._as_column(value, x_val)

        val_loss = F.mse_loss(x_val, tensor_value, reduction='sum') * 10

        self.log('val_loss', val_loss)

    def test_step(self, batch, batch_idx):
        """Record predictions / ground truths and log the batch R2 score."""
        image, value = batch
        x, x_val, x_face = self(image)
        tensor_value = self._as_column(value, x_val)

        x_val = x_val.cpu().detach().numpy()
        tensor_value = tensor_value.cpu().detach().numpy()

        # Extending with an (N, 1) array appends one (1,)-shaped row per sample.
        self.ground_truths.extend(tensor_value)
        self.predictions.extend(x_val)

        # sklearn's signature is r2_score(y_true, y_pred); R2 is not symmetric.
        loss_r2 = r2_score(tensor_value, x_val)

        self.log_dict({
            'R2': loss_r2,
        }, prog_bar=True)


    def configure_optimizers(self):
        """Adam over all parameters with a learning rate of 1e-6."""
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-6)
        return optimizer

    def cust_loss(self, pred, gt):
        """Asymmetric squared error summed over the batch.

        With ``d = gt - pred`` (first column of each), a sample contributes
        ``d**2`` when ``d > 0`` and ``(2*d)**2`` otherwise, i.e.
        over-predictions are penalised four times as heavily.
        """
        diff = gt[:, 0] - pred[:, 0]
        scaled = torch.where(diff > 0, diff, diff * 2)
        return (scaled ** 2).sum()

    def r2_loss(self, pred, gt, gt_mean):
        """Coefficient of determination ``1 - SS_res / SS_tot`` w.r.t. ``gt_mean``."""
        ss_tot = torch.sum((gt - gt_mean) ** 2)
        ss_res = torch.sum((gt - pred) ** 2)
        r2 = 1 - ss_res / ss_tot
        return r2

    def resize_gt(self, gt, pred_size):
        """Resize the image batch ``gt`` to the spatial size given by ``pred_size``.

        Args:
            gt: image batch whose last two dimensions are height and width.
            pred_size: size of the prediction; its last two entries are used
                as the target (height, width).

        Returns:
            The resized batch, on the same device as ``gt``.
        """
        # An explicit [H, W] target (instead of a single int, which only fixes
        # the shorter edge) guarantees the result matches the prediction even
        # for non-square inputs; Resize handles the whole batch in one call.
        target_hw = [int(pred_size[-2]), int(pred_size[-1])]
        return transforms.Resize(target_hw)(gt)

In [58]:
# Build the network with the constructor default (batch_size=6); the constructor
# also requests pretrained VGG16 weights.
# NOTE(review): the loader cell below sets batch_size = 64 - confirm the mismatch is intended.
model = CNNmodel()

In [None]:
# Root folder that contains Data\Image and Data\Annotation.
# The backslashes must be escaped: in a plain string literal "\U" starts a
# unicode escape and a trailing backslash escapes the closing quote, so the
# unescaped form is a SyntaxError (a raw string cannot end in a backslash either).
data_dir = "C:\\Users\\hinds\\CSResearch\\"
# NOTE(review): `set` shadows the builtin of the same name for the rest of the
# notebook, and the 'validate' split is being loaded into `train_loader` - confirm both.
set = 'validate'
component = 'Age' 
batch_size = 64
n_workers = 0
shuffle = True
train_loader = FaceDataLoader(data_dir,set,component,batch_size,n_workers,shuffle).get_data_loader()
train_iter = iter(train_loader)

In [None]:
from laplace import Laplace

In [None]:
# Each batch is an (images, values) pair, so it has no `.shape` of its own; unpack
# it and report both shapes. The loop variables are deliberately not called
# `data`, which would rebind the `torch.utils.data` module alias imported above.
for batch_idx, (images, values) in enumerate(train_loader):
    print('batch {}, images {}, values {}'.format(
        batch_idx, tuple(images.shape), tuple(values.shape)))

In [None]:
# import pandas as pd
# from pathlib import Path

# root = 'C:\\Users\\hinds\\CSResearch\\Data\\Image'
# my_csv_file = ...

# # Loading csv as {image:class,...} format
# df = pd.read_csv(my_csv_file).set_index('images')
# class_dict = df.idxmax(axis="columns").to_dict()

# # Moving files to class-named subfolders
# for path in Path(root).iterdir():
#     if path.is_file() and path.name in class_dict.keys():
#         path.rename(Path(path.parent, class_dict[path.name], path.name))

# # Loading dataset
# dataset = torchvision.datasets.ImageFolder(root=root, transform=None)

In [None]:
crit = torch.nn.MSELoss()
# NOTE(review): lr=0.1 is five orders of magnitude above the 1e-6 used by
# CNNmodel.configure_optimizers - confirm this is intended.
opt = torch.optim.Adam(model.parameters(), lr=0.1)

for X, y in train_loader:
    # Gradients accumulate across backward() calls unless cleared each step.
    opt.zero_grad()
    # forward() returns (input, predicted value, reconstructed face); only the
    # value prediction is compared against the target here.
    _, y_pred, _ = model(X)
    # Match the prediction's dtype/device and (N, 1) shape so MSELoss does not
    # broadcast or mix float32 with float64.
    target = y.to(y_pred).reshape(-1, 1)
    loss = crit(y_pred, target)
    loss.backward()
    opt.step()

#evaluating already trained model. 

In [None]:
# NOTE(review): `la` is not assigned in any cell visible in this notebook; only the
# `Laplace` name is imported above. On a fresh kernel this line raises NameError
# unless `la` is created first - confirm where it is meant to be constructed.
la.fit(train_loader)