# Shapley Value Explainer

## Installing the packages

In [None]:
# Make the `checkpoints` directory if it doesn't exist
import pathlib
import time
pathlib.Path('/content/checkpoints').mkdir(parents=True, exist_ok=True)

# Import the libs
import os
import torch
os.environ['TORCH'] = torch.__version__
print("Pytorch Version:", torch.__version__)

# Installing a few additional libs
!pip install -q torch-scatter -f https://data.pyg.org/whl/torch-${TORCH}.html
!pip install -q torch-sparse -f https://data.pyg.org/whl/torch-${TORCH}.html

# Install Pytorch Geometric
# !pip install -q git+https://github.com/pyg-team/pytorch_geometric.git
! pip install torch_geometric

#Installing DGL
!pip install dgl
import numpy as np
from torch_geometric.utils import subgraph, k_hop_subgraph, degree, mask_to_index
import torch.nn.functional as F

from itertools import chain, combinations
from random import sample
from random import choice
from random import randint
import numpy as np

## Dataset setup

In [None]:
from torch_geometric.datasets import Planetoid
from torch_geometric.transforms import NormalizeFeatures
from torch_geometric.datasets import BAShapes
from torch_geometric.nn import GCN
from torch_geometric.datasets import ExplainerDataset
from torch_geometric.datasets.graph_generator import BAGraph
import torch_geometric.transforms as T
from dgl.data import TreeCycleDataset
from torch_geometric.utils import from_networkx
from torch_geometric.utils import from_dgl
from torch_geometric.transforms import ToUndirected
from dgl.data import TreeGridDataset

# Load the BAShapes dataset.
# The graph is generated on the fly by ExplainerDataset from the settings below:
# a BAGraph generator (num_nodes=300, num_edges=5) combined with 80 'house' motifs.
dataset = ExplainerDataset(
    graph_generator=BAGraph(num_nodes=300, num_edges=5),
    motif_generator='house',
    num_motifs=80,
    # NOTE(review): presumably attaches a constant node feature so that `data.x` exists — confirm against the T.Constant documentation.
    transform=T.Constant(),
)
# The first (index 0) graph of the dataset is the one used by every later cell.
data = dataset[0]

############## Code block to Load TreeCycle and TreeGrid Datasets###########
#Use the appropriate line from the 2 subsequent lines depending on whether TreeCycle or TreeGrid dataset is required
# dataset = TreeCycleDataset()
# #dataset = TreeGridDataset()

# data = dataset[0]
# data = from_dgl(data)

# data['y'] = data['label']
# data['x'] = data['feat']
# del(data['feat'])
# del(data['label'])
# data = ToUndirected()(data)
# print(data.is_directed())
######################################################################

# Printing out the dataset
print(data)
#print(data.x)
print(data.edge_index)
print((data.y)[200:300])
print("***************")


In [None]:
def add_noise(add_ratio):
  """Add random noise edges to the global graph ``data`` in place.

  Noise means connecting randomly chosen node pairs that do not yet have an
  edge between them. Every accepted pair (u, v) is inserted in both
  directions, (u, v) and (v, u).

  Parameter:
    add_ratio: The fraction of the existing count of edges (entries of
      ``data.edge_index``) to be added to the graph. Half that many node
      pairs are drawn (rounded up), since each pair contributes two entries.

  Raises:
    ValueError: If the requested noise cannot be added because every ordered
      pair of distinct nodes is already connected.

  Side effect: ``data.edge_index`` is replaced by a new tensor on the same
  device. The column order of the new tensor is arbitrary because the edges
  pass through a set.
  """
  edge_index = data.edge_index
  # .numpy() only works on CPU tensors, so copy to host memory first.
  edge_pairs = edge_index.detach().cpu().numpy().T
  edge_set = {(int(u), int(v)) for u, v in edge_pairs}  # each tuple is an edge

  # Number of node pairs to add. The ceiling division reproduces the original
  # loop bound `edgesAdded < int(add_ratio * n) / 2`.
  pairs_to_add = (int(add_ratio * len(edge_set)) + 1) // 2

  num_nodes = data.x.size(0)
  # Ordered pairs of distinct nodes that are still unconnected. Once this
  # reaches zero the rejection sampling below could never succeed again.
  free_slots = num_nodes * (num_nodes - 1) - sum(1 for u, v in edge_set if u != v)

  pairs_added = 0
  while pairs_added < pairs_to_add:
    if free_slots <= 0:
      raise ValueError(
          "Cannot add the requested noise: no unconnected node pair is left")
    u, v = sample(range(num_nodes), k=2)
    if (u, v) in edge_set:
      continue
    edge_set.add((u, v))
    free_slots -= 1
    if (v, u) not in edge_set:
      edge_set.add((v, u))
      free_slots -= 1
    pairs_added += 1

  # reshape(-1, 2) keeps the result two-dimensional even for an empty edge set.
  new_edge_index = torch.tensor(
      list(edge_set), dtype=edge_index.dtype, device=edge_index.device)
  data.edge_index = new_edge_index.reshape(-1, 2).t()



# Uncomment the lines below to call the add_noise function to add random noise

In [None]:
# noise_ratio = 0.1 # Adding 10% noise
# add_noise(noise_ratio)

# Printing out data after adding random noise. Uncomment the lines below to print data after adding random noise

In [None]:
# print(data)
# print(data.edge_index)
# print((data.y)[200:300])
# print("***************")

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Saving and loading the noisy graph. This cell stores the noisy graph in a pickle file so that the same graph can be reused across different runs of this notebook. Uncomment the lines below if you want to compare results while tweaking other parts of the pipeline (such as the number of coalitions sampled for the Banzhaf index) with the graph held constant. Make sure to replace "PATH_OF_FILE_THAT_WILL_HOLD_THE_NOISY_GRAPH" with the appropriate path.

In [None]:
# import pickle
# path = '/content/drive/MyDrive/'
# file = open(path + <PATH_OF_FILE_THAT_WILL_HOLD_THE_NOISY_GRAPH>, 'wb')
# pickle.dump(data, file)
# file.close()

# Uncomment the lines below to load the noisy graph. Make sure to replace the "PATH_OF_FILE_THAT_HOLDS_THE_NOISY_GRAPH" with the appropriate path.

In [None]:
# # open the file, where you stored the noisy graph
# path = '/content/drive/MyDrive/'
# file = open(path + <PATH_OF_FILE_THAT_HOLDS_THE_NOISY_GRAPH>, 'rb')

# # load information from that file
# data = pickle.load(file)

In [None]:
print(data)

# Setup for edge mask. This is utilized for efficient calculation of Shapley and Banzhaf Index

In [None]:
# Transposed edge index: after the transpose, each row holds the two entries of one column of data.edge_index.
edge_index_tp = torch.transpose(data.edge_index, 0 ,1)
# NOTE(review): .numpy() needs a CPU tensor — presumably fine because this cell runs before `data` is moved to `device`; confirm when re-running cells out of order.
edge_index_np = edge_index_tp.numpy()

In [None]:
# Same edges as Python tuples, stored in a 1-D object array (one tuple per element).
# The change-in-probability helpers further down read `tuple_edge_index` and `edge_index_np` to build edge masks.
ls = list(map(tuple, edge_index_np))
tuple_edge_index = np.empty(len(ls), dtype=object)
tuple_edge_index[:] = ls

# Simple node classifier

We explain a GCN model here. Our proposed method is model-agnostic: it does not depend on the GNN architecture and can be used to explain any model. However, the model must perform well on the node classification task for our explanation method to work well.

In [None]:
import torch
from torch.nn import Linear
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from torch_geometric.nn import GCNConv


# Train and Test methods. The GCN model hyperparameters are different for BAShapes v/s TreeCycles and TreeGrid. See the cell below, and uncomment the appropriate lines according to whether BAShapes is being evaluated or TreeCycle/TreeGrid

In [None]:
# from IPython.display import Javascript  # Restrict height of output cell.
# display(Javascript('''google.colab.output.setIframeHeight(0, true, {maxHeight: 300})'''))
# Build the train/test split on the CPU first; the graph is moved to `device` further down.
data = data.to("cpu")
idx = torch.arange(data.num_nodes)
# 80/20 split of the node ids, stratified by the node labels.
train_idx, test_idx = train_test_split(idx, train_size=0.8, stratify=data.y)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

#For BAShapes
model = GCN(data.num_node_features, hidden_channels=20, num_layers=3,
            out_channels=dataset.num_classes)

#For TreeCycles and TreeGrid:
# model = GCN(data.num_node_features, hidden_channels=128, num_layers=2,
#             out_channels=dataset.num_classes)

# Model and graph must live on the same device for the forward pass.
model = model.to(device)
data = data.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=0.005)
# NOTE(review): `criterion` is not used by train(), which calls F.cross_entropy directly.
criterion = torch.nn.CrossEntropyLoss()



def train():
    """Run one full-batch optimisation step on the training nodes.

    Uses the notebook globals `model`, `optimizer`, `data` and `train_idx`.

    Returns:
        The cross-entropy loss on the training nodes as a Python float.
    """
    model.train()
    optimizer.zero_grad()
    out = model(data.x, data.edge_index)
    loss = F.cross_entropy(out[train_idx], data.y[train_idx])
    loss.backward()
    # Clip AFTER backward(): the gradients only exist once backward() has run.
    # Clipping before it (right after zero_grad()) never affects the step.
    torch.nn.utils.clip_grad_norm_(model.parameters(), 2.0)
    optimizer.step()
    return float(loss)


@torch.no_grad()
def test():
    """Evaluate the global `model` on the train and test node splits.

    Returns:
        A `(train_acc, test_acc)` tuple of accuracies, each the fraction of
        nodes in the split whose predicted class equals the label.
    """
    model.eval()
    predictions = model(data.x, data.edge_index).argmax(dim=-1)

    def accuracy(node_ids):
        # Fraction of correctly classified nodes among `node_ids`.
        hits = int((predictions[node_ids] == data.y[node_ids]).sum())
        return hits / node_ids.size(0)

    train_acc = accuracy(train_idx)
    test_acc = accuracy(test_idx)
    return train_acc, test_acc



# Uncomment the lines below to train the GCN. This portion is kept commented to train the model only once, and then save the trained state for further use.



In [None]:

# pbar = tqdm(range(1, 4001))
# for epoch in pbar:
#     loss = train()
#     if epoch == 1 or epoch % 200 == 0:
#         train_acc, test_acc = test()
#         pbar.set_description(f'Loss: {loss:.4f}, Train: {train_acc:.4f}, '
#                              f'Test: {test_acc:.4f}')
# pbar.close()
# model.eval()

In [None]:
data = data.to(device)

# Uncomment the lines below to save the trained model. You may want to save different versions of the model: model saved on noisy graph, and model saved on graph without noise. Make sure to replace "PATH_TO_FILE_THAT_HOLDS_THE_SAVED_MODEL" with the appropriate path

In [None]:
# path = '/content/drive/MyDrive/'
# file = open(path + <PATH_TO_FILE_THAT_HOLDS_THE_SAVED_MODEL>, 'wb')
# torch.save(model, file)
# file.close()

# Loading the saved model. Make sure to replace the "PATH_TO_FILE_THAT_HOLDS_THE_SAVED_MODEL" with the appropriate path.

In [None]:
# open a file, where you stored the saved model
path = '/content/drive/MyDrive/'
file = open(path + <PATH_TO_FILE_THAT_HOLDS_THE_SAVED_MODEL>, 'rb')


# dump information to that file
model = torch.load(file)

## Test accuracy

In [None]:
train_acc, test_acc = test()
print("The train acc is: ", train_acc, " and the test acc is: ", test_acc)

# Explainer

## Some Utility Functions

In [None]:
from torch_geometric.utils import subgraph, k_hop_subgraph, degree, mask_to_index
from torch_geometric.nn.conv import edge_conv

def get_edges(data, node_id, hop_size_k):
  """Return the `hop_size_k`-hop subgraph around node `node_id`.

  The result of `k_hop_subgraph` is passed through unchanged; callers in this
  notebook unpack it as `(subset, edge_index, mapping, edge_mask)`.
  """
  neighbourhood = k_hop_subgraph(
      node_idx=node_id,
      num_hops=hop_size_k,
      edge_index=data.edge_index,
  )
  return neighbourhood

def get_node_class_prob (node_id, edge_ind):
  """Return class probabilities and predicted class for node `node_id`.

  Parameters:
    node_id: Index of the node whose prediction is requested.
    edge_ind: Edge index tensor to run the global `model` on; it is moved to
      `device` before the forward pass.

  Returns:
    A tuple `(probs, pred)`: the softmax probability vector of the node
    (carrying no gradient history) and its argmax class as a Python int.
  """
  model.eval()
  # Inference only: skipping autograd bookkeeping saves time and memory, which
  # matters because the explainers call this function thousands of times.
  with torch.no_grad():
    out = model(data.x, edge_ind.to(device))
    probs = F.softmax(out, dim=-1)
    pred = out.argmax(dim=-1)
  return probs[node_id], pred[node_id].item()


In [None]:
def get_change_in_class_prob_for_set_of_edges (node_id, hard_set_of_edges):
  """Return the change in probability for `node_id` after removing edges.

  The value is the probability of the originally predicted class on the full
  graph minus the probability of that same class once the edges in
  `hard_set_of_edges` (and their reversed counterparts) are removed.

  Parameters:
    node_id: Index of the node being explained.
    hard_set_of_edges: Collection of edges, each represented as a tuple.

  Returns:
    The probability drop as a Python float.
  """
  # The sibling function performs exactly the same computation and returns
  # (probability change, class unchanged); delegate instead of duplicating it.
  prob_change, _ = get_change_in_class_for_set_of_edges(node_id, hard_set_of_edges)
  return prob_change

def get_change_in_class_for_set_of_edges (node_id, hard_set_of_edges):
  """Measure the effect of removing a set of edges on the prediction for `node_id`.

  Parameters:
    node_id: Index of the node being explained.
    hard_set_of_edges: Collection of edges, each represented as a pair of
      node ids. Both directions of every listed edge are removed.

  Returns:
    A tuple `(prob_change, same_class)`:
      prob_change: probability of the originally predicted class on the full
        graph minus its probability after the removal.
      same_class: True if the predicted class is unchanged after the removal,
        False otherwise.
  """
  # Original classification on the full graph
  original_prob_vector, original_class = get_node_class_prob(node_id, data.edge_index)
  original_prob = original_prob_vector[original_class].item()

  # Both orientations of every edge to drop, so the mask needs one lookup per
  # row. Unpacking also accepts edges given as lists, not only tuples.
  removed = set()
  for u, v in hard_set_of_edges:
    removed.add((u, v))
    removed.add((v, u))

  # Keep-mask over the rows of edge_index_np (numpy array of the transposed
  # edge_index). Building it from a list also works when there are no edges,
  # which np.vectorize rejects.
  keep_mask = np.array(
      [(u, v) not in removed for u, v in edge_index_np.tolist()], dtype=bool)

  # Filter edge_index_np with the mask and convert it into a pytorch tensor
  filtered_edge_index = torch.from_numpy(np.transpose(edge_index_np[keep_mask]))

  # Classification with the edges removed
  changed_prob_vector, changed_class = get_node_class_prob(node_id, filtered_edge_index)
  changed_prob = changed_prob_vector[original_class].item()

  return (original_prob - changed_prob, original_class == changed_class)


# Set the following hyperparameters in the cell below:

**Budget**: The number of edges returned as the explanation

**HOP_SIZE**: The number of hops up to which an edge is considered as a candidate for explanation  

**Coalition_Count**: The number of coalitions to be considered when calculating the Banzhaf value.

**THRESHOLD**: The threshold parameter in our paper

In [None]:
# BUDGET and THRESHOLD hold "JUNK" placeholders here; the driver loop in the last cell
# assigns real values from budgetList / thresLis before calling banFun() and shapFun().
BUDGET = "JUNK" # Number of explanation edges returned for each node (placeholder)
HOP_SIZE = 1 # The number of hops up to which the edges for a node are considered
Coalition_Count = 1500 # Parameter of the Banzhaf index: upper bound on the number of distinct coalitions sampled per node
THRESHOLD = "JUNK" # Coalitions whose relative probability change is below this value are rejected (placeholder)

# Function to compute the Banzhaf Value. The implementation is based on the paper: [Data Banzhaf](https://proceedings.mlr.press/v206/wang23e.html)
**Parameter:**

node_id: The id of the node for which Banzhaf value of edges is calculated

**Returns:**

A list of tuples, where each tuple consists of the edge and its Banzhaf Value

In [None]:
from itertools import chain, combinations
from random import sample
from random import choice
from random import randint

def get_explanation(node_id):
  """Rank the edges around `node_id` by their sampled Banzhaf value.

  Candidate edges are the edges of the HOP_SIZE-hop subgraph around the node,
  with the two directions of an edge collapsed into one entry. Coalitions of
  min(BUDGET, number of candidates) edges are sampled at random. A coalition
  is kept if it is new and its relative probability change
  (probability change / original probability) is at least THRESHOLD. Sampling
  stops after Coalition_Count kept coalitions or after 18 consecutive
  rejected draws.

  The Banzhaf value of an edge is the mean utility of the kept coalitions
  containing it minus the mean utility of the kept coalitions not containing
  it. It is 0 when the edge is in none or in all of them.

  Parameter:
    node_id: The id of the node for which the Banzhaf value of edges is
      calculated.

  Returns:
    A list of `(edge, banzhaf_value)` tuples sorted by decreasing value.

  Side effect: adds the time spent in the aggregation phase to the global
  BANZHAF_TIME, which must already be initialised.
  """
  global BANZHAF_TIME

  # Sampling gives up after this many consecutive rejected draws.
  max_consecutive_rejects = 18

  subset, edge_index, mapping, edge_mask = get_edges(data, node_id, HOP_SIZE)

  # Keep one orientation (the first one seen) of every edge. Doing this with a
  # `seen` set avoids removing items from a list while iterating over it, which
  # raised ValueError whenever a reverse edge was missing.
  candidate_edges = []
  seen = set()
  for u, v in torch.transpose(edge_index, 0, 1).tolist():
    if (u, v) in seen or (v, u) in seen:
      continue
    seen.add((u, v))
    candidate_edges.append((u, v))

  original_prob_vector, original_class = get_node_class_prob(node_id, data.edge_index)
  original_prob = original_prob_vector[original_class].item()

  coalition_size = min(len(candidate_edges), BUDGET)
  coalition_dict = dict()  # kept coalition -> its utility (probability change)
  consecutive_rejects = 0
  while len(coalition_dict) < Coalition_Count:
    # Sample from the list, not from a set: random.sample() rejects sets on
    # Python 3.11+.
    coalition = tuple(sorted(sample(candidate_edges, k=coalition_size)))

    # Only evaluate the model for coalitions that have not been kept already.
    rejected = coalition in coalition_dict
    if not rejected:
      probab_change = get_change_in_class_prob_for_set_of_edges(node_id, coalition)
      rejected = (probab_change / original_prob) < THRESHOLD

    if rejected:
      consecutive_rejects += 1
      if consecutive_rejects == max_consecutive_rejects:
        break
    else:
      coalition_dict[coalition] = probab_change
      consecutive_rejects = 0

  # edge -> (sum of utilities of kept coalitions containing the edge,
  #          number of such coalitions)
  edgeUtilityDict = {edge: (0, 0) for edge in candidate_edges}
  totalUtility = 0  # sum of utilities of all kept coalitions
  num_coalitions = len(coalition_dict)

  t00 = time.time()
  for coalition, probab_change in coalition_dict.items():
    # NOTE(review): the result of this call is unused. It looks deliberate, so
    # that BANZHAF_TIME includes one model evaluation per kept coalition.
    # Confirm before removing it, since that would change reported timings.
    get_change_in_class_prob_for_set_of_edges(node_id, coalition)
    totalUtility += probab_change
    for edge in coalition:
      utility_sum, member_count = edgeUtilityDict[edge]
      edgeUtilityDict[edge] = (utility_sum + probab_change, member_count + 1)

  banzhaf_dict = dict()  # edge -> Banzhaf value
  for edge, (utility_sum, member_count) in edgeUtilityDict.items():
    if member_count == 0 or member_count == num_coalitions:
      # No contrast available: the edge is in none or all of the coalitions.
      banzhaf_dict[edge] = 0
    else:
      remaining_utility = totalUtility - utility_sum
      remaining_coalitions = num_coalitions - member_count
      banzhaf_dict[edge] = (utility_sum / member_count) - (remaining_utility / remaining_coalitions)

  explanation_list_banzhaf = sorted(
      banzhaf_dict.items(), key=lambda item: item[1], reverse=True)
  t11 = time.time()
  BANZHAF_TIME += (t11 - t00)

  return explanation_list_banzhaf



# Set the SAMPLING_SIZE variable which holds the number of permutations considered when calculating the shapley value

In [None]:
SAMPLING_SIZE = 50 #Hyperparameter of shapley value: It is the no of permutations using which shapley value is calculated

# Function to Compute the Shapley Value.
Parameter:

node_id: The id of the node for which Shapley value of edges is calculated

Returns:

A list of tuples, where each tuple consists of the edge and its Shapley Value

In [None]:
from itertools import chain, combinations
from random import sample
from random import choice
from random import randint


def get_explanation_shapley(node_id):
  """Rank the edges around `node_id` by their sampled Shapley value.

  Candidate edges are the edges of the HOP_SIZE-hop subgraph around the node,
  with the two directions of an edge collapsed into one entry. SAMPLING_SIZE
  random permutations of the candidates are drawn. Within a permutation the
  edges are removed one after another, and each edge is credited with the
  increase in probability change caused by its removal. The credits are
  averaged over the permutations.

  Parameter:
    node_id: The id of the node for which the Shapley value of edges is
      calculated.

  Returns:
    A list of `(edge, shapley_value)` tuples sorted by decreasing value.
  """
  subset, edge_index, mapping, edge_mask = get_edges(data, node_id, HOP_SIZE)

  # Keep one orientation (the first one seen) of every edge. Doing this with a
  # `seen` set avoids removing items from a list while iterating over it, which
  # raised ValueError whenever a reverse edge was missing.
  candidate_edges = []
  seen = set()
  for u, v in torch.transpose(edge_index, 0, 1).tolist():
    if (u, v) in seen or (v, u) in seen:
      continue
    seen.add((u, v))
    candidate_edges.append((u, v))

  shapley_dict = {edge: 0 for edge in candidate_edges}

  rng = np.random.default_rng()
  # Utility of every edge set evaluated so far, keyed by the sorted edge tuple,
  # so each distinct set costs at most one model evaluation.
  val_dictionary = dict()
  for _ in range(SAMPLING_SIZE):
    removed_edges = []
    previous_val = 0
    # Permute indices rather than the edge array so edges stay Python tuples.
    for position in rng.permutation(len(candidate_edges)):
      edge = candidate_edges[position]
      removed_edges.append(edge)
      cache_key = tuple(sorted(removed_edges))
      if cache_key not in val_dictionary:
        val_dictionary[cache_key] = get_change_in_class_prob_for_set_of_edges(node_id, removed_edges)
      val = val_dictionary[cache_key]

      # Marginal contribution of `edge` given the edges removed before it.
      shapley_dict[edge] += (val - previous_val)
      previous_val = val

  for edge in shapley_dict:
    shapley_dict[edge] /= SAMPLING_SIZE

  explanation_list = sorted(
      shapley_dict.items(), key=lambda item: item[1], reverse=True)
  return explanation_list


# Uncomment the cell below to get an idea of the number of edges within the set hop size for each node in the graph

In [None]:
# for node_id in range((data.x.size())[0]):
#   subset, edge_index, mapping, edge_mask = get_edges(data, node_id, 1)
#   print((edge_index.size()[1])/2)

# Sampling 50 % of nodes randomly from the graph. The sampling is done thrice. The cell is commented to perform the sampling only once and store the result later(see the next cell)

In [None]:
# numNodes = (data.x.size())[0]
# testNodes = []
# for i in range(0,3):
#   testNodes.append(sample(range(0,numNodes), (int)(numNodes/2) ))

# Save the sampled nodes for use in subsequent runs. Make sure to replace the "PATH_TO_FILE_THAT_WILL_STORE_THE_SAMPLED_NODES" with the appropriate path.

In [None]:
# import pickle
# path = '/content/drive/MyDrive/'
# file = open(path + <PATH_TO_FILE_THAT_WILL_STORE_THE_SAMPLED_NODES>, 'wb')
# pickle.dump(testNodes, file)
# file.close()

# Load the saved sampled nodes. Make sure to replace the "PATH_TO_FILE_THAT_STORES_THE_SAMPLED_NODES" with the appropriate path

In [None]:
import pickle

# open a file, where you stored the pickled data
path = '/content/drive/MyDrive/'
file = open(path+ <PATH_TO_FILE_THAT_STORES_THE_SAMPLED_NODES>, 'rb')

# dump information to that file
testNodes = pickle.load(file)

In [None]:
BANZHAF_TIME = 0

# Initialize the following lists in the cell below:

**thresLis**: The list of thresholds that will be evaluated for a given budget

**budgetList**: The list of budgets that will be evaluated

Note that the computation for the thresholds and budgets listed above will be done for both Shapley and Banzhaf.

Make sure to replace the "NAME_OF_FILE_THAT_STORES_THE_BANZHAF_AND_SHAPLEY_RESULTS" with the appropriate path.

In [None]:
thresLis = [0, 1e-2, 0.1]
budgetList = [3,4,5]
FILE_NAME = <NAME_OF_FILE_THAT_STORES_THE_BANZHAF_AND_SHAPLEY_RESULTS>

In [None]:
from statistics import mean

# Average Fidelity for Banzhaf index

In [None]:
def banFun():
  """Compute and record the average fidelity of the Banzhaf explanations.

  For every run (one list of node ids in `testNodes`) and every node in it,
  the top-BUDGET edges returned by `get_explanation` are removed and the
  result is checked for whether the predicted class stays the same. The
  fidelity of a run is the mean of these outcomes. The mean, variance and
  standard deviation over the runs are printed and appended to FILE_NAME on
  Google Drive, together with the average time per run.

  Side effects: resets and updates the global BANZHAF_TIME; appends one line
  to the results file.
  """
  from statistics import mean

  global BANZHAF_TIME
  BANZHAF_TIME = 0
  cumu_impact_banzhaf = []
  for run_nodes in tqdm(testNodes):
    run_impacts = []
    for node in run_nodes:
      # get_explanation adds its own aggregation time to BANZHAF_TIME.
      explanationList = get_explanation(node)
      t0 = time.time()
      # Edges of the (at most) BUDGET highest ranked entries.
      budgetExplanation = [edge for edge, _ in explanationList[:max(BUDGET, 0)]]

      impact = get_change_in_class_for_set_of_edges (node, budgetExplanation)
      run_impacts.append(impact[1])
      t1 = time.time()
      BANZHAF_TIME+=t1-t0

    cumu_impact_banzhaf.append(mean(run_impacts))

  fidelity = mean(cumu_impact_banzhaf)
  variance = np.var(cumu_impact_banzhaf)
  std_dev = np.std(cumu_impact_banzhaf)

  # Average over the runs. This was a hard-coded 3.0, which is only right when
  # testNodes holds exactly three samples.
  BANZHAF_TIME = BANZHAF_TIME/len(testNodes)

  opstr = "Threshold = "+str(THRESHOLD)+ "  Budget= "+ str(BUDGET) + "--> Fidelity= " + str(fidelity) + " Variance= " + str(variance) + " Std Dev = " + str(std_dev)+ " Total time taken: " + str(BANZHAF_TIME)+"\n"
  path = '/content/drive/MyDrive/'
  # `with` closes the results file even if the write fails.
  with open(path+ FILE_NAME, 'a') as file:
    file.write(opstr)
  print("Threshold = ", THRESHOLD, "  Budget= ", BUDGET,"--> Fidelity= ",fidelity," Variance= ", variance, " Std Dev = ", str(std_dev), " Total time taken: ", BANZHAF_TIME)

# Average Fidelity for Shapley Value

In [None]:
def shapFun():
  """Compute and record the average fidelity of the Shapley explanations.

  For every run (one list of node ids in `testNodes`) and every node in it,
  the top-BUDGET edges returned by `get_explanation_shapley` are removed and
  the result is checked for whether the predicted class stays the same. The
  fidelity of a run is the mean of these outcomes. The mean, variance and
  standard deviation over the runs are printed and appended to FILE_NAME on
  Google Drive, together with the average wall-clock time per run.

  Side effect: appends one line to the results file.
  """
  from statistics import mean

  cumu_impact = []
  t0 = time.time()
  for run_nodes in tqdm(testNodes):
    run_impacts = []
    for node in run_nodes:
      explanationList = get_explanation_shapley(node)

      # Edges of the (at most) BUDGET highest ranked entries.
      budgetExplanation = [edge for edge, _ in explanationList[:max(BUDGET, 0)]]

      impact = get_change_in_class_for_set_of_edges (node, budgetExplanation)
      run_impacts.append(impact[1])

    cumu_impact.append(mean(run_impacts))

  t1 = time.time()
  # Average over the runs. This was a hard-coded 3.0, which is only right when
  # testNodes holds exactly three samples.
  ttaken = (t1-t0)/len(testNodes)
  print("Total time taken: ", ttaken)

  fidelity = mean(cumu_impact)
  variance = np.var(cumu_impact)
  std_dev = np.std(cumu_impact)

  opstr = "  Budget= "+ str(BUDGET) + "--> Fidelity= " + str(fidelity) + " Variance= " + str(variance) + " Std Dev = " + str(std_dev)+ " Total time taken: " + str(ttaken)+"\n"
  path = '/content/drive/MyDrive/'
  # `with` closes the results file even if the write fails.
  with open(path+ FILE_NAME, 'a') as file:
    file.write(opstr)
  print("Budget= ", BUDGET,"--> Fidelity= ",fidelity, " Variance= " + str(variance), " Std Dev = ", std_dev, " Total time taken: ", ttaken)

# The cell below runs a script to calculate average fidelity and time for different budget and threshold(For Banzhaf) combinations for Banzhaf and Shapley

In [None]:
# For every budget: run Banzhaf once per threshold, then Shapley once, and
# write section headers to the results file around the lines that banFun()
# and shapFun() append themselves.
path = '/content/drive/MyDrive/'
for scriptBudget in budgetList:
  BUDGET = scriptBudget

  # Each header is written in its own `with` block so the file is closed
  # before banFun()/shapFun() reopen it in append mode.
  with open(path+ FILE_NAME, 'a') as file:
    file.write("Banzhaf Results:\n")

  for scriptThres in thresLis:
    THRESHOLD = scriptThres
    banFun()

  with open(path+ FILE_NAME, 'a') as file:
    file.write("\nShapley Results:\n")

  shapFun()

  with open(path+ FILE_NAME, 'a') as file:
    file.write("\n\n")