### Optimizing Ming's graph-generating code for speed

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
### Generic imports
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.colors import Normalize, LogNorm
import scipy
import uproot
from tqdm import tqdm
import functools
from glob import glob

### ML-related
import tensorflow as tf
import atlas_mpl_style as ampl
from sklearn.neighbors import NearestNeighbors
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, auc
import sonnet as snt

### GNN-related
from graph_nets import blocks
from graph_nets import graphs
from graph_nets import modules
from graph_nets import utils_np
from graph_nets import utils_tf
import networkx as nx

In [3]:
# ### GPU Setup
# os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
# os.environ["CUDA_VISIBLE_DEVICES"] = "3" # pick a number between 0 & 3
# gpus = tf.config.list_physical_devices('GPU') 
# tf.config.experimental.set_memory_growth(gpus[0], True)

In [4]:
### Other setup 
pd.set_option('display.max_columns', 200)
pd.set_option('display.max_rows', 20)

params = {'legend.fontsize': 13, 'axes.labelsize': 18}
plt.rcParams.update(params)

SEED = 15
np.random.seed(SEED)
tf.random.set_seed(SEED)

# Run on a single file

### Load files

First, a sample of single neutral pions:

In [5]:
file_path = '../data/neutral_pion_sample.root'
f_pi0 = uproot.open(file_path)

FileNotFoundError: file not found

    '../data/neutral_pion_sample.root'

Files may be specified as:
   * str/bytes: relative or absolute filesystem path or URL, without any colons
         other than Windows drive letter or URL schema.
         Examples: "rel/file.root", "C:\abs\file.root", "http://where/what.root"
   * str/bytes: same with an object-within-ROOT path, separated by a colon.
         Example: "rel/file.root:tdirectory/ttree"
   * pathlib.Path: always interpreted as a filesystem path or URL only (no
         object-within-ROOT path), regardless of whether there are any colons.
         Examples: Path("rel:/file.root"), Path("/abs/path:stuff.root")

Functions that accept many files (uproot.iterate, etc.) also allow:
   * glob syntax in str/bytes and pathlib.Path.
         Examples: Path("rel/*.root"), "/abs/*.root:tdirectory/ttree"
   * dict: keys are filesystem paths, values are objects-within-ROOT paths.
         Example: {"/data_v1/*.root": "ttree_v1", "/data_v2/*.root": "ttree_v2"}
   * already-open TTree objects.
   * iterables of the above.


Define the primary DataFrame:

In [None]:
df = f_pi0['EventTree'].arrays(["cluster_cell_E", "cluster_cell_ID", "cluster_E", "cluster_Eta", "cluster_Phi"], library="pd")
df.reset_index(inplace=True) # flatten MultiIndexing

Define the cell geometry DataFrame:

In [None]:
df_geo = f_pi0['CellGeo'].arrays(library="pd")
df_geo = df_geo.reset_index() # remove redundant multi-indexing
df_geo.drop(columns = ["entry", "subentry"], inplace=True)

### Add x,y,z coordinates
df_geo["cell_geo_x"] = df_geo["cell_geo_rPerp"] * np.cos(df_geo["cell_geo_phi"])
df_geo["cell_geo_y"] = df_geo["cell_geo_rPerp"] * np.sin(df_geo["cell_geo_phi"])
cell_geo_theta = 2*np.arctan(np.exp(-df_geo["cell_geo_eta"]))
df_geo["cell_geo_z"] = df_geo["cell_geo_rPerp"] / np.tan(cell_geo_theta)

#### Define graph-making function

In [None]:
def make_graph(event: pd.Series, geo_df: pd.DataFrame, is_charged=False):
    """
    Creates a graph representation of an event
    
    inputs
    event (pd.Series) one event/row from EventTree
    geo_df (pd.DataFrame) the CellGeo DataFrame mapping cell_geo_ID to information about the cell
    is_charged (bool) True for charged pion, False for uncharged pion
    
    returns
    A pair of graph representations of the event for the GNN (train_graph, target_graph)
    returns (None, None) if no cell energies detected
    """
    
    ### No cell energies present
    if len(event["cluster_cell_E"]) == 0:
        return None, None
    
    ### Get cell geometry information for this particular event
    temp_df = geo_df[geo_df["cell_geo_ID"].isin([item for sublist in event["cluster_cell_ID"] for item in sublist])]
    temp_df = temp_df.set_index("cell_geo_ID")
    ### Assign cell energies
    for cell_id, cell_e in zip(
        [item for sublist in event["cluster_cell_ID"] for item in sublist],
        [item for sublist in event["cluster_cell_E"] for item in sublist]
    ):
        temp_df.loc[int(cell_id), "cell_E"] = cell_e
    
    ### Define node features
    n_nodes = temp_df.shape[0]
    node_features = ["cell_E", "cell_geo_eta",
                     "cell_geo_phi", "cell_geo_rPerp",
                     "cell_geo_deta", "cell_geo_dphi",
                     "cell_geo_volume"]
    nodes = temp_df[node_features].to_numpy(dtype=np.float32).reshape(-1, len(node_features))
    
    ### Apply k-NN search to find cell neighbors
    # NOTE FAIR also has a faster algo for KNN search. Might want to try it
    k = 6
    k = min(n_nodes, k)
    
    nbrs = NearestNeighbors(n_neighbors=k, algorithm='ball_tree').fit(temp_df[["cell_geo_x", "cell_geo_y", "cell_geo_z"]])
    distances, indices = nbrs.kneighbors(temp_df[["cell_geo_x", "cell_geo_y", "cell_geo_z"]])
    
    senders = np.repeat([x[0] for x in indices], k-1)               # k-1 for no self edges
    receivers = np.array([x[1:] for x in indices]).flatten()        # x[1:] for no self edges
    edges = np.array([x[1:] for x in distances], dtype=np.float32).flatten().reshape(-1, 1)
    n_edges = len(senders)
        
    global_features = ["cluster_E", "cluster_Eta", "cluster_Phi"]
    global_values = np.asarray(event[global_features]).astype('float32')
    
    input_datadict = {
        "n_node": n_nodes,
        "n_edge": n_edges,
        "nodes": nodes,
        "edges": edges,
        "senders": senders,
        "receivers": receivers,
        "globals": global_values            # np.array([n_nodes], dtype=np.float32)
    }
    
    target_datadict = {
        "n_node": n_nodes,
        "n_edge": n_edges,
        "nodes": nodes,
        "edges": edges,
        "senders": senders,
        "receivers": receivers,
        "globals": np.array([int(is_charged)], dtype=np.float32)
    }

    input_graph = utils_tf.data_dicts_to_graphs_tuple([input_datadict])
    target_graph = utils_tf.data_dicts_to_graphs_tuple([target_datadict])
    
    return input_graph, target_graph

def make_dict(event: pd.Series, geo_df: pd.DataFrame, is_charged=False):
    """
    Creates a graph representation of an event
    
    inputs
    event (pd.Series) one event/row from EventTree
    geo_df (pd.DataFrame) the CellGeo DataFrame mapping cell_geo_ID to information about the cell
    is_charged (bool) True for charged pion, False for uncharged pion
    
    returns
    A pair of graph representations of the event for the GNN (train_graph, target_graph)
    returns (None, None) if no cell energies detected
    """
    
    ### No cell energies present
    if len(event["cluster_cell_E"]) == 0:
        return None, None
    
    ### Get cell geometry information for this particular event
    temp_df = geo_df[geo_df["cell_geo_ID"].isin([item for sublist in event["cluster_cell_ID"] for item in sublist])]
    temp_df = temp_df.set_index("cell_geo_ID")
    ### Assign cell energies
    for cell_id, cell_e in zip(
        [item for sublist in event["cluster_cell_ID"] for item in sublist],
        [item for sublist in event["cluster_cell_E"] for item in sublist]
    ):
        temp_df.loc[int(cell_id), "cell_E"] = cell_e
    
    ### Define node features
    n_nodes = temp_df.shape[0]
    node_features = ["cell_E", "cell_geo_eta",
                     "cell_geo_phi", "cell_geo_rPerp",
                     "cell_geo_deta", "cell_geo_dphi",
                     "cell_geo_volume"]
    nodes = temp_df[node_features].to_numpy(dtype=np.float32).reshape(-1, len(node_features))
    
    ### Apply k-NN search to find cell neighbors
    # NOTE FAIR also has a faster algo for KNN search. Might want to try it
    k = 6
    k = min(n_nodes, k)
    
    nbrs = NearestNeighbors(n_neighbors=k, algorithm='ball_tree').fit(temp_df[["cell_geo_x", "cell_geo_y", "cell_geo_z"]])
    distances, indices = nbrs.kneighbors(temp_df[["cell_geo_x", "cell_geo_y", "cell_geo_z"]])
    
    senders = np.repeat([x[0] for x in indices], k-1)               # k-1 for no self edges
    receivers = np.array([x[1:] for x in indices]).flatten()        # x[1:] for no self edges
    edges = np.array([x[1:] for x in distances], dtype=np.float32).flatten().reshape(-1, 1)
    n_edges = len(senders)
        
    global_features = ["cluster_E", "cluster_Eta", "cluster_Phi"]
    global_values = np.asarray(event[global_features]).astype('float32')
    
    input_datadict = {
        "n_node": n_nodes,
        "n_edge": n_edges,
        "nodes": nodes,
        "edges": edges,
        "senders": senders,
        "receivers": receivers,
        "globals": global_values            # np.array([n_nodes], dtype=np.float32)
    }
    
    target_datadict = {
        "n_node": n_nodes,
        "n_edge": n_edges,
        "nodes": nodes,
        "edges": edges,
        "senders": senders,
        "receivers": receivers,
        "globals": np.array([int(is_charged)], dtype=np.float32)
    }

#     input_graph = utils_tf.data_dicts_to_graphs_tuple([input_datadict])
#     target_graph = utils_tf.data_dicts_to_graphs_tuple([target_datadict])
    
    return input_datadict, target_datadict

In [None]:
%%timeit
n_entries = 100
graph_list = []
for i in range(n_entries):
    graph_list.append(make_graph(df.iloc[i], geo_df=df_geo, is_charged=False))
    
input_graph_list = [tuple[0] for tuple in graph_list]
target_graph_list = [tuple[1] for tuple in graph_list]

Pick an event to look at:

In [None]:
# graph_list = []
# for i in tqdm(range(len(df))):
#     graph_list.append(make_graph(df.iloc[i], geo_df=df_geo, is_charged=False))

In [None]:
n_entries = 20
graph_list = []
for i in range(n_entries):
    graph_list.append(make_graph(df.iloc[i], geo_df=df_geo, is_charged=False))

input_graph_list = [tuple[0] for tuple in graph_list]
target_graph_list = [tuple[1] for tuple in graph_list]

graph = input_graph_list[15] # pick event #15, say
print(graph.globals)

In [None]:
import pickle 
with open('test.pkl', 'wb') as f:
    pickle.dump(graph_list, f)

In [None]:
# try reading it back in...
with open('test.pkl', 'rb') as f:
    graph_list_loaded = pickle.load(f)

In [None]:
input_graph_list[0].n_node

In [None]:
target_graph_list[0].n_node

In [None]:
graph_list[10][0].globals

# Run over all files with multiprocessing

In [5]:
pi0_files = glob('../data/*singlepi0*/*.root')
pion_files = glob('../data/*singlepion*/*.root')

In [8]:
uproot.open(pion_files[0])['EventTree'].keys()

['runNumber',
 'eventNumber',
 'lumiBlock',
 'coreFlags',
 'mcEventNumber',
 'mcChannelNumber',
 'mcEventWeight',
 'nTruthPart',
 'G4PreCalo_n_EM',
 'G4PreCalo_E_EM',
 'G4PreCalo_n_Had',
 'G4PreCalo_E_Had',
 'truthVertexX',
 'truthVertexY',
 'truthVertexZ',
 'truthPartPdgId',
 'truthPartStatus',
 'truthPartBarcode',
 'truthPartPt',
 'truthPartE',
 'truthPartMass',
 'truthPartEta',
 'truthPartPhi',
 'nTrack',
 'trackPt',
 'trackP',
 'trackMass',
 'trackEta',
 'trackPhi',
 'trackNumberOfPixelHits',
 'trackNumberOfSCTHits',
 'trackNumberOfPixelDeadSensors',
 'trackNumberOfSCTDeadSensors',
 'trackNumberOfPixelSharedHits',
 'trackNumberOfSCTSharedHits',
 'trackNumberOfPixelHoles',
 'trackNumberOfSCTHoles',
 'trackNumberOfInnermostPixelLayerHits',
 'trackNumberOfNextToInnermostPixelLayerHits',
 'trackExpectInnermostPixelLayerHit',
 'trackExpectNextToInnermostPixelLayerHit',
 'trackNumberOfTRTHits',
 'trackNumberOfTRTOutliers',
 'trackChiSquared',
 'trackNumberDOF',
 'trackD0',
 'trackZ0',
 '

In [None]:
def process(file):
    n_events = len(uproot.open(file)['EventTree'].arrays(["cluster_cell_E", "cluster_cell_ID", "cluster_E", "cluster_Eta", "cluster_Phi"], library="pd"))
    file_name = file.split('.')[-2]
    print("File {} has {:,} events.".format(file_name, n_events))

In [None]:
for file in pi0_files[:1]:
    process(file)

### Define functions

In [42]:
from multiprocessing import Pool
import time
import pickle

def async_tqdm(func, argument_list, num_processes):
    pool = Pool(processes=num_processes)
    jobs = [pool.apply_async(func=func, args=(*argument,)) if isinstance(argument, tuple) else pool.apply_async(func=func, args=(argument,)) for argument in argument_list]
    pool.close()
    result_list_tqdm = []
    for job in tqdm(jobs, desc="Jobs"):
        result_list_tqdm.append(job.get())
    return result_list_tqdm

def make_graph(event: pd.Series, geo_df: pd.DataFrame, is_charged=False):
    """
    Creates a graph representation of an event
    
    inputs
    event (pd.Series) one event/row from EventTree
    geo_df (pd.DataFrame) the CellGeo DataFrame mapping cell_geo_ID to information about the cell
    is_charged (bool) True for charged pion, False for uncharged pion
    
    returns
    A pair of graph representations of the event for the GNN (train_graph, target_graph)
    returns (None, None) if no cell energies detected
    """
    
    ### No cell energies present
    if len(event["cluster_cell_E"]) == 0:
        return None, None
    
    ### Get cell geometry information for this particular event
    temp_df = geo_df[geo_df["cell_geo_ID"].isin([item for sublist in event["cluster_cell_ID"] for item in sublist])]
    temp_df = temp_df.set_index("cell_geo_ID")
    ### Assign cell energies
    for cell_id, cell_e in zip(
        [item for sublist in event["cluster_cell_ID"] for item in sublist],
        [item for sublist in event["cluster_cell_E"] for item in sublist]
    ):
        temp_df.loc[int(cell_id), "cell_E"] = cell_e
    
    ### Define node features
    n_nodes = temp_df.shape[0]
    node_features = ["cell_E", "cell_geo_eta",
                     "cell_geo_phi", "cell_geo_rPerp",
                     "cell_geo_deta", "cell_geo_dphi",
                     "cell_geo_volume"]
    nodes = temp_df[node_features].to_numpy(dtype=np.float32).reshape(-1, len(node_features))
    
    ### Apply k-NN search to find cell neighbors
    # NOTE FAIR also has a faster algo for KNN search. Might want to try it
    k = 6
    k = min(n_nodes, k)
    
    nbrs = NearestNeighbors(n_neighbors=k, algorithm='ball_tree').fit(temp_df[["cell_geo_x", "cell_geo_y", "cell_geo_z"]])
    distances, indices = nbrs.kneighbors(temp_df[["cell_geo_x", "cell_geo_y", "cell_geo_z"]])
    
    senders = np.repeat([x[0] for x in indices], k-1)               # k-1 for no self edges
    receivers = np.array([x[1:] for x in indices]).flatten()        # x[1:] for no self edges
    edges = np.array([x[1:] for x in distances], dtype=np.float32).flatten().reshape(-1, 1)
    n_edges = len(senders)
        
    global_features = ["cluster_E", "cluster_Eta", "cluster_Phi"]
    global_values = np.asarray(event[global_features]).astype('float32')
    
    input_datadict = {
        "n_node": n_nodes,
        "n_edge": n_edges,
        "nodes": nodes,
        "edges": edges,
        "senders": senders,
        "receivers": receivers,
        "globals": global_values            # np.array([n_nodes], dtype=np.float32)
    }
    
    target_datadict = {
        "n_node": n_nodes,
        "n_edge": n_edges,
        "nodes": nodes,
        "edges": edges,
        "senders": senders,
        "receivers": receivers,
        "globals": np.array([int(is_charged)], dtype=np.float32)
    }

    input_graph = utils_tf.data_dicts_to_graphs_tuple([input_datadict])
    target_graph = utils_tf.data_dicts_to_graphs_tuple([target_datadict])
    
    return input_graph, target_graph

def process_file(file, is_charged: bool = False):
    ### Define primary dataframe
    f = uproot.open(file)
    df = f['EventTree'].arrays(["cluster_cell_E", "cluster_cell_ID", "cluster_E", "cluster_Eta", "cluster_Phi"], library="pd")
    df.reset_index(inplace=True) # flatten MultiIndexing

    ### Define cell geometry dataframe
    df_geo = f['CellGeo'].arrays(library="pd")
    df_geo = df_geo.reset_index() # remove redundant multi-indexing
    df_geo.drop(columns = ["entry", "subentry"], inplace=True)

    ### Add x,y,z coordinates
    df_geo["cell_geo_x"] = df_geo["cell_geo_rPerp"] * np.cos(df_geo["cell_geo_phi"])
    df_geo["cell_geo_y"] = df_geo["cell_geo_rPerp"] * np.sin(df_geo["cell_geo_phi"])
    cell_geo_theta = 2*np.arctan(np.exp(-df_geo["cell_geo_eta"]))
    df_geo["cell_geo_z"] = df_geo["cell_geo_rPerp"] / np.tan(cell_geo_theta)

    ### Make the graphs for the specified events
    graph_list = []
    n_events = len(df) #100 # limit dataframe size for testing
    for i in tqdm(range(len(df[:n_events])), desc="Events"):
        graph_list.append(make_graph(df.iloc[i], geo_df=df_geo, is_charged=is_charged))

    ### Save Pickle file, with zero-indexing:
    if is_charged == False:
        save_dir = "../graphs/neutral_pion/"
    elif is_charged == True:
        save_dir = "../graphs/charged_pion/"
    os.makedirs(save_dir, exist_ok=True)
    filepath = os.path.join(save_dir,file.split('.')[-2][1:]+'.pkl')
    with open(filepath, 'wb') as f:
        pickle.dump(graph_list, f)

### Test speedup:

In [43]:
n_files = 1

Neutral pion samples:

In [44]:
async_tqdm(func=process_file, argument_list=zip(pi0_files[:n_files], [False]*n_files), num_processes=1)

Events:   3%|▎         | 582/21147 [00:21<12:33, 27.29it/s]
Jobs:   0%|          | 0/1 [00:26<?, ?it/s]Process ForkPoolWorker-20:



KeyboardInterrupt: 

Traceback (most recent call last):
  File "/clusterfs/ml4hep/mpettee/miniconda3/envs/ml4pions/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/clusterfs/ml4hep/mpettee/miniconda3/envs/ml4pions/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/clusterfs/ml4hep/mpettee/miniconda3/envs/ml4pions/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "<ipython-input-42-e67df3903f73>", line 112, in process_file
    graph_list.append(make_graph(df.iloc[i], geo_df=df_geo, is_charged=is_charged))
  File "<ipython-input-42-e67df3903f73>", line 40, in make_graph
    temp_df.loc[int(cell_id), "cell_E"] = cell_e
  File "/clusterfs/ml4hep/mpettee/miniconda3/envs/ml4pions/lib/python3.8/site-packages/pandas/core/indexing.py", line 692, in __setitem__
    iloc._setitem_with_indexer(indexer, value, self.name)
  File "/clusterfs/ml4hep/mpettee/min

In [None]:
async_tqdm(func=process_file, argument_list=zip(pi0_files[:n_files], [False]*n_files), num_processes=2)

In [20]:
async_tqdm(func=process_file, argument_list=zip(pi0_files[:n_files], [False]*n_files), num_processes=4)

Jobs:   0%|          | 0/4 [2:35:02<?, ?it/s]Process ForkPoolWorker-12:
Process ForkPoolWorker-11:
Process ForkPoolWorker-9:
Process ForkPoolWorker-10:

Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):


KeyboardInterrupt: 

Traceback (most recent call last):
  File "/clusterfs/ml4hep/mpettee/miniconda3/envs/ml4pions/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/clusterfs/ml4hep/mpettee/miniconda3/envs/ml4pions/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/clusterfs/ml4hep/mpettee/miniconda3/envs/ml4pions/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/clusterfs/ml4hep/mpettee/miniconda3/envs/ml4pions/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/clusterfs/ml4hep/mpettee/miniconda3/envs/ml4pions/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/clusterfs/ml4hep/mpettee/miniconda3/envs/ml4pions/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/clusterfs/ml4hep/mpettee/miniconda3/envs/ml4pions/lib/python3.

Charged pion samples:

In [None]:
async_tqdm(func=process_file, argument_list=zip(pion_files[:n_files], [True]*n_files), num_processes=1)

In [None]:
async_tqdm(func=process_file, argument_list=zip(pion_files[:n_files], [True]*n_files), num_processes=2)

In [None]:
async_tqdm(func=process_file, argument_list=zip(pion_files[:n_files], [True]*n_files), num_processes=4)