<a href="https://colab.research.google.com/github/vvjft/DL_BIQA/blob/main/CNN%2B%2BIQA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf
import numpy as np
import scipy
import pandas as pd
import os
import matplotlib.pyplot as plt
import seaborn as sns
import cv2

In [2]:
from google.colab import drive
drive.mount('/content/drive') # LIVE
!cp -r /content/drive/MyDrive/magisterka/LIVEIQA_release2 /content
os.rename('/content/LIVEIQA_release2', '/content/LIVE')
drive.flush_and_unmount()

Mounted at /content/drive


In [3]:
from scipy.signal import convolve2d
def normalize_and_slice(db_dir, train_data, val_data, test_data, patch_size=32, cross=False):

  def local_normalize(patch, P=3, Q=3, C=1):
      kernel = np.ones((P, Q)) / (P * Q)
      patch_mean = convolve2d(patch, kernel, boundary='symm', mode='same')
      patch_sm = convolve2d(np.square(patch), kernel, boundary='symm', mode='same')
      patch_std = np.sqrt(np.maximum(patch_sm - np.square(patch_mean), 0)) + C
      patch_ln = (patch - patch_mean) / patch_std
      return patch_ln.astype('float32')

  sets = {'train': [train_data, 'training'], 'val':[val_data, 'validation'], 'test':[test_data, 'test']}

  for key, (data, name) in sets.items():

    output_dir_full = f'{db_dir}/normalized_distorted_images/{name}/full/' # where to store normalized distorted images
    output_dir_patches = f'{db_dir}/normalized_distorted_images/{name}/patches/' # where to store patches
    norm_file_info_path = f'{db_dir}/normalized_distorted_images/{name}/norm_{name}.csv'
    patch_file_info_path = f'{db_dir}/normalized_distorted_images/{name}/patch_{name}.csv'
    os.makedirs(output_dir_full, exist_ok=True)
    os.makedirs(output_dir_patches, exist_ok=True)

    norm_info_list = []
    patch_info_list = []

    for index, row in data.iterrows():
        image_filename = row[0]
        mos_value = row[1]
        distortion = row[2]
        image_path = f'{db_dir}/distorted_images/{image_filename}'
        image = cv2.imread(image_path)

        if image is None:
            print(f"Failed to load image: {image_filename}")
            continue

        # Normalize the image
        image_gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        image_normalized = local_normalize(image_gray)
        # Save
        image_filename = f'NORM_{image_filename}'
        norm_info_list.append([image_filename, mos_value, distortion])
        cv2.imwrite(output_dir_full+image_filename, image_normalized)
        # Slice to patches
        height, width = image_normalized.shape[:2]
        num_patches_y = height // patch_size
        num_patches_x = width // patch_size
        patch_count = 0
        for i in range(num_patches_y):
            for j in range(num_patches_x):
                patch = image_normalized[i*patch_size:(i+1)*patch_size, j*patch_size:(j+1)*patch_size]
                patch_path = os.path.join(output_dir_patches, f"{os.path.splitext(image_filename)[0]}_patch_{patch_count}.bmp")
                patch_filename = f"{os.path.splitext(image_filename)[0]}_patch_{patch_count}.bmp"
                cv2.imwrite(patch_path, patch)
                # Add patch info to the list
                patch_info_list.append([patch_filename, mos_value, distortion])
                patch_count += 1

    norm_info_df = pd.DataFrame(norm_info_list, columns=['image_filename', score_measure, 'Distortion'])
    norm_info_df.to_csv(norm_file_info_path, index=False)
    print(f"[{name}]: Saved full normalized distorted image info to:\n{patch_file_info_path}.")
    patch_info_df = pd.DataFrame(patch_info_list, columns=['image_filename', score_measure, 'Distortion'])
    patch_info_df.to_csv(patch_file_info_path, index=False)
    print(f"[{name}]: Saved patch info to {patch_file_info_path}.")

In [76]:
from sklearn.model_selection import train_test_split, StratifiedShuffleSplit

def load_live(base_dir, filter=False):
    data_path = os.path.join(base_dir, 'dmos_with_names.csv')
    data = pd.read_csv(data_path, index_col=False)
    if filter:
      distortion_types = [1, 2, 3, 4] # filter distortions: jp2k, jpeg, wn and blur
      data = data[data['image_filename'].apply(lambda x: int(x.split('_')[1]) in distortion_types)]
    return data

def split_data(data1, data2=None, params=None, cross=False):
    train_data, test_data = train_test_split(data1, test_size=0.2, random_state=40)
    train_data, val_data = train_test_split(train_data, test_size=0.25, random_state=40)
    return train_data, val_data, test_data

live_dir = 'LIVE'
db_dir = live_dir
score_measure = 'DMOS'

data_live = load_live(live_dir, filter=False)
data_live.to_csv('LIVE/dmos_with_names.csv', index=False)
data = data_live

train_data, val_data, test_data = split_data(data)
normalize_and_slice(db_dir, train_data, val_data, test_data)

train_data = pd.read_csv(f'{db_dir}/normalized_distorted_images/training/patch_training.csv')
val_data = pd.read_csv(f'{db_dir}/normalized_distorted_images/validation/patch_validation.csv')
test_data = pd.read_csv(f'{db_dir}/normalized_distorted_images/test/patch_test.csv')

[training]: Saved full normalized distorted image info to:
LIVE/normalized_distorted_images/training/patch_training.csv.
[training]: Saved patch info to LIVE/normalized_distorted_images/training/patch_training.csv.
[validation]: Saved full normalized distorted image info to:
LIVE/normalized_distorted_images/validation/patch_validation.csv.
[validation]: Saved patch info to LIVE/normalized_distorted_images/validation/patch_validation.csv.
[test]: Saved full normalized distorted image info to:
LIVE/normalized_distorted_images/test/patch_test.csv.
[test]: Saved patch info to LIVE/normalized_distorted_images/test/patch_test.csv.


In [None]:
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder

dataframes = [train_data, val_data, test_data]

for i in range(len(dataframes)):
  my_dists = dataframes[i]['Distortion']
  le = LabelEncoder()
  y_train_class_encoded = le.fit_transform(my_dists)
  my_dists_one_hot = to_categorical(y_train_class_encoded, num_classes=5)
  one_hot_df = pd.DataFrame(my_dists_one_hot, columns = ["Distortion_"+str(int(j)) for j in range(my_dists_one_hot.shape[1])])

  # Concatenate the one-hot encoded dataframe with your original dataframe
  dataframes[i] = pd.concat([dataframes[i], one_hot_df], axis=1)

  # If you want to drop the original 'Distortion' column
  dataframes[i] = dataframes[i].drop(['Distortion'], axis=1)

# Now, your dataframes are one-hot encoded and the original 'Distortion' column is dropped.
train_data, val_data, test_data = dataframes

In [77]:
from tensorflow.keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder
# Function to create a single column with one-hot encoded numpy arrays with integer values
def encode_distortion(dataframes):
    for i in range(len(dataframes)):
        my_dists = dataframes[i]['Distortion']
        le = LabelEncoder()
        y_class_encoded = le.fit_transform(my_dists)
        my_dists_one_hot = to_categorical(y_class_encoded, num_classes=5).astype(int)
        # Assign to a new column as numpy arrays
        dataframes[i]['Distortion_encoded'] = [np.array(one_hot) for one_hot in my_dists_one_hot]
        # Drop the original 'Distortion' column
        dataframes[i] = dataframes[i].drop(['Distortion'], axis=1)
    return dataframes

# Apply the function to the dataframes
train_data, val_data, test_data = encode_distortion([train_data, val_data, test_data])

In [78]:
test_data.head()

Unnamed: 0,image_filename,DMOS,Distortion_encoded
0,NORM_lighthouse2_4_5_patch_0.bmp,24.912864,"[0, 1, 0, 0, 0]"
1,NORM_lighthouse2_4_5_patch_1.bmp,24.912864,"[0, 1, 0, 0, 0]"
2,NORM_lighthouse2_4_5_patch_2.bmp,24.912864,"[0, 1, 0, 0, 0]"
3,NORM_lighthouse2_4_5_patch_3.bmp,24.912864,"[0, 1, 0, 0, 0]"
4,NORM_lighthouse2_4_5_patch_4.bmp,24.912864,"[0, 1, 0, 0, 0]"


In [None]:
help(ImageDataGenerator.flow_from_dataframe)

In [79]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

def create_data_generators(base_dir, train_data, val_data, test_data, batch_size):
    datagen = ImageDataGenerator()
    train_generator = datagen.flow_from_dataframe(
        dataframe=train_data,
        directory=os.path.join(base_dir, 'training/patches/'),
        x_col='image_filename',
        y_col=['DMOS', 'Distortion_encoded'],
        target_size=(32, 32),
        batch_size=batch_size,
        class_mode='multi_output',
        shuffle=True,
        seed=42
    )
    val_generator = datagen.flow_from_dataframe(
        dataframe=val_data,
        directory=os.path.join(base_dir, 'validation/patches/'),
        x_col='image_filename',
        y_col=['DMOS', 'Distortion_encoded'],
        target_size=(32, 32),
        batch_size=batch_size,
        class_mode='multi_output',
        shuffle=True,
        seed=42
    )
    test_generator = datagen.flow_from_dataframe(
        dataframe=test_data,
        directory=os.path.join(base_dir, 'test/patches/'),
        x_col='image_filename',
        y_col=['DMOS', 'Distortion_encoded'],
        target_size=(32, 32),
        batch_size=batch_size,
        class_mode='multi_output',
        shuffle=False,
        seed=42
    )
    return train_generator, val_generator, test_generator

# Define file paths
base_dir = 'LIVE/normalized_distorted_images'
# Create generators
train_generator, val_generator, test_generator = create_data_generators(base_dir, train_data, val_data, test_data, batch_size=32)


Found 161332 validated image filenames.
Found 53196 validated image filenames.
Found 53332 validated image filenames.


In [82]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, GlobalMaxPooling2D, Dense, Dropout, Input

# Define the model
input_shape = (32, 32, 3)

inputs = tf.keras.layers.Input(shape=input_shape)
x = tf.keras.layers.Conv2D(32, (3, 3), activation='relu')(inputs)
x = tf.keras.layers.MaxPooling2D((2, 2))(x)
x = tf.keras.layers.Conv2D(64, (3, 3), activation='relu')(x)
x = tf.keras.layers.MaxPooling2D((2, 2))(x)
x = tf.keras.layers.Conv2D(128, (3, 3), activation='relu')(x)
x = tf.keras.layers.MaxPooling2D((2, 2))(x)
x = tf.keras.layers.Flatten()(x)
x = tf.keras.layers.Dense(128, activation='relu')(x)
x = tf.keras.layers.Dense(64, activation='relu')(x)

dmos_output = tf.keras.layers.Dense(1, name='dmos')(x)
distortion_output = tf.keras.layers.Dense(5, activation='softmax', name='distortion')(x)

model = tf.keras.models.Model(inputs=inputs, outputs=[dmos_output, distortion_output])

model.compile(optimizer='adam',
              loss={'dmos': 'mse', 'distortion': 'categorical_crossentropy'},
              metrics={'dmos': 'mse', 'distortion': 'accuracy'})

In [83]:
history = model.fit(train_generator, epochs=10, validation_data=val_generator)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [64]:
help(tf.keras.models.Model)

Help on class Model in module keras.src.engine.training:

class Model(keras.src.engine.base_layer.Layer, keras.src.utils.version_utils.ModelVersionSelector)
 |  Model(*args, **kwargs)
 |  
 |  A model grouping layers into an object with training/inference features.
 |  
 |  Args:
 |      inputs: The input(s) of the model: a `keras.Input` object or a
 |          combination of `keras.Input` objects in a dict, list or tuple.
 |      outputs: The output(s) of the model: a tensor that originated from
 |          `keras.Input` objects or a combination of such tensors in a dict,
 |          list or tuple. See Functional API example below.
 |      name: String, the name of the model.
 |  
 |  There are two ways to instantiate a `Model`:
 |  
 |  1 - With the "Functional API", where you start from `Input`,
 |  you chain layer calls to specify the model's forward pass,
 |  and finally you create your model from inputs and outputs:
 |  
 |  ```python
 |  import tensorflow as tf
 |  
 |  inputs =