# COMP3002 Big Data and Cloud Project
## Task 1

This notebook describes the work that you must complete for the first task.  Complete the work in this notebook and commit it regularly to your GitHub Classroom repository.  You may include additional Python .py files if you wish to create helper functions that keep this notebook clean.  Make sure they are committed to the GitHub repository too.

### Scenario

You are provided with a small sample dataset of Amazon Review Data.  This notebook talks you through the process of loading that data into Spark SQL and asks you to analyse that data.  On the Block Release day on 18th November (Open Cohort) and 20th November (Ford Cohort) you will have access to a larger dataset hosted in the Cloud.  Much of the day will be spent moving your solutions to the cloud, and answering additional questions which will be set on the day.

If you do not finish everything during Block Release, you will have additional time to reflect on the experience and finalise your code before final submission on 26th November.

Amazon Review Data was downloaded from [here](https://jmcauley.ucsd.edu/data/amazon/) but a small sample is provided with this assignment.

### Learning Outcomes

Remember that the primary aim with this task is not to get the "correct" answer, but for you to use the time to become confident with some basic Big Data processing.

* **LO1** Understand the principles that allow the processing of big data sets.

* **LO2** Understand the limitations of big data technologies for distributed processing.

* **LO3** Demonstrate practical skills required to implement big-data solutions using modern large-scale data and compute infrastructures.

### Assessment

Assessment follows a similar approach to that used previously on the programme.  This small task attracts up to a grade C.  The second task to be released later this term will allow you to stretch to higher grades.

<table>
    <tr>
        <th align="left">Grade</th>
        <th align="left"><p>Criteria</p></th> 
    </tr>
    <tr>
        <td>C (50)</td>
        <td align="left">
            <p>In addition to the requirements for D-grade, the work should:</p>
            <ul align="left">
                <li align="left">
                    <p>Demonstrate the ability to implement a solution to the challenge tasks posed during the block release day using the Spark Cluster.</p>
                </li>
            </ul>
            <p>If the solution is not complete, a C-grade may still be awarded if a strong narrative is provided to explain where further work is needed and what the next steps would be.</p>
        </td>
    </tr>
    <tr style="background-color: #FBB36B;">
        <td>D (40)</td>
        <td align="left">
            <p>As this is the passing grade for the project, you must achieve all the learning outcomes.</p>
            <p>The work should meet the following minimum criteria:</p>
            <ul align="left">
                <li align="left">
                    <p>Work should be a Jupyter notebook submitted via GitHub Classroom, with any accompanying helper .py files, that is free from errors and executes successfully.</p>
                </li>
                <li align="left">
                    <p>The notebook demonstrates that the apprentice can:</p>
                    <ol>
                        <li align="left">
                            <p>Connect to a spark context.</p>
                        </li>
                        <li align="left">
                            <p>Transmit data to Spark.</p>
                        </li>
                        <li align="left">
                            <p>Execute remote transformations and actions on Spark.</p>
                        </li>
                        <li align="left">
                            <p>Retrieve outputs and present them in a suitable manner.</p>
                        </li>
                    </ol>
                </li>
                <li align="left">
                    <p>Provide acceptable answers to questions posed in the task template.</p>
                </li>
            </ul>
            <p>The work may be limited in that:</p>
            <ul align="left">
                <li align="left">
                    <p>It may only run on a single machine via PySpark.</p>
                </li>
                <li align="left">
                    <p>It may not demonstrate an attempt at the challenge tasks posed during the block release day.</p>
                </li>
            </ul>
        </td>
    </tr>
    <tr>
        <td>E (30)</td>
        <td align="left">
            <p>Learning outcomes not met at threshold level, but with additional work a pass could be achieved.  This may mean that the code does not run, or that the solution achieves the brief without successfully using the Spark infrastructure.</p>
        </td>
    </tr>
    <tr style="background-color: #FBB36B;">
        <td>F (0-29)</td>
        <td align="left">
            <p>Learning outcomes not met at threshold level, but with additional work a pass could be achieved.  This may mean that the code does not run, or that the solution achieves the brief without successfully using the Spark infrastructure.</p>
        </td>
    </tr>
</table>

<style>
    tr:nth-child(odd) {
        background-color: orange;
    }
</style>

In addition, for grades of E and above, 10 discretionary marks are available for the presentation quality of the submission (including coding).

First, you need to establish a Spark session in a slightly different way, using Spark SQL:

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, weekofyear, col, avg, round, abs, udf, length, trim, when
from pyspark.sql.types import DoubleType, StringType, IntegerType
import spark_utils as su
from pyspark.sql.window import Window
from matplotlib import pyplot as plt
import pandas as pd
import os
import requests


In [0]:
## Initialise SparkSession (only needed in VS Code - comment out for Databricks)
# spark = su.get_spark_session(app_name="Task1")

In [0]:
## Load JSON data into DataFrame (applicable for VS Code - comment out for Databricks)
# file_path = "data/reviews.json"
# df = su.load_data(spark, file_path, file_format="json")

In [0]:
## Load JSON data into DataFrame (applicable for Databricks Code - comment out for VS Code)

# Define GitHub URL and local path on the driver
github_raw_url = "https://raw.githubusercontent.com/UoN-CS/zdat3002-coursework-1-2025-ngudhka/refs/heads/main/data/reviews.json?token=GHSAT0AAAAAADOHB2QNKCTQRPPNSLBR2RZ22JESYWA"
local_file_name = "reviews.json" # Just the file name

# Download the file to the driver's local filesystem
print(f"Downloading {github_raw_url} to driver's local path: {local_file_name}")
# Quote the arguments so the shell does not interpret characters such as ? or & in the URL
!wget -O "{local_file_name}" "{github_raw_url}"

# Define the absolute path on the driver's local filesystem
driver_local_absolute_path = f"file:{os.path.abspath(local_file_name)}"

# Define the target path in DBFS
dbfs_target_path = f"/FileStore/reviews/{local_file_name}"

# Ensure the DBFS directory exists
dbutils.fs.mkdirs(os.path.dirname(dbfs_target_path))

# Copy the file from the driver's local filesystem to DBFS
print(f"Copying from driver's local '{driver_local_absolute_path}' to DBFS '{dbfs_target_path}'...")
dbutils.fs.cp(driver_local_absolute_path, dbfs_target_path)
print("File copied to DBFS successfully.")

# Load JSON data into DataFrame from the DBFS path
print(f"Loading JSON data from DBFS: {dbfs_target_path}")
df = spark.read.json(dbfs_target_path)
 

Having imported the data, take a look at the schema.  Perhaps try running some SQL queries over it.  I've suggested a first example, but you can come up with more questions.

**Can you plot how many ratings of each grade are present in the data?**

### Hints

You've loaded your data, and you want to try and process that data remotely as much as possible, only collecting results at the end.

You can add columns to the remote DataFrame using

df.withColumn("myColumnName", data)

You can execute SQL like operations such as group by and order by:

df.orderBy("columnName")
df.groupBy("columnName")

Think about how you would transform the data in the dataframe, and then collect just the data needed to make the plot.
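
The hints above can be combined into a small helper. The following is a minimal sketch of what a function such as `su.get_unique_values` might do. The real helper in `spark_utils` is not shown in this notebook, so the name `count_by_column` and its body are assumptions made for illustration. It uses only the DataFrame methods `groupBy`, `count` and `orderBy`, so the work stays on the Spark side until an action such as `toPandas()` is called on the result.

```python
def count_by_column(df, column, order_by_count=False):
    """Return a DataFrame with one row per distinct value of `column` plus its count.

    Hypothetical stand-in for su.get_unique_values; `df` is expected to be a
    PySpark DataFrame.
    """
    # groupBy(...).count() adds a column named "count" to the grouped result
    counts = df.groupBy(column).count()
    if order_by_count:
        # Most frequent values first
        return counts.orderBy("count", ascending=False)
    # Otherwise sort by the value itself, which suits an ordered axis such as ratings
    return counts.orderBy(column)
```

Only the small aggregated result needs to be collected, for example with `count_by_column(df, "overall").toPandas()`.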

In [0]:
# Inspect the data
su.display_df_info(df)

In [0]:
# Create an aggregated remote DataFrame that counts how many ratings of each grade are present.
# Note that the grade is held in the "overall" column.
ratings_distribution = su.get_unique_values(df, "overall", order_by_count=False)

# Convert the small aggregated DataFrame to pandas so it can be plotted with matplotlib
ratings_distribution_pd = ratings_distribution.toPandas()

# Generate bar plot for the rating counts
su.plot_bar_chart(
    data_series=ratings_distribution_pd['count'],
    labels=ratings_distribution_pd['overall'],
    title="Distribution of Ratings",
    x_label="Rating Grade",
    y_label="Number of Reviews",
    figsize=(8, 5)
)



**Can you create a histogram of the number of reviews received on each week of the year.  Are there any patterns present?**

In [0]:
# Create a new remote dataframe to include the "Review Date" and "Week of Year"
df_with_date_week = df.withColumn("reviewDate", to_date(col("reviewTime"), "MM d, yyyy")) \
                      .withColumn("weekOfYear", weekofyear(col("reviewDate")))

# Use the new remote DataFrame to generate a remote aggregated DataFrame of counts by week
reviews_per_week = su.get_unique_values(df_with_date_week, "weekOfYear", order_by_count=False)

# Convert the small aggregated DataFrame to pandas so it can be plotted with matplotlib
reviews_per_week_pd = reviews_per_week.toPandas()

# Generate bar plot for the review count per week
su.plot_bar_chart(
    data_series=reviews_per_week_pd['count'],
    labels=reviews_per_week_pd['weekOfYear'],
    title="Number of Reviews per Week of the Year",
    x_label="Week of the Year",
    y_label="Number of Reviews",
    figsize=(15, 6),
    rotation=45 # Rotate labels if they overlap
)


Based on the histogram of the number of reviews received each week of the year, here are the patterns I observe:

1. Overall Consistency: The most prominent pattern is a relatively consistent number of reviews throughout the year. There aren't massive spikes or dramatic drops, suggesting a steady stream of reviews rather than highly seasonal or event-driven review behavior.
2. Slight Mid-Year Dip: There appears to be a minor dip in the number of reviews during the middle part of the year, roughly from Week 21 to Week 32 (which corresponds to late May through early August). Review counts during these weeks are often slightly lower than the annual average. This could potentially be attributed to summer holidays when people might be less active in submitting product reviews.
3. Minor Increases in Spring and Autumn: Conversely, there are minor upticks in review activity during the spring (around Weeks 13-16, March/April) and autumn (around Weeks 38-44, September/October). These periods might correlate with general shopping seasons or specific product releases that aren't tied to major annual holidays.
4. Absence of Major Holiday Spikes: Interestingly, despite common retail trends, there isn't a significant surge in reviews around major shopping holidays like Thanksgiving, Black Friday, or Christmas (which would typically fall in Weeks 47-52). The review counts in these weeks remain largely in line with the rest of the year, indicating that the products being reviewed might not be heavily gift-oriented.
5. No Extreme Outliers: No single week stands out as an extreme outlier with an exceptionally high or low number of reviews, reinforcing the idea of a stable and predictable review generation process for this dataset.

In summary, the pattern suggests a product category or customer base that provides reviews at a fairly constant rate throughout the year, with only subtle seasonal variations.
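
One caveat when reading the week numbers: Spark's `weekofyear` follows ISO 8601, where weeks start on Monday and week 1 is the first week with more than three days in the new year. The sketch below reproduces that numbering in plain Python for single values, which can help when sanity-checking a few rows. The function name `iso_week` and the two sample dates are illustrative and are not taken from the dataset.

```python
from datetime import datetime


def iso_week(review_time):
    """Parse a 'MM d, yyyy' string such as '09 13, 2009' and return its ISO week number."""
    parsed = datetime.strptime(review_time, "%m %d, %Y")
    # isocalendar() returns (ISO year, ISO week, ISO weekday)
    return parsed.isocalendar()[1]


print(iso_week("09 13, 2009"))  # 37
print(iso_week("01 1, 2010"))   # 53: this date belongs to the last ISO week of 2009
```

Because the first days of January can be assigned to week 52 or 53, counts for the final weeks of the year may include some early-January reviews.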

**Can you think of your own query?**

Let's identify reviews whose rating deviates from the reviewer's average rating.

In [0]:
# Let's set a deviation threshold
deviation_threshold = 1.5

# Define the window specification for defining the average calculation on a particular row.
# We want to take the average rating for a particular reviewer ID
window_spec = Window.partitionBy("reviewerID")

# Add a new column with the reviewer's average rating using PySpark's window function (avg().over())
df_with_reviewer_avg = df.withColumn("reviewerAvgRating",
    round(avg(col("overall")).over(window_spec), 2) 
)

# Add a new column to calculate the deviation from the reviewer's average rating
# using the PySpark column operations
df_with_deviation = df_with_reviewer_avg.withColumn("ratingDeviation",
    round(col("overall") - col("reviewerAvgRating"), 2) # Direct PySpark function usage
)

# Filter the DataFrame to find reviews where the absolute deviation is significant
# using PySpark column conditions (abs())
unusual_reviews_df = df_with_deviation.filter(
    abs(col("ratingDeviation")) >= deviation_threshold # Direct PySpark function usage
)

# Select only the relevant columns for inspection
unusual_reviews_for_display_df = unusual_reviews_df.select(
    ["reviewerID", "overall", "reviewerAvgRating", "ratingDeviation", "asin", "reviewText", "summary"]
)

# Order the results for better readability, showing the largest positive deviations first
ordered_unusual_reviews_df = unusual_reviews_for_display_df.orderBy(
    ["ratingDeviation"], # Order by the signed deviation
    ascending=False      # Largest positive deviations first, largest negative last
)

print(f"Reviews where individual rating deviates from reviewer's average by >= {deviation_threshold} stars:")

ordered_unusual_reviews_df.show(n=20, truncate=False)

print("-----------------------------------------------------")
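
To check the logic of the query above on a handful of rows, the same calculation can be written in plain Python. This is a sketch only: the function name `rating_deviations` and the sample data are made up for illustration, and the grouping dictionary plays the role of `Window.partitionBy("reviewerID")`.

```python
from collections import defaultdict


def rating_deviations(reviews, threshold=1.5):
    """Return (reviewer, rating, reviewer_avg, deviation) for ratings far from the reviewer's mean."""
    # Group ratings by reviewer, as the window partition does in Spark
    by_reviewer = defaultdict(list)
    for reviewer, rating in reviews:
        by_reviewer[reviewer].append(rating)
    averages = {r: round(sum(v) / len(v), 2) for r, v in by_reviewer.items()}

    flagged = []
    for reviewer, rating in reviews:
        deviation = round(rating - averages[reviewer], 2)
        if abs(deviation) >= threshold:
            flagged.append((reviewer, rating, averages[reviewer], deviation))
    # Signed deviation, largest positive first
    return sorted(flagged, key=lambda row: row[3], reverse=True)


# Hypothetical sample: reviewer A usually gives 5 stars, with one 1-star review
sample = [("A", 5.0), ("A", 5.0), ("A", 1.0), ("B", 4.0), ("B", 4.0)]
for row in rating_deviations(sample):
    print(row)
```

Only reviewer A's 1-star review is flagged here, because it sits well below that reviewer's average while the 5-star reviews are within the threshold.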

Task 1 stretch task.

Can you calculate the sentiment score for each review?

In [0]:
# Load JSON data into DataFrame from the DBFS path
df = spark.read.json("/mnt/reviews/clothing.json")

# View df info
su.display_df_info(df)

In [0]:
# Get AFINN-111 file for sentiment analysis
afinn_path = "file:////Workspace/Users/scyng2@nottingham.ac.uk/zdat3002-coursework-1-2025-ngudhka/data/AFINN-111.txt"
lines = spark.read.text(afinn_path).rdd.map(lambda r: r[0]).collect()
 
# Check if lines read
print(f"Read {len(lines)} lines. First line: {lines[0]}")

In [0]:
# Load AFINN lexicon into a Python dictionary
afinn_map = {}
try:
    for line in lines: # Iterate through the collected Python list 'lines'
        parts = line.strip().split('\t')
        if len(parts) == 2:
            word, score = parts
            afinn_map[word] = int(score)
        else:
            print(f"Skipping malformed line in AFINN file: {line.strip()}")
except Exception as e: # Catch any potential errors during dictionary creation
    print(f"Error processing AFINN lexicon: {e}")
    spark.stop()
    raise

In [0]:
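# Broadcast the AFINN lexicon to all worker nodes.
# spark.sparkContext is used so the cell does not depend on a predefined `sc` variable.
broadcast_afinn = spark.sparkContext.broadcast(afinn_map)
print(f"AFINN lexicon loaded and broadcast. Contains {len(afinn_map)} terms.")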

In [0]:
# Define the UDF for sentiment scoring
def get_sentiment_score(text):
    # Missing or non-string review text has no score
    if not isinstance(text, str):
        return None
    # Split on whitespace; empty or whitespace-only text yields no words and scores 0
    words = text.lower().split()
    # Sum the AFINN score of every known word; unknown words contribute 0.
    # The result is a Python int, matching the IntegerType declared below
    # (a float returned from an IntegerType UDF would come back as null).
    return sum(broadcast_afinn.value.get(word, 0) for word in words)

sentiment_udf = udf(get_sentiment_score, IntegerType())
print("Sentiment UDF defined.")

In [0]:
# Create a new column for the sentiment score and run the sentiment score function to populate it
df_with_sentiment = df.withColumn("sentiment_score", sentiment_udf(col("reviewText")))

# Display some reviews with their sentiment scores.
print("DataFrame with sentiment scores:")
df_with_sentiment.select("overall", "reviewText", "sentiment_score").show(10, truncate=False)
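
One limitation of scoring on whitespace-separated words is that punctuation stays attached to a word, so a token such as "bad!" is not found in the lexicon. The sketch below compares whitespace splitting with a simple regular-expression tokenizer. The lexicon `toy_lexicon`, the function names and the sample review are all hypothetical and only imitate the word-to-score layout of AFINN.

```python
import re

# Tiny hypothetical lexicon in the style of AFINN (word -> integer score); not the real file
toy_lexicon = {"good": 3, "love": 3, "bad": -3}


def score_whitespace(text, lexicon):
    """Score by splitting on whitespace only."""
    return sum(lexicon.get(word, 0) for word in text.lower().split())


def score_tokens(text, lexicon):
    """Score after extracting runs of letters, apostrophes and hyphens, ignoring other punctuation."""
    return sum(lexicon.get(word, 0) for word in re.findall(r"[a-z'-]+", text.lower()))


review = "Good fit, love it. Not bad!"
print(score_whitespace(review, toy_lexicon))  # 6: "bad!" is not matched
print(score_tokens(review, toy_lexicon))      # 3: "bad" is matched and subtracts 3
```

Neither version handles negation ("Not bad" still counts as negative), which is a known weakness of simple lexicon-based scoring.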


How can you visualise this sentiment in a useful way?



In [0]:

import seaborn as sns

# Box plot of sentiment scores grouped by overall rating

# Sample a subset before converting to pandas, to avoid running out of memory on the driver
sample_fraction = 0.05  # Adjust as needed for your cluster size
df_sampled = df_with_sentiment.select("overall", "sentiment_score").sample(fraction=sample_fraction, seed=42)
df_with_sentiment_pd = df_sampled.toPandas()

plt.figure(figsize=(10, 6))
sns.boxplot(x="overall", y="sentiment_score", data=df_with_sentiment_pd, palette="Set2")
plt.title("Sentiment Score Distribution by Overall Rating")
plt.xlabel("Overall Rating")
plt.ylabel("Sentiment Score")
plt.show()


In the above plot the outlier points stretch the y-axis so far that the boxes are compressed, which makes any correlation hard to see. Let's focus on the boxes and whiskers by excluding the tail points.


In [0]:
# Plot the boxplot without showing outlier (tail) points
plt.figure(figsize=(10, 6))
sns.boxplot(x="overall", y="sentiment_score", data=df_with_sentiment_pd, palette="Set2", showfliers=False)
plt.title("Sentiment Score Distribution by Overall Rating (No Outliers)")
plt.xlabel("Overall Rating")
plt.ylabel("Sentiment Score")
plt.show()


From the above plot we can now see that there is a clear correlation between overall rating and sentiment score. Higher sentiment scores correspond to higher overall ratings.

Now let's see whether we can produce a similar plot from the whole dataset, not just a sample. To do this, we compute the quantile statistics with PySpark functions and collect only those statistics for plotting, not the DataFrame itself.

In [0]:
from pyspark.sql import functions as F


# Compute approximate quantiles per rating
quantiles = df_with_sentiment.groupBy("overall").agg(
    F.expr("percentile_approx(sentiment_score, array(0.0, 0.25, 0.5, 0.75, 1.0)) as qs")
)

# Unpack quantiles into separate columns
quantiles = quantiles.select(
    "overall",
    F.col("qs")[0].alias("min"),
    F.col("qs")[1].alias("q1"),
    F.col("qs")[2].alias("median"),
    F.col("qs")[3].alias("q3"),
    F.col("qs")[4].alias("max")
)

pdf = quantiles.orderBy("overall").toPandas()

# Plot using matplotlib
plt.figure(figsize=(10, 6))

# Build one five-number summary (min, q1, median, q3, max) per rating
summaries = []
labels = []

for _, row in pdf.iterrows():
    summaries.append([row["min"], row["q1"], row["median"], row["q3"], row["max"]])
    labels.append(str(row["overall"]))

# plt.boxplot treats each summary as a five-point sample: the box edges and median
# match q1, median and q3, while each whisker stops at the farthest of the five points
# that lies within 1.5 * IQR of the box, so extreme min/max values are not drawn
plt.boxplot(summaries, showfliers=False)
plt.xticks(range(1, len(labels)+1), labels)
plt.xlabel("Overall Rating")
plt.ylabel("Sentiment Score")
plt.title("Sentiment Score Distribution by Rating (Approx Quantiles)")
plt.show()


The above plot corresponds well to the earlier sample plot, so both sampling and approximate quantiles are valid techniques for visualising how the sentiment score relates to the overall rating.
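
The box plots above hide the outlier points with `showfliers=False`. The sketch below shows, in plain Python, how the statistics behind such a plot can be derived from raw values: quartiles, whiskers limited to 1.5 times the interquartile range, and the points beyond them. It follows the spirit of the default whisker rule used by matplotlib and seaborn rather than reproducing their code, and the function name `box_stats` and the sample values are made up for illustration.

```python
from statistics import quantiles


def box_stats(values, whis=1.5):
    """Return box plot statistics for a list of numbers (needs at least two values)."""
    data = sorted(values)
    # "inclusive" interpolates between data points and treats min and max as the 0% and 100% points
    q1, med, q3 = quantiles(data, n=4, method="inclusive")
    iqr = q3 - q1
    low_fence, high_fence = q1 - whis * iqr, q3 + whis * iqr
    inside = [v for v in data if low_fence <= v <= high_fence]
    return {
        "whislo": inside[0],   # lowest value within the lower fence
        "q1": q1,
        "med": med,
        "q3": q3,
        "whishi": inside[-1],  # highest value within the upper fence
        "fliers": [v for v in data if v < low_fence or v > high_fence],
    }


print(box_stats([1, 2, 3, 4, 5, 6, 7, 8, 100]))
```

The keys are chosen to match the dictionaries accepted by matplotlib's `Axes.bxp`, which draws boxes from precomputed statistics, so a list of such dictionaries (one per rating) could be plotted without collecting the raw scores.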