In [None]:
# Mount Google Drive at /content/drive; later cells read the dataset from paths under it.
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

In [None]:
!pip install pytorch_lightning
!pip install torchtext
!pip install matplotlib
!pip install torchvision
!pip install torch
!pip install tqdm
!pip install timm
!pip install scikit_learn
!pip install pandas
!pip install opencv_contrib_python
!pip install opencv_python_headless
! pip install torchsummary
! pip install einops
!pip install wandb

In [None]:
from pathlib import Path
import os

# Locate the data folder on the mounted Drive and the image folder inside it.
data_path = Path("/content/drive/MyDrive/data")
image_path = data_path / "image-rv-70k-float"
print(image_path)

# Create the image folder when it is missing so a later cell can extract the data into it.
if not image_path.is_dir():
  print(f"Did not find {image_path} directory, creating one...")
  image_path.mkdir(parents=True, exist_ok=True)
else:
  print(f"{image_path} directory exists. ")

In [None]:

# import zipfile
# with zipfile.ZipFile(os.path.join(data_path, "/content/drive/MyDrive/Data_NAVER_Image_Review/data70k_float.zip"), "r") as zip_ref:
#   print("Unzipping data....")
#   zip_ref.extractall(image_path)



In [None]:
 # Helper function
import os

# Module-level accumulators filled by walk_through_dir():
# images[i] is the number of files found in the directory whose numeric name is labels[i].
images = []
labels = []


def walk_through_dir(dir_path):
  '''
    Walk through dir_path and record how many files each label directory holds.

    Every visited directory name is printed. Directories named 'train' or
    'test' and directories whose name is not a number (for example
    '.ipynb_checkpoints') are skipped. For every other directory the name is
    parsed as a float and appended to the module-level ``labels`` list, and
    the number of files directly inside it is appended to ``images``.

    Args:
      dir_path (str or pathlib.Path): target directory; a trailing path
        separator is allowed.
    Returns:
      None. Results are accumulated in ``labels`` and ``images``; calling the
      function again appends to the existing contents.
  '''
  for dirpath, dirnames, filenames in os.walk(dir_path):
    # normpath drops a trailing separator so basename is never empty, and
    # basename works with whatever separator the platform uses.
    folder_name = os.path.basename(os.path.normpath(dirpath))
    print(folder_name)
    if folder_name in ('train', 'test'):
      continue
    try:
      label = float(folder_name)
    except ValueError:
      # Not a label directory; ignore it instead of aborting the whole walk.
      continue
    labels.append(label)
    images.append(len(filenames))


In [None]:
# Count the files in each label folder of the training split. The counts are appended to the
# module-level `images` / `labels` lists, so running this cell again appends the same entries twice.
walk_through_dir("/content/drive/MyDrive/Data_NAVER_Image_Review/image-rv-70k-float/data70k/train")

In [None]:
import matplotlib.pyplot as plt

# Bar chart of the number of images counted for each label
# (`labels` and `images` are filled by walk_through_dir).
fig, ax = plt.subplots()
ax.bar(labels, images)

# Axis names and chart title.
ax.set_xlabel('Label')
ax.set_ylabel('Number of image')
ax.set_title('Distribution of test set')

# Render the figure.
plt.show()

In [None]:
import torch
from torch import nn
import torchvision
from torchsummary import summary
from torchvision.models.feature_extraction import create_feature_extractor
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
import cv2
from albumentations.pytorch import ToTensorV2
import albumentations as A

import timm
from pytorch_lightning.utilities.model_summary import ModelSummary, summarize
from pytorch_lightning.tuner.tuning import Tuner
from einops.layers.torch import Rearrange
from torchmetrics import ConfusionMatrix
from scipy.ndimage import gaussian_filter1d, convolve1d
from scipy.signal.windows import triang

from torchvision import datasets, transforms, models
from PIL import Image

from torch.utils.data import DataLoader, random_split
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from typing import Optional
import pytorch_lightning as pl
from pytorch_lightning import loggers as pl_loggers
from pytorch_lightning.callbacks import ModelCheckpoint, RichProgressBar

from scipy.io import loadmat
from collections import OrderedDict

import os
from os import listdir
from os.path import isfile, join
import shutil

import seaborn as sns
%matplotlib inline

from IPython.display import display




In [None]:
def get_lds_kernel_window(kernel, ks, sigma):
    """Build the 1-D smoothing window used for label-distribution smoothing.

    Args:
        kernel: one of 'gaussian', 'triang' or 'laplace'.
        ks: kernel size. The gaussian and laplace windows have
            ``2 * ((ks - 1) // 2) + 1`` entries; the triangular window has ``ks``.
        sigma: scale of the gaussian / laplace kernel (unused for 'triang').

    Returns:
        A numpy array; the gaussian and laplace windows are divided by their
        maximum so the peak equals 1.

    Raises:
        AssertionError: if ``kernel`` is not one of the supported names.
    """
    assert kernel in ['gaussian', 'triang', 'laplace']
    half_ks = (ks - 1) // 2

    if kernel == 'triang':
        return triang(ks)

    if kernel == 'gaussian':
        # Smooth a unit impulse placed in the middle of the window.
        impulse = [0.] * half_ks + [1.] + [0.] * half_ks
        smoothed = gaussian_filter1d(impulse, sigma=sigma)
        return smoothed / max(smoothed)

    # Remaining case: 'laplace'.
    def laplace(x):
        return np.exp(-abs(x) / sigma) / (2. * sigma)

    values = [laplace(offset) for offset in np.arange(-half_ks, half_ks + 1)]
    # Dividing a list by a numpy scalar yields a numpy array.
    return values / max(values)

class Naver2Dataset(Dataset):
    """Image dataset read from ``<root_dir>/<mode>/<label>/<image>``.

    Every sub-directory of ``<root_dir>/<mode>`` whose name parses as a float
    is a label folder; every file inside it becomes one sample carrying that
    label. Sub-directories with a non-numeric name are ignored.

    Args:
        root_dir: directory that contains the ``mode`` sub-directory.
        mode: name of the split sub-directory (e.g. 'train' or 'test').
        transform: optional callable applied to the loaded RGB PIL image.
        reweight: 'none', 'inverse' or 'sqrt_inv'; how bin counts are turned
            into weights when ``lds`` is enabled.
        resolution: bin width used to quantise labels before counting.
        max_target: accepted for backward compatibility; not used.
        lds: when truthy, per-sample weights are computed and
            ``__getitem__`` returns ``(image, label, weight)``.
        lds_kernel, lds_ks, lds_sigma: forwarded to ``get_lds_kernel_window``.
        label_type: 'int' rounds labels to integers, anything else keeps floats.

    Attributes:
        img_names / labels: parallel lists, one entry per sample.
        set_labels: sorted label-folder values (rounded when label_type is 'int').
        distribution: dict mapping label bin -> number of samples.
        weights: list of per-sample weights, or None when ``lds`` is falsy.
    """

    def __init__(
        self, root_dir, mode = 'train',
        transform = None,
        reweight = "sqrt_inv", resolution = 0.1,
        max_target=51, lds=False, lds_kernel='gaussian', lds_ks=5, lds_sigma=2,
        label_type = 'int',
    ):
        super().__init__()
        print("Collecting data at: ", root_dir)
        self.root_dir = f"{root_dir}/{mode}"
        self.transform = transform
        self.mode = mode
        self.label_type = label_type
        self.lds = lds

        # Keep the real folder name next to its numeric value: str(float(name))
        # is not always the folder name (e.g. "1" -> "1.0", "0.50" -> "0.5").
        label_folders = []
        for entry in os.scandir(self.root_dir):
            if not entry.is_dir():
                continue
            try:
                label_folders.append((float(entry.name), entry.name))
            except ValueError:
                # Not a label folder (e.g. ".ipynb_checkpoints").
                continue
        label_folders.sort()

        self.set_labels = [value for value, _ in label_folders]
        self.img_names, self.labels = [], []
        self._img_dirs = []  # folder name of every sample, parallel to img_names
        for value, folder_name in label_folders:
            for name in os.listdir(f"{self.root_dir}/{folder_name}"):
                self.img_names.append(name)
                self.labels.append(value)
                self._img_dirs.append(folder_name)

        if label_type == 'int':
            self.set_labels = sorted([round(label) for label in self.set_labels])

        self.weights = self._prepare_weights(reweight, resolution = resolution,
                                             max_target=max_target,
                                             lds=lds, lds_kernel=lds_kernel, lds_ks=lds_ks, lds_sigma=lds_sigma)

    def __len__(self):
        """Number of samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(image, label)`` or, with LDS enabled, ``(image, label, weight)``.

        ``label`` and ``weight`` are 1-element float tensors.
        """
        path = f"{self.root_dir}/{self._img_dirs[idx]}/{self.img_names[idx]}"
        # Force three channels so grayscale / RGBA files go through the same
        # transforms as RGB ones; the context manager closes the file handle.
        with Image.open(path) as handle:
            image = handle.convert("RGB")

        label = self.labels[idx]
        if self.label_type == 'int':
            label = round(label)

        if self.transform:
            image = self.transform(image)

        label = torch.FloatTensor([label])
        if self.lds:
            weight = torch.FloatTensor([self.weights[idx]])
            return image, label, weight
        return image, label

    def _prepare_weights(self, reweight, resolution = 0.1, max_target=51, lds=False, lds_kernel='gaussian', lds_ks=5, lds_sigma=2):
        """Count samples per label bin and, with LDS, derive per-sample weights.

        Always stores the bin counts in ``self.distribution`` (keys in
        ascending order). Returns None unless ``lds`` is truthy; otherwise
        returns one weight per sample, scaled so the weights average to 1.
        """
        assert reweight in {'none', 'inverse', 'sqrt_inv'}
        assert reweight != 'none' if lds else True, \
            "Set reweight to \'sqrt_inv\' (default) or \'inverse\' when using LDS"

        # Quantise every label to the bin grid, then (for integer labels) to
        # the integer it is trained on. The same key is used for counting and
        # for the per-sample lookup below, so a lookup can never miss.
        binned = [float(format(round(x / resolution) * resolution, '.1f')) for x in self.labels]
        if self.label_type == 'int':
            bin_keys = [round(value) for value in binned]
        else:
            bin_keys = binned

        value_dict = {key: 0 for key in sorted(set(bin_keys))}
        for key in bin_keys:
            value_dict[key] += 1

        print("value_dict after statistics: ")
        self.distribution = value_dict
        print(value_dict)
        print("-----------------------------")

        if not lds:
            return None

        if reweight == 'sqrt_inv':
            value_dict = {k: np.sqrt(v) for k, v in value_dict.items()}
        elif reweight == 'inverse':
            value_dict = {k: np.clip(v, 5, 1000) for k, v in value_dict.items()}  # clip weights for inverse re-weight

        if not bin_keys or reweight == 'none':
            return None
        print(f"Using re-weighting: [{reweight.upper()}]")

        lds_kernel_window = get_lds_kernel_window(lds_kernel, lds_ks, lds_sigma)
        print(f'Using LDS: [{lds_kernel.upper()}] ({lds_ks}/{lds_sigma})')

        # The bins are in ascending label order so the kernel mixes each bin
        # with its neighbours in label space. The float cast keeps convolve1d
        # from truncating to the integer dtype of the clipped counts.
        ordered_bins = list(value_dict)
        smoothed_value = convolve1d(
            np.asarray([value_dict[b] for b in ordered_bins], dtype=np.float64),
            weights=lds_kernel_window, mode='constant')
        smoothed_value_dict = dict(zip(ordered_bins, smoothed_value))
        num_per_label = [smoothed_value_dict[key] for key in bin_keys]

        weights = [np.float32(1 / x) for x in num_per_label]
        scaling = len(weights) / np.sum(weights)
        weights = [scaling * x for x in weights]
        return weights

In [None]:
from __future__ import annotations
class NaverDataModule(pl.LightningDataModule):
    """Lightning data module serving ``Naver2Dataset`` splits with float labels.

    The 'test' split is used for validation, testing and prediction.

    Args:
        batch_size: batch size of every dataloader.
        root_dir: directory containing the 'train' and 'test' folders.
        num_workers: worker processes per dataloader (0 loads in the main process).
        lds: when True the training set also yields per-sample LDS weights,
            i.e. training batches are ``(image, label, weight)``.
    """

    def __init__(self, batch_size = 128, root_dir = '/content/drive/MyDrive/data/image-rv-70k-round/data70k',
                 num_workers = 12, lds = False):
        super().__init__()
        self.root_dir = root_dir
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.lds = lds

        # Resize on the PIL image before ToTensor, exactly like the eval
        # pipeline, so both splits are interpolated the same way.
        self.train_transform = transforms.Compose([transforms.Resize((224, 224)),
                                                   transforms.RandomHorizontalFlip(p = 0.5),
                                                   transforms.ToTensor(),
                                                   transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))])

        self.test_transform = transforms.Compose([transforms.Resize((224, 224)),
                                                  transforms.ToTensor(),
                                                  transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))])

    def _build_dataset(self, mode, transform, lds = False):
        """Create the dataset for one split folder."""
        return Naver2Dataset(root_dir = self.root_dir, mode = mode, transform = transform,
                             lds = lds, label_type = 'float')

    def setup(self, stage: Optional[str] = None):
        """Create the datasets needed for ``stage`` (None means all stages)."""
        if stage in (None, 'fit'):
            self.train_set = self._build_dataset('train', self.train_transform, lds = self.lds)

        # The evaluation split is scanned once per call and shared by every
        # stage that needs it ('validate' included, since val_dataloader reads test_set).
        if stage in (None, 'fit', 'validate', 'test', 'predict'):
            eval_set = self._build_dataset('test', self.test_transform)
            if stage != 'predict':
                self.test_set = eval_set
            if stage in (None, 'predict'):
                self.predict_set = eval_set

    def _loader(self, dataset, shuffle = False):
        """Wrap ``dataset`` in a DataLoader using the module-wide settings."""
        return DataLoader(
            dataset = dataset,
            batch_size = self.batch_size,
            shuffle = shuffle,
            num_workers = self.num_workers,
            # persistent_workers is only valid when worker processes exist.
            persistent_workers = self.num_workers > 0,
        )

    def train_dataloader(self):
        return self._loader(self.train_set, shuffle = True)

    def val_dataloader(self):
        return self._loader(self.test_set)

    def test_dataloader(self):
        return self._loader(self.test_set)

    def predict_dataloader(self):
        return self._loader(self.predict_set)

In [None]:
# datamodule = NaverDataModule(root_dir = "/content/drive/MyDrive/data/image-rv-70k-float/data70k")
# datamodule.setup('test')
# test_dataset = datamodule.test_dataloader().dataset

In [None]:

import torch
import torch.nn as nn

class SABlock(nn.Module):
    """Gate a feature map with a single-channel sigmoid attention map.

    The map is produced by a bias-free ``kernel_size`` convolution over two
    planes: the per-pixel mean and the per-pixel max taken across channels.
    The output has the same shape as the input.
    """

    def __init__(self, kernel_size=7):
        super().__init__()

        self.kernel_size = kernel_size

        # Two input planes (channel mean, channel max) -> one attention plane;
        # the padding keeps the spatial size for odd kernel sizes.
        self.conv2d = nn.Conv2d(
            in_channels=2,
            out_channels=1,
            kernel_size=kernel_size,
            padding=kernel_size // 2,
            bias=False,
        )
        self.sigmoid = nn.Sigmoid()

    def forward(self, inputs):
        channel_mean = inputs.mean(dim=1, keepdim=True)
        channel_max = inputs.max(dim=1, keepdim=True).values

        pooled = torch.cat((channel_mean, channel_max), dim=1)
        attention = self.sigmoid(self.conv2d(pooled))

        return attention * inputs

class LCABlock(nn.Module):
    """Depthwise 3x3 convolution followed by a learned channel re-weighting.

    After the convolution each channel's L2 norm over the spatial dimensions
    is divided by the mean of those norms across channels; the features are
    scaled by that ratio, multiplied by ``gamma``, shifted by ``beta`` and
    added back to the convolved features. ``gamma`` and ``beta`` start at
    zero, so the block initially returns the depthwise convolution output.
    """

    def __init__(self, in_channels):
        super().__init__()

        self.depthwise = nn.Conv2d(
            in_channels,
            in_channels,
            kernel_size=3,
            stride=1,
            padding=1,
            groups=in_channels,
            bias=False,
        )

        # Channel-last shape so the parameters broadcast after the permute in forward().
        param_shape = (1, 1, 1, in_channels)
        self.gamma = nn.Parameter(torch.zeros(param_shape))
        self.beta = nn.Parameter(torch.zeros(param_shape))

    def forward(self, x):
        # (N, C, H, W) -> (N, H, W, C)
        feats = self.depthwise(x).permute(0, 2, 3, 1)
        spatial_norm = feats.norm(p=2, dim=(1, 2), keepdim=True)
        importance = spatial_norm / (spatial_norm.mean(dim=-1, keepdim=True) + 1e-6)
        feats = self.gamma * (feats * importance) + self.beta + feats
        # back to (N, C, H, W)
        return feats.permute(0, 3, 1, 2)

class CABlock(nn.Module):
    """Gate each channel with a sigmoid weight computed from pooled statistics.

    The input is reduced to one value per channel twice (global average and
    global max); both descriptors pass through the same bias-free
    1x1-conv -> ReLU -> 1x1-conv bottleneck, are summed, squashed by a sigmoid
    and multiplied with the input.

    Args:
        in_planes: number of input (and output) channels.
        ratio: the bottleneck has ``in_planes // ratio`` channels.
    """

    def __init__(self, in_planes, ratio=16):
        super().__init__()

        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)

        squeezed = in_planes // ratio
        self.block = nn.Sequential(
            nn.Conv2d(in_planes, squeezed, 1, bias=False),
            nn.ReLU(),
            nn.Conv2d(squeezed, in_planes, 1, bias=False),
        )

        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        from_avg = self.block(self.avg_pool(x))
        from_max = self.block(self.max_pool(x))
        gate = self.sigmoid(from_avg + from_max)

        return gate * x

#BackBones

#Ghost Module

In [None]:
import torch
import torch.nn as nn

class SeperableGhostModule(nn.Module):
    """Ghost-style module whose primary branch is a depthwise + pointwise pair.

    ``primary`` maps ``in_channels`` to ``out_channels // 2`` channels with a
    3x3 depthwise convolution followed by a 1x1 convolution (each with batch
    norm and ReLU). ``cheap`` applies a depthwise ``kernel_size`` convolution
    to the primary output. The two results are concatenated on the channel
    axis, giving ``2 * (out_channels // 2)`` channels.

    Args:
        in_channels: channels of the input.
        out_channels: requested output channels (halved per branch).
        kernel_size: kernel size of the cheap depthwise convolution.
        stride: stride of the cheap depthwise convolution.
        bias: whether the convolutions carry a bias term.
    """

    def __init__(
        self,
        in_channels,
        out_channels,
        kernel_size,
        stride,
        bias=False
    ):
        super().__init__()

        half_channels = out_channels // 2

        depthwise_3x3 = nn.Conv2d(
            in_channels, in_channels,
            kernel_size=3, stride=1, padding=1,
            groups=in_channels, bias=bias,
        )
        pointwise_1x1 = nn.Conv2d(
            in_channels, half_channels,
            kernel_size=1, stride=1, bias=bias,
        )
        self.primary = nn.Sequential(
            depthwise_3x3,
            nn.BatchNorm2d(in_channels),
            nn.ReLU(),
            pointwise_1x1,
            nn.BatchNorm2d(half_channels),
            nn.ReLU(),
        )

        cheap_depthwise = nn.Conv2d(
            half_channels, half_channels,
            kernel_size=kernel_size, stride=stride, padding=kernel_size // 2,
            groups=half_channels, bias=bias,
        )
        self.cheap = nn.Sequential(
            cheap_depthwise,
            nn.BatchNorm2d(half_channels),
            nn.ReLU(),
        )

    def forward(self, x):
        primary_out = self.primary(x)
        cheap_out = self.cheap(primary_out)
        return torch.concat([primary_out, cheap_out], axis=1)

class GhostModule(nn.Module):
    """Ghost-style module: a 1x1 convolution plus a cheap depthwise branch.

    ``primary`` maps ``in_channels`` to ``out_channels // 2`` channels with a
    1x1 convolution, batch norm and ReLU. ``cheap`` applies a depthwise
    ``kernel_size`` convolution (batch norm, ReLU) to the primary output. The
    two results are concatenated on the channel axis, giving
    ``2 * (out_channels // 2)`` channels.

    Args:
        in_channels: channels of the input.
        out_channels: requested output channels (halved per branch).
        kernel_size: kernel size of the cheap depthwise convolution.
        stride: stride of the cheap depthwise convolution.
        bias: whether the convolutions carry a bias term.
    """

    def __init__(
        self,
        in_channels,
        out_channels,
        kernel_size,
        stride,
        bias=False
    ):
        super().__init__()

        half_channels = out_channels // 2

        pointwise_1x1 = nn.Conv2d(
            in_channels, half_channels,
            kernel_size=1, stride=1, bias=bias,
        )
        self.primary = nn.Sequential(
            pointwise_1x1,
            nn.BatchNorm2d(half_channels),
            nn.ReLU(),
        )

        cheap_depthwise = nn.Conv2d(
            half_channels, half_channels,
            kernel_size=kernel_size, stride=stride, padding=kernel_size // 2,
            groups=half_channels, bias=bias,
        )
        self.cheap = nn.Sequential(
            cheap_depthwise,
            nn.BatchNorm2d(half_channels),
            nn.ReLU(),
        )

    def forward(self, x):
        primary_out = self.primary(x)
        cheap_out = self.cheap(primary_out)
        return torch.concat([primary_out, cheap_out], axis=1)

#Separable Block

In [None]:
import torch
import torch.nn as nn

class SeperableConv2d(nn.Module):
    """Placeholder module: the constructor accepts convolution-style arguments
    but registers no layers, and no ``forward`` is defined.

    Args:
        in_channels, out_channels, kernel_size, stride, padding, bias:
            accepted and currently ignored.
    """

    def __init__(
        self,
        in_channels,
        out_channels,
        kernel_size,
        stride,
        padding,
        bias=False
    ):
        # Only the base nn.Module state is initialised.
        super().__init__()


#Loss

In [None]:
import torch
import torch.nn as nn

class RFCLoss(nn.Module):
    """Element-wise loss re-weighted by a sigmoid of the absolute error.

    Each element's loss is multiplied by
    ``1 / (1 + exp(alpha * (min_delta - |y_pred - y_true|)))``, so elements
    whose absolute error is below ``min_delta`` are down-weighted and larger
    errors approach weight 1. The weighted losses are averaged.

    Args:
        alpha: steepness of the sigmoid.
        min_delta: absolute error at which the weight equals 0.5.
        loss: element-wise base loss; the default is an un-reduced L1 loss.
    """

    def __init__(self, alpha=10, min_delta=0.47, loss=nn.L1Loss(reduction = 'none')):
        super().__init__()

        self.loss = loss
        self.alpha = alpha
        self.min_delta = min_delta

    def forward(self, y_pred, y_true):
        """Return the mean weighted loss as a scalar tensor on the inputs' device."""
        abs_error = torch.abs(y_pred - y_true)
        weights = 1/(1 + torch.exp(self.alpha * (self.min_delta - abs_error)))
        weighted = weights * self.loss(y_pred, y_true)

        # No device transfer here: the result already lives where the inputs
        # do, and forcing it onto CUDA fails on machines without a GPU.
        return torch.mean(weighted)

#DPD Block

In [None]:
from timm.models.layers import DropPath
class DPDBlockV1(nn.Module):
    """Depthwise-expand -> pointwise -> depthwise block with attention gates.

    Stages, in order:
      1. ``depthwise_expand``: SABlock, grouped 3x3 conv from ``in_channels``
         to the expanded width, batch norm, ReLU.
      2. ``pointwise``: CABlock (ratio 8), 1x1 conv to ``freeze_channels``,
         batch norm, ReLU.
      3. ``depthwise_freeze``: SABlock, depthwise 3x3 conv, batch norm, ReLU.

    Both 3x3 convolutions use stride 2 when ``is_downsize`` is True. When the
    block does not downsize and the output shape equals the input shape, the
    input (gated by ``skip_att``) is added to the output.

    Args:
        in_channels: channels of the input.
        expand_channels: requested expanded width; rounded down to a multiple
            of ``in_channels`` so it is valid for the grouped convolution.
        freeze_channels: channels of the output.
        is_downsize: whether the block reduces the spatial size.
    """

    def __init__(
        self,
        in_channels,
        expand_channels,
        freeze_channels,
        is_downsize=False
    ):
        super().__init__()

        self.in_channels = in_channels
        self.expand_channels = self._make_divisible(expand_channels, in_channels)
        self.freeze_channels = freeze_channels
        self.is_downsize = is_downsize

        self.depthwise_expand = nn.Sequential(
            SABlock(),
            nn.Conv2d(
                in_channels=self.in_channels,
                out_channels=self.expand_channels,
                kernel_size=3,
                stride=2 if self.is_downsize else 1,
                padding=1,
                groups=self.in_channels,
                bias=False
            ),
            nn.BatchNorm2d(self.expand_channels),
            nn.ReLU()
        )

        self.pointwise = nn.Sequential(
            # CABlock names its first parameter `in_planes`, so the channel
            # count is passed positionally (an `in_channels=` keyword raises
            # TypeError).
            CABlock(self.expand_channels, ratio=8),
            nn.Conv2d(
                in_channels=self.expand_channels,
                out_channels=self.freeze_channels,
                kernel_size=1,
                stride=1,
                bias=False
            ),
            nn.BatchNorm2d(self.freeze_channels),
            nn.ReLU()
        )

        self.depthwise_freeze = nn.Sequential(
            SABlock(),
            nn.Conv2d(
                in_channels=self.freeze_channels,
                out_channels=self.freeze_channels,
                kernel_size=3,
                stride=2 if self.is_downsize else 1,
                padding=1,
                groups=self.freeze_channels,
                bias=False
            ),
            nn.BatchNorm2d(self.freeze_channels),
            nn.ReLU()
        )

        self.skip_att = SABlock()

        self._initialize_weights()

    def forward(self, x):
        inputs = x
        x = self.depthwise_expand(x)
        x = self.pointwise(x)
        x = self.depthwise_freeze(x)

        # Residual connection only when shapes line up and nothing was downsized.
        if not(self.is_downsize) and x.shape == inputs.shape:
            inputs = self.skip_att(inputs)
            x = inputs + x

        return x

    def _make_divisible(self, out_channels, groups):
        """Round ``out_channels`` down to the nearest multiple of ``groups``."""
        ratio = out_channels // groups
        return int(groups * ratio)

    def _initialize_weights(self):
        """Re-initialise conv, norm and linear layers of this block."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode="fan_out")
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.zeros_(m.bias)

class DPDBlockV3(nn.Module):
    """Depthwise-expand -> ghost pointwise -> depthwise block (biased convs).

    Stages, in order:
      1. ``depthwise_expand``: SABlock, grouped 3x3 conv from ``in_channels``
         to the expanded width (stride 2 when downsizing), batch norm, ReLU.
      2. ``pointwise``: LCABlock followed by a SeperableGhostModule that maps
         to ``freeze_channels``.
      3. ``depthwise_freeze``: SABlock, depthwise 3x3 conv with stride 1,
         batch norm (no activation).

    When the block does not downsize and the output shape equals the input
    shape, the input is passed through ``DropPath(0.2)`` and added to the
    output.
    NOTE(review): the drop path is applied to the identity input rather than
    to the computed branch - confirm this is intended.

    Args:
        in_channels: channels of the input.
        expand_channels: requested expanded width; rounded down to a multiple
            of ``in_channels``.
        freeze_channels: channels of the output.
        is_downsize: whether the first 3x3 convolution uses stride 2.
    """

    def __init__(
        self,
        in_channels,
        expand_channels,
        freeze_channels,
        is_downsize=False
    ):
        super().__init__()

        self.in_channels = in_channels
        self.expand_channels = self._make_divisible(expand_channels, in_channels)
        self.freeze_channels = freeze_channels
        self.is_downsize = is_downsize

        expand_stride = 2 if self.is_downsize else 1
        expand_conv = nn.Conv2d(
            self.in_channels, self.expand_channels,
            kernel_size=3, stride=expand_stride, padding=1,
            groups=self.in_channels, bias=True,
        )
        self.depthwise_expand = nn.Sequential(
            SABlock(),
            expand_conv,
            nn.BatchNorm2d(self.expand_channels),
            nn.ReLU(),
        )

        self.pointwise = nn.Sequential(
            LCABlock(in_channels=self.expand_channels),
            SeperableGhostModule(
                in_channels=self.expand_channels,
                out_channels=self.freeze_channels,
                kernel_size=1,
                stride=1,
                bias=True,
            ),
        )

        freeze_conv = nn.Conv2d(
            self.freeze_channels, self.freeze_channels,
            kernel_size=3, stride=1, padding=1,
            groups=self.freeze_channels, bias=True,
        )
        self.depthwise_freeze = nn.Sequential(
            SABlock(),
            freeze_conv,
            nn.BatchNorm2d(self.freeze_channels),
        )

        self.drop_path = DropPath(0.2)

        self._initialize_weights()

    def forward(self, x):
        residual = x
        out = self.depthwise_freeze(self.pointwise(self.depthwise_expand(x)))

        # No skip connection when downsizing or when the shapes differ.
        if self.is_downsize or out.shape != residual.shape:
            return out

        return out + self.drop_path(residual)

    def _make_divisible(self, out_channels, groups):
        """Round ``out_channels`` down to the nearest multiple of ``groups``."""
        return int(groups * (out_channels // groups))

    def _initialize_weights(self):
        """Re-initialise conv, norm and linear layers of this block."""
        for layer in self.modules():
            if isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, mode="fan_out")
                if layer.bias is not None:
                    nn.init.zeros_(layer.bias)
                continue
            if isinstance(layer, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.ones_(layer.weight)
                nn.init.zeros_(layer.bias)
                continue
            if isinstance(layer, nn.Linear):
                nn.init.normal_(layer.weight, 0, 0.01)
                nn.init.zeros_(layer.bias)


class DPDBlockV2(nn.Module):
    """Depthwise-expand -> pointwise -> depthwise block (bias-free convs).

    Stages, in order:
      1. ``depthwise_expand``: SABlock, grouped 3x3 conv from ``in_channels``
         to the expanded width, batch norm, ReLU.
      2. ``pointwise``: LCABlock, 1x1 conv to ``freeze_channels``, batch norm,
         ReLU.
      3. ``depthwise_freeze``: SABlock, depthwise 3x3 conv, batch norm
         (no activation).

    Both 3x3 convolutions use stride 2 when ``is_downsize`` is True. When the
    block does not downsize and the output shape equals the input shape, the
    input is passed through ``DropPath(0.2)`` and added to the output.
    NOTE(review): the drop path is applied to the identity input rather than
    to the computed branch - confirm this is intended.

    Args:
        in_channels: channels of the input.
        expand_channels: requested expanded width; rounded down to a multiple
            of ``in_channels``.
        freeze_channels: channels of the output.
        is_downsize: whether the block reduces the spatial size.
    """

    def __init__(
        self,
        in_channels,
        expand_channels,
        freeze_channels,
        is_downsize=False
    ):
        super().__init__()

        self.in_channels = in_channels
        self.expand_channels = self._make_divisible(expand_channels, in_channels)
        self.freeze_channels = freeze_channels
        self.is_downsize = is_downsize

        spatial_stride = 2 if self.is_downsize else 1

        expand_conv = nn.Conv2d(
            self.in_channels, self.expand_channels,
            kernel_size=3, stride=spatial_stride, padding=1,
            groups=self.in_channels, bias=False,
        )
        self.depthwise_expand = nn.Sequential(
            SABlock(),
            expand_conv,
            nn.BatchNorm2d(self.expand_channels),
            nn.ReLU(),
        )

        project_conv = nn.Conv2d(
            self.expand_channels, self.freeze_channels,
            kernel_size=1, stride=1, bias=False,
        )
        self.pointwise = nn.Sequential(
            LCABlock(in_channels=self.expand_channels),
            project_conv,
            nn.BatchNorm2d(self.freeze_channels),
            nn.ReLU(),
        )

        freeze_conv = nn.Conv2d(
            self.freeze_channels, self.freeze_channels,
            kernel_size=3, stride=spatial_stride, padding=1,
            groups=self.freeze_channels, bias=False,
        )
        self.depthwise_freeze = nn.Sequential(
            SABlock(),
            freeze_conv,
            nn.BatchNorm2d(self.freeze_channels),
        )

        self.drop_path = DropPath(0.2)

        self._initialize_weights()

    def forward(self, x):
        residual = x
        out = self.depthwise_freeze(self.pointwise(self.depthwise_expand(x)))

        # No skip connection when downsizing or when the shapes differ.
        if self.is_downsize or out.shape != residual.shape:
            return out

        return out + self.drop_path(residual)

    def _make_divisible(self, out_channels, groups):
        """Round ``out_channels`` down to the nearest multiple of ``groups``."""
        return int(groups * (out_channels // groups))

    def _initialize_weights(self):
        """Re-initialise conv, norm and linear layers of this block."""
        for layer in self.modules():
            if isinstance(layer, nn.Conv2d):
                nn.init.kaiming_normal_(layer.weight, mode="fan_out")
                if layer.bias is not None:
                    nn.init.zeros_(layer.bias)
                continue
            if isinstance(layer, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.ones_(layer.weight)
                nn.init.zeros_(layer.bias)
                continue
            if isinstance(layer, nn.Linear):
                nn.init.normal_(layer.weight, 0, 0.01)
                nn.init.zeros_(layer.bias)

In [None]:
class ActivationFunction(nn.Module):
    """Base class for custom activations.

    Stores the concrete class name in ``name`` and in ``config["name"]``.
    """

    def __init__(self):
        super().__init__()
        self.name = self.__class__.__name__
        self.config = {"name": self.name}


class ReLU_04(ActivationFunction):
    """ReLU clipped to the range [0, 4]; the result is a float32 tensor."""

    def forward(self, x):
        # A clamp gives the same values as masking with (x > 0), (x >= 4) and
        # (x < 4) for finite inputs, and also maps +inf to 4 and -inf to 0;
        # the mask arithmetic produced NaN there because 0 * inf is NaN.
        return x.clamp(min=0, max=4).float()

#PL_Trainer

In [None]:

class DMGNet(pl.LightningModule):
    """Regression network: backbone features -> DPD blocks -> one ReLU output.

    ``forward`` takes the ``"expand"`` entry of the backbone output, reduces
    it to 64 channels, runs four ``DPDBlockV3`` blocks, a 1x1 smoothing
    convolution, global average pooling and a linear head followed by ReLU.

    Args:
        lds: when True the training loss is a weighted MSE that uses the
            per-sample weight delivered as third batch element (if present);
            otherwise the training loss is L1. Validation and test always use L1.

    Batches may be ``(x, y)`` or ``(x, y, weight)`` in every step.
    """

    def __init__(self, lds = True):
        super().__init__()
        self.lds = lds
        self.configure_criterion()
        self.backbone = BackboneFactory.create_model("mobile_torchvision_13")

        self.init_conv = nn.Sequential(
            nn.Conv2d(
                in_channels=576,
                out_channels=64,
                kernel_size=1,
                stride=1,
                bias=False
            ),
            nn.BatchNorm2d(64)
        )

        self.dpd_blocks = nn.Sequential(
            DPDBlockV3(64, 384, 96, False),
            DPDBlockV3(96, 576, 160, True),
            DPDBlockV3(160, 960, 160, False),
            DPDBlockV3(160, 960, 320, False)
        )

        self.smooth_conv = nn.Sequential(
            nn.Conv2d(
                in_channels=320,
                out_channels=256,
                kernel_size=1,
                stride=1,
                bias=False
            ),
            nn.BatchNorm2d(256),
            nn.ReLU()
        )

        self.gap = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.BatchNorm1d(256),
            nn.Dropout(0.3)
        )

        self.classifier = nn.Sequential(
            nn.Linear(256, 1),
            nn.ReLU()
        )

        self.loss_fn = RFCLoss()
        self.test_metric = torch.nn.L1Loss()

    def forward(self, x):
        x = self.backbone(x)
        # The backbone returns a mapping; only its "expand" entry is used.
        x = x["expand"]
        x = self.init_conv(x)
        x = self.dpd_blocks(x)
        x = self.smooth_conv(x)
        x = self.gap(x)
        x = self.classifier(x)

        return x

    def configure_criterion(self):
        """Set ``train_criterion`` (weighted MSE with LDS, else L1) and ``test_criterion`` (L1)."""
        if self.lds:
            def weighted_mse_loss(inputs, targets, weights=None):
                loss = (inputs - targets) ** 2
                if weights is not None:
                    loss *= weights.expand_as(loss)
                loss = torch.mean(loss)
                return loss
            self.train_criterion = weighted_mse_loss
        else:
            self.train_criterion = nn.L1Loss()
        self.test_criterion = nn.L1Loss()

    def configure_optimizers(self):
        """Adam with a plateau scheduler driven by the logged ``val_loss``."""
        optimizer = torch.optim.Adam(self.parameters(), lr=0.0001)
        # NOTE(review): `verbose` is flagged as deprecated by recent PyTorch
        # releases - confirm against the installed version.
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.1, patience=8, min_lr=1e-7, verbose=True)

        return {"optimizer": optimizer, "lr_scheduler": scheduler, "monitor": "val_loss"}

    @staticmethod
    def _unpack_batch(batch):
        """Split a batch into ``(x, y, weights)``; ``weights`` is None for 2-item batches."""
        x, y, *rest = batch
        weights = rest[0] if rest else None
        return x, y, weights

    def training_step(self, batch, batch_idx):
        x, y, weights = self._unpack_batch(batch)
        yhat = self.forward(x)
        if self.lds:
            # weighted_mse_loss falls back to a plain MSE when weights is None.
            train_loss = self.train_criterion(yhat, y, weights)
        else:
            train_loss = self.train_criterion(yhat, y)

        self.log("train_loss", train_loss)
        return train_loss

    def validation_step(self, batch, batch_idx):
        x, y, _ = self._unpack_batch(batch)

        yhat = self.forward(x)
        val_loss = self.test_criterion(yhat, y)

        self.log("val_loss", val_loss)
        return val_loss

    def test_step(self, batch, batch_idx):
        x, y, _ = self._unpack_batch(batch)
        yhat = self.forward(x)
        test_loss = self.test_criterion(yhat, y)
        # A single scalar is logged with self.log; log_dict needs a mapping.
        self.log("test_loss", test_loss, on_step=False, on_epoch=True, prog_bar=True)
        return test_loss

    def predict_step(self, batch, batch_idx):
        x, _, _ = self._unpack_batch(batch)
        # Clip predictions to [0, 4].
        yhat = ReLU_04()(self(x))

        return yhat


In [None]:
# NOTE(review): bare attribute lookup whose value is not stored or used - looks like leftover exploration.
torchvision.models.ShuffleNetV2

# ShuffleNet

In [None]:
import torch
import torch.nn as nn
import pytorch_lightning as pl
# from modules.backbone import BackboneFactory
# from modules.dpd_blocks import DPDBlockV1, DPDBlockV2, DPDBlockV3
# from modules.losses import RFCLoss
# from modules.mobiledpd_version import MobileDPD10, MobileDPD13, MobileDPDBig
import torchvision
from torchmetrics import F1Score, Accuracy

class InferenceModel(nn.Module):
    """Thin ``nn.Module`` wrapper that delegates ``forward`` to a wrapped model.

    Attributes:
        model: The wrapped module.
        focal_loss: A class-weighted ``CrossEntropyLoss`` over five classes.
            The attribute name is kept for compatibility; it is not a focal loss.
    """

    def __init__(self, model):
        """Store ``model`` and build the weighted cross-entropy criterion."""
        super().__init__()

        self.model = model
        # The weights are created without a hard-coded device ("cuda:1" does not
        # exist on single-GPU or CPU-only machines). CrossEntropyLoss keeps them
        # as a buffer, so they follow the module on `.to(device)`.
        class_weights = torch.tensor([50., 9., 3., 1.88, 50.])
        self.focal_loss = torch.nn.CrossEntropyLoss(weight=class_weights)

    def forward(self, x):
        """Return the wrapped model's output for ``x``."""
        return self.model(x)

# class MobileNet(nn.Module):
#     def __init__(self):
#         super().__init__()

#         self.backbone = BackboneFactory.create_model("mobilenet")


#     def forward(self, x):
#         x = self.backbone(x)
#         return x

# class QualityNet(pl.LightningModule):
#     def __init__(self, model=MobileNet()):
#         super().__init__()

#         self.model = model

#         self.focal_loss = torch.nn.CrossEntropyLoss(torch.tensor([50., 9., 3., 1.88, 50.]).to("cuda:1"))
#         self.f1_train = F1Score(task="multiclass", num_classes=5, average="macro")
#         self.f1_val = F1Score(task="multiclass", num_classes=5, average="macro")
#         self.acc_train = Accuracy(task="multiclass", num_classes=5)
#         self.acc_val = Accuracy(task="multiclass", num_classes=5)
#         self.validation_step_outputs = []

#     def forward(self, x, label):
#         x = self.model(x)
#         return x

#     def configure_optimizers(self):
#         optimizer = torch.optim.Adam(self.parameters(), lr=2e-5)
#         # scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50, eta_min=0)
#         scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.1, patience=7, min_lr=1e-7, verbose=True)

#         return {"optimizer": optimizer, "lr_scheduler": scheduler, "monitor": "val_acc"}

#     def _calculate_train_acc(self, batch):
#         images, labels = batch

#         logits = self.forward(images, labels)
#         preds = torch.argmax(logits, dim=-1)

#         f1 = self.f1_train(preds, labels)
#         acc = self.acc_train(labels, preds)

#         return acc, f1

#     def _calculate_val_acc(self, batch):
#         images, labels = batch

#         logits = self.forward(images, labels)
#         preds = torch.argmax(logits, dim=-1)

#         self.f1_val.update(preds, labels)
#         self.acc_val.update(labels, preds)

#     def _calculate_loss(self, batch):
#         image, label = batch
#         y_pred = self.forward(image, label)

#         loss = self.focal_loss(y_pred, label)
#         return loss

#     def _calculate_test_loss(self, batch):
#         image, label = batch
#         y_pred = self.forward(image, label)

#         loss = self.focal_loss(y_pred, label)
#         return loss, y_pred

#     def training_step(self, batch, batch_index):
#         image, label = batch
#         loss, y_pred = self._calculate_test_loss(batch)
#         acc, f1 = self._calculate_train_acc(batch)

#         self.log("train_loss", loss, prog_bar=True)
#         self.log("train_f1", f1, prog_bar=True)
#         self.log("train_acc", acc, prog_bar=True)

#         return {"loss": loss, "predictions": y_pred, "labels": label}

#     def on_train_epoch_end(self):
#         # compute metrics
#         train_accuracy = self.acc_train.compute()
#         train_f1 = self.f1_train.compute()
#         # log metrics
#         self.log("epoch_train_accuracy", train_accuracy)
#         self.log("epoch_train_f1", train_f1)
#         # reset all metrics
#         self.acc_train.reset()
#         self.f1_train.reset()
#         print(f"\ntraining accuracy: {train_accuracy:.4}, "\
#         f"f1: {train_f1:.4}")

# #         preds = []
# #         labels = []

# #         for output in training_step_outputs:
# #             logits = output['predictions']
# #             preds += torch.argmax(logits, dim=-1).cpu().numpy().tolist()
# #             labels += output['labels'].cpu().numpy().tolist()

#         # print(classification_report(labels, preds))

#     def validation_step(self, batch, batch_index):
#         image, label = batch
#         loss, y_pred = self._calculate_test_loss(batch)
#         self._calculate_val_acc(batch)

#         self.log("val_loss", loss)

#         return {"val_loss": loss, "predictions": y_pred, "labels": label}

#     # Validation epoch end
#     def on_validation_epoch_end(self):
#         val_accuracy = self.acc_val.compute()
#         val_f1 = self.f1_val.compute()
#         # log metrics
#         self.log("val_acc", val_accuracy)
#         self.log("val_f1", val_f1)
#         # reset all metrics
#         self.acc_val.reset()
#         self.f1_val.reset()
#         print(f"\nValidation accuracy: {val_accuracy:.4} "\
#         f"f1: {val_f1:.4}")

# #         preds = []
# #         labels = []

# #         for output in self.validation_step_outputs:
# #             logits = output['predictions']
# #             preds += torch.argmax(logits, dim=-1).cpu().numpy().tolist()
# #             labels += output['labels'].cpu().numpy().tolist()

# #         print(classification_report(labels, preds))

#     def test_step(self, batch, batch_index):
#         loss = self._calculate_loss(batch)
#         self._calculate_val_acc(batch)

#         # self.log("test_loss", loss)

#         return loss

#     def on_test_epoch_end(self):
#         val_accuracy = self.acc_val.compute()
#         val_f1 = self.f1_val.compute()
#         # log metrics
#         self.log("val_acc", val_accuracy)
#         self.log("val_f1", val_f1)
#         # reset all metrics
#         self.acc_val.reset()
#         self.f1_val.reset()
#         print(f"\nTest accuracy: {val_accuracy:.4} "\
#         f"Test f1: {val_f1:.4}")

class ShuffleNet(pl.LightningModule):
    """LightningModule that regresses a scalar rating with an L1 objective.

    Despite the class name, the active backbone is the one returned by
    ``BackboneFactory.create_model("mobilenet")``. ``BackboneFactory`` must
    already be in scope; its import is commented out in this cell.
    """

    def __init__(self):
        super().__init__()

        # Backbones previously tried in this slot: "shufflenet", "ghost" (with a
        # Linear(1280, 1) classifier), "mobilev3", MobileDPD10, MobileDPD13 and
        # MobileDPDBig.
        self.backbone = BackboneFactory.create_model("mobilenet")

        self.loss_fn = torch.nn.L1Loss()
        self.test_metric = torch.nn.L1Loss()

    def forward(self, x):
        """Return the backbone output multiplied by 4.

        NOTE(review): the factor 4 presumably maps a unit-range backbone output
        onto the 0-4 rating scale -- confirm against the backbone's last layer.
        """
        return self.backbone(x) * 4

    def get_top_layers(self):
        """Bundle the DPD blocks, smoothing conv, pooling and classifier.

        NOTE(review): none of these attributes is assigned in ``__init__``, so
        this raises ``AttributeError`` with the current backbone.
        """
        head = (self.dpd_blocks, self.smooth_conv, self.gap, self.classifier)
        return nn.Sequential(*head)

    def get_backbone_feature(self, x):
        """Return ``init_conv`` applied to the backbone's ``"expand"`` output.

        Switches the module to eval mode and runs without gradients.

        NOTE(review): ``self.init_conv`` is not assigned in ``__init__`` and the
        backbone output is indexed like a dict; this only works with a backbone
        variant that provides both -- confirm before use.
        """
        self.eval()

        with torch.no_grad():
            expanded = self.backbone(x)["expand"]
            return self.init_conv(expanded)

    def configure_optimizers(self):
        """Adam (lr 1e-4) with ReduceLROnPlateau monitored on ``val_loss``."""
        optimizer = torch.optim.Adam(self.parameters(), lr=0.0001)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer,
            mode="min",
            factor=0.1,
            patience=7,
            min_lr=1e-7,
            verbose=True,
        )
        config = {"optimizer": optimizer, "lr_scheduler": scheduler, "monitor": "val_loss"}
        return config

    def _score_batch(self, batch, criterion):
        """Apply ``criterion`` to column 0 of the prediction and the rating."""
        image, rating = batch
        first_column = self.forward(image)[:, 0]
        return criterion(first_column, rating)

    def _calculate_loss(self, batch):
        """Training objective for an ``(image, rating)`` batch."""
        return self._score_batch(batch, self.loss_fn)

    def _calculate_test_loss(self, batch):
        """Evaluation metric for an ``(image, rating)`` batch."""
        return self._score_batch(batch, self.test_metric)

    def training_step(self, batch, batch_index):
        """Log ``train_loss`` (shown on the progress bar) and return it under ``"loss"``."""
        train_loss = self._calculate_loss(batch)
        self.log("train_loss", train_loss, prog_bar=True)
        return {"loss": train_loss}

    def validation_step(self, batch, batch_index):
        """Log ``val_loss`` and return it under ``"val_loss"``."""
        val_loss = self._calculate_test_loss(batch)
        self.log("val_loss", val_loss)
        return {"val_loss": val_loss}

    def test_step(self, batch, batch_index):
        """Log ``test_loss`` and return it."""
        test_loss = self._calculate_test_loss(batch)
        self.log("test_loss", test_loss)
        return test_loss

    def predict_step(self, batch, batch_idx):
        """Return ``ReLU_04()(self(x))`` for the input part of a two-item batch.

        ``ReLU_04`` is defined elsewhere in the notebook.
        """
        inputs, _targets = batch
        post_process = ReLU_04()
        return post_process(self(inputs))





In [None]:
import torchvision
# Bare expression: the cell output shows the torchvision mobilenet_v3_small builder.
torchvision.models.mobilenet_v3_small

# Net_Trainer

In [None]:
# import torch
# import torch.nn as nn
# import pytorch_lightning as pl
# from modules.backbone import BackboneFactory
# from modules.dpd_blocks import DPDBlockV1, DPDBlockV2, DPDBlockV3

class ScratchNet(pl.LightningModule):
    """Rating regressor built from a conv stem, DPD blocks and a linear head.

    The network is trained with an L1 loss. ``DPDBlockV3`` must already be in
    scope; its import is commented out at the top of this cell.
    """

    def __init__(self):
        super().__init__()

        # An earlier variant (kept only as commented-out code) created a backbone
        # via BackboneFactory.create_model("mobile_torchvision_13") and froze the
        # weight/bias of its BatchNorm2d layers.

        # Stem: 3 -> 32 channels, stride 2.
        self.init_conv = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(32),
            nn.ReLU(),
        )

        # 3x3 conv with groups=32 followed by a 1x1 projection to 16 channels.
        self.sep_block = nn.Sequential(
            nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1, groups=32, bias=False),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 16, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(16),
        )

        # Each tuple is passed positionally to DPDBlockV3; `None` marks a
        # Dropout(0.2) layer. NOTE(review): the tuple fields are presumably
        # (in, expanded, out, downsample) -- confirm against DPDBlockV3.
        dpd_layout = (
            (16, 96, 16, False),
            (16, 96, 24, True),
            None,
            (24, 144, 24, False),
            (24, 144, 32, True),
            None,
            (32, 192, 32, False),
            (32, 192, 48, True),
            None,
            (48, 288, 48, False),
            (48, 288, 64, True),
            None,
            (64, 256, 64, False),
            (64, 256, 64, False),
            (64, 256, 96, False),
        )
        self.dpd_blocks = nn.Sequential(*[
            nn.Dropout(0.2) if spec is None else DPDBlockV3(*spec)
            for spec in dpd_layout
        ])

        # 1x1 conv widening 96 -> 256 channels before pooling.
        self.smooth_conv = nn.Sequential(
            nn.Conv2d(96, 256, kernel_size=1, stride=1, bias=True),
            nn.BatchNorm2d(256),
            nn.ReLU(),
        )

        # Global average pooling to a flat vector, then dropout.
        self.gap = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Dropout(0.3),
        )

        # Single non-negative output (Linear followed by ReLU).
        self.classifier = nn.Sequential(
            nn.Linear(256, 1),
            nn.ReLU(),
        )

        self.loss_fn = torch.nn.L1Loss()
        self.test_metric = torch.nn.L1Loss()

    def forward(self, x):
        """Run ``x`` through every stage in order and return the head output."""
        stages = (
            self.init_conv,
            self.sep_block,
            self.dpd_blocks,
            self.smooth_conv,
            self.gap,
            self.classifier,
        )
        for stage in stages:
            x = stage(x)
        return x

    def configure_optimizers(self):
        """AdamW (lr 1e-3) with ReduceLROnPlateau monitored on ``val_loss``."""
        optimizer = torch.optim.AdamW(
            self.parameters(), lr=0.001, weight_decay=0.0001, eps=1e-08
        )
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer,
            mode="min",
            factor=0.2,
            patience=5,
            min_lr=1e-7,
            verbose=True,
        )
        config = {"optimizer": optimizer, "lr_scheduler": scheduler, "monitor": "val_loss"}
        return config

    def _score_batch(self, batch, criterion):
        """Apply ``criterion`` to column 0 of the prediction and the rating."""
        image, rating = batch
        first_column = self.forward(image)[:, 0]
        return criterion(first_column, rating)

    def _calculate_loss(self, batch):
        """Training objective for an ``(image, rating)`` batch."""
        return self._score_batch(batch, self.loss_fn)

    def _calculate_test_loss(self, batch):
        """Evaluation metric for an ``(image, rating)`` batch."""
        return self._score_batch(batch, self.test_metric)

    def training_step(self, batch, batch_index):
        """Log ``train_loss`` (shown on the progress bar) and return it under ``"loss"``."""
        train_loss = self._calculate_loss(batch)
        self.log("train_loss", train_loss, prog_bar=True)
        return {"loss": train_loss}

    def validation_step(self, batch, batch_index):
        """Log ``val_loss`` and return it under ``"val_loss"``."""
        val_loss = self._calculate_test_loss(batch)
        self.log("val_loss", val_loss)
        return {"val_loss": val_loss}

    def test_step(self, batch, batch_index):
        """Log ``test_loss`` and return it."""
        test_loss = self._calculate_test_loss(batch)
        self.log("test_loss", test_loss)
        return test_loss


In [None]:
import wandb

In [None]:
# Credentials must not be committed in the notebook. With no explicit key,
# wandb.login() uses the WANDB_API_KEY environment variable or prompts for one.
# The key that used to be hard-coded here should be revoked.
wandb.login()

In [None]:
# Number of CUDA devices visible to PyTorch (shown as the cell output).
torch.cuda.device_count()

In [None]:
from torch.serialization import MAP_LOCATION
from lightning_fabric.accelerators import accelerator
# Fixed typo: the class is `ModelCheckpoint` (the previous name raised ImportError).
from pytorch_lightning.callbacks.model_checkpoint import ModelCheckpoint
from pytorch_lightning.loggers import WandbLogger
import warnings
warnings.filterwarnings("ignore")


# DataModule
# review_datamodule = ReviewDataModule(train_csv,test_new_csv, batch_size=64)
datamodule = NaverDataModule(
    root_dir = '/content/drive/MyDrive/data/image-rv-70k-float/data70k',
    batch_size = 128)

# Checkpoint: keep only the single best checkpoint, ranked by lowest val_loss.
checkpoint_callback = ModelCheckpoint(
        dirpath="/content/drive/MyDrive/Data_Naver_ImageReview/weights/best_model_shuffle",
        filename='dmg_net_v3_{epoch}_{val_loss:.3f}',
        save_top_k=1,
        verbose=True,
        monitor="val_loss",
        mode="min",
)

# Training
# wandb_logger = WandbLogger(project="imagereview", log_model="all")
# wandb_logger.experiment.config["batch_size"] = 64
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# model = ShuffleNet()
# model = torch.load("/content/drive/MyDrive/Data_Naver_ImageReview/weights/best_model/dmg_net_v3_epoch=28_val_loss=0.492.ckpt", map_location=device)
# NOTE(review): the checkpoint path below is an empty string, so nothing can be
# loaded as written -- TODO fill in the checkpoint to resume from.
model =DMGNet.load_from_checkpoint("", map_location=device)

# Gradients are accumulated over 2 batches of 128 before each optimizer step.
trainer = pl.Trainer(accelerator="gpu",
                     devices=[0],
                     max_epochs=100,
                     callbacks=[checkpoint_callback], accumulate_grad_batches=2)

trainer.fit(model, datamodule)


# VALIDATE

In [None]:

def mae_cus(A, B, l_range):
    """Mean absolute error between A and B, restricted to a label range.

    Only positions ``i`` with ``l_range[0] <= B[i] <= l_range[1]`` contribute.

    Args:
        A: Indexable sequence of predictions (numbers or one-element tensors).
        B: Indexable sequence of reference values, at least as long as ``A``.
        l_range: Two-item sequence ``[low, high]``, both bounds inclusive.

    Returns:
        The mean absolute error rounded to 4 decimals, or a message string
        when no element of ``B`` falls inside ``l_range``.
    """
    low, high = l_range[0], l_range[1]
    errors = [abs(A[i] - B[i]) for i in range(len(A)) if low <= B[i] <= high]
    if not errors:
        # The original formatted an undefined name (`values`) here.
        return 'No predicted label at {}'.format(l_range)
    cost = sum(errors) / len(errors)
    # Tensors expose `.item()`; plain Python numbers are rounded directly.
    return round(cost.item() if hasattr(cost, "item") else cost, 4)

def predict(ckpt_name):
    """Load a DMGNet checkpoint by file name and return flattened predictions.

    Args:
        ckpt_name: File name appended to the hard-coded ``best_model`` folder.

    Returns:
        A flat list holding every element of every batch returned by
        ``trainer.predict``.

    Relies on the module-level names ``DMGNet``, ``device``, ``trainer`` and
    ``val_dataloader``.
    """
    # global trainer
    ckpt_path = "/content/drive/MyDrive/Data_Naver_ImageReview/weights/best_model/" + ckpt_name
    model = DMGNet.load_from_checkpoint(ckpt_path, map_location=device)
    # NOTE(review): `val_dataloader` is passed as `datamodule`; if it is a plain
    # DataLoader the `dataloaders=` argument is the matching one -- confirm.
    batches = trainer.predict(model = model, datamodule = val_dataloader, return_predictions = True)
    y_pred = []
    for batch in batches:
        y_pred.extend(batch)
    return y_pred

def evaluate(ckpt_name):
    """Print MAE summaries for the predictions of a checkpoint.

    Predictions come from ``predict(ckpt_name)`` and are compared against the
    module-level ``y_real`` over the full range and over the label ranges
    around 3 and 4.

    Args:
        ckpt_name: Checkpoint file name forwarded to ``predict``.
    """
    print("Evaluation for {}: ".format(ckpt_name))
    y_pred = predict(ckpt_name)
    print("Total mae: ", mae_cus(y_pred, y_real, l_range = [0, 4]))
    # print("Total mse: ", mse_cus(y_pred, y_real, [0, 4]))
    print("MAE for 3: ", mae_cus(y_pred, y_real, [2.5, 3.5]))
    # The value is a mean absolute error; the label used to say "MSE".
    print("MAE for 4: ", mae_cus(y_pred, y_real, [3.5, 4]))
    print("MAE for 3, 4: ", mae_cus(y_pred, y_real, [2.5, 4]))

In [None]:
# Evaluate the epoch-7 checkpoint. The unfinished `model = torch.vison` statement
# that followed was dropped: `torch` has no such attribute, so the cell always
# ended with an AttributeError.
evaluate("dmg_net_v3_epoch=7_val_loss=0.394.ckpt")

In [None]:
# tqdm is also installed by the dependency cell at the top of the notebook.
!pip install tqdm

In [None]:
# Bar chart of the test split's `distribution` mapping on a log-scaled y axis.
datamodule = NaverDataModule(
    root_dir = "/content/drive/MyDrive/Data_NAVER_Image_Review/image-rv-70k-float/data70k"
)
datamodule.setup('test')
# `test_dist` is reused later by `evaluate` to normalise per-label errors.
test_dist = datamodule.test_dataloader().dataset.distribution
fig, ax = plt.subplots()
ax.set_yscale('log')
ax.bar(test_dist.keys(), test_dist.values())


In [None]:
def get_y_real(label_type = "float"):
    """Return the ground-truth labels of the test split.

    The dataset is always loaded with float labels; they are rounded
    afterwards when integers are requested.

    Args:
        label_type: ``"int"`` returns each label rounded with ``round``; any
            other value returns the dataset's ``labels`` attribute unchanged.

    Returns:
        The test labels, rounded or as stored.
    """
    test_set = Naver2Dataset(
        '/content/drive/MyDrive/Data_NAVER_Image_Review/image-rv-70k-float/data70k',
        mode = 'test',
        lds = False,
        label_type="float",
    )
    labels = test_set.labels
    if label_type != "int":
        return labels
    return [round(label) for label in labels]

# Module-level ground truth consumed by `evaluate`.
y_real = get_y_real()

In [None]:
def mae_cus(y_pred, y_real, l_range, type_metrics = 'prec'):
    """Absolute error between predictions and labels inside a label range.

    Only positions whose label satisfies ``l_range[0] <= label <= l_range[1]``
    contribute.

    Args:
        y_pred: Sequence of predictions (numbers or one-element tensors).
        y_real: Indexable sequence of labels, at least as long as ``y_pred``.
        l_range: Two-item sequence ``[low, high]``, both bounds inclusive.
        type_metrics: ``'prec'`` returns the mean of the absolute errors; any
            other value returns their sum.

    Returns:
        The error rounded to 4 decimals, or a message string when no label
        falls inside ``l_range``.
    """
    loss = 0
    count = 0
    for i, pred in enumerate(y_pred):
        target = y_real[i]
        if l_range[0] <= target <= l_range[1]:
            loss += abs(pred - target)
            count += 1
    if count == 0:
        return 'No predicted label at {}'.format([y_real, l_range])

    if type_metrics == 'prec':
        loss = loss/count
    # Tensors expose `.item()`; plain Python numbers (e.g. predictions already
    # converted to floats) are rounded directly instead of crashing.
    return round(loss.item() if hasattr(loss, "item") else loss, 4)

def mse_cus(y_pred, y_real, l_range, type_metrics = 'prec'):
    """Squared error between predictions and labels inside a label range.

    Only positions whose label satisfies ``l_range[0] <= label <= l_range[1]``
    contribute.

    Args:
        y_pred: Sequence of predictions (numbers or one-element tensors).
        y_real: Indexable sequence of labels, at least as long as ``y_pred``.
        l_range: Two-item sequence ``[low, high]``, both bounds inclusive.
        type_metrics: ``'prec'`` returns the mean of the squared errors; any
            other value returns their sum.

    Returns:
        The error rounded to 4 decimals, or a message string when no label
        falls inside ``l_range``.
    """
    loss = 0
    count = 0
    for i, pred in enumerate(y_pred):
        target = y_real[i]
        if l_range[0] <= target <= l_range[1]:
            loss += (pred - target)**2
            count += 1
    if count == 0:
        return 'No predicted label at {}'.format([y_real, l_range])
    if type_metrics == 'prec':
        loss = loss/count

    # Tensors expose `.item()`; plain Python numbers are rounded directly
    # instead of crashing.
    return round(loss.item() if hasattr(loss, "item") else loss, 4)

def predict(ckpt_name):
    """Load a ShuffleNet checkpoint by file name and return flattened predictions.

    Args:
        ckpt_name: File name appended to the hard-coded ``best_model`` folder.

    Returns:
        A flat list holding every element of every batch returned by
        ``trainer.predict``.

    Relies on the module-level ``trainer`` and ``datamodule``.
    """
    ckpt_path = "/content/drive/MyDrive/Data_NAVER_Image_Review/weights/best_model/" + ckpt_name
    model = ShuffleNet.load_from_checkpoint(ckpt_path)
    # print("batch_size: ", model.batch_size)
    batches = trainer.predict(model = model, datamodule = datamodule, return_predictions = True)
    y_pred = []
    for batch in batches:
        y_pred.extend(batch)
    return y_pred

def scatter_plot(y_pred, y_real):
    """Scatter predictions (y axis) against ground truth (x axis).

    Args:
        y_pred: Predicted values.
        y_real: Ground-truth values, aligned with ``y_pred``.
    """
    columns = dict(predict=y_pred, real=y_real)
    plt.scatter(x = 'real', y = 'predict', data = columns)
    plt.xlabel('real')
    plt.ylabel('predict')
    # Opens a new, empty figure after drawing, so later pyplot calls do not
    # land on the scatter figure.
    plt.figure()

def combine_list_to_df(y_pred, y_real):
    """Build a two-column DataFrame (``predict``, ``real``) from two sequences.

    Tensor elements are converted to Python scalars with ``.item()``; all
    other elements are kept as they are. Each element is checked on its own,
    so empty and mixed inputs are handled.

    Args:
        y_pred: Iterable of predictions (numbers and/or one-element tensors).
        y_real: Iterable of ground-truth values of the same length.

    Returns:
        ``pandas.DataFrame`` with columns ``'predict'`` and ``'real'``.
    """
    def _as_scalars(values):
        # Convert element by element instead of trusting the first item.
        return [v.item() if torch.is_tensor(v) else v for v in values]

    data = {'predict': _as_scalars(y_pred), 'real': _as_scalars(y_real)}
    inferred_df = pd.DataFrame(data = data)
    return inferred_df

def cal_linearity(inferred_df):
    """Display the Pearson and then the Spearman correlation matrix.

    Args:
        inferred_df: DataFrame whose pairwise column correlations are shown
            with the notebook's ``display``.
    """
    for method in ('pearson', 'spearman'):
        display(inferred_df.corr(method = method))

def hist_plot_dist_by(inferred_df, l_range, ckpt_name):
    """Plot a 20-bin histogram of predictions for rows inside a label range.

    Args:
        inferred_df: DataFrame with ``'real'`` and ``'predict'`` columns.
        l_range: Two-item sequence ``[low, high]``; rows with
            ``low <= real <= high`` are kept.
        ckpt_name: Checkpoint name shown in the plot title.
    """
    real = inferred_df['real']
    above_low = real >= l_range[0]
    below_high = real <= l_range[1]
    df_label = inferred_df.loc[above_low & below_high]
    # print(df_label)
    axes = sns.histplot(x = df_label['predict'], bins = 20)
    axes.set(title= 'Hist plot for label = {} of {}'.format(str(l_range), ckpt_name))
    plt.show(block = False)


def evaluate(ckpt_name, mode = 'regress'):
    """Print metrics and draw diagnostic plots for one checkpoint.

    Predictions come from ``predict(ckpt_name)`` and are compared against the
    module-level ``y_real``. Per-label error sums are divided by the
    module-level ``test_dist`` entries.

    Args:
        ckpt_name: Checkpoint file name forwarded to ``predict`` and used in
            plot titles.
        mode: ``'classify'`` additionally prints a 5-class confusion matrix.
            NOTE(review): ``ConfusionMatrix`` is not imported in the visible
            cells -- confirm it is in scope before using this mode.
    """
    print("Evaluation for {}: ".format(ckpt_name))
    y_pred = predict(ckpt_name)

    # (caption, metric function, inclusive label range)
    summary = [
        ("Total mae: ", mae_cus, [0, 4]),
        ("Total mse: ", mse_cus, [0, 4]),
        ("MAE for 3: ", mae_cus, [2.5, 3.5]),
        ("MAE for 4: ", mae_cus, [3.5, 4]),
        ("MAE for 3, 4: ", mae_cus, [2.5, 4]),
    ]
    for caption, metric, label_range in summary:
        print(caption, metric(y_pred, y_real, label_range))

    if mode == 'classify':
        print("Confusion matrix: ")
        coff_mat = ConfusionMatrix(task = 'multiclass', num_classes = 5)
        print(coff_mat(torch.Tensor(y_pred), torch.Tensor(y_real)))

    inferred_df = combine_list_to_df(y_pred, y_real)
    print("Calculate linearity: ")
    cal_linearity(inferred_df)

    print("Hist plot for table: ")
    # One histogram per integer label 0..4, each covering label +/- 0.5.
    for label in range(5):
        hist_plot_dist_by(inferred_df, l_range = [label - 0.5, label + 0.5], ckpt_name = ckpt_name)

    # Sum of absolute errors per distinct ground-truth value.
    abs_error_sum = {}
    for idx, pred in enumerate(y_pred):
        target = y_real[idx]
        abs_error_sum[target] = abs_error_sum.get(target, 0) + abs(pred - target).item()

    # Normalise each sum by the matching `test_dist` entry.
    # NOTE(review): assumes `test_dist` maps each label to its sample count -- confirm.
    mae_each_label = {
        label: total / test_dist[label] for label, total in abs_error_sum.items()
    }

    # _ , ax = plt.subplots()
    # ax.set_title('Test loss for each label of {}'.format(ckpt_name))
    bar_axes = sns.barplot(x = list(mae_each_label.keys()), y = list(mae_each_label.values()))
    bar_axes.set(title = 'Test loss for each label of {}'.format(ckpt_name))
    plt.show(block = False)

    # test_dist_df = combine_list_to_df(list(test_dist.values()), list(mae_each_label.values()))
    # cal_linearity(test_dist_df)

    print("Loss ")
    box_axes = sns.boxplot(x = 'real', y = 'predict', data = inferred_df)
    box_axes.set(title= 'Total box plot of {}'.format(ckpt_name))
    plt.show(block = False)
    violin_axes = sns.violinplot(x = 'real', y = 'predict', data = inferred_df)
    violin_axes.set(title= 'Total violin plot of {}'.format(ckpt_name))
    plt.show(block = False)

    print("Max value of predictions: {}".format(max(y_pred)))

In [None]:
# Run the full evaluation (metrics and plots) for the epoch-4 checkpoint.
evaluate("dmg_net_v3_epoch=4_val_loss=0.568.ckpt")


In [None]:
from matplotlib import pyplot as plt
import seaborn as sns
def _plot_series(series, series_name, series_index=0):
  """Draw the 'predict' column against the 'real' column as one line.

  Args:
    series: Mapping or DataFrame with 'real' (x) and 'predict' (y) entries.
    series_name: Legend label for the line.
    series_index: Selects the line colour from the 'Dark2' palette, wrapping
      around when the index exceeds the palette length.
  """
  from matplotlib import pyplot as plt
  import seaborn as sns
  colours = list(sns.palettes.mpl_palette('Dark2'))
  colour = colours[series_index % len(colours)]

  plt.plot(series['real'], series['predict'], label=series_name, color=colour)

# Line plot of predictions against ground truth, ordered by the 'real' column.
fig, ax = plt.subplots(figsize=(10, 5.2), layout='constrained')
# NOTE(review): `_df_3` is not defined anywhere in the visible cells; it looks
# like an auto-generated interactive variable -- confirm it exists after
# Restart & Run All.
df_sorted = _df_3.sort_values('real', ascending=True)
_plot_series(df_sorted, '')
sns.despine(fig=fig, ax=ax)
plt.xlabel('real')
# Assigning to `_` suppresses the text repr of the label in the cell output.
_ = plt.ylabel('predict')