# Assignment: Scalable Processing
## Yelp Reviews and Authenticity

Large Scale Data Analysis | by Zuzanna Emilia Derylo | zude@itu.dk | date 08.10.2024

## Connecting to the Spark Cluster job using the two JobParameters.json

To connect this jupyter notebook with your Spark cluster, we need to tell jupyter how it can access the spark cluster. Below code accomplishes that. 

In [14]:
#####################################################################
# DO NOT CHANGE ANYTHING HERE.
# IF YOU HAVE PROBLEMS, CHECK THE ASSIGNMENT GUIDE CAREFULLY 
#####################################################################
    
# Only execute this cell once.
if '_EXECUTED_' in globals():
    # check if variable '_EXECUTED_' exists in the global variable namespace
    print("Already been executed once, not running again!")
else:
    print("Cell has not been executed before, running...")
    import os, json, pyspark
    from pyspark.conf import SparkConf
    from pyspark.sql import SparkSession, functions as F

    # Two files are automatically read: JobParameters.json for the Spark Cluster job using a temporary spark instance
    # and JobParameters.json for the Jupyter Lab job to extract the hostname of the cluster. 

    MASTER_HOST_NAME = None

    # Open the parameters Jupyter Lab app was launched with
    with open('/work/JobParameters.json', 'r') as file:
        JUPYTER_LAB_JOB_PARAMS = json.load(file)
        # from pprint import pprint; pprint(JUPYTER_LAB_JOB_PARAMS) 
        for resource in JUPYTER_LAB_JOB_PARAMS['request']['resources']:
            if 'hostname' in resource.keys():
                MASTER_HOST_NAME = resource['hostname']

    MASTER_HOST = f"spark://{MASTER_HOST_NAME}:7077"

    conf = SparkConf().setAll([
            ("spark.app.name", 'reading_job_params_app'), 
            ("spark.master", MASTER_HOST),
        ])
    spark = SparkSession.builder.config(conf=conf)\
                                .getOrCreate()

    CLUSTER_PARAMETERS_JSON_DF = spark.read.option("multiline","true").json('/work/JobParameters.json')

    # Extract cluster info from the specific JobParameters.json
    NODES = CLUSTER_PARAMETERS_JSON_DF.select("request.replicas").first()[0]
    CPUS_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.cpu").first()[0] - 1
    MEM_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.memoryInGigs").first()[0]

    CLUSTER_CORES_MAX = CPUS_PER_NODE * NODES
    CLUSTER_MEMORY_MAX = MEM_PER_NODE * NODES 
    
    if CPUS_PER_NODE > 1:
        EXECUTOR_CORES = CPUS_PER_NODE - 1  # set cores per executor on worker node
    else:
        EXECUTOR_CORES = CPUS_PER_NODE 

    EXECUTOR_MEMORY = int(
        MEM_PER_NODE / (CPUS_PER_NODE / EXECUTOR_CORES) * 0.5
    )  # set executor memory in GB on each worker node

    # Make sure there is a dir for spark logs
    if not os.path.exists('spark_logs'):
        os.mkdir('spark_logs')

    conf = SparkConf().setAll(
        [
            ("spark.app.name", 'spark_assignment'), # Change to your liking 
            ("spark.sql.caseSensitive", False), # Optional: Make queries strings sensitive to captialization
            ("spark.master", MASTER_HOST),
            ("spark.cores.max", CLUSTER_CORES_MAX),
            ("spark.executor.cores", EXECUTOR_CORES),
            ("spark.executor.memory", str(EXECUTOR_MEMORY) + "g"),
            ("spark.eventLog.enabled", True),
            ("spark.eventLog.dir", "spark_logs"),
            ("spark.history.fs.logDirectory", "spark_logs"),
            ("spark.deploy.mode", "cluster"),
        ]
    )

    ## check executor memory, taking into accout 10% of memory overhead (minimum 384 MiB)
    CHECK = (CLUSTER_CORES_MAX / EXECUTOR_CORES) * (
        EXECUTOR_MEMORY + max(EXECUTOR_MEMORY * 0.10, 0.403)
    )

    assert (
        int(CHECK) <= CLUSTER_MEMORY_MAX
    ), "Executor memory larger than cluster total memory!"

    # Stop previous session that was just for loading cluster params
    spark.stop()

    # Start new session with above config, that has better resource handling
    spark = SparkSession.builder.config(conf=conf)\
                                .getOrCreate()
    sc = spark.sparkContext
    _EXECUTED_ = True
    print("Success!")

Already been executed once, not running again!


Click on the "SparkMonitor" tab at the top in Jupyter Lab to see the status of running code on the cluster.

## Loading the data
Here we specify where the yelp datasets are located on UCloud and read then using the spark session.

In [15]:
# Read in the business and review files
# This is the path to the shared datasets provided by adding an the dataset input folder
# when submitting the spark cluster job.
business = spark.read.json('file:////work/yelp/yelp_academic_dataset_business.json') # Use the file:/// prefix to indicate we want to read from the cluster's filesystem
business = business.persist()
# Persist 2 commonly used dataframes since they're used for later computations
# https://sparkbyexamples.com/spark/spark-difference-between-cache-and-persist/

users = spark.read.json("file:////work/yelp/yelp_academic_dataset_user.json")

reviews = spark.read.json('file:////work/yelp/yelp_academic_dataset_review.json')
reviews = reviews.persist()

In [16]:
# Get number of rows with no sampling:
reviews.count()

6990280

In [17]:
# # OPTIONAL:
# # Reduce resource usage and make queries run faster
# # by only using a small sample of the dataframe
# # and overwriting previous variable "df".
# # Useful while developing, not so much to
# # provide final answers. Therefore: Remember to 
# # to re-read the df when done developing code using
# # df = spark.read etc like above.
# reviews = reviews.sample(withReplacement=False, fraction=1/50)

# # Get number of rows after sampling:
# reviews.count() 

Example: Say we're only interested in reviews of good mexican restaurants in Arizona. You can delete this when you do your own thing. 

In [18]:
# Filter to only Arizona businesses with "Mexican" as part of their categories
az_mex = business.filter(business.state == "AZ")\
                .filter(business.categories.rlike("Mexican"))\
                .select("business_id", "name")

# Join with the reviews
az_mex_rs = reviews.join(az_mex, on="business_id", how="inner")

# Filter to only 5 star reviews
good_az_mex_rs = az_mex_rs.filter(az_mex_rs.stars == 5)\
                        .select("name","text")

# Print the top 20 rows of the DataFrame
good_az_mex_rs.show()

# Convert to pandas (local object) and save to local file system
good_az_mex_rs.toPandas().to_csv("good_az_reviews.csv", header=True, index=False, encoding='utf-8')


+--------------------+--------------------+
|                name|                text|
+--------------------+--------------------+
|Casa Molina Del N...|We've been coming...|
|St Mary's Mexican...|Some of the fines...|
|Street- Taco and ...|Top notch street ...|
|Indian Frybread-M...|This place is a h...|
|St Mary's Mexican...|One of my favorit...|
|Street- Taco and ...|This is my favori...|
|Street- Taco and ...|Great food!  Grea...|
|            BK Tacos|Quality ingredien...|
|Street- Taco and ...|Best tacos in Tuc...|
|        El Merendero|This place was fr...|
|        El Merendero|To the reviewer w...|
|Taqueria Pico De ...|If you are lookin...|
|               Penca|Good unique Mexic...|
|   La Mesa Tortillas|Incredible red ch...|
|               Penca|I love this place...|
|            BK Tacos|This place is the...|
|Indian Frybread-M...|Wow! My sister ha...|
|St Mary's Mexican...|Carne Seca burro ...|
|             Club 21|The food was exce...|
|      El Charro Cafe|This is th

In [19]:
# Getting to know the schema of the data. 

# Business schema
business.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [20]:
# All imports:
from pyspark.sql.functions import sum, countDistinct, avg, col, lower, when, split, explode, count

In [21]:
# 3.1.1 Find the total number of reviews for all businesses. 
# The output should be in the form of a Spark Table/DataFrame 
# with one value representing the count.

# Count of all reviews from the business dataset
# Note to self: agg automatically ignoers null values and it is in the form of Spark DataFrame
total_reviews = business.agg(sum("review_count").alias("total_reviews")) 

# Show results
total_reviews.show()

# # Another approach using review dataset. Simply count the number of rows.
# total_reviews1 = reviews.count()
# print(total_reviews1)

+-------------+
|total_reviews|
+-------------+
|      6745508|
+-------------+



In [22]:
# Find all businesses that have received 5 stars and that have been reviewed 
# by 500 or more users. The output should be in the form of a DataFrame 
# of (name, stars, review count).

# Filter businesses with 5 stars and with review count greater than or equal to 500
filtered_businesses = business \
                    .filter((business["stars"]==5) & (business["review_count"]>=500))\
                    .select("name", "stars", "review_count")

# Show results
filtered_businesses.show()

+--------------------+-----+------------+
|                name|stars|review_count|
+--------------------+-----+------------+
|     Blues City Deli|  5.0|         991|
|            Tumerico|  5.0|         705|
|  Free Tours By Foot|  5.0|         769|
|                Yats|  5.0|         623|
|   SUGARED + BRONZED|  5.0|         513|
|Nelson's Green Br...|  5.0|         545|
|Smiling With Hope...|  5.0|         526|
|    Carlillos Cocina|  5.0|         799|
|Barracuda Deli Ca...|  5.0|         521|
+--------------------+-----+------------+



In [23]:
# 3.1.3 Find influencers who have written more than 1000 reviews. 
# The output should be in the form of a Spark Table/DataFrame of user id.

# Filter users that wrote more than 1000 reviews
influencers = users\
                    .filter((users["review_count"]>1000))\
                    .select("user_id")

# Show results
influencers.show()

+--------------------+
|             user_id|
+--------------------+
|j14WgRoU_-2ZE1aw1...|
|q_QQ5kBBwlCcbL1s4...|
|MGPQVLsODMm9ZtYQW...|
|NIhcRW6DWvk1JQhDh...|
|QJI9OSEn6ujRCtrX0...|
|AkBtT43dYcttxQ3qO...|
|2l0O1EI1m0yWjFo2z...|
|RgDVC3ZUBqpEe6Y1k...|
|lquc6IF6uGIeRomDL...|
|VHdY6oG2JPVNjihWh...|
|om5ZiponkpRqUNa3p...|
|K7thO1n-vZ9PFYiC7...|
|UQFE3BT1rsIYrcDvu...|
|jVYzrVblDFSuL3GHt...|
|3zxy3LVBV3ttxoYbY...|
|0G-QF457q_0Z_jKqh...|
|ITa3vh5ERI90G_WP4...|
|P5bUL3Engv-2z6kKo...|
|ouODopBKF3AqfCkuQ...|
|YMgZqBUAddmFErxLt...|
+--------------------+
only showing top 20 rows



In [24]:
# 3.1.4 Find the businesses names that have been reviewed by more than 5 influencer users. 
# You can use a view created from your answer to Q3.

# Joining the review data with influencer data to find reviews made by influencers
rev_infl = influencers.join(reviews, on="user_id", how="inner")

# Counting how many influencers reviewed each business
rev_bus_infl = rev_infl\
            .groupBy("business_id")\
            .agg(countDistinct("user_id").alias("influencer_count"))
                 
# Finding businesses that were reviewed by more than 5 influencers
bus_infl = rev_bus_infl.filter(rev_bus_infl["influencer_count"]>5)

# The names of those businesses
popular_business = bus_infl\
                 .join(business, on="business_id", how="inner")\
                 .select("name")

# popular_business_count = popular_business.count()
# print(popular_business_count)
                 
# Show results
popular_business.show()

+--------------------+
|                name|
+--------------------+
|Iron Hill Brewery...|
|      Slice Pizzeria|
|     Shanghai Bazaar|
|Peacemaker Lobste...|
|      Fat Dan's Deli|
|              Pho Ha|
|     Green Eggs Café|
|Philadelphia Marr...|
|Tampa Bay History...|
|   Vieux Carre Pizza|
|Hobnobber's Varie...|
|Smiling With Hope...|
|       Highland Park|
|Sanwa Farmers Market|
|      Dim Sum Garden|
|   Zia's on the Hill|
|        Exo Roast Co|
|  Razzoo Bar & Patio|
|Hog Island Fish Camp|
|Boathouse at Hend...|
+--------------------+
only showing top 20 rows



In [25]:
# 3.1.5 Find an ordered list of users based on the 
# average star counts they have given in all their reviews.

# Group reviews by user_id and calculate the average star count
user_avg_stars = reviews\
                    .groupBy("user_id")\
                    .agg(avg("stars").alias("average_stars"))

# Ordered list
ordered_users_avg_stars = user_avg_stars.orderBy("average_stars", ascending=False)

# ordered_users_avg_stars_count = ordered_users_avg_stars.count()
# print(ordered_users_avg_stars_count)

# Show results
ordered_users_avg_stars.show()

+--------------------+-------------+
|             user_id|average_stars|
+--------------------+-------------+
|F01yNRYCi1S6Of1ZO...|          5.0|
|Jp1DDbOkQkDmhn0qM...|          5.0|
|iuMioByR5nzuObg7z...|          5.0|
|AGJw2U3saGsoFdq5S...|          5.0|
|kD4Afy0o9o3bW_nEr...|          5.0|
|Di_afB79q7a1SwlZM...|          5.0|
|eGDpkaCvmqVGZMlSE...|          5.0|
|MCCJIanpZb-uHgI09...|          5.0|
|1YzqC12iXCBKpuSRW...|          5.0|
|kaSv0qKBOJ1Fi0jew...|          5.0|
|0o_jgqmuY3leWWa_X...|          5.0|
|KjDQGh3cd93OFhXsY...|          5.0|
|ItYOJnVe5C2uAe6L3...|          5.0|
|0p-CF_aaDuXEjY4Le...|          5.0|
|4JamwGrnS1adH_J1j...|          5.0|
|d9-BlPLwiioTkKm-x...|          5.0|
|_NS4zN5-hFG9USP1u...|          5.0|
|rqySPzInkWIW65HLv...|          5.0|
|Mz-NlJVfGUnNpOiiR...|          5.0|
|iTiomB-ynFMysGQrg...|          5.0|
+--------------------+-------------+
only showing top 20 rows



In [26]:
# 3.2.1 Data exploration
# What is the percentage of reviews that contain a variant of the word "authentic"?

# Variants (synonyms) of authentic: authentic, traditional, original, real, genuine, legitimate
authentic_variants = "authentic|traditional|original|real|legitimate|genuine"

# Filter for reviews containing those variants. Set authentic flag to 1 if there is variant of word "authentic", otherwise set to 0
reviews_with_authentic = reviews.withColumn(
    "authentic_flag",
    when(lower(col("text")).rlike(authentic_variants), 1).otherwise(0)
)
authentic_reviews_count = reviews_with_authentic.filter(col("authentic_flag") == 1).count()

# Total number of reviews (from reviews data)
total_reviews_count = reviews.count()

# Percentage of those reviews
percentage_reviews_authentic = round((authentic_reviews_count / total_reviews_count) * 100, 2)

print(f"Percentage of reviews that contain a variant of the word 'authentic': {percentage_reviews_authentic}%")


Percentage of reviews that contain a variant of the word 'authentic': 23.38%


In [27]:
# Split the categories into a list of individual categories
split_categories = business.withColumn("category", explode(split(lower(col("categories")), ",\\s*")))

# Group by the individual categories and count how many times each category appears
category_count = split_categories.groupBy("category").agg(count("*").alias("count"))

# Order by count and show the top categories
category_count_ordered = category_count.orderBy(col("count").desc()) 
category_count_ordered.show(5, truncate=False)

+-------------+-----+
|category     |count|
+-------------+-----+
|restaurants  |52268|
|food         |27781|
|shopping     |24395|
|home services|14356|
|beauty & spas|14292|
+-------------+-----+
only showing top 5 rows



In [28]:
# Cuisine list based on previous analysis
cuisine_list = ["american", "mexican", "italian", "chinese", "japanese", "thai", 
                "vietnamese", "indian", "greek", "caribbean", "middle eastern", 
                "french", "korean", "spanish", "cuban", "canadian", "pakistani", 
                "irish", "german", "african", "szechuan", "filipino", "puertorican", 
                "turkish", "lebanese", "peruvian", "taiwanese", "brazilian", 
                "british", "ethiopian", "salvadorian", "moroccan", "venezuelan", 
                "afghan", "dominican", "mongolian", "polish", "russian", "arabic", 
                "persian/iranian", "portuguese", "malaysian", "hondurian", "belgian", 
                "indonesian", "ukrainian", "cambodian", "egyptian", "armenian"]

# Reviews containing the string "legitimate"
reviews_with_variant = reviews.filter(lower(col("text")).rlike("legitimate"))

# Split the categories into individual categories
cuisine_businesses = business.withColumn("category", explode(split(lower(col("categories")), ",\\s*")))

# Filter for rows where the category matches any cuisine in the cuisine_list
cuisine_businesses_filtered = cuisine_businesses.filter(col("category").isin([cuisine.lower() for cuisine in cuisine_list]))

# Join the filtered businesses with the reviews that contain "legitimate"
reviews_with_cuisine = reviews_with_variant.join(cuisine_businesses_filtered, on="business_id", how="inner")

# Group by the cuisine category and count the number of reviews for each cuisine
reviews_grouped_by_cuisine = reviews_with_cuisine.groupBy("category") \
    .agg(count("review_id").alias("count_reviews")) \
    .orderBy("count_reviews", ascending=False)

# Show the result
reviews_grouped_by_cuisine.show()

+--------------+-------------+
|      category|count_reviews|
+--------------+-------------+
|       mexican|          295|
|       italian|          271|
|       chinese|          159|
|      japanese|          151|
|          thai|           86|
|    vietnamese|           65|
|        french|           60|
|        korean|           50|
|middle eastern|           47|
|         greek|           42|
|        indian|           35|
|         cuban|           26|
|     caribbean|           22|
|         irish|           21|
|      szechuan|           20|
|       spanish|           19|
|     pakistani|           15|
|       british|           13|
|        german|           12|
|      lebanese|           11|
+--------------+-------------+
only showing top 20 rows



In [29]:
# Is there a difference in the amount of authenticity language used in the different areas? 
# (e.g., by state, north/south, urban/rural)
# Note: As part of answering this question, you could compute the full cube or rollup combining
# the location of the business and whether the review contains authenticity language,
# and use this to aggregate their counts per state and city.

# I'm using the "reviews_with_authentic" from previous part
# Join reviews with business information to information about cityy and state
reviews_with_location = reviews_with_authentic.join(
    business.select("business_id", "city", "state"),
    on="business_id", how="inner"
)

# # Results as cube
# cube_results = reviews_with_location.cube("state", "city", "authentic_flag") \
#     .agg(count("*").alias("review_count")) \
#     .orderBy("state", "city", "authentic_flag")
# cube_results = cube_results.filter(col("authentic_flag") == 1)

# cube_results.show() 

# # Results as rollup
# rollup_results = reviews_with_location.rollup("state", "city", "authentic_flag") \
#     .agg(count("*").alias("review_count")) \
#     .orderBy("state", "city", "authentic_flag")

# rollup_results.show()

# Results grouped by cities
results_by_city = reviews_with_location.cube("state", "city", "authentic_flag") \
    .agg(count("*").alias("review_count")) \
    .filter(col("authentic_flag") == 1) \
    .filter(col("city").isNotNull()) \
    .groupBy("city") \
    .agg(sum("review_count").alias("total_reviews")) \
    .orderBy("total_reviews", ascending=False)

results_by_city.show()

# Results grouped by states
results_by_state = reviews_with_location.cube("state", "authentic_flag") \
    .agg(count("*").alias("review_count")) \
    .filter(col("authentic_flag") == 1) \
    .filter(col("state").isNotNull()) \
    .groupBy("state") \
    .agg(sum("review_count").alias("total_reviews")) \
    .orderBy("total_reviews", ascending=False)

# Show the result
results_by_state.show()


+----------------+-------------+
|            city|total_reviews|
+----------------+-------------+
|    Philadelphia|       525908|
|     New Orleans|       296926|
|       Nashville|       213838|
|           Tampa|       208160|
|          Tucson|       185126|
|    Indianapolis|       173506|
|            Reno|       163038|
|     Saint Louis|       128384|
|   Santa Barbara|       127348|
|        Edmonton|        62098|
|           Boise|        46190|
|      Clearwater|        35534|
|Saint Petersburg|        34932|
|          Sparks|        31168|
|       St. Louis|        29648|
|        Metairie|        28194|
|        Franklin|        24504|
|  St. Petersburg|        24194|
|          Goleta|        22124|
|      Wilmington|        19694|
+----------------+-------------+
only showing top 20 rows

+-----+-------------+
|state|total_reviews|
+-----+-------------+
|   PA|       404916|
|   FL|       248080|
|   LA|       175867|
|   TN|       141103|
|   MO|       120587|
|   IN

In [30]:
# 3.2.2 Can you identify a difference in the relationship between authenticity language
# and typically negative words, in restaurants serving South American or Asian cuisine 
# compared to restaurants serving European cuisine? And to what degree?

# Dividing types of cuisines into three categories: South American, Asian and European
south_american_cuisines = ["mexican", "brazilian", "venezuelan", "peruvian", "cuban", "salvadorian", "dominican", "honduran"]
asian_cuisines = ["chinese", "japanese", "thai", "vietnamese", "korean", "indian", "pakistani", "szechuan", "taiwanese", "filipino", "malaysian", "indonesian", "himalayan/nepalese", "afghan", "mongolian", "cambodian"]
european_cuisines = ["italian", "french", "german", "spanish", "greek", "irish", "british", "portuguese", "polish", "belgian", "ukrainian"]

# Filter businesses by cuisine types
south_american_businesses = cuisine_businesses.filter(col("category").isin([cuisine.lower() for cuisine in south_american_cuisines]))
asian_businesses = cuisine_businesses.filter(col("category").isin([cuisine.lower() for cuisine in asian_cuisines]))
european_businesses = cuisine_businesses.filter(col("category").isin([cuisine.lower() for cuisine in european_cuisines]))

# # Show results
# south_american_businesses.show()
# asian_businesses.show()
# european_businesses.show()

In [31]:
# Filtering reviews that contain authenticity langage and typically negative words

# Variants for "authentic" and "negative" flags
# authentic_variants was created before
negative_variants = "dirty|cheap|kitsch|rude|simple|disappointing|bad|worst|terrible|overrated|soggy|bland|horrible|inedible|disgusting|poor"

# Applying authentic and negative flags to all reviews database
reviews_with_flags = reviews.withColumn(
    "authentic_flag",
    when(lower(col("text")).rlike(authentic_variants), 1).otherwise(0)
).withColumn(
    "negative_flag",
    when(lower(col("text")).rlike(negative_variants), 1).otherwise(0)
)

In [32]:
# # Filter reviews by cuisine type
# south_american_reviews = reviews_with_flags.join(south_american_businesses, on="business_id", how="inner")
# asian_reviews = reviews_with_flags.join(asian_businesses, on="business_id", how="inner")
# european_reviews = reviews_with_flags.join(european_businesses, on="business_id", how="inner")

In [33]:
# # Reviews from each cuisine 

# # South american
# south_american_reviews_count = south_american_reviews.count()
# print(south_american_reviews_count)

# # Asian
# asian_reviews_count = asian_reviews.count()
# print(asian_reviews_count)

# # European
# european_reviews_count = european_reviews.count()
# print(european_reviews_count)


In [34]:
# # Filtering the reviews with authentic language for each cuisine type

# # South american
# south_american_reviews_authentic = south_american_reviews.filter(col("authentic_flag") == 1)
# south_american_reviews_authentic_count = south_american_reviews_authentic.count()
# print(south_american_reviews_authentic_count)

# # Asian
# asian_reviews_authentic = asian_reviews.filter(col("authentic_flag") == 1)
# asian_reviews_authentic_count = asian_reviews_authentic.count()
# print(asian_reviews_authentic_count)

# # European
# european_reviews_authentic = european_reviews.filter(col("authentic_flag") == 1)
# european_reviews_authentic_count = european_reviews_authentic.count()
# print(european_reviews_authentic_count)

In [35]:
# # Filtering the reviews with negative language for each cuisine type

# # South american
# south_american_reviews_negative = south_american_reviews.filter(col("negative_flag") == 1)
# south_american_reviews_negative_count = south_american_reviews_negative.count()
# print(south_american_reviews_negative_count)

# # Asian
# asian_reviews_negative = asian_reviews.filter(col("negative_flag") == 1)
# asian_reviews_negative_count = asian_reviews_negative.count()
# print(asian_reviews_negative_count)

# # European
# european_reviews_negative = european_reviews.filter(col("negative_flag") == 1)
# european_reviews_negative_count = european_reviews_negative.count()
# print(european_reviews_negative_count)

In [36]:
# # Filtering the reviews with both authentic and negative language for each cuisine type

# # South american cuisine
# south_american_reviews_both = south_american_reviews.filter(
#     (col("authentic_flag") == 1) & (col("negative_flag") == 1)
# )
# south_american_reviews_both_count = south_american_reviews_both.count()
# print(south_american_reviews_both_count)

# # Asian cuisine
# asian_reviews_both = asian_reviews.filter(
#     (col("authentic_flag") == 1) & (col("negative_flag") == 1)
# )
# asian_reviews_both_count = asian_reviews_both.count()
# print(asian_reviews_both_count)

# # European cuisine
# european_reviews_both = european_reviews.filter(
#     (col("authentic_flag") == 1) & (col("negative_flag") == 1)
# )
# european_reviews_both_count = european_reviews_both.count()
# print(european_reviews_both_count)

In [37]:
# # Ratio of reviews with negative language to all negative reviews

# # Total number of negative reviews
# total_negative_reviews = reviews_with_flags.filter(col("negative_flag") == 1).count()

# # South american cuisine
# south_american_negative_to_total_negative_ratio = (south_american_reviews_negative_count / total_negative_reviews) * 100
# print(f"South American cuisine: {round(south_american_negative_to_total_negative_ratio, 2)}% of all negative reviews.")

# # Asian cuisine
# asian_negative_to_total_negative_ratio = (asian_reviews_negative_count / total_negative_reviews) * 100
# print(f"Asian cuisine: {round(asian_negative_to_total_negative_ratio, 2)}% of all negative reviews.")

# # European cuisine
# european_negative_to_total_negative_ratio = (european_reviews_negative_count / total_negative_reviews) * 100
# print(f"European cuisine: {round(european_negative_to_total_negative_ratio, 2)}% of all negative reviews.")

In [38]:
# # Ratio of reviews with negative language to total reviews for each cuisine

# # South american cuisine
# south_american_negative_to_total_reviews_ratio = (south_american_reviews_negative_count / south_american_reviews_count) * 100
# print(f"South American cuisine: {round(south_american_negative_to_total_reviews_ratio, 2)}% of reviews are negative.")

# # Asian cuisine
# asian_negative_to_total_reviews_ratio = (asian_reviews_negative_count / asian_reviews_count) * 100
# print(f"Asian cuisine: {round(asian_negative_to_total_reviews_ratio, 2)}% of reviews are negative.")

# # European cuisine
# european_negative_to_total_reviews_ratio = (european_reviews_negative_count / european_reviews_count) * 100
# print(f"European cuisine: {round(european_negative_to_total_reviews_ratio, 2)}% of reviews are negative.")


In [39]:
# # Ratios of how many % of reviews with authenticity language have negative words
# # South american cuisine
# south_american_ratio = (south_american_reviews_both_count / south_american_reviews_authentic_count) * 100
# print(f"South American cuisine: {round(south_american_ratio,2)}% of reviews with authenticity language also contain negative words.")

# # Asian cuisine
# asian_ratio = (asian_reviews_both_count / asian_reviews_authentic_count) * 100
# print(f"Asian cuisine: {round(asian_ratio,2)}% of reviews with authenticity language also contain negative words.")

# # European cuisine
# european_ratio = (european_reviews_both_count / european_reviews_authentic_count) * 100
# print(f"European cuisine: {round(european_ratio,2)}% of reviews with authenticity language also contain negative words.")


In [40]:
### Predicting the rating of certian restaurant given a user review

In [41]:
# Rename the reviews "stars" column, to avoid problems later
reviews = reviews.withColumnRenamed("stars", "review_stars")

In [42]:
# Extracting all the restaurants from business dataframe, to make sure that 
# I'm only considering restaurants (so where one of cataegories is "restaurant")
restaurants = business.filter(lower(business.categories).contains("restaurant"))

In [43]:
# Joining the reviews with restaurnts dataframe, to have all the reviews from restaurants
restaurants_reviews = restaurants.join(reviews, on="business_id", how="inner")

In [44]:
# Exploring the reviews with 5.0 stars to find words that show positivity of the review
# Filter the dataframe for rows where stars is 5.0
five_star_reviews = restaurants_reviews.filter(col("review_stars") == 5.0)

# Show the first X rows
five_star_reviews.show(50, truncate=False)

# Result - positive words
positive_keywords = ["affordable","amazing","appreciated","awesome","be back",
    "be returning","best","clean","cool","delicious","delightful","enjoy","excellent",
    "fantastic","favourite","flavorful","friendly","good","good quality","great",
    "happy","home","incredible","like","love","nice","on point","perfect","phenomenal",
    "pleasant","pleased","pretty","reasonable","recommend","satisfied","scrumptious",
    "super","the bomb","top tier","tremendous","wonderful","worth every penny",
    "worth the wait","wow","yummy","would recommend"]

+----------------------+----------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------+---------------+----------------------------------------------------------------------+-------+----------+-----------+-----------------+-----------+------------+-----+-----+----+-------------------+-----+----------------------+------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [45]:
restaurants_reviews.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nul

In [46]:
### Extracting the features. I decided to use city, state review_stars and text columns. 

In [49]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Authenticity language
authenticity_keywords = ["authentic", "traditional", "original", 
                         "real", "legitimate", "genuine"]

# Define UDF for authenticity language
def contains_authenticity_language(text):
    if text:
        words = text.lower().split()
        return int(any(word in authenticity_keywords for word in words))
    return 0

authenticity_udf = udf(contains_authenticity_language, IntegerType())

# Add authenticity column to the dataframe
restaurants_reviews = restaurants_reviews.withColumn(
    "authenticity_language", authenticity_udf(col("text"))
)

In [50]:
# Define UDF for positive language
def contains_positive_language(text):
    if text:
        words = text.lower().split()
        return int(any(word in positive_keywords for word in words))
    return 0

positive_udf = udf(contains_positive_language, IntegerType())

# Add positive language column to the dataframe
restaurants_reviews = restaurants_reviews.withColumn(
    "positive_language", positive_udf(col("text"))
)

In [57]:
from pyspark.ml.feature import StringIndexer

# Encoding categorical variables - city and state
indexer_city = StringIndexer(inputCol="city", outputCol="city_indexed")
indexer_state = StringIndexer(inputCol="state", outputCol="state_indexed")

In [51]:
from pyspark.ml.feature import Tokenizer

# Tokenize the review text 
tokenizer = Tokenizer(inputCol="text", outputCol="words")

In [58]:
from pyspark.ml.feature import VectorAssembler

# Assemble all features into a single feature vector
assembler = VectorAssembler(
    inputCols=["stars", "review_stars", "city_indexed", "state_indexed", 
               "authenticity_language", "positive_language"],
    outputCol="final_features"
)

In [59]:
from pyspark.ml.regression import LinearRegression

# Linear Regression model
linear_regression = LinearRegression(featuresCol="final_features", labelCol="stars")

In [62]:
from pyspark.ml import Pipeline

# Pipeline definition
pipeline = Pipeline(stages=[
    tokenizer,
    indexer_city,
    indexer_state,
    assembler,
    linear_regression
])

In [63]:
### Splitting the dataset into training and testing sets
train_df, test_df = restaurants_reviews.randomSplit([0.8, 0.2])

In [64]:
### Train the model using the pipeline
model = pipeline.fit(train_df)

In [65]:
### Make predictions on the test set
predictions = model.transform(test_df)

In [66]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the model using RMSE
evaluator = RegressionEvaluator(labelCol="stars", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

Root Mean Squared Error (RMSE): 3.188343408925282e-15


In [67]:
# Show some predictions
predictions.select("business_id", "stars", "prediction", "text").show(10, truncate=False)