In [64]:
# Section 1: Imports and Environment Setup
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

'''
    Here, we're importing necessary modules and setting the correct environment paths for
    PySpark, Hadoop, and Python.

'''


"\n    Here, we're importing necessary modules and setting the correct environment paths for\n    PySpark, Hadoop, and Python.\n\n"

In [65]:
# Section 2: Define JSON Schema for Incoming Data
json_schema = T.StructType([
T.StructField('application_name', T.StringType()),
T.StructField('translated_review', T.StringType()),
T.StructField('sentiment_rank', T.IntegerType()),
T.StructField('sentiment_polarity', T.FloatType()),
T.StructField('sentiment_subjectivity', T.FloatType())
])

''' 
Defines the structure (json_schema) for parsing the incoming JSON data. This structure
dictates the expected fields in the JSON and their corresponding data types.

'''

' \nDefines the structure (json_schema) for parsing the incoming JSON data. This structure\ndictates the expected fields in the JSON and their corresponding data types.\n\n'

In [66]:
# Section 3: Initialize Spark Session

spark = SparkSession \
.builder \
.master("local") \
.appName('ex6_calculate_reviews') \
.config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2') \
.getOrCreate()

''' 
    Initializes a SparkSession which is the entry point to any Spark functionality.
    Sets the application name and the master for the SparkSession.
    Adds a specific jar for Kafka integration to the Spark job.
'''


' \n    Initializes a SparkSession which is the entry point to any Spark functionality.\n    Sets the application name and the master for the SparkSession.\n    Adds a specific jar for Kafka integration to the Spark job.\n'

In [67]:
# Section 4: Consume Data from Kafka

'''
    Reads streaming data from the Kafka topic named "gps-user-review-source").
    The incoming byte data from Kafka is then converted to a string.

'''

stream_df = spark \
.readStream \
.format('kafka') \
.option("kafka.bootstrap.servers", "course-kafka:9092") \
.option("subscribe", "gps-user-review-source") \
.option('startingOffsets', 'earliest') \
.load() \
.select(F.col('value').cast(T.StringType()))

'''  
# Start streaming and print to the console
query = stream_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false").start()

# Let the stream run for a specific time (e.g., 10 seconds) instead of awaitTermination()
import time
time.sleep(10)  # Run the stream for 10 seconds (adjust as needed)

# Stop the query manually
query.stop()
'''



'  \n# Start streaming and print to the console\nquery = stream_df     .writeStream     .outputMode("append")     .format("console")     .option("truncate", "false").start()\n\n# Let the stream run for a specific time (e.g., 10 seconds) instead of awaitTermination()\nimport time\ntime.sleep(10)  # Run the stream for 10 seconds (adjust as needed)\n\n# Stop the query manually\nquery.stop()\n'

In [68]:
# Section 5: Parse the JSON Data
parsed_df = stream_df \
.withColumn('parsed_json', F.from_json(F.col('value'), json_schema)) \
.select(F.col('parsed_json.*'))


'''
Original column (value): This is a plain string. You can't easily access individual parts of the JSON without parsing it.
New column (parsed_json): This is a structured object (StructType). Spark recognizes the individual fields inside the JSON,
and now you can easily work with those fields in subsequent transformations.

Why Is This Important?
The structured nature of parsed_json allows you to access and manipulate each part of the JSON. For example:

python
Copy code
parsed_df.select("parsed_json.application_name", "parsed_json.sentiment_rank").show()
Will give you:

application_name	sentiment_rank
App A	                1
App B	                2
This would not be possible if the data was still in the unparsed value column because it is just a string.

This transformation is what allows you to access individual elements of the JSON and treat them like regular columns in Spark.
'''


'''
parsed_df = stream_df \
.withColumn('parsed_json', F.from_json(F.col('value'), json_schema)) \
.select(F.col('parsed_json.*'))
'''



"\nparsed_df = stream_df .withColumn('parsed_json', F.from_json(F.col('value'), json_schema)) .select(F.col('parsed_json.*'))\n"

In [None]:
'''
# Print stream_df to the console
query_stream_df = stream_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Print parsed_df to the console
query_parsed_df = parsed_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Wait for the streaming queries to finish (or you can specify a timeout)
query_stream_df.awaitTermination()
query_parsed_df.awaitTermination()
'''

In [69]:
# Section 6: Load Static Data from Parquet and Cache

static_data_df = spark.read.parquet('s3a://spark/data/source/google_apps/')
static_data_df.cache()

'''  
Reads static data stored in Parquet format.
Caches (in-memory storage) the data for better performance in subsequent operations.

'''

24/09/26 05:41:40 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


DataFrame[application_name: string, category: string, rating: string, reviews: float, size: string, num_of_installs: double, price: double, age_limit: bigint, genres: string, version: string]

In [72]:
# Section 7: Aggregation, Transformation, and Join Operations

joined_df = parsed_df \
.groupBy(F.col('application_name'))\
.agg(F.sum(F.when(F.col('sentiment_rank') == 1, 1).otherwise(0)).alias('num_of_positive_sentiments'),
    F.sum (F.when (F.col('sentiment_rank') == 0, 1).otherwise(0)).alias('num_of_neutral_sentiments'),
    F.sum(F.when (F.col('sentiment_rank') == -1, 1).otherwise(0)).alias('num_of_negative_sentiments'),
    F.avg(F.col('sentiment_polarity')).alias('avg_sentiment_polarity'),
    F.avg(F.col('sentiment_subjectivity')).alias('avg_sentiment_subjectivity'))\
.join(static_data_df, ['application_name'])

# Joining with Static Data:

'''
.join(static_data_df, ['application_name'])
Explanation: Once the aggregation is done, the result is joined with another DataFrame static_data_df on the
column application_name. This means for every application in the aggregated data (joined_df), the
corresponding details from the static_data_df are added

In essence, the code segment takes streaming review data (parsed_df), groups it by
application, computes various sentiment metrics for each application, and then enriches this
data by joining with static application data (static_data_df) .
'''



In [74]:
# Section 8: Fetching Field Names:

fields_list = joined_df.schema.fieldNames()
# print (fields_list)

'''
Explanation: The fieldNames method retrieves the list of column names (or fields) from the
schema of the joined_df DataFrame. So fields_list is a Python list containing these column
names.
So, after this line, fields_list contains all the column names of joined_df as a Python list.
'''

['application_name', 'num_of_positive_sentiments', 'num_of_neutral_sentiments', 'num_of_negative_sentiments', 'avg_sentiment_polarity', 'avg_sentiment_subjectivity', 'category', 'rating', 'reviews', 'size', 'num_of_installs', 'price', 'age_limit', 'genres', 'version']


'\nExplanation: The fieldNames method retrieves the list of column names (or fields) from the\nschema of the joined_df DataFrame. So fields_list is a Python list containing these column\nnames.\n'

In [79]:
# Section 9: Mapping Field Names to Columns:

fields_as_cols = list(map(lambda col_name: F.col(col_name), fields_list))
print (fields_as_cols)

'''
Explanation: This line maps each column name in fields_list to its respective column in the
DataFrame using the F.col function. The map function is used for this purpose, and the result
is a list of columns, which we store in fields_as_cols.

chatGBT:

map(lambda col_name: F.col(col_name), fields_list):

    1.  map() is a Python function that applies a given function (in this case, a lambda function) to each item in a list (fields_list in this case).
    2.  The lambda col_name: F.col(col_name) part creates a new PySpark Column object for each column name in fields_list.
        In PySpark, F.col(col_name) is a function that returns a column object for a given column name, where F refers to pyspark.sql.functions.
So, for each column name in fields_list, the lambda function creates an object representing that column in the DataFrame.

list(...): The map() function returns a map object, which is then converted to a list using list().

Example:

Suppose joined_df has the following schema with columns 'name', 'age', and 'city':

    fields_list = ['name', 'age', 'city'] (a list of column names).
    ields_as_cols = [F.col('name'), F.col('age'), F.col('city')] (a list of column objects).

'''

[Column<'application_name'>, Column<'num_of_positive_sentiments'>, Column<'num_of_neutral_sentiments'>, Column<'num_of_negative_sentiments'>, Column<'avg_sentiment_polarity'>, Column<'avg_sentiment_subjectivity'>, Column<'category'>, Column<'rating'>, Column<'reviews'>, Column<'size'>, Column<'num_of_installs'>, Column<'price'>, Column<'age_limit'>, Column<'genres'>, Column<'version'>]


"\nExplanation: This line maps each column name in fields_list to its respective column in the\nDataFrame using the F.col function. The map function is used for this purpose, and the result\nis a list of columns, which we store in fields_as_cols.\n\nchatGBT:\n\nmap(lambda col_name: F.col(col_name), fields_list):\n\n    1.  map() is a Python function that applies a given function (in this case, a lambda function) to each item in a list (fields_list in this case).\n    2.  The lambda col_name: F.col(col_name) part creates a new PySpark Column object for each column name in fields_list.\n        In PySpark, F.col(col_name) is a function that returns a column object for a given column name, where F refers to pyspark.sql.functions.\nSo, for each column name in fields_list, the lambda function creates an object representing that column in the DataFrame.\n\nlist(...): The map() function returns a map object, which is then converted to a list using list().\n\nExample:\n\nSuppose joined_df has 

In [84]:
# Section 10: Converting Data to JSON Format:

json_df = joined_df \
.withColumn('to_json_struct', F.struct(fields_as_cols)) \
.select(F.to_json(F.col('to_json_struct')).alias('value'))

'''
Explanation:
    • First, we use the withColumn method to create a new column named
        'to_json_struct'. This column is a struct type, combining all the columns in
        fields_as_cols.
    • Then, we use the F.to_json function to convert the struct to a JSON string. We select
        only this JSON string and alias it as 'value'.

        ChatGBT:
            Summary:
                1. withColumn('to_json_struct', F.struct(fields_as_cols)):
                 This creates a new column to_json_struct where all the existing columns in joined_df are combined into a struct.

                2. .select(F.to_json(F.col('to_json_struct')).alias('value')): 
                    This converts the struct into a JSON string and renames the resulting column as 'value'.
                    
The final DataFrame has a single column ('value') where each row contains the JSON representation of the original columns in the joined_df DataFrame.
'''

"\nExplanation:\n    • First, we use the withColumn method to create a new column named\n        'to_json_struct'. This column is a struct type, combining all the columns in\n        fields_as_cols.\n    • Then, we use the F.to_json function to convert the struct to a JSON string. We select\n        only this JSON string and alias it as 'value'.\n\n        ChatGBT:\n            Summary:\n                1. withColumn('to_json_struct', F.struct(fields_as_cols)):\n                 This creates a new column to_json_struct where all the existing columns in joined_df are combined into a struct.\n\n                2. .select(F.to_json(F.col('to_json_struct')).alias('value')): \n                    This converts the struct into a JSON string and renames the resulting column as 'value'.\n                    \nThe final DataFrame has a single column ('value') where each row contains the JSON representation of the original columns in the joined_df DataFrame.\n"

In [87]:
# Section 11: Writing the Stream to Kafka:

query = json_df \
.writeStream \
.format('kafka') \
.option("kafka.bootstrap.servers", "course-kafka:9092") \
.option("topic", "gps-with-reviews") \
.option('checkpointLocation', 's3a://spark/checkpoints/ex6/review_calculation') \
.outputMode('update') \
.start()

'''
Explanation:
• The writeStream method indicates we're setting up a streaming write.
• We specify the format as 'kafka' since we're writing to a Kafka topic.
• We provide the Kafka bootstrap server details and the target topic name.
• The checkpoint location (checkpointLocation) is specified to allow Spark to keep
    track of which records have been processed, ensuring fault tolerance and stream
    resumption capabilities.
• The outputMode is set to 'update', meaning only the rows that have changed are
    written to the output sink.
• Finally, start() initiates the streaming process.
'''


24/09/26 06:53:41 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


24/09/26 06:53:41 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
24/09/26 06:53:41 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
24/09/26 06:53:41 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
24/09/26 06:53:41 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known config.
24/09/26 06:53:41 WARN AdminClientConfig: The configuration 'auto.offset.reset' was supplied but isn't a known config.
24/09/26 06:53:43 WARN Sender: [Producer clientId=producer-1] Got error produce response with correlation id 218 on topic-partition gps-with-reviews-0, retrying (2147483646 attempts left). Error: UNKNOWN_TOPIC_OR_PARTITION
24/09/26 06:53:43 WARN Sender: [Producer clientId=producer-1] Received unknown topic or partition error in produce request on partition gps-with-reviews-0. The topic-partition may no

In [None]:
# Section 12: Waiting for Stream Termination:

query.awaitTermination()

'''
Explanation: This line ensures the Spark application keeps running and processing the data
until an external action stops it or it encounters an error.
'''


In [89]:
# Section 13: Removing Cached Data:

static_data_df.unpersist()

'''
Explanation: The unpersist method removes the static_data_df DataFrame from cache. Since
we called cache() on this DataFrame earlier, it's a good practice to free up memory once it's
no longer needed by unpersisting.
'''

"\nExplanation: The unpersist method removes the static_data_df DataFrame from cache. Since\nwe called cache() on this DataFrame earlier, it's a good practice to free up memory once it's\nno longer needed by unpersisting.\n"

In [91]:
# Section 13: Stopping the Spark Session:

spark.stop()

'''
Explanation: This method stops the active SparkSession, ensuring all resources are freed and
no further operations occur.

Summary:
after joining and aggregating the data, the code prepares the results in a JSON format and
streams it to a Kafka topic. Once streaming is done, it releases the cached static data and
stops the Spark session.

'''

'\nExplanation: This method stops the active SparkSession, ensuring all resources are freed and\nno further operations occur.\n\nSummary:\nafter joining and aggregating the data, the code prepares the results in a JSON format and\nstreams it to a Kafka topic. Once streaming is done, it releases the cached static data and\nstops the Spark session.\n\n'