In [17]:
import findspark
import os
import json
import numpy as np

from pyspark.streaming import StreamingContext
from kafka import KafkaProducer
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, IntegerType
from pyspark.sql.functions import from_json, col
from pyspark.sql.functions import max
from pyspark.sql.functions import min

## Creating Spark context

In [2]:
findspark.init('/home/packages/spark-3.1.2-bin-hadoop3.2')

In [3]:
spark = SparkSession.builder \
    .master("spark://10.67.22.100:7077")\
    .appName("MAPD Final Project session")\
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
    .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")\
    .config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2")\
    .getOrCreate()

#non funziona, fai partire a mano

#KAFKA_HOME = "/home/packages/kafka_2.13-2.7.0"
#KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"

#os.system("{0}/bin/zookeeper-server-start.sh {0}/config/zookeeper.properties".format(KAFKA_HOME)) 
#os.system("{0}/bin/kafka-server-start.sh {0}/config/server.properties".format(KAFKA_HOME)) 

In [4]:
#print('{0}/bin/zookeeper-server-start.sh {0}/config/zookeeper.properties'.format(KAFKA_HOME))
#print("{0}/bin/kafka-server-start.sh {0}/config/server.properties".format(KAFKA_HOME)) 

In [5]:
spark

## Trying to include Kafka in the process

In [6]:
KAFKA_BOOTSTRAP_SERVERS = "10.67.22.100:9092"

inputDF = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)\
    .option('subscribe', 'topic_stream')\
    .option("startingOffsets", "latest") \
    .load()

In [7]:
inputDF.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [8]:
schema = StructType(
        [StructField("HEAD",        IntegerType()),
         StructField("FPGA",         IntegerType()),
         StructField("TDC_CHANNEL",  IntegerType()),
         StructField("ORBIT_CNT",    IntegerType()),
         StructField("BX_COUNTER",   IntegerType()),
         StructField("TDC_MEAS",    DoubleType() )]
)

In [9]:
jsonDF = inputDF.select(from_json(col("value").alias('value').cast("string"), schema).alias('value'))

In [10]:
jsonDF.printSchema()
#a dataframe with one row and complex structures inside

root
 |-- value: struct (nullable = true)
 |    |-- HEAD: integer (nullable = true)
 |    |-- FPGA: integer (nullable = true)
 |    |-- TDC_CHANNEL: integer (nullable = true)
 |    |-- ORBIT_CNT: integer (nullable = true)
 |    |-- BX_COUNTER: integer (nullable = true)
 |    |-- TDC_MEAS: double (nullable = true)



In [11]:
#I flatten out the dataframe
flatDF = jsonDF.selectExpr("value.HEAD", 
                           "value.FPGA", 
                           "value.TDC_CHANNEL",
                           "value.ORBIT_CNT",
                           "value.BX_COUNTER",
                           "value.TDC_MEAS")

In [12]:
flatDF.printSchema()

root
 |-- HEAD: integer (nullable = true)
 |-- FPGA: integer (nullable = true)
 |-- TDC_CHANNEL: integer (nullable = true)
 |-- ORBIT_CNT: integer (nullable = true)
 |-- BX_COUNTER: integer (nullable = true)
 |-- TDC_MEAS: double (nullable = true)



In [13]:
#credo che per questo primo stream non si possano avere diversi schema. Negli hints viene proposta una struttura
#ma è 'dopo aver fatto il processamento'

In [40]:
#volendo qui si può provare a scrive su console 
#flatDF.writeStream\
#    .outputMode("append")\
#    .format("console")\
#    .start()\
#    .awaitTermination()

## Processing the data without streaming with Kafka

In [51]:
## load dataset on dataset/lecture2/dimuon
#
#schema = StructType()                          \
#      .add("HEAD",        IntegerType(), True) \
#      .add("FPGA",        IntegerType(), True) \
#      .add("TDC_CHANNEL", IntegerType(), True) \
#      .add("ORBIT_CNT",   IntegerType(), True) \
#      .add("BX_COUNTER",  IntegerType(), True) \
#      .add("TDC_MEAS",    DoubleType(),  True)
#
#df = spark.read.format("csv") \
#      .option("header",True) \
#      .schema(schema) \
#      .load("/home/data_000019.txt")

### Cleaning

In [13]:
df = flatDF.where(col("HEAD")!=2)

### Uploading to consumer

In [14]:
producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS)

In [15]:
def batch_proc(batch_df, epoch_id):
    
    hits = batch_df.count()
    
    if hits!=0:
        
        hits_ch0 = batch_df.filter('(FPGA=0) AND (TDC_CHANNEL >= 0) AND (TDC_CHANNEL < 64)').count()
        hits_ch1 = batch_df.filter('(FPGA=0) AND (TDC_CHANNEL >= 64) AND (TDC_CHANNEL < 128)').count()
        hits_ch2 = batch_df.filter('(FPGA=1) AND (TDC_CHANNEL >= 0) AND (TDC_CHANNEL < 64)').count()
        hits_ch3 = batch_df.filter('(FPGA=1) AND (TDC_CHANNEL >= 64) AND (TDC_CHANNEL < 128)').count()

        hist = {}
        for chamber in [0,1,2,3]:
            hist[chamber] = {}

            bins, counts = (
                batch_df.where((col('TDC_CHANNEL')>=(chamber % 2)*64) & (col('TDC_CHANNEL')<(chamber % 2 +1)*64) & (col('FPGA')==chamber//2))
                .select('TDC_CHANNEL')
                .rdd.map(lambda x: x.TDC_CHANNEL)
                .histogram(list(np.arange((chamber % 2)*64,(chamber % 2 +1)*64,1)))
            )

            hist[chamber]['bins'] = list(map(int,bins)) #must convert to python integers
            hist[chamber]['counts'] = list(map(int,counts))

        hist2 = {}
        for chamber in [0,1,2,3]:
            hist2[chamber] = {}

            bins, counts = (
                batch_df.where((col('TDC_CHANNEL')>=(chamber % 2)*64) & (col('TDC_CHANNEL')<(chamber % 2 +1)*64) & (col('FPGA')==chamber//2))
                .groupBy("ORBIT_CNT","TDC_CHANNEL").count()
                .select('ORBIT_CNT')
                .rdd.map(lambda x: x.ORBIT_CNT)
                .histogram(list(np.arange(batch_df.agg(min("ORBIT_CNT"), max("ORBIT_CNT")).rdd.flatMap(lambda x: x).collect()[0],batch_df.agg(min("ORBIT_CNT"), max("ORBIT_CNT")).rdd.flatMap(lambda x: x).collect()[1],30)))
            )

            hist2[chamber]['bins'] = list(map(int,bins))
            hist2[chamber]['counts'] = list(map(int,counts))

        result = {
            "hits" : hits,
            "hits_per_chamber": [hits_ch0, hits_ch1, hits_ch2, hits_ch3],
            "hist_1": hist,
            "hist_2": hist2
        }
        producer.send('topic_results', json.dumps(result).encode('utf-8'))
        producer.flush()
    else: 
        pass

In [18]:
df.writeStream\
    .foreachBatch(batch_proc)\
    .trigger(processingTime='1 second')\
    .start()\
    .awaitTermination()

KeyboardInterrupt: 