In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col ,regexp_replace, substring
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType,DoubleType,FloatType,DateType
from pyspark.sql.functions import expr
import seaborn as sns
import matplotlib.pyplot as plt

scala_version = '2.12'  
spark_version = '3.2.1'
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    f'org.apache.spark:spark-streaming-kafka-0-10_{scala_version}:{spark_version}',
    f'org.apache.spark:spark-token-provider-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:2.1.1',
    'org.apache.commons:commons-pool2:2.8.0'
]

# Initialize a SparkSession object locally

spark = SparkSession.builder\
   .master("local")\
   .appName("kafka-example")\
   .config("spark.jars.packages", ",".join(packages))\
   .getOrCreate()
spark.sparkContext.setLogLevel('ERROR')

In [3]:
# Set up a streaming DataFrame 'dfraw'

brokers = "Public DNS of EC2:9092"
topic = "mytopic1"

dfraw = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", brokers) \
  .option("subscribe", topic) \
  .option("includeHeaders","true") \
  .option("startingOffsets","latest") \
  .load() 

In [4]:
# Set up a streaming query 

rawQuery = dfraw \
        .writeStream \
        .queryName("query_data")\
        .format("memory")\
        .start()

                                                                                

In [5]:
kafka_df = spark.sql("select CAST(value AS STRING), topic, timestamp from query_data")

                                                                                

In [7]:
kafka_df.show(1, False)

[Stage 26:>                                                         (0 + 1) / 1]

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------------+
|value                                                                                                                                                                                                                        |topic   |timestamp              |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------------+
|[{"user_id": 89, "user_sex": "male", "user_age": 33, "user_country": "Finland", "rating": 4.5, "comment": "\"Albus Dumbledore's presence adds gravitas.\"", "favourite_character": "Albus Dumbledore", "date": "2005-04-25"}]|mytopi

                                                                                

In [8]:
kafka_df = kafka_df.withColumn(
    "favourite_character",
    expr("get_json_object(value, '$[0].favourite_character')"))

kafka_df.show(1,truncate=False)


                                                                                

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------------+-------------------+
|value                                                                                                                                                                                                                        |topic   |timestamp              |favourite_character|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+-----------------------+-------------------+
|[{"user_id": 89, "user_sex": "male", "user_age": 33, "user_country": "Finland", "rating": 4.5, "comment": "\"Albus Dumbledore's presence adds gravitas.\"", "favourite_c

                                                                                

In [12]:
kafka_df.groupBy('favourite_character').count().show(truncate=False)

[Stage 76:>                                                         (0 + 1) / 1]

+-------------------+-----+
|favourite_character|count|
+-------------------+-----+
|Ron Weasley        |9    |
|Severus Snape      |3    |
|Hermione Granger   |10   |
|Harry Potter       |12   |
|Albus Dumbledore   |10   |
|Rubeus Hagrid      |8    |
|Neville Longbottom |6    |
|Draco Malfoy       |5    |
+-------------------+-----+



                                                                                