# Init Spark session

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from os.path import abspath
import os

# SparkSession
# Address of the standalone Spark master and the warehouse directory
# (the relative path is resolved against the driver's working directory).
URL_SPARK = "spark://spark-master:7077"
warehouse_location = './spark-warehouse'

# Build (or reuse) the session used by every cell below.
spark = (
    SparkSession.builder
    .appName("spark-ml-multiVM")
    # Spark properties must carry the "spark." prefix; the bare key
    # "executor.memory" is not a recognised property, so the 8g request
    # was never applied to the executors.
    .config("spark.executor.memory", "8g")
    .config("spark.sql.warehouse.dir", warehouse_location)
    # Jars required by the Kafka source/sink used in the streaming cells.
    .config("spark.jars", "jars/spark-sql-kafka-0-10_2.12-3.2.1.jar,jars/kafka-clients-2.1.1.jar,jars/spark-streaming-kafka-0-10-assembly_2.12-3.2.1.jar,jars/commons-pool2-2.11.1.jar")
    .master(URL_SPARK)
    .getOrCreate()
)

/usr/local/lib/python3.9/dist-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found
23/07/28 20:41:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# We have 2 streams from 2 producers publishing data on 2 topics. We will read both streams into Spark using Spark Structured Streaming

![Kafka to Spark streaming overview](./images/kafka-spark-streaming2.png)


# Stream 1: Stream raw data of vm1 from Kafka 
- Here we read the stream from the Kafka topic onap-istanb-work01-stat-stream (acumos server) into df1 


In [2]:
# Streaming reader for the vm1 metrics topic; "earliest" replays the topic
# from its first available offset when the query starts without a checkpoint.
df1 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "onap-istanb-work01-stat-stream")
    .option("startingOffsets", "earliest")
    .load()
)

In [3]:
# Print the columns exposed by the Kafka source (key, value, topic, partition, ...).
df1.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
# Keep only the message payload, cast from binary to string; the resulting
# single column is still named "value".
stringDF = df1.selectExpr("CAST(value AS STRING)")

In [5]:
from pyspark.sql.types import *
from pyspark.sql.functions import unix_timestamp, from_unixtime

In [13]:
from pyspark.sql.functions import *

# Parse the plain-text metric message of vm1 into one string column per field.
# Each regexp_extract returns capture group 1, or an empty string when the
# pattern does not match the message.
#
# memory1 and storage1 are extracted as well: the join/publish cells further
# down read `memory1`, so leaving them commented out breaks a fresh
# "Restart & Run All".
df_vm1 = (
    stringDF
    .withColumn('timestamp', regexp_extract('value', r'timestamp:\s(.*),\shostname', 1))
    .withColumn('cpu1', regexp_extract('value', r'used_cpu:\s(.*)\%', 1))
    .withColumn('memory1', regexp_extract('value', r'used_memory:\s(.*)\%,\sused_storage', 1))
    .withColumn('storage1', regexp_extract('value', r'used_storage:\s(.*)\%,\sused_cpu', 1))
    # The raw payload is no longer needed once the fields are extracted.
    .drop('value')
)
df_vm1.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- cpu1: string (nullable = true)
 |-- memory1: string (nullable = true)
 |-- storage1: string (nullable = true)



In [14]:
# Replace the text timestamp ("dd-MM-yy hh:mm:ss a", 12-hour clock with AM/PM)
# by a real TimestampType column: text -> epoch seconds -> canonical
# "yyyy-MM-dd HH:mm:ss" text -> timestamp.
df_vm1_2 = df_vm1.withColumn(
    'timestamp',
    from_unixtime(
        unix_timestamp("timestamp", "dd-MM-yy hh:mm:ss a"),
        "yyyy-MM-dd HH:mm:ss",
    ).cast(TimestampType()),
)

In [15]:
# Start a streaming query that prints each micro-batch of df_vm1_2 to the console sink.
# NOTE(review): the returned StreamingQuery is only displayed, not stored, so the
# query keeps running in the background — consider keeping a handle to stop it.
df_vm1_2.writeStream.format('console').start()

23/07/28 20:47:22 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-5c9d2615-e61d-417e-84b0-7ff8b93a5a6d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/07/28 20:47:22 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x7f4bedf7ec10>

[Stage 9:>                                                          (0 + 1) / 1]                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+----+-------+--------+
|          timestamp|cpu1|memory1|storage1|
+-------------------+----+-------+--------+
|2023-07-28 11:22:19|0.54|  25.53|      42|
|2023-07-28 11:22:49|0.78|  25.53|      42|
|2023-07-28 11:23:19|1.23|  25.53|      42|
|2023-07-28 11:23:50|0.93|  25.53|      42|
|2023-07-28 11:24:20|0.98|  25.53|      42|
|2023-07-28 11:24:50|0.80|  25.53|      42|
|2023-07-28 11:25:20|0.55|  25.53|      42|
|2023-07-28 11:25:51|0.38|  25.54|      42|
|2023-07-28 11:26:21|0.23|  25.54|      42|
|2023-07-28 11:26:51|0.45|  25.59|      42|
|2023-07-28 11:27:21|0.33|  25.53|      42|
|2023-07-28 11:27:51|0.20|  25.54|      42|
|2023-07-28 11:28:22|0.12|  25.53|      42|
|2023-07-28 11:28:52|0.07|  25.53|      42|
|2023-07-28 11:29:22|0.10|  25.52|      42|
|2023-07-28 11:29:52|0.06|  25.53|      42|
|2023-07-28 11:30:23|0.03|  25.54|      42|
|2023-07-28 11:30:53|0.

In [16]:
# Declare 'timestamp' as the event-time column with a 10-minute watermark
# (used by the watermarked stream-stream join below).
df_vm1_water = df_vm1_2.withWatermark('timestamp','10 minutes')

In [17]:
# Print the watermarked vm1 stream to the console sink for inspection.
# NOTE(review): this starts another background query whose handle is not stored.
df_vm1_water.writeStream.format('console').start()

23/07/28 20:47:51 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-54a0a525-a7bc-4d42-b96b-a302a338b44a. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/07/28 20:47:51 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


<pyspark.sql.streaming.StreamingQuery at 0x7f4bedf7e340>

[Stage 12:>                                                         (0 + 1) / 1]                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+----+-------+--------+
|          timestamp|cpu1|memory1|storage1|
+-------------------+----+-------+--------+
|2023-07-28 11:22:19|0.54|  25.53|      42|
|2023-07-28 11:22:49|0.78|  25.53|      42|
|2023-07-28 11:23:19|1.23|  25.53|      42|
|2023-07-28 11:23:50|0.93|  25.53|      42|
|2023-07-28 11:24:20|0.98|  25.53|      42|
|2023-07-28 11:24:50|0.80|  25.53|      42|
|2023-07-28 11:25:20|0.55|  25.53|      42|
|2023-07-28 11:25:51|0.38|  25.54|      42|
|2023-07-28 11:26:21|0.23|  25.54|      42|
|2023-07-28 11:26:51|0.45|  25.59|      42|
|2023-07-28 11:27:21|0.33|  25.53|      42|
|2023-07-28 11:27:51|0.20|  25.54|      42|
|2023-07-28 11:28:22|0.12|  25.53|      42|
|2023-07-28 11:28:52|0.07|  25.53|      42|
|2023-07-28 11:29:22|0.10|  25.52|      42|
|2023-07-28 11:29:52|0.06|  25.53|      42|
|2023-07-28 11:30:23|0.03|  25.54|      42|
|2023-07-28 11:30:53|0.

# Stream 2: Stream raw data of vm2 from Kafka 
- Here we read the stream from the Kafka topic vm-stat-stream-2 (acumos server) into stringDF2 

In [None]:
# Streaming reader for the vm2 metrics topic, configured like the vm1 reader.
df2 = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "vm-stat-stream-2")
    .option("startingOffsets", "earliest")
    .load()
)

In [None]:
# Keep only the message payload of stream 2, cast to string (column name: "value").
stringDF2 = df2.selectExpr("CAST(value AS STRING)")

In [None]:
# Split the vm2 text message into one string column per field (capture group 1
# of each pattern), then discard the raw payload.
df_vm2 = (
    stringDF2
    .withColumn('timestamp', regexp_extract('value', r'timestamp:\s(.*),\sused_memory', 1))
    .withColumn('cpu2', regexp_extract('value', r'used_cpu:\s(.*)\%', 1))
    .withColumn('memory2', regexp_extract('value', r'used_memory:\s(.*)\%,\sused_storage', 1))
    .withColumn('storage2', regexp_extract('value', r'used_storage:\s(.*)\%,\sused_cpu', 1))
    .drop('value')
)

In [None]:
# Convert the vm2 text timestamp to TimestampType and rename it to
# 'timestamp2' so it does not clash with vm1's 'timestamp' in the join.
df_vm2_2 = (
    df_vm2
    .withColumn(
        'timestamp',
        from_unixtime(
            unix_timestamp("timestamp", "dd-MM-yy hh:mm:ss a"),
            "yyyy-MM-dd HH:mm:ss",
        ).cast(TimestampType()),
    )
    .withColumnRenamed("timestamp", "timestamp2")
)

In [None]:
# Declare 'timestamp2' as the event-time column of stream 2 with a 10-minute watermark.
df_vm2_water = df_vm2_2.withWatermark('timestamp2','10 minutes')

In [None]:
# Print the watermarked vm2 stream to the console sink for inspection.
# NOTE(review): the StreamingQuery handle is not stored, so the query keeps running.
df_vm2_water.writeStream.format('console').start()

# Join the two data streams into one stream 

In [None]:
# Left outer stream-stream join of the two watermarked streams.
# Rows match on equal event times; the extra range predicates bound
# timestamp2 to [timestamp, timestamp + 1 hour].
# NOTE(review): the range bounds presumably exist so Spark can expire join
# state for the outer join — confirm before simplifying the condition.
df_join_water = df_vm1_water.join(
    other=df_vm2_water,
    on=expr("""
    timestamp = timestamp2 AND
    timestamp2 >= timestamp AND
    timestamp2 <= timestamp + interval 1 hour
    """),
    how="leftOuter",
)

In [None]:
# Print the watermarked join result to the console sink for inspection.
# NOTE(review): the StreamingQuery handle is not stored, so the query keeps running.
df_join_water.writeStream.format('console').start()

In [None]:
# Join the two parsed streams on their shared 'timestamp' column (still the
# extracted string here, not the converted TimestampType).
# NOTE(review): this uses df_vm1/df_vm2 rather than the watermarked frames, so
# the join has no watermark to bound its state — confirm this is intended
# instead of publishing df_join_water.
df_join = df_vm1.join(df_vm2, 'timestamp' )

# Publish the joined stream data to topic 3 ('output-join-stat') on the Kafka broker

In [None]:
# Bundle the columns to publish into a single struct column; the storage
# columns are intentionally not part of the struct.
# Relies on df_join exposing timestamp, cpu1, memory1, cpu2 and memory2
# (attribute access raises if any of them is missing).
nested_struct = struct(df_join.timestamp, df_join.cpu1, df_join.memory1, df_join.cpu2, df_join.memory2)

In [None]:
# Serialise the struct to a JSON string in a 'value' column, the column name
# the Kafka sink reads the message payload from.
df_out = df_join.withColumn('value', to_json(nested_struct))

In [None]:
# Publish the JSON payload of every joined row to the output topic.
# The checkpoint directory lets the query resume from its last committed
# offsets after a restart.
(
    df_out
    .selectExpr("CAST(value AS STRING)")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("checkpointLocation", "./spark-warehouse/join-stream-kafka/checkpoint")
    .option("topic", "output-join-stat")
    .start()
)

# Create a consumer to read the joined DataFrame from topic 3 and make predictions using the latest stream data 
![Kafka to predictions overview](./images/kafka-predictions1.png)


In [None]:
# import pyspark
# import pyspark.pandas as ps
# import pandas as pd

# #convert spark dataframe to pandas for more visualization
# n_vm = 2
# df_dict={}
# df_dict['vm1'] =  df.toPandas()
# df_dict['vm2'] = df2.toPandas() 

In [None]:
# # rename columns of two dataframe since now they have the same column names
# for i in range(0,n_vm):
#     df_dict['vm'+str(i+1)] = df_dict['vm'+str(i+1)].rename(columns={"cpu": "cpu_vm"+str(i+1), "memory": "memory_vm"+str(i+1),"storage": "storage_vm"+str(i+1)})
#     df_dict['vm'+str(i+1)]['timestamp'] = pd.to_datetime(df_dict['vm'+str(i+1)]['timestamp'],format='%d-%m-%y %I:%M:%S %p').dt.strftime('%Y-%m-%d %H:%M:%S')
#     df_dict['vm'+str(i+1)]['timestamp']= pd.to_datetime(df_dict['vm'+str(i+1)]['timestamp'])
#     df_dict['vm'+str(i+1)].set_index('timestamp',inplace=True)

In [None]:
# join two time series using time stamp index union and sort the index of combined data frame according to time stamp
# combined_df = df_dict['vm1'].join(df_dict['vm2'],how='outer')

In [None]:
# combined_df = combined_df.sort_index()

In [None]:
# combined_df = combined_df.apply(pd.to_numeric, errors='ignore')
# filled_df = combined_df.interpolate(method='ffill').interpolate(method='bfill')

In [None]:
# cols=[]
# for i in range(n_vm):
#     cols.append('storage_vm'+str(i+1))
# clean_df = filled_df.drop(columns=cols)
# clean_df.head()

In [None]:
# print('total number of missing values in clean dataframe:',clean_df.isna().sum())
# minute_df = clean_df.resample('1T').mean()
# nan_count = minute_df.isna().sum()
# print('total number of missing values in reampled dataframe:',nan_count)
# minute_df = minute_df.fillna(method='ffill')
# nan_count = minute_df.isna().sum()
# print('total number of missing values in filled reampled dataframe:',nan_count)

In [None]:
# test_df = minute_df[-40:]

# Make prediction
- The registered model is already deployed, and the URL to access the served model is 'http://mlflowserve:5000/invocations'.
- We construct a REST API call using Python's requests package to send the input X and retrieve the predicted y as follows

In this example:
- X must be an array which contains (n,input_steps,features) where number of features for the case of 2 VMs are 4
- body data must be converted to json using json dumps with the fields 'inputs'

In [None]:
# import numpy as np
# test_df_np = np.array(test_df)
# test_input_np = np.expand_dims(test_df_np[0:30],axis=0)
# print(test_input_np.shape)
# test_input_list = test_input_np.tolist()
# test_label_np = np.expand_dims(test_df_np[30:,[0,2]],axis=0)
# print('test label shape:',test_label_np.shape)

In [None]:
# import json
# import requests

# url = 'http://mlflowserve:5000/invocations'

# headers = {'Content-Type': 'application/json'}
# request_data = json.dumps({"inputs": test_input_list})
# response = requests.post(url,request_data, headers=headers)

In [None]:
# json_response = json.loads(response.content)
# json_response['predictions']

In [None]:
# import matplotlib.pyplot as plt
# max_subplots = 2
# plot_col = 'cpu'
# max_n = max_subplots
# shift = 10
# predictions = np.array(json_response['predictions'])
# print(predictions.shape)
# label_indices = np.arange(predictions.shape[1])
# for n in range(max_n):
#     plt.subplot(max_n, 1, n+1)
#     plt.ylabel(f'{plot_col}')
#     plt.plot(label_indices, test_label_np[0, :, n],
#                 marker='^', label='Labels vm'+str(n+1))
#     plt.plot(label_indices,  predictions[0, :, n],
#                 label='prediction vm'+str(n+1), marker='x')
#     plt.legend()