<a href="https://colab.research.google.com/github/mariemtouihri/GRAM-Metric/blob/main/GRAM_calculation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import libraries and data

In [None]:
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
import math
from sklearn.model_selection import train_test_split
import numpy as np
import torch
import torch.nn as nn
from torch.optim import Adam
from scipy.integrate import quad
import torch.nn.functional as F
from torch.autograd import Function

In [None]:
#  To load files
file_path_1 = 'path to true values .npy' # reference matrices
file_path_2 = 'path to predictions .npy' # predictions or augmented matrices

true_values = np.load(file_path_1)
predictions = np.load(file_path_2)

del file_path_1, file_path_2

### Normalize graphs

In [None]:
def normalize_graph(dataset):
    # Iterate over each data sample in the dataset
    for matrix in dataset:
        # Access the x attribute (target adjacency matrix)
        matrix = torch.from_numpy(matrix)

        # Compute min and max values for normalization
        min_val = matrix.min()
        max_val = matrix.max()

        # Normalize the matrix in-place
        matrix.sub_(min_val).div_(max_val - min_val)

        # Convert the PyTorch tensor back to a NumPy array
        matrix = matrix.numpy()

    # Clear local variables
    for var in list(locals().keys()):
      del locals()[var]

In [None]:
normalize_graph(true_values)
normalize_graph(predictions)

# Correlation

In [None]:
def calculate_correlations(original_values, modified_values, metric):
  p = []

  for i in range(0, len(original_values), 90):

    # Calculate Pearson Correlation (can be changed according the type of correlation to be calculated)
    p_correlation = np.corrcoef(original_values[i:i+90], modified_values[i:i+90])[0, 1]
    p.append(p_correlation)


  # store results in an .npy file depending on the used metric
  p_array = np.array(p)
  file_path = f'file path to {metric} correlations .npy'
  np.save(file_path, p_array)
  print(f"Results saved to {file_path}")


  # Clear local variables
  for var in list(locals().keys()):
        del locals()[var]


# Metrics

## Closeness

In [None]:
def closeness (matrices):

    closeness_values = []
    for i in range(len(matrices)):
      matrix = matrices[i]
      G = nx.DiGraph(matrix)
      closeness_v = nx.closeness_centrality(G, distance='weight')

      closeness_list = list(closeness_v.values())
      closeness_values.extend(closeness_list)


    # save results
    list_array = np.array(closeness_values)
    file_path = f'file path to closeness values .npy'
    np.save(file_path, list_array)
    print(f"Values are saved")

    # Clear local variables
    for var in list(locals().keys()):
        if var not in ["closeness_values"]:  # Retain variables needed for return
            del locals()[var]

    return closeness_values

In [None]:
closeness_true_values = closeness(true_values)
closeness_predictions = closeness(predictions)
correlations = calculate_correlations(closeness_true_values, closeness_predictions, metric='closeness')
del closeness_true_values, closeness_predictions, correlations

## Betweenness

In [None]:
def betweenness (matrices):

    betweenness_values = []
    for i in range(len(matrices)):
      matrix = matrices[i]
      G = nx.DiGraph(matrix)
      betweenness_v = nx.betweenness_centrality(G, weight='weight', normalized=True)


      betweenness_list = list(betweenness_v.values())
      betweenness_values.extend(betweenness_list)

    # save results
    list_array = np.array(betweenness_values)
    file_path = f'file path to betweenness values .npy'
    np.save(file_path, list_array)
    print(f"Values are saved")

    # Clear local variables
    for var in list(locals().keys()):
        if var not in ["betweenness_values"]:  # Retain variables needed for return
            del locals()[var]


    return betweenness_values

In [None]:
betweenness_true_values = betweenness(true_values)
betweenness_predictions = betweenness(predictions)
correlations = calculate_correlations(betweenness_true_values, betweenness_predictions, metric='betweenness')
del betweenness_true_values, betweenness_predictions, correlations

## Average neighbor degree

In [None]:
def avg_n_d (matrices):

    avg_n_values = []
    for i in range(len(matrices)):
      matrix = matrices[i]
      G = nx.DiGraph(matrix)
      avg_n_v = nx.average_neighbor_degree(G, weight='weight', source= 'in+out')

      avg_n_list = list(avg_n_v.values())
      avg_n_values.extend(avg_n_list)


    # save results
    list_array = np.array(avg_n_values)
    file_path = f'file path to avg_n values .npy'
    np.save(file_path, list_array)
    print(f"Values are saved")

    # Clear local variables
    for var in list(locals().keys()):
        if var not in ["avg_n_values"]:  # Retain variables needed for return
            del locals()[var]


    return avg_n_values

In [None]:
avg_n_true_values = avg_n_d(true_values)
avg_n_predictions = avg_n_d(predictions)
correlations = calculate_correlations(avg_n_true_values, avg_n_predictions, metric='avg_n_d')
del avg_n_true_values, avg_n_predictions, correlations

## Diversity index

In [None]:
def matrices_with_nodes_zero (matrix):

  # Generate different matrices with each node set to zero
  matrices_with_nodes_zeroed = []


  for i in range(matrix.shape[0]):

    new_matrix = matrix.copy()
    # Create a copy of the original matrix
    new_matrix[i, :] = 0  # Set the ith row to zero
    new_matrix[:, i] = 0  # Set the ith column to zero

    # Append the modified graph to the list
    matrices_with_nodes_zeroed.append(new_matrix)
    array_of_matrices = np.array(matrices_with_nodes_zeroed)


  return array_of_matrices


In [None]:
def shannon_diversity_index(graph):
    diversity_index = 0.0

    for node in graph.nodes():
        # Get the outgoing edge weights of the node
        outgoing_weights = [data['weight'] for _, _, data in graph.out_edges(node, data=True)]

        # Calculate the relative proportion of each outgoing edge weight
        total_weight = sum(outgoing_weights)
        proportions = [weight / total_weight for weight in outgoing_weights]

        # Calculate the Shannon Diversity Index for the node
        node_diversity = -sum(p * math.log(p) for p in proportions if p > 0)

        # Add the node's diversity to the overall diversity index
        diversity_index += node_diversity

    return diversity_index

In [None]:
def diversity (matrices):

    div_values = []
    for i in range(len(matrices)):
      matrix = matrices[i]

      new_matrices = matrices_with_nodes_zero(matrix)

      for new_matrix in new_matrices:
        new_G = nx.DiGraph(new_matrix)
        diversity_index = shannon_diversity_index(new_G)
        div_values.append(diversity_index)

    # save results
    list_array = np.array(div_values)
    file_path = f'file path to diversity values .npy'
    np.save(file_path, list_array)
    print(f"Values are saved")

    # Clear local variables
    for var in list(locals().keys()):
        if var not in ["div_values"]:  # Retain variables needed for return
            del locals()[var]


    return div_values

In [None]:
diversity_true_values = diversity(true_values)
diversity_predictionss = diversity(predictions)
correlations = calculate_correlations(diversity_true_values, diversity_predictionss, metric='diversity')
del diversity_true_values, diversity_predictionss, correlations

## Eigenvector


In [None]:
def eigen (matrices):

    eigen_c_values = []
    for i in range(len(matrices)):
      matrix = matrices[i]
      G = nx.DiGraph(matrix)
      eigenvector_c = nx.eigenvector_centrality(G, weight='weight', max_iter=500)

      eigenvector_c_values = list(eigenvector_c.values())
      eigen_c_values.extend(eigenvector_c_values)

    # save results
    list_array = np.array(eigen_c_values)
    file_path = f'file path to eigen values .npy'
    np.save(file_path, list_array)
    print(f"Values are saved")


    # Clear local variables
    for var in list(locals().keys()):
        if var not in ["eigen_c_values"]:  # Retain variables needed for return
            del locals()[var]


    return eigen_c_values

In [None]:
eigen_true_values = eigen(true_values)
eigen_predictions = eigen(predictions)
correlations = calculate_correlations(eigen_true_values, eigen_predictions, metric='eigen')
del eigen_true_values, eigen_predictions, correlations

## Weighted degree

In [None]:
def w_degree (matrices):
  w_in = []
  w_out = []
  for i in range(len(matrices)):

    matrix = matrices[i]
    G = nx.DiGraph(matrix)

    weighted_in_degree_centrality = nx.in_degree_centrality(G)
    w_in_degree_values = list(weighted_in_degree_centrality.values())

    weighted_out_degree_centrality= nx.out_degree_centrality(G)
    w_out_degree_values = list(weighted_out_degree_centrality.values())

    w_in.extend(w_in_degree_values)
    w_out.extend(w_out_degree_values)

  # do the summation of w_in and w_out
  w = [x + y for x, y in zip(w_in, w_out)]

  # save results
  list_array = np.array(w)
  file_path = f'file path to w_degree values .npy'
  np.save(file_path, list_array)
  print(f"Values are saved")


  # Clear local variables
  for var in list(locals().keys()):
      if var not in ["w"]:  # Retain variables needed for return
          del locals()[var]


  return w

In [None]:
w_degree_true_values = w_degree(true_values)
w_degree_predictions = w_degree(predictions)
correlations = calculate_correlations(w_degree_true_values, w_degree_predictions, metric='w_degree')
del w_degree_true_values, w_degree_predictions, correlations

## Katz

In [None]:
def katz (matrices):

    katz_values = []
    for i in range(len(matrices)):
      matrix = matrices[i]
      G = nx.DiGraph(matrix)
      katz_centrality = nx.katz_centrality_numpy(G, alpha=0.5, weight="weight")

      katz_centrality = list(katz_centrality.values())
      katz_values.extend(katz_centrality)

    # save results
    list_array = np.array(katz_values)
    file_path = f'file path to katz values .npy'
    np.save(file_path, list_array)
    print(f"Values are saved")


    # Clear local variables
    for var in list(locals().keys()):
        if var not in ["katz_values"]:  # Retain variables needed for return
            del locals()[var]


    return katz_values

In [None]:
katz_true_values = katz(true_values)
katz_predictions = katz(predictions)
correlations = calculate_correlations(katz_true_values, katz_predictions, metric='katz')
del katz_true_values, katz_predictions, correlations

## Hub-authority

In [None]:
def hub_auth (matrices):

    hub_auth_values = []
    for i in range(len(matrices)):
      matrix = matrices[i]
      G = nx.DiGraph(matrix)

      # Calculate Hub and Authority Centrality
      hub_scores, authority_scores = nx.hits(G, max_iter=100, tol=1e-6, normalized=True)

      # Calculate combined centrality scores by summing Hub and Authority values
      combined_centrality = {node: hub_scores[node] + authority_scores[node] for node in G.nodes()}


      combined_centrality = list(combined_centrality.values())
      hub_auth_values.extend(combined_centrality)

    # save results
    list_array = np.array(hub_auth_values)
    file_path = f'file path to hub_auth values .npy'
    np.save(file_path, list_array)
    print(f"Values are saved")


    # Clear local variables
    for var in list(locals().keys()):
        if var not in ["hub_auth_values"]:  # Retain variables needed for return
            del locals()[var]


    return hub_auth_values

In [None]:
hub_auth_true_values = hub_auth(true_values)
hub_auth_predictions = hub_auth(predictions)
correlations = calculate_correlations(hub_auth_true_values, hub_auth_predictions, metric='hub_auth')
del hub_auth_true_values, hub_auth_predictions, correlations

## Harmony

In [None]:
def harmony (matrices):

    harmony_values = []
    for i in range(len(matrices)):
      matrix = matrices[i]
      G = nx.DiGraph(matrix)
      harmony_v = nx.harmonic_centrality(G, distance="edge")

      harmony_list = list(harmony_v.values())
      harmony_values.extend(harmony_list)


    # save results
    list_array = np.array(harmony_values)
    file_path = f'file path to harmony values .npy'
    np.save(file_path, list_array)
    print(f"Values are saved")


    # Clear local variables
    for var in list(locals().keys()):
        if var not in ["harmony_values"]:  # Retain variables needed for return
            del locals()[var]


    return harmony_values

In [None]:
harmony_true_values = harmony(true_values)
harmony_predictions = harmony(predictions)
correlations = calculate_correlations(harmony_true_values, harmony_predictions, metric='harmony')
del harmony_true_values, harmony_predictions, correlations

## Pagerank

In [None]:
def pagerank (matrices):

    pagerank_c_values = []

    for i in range(len(matrices)):
      matrix = matrices[i]
      G = nx.DiGraph(matrix)
      pagerank_centrality = nx.pagerank(G, weight='weight')
      pagerank_values = list(pagerank_centrality.values())
      pagerank_c_values.extend(pagerank_values)


    # save results
    list_array = np.array(pagerank_c_values)
    file_path = f'file path to pagerank values .npy'
    np.save(file_path, list_array)
    print(f"Values are saved")


    # Clear local variables
    for var in list(locals().keys()):
        if var not in ["pagerank_c_values"]:  # Retain variables needed for return
            del locals()[var]


    return pagerank_c_values

In [None]:
pagerank_true_values = pagerank(true_values)
pagerank_predictions = pagerank(predictions)
correlations = calculate_correlations(pagerank_true_values, pagerank_predictions, metric='pagerank')
del pagerank_true_values, pagerank_predictions, correlations

# Create A_vectors


In [None]:
# read correlations from stored files
metrics = ["betweenness", "closeness", "w_degree", "eigen", "pagerank", "katz", "hub_auth", "harmony", "avg_n_d", "diversity"]
all_corr_dict = {}

for metric in metrics:
  all_corr_dict[metric] = []


A_vectors = [np.zeros((1, 10)) for _ in range(len(all_corr_dict["betweenness"]))] # to intiate the length of the A_vectors

for metric in metrics:

  file_path = f'file path to {metric} correlations .npy'
  data = np.load(file_path)

  all_corr_dict[metric].extend(list(data))



column = 0
for metric, values in all_corr_dict.items():

  for i in range(len(values)):
    A_vectors[i][0, column] = values[i]


  column +=1


# save results
file_path = f'file path to A_vectors .npy'
np.save(file_path, A_vectors)
print(f"A_vectors are saved")



In [None]:
# To delete unneeded variables to avoid memory crash
def clear_variables():
    # Delete variables while keeping methods and imported libraries
    for name in list(globals().keys()):
        if not name.startswith("_") and not callable(globals()[name]) and not isinstance(globals()[name], type(__builtins__)):
            del globals()[name]

# Call the function to clear variables
clear_variables()

# GRAM calculation

In [None]:
def gram_testing(predictions):

    # Convert predictions to tensors if not already
    predictions_tensors = [torch.tensor(matrix, dtype=torch.float32) for matrix in predictions]

    product_list = []

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


    for A in predictions_tensors:
        # Move each tensor to the appropriate device
        A = A.to(device)



        B_tensor = torch.tensor([[0.230],
       [0.473],
       [-0.087],
       [-0.067],
       [0.133],
       [0.022],
       [0.001],
       [0.022],
       [0.309],
       [0.260]], dtype=torch.float32)  # the outputed result from the training (of pearson correlations) according to GRAM paper
        B_tensor = B_tensor.to(device)


        product = torch.matmul(A, B_tensor)
        # Move the product back to CPU and append to the list as a scalar
        product_list.append(product.cpu().numpy())

    # Convert the product list to a numpy array for mean calculation
    product_tensor = torch.tensor(product_list)
    mean_product = torch.mean(product_tensor)
    std_product = torch.std(product_tensor)

    print(f"The distortion level is: {mean_product.item() * 100:.2f}%")  # Display as a percentage with 2 decimal places
    print(f"The standard deviation is: {std_product.item()}")

    return mean_product.item(), std_product.item()


In [None]:
file_path = f'file path to A_vectors .npy'
A_vectors = np.load(file_path)
gram_value = gram_testing(A_vectors)

# GRAM Training

### Need to generate graphs with augmented noise to create A_matrices instead of A_vectors where each row of the A_matrices represent the noise level (as explained in GRAM paper)

In [None]:
def read_correlations():

  X = {}

  types = ["pearson"]  # other correlations can be used
  for corr_type in types:
    file_path = f'path to A_matrices .npy' # Path to A_matrices or A_vectors
    A_matrices = np.load(file_path)
    X[corr_type] = {}
    X[corr_type]["X_train"] = []
    X[corr_type]["X_test"] = []

    # Split data into training and testing sets
    X_train, X_test = train_test_split(A_matrices, test_size=0.2, random_state=42)

    X[corr_type]["X_train"] = X_train
    X[corr_type]["X_test"] = X_test

  return X




def gram_training(correlation, X, target_area1, target_area2, model):

  A_tensors = [torch.tensor(A, dtype=torch.float32) for A in X[correlation]["X_train"]]


  model = SurfacePredictionModel()
  optimizer = Adam(list(model.parameters()),lr=0.01)

  # Training loop
  num_epochs = 250
  for epoch in range(num_epochs):
      optimizer.zero_grad()
      loss = custom_loss(A_tensors, target_area1, target_area2, model)
      loss.backward()
      optimizer.step()

      if epoch % 25 == 0:
          print(f'Epoch {epoch}/{num_epochs}, Loss: {loss.item()}')

  # Save the trained model
  model_path = f'path to save the model after training' # Path to save the model
  torch.save(model.state_dict(), model_path)
  print(f'Model saved to {model_path}')


  # The B matrix after training
  optimized_B = model.linear.weight.detach().numpy()
  optimized_B = optimized_B.reshape(-1,1)

  return optimized_B


In [None]:
# reference vector
C_vector = np.array([0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.3, 0.2, 0.1, 0.0])

noise1 = np.array([0.1, 0.2, 0.3, 0.4, 0.5])
noise2 = np.array([0.5, 0.6, 0.7, 0.8, 0.9, 1.0])

noise_tensor1 = torch.tensor(noise1, dtype=torch.float32)
noise_tensor2 = torch.tensor(noise2, dtype=torch.float32)

In [None]:
class SurfacePredictionModel(nn.Module):
    def __init__(self):
        super(SurfacePredictionModel, self).__init__()
        self.linear = nn.Linear(10, 1, bias=False)  # B matrix as a linear layer without bias
        self.linear.weight.requires_grad = True

    def forward(self, A):
        # The forward pass will multiply A with the learned B matrix
        B = self.linear.weight
        return torch.matmul(A, B.t())


def fit_polynomial(x, y, degree):
    # Create Vandermonde matrix for polynomial fit
    powers = torch.arange(degree + 1).unsqueeze(0).repeat(len(x), 1)
    X = torch.pow(x.unsqueeze(1), powers)
    # Solve the least squares problem X * coeffs = y to find coeffs
    result = torch.linalg.lstsq(X, y.unsqueeze(1))
    coeffs = result.solution[:degree + 1, 0]  # Get the coefficients up to the required degree
    return coeffs

def approximate_integral(poly_coeffs, x_min, x_max, num_points=1000):
    x = torch.linspace(x_min, x_max, num_points)
    powers = torch.arange(len(poly_coeffs)).flip(0).unsqueeze(0).repeat(len(x), 1)
    poly_values = torch.pow(x.unsqueeze(1), powers) * poly_coeffs.flip(0).unsqueeze(0)
    integral = poly_values.sum(dim=1).mean() * (x_max - x_min)
    return integral

def custom_loss(A_tensors, target_area1,target_area2, model):
    total_loss1 = 0.0
    total_loss2 = 0.0
    degree = 9

    for A in A_tensors:
        predicted_C = model(A)
        predicted_C1 = predicted_C[:5].view(-1)  # First 5 elements, reshaped for fitting
        predicted_C2 = predicted_C[-6:].view(-1)  # Last 6 elements, reshaped for fitting

        # Fit and integrate for the first part
        coefficients1 = fit_polynomial(noise_tensor1, predicted_C1, degree)
        integral_value1 = approximate_integral(coefficients1, noise_tensor1.min(), noise_tensor1.max())
        integral_value_tensor1 = integral_value1.requires_grad_(True)
        loss1 = nn.functional.l1_loss(integral_value_tensor1, target_area1)
        total_loss1 += loss1

        # Fit and integrate for the second part
        coefficients2 = fit_polynomial(noise_tensor2, predicted_C2, degree)
        integral_value2 = approximate_integral(coefficients2, noise_tensor2.min(), noise_tensor2.max())
        integral_value_tensor2 = integral_value2.requires_grad_(True)
        loss2 = nn.functional.l1_loss(integral_value_tensor2, target_area2)
        total_loss2 += loss2

    avg_loss1 = total_loss1 / len(A_tensors)
    avg_loss2 = total_loss2 / len(A_tensors)
    return (avg_loss1 + avg_loss2) / 2





target_area1 = torch.tensor(0.28, dtype=torch.float32)
target_area2 = torch.tensor(0.125, dtype=torch.float32)
model = SurfacePredictionModel()
optimizer = Adam(list(model.parameters()),lr=0.01)
