In [None]:
import numpy as np
import tensorflow as tf
import random as rn
np.random.seed(1)
rn.seed(2)
tf.random.set_seed(3)
import os

from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.utils import resample
from sklearn.metrics import confusion_matrix, balanced_accuracy_score, accuracy_score
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.losses import categorical_crossentropy
from tensorflow.keras.layers import Input, Dense, Dropout, Activation, Add, Multiply, Lambda
from tensorflow.keras.activations import softmax
from tensorflow.keras.optimizers import SGD, RMSprop, Adagrad, Adadelta, Adam, Adamax, Nadam
from copy import deepcopy
import time

N_hess=1000

t_epoch=100
t_batch=500

d=784*2
num_of_classes=100

n_hl=2 # Number of hidden layers in teacher model
hlu_teacher=500

# Superfeatures
M=2




## Verbose
train_verbose = 1


###############################################################################
# FUNCTION DEFINITION
###############################################################################
def set_seed_TF2(seed):
    tf.random.set_seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    rn.seed(seed)
    
def model_nn_teacher(d, num_of_classes):
    set_seed_TF2(100)
    nn = Sequential()
    nn.add(Dense(hlu_teacher, activation='relu', input_shape=(d,)))
    for _ in range(n_hl-1):
        nn.add(Dense(hlu_teacher, activation='relu'))
    nn.add(Dense(num_of_classes))
    nn.add(Activation('softmax'))
    nn.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return nn

def model_exp_teacher(d, num_of_classes, sf_idx):
    set_seed_TF2(100)
    inp=Input(shape=(d,))
    logits=[]
    for i in range(M):
        x=Dense(hlu_new_teacher, activation='relu')(tf.gather(inp, sf_idx[i], axis=1))
        for _ in range(n_hl-1):
            x=Dense(hlu_new_teacher, activation='relu')(x)
        y=Dense(num_of_classes, activation='softmax', name='sf_'+str(i+1))(x)
        logits.append(tf.math.log(y+1E-15))
    tot_logit=Add()(logits)-(M-1)*base_logit
    op=Activation('softmax')(tot_logit)
    nn = Model(inputs=inp, outputs=op)
    nn.compile('adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return nn

def solve(a, b, c):
    if(b*b-4*a*c)<0:
        raise ValueError('Problem is not feasible.')
    sol=np.max(np.roots([a,b,c]))
    return int(sol)

def confidence_interval(a,l):
    import numpy as np, scipy.stats as st
    return st.t.interval(l, len(a)-1, loc=np.mean(a), scale=st.sem(a))

def bootstrap_score(y_test, y_pred, metric=accuracy_score, l=0.95, seed=100):
    rng = np.random.RandomState(seed=seed)
    idx = np.arange(y_test.shape[0])
    test_accuracies = []
    for i in range(200):
        pred_idx = rng.choice(idx, size=idx.shape[0], replace=True)
        acc_test_boot = metric(y_test[pred_idx], y_pred[pred_idx])
        test_accuracies.append(acc_test_boot)
    bootstrap_score_mean = np.mean(test_accuracies)
    [ci_lower, ci_upper] = confidence_interval(test_accuracies,l)
    return bootstrap_score_mean, 0.5*(ci_upper-ci_lower)

###############################################################################
# EXPERIMENT
###############################################################################

# Dataset Preprocessing
from tensorflow.keras.datasets import mnist
(x_train_1, y_train_1), (x_test_1, y_test_1)=mnist.load_data()

x_train_1=x_train_1.astype(np.float32).reshape(-1,784)
x_test_1=x_test_1.astype(np.float32).reshape(-1,784)

from tensorflow.keras.datasets import fashion_mnist
(x_train_2, y_train_2), (x_test_2, y_test_2)=fashion_mnist.load_data()

x_train_2=x_train_2.astype(np.float32).reshape(-1,784)
x_test_2=x_test_2.astype(np.float32).reshape(-1,784)


x_train=np.concatenate([x_train_1,x_train_2], axis=1)
x_test=np.concatenate([x_test_1,x_test_2], axis=1)
alpha_le=LabelEncoder()
alpha_le.fit(['A','B','C','D','E','F','G','H','I','J'])
alpha_train_2=alpha_le.inverse_transform(y_train_2)
alpha_test_2=alpha_le.inverse_transform(y_test_2)
y_train=[a+b for a,b in zip(y_train_1.astype(str),alpha_train_2)]
y_test=[a+b for a,b in zip(y_test_1.astype(str),alpha_test_2)]

le=LabelEncoder()
y_train=le.fit_transform(y_train)
y_test=le.transform(y_test)

# Scaling
#scl=StandardScaler()
#x_train=scl.fit_transform(x_train)
#x_test=scl.transform(x_test)
x_train=x_train/255.0
x_test=x_test/255.0

y_train=y_train.reshape(-1,1)
y_test=y_test.reshape(-1,1)


# One-hot Encoding
enc = OneHotEncoder(sparse=False)
y_train=enc.fit_transform(y_train)

idx=np.random.permutation(x_train.shape[0])[0:N_hess]
x_hess=x_train[idx]
###############################################################################
# TEACHER
###############################################################################
teacher=model_nn_teacher(d,num_of_classes)
teacher.fit(x_train, y_train, epochs=t_epoch, batch_size=t_batch, verbose=train_verbose)
    
from sklearn.metrics import confusion_matrix, accuracy_score
print('Priviledged Test Classification')
print(confusion_matrix(y_test, np.argmax(teacher.predict(x_test),1)))
print(bootstrap_score(y_test, np.argmax(teacher.predict(x_test),1)))

###############################################################################
# GENERATE GRAPH
###############################################################################

# Get Hessian
@tf.function
def get_hessian(model, x):
    with tf.GradientTape() as t2:
        t2.watch(x)
        with tf.GradientTape() as t1:
            t1.watch(x)
            y = model(x, training=False)
            obj=tf.math.log(y+1E-15)
        g=t1.gradient(obj,x)
    return t2.jacobian(g,x)

# Compute Superfeatures
H=np.zeros([d,d])
print('Computing Hessian...')
t=time.time()
for i in range(N_hess):
    print('Sample '+str(i+1)+'/'+str(N_hess))
    sample=x_hess[i:(i+1)]
    hessian = get_hessian(teacher, sample)
    H+=tf.reduce_mean(hessian, axis=[0,2]).numpy()
elapsed_time=time.time()-t
print(elapsed_time)

H/=N_hess

# Set up graph structure
W_dir=np.abs(H)
W=W_dir+W_dir.T
np.fill_diagonal(W,0)


###############################################################################
# COMPUTE SUPERFEATURE
###############################################################################

def compute_sf(W,M):
    print('Detecting Communities...')
    from sknetwork.clustering import Louvain
    resolution=1.0
    max_iter=100
    n_com=1
    count=0
    while(n_com!=M):
        louvain=Louvain(resolution = resolution, random_state=100)
        parts = louvain.fit_transform(W)
        n_com=np.max(parts)+1
        print('No. of Communities: '+str(n_com))
        if(n_com<M):
            resolution=resolution+0.01
        if(n_com>M):
            resolution=resolution-0.01
        count+=1
        if(count>=max_iter):
            raise ValueError('Number of superfeatures should be equal to the number of communities detected.')
    print(resolution)
    sf_idx=[]
    for i in range(M):
        sf_idx.append(list(np.where(np.array(parts)==i)[0]))
    print(sf_idx)
    return sf_idx

sf_idx=compute_sf(W, M)
# size=int(d/M)
# np.random.seed(2000)
# arr=np.random.permutation(d)
# sf_idx=[arr[i*size:(i+1)*size] for i in range(M-1)]
# sf_idx.append(arr[(M-1)*size:])
#sf_idx=[np.arange(784), np.arange(784,d)]

base_val=np.mean(teacher(x_train).numpy(), axis=0)
base_logit=np.log(base_val)

###############################################################################
# EXPLAINING TEACHER
###############################################################################
n_params=teacher.count_params() # Teacher does not have any non-trainable param
hlu_new_teacher=solve(M*(n_hl-1), M*(n_hl+num_of_classes)+d, M*num_of_classes-n_params)

new_teacher=model_exp_teacher(d,num_of_classes,sf_idx)
new_teacher.fit(x_train, y_train, epochs=t_epoch, batch_size=t_batch, verbose=train_verbose)

y_pred_test=new_teacher.predict(x_test)

print('Explaining Teacher Test Classification')
print(confusion_matrix(y_test, np.argmax(y_pred_test,1)))
print(bootstrap_score(y_test, np.argmax(y_pred_test,1)))

In [None]:
import numpy as np
import tensorflow as tf
import random as rn
np.random.seed(1)
rn.seed(2)
tf.random.set_seed(3)
import os

from sklearn.preprocessing import StandardScaler, OneHotEncoder,  LabelEncoder
from sklearn.utils import resample
from sklearn.metrics import confusion_matrix, balanced_accuracy_score, accuracy_score
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.losses import categorical_crossentropy, kl_divergence
from tensorflow.keras.layers import Layer, Input, Dense, Dropout, Activation, Add, Multiply, Concatenate, LeakyReLU
from tensorflow.keras.activations import softmax
from tensorflow.keras.optimizers import SGD, RMSprop, Adagrad, Adadelta, Adam, Adamax, Nadam
from copy import deepcopy
import time
import functools
import matplotlib.pyplot as plt


d=784*2
num_of_classes=100

n_hl=2 # Number of hidden layers in student model
hlu_student=20

# Superfeatures
M=2

# Distillation params
N_train=x_train.shape[0]
s_epoch=100
s_batch=100
T=10
tau=10
L=0.7
mu=0.7

chunk_size=1000000
chi_batch=100

## Verbose
train_verbose = 1

###############################################################################
# FUNCTION DEFINITION
###############################################################################
def set_seed_TF2(seed):
    tf.random.set_seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    rn.seed(seed)
    
def custom_T(x,T=T):
    return tf.math.softmax(tf.math.log(x+1E-15)/T)

def random_order_cartesian_product(*factors, n_samples=100):
    amount = functools.reduce(lambda prod, factor: prod * len(factor), factors, 1)
    index_list = set([])
    seed=0
    while(len(index_list)!=n_samples):
        seed+=1
        rn.seed(seed)
        index=rn.randint(0, amount)
        index_list.add(index)
    for index in index_list:
        items = []
        for factor in factors:
            items.append(factor[index % len(factor)])
            index //= len(factor)
        yield items 

def model_nn_student(d, num_of_classes):
    set_seed_TF2(100)
    inp=Input(shape=(d,))
    x=Dense(hlu_student, activation='relu')(inp)
    for _ in range(n_hl-1):
        x=Dense(hlu_student, activation='relu')(x)
    tot_logit=Dense(num_of_classes)(x)
    op=Activation('softmax')(tot_logit)
    nn = Model(inputs=inp, outputs=op)
    nn.compile('adam', loss='categorical_crossentropy', metrics=['accuracy'])
    #nn.summary()
    return nn

def model_nn_soft(d, num_of_classes):
    set_seed_TF2(100)
    inp=Input(shape=(d,))
    x=Dense(hlu_student, activation='relu')(inp)
    for _ in range(n_hl-1):
        x=Dense(hlu_student, activation='relu')(x)
    tot_logit=Dense(num_of_classes)(x)
    hard_softmax=Activation('softmax')(tot_logit)
    soft_softmax=Activation('softmax')(tot_logit/T)
    nn = Model(inputs=inp, outputs=[hard_softmax, soft_softmax])
    nn.compile('adam', loss=['categorical_crossentropy', 'kl_divergence'], loss_weights=[1-L, T*T*L], metrics=['accuracy'])
    #nn.summary()
    return nn

def model_nn_ked(d, num_of_classes, sf_idx):
    set_seed_TF2(100)
    inp=Input(shape=(d,))
    logits=[]
    ops=[]
    loss=['categorical_crossentropy', 'kl_divergence']
    loss_weights=[1-L, T*T*L*(1-mu)]
    for i in range(M):
        x=Dense(hlu_ked, activation='relu')(tf.gather(inp, sf_idx[i], axis=1))
        for _ in range(n_hl-1):
            x=Dense(hlu_ked, activation='relu')(x)
        y=Dense(num_of_classes, activation='softmax', name='sf_'+str(i+1))(x)
        logits.append(tf.math.log(y+1E-15))
        ops.append(custom_T(y, T=tau))
        loss.append('kl_divergence')
        loss_weights.append(tau*tau*L*mu/M)
    tot_logit=Add()(logits)-(M-1)*base_logit 
    soft_softmax=Activation('softmax')(tot_logit/T)
    ops.insert(0,soft_softmax)
    hard_softmax=Activation('softmax')(tot_logit)
    ops.insert(0,hard_softmax)
    nn = Model(inputs=inp, outputs=ops)
    nn.compile('adam', loss=loss, loss_weights=loss_weights, metrics=['accuracy'])
    #nn.summary()
    return nn

def solve(a, b, c):
    if(b*b-4*a*c)<0:
        raise ValueError('Problem is not feasible.')
    sol=np.max(np.roots([a,b,c]))
    return int(sol)

def bootstrap_score(y_test, y_pred, metric=accuracy_score, l=0.95, seed=100):
    rng = np.random.RandomState(seed=seed)
    idx = np.arange(y_test.shape[0])
    test_accuracies = []
    for i in range(200):
        pred_idx = rng.choice(idx, size=idx.shape[0], replace=True)
        acc_test_boot = metric(y_test[pred_idx], y_pred[pred_idx])
        test_accuracies.append(acc_test_boot)
    bootstrap_score_mean = np.mean(test_accuracies)
    [ci_lower, ci_upper] = confidence_interval(test_accuracies,l)
    return bootstrap_score_mean, 0.5*(ci_upper-ci_lower)

###############################################################################
# EXPERIMENT
###############################################################################

# Dataset Preprocessing
from tensorflow.keras.datasets import mnist
(x_train_1, y_train_1), (x_test_1, y_test_1)=mnist.load_data()

x_train_1=x_train_1.astype(np.float32).reshape(-1,784)
x_test_1=x_test_1.astype(np.float32).reshape(-1,784)

from tensorflow.keras.datasets import fashion_mnist
(x_train_2, y_train_2), (x_test_2, y_test_2)=fashion_mnist.load_data()

x_train_2=x_train_2.astype(np.float32).reshape(-1,784)
x_test_2=x_test_2.astype(np.float32).reshape(-1,784)


x_train=np.concatenate([x_train_1,x_train_2], axis=1)
x_test=np.concatenate([x_test_1,x_test_2], axis=1)
alpha_le=LabelEncoder()
alpha_le.fit(['A','B','C','D','E','F','G','H','I','J'])
alpha_train_2=alpha_le.inverse_transform(y_train_2)
alpha_test_2=alpha_le.inverse_transform(y_test_2)
y_train=[a+b for a,b in zip(y_train_1.astype(str),alpha_train_2)]
y_test=[a+b for a,b in zip(y_test_1.astype(str),alpha_test_2)]

le=LabelEncoder()
y_train=le.fit_transform(y_train)
y_test=le.transform(y_test)

# Scaling
#scl=StandardScaler()
#x_train=scl.fit_transform(x_train)
#x_test=scl.transform(x_test)
x_train=x_train/255.0
x_test=x_test/255.0

y_train=y_train.reshape(-1,1)
y_test=y_test.reshape(-1,1)

# One-hot Encoding
enc = OneHotEncoder(sparse=False)
y_train=enc.fit_transform(y_train)

base_val=np.mean(teacher(x_train).numpy(), axis=0)
base_logit=np.log(base_val)

regular=[]
distilled_soft=[]
distilled_ked=[]
distilled_chi=[]
baseline=[]

idx=np.random.permutation(x_train.shape[0])[0:N_train]
x_tr=x_train[idx]
y_tr=y_train[idx]

# Soft Labels
soften = Model(inputs=teacher.inputs, outputs=[layer.output for layer in teacher.layers])
y_soft = softmax(soften(x_tr)[-2]/T)

###############################################################################
# STUDENT
###############################################################################
student=model_nn_student(d,num_of_classes)

student.fit(x_tr, y_tr, epochs=s_epoch, batch_size=s_batch, verbose=train_verbose)
regular.append(bootstrap_score(y_test, np.argmax(student.predict(x_test),1)))

print('Regular Test Classification')
print(confusion_matrix(y_test, np.argmax(student.predict(x_test),1)))
print(bootstrap_score(y_test, np.argmax(student.predict(x_test),1)))


###############################################################################
# SOFT DISTILLATION
###############################################################################
student_soft=model_nn_soft(d, num_of_classes)

student_soft.fit(x_tr, [y_tr, y_soft], epochs=s_epoch, batch_size=s_batch, verbose=train_verbose)
y_pred_test=student_soft.predict(x_test)[0]
distilled_soft.append(bootstrap_score(y_test, np.argmax(y_pred_test,1)))

print('Distilled Test Classification (Soft)')
print(confusion_matrix(y_test, np.argmax(y_pred_test, 1)))
print(bootstrap_score(y_test, np.argmax(y_pred_test,1)))    

###############################################################################
# KNOWLEDGE EXPLAINING DISTILLATION
###############################################################################
train_labels=[y_tr, custom_T(new_teacher(x_tr))]
for i in range(M):
    soften_sf = Model(inputs=new_teacher.inputs, outputs=new_teacher.get_layer('sf_'+str(i+1)).output)
    y_sf=soften_sf(x_tr).numpy()
    train_labels.append(custom_T(y_sf, T=tau))
    
n_params=student.count_params() # Student does not have any non-trainable param
hlu_ked=solve(M*(n_hl-1), M*(n_hl+num_of_classes)+d, M*num_of_classes-n_params)
    
student_ked=model_nn_ked(d, num_of_classes, sf_idx)

student_ked.fit(x_tr, train_labels, epochs=s_epoch, batch_size=s_batch, verbose=train_verbose)
y_pred_test=student_ked.predict(x_test)[0]
distilled_ked.append(bootstrap_score(y_test, np.argmax(y_pred_test,1)))

print('Distilled Test Classification (KED)')
print(confusion_matrix(y_test, np.argmax(y_pred_test, 1)))
print(bootstrap_score(y_test, np.argmax(y_pred_test,1)))

###############################################################################
# DISTILLATION USING CHIMERIC SET
###############################################################################
# Student's chimeric set
sample_lists=[]
for i in range(M):
    sample_lists.append(x_tr[:,sf_idx[i]])
gen=random_order_cartesian_product(*sample_lists, n_samples=chunk_size)

chunk_list=[]
# M-fold Cartesian product
t=time.time()
for element in gen:
    jumbled=np.hstack(element)
    ordered=jumbled[np.argsort(np.hstack(sf_idx))]
    chunk_list.append(ordered)
elapsed_time=time.time()-t
print(elapsed_time)

x_chi=np.asarray(chunk_list)
y_chi=new_teacher(x_chi)

train_labels=[enc.transform(np.argmax(y_chi,1).reshape(-1,1)), custom_T(y_chi)]
for i in range(M):
    soften_sf = Model(inputs=new_teacher.inputs, outputs=new_teacher.get_layer('sf_'+str(i+1)).output)
    y_sf=soften_sf(x_chi).numpy()
    train_labels.append(custom_T(y_sf, T=tau))

student_chi=student_ked

student_chi.fit(x_chi, train_labels, batch_size=chi_batch, verbose=train_verbose)
y_pred_test=student_chi.predict(x_test)[0]
distilled_chi.append(bootstrap_score(y_test, np.argmax(y_pred_test,1)))
    
print('Distilled Test Classification (Chimeric)')
print(confusion_matrix(y_test, np.argmax(y_pred_test, 1)))
print(bootstrap_score(y_test, np.argmax(y_pred_test,1)))
    
###############################################################################
# BASELINE
###############################################################################
x_base=deepcopy(x_chi)
y_base=teacher(x_base)
y_base_soft=custom_T(y_base)
    
student_base=student_soft

student_base.fit(x_base, [y_base, y_base_soft], batch_size=chi_batch, verbose=train_verbose)
y_pred_test=student_base.predict(x_test)[0]
baseline.append(bootstrap_score(y_test, np.argmax(y_pred_test,1)))
    
print('Distilled Test Classification (Baseline)')
print(confusion_matrix(y_test, np.argmax(y_pred_test, 1)))
print(bootstrap_score(y_test, np.argmax(y_pred_test,1)))

In [None]:
import PIL.Image as pil
###############################################################################
# VISUALIZE SUPERFEATURES
###############################################################################
i=100
img = pil.fromarray(np.uint8(x_train[i].reshape(56,28) * 255) , 'L')
img.show()
sf_1=deepcopy(x_train[i])
sf_1[sf_idx[1]]=0
img_1 = pil.fromarray(np.uint8(sf_1.reshape(56,28) * 255) , 'L')
img_1.show()
sf_2=deepcopy(x_train[i])
sf_2[sf_idx[0]]=0
img_2 = pil.fromarray(np.uint8(sf_2.reshape(56,28) * 255) , 'L')
img_2.show()
