# Hands-on Day 1

## You'll learn

- get familiar with Jupyter notebooks, NumPy and pandas

### Input data
- efficient data format: convert CSV to Parquet
- create the input vector with features for MLlib; the format of the input depends on the chosen ML library

### Visualization
- explore dataset, plot features
- correlation matrix


# Dataset description

The dataset used in this example is described [here](https://archive.ics.uci.edu/ml/datasets/HIGGS). It is a binary classification problem where the goal is to train a classifier able to distinguish between a signal process, the production of new theoretical Higgs bosons, and a background process with identical decay products but distinct kinematic features.

Each row of this dataset contains 28 features plus the label:

- 21 low-level features which represent the basic measurements made by the particle detector
  - Momentum of the observed particles
  - Missing transverse momentum
  - Jets and b-tagging information
- 7 high-level features computed from the low-level features that encode the knowledge of the different intermediate states of the two processes (reconstructed invariant masses)

## Prepare the execution environment

Your code will run on a single dedicated server with 24 cores (hyperthreading enabled) and 192 GB of RAM. 
All the services needed for this tutorial are deployed as Kubernetes applications on this server. These include:
* JupyterHub
* Jupyter single-user servers
* the HDFS file-system
* Spark Clusters on demand 

#### Load custom magics definition

We load an external file implementing some custom *magic* functions. Have a look at it.

In [1]:
%reload_ext custom_magics

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

# Check out these custom functions
from custom_functions import *

#### Load the Spark context

We use the custom magic *%sc* to load a pre-defined Spark context.

In [3]:
num_workers=4
spark=%sc $num_workers

from pyspark.sql import SparkSession
spark_session = SparkSession(spark)

#check if spark is there
spark

In [4]:
# Check number of workers (executors), and cores per executor

executor_count = len(spark._jsc.sc().statusTracker().getExecutorInfos()) - 1
cores_per_executor = int(spark.getConf().get('spark.executor.cores','1'))

print('Number of executors: '+ str(executor_count))
print('Cores per executor: '+ str(cores_per_executor))

Number of executors: 4
Cores per executor: 5


## Exercise 1

### Get familiar with Kubernetes commands

- You can open a terminal and use the commands: 
  - `kubectl get pods`
  - `kubectl describe pod PODNAME`
  - `kubectl get nodes`
  - `kubectl describe node NODENAME`
  - `kubectl describe farm`
  - ```kubectl logs PODNAME```
  - ```kubectl exec PODNAME -it -- /bin/bash```
  
  
- Try to stop Spark with `spark.stop()`, then start it again by rerunning the cell above with a different number of workers. What happens? You can vary the number of workers and run the cells below that execute Spark commands. Provided you get all the workers you ask for, does the execution time change? Try running some scaling tests.

- If you don't stop Spark correctly, you will see pods in the Error state. You can get rid of them by running this command:
    `kubectl get pods -n YOURUSERNAME | grep Error | awk '{print $1}' | xargs kubectl delete pod -n YOURUSERNAME`
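
For the scaling tests, one possible approach is to time the same Spark action several times for each number of workers and compare the best times. The helper below is a minimal sketch and not part of the course material: `time_action` and its parameters are hypothetical names, and it only assumes that the action can be wrapped in a zero-argument callable.

```python
import time


def time_action(action, repeats=3):
    """Run a zero-argument callable `repeats` times.

    Returns the list of wall-clock durations in seconds.
    """
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        durations.append(time.perf_counter() - start)
    return durations


# Stand-in workload so the sketch runs without Spark. In the notebook you
# could pass a Spark action instead, for example time_action(df.count)
# once the dataframe df has been loaded.
durations = time_action(lambda: sum(range(100_000)))
print('best of {} runs: {:.4f} s'.format(len(durations), min(durations)))
```

Taking the minimum over a few repeats reduces the effect of one-off delays such as executor start-up or cold caches.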

In [5]:
!kubectl describe pod jupyter-leggerf-dda6179a9b623b67-exec-1

Name:         jupyter-leggerf-dda6179a9b623b67-exec-1
Namespace:    leggerf
Priority:     0
Node:         t2-mlwn-04.to.infn.it/192.168.2.84
Start Time:   Wed, 19 Nov 2025 09:13:17 +0000
Labels:       spark-app-selector=spark-application-1763543498158
              spark-exec-id=1
              spark-exec-resourceprofile-id=0
              spark-role=executor
Annotations:  cni.projectcalico.org/containerID: 0dde9a18c602e7ab667917943279c8a5fc0bed8f58e8f2c918963ba8ce7e338e
              cni.projectcalico.org/podIP: 192.168.230.110/32
              cni.projectcalico.org/podIPs: 192.168.230.110/32
Status:       Running
IP:           192.168.230.110
IPs:
  IP:  192.168.230.110
Containers:
  spark-kubernetes-executor:
    Container ID:  docker://59534f7effe1f620f429cfd11057ca4caaec360b6719a7198218773d68c178b9
    Image:         svallero/sparkpy:3.2.1
    Image ID:      docker-pullable://svallero/sparkpy@sha256:3590e42c83c450d00b3c02a19c84903759920e6f58e46072f090e8a18040b760
    Port:        

In [None]:
#spark.stop()

## Exercise 2

### Create parquet files, which are faster to read than CSV

- create a parquet file for each input file; you will need them for the next notebooks
- check you can read in all parquet files and that they have the correct number of events
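
One way to check the event counts, once the parquet files have been written with the cells below, is to compare each parquet file against the CSV it was created from. This is only a sketch: `counts_match` and `check_all` are hypothetical helper names, and `session` is assumed to behave like the `spark_session` object created earlier.

```python
def counts_match(session, csv_path, parquet_path):
    """Return (csv_count, parquet_count, counts_are_equal) for one file pair.

    `session` is expected to behave like a SparkSession.
    """
    csv_count = session.read.csv(csv_path, header=True).count()
    parquet_count = session.read.parquet(parquet_path).count()
    return csv_count, parquet_count, csv_count == parquet_count


def check_all(session, names, csv_dir, parquet_dir):
    """Check every dataset name and return a dict: name -> result tuple."""
    results = {}
    for name in names:
        csv_path = '{}/{}.csv'.format(csv_dir, name)
        parquet_path = '{}/{}.parquet'.format(parquet_dir, name)
        results[name] = counts_match(session, csv_path, parquet_path)
    return results


# In the notebook (requires the running Spark session):
# check_all(spark_session, ['Higgs100k', 'Higgs1M', 'Higgs10M'],
#           'file:///data-corso', 'file:///home/jovyan')
```

Counting the CSV again is slow for the largest file, so you may prefer to note the CSV counts once and compare against them.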

In [None]:
# read input file
inputFile = 'file:///data-corso/Higgs100k.csv'
#inputFile = 'file:///data-corso/Higgs1M.csv'
#inputFile = 'file:///data-corso/Higgs10M.csv'

%time df = spark_session.read.format('csv').option('header', 'true').option('inferschema', 'true').load(inputFile)

In [None]:
#write parquet in your home
outputFile = 'file:///home/jovyan/Higgs100k.parquet'
#outputFile = 'file:///home/jovyan/Higgs1M.parquet'
#outputFile = 'file:///home/jovyan/Higgs10M.parquet'

df.write.parquet(outputFile, mode='overwrite')

### Read in parquet files

How much faster is it to read Parquet files than CSV files?

In [None]:
inputFile = 'file:///home/jovyan/Higgs100k.parquet'
#inputFile = 'file:///home/jovyan/Higgs1M.parquet'
#inputFile = 'file:///home/jovyan/Higgs10M.parquet'

# Parquet files store their own schema, so the CSV options 'header' and 'inferschema' are not needed
%time df = spark_session.read.format('parquet').load(inputFile)

In [None]:
# How many partitions is the dataframe split into?
print("Number of partitions: {}".format(df.rdd.getNumPartitions()))
print("Partitioner: {}".format(df.rdd.partitioner))

# Now let's have a look at the input data

In [None]:
%time total_events = df.count()

print('There are '+str(total_events)+' events')

In [None]:
df.printSchema()

## Exercise 3

- Create a function that plots any of the above variables for signal versus background (using the label variable to discriminate)
  - see an example of the plot in the hands-on slides
  - the function should take as input the dataframe *df* and the variable name. For example `plotSignalvsBg(df, 'm_bb')`
  - to develop the code, use the 100k dataset, so that debugging goes quicker
- try to plot a few input variables and try to understand which ones are more promising to distinguish signal from background  
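
As a starting point for this exercise, the histogramming step can be sketched with NumPy alone. This is not the reference solution from the slides: `signal_background_hist` is a hypothetical helper, the toy data below is randomly generated for illustration only, and the sketch assumes the HIGGS convention that label 1 marks signal and label 0 marks background.

```python
import numpy as np


def signal_background_hist(values, labels, bins=50):
    """Histogram `values` separately for signal (label 1) and background (label 0).

    Both histograms share the same bin edges and are normalised to unit
    area, so that their shapes can be compared directly.
    """
    values = np.asarray(values, dtype=float)
    labels = np.asarray(labels)
    # Common bin edges computed from all values, signal and background together.
    edges = np.histogram_bin_edges(values, bins=bins)
    signal, _ = np.histogram(values[labels == 1], bins=edges, density=True)
    background, _ = np.histogram(values[labels == 0], bins=edges, density=True)
    return edges, signal, background


# Toy data (randomly generated, not taken from the HIGGS dataset).
rng = np.random.default_rng(0)
toy_values = np.concatenate([rng.normal(1.0, 0.2, 1000),
                             rng.normal(0.5, 0.3, 1000)])
toy_labels = np.concatenate([np.ones(1000), np.zeros(1000)])
edges, signal, background = signal_background_hist(toy_values, toy_labels, bins=20)
```

In the notebook, the two input arrays could come from `df.select('m_bb', 'label').toPandas()`, and each histogram could be drawn with, for example, `plt.step(edges[:-1], signal, where='post')`.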

In [None]:
#your code here

## Exercise 4 - Bonus

### Create the input feature vector

- ML libraries typically take input data in a very specific format. Documentation on how to do data preprocessing in Spark: https://spark.apache.org/docs/latest/ml-features.html
- Try to add a new column called 'features' to the dataframe df: a vector column with all the variables above except for 'label'
   - features = [lepton_pT, lepton_eta, lepton_phi, ...]
   - Hint: look at the VectorAssembler transformer
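
Following the hint, a possible shape for the solution is sketched below. The helper names `feature_columns` and `add_feature_vector` are hypothetical; `VectorAssembler` with its `inputCols` and `outputCol` parameters is the Spark transformer mentioned above. The Spark part is wrapped in a function so that the column-selection logic can be tried without a running cluster.

```python
def feature_columns(columns, label='label'):
    """Return all column names except the label, preserving their order."""
    return [name for name in columns if name != label]


def add_feature_vector(df, label='label', output_col='features'):
    """Append a vector column built from every non-label column of `df`."""
    # Imported here so that feature_columns can be used without pyspark.
    from pyspark.ml.feature import VectorAssembler

    assembler = VectorAssembler(inputCols=feature_columns(df.columns, label),
                                outputCol=output_col)
    return assembler.transform(df)


# Column selection on a shortened, illustrative list of column names.
print(feature_columns(['label', 'lepton_pT', 'lepton_eta', 'lepton_phi']))
```

In the notebook you could then call `df = add_feature_vector(df)` and inspect the result with `df.select('features').show(5)`.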

In [None]:
#your code here

## Exercise 5 - Bonus

Print (or draw) the correlation matrix (a table showing correlation coefficients between variables)
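
One way to approach this, sketched here with NumPy on a small hand-made table rather than the real dataset, is to compute Pearson correlation coefficients between columns. `correlation_matrix` is a hypothetical helper name and the numbers in `toy` are made up for illustration.

```python
import numpy as np


def correlation_matrix(table):
    """Pearson correlation coefficients between the columns of a 2D array."""
    data = np.asarray(table, dtype=float)
    # rowvar=False: each column is a variable, each row is an observation.
    return np.corrcoef(data, rowvar=False)


# Made-up table: the second column is exactly twice the first.
toy = np.array([[1.0, 2.0, 9.0],
                [2.0, 4.0, 7.0],
                [3.0, 6.0, 8.0],
                [4.0, 8.0, 1.0]])
corr = correlation_matrix(toy)
print(np.round(corr, 2))
```

In the notebook, the input could be a sample of the dataframe converted with `df.sample(fraction=0.1).toPandas().values`, and the matrix could be drawn with `plt.imshow(corr)`. Spark also provides `pyspark.ml.stat.Correlation.corr`, which works directly on a vector column such as 'features'.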

In [None]:
#your code here

#### When you're done, stop Spark to release the resources you're using

In [6]:
spark.stop()

In [None]:
!kubectl get pods