In [None]:
import os
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, avg, sum
from pyspark.sql.types import StringType, StructType, StructField, TimestampType, IntegerType

# Directory that holds the connector jars listed below. It defaults to the path
# on the EC2 instance; set the SPARK_JARS_PATH environment variable to point
# somewhere else without editing the notebook.
spark_jars_path = os.environ.get(
    "SPARK_JARS_PATH", "/home/ec2-user/stream-processing-template/jars"
)

# Jar files handed to the session through the "spark.jars" setting.
kafka_jars = [
    "commons-pool2-2.11.1.jar",
    "spark-sql-kafka-0-10_2.12-3.4.0.jar",
    "spark-streaming-kafka-0-10-assembly_2.12-3.4.0.jar",
]

# "spark.jars" expects one comma-separated string of jar paths.
spark = (
    SparkSession.builder.appName("retail_pysaprk_consumer")
    .config("spark.jars", ",".join(f"{spark_jars_path}/{jar}" for jar in kafka_jars))
    .getOrCreate()
)


In [None]:
# Schema used to parse each transaction record: one nullable field per column,
# given as (column name, data type, nullable).
schema = (
    StructType()
    .add("store_location", StringType(), True)
    .add("time_of_purchase", TimestampType(), True)
    .add("product_ID", StringType(), True)
    .add("transaction_amount", IntegerType(), True)
)

# Stream from the Kafka topic.

# Broker address used to reach the Kafka cluster.
kafka_bootstrap_servers = "b-1.monstercluster1.6xql65.c3.kafka.eu-west-2.amazonaws.com:9092"
# Topic to subscribe to (a topic is the named feed that messages are published to).
kafka_topic = "retail_transactions"

# readStream creates a streaming reader on the Spark session, format("kafka")
# selects Kafka as the streaming source, and load() returns the streaming
# DataFrame that is stored in df.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic)
    .load()
)

In [None]:
# Extract and parse the JSON data carried by each Kafka message.
#
# 1. The `value` column of the stream is cast to a string.
# 2. from_json parses that string against `schema`, producing a struct column
#    that is named "data".
# 3. select("data.*") expands the struct so that every schema field becomes a
#    top-level column of `transactions`.
payload_json = col("value").cast("string")
parsed_payload = from_json(payload_json, schema).alias("data")
transactions = df.select(parsed_payload).select("data.*")


# Write the transactions DF to an in-memory table called temporary_view_one.
#
# writeStream - writes streaming data out (readStream is its reading counterpart)
# format      - "memory" keeps the received rows in an in-memory table
# queryName   - names the query; the table is queried under the same name
# start       - starts the streaming query in the background
query = (
    transactions.writeStream
    .format("memory")
    .queryName("temporary_view_one")
    .start()
)

# How long the stream is allowed to collect data before it is stopped.
COLLECTION_WINDOW_SECONDS = 15

# awaitTermination(timeout) only waits; it does not end the query. Without the
# stop() below the query keeps appending rows in the background, so every later
# cell would aggregate a different set of rows, and re-running this cell would
# fail because a query named temporary_view_one would still be active.
# The finally clause also stops the query if the wait raises an error.
try:
    query.awaitTermination(COLLECTION_WINDOW_SECONDS)
finally:
    query.stop()

23/12/13 11:39:38 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-ab6e68ef-a6a2-4474-a883-603b0081415c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/12/13 11:39:38 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/12/13 11:39:38 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

False

In [None]:
# Load every column and row of the in-memory table temporary_view_one as a
# DataFrame; the cells below run their transformations and aggregations on it.
processed_data = spark.table("temporary_view_one")

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

# Replace transaction_amount with the same values cast to double.
# Note: this rebinds `transactions`, which until now referred to the streaming
# DataFrame, to the DataFrame read from the in-memory table.
amount_as_double = col("transaction_amount").cast(DoubleType())
transactions = processed_data.withColumn("transaction_amount", amount_as_double)

In [None]:
from pyspark.sql.functions import sum
# Total transaction amount per store location.
# `sum` here is the pyspark.sql.functions aggregate imported above, not the
# Python builtin.
sales_by_store = transactions.groupBy("store_location")
total_amount_expr = sum("transaction_amount").alias("total_amount")
total_transactions = sales_by_store.agg(total_amount_expr)
total_transactions.show()


+--------------+------------+
|store_location|total_amount|
+--------------+------------+
|       Phoenix|       847.0|
|   San Antonio|      1120.0|
|  Philadelphia|       654.0|
|   Los Angeles|       965.0|
|       Houston|       586.0|
|      New York|       568.0|
+--------------+------------+



In [None]:
# Number of transactions per store location. groupBy(...).count() names its
# result column "count"; it is exposed here as "transaction_count".
rows_per_store = transactions.groupBy("store_location").count()
transaction_counts = rows_per_store.select(
    "store_location",
    col("count").alias("transaction_count"),
)
transaction_counts.show()

+--------------+-----------------+
|store_location|transaction_count|
+--------------+-----------------+
|       Phoenix|                1|
|   San Antonio|                2|
|  Philadelphia|                1|
|   Los Angeles|                1|
|       Houston|                3|
|      New York|                1|
+--------------+-----------------+



In [None]:
# Average transaction amount per store location.
amounts_by_store = transactions.groupBy("store_location")
average_amount_expr = avg("transaction_amount").alias("average_amount")
average_transactions = amounts_by_store.agg(average_amount_expr)
average_transactions.show()

+--------------+--------------+
|store_location|average_amount|
+--------------+--------------+
|       Phoenix|         847.0|
|   San Antonio|         560.0|
|  Philadelphia|         654.0|
|   Los Angeles|         965.0|
|       Chicago|          20.0|
|       Houston|         272.0|
|      New York|         568.0|
+--------------+--------------+



                                                                                