# Streaming processing of cosmic rays using Drift Tubes detectors

Dal Magro Matteo

Rosset Lorenzo

Zanola Andrea

## Spark Cluster

In [2]:
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, to_json, col, when, sum, count, struct, collect_list
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, IntegerType
import json
import time
import sys
from kafka import KafkaProducer

### Start the cluster

In [3]:
# Initialise findspark with the Spark installation directory used on this host.
findspark.init('/usr/local/spark/')
# Sanity check: list the environment entries that mention "spark"
# (case-insensitive match on the whole `NAME=value` line).
!env | grep -i spark

SPARK_HOME=/usr/local/spark/
PYSPARK_PYTHON=/usr/bin/python3.6
PATH=/usr/bin:/root/anaconda3/bin:/usr/lib64/qt-3.3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/usr/local/spark/bin:/root/bin


In [4]:
# Start Spark cluster
# Run Spark's start-all.sh from $SPARK_HOME, passing the master host/port and
# the web UI port explicitly.
!$SPARK_HOME/sbin/start-all.sh --host localhost --port 7077 --webui-port 8080
# Start one additional worker on each Kafka broker host over ssh. Each worker
# registers with the master at 10.67.22.193:7077 and offers 2 cores / 2 GB.
# NOTE(review): the SparkSession below connects to spark://spark-master:7077;
# presumably that name resolves to 10.67.22.193 - confirm.
!ssh kafka-broker1 /usr/local/spark/sbin/start-worker.sh spark://10.67.22.193:7077 --cores 2 --memory 2g
!ssh kafka-broker2 /usr/local/spark/sbin/start-worker.sh spark://10.67.22.193:7077 --cores 2 --memory 2g

starting org.apache.spark.deploy.master.Master, logging to /usr/local/spark//logs/spark-root-org.apache.spark.deploy.master.Master-1-mapd-b-gr07-1.novalocal.out
spark-master: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-mapd-b-gr07-1.novalocal.out
spark-slave: starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-mapd-b-gr07-2.novalocal.out
starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-mapd-b-gr07-3.novalocal.out
starting org.apache.spark.deploy.worker.Worker, logging to /usr/local/spark/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-mapd-b-gr07-4.novalocal.out


In [5]:
# Describe the session first, then create (or reuse) it.
# - Arrow is enabled for pandas conversions and the non-Arrow fallback is disabled.
# - The Kafka connector package is requested so that the "kafka" stream format
#   used later in the notebook is available.
session_builder = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")
    .config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
    .appName("spark_cluster")
)
spark = session_builder.getOrCreate()

# Keep a handle on the underlying SparkContext; it is stopped explicitly at the
# end of the notebook.
sc = spark.sparkContext
spark

### Read data from Kafka and preprocessing

In [6]:
# Directory name of the Kafka distribution.
# NOTE(review): not referenced by any other cell visible in this notebook -
# confirm it is still needed.
KAFKA_HOME = 'kafka_2.13-2.8.0'
# Broker address shared by the streaming source (readStream) and by the
# KafkaProducer that publishes the per-batch results.
KAFKA_BOOTSTRAP_SERVERS = '10.67.22.127:9092'

In [7]:
# Streaming source: subscribe to 'topic_stream' on the Kafka broker and start
# from the latest offsets, i.e. messages already in the topic are not replayed.
kafka_source = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option('subscribe', 'topic_stream')
    .option("startingOffsets", "latest")
)
inputDF = kafka_source.load()

# Show the raw Kafka record layout (key/value are still binary at this point).
inputDF.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [8]:
# Every field of the JSON payload is parsed as a string here; the numeric
# casts are applied afterwards, once the columns have been flattened.
FIELD_NAMES = ("HEAD", "FPGA", "TDC_CHANNEL", "ORBIT_CNT", "BX_COUNTER", "TDC_MEAS")
schema = StructType([StructField(field_name, StringType()) for field_name in FIELD_NAMES])

# Kafka delivers `value` as binary: decode it to text, parse the JSON into a
# struct, then promote the struct fields to top-level columns.
json_text = col("value").alias('value').cast("string")
df = (
    inputDF
    .select(from_json(json_text, schema).alias('value'))
    .select('value.*')
)

In [9]:
# Target Spark SQL type for each column parsed from the JSON payload.
numeric_types = {
    "HEAD": 'integer',
    "FPGA": 'integer',
    "TDC_CHANNEL": 'integer',
    "ORBIT_CNT": 'integer',
    "BX_COUNTER": 'integer',
    "TDC_MEAS": 'double',
}

# Replace each string column, in place, by its numeric counterpart.
for column_name, spark_type in numeric_types.items():
    df = df.withColumn(column_name, col(column_name).cast(spark_type))

df.printSchema()

root
 |-- HEAD: integer (nullable = true)
 |-- FPGA: integer (nullable = true)
 |-- TDC_CHANNEL: integer (nullable = true)
 |-- ORBIT_CNT: integer (nullable = true)
 |-- BX_COUNTER: integer (nullable = true)
 |-- TDC_MEAS: double (nullable = true)



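Keep only the rows with `HEAD == 2`, then define the `chamber` and `ABSOLUTETIME` columns. Chambers 0–3 are assigned from the `FPGA` and `TDC_CHANNEL` values; chamber number 4 is not a detector chamber but labels the scintillator signal (`FPGA == 1`, `TDC_CHANNEL == 128`). Rows that match none of these cases are dropped.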

In [10]:
# Map (FPGA, TDC_CHANNEL) to a chamber id:
#   FPGA 0, channels <= 63      -> chamber 0
#   FPGA 0, channels 64..127    -> chamber 1
#   FPGA 1, channels <= 63      -> chamber 2
#   FPGA 1, channels 64..127    -> chamber 3
#   FPGA 1, channel 128         -> 4 (scintillator signal)
# Anything else yields NULL and is filtered out below.
fpga = col('FPGA')
tdc_channel = col('TDC_CHANNEL')
chamber_id = (
    when((fpga == 0) & (tdc_channel <= 63), 0)
    .when((fpga == 0) & (tdc_channel > 63) & (tdc_channel <= 127), 1)
    .when((fpga == 1) & (tdc_channel <= 63), 2)
    .when((fpga == 1) & (tdc_channel > 63) & (tdc_channel <= 127), 3)
    .when((fpga == 1) & (tdc_channel == 128), 4)
    .otherwise(None)
)

# ORBIT_CNT is a 32-bit integer column, so `ORBIT_CNT * 3564` is evaluated in
# 32-bit arithmetic and silently wraps around (ANSI mode is off by default)
# once ORBIT_CNT exceeds ~602k. Widen to long before multiplying so the
# absolute time stays correct; the resulting column is still a double.
# NOTE(review): presumably 3564 = bunch crossings per orbit, 25 = ns per bunch
# crossing and TDC_MEAS is in 1/30 of a bunch crossing - confirm.
absolute_time = 25*(col('ORBIT_CNT').cast('long')*3564 + col('BX_COUNTER') + col('TDC_MEAS')/30)

df = (
    df
    .filter(col('HEAD') == 2)
    .withColumn('chamber', chamber_id)
    .filter(col('chamber').isNotNull())
    .withColumn('ABSOLUTETIME', absolute_time)
)

df.printSchema()

root
 |-- HEAD: integer (nullable = true)
 |-- FPGA: integer (nullable = true)
 |-- TDC_CHANNEL: integer (nullable = true)
 |-- ORBIT_CNT: integer (nullable = true)
 |-- BX_COUNTER: integer (nullable = true)
 |-- TDC_MEAS: double (nullable = true)
 |-- chamber: integer (nullable = true)
 |-- ABSOLUTETIME: double (nullable = true)



### Message

In [11]:
    msg = {
    'batch_id' : 0,
    'total_hits' : 0,
    'chamber0' : {
        'total_hits' : 0,
        'hist_CHANNEL' : {
            'bin_edges' : [],
            'bin_counts' : []
        },
        'hist_ORBIT' : {
            'bin_edges' : [],
            'bin_counts' : []
        },
        'hist_SCINT' :{
            'bin_edges' : [],
            'bin_counts' : []
        },
        'DRIFTIME' : []
    },
    'chamber1' : {
        'total_hits' : 0,
        'hist_CHANNEL' : {
            'bin_edges' : [],
            'bin_counts' : []
        },
        'hist_ORBIT' : {
            'bin_edges' : [],
            'bin_counts' : []
        },
        'hist_SCINT' :{
            'bin_edges' : [],
            'bin_counts' : []
        },
        'DRIFTIME' : []
    },
    'chamber2' : {
        'total_hits' : 0,
        'hist_CHANNEL' : {
            'bin_edges' : [],
            'bin_counts' : []
        },
        'hist_ORBIT' : {
            'bin_edges' : [],
            'bin_counts' : []
        },
        'hist_SCINT' :{
            'bin_edges' : [],
            'bin_counts' : []
        },
        'DRIFTIME' : []
    },
    'chamber3' : {
        'total_hits' : 0,
        'hist_CHANNEL' : {
            'bin_edges' : [],
            'bin_counts' : []
        },
        'hist_ORBIT' : {
            'bin_edges' : [],
            'bin_counts' : []
        },
        'hist_SCINT' :{
            'bin_edges' : [],
            'bin_counts' : []
        },
        'DRIFTIME' : []
    }
}

### Batch processing

In [12]:
# Kafka producer used by process_batch to publish each batch summary
# (topic 'topic_results') to the same broker the input stream is read from.
producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)

In [13]:
# Scintillator time offset for each chamber, keyed by chamber id (0..3).
# Every chamber shares the same base offset plus its own small correction;
# process_batch uses these values to shift the scintillator time t0 when it
# computes DRIFTIME.
# NOTE(review): presumably expressed in the same unit as ABSOLUTETIME - confirm.
_COMMON_OFFSET = 95.0
time_offset_by_chamber = {
    chamber: _COMMON_OFFSET + correction
    for chamber, correction in enumerate((-1.1, +6.4, +0.5, -2.6))
}

In [17]:
# Processing time (seconds) of every micro-batch, appended by process_batch.
avg = []


def _histogram_by_chamber(hits, bin_column, name):
    """Count the rows of `hits` per (`bin_column`, chamber).

    Returns one row per chamber with a struct column called `name` holding two
    parallel lists: `bin_edges` (the distinct `bin_column` values) and
    `counts` (the number of rows for each of them).
    """
    return (
        hits
        .groupBy(bin_column, 'chamber')
        .count()
        .withColumnRenamed(bin_column, 'bin_edges')
        .withColumnRenamed('count', 'counts')
        .groupBy('chamber')
        .agg(
            struct(
                collect_list('bin_edges').alias('bin_edges'),
                collect_list('counts').alias('counts'),
            ).alias(name)
        )
    )


def process_batch(df, batch_id):
    """Summarise one micro-batch and publish the summary to Kafka.

    Intended as the `foreachBatch` callback of the streaming query.

    Parameters
    ----------
    df : DataFrame
        The micro-batch; must provide the columns `chamber`, `TDC_CHANNEL`,
        `ORBIT_CNT` and `ABSOLUTETIME`.
    batch_id : int
        Identifier of the micro-batch, copied into the message.

    Side effects
    ------------
    * Overwrites the module-level `msg` dictionary with the summary of this
      batch. Chambers without data in this batch are reported empty rather
      than keeping values from an earlier batch.
    * Appends the processing time (seconds) to the module-level `avg` list.
    * Prints one progress line and sends the JSON-encoded message to the
      'topic_results' Kafka topic through the module-level `producer`.
    """
    start = time.perf_counter()

    chamber_ids = (0, 1, 2, 3)
    histogram_names = ('hist_CHANNEL', 'hist_ORBIT', 'hist_SCINT')

    # `msg` is reused across batches: clear everything up front so that a
    # chamber with no (or only partial) data in this batch cannot leak
    # histograms or drift times from a previous one.
    msg['batch_id'] = batch_id
    for chamber in chamber_ids:
        summary = msg[f'chamber{chamber}']
        summary['total_hits'] = 0
        for name in histogram_names:
            summary[name]['bin_edges'] = []
            summary[name]['bin_counts'] = []
        summary['DRIFTIME'] = []

    # Detector hits only; chamber 4 is the scintillator signal.
    hits = df.filter(col('chamber') != 4)

    # Hit counts for each chamber
    df_counts = (
        hits
        .groupBy('chamber')
        .count()
        .withColumnRenamed('count', 'chamber_hits')
    )

    # Number of hits per TDC channel, for each chamber
    df_channel = _histogram_by_chamber(hits, 'TDC_CHANNEL', 'hist_CHANNEL')

    # Number of hits per orbit, for each chamber
    df_orbit = _histogram_by_chamber(hits, 'ORBIT_CNT', 'hist_ORBIT')

    # Orbits that contain a scintillator signal, with the earliest
    # scintillator time in each of them (t0)
    df_scint = (
        df
        .select('chamber', 'ORBIT_CNT', 'ABSOLUTETIME')
        .filter(col('chamber') == 4)
        .groupBy('ORBIT_CNT')
        .min('ABSOLUTETIME')
        .withColumnRenamed('min(ABSOLUTETIME)', 't0')
    )

    # Same channel histogram, restricted to the orbits with a scintillator signal
    df_muons = _histogram_by_chamber(
        hits.join(df_scint, on='ORBIT_CNT', how='leftsemi'),
        'TDC_CHANNEL',
        'hist_SCINT',
    )

    # DRIFTIME = ABSOLUTETIME - (t0 - per-chamber offset)
    drift_time = None
    for chamber in chamber_ids:
        is_chamber = col('chamber') == chamber
        value = col('ABSOLUTETIME') - (col('t0') - time_offset_by_chamber[chamber])
        drift_time = when(is_chamber, value) if drift_time is None else drift_time.when(is_chamber, value)

    # Drift time of the hits in scintillator-tagged orbits, for each chamber.
    # An inner join keeps exactly the hits whose orbit has a t0.
    df_time = (
        hits
        .select('ORBIT_CNT', 'chamber', 'ABSOLUTETIME')
        .join(df_scint, on='ORBIT_CNT', how='inner')
        .withColumn('DRIFTIME', drift_time)
        .groupBy('chamber')
        .agg(collect_list('DRIFTIME').alias('DRIFTIME'))
    )

    # One row per chamber that has at least one hit. Left joins keep a chamber
    # even when it has no scintillator-tagged hits in this batch; the missing
    # pieces come back as NULL and keep the empty defaults set above.
    rows = (
        df_counts
        .join(df_channel, on='chamber', how='left')
        .join(df_orbit, on='chamber', how='left')
        .join(df_muons, on='chamber', how='left')
        .join(df_time, on='chamber', how='left')
        .sort('chamber')
        .collect()
    )

    # Paste in the dictionary message.
    # The total is accumulated by hand: the built-in `sum` is shadowed by the
    # `sum` imported from pyspark.sql.functions.
    tot_count = 0
    for row in rows:
        summary = msg[f"chamber{row['chamber']}"]
        summary['total_hits'] = row['chamber_hits']
        for name in histogram_names:
            histogram = row[name]
            if histogram is not None:
                summary[name]['bin_edges'] = histogram['bin_edges']
                summary[name]['bin_counts'] = histogram['counts']
        if row['DRIFTIME'] is not None:
            summary['DRIFTIME'] = row['DRIFTIME']
        tot_count += row['chamber_hits']
    msg['total_hits'] = tot_count

    # Processing time of this batch (serialisation and sending excluded)
    elapsed = time.perf_counter() - start
    avg.append(elapsed)

    # Serialise once; len() is the real payload size in bytes (sys.getsizeof
    # would also count the Python object header).
    payload = json.dumps(msg).encode('utf-8')
    kb_per_second = len(payload)/(elapsed*1000)

    print(f'processed batch number {batch_id} --tot_rows {tot_count} --> processed in {round(elapsed, 2)} s --:{round(kb_per_second, 2)} KB/s')

    producer.send(topic='topic_results', value=payload)
    producer.flush()

### Kafka sink

In [18]:
# Start the streaming query and keep the handle, so that it can be stopped
# explicitly: every 4 seconds the accumulated rows are handed to process_batch.
query = (
    df.writeStream
    .foreachBatch(process_batch)
    .trigger(processingTime='4 seconds')
    .option("checkpointLocation", "checkpoint")
    .start()
)

try:
    # Blocks until the query terminates or the kernel is interrupted.
    query.awaitTermination()
except KeyboardInterrupt:
    # Interrupting the kernel is the normal way to end the run: swallow the
    # exception (no traceback) and report the average batch processing time.
    if len(avg) > 0:
        # Accumulated by hand: the built-in `sum` is shadowed by the `sum`
        # imported from pyspark.sql.functions.
        tot = 0
        for t in avg:
            tot = tot + t
        tot = tot/len(avg)
        print("\nAverage time taken for each batch: " + str(round(tot, 2)) + "s")
finally:
    # Make sure the stream does not keep running in the background.
    query.stop()

processed batch number 0 --tot_rows 0 --> processed in 1.7 s --:14.17 KB/s
processed batch number 1 --tot_rows 4641 --> processed in 2.62 s --:14.54 KB/s
processed batch number 2 --tot_rows 9902 --> processed in 3.22 s --:23.62 KB/s
processed batch number 3 --tot_rows 10450 --> processed in 3.64 s --:21.53 KB/s
processed batch number 4 --tot_rows 10461 --> processed in 2.91 s --:28.45 KB/s
processed batch number 5 --tot_rows 10625 --> processed in 3.1 s --:26.55 KB/s
processed batch number 6 --tot_rows 9902 --> processed in 3.32 s --:22.83 KB/s
processed batch number 7 --tot_rows 10115 --> processed in 3.12 s --:24.1 KB/s
processed batch number 8 --tot_rows 8668 --> processed in 3.3 s --:22.63 KB/s
processed batch number 9 --tot_rows 10259 --> processed in 3.01 s --:26.33 KB/s
processed batch number 10 --tot_rows 10424 --> processed in 3.1 s --:25.33 KB/s
processed batch number 11 --tot_rows 9938 --> processed in 3.02 s --:26.17 KB/s
processed batch number 12 --tot_rows 10945 --> proce

### Stop the cluster

In [19]:
# Release the Spark resources held by this notebook: first the SparkContext
# handle, then the SparkSession it was taken from.
sc.stop()
spark.stop()

In [20]:
# Stop Spark cluster
# Run Spark's stop-all.sh from $SPARK_HOME on this host ...
!$SPARK_HOME/sbin/stop-all.sh
# ... then stop the extra workers that were started over ssh on the two
# Kafka broker hosts.
!ssh kafka-broker1 /usr/local/spark//sbin/stop-worker.sh
!ssh kafka-broker2 /usr/local/spark//sbin/stop-worker.sh

spark-master: stopping org.apache.spark.deploy.worker.Worker
spark-slave: stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.master.Master
stopping org.apache.spark.deploy.worker.Worker
stopping org.apache.spark.deploy.worker.Worker
