# ADMIN SECTION: create and delete topics

In [1]:
KAFKA_BOOTSTRAP_SERVERS = '10.67.22.61:9092'
TOPIC_NAME = 'results'
N_PARTITIONS = 1

In [2]:
from  confluent_kafka.admin import AdminClient, NewTopic

In [3]:
kafka_admin = AdminClient({'bootstrap.servers':KAFKA_BOOTSTRAP_SERVERS})

In [4]:
def create_topics(admin, topics):
    """ Create topics """

    new_topics = [NewTopic(topic, num_partitions=N_PARTITIONS, replication_factor=1) for topic in topics]
    # Call create_topics to asynchronously create topics, a dict
    # of <topic,future> is returned.
    fs = admin.create_topics(new_topics, request_timeout=15.0)

    # Wait for operation to finish.
    # Timeouts are preferably controlled by passing request_timeout=15.0
    # to the create_topics() call.
    # All futures will finish at the same time.
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            print("Topic {} created".format(topic))
        except Exception as e:
            print("Failed to create topic {}: {}".format(topic, e))

In [5]:
def delete_topics(admin, topics):
    """ delete topics """

    # Call delete_topics to asynchronously delete topics, a future is returned.
    # By default, this operation on the broker returns immediately while
    # topics are deleted in the background. But here we give it some time (30s)
    # to propagate in the cluster before returning.
    #
    # Returns a dict of <topic,future>.
    fs = admin.delete_topics(topics, operation_timeout=30)

    # Wait for operation to finish.
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            print("Topic {} deleted".format(topic))
        except Exception as e:
            print("Failed to delete topic {}: {}".format(topic, e))

In [6]:
delete_topics(kafka_admin, [TOPIC_NAME])

Topic results deleted


In [7]:
## check if topic already exits otherwise create it
if not TOPIC_NAME in kafka_admin.list_topics().topics.keys():
    create_topics(kafka_admin, [TOPIC_NAME])
else:
    print("Topic " + TOPIC_NAME + " already exists")

Failed to create topic results: KafkaError{code=TOPIC_ALREADY_EXISTS,val=36,str="Topic 'results' is marked for deletion."}


# Create producer

In [8]:
from confluent_kafka import Producer

In [9]:
producer = Producer({'bootstrap.servers':KAFKA_BOOTSTRAP_SERVERS,
                     'linger.ms':0, # delay in ms before messages are sent if batch size is not reached
                     'batch.size':16384}) # maximum batch size before messages are sent

# SPARK STRUCTURED STREAMING ANALYSIS

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, countDistinct, count
import numpy as np
import json

In [11]:
spark = SparkSession.builder \
        .master("spark://10.67.22.29:7077")\
        .appName("Test streaming")\
        .config('spark.executor.memory', '4g')\
        .config('spark.driver.memory', '1500m')\
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
        .config("spark.sql.execution.arrow.pyspark.fallback.enabled", "false")\
        .config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")\
        .config("spark.eventLog.enabled", 'true')\
        .getOrCreate()


        # .config('spark.executor.instances', '4')\
        # .config('spark.executor.cores', '1')\
        # .config('spark.executor.memory', '1g')

:: loading settings :: url = jar:file:/usr/local/lib/python3.8/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml

Ivy Default Cache set to: /root/.ivy2/cache

In [12]:
# this may be the only option that matters (set it equal to the number of cores)
spark.conf.set("spark.sql.shuffle.partitions", 12)

In [13]:
# not working with streaming df
# spark.conf.set("spark.sql.adaptive.enabled", "true")
# spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# spark.conf.set("spark.sql.cbo.enabled", "true")

In [14]:
sc = spark.sparkContext
sc

In [15]:
# read streaming df from kafka
inputDF = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)\
    .option("kafkaConsumer.pollTimeoutMs", 4000)\
    .option('subscribe', 'data')\
    .option("startingOffsets", "latest") \
    .load()

In [16]:
inputDF.isStreaming

True

In [17]:
inputDF.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [18]:
# extract the value from the kafka message
csv_df = inputDF.select(col("value").cast("string")).alias("csv").select("csv.*")

# split the csv line in the corresponding fields
df = csv_df.selectExpr("cast(split(value, ',')[0] as int) as HEAD",
                       "cast(split(value, ',')[1] as int) as FPGA",
                       "cast(split(value, ',')[2] as int) as TDC_CHANNEL",
                       "cast(split(value, ',')[3] as long) as ORBIT_CNT",
                       "cast(split(value, ',')[4] as int) as BX_COUNTER",
                       "cast(split(value, ',')[5] as double) as TDC_MEAS")


In [19]:
df.printSchema()

root
 |-- HEAD: integer (nullable = true)
 |-- FPGA: integer (nullable = true)
 |-- TDC_CHANNEL: integer (nullable = true)
 |-- ORBIT_CNT: long (nullable = true)
 |-- BX_COUNTER: integer (nullable = true)
 |-- TDC_MEAS: double (nullable = true)



In [20]:
# remove unwanted rows
df = df.filter(df.HEAD==2)

# add CHAMBER column for easier grouping later
df = df.withColumn("CHAMBER", \
                    when((df.FPGA == 0)&(df.TDC_CHANNEL>=0)&(df.TDC_CHANNEL<64), 0) \
                   .when((df.FPGA == 0)&(df.TDC_CHANNEL>=64)&(df.TDC_CHANNEL<128), 1 ) \
                   .when((df.FPGA == 1)&(df.TDC_CHANNEL>=0)&(df.TDC_CHANNEL<64), 2 ) \
                   .when((df.FPGA == 1)&(df.TDC_CHANNEL>=64)&(df.TDC_CHANNEL<128), 3 )
                  )

# compute absolute time
df = df.withColumn("ABSOLUTE_TIME", 25*(df.ORBIT_CNT * 3564 + df.BX_COUNTER + df.TDC_MEAS/30))

In [21]:
df.printSchema()

root
 |-- HEAD: integer (nullable = true)
 |-- FPGA: integer (nullable = true)
 |-- TDC_CHANNEL: integer (nullable = true)
 |-- ORBIT_CNT: long (nullable = true)
 |-- BX_COUNTER: integer (nullable = true)
 |-- TDC_MEAS: double (nullable = true)
 |-- CHAMBER: integer (nullable = true)
 |-- ABSOLUTE_TIME: double (nullable = true)



In [22]:
time_offset_by_chamber = {0: 95.0 - 1.1, # Ch 0
                          1: 95.0 + 6.4, # Ch 1
                          2: 95.0 + 0.5, # Ch 2
                          3: 95.0 - 2.6, # Ch 3
                         }

edges_list = [0,5,10,15,20,25,30,35,40,45,50,55,60,65,70,np.inf]
edges_list_to_print = [0,5,10,15,20,25,30,35,40,45,50,55,60,65,70,75]
    
edge_list_b = [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,np.inf]
edge_list_b_to_print = [0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
    
edge_list_c = [0,30,60,90,120,150,180,210,240,270,300,330,360,390,420,450,480,510,np.inf]
edge_list_c_to_print = [0,30,60,90,120,150,180,210,240,270,300,330,360,390,420,450,480,510,540]

ID = -1

# function to apply to each batch: writes and send a kafka message at the end
def batch_func(df, epoch_id):

    df = df.persist()

    # 1: total number of processed hits, post-cleansing (1 value per batch)

    hit_count = df.count()

    # 2: total number of processed hits, post-cleansing, per chamber (4 values per batch)

    hit_count_chamber = df.groupby('CHAMBER').agg(count('TDC_CHANNEL').alias('HIT_COUNT')).sort("CHAMBER").select('HIT_COUNT')

    # 3: histogram of the counts of active TDC_CHANNEL, per chamber (4 arrays per batch)

    tdc_counts = df.groupby(['CHAMBER', 'TDC_CHANNEL']).agg(count('ORBIT_CNT').alias('TDC_COUNTS'))
    tdc_counts = tdc_counts.persist()

    ch0_tdc_counts = tdc_counts.filter(tdc_counts.CHAMBER==0).select('TDC_COUNTS')
    ch1_tdc_counts = tdc_counts.filter(tdc_counts.CHAMBER==1).select('TDC_COUNTS')
    ch2_tdc_counts = tdc_counts.filter(tdc_counts.CHAMBER==2).select('TDC_COUNTS')
    ch3_tdc_counts = tdc_counts.filter(tdc_counts.CHAMBER==3).select('TDC_COUNTS')

    # same query slightly slower
    # ch0_tdc_counts = df.filter(df.CHAMBER==0).groupby('TDC_CHANNEL').agg(count('ORBIT_CNT').alias('TDC_COUNTS')).select('TDC_COUNTS')
    # ch1_tdc_counts = df.filter(df.CHAMBER==1).groupby('TDC_CHANNEL').agg(count('ORBIT_CNT').alias('TDC_COUNTS')).select('TDC_COUNTS')
    # ch2_tdc_counts = df.filter(df.CHAMBER==2).groupby('TDC_CHANNEL').agg(count('ORBIT_CNT').alias('TDC_COUNTS')).select('TDC_COUNTS')
    # ch3_tdc_counts = df.filter(df.CHAMBER==3).groupby('TDC_CHANNEL').agg(count('ORBIT_CNT').alias('TDC_COUNTS')).select('TDC_COUNTS')

    # 4: histogram of the total number of active TDC_CHANNEL in each ORBIT_CNT, per chamber (4 arrays per batch)

    tdc_active = df.groupby(['CHAMBER', 'ORBIT_CNT']).agg(countDistinct("TDC_CHANNEL").alias("ACTIVE_TDC_CHANNEL"))
    tdc_active = tdc_active.persist()

    ch0_tdc_active = tdc_active.filter(tdc_active.CHAMBER==0).select('ACTIVE_TDC_CHANNEL')
    ch1_tdc_active = tdc_active.filter(tdc_active.CHAMBER==1).select('ACTIVE_TDC_CHANNEL')
    ch2_tdc_active = tdc_active.filter(tdc_active.CHAMBER==2).select('ACTIVE_TDC_CHANNEL')
    ch3_tdc_active = tdc_active.filter(tdc_active.CHAMBER==3).select('ACTIVE_TDC_CHANNEL')

    # same query slightly slower
    # ch0_tdc_active = df.filter(df.CHAMBER==0).groupby('ORBIT_CNT').agg(countDistinct("TDC_CHANNEL").alias("ACTIVE_TDC_CHANNEL")).select('ACTIVE_TDC_CHANNEL')
    # ch1_tdc_active = df.filter(df.CHAMBER==1).groupby('ORBIT_CNT').agg(countDistinct("TDC_CHANNEL").alias("ACTIVE_TDC_CHANNEL")).select('ACTIVE_TDC_CHANNEL')
    # ch2_tdc_active = df.filter(df.CHAMBER==2).groupby('ORBIT_CNT').agg(countDistinct("TDC_CHANNEL").alias("ACTIVE_TDC_CHANNEL")).select('ACTIVE_TDC_CHANNEL')
    # ch3_tdc_active = df.filter(df.CHAMBER==3).groupby('ORBIT_CNT').agg(countDistinct("TDC_CHANNEL").alias("ACTIVE_TDC_CHANNEL")).select('ACTIVE_TDC_CHANNEL')

    # optional: drift time computation

    # get scintillator signals
    t0 = df.filter((df.FPGA==1) & (df.TDC_CHANNEL==128)).selectExpr('ORBIT_CNT as ORBIT_CNT_T0', 'ABSOLUTE_TIME as ABSOLUTE_TIME_T0')
    # TODO: add check for multiple signal in same orbit

    # get only events with a signal from the scintillator and compute drift time
    df_scint = df.join(t0, df.ORBIT_CNT==t0.ORBIT_CNT_T0, 'inner').selectExpr('CHAMBER', 'TDC_CHANNEL', 'ORBIT_CNT', 'ABSOLUTE_TIME-ABSOLUTE_TIME_T0 as DRIFT_TIME')

    # adjust drift time with the correct
    df_scint = df_scint.withColumn('DRIFT_TIME',
                       when(df_scint.CHAMBER==0, df_scint.DRIFT_TIME+time_offset_by_chamber[0]) \
                      .when(df_scint.CHAMBER==1, df_scint.DRIFT_TIME+time_offset_by_chamber[1]) \
                      .when(df_scint.CHAMBER==2, df_scint.DRIFT_TIME+time_offset_by_chamber[2]) \
                      .when(df_scint.CHAMBER==3, df_scint.DRIFT_TIME+time_offset_by_chamber[3])
                  )

    df_scint = df_scint.persist()

    tdc_active_scint = df_scint.groupby(['CHAMBER', 'ORBIT_CNT']).agg(countDistinct("TDC_CHANNEL").alias("ACTIVE_TDC_CHANNEL"))
    tdc_active_scint = tdc_active_scint.persist()

    ch0_tdc_active_scint = tdc_active_scint.filter(tdc_active_scint.CHAMBER==0).select('ACTIVE_TDC_CHANNEL')
    ch1_tdc_active_scint = tdc_active_scint.filter(tdc_active_scint.CHAMBER==1).select('ACTIVE_TDC_CHANNEL')
    ch2_tdc_active_scint = tdc_active_scint.filter(tdc_active_scint.CHAMBER==2).select('ACTIVE_TDC_CHANNEL')
    ch3_tdc_active_scint = tdc_active_scint.filter(tdc_active_scint.CHAMBER==3).select('ACTIVE_TDC_CHANNEL')

    # same query slightly slower
    # ch0_tdc_active_scint = df_scint.filter(df_scint.CHAMBER==0).groupby('ORBIT_CNT').agg(countDistinct("TDC_CHANNEL").alias("ACTIVE_TDC_CHANNEL")).select('ACTIVE_TDC_CHANNEL')
    # ch1_tdc_active_scint = df_scint.filter(df_scint.CHAMBER==1).groupby('ORBIT_CNT').agg(countDistinct("TDC_CHANNEL").alias("ACTIVE_TDC_CHANNEL")).select('ACTIVE_TDC_CHANNEL')
    # ch2_tdc_active_scint = df_scint.filter(df_scint.CHAMBER==2).groupby('ORBIT_CNT').agg(countDistinct("TDC_CHANNEL").alias("ACTIVE_TDC_CHANNEL")).select('ACTIVE_TDC_CHANNEL')
    # ch3_tdc_active_scint = df_scint.filter(df_scint.CHAMBER==3).groupby('ORBIT_CNT').agg(countDistinct("TDC_CHANNEL").alias("ACTIVE_TDC_CHANNEL")).select('ACTIVE_TDC_CHANNEL')

    ch0_drift_times = df_scint.filter(df_scint.CHAMBER==0).select("DRIFT_TIME")
    ch1_drift_times = df_scint.filter(df_scint.CHAMBER==1).select("DRIFT_TIME")
    ch2_drift_times = df_scint.filter(df_scint.CHAMBER==2).select("DRIFT_TIME")
    ch3_drift_times = df_scint.filter(df_scint.CHAMBER==3).select("DRIFT_TIME")

    # get results (trigger actions on dataframes) and compute histograms

    hit_count_chamber = hit_count_chamber.toPandas().values.reshape(-1)

    ch0_tdc_counts_hist, ch0_tdc_counts_be = np.histogram(ch0_tdc_counts.toPandas().values.reshape(-1), bins = edges_list)
    ch1_tdc_counts_hist, ch1_tdc_counts_be = np.histogram(ch1_tdc_counts.toPandas().values.reshape(-1), bins = edges_list)
    ch2_tdc_counts_hist, ch2_tdc_counts_be = np.histogram(ch2_tdc_counts.toPandas().values.reshape(-1), bins = edges_list)
    ch3_tdc_counts_hist, ch3_tdc_counts_be = np.histogram(ch3_tdc_counts.toPandas().values.reshape(-1), bins = edges_list)

    ch0_tdc_active_hist, ch0_tdc_active_be = np.histogram(ch0_tdc_active.toPandas().values.reshape(-1), bins=edge_list_b)
    ch1_tdc_active_hist, ch1_tdc_active_be = np.histogram(ch1_tdc_active.toPandas().values.reshape(-1), bins=edge_list_b)
    ch2_tdc_active_hist, ch2_tdc_active_be = np.histogram(ch2_tdc_active.toPandas().values.reshape(-1), bins=edge_list_b)
    ch3_tdc_active_hist, ch3_tdc_active_be = np.histogram(ch3_tdc_active.toPandas().values.reshape(-1), bins=edge_list_b)

    ch0_tdc_active_scint_hist, ch0_tdc_active_scint_be = np.histogram(ch0_tdc_active_scint.toPandas().values.reshape(-1), bins=edge_list_b)
    ch1_tdc_active_scint_hist, ch1_tdc_active_scint_be = np.histogram(ch1_tdc_active_scint.toPandas().values.reshape(-1), bins=edge_list_b)
    ch2_tdc_active_scint_hist, ch2_tdc_active_scint_be = np.histogram(ch2_tdc_active_scint.toPandas().values.reshape(-1), bins=edge_list_b)
    ch3_tdc_active_scint_hist, ch3_tdc_active_scint_be = np.histogram(ch3_tdc_active_scint.toPandas().values.reshape(-1), bins=edge_list_b)

    ch0_drift_times_hist, ch0_drift_times_be = np.histogram(ch0_drift_times.toPandas().values.reshape(-1), bins=edge_list_c)
    ch1_drift_times_hist, ch1_drift_times_be = np.histogram(ch1_drift_times.toPandas().values.reshape(-1), bins=edge_list_c)
    ch2_drift_times_hist, ch2_drift_times_be = np.histogram(ch2_drift_times.toPandas().values.reshape(-1), bins=edge_list_c)
    ch3_drift_times_hist, ch3_drift_times_be = np.histogram(ch3_drift_times.toPandas().values.reshape(-1), bins=edge_list_c)

    df.unpersist()
    df_scint.unpersist()
    tdc_counts.unpersist()
    tdc_active.unpersist()
    tdc_active_scint.unpersist()
    
    
    global ID
    ID += 1

    # prepare message to send to kafka

    msg = {
        'msg_ID': ID,
        'hit_count': hit_count,
        'hit_count_chamber': hit_count_chamber.tolist(),
        'tdc_counts_chamber': {
            '0': {
                'bin_edges': edges_list_to_print,
                'hist_counts': ch0_tdc_counts_hist.tolist()
            },
            '1': {
                'bin_edges': edges_list_to_print,
                'hist_counts': ch1_tdc_counts_hist.tolist()
            },
            '2': {
                'bin_edges': edges_list_to_print,
                'hist_counts': ch2_tdc_counts_hist.tolist()
            },
            '3': {
                'bin_edges': edges_list_to_print,
                'hist_counts': ch3_tdc_counts_hist.tolist()
            }
        },
        'active_tdc_chamber': {
            '0': {
                'bin_edges': edge_list_b_to_print,
                'hist_counts': ch0_tdc_active_hist.tolist()
            },
            '1': {
                'bin_edges': edge_list_b_to_print,
                'hist_counts': ch1_tdc_active_hist.tolist()
            },
            '2': {
                'bin_edges': edge_list_b_to_print,
                'hist_counts': ch2_tdc_active_hist.tolist()
            },
            '3': {
                'bin_edges': edge_list_b_to_print,
                'hist_counts': ch3_tdc_active_hist.tolist()
            }
        },
        'active_tdc_chamber_scint': {
            '0': {
                'bin_edges': edge_list_b_to_print,
                'hist_counts': ch0_tdc_active_scint_hist.tolist()
            },
            '1': {
                'bin_edges': edge_list_b_to_print,
                'hist_counts': ch1_tdc_active_scint_hist.tolist()
            },
            '2': {
                'bin_edges': edge_list_b_to_print,
                'hist_counts': ch2_tdc_active_scint_hist.tolist()
            },
            '3': {
                'bin_edges': edge_list_b_to_print,
                'hist_counts': ch3_tdc_active_scint_hist.tolist()
            }
        },
        'drift_times': {
            '0': {
                'bin_edges': edge_list_c_to_print,
                'hist_counts': ch0_drift_times_hist.tolist()
            },
            '1': {
                'bin_edges': edge_list_c_to_print,
                'hist_counts': ch1_drift_times_hist.tolist()
            },
            '2': {
                'bin_edges': edge_list_c_to_print,
                'hist_counts': ch2_drift_times_hist.tolist()
            },
            '3': {
                'bin_edges': edge_list_c_to_print,
                'hist_counts': ch3_drift_times_hist.tolist()
            }
        }
    }

    producer.produce(TOPIC_NAME, json.dumps(msg).encode('utf-8'))
    producer.poll(0)


### Stop here, start the consumer (Kafka)

In [None]:
df.writeStream\
    .outputMode("update")\
    .foreachBatch(batch_func)\
    .trigger(processingTime='5 seconds')\
    .start()\
    .awaitTermination()