<a href="https://colab.research.google.com/github/kghdxx/8.2CDevSecOps/blob/main/Task2Part1_Kaylin.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T
from pyspark.sql.functions import from_unixtime, to_date, col
from pyspark.sql.window import Window
import pandas as pd
from pyspark.sql import DataFrame


In [None]:
#Q1 (boilerplate from Darrin's code)

# Download the data for part 1 from the the GitHub repository
!wget -q https://github.com/tulip-lab/sit742/raw/develop/Jupyter/data/business_review_submission.zip

# Unzip the folder int othe data directory
!unzip business_review_submission.zip -d data

# Remove the zip file
!rm business_review_submission.zip

spark = SparkSession.builder.appName("SIT742-A2-P1").getOrCreate()

# Load data
data_file_path = "./data/review.csv"
sdf = spark.read.csv(data_file_path, header=True, inferSchema=True)

# Inspect headers
sdf.show()



Q 1.1.1
For the none or null in text column, change it to 'no review'

In [None]:
# Q 1.1.1: replace null review text with the placeholder 'no review'
sdf_text_without_null = sdf.fillna({'text': 'no review'})

Q 1.1.2 Process the content in time column, and convert the strings from time to yyyy-mm-dd format
in the new column as newtime and show the first 5 rows.


In [None]:

# Q 1.1.2: derive a calendar date ('newtime') from the `time` column.
#
# `time` is cast to long and divided by 1000 (milliseconds -> seconds, the
# unit from_unixtime expects); the resulting timestamp is then reduced to a
# date with to_date, which Spark displays as yyyy-MM-dd.
# 'newtime_long' is only a helper column for the conversion.
sdf_with_new_date = (
    sdf_text_without_null
    .withColumn("newtime_long", col("time").cast("long"))
    .withColumn("newtime", to_date(from_unixtime((col("newtime_long") / 1000).cast("long"))))
)

# The question asks for the first 5 rows
sdf_with_new_date.show(5)

Q 1.2

Find out the information for gmap_id on the reviews. In order to achieve the above, some wrangling
work is required to be done:


In [None]:
from pyspark.sql import functions as F, types as T

# Typically, incoming data should have its schema defined on read.
# Q1.2 asks for wrangling at this step, so the CSV is read a second time with
# an explicit schema; the null checks below then remove rows whose typed
# columns did not parse.

schema = T.StructType([
    # DoubleType rather than FloatType: a 32-bit float keeps only ~7
    # significant digits, so distinct large ids could collapse to one value.
    T.StructField("user_id", T.DoubleType(), True),
    T.StructField("name", T.StringType(), True),
    T.StructField("time", T.LongType(), True),
    T.StructField("rating", T.IntegerType(), True),
    T.StructField("text", T.StringType(), True),
    T.StructField("pics", T.StringType(), True),
    T.StructField("resp", T.StringType(), True),
    T.StructField("gmap_id", T.StringType(), True),
])

required = ["user_id"]  # must be non-null after the read
enforce_types_on = ["time", "rating"]  # rows are dropped when these failed to parse

sdf = (spark.read
       .schema(schema)
       .option("header", True)
       .csv("./data/review.csv"))

# Keep rows where the key is present AND the typed columns parsed (not null)
sdf_clean = sdf.dropna(subset=required + enforce_types_on)

# Clean up the name column for incorrect and unwanted names

# Trim the name and collapse internal runs of whitespace to a single space
sdf_clean_name = sdf_clean.withColumn("name", F.regexp_replace(F.trim("name"), r"\s+", " "))

# Accept names of 2 to 60 characters made of runs of Unicode letters, where
# consecutive runs are joined by exactly one space, dot, apostrophe or hyphen
name_regex = r"^(?=.{2,60}$)\p{L}+(?:[ .'-]\p{L}+)*$"

# Keep only rows whose name matches; null names do not match and are removed
sdf_clean_name = sdf_clean_name.filter(F.col("name").rlike(name_regex))

# View and inspect summary statistics of the cleaned data
inspect_cleaned_data = sdf_clean_name.describe()

inspect_cleaned_data.show(truncate=False)

sdf_clean_name.show(5)

# Re-add the newtime column (same conversion as Q 1.1.2: ms -> s -> date)
sdf_cleaned_with_new_dates = (
    sdf_clean_name
    .withColumn("newtime_long", col("time").cast("long"))
    .withColumn("newtime", to_date(from_unixtime((col("newtime_long") / 1000).cast("long"))))
)

# Drop the helper column, it is not needed downstream
sdf_cleaned_with_date = sdf_cleaned_with_new_dates.drop("newtime_long")

# Inspect again
sdf_cleaned_with_date.show(5)

Q 1.2.1 Using pyspark to calculate the number of reviews per each unique gmap_id and save as float
format in pyspark dataframe to show the top 5 rows.


In [None]:
# Q 1.2.1: number of reviews per unique gmap_id, stored as a float.
#
# Rows with a null gmap_id are excluded (for this question only), the rest are
# grouped by gmap_id, and the per-group row count is cast to float inside agg.
# Results are sorted from most to least reviewed; gmap_id is a secondary sort
# key so that ties are ordered deterministically and the top 5 is reproducible.
gmap_gby = (
    sdf_cleaned_with_date
      .filter(F.col("gmap_id").isNotNull())
      .groupBy("gmap_id")
      .agg(F.count("*").cast("float").alias("review_count"))
      .orderBy(F.desc("review_count"), F.asc("gmap_id"))
)
gmap_gby.show(5, truncate=False)


Q 1.2.2 Transform the current pyspark dataframe to a pandas dataframe (named df)
and create the column review_time holding the review time at hour
level. Print the top 5 rows of the pandas dataframe df after creating the column
review_time.

In [None]:
# Q 1.2.2: convert the Spark DataFrame to pandas and derive the review hour.

# Rows without a parsed date are dropped before leaving Spark
sdf_valid_dates = sdf_cleaned_with_date.na.drop(subset=["newtime"])

df = sdf_valid_dates.toPandas()

# Build a datetime column from the millisecond `time` values.
# "int64" is requested explicitly: plain `int` can resolve to a 32-bit integer
# on some platforms, which millisecond timestamps overflow.
# NOTE(review): these datetimes are not time-zone adjusted, while 'newtime'
# was produced by Spark's from_unixtime - confirm both use the same zone.
df["date_time"] = pd.to_datetime(df["time"].astype("int64"), unit="ms")

# Hour of day of each review
df["review_time"] = df["date_time"].dt.hour

df.head(5)


Q 1.2.3

Use matplotlib or seaborn to draw some (two or more if possible) visualizations of the
relationship between gmap_id and review_time. You could explore, for example: at what
time do people usually review? How many businesses are reviewed in the morning? Please
also discuss the insights you find from your visualizations in the markdown cell, and
include your findings and visualizations in the report.

In [None]:
# Use existing code

import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np


# Number of reviews per hour of day. Reindexing over all 24 hours keeps hours
# without any review on the axis as zero-height bars; otherwise such hours are
# silently left out and the remaining bars sit next to each other.
counts = (df['review_time']
          .value_counts()
          .reindex(range(24), fill_value=0))

plt.figure(figsize=(9,4))
counts.plot(kind='bar')
plt.title('Reviews by Hour of Day')
plt.xlabel('Hour (0–23)')
plt.ylabel('Review Count')
plt.xticks(rotation=0)
plt.tight_layout()
plt.show()


Q 1.3 Let’s continue to analyze review_time together with the reviews and their related gmap_id. You need to
join another dataset, meta-business, with the current dataframe on gmap_id.




Q 1.3.1  Determine which workday (day of the week), generates the most reviews (plotting the results
in a line chart with workday on averaged submissions).

In [None]:
""" 1.3.1 Determine which workday (day of the week), generates the most reviews (plotting the results
in a line chart with workday on averaged submissions).
"""
import matplotlib.pyplot as plt

# Name of the weekday ("Monday", ...) of each review
df["workday"] = df["date_time"].dt.strftime("%A")

# ISO calendar parts of each review date; the week number is kept on df
iso_calendar = df["date_time"].dt.isocalendar()
df["week"] = iso_calendar["week"]

# Count reviews per weekday within each calendar week. The ISO year is part of
# the key because week numbers repeat every year: grouping on the week number
# alone would add up the same-numbered weeks of different years.
weekly_counts = (
    df.groupby([iso_calendar["year"].rename("iso_year"), "week", "workday"])
    .size()
    .reset_index(name="review_counts")
)

# Average the weekly counts per weekday, ordered Monday..Sunday for plotting
average_reviews = weekly_counts.groupby("workday")['review_counts'].mean().reindex([
    "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"
])

# Identify the weekday with the highest average number of reviews
busiest_workday = average_reviews.idxmax()
print(f"Workday with the highest average number of reviews: {busiest_workday}")

# Plot the averages as a line chart
plt.figure(figsize=(10, 6))
plt.plot(average_reviews.index, average_reviews.values, marker='o', linestyle='-')
plt.title("Average Submissions per Workday")
plt.xlabel("Workday")
plt.ylabel("Average Number of Reviews")
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


Q 1.3.2

In [None]:
""" 1.3.2 Identify the names of business (column name from data meta-business) that has the highest
averaged ratings on ‘that workday’ (you need to find out from 1.3.1), and find out which
category those businesses are from?
"""
from collections import Counter
import ast


def _parse_category_list(raw):
    """Parse a stringified category list; return None if missing or malformed."""
    if not isinstance(raw, str):
        # Non-string values (e.g. NaN for a missing category) cannot be parsed
        return None
    try:
        return ast.literal_eval(raw)
    except (ValueError, SyntaxError):
        return None


# Load meta-review-business data
meta_file_path = "./data/meta-review-business.csv"
meta_business_df = pd.read_csv(meta_file_path)
meta_business_df = meta_business_df.rename(columns={"name": "business_name", "category": "business_category"})

# Select only the relevant columns from meta-review-business
meta_subset_df = meta_business_df[["gmap_id", "business_name", "business_category"]]

# Filter reviews for the busiest weekday
reviews_on_busiest_day = df[df["workday"] == busiest_workday]

# Join with meta-business to get business names and categories
merged_df = pd.merge(reviews_on_busiest_day, meta_subset_df, on="gmap_id", how="left")

# Make sure ratings are numeric; unparseable values become NaN
merged_df["rating"] = pd.to_numeric(merged_df["rating"], errors='coerce')

# Average rating per business. gmap_id is part of the key because a name alone
# does not identify a business: locations sharing a name must not be averaged
# together.
avg_ratings = merged_df.groupby(["gmap_id", "business_name"])["rating"].mean()

# Find the business(es) with the highest average rating
max_avg_rating = avg_ratings.max()
top_rated = avg_ratings[avg_ratings == max_avg_rating]
top_gmap_ids = top_rated.index.get_level_values("gmap_id")
top_businesses = top_rated.index.get_level_values("business_name").tolist()

print(f"The number of business that recived an average rating of {max_avg_rating} on {busiest_workday}'s is {len(top_businesses)}.")

# Keep only the reviews of the top-rated businesses
filtered_df = merged_df[merged_df["gmap_id"].isin(top_gmap_ids)]

# One row per business, with its name and category
top_bus_cat = (
    filtered_df.drop_duplicates("gmap_id")[["business_name", "business_category"]]
    .reset_index(drop=True)
)

# Categories are stored as strings; convert them to lists (None when absent)
top_bus_cat["business_category"] = top_bus_cat["business_category"].apply(_parse_category_list)

# Extract only the first category from each list
top_bus_cat["primary_category"] = top_bus_cat["business_category"].apply(lambda x: x[0] if isinstance(x, list) and x else None)

# Count the most common primary categories
category_counts = top_bus_cat["primary_category"].value_counts()

# Plot the results
plt.figure(figsize=(10, 6))
category_counts.plot(kind="bar", color="skyblue")
plt.title("Most Common Primary Categories Among Top-Rated Businesses")
plt.xlabel("Primary Category")
plt.ylabel("Number of Businesses")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


In [None]:
# Alternative, show only the top N categories


import pandas as pd
import matplotlib.pyplot as plt

N = 15  # number of categories to display

# Categories outside the top N are left off the chart: lumped together into a
# single "Other" bar they would be too large for the purpose of the visual.
top = category_counts.nlargest(N)

plt.figure(figsize=(8, 6))
top.sort_values().plot(kind='barh')
plt.title(f"Top {N} Primary Categories Among Top-Rated Businesses")
plt.xlabel("Number of Businesses")
plt.ylabel("Category")
plt.tight_layout()
plt.show()



In [None]:
# Alternative 2
# Word cloud of the primary categories, sized by number of businesses
from wordcloud import WordCloud

freq = dict(category_counts)

# Configure the cloud first, then render it from the category frequencies
cloud_builder = WordCloud(width=1000, height=500, background_color="white")
wc = cloud_builder.generate_from_frequencies(freq)

plt.figure(figsize=(10,5))
plt.imshow(wc)
plt.axis('off')
plt.title("Category Word Cloud")
plt.tight_layout()
plt.show()

Q 1.4 (use existing code)

In [None]:
""" 1.4 Find top 30 common words in reviews. Generate word clouds by review year.

Decided to use pySpark for this question as we are ptentially working with large
data.
"""

from pyspark.sql.functions import (
    array_remove,
    col,
    collect_list,
    explode,
    flatten,
    lower,
    regexp_replace,
    split,
    to_date,
    year,
)
from pyspark.ml.feature import StopWordsRemover
from collections import Counter
from wordcloud import WordCloud
import matplotlib.pyplot as plt

TOP_N = 30  # number of most common words to keep

# Create a new dataframe with only the columns we need.
# - Rows without a date cannot be assigned to a year, so they are dropped.
# - This dataframe comes from the second CSV read (Q 1.2), so the Q 1.1.1
#   replacement is re-applied here: null text would otherwise produce a null
#   word list.
words_sdf = (
    sdf_cleaned_with_date
    .select("newtime", "text")
    .na.drop(subset=["newtime"])
    .na.fill({"text": "no review"})
)

# Create the "year" column from the "newtime" date.
words_sdf = words_sdf.withColumn("year", year(col("newtime")))

# For entries in the text column, remove punctuation and special characters,
# keeping only letters, numbers and whitespace.
# Convert the remaining text to lower case and split it on whitespace.
# Empty tokens (left by leading/trailing whitespace) are removed so that ""
# is not counted as a word.
words_sdf = words_sdf.withColumn(
    "words_list",
    array_remove(
        split(lower(regexp_replace(col("text"), r"[^a-zA-Z0-9\s]", "")), r"\s+"),
        "",
    ),
)

# Remove stop words ("the", "is", "and", ...) using StopWordsRemover
# Get default stop words
default_stopwords = StopWordsRemover().getStopWords()

# Null reviews were replaced with "no review" above; exclude that placeholder.
custom_stopwords = default_stopwords + ["no", "review"]

# Input column is words_list, output column is filtered_words
remover = StopWordsRemover(inputCol="words_list",
                           outputCol="filtered_words").setStopWords(custom_stopwords)

# Do the removal
words_sdf = remover.transform(words_sdf)

# Group by year and collect all words of that year into one flat list
grouped_yrs = words_sdf.groupBy("year").agg(flatten(collect_list("filtered_words")).alias("all_words"))

# Convert to Pandas for word cloud generation
grouped_yrs_pd = grouped_yrs.toPandas()

# Sort by year in ascending order
grouped_yrs_pd_sorted = grouped_yrs_pd.sort_values(by="year")

# Count words and generate word clouds.
# The overall counter accumulates the FULL per-year counts, so the overall
# top words are not limited to words that made a yearly top list.
year_word_counts = {}
overall_counter = Counter()
for _, row in grouped_yrs_pd_sorted.iterrows():
    # Named review_year so the imported year() function is not shadowed
    review_year = row["year"]
    year_counter = Counter(row["all_words"])
    overall_counter.update(year_counter)
    year_word_counts[review_year] = dict(year_counter.most_common(TOP_N))

    # WordCloud cannot render an empty frequency table
    if not year_word_counts[review_year]:
        continue

    # Generate word cloud for each year
    wc = WordCloud(width=800, height=400, background_color="white").generate_from_frequencies(year_word_counts[review_year])
    plt.figure(figsize=(10, 5))
    plt.imshow(wc, interpolation="bilinear")
    plt.axis("off")
    plt.title(f"Word Cloud for {int(review_year)}")
    plt.show()

# Overall top words across all years
overall_top_30 = dict(overall_counter.most_common(TOP_N))

# Plot
if overall_top_30:
    wc = WordCloud(width=800, height=400, background_color="white").generate_from_frequencies(overall_top_30)
    plt.figure(figsize=(10, 5))
    plt.imshow(wc, interpolation="bilinear")
    plt.axis("off")
    plt.title("Overall Word Cloud")
    plt.show()


Notes

In [None]:
# backup: step-by-step version of the `newtime` conversion

# Step 1: cast `time` to long
sdf_with_newtime_column_long = sdf_text_without_null.withColumn(
    'newtime_long',col('time').cast('long'))

sdf_with_newtime_column_long.show(5)


# Step 2: divide by 1000 (milliseconds -> seconds)
sdf_with_newtime_column_seconds = sdf_with_newtime_column_long.withColumn(
    'newtime_unix_seconds', col('newtime_long') / 1000)

# Step 3: to_date() does not interpret a numeric column as epoch seconds, so
# the value is first turned into a timestamp with from_unixtime(), exactly as
# the main Q 1.1.2 cell does
sdf_with_newtime_column_date = sdf_with_newtime_column_seconds.withColumn(
    'newtime', to_date(from_unixtime(col('newtime_unix_seconds').cast('long'))))

sdf_with_newtime_column_date.show(5)

In [None]:
# TEMP - Data wrangling - Infer schema after read Q 1.2.1

# Inspecting the data for mistakes
inspect_data = sdf_with_new_date.describe()
inspect_data.show(truncate=False)

# Define the target schema and cast each column to it; values that cannot be
# cast become null.
# NOTE(review): 'rating' is not listed here, so the select below drops it -
# confirm this is intended.

schema = T.StructType([
    T.StructField("user_id",     T.DoubleType(),  True),
    T.StructField("name",        T.StringType(),  True),
    # LongType: millisecond timestamps do not fit a 32-bit IntegerType
    T.StructField("time",       T.LongType(), True),
    T.StructField("text",        T.StringType(),  True),
    T.StructField("pics",       T.StringType(), True),
    T.StructField("resp",       T.StringType(), True),
    T.StructField("gmap_id",     T.StringType(),  True),
    T.StructField("newtime",     T.DateType(),    True),
])
# user_id has the most non-null values before clean-up and is the most likely
# column to be a 'key'; it is required to be non-null after casting.

required = ['user_id']

# Keep only the schema's columns, each cast to its declared type
sdf_cast = sdf_with_new_date.select(
    [F.col(f.name).cast(f.dataType).alias(f.name) for f in schema.fields]
)

sdf_clean = sdf_cast.dropna(subset = required) # Drop rows with a null user_id

# This is a little bit backwards: normally the data should be read in with an
# explicit schema from source. In this case it is being used for clean-up later
# in the pipeline.

inspect_cleaned_data = sdf_clean.describe()

inspect_cleaned_data.show(truncate = False)