<a href="https://colab.research.google.com/github/starkdg/pyConvnetPhash/blob/master/contracture_curve.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Mount Google Drive at /gdrive; the model and image paths configured in this
# notebook all live under "/gdrive/My Drive".
from google.colab import drive
drive.mount('/gdrive')

import sys
import os
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import tensorflow as tf
import tensorflow_hub as hub

# Root folder for saved models on the mounted Drive (not referenced again in
# the visible notebook cells).
model_dir = "/gdrive/My Drive/models"
# TF-Hub feature-vector modules. Only the MobileNetV2 URL is loaded below; the
# Inception URL is kept as an alternative.
module_inception_url = "https://tfhub.dev/google/imagenet/inception_v3/feature_vector/1"
module_mobilenetv2_url = "https://tfhub.dev/google/imagenet/mobilenet_v2_140_224/feature_vector/2"

# Instantiate the feature extractor and ask the hub which image size it expects;
# _parse_example resizes every image to (target_height, target_width).
module = hub.Module(module_mobilenetv2_url)
target_height, target_width = hub.get_expected_image_size(module)

# Used by generate_points_from_features as the std-dev of the random noise and
# as the upper clipping bound, and by normalize() as the divisor.
normalization_constant = 5.0

# These three values are only used to build the model file name and the plot
# title. NOTE(review): presumably n_inputs/n_hidden are the autoencoder's input
# and bottleneck widths - confirm against the frozen model.
n_inputs = 1792
n_hidden = 256
model_tag = "sigmoid1"
# Alternative frozen models; exactly one model_file assignment should be active.
# model_file = "/gdrive/My Drive/models/pca_autoencoder/mobilenetv2_pca_autoenc_{0}to{1}_frozen_model-{2}.pb".format(n_inputs, n_hidden, model_tag)
model_file = "/gdrive/My Drive/models/cae_autoencoder/mobilenetv2_cae_autoenc_{0}to{1}_frozen_model-{2}.pb".format(n_inputs, n_hidden, model_tag)
# model_file = "/gdrive/My Drive/models/svd/mobilenetv2_pca_from_svd_1792to256_frozen_model.pb"

# Tensor names inside the imported graph. The "autoenc/" prefix must match the
# name= argument passed to tf.import_graph_def when the model is loaded.
input_tensor_name = "autoenc/input:0"
output_tensor_name = "autoenc/output:0"

# Arguments forwarded to calc_contracture_curve: the directory scanned for
# .tfrecord files, the batch size, and the number and range of perturbation
# radii to evaluate.
files_dir = "/gdrive/My Drive/imageset/test"
batch_size = 40
n_intervals = 100
min_distance = 0.
max_distance = 10.

# Title shown on the final matplotlib figure.
plot_title_str = "Contractive Curve for CAE {0} to {1} Feature Space".format(n_inputs, n_hidden)

In [0]:

# Deserialize the frozen autoencoder GraphDef from model_file.
aec_graph_def = tf.GraphDef()
with tf.gfile.GFile(model_file, 'rb') as f:
  aec_graph_def.ParseFromString(f.read())

# Splice the autoencoder into the default graph under the "autoenc" scope so
# it shares a graph (and later a session) with the hub feature extractor.
graph = tf.get_default_graph()
tf.import_graph_def(aec_graph_def, name='autoenc')

# Handles used to feed features into the autoencoder and fetch its output.
input_tensor = graph.get_tensor_by_name(input_tensor_name)
output_tensor = graph.get_tensor_by_name(output_tensor_name)


In [0]:
def get_tfrecord_files(path):
  """Return the paths of the ``.tfrecord`` files directly inside ``path``.

  Only regular files (or symlinks to them) whose name ends with ``.tfrecord``
  are included; sub-directories are not descended into.

  Args:
    path: directory to scan.

  Returns:
    A list of file paths (``path`` joined with each file name), sorted so that
    the order in which shards are read is the same on every run.
    ``os.scandir`` itself yields entries in arbitrary order.

  Raises:
    OSError: if ``path`` cannot be scanned (e.g. it does not exist).
  """
  # The context manager closes the directory handle even if filtering fails.
  with os.scandir(path) as entries:
    return sorted(entry.path for entry in entries
                  if entry.is_file() and entry.name.endswith('.tfrecord'))
  
  
def _parse_example(example):
  """Decode one serialized tf.Example into a resized float32 RGB image.

  The record must carry int64 'height' and 'width' scalars plus an
  'image_raw' string holding the raw uint8 pixel bytes (3 channels).

  Args:
    example: scalar string tensor containing a serialized tf.Example.

  Returns:
    A float32 tensor of shape [target_height, target_width, 3].
  """
  schema = {
      'height': tf.FixedLenFeature([], tf.int64),
      'width': tf.FixedLenFeature([], tf.int64),
      'image_raw': tf.FixedLenFeature([], tf.string),
  }
  record = tf.parse_single_example(example, schema)

  pixels = tf.io.decode_raw(record['image_raw'], tf.uint8)
  rows = tf.cast(record['height'], tf.int32)
  cols = tf.cast(record['width'], tf.int32)

  # Restore the HxWx3 layout, then convert the uint8 pixels to float32.
  image = tf.manip.reshape(pixels, [rows, cols, 3])
  image = tf.image.convert_image_dtype(image, dtype=tf.float32)

  # resize_bicubic works on 4-D batches: add a batch axis, resize, drop it.
  batch_of_one = tf.expand_dims(image, 0)
  resized = tf.image.resize_bicubic(batch_of_one, [target_height, target_width])
  return tf.squeeze(resized, 0)


def input_function(path, batch_size=1, num_epochs=None, shuffle=False):
  """Build an initializable iterator over the images stored under ``path``.

  Args:
    path: directory scanned (non-recursively) for .tfrecord files.
    batch_size: number of images per batch.
    num_epochs: value passed to Dataset.repeat (None repeats indefinitely).
    shuffle: when true, shuffle examples with a 10000-element buffer
      before batching.

  Returns:
    A tf.data iterator; run its ``initializer`` op before ``get_next()``.
  """
  record_files = get_tfrecord_files(path)
  images = tf.data.TFRecordDataset(record_files).map(_parse_example)
  if shuffle:
    images = images.shuffle(10000)
  batched = images.batch(batch_size)
  return batched.repeat(num_epochs).make_initializable_iterator()


In [0]:
def generate_points_from_features(features, radius):
  """Perturb each feature vector by a random offset of length ``radius``.

  A Gaussian direction is drawn per row, rescaled to norm ``radius``, added to
  the row, and the result is clipped to [0, normalization_constant]. Because
  of the clipping the returned points can lie closer than ``radius`` to the
  originals.

  Args:
    features: 2-D array of shape (n_points, n_features).
    radius: length of the offset applied before clipping.

  Returns:
    Array with the same shape as ``features`` holding the perturbed points.
  """
  n_points, n_features = features.shape
  noise = np.random.normal(0., normalization_constant, size=(n_points, n_features))
  lengths = np.linalg.norm(noise, axis=1, keepdims=True)
  offsets = radius * noise / lengths
  return np.clip(features + offsets, 0., normalization_constant)
  
def avg_difference_between_points(features1, features2):
  """Return the mean row-wise Euclidean distance between two point sets.

  Args:
    features1: array of shape (n_points, n_features).
    features2: array broadcastable against ``features1``.

  Returns:
    The average, over rows, of the L2 norm of ``features1 - features2``.
  """
  deltas = features1 - features2
  per_point = np.linalg.norm(deltas, axis=1)
  return np.mean(per_point)
  
def normalize(x):
  """Scale ``x`` by dividing it by the module-level normalization_constant."""
  return x / normalization_constant

In [0]:
def calc_contracture_curve(files_dir, batch, n_intervals=100, min_distance=0, max_distance=100):
  """Measure how much the autoencoder contracts distances around real features.

  The first batch of images found under ``files_dir`` is pushed through the
  hub ``module`` to obtain feature vectors. For each radius in an evenly
  spaced grid, every feature vector is perturbed by a random offset of that
  radius (see generate_points_from_features), both the original and the
  perturbed vectors are fed through the imported autoencoder, and the mean
  distance before and after is recorded. One line per radius is printed.

  Args:
    files_dir: directory containing the .tfrecord image files.
    batch: number of images in the evaluated batch.
    n_intervals: number of radii to evaluate.
    min_distance: lower end of the radius range (exclusive).
    max_distance: upper end of the radius range (inclusive).

  Returns:
    A tuple ``(distances, orig_distances, cond_distances, ratios)`` of lists,
    one entry per radius: the nominal radius, the measured mean distance in
    the input feature space, the mean distance between autoencoder outputs,
    and the quotient of the latter two.
  """
  images_iter = input_function(files_dir, batch)
  test_images = images_iter.get_next()
  test_features = module(test_images)

  delta_distance = (max_distance - min_distance)/n_intervals
  distances = [x*delta_distance + delta_distance + min_distance for x in range(0, n_intervals)]

  orig_distances = []
  cond_distances = []
  ratios = []

  init = tf.global_variables_initializer()
  # The context manager closes the session (releasing its resources) on exit,
  # including when one of the runs below raises.
  with tf.Session() as sess:
    sess.run(init)
    print("Contraction of Autoencoder")

    if distances:
      # Every radius is evaluated on the same first batch, so the feature
      # extraction and the encoding of the unperturbed features are
      # loop-invariant and done once up front.
      # NOTE(review): if a fresh batch per radius was intended, move the
      # two feature runs back into the loop (keeping the initializer here).
      sess.run(images_iter.initializer)
      features = sess.run(test_features)
      condensed_features = sess.run(output_tensor, feed_dict={input_tensor: features})

    for d in distances:
      features2 = generate_points_from_features(features, d)
      condensed_features2 = sess.run(output_tensor, feed_dict={input_tensor: features2})

      # Clipping inside generate_points_from_features can shorten the offset,
      # so the input-space distance is measured rather than assumed equal to d.
      avg_distance1 = avg_difference_between_points(features, features2)
      avg_distance2 = avg_difference_between_points(condensed_features, condensed_features2)

      orig_distances.append(avg_distance1)
      cond_distances.append(avg_distance2)

      ratio = avg_distance2 / avg_distance1
      ratios.append(ratio)
      print("d = {0:.2f} orig = {1:.4f} cond = {2:.8f} cond/orig = {3:.6f}".format(d, avg_distance1, avg_distance2, ratio))

  return distances, orig_distances, cond_distances, ratios
   
    

In [0]:

# Run the experiment with the notebook-level configuration.
d, orig, cond, ratios = calc_contracture_curve(
    files_dir, batch_size, n_intervals, min_distance, max_distance)

# Plot both the condensed distance and the contraction ratio against the
# measured distance in the original feature space.
plt.figure(1)
plt.plot(orig, cond)
plt.plot(orig, ratios)
plt.legend(['condensed', 'ratio'], loc='upper left')
plt.title(plot_title_str)
plt.xlabel("Original Feature Distance")
plt.ylabel("Condensed Feature Distance")
plt.show()

