In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import udf
import pickle  
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
import pandas as pd
from pyspark.sql.functions import pandas_udf
from time import sleep

sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("MLStreamExample")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")
# create the spark session, which is the entry point to Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

#load the model 
model_lr = pickle.load(open('/home/jovyan/data/lr_model.pkl', 'rb'))

# create broadcasted model
sc = spark.sparkContext
braodcast_model = sc.broadcast(model_lr)

dataSchema = StructType(
        [StructField("ntp", IntegerType(), True),
         StructField("pgc", IntegerType(), True),
         StructField("dbp", IntegerType(), True),
         StructField("tsft", IntegerType(), True),
         StructField("si", IntegerType(), True),
         StructField("bmi", FloatType(), True),       
         StructField("dpf", FloatType(), True),
         StructField("age", IntegerType(), True),
         StructField("class", StringType(), True)
         ])

df = spark.readStream.schema(dataSchema).format("csv").option("header", "true") \
       .load("/home/jovyan/data/diabetes").drop("class")
df.printSchema()

@udf('integer')
def predict_udf(*cols):
    return int(braodcast_model.value.predict((cols,)))

list_of_columns = df.columns
df_prediction = df.withColumn('prediction', predict_udf(*list_of_columns))

query = df_prediction \
    .writeStream \
    .queryName("diabetes_prediction") \
    .format("memory") \
    .outputMode("append") \
    .start()

try:
    for x in range(5):
        spark.sql("SELECT * FROM diabetes_prediction").show()
        sleep(10)
        
except KeyboardInterrupt:
    query.stop()
    # Stop the spark context
    spark.stop()
    print("Stoped the streaming query and the spark context")

root
 |-- ntp: integer (nullable = true)
 |-- pgc: integer (nullable = true)
 |-- dbp: integer (nullable = true)
 |-- tsft: integer (nullable = true)
 |-- si: integer (nullable = true)
 |-- bmi: float (nullable = true)
 |-- dpf: float (nullable = true)
 |-- age: integer (nullable = true)

+---+---+---+----+---+---+---+---+----------+
|ntp|pgc|dbp|tsft| si|bmi|dpf|age|prediction|
+---+---+---+----+---+---+---+---+----------+
+---+---+---+----+---+---+---+---+----------+

+---+---+---+----+---+----+-----+---+----------+
|ntp|pgc|dbp|tsft| si| bmi|  dpf|age|prediction|
+---+---+---+----+---+----+-----+---+----------+
|  1|126| 60|   0|  0|30.1|0.349| 47|         0|
|  1| 93| 70|  31|  0|30.4|0.315| 23|         0|
| 12| 84| 72|  31|  0|29.7|0.297| 46|         0|
|  0|139| 62|  17|210|22.1|0.207| 21|         0|
|  0| 97| 64|  36|100|36.8|  0.6| 25|         0|
|  8|120|  0|   0|  0|30.0|0.183| 38|         1|
|  1| 97| 70|  15|  0|18.2|0.147| 21|         0|
|  6|107| 88|   0|  0|36.8|0.727| 3

In [4]:
# Stop the spark context
spark.stop()

In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, avg, concat, lit, from_json
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, IntegerType
from time import sleep

sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("Lab8_Ex2")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")

# create the spark session, which is the entry point to Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

dataSchema = StructType(
    [StructField("username", StringType(), True),
     StructField("teamname", StringType(), True),
     StructField("score", IntegerType(), True),
     StructField("timestamp_in_ms", LongType(), True),
     StructField("readable_time", StringType(), True)
     ])


# Read the whole dataset as a batch
kafkaStream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9093") \
    .option("subscribe", "game") \
    .option("startingOffsets", "latest") \
    .load()

df = kafkaStream.selectExpr("CAST(value AS STRING)")

df1 = df.select(from_json(df.value, dataSchema.simpleString()))

df1.printSchema()

sdf = df1.select(col("from_json(value).*"))

sdf.printSchema()

# create the event time column 
withEventTimedf = sdf.selectExpr(
    "*",
    "cast(timestamp_in_ms/1000.0 as timestamp) as event_time")

withEventTimedf.printSchema()

avgscoredf = withEventTimedf \
    .groupBy(window(col("event_time"), "10 seconds"), "username", "teamname") \
    .agg(avg("score").alias("value"))

resultdf = avgscoredf.select(concat(col("username"), lit(" "), col("teamname")).alias("key"), col("value"))

query = resultdf \
    .writeStream \
    .queryName("avg_score_window") \
    .format("memory") \
    .outputMode("complete") \
    .start()

try:
    for x in range(100):
        spark.sql("SELECT * FROM avg_score_window").show()
        sleep(10)
except KeyboardInterrupt:
    query.stop()
    # Stop the spark context
    spark.stop()
    print("Stoped the streaming query and the spark context")

root
 |-- from_json(value): struct (nullable = true)
 |    |-- username: string (nullable = true)
 |    |-- teamname: string (nullable = true)
 |    |-- score: integer (nullable = true)
 |    |-- timestamp_in_ms: long (nullable = true)
 |    |-- readable_time: string (nullable = true)

root
 |-- username: string (nullable = true)
 |-- teamname: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- timestamp_in_ms: long (nullable = true)
 |-- readable_time: string (nullable = true)

root
 |-- username: string (nullable = true)
 |-- teamname: string (nullable = true)
 |-- score: integer (nullable = true)
 |-- timestamp_in_ms: long (nullable = true)
 |-- readable_time: string (nullable = true)
 |-- event_time: timestamp (nullable = true)

+---+-----+
|key|value|
+---+-----+
+---+-----+

+---+-----+
|key|value|
+---+-----+
+---+-----+

+---+-----+
|key|value|
+---+-----+
+---+-----+

+--------------------+-----+
|                 key|value|
+--------------------+-----+
|user1