In [1]:
import os
import findspark

In [2]:
# Locate the local Spark installation, then list the connector packages that
# spark-submit should resolve. PYSPARK_SUBMIT_ARGS is presumably read when the
# JVM gateway starts, so it is set here, before any Spark object is built below.
findspark.init()
spark_packages = [
    "org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.4",
    "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4",
    "org.elasticsearch:elasticsearch-hadoop:7.4.1",
]
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages {} pyspark-shell'.format(','.join(spark_packages))

In [3]:
import sys 
import json
import kafka
from ast import literal_eval
from pyspark.sql import SparkSession 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import LogisticRegressionModel
from pyspark.ml import PipelineModel

In [4]:
# Create (or reuse) the Spark session for this notebook. The
# es.index.auto.create setting is presumably consumed by the
# elasticsearch-hadoop connector used by the Elasticsearch sink further down.
spark = (
    SparkSession.builder
    .appName("SSKafka")
    .config("es.index.auto.create", "true")
    .getOrCreate()
)

In [5]:
# Handle to the session's underlying SparkContext; used by the next cell to
# build a StreamingContext.
sc = spark.sparkContext

In [6]:
# Legacy DStream StreamingContext with a 2-second batch interval.
# NOTE(review): `ssc` is not referenced by any of the structured-streaming
# cells shown here; confirm whether it is still needed.
batch_interval_seconds = 2
ssc = StreamingContext(sc, batch_interval_seconds)

In [7]:
# Streaming source: subscribe to the `test_agg` topic on the local broker.
kafka_source_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "test_agg",
}
df = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [8]:
# Kafka delivers key and value as binary; decode both to strings, keeping the
# original column names.
message = df.select(
    df["key"].cast("string").alias("key"),
    df["value"].cast("string").alias("value"),
)
message.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [9]:
# Expected structure of each JSON message on the topic.
schema = (
    StructType()
    .add("stock", StringType(), True)
    .add("date", StringType(), True)
    .add("prices", ArrayType(DoubleType()), True)
)
schema

StructType(List(StructField(stock,StringType,true),StructField(date,StringType,true),StructField(prices,ArrayType(DoubleType,true),true)))

In [10]:
# Parse the JSON payload with `schema`, then promote its fields
# (stock, date, prices) to top-level columns.
parsed_messages = message.select(from_json(message["value"], schema).alias("data"))
stockValue = parsed_messages.select("data.*")
stockValue.printSchema()

root
 |-- stock: string (nullable = true)
 |-- date: string (nullable = true)
 |-- prices: array (nullable = true)
 |    |-- element: double (containsNull = true)



In [11]:
# Unpack the first five entries of `prices` into named columns:
# prices[0] -> n5, prices[1] -> n4, ..., prices[4] -> n1.
price_column_names = ["n5", "n4", "n3", "n2", "n1"]
price_columns = [
    stockValue.prices[position].alias(label)
    for position, label in enumerate(price_column_names)
]
stockValueFinal = stockValue.select("stock", "date", *price_columns)
stockValueFinal.printSchema()

root
 |-- stock: string (nullable = true)
 |-- date: string (nullable = true)
 |-- n5: double (nullable = true)
 |-- n4: double (nullable = true)
 |-- n3: double (nullable = true)
 |-- n2: double (nullable = true)
 |-- n1: double (nullable = true)



In [12]:
# Load the previously trained pipeline from disk. Judging by the columns that
# transform() adds below (features, rawPrediction, probability, prediction),
# it is presumably a feature assembler followed by a classifier.
model_path = "FINAL_MODEL"
pipeline = PipelineModel.load(model_path)

In [13]:
# Display the loaded model's repr (its uid) as a quick sanity check.
pipeline

PipelineModel_d71c425ebdd0

In [14]:
# Score every incoming record. transform() keeps the input columns and
# appends the pipeline's output columns (features, rawPrediction,
# probability, prediction).
prediction = pipeline.transform(dataset=stockValueFinal)
prediction.printSchema()

root
 |-- stock: string (nullable = true)
 |-- date: string (nullable = true)
 |-- n5: double (nullable = true)
 |-- n4: double (nullable = true)
 |-- n3: double (nullable = true)
 |-- n2: double (nullable = true)
 |-- n1: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [15]:
# Optional debug sink (disabled): writes the scored stream as Parquet under
# /tmp/test. NOTE(review): it uses the generic relative "checkpoint"
# directory; give it a dedicated checkpoint location if re-enabled.
#prediction.writeStream.format("parquet").option("path", "/tmp/test").option("checkpointLocation", "checkpoint").start()

In [16]:
# Shape each scored row as a Kafka record: the stock symbol is the key, and
# the entire row serialized to JSON is the value.
# NOTE(review): `struct(*)` also serializes the ML vector columns
# (features, rawPrediction, probability); confirm downstream consumers want them.
kafka_record_exprs = [
    "CAST(stock AS STRING) AS key",
    "to_json(struct(*)) AS value",
]
asd = prediction.selectExpr(*kafka_record_exprs)
asd.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [17]:
from random import random

In [None]:
# Publish the scored records to the `testoutput` Kafka topic.
# The StreamingQuery handle is kept: awaitTermination() blocks this cell
# until the query ends, and interrupting that wait does not stop the query
# itself. With `kafka_query` in scope it can be shut down via kafka_query.stop().
kafka_query = (
    asd.selectExpr("key", "value")
    .writeStream
    .format("kafka")
    .outputMode("append")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "testoutput")
    .option("checkpointLocation", "checkpoint10")
    .start()
)
kafka_query.awaitTermination()

In [None]:
# Hard stop so that "Run All" does not continue into the Elasticsearch sink
# cell below; run that cell manually when it is wanted. An explicit
# SystemExit halts execution cleanly, whereas a bare `break` outside a loop
# is a SyntaxError.
raise SystemExit("Stopping here: run the Elasticsearch sink cell manually if needed.")

In [None]:
# Index the Kafka-formatted records (key + JSON string value) into Elasticsearch.
# The target "<index>/<type>" is passed to start(), which elasticsearch-hadoop
# maps to its `es.resource` setting. Note that the option key is `es.resource`
# (singular); a misspelled key such as `es.resources` is silently ignored.
# NOTE(review): Elasticsearch 7.x deprecates mapping types; consider targeting
# just the "data" index. Also note that documents will hold `value` as an
# unparsed JSON string; confirm this is intended.
es_resource = "data/doc-type"
es_query = (
    asd.selectExpr("key", "value")
    .writeStream
    .format("es")
    .outputMode("append")
    .option("checkpointLocation", "checkpoint2")
    .option("es.nodes", "localhost")
    .start(es_resource)
)
es_query.awaitTermination()