<a href="https://colab.research.google.com/github/vvjft/IQA-CNNpp/blob/main/iqa_cnn%2B%2B.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install unrar

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
unrar is already the newest version (1:6.1.5-1).
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.


In [4]:
import numpy as np
import pandas as pd
import cv2
import subprocess
import os
from scipy.signal import convolve2d
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder
import sys

class database_loader:
    ''' Parent class for database-specific loaders '''
    def __init__(self):
      self.catalogue = 'databases'

      ### Attributes to be declared within the child class ###
      self.url = ''      # URL of the dataset
      self.exdir = ''      # Directory where the exctracted dataset is stored
      self.score = ''    # MOS/DMOS column name
      self.images = ''   # Directory where the images are stored
      self.archive_file = '' # Path to the rar file
      self.zip_file = '' # Path to the zip file

    def data_exist(self):
        '''Check if patch files are present in the directory.'''
        return (os.path.exists(os.path.join(self.exdir, 'X_train.npy')) and os.path.exists(os.path.join(self.exdir, 'y_train.npy')) and
                os.path.exists(os.path.join(self.exdir, 'X_val.npy')) and os.path.exists(os.path.join(self.exdir, 'y_val.npy')) and
                os.path.exists(os.path.join(self.exdir, 'X_test.npy')) and os.path.exists(os.path.join(self.exdir, 'y_test.npy')))

    def save_data(self, datasets):
        '''Save the data to disk.'''
        for name, data in datasets.items():
            (X, y) = data
            np.save(os.path.join(self.exdir, f'X_{name}.npy'), X)
            np.save(os.path.join(self.exdir, f'y_{name}.npy'), y)
        print(f"Data saved successfully.")

    def load_data(self, datasets):
        '''Load the data from disk.'''
        data = []
        for name in datasets:
            X = np.load(os.path.join(self.exdir, f'X_{name}.npy'))
            y = np.load(os.path.join(self.exdir, f'y_{name}.npy'))
            data.append((X, y))
        #logger.info("Data loaded successfully.")
        return data

    def download(self, extract_in='databases'):
        '''Download the dataset from the URL and extract it to the directory.
        Args:
            file: (str) either rar_file or zip_file
            extract_in (str, optional): Provide if the dataset is not extracted into a folder named after the file
        Note:
            You need to provide path into WinRAR or 7zip exe file.
        '''
        try:
            if os.path.exists(self.images or self.data_exist):
                print("Dataset found.")
                return True
            if not os.path.exists(self.archive_file):
                print(f"Dataset not found. Downloading from {self.url}...")
                !wget {self.url} -P {self.catalogue}
            print('Extracting files...')
            if self.archive_file.endswith('.rar'):
                !unrar x -inul {self.archive_file} {extract_in}
            elif self.archive_file.endswith('.zip'):
                !unzip -q {self.archive_file} -d {extract_in}
            else:
                print(f"Unsupported file type: {self.archive_file}")
                return False
        except Exception as e:
            print(f"Failed to download or extract dataset: {e}.")
            return False
        return True

    def split_data(self, data1):
        train_data, test_data = train_test_split(data1, test_size=0.2, random_state=40)
        train_data, val_data = train_test_split(train_data, test_size=0.25, random_state=40)
        return train_data, val_data, test_data

    def preprocess(self, datasets, patch_size=32):

        data = dict()
        total_images = sum(len(data) for data in datasets.values())
        processed_images = 0
        print('Preprocessing images...')

        def normalize_image(patch, P=3, Q=3, C=1):
            kernel = np.ones((P, Q)) / (P * Q)
            patch_mean = convolve2d(patch, kernel, boundary='symm', mode='same')
            patch_sm = convolve2d(np.square(patch), kernel, boundary='symm', mode='same')
            patch_std = np.sqrt(np.maximum(patch_sm - np.square(patch_mean), 0)) + C
            patch_ln = (patch - patch_mean) / patch_std
            return patch_ln.astype('float32')

        def slice_image(image, patch_size=32):
            height, width = image.shape[:2]
            num_patches_y = height // patch_size
            num_patches_x = width // patch_size
            patch_count = 0
            for i in range(num_patches_y):
                for j in range(num_patches_x):
                    patch = image[i*patch_size:(i+1)*patch_size, j*patch_size:(j+1)*patch_size]
                    patch_path = os.path.join(output_dir_patches, f"{os.path.splitext(filename)[0]}_patch_{patch_count}{extension}")
                    patch_filename = f"{os.path.splitext(filename)[0]}_patch_{patch_count}{extension}"
                    cv2.imwrite(patch_path, patch)
                    self.patches.append([patch_filename, mos, distortion])
                    patch_count += 1

        for (name, dataset) in datasets.items():
            output_dir_full = os.path.join(self.exdir, 'normalized_distorted_images', name, 'full')
            output_dir_patches = os.path.join(self.exdir, 'normalized_distorted_images', name, 'patches')
            os.makedirs(output_dir_full, exist_ok=True)
            os.makedirs(output_dir_patches, exist_ok=True)
            self.patches = []
            for row in dataset.itertuples(index=False):
                filename = row[0]
                extension = os.path.splitext(filename)[1]
                mos = row[1]
                distortion = row[2]
                image_path = os.path.join(self.images, filename)
                image = cv2.imread(image_path)
                if image is None:
                    print(f"Failed to load image: {filename}")
                    continue
                image_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
                image_normalized = normalize_image(image_gray)
                filename = f'NORM_{filename.lower()}'
                cv2.imwrite(os.path.join(output_dir_full, filename), image_normalized)
                slice_image(image_normalized, patch_size)
                processed_images += 1
                #print(f"Processed {processed_images}/{total_images} images.", end='\r')

            patches = pd.DataFrame(self.patches, columns=['image', self.score, 'distortion'])
            data[name] = patches
        return data

    def encode(self, dataframes):
        '''Encodes distortion labels into one-hot vectors.'''
        for i in range(len(dataframes)):
            dists = dataframes[i]['distortion']
            le = LabelEncoder()
            y_class_encoded = le.fit_transform(dists)
            dists_one_hot = to_categorical(y_class_encoded, num_classes=13).astype(int)
            dataframes[i]['distortion_encoded'] = [np.array(one_hot) for one_hot in dists_one_hot]
            dataframes[i] = dataframes[i].drop(['distortion'], axis=1)
        return dataframes

    def map2tf(self, datasets):
        '''
        Maps data into format excected by TensorFlow: adds chanel dimension and stores data in arrays.
        '''
        dataset_tensors = []
        for name, data in datasets.items():
            images_dir = os.path.join(self.exdir, 'normalized_distorted_images', name, 'patches')
            X, y = [], []
            for row in data.itertuples(index=False):
                filename = row[0]
                score = row[1]
                file_path = os.path.join(images_dir, filename)
                if filename.endswith(('.bmp', '.BMP', '.png')) and os.path.exists(file_path):
                    img = cv2.imread(file_path, cv2.IMREAD_UNCHANGED)
                    X.append(img)
                    y.append(score)
                else:
                    print(f"File not found: {file_path}")
            X = np.array(X)
            y = np.array(y)
            X = X[..., np.newaxis]
            dataset_tensors.append((X, y))
        return dataset_tensors

class tid2013_loader(database_loader):
    def __init__(self):
        super().__init__()
        self.url = 'https://www.ponomarenko.info/tid2013/tid2013.rar'
        self.exdir = os.path.join(self.catalogue, 'tid2013')
        self.score = 'MOS'
        self.images = os.path.join(self.exdir, 'distorted_images')
        self.archive_file = os.path.join(self.catalogue, 'tid2013.rar')
        self.distortion_mapping = {1: 'wn', 2:'wnc', 3:'scn', 4:'mn', 5:'hfn',
                                   6:'in', 7:'qn', 8: 'gblur', 9:'idn', 10: 'jpeg',
                                   11: 'jp2k', 12:'jpegte', 13:'jp2kte'} # According to TID2013 documentation

        os.makedirs(self.exdir, exist_ok=True)

        self.download(extract_in=self.exdir)

        if self.data_exist():
            print("Patch files found. Loading patched data...")
            self.train, self.val, self.test = self.load_data(['train', 'val', 'test'])
        else:
            data = self.prepare_data()
            print('Mapping data to TensorFlow format...')
            self.train, self.val, self.test = self.map2tf(data)
            self.save_data({'train': self.train, 'val': self.val, 'test': self.test })
        #self.train, self.val, self.test = self.encode(data)

        print("Data loaded successfully.")

    def prepare_data(self, filter=True):
        data_path = os.path.join(self.exdir, 'mos_with_names.txt')
        data = pd.read_csv(data_path, header=None, delimiter=' ')
        data = data.iloc[:, [1, 0]]  # swap column order
        data.columns = ['image', 'MOS']
        data['distortion'] = data['image'].apply(lambda x: self.distortion_mapping.get(int(x.split('_')[1]), 'other'))
        if filter:
            data = data[data['distortion'].isin(self.distortion_mapping.values())]
        data.to_csv(os.path.join(self.exdir,'mos_with_names.csv'), index=False)

        train_data, val_data, test_data = self.split_data(data)
        datasets = {'training': train_data, 'validation': val_data, 'test': test_data}
        datasets = self.preprocess(datasets)
        return datasets

class kadid10k_loader(database_loader):
    def __init__(self, download=True):
        super().__init__()
        self.url = 'https://datasets.vqa.mmsp-kn.de/archives/kadid10k.zip'
        self.exdir = os.path.join(self.catalogue, 'kadid10k')
        self.score = 'DMOS'
        self.images = os.path.join(self.exdir, 'images')
        self.archive_file = os.path.join(self.catalogue, 'kadid10k.zip')
        self.distortion_mapping = {1: 'gblur', 2: 'lblur', 3: 'mblur', 4: 'cdiff', 5: 'cshift', # According to KADID-10k documentation
                                   6: 'cquant', 7: 'csat1', 8: 'csat2', 9: 'jp2k', 10: 'jpeg',
                                   11: 'wniose1', 12: 'wniose2', 13: 'inoise', 14: 'mnoise', 15: 'denoise',
                                   16: 'bright', 17: 'dark', 18: 'meanshft', 19: 'jit', 20: 'patch',
                                   21: 'pixel', 22: 'quant', 23: 'cblock', 24: 'sharp', 25: 'contrst'}
        self.download()

        if self.data_exist():
            print("Patch files found. Loading patched data...")
            self.train, self.val, self.test = self.load_data(['train', 'val', 'test'])
        else:
            data = self.prepare_data()
            print('Mapping data to TensorFlow format...')
            self.train, self.val, self.test = self.map2tf(data)
            self.save_data({'train': self.train, 'val': self.val, 'test': self.test })
        #self.train, self.val, self.test = self.encode(data)

        print("Data loaded successfully.")

    def prepare_data(self, filter=True):
        data_path = os.path.join(self.exdir, 'dmos.csv')
        data = pd.read_csv(data_path, header=0, usecols=[0, 2])
        data.columns = ['image', 'DMOS']
        data['distortion'] = data['image'].apply(lambda x: self.distortion_mapping.get(int(x.split('_')[1]), 'other'))
        #if filter:
            #data = data[data['distortion'].isin(self.distortion_mapping.values())]
        data.to_csv(os.path.join(self.exdir,'dmos_with_names.csv'), index=False)

        train_data, val_data, test_data = self.split_data(data)
        datasets = {'training': train_data, 'validation': val_data, 'test': test_data}
        datasets = self.preprocess(datasets)
        return datasets

If the download is slow, stop it, delete rar/zip file (`!rm`)
 and run it again.

In [5]:
#!rm -r 'databases/tid2013'
#!rm -r 'databases/kadid10k'
#!rm 'databases/tid2013.rar'
#!rm 'databases/kadid10k.zip

data_loader = tid2013_loader()

Dataset found.
Patch files found. Loading patched data...
Data loaded successfully.


In [6]:
X_train, y_train = data_loader.train
X_val, y_val = data_loader.val
X_test, y_test = data_loader.test

In [8]:
import tensorflow as tf
from tensorflow.keras import layers, models

model = models.Sequential([
    layers.Input(shape=(32, 32, 1)),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(32, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dense(512, activation='relu'),
    layers.Dense(1, activation='linear')
])

model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
history = model.fit(X_train, y_train, epochs=5, batch_size=32, validation_data=(X_val, y_val))

Epoch 1/5
[1m5850/5850[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m78s[0m 13ms/step - loss: 2.4155 - mae: 1.1392 - val_loss: 1.1750 - val_mae: 0.8882
Epoch 2/5
[1m5850/5850[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m79s[0m 13ms/step - loss: 1.0842 - mae: 0.8399 - val_loss: 1.0977 - val_mae: 0.8468
Epoch 3/5
[1m5850/5850[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m76s[0m 13ms/step - loss: 1.0374 - mae: 0.8158 - val_loss: 1.0691 - val_mae: 0.8307
Epoch 4/5
[1m5850/5850[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m74s[0m 13ms/step - loss: 0.9936 - mae: 0.7948 - val_loss: 1.0972 - val_mae: 0.8216
Epoch 5/5
[1m5850/5850[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m74s[0m 13ms/step - loss: 0.9645 - mae: 0.7805 - val_loss: 1.0345 - val_mae: 0.8158


In [8]:
loss, mae = model.evaluate(X_test, y_test, verbose=0)
print('Test loss:', loss)
print('Test MAE:', mae)

Test loss: 1.0550847053527832
Test MAE: 0.7985630035400391
