In [1]:
import uproot
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
from scipy.spatial.distance import cdist
from sklearn.model_selection import cross_val_score, cross_val_predict, train_test_split
from collections import namedtuple, defaultdict
import random
random.seed(42)
import h5py
import pickle
import blosc
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torch_geometric.data import Data
from torch_geometric.nn import GCNConv
from torch_geometric.utils import add_self_loops
from torchvision import transforms
from sklearn.preprocessing import StandardScaler, MinMaxScaler
import joblib # efficiently serializes python objects and is used in saving and loading machine learning models
import json # encode/decode JSON data

## Methods for the pairing datapoints notebook

In [21]:
def branches_from_root_file(filename):
    '''
    Returns the branches from a root file

    The file is opened with uproot, the first key listed in the file is taken
    as the tree, and all of its branches are read with ``tree.arrays()``.

    Parameters
    ----------
    filename : path of the ROOT file, passed to ``uproot.open``.

    Returns
    -------
    The object returned by ``tree.arrays()`` for the first key of the file.

    Raises
    ------
    IndexError
        If the file lists no keys.
    '''
    # Read inside a context manager so the file handle is released as soon as
    # the arrays are in memory instead of lingering until garbage collection.
    with uproot.open(filename) as file:
        tree = file[file.keys()[0]]
        branches = tree.arrays()
    return branches

# Original Code:
# file = uproot.open("MyxAODAnalysis_all2D.outputs.root")
# file.keys()
# tree = file['analysis']
# branches = tree.arrays()

In [23]:
def create_data_dict(rangeValue, variables, dataDict):
    """
    Fill ``dataDict`` with one combined array per index and return it.

    For every ``i`` below ``rangeValue`` the ``i``-th entry of each item in
    ``variables`` receives a new axis at position 1; the results are joined
    along that axis and stored under the key ``data_{i}``. ``dataDict`` is
    modified in place.
    """
    for event in range(rangeValue):
        columns = []
        for variable in variables:
            columns.append(np.expand_dims(variable[event], axis=1))
        dataDict[f"data_{event}"] = np.concatenate(columns, axis=1)
    return dataDict
    
# Original Code:
# data = {}
# for i in range(generalized_range):
#     data[f"data_{i}"] = np.concatenate((np.expand_dims(cell_coordinate_x[i], axis=1), np.expand_dims(cell_coordinate_y[i], axis=1), np.expand_dims(cell_coordinate_z[i], axis=1),
#                         np.expand_dims(cell_eta[i], axis=1), np.expand_dims(cell_phi[i], axis=1),
#                         np.expand_dims(cell_sampling[i], axis=1),
#                         np.expand_dims(cell_noiseSigma[i], axis=1),
#                         np.expand_dims(cell_e[i], axis=1)), axis=1)

In [29]:
def save_dict_to_hdf5(dic, filename):
    """Write the dictionary ``dic`` into the HDF5 file ``filename``.

    The file is opened in write mode and the open file object is handed to
    ``_save_dict_to_hdf5`` as the top-level group.
    """
    with h5py.File(filename, mode='w') as root_group:
        _save_dict_to_hdf5(root_group, dic)
# Opens hdf5 file in write mode. Calls a helper function
# to recursively save the dictionary to the hdf5 file.

def _save_dict_to_hdf5(group, dic):
    """Recursively store the entries of ``dic`` in ``group``.

    A nested dictionary becomes a sub-group created with
    ``group.create_group``; a list is converted to a numpy array first;
    any other value is assigned to ``group[key]`` unchanged.
    """
    for name, entry in dic.items():
        if isinstance(entry, dict):
            _save_dict_to_hdf5(group.create_group(name), entry)
            continue
        # Convert list to numpy array before saving
        payload = np.array(entry) if isinstance(entry, list) else entry
        group[name] = payload

In [5]:
# Helpers for removing permutation variants from a list of tuples
def canonical_form(t):
    """Return the elements of ``t`` as a tuple in ascending order."""
    ordered = list(t)
    ordered.sort()
    return tuple(ordered)

def remove_permutation_variants(tuple_list):
    """
    Drop tuples that are permutations of one another.

    Every tuple is reduced to its canonical (sorted) form and collected in a
    set, so permutations of the same elements collapse onto one entry. The
    surviving entries are returned as a list of sorted tuples, in the
    iteration order of that set.
    """
    seen = set()
    for candidate in tuple_list:
        seen.add(canonical_form(candidate))
    return [tuple(sorted(member)) for member in seen]

In [1]:
# y: one entry of cell_to_cluster_index (y = cell_to_cluster_index[i])
# i, j: the two cell indices of a neighbor pair (i = pair[0], j = pair[1])
def cluster_cluster_true(y,i,j):
    """True when ``y[i]`` and ``y[j]`` are equal and not 0."""
    first, second = y[i], y[j]
    return first == second and first != 0
def lone_lone(y,i,j):
    """True when ``y[i]`` and ``y[j]`` are equal and both 0."""
    first, second = y[i], y[j]
    return first == second and first == 0
def cluster_cluster_false(y,i,j):
    """True when ``y[i]`` and ``y[j]`` differ and neither of them is 0."""
    first, second = y[i], y[j]
    return first != second and first != 0 and second != 0
def cluster_lone(y,i,j):
    """True when ``y[i]`` is non-zero, ``y[j]`` is 0 and the two differ."""
    first, second = y[i], y[j]
    return first != second and first != 0 and second == 0
def lone_cluster(y,i,j):
    """True when ``y[i]`` is 0, ``y[j]`` is non-zero and the two differ."""
    first, second = y[i], y[j]
    return first != second and first == 0 and second != 0

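# x: neighbor_pairs_unique_sorted (the pairs to be labelled)
# y: one entry of cell_to_cluster_index (cell_to_cluster_index[i])
# each pair taken from x supplies the two cell indices handed to the tests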
def assign_index(mapping, x, y):
    """
    Label every pair in ``x`` with the key of the first matching test.

    Parameters
    ----------
    mapping : dict
        Maps a label to a callable ``test(y, i, j)``. Tests are tried in the
        iteration order of the dictionary.
    x : iterable of pairs
        Each pair supplies the two indices ``pair[0]`` and ``pair[1]``.
    y : indexable
        Handed unchanged to every test as its first argument.

    Returns
    -------
    list
        The labels in the order of ``x``. A pair contributes exactly one
        label (the first test that is truthy); a pair for which no test is
        truthy contributes nothing, so the output is only aligned with ``x``
        when the tests in ``mapping`` cover every pair.
    """
    out = []
    for pair in x:
        for index, test in mapping.items():
            # The indices come from the pair currently being examined.
            if test(y, pair[0], pair[1]):
                out.append(index)
                # Stop at the first hit so a pair never receives two labels.
                break
    return out

def neighbor_pairs_mapping(ll_index, cl_index, lc_index, ccF_index):
    '''
    Set the class value for the background types (integer excluding 1) for the cases
    where the neighbor pairs are both lone cells (ll), one from a cluster and the other
    a lone cell (cl or lc), or both are from different clusters (ccF).

    The class value 1 is always assigned to ``cluster_cluster_true``. Several
    background types may share one class value; their tests are then merged
    so that a pair matching any of them receives that value (a plain dict
    literal would silently keep only the last test for a repeated key).

    Returns
    -------
    dict
        Class value -> callable ``test(y, i, j)``, with 1 first and the
        background values in the order ll, cl, lc, ccF.

    Raises
    ------
    ValueError
        If one of the background class values equals 1.
    '''
    def _any_of(tests):
        # A single test is returned untouched so the common case yields the
        # very same predicate functions as before.
        if len(tests) == 1:
            return tests[0]

        def merged(y, i, j):
            return any(test(y, i, j) for test in tests)
        return merged

    background = (
        (ll_index, lone_lone),
        (cl_index, cluster_lone),
        (lc_index, lone_cluster),
        (ccF_index, cluster_cluster_false),
    )
    if any(index == 1 for index, _ in background):
        raise ValueError("background class values must differ from 1, which is reserved for true cluster-cluster pairs")

    grouped = {1: [cluster_cluster_true]}
    for index, test in background:
        grouped.setdefault(index, []).append(test)

    pairs_mapping = {index: _any_of(tests) for index, tests in grouped.items()}
    return pairs_mapping

# list_of_pair_indices = neighbor_pairs_unique_sorted
# cell_to_cluster_index = index of the cluster each cell is a part of
def label_neighbor_pairs(range_value, cell_to_cluster_index, list_of_pair_indices, mapping):
    """
    Label the neighbor pairs for the first ``range_value`` entries.

    Parameters
    ----------
    range_value : int
        Number of leading entries of ``cell_to_cluster_index`` to process.
    cell_to_cluster_index : indexable
        ``cell_to_cluster_index[i]`` is passed to ``assign_index`` as ``y``.
    list_of_pair_indices : iterable of pairs
        The same pair list is used for every entry.
    mapping : dict
        Label -> test, as produced by ``neighbor_pairs_mapping``.

    Returns
    -------
    numpy.ndarray
        One row of labels per processed entry, as returned by
        ``assign_index``.
    """
    # Collect the labels of each entry; list.append returns None, so the
    # array has to be built from the finished list.
    neighbor_labels = [
        assign_index(mapping=mapping, x=list_of_pair_indices, y=cell_to_cluster_index[event])
        for event in range(range_value)
    ]
    return np.array(neighbor_labels)

## Methods for the data preparation

In [8]:
def hdf5_to_dict(hdf5_file):
    """
    Build a Python dictionary mirroring the contents of ``hdf5_file``.

    ``hdf5_file`` is handed straight to ``_hdf5_to_dict``, which iterates
    over its ``items()``; it therefore has to be an object offering that
    method (such as an opened HDF5 file or group), not a path string.
    """
    contents = dict()
    _hdf5_to_dict(hdf5_file, contents)
    return contents
# Initializes an empty dictionary and calls a function to recursively
# fill this dictionary with data from the hdf5 file.


def _hdf5_to_dict(group, dic):
    """
    Copy the items of an HDF5 group into ``dic``, descending recursively.

    Items that are not ``h5py.Group`` instances are converted with
    ``np.array``; groups become nested dictionaries filled by a recursive call.
    """
    for name, node in group.items():
        if not isinstance(node, h5py.Group):
            dic[name] = np.array(node)
            continue
        nested = {}
        _hdf5_to_dict(node, nested)
        dic[name] = nested
# Iterates over items in the hdf5 group. If the item is a group, 
# it creates a new dictionary and calls itself recursively. If the item
# is a dataset, it converts it to a numpy array and stores it in the dictionary.

In [2]:
def indices_of_edges(range_value, true_list, bkg_0, bkg_2, bkg_3, neighbor_truth=None):
    '''
    Collect, per entry, the positions whose truth value is 1, 0, 2 or 3.

    bkg_0 represents the background with a truth value of 0
    bkg_2/3 represents the background with a truth value of 2/3

    Parameters
    ----------
    range_value : int
        Number of leading entries of the truth collection to process.
    true_list, bkg_0, bkg_2, bkg_3 : list
        Output lists; one list of positions is appended to each of them per
        processed entry (the lists are modified in place).
    neighbor_truth : indexable, optional
        ``neighbor_truth[i]`` must support element-wise ``==`` (e.g. a numpy
        array). When omitted, the notebook-level variable
        ``neigbor_truth_100evs`` is used, as in earlier versions.

    Returns
    -------
    tuple of four numpy.ndarray
        The four lists converted to arrays. Entries of different length
        yield a one-dimensional object array of lists.
    '''
    def _as_array(per_event):
        try:
            return np.array(per_event)
        except ValueError:
            # Recent NumPy refuses ragged nested sequences unless the object
            # dtype is requested explicitly.
            return np.array(per_event, dtype=object)

    if neighbor_truth is None:
        # Fall back on the global the function used to depend on.
        neighbor_truth = neigbor_truth_100evs

    for i in range(range_value):
        truth = neighbor_truth[i]
        true_list.append(list(np.where(truth==1)[0]))
        bkg_0.append(list(np.where(truth==0)[0]))
        bkg_2.append(list(np.where(truth==2)[0]))
        bkg_3.append(list(np.where(truth==3)[0]))
    return _as_array(true_list), _as_array(bkg_0), _as_array(bkg_2), _as_array(bkg_3)

def pairs_of_edges(range_value, true_list, bkg_0, bkg_2, bkg_3):
    '''
    Count, per entry, how many edges each truth category contains.

    bkg_0 represents the background with a truth value of 0
    bkg_2/3 represents the background with a truth value of 2/3

    Parameters
    ----------
    range_value : int
        Number of leading entries to count.
    true_list, bkg_0, bkg_2, bkg_3 : indexable
        Per-entry collections of edge positions (for instance the arrays
        returned by ``indices_of_edges``). They are only read, never modified.

    Returns
    -------
    tuple of four numpy.ndarray
        Integer arrays of length ``range_value`` holding ``len(collection[i])``
        for the true edges and the three background categories.
    '''
    def _counts(per_event):
        # Build a fresh array so the inputs are not mixed with their counts.
        return np.array([len(per_event[i]) for i in range(range_value)], dtype=int)

    return _counts(true_list), _counts(bkg_0), _counts(bkg_2), _counts(bkg_3)

In [3]:
def sorting_indices(pair_number_list, true_list, bkg_0, bkg_2, bkg_3):
    """
    Reorder the four arrays by decreasing ``pair_number_list``.

    The ordering is obtained from ``np.argsort`` applied to the negated
    ``pair_number_list`` and then used to index each of the four arrays.
    Returns the reordered arrays as a 4-tuple in the order of the arguments.
    """
    order = np.argsort(-pair_number_list)
    reordered = [collection[order] for collection in (true_list, bkg_0, bkg_2, bkg_3)]
    return tuple(reordered)

In [4]:
def create_train_set_test_set(split_num, true_list, bkg_0, bkg_2, bkg_3):
    """
    Split each of the four sequences at position ``split_num``.

    Returns an 8-tuple: the leading parts (up to ``split_num``) of
    ``true_list``, ``bkg_0``, ``bkg_2`` and ``bkg_3``, followed by the
    remaining parts in the same order.
    """
    collections = (true_list, bkg_0, bkg_2, bkg_3)
    leading = tuple(collection[:split_num] for collection in collections)
    trailing = tuple(collection[split_num:] for collection in collections)
    return leading + trailing

In [5]:
def scaling_of_features(dynamic_variables, split_num, sorted_indices,
                        scaler_filename="./bscaler_neighbor_data_train_sorted.save"):
    """
    Fit a MinMaxScaler on the training part of the features and save it.

    Parameters
    ----------
    dynamic_variables : dict
        Its values are the per-entry feature arrays; they are concatenated
        with ``np.concatenate``.
    split_num : int
        Number of (reordered) entries that form the training part; the
        remaining entries form the test part.
    sorted_indices : iterable of int
        Positions into ``list(dynamic_variables.values())`` giving the order
        in which the entries are used.
    scaler_filename : str, optional
        Where the fitted scaler is written with ``joblib.dump``. The default
        is the path that was previously hard-coded.

    Returns
    -------
    tuple
        ``(cellFeatures_trainS, cellFeatures_testS, scaler)``: the scaled
        training data, the test data transformed with the same fitted scaler
        (``None`` when no entries are left for the test part), and the scaler.
    """
    values = list(dynamic_variables.values())

    # Reorder the entries with the sorted indices.
    rearranged_values = [values[i] for i in sorted_indices]

    # Both parts are cut from the SAME reordered list; taking the test part
    # from the unsorted input would overlap with the training entries.
    train_values = rearranged_values[:split_num]
    test_values = rearranged_values[split_num:]

    data_train = np.concatenate(train_values)

    # The scaler is fitted on the training data only and stored for later use.
    scaler = MinMaxScaler()
    cellFeatures_trainS = scaler.fit_transform(data_train)
    joblib.dump(scaler, scaler_filename)

    # The test part is transformed, never fitted, to avoid leaking its range.
    cellFeatures_testS = None
    if test_values:
        cellFeatures_testS = scaler.transform(np.concatenate(test_values))

    return cellFeatures_trainS, cellFeatures_testS, scaler

In [6]:
def saveDataH5(fileName, dataDict, compressionType):
    """
    Write every entry of ``dataDict`` as a dataset of the HDF5 file ``fileName``.

    The file is opened in write mode; each key becomes a dataset name and
    ``compressionType`` is passed on as the ``compression`` argument of
    ``create_dataset``.
    """
    with h5py.File(fileName, mode='w') as h5_out:
        for name, payload in dataDict.items():
            h5_out.create_dataset(name, data=payload, compression=compressionType)

def loadDataH5(fileName):
    """
    Read the top-level items of the HDF5 file ``fileName`` into a dictionary.

    The file is opened in read mode and each top-level key is converted with
    ``np.array`` before the file is closed again.
    """
    with h5py.File(fileName, mode='r') as h5_in:
        return {name: np.array(h5_in[name]) for name in h5_in.keys()}


In [15]:
# Prints a fixed confirmation message. In a top-to-bottom run, reaching this
# cell means the definitions above were executed; they are not exercised here.
print("The gnncalofunctions notebook has been successfully run")

The gnncalofunctions notebook has been successfully run
