# ROOT preprocessing pipeline for machine learning with TensorFlow


#### by Matthias Komm (CERN) <div align=right><img width=10% style="margin-right:10%" src="figures/cms.png"/></div>


# Prerequisites
* setup of this notebook: python 2.7, tensorflow 1.6, keras 2.1.6, root 6.18
* binaries taken from anaconda/conda-forge repositories

# Introduction
* common machine learning frameworks: PyTorch, TensorFlow (+keras), MXNet, scikit-learn, XGBoost
* frameworks require data for training to be typically in the form of numpy arrays, panda dataframes, etc.
* convenient to store training data in npz or hdf5 files 

# Example 1: MNIST
training neural network on MNIST dataset for handwritten digit classification

In [None]:
# note: matplotlib and ROOT needs to be imported before tensorflow/keras
# this will otherwise cause a libpng version mismatch
import matplotlib.pyplot as plt
import ROOT

from keras.datasets import mnist
from keras.utils import np_utils 
import numpy as np

#download the MNIST dataset;
(X_train, y_train), (X_test, y_test) = mnist.load_data(path='cache.npz')

#at this point the dataset is being held in RAM
print "Training data set size: %.2f MB" % ((X_train.size*X_train.itemsize)/1e6)
print "Test data set size: %.2f MB" % ((X_test.size*X_test.itemsize)/1e6)

In [None]:
#some preprocessing
nb_classes = 10 # number of unique digits

#convert number to one-hot encoding
Y_train = np_utils.to_categorical(y_train, nb_classes)

#expand dimension; last dimension interpreted as color channel
#here: 1 channel = grayscale
X_train = np.expand_dims(X_train,axis=3)

#train only the first 1000 images for this example
X_train = X_train[:1000]
Y_train = Y_train[:1000]

### Setup convolutional neural network model

In [None]:
import keras

model_mnist = keras.models.Sequential()
model_mnist.add(keras.layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28,28,1)))
for _ in range(3):
    model_mnist.add(keras.layers.Conv2D(16, (3, 3), activation='relu'))
    model_mnist.add(keras.layers.MaxPooling2D(pool_size=(2,2)))

model_mnist.add(keras.layers.Flatten())
model_mnist.add(keras.layers.Dense(10,activation='relu'))
model_mnist.add(keras.layers.Dense(10,name='last_dense'))
model_mnist.add(keras.layers.Activation('softmax'))

model_mnist.summary()

### Train neural network

In [None]:
model_mnist.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
history = model_mnist.fit(X_train, Y_train, batch_size=128, epochs=2, verbose=1)

# Data for TensorFlow
* TensorFlow (TF) backend for keras used here
* calling `model_mnist.fit` populates tensor objects as follows 

In [None]:
import tensorflow as tf
from keras import backend as K

#construct TF graph
input_tensor = tf.placeholder(tf.float32,shape=(None,28,28,1))
input_layer = keras.layers.Input(tensor=input_tensor)
output = model_mnist(input_layer)

#execute TF graph
sess = K.get_session()
sess.run(output, feed_dict={input_tensor: X_train[0:1]})

# Example 1: Summary
* basic ingredients for constructing and training a neural network
* data is being held in memory
* format: numpy array
* keras' `model.fit` method feeds data to TensorFlow using placeholders

# Intermezzo: Machine learning for jet classification
* collimated spray of particles (=jet) producted during hardonization of a quark or gluon
* in CMS, particles are clustered according to <a href="https://arxiv.org/abs/0802.1189">anti-kT algorithm</a> to construct jets
* example multijet event recorded by the CMS experiment; jets are marked by the yellow cones
<div style="width: 70%;"><div style='border-width: 1px; border-style: solid; border-color: gray; padding: 10px;  background: white; color: gray; font-size: 0.65em'><img style="margin: auto;" width=50% src="figures/event.jpg"/>CMS Document 12150-v2</div></div>

* classification of jets according to their origin (b,c,uds quarks, gluons, boosted W, top quark)
* several algorithms developed over the years; <a href="https://arxiv.org/abs/1712.07158">JINST 13 (2018)</a>, <a href="https://arxiv.org/abs/2004.08262">JINST 15 (2020)</a>
* typical inputs: global jet features (e.g. kinematics), secondary vertices, clustered particles (charged tracks, neutrals from calorimeter deposits)


# Example 2: Jet classification
* using here a sample of simulated QCD events from the CMS <a href="http://opendata.cern.ch/record/12021">open data set</a>
* jet and constituent properties have been saved in a simple ROOT TTree

In [None]:
import uproot
import os
import glob

input_file_list = glob.glob('Samples/QCD_Pt-15to7000_unpacked_train1_[0-9]*.root')
print "Files:",input_file_list

In [None]:
#exploring the files
uproot_file = uproot.open("Samples/QCD_Pt-15to7000_unpacked_train1_1.root")
jets = uproot_file['jets']
print "number of jets:",len(jets)

In [None]:
#print some labels/features
labels = ['isB','isC','isS','isUD','isG']
features = ['global_pt','global_eta','nsv'] #note: global_pt is in log10(pt / 1 GeV)
print "%4s"%("#jet"),
for label in labels:
    print "%6s"%(label),
for feature in features:
    print "%10s"%(feature),
print
print "-"*100
for i in range(10,20):
    print "%4i"%(i),
    for label in labels:
        print "%6i"%(jets['jetorigin_'+label].array()[i]),
    for feature in features:
        print "%10.2f"%(jets[feature].array()[i]),
    print

In [None]:
#generator to iterate over input files
def generate(file_list, labels, features, batch_size=10):
    for file_name in file_list:
        uproot_file = uproot.open('Samples/'+file_name)
        jets = uproot_file['jets']
        for batch_index in range(len(jets)/batch_size):
            batch_features = []
            batch_labels = []
            for feature in features:
                batch_features.append(jets[feature].array(
                    entrystart=batch_index*batch_size,
                    entrystop=(batch_index+1)*batch_size
                ))
            for label in labels:
                batch_labels.append(jets['jetorigin_'+label].array(
                    entrystart=batch_index*batch_size,
                    entrystop=(batch_index+1)*batch_size
                ))
            yield np.stack(batch_features,axis=1), np.stack(batch_labels,axis=1)

            
for X,y in generate(os.listdir("Samples"),labels,features):
    print X
    print y
    break

# Train a simple neural network to classify jets

In [None]:
#build keras model
model_jets = keras.models.Sequential()
model_jets.add(keras.layers.Dense(10, activation='relu', input_shape=(len(features),)))
for _ in range(3):
    model_jets.add(keras.layers.Dense(10,activation='relu'))
model_jets.add(keras.layers.Dense(len(labels)))
model_jets.add(keras.layers.Activation('softmax'))
model_jets.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

model_jets.summary()

In [None]:
#train model using generator
history = model_jets.fit_generator(generate(os.listdir("Samples"),labels,features),steps_per_epoch=100, epochs=2, verbose=1)

# Example 2: Summary
* data read sequentially from disk using `generator` pattern
* advantages
    * works with arbirary large data sets
    * does not dictate decoder (here: uproot)
    * allows to select features
    * optional preprocessing possible
* disadvantages
    * I/O limited; training needs to wait until data has been read and processed
    * no shuffeling of batches

* obvious solution
    * read data asychroniously from disk
    * populate cache
    * train on random batch from cache
* technical problems
    * need to deal with threads and points of synchronization

# TensorFlow queue approach

* TensorFlow v1 allows to build complex input pipelines using queues
<div style="width: 80%;"><div style='border-width: 1px; border-style: solid; border-color: gray; padding: 10px;  background: white;'><img style='margin: auto;' width=70% src="figures/AnimatedFileQueues.gif"/></div></div>
* first queue holds list of file names for reading
* files are read in parallel
* decoded data enqueued as tensors into second queue
* second queue caches tensors from which (random) batches can be dequeued for training
* problem: no native TensorFlow operation to read ROOT TTrees



# Working with queues
* construct a FIFO or random queue
* enqueue/dequeue elements

In [None]:
with tf.Session() as sess:
    with tf.device('/cpu:0'):
        queue_fifo = tf.FIFOQueue(
            capacity=100, 
            dtypes=[tf.float32], 
            shapes=[()]
        )
        
        queue_rnd = tf.RandomShuffleQueue(
            capacity=100,
            min_after_dequeue=1, 
            dtypes=[tf.float32], 
            shapes=[()],
        )
        
        queue = queue_fifo
        #queue = queue_rnd

        element = tf.placeholder(tf.float32,shape=())
        enqueue_op = queue.enqueue(element)
        dequeue_op = queue.dequeue()
        dequeue_many_op = queue.dequeue_many(4)
        
    init_op = tf.group(
        tf.global_variables_initializer(),
        tf.local_variables_initializer()
    )

    sess.run(init_op)
    
    values = np.arange(0,10)
    print 'values to be enqueued:',values    
    for i in range(len(values)):
        sess.run(enqueue_op, feed_dict = {element: values[i]})
        
    for i in range(3):
        print 'dequeued:',sess.run(dequeue_op)
    print 'dequeued many',sess.run(dequeue_many_op)

# A TTree reader as TensorFlow operation
* written in C++ as plugin for TensorFlow
* linked against ROOT to read TTrees
* employ <a href="https://cmake.org/">cmake</a> for compiling the plugin

In [None]:
%%bash
mkdir Ops/build || rm -rf Ops/build/*
cd Ops/build

#create make file from cmake configuration
cmake -DCMAKE_INSTALL_PREFIX=$PWD/release ..

In [None]:
%%bash
cd Ops/build

#compile and install library under given path
make -j
make install

In [None]:
import sys
import os

# add to python path for import
sys.path.append(os.path.join(os.getcwd(),'Ops','build','release'))

In [None]:
# module can now be imported
import rtf
print rtf.root_reader

# Example 3: A simple pipeline
* file name queue => root reader => tensor queue => random batch

In [None]:
def create_pipeline_simple(file_list,features_dict,batch_size=5):
    # place all operations on cpu; leave gpu for training
    # (note: this notebook has no gpu support)
    with tf.device('/cpu:0'):
        
        # 1st queue hold random list of file names
        file_list_queue = tf.train.string_input_producer(
            file_list,
            num_epochs=1,
            seed = 234567,
            shuffle=True
        )
        
        # custom reader will dequeue from queue and produce tensors
        reader = rtf.root_reader(
            file_list_queue, 
            features_dict, 
            "jets", # TTree name
            batch=5 #numer of sequential reads
        ).batch()

        # tensors are enqueue in 2nd queue 
        batch = tf.train.shuffle_batch_join(
            [reader],
            batch_size=batch_size,
            capacity=5*batch_size,
            min_after_dequeue=2*batch_size,
            seed = 234567,
            enqueue_many=True
        )
        
        # calling sess.run(batch) will dequeue a random batch
        # of tensors from the queue
        return batch

In [None]:
feature_dict_1 = {
    "globalvars": {
        "branches": ['global_pt','global_eta','nsv'],
    },
}

feature_dict_2 = {
    "globalvars": { # global jet features
        "branches": ['global_pt','global_eta','nsv'],
    },
    "truth": { # jet label determined from simulation truth
        "branches":[
            'jetorigin_isB||jetorigin_isBB||jetorigin_isGBB||jetorigin_isLeptonic_B||jetorigin_isLeptonic_C',         
            'jetorigin_isC||jetorigin_isCC||jetorigin_isGCC',
            'jetorigin_isUD||jetorigin_isS',
            'jetorigin_isG'       
        ],
    },
    "sv" : { # secondary vertices inside jet
        "branches":[
            'sv_mass',
            'sv_chi2',
            'sv_dxy',        
        ],
        "max":2 # allow up to 2 vertices per jet; zero-padded
    },
}

feature_dict = feature_dict_1

file_list = glob.glob('Samples/QCD_Pt-15to7000_unpacked_train1_[0-9]*.root')

with tf.Session() as sess:
    batch = create_pipeline_simple(file_list,feature_dict)
            
    init_op = tf.group(
        tf.global_variables_initializer(),
        tf.local_variables_initializer()
    )

    sess.run(init_op)

    # manages thread pool
    coord = tf.train.Coordinator()
    
    # run queues asynchronous 
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
    # get a single batch
    batch_value = sess.run(batch)
    for k,v in batch_value.iteritems():
        print k,v.shape
        print v
        print
    
    
    coord.request_stop()
    coord.join()

# Example 4: A realistic input pipeline
* use multiple parallel readers to populate tensor queue
* preprocess tensors with random sampling
* goal: achieve same $p_\mathrm{T}$ and $|\eta|$ distribution per jet label to reduce bias => neural network is optimized at each ($p_\mathrm{T}$, $|\eta|$) point

In [None]:
# import realistic dict of features containing 
#  * jet labels from simulation ('truth')
#  * global jet properties ('globalvar')
#  * secondary vertex properties ('sv')
#  * charged & neutral particle properties ('cpf' & 'npf')
from realistic_feature_dict import feature_dict
print "%14s  |  %12s"%("feature group","number of features")
print "-"*40
for k,v in feature_dict.iteritems():
    if v.has_key('max'):
        print "%12s    |%8i x %-2i"%(k,len(v['branches']),v['max'])
    else:
        print "%12s    |%8i"%(k,len(v['branches']))

### Create histograms for resampling

In [None]:
import my_root_style

def create_weight_histograms(input_file_list,features):
    hists_per_class = []
    chain = ROOT.TChain('jets','jets')
    for f in input_file_list:
        chain.AddFile(f)
        
    for i,name in enumerate(features['truth']['names']):
        hist = ROOT.TH2F(
            name,
            name+";Jet log_{10}(p_{T} / 1 GeV);",
            10,1.3, 3.0, #log10(pt / 1 GeV) binning
            3,-2.4,2.4 #eta binning
        )
        variable = features['truth']['branches'][i]
        chain.Project(hist.GetName(),"global_eta:global_pt",variable)
        hist.SetDirectory(0)
        hists_per_class.append(hist)
        
    avg_events = np.mean(map(
        lambda h: h.Integral()/h.GetNbinsX()/h.GetNbinsY(),
        hists_per_class
    ))
    
    output_file = ROOT.TFile("weights.root","RECREATE")
    for hist in hists_per_class:
        for etabin in range(hist.GetNbinsY()):
            hist.SetBinContent(0,etabin+1,0)
            hist.SetBinContent(hist.GetNbinsX()+1,etabin+1,0)
            for ptbin in range(hist.GetNbinsX()):   
               
                xflavor = hist.GetBinContent(ptbin+1,etabin+1)
                if xflavor<4.:
                    hist.SetBinContent(ptbin+1,etabin+1,0.)
                else:
                    hist.SetBinContent(ptbin+1,etabin+1,avg_events/xflavor)
                
                hist.SetDirectory(output_file)
    output_file.Write()
    
create_weight_histograms(
    glob.glob('Samples/QCD_Pt-15to7000_unpacked_train1_[0-9]*.root'),
    feature_dict
)

### Inspecting the weights
(note: weight > 1 means jets are oversampled which should be avoided in realistic applications since it can lead to overfitting of the neural network)

In [None]:
weightFile = ROOT.TFile("weights.root")
cv = ROOT.TCanvas("cv","",800,600)
cv.Divide(2,2)
for i,name in enumerate(feature_dict['truth']['names']):
    cv.cd(i+1)
    cv.SetLogy(1)
    hist = weightFile.Get(name).ProjectionX()
    hist.GetYaxis().SetTitle("Resample weight")
    hist.SetLineWidth(3)
    hist.Draw("L")
cv.Draw()

### Bulding the pipeline

In [None]:
def create_pipeline_realistic(file_list,features_dict,batch_size=5,nthreads=2):
    # place all operations on cpu; leave gpu for training
    # (note: this notebook has no gpu support)
    with tf.device('/cpu:0'):
        
        # 1st queue hold random list of file names
        file_list_queue = tf.train.string_input_producer(
            file_list,
            num_epochs=1,
            seed = 234567,
            shuffle=True
        )
        
        readers = []
        resamplers = []
        
        for _ in range(nthreads):
            # custom reader will dequeue from queue and produce tensors
            reader = rtf.root_reader(
                file_list_queue, 
                features_dict, 
                "jets", # TTree name
                batch=5 #numer of sequential reads
            ).batch()
            
            readers.append(reader)
            
            weight = rtf.classification_weights(
                reader["truth"],
                reader["globalvars"],
                "weights.root",
                features_dict['truth']['names'],
                [0, 1]
            )
            resample = rtf.resampler(
                weight,
                reader
            ).resample()

            resamplers.append(resample)

        # tensors are enqueue in 2nd queue 
        batch = tf.train.shuffle_batch_join(
            #readers,
            resamplers,
            batch_size=batch_size,
            capacity=5*batch_size,
            min_after_dequeue=2*batch_size,
            seed = 234567,
            enqueue_many=True
        )
        
        # calling sess.run(batch) will dequeue a random batch
        # of tensors from the queue
        return batch

### Training a neural network

In [None]:

file_list = glob.glob('Samples/QCD_Pt-15to7000_unpacked_train1_[0-9]*.root')

with tf.Session() as sess:
    K.set_session(sess)
    batch = create_pipeline_realistic(
        file_list,
        feature_dict,
        batch_size=20,
        nthreads=2
    )

    from realistic_model import create_model
    
    
    globalvars = keras.layers.Input(tensor=batch['globalvars'])
    cpf = keras.layers.Input(tensor=batch['cpf'])
    npf = keras.layers.Input(tensor=batch['npf'])
    sv = keras.layers.Input(tensor=batch['sv'])
    
    nlabels = len(feature_dict['truth']['names'])

    model = create_model(
        globalvars,
        cpf,
        npf,
        sv,
        nlabels
    )
    #model.summary()
    
    predicted_labels = model.outputs[0]
    
    loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(
        labels=batch['truth'], 
        logits=predicted_labels,
    ))
    opt = tf.train.AdamOptimizer(learning_rate=0.001).minimize(loss)
    
    init_op = tf.group(
        tf.global_variables_initializer(),
        tf.local_variables_initializer()
    )

    sess.run(init_op)

    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    
    pt = [[] for _ in range(nlabels)]    
        
    step = 0
    try:
        while not coord.should_stop():
            step+=1
            _,loss_value,batch_value = sess.run([opt,loss,batch])
            
            iclass = np.argmax(batch_value['truth'],axis=1)
            
            for ibatch in range(iclass.shape[0]):
                pt[iclass[ibatch]].append(batch_value['globalvars'][ibatch,0])
            if step%10==0:
                print "Step %i, loss=%.3f\r"%(step,loss_value),
                
    except tf.errors.OutOfRangeError:
        print 'Done reading files for %i steps (loss=%.3f)' % (step,loss_value)
    coord.request_stop()
    coord.join()
    
    fig, ax = plt.subplots()
    for ilabel in range(nlabels):
        plt.hist(pt[ilabel], bins = 10, alpha=0.25, label=feature_dict['truth']['names'][ilabel])
    plt.legend()
    ax.set_xlabel(r'$\mathrm{log10}(p_\mathrm{T} /1\,\mathrm{GeV})$', fontsize=15)
    ax.set_ylabel(r'#Jets', fontsize=15)
    plt.show()

# Example 4: Summary
* constructed input pipeline using two random queues
* read data from multiple ROOT TTrees asynchronous
* on-the-fly preprocessing: perform random sampling of tensors

# Outlook
* presented ROOT TensorFlow pipeline as proof-of-concept
* unfortunately TensorFlow queue system deprecated in TF2.X in favor of <a href="https://www.tensorflow.org/api_docs/python/tf/data/Dataset">Dataset API</a>; 
* main drawback: closed queues cannot be reopened; new tf.Session has to be constructed
* currently working on adapting to Dataset API system
* intermediate problem: linking TensorFlow and ROOT together (c++ ABI does not match for binaries taken from anaconda repositories)
* no need to install all of ROOT anyway; a slim c++ ROOT I/O library to link against would be sufficient 

# Epilogue
* basic ingredients to train a neural network
* feeding data into TensorFlow
* using `generator` pattern and uproot to feed data sequentially into keras/TensorFlow
* introduction to TensorFlow queue system
* custom operation to interface TensorFlow with ROOT 
* proof-of-concept for training neural networks directly on ROOT TTrees
* no conversion to intermediate formats required
* flexible system (select branches; perform preprocessing)
* pipeline used in publication <a href="https://arxiv.org/abs/1912.12238">arXiv:1912.12238</a>