In [2]:
import tensorflow as tf
from tensorflow import keras
from scipy import sparse
import awkward as ak
import joblib
import numpy as np

2025-10-22 15:55:40.542012: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


# Loading File

In [3]:
f = joblib.load('/u/markhoff/Documents/ML/MLPrtdirc/TPBinnedHist/22TO90DEG.pk1')

traintimes  = f['traintimes'][:]
valtimes    = f['valtimes'][:]
testtimes   = f['testtimes'][:]

trainangles = f['trainangles'][:]
valangles   = f['valangles'][:]
testangles  = f['testangles'][:]

trainlabels = f['trainlabels'][:]/2 - 1
vallabels   = f['vallabels'][:]/2 - 1
testlabels  = f['testlabels'][:]/2 - 1

input_dim = traintimes.shape[1] # Assume X.shape = (n_events, n_features), Y.shape = (n_events), then input_dim should be n_features


In [4]:
# ---------------------------------------------------------------
#
#                       PARAMETERS
#
# ---------------------------------------------------------------
class_names = ['Pi+', 'Proton']
num_classes = len(class_names) # Pions or kaons?


batch_size  = 128 # How many events to feed to NN at a time?
nepochs     = 10 # How many epochs?

# ---------------------------------------------------------------

# Defining and Training

In [5]:
class ScheduledFiLM(keras.layers.Layer):
    def __init__(self, **kwargs):
        super(ScheduledFiLM, self).__init__(**kwargs)
        self.lambda_var = tf.Variable(0.0, trainable=False, dtype=tf.float32)
        self.ranp_rate = 0.01
        

    def call(self, inputs):
        x, gamma, beta = inputs
        lam = tf.clip_by_value(self.lambda_var, 0.0, 1.0)
        return (1.0 + lam * gamma) * x + lam * beta
    

In [6]:
class ScheduledFiLMCallback(keras.callbacks.Callback):
    def __init__(self, film_layer, nepochs):
        super(ScheduledFiLMCallback, self).__init__()
        self.film_layer = film_layer
        self.nepochs = nepochs

    def on_epoch_end(self, epoch, logs=None):
        new_lambda = (epoch + 1) / self.nepochs
        self.film_layer.lambda_var.assign(new_lambda)
        print(f'\nUpdated FiLM lambda to {new_lambda:.4f}')

In [None]:
class SparseBatchGenerator(keras.utils.Sequence):
    """
    Converts sparse matrices to dense batches for training during runtime. Full sparse dataset is loaded into memory, and dense batches are created using this class.
    Class keeps ordering and supports shuffling each epoch.

    
    Parameters
    ----------
    *args : ndarray, sparseMatrix, awkwardArray 
        Sparse or dense matrices to be placed into batches. The arguments should include label data as the last argument.
    >>> train_gen = SparseBatchGenerator(times, angles, labels, \*kwargs)
    batch_size : int
        Number of matrices to batch.
    shuffle : bool
        Whether or not to shuffle data on epoch end.
    """

    def __init__(self, *args, batch_size=256, shuffle=True):
        self.args = args
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.indices = np.arange(self.args[0].shape[0])
        self.on_epoch_end()

    def __len__(self):
        return np.ceil(self.args[0].shape[0] / batch_size).astype(int)
    
    def __getitem__(self, idx):
        batch_idx = self.indices[idx * self.batch_size:(idx + 1) * self.batch_size]
        batches = [(data[batch_idx].toarray() if isinstance(data, sparse.spmatrix) else data[batch_idx]) for data in self.args]
        batches = [(data[batch_idx].to_numpy() if isinstance(data, ak.Array) else data[batch_idx]) for data in self.args]

        return batches[:-1], batches[-1]
    
    def on_epoch_end(self):
        if self.shuffle:
            np.random.shuffle(self.indices)

train_gen   = SparseBatchGenerator(traintimes, trainangles, trainlabels, batch_size=batch_size, shuffle=True)
val_gen     = SparseBatchGenerator(valtimes, valangles, vallabels, batch_size=batch_size, shuffle=True)
test_gen    = SparseBatchGenerator(testtimes, testangles, testlabels, batch_size=batch_size, shuffle=True)


_ = len(train_gen)
print(train_gen[3])

ValueError: TypeError: sparse array length is ambiguous; use getnnz() or shape[0]
Traceback (most recent call last):

  File "/u/markhoff/Kitten/lib/python3.11/site-packages/scipy/sparse/_base.py", line 449, in __len__
    raise TypeError("sparse array length is ambiguous; use getnnz()"

TypeError: sparse array length is ambiguous; use getnnz() or shape[0]



In [48]:
# Time Histogram Branch
hist_input = keras.Input(shape=(input_dim,))
h = keras.layers.Dropout(0.2)(hist_input)
h = keras.layers.Dense(64, activation='relu')(hist_input)

#h = keras.layers.Dense(64, activation='relu')(h)


# Angle Branch
angle_input = keras.Input(shape=(3,))
a = keras.layers.Dense(64, activation='relu')(angle_input)
#a = keras.layers.Dense(64, activation='relu')(a)

# Produce FiLM parameters
gamma = keras.layers.Dense(64, activation='linear', name='gamma')(a)
beta  = keras.layers.Dense(64, activation='linear', name='beta')(a)
gamma = keras.layers.Lambda(lambda g: 1.0 + 1.5*g)(gamma)
beta  = keras.layers.Lambda(lambda b: 1.5*b)(beta)

# FiLM layer
h_mod = keras.layers.Multiply()([h, gamma])
h_mod = keras.layers.Add()([h_mod, beta])

# Combined layers and output
x = keras.layers.Dense(16, activation='relu')(h_mod)
out = keras.layers.Dense(num_classes, activation='softmax', name='output')(x)

model = keras.Model(inputs=[hist_input, angle_input], outputs=out)
model.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy']
)
#model.summary()

In [49]:
#film = ScheduledFiLM()

train_gen   = SparseBatchGenerator(traintimes, trainangles, trainlabels, batch_size=batch_size, shuffle=True)
val_gen     = SparseBatchGenerator(valtimes, valangles, vallabels, batch_size=batch_size, shuffle=True)
test_gen    = SparseBatchGenerator(testtimes, testangles, testlabels, batch_size=batch_size, shuffle=True)



model.fit(
    train_gen,
    validation_data=val_gen,
    epochs=nepochs, 
    #callbacks=[ScheduledFiLMCallback(film, nepochs)],
)

#test_loss, test_acc = model.evaluate(
#    test_gen, Y_test, verbose=2
#)

#print('\nTest accuracy:', test_acc)
#print('Test loss:', test_loss)

  self._warn_if_super_not_called()


TypeError: `output_signature` must contain objects that are subclass of `tf.TypeSpec` but found <class 'list'> which is not.

In [None]:
%%script false --no-raise-error

import time
import numpy as np

accs = []
effys = []
times = []

preds = []

for num_nodes in range(2, 17, 1):
    print(f'\n\nTraining with {num_nodes} nodes in hidden layer\n')
    t1 = time.time()

    model = keras.Sequential([
        keras.layers.Flatten(input_shape=(input_dim,)),
        keras.layers.Dense(num_nodes, activation='relu'),
        keras.layers.Dropout(0.2),
        keras.layers.Dense(num_nodes, activation='relu'),
        keras.layers.Dense(num_classes)
        ])

    model.compile(optimizer='adam',
                  loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])

    model.fit(train_ds, validation_data=val_ds, epochs=nepochs)

    test_loss, test_acc = model.evaluate(test_ds, verbose=2)

    probability_model = keras.Sequential([model, 
                                         keras.layers.Softmax()])

    predictions = probability_model.predict(test_ds)
    confidence = 0.95 # p-value (1 sigma = 0.68, 2 sigma = 0.95, 3 sigma = 0.997, etc)

    correct     = 0
    total       = 0

    Y_test_labels = np.array([y.numpy() for _, y in test_ds.unbatch()])

    for i, pred in enumerate(predictions):
        preds.append((np.argmax(pred), np.max(pred)))
        if pred[np.argmax(pred)] >= confidence:
            total += 1
            if np.argmax(pred) == Y_test_labels[i]:
                correct += 1

    t2 = time.time()


    accs.append(correct/total if total > 0 else np.nan)
    effys.append(total/predictions.shape[0])
    times.append(t2-t1)
    
    print(f'\nTest accuracy: {test_acc}, time: {t2-t1} s')

print(f'\n\nAccuracies: {accs}')
print(f'Efficiencies: {effys}')
print(f'Times: {times}')
print(preds)
np.save('predictions.npy', preds)

# Confidence Filtering

- If a prediction for a given event does not reach significance, it is disgarded. Only events which reach significance count towards efficiency. 

- Significant events that are correct are recorded as a correct prediction. Significant events that are wrong are recorded as an incorrect prediction.

In [None]:
predictions = model.predict({"hist_input": X_hist_test, "angle_input": X_angle_test})
confidence = 0.003 # p-value (2 sigma = 0.05, 3 sigma = 0.003, etc)

correct     = 0
total       = 0

for i, pred in enumerate(predictions):
    if pred[np.argmax(pred)] >= 1 - confidence:
        total += 1
        if np.argmax(pred) == Y_test[i]:
            correct += 1

try:
    print(f'Accuracy: {correct} out of {total}... {correct/total:0.5f}\nEvents Kept: {total} out of {predictions.shape[0]}... {total/predictions.shape[0]:0.5f}')
except ZeroDivisionError:
    print('No events kept!')

# Test for Dataset

- This code checks if there are any duplicate events in the dataset, and gives the confusion matrix of the NN.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

In [None]:
def quick_corr(X_hist, X_angle, Y, name="Set"):
    X_all = np.concatenate([X_hist, X_angle], axis=1)
    df = pd.DataFrame(X_all)
    df['label'] = Y
    corr = df.corr()['label'].abs().sort_values(ascending=False)
    print(f"\nTop correlations with label in {name}:")
    print(corr.head(10))



quick_corr(X_hist_train[:2], X_angle_train[:2], Y_train[:2], "Train")
quick_corr(X_hist_val[:2], X_angle_val[:2], Y_val[:2], "Val")
quick_corr(X_hist_test[:2], X_angle_test[:2], Y_test[:2], "Test")

In [None]:
for cls in np.unique(Y_train):
    plt.hist(X_angle_train[Y_train == cls, 2], bins=50, alpha=0.5, label=f'Class {int(cls)}')
plt.xlabel('Angle Feature 0')
plt.ylabel('Count')
plt.legend()
plt.title('Distribution of Angle Feature 2 by Class (Train)')
plt.show()

In [None]:
# Predict on test set
probability_model = keras.Sequential([model, keras.layers.Softmax()])
predictions = probability_model.predict({"hist_input": X_hist_test, "angle_input": X_angle_test})
y_pred = np.argmax(predictions, axis=1)

print(np.arccos(X_angle_test.T[2]).min()*180/np.pi)

X_angle_test = np.arccos(X_angle_test)*180/np.pi

# Bin by angle (e.g., first angle feature)
angle_bins = np.linspace(X_angle_test.T[2].min(), X_angle_test.T[2].max(), 10)
digitized = np.digitize(X_angle_test[:,2], angle_bins)
acc_by_bin = [np.mean(y_pred[digitized == i] == Y_test[digitized == i]) for i in range(1, len(angle_bins))]

plt.plot(angle_bins[1:], acc_by_bin, marker='o')
plt.xlabel('Angle Feature 0')
plt.ylabel('Accuracy')
plt.title('Test Accuracy vs Angle Feature 0')
plt.show()

In [None]:
# --- Confusion Matrix on Test Set ---
probability_model = keras.Sequential([model, keras.layers.Softmax()])
predictions = probability_model.predict({"hist_input": X_hist_test, "angle_input": X_angle_test})
y_pred = np.argmax(predictions, axis=1)
y_true = Y_test.astype(int)

cm = confusion_matrix(y_true, y_pred, labels=range(num_classes))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
disp.plot(cmap='Blues')
plt.title("Confusion Matrix (Test Set)")
plt.show()