## Kafka and Spark Streaming Example with OpenMetrics
This notebook showcases how the PNDA platform integrates Jupyter, Kafka, and Spark.

Kafka ingests events in [OpenMetrics](https://openmetrics.io/) format on the "openmetrics" topic.
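
For orientation, a record on the topic might look like the line built below. This is only a sketch: the helper `format_sample_line` and the metric and tag names are hypothetical, and the layout (metric name, comma-separated tags in braces, value, then a timestamp in milliseconds since the epoch) is inferred from what the decoder later in this notebook expects, not taken from the OpenMetrics specification.

```python
def format_sample_line(metric: str, tags: dict, value: float, timestamp_ms: int) -> str:
    """Build one text line in the layout this notebook's decoder expects (hypothetical helper)."""
    # Tags are rendered as key="value" pairs, sorted for a stable output.
    tag_str = ",".join('{}="{}"'.format(k, v) for k, v in sorted(tags.items()))
    # Doubled braces in the format string emit literal "{" and "}".
    return "{}{{{}}} {} {}".format(metric, tag_str, value, timestamp_ms)


# Made-up sample values, for illustration only.
line = format_sample_line("cpu_usage", {"host": "node1", "core": "0"}, 0.75, 1577836800000)
print(line)
```

The decoder splits each line on whitespace into exactly three fields, so a tag value containing a space would not parse.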

This notebook provides a [Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) application that:
- transforms OpenMetrics strings into a Spark DataFrame;
- queries the OpenMetrics stream for the average value of each time series and its last timestamp.

In [None]:
## Decoding OpenMetrics strings with a Spark UDF

from pyspark.sql import DataFrame
import pyspark.sql.functions as f
import pyspark.sql.types as t
import datetime
from typing import Callable
import time

@f.udf(t.StructType([
    t.StructField("metric", t.StringType(), False),
    t.StructField("tags", t.ArrayType(t.StringType()), False),
    t.StructField("timestamp", t.TimestampType(), False),
    t.StructField("value", t.DoubleType(), False)]))
def decode_openmetrics_udf(input: str) -> t.Row:
    """Decode one 'metric{tags} value timestamp' line into a struct."""
    # Expects exactly three whitespace-separated fields, so tag values must not contain spaces.
    [metric, value, timestamp] = input.split()
    # A metric without a tag set has no "{", so partition rather than split.
    metric, _, labels = metric.partition("{")
    tags = labels[:-1].split(",") if labels else []
    # The timestamp is treated as milliseconds since the epoch.
    return t.Row('metric', 'tags', 'timestamp', 'value')(
        metric,
        tags,
        datetime.datetime.fromtimestamp(float(timestamp) / 1000.0),
        float(value))

def decode_openmetrics(col_input:str, rm_input_df: bool=False) -> Callable[[DataFrame], DataFrame]:
    def F(df: DataFrame) -> DataFrame:
        df = df.withColumn('temp_col', decode_openmetrics_udf(f.col(col_input)))
        if rm_input_df:
            return  df.select('temp_col.*')
        else:
            df_columns = df.columns
            df_columns.append('temp_col.*')
            return df.select(df_columns).drop('temp_col')
    return F

In [None]:
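# PySpark 3.0+ already provides DataFrame.transform; add this shim only when it is missing.
def transform(self, func):
    return func(self)

if not hasattr(DataFrame, "transform"):
    DataFrame.transform = transform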
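
The pattern used here is a parameterised transformer: `decode_openmetrics(...)` returns a function from DataFrame to DataFrame, and `transform` applies it so that steps chain left to right. The sketch below illustrates the same idea without Spark. `Table`, `add_column`, and `keep` are hypothetical stand-ins, not PySpark APIs.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]


class Table:
    """Hypothetical stand-in for a DataFrame: a list of dict rows."""

    def __init__(self, rows: List[Row]):
        self.rows = rows

    def transform(self, func: Callable[["Table"], "Table"]) -> "Table":
        # Same shape as the notebook's helper: hand self to the function.
        return func(self)


def add_column(name: str, compute: Callable[[Row], object]) -> Callable[[Table], Table]:
    """Return a Table -> Table function that adds one derived column."""
    def apply(table: Table) -> Table:
        return Table([dict(row, **{name: compute(row)}) for row in table.rows])
    return apply


def keep(*names: str) -> Callable[[Table], Table]:
    """Return a Table -> Table function that keeps only the named columns."""
    def apply(table: Table) -> Table:
        return Table([{n: row[n] for n in names} for row in table.rows])
    return apply


result = (
    Table([{"input_value": "a 1"}, {"input_value": "b 2"}])
    .transform(add_column("metric", lambda r: r["input_value"].split()[0]))
    .transform(keep("metric"))
)
print(result.rows)
```

Because each step is built by a plain function call, its arguments (a column name, a flag) are fixed before the step joins the chain, which keeps the pipeline itself free of nested calls.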

## Accessing Kafka and Spark
Kafka is reachable from notebooks at the "pnda-cp-kafka" hostname.

PNDA deploys a Kubernetes (K8s) service with that name, which points to its Kafka brokers.

By default, Jupyter's PySpark kernel connects to "spark://pnda-spark-standalone:7077", the PNDA spark-standalone component.

In [None]:
input_ds = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "pnda-cp-kafka:9092") \
  .option("subscribe", "openmetrics") \
  .load().selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as input_value")

In [None]:
openmetrics_stream = input_ds \
  .transform(decode_openmetrics("input_value")) \
  .writeStream \
  .queryName("openmetrics") \
  .format("memory") \
  .outputMode("append") \
  .start()

## Stream Query with SQL
Every 5 seconds, we query the in-memory "openmetrics" table, which Spark Structured Streaming fills from the Kafka stream, to get the average value of each time series.
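
To make the aggregation concrete, here is a plain-Python sketch of what the query in the next cell computes for each `(metric, tags)` group: the mean of `value` and the latest `timestamp`. The function `summarize` and the sample rows are hypothetical. Spark runs the equivalent as SQL over the in-memory table.

```python
import datetime
from collections import defaultdict


def summarize(rows):
    """Group (metric, tags, timestamp, value) rows; return {key: (avg_value, last_seen)}."""
    groups = defaultdict(list)
    for metric, tags, timestamp, value in rows:
        # Lists are not hashable, so the tag list becomes a tuple in the group key.
        groups[(metric, tuple(tags))].append((timestamp, value))
    return {
        key: (
            sum(value for _, value in samples) / len(samples),  # avg(value)
            max(timestamp for timestamp, _ in samples),         # max(timestamp)
        )
        for key, samples in groups.items()
    }


# Made-up sample rows, for illustration only.
t0 = datetime.datetime(2020, 1, 1, 0, 0, 0)
rows = [
    ("cpu_usage", ['host="node1"'], t0, 0.5),
    ("cpu_usage", ['host="node1"'], t0 + datetime.timedelta(seconds=5), 1.0),
    ("cpu_usage", ['host="node2"'], t0, 0.25),
]
summary = summarize(rows)
for key, (avg_value, last_seen) in summary.items():
    print(key, avg_value, last_seen)
```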

In [None]:
for _ in range(5):
    print('metrics average at {}'.format(datetime.datetime.now()))
    spark.sql(
        "select metric, tags, avg(value), max(timestamp) as last_seen "
        "from openmetrics group by metric, tags"
    ).show(10, False)
    time.sleep(5)

In [None]:
openmetrics_stream.stop()