Examine the differences between similarity metrics:
1. representation cosine
2. EL2N
3. Grad Norm

In [None]:
import os, sys
import json
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim

from scipy.spatial import distance
from scipy import stats
from scipy.special import softmax

sys.path.append("..")
from singleVis.data import NormalDataProvider
from singleVis.utils import find_neighbor_preserving_rate

In [None]:
# Key of the config.json sub-section that is used for this run.
VIS_METHOD = "tDVI" # DeepVisualInsight
# Directory that contains config.json; it is also appended to sys.path.
CONTENT_PATH = "/home/xianglin/projects/DVI_data/resnet18_mnist"
# CUDA device index used to build the torch device string when CUDA is available.
GPU_ID = "0"

In [None]:
# Put CONTENT_PATH on the import path, then read config.json from it and keep
# only the sub-section that belongs to the selected visualization method.
sys.path.append(CONTENT_PATH)
config_path = os.path.join(CONTENT_PATH, "config.json")
with open(config_path, "r") as f:
    config = json.load(f)[VIS_METHOD]

In [None]:
# General settings
SETTING = config["SETTING"]
CLASSES = config["CLASSES"]
DATASET = config["DATASET"]
PREPROCESS = config["VISUALIZATION"]["PREPROCESS"]

# Subject model: training settings and the checkpoint epoch range
TRAINING_PARAMETER = config["TRAINING"]
NET = TRAINING_PARAMETER["NET"]
LEN = TRAINING_PARAMETER["train_num"]
EPOCH_START, EPOCH_END, EPOCH_PERIOD = (
    config["EPOCH_START"],
    config["EPOCH_END"],
    config["EPOCH_PERIOD"],
)

# Visualization model settings
VISUALIZATION_PARAMETER = config["VISUALIZATION"]
VIS_MODEL = VISUALIZATION_PARAMETER["VIS_MODEL"]
LAMBDA = VISUALIZATION_PARAMETER["LAMBDA"]
_boundary_cfg = VISUALIZATION_PARAMETER["BOUNDARY"]
B_N_EPOCHS = _boundary_cfg["B_N_EPOCHS"]
L_BOUND = _boundary_cfg["L_BOUND"]
ENCODER_DIMS = VISUALIZATION_PARAMETER["ENCODER_DIMS"]
DECODER_DIMS = VISUALIZATION_PARAMETER["DECODER_DIMS"]
S_N_EPOCHS = VISUALIZATION_PARAMETER["S_N_EPOCHS"]
N_NEIGHBORS = VISUALIZATION_PARAMETER["N_NEIGHBORS"]
PATIENT = VISUALIZATION_PARAMETER["PATIENT"]
MAX_EPOCH = VISUALIZATION_PARAMETER["MAX_EPOCH"]

VIS_MODEL_NAME = VISUALIZATION_PARAMETER["VIS_MODEL_NAME"]
EVALUATION_NAME = VISUALIZATION_PARAMETER["EVALUATION_NAME"]

# Run on the selected GPU when CUDA is available, otherwise on the CPU.
DEVICE = torch.device(f"cuda:{GPU_ID}" if torch.cuda.is_available() else "cpu")

In [None]:
# Override the names that were read from config.json for this experiment.
VIS_MODEL_NAME = "tDVI_baseAE"
EVALUATION_NAME = "evaluation_tDVI_baseAE"

In [None]:
# Instantiate the subject network whose class/factory name is stored in NET.
# The name is looked up with getattr() instead of eval()-ing a string built
# from config.json, so the config value is never executed as code.
import Model.model as subject_model
net = getattr(subject_model, NET)()

In [None]:
# Build the data provider over the configured epoch range.
data_provider = NormalDataProvider(
    CONTENT_PATH,
    net,
    EPOCH_START,
    EPOCH_END,
    EPOCH_PERIOD,
    device=DEVICE,
    classes=CLASSES,
    epoch_name="Epoch",
    verbose=1,
)

# Optional preprocessing, switched on by the PREPROCESS config flag; the
# boundary estimation additionally requires a positive B_N_EPOCHS.
if PREPROCESS:
    data_provider._meta_data()
    if B_N_EPOCHS > 0:
        data_provider._estimate_boundary(LEN // 10, l_bound=L_BOUND)

# Define semantic change

In [None]:
def _per_parameter_grads(epoch, x, target, data_provider, criterion):
    """Return cloned loss gradients, one entry per parameter, for the model at ``epoch``.

    The model is obtained from ``data_provider.model_function(epoch)`` and
    moved to ``DEVICE``. An entry is ``None`` when the corresponding
    parameter received no gradient from the backward pass.
    """
    model = data_provider.model_function(epoch)
    model = model.to(DEVICE)

    output = model(x)
    loss = criterion(output, target)
    # Clear gradients left over from earlier backward passes on this model
    # object so that they do not accumulate into the result.
    model.zero_grad()
    loss.backward()

    # Clone so the values stay intact if the same parameters are used again.
    # NOTE(review): model_function may hand back the same module object for
    # different epochs -- confirm; the clone makes this safe either way.
    return [None if p.grad is None else p.grad.clone() for p in model.parameters()]


def gradient_diff(prev_e, next_e, x, target, data_provider, criterion):
    """Cosine distance between the loss gradients of two model checkpoints.

    For each of the epochs ``prev_e`` and ``next_e`` the model is loaded via
    ``data_provider.model_function``, the loss ``criterion(model(x), target)``
    is back-propagated, and the gradient of every parameter is collected.
    The cosine similarity is computed per parameter tensor (flattened) and
    averaged over all parameter tensors.

    Args:
        prev_e: epoch identifier of the earlier checkpoint.
        next_e: epoch identifier of the later checkpoint.
        x: model input batch.
        target: targets passed to ``criterion`` together with the model output.
        data_provider: object exposing ``model_function(epoch)``.
        criterion: loss callable ``criterion(output, target)``.

    Returns:
        ``1 - mean(cosine similarity)`` as a Python float.

    Raises:
        ValueError: if no parameter has a gradient at both epochs.
    """
    grads_t = _per_parameter_grads(prev_e, x, target, data_provider, criterion)
    grads_t1 = _per_parameter_grads(next_e, x, target, data_provider, criterion)

    # Compare the two checkpoints parameter by parameter; parameters without
    # a gradient at either epoch are skipped instead of crashing on None.
    cos = nn.CosineSimilarity(dim=0)
    cos_sim_values = [
        cos(g_t.flatten(), g_t1.flatten()).item()
        for g_t, g_t1 in zip(grads_t, grads_t1)
        if g_t is not None and g_t1 is not None
    ]
    if not cos_sim_values:
        raise ValueError("no parameter received a gradient at both epochs")

    # Average cosine similarity, turned into a distance.
    avg_cos_sim = sum(cos_sim_values) / len(cos_sim_values)
    return 1 - avg_cos_sim

In [None]:
# Pair of epochs whose gradients and representations are compared.
prev_e = 13
next_e = 15

In [None]:
# Gradient cosine distance between prev_e and next_e, computed one sample at
# a time on a random subset of the training data.
criterion = nn.CrossEntropyLoss()

training_data = data_provider._training_data()
targets = data_provider.train_labels(prev_e)

test_len = 100
idxs = np.random.choice(len(training_data), test_len, replace=False)

dists = np.zeros(test_len)
for i, sample_idx in enumerate(idxs):
    # Slice instead of indexing so the sample keeps a leading axis of length 1.
    x = training_data[sample_idx:sample_idx + 1]
    y = torch.from_numpy(targets[sample_idx:sample_idx + 1]).to(DEVICE)
    dists[i] = gradient_diff(prev_e, next_e, x, y, data_provider, criterion)

In [None]:
# compute EL2N
prev_data = data_provider.train_representation(prev_e)
next_data = data_provider.train_representation(next_e)
train_labels = data_provider.train_labels(prev_e)
# Each representation is passed to get_pred with the epoch it was taken from
# (the previous-epoch data used to be paired with next_e).
prev_pw = data_provider.get_pred(prev_e, prev_data)
next_pw = data_provider.get_pred(next_e, next_data)
# One-hot encode the labels; the number of classes is taken from the largest label.
y = np.eye(np.max(train_labels)+1)[train_labels]

prev_pw = softmax(prev_pw, axis=1)
next_pw = softmax(next_pw, axis=1)
# Error vectors (softmax output minus one-hot label).
# NOTE(review): the EL2N score itself would be the row-wise L2 norm of these,
# e.g. np.linalg.norm(prev_el2n, axis=1) -- confirm whether that is intended.
prev_el2n = prev_pw-y
next_el2n = next_pw-y

In [None]:
print("Temporal repr")
# Distance each sampled point moved between the two epochs, correlated with
# the gradient-based distances.
prev_sel, next_sel = prev_data[idxs], next_data[idxs]
repr_dists = np.array([distance.cosine(a, b) for a, b in zip(prev_sel, next_sel)])
repr_dists_eu = np.array([distance.euclidean(a, b) for a, b in zip(prev_sel, next_sel)])
stats.pearsonr(repr_dists, dists), stats.pearsonr(repr_dists_eu, dists)

In [None]:
# Euclidean distance after z-scoring each epoch's representations, again
# correlated with the gradient-based distances.
from scipy import stats
prev_norm, next_norm = stats.zscore(prev_data), stats.zscore(next_data)
norm_repr_dists_eu = np.array([
    distance.euclidean(a, b) for a, b in zip(prev_norm[idxs], next_norm[idxs])
])
stats.pearsonr(norm_repr_dists_eu, dists)

In [None]:
# # neighbor preserving rate
# npr_eu = find_neighbor_preserving_rate(prev_data[idxs], next_data[idxs], n_neighbors=15, metric="euclidean")
# npr_cosine = find_neighbor_preserving_rate(prev_data[idxs], next_data[idxs], n_neighbors=15, metric="cosine")
# stats.spearmanr(npr_eu, dists), stats.spearmanr(npr_cosine, dists)

# evaluate

In [None]:
# Loss, data and a fresh random sample of training indices for the evaluation.
criterion = nn.CrossEntropyLoss()

training_data = data_provider._training_data()
targets = data_provider.train_labels(1)

test_len = 10
idxs = np.random.choice(len(training_data), size=test_len, replace=False)

In [None]:
# Sampled inputs and their labels (labels moved to DEVICE).
data = training_data[idxs]
labels = torch.from_numpy(targets[idxs]).to(DEVICE)

# Epoch range of the data provider and the derived sizes.
start, end, period = data_provider.s, data_provider.e, data_provider.p
LEN = len(idxs)
EPOCH = 1 + (end - start) // period

In [None]:
# high_diff[i, k] is the gradient cosine distance of sample i between the
# k-th checkpoint (epoch start + k*period) and the checkpoint after it.
# The epochs and the column index are derived from start/period, so the
# loop is also correct when start != 1 or period != 1.
high_diff = np.zeros((test_len, EPOCH-1))
for i in range(test_len):
    for k in range(EPOCH-1):
        prev_e = start + k*period
        next_e = prev_e + period
        high_diff[i, k] = gradient_diff(prev_e, next_e, data[i:i+1], labels[i:i+1], data_provider, criterion)
high_diff

In [None]:
from singleVis.projector import DVIProjector
from singleVis.vis_models import vis_models as vmodels

# DVI projector backed by the "cnAE" visualization model.
VIS_MODEL_NAME = "tDVI_cnAE_sequence"
VIS_MODEL = "cnAE"
model = vmodels[VIS_MODEL](ENCODER_DIMS, DECODER_DIMS)
projector = DVIProjector(
    vis_model=model,
    content_path=CONTENT_PATH,
    vis_model_name=VIS_MODEL_NAME,
    epoch_name="Epoch",
    device=DEVICE,
)

In [None]:
from singleVis.projector import tfDVIProjector
# Value passed to tfDVIProjector as `flag`.
# NOTE(review): presumably selects which saved model variant is used -- confirm.
flag = "_temporal_id_withoutB"
# Rebinds `projector`, replacing any projector created earlier.
projector = tfDVIProjector(CONTENT_PATH, flag=flag)

In [None]:
from singleVis.projector import TimeVisProjector
from singleVis.SingleVisualizationModel import VisModel

# Layer sizes of the TimeVis encoder/decoder.
# Alternative deeper layout (previously assigned here and immediately overwritten):
#   ENCODER_DIMS = [512,256,256,256,256,256,2]
#   DECODER_DIMS = [2,256,256,256,256,256,512]
ENCODER_DIMS = [512, 256, 2]
DECODER_DIMS = [2, 256, 512]
VIS_MODEL_NAME = "timevis"
model = VisModel(ENCODER_DIMS, DECODER_DIMS)
projector = TimeVisProjector(
    vis_model=model,
    content_path=CONTENT_PATH,
    vis_model_name=VIS_MODEL_NAME,
    device=DEVICE,
)

In [None]:
# low_repr[k] holds the 2-D projection of the sampled training points at the
# k-th checkpoint epoch.
low_repr = np.zeros((EPOCH, LEN, 2))
for index, i in enumerate(range(start, end + 1, period)):
    low_repr[index] = projector.batch_project(i, data_provider.train_representation(i)[idxs])

In [None]:
# Reorder the axes from (epoch, sample, 2) to (sample, epoch, 2).
# NOTE: this rebinds low_repr, so running the cell twice swaps the axes back.
low_repr = low_repr.transpose([1,0,2])
# Euclidean distance each sample moves between consecutive checkpoints.
# Plain consecutive slices are used so both operands always have the same
# number of epochs, whatever the values of start and period.
low_dists = np.linalg.norm(low_repr[:, 1:, :] - low_repr[:, :-1, :], axis=2)
low_dists.shape

In [None]:
# Per-sample Spearman rank correlation between the gradient-based change
# sequence and the low-dimensional movement sequence.
rank_results = [stats.spearmanr(high_diff[i], low_dists[i]) for i in range(LEN)]
corrs = np.array([corr for corr, _ in rank_results], dtype=float)
ps = np.array([p_value for _, p_value in rank_results], dtype=float)
print(f"Global temporal ranking #train:{corrs.mean()}")

In [None]:
# Per-sample Spearman rank correlation between the gradient-based change
# sequence and the low-dimensional movement sequence (DVI label).
rank_results = [stats.spearmanr(high_diff[i], low_dists[i]) for i in range(LEN)]
corrs = np.array([corr for corr, _ in rank_results], dtype=float)
ps = np.array([p_value for _, p_value in rank_results], dtype=float)
print(f"DVI: Global temporal ranking #train:{corrs.mean()}")

In [None]:
# Per-sample Spearman rank correlation between the gradient-based change
# sequence and the low-dimensional movement sequence (TimeVis label).
rank_results = [stats.spearmanr(high_diff[i], low_dists[i]) for i in range(LEN)]
corrs = np.array([corr for corr, _ in rank_results], dtype=float)
ps = np.array([p_value for _, p_value in rank_results], dtype=float)
print(f"TimeVis: Global temporal ranking #train:{corrs.mean()}")

In [None]:
from singleVis.eval.evaluator import Evaluator
from singleVis.eval.evaluate import *

In [None]:
from singleVis.projector import tfDVIProjector
# Pass `flag` by keyword, matching the other tfDVIProjector construction in
# this notebook, so the call does not depend on positional parameter order.
projector = tfDVIProjector(CONTENT_PATH, flag=flag)
# Evaluator over the current data provider and projector, using Euclidean distance.
evaluator = Evaluator(data_provider, projector, metric="euclidean")

In [None]:
from singleVis.projector import TimeVisProjector

# TimeVis projector built from the current `model`, wrapped in an evaluator
# that uses Euclidean distance.
projector = TimeVisProjector(
    vis_model=model,
    content_path=CONTENT_PATH,
    vis_model_name=VIS_MODEL_NAME,
    device=DEVICE,
)
evaluator = Evaluator(data_provider, projector, metric="euclidean")

In [None]:
from singleVis.projector import DVIProjector
from singleVis.vis_models import vis_models as vmodels

# DVI projector for the "tDVI_baseAE" model (built from the "bnAE" entry of
# vis_models), wrapped in an evaluator that uses Euclidean distance.
VIS_MODEL_NAME = "tDVI_baseAE"
VIS_MODEL = "bnAE"
model = vmodels[VIS_MODEL](ENCODER_DIMS, DECODER_DIMS)
projector = DVIProjector(
    vis_model=model,
    content_path=CONTENT_PATH,
    vis_model_name=VIS_MODEL_NAME,
    epoch_name="Epoch",
    device=DEVICE,
)
evaluator = Evaluator(data_provider, projector, metric="euclidean")

In [None]:
from singleVis.projector import DVIProjector

# DVI projector for the "tDVI_cnAE" model (built from the "cnAE" entry of
# vis_models), wrapped in an evaluator that uses Euclidean distance.
VIS_MODEL_NAME = "tDVI_cnAE"
# EVALUATION_NAME = "evaluation_singleDVI_baseAE"
VIS_MODEL = "cnAE"
model = vmodels[VIS_MODEL](ENCODER_DIMS, DECODER_DIMS)
projector = DVIProjector(
    vis_model=model,
    content_path=CONTENT_PATH,
    vis_model_name=VIS_MODEL_NAME,
    epoch_name="Epoch",
    device=DEVICE,
)
evaluator = Evaluator(data_provider, projector, metric="euclidean")

In [None]:
# Temporal nearest-neighbour test score for epochs 1..15, with 3 neighbours.
eval_epochs = range(1, 16)
p = np.zeros(len(eval_epochs))
for slot, epoch_id in enumerate(eval_epochs):
    p[slot] = evaluator.eval_temporal_nn_test(epoch_id, 3)

In [None]:
# For every test sample, compare which epochs are nearest to the reference
# epoch in the high-dimensional space with which are nearest in the
# low-dimensional embedding.
epoch = 10
n_neighbors = 3
epoch_num = (data_provider.e - data_provider.s) // data_provider.p + 1
l = data_provider.test_num
high_dists = np.zeros((l, epoch_num))
low_dists = np.zeros((l, epoch_num))

# Reference representation and embedding at the chosen epoch.
curr_data = data_provider.test_representation(epoch)
curr_embedding = projector.batch_project(epoch, curr_data)

for t in range(epoch_num):
    # Column t corresponds to the t-th checkpoint epoch.
    t_epoch = t * data_provider.p + data_provider.s
    data = data_provider.test_representation(t_epoch)
    embedding = projector.batch_project(t_epoch, data)

    high_dist = evaluate_embedding_distance(data, curr_data, metric="euclidean", one_target=False)
    low_dist = evaluate_embedding_distance(embedding, curr_embedding, metric="euclidean", one_target=False)
    high_dists[:, t] = high_dist
    low_dists[:, t] = low_dist

# np.argsort sorts in ascending order, so the leftmost columns hold the
# epochs with the smallest distance to the reference epoch.
high_orders = np.argsort(high_dists, axis=1)
low_orders = np.argsort(low_dists, axis=1)

# Take the n_neighbors nearest epochs, skipping the first column.
# NOTE(review): column 0 is presumably the reference epoch itself (distance 0) -- confirm.
high_rankings = high_orders[:, 1:n_neighbors+1]
low_rankings = low_orders[:, 1:n_neighbors+1]

# Number of nearest epochs shared by the high- and low-dimensional rankings.
corr = np.zeros(len(high_dists))
for i in range(len(high_dists)):
    corr[i] = len(np.intersect1d(high_rankings[i], low_rankings[i]))
print(corr.mean())