## Imports

In [3]:
import torch
import numpy as np
import pickle
import matplotlib.pyplot as plt
from keras.datasets import mnist     # MNIST dataset is included in Keras
from keras.datasets import cifar10  #
from keras.models import Sequential  # Model type to be used
from keras.layers import Dense, Conv2D, Flatten
import keras.utils as np_utils
import tensorflow as tf

## Define classes and functions

In [4]:
class LayerCharacteristics:
  """Container for the per-layer intermediate results filled in by ManifoldAngles.

  Every attribute starts out empty, and each instance owns its own fresh
  containers (nothing is shared between instances).
  """

  def __init__(self):
    # Starts as an empty list; ManifoldAngles later rebinds it to a scalar score.
    self.space_linearity_sum = list()
    # Not written by any code visible in this notebook.
    self.total_sum = list()
    # sample index -> leading singular values of that sample's neighbourhood.
    self.svd_weights = dict()
    # Not written by any code visible in this notebook.
    self.distances = dict()
    # sample index -> indices of that sample's nearest neighbours.
    self.neighbour_dict = dict()
    # sample index -> reduced basis obtained from the neighbourhood SVD.
    self.U_dict = dict()

In [6]:
def ManifoldAngles(layerfeatlist,neighboursize1=10,classsize=10,dim_reduc_size=3,fileloc = "layers/"):
  """Score how "linear" the local neighbourhood structure of each layer's features is.

  For every entry of `layerfeatlist` and every row x_i in it, the function:
    1. stores the indices of the `neighboursize1 + 1` rows closest to x_i
       (tf.norm of the row differences) in `neighbour_dict[i]`;
    2. gathers a neighbourhood of rows, centres it, runs an SVD and keeps the
       first `dim_reduc_size` singular values (`svd_weights[i]`) and the first
       `dim_reduc_size` columns of the third SVD output (`U_dict[i]`).
  It then compares each point's reduced basis with those of its neighbours and
  stores the mean of sin(acos(.)) of a singular-value-weighted overlap in
  `space_linearity_sum`.

  Args:
    layerfeatlist: sequence of feature tensors, one per layer. Assumes each is
      2-D (samples, features) -- TODO confirm against the caller.
    neighboursize1: number of neighbours; `neighboursize1 + 1` indices are kept
      (presumably so that the point itself, at distance 0, is included).
    classsize: number of outer "class" slots to fill in `class_chars`.
    dim_reduc_size: number of singular values / basis columns kept per point.
    fileloc: unused in this function.

  Returns:
    (class_chars, manifold_neighbour_angle_sum) where `class_chars[c][l]` is a
    LayerCharacteristics and `manifold_neighbour_angle_sum` is the per-point
    list built during the LAST (class, layer) iteration only.

  NOTE(review): `c1` is only used to index `class_chars`; the features are not
  filtered per class, so every class slot repeats the same computation.
  NOTE(review): `manifold_neighbour_angle_sum` is only assigned inside the
  loops, so the final `return` looks like it fails when `classsize` is 0 or
  `layerfeatlist` is empty -- confirm.
  """
  starttime = tf.timestamp()
  tf.print("start: ")

  no_of_layers = len(layerfeatlist)
  # Collected across all classes and layers; only reported when there is one layer.
  reduction_quality = []
  class_chars = []

  for c1 in range(classsize):
    class_chars.append([])
    for layer_i in range(no_of_layers):
      class_chars[c1].append(LayerCharacteristics())
      layer_features = layerfeatlist[layer_i]

      layer_start = tf.timestamp()

      for i,x_i in enumerate(layer_features):

        # Indices of the rows with the smallest norm of (row - x_i), truncated to neighboursize1+1 entries.
        class_chars[c1][layer_i].neighbour_dict[i] = tf.argsort(tf.norm( tf.math.subtract(layer_features,x_i) ,axis=1))[0:neighboursize1+1]

        # NOTE(review): the neighbourhood is gathered with layer 0's indices, not the
        # ones just computed for layer_i -- confirm this is intentional.
        W_i = tf.gather(layer_features,class_chars[c1][0].neighbour_dict[i])

        # Centre the neighbourhood before decomposing it.
        W_i = ( W_i - tf.math.reduce_mean(W_i,axis=0) )
        s, u, v = tf.linalg.svd( W_i )
        # Keep the first dim_reduc_size columns of the third SVD output as the local basis.
        W_i_reduced = v[:,:dim_reduc_size]

        class_chars[c1][layer_i].svd_weights[i] = s[:dim_reduc_size]
        # Share of the singular-value sum captured by the kept components.
        reduction_quality.append(  tf.reduce_sum( (s[:dim_reduc_size])/ tf.reduce_sum(s) ) )
        class_chars[c1][layer_i].U_dict[i] = W_i_reduced

      tf.print("--layer time: ", tf.timestamp() - layer_start)
      class_chars[c1][layer_i].space_linearity_sum = 0.0
      angle_start = tf.timestamp()

      # One single-element list per point, holding that point's mean sine value.
      manifold_neighbour_angle_sum=[]
      for i in range(len(class_chars[c1][layer_i].U_dict)):
        manifold_neighbour_angle_sum_temp=[]
        manifold_neighbour_angle_sum.append([])

        # NOTE(review): neighbours are again taken from layer 0's neighbour_dict.
        for j in class_chars[c1][0].neighbour_dict[i]:
          # Skip the point itself.
          if i != j:
            # Overlap between the two reduced bases (U_i^T U_j).
            teta =  tf.matmul(  tf.transpose(class_chars[c1][layer_i].U_dict[i]),  class_chars[c1][layer_i].U_dict[int(j)]   )
            # Outer product of the two points' kept singular values.
            weights =  tf.matmul(  tf.transpose( tf.expand_dims(class_chars[c1][layer_i].svd_weights[i],0)), tf.expand_dims(class_chars[c1][layer_i].svd_weights[int(j)],0)  )
            # Element-wise weighting of the overlap matrix.
            Q = teta*weights

            s, u, v = tf.linalg.svd( Q )

            # Sum of Q's singular values normalised by the trace of the weight matrix.
            tetaw = tf.reduce_sum(s)/tf.linalg.trace(weights)
            # Clip into acos' domain before converting to an angle.
            angles = tf.math.acos( tf.clip_by_value(tetaw,-1,1) )
            manifold_neighbour_angle_sum_temp.append( tf.math.sin(angles)  )

        manifold_neighbour_angle_sum[i].append(tf.reduce_mean(tf.convert_to_tensor(manifold_neighbour_angle_sum_temp)))
      # Layer score: mean of the per-point mean sines.
      class_chars[c1][layer_i].space_linearity_sum = tf.reduce_mean( tf.convert_to_tensor(manifold_neighbour_angle_sum ))
      tf.print("--angle time: ", tf.timestamp() - angle_start)

  if no_of_layers==1: tf.print("Average reduction quality: ",  tf.reduce_mean(reduction_quality))
  tf.print("endtime: ", tf.timestamp() - starttime)
  return class_chars,manifold_neighbour_angle_sum

In [8]:
def shuffle_data(X: np.ndarray, Y: np.ndarray, seed: int):
  """Shuffle X and Y in place along their first axis with the same permutation.

  Each array is shuffled by its own generator seeded with `seed`, so both
  receive the identical permutation (the one the legacy
  `np.random.seed(seed); np.random.shuffle(...)` sequence produces). Unlike
  reseeding the global generator, this leaves NumPy's global random state
  untouched, so later `np.random.*` calls are not made deterministic as a
  side effect.

  Args:
    X: array shuffled in place along axis 0.
    Y: array shuffled in place along axis 0; must have as many rows as X.
    seed: seed defining the permutation.

  Returns:
    None. Both arrays are modified in place.

  Raises:
    ValueError: if X and Y have different lengths, because the same seed would
      then yield unrelated permutations and silently break the pairing.
  """
  if len(X) != len(Y):
    raise ValueError(f"X and Y must have the same length, got {len(X)} and {len(Y)}")

  for arr in (X, Y):
    # A fresh RandomState per array restarts the stream, giving both the same permutation.
    np.random.RandomState(seed).shuffle(arr)

In [9]:
def get_reduced_datasets(data_size: int, X: np.ndarray, Y: np.ndarray, order_method: str, curvature_set='input', dataset='mnist'):
  """Pick `data_size` samples per class from (X, Y) according to `order_method`.

  Class membership and curvature scores are read from the notebook globals
  `cifar_train_y`, `cifar_input_curvature` and `cifar_output_class_curvatures`,
  so X and Y must be row-aligned with `cifar_train_y`. The per-class curvature
  arrays are assumed to be ordered like the rows of that class -- TODO confirm.

  Args:
    data_size: number of samples to keep for each of the 10 classes.
    X: sample array, indexed along axis 0.
    Y: label array, indexed along axis 0.
    order_method: one of
      'random'            -- uniform draw WITH replacement (duplicates possible),
      'low_to_high'       -- lowest `data_size` curvature scores,
      'high_to_low'       -- highest `data_size` curvature scores,
      'mid'               -- `data_size` scores centred on the median rank,
      'ratio_low_to_high' -- lowest output/input curvature ratios,
      'ratio_high_to_low' -- highest output/input curvature ratios.
    curvature_set: 'input' selects the input curvatures; any other value
      selects the output curvatures. Ignored by 'random' and the ratio methods.
    dataset: accepted for interface compatibility but currently unused; the
      CIFAR globals are always used.

  Returns:
    (Reduced_X, Reduced_Y): the per-class selections stacked in class order.

  Raises:
    ValueError: if `order_method` is not one of the values listed above.
    IndexError: if X or Y does not have one row per entry of `cifar_train_y`.
  """
  _input_curvatures = cifar_input_curvature
  _output_curvatures = cifar_output_class_curvatures
  _train_y = cifar_train_y[:, 0]

  _curvature_set = _input_curvatures if curvature_set == 'input' else _output_curvatures

  lowest_first = ('low_to_high', 'ratio_low_to_high')
  highest_first = ('high_to_low', 'ratio_high_to_low')
  ratio_methods = ('ratio_low_to_high', 'ratio_high_to_low')
  if order_method not in ('random', 'mid') + lowest_first + highest_first:
    raise ValueError(f"order method not implemented: {order_method}")

  # Boolean-mask indexing used to enforce this; keep the check now that integer rows are used.
  if len(X) != _train_y.shape[0] or len(Y) != _train_y.shape[0]:
    raise IndexError(
      f"X and Y must have {_train_y.shape[0]} rows to match the class labels, "
      f"got {len(X)} and {len(Y)}"
    )

  reduced_x_rows = []
  reduced_y_rows = []
  for y_class in range(10):
    # Absolute row numbers of this class; avoids copying the whole class out of X.
    class_rows = np.flatnonzero(_train_y == y_class)

    if order_method == 'random':
      keep_indices = np.random.choice(class_rows.shape[0], data_size, replace=True)
    else:
      if order_method in ratio_methods:
        scores = _output_curvatures[y_class][:, 0] / _input_curvatures[y_class][:, 0]
      else:
        scores = _curvature_set[y_class][:, 0]
      ranked = np.argsort(scores)
      num_ranked = ranked.shape[0]

      if order_method in lowest_first:
        keep_indices = ranked[:data_size]
      elif order_method in highest_first:
        # Explicit start index: ranked[-0:] would return everything for data_size == 0.
        keep_indices = ranked[max(num_ranked - data_size, 0):]
      else:  # 'mid'
        num_low_curv = data_size // 2
        num_high_curv = data_size - num_low_curv
        curv_midpoint = num_ranked // 2
        # Clamp at 0 so an oversized window does not wrap around via a negative start.
        window_start = max(curv_midpoint - num_low_curv, 0)
        keep_indices = ranked[window_start : curv_midpoint + num_high_curv]

    selected_rows = class_rows[keep_indices]
    reduced_x_rows.append(X[selected_rows])
    reduced_y_rows.append(Y[selected_rows])

  # Stack once at the end instead of re-copying the growing result for every class.
  Reduced_X = np.vstack(reduced_x_rows)
  Reduced_Y = np.vstack(reduced_y_rows)

  return Reduced_X, Reduced_Y

In [10]:
def get_accuracies(X_train, Y_train, X_test, Y_test, order_method='low_to_high', curvature_set='input', num_models=5, shuffle_seed=None, dataset='mnist'):
  """Train `num_models` fresh models for every size in the global `datasizes`.

  For each per-class size, a reduced training set is built with
  `get_reduced_datasets`, optionally shuffled with `shuffle_data`, and then
  `num_models` models from `create_cifar_model` are trained for 20 epochs with
  (X_test, Y_test) as validation data.

  Args:
    X_train, Y_train: full training samples and one-hot labels.
    X_test, Y_test: validation samples and one-hot labels.
    order_method: selection strategy forwarded to `get_reduced_datasets`.
    curvature_set: curvature source forwarded to `get_reduced_datasets`.
    num_models: number of independently initialised models per data size.
    shuffle_seed: if not None, seed used to shuffle the reduced set in place.
    dataset: forwarded to `get_reduced_datasets`.

  Returns:
    (valacclist, acclist): two lists with one entry per data size, each entry
    being the list of final-epoch validation / training accuracies of the
    `num_models` models.
  """
  valacclist = []
  acclist = []
  for data_size in datasizes:
    Reduced_X_train, Reduced_Y_train = get_reduced_datasets(data_size, X_train, Y_train, order_method, curvature_set, dataset=dataset)
    print(f'ReducedX.shape: {Reduced_X_train.shape}')

    if shuffle_seed is not None:
      shuffle_data(Reduced_X_train, Reduced_Y_train, shuffle_seed)

    size_val_accs = []
    size_train_accs = []
    for _ in range(num_models):
      nt = create_cifar_model()
      nt.compile(loss= tf.keras.losses.CategoricalCrossentropy(), optimizer='adam', metrics=['categorical_accuracy'])
      history = nt.fit(Reduced_X_train, Reduced_Y_train, epochs=20, validation_data=(X_test, Y_test), verbose=0, batch_size = 128)

      final_train_acc = history.history['categorical_accuracy'][-1]
      final_val_acc = history.history['val_categorical_accuracy'][-1]
      print(f"  train acc = {final_train_acc} val acc = {final_val_acc}")
      size_val_accs.append(final_val_acc)
      size_train_accs.append(final_train_acc)

      # `del` alone does not release Keras' global state; without clear_session()
      # memory grows with every model created in this loop.
      del nt, history
      tf.keras.backend.clear_session()

    valacclist.append(size_val_accs)
    acclist.append(size_train_accs)

  return valacclist, acclist

In [11]:
def plot_accuracies(accuracies, names=None):
  """Plot mean accuracy against the global `datasizes`, one curve per run.

  Args:
    accuracies: iterable of results; each result holds one row of per-model
      accuracies for every entry of `datasizes` and is averaged over axis 1.
    names: optional legend labels, in the same order as `accuracies`.

  The x axis is logarithmic and inverted, so the largest data size is on the
  left.
  """
  _, ax = plt.subplots(figsize=(14, 10))

  for run_accuracies in accuracies:
    mean_per_size = np.mean(run_accuracies, axis=1)
    ax.plot(datasizes, mean_per_size)

  if names is not None:
    ax.legend(names, fontsize=11)

  ax.set_xscale('log')
  ax.invert_xaxis()
  ax.grid()
  plt.show()

# cifar

In [12]:
# Load CIFAR-10 through Keras as (train images, train labels), (test images, test labels).
(cifar_X_train, cifar_train_y), (cifar_X_test, cifar_test_y) = cifar10.load_data()
# Rescale the pixel values by 1/255 (presumably uint8 0-255 -> floats in [0, 1]; confirm dtype).
cifar_X_train = cifar_X_train / 255
cifar_X_test = cifar_X_test/ 255

In [13]:
# One-hot encode the integer class labels into 10 columns.
cifar_Y_train = np_utils.to_categorical(cifar_train_y, 10)
cifar_Y_test = np_utils.to_categorical(cifar_test_y, 10)

# Only the raw test labels are dropped; cifar_train_y is still read as a global
# by get_reduced_datasets to determine class membership.
del cifar_test_y

In [14]:
def create_cifar_model():
  """Build a fresh, uncompiled CNN classifier for the CIFAR images.

  Architecture: two convolutional stages (32 then 64 filters; each stage is a
  'same'-padded 3x3 conv, an unpadded 3x3 conv, max pooling and 25% dropout),
  followed by a 512-unit dense layer with 50% dropout and a 10-way softmax.
  The input shape is taken from the global `cifar_X_train`.

  Returns:
    A new `tf.keras.models.Sequential` instance with freshly initialised weights.
  """
  layers = tf.keras.layers
  model = tf.keras.models.Sequential()

  # Stage 1: 32 filters.
  model.add(layers.Conv2D(32, 3, padding='same', input_shape=cifar_X_train.shape[1:], activation='relu'))
  model.add(layers.Conv2D(32, 3, activation='relu'))
  model.add(layers.MaxPooling2D())
  model.add(layers.Dropout(0.25))

  # Stage 2: 64 filters.
  model.add(layers.Conv2D(64, 3, padding='same', activation='relu'))
  model.add(layers.Conv2D(64, 3, activation='relu'))
  model.add(layers.MaxPooling2D())
  model.add(layers.Dropout(0.25))

  # Classifier head.
  model.add(layers.Flatten())
  model.add(layers.Dense(512, activation='relu'))
  model.add(layers.Dropout(0.5))
  model.add(layers.Dense(10, activation='softmax'))

  return model

In [15]:
# Identifiers of the models whose pickled output-curvature files are loaded below.
filenames = [
  'Wang2023Better_WRN-70-16',
  'Wang2023Better_WRN-28-10',
  'Rebuffi2021Fixing_70_16_cutmix_extra',
  'Gowal2020Uncovering_extra',
]

# One unpickled object per model, stacked into a single array afterwards.
# NOTE(review): pickle.load executes arbitrary code from the file -- only load trusted files.
curvatures = []
for filename in filenames:
  with open(f'./output_curv/cifar_output_curv_{filename}', 'rb') as file:
    curvatures.append(pickle.load(file))

curvatures = np.array(curvatures)
# NOTE(review): this expression is not the last statement of the cell, so its value is not displayed.
curvatures.shape

# Remove the loop/helper names from the notebook namespace.
del file, filename, filenames

In [16]:
# Define the output curvatures used by get_reduced_datasets: the average over
# axis 0 of the stacked per-model curvature arrays (i.e. across the loaded models).
cifar_output_class_curvatures = np.average(curvatures, axis=0)
# The stacked per-model array is no longer needed once averaged.
del curvatures

In [17]:
# Load the pickled input curvatures read by get_reduced_datasets.
# NOTE(review): pickle.load executes arbitrary code from the file -- only load trusted files.
with open('cifar_input_curv', 'rb') as file:
  cifar_input_curvature = np.array(pickle.load(file))

# Remove the file handle name from the notebook namespace.
del file

In [18]:
# Per-class training-set sizes to sweep: 20 log-spaced values running from
# 10**3.699 (~5000) down to 10**1.899 (~79), truncated to integers.
datasizes = np.logspace(start=3.699, stop=1.899, num=20).astype(int)

In [None]:
# These take a while to train, so each run lives in its own cell to avoid retraining all of them if changes need to be made to just one.
# Only gets through the second datasize so far.
# Baseline: per-class samples drawn at random; only the validation accuracies are kept.
rand, _ = get_accuracies(cifar_X_train, cifar_Y_train, cifar_X_test, cifar_Y_test, order_method='random')

ReducedX.shape: (50000, 32, 32, 3)


In [None]:
# Keep the samples with the lowest output/input curvature ratio; only the validation accuracies are kept.
ratio_low, _ = get_accuracies(cifar_X_train, cifar_Y_train, cifar_X_test, cifar_Y_test, order_method='ratio_low_to_high')

In [None]:
# Keep the samples with the highest output/input curvature ratio; only the validation accuracies are kept.
ratio_high, _ = get_accuracies(cifar_X_train, cifar_Y_train, cifar_X_test, cifar_Y_test, order_method='ratio_high_to_low')

In [None]:
# Compare the mean validation accuracy of the three selection strategies across data sizes.
plot_accuracies([rand, ratio_low, ratio_high], ['rand', 'ratio low', 'ratio high'])