# In [None]:
import pyspark.sql.streaming
from pyspark.sql import SparkSession, types, functions
from pyspark import SparkConf, SparkContext
import sys
from kafka import KafkaConsumer
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler

# Spark entry points for the streaming job.
conf = SparkConf().setAppName('read stream')
# getOrCreate is a classmethod: calling it on the class reuses an already
# running context (e.g. when this cell is executed again) instead of first
# constructing a second SparkContext.
sc = SparkContext.getOrCreate(conf=conf)
assert sys.version_info >= (3, 5)  # make sure we have Python 3.5+
# Compare major/minor numerically: as plain strings, '10.0' or '2.10' would
# sort before '2.2' and wrongly fail the check.
spark_major_minor = tuple(int(part) for part in sc.version.split('.')[:2])
assert spark_major_minor >= (2, 2)  # make sure we have Spark 2.2+
spark = SparkSession.builder.appName('nasa log').getOrCreate()

# Name of the Kafka topic the data is published to.
topic = 'test'

# Subscribe to the topic on the Kafka broker at localhost:9092 as a
# streaming source.
messages = spark.readStream.format('kafka') \
        .option('kafka.bootstrap.servers', 'localhost:9092') \
        .option('subscribe', topic).load()

# Decode each record's value as UTF-8 text ('val') and split it on ';'.
message = messages.select(functions.decode(messages['value'],'UTF-8').alias('val'))
msg = functions.split(message['val'],';')
# Expose the first 64 split items as columns col_0 .. col_63 in a single
# projection. Chaining withColumn in a loop would stack one projection per
# column onto the query plan; one select yields the same columns in the
# same order ('val' first, then col_0 .. col_63).
message = message.select(
    'val',
    *[msg.getItem(i).alias('col_' + str(i)) for i in range(64)]
)

def string_to_float(col):
    """Convert a text field to a float, or None when it cannot be parsed.

    Args:
        col: The field value, normally a string; may be None.

    Returns:
        The parsed float, or None if ``col`` is None or is not a valid
        float literal (for example an empty string).
    """
    try:
        return float(col)
    except (TypeError, ValueError):
        # Returning None instead of raising keeps one missing or malformed
        # field from aborting the caller.
        return None


# Wrap string_to_float as a Spark UDF producing a FloatType column.
udf_float=functions.udf(string_to_float, returnType=types.FloatType())
columns=message.schema.names
# Replace every column with its float-converted '<name>_f' counterpart in a
# single projection. A withColumn/drop pair per column would add two plan
# nodes per column; one select produces the same columns in the same order.
# NOTE(review): this converts every column present, including any that hold
# non-numeric text -- confirm that is intended.
message=message.select([udf_float(column).alias(column+'_f') for column in columns])


# Build "avg(abs(c)),variance(abs(c))" for every column from the third one
# onwards and evaluate them all in one SQL aggregation over the temp view.
columns=message.schema.names
temp=",".join(
    "avg(abs({0})),variance(abs({0}))".format(name) for name in columns[2:]
)

message.createOrReplaceTempView("aggregate")
message=spark.sql('''select '''+temp+'''
from aggregate ''')

# Restore the saved random-forest classification model from disk.
sameModel=RandomForestClassificationModel.load(
    "/home/wolvorine/RFClassifier/stages/1_RandomForestClassifier_40ab83d4c98016a19d8d"
)

# Pack every column currently in `message` into one "features" vector column.
vecAssembler = VectorAssembler(outputCol="features", inputCols=message.schema.names)
message = vecAssembler.transform(message)

# Score the assembled features with the loaded model.
message = sameModel.transform(message)

# Write results to the console in 'update' mode, triggering every 3 seconds.
writer = message.writeStream.format('console').outputMode('update')
stream = writer.trigger(processingTime='3 seconds').start()

# Block until the streaming query terminates.
stream.awaitTermination()