# Assignment: Scalable Processing
## Yelp Reviews and Authenticity

Big Data Management | by Matteo Mormile | matmo@itu.dk | 03/10/2023


## Connecting to the Spark Cluster job using the two JobParameters.json

To connect this jupyter notebook with your Spark cluster, we need to tell jupyter how it can access the spark cluster. Below code accomplishes that. 

In [41]:
#####################################################################
# DO NOT CHANGE ANYTHING HERE.
# IF YOU HAVE PROBLEMS, CHECK THE ASSIGNMENT GUIDE CAREFULLY 
#####################################################################
    
# Only execute this cell once.
if 'EXECUTED' in globals():
    # check if variable 'EXECUTED' exists in the global variable namespace
    print("Already been executed once, not running again!")
else:
    print("Cell has not been executed before, running...")
    import os, json, pyspark
    from pyspark.conf import SparkConf
    from pyspark.sql import SparkSession, functions as F

    # Two files are automatically read: JobParameters.json for the Spark Cluster job using a temporary spark instance
    # and JobParameters.json for the Jupyter Lab job to extract the hostname of the cluster. 

    MASTER_HOST_NAME = None

    # Open the parameters Jupyter Lab app was launched with
    with open('/work/JobParameters.json', 'r') as file:
        JUPYTER_LAB_JOB_PARAMS = json.load(file)
        # from pprint import pprint; pprint(JUPYTER_LAB_JOB_PARAMS) 
        for resource in JUPYTER_LAB_JOB_PARAMS['request']['resources']:
            if 'hostname' in resource.keys():
                MASTER_HOST_NAME = resource['hostname']

    MASTER_HOST = f"spark://{MASTER_HOST_NAME}:7077"

    conf = SparkConf().setAll([
            ("spark.app.name", 'reading_job_params_app'), 
            ("spark.master", MASTER_HOST),
        ])
    spark = SparkSession.builder.config(conf=conf)\
                                .getOrCreate()

    CLUSTER_PARAMETERS_JSON_DF = spark.read.option("multiline","true").json('/work/JobParameters.json')

    # Extract cluster info from the specific JobParameters.json
    NODES = CLUSTER_PARAMETERS_JSON_DF.select("request.replicas").first()[0]
    CPUS_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.cpu").first()[0] - 1
    MEM_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.memoryInGigs").first()[0]

    CLUSTER_CORES_MAX = CPUS_PER_NODE * NODES
    CLUSTER_MEMORY_MAX = MEM_PER_NODE * NODES 

    EXECUTOR_CORES = CPUS_PER_NODE - 1  # set cores per executor on worker node
    EXECUTOR_MEMORY = int(
        MEM_PER_NODE / (CPUS_PER_NODE / EXECUTOR_CORES) * 0.5
    )  # set executor memory in GB on each worker node

    # Make sure there is a dir for spark logs
    if not os.path.exists('spark_logs'):
        os.mkdir('spark_logs')

    conf = SparkConf().setAll(
        [
            ("spark.app.name", 'spark_assignment'), # Change to your liking 
            ("spark.sql.caseSensitive", False), # Optional: Make queries strings sensitive to captialization
            ("spark.master", MASTER_HOST),
            ("spark.cores.max", CLUSTER_CORES_MAX),
            ("spark.executor.cores", EXECUTOR_CORES),
            ("spark.executor.memory", str(EXECUTOR_MEMORY) + "g"),
            ("spark.eventLog.enabled", True),
            ("spark.eventLog.dir", "spark_logs"),
            ("spark.history.fs.logDirectory", "spark_logs"),
            ("spark.deploy.mode", "cluster"),
        ]
    )

    ## check executor memory, taking into accout 10% of memory overhead (minimum 384 MiB)
    CHECK = (CLUSTER_CORES_MAX / EXECUTOR_CORES) * (
        EXECUTOR_MEMORY + max(EXECUTOR_MEMORY * 0.10, 0.403)
    )

    assert (
        int(CHECK) <= CLUSTER_MEMORY_MAX
    ), "Executor memory larger than cluster total memory!"

    # Stop previous session that was just for loading cluster params
    spark.stop()

    # Start new session with above config, that has better resource handling
    spark = SparkSession.builder.config(conf=conf)\
                                .getOrCreate()
    sc = spark.sparkContext
    EXECUTED = True
    print("Success!")


Already been executed once, not running again!


## Loading the data

In [42]:
business = spark.read.json('file:////work/yelp/yelp_academic_dataset_business.json')
business = business.persist()

users = spark.read.json("file:////work/yelp/yelp_academic_dataset_user.json")
reviews = spark.read.json('file:////work/yelp/yelp_academic_dataset_review.json')
reviews = reviews.persist()

checkin = spark.read.json('file:////work/yelp/yelp_academic_dataset_checkin.json')
tips = spark.read.json('file:////work/yelp/yelp_academic_dataset_tip.json')

In [43]:
business.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

# 3.1 Specific DataFrame Queries

#### 1. Analyze business.json to find the total number of reviews for all businesses. The output should be in the form of a Spark Table/DataFrame with one value representing the count.


In [44]:
business.select("review_count").groupBy().sum().show()

+-----------------+
|sum(review_count)|
+-----------------+
|          6745508|
+-----------------+



#### 2. Analyze business.json to find all businesses that have received 5 stars and that have been reviewed by 500 or more users. The output should be in the form of DataFrame of (name, stars, review count)

In [45]:
business_5stars_500reviews = business \
.filter((business.stars == 5.0) & (business.review_count > 499)) \
.select(business.name, business.stars, business.review_count)

business_5stars_500reviews.show(5)

+------------------+-----+------------+
|              name|stars|review_count|
+------------------+-----+------------+
|   Blues City Deli|  5.0|         991|
|          Tumerico|  5.0|         705|
|Free Tours By Foot|  5.0|         769|
|              Yats|  5.0|         623|
| SUGARED + BRONZED|  5.0|         513|
+------------------+-----+------------+
only showing top 5 rows



#### 3. Analyze user.json to find the influencers who have written more than 1000 reviews. The output should be in the form of a Spark Table/DataFrame of user id.

In [46]:
influencers = users.filter(users.review_count > 1000)\
.orderBy(col("review_count").desc())

influencers.select("user_id").show(5)

+--------------------+
|             user_id|
+--------------------+
|Hi10sGSZNxQH3NLyW...|
|8k3aO-mPeyhbR5HUu...|
|hWDybu_KvYLSdEFzG...|
|RtGqdDBvvBCjcu5dU...|
|P5bUL3Engv-2z6kKo...|
+--------------------+
only showing top 5 rows



#### 4. Analyze review.json, business.json, and a view created from your answer to Q3 to find the businesses that have been reviewed by more than 5 influencer users.

In [47]:
# take only reviews made by influencer
reviews_influencer = influencers.join(reviews, on="user_id", how="inner") 
# attach business info to reviews
reviews_with_business = reviews_influencer.join(business, on="business_id", how="inner")

result = reviews_with_business.groupBy("business_id") \
.agg(count("user_id").alias("reviewsByInfluencer")) \
.filter("reviewsByInfluencer > 5") \
.select("business_id", "reviewsByInfluencer")

result.orderBy(desc("reviewsByInfluencer")).show(5)

+--------------------+-------------------+
|         business_id|reviewsByInfluencer|
+--------------------+-------------------+
|ytynqOUb3hjKeJfRj...|                238|
|Eb1XmmLWyt_way5NN...|                228|
|-QI8Qi8XWH3D8y8et...|                214|
|_ab50qdWOk0DdB6XO...|                202|
|PP3BBaVxZLcJU54uP...|                178|
+--------------------+-------------------+
only showing top 5 rows



#### 5. Analyze review.json and user.json to find an ordered list of users based on the average star counts they have given in all their reviews.


In [48]:
# join reviews with user info
reviews_user_info = reviews.join(users, on="user_id", how="inner")

result = reviews_user_info.groupBy("user_id","name") \
.agg(avg("stars").alias("avg_stars")) \
.select("name", "avg_stars")

result.orderBy(desc("avg_stars")).show(5)

+--------+---------+
|    name|avg_stars|
+--------+---------+
|Brittany|      5.0|
|     pat|      5.0|
|   Jared|      5.0|
|   Erwin|      5.0|
|  Surfer|      5.0|
+--------+---------+
only showing top 5 rows



# 3.2 Authenticity Study

In [49]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, FloatType, ArrayType, BooleanType

authentic_synonyms = ["real", "true", "legitimate", "original", "faithful","authentic"]

international_cuisine = [
    "Mexican", "Italian", "Chinese", "Japanese", "Indian", "Thai", "Greek", "Spanish", "French", "Korean", "Vietnamese", "German", "Cuban", "Brazilian", "Russian", "American",
    "Mediterranean", "Irish", "Middle Eastern", "Cajun", "Tex-Mex", "Soul Food", "Jewish", "Southern", "Caribbean", "Hawaiian", "Filipino", "Peruvian", "Argentinian",
    "Polish", "Swedish", "Czech", "African", "Haitian", "Colombian", "Venezuelan", "Chilean", "Turkish", "Armenian", "Portuguese", "Burmese", "Afghan", "Ethiopian",
    "Nigerian", "Kenyan", "South African", "Moroccan", "Egyptian", "Lebanese", "Syrian", "Palestinian", "Jordanian", "Iranian", "Pakistani", "Bangladeshi", "Sri Lankan",
]
# retrive data
business = spark.read.json('file:////work/yelp/yelp_academic_dataset_business.json')
business = business.persist()

# Tranform from string to array values
business_array = business.withColumn("categories", split(business["categories"], ",").cast(ArrayType(StringType())))

# Explode array categories
business_explode = business_array.select("business_id","review_count","city","state",explode(business_array["categories"]).alias("single_categories"))
print("Business_explode dataframe")
business_explode.show()

# Remove notes between brackets for American one (Traditional/New)
business_df = business_explode.select(regexp_replace(col("single_categories"), r"\s\([^)]*\)", "").alias("single_categories"))
business_df = business_df.persist()
business_df.show()

Business_explode dataframe
+--------------------+------------+-------------+-----+--------------------+
|         business_id|review_count|         city|state|   single_categories|
+--------------------+------------+-------------+-----+--------------------+
|Pns2l4eNsfO8kk83d...|           7|Santa Barbara|   CA|             Doctors|
|Pns2l4eNsfO8kk83d...|           7|Santa Barbara|   CA| Traditional Chin...|
|Pns2l4eNsfO8kk83d...|           7|Santa Barbara|   CA| Naturopathic/Hol...|
|Pns2l4eNsfO8kk83d...|           7|Santa Barbara|   CA|         Acupuncture|
|Pns2l4eNsfO8kk83d...|           7|Santa Barbara|   CA|    Health & Medical|
|Pns2l4eNsfO8kk83d...|           7|Santa Barbara|   CA|       Nutritionists|
|mpf3x-BjTdTEA3yCZ...|          15|       Affton|   MO|    Shipping Centers|
|mpf3x-BjTdTEA3yCZ...|          15|       Affton|   MO|      Local Services|
|mpf3x-BjTdTEA3yCZ...|          15|       Affton|   MO|            Notaries|
|mpf3x-BjTdTEA3yCZ...|          15|       Affton|

### 3.2.1 Data Exploration
Look in the data for the use of "authenticity language", as defined in the Eater New York article

#### What is the percentage of reviews containing a variant of the word "authentic"

In [50]:
# reviews_per_categories without filter
reviews_per_categories = business_explode.groupBy("single_categories") \
.agg(sum("review_count").alias("Total")).orderBy(desc("Total"))
reviews_per_categories.show()

# with filter
auth_reviews = reviews.filter(col("text").rlike("|".join(authentic_synonyms))).select("business_id") # reduce df size

auth_reviews_per_categories = auth_reviews.join(business_explode, auth_reviews["business_id"] == business_explode["business_id"], "right") \
.select(business_explode.single_categories) \
.groupBy("single_categories") \
.agg(count("single_categories").alias("Filtered")).orderBy(desc("Filtered"))
auth_reviews_per_categories = auth_reviews_per_categories.persist()
auth_reviews_per_categories.show()

# final df
joined_df = reviews_per_categories.join(auth_reviews_per_categories, "single_categories", "inner")

# calculate the percentage
joined_df = joined_df.filter(col("single_categories").isin(international_cuisine)) \
.withColumn("percentage", expr("round((Filtered / Total) * 100, 2)")).filter("Total > 10000").orderBy(desc("percentage"))
joined_df.show()

+--------------------+-------+
|   single_categories|  Total|
+--------------------+-------+
|         Restaurants|3411483|
|                Food|1412065|
|           Nightlife|1265626|
|                Bars|1214711|
|         Restaurants|1149796|
| American (Tradit...| 806238|
|      American (New)| 789932|
|  Breakfast & Brunch| 703166|
|          Sandwiches| 547171|
| Event Planning &...| 486458|
|             Seafood| 476841|
|            Shopping| 404641|
|             Burgers| 350378|
|               Pizza| 350283|
|        Coffee & Tea| 347102|
|                Food| 340216|
|             Italian| 317018|
|             Mexican| 300417|
|       Cocktail Bars| 283575|
|               Salad| 277189|
+--------------------+-------+
only showing top 20 rows

+--------------------+--------+
|   single_categories|Filtered|
+--------------------+--------+
|         Restaurants|  792350|
|                Food|  322150|
|           Nightlife|  296132|
|                Bars|  283732|
|     

#### How many reviews contain the string "legitimate" grouped by type of cuisine?

In [51]:
# Filter only cuisine restaurant
# remove extra from brackets for america cuisine
business_cuisine_wa = business_explode.select("*",regexp_replace(col("single_categories"), r"\s\([^)]*\)", "").alias("single_categories_wa"))
business_cuisine_wa = business_cuisine_wa.filter(col("single_categories_wa").isin(international_cuisine)).select("business_id","single_categories_wa")
print("cuisine present in our businesses")
business_cuisine_wa.show()
business_cuisine_wa = business_cuisine_wa.persist()

business_cuisine_wa.join(reviews, business_cuisine_wa["business_id"] == reviews["business_id"], "right") \
.filter("text LIKE '%legitimate%'") \
.groupBy("single_categories_wa") \
.agg(count("single_categories_wa").alias("Number of reviews contains \"legimitate\"")).orderBy(desc("Number of reviews contains \"legimitate\"")).show()

cuisine present in our businesses
+--------------------+--------------------+
|         business_id|single_categories_wa|
+--------------------+--------------------+
|eEOYSgkmpB90uNA7l...|          Vietnamese|
|il_Ro8jwPlHresjw9...|            American|
|ROeacJQwBeh05Rqg7...|              Korean|
|8rb-3VYXE37IZix4y...|            American|
|aCDY7vXYMs54EbYuQ...|            American|
|x9K0RfZaT_zlw6Dkl...|             Italian|
|1MeIwdbTnZOBFCKOr...|            American|
|Dy91wdWkwtI_qgjAI...|             Mexican|
|aNtKyc2rr-uK5cqzY...|             Mexican|
|Ms5xG8i4p80KSMcF3...|             Italian|
|7Du9oW73YcYFmXdtU...|            Japanese|
|vPcfJ3rm3NpdqVDod...|            Japanese|
|ZU6NodDOWaabGkeNp...|            American|
|8c0r7olQSYGcws0bT...|            Hawaiian|
|4IcB3QyMEA85UTWFK...|            American|
|YR0nwxBOKk6DiLHNI...|             Italian|
|jcL_qaGJiappzpnn-...|             Chinese|
|VbItL6RDULtnw4YvB...|             Tex-Mex|
|W57cR9a7XP6RX56MS...|             Mexican

### Is there a difference in the amount of authenticity language used in the different areas?

In [67]:
# Explore differente areas of America
auth_reviews = reviews.filter(col("text").rlike("|".join(authentic_synonyms))).select("business_id")
auth_reviews = auth_reviews.persist()

business_america = business_explode.select("*",regexp_replace(col("single_categories"), r"\s\([^)]*\)", "").alias("single_categories_wa")) \
.filter(col("single_categories_wa") == "American") \
.drop("single_categories")
business_america = business_america.persist()

result = business_america.join(auth_reviews, business_america["business_id"] == auth_reviews["business_id"], "inner")
result = result.drop(auth_reviews.business_id)
print("Every restaurant is repeated for the number of authentich reviews")
result.orderBy("business_id").show()
result.count()

Every restaurant is repeated for the number of authentich reviews
+--------------------+------------+-------+-----+--------------------+
|         business_id|review_count|   city|state|single_categories_wa|
+--------------------+------------+-------+-----+--------------------+
|--ZVrH2X2QXBFdCil...|          32|Ardmore|   PA|            American|
|--ZVrH2X2QXBFdCil...|          32|Ardmore|   PA|            American|
|--ZVrH2X2QXBFdCil...|          32|Ardmore|   PA|            American|
|--ZVrH2X2QXBFdCil...|          32|Ardmore|   PA|            American|
|--ZVrH2X2QXBFdCil...|          32|Ardmore|   PA|            American|
|--ZVrH2X2QXBFdCil...|          32|Ardmore|   PA|            American|
|--ZVrH2X2QXBFdCil...|          32|Ardmore|   PA|            American|
|--ZVrH2X2QXBFdCil...|          32|Ardmore|   PA|            American|
|--ZVrH2X2QXBFdCil...|          32|Ardmore|   PA|            American|
|--ZVrH2X2QXBFdCil...|          32|Ardmore|   PA|            American|
|--ZVrH2X2Q

72821

In [53]:
usStates = [
  ("AL", "East"), ("AK", "West"), ("AZ", "West"), ("AR", "Central"), ("CA", "West"),
  ("CO", "West"), ("CT", "East"), ("DE", "East"), ("FL", "East"), ("GA", "East"),
  ("HI", "West"), ("ID", "West"), ("IL", "Central"), ("IN", "Central"), ("IA", "Central"),
  ("KS", "Central"), ("KY", "Central"), ("LA", "Central"), ("ME", "East"), ("MD", "East"),
  ("MA", "East"), ("MI", "Central"), ("MN", "Central"), ("MS", "Central"), ("MO", "Central"),
  ("MT", "West"), ("NE", "Central"), ("NV", "West"), ("NH", "East"), ("NJ", "East"),
  ("NM", "West"), ("NY", "East"), ("NC", "East"), ("ND", "Central"), ("OH", "Central"),
  ("OK", "Central"), ("OR", "West"), ("PA", "Central"), ("RI", "East"), ("SC", "East"),
  ("SD", "Central"), ("TN", "Central"), ("TX", "Central"), ("UT", "West"), ("VT", "East"),
  ("VA", "East"), ("WA", "West"), ("WV", "Central"), ("WI", "Central"), ("WY", "West")
]

schema = StructType([
    StructField("State", StringType(), True),
    StructField("Position", StringType(), True)
])
df = spark.createDataFrame(usStates, schema=schema)

result.join(df, result["state"] == df["State"], "inner") \
.groupBy("Position") \
.agg(count("business_id").alias("Number of authentic reviews per state")).orderBy(desc("Number of authentic reviews per state")).show()


+--------+-------------------------------------+
|Position|Number of authentic reviews per state|
+--------+-------------------------------------+
| Central|                                46626|
|    East|                                13972|
|    West|                                11563|
+--------+-------------------------------------+



### 3.2.2
Can you identify a difference in the relationship between authenticity language and typically negative words, in restaurants serving south american or asian cuisine compared to restaurants serving european cuisine? And to what degree?

In [54]:
# europe and not cuisine
cuisine_data = [
    ("Italian", "European"), ("Greek", "European"), ("Spanish", "European"), ("French", "European"), ("German", "European"), ("Russian", "European"), ("Irish", "European"), ("American", "North American"), ("Polish", "European"),
    ("Swedish", "European"), ("Czech", "European"), ("Armenian", "European"), ("Portuguese", "European"), ("Mexican", "North American"), ("Chinese", "Asian"), ("Japanese", "Asian"), ("Indian", "Asian"),
    ("Thai", "Asian"), ("Korean", "Asian"), ("Vietnamese", "Asian"), ("Cuban", "North American"), ("Brazilian", "South American"), ("Filipino", "Asian"), ("Peruvian", "South American"), ("Argentinian", "South American"),
    ("Mediterranean", "European"), ("Middle Eastern", "Middle Eastern"), ("Cajun", "North American"), ("Tex-Mex", "North American"), ("Soul Food", "North American"), ("Jewish", "Middle Eastern"), ("Southern", "North American"),
    ("Caribbean", "North American"), ("Hawaiian", "North American"), ("Burmese", "Asian"), ("Afghan", "Asian"), ("Ethiopian", "African"), ("Nigerian", "African"), ("Kenyan", "African"), ("South African", "African"),
    ("Moroccan", "African"), ("Egyptian", "African"), ("Lebanese", "Middle Eastern"), ("Syrian", "Middle Eastern"), ("Palestinian", "Middle Eastern"), ("Jordanian", "Middle Eastern"), ("Iranian", "Middle Eastern"), ("Pakistani", "Asian"),
    ("Bangladeshi", "Asian"), ("Sri Lankan", "Asian")
]

schema = StructType([
    StructField("Cuisine", StringType(), True),
    StructField("Continent", StringType(), True)
])
df_cuisine = spark.createDataFrame(cuisine_data, schema=schema)

#bad words
bad_words = ["dirty", "kitsch", "rude"]

In [55]:
auth_reviews = reviews.filter(col("text").rlike("|".join(authentic_synonyms)))  
auth_reviews = auth_reviews.persist()

business_cuisine_wa = business_explode.select("*",regexp_replace(col("single_categories"), r"\s\([^)]*\)", "").alias("single_categories_wa")) \
.filter(col("single_categories_wa").isin(international_cuisine)) \
.drop("single_categories")
business_cuisine_wa = business_cuisine_wa.persist()
business_cuisine_wa.show()

avg_stars_cuicine = business_cuisine_wa.join(auth_reviews, business_cuisine_wa["business_id"] == auth_reviews["business_id"], "right") \
.groupBy("single_categories_wa") \
.agg(count("single_categories_wa").alias("num_auth_review"), avg(auth_reviews.stars).alias("avg_stars")).orderBy(desc("avg_stars"))
print("average stars foreach cuisine consider only authentic language")
avg_stars_cuicine = avg_stars_cuicine.persist()
avg_stars_cuicine.show()

+--------------------+------------+----------------+-----+--------------------+
|         business_id|review_count|            city|state|single_categories_wa|
+--------------------+------------+----------------+-----+--------------------+
|eEOYSgkmpB90uNA7l...|          10|       Tampa Bay|   FL|          Vietnamese|
|il_Ro8jwPlHresjw9...|          28|    Indianapolis|   IN|            American|
|ROeacJQwBeh05Rqg7...|         205|    Philadelphia|   PA|              Korean|
|8rb-3VYXE37IZix4y...|          29|    Williamstown|   NJ|            American|
|aCDY7vXYMs54EbYuQ...|          25|       Glenolden|   PA|            American|
|x9K0RfZaT_zlw6Dkl...|           9|          Tucson|   AZ|             Italian|
|1MeIwdbTnZOBFCKOr...|          80|     Saint Louis|   MO|            American|
|Dy91wdWkwtI_qgjAI...|          23|      Wilmington|   DE|             Mexican|
|aNtKyc2rr-uK5cqzY...|          19|           Largo|   FL|             Mexican|
|Ms5xG8i4p80KSMcF3...|          52|     

In [56]:
bad_reviews = reviews.filter(col("text").rlike("|".join(bad_words))) 

avg_bad_cuicine = business_cuisine_wa.join(bad_reviews, business_cuisine_wa["business_id"] == bad_reviews["business_id"], "inner") \
.groupBy("single_categories_wa") \
.agg(count("single_categories_wa").alias("num_bad_review"), avg(auth_reviews.stars).alias("avg_stars_bad")).orderBy("avg_stars_bad")

print("average stars foreach cuisine consider only bad language")
avg_bad_cuicine = avg_bad_cuicine.persist()
avg_bad_cuicine.show()

average stars foreach cuisine consider only bad language
+--------------------+--------------+------------------+
|single_categories_wa|num_bad_review|     avg_stars_bad|
+--------------------+--------------+------------------+
|             Haitian|             2|               1.5|
|         Bangladeshi|             2|               1.5|
|              Syrian|             8|             1.625|
|             African|            73|1.7123287671232876|
|           Colombian|            11|1.7272727272727273|
|           Pakistani|           176|1.7670454545454546|
|             Tex-Mex|           728| 1.776098901098901|
|             Chinese|          1913|1.7773131207527444|
|            Egyptian|             6|1.8333333333333333|
|             Mexican|          3508|1.8509122006841505|
|               Irish|           139|1.8633093525179856|
|               Greek|           382|1.9450261780104712|
|             Italian|          3263|1.9549494330370825|
|       Mediterranean|         

In [57]:
result = avg_stars_cuicine.join(df_cuisine,df_cuisine["Cuisine"] == avg_stars_cuicine["single_categories_wa"]) \
.join(avg_bad_cuicine,avg_bad_cuicine["single_categories_wa"] == avg_stars_cuicine["single_categories_wa"]) \
.groupBy("Continent") \
.agg(sum((col("num_auth_review") * col("avg_stars"))).alias("num_auth_review * avg_stars"), \
    sum("num_auth_review").alias("tot_auth_review"), \
    sum((col("num_bad_review") * col("avg_stars_bad"))).alias("num_bad_review * avg_stars_bad"), \
    sum("num_bad_review").alias("tot_bad_review")
)
result.show()

+--------------+---------------------------+---------------+------------------------------+--------------+
|     Continent|num_auth_review * avg_stars|tot_auth_review|num_bad_review * avg_stars_bad|tot_bad_review|
+--------------+---------------------------+---------------+------------------------------+--------------+
|      European|                   181353.0|          47085|                       11299.0|          5439|
|North American|                   454657.0|         122183|                       31468.0|         16048|
|         Asian|                   202257.0|          53174|                       11013.0|          5673|
|South American|                     3967.0|           1035|                         153.0|            73|
|Middle Eastern|                    11622.0|           2863|                         387.0|           194|
|       African|                     4226.0|           1029|                         144.0|            65|
+--------------+---------------------

In [58]:
result_1 = result.withColumn("overall_avg", col("num_auth_review * avg_stars") / col("tot_auth_review")) \
.withColumn("overall_auth_avg", round(col("overall_avg"), 4))
result_1.show()
result_2 = result_1.withColumn("overall_bad_avg", col("num_bad_review * avg_stars_bad") / col("tot_bad_review")) \
.withColumn("overall_bad_avg", round(col("overall_bad_avg"), 4)) \
.select("Continent","overall_auth_avg","overall_bad_avg")

print("Average stars grouped by white and not cuisine")
result_2.show()

+--------------+---------------------------+---------------+------------------------------+--------------+------------------+----------------+
|     Continent|num_auth_review * avg_stars|tot_auth_review|num_bad_review * avg_stars_bad|tot_bad_review|       overall_avg|overall_auth_avg|
+--------------+---------------------------+---------------+------------------------------+--------------+------------------+----------------+
|      European|                   181353.0|          47085|                       11299.0|          5439|3.8516087926091114|          3.8516|
|North American|                   454657.0|         122183|                       31468.0|         16048|3.7211150487383677|          3.7211|
|         Asian|                   202257.0|          53174|                       11013.0|          5673| 3.803682250724038|          3.8037|
|South American|                     3967.0|           1035|                         153.0|            73|3.8328502415458936|          3.8329|

In [59]:
#bad_reviews.show()
#auth_reviews.show()
join_bad_auth = bad_reviews.join(auth_reviews, auth_reviews["business_id"] == bad_reviews["business_id"],"inner") \
.select(auth_reviews["*"])

avg_bad_auth_cuicine = business_cuisine_wa.join(join_bad_auth, business_cuisine_wa["business_id"] == join_bad_auth["business_id"], "inner") \
.groupBy("single_categories_wa") \
.agg(count("single_categories_wa").alias("num_bad_auth_review"), avg(auth_reviews.stars).alias("avg_stars")).orderBy("avg_stars")

#avg_bad_auth_cuicine.show()


result_bad_auth = avg_bad_auth_cuicine.join(df_cuisine,df_cuisine["Cuisine"] == avg_bad_auth_cuicine["single_categories_wa"]) \
.groupBy("Continent") \
.agg(sum((col("num_bad_auth_review") * col("avg_stars"))).alias("num_bag_auth_review * avg_stars"), \
    sum("num_bad_auth_review").alias("tot_bad_auth_review"), \
)

result_3 = result_bad_auth.withColumn("overall_avg", col("num_bag_auth_review * avg_stars") / col("tot_bad_auth_review")) \
.withColumn("overall_bad_auth_avg", round(col("overall_avg"), 4)) \
.select("Continent","overall_bad_auth_avg")
result_3.show()

+--------------+--------------------+
|     Continent|overall_bad_auth_avg|
+--------------+--------------------+
|      European|              2.4728|
|       African|              2.3639|
|North American|              2.2688|
|Middle Eastern|              1.9806|
|         Asian|              2.1425|
|South American|              2.1943|
+--------------+--------------------+



# 3.3 Machine Learning


In [60]:
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import DecisionTreeClassifier, NaiveBayes 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import functions as f

In [61]:
# we will do the test on the spanish auth reviews
reviews = spark.read.json('file:////work/yelp/yelp_academic_dataset_review.json')
auth_reviews = reviews.filter(col("text").rlike("|".join(authentic_synonyms)))  
auth_reviews = auth_reviews.persist()

auth_reviews_cuicine = auth_reviews.join(business_cuisine_wa,business_cuisine_wa["business_id"] == auth_reviews["business_id"]) \
.filter(col("single_categories_wa") == "Spanish").select("text","stars")
auth_reviews_cuicine.show()
auth_reviews_cuicine.count()

+--------------------+-----+
|                text|stars|
+--------------------+-----+
|Whoever put Meson...|  4.0|
|This is actually ...|  5.0|
|First time as a f...|  5.0|
|I've been craving...|  5.0|
|My overall the ex...|  4.0|
|Truly authentic. ...|  5.0|
|A bit overpriced ...|  4.0|
|Got take out from...|  4.0|
|Fantastic, authen...|  4.0|
|Guys this is prob...|  5.0|
|I almost died whe...|  5.0|
|One of the origin...|  4.0|
|Awesome Cuban San...|  5.0|
|Was good, people ...|  3.0|
|TL;DR - REALLY go...|  5.0|
|Sad sad sad!!!! I...|  2.0|
|It's really good,...|  4.0|
|This is my second...|  3.0|
|I really wanted t...|  1.0|
|The coconut chick...|  4.0|
+--------------------+-----+
only showing top 20 rows



2063

In [62]:
#The test set needs to be separated before any preprocessing of the data that involves dataset statistics.
train_df, test_df = auth_reviews_cuicine.randomSplit([0.7, 0.3])
print(train_df.count(),test_df.count())

1435 628


In [63]:
from pyspark.ml import Pipeline
tokenizer = Tokenizer(inputCol="text", outputCol="words") # Create tokenizer object specifying which column to tokenize
hashingtf = HashingTF(inputCol="words", outputCol="features") # specify name of output column
dt_evaluator = MulticlassClassificationEvaluator(labelCol="stars",predictionCol="prediction",metricName="accuracy")

decisiontree = DecisionTreeClassifier(labelCol="stars",featuresCol="features")
naivebayes = NaiveBayes(modelType="multinomial",labelCol="stars",featuresCol="features")

In [64]:
# with decision tree algo
pipeline = Pipeline(stages=[tokenizer, hashingtf, decisiontree])

trained_model_pipeline = pipeline.fit(train_df)
test_preds = trained_model_pipeline.transform(test_df)
test_preds.show()

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|                text|stars|               words|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|First time as a f...|  5.0|[first, time, as,...|(262144,[5573,875...|[0.0,1.0,3.0,4.0,...|[0.0,0.0181818181...|       5.0|
|This is actually ...|  5.0|[this, is, actual...|(262144,[1619,230...|[0.0,1.0,5.0,11.0...|[0.0,0.0158730158...|       4.0|
|Fantastic, authen...|  4.0|[fantastic,, auth...|(262144,[12631,15...|[0.0,10.0,16.0,18...|[0.0,0.0233100233...|       5.0|
|Got take out from...|  4.0|[got, take, out, ...|(262144,[2325,553...|[0.0,40.0,53.0,10...|[0.0,0.0909090909...|       4.0|
|Truly authentic. ...|  5.0|[truly, authentic...|(262144,[6346,168...|[0.0,10.0,16.0,18...|[0.0,0.0233100233...|       5.0|
|Guys th

In [65]:
# with naive bayes algo
pipeline_naive = Pipeline(stages=[tokenizer, hashingtf, naivebayes])

trained_model_pipeline_naive = pipeline_naive.fit(train_df)
test_preds_naive = trained_model_pipeline_naive.transform(test_df)
test_preds_naive.show()

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|                text|stars|               words|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+----------+
|First time as a f...|  5.0|[first, time, as,...|(262144,[5573,875...|[-2056.3814342173...|[1.19932205411066...|       4.0|
|This is actually ...|  5.0|[this, is, actual...|(262144,[1619,230...|[-1586.2047674708...|[6.55638136404062...|       4.0|
|Fantastic, authen...|  4.0|[fantastic,, auth...|(262144,[12631,15...|[-373.83890708928...|[1.47844976527254...|       4.0|
|Got take out from...|  4.0|[got, take, out, ...|(262144,[2325,553...|[-1851.5343518804...|[1.75482969527240...|       4.0|
|Truly authentic. ...|  5.0|[truly, authentic...|(262144,[6346,168...|[-168.42267254814...|[6.44115179945193...|       4.0|
|Guys th

In [66]:
dt_accuracy = dt_evaluator.evaluate(test_preds)
print("Accuracy with Decision Tree ",dt_accuracy)
dt_accuracy_naive = dt_evaluator.evaluate(test_preds_naive)
print("Accuracy with Naive Bayes ",dt_accuracy_naive)

Accuracy with Decision Tree  0.4426751592356688
Accuracy with Naive Bayes  0.26910828025477707
