In [1]:
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.callbacks import Callback, CSVLogger, TensorBoard
from tensorflow.python.saved_model.loader_impl import parse_saved_model
from tensorflow.keras.losses import MSE, MeanAbsoluteError, BinaryCrossentropy
from modules.CustomLosses import LSSIM, L1AdversarialLoss, AdversarialLoss
from modules.misc import ssim_metric, get_model
from psnrb import psnrb
from modules.DataMod import DataSet
from modules.TrainingManager import KerasTrainingManager
from os import environ
from typing import Callable
import numpy as np

environ["CUDA_VISIBLE_DEVICES"]="1"


In [6]:
def LPSNRB (target_imgs, degraded_imgs):
    return -psnrb(target_imgs, degraded_imgs)

In [7]:
model = get_model(model_json_name = "AutoEncoder-2.0-64x64.json")

In [11]:
model.compile(
    optimizer = SGD(),
    loss = LSSIM(),
    metrics = [ssim_metric, psnrb]
)

In [12]:
dataset = DataSet().load_rafael_cifar_10_noise_data()

In [4]:
class MultipleTrainingLogger(Callback):
    """

    """
    def __init__(self, 
                 stop_fucntion: Callable[[list, str, Callable], bool],
                 metric_name: str,
                 dir_name: str = 'logs'
                   ):
        super(MultipleTrainingLogger, self).__init__()
        
        self.training_index: int 
        
        self.per_epoch_batch_results: dict = {}
        self.epoch_mean_results: dict = {}
        self.all_batch_results: dict = {}

        self.stop_function: Callable[[list, str, Callable], bool] = stop_fucntion
        self.metric_name: str = metric_name
        self.stoped_epoch: int = 0
        self.training_stoped: bool = False

        self.dir_name = dir_name
    
    # Geters

    def get_optimizer_kwargs(self) -> dict: 
        return self.model.optimizer.get_config()

    def get_model_name(self) -> str:
        return self.model.name

    def get_loss_kwargs(self) -> dict:
        pass


    # Operations

    def append_results(self, results_dict: dict, logs: dict):
        
        for metric_name, metric_value in logs.items():
            
            if not metric_name in results_dict:
                results_dict[metric_name] = []
            
            results_dict[metric_name].append(metric_value)

    def append_epoch_mean (self):

        for metric_name, metric_array in self.per_epoch_batch_results.items():
            
            metric_epoch_mean =  np.mean(metric_array, axis = 0)
            
            if not metric_name in self.epoch_mean_results:
                self.epoch_mean_results[metric_name] = []
            
            self.epoch_mean_results[metric_name].append(metric_epoch_mean)

    
    
    # Batch and epoch operations
    
    def on_train_begin(self, logs=None):
        pass

    def on_epoch_begin(self, epoch, logs=None):
        
        # clean epoch data
        self.per_epoch_batch_results: dict = {}
    
    def on_batch_end(self, batch, logs=None):
        
        self.append_results(self.per_epoch_batch_results, logs)
        self.append_results(self.all_batch_results, logs)
    
    def on_epoch_end(self, epoch, logs=None):
        
        self.append_epoch_mean()

        self.model.stop_training = self.stop_function(self.epoch_mean_results, self.metric_name)
        
        self.stoped_epoch = epoch
    
    def on_train_end(self, logs=None):
        return super().on_train_end(logs)
    


    

In [14]:
def stop_fucntion (epoch_mean_results: dict, 
                   metric_name: str,
                   best_metric_selector = max,
                   min_percentual_variation = 0.02/100,
                   observation_window_lenght = 7) -> bool: 

    if len(epoch_mean_results[metric_name]) < observation_window_lenght + 1:
        return False
    
    best_window_result = best_metric_selector(epoch_mean_results[metric_name][-observation_window_lenght:])

    best_result_outside_window = best_metric_selector(epoch_mean_results[metric_name][:-observation_window_lenght])

    stop_signal = (best_window_result - best_result_outside_window)/best_result_outside_window < min_percentual_variation

    return stop_signal

In [15]:
callback = MultipleTrainingLogger(stop_fucntion, "ssim_metric")
model.fit(
    x = dataset.x_train,
    y = dataset.y_train,
    batch_size = 50,
    epochs = 500,
    validation_data = (dataset.x_test, dataset.y_test),
    callbacks = [callback]
)

Epoch 1/500
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: module 'gast' has no attribute 'Index'
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: module 'gast' has no attribute 'Index'
Epoch 2/500
Epoch 3/500
Epoch 4/500
Epoch 5/500
Epoch 6/500
Epoch 7/500
Epoch 8/500
Epoch 9/500
Epoch 10/500
Epoch 11/500
Epoch 12/500
Epoch 13/500
Epoch 14/500
Epoch 15/500
Epoch 16/500
Epoch 17/500
Epoch 18/500
Epoch 19/500
Epoch 20/500
Epoch 21/500
Epoch 22/500
Epoch 23/500
Epoch 24/500
Epoch 25/500
Epoch 26/500
Epoch 27/500
Epoch 28/500
Epoch 29/500
Epoch 30/500
Epoch 31/500
Epoch 32/500
Epoch 33/500
Epoch 34/500
Epoch 35/500
Epoch 36/500
Epoch 37/500
Epoch 38/500
Epoch 39/500
Epoch 40/500
Epoch 41/500
Epoch 42/500
Epoch 43/500
Epoch 44/500
Epoch 45/500
Epoch 46/500

<tensorflow.python.keras.callbacks.History at 0x7fa4f05b9130>

In [None]:
import sys
callback.epoch_mean_results["ssim_metric"]

In [132]:
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import nn
from tensorflow.python.ops import nn_ops
import tensorflow as tf

def _ssim_helper(x, y, reducer, max_val, compensation=1.0, k1=0.01, k2=0.03):
  r"""Helper function for computing SSIM.

  SSIM estimates covariances with weighted sums.  The default parameters
  use a biased estimate of the covariance:
  Suppose `reducer` is a weighted sum, then the mean estimators are
    \mu_x = \sum_i w_i x_i,
    \mu_y = \sum_i w_i y_i,
  where w_i's are the weighted-sum weights, and covariance estimator is
    cov_{xy} = \sum_i w_i (x_i - \mu_x) (y_i - \mu_y)
  with assumption \sum_i w_i = 1. This covariance estimator is biased, since
    E[cov_{xy}] = (1 - \sum_i w_i ^ 2) Cov(X, Y).
  For SSIM measure with unbiased covariance estimators, pass as `compensation`
  argument (1 - \sum_i w_i ^ 2).

  Arguments:
    x: First set of images.
    y: Second set of images.
    reducer: Function that computes 'local' averages from the set of images. For
      non-convolutional version, this is usually tf.reduce_mean(x, [1, 2]), and
      for convolutional version, this is usually tf.nn.avg_pool2d or
      tf.nn.conv2d with weighted-sum kernel.
    max_val: The dynamic range (i.e., the difference between the maximum
      possible allowed value and the minimum allowed value).
    compensation: Compensation factor. See above.
    k1: Default value 0.01
    k2: Default value 0.03 (SSIM is less sensitivity to K2 for lower values, so
      it would be better if we took the values in the range of 0 < K2 < 0.4).

  Returns:
    A pair containing the luminance measure, and the contrast-structure measure.
  """

  c1 = (k1 * max_val)**2
  c2 = (k2 * max_val)**2

  # SSIM luminance measure is
  # (2 * mu_x * mu_y + c1) / (mu_x ** 2 + mu_y ** 2 + c1).
  mean0 = reducer(x)
  mean1 = reducer(y)
  num0 = mean0 * mean1 * 2.0
  den0 = math_ops.square(mean0) + math_ops.square(mean1)
  luminance = (num0 + c1) / (den0 + c1)

  # SSIM contrast-structure measure is
  #   (2 * cov_{xy} + c2) / (cov_{xx} + cov_{yy} + c2).
  # Note that `reducer` is a weighted sum with weight w_k, \sum_i w_i = 1, then
  #   cov_{xy} = \sum_i w_i (x_i - \mu_x) (y_i - \mu_y)
  #          = \sum_i w_i x_i y_i - (\sum_i w_i x_i) (\sum_j w_j y_j).
  num1 = reducer(x * y) * 2.0
  den1 = reducer(math_ops.square(x) + math_ops.square(y))
  c2 *= compensation
  cs = (num1 - num0 + c2) / (den1 - den0 + c2)

  # SSIM score is the product of the luminance and contrast-structure measures.
  return luminance, cs

def _fspecial_gauss(size, sigma):
  """Function to mimic the 'fspecial' gaussian MATLAB function."""
  size = ops.convert_to_tensor(size, dtypes.int32)
  sigma = ops.convert_to_tensor(sigma)

  coords = math_ops.cast(math_ops.range(size), sigma.dtype)
  coords -= math_ops.cast(size - 1, sigma.dtype) / 2.0

  g = math_ops.square(coords)
  g *= -0.5 / math_ops.square(sigma)

  g = array_ops.reshape(g, shape=[1, -1]) + array_ops.reshape(g, shape=[-1, 1])
  g = array_ops.reshape(g, shape=[1, -1])  # For tf.nn.softmax().
  g = nn_ops.softmax(g)
  return array_ops.reshape(g, shape=[size, size, 1, 1])

def _ssim_map_per_channel(img1,
                      img2,
                      max_val=1.0,
                      filter_size=11,
                      filter_sigma=1.5,
                      k1=0.01,
                      k2=0.03,
                      keep_padding = True):
  """Computes SSIM index between img1 and img2 per color channel.

  This function matches the standard SSIM implementation from:
  Wang, Z., Bovik, A. C., Sheikh, H. R., & Simoncelli, E. P. (2004). Image
  quality assessment: from error visibility to structural similarity. IEEE
  transactions on image processing.

  Details:
    - 11x11 Gaussian filter of width 1.5 is used.
    - k1 = 0.01, k2 = 0.03 as in the original paper.

  Args:
    img1: First image batch.
    img2: Second image batch.
    max_val: The dynamic range of the images (i.e., the difference between the
      maximum the and minimum allowed values).
    filter_size: Default value 11 (size of gaussian filter).
    filter_sigma: Default value 1.5 (width of gaussian filter).
    k1: Default value 0.01
    k2: Default value 0.03 (SSIM is less sensitivity to K2 for lower values, so
      it would be better if we took the values in the range of 0 < K2 < 0.4).

  Returns:
    The ssim map for the imgs with a shape like: [..., img_dim_1 - kernel_dim_1 + 1, img_dim_2 - kernel_dim_2 + 1, n_chanels]
  """
  filter_size = constant_op.constant(filter_size, dtype=dtypes.int32)
  filter_sigma = constant_op.constant(filter_sigma, dtype=img1.dtype)

  shape1, shape2 = array_ops.shape_n([img1, img2])
  checks = [
      control_flow_ops.Assert(
          math_ops.reduce_all(
              math_ops.greater_equal(shape1[-3:-1], filter_size)),
          [shape1, filter_size],
          summarize=8),
      control_flow_ops.Assert(
          math_ops.reduce_all(
              math_ops.greater_equal(shape2[-3:-1], filter_size)),
          [shape2, filter_size],
          summarize=8)
  ]

  # Enforce the check to run before computation.
  with ops.control_dependencies(checks):
    img1 = array_ops.identity(img1)

  # TODO(sjhwang): Try to cache kernels and compensation factor.
  kernel = _fspecial_gauss(filter_size, filter_sigma)
  kernel = array_ops.tile(kernel, multiples=[1, 1, shape1[-1], 1])

  # The correct compensation factor is `1.0 - tf.reduce_sum(tf.square(kernel))`,
  # but to match MATLAB implementation of MS-SSIM, we use 1.0 instead.
  compensation = 1.0

  # TODO(sjhwang): Try FFT.
  # TODO(sjhwang): Gaussian kernel is separable in space. Consider applying
  #   1-by-n and n-by-1 Gaussian filters instead of an n-by-n filter.
  def reducer(x):
    shape = array_ops.shape(x)
    x = array_ops.reshape(x, shape=array_ops.concat([[-1], shape[-3:]], 0))
    y = nn.depthwise_conv2d(x, kernel, strides=[1, 1, 1, 1], padding = 'SAME' if keep_padding else 'VALID')
    return array_ops.reshape(
        y, array_ops.concat([shape[:-3], array_ops.shape(y)[1:]], 0))

  luminance, cs = _ssim_helper(img1, img2, reducer, max_val, compensation, k1,
                               k2)

  ssim_map = luminance*cs

  return ssim_map

  # Average over the second and the third from the last: height, width.
  axes = constant_op.constant([-3, -2], dtype=dtypes.int32)
  ssim_val = math_ops.reduce_mean(luminance * cs, axes)
  cs = math_ops.reduce_mean(cs, axes)
  return ssim_val, cs


def get_regions_indexes (magnitude_gradient: tf.Tensor,
                         threshold_for_edges: float = 0.12,
                         threshold_for_texture: float = 0.06):
  """
  """
  max_gradient_magnitude_per_image = tf.reduce_max(magnitude_gradient, axis = (-1, -2 , -3), keepdims=True)
  normalized_magnitude_gradient = magnitude_gradient/max_gradient_magnitude_per_image

  edge_indexes = tf.where( normalized_magnitude_gradient >= threshold_for_edges)
  texture_indexes = tf.where( tf.math.logical_and( normalized_magnitude_gradient >= threshold_for_texture, normalized_magnitude_gradient < threshold_for_edges) )
  smooth_indexes = tf.where( normalized_magnitude_gradient < threshold_for_texture)

  return edge_indexes, texture_indexes, smooth_indexes
    
def three_ssim (original_images,
                degraded_images,
                max_val=1.0,
                weight_for_edges = 2,
                weight_for_texture = 1,
                weight_for_smooth = 1,
                filter_size=11,
                filter_sigma=1.5,
                k1=0.01,
                k2=0.03,
                keep_padding = True):
  """
    Computes the SSIM (3-SSIM) modified version described in the article: Content-weighted video quality 
    assessment using a three-component image model
  """
  ssim_map = _ssim_map_per_channel(original_images, degraded_images, max_val, filter_size, filter_sigma, k1, k2, keep_padding)

  img_grad = tf.image.sobel_edges(original_images)
  imgs_magnitude_grad = tf.sqrt( tf.square(img_grad[:,:,:,:, 0]) + tf.square(img_grad[:,:,:,:, 1]))

  if not keep_padding:
    size_decrease = (filter_size - 1)//2
    imgs_magnitude_grad = imgs_magnitude_grad[:, size_decrease:-size_decrease, size_decrease:-size_decrease]
    print(math_ops.reduce_mean(ssim_map, axis = (-1, -2, -3)))
  
  edge_indexes, texture_indexes, smooth_indexes = get_regions_indexes (imgs_magnitude_grad, threshold_for_edges = 0.12, threshold_for_texture = 0.06)

  zero_map = tf.zeros(shape=imgs_magnitude_grad.shape , dtype = tf.float64)

  # Calculate the ssim_map with zeros in diferent regions.

  edge_mask = tf.tensor_scatter_nd_update(zero_map, edge_indexes, tf.constant(1., shape = edge_indexes.shape[0], dtype=tf.float64))
  ssim_map_for_edges = tf.math.multiply(ssim_map, edge_mask)

  texture_mask = tf.tensor_scatter_nd_update(zero_map, texture_indexes, tf.constant(1., shape = texture_indexes.shape[0], dtype=tf.float64))
  ssim_map_for_texture = tf.math.multiply(ssim_map, texture_mask)

  smooth_mask = tf.tensor_scatter_nd_update(zero_map, smooth_indexes, tf.constant(1., shape = smooth_indexes.shape[0], dtype=tf.float64))
  ssim_map_for_smooth = tf.math.multiply(ssim_map, smooth_mask)

  ones = edge_mask + texture_mask + smooth_mask
  print(tf.reduce_max(ones), tf.reduce_min(ones))

  # Calculate the number of nom zero pixels per mask

  n_edge_pixels_per_image = tf.reduce_sum(edge_mask, axis = (-1, -2, -3))
  n_texture_pixels_per_image = tf.reduce_sum(texture_mask, axis = (-1, -2, -3))
  n_smooth_pixels_per_image = tf.reduce_sum(smooth_mask, axis = (-1, -2, -3))

  # 3 component calculations

  ssim_on_edges = tf.reduce_sum(ssim_map_for_edges, axis = (-1 ,-2, -3))/n_edge_pixels_per_image
  ssim_on_textures = tf.reduce_sum(ssim_map_for_texture, axis = (-1 ,-2, -3))/n_texture_pixels_per_image
  ssim_on_smooth = tf.reduce_sum(ssim_map_for_smooth, axis = (-1 ,-2, -3))/n_smooth_pixels_per_image
  
  ssim3 = ( weight_for_edges*n_edge_pixels_per_image*ssim_on_edges + weight_for_texture*n_texture_pixels_per_image*ssim_on_textures + 
           weight_for_smooth*n_smooth_pixels_per_image*ssim_on_smooth ) / (weight_for_edges*n_edge_pixels_per_image 
          + weight_for_texture*n_texture_pixels_per_image + weight_for_smooth*n_smooth_pixels_per_image)
  
  return ssim3

In [134]:
import tensorflow as tf
img1_np = np.random.normal(127, 25, size = (2,64,64,1))
img2_np = np.random.normal(127, 35/3, size = (2,64,64,1))
img1 = tf.constant(img1_np, dtype="float64")
img2 = tf.constant(img2_np, dtype="float64")

print(three_ssim(img1, img2, 255, weight_for_edges=1., weight_for_smooth= 1., weight_for_texture = 1., keep_padding=False))
print(tf.image.ssim(img1, img2, 255))

tf.Tensor(1.0, shape=(), dtype=float64) tf.Tensor(1.0, shape=(), dtype=float64)
tf.Tensor([0.23340836 0.22501541], shape=(2,), dtype=float64)
tf.Tensor([0.08359226 0.07156157], shape=(2,), dtype=float32)


In [24]:
a = tf.where(ssim_map > 0.7)

tf.tensor_scatter_nd_update(ssim_map, a, tf.constant(2, shape = a.shape[0], dtype=tf.float64))

<tf.Tensor: shape=(1, 64, 64, 1), dtype=float64, numpy=
array([[[[0.67532778],
         [0.65899728],
         [0.6521072 ],
         ...,
         [2.        ],
         [2.        ],
         [2.        ]],

        [[0.63145714],
         [0.53927537],
         [0.45392767],
         ...,
         [0.53240444],
         [0.60048486],
         [0.69276183]],

        [[0.62823282],
         [0.45356033],
         [0.26450085],
         ...,
         [0.34222462],
         [0.50108014],
         [0.65615168]],

        ...,

        [[0.69053757],
         [0.55463405],
         [0.41501626],
         ...,
         [0.36821719],
         [0.56090262],
         [2.        ]],

        [[2.        ],
         [0.63833916],
         [0.57517684],
         ...,
         [0.60191172],
         [0.68441739],
         [2.        ]],

        [[2.        ],
         [2.        ],
         [2.        ],
         ...,
         [2.        ],
         [2.        ],
         [2.        ]]]])>

In [52]:
a = tf.constant([1,2,3,4,5])
tf.where( tf.math.logical_and( a > 2, a < 5) )

<tf.Tensor: shape=(2, 1), dtype=int64, numpy=
array([[2],
       [3]])>