In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [None]:
# Ask spark-submit to load the Kafka connector package for the SQL/streaming API.
# This cell runs before the SparkSession below is created; presumably the variable
# is only read when the JVM is launched, so it must stay ahead of that cell.
# NOTE(review): the Scala (2.11) / Spark (2.4.5) versions in the coordinate
# presumably have to match the installed PySpark build — confirm.
os.environ.update({'PYSPARK_SUBMIT_ARGS': '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 pyspark-shell'})

In [None]:
# Kafka broker list (comma-separated host:port) and the topic subscribed to below.
kafkaBootstrapServers, kafkaTopics = (
    "kafka-1:19091,kafka-2:29091,kafka-3:39091",
    "a",
)

In [None]:
# Build the Spark session for this notebook, or reuse one that is already active.
spark = (
    SparkSession.builder
    .appName("StructuredKafka")
    .getOrCreate()
)

In [None]:
# Display the SparkSession's repr as this cell's output (a quick check that the
# session came up).
spark

In [None]:
# Open a streaming read on the Kafka topic(s) named in `kafkaTopics`, using the
# broker list from `kafkaBootstrapServers`. Key and value are cast to strings
# in the next cell.
kafka_raw_stream = (
    spark.readStream
    .format("kafka")
    .option("subscribe", kafkaTopics)
    .option("kafka.bootstrap.servers", kafkaBootstrapServers)
    .load()
)

In [None]:
# Keep only the record key and value, both cast to strings, under their
# original column names `key` and `value`.
kafka_stream = kafka_raw_stream.select(
    f.col("key").cast("string").alias("key"),
    f.col("value").cast("string").alias("value"),
)

In [None]:
# Expected structure of each Kafka message's JSON payload. Fields are nullable
# (StructField default) and listed in the order they will appear as columns.
# NOTE(review): FloatType is 32-bit; consider DoubleType if price precision
# matters downstream.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in (
        ("ISIN", StringType()),
        ("Mnemonic", StringType()),
        ("SecurityDesc", StringType()),
        ("SecurityType", StringType()),
        ("Currency", StringType()),
        ("SecurityID", IntegerType()),
        ("Date", StringType()),
        ("Time", StringType()),
        ("StartPrice", FloatType()),
        ("MaxPrice", FloatType()),
        ("MinPrice", FloatType()),
        ("EndPrice", FloatType()),
        ("TradedVolume", IntegerType()),
        ("NumberOfTrades", IntegerType()),
    )
])

# Parse each record's JSON payload (the string `value` column) with `schema`,
# then flatten the resulting struct into one top-level column per schema field.
# `json.*` expands the struct's fields with their own names and in schema order,
# which avoids hand-listing every field. The value is addressed by name rather
# than position; it is already a string, so no extra cast is needed.
parsed = (
    kafka_stream
    .select(f.from_json(kafka_stream["value"], schema).alias("json"))
    .select("json.*")
)

In [None]:
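# Disabled example: record counts per (SecurityType, Currency) pair. A streaming
# aggregation like this cannot be written with outputMode("append") unless a
# watermark is defined; use "complete" or "update" instead.
# count = parsed.groupBy(["SecurityType","Currency"]).count()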

In [None]:
# Publish every parsed record to the Kafka topic "moj_temat".
# The Kafka sink takes the record payload from a column named `value` (string
# or binary), and `parsed` has no such column. Re-pack each row's flattened
# fields into a single JSON string aliased as `value`.
# The checkpoint goes in a directory owned by this query, not the shared /tmp
# root, so its offset/commit state cannot mix with unrelated files.
query = (
    parsed
    .select(f.to_json(f.struct(*parsed.columns)).alias("value"))
    .writeStream
    .outputMode("append")
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaBootstrapServers)
    .option("topic", "moj_temat")
    .option("checkpointLocation", "/tmp/checkpoints/StructuredKafka_moj_temat")
    .start()
)

In [None]:
# Handle to the SparkContext behind the session created at the top of the
# notebook. Taking it from `spark` avoids a mid-notebook import and makes it
# explicit that this is the same context the streaming query runs on.
sc = spark.sparkContext

In [None]:
# Shut down in dependency order: stop the streaming query first so it is not
# left running against a stopped SparkContext, then stop the context itself.
query.stop()
sc.stop()

In [None]:
# For a script-style run, uncomment awaitTermination() to block until the stream
# ends; in the notebook, the query is stopped explicitly instead.
#query.awaitTermination()
query.stop()
