In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!unzip -q /content/drive/MyDrive/iqa/ChallengeDB_release.zip -d /content/dataset

replace /content/dataset/ChallengeDB_release/.DS_Store? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

In [2]:
import pandas as pd
import numpy as np

In [3]:
labels_data = pd.read_csv('/content/drive/MyDrive/iqa/mos.csv')
labels_data = labels_data.values
labels_data = labels_data[0]
labels_data.shape

(1169,)

In [4]:
folder_path = '/content/dataset/ChallengeDB_release/Images'

# Resnet50

In [14]:
import tensorflow as tf
import tensorflow.keras.layers as layers
import tensorflow.keras.models as models
"""ResNet, ResNetV2, and ResNeXt models for Keras.

# Reference papers

- [Deep Residual Learning for Image Recognition]
  (https://arxiv.org/abs/1512.03385) (CVPR 2016 Best Paper Award)
- [Identity Mappings in Deep Residual Networks]
  (https://arxiv.org/abs/1603.05027) (ECCV 2016)
- [Aggregated Residual Transformations for Deep Neural Networks]
  (https://arxiv.org/abs/1611.05431) (CVPR 2017)

# Reference implementations

- [TensorNets]
  (https://github.com/taehoonlee/tensornets/blob/master/tensornets/resnets.py)
- [Caffe ResNet]
  (https://github.com/KaimingHe/deep-residual-networks/tree/master/prototxt)
- [Torch ResNetV2]
  (https://github.com/facebook/fb.resnet.torch/blob/master/models/preresnet.lua)
- [Torch ResNeXt]
  (https://github.com/facebookresearch/ResNeXt/blob/master/models/resnext.lua)

"""


def block1(x, filters, kernel_size=3, stride=1,
           conv_shortcut=True, name=None):
    """A residual block.

    # Arguments
        x: input tensor.
        filters: integer, filters of the bottleneck layer.
        kernel_size: default 3, kernel size of the bottleneck layer.
        stride: default 1, stride of the first layer.
        conv_shortcut: default True, use convolution shortcut if True,
            otherwise identity shortcut.
        name: string, block label.

    # Returns
        Output tensor for the residual block.
    """
    bn_axis = 3

    if conv_shortcut is True:
        shortcut = layers.Conv2D(4 * filters, 1, strides=stride,
                                 name=name + '_0_conv')(x)
        shortcut = layers.BatchNormalization(axis=bn_axis, epsilon=1.001e-5,
                                             name=name + '_0_bn')(shortcut)
    else:
        shortcut = x

    x = layers.Conv2D(filters, 1, strides=stride, name=name + '_1_conv')(x)
    x = layers.BatchNormalization(axis=bn_axis, epsilon=1.001e-5,
                                  name=name + '_1_bn')(x)
    x = layers.Activation('relu', name=name + '_1_relu')(x)

    x = layers.Conv2D(filters, kernel_size, padding='SAME',
                      name=name + '_2_conv')(x)
    x = layers.BatchNormalization(axis=bn_axis, epsilon=1.001e-5,
                                  name=name + '_2_bn')(x)
    x = layers.Activation('relu', name=name + '_2_relu')(x)

    x = layers.Conv2D(4 * filters, 1, name=name + '_3_conv')(x)
    x = layers.BatchNormalization(axis=bn_axis, epsilon=1.001e-5,
                                  name=name + '_3_bn')(x)

    x = layers.Add(name=name + '_add')([shortcut, x])
    x = layers.Activation('relu', name=name + '_out')(x)
    return x


def stack1(x, filters, blocks, stride1=2, name=None):
    """A set of stacked residual blocks.

    # Arguments
        x: input tensor.
        filters: integer, filters of the bottleneck layer in a block.
        blocks: integer, blocks in the stacked blocks.
        stride1: default 2, stride of the first layer in the first block.
        name: string, stack label.

    # Returns
        Output tensor for the stacked blocks.
    """
    x = block1(x, filters, stride=stride1, name=name + '_block1')
    for i in range(2, blocks + 1):
        x = block1(x, filters, conv_shortcut=False, name=name + '_block' + str(i))
    return x


def ResNet50(inputs,
             preact=False,
             use_bias=True,
             model_name='resnet50',
             include_top=False,
             pooling='avg',
             classes=1000,
             return_feature_maps=True,
             return_last_map=False):
    """Instantiates the ResNet, ResNetV2, and ResNeXt architecture.

    Optionally loads weights pre-trained on ImageNet.
    Note that the data format convention used by the model is
    the one specified in your Keras config at `~/.keras/keras.json`.

    # Arguments
        stack_fn: a function that returns output tensor for the
            stacked residual blocks.
        preact: whether to use pre-activation or not
            (True for ResNetV2, False for ResNet and ResNeXt).
        use_bias: whether to use biases for convolutional layers or not
            (True for ResNet and ResNetV2, False for ResNeXt).
        model_name: string, model name.
        include_top: whether to include the fully-connected
            layer at the top of the network.
        weights: one of `None` (random initialization),
              'imagenet' (pre-training on ImageNet),
              or the path to the weights file to be loaded.
        input_tensor: optional Keras tensor
            (i.e. output of `layers.Input()`)
            to use as image input for the model.
        input_shape: optional shape tuple, only to be specified
            if `include_top` is False (otherwise the input shape
            has to be `(224, 224, 3)` (with `channels_last` data format)
            or `(3, 224, 224)` (with `channels_first` data format).
            It should have exactly 3 inputs channels.
        pooling: optional pooling mode for feature extraction
            when `include_top` is `False`.
            - `None` means that the output of the model will be
                the 4D tensor output of the
                last convolutional layer.
            - `avg` means that global average pooling
                will be applied to the output of the
                last convolutional layer, and thus
                the output of the model will be a 2D tensor.
            - `max` means that global max pooling will
                be applied.
        classes: optional number of classes to classify images
            into, only to be specified if `include_top` is True, and
            if no `weights` argument is specified.

    # Returns
        A Keras model instance.

    # Raises
        ValueError: in case of invalid argument for `weights`,
            or invalid input shape.
    """
    # global backend, layers, models, keras_utils
    # backend, layers, models, keras_utils = get_submodules_from_kwargs(kwargs)

    # Determine proper input shape
    bn_axis = 3

    x = layers.ZeroPadding2D(padding=((3, 3), (3, 3)), name='conv1_pad')(inputs)
    x = layers.Conv2D(64, 7, strides=2, use_bias=use_bias, name='conv1_conv')(x)

    if preact is False:
        x = layers.BatchNormalization(axis=bn_axis, epsilon=1.001e-5,
                                      name='conv1_bn')(x)
        x = layers.Activation('relu', name='conv1_relu')(x)

    x = layers.ZeroPadding2D(padding=((1, 1), (1, 1)), name='pool1_pad')(x)
    x = layers.MaxPooling2D(3, strides=2, name='pool1_pool')(x)

    outputs = []
    x = stack1(x, 64, 3, stride1=1, name='conv2')
    outputs.append(x)

    x = stack1(x, 128, 4, name='conv3')
    outputs.append(x)

    x = stack1(x, 256, 6, name='conv4')
    outputs.append(x)

    x = stack1(x, 512, 3, name='conv5')
    outputs.append(x)
    # x = stack_fn(x)

    if return_last_map:
        # x_shape = x.get_shape()
        # s_shape = x_shape[1]*x_shape[2]
        # x = tf.reshape(x, [tf.shape(x)[0], x_shape[1] * x_shape[2], x_shape[-1]])
        # x = tf.reshape(x, [tf.shape(x)[0], tf.shape(x)[1] * tf.shape(x)[2], tf.shape(x)[-1]])
        model = models.Model(inputs, x, name='last_map')
        return model

    if return_feature_maps:
        model = models.Model(inputs, outputs, name=model_name)
        return model

    if preact is True:
        x = layers.BatchNormalization(axis=bn_axis, epsilon=1.001e-5,
                                      name='post_bn')(x)
        x = layers.Activation('relu', name='post_relu')(x)

    if include_top:
        x = layers.GlobalAveragePooling2D(name='avg_pool')(x)
        x = layers.Dense(classes, activation='softmax', name='probs')(x)
    else:
        if pooling == 'avg':
            x = layers.GlobalAveragePooling2D(name='avg_pool')(x)
        elif pooling == 'max':
            x = layers.GlobalMaxPooling2D(name='max_pool')(x)
        x = layers.Dense(1, activation='linear', name='final_fc')(x)

    # Create model.
    model = models.Model(inputs, x, name=model_name)

    return model


# if __name__ == '__main__':
#     weights = r'.\pretrained_weights\resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5'

#     gpus = tf.config.experimental.list_physical_devices('GPU')
#     tf.config.experimental.set_visible_devices(gpus[1], 'GPU')

input_shape = (None, None, 3)
inputs = layers.Input(shape=input_shape)

model = ResNet50(inputs,
                     return_feature_maps=True)
model.summary()
#     if weights is not None:
#         print('Load weights')
#         model.load_weights(weights, by_name=True)
    # C3, C4, C5 = model.outputs[1:]
    # t = 0
    # print(model.outputs)

Model: "resnet50"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_6 (InputLayer)        [(None, None, None, 3)]      0         []                            
                                                                                                  
 conv1_pad (ZeroPadding2D)   (None, None, None, 3)        0         ['input_6[0][0]']             
                                                                                                  
 conv1_conv (Conv2D)         (None, None, None, 64)       9472      ['conv1_pad[0][0]']           
                                                                                                  
 conv1_bn (BatchNormalizati  (None, None, None, 64)       256       ['conv1_conv[0][0]']          
 on)                                                                                       

# VGG16

In [13]:
# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
# pylint: disable=invalid-name
"""VGG16 model for Keras."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorflow.python.keras import layers
from tensorflow.python.keras.engine import training


def VGG16(inputs, return_last_map=False):
    """Instantiates the VGG16 model.

    By default, it loads weights pre-trained on ImageNet. Check 'weights' for
    other options.

    This model can be built both with 'channels_first' data format
    (channels, height, width) or 'channels_last' data format
    (height, width, channels).

    The default input size for this model is 224x224.

    Caution: Be sure to properly pre-process your inputs to the application.
    Please see `applications.vgg16.preprocess_input` for an example.

    Arguments:
        include_top: whether to include the 3 fully-connected
            layers at the top of the network.
        weights: one of `None` (random initialization),
              'imagenet' (pre-training on ImageNet),
              or the path to the weights file to be loaded.
        input_tensor: optional Keras tensor
            (i.e. output of `layers.Input()`)
            to use as image input for the model.
        input_shape: optional shape tuple, only to be specified
            if `include_top` is False (otherwise the input shape
            has to be `(224, 224, 3)`
            (with `channels_last` data format)
            or `(3, 224, 224)` (with `channels_first` data format).
            It should have exactly 3 input channels,
            and width and height should be no smaller than 32.
            E.g. `(200, 200, 3)` would be one valid value.
        pooling: Optional pooling mode for feature extraction
            when `include_top` is `False`.
            - `None` means that the output of the model will be
                the 4D tensor output of the
                last convolutional block.
            - `avg` means that global average pooling
                will be applied to the output of the
                last convolutional block, and thus
                the output of the model will be a 2D tensor.
            - `max` means that global max pooling will
                be applied.
        classes: optional number of classes to classify images
            into, only to be specified if `include_top` is True, and
            if no `weights` argument is specified.
        classifier_activation: A `str` or callable. The activation function to use
            on the "top" layer. Ignored unless `include_top=True`. Set
            `classifier_activation=None` to return the logits of the "top" layer.

    Returns:
      A `keras.Model` instance.

    Raises:
      ValueError: in case of invalid argument for `weights`,
        or invalid input shape.
      ValueError: if `classifier_activation` is not `softmax` or `None` when
        using a pretrained top layer.
    """

    # Block 1
    x = layers.Conv2D(
        64, (3, 3), activation='relu', padding='same', name='block1_conv1')(
        inputs)
    x = layers.Conv2D(
        64, (3, 3), activation='relu', padding='same', name='block1_conv2')(x)
    x = layers.MaxPooling2D((2, 2), strides=(2, 2), name='block1_pool')(x)

    outputs = []
    # Block 2
    x = layers.Conv2D(
        128, (3, 3), activation='relu', padding='same', name='block2_conv1')(x)
    x = layers.Conv2D(
        128, (3, 3), activation='relu', padding='same', name='block2_conv2')(x)
    x = layers.MaxPooling2D((2, 2), strides=(2, 2), name='block2_pool')(x)
    outputs.append(x)

    # Block 3
    x = layers.Conv2D(
        256, (3, 3), activation='relu', padding='same', name='block3_conv1')(x)
    x = layers.Conv2D(
        256, (3, 3), activation='relu', padding='same', name='block3_conv2')(x)
    x = layers.Conv2D(
        256, (3, 3), activation='relu', padding='same', name='block3_conv3')(x)
    x = layers.MaxPooling2D((2, 2), strides=(2, 2), name='block3_pool')(x)
    outputs.append(x)

    # Block 4
    x = layers.Conv2D(
        512, (3, 3), activation='relu', padding='same', name='block4_conv1')(x)
    x = layers.Conv2D(
        512, (3, 3), activation='relu', padding='same', name='block4_conv2')(x)
    x = layers.Conv2D(
        512, (3, 3), activation='relu', padding='same', name='block4_conv3')(x)
    x = layers.MaxPooling2D((2, 2), strides=(2, 2), name='block4_pool')(x)
    outputs.append(x)

    # Block 5
    x = layers.Conv2D(
        512, (3, 3), activation='relu', padding='same', name='block5_conv1')(x)
    x = layers.Conv2D(
        512, (3, 3), activation='relu', padding='same', name='block5_conv2')(x)
    x = layers.Conv2D(
        512, (3, 3), activation='relu', padding='same', name='block5_conv3')(x)
    x = layers.MaxPooling2D((2, 2), strides=(2, 2), name='block5_pool')(x)
    outputs.append(x)

    if return_last_map:
        model = training.Model(inputs, x, name='vgg16')
    else:
        # Create model.
        model = training.Model(inputs, outputs, name='vgg16')

    return model


# if __name__ == '__main__':
#     model = VGG16(None)

# Transformer


In [8]:
!pip install tensorflow_addons



In [9]:
import tensorflow as tf
from tensorflow.keras import Model
import tensorflow_addons as tfa
from tensorflow.keras.layers import (
    Dense,
    Dropout,
    LayerNormalization,
    Layer,
    Conv2D,
    MaxPool2D
)


def create_padding_mask(input):
    """
    Creates mask for input to Transformer based on the average of all elements = 0
    :param input: input sequence
    :return: mask
    """
    input = tf.pad(input, paddings=[[0, 0], [1, 0], [0, 0]], constant_values=1)
    input = tf.cast(tf.math.equal(tf.keras.backend.mean(input, axis=-1), 0), tf.float32)

    # add extra dimensions to add the padding to the attention logits.
    return input[:, tf.newaxis, tf.newaxis, :]  # (batch_size, 1, 1, seq_len)


class MultiHeadAttention(Layer):
    """
    This is the standard multi-head attention layer
    """
    def __init__(self, d_model, num_heads=8):
        super(MultiHeadAttention, self).__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        if d_model % num_heads != 0:
            raise ValueError(
                f'embedding dimension = {d_model} should be divisible by number of heads = {num_heads}'
            )
        self.depth = d_model // num_heads

        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)

        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
        x = tf.reshape(
            x, (batch_size, -1, self.num_heads, self.depth)
        )
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def scaled_dot_product_attention(self, query, key, value, mask):
        matmul_qk = tf.matmul(query, key, transpose_b=True)
        dim_key = tf.cast(tf.shape(key)[-1], tf.float32)
        scaled_score = matmul_qk / tf.math.sqrt(dim_key)
        if mask is not None:
            scaled_score += (mask * -1e9)
        weights = tf.nn.softmax(scaled_score, axis=-1)
        output = tf.matmul(weights, value)
        return output, weights

    def call(self, inputs, mask):
        batch_size = tf.shape(inputs)[0]

        query = self.wq(inputs)
        key = self.wk(inputs)
        value = self.wv(inputs)

        query = self.split_heads(query, batch_size)
        key = self.split_heads(key, batch_size)
        value = self.split_heads(value, batch_size)

        attention, weights = self.scaled_dot_product_attention(query, key, value, mask)
        attention = tf.transpose(attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(
            attention, (batch_size, -1, self.d_model)
        )
        output = self.dense(concat_attention)
        return output, weights


class TransformerBlock(Layer):
    """
    This is the standard Transformer block
    """
    def __init__(self, d_model, num_heads, dff, dropout=0.1):
        super(TransformerBlock, self).__init__()
        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = tf.keras.Sequential(
            [Dense(dff, activation="relu"),
             Dense(d_model),]
        )

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(dropout)
        self.dropout2 = Dropout(dropout)

    def call(self, x, training, mask, vis=False):
        attn_output, attention_weigths = self.mha(x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)
        if vis:
            return out2, attention_weigths
        else:
            return out2


class TriQImageQualityTransformer(Model):
    """
    Transformer for video quality assessment using the standard Transformer,
    the maximum_position_encoding should cover the maximal clip number in the databases
    """
    def __init__(
        self,
        num_layers,
        d_model,
        num_heads,
        mlp_dim,
        dropout=0.1,
        n_quality_levels=5,
        maximum_position_encoding=257,
        vis=False
    ):
        super(TriQImageQualityTransformer, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        # positional embedding is predefined with a sufficient length
        self.pos_emb = self.add_weight('pos_emb', shape=(1, maximum_position_encoding, d_model))

        # add video quality token
        self.quality_emb = self.add_weight('quality_emb', shape=(1, 1, d_model))

        # normal Transformer architecture
        self.feature_proj_conv = Conv2D(d_model, (1, 1))
        # self.feature_proj = Dense(d_model)

        # self.pooling_big = MaxPool2D(pool_size=(4, 4))
        self.pooling_small = MaxPool2D(pool_size=(2, 2))

        self.dropout = Dropout(dropout)
        self.enc_layers = [
            TransformerBlock(d_model, num_heads, mlp_dim, dropout)
            for _ in range(num_layers)
        ]
        self.vis = vis

        # MLP head
        if n_quality_levels > 1:
            mlp_activation = 'softmax'
        else:
            mlp_activation = 'linear'
        self.mlp_head = tf.keras.Sequential(
            [
                Dense(mlp_dim, activation=tfa.activations.gelu),
                Dropout(dropout),
                Dense(n_quality_levels, activation=mlp_activation),
            ]
        )

    def call(self, x, training):
        batch_size = tf.shape(x)[0]

        # spatial_size = tf.shape(x)[1] * tf.shape(x)[2]
        mask = None

        # x = tf.reshape(x, [batch_size, spatial_size, 2048])
        # print('{}, {}'.format(batch_size, spatial_size))

        # x = self.feature_proj(x)
        x = self.feature_proj_conv(x)

        if tf.shape(x)[1] >= 16:
            x = self.pooling_small(x)
        # elif tf.shape(x)[2] >= 24:
        #     x = self.pooling_small(x)

        spatial_size = tf.shape(x)[1] * tf.shape(x)[2]
        x = tf.reshape(x, [batch_size, spatial_size, self.d_model])

        # x = tf.reshape(x, [batch_size, 192, 2048])
        # x = self.feature_proj(x)

        quality_emb = tf.broadcast_to(self.quality_emb, [batch_size, 1, self.d_model])
        x = tf.concat([quality_emb, x], axis=1)

        # truncate the positional embedding for shorter videos
        # print(spatial_size)
        x = x + self.pos_emb[:, : spatial_size + 1, :]
        # x = x + self.pos_emb

        x = self.dropout(x, training=training)

        if self.vis:
            attention_weights = []
            for layer in self.enc_layers:
                x, attention_weight = layer(x, training, mask, vis=True)
                attention_weights.append(attention_weight)

        else:
            for layer in self.enc_layers:
                x = layer(x, training, mask)

        # First (CLS) is used for VQA
        # return x[:, 0]
        x = self.mlp_head(x[:, 0])

        if self.vis:
            return x, attention_weights
        else:
            return x


TensorFlow Addons (TFA) has ended development and introduction of new features.
TFA has entered a minimal maintenance and release mode until a planned end of life in May 2024.
Please modify downstream libraries to take dependencies from other repositories in our TensorFlow community (e.g. Keras, Keras-CV, and Keras-NLP). 

For more information see: https://github.com/tensorflow/addons/issues/2807 



# TRIQ

In [15]:
"""
Main function to build TRIQ.
"""
from tensorflow.keras.layers import Input
from tensorflow.keras.models import Model
# from backbone.resnet50 import ResNet50
# from backbone.vgg16 import VGG16
import tensorflow as tf


def create_triq_model(n_quality_levels,
                      input_shape=(None, None, 3),
                      backbone='resnet50',
                      transformer_params=(2, 32, 8, 64),
                      maximum_position_encoding=193,
                      vis=False):
    """
    Creates the hybrid TRIQ model
    :param n_quality_levels: number of quality levels, use 5 to predict quality distribution
    :param input_shape: input shape
    :param backbone: bakbone nets, supports ResNet50 and VGG16 now
    :param transformer_params: Transformer parameters
    :param maximum_position_encoding: the maximal number of positional embeddings
    :param vis: flag to visualize attention weight maps
    :return: TRIQ model
    """
    inputs = Input(shape=input_shape)
    if backbone == 'resnet50':
        backbone_model = ResNet50(inputs,
                                  return_feature_maps=False, return_last_map=True)
    elif backbone == 'vgg16':
        backbone_model = VGG16(inputs, return_last_map=True)
    else:
        raise NotImplementedError

    C5 = backbone_model.output

    dropout_rate = 0.1

    transformer = TriQImageQualityTransformer(
        num_layers=transformer_params[0],
        d_model=transformer_params[1],
        num_heads=transformer_params[2],
        mlp_dim=transformer_params[3],
        dropout=dropout_rate,
        n_quality_levels=n_quality_levels,
        maximum_position_encoding=maximum_position_encoding,
        vis=vis
    )
    outputs = transformer(C5)

    model = Model(inputs=inputs, outputs=outputs)
    # model.summary()
    return model


# if __name__ == '__main__':
gpus = tf.config.experimental.list_physical_devices('GPU')
print(gpus)
tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
# input_shape = [None, None, 3]
    # input_shape = [768, 1024, 3]
input_shape = [500, 500, 3]
    # input_shape = [384, 512, 3]
    # model = cnn_transformer(n_quality_levels=5, input_shape=input_shape, backbone='vgg16')
model = create_triq_model(n_quality_levels=5, input_shape=input_shape, backbone='resnet50')
model.summary()

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_7 (InputLayer)        [(None, 500, 500, 3)]        0         []                            
                                                                                                  
 conv1_pad (ZeroPadding2D)   (None, 506, 506, 3)          0         ['input_7[0][0]']             
                                                                                                  
 conv1_conv (Conv2D)         (None, 250, 250, 64)         9472      ['conv1_pad[0][0]']           
                                                                                                  
 conv1_bn (BatchNormalizati  (None, 250, 250, 64)         256       ['conv1_conv[0][0]']          
 on)                       

# Callbacks

In [13]:

import collections
import csv
import io
from tensorflow.python.lib.io import file_io

import numpy as np
import six
import datetime

from tensorflow.python.util.compat import collections_abc
from tensorflow.keras.callbacks import CSVLogger


class MyCSVLogger(CSVLogger):
    """
    This is basically a copy of CSVLogger, the only change is that 4 decimal precision is used in loggers.
    """
    def __init__(self, filename, model_name=None, separator=',', append=False):
        self.model_name = model_name
        super(MyCSVLogger, self).__init__(filename, separator, append)

    def on_train_begin(self, logs=None):
        if self.append:
            if file_io.file_exists(self.filename):
                with open(self.filename, 'r' + self.file_flags) as f:
                    self.append_header = not bool(len(f.readline()))
            mode = 'a'
        else:
            mode = 'w'
        self.csv_file = io.open(self.filename,
                                mode + self.file_flags,
                                **self._open_args)
        if self.model_name:
            self.csv_file.write('\nModel name: {}\n'.format(self.model_name))
        self.csv_file.write('\nTrain start: {}\n'.format(datetime.datetime.now().strftime("%d/%m/%Y %H:%M:%S")))
        self.csv_file.flush()

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}

        def handle_value(k):
            is_zero_dim_ndarray = isinstance(k, np.ndarray) and k.ndim == 0
            if isinstance(k, six.string_types):
                return k
            elif isinstance(k, collections_abc.Iterable) and not is_zero_dim_ndarray:
                return '"[%s]"' % (', '.join(map(str, k)))
            else:
                return '{:.4f}'.format(k)

        if self.keys is None:
            self.keys = sorted(logs.keys())

        if self.model.stop_training:
            # We set NA so that csv parsers do not fail for this last epoch.
            logs = dict([(k, logs[k]) if k in logs else (k, 'NA') for k in self.keys])

        if not self.writer:
            class CustomDialect(csv.excel):
                delimiter = self.sep

            fieldnames = ['epoch'] + self.keys

            self.writer = csv.DictWriter(
                self.csv_file,
                fieldnames=fieldnames,
                dialect=CustomDialect)
            if self.append_header:
                self.writer.writeheader()

        row_dict = collections.OrderedDict({'epoch': epoch})
        row_dict.update((key, handle_value(logs[key])) for key in self.keys)
        self.writer.writerow(row_dict)
        self.csv_file.flush()


import os
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, TensorBoard
# from callbacks.csv_callback import MyCSVLogger


def create_callbacks(model_name, result_folder, other_callback=None, checkpoint=True, early_stop=True, metrics='accuracy'):
    """Creates callbacks for model training

    :param model_name: model name
    :param result_folder: folder to write to
    :param other_callback: other evaluation callbacks
    :param checkpoint: flag to use checkpoint or not
    :param early_stop: flag to use early_stop or not
    :param metrics: evaluation metrics for writing to checkpoint file
    :return: callbacks
    """

    callbacks = []
    if other_callback is not None:
        callbacks.append(other_callback)
    csv_log_file = os.path.join(result_folder, model_name + '.log')
    csv_logger = MyCSVLogger(csv_log_file, model_name, append=True, separator=';')
    callbacks.append(csv_logger)
    if early_stop:
        callbacks.append(EarlyStopping(monitor='plcc', min_delta=0.001, patience=40, mode='max'))
    if checkpoint:
        if metrics == None:
            mcp_file = os.path.join(result_folder, '{epoch:01d}_{loss:.4f}_{val_loss:.4f}.h5')
        else:
            if metrics == 'accuracy':
                mcp_file = os.path.join(result_folder, '{epoch:01d}_{loss:.4f}_{accuracy:.4f}_{val_loss:.4f}_{val_accuracy:.4f}.h5')
            elif metrics == 'mae':
                mcp_file = os.path.join(result_folder, '{epoch:01d}_{loss:.4f}_{mae:.4f}_{val_loss:.4f}_{val_mae:.4f}.h5')
            elif metrics == 'categorical_crossentropy':
                mcp_file = os.path.join(result_folder, '{epoch:01d}_{loss:.4f}_{categorical_crossentropy:.4f}_{val_loss:.4f}_{val_categorical_crossentropy:.4f}.h5')
            elif metrics == 'distribution_loss':
                mcp_file = os.path.join(result_folder, '{epoch:01d}_{loss:.4f}_{distribution_loss:.4f}_{val_loss:.4f}_{val_distribution_loss:.4f}.h5')
            else:
                mcp_file = os.path.join(result_folder, '{epoch:01d}_{loss:.4f}_{val_loss:.4f}.h5')
        mcp = ModelCheckpoint(mcp_file, save_best_only=True, save_weights_only=True, monitor='plcc', verbose=1, mode='max')
        callbacks.append(mcp)

    # tensorboard_callback = TensorBoard(log_dir=result_folder, histogram_freq=1)
    # callbacks.append(tensorboard_callback)

    return callbacks

# Image Scores

In [14]:
"""
This script contains several methods to process images and image groups
"""
import os
from sklearn.model_selection import train_test_split
import numpy as np
from PIL import Image
from scipy.ndimage import sobel
import shutil
import glob
import scipy.stats
import matplotlib.pyplot as plt


def si_image(image):
    """
    SI of image based on the ITU-R Recommendation
    :param image: image array
    :return: SI
    """
    # return np.mean(sobel(image))
    return np.std(sobel(image))


def get_scores(folders, image_scores):
    """
    Get the image scores in folders
    :param folders: data folders
    :param image_scores: a dictionary of images and their MOS scores
    :return: score list in the data folders
    """
    scores = []
    for folder in folders:
        files = os.listdir(folder)
        for file in files:
            file_name = file.lower()
            if file_name.endswith(('.jpg', '.bmp')):
                score = image_scores[file_name]
                scores.append(score)
    return scores


def get_image_means(train_folders):
    """
    Get R,G,B means of images in the train folders
    :param train_folders:
    :return: R,G,B means
    """
    # train_folders = [r'.\image_quality_koniq10k\train\koniq_normal',
    #                  r'.\image_quality_koniq10k\train\koniq_small',
    #                  r'.\image_quality_koniq10k\train\live']
    mean_R = 0
    mean_G = 0
    mean_B = 0
    std_R = 0
    std_G = 0
    std_B = 0
    num = 0
    for folder in train_folders:
        files = os.listdir(folder)
        for file in files:
            file_name = file.lower()
            if file_name.endswith(('.jpg', '.bmp')):
                image_file = os.path.join(folder, file)
                image = np.asarray(Image.open(image_file), dtype=np.float32)
                mean_R += np.mean(image[:, :, 0])
                mean_G += np.mean(image[:, :, 1])
                mean_B += np.mean(image[:, :, 2])
                std_R += np.std(image[:, :, 0])
                std_G += np.std(image[:, :, 1])
                std_B += np.std(image[:, :, 2])
                num += 1
    mean_R /= num
    mean_G /= num
    mean_B /= num
    std_R /= num
    std_G /= num
    std_B /= num
    print('Mean-R: {}, mean-G: {}, mean-B:{}'.format(mean_R, mean_G, mean_B))
    print('Std-R: {}, Std-G: {}, Std-B:{}'.format(std_R, std_G, std_B))


def get_si(folders):
    """
    Get SI values in data folders
    :param folders: data folders
    :return: SI list
    """
    si = []
    for folder in folders:
        files = os.listdir(folder)
        for file in files:
            file_name = file.lower()
            if file_name.endswith(('.jpg', '.bmp')):
                image_file = os.path.join(folder, file)
                image = np.asarray(Image.open(image_file), dtype=np.float32)
                si.append(si_image(image))
            print('{} done'.format(file))
    return si


def draw_train_val_si_hist():
    """
    Draw the histogram of SI of train and validation sets
    :return:
    """
    # train_folders = [r'.\image_quality_koniq10k\train\koniq_normal',
    #                  r'.\image_quality_koniq10k\train\live']
    # val_folders = [r'.\image_quality_koniq10k\val\koniq_normal',
    #                r'.\image_quality_koniq10k\val\live']
    train_folders = [r'.\database\train\koniq_normal',
                     r'.\database\train\live']
    val_folders = [r'.\database\val\koniq_normal',
                   r'.\database\val\live']

    train_si = get_si(train_folders)
    val_si = get_si(val_folders)
    np.save(r'.\database\train_si.npy', train_si)
    np.save(r'.\database\val_si.npy', val_si)
    max_si = np.max(train_si)
    min_si = np.min(train_si)

    plt.figure()
    bins = np.linspace(min_si, max_si, 100)
    # bins = 100
    plt.hist(train_si, bins=bins, alpha=0.5, rwidth=0.95, color='skyblue', label='Train set')
    plt.xlim(min_si, max_si)
    plt.hist(val_si, bins=bins, alpha=1., rwidth=0.95, label='Validation set')
    plt.legend(loc='upper right')
    # plt.ylabel('Density')
    plt.xlabel('SI', fontsize=14)
    # plt.show()

    # plt.subplot(211)
    # plt.hist(train_si, density=True, bins=100)
    # plt.ylabel('Density')
    # plt.xlabel('Train SI')
    #
    # plt.subplot(212)
    # plt.hist(val_si, density=True, bins=100)
    # plt.ylabel('Density')
    # plt.xlabel('Val SI')
    # plt.show()


def draw_train_val_mos_hist():
    """
    Draw the histogram of MOS in the train and val sets
    :return:
    """
    train_folders = [r'.\database\train\koniq_normal',
                     # r'.\database\train\koniq_small',
                     r'.\database\train\live']
    val_folders = [r'.\database\val\koniq_normal',
                   # r'.\database\val\koniq_small',
                   r'.\database\val\live']

    koniq_mos_file = r'.\database\koniq10k_images_scores.csv'
    live_mos_file = r'.\database\live_wild\live_mos.csv'
    image_scores = get_image_scores(koniq_mos_file, live_mos_file)
    train_scores = get_scores(train_folders, image_scores)
    val_scores = get_scores(val_folders, image_scores)

    plt.figure()
    plt.subplot(211)
    bins = np.linspace(1, 5, 100)
    plt.hist(train_scores, bins=bins, alpha=0.5, rwidth=0.95, color='skyblue', label='Training set')
    plt.xlim(1, 5)
    plt.hist(val_scores, bins=bins, alpha=1., rwidth=0.95, label='Testing set')
    plt.legend(loc='upper left')
    # plt.ylabel('Density')
    plt.xlabel('MOS', fontsize=14)

    train_si = np.load(r'.\database\train_si.npy')
    val_si = np.load(r'.\database\val_si.npy')
    max_si = np.max(train_si)
    min_si = np.min(train_si)

    plt.subplot(212)
    bins = np.linspace(min_si, max_si, 100)
    # bins = 100
    plt.hist(train_si, bins=bins, alpha=0.5, rwidth=0.95, color='skyblue', label='Training set')
    plt.xlim(min_si, max_si)
    plt.hist(val_si, bins=bins, alpha=1., rwidth=0.95, label='Testing set')
    plt.legend(loc='upper right')
    # plt.ylabel('Density')
    plt.xlabel('SI', fontsize=14)

    plt.show()


def get_image_scores_from_two_file_formats(mos_file, file_format, mos_format, using_single_mos=True):
    """
    Get single MOS or distribution of scores from mos files with two format: koniq and live
    :param mos_file: mos file containing image path, distribution or std, and MOS
    :param file_format: koniq or live
    :param mos_format: MOS or Z-score
    :param using_single_mos: single MOS or distribution
    :return: dict {image_path: MOS or distribution}
    """
    mos_scale = [1, 2, 3, 4, 5]
    image_files = {}
    with open(mos_file, 'r+') as f:
        lines = f.readlines()
        for line in lines:
            content = line.split(',')
            image_file = content[0].replace('"', '').lower()

            if using_single_mos:
                score = float(content[-1]) if mos_format == 'mos' else float(content[1]) / 25. + 1
            else:
                if file_format == 'koniq':
                    scores_softmax = np.array([float(score) for score in content[1 : 6]])
                    score = [score_softmax / scores_softmax.sum() for score_softmax in scores_softmax]
                else:
                    std = float(content[-2]) if mos_format == 'mos' else float(content[-2]) / 25.
                    mean = float(content[-1]) if mos_format == 'mos' else float(content[-1]) / 25. + 1
                    score = get_distribution(mos_scale, mean, std)

            image_files[image_file] = score
    return image_files


def get_image_scores(koniq_mos_file, live_mos_file, using_single_mos=True):
    # image_scores_koniq = get_image_scores_from_two_file_formats(koniq_mos_file, 'koniq', 'mos', using_single_mos)
    image_scores = get_image_scores_from_two_file_formats(koniq_mos_file, 'koniq', 'mos', using_single_mos)
    # image_scores_live = get_image_scores_from_two_file_formats(live_mos_file, 'live', 'z-score', using_single_mos)
    # return {**image_scores_koniq, **image_scores_live}
    return {**image_scores}


def get_image_score_from_groups(folders, image_scores):
    """
    Get group lists of image files and scores
    :param folders: image folders
    :param image_scores: a dictionary of images and their MOS scores
    :return: two lists
                image_file_groups: a list containing image file groups, each group containing image files
                score_groups: a list containing score groups, each group containing image scores
    """
    image_file_groups = []
    score_groups = []
    for folder in folders:
        files = os.listdir(folder)
        image_file_group = []
        score_group = []
        for file in files:
            file_name = file.lower()
            if file_name in image_scores:
                score = image_scores[file_name]
                score_group.append(score)
                image_file_group.append(os.path.join(folder, file))

        image_file_groups.append(image_file_group)
        score_groups.append(score_group)
    return image_file_groups, score_groups


def get_distribution(score_scale, mean, std, distribution_type='standard'):
    """
    Calculate the distribution of scores from MOS and standard distribution, two types of distribution are supported:
        standard Gaussian and Truncated Gaussian
    :param score_scale: MOS scale, e.g., [1, 2, 3, 4, 5]
    :param mean: MOS
    :param std: standard deviation
    :param distribution_type: distribution type (standard or truncated)
    :return: Distribution of scores
    """
    if distribution_type == 'standard':
        distribution = scipy.stats.norm(loc=mean, scale=std)
    else:
        distribution = scipy.stats.truncnorm((score_scale[0] - mean) / std, (score_scale[-1] - mean) / std, loc=mean, scale=std)
    score_distribution = []
    for s in score_scale:
        score_distribution.append(distribution.pdf(s))

    return score_distribution


def get_live_images():
    image_folder = r'.\database\live_wild\Images'
    image_mos_file = r'.\database\live_wild\live_mos.csv'
    # image_si = []
    # scores = []
    image_files = {}
    with open(image_mos_file, 'r+') as f:
        lines = f.readlines()
        for line in lines:
            content = line.split(',')
            image_file = os.path.join(image_folder, content[0])
            # image_files.append(image_file)
            # image = np.asarray(image_file, dtype=np.float32)
            score = float(content[-1])
            mos = (score / 25.) + 1
            image_files[image_file] = mos
            # scores.append(mos)
            # image_si.append(si_image(image))
    return image_files


def split_train_val(ratio=0.5):
    """
    Randomly split images in a database to training and testing sets in terms of SI and MOS
    :param ratio: splitting ratio
    :return:
    """
    target_train_folder = r'.\database\train\live'
    target_val_folder = r'.\database\val\live'
    # image_files = self.get_si_scores()
    image_files = get_live_images()
    mos_scale = [1, 2, 3, 4, 5]
    image_groups = []
    # train_image_files = []
    # val_image_files = []
    for s in range(len(mos_scale)-1):
        images = []
        for k, v in image_files.items():
            if mos_scale[s] <= v < mos_scale[s + 1]:
                images.append(k)
        image_groups.append(images)

    for image_group in image_groups:
        image_si = {}
        for image_file in image_group:
            image = np.asarray(Image.open(image_file), dtype=np.float32)
            si = si_image(image)
            image_si[image_file] = si

        sorted_si = {k : v for k, v in sorted(image_si.items(), key=lambda item: item[1])}
        val_num = int(1.0 / (1 - ratio)) + 1
        sorted_image_files = sorted_si.keys()
        for i, sorted_image_file in enumerate(sorted_image_files):
            basename = os.path.basename(sorted_image_file)
            image = Image.open(sorted_image_file)
            resized_image = image.resize(512, 512)
            if i % val_num == 0:
                resized_image.save(os.path.join(target_val_folder, basename))
                # shutil.copy(sorted_image_file, os.path.join(target_val_folder, basename))
                # val_image_files.append(sorted_image_file)
            else:
               resized_image.save(os.path.join(target_train_folder, basename))
                # shutil.copy(sorted_image_file, os.path.join(target_train_folder, basename))
                # train_image_files.append(sorted_image_file)


def resize_koniq_images(image_folder):
    """
    Halve size of images in the KonIQ-10k database
    :param image_folder: image folder of KonIQ-10 database
    :return:
    """
    target_folder = r'.\database\train\koniq_small'
    image_files = glob.glob(os.path.join(image_folder, '*.jpg'))
    for image_file in image_files:
        image = Image.open(image_file)
        resized_image = image.resize((512, 384))
        basename = os.path.basename(image_file)
        resized_image.save(os.path.join(target_folder, basename))


class GroupProvider:
    def __init__(self, image_folder, image_mos_file):
        self.image_folder = image_folder
        self.image_mos_file = image_mos_file

    def get_si_scores(self):
        # image_si = []
        # scores = []
        image_files = {}
        with open(self.image_mos_file, 'r+') as f:
            lines = f.readlines()
            for line in lines:
                content = line.split(',')
                image_file = os.path.join(self.image_folder, content[0].replace('"', ''))
                # image = np.asarray(Image.open(image_file), dtype=np.float32)
                # si = si_image(image)
                score = float(content[-3])
                image_files[image_file] = score
                # scores.append(score)
                # image_si.append(si)
        return image_files

    def generate_images(self):
        image_files, scores, image_files = self.get_si_scores()
        train_image_files, test_image_files, train_scores, test_scores = train_test_split(image_files, scores,
                                                                                          test_size=0.1,
                                                                                          random_state=42)
        return train_image_files, test_image_files, train_scores, test_scores

    def get_live_images(self):
        image_folder = r'.\database\live_wild\Images'
        image_mos_file = r'.\database\live_wild\live_mos.csv'
        # image_si = []
        # scores = []
        image_files = {}
        with open(image_mos_file, 'r+') as f:
            lines = f.readlines()
            for line in lines:
                content = line.split(',')
                image_file = os.path.join(image_folder, content[0])
                # image_files.append(image_file)
                # image = np.asarray(image_file, dtype=np.float32)
                score = float(content[1])
                mos = (score / 25.) + 1
                image_files[image_file] = mos
                # scores.append(mos)
                # image_si.append(si_image(image))
        return image_files

    def split_train_val(self, ratio=0.5):
        target_train_folder = r'.\database\train\live'
        target_val_folder = r'.\database\val\live'
        # image_files = self.get_si_scores()
        image_files = self.get_live_images()
        mos_scale = [1, 2, 3, 4, 5]
        image_groups = []
        # train_image_files = []
        # val_image_files = []
        for s in range(len(mos_scale)-1):
            images = []
            for k, v in image_files.items():
                if mos_scale[s] <= v < mos_scale[s + 1]:
                    images.append(k)
            image_groups.append(images)

        for image_group in image_groups:
            image_si = {}
            for image_file in image_group:
                image = np.asarray(Image.open(image_file), dtype=np.float32)
                si = si_image(image)
                image_si[image_file] = si

            sorted_si = {k : v for k, v in sorted(image_si.items(), key=lambda item: item[1])}
            val_num = int(1.0 / (1 - ratio)) + 1
            sorted_image_files = sorted_si.keys()
            for i, sorted_image_file in enumerate(sorted_image_files):
                basename = os.path.basename(sorted_image_file)
                image = Image.open(sorted_image_file)
                resized_image = image.resize(512, 512)
                if i % val_num == 0:
                    resized_image.save(os.path.join(target_val_folder, basename))
                    # shutil.copy(sorted_image_file, os.path.join(target_val_folder, basename))
                    # val_image_files.append(sorted_image_file)
                else:
                   resized_image.save(os.path.join(target_train_folder, basename))
                    # shutil.copy(sorted_image_file, os.path.join(target_train_folder, basename))
                    # train_image_files.append(sorted_image_file)

    def resize_koniq_images(self, image_folder):
        target_folder = r'.\database\train\koniq_small'
        image_files = glob.glob(os.path.join(image_folder, '*.jpg'))
        for image_file in image_files:
            image = Image.open(image_file)
            resized_image = image.resize((512, 384))
            basename = os.path.basename(image_file)
            resized_image.save(os.path.join(target_folder, basename))


# if __name__ == '__main__':
    # image_folder = r'.\database\1024x768'
    # image_mos_file = r'.\database\koniq10k_images_scores.csv'

    # provider = GroupProvider(image_folder, image_mos_file)
    # provider.split_train_val()
    # print(1e-4/2)

    # draw_train_val_si_hist()
    # draw_train_val_mos_hist()
    # get_distribution()

    # get_image_means()

    # v = [4,5]
    # s = np.std(v)
    #
    # mean = 68.9221 / 25. + 1
    # std = 21.2405 / 25.
    # mos_scale = [1, 2, 3, 4, 5]
    # score = get_distribution(mos_scale, mean, std)



# Training

In [15]:
import os
import numpy as np
import glob
import tensorflow as tf
from tensorflow.keras.optimizers import Adam

# from models.triq_model import create_triq_model
# from callbacks.callbacks import create_callbacks
# from misc.imageset_handler import get_image_scores, get_image_score_from_groups
# from train.group_generator import GroupGenerator
# from callbacks.evaluation_callback_generator import ModelEvaluationIQGenerator
# from callbacks.warmup_cosine_decay_scheduler import WarmUpCosineDecayScheduler


def identify_best_weights(result_folder, history, best_plcc):
    pos = np.where(history['plcc'] == best_plcc)[0][0]

    pos_loss = '{}_{:.4f}'.format(pos + 1, history['loss'][pos])
    all_weights_files = glob.glob(os.path.join(result_folder, '*.h5'))
    for all_weights_file in all_weights_files:
        weight_file = os.path.basename(all_weights_file)
        if weight_file.startswith(pos_loss):
            best_weights_file = all_weights_file
            return best_weights_file
    return None


def remove_non_best_weights(result_folder, best_weights_files):
    all_weights_files = glob.glob(os.path.join(result_folder, '*.h5'))
    for all_weights_file in all_weights_files:
        if all_weights_file not in best_weights_files:
            os.remove(all_weights_file)


def train_main(args):
    if args['multi_gpu'] == 0:
        gpus = tf.config.experimental.list_physical_devices('GPU')
        tf.config.experimental.set_visible_devices(gpus[args['gpu']], 'GPU')

    result_folder = args['result_folder']
    model_name = 'triq_conv2D_all'

    # Define loss function according to prediction objective (score distribution or MOS)
    if args['n_quality_levels'] > 1:
        using_single_mos = False
        loss = 'categorical_crossentropy'
        metrics = None
        model_name += '_distribution'
    else:
        using_single_mos = True
        metrics = None
        loss = 'mse'
        model_name += '_mos'

    if args['lr_base'] < 1e-4 / 2:
        model_name += '_finetune'
    if not args['image_aug']:
        model_name += '_no_imageaug'

    optimizer = Adam(args['lr_base'])

    if args['multi_gpu'] > 0:
        strategy = tf.distribute.MirroredStrategy(cross_device_ops=tf.distribute.HierarchicalCopyAllReduce())
        print('Number of devices: {}'.format(strategy.num_replicas_in_sync))

        with strategy.scope():
            # Everything that creates variables should be under the strategy scope.
            # In general this is only model construction & `compile()`.
            model = create_triq_model(n_quality_levels=5,
                                      input_shape=(None, None, 3),
                                      backbone=args['backbone'],
                                      maximum_position_encoding=193)

            model.compile(loss=loss, optimizer=optimizer, metrics=[metrics])

    else:
        model = create_triq_model(n_quality_levels=5,
                                  input_shape=(None, None, 3),
                                  backbone=args['backbone'],
                                  maximum_position_encoding=193)
        model.compile(loss=loss, optimizer=optimizer, metrics=[metrics])

    # model.summary()
    print('Load ImageNet weights')
    model.load_weights(args['weights'], by_name=True)

    imagenet_pretrain = True

    # Define train and validation data
    image_scores = get_image_scores(args['koniq_mos_file'], args['live_mos_file'], using_single_mos=using_single_mos)
    """
    train_image_file_groups, train_score_groups = get_image_score_from_groups(args['train_folders'], image_scores)
    train_generator = GroupGenerator(train_image_file_groups,
                                     train_score_groups,
                                     batch_size=args['batch_size'],
                                     image_aug=args['image_aug'],
                                     imagenet_pretrain=imagenet_pretrain)
    train_steps = train_generator.__len__()

    if args['val_folders'] is not None:
        test_image_file_groups, test_score_groups = get_image_score_from_groups(args['val_folders'], image_scores)
        validation_generator = GroupGenerator(test_image_file_groups,
                                              test_score_groups,
                                              batch_size=args['batch_size'],
                                              image_aug=False,
                                              imagenet_pretrain=imagenet_pretrain)
        validation_steps = validation_generator.__len__()

        evaluation_callback = ModelEvaluationIQGenerator(validation_generator,
                                                         using_single_mos,
                                                         evaluation_generator=None)

    else:
        evaluation_callback = None
        validation_generator = None
        validation_steps = 0
"""
    result_folder = os.path.join(result_folder, model_name)
    if not os.path.exists(result_folder):
        os.makedirs(result_folder)

    # Create callbacks including evaluation and learning rate scheduler
    callbacks = create_callbacks(model_name,
                                 result_folder,
                                 evaluation_callback,
                                 checkpoint=True,
                                 early_stop=True,
                                 metrics=metrics)

    warmup_epochs = 10
    if args['lr_schedule']:
        total_train_steps = args['epochs'] * train_steps
        warmup_steps = warmup_epochs * train_steps
        warmup_lr = WarmUpCosineDecayScheduler(learning_rate_base=args['lr_base'],
                                               total_steps=total_train_steps,
                                               warmup_learning_rate=0.0,
                                               warmup_steps=warmup_steps,
                                               hold_base_rate_steps=30 * train_steps,
                                               verbose=1)
        callbacks.append(warmup_lr)


    # Define optimizer and train

    model_history = model.fit(x=train_generator,
                              epochs=args['epochs'],
                              steps_per_epoch=train_steps,
                              validation_data=validation_generator,
                              validation_steps=validation_steps,
                              verbose=1,
                              shuffle=False,
                              callbacks=callbacks,
                              initial_epoch=args['initial_epoch'],
                              )

    # model.save(os.path.join(result_folder, model_name + '.h5'))
    # plot_history(model_history, result_folder, model_name)

    best_weights_file = identify_best_weights(result_folder, model_history.history, callbacks[3].best)
    remove_non_best_weights(result_folder, [best_weights_file])

    # do fine-tuning
    if args['do_finetune'] and best_weights_file:
        print('Finetune...')
        del (callbacks[-1])
        model.load_weights(best_weights_file)
        finetune_lr = 1e-6
        if args['lr_schedule']:
            warmup_lr_finetune = WarmUpCosineDecayScheduler(learning_rate_base=finetune_lr,
                                                            total_steps=total_train_steps,
                                                            warmup_learning_rate=0.0,
                                                            warmup_steps=warmup_steps,
                                                            hold_base_rate_steps=10 * train_steps,
                                                            verbose=1)
            callbacks.append(warmup_lr_finetune)
        finetune_optimizer = Adam(finetune_lr)
        model.compile(loss=loss, optimizer=finetune_optimizer, metrics=[metrics])

        finetune_model_history = model.fit(x=train_generator,
                                  epochs=args['epochs'],
                                  steps_per_epoch=train_steps,
                                  validation_data=validation_generator,
                                  validation_steps=validation_steps,
                                  verbose=1,
                                  shuffle=False,
                                  callbacks=callbacks,
                                  initial_epoch=args['initial_epoch'],
                                  )

        best_weights_file_finetune = identify_best_weights(result_folder, finetune_model_history.history, callbacks[3].best)
        remove_non_best_weights(result_folder, [best_weights_file, best_weights_file_finetune])


# if __name__ == '__main__':
# def main():
args = {}
args['multi_gpu'] = 1
args['gpu'] = 1

args['result_folder'] = r'.\database\results_triq\triq_conv2D_all'
args['n_quality_levels'] = 5

args['backbone'] = 'resnet50'

args['train_folders'] = [
        r'.\database\train\koniq_normal',
        r'.\database\train\koniq_small',
        r'.\database\train\live']
args['val_folders'] = [
        r'.\database\val\koniq_normal',
        r'.\database\val\koniq_small',
        r'.\database\val\live']
# args['koniq_mos_file'] = r'.\database\koniq10k_images_scores.csv'
args['live_mos_file'] = r'.\database\live_wild\live_mos.csv'

args['initial_epoch'] = 0

args['lr_base'] = 1e-4/2
args['lr_schedule'] = True
args['batch_size'] = 8
args['epochs'] = 120

args['image_aug'] = True
    # args['weights'] = r'.\pretrained_weights\vgg16_weights_tf_dim_ordering_tf_kernels_notop.h5'
# args['weights'] = r'.\pretrained_weights\resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5'
args['weights'] = r'./drive/MyDrive/iqa/resnet50_weights_tf_dim_ordering_tf_kernels_notop.h5'

args['do_finetune'] = True

train_main(args)

Number of devices: 1


ValueError: ignored

# train & test data

In [33]:
import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split


# Import necessary libraries
import torch
from PIL import Image
import torchvision.transforms as transforms


def load_images_from_folder(folder_path):
    transform = transforms.Compose([
      transforms.PILToTensor()
  ])
    images = []
    image_path = []
    labels = []
    i=0
    print(folder_path)
    for filename in os.listdir(folder_path):
        # if i== 100:
        #   break
        label = filename.split('.')[0]  # Assuming the filename is in the format "label.xxx.jpg/png"

        path = os.path.join(folder_path, filename)
        img = cv2.imread(path)
        # if path == '/content/dataset/ChallengeDB_release/Images/trainingImages' or path == '/content/dataset/ChallengeDB_release/Images/.DS_Store':
        #   continue
        # img = Image.open(path).convert('RGB')
        # img = transform(img)
        if img is not None:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB format
            # print(type(img.tolist)))
            # torch.cat(images, out=img)
            # torch.cat(label, out=img)
            images.append(img)
            image_path.append(path)
            labels.append(labels_data[int(label)-1])
        i+=1
    return images, labels, image_path

# Path to the folder containing your images
folder_path = '/content/dataset/ChallengeDB_release/Images'

images, labels, image_path = load_images_from_folder(folder_path)

# You might need to preprocess your images here (resize, normalize, etc.) before using them to train your CNN model

# Convert the lists to numpy arrays
images = np.asarray(images)
# labels = np.asarray(labels)
# print(images)
# images = torch.FloatTensor(images)
# labels = torch.tensor(labels)
# images = torch.cat(images)
# labels = torch.cat(labels)

# Split the data into training and testing sets
train_images, test_images, train_labels, test_labels = train_test_split(image_path, labels, test_size=0.2, random_state=42)

# Now, you can use train_images, train_labels, test_images, and test_labels to train your CNN model


/content/dataset/ChallengeDB_release/Images


  images = np.asarray(images)


In [35]:
train_labels = [str(int(i/20)) for i in train_labels]
test_labels = [str(int(i/20)) for i in test_labels]

In [37]:
from keras.preprocessing.image import ImageDataGenerator

# Define parameters for ImageDataGenerator
datagen = ImageDataGenerator(
    rescale=1./255,  # rescale pixel values between 0 and 1
    # rotation_range=20,  # randomly rotate images in the range (degrees, 0 to 180)
    # width_shift_range=0.2,  # randomly shift images horizontally (fraction of total width)
    # height_shift_range=0.2,  # randomly shift images vertically (fraction of total height)
    # shear_range=0.2,  # shear intensity (angle in counter-clockwise direction in degrees)
    # zoom_range=0.2,  # randomly zooming inside pictures
    # horizontal_flip=False,  # randomly flip images horizontally
    # fill_mode='nearest'  # strategy used for filling in newly created pixels
)
train_dataframe = pd.DataFrame({'filename': train_images, 'class': train_labels})
# Provide the directory where your images are stored




train_generator = datagen.flow_from_dataframe(
    dataframe = train_dataframe ,  # path to the training data folder
    target_size=(500, 500),  # resizing images to a fixed size
    batch_size=8,
    class_mode='categorical',
    classes=train_dataframe['class'].unique().tolist()# binary labels (if you have more classes, use 'categorical')
)


test_dataframe = pd.DataFrame({'filename':test_images,'class': test_labels})
test_generator = datagen.flow_from_dataframe(
    dataframe = test_dataframe,  # path to the training data folder
    target_size=(500, 500),  # resizing images to a fixed size
    batch_size=8,
    class_mode='categorical',
    classes=train_dataframe['class'].unique().tolist()# binary labels (if you have more classes, use 'categorical')
)

# Use the generator in model training
# model.fit_generator(
#     train_generator,
#     steps_per_epoch=2000 // 32,  # total_training_samples // batch_size
#     epochs=50
# )

Found 929 validated image filenames belonging to 5 classes.
Found 233 validated image filenames belonging to 5 classes.


In [57]:
from tensorflow.keras.optimizers import Adam
model = create_triq_model(n_quality_levels=5,
                                  input_shape=(500, 500, 3),
                                  backbone='resnet50',
                                  maximum_position_encoding=193)
loss = 'categorical_crossentropy'
metrics = None
optimizer = Adam(1e-3/2)
model.compile(loss=loss, optimizer=optimizer, metrics=[metrics])

In [39]:
train_steps = train_generator.__len__()
test_steps = test_generator.__len__()

In [40]:
for data_batch, labels_batch in train_generator:
    print('Data batch shape:', data_batch.shape)
    print(labels_batch) # Shape of the batch of images
    print('Labels batch shape:', labels_batch.shape)  # Shape of the batch of labels
    break
for data_batch, labels_batch in test_generator:
    print('Data batch shape:', data_batch.shape)
    print(labels_batch)  # Shape of the batch of images
    print('Labels batch shape:', labels_batch.shape)  # Shape of the batch of labels
    break

Data batch shape: (8, 500, 500, 3)
[[0. 1. 0. 0. 0.]
 [0. 0. 0. 1. 0.]
 [0. 0. 1. 0. 0.]
 [0. 0. 1. 0. 0.]
 [1. 0. 0. 0. 0.]
 [0. 0. 0. 0. 1.]
 [0. 0. 0. 1. 0.]
 [0. 0. 1. 0. 0.]]
Labels batch shape: (8, 5)
Data batch shape: (8, 500, 500, 3)
[[1. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0.]
 [0. 1. 0. 0. 0.]
 [1. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0.]
 [0. 0. 1. 0. 0.]
 [1. 0. 0. 0. 0.]]
Labels batch shape: (8, 5)


In [58]:
model_history = model.fit(train_generator,
                              epochs=10,
                              steps_per_epoch=train_steps,
                              validation_data=test_generator,
                              validation_steps=test_steps,
                              verbose=1,
                              batch_size=8
                              )

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
from numba import cuda
device = cuda.get_current_device()
device.reset()

In [59]:
results = model.predict(test_generator)



In [60]:
predicted_test_classes = []
for sample in results:
  quality = 0
  for i in range(5):
    quality += sample[i]*i
  predicted_test_classes.append(quality)

In [66]:
results[10]

array([0.45605758, 0.06827413, 0.27128553, 0.12835717, 0.07602566],
      dtype=float32)

In [61]:
predicted_test_classes

[1.2722263187170029,
 1.2783728241920471,
 1.2701248154044151,
 1.2871485650539398,
 1.2783337235450745,
 1.2708377316594124,
 1.2590095326304436,
 1.269657738506794,
 1.2779415175318718,
 1.2732749581336975,
 1.3000193387269974,
 1.3002692386507988,
 1.2646305486559868,
 1.302994281053543,
 1.299253299832344,
 1.261731095612049,
 1.2642550617456436,
 1.2804260849952698,
 1.2707082107663155,
 1.3041929826140404,
 1.301278680562973,
 1.2737338170409203,
 1.2714650928974152,
 1.2783905863761902,
 1.2717852219939232,
 1.2639565020799637,
 1.270854502916336,
 1.2998098954558372,
 1.2730018571019173,
 1.2678489536046982,
 1.3012461438775063,
 1.3031225427985191,
 1.3009610623121262,
 1.2756898924708366,
 1.2740958705544472,
 1.2648754939436913,
 1.3016283214092255,
 1.2943001314997673,
 1.289016880095005,
 1.2717735469341278,
 1.2743413746356964,
 1.280335620045662,
 1.2720211446285248,
 1.30580023676157,
 1.2690058574080467,
 1.2664222568273544,
 1.273209162056446,
 1.3011225834488869,
 1.