In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.models import Model
from tensorflow.keras import layers

from sklearn.preprocessing import OneHotEncoder
from sklearn.cluster import KMeans
from tensorflow.keras.layers import Input, Dense, Conv2D, MaxPooling2D, UpSampling2D, Flatten, Reshape, Conv2DTranspose, Activation, BatchNormalization

from tensorflow.keras import backend as K
from tensorflow.keras.layers import Layer, InputSpec

import random

In [2]:
def model_encoder(input_shape, n_classes=None):
    
    
    
    
    initializer = tf.keras.initializers.RandomNormal(mean=0., stddev=0.5)
    
    model = tf.keras.Sequential()
    model.add( layers.Input(shape = input_shape))
    
   
    model.add( layers.Conv2D(
    32,
    [7,7],
    strides=(1, 1),
    padding="same",
    use_bias=True,
    kernel_initializer="glorot_uniform",
    bias_initializer="zeros",
    name = 'conv_0'
) )                                       
    model.add (layers.Activation('relu', name ='AC_0'))
    model.add (layers.BatchNormalization(name = 'BN_0'))
    model.add(layers.MaxPooling2D(pool_size=(2, 2),strides=(2, 2), padding='valid', name ='max_pool_0'))  
       
    
    
    model.add( layers.Conv2D(          
    16,
    [5,5],
    strides=(1, 1),
    padding="same",
    use_bias=True,
    kernel_initializer="glorot_uniform",
    bias_initializer="zeros",
    name = 'conv_1'
) )                                   
    model.add (layers.Activation('relu',name ='AC_1'))
    model.add (layers.BatchNormalization(name ='BN_1'))
    model.add(layers.MaxPooling2D(pool_size=(2, 2),strides=(2, 2), padding='valid',name ='max_pool_1'))   
    
       
    
    model.add( layers.Conv2D(          
    8,
    [3,3],
    strides=(1, 1),
    padding="same",
    use_bias=True,
    kernel_initializer="glorot_uniform",
    bias_initializer="zeros",
    name = 'conv_2'
) )                                    
    model.add (layers.Activation('relu',name ='AC_2'))
    model.add (layers.BatchNormalization(name ='BN_2'))
    
        
    
    model.add(layers.Flatten())
    model.add (layers.Activation('relu',name ='AC_3'))
   # model.add (layers.BatchNormalization(name ='BN_3'))
    
    
    
    if not (n_classes==None):
        
        model.add(layers.Dense(128,  kernel_initializer="glorot_uniform"))
        model.add (layers.Activation('relu',name ='AC_3_1'))
        #model.add (layers.BatchNormalization(name ='BN_3_1'))
        
        #model.add(layers.Dense(32,  kernel_initializer="glorot_uniform"))
        #model.add (layers.Activation('relu',name ='AC_3_2'))
        #model.add (layers.BatchNormalization(name ='BN_3_2'))
        
        
        
        model.add(layers.Dense(n_classes,  kernel_initializer="glorot_uniform", name='encodedLayer'))
        #model.add (layers.Activation('relu',name ='encoderLayer_AC'))
        #model.add (layers.BatchNormalization(name ='encoderLayer_BN'))
                
    
                
        #model.add(layers.Dense(32,  kernel_initializer="glorot_uniform"))
        #model.add (layers.Activation('relu',name ='AC_3_3'))
        #model.add (layers.BatchNormalization(name ='BN_3_3'))
        
        model.add(layers.Dense(128,  kernel_initializer="glorot_uniform"))
        model.add (layers.Activation('relu',name ='AC_3_4'))
        #model.add (layers.BatchNormalization(name ='BN_3_4'))

        
    else:
        model.add(layers.Dense(128,  kernel_initializer="glorot_uniform", name='encodedLayer'))
        #model.add (layers.Activation('relu',name ='encoderLayer_AC'))
        #model.add (layers.BatchNormalization(name ='encoderLayer_BN'))
    
    
    
    model.add(layers.Dense((input_shape[0]//4)*(input_shape[1]//4)*8,  kernel_initializer="glorot_uniform"))
    model.add (layers.Activation('relu',name ='AC_4'))
    #model.add (layers.BatchNormalization(name ='BN_4'))
    
    model.add(tf.keras.layers.Reshape(((input_shape[0]//4), (input_shape[1]//4), 8)))
    
    
    
    model.add( layers.Conv2D(
    8,
    [3,3],
    strides=(1, 1),
    padding="same",
    use_bias=True,
    kernel_initializer="glorot_uniform",
    bias_initializer="zeros",
    name = 'conv_3'
) )                                       
    model.add (layers.Activation('relu',name ='AC_5'))
    model.add (layers.BatchNormalization(name ='BN_5'))
    
                                         
    model.add(layers.UpSampling2D(size=(2, 2), interpolation="bilinear",name ='US_0'))   
    model.add( layers.Conv2D(
    16,
    [5,5],
    strides=(1, 1),
    padding="same",
    use_bias=True,
    kernel_initializer="glorot_uniform",
    bias_initializer="zeros",
    name = 'conv_4'
) )                                     
    model.add (layers.Activation('relu',name ='AC_6'))
    model.add (layers.BatchNormalization(name ='BN_6'))
                                         
    model.add(layers.UpSampling2D(size=(2, 2), interpolation="bilinear",name ='UP_1'))   
    model.add( layers.Conv2D(
    32,
    [7,7],
    strides=(1, 1),
    padding="same",
    use_bias=True,
    kernel_initializer="glorot_uniform",
    bias_initializer="zeros",
    name = 'conv_5'
) )                                       
    model.add (layers.Activation('relu',name ='AC_7'))
   

    model.add( layers.Conv2D(
    input_shape[2],
    [3,3],
    strides=(1, 1),
    padding="same",
    use_bias=True,
    kernel_initializer="glorot_uniform",
    bias_initializer="zeros",
    name = 'last_conv'
) )                                       # sai 8
 
    model.add (layers.Activation('relu',name ='finalLayer_AC'))
    
    
    
    
    
    
    
    return model

In [None]:
def my_loss_fn(y_true, y_pred):
        
    mse = tf.keras.losses.MeanSquaredError()
    cs = tf.keras.losses.CosineSimilarity() 
    
    
    return ( mse(y_true, y_pred) + (cs(y_true, y_pred)+1)  )

In [None]:
def deep_clustering_model(arq_model, input_shape):
    
    model = tf.keras.models.load_model(arq_model, custom_objects={'my_loss_fn': my_loss_fn})
    
    model_encoder = tf.keras.Sequential()
    
  
    k=0
    while  not model.layers[k].name=='encodedLayer': 
        model_encoder.add(model.layers[k]) 
        k+=1
    model_encoder.add(model.layers[k])
    
    
    return model, model_encoder

In [None]:
class ClusteringLayer(Layer):
    """
    Clustering layer converts input sample (feature) to soft label, i.e. a vector that represents the probability of the
    sample belonging to each cluster. The probability is calculated with student's t-distribution.

    # Example
    ```
        model.add(ClusteringLayer(n_clusters=10))
    ```
    # Arguments
        n_clusters: number of clusters.
        weights: list of Numpy array with shape `(n_clusters, n_features)` witch represents the initial cluster centers.
        alpha: degrees of freedom parameter in Student's t-distribution. Default to 1.0.
    # Input shape
        2D tensor with shape: `(n_samples, n_features)`.
    # Output shape
        2D tensor with shape: `(n_samples, n_clusters)`.
    """

    def __init__(self, n_clusters, weights=None, alpha=1.0, **kwargs):
        if 'input_shape' not in kwargs and 'input_dim' in kwargs:
            kwargs['input_shape'] = (kwargs.pop('input_dim'),)
        super(ClusteringLayer, self).__init__(**kwargs)
        self.n_clusters = n_clusters
        self.alpha = alpha
        self.initial_weights = weights
        self.input_spec = InputSpec(ndim=2)

    def build(self, input_shape):
        assert len(input_shape) == 2
        input_dim = input_shape[1]
        self.input_spec = InputSpec(dtype=K.floatx(), shape=(None, input_dim))
        self.clusters = self.add_weight(shape=(self.n_clusters, input_dim), initializer='glorot_uniform', name='clusters')
        if self.initial_weights is not None:
            self.set_weights(self.initial_weights)
            del self.initial_weights
        self.built = True

    def call(self, inputs, **kwargs):
        """ student t-distribution, as same as used in t-SNE algorithm.
         Measure the similarity between embedded point z_i and centroid µ_j.
                 q_ij = 1/(1+dist(x_i, µ_j)^2), then normalize it.
                 q_ij can be interpreted as the probability of assigning sample i to cluster j.
                 (i.e., a soft assignment)
        Arguments:
            inputs: the variable containing data, shape=(n_samples, n_features)
        Return:
            q: student's t-distribution, or soft labels for each sample. shape=(n_samples, n_clusters)
        """
        q = 1.0 / (1.0 + (K.sum(K.square(K.expand_dims(inputs, axis=1) - self.clusters), axis=2) / self.alpha))
        q **= (self.alpha + 1.0) / 2.0
        q = K.transpose(K.transpose(q) / K.sum(q, axis=1)) # Make sure each sample's 10 values add up to 1.
        return q

    def compute_output_shape(self, input_shape):
        assert input_shape and len(input_shape) == 2
        return input_shape[0], self.n_clusters

    def get_config(self):
        config = {'n_clusters': self.n_clusters}
        base_config = super(ClusteringLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

In [None]:
# computing an auxiliary target distribution
def target_distribution(q):
    weight = q ** 2 / q.sum(0)
    return (weight.T / weight.sum(1)).T