<a href="https://colab.research.google.com/github/vvjft/MLP-from-scratch/blob/main/iqa_cnn%2B%2B.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import pandas as pd
import cv2
import subprocess
import os
from scipy.signal import convolve2d
from sklearn.model_selection import train_test_split, StratifiedShuffleSplit
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder

!apt-get install unrar

class data_loader:
    ''' Parent class for database-specific loaders '''
    def patch_files_exist(self):
        '''Check if patch files are present in the directory.'''
        return (os.path.exists(os.path.join(self.dir, 'normalized_distorted_images', 'training', 'patch_training.csv')) and
                os.path.exists(os.path.join(self.dir, 'normalized_distorted_images', 'validation', 'patch_validation.csv')) and
                os.path.exists(os.path.join(self.dir, 'normalized_distorted_images', 'test', 'patch_test.csv')))

    def split_data(self, data1):
        train_data, test_data = train_test_split(data1, test_size=0.2, random_state=40)
        train_data, val_data = train_test_split(train_data, test_size=0.25, random_state=40)
        return train_data, val_data, test_data

    def normalize_and_slice(self, train_data, val_data, test_data, patch_size=32, cross=False):
        def local_normalize(patch, P=3, Q=3, C=1):

            kernel = np.ones((P, Q)) / (P * Q)
            patch_mean = convolve2d(patch, kernel, boundary='symm', mode='same')
            patch_sm = convolve2d(np.square(patch), kernel, boundary='symm', mode='same')
            patch_std = np.sqrt(np.maximum(patch_sm - np.square(patch_mean), 0)) + C
            patch_ln = (patch - patch_mean) / patch_std
            return patch_ln.astype('float32')
        def slice_image(image, patch_size=32):

            height, width = image.shape[:2]
            num_patches_y = height // patch_size
            num_patches_x = width // patch_size
            patch_count = 0
            for i in range(num_patches_y):
                for j in range(num_patches_x):
                    patch = image[i*patch_size:(i+1)*patch_size, j*patch_size:(j+1)*patch_size]
                    patch_path = os.path.join(output_dir_patches, f"{os.path.splitext(image_filename)[0]}_patch_{patch_count}.bmp")
                    patch_filename = f"{os.path.splitext(image_filename)[0]}_patch_{patch_count}.bmp"
                    cv2.imwrite(patch_path, patch)
                    # Add patch info to the list
                    self.patch_info_list.append([patch_filename, mos_value, distortion])
                    patch_count += 1

        sets = [(train_data, 'training'), (val_data, 'validation'), (test_data, 'test')]
        print('Normalizing and slicing images...')

        for (data, name) in sets:
            output_dir_full = os.path.join(self.dir, 'normalized_distorted_images', name, 'full')
            output_dir_patches = os.path.join(self.dir, 'normalized_distorted_images', name, 'patches')
            norm_file_info_path = os.path.join(self.dir, 'normalized_distorted_images', name, f'norm_{name}.csv')
            patch_file_info_path = os.path.join(self.dir, 'normalized_distorted_images', name, f'patch_{name}.csv')
            os.makedirs(output_dir_full, exist_ok=True)
            os.makedirs(output_dir_patches, exist_ok=True)
            self.norm_info_list = []
            self.patch_info_list = []
            for row in data.itertuples(index=False):
                image_filename = row[0]
                mos_value = row[1]
                distortion = row[2]
                image_path = f'{self.dir}/distorted_images/{image_filename}'
                image = cv2.imread(image_path)
                if image is None:
                    print(f"Failed to load image: {image_filename}")
                    continue
                # Normalize the image
                image_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
                image_normalized = local_normalize(image_gray)
                # Save
                image_filename = f'NORM_{image_filename.lower()}'
                self.norm_info_list.append([image_filename, mos_value, distortion])
                cv2.imwrite(os.path.join(output_dir_full, image_filename), image_normalized)
                # Slice to patches
                slice_image(image_normalized, patch_size)

            norm_info_df = pd.DataFrame(self.norm_info_list, columns=['image', 'MOS', 'distortion'])
            norm_info_df.to_csv(norm_file_info_path, index=False)
            print(f"[{name}]: Full image info listed in: {norm_file_info_path}.")
            patch_info_df = pd.DataFrame(self.patch_info_list, columns=['image', 'MOS', 'distortion'])
            patch_info_df.to_csv(patch_file_info_path, index=False)
            print(f"[{name}]: Patch info listed in: {patch_file_info_path}.")

    def encode(self, dataframes):
        '''Encodes distortion labels into one-hot vectors.'''
        for i in range(len(dataframes)):
            dists = dataframes[i]['distortion']
            le = LabelEncoder()
            y_class_encoded = le.fit_transform(dists)
            dists_one_hot = to_categorical(y_class_encoded, num_classes=13).astype(int)
            dataframes[i]['distortion_encoded'] = [np.array(one_hot) for one_hot in dists_one_hot]
            dataframes[i] = dataframes[i].drop(['distortion'], axis=1)
        return dataframes

    def map2tf(self, dir, set_type, data_df):
        '''Maps data into format excected by TensorFlow: adds chanel dimension and stores data in arrays.'''
        images_dir = os.path.join(dir, 'normalized_distorted_images', set_type, 'patches')
        X = []
        y = []
        for row in data_df.itertuples(index=False):
            filename = row[0]
            score = row[1]
            file_path = os.path.join(images_dir, filename)
            if filename.endswith('.bmp') and os.path.exists(file_path):
                img = cv2.imread(file_path, cv2.IMREAD_UNCHANGED)
                X.append(img)
                y.append(score)
            else:
                print(f"File not found: {file_path}")
        X = np.array(X)
        y = np.array(y)
        X = X[..., np.newaxis]
        return X, y

class tid2013_loader(data_loader):
    def __init__(self, download=True, path='/content/databases'):
        self.url = 'https://www.ponomarenko.info/tid2013/tid2013.rar'
        self.dir = os.path.join(path, 'tid2013')
        self.file_path = os.path.join(self.dir, 'tid2013.rar')
        self.distortion_mapping = {1: 'wn', 2:'wnc', 3:'scn', 4:'mn', 5:'hfn', 6:'in', 7:'qn', 8: 'gblur', 9:'idn', 10: 'jpeg', 11: 'jp2k', 12:'jpegte', 13:'jp2kte'} # According to TID2013 documentation

        os.makedirs(self.dir, exist_ok=True)
        if download:
            self.download()
        if self.patch_files_exist():
            print("Patch files found. Loading patched data...")
            data = self.__load_patched_data()
        else:
            data = self.prepare_data()
        #self.train, self.val, self.test = self.encode(data)
        self.train = self.map2tf(self.dir, 'training', data[0])
        self.val = self.map2tf(self.dir, 'validation', data[1])
        self.test = self.map2tf(self.dir, 'test', data[2])

    def __load_patched_data(self):
        # Load the existing patched data
        columns=['image', 'MOS', 'distortion']
        train_data = pd.read_csv(os.path.join(self.dir, 'normalized_distorted_images', 'training', 'patch_training.csv'))
        val_data = pd.read_csv(os.path.join(self.dir, 'normalized_distorted_images', 'validation', 'patch_validation.csv'))
        test_data = pd.read_csv(os.path.join(self.dir, 'normalized_distorted_images', 'test', 'patch_test.csv'))
        return [train_data, val_data, test_data]

    def download(self):
        try:
            print(f"Downloading dataset from {self.url}...")
            !wget {self.url} -P {self.dir}
            !unrar x -inul {os.path.join(self.dir, 'tid2013.rar')} {self.dir}
            print(f"Dataset downloaded and extracted in '{self.dir}'")
        except Exception as e:
            print(f"Failed to download dataset: {e}.")
            return False

    def prepare_data(self, filter=True):
        '''Processes data.'''
        data_path = os.path.join(self.dir, 'mos_with_names.txt')
        data = pd.read_csv(data_path, header=None, delimiter=' ')
        data = data.iloc[:, [1, 0]]  # swap column order
        data.columns = ['image', 'MOS']
        data['distortion'] = data['image'].apply(lambda x: self.distortion_mapping.get(int(x.split('_')[1]), 'other'))
        if filter:
            data = data[data['distortion'].isin(self.distortion_mapping.values())]
        data.to_csv('databases/tid2013/mos_with_names.csv', index=False)

        train_data, val_data, test_data = self.split_data(data)
        self.normalize_and_slice(train_data, val_data, test_data)

        train_data = pd.read_csv(f'{self.dir}/normalized_distorted_images/training/patch_training.csv')
        val_data = pd.read_csv(f'{self.dir}/normalized_distorted_images/validation/patch_validation.csv')
        test_data = pd.read_csv(f'{self.dir}/normalized_distorted_images/test/patch_test.csv')
        return [train_data, val_data, test_data]

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
unrar is already the newest version (1:6.1.5-1).
0 upgraded, 0 newly installed, 0 to remove and 45 not upgraded.


If the download is slow, stop it, delete databases directory (!rm)
 and run it again.

In [3]:
#!rm -r 'databases/'

In [8]:
tid2013_loader = tid2013_loader(download=True)

X_train, y_train = tid2013_loader.train
X_val, y_val = tid2013_loader.val
X_test, y_test = tid2013_loader.test

In [7]:
import tensorflow as tf
from tensorflow.keras import layers, models
# Create a simple neural network model
model = models.Sequential([
    layers.Input(shape=(32, 32, 1)),
    layers.MaxPooling2D((2, 2)),
    layers.Conv2D(32, (3, 3), activation='relu'),
    layers.MaxPooling2D((2, 2)),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dense(512, activation='relu'),
    layers.Dense(1, activation='linear')
])

# Compile the model
model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
# Train the model
history = model.fit(X_train, y_train, epochs=5, batch_size=16, validation_data=(X_val, y_val))

Epoch 1/5
[1m11700/11700[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m26s[0m 2ms/step - loss: 2.0410 - mae: 1.0696 - val_loss: 1.4637 - val_mae: 0.9445
Epoch 2/5
[1m11700/11700[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 2ms/step - loss: 1.1058 - mae: 0.8497 - val_loss: 1.1365 - val_mae: 0.8645
Epoch 3/5
[1m11700/11700[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 2ms/step - loss: 1.0530 - mae: 0.8223 - val_loss: 1.1197 - val_mae: 0.8472
Epoch 4/5
[1m11700/11700[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 2ms/step - loss: 1.0258 - mae: 0.8089 - val_loss: 1.2606 - val_mae: 0.8771
Epoch 5/5
[1m11700/11700[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m21s[0m 2ms/step - loss: 0.9922 - mae: 0.7926 - val_loss: 1.1162 - val_mae: 0.8340


In [9]:
loss, mae = model.evaluate(X_test, y_test, verbose=0)
print('Test loss:', loss)
print('Test MAE:', mae)

Test loss: 1.080154299736023
Test MAE: 0.8252202272415161
