# Assignment: Scalable Processing
## Yelp Reviews and Authenticity

LargeScaleDataAnalysis | by Krzysztof Michal Parocki | krpa@itu.dk | 30.04.2023

## Connecting to the Spark Cluster job using the two JobParameters.json

To connect this jupyter notebook with your Spark cluster, we need to tell jupyter how it can access the spark cluster. Below code accomplishes that. 

In [4]:
#####################################################################
# DO NOT CHANGE ANYTHING HERE.
# IF YOU HAVE PROBLEMS, CHECK THE ASSIGNMENT GUIDE CAREFULLY 
#####################################################################
    
FIRST_EXECUTION = False
# Only execute this cell once.
if not FIRST_EXECUTION:
    FIRST_EXECUTION = True
    import os, json, pyspark
    from pyspark.conf import SparkConf
    from pyspark.sql import SparkSession, functions as F

    # Two files are automatically read: JobParameters.json for the Spark Cluster job using a temporary spark instance
    # and JobParameters.json for the Jupyter Lab job to extract the hostname of the cluster. 

    MASTER_HOST_NAME = None

    # Open the parameters Jupyter Lab app was launched with
    with open('/work/JobParameters.json', 'r') as file:
        JUPYTER_LAB_JOB_PARAMS = json.load(file)
        # from pprint import pprint; pprint(JUPYTER_LAB_JOB_PARAMS) 
        for resource in JUPYTER_LAB_JOB_PARAMS['request']['resources']:
            if 'hostname' in resource.keys():
                MASTER_HOST_NAME = resource['hostname']

    MASTER_HOST = f"spark://{MASTER_HOST_NAME}:7077"

    conf = SparkConf().setAll([
            ("spark.app.name", 'reading_job_params_app'), 
            ("spark.master", MASTER_HOST),
        ])
    spark = SparkSession.builder.config(conf=conf)\
                                .getOrCreate()

    CLUSTER_PARAMETERS_JSON_DF = spark.read.option("multiline","true").json('/work/JobParameters.json')

    # Extract cluster info from the specific JobParameters.json
    NODES = CLUSTER_PARAMETERS_JSON_DF.select("request.replicas").first()[0]
    CPUS_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.cpu").first()[0] - 1
    MEM_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.memoryInGigs").first()[0]

    CLUSTER_CORES_MAX = CPUS_PER_NODE * NODES
    CLUSTER_MEMORY_MAX = MEM_PER_NODE * NODES 

    EXECUTOR_CORES = CPUS_PER_NODE - 1  # set cores per executor on worker node
    EXECUTOR_MEMORY = int(
        MEM_PER_NODE / (CPUS_PER_NODE / EXECUTOR_CORES) * 0.5
    )  # set executor memory in GB on each worker node

    # Make sure there is a dir for spark logs
    if not os.path.exists('spark_logs'):
        os.mkdir('spark_logs')

    conf = SparkConf().setAll(
        [
            ("spark.app.name", 'spark_assignment'), # Change to your liking 
            ("spark.sql.caseSensitive", False), # Optional: Make queries strings sensitive to captialization
            ("spark.master", MASTER_HOST),
            ("spark.cores.max", CLUSTER_CORES_MAX),
            ("spark.executor.cores", EXECUTOR_CORES),
            ("spark.executor.memory", str(EXECUTOR_MEMORY) + "g"),
            ("spark.eventLog.enabled", True),
            ("spark.eventLog.dir", "spark_logs"),
            ("spark.history.fs.logDirectory", "spark_logs"),
            ("spark.deploy.mode", "cluster"),
        ]
    )

    ## check executor memory, taking into accout 10% of memory overhead (minimum 384 MiB)
    CHECK = (CLUSTER_CORES_MAX / EXECUTOR_CORES) * (
        EXECUTOR_MEMORY + max(EXECUTOR_MEMORY * 0.10, 0.403)
    )

    assert (
        int(CHECK) <= CLUSTER_MEMORY_MAX
    ), "Executor memory larger than cluster total memory!"

    # Stop previous session that was just for loading cluster params
    spark.stop()

    # Start new session with above config, that has better resource handling
    spark = SparkSession.builder.config(conf=conf)\
                                .getOrCreate()
    sc = spark.sparkContext

Click on the "SparkMonitor" tab at the top in Jupyter Lab to see the status of running code on the cluster.

## Loading the data
Here we specify where the yelp datasets are located on UCloud and read then using the spark session.

In [5]:
# Read in the business and review files
# This is the path to the shared datasets provided by adding an the dataset input folder
# when submitting the spark cluster job.
business = spark.read.json('file:////work/yelp/yelp_academic_dataset_business.json') # Use the file:/// prefix to indicate we want to read from the cluster's filesystem
business = business.persist()
# Persist 2 commonly used dataframes since they're used for later computations
# https://sparkbyexamples.com/spark/spark-difference-between-cache-and-persist/

users = spark.read.json("file:////work/yelp/yelp_academic_dataset_user.json")

reviews = spark.read.json('file:////work/yelp/yelp_academic_dataset_review.json')
reviews = reviews.persist()

In [3]:
# Get number of rows with no sampling:
reviews.count()

6990280

In [4]:
# OPTIONAL:
# Reduce resource usage and make queries run faster
# by only using a small sample of the dataframe
# and overwriting previous variable "df".
# Useful while developing, not so much to
# provide final answers. Therefore: Remember to 
# to re-read the df when done developing code using
# df = spark.read etc like above.
#reviews = reviews.sample(withReplacement=False, fraction=1/50)

# Get number of rows after sampling:
#reviews.count() 

In [5]:
reviews.show()

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|e4Vwtrqf-wpJfwesg...|   1|2017-01-14 20:54:15|    0|Sx8TMOWLNuJBWer-0...|  4.0|Cute inter

In [6]:
business.show()

+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|          city|               hours|is_open|     latitude|     longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+
|1616 Chapala St, ...|{null, null, null...|Pns2l4eNsfO8kk83d...|Doctors, Traditio...| Santa Barbara|                null|      0|   34.4266787|  -119.7111968|Abby Rappoport, L...|      93101|           7|  5.0|   CA|
|87 Grasso Plaza S...|{null, null, null...|mpf3x-BjTdTEA3yCZ...|Shipping Centers,...|        Affton|{8:0-18:30, 0:0-0...|      1|   

Example: Say we're only interested in reviews of good mexican restaurants in Arizona. You can delete this when you do your own thing. 

In [7]:
# Filter to only Arizona businesses with "Mexican" as part of their categories
az_mex = business.filter(business.state == "AZ")\
                .filter(business.categories.rlike("Mexican"))\
                .select("business_id", "name")

# Join with the reviews
az_mex_rs = reviews.join(az_mex, on="business_id", how="inner")

# Filter to only 5 star reviews
good_az_mex_rs = az_mex_rs.filter(az_mex_rs.stars == 5)\
                        .select("name","text")

# Print the top 20 rows of the DataFrame
good_az_mex_rs.show()

# Convert to pandas (local object) and save to local file system
good_az_mex_rs.toPandas().to_csv("good_az_reviews.csv", header=True, index=False, encoding='utf-8')


+--------------------+--------------------+
|                name|                text|
+--------------------+--------------------+
|Casa Molina Del N...|We've been coming...|
|St Mary's Mexican...|Some of the fines...|
|Street- Taco and ...|Top notch street ...|
|Indian Frybread-M...|This place is a h...|
|St Mary's Mexican...|One of my favorit...|
|Street- Taco and ...|This is my favori...|
|Street- Taco and ...|Great food!  Grea...|
|            BK Tacos|Quality ingredien...|
|Street- Taco and ...|Best tacos in Tuc...|
|        El Merendero|This place was fr...|
|        El Merendero|To the reviewer w...|
|Taqueria Pico De ...|If you are lookin...|
|               Penca|Good unique Mexic...|
|   La Mesa Tortillas|Incredible red ch...|
|               Penca|I love this place...|
|            BK Tacos|This place is the...|
|Indian Frybread-M...|Wow! My sister ha...|
|St Mary's Mexican...|Carne Seca burro ...|
|             Club 21|The food was exce...|
|      El Charro Cafe|This is th

## 3.1 DataFrame Queries

In [8]:
business.groupBy().sum('review_count').show()

+-----------------+
|sum(review_count)|
+-----------------+
|          6745508|
+-----------------+



In [9]:
#1. Analyze business.json to find the total number of reviews for all businesses. The output should be in the form of a Spark DataFrame with one value representing the count.
bus_rev=business.join(reviews, on='business_id', how='inner')
count_df = bus_rev.groupby('name')\
.agg({'review_id':'count'})\
.withColumnRenamed('count(review_id)', 'count')
count_df.orderBy('count', ascending=False).show()

+--------------------+-----+
|                name|count|
+--------------------+-----+
|           Starbucks|21575|
|          McDonald's|18210|
|             Dunkin'|10312|
|Chipotle Mexican ...| 9763|
|         First Watch| 9317|
|           Taco Bell| 8636|
|   Acme Oyster House| 8491|
|         Chick-fil-A| 8378|
|        Panera Bread| 7565|
|        Oceana Grill| 7516|
|  Buffalo Wild Wings| 7347|
|             Chili's| 6565|
|      Domino's Pizza| 6539|
|      Steak ’n Shake| 6455|
|                IHOP| 6443|
|             Wendy's| 6397|
|  Outback Steakhouse| 6207|
|Hattie B’s Hot Ch...| 6160|
|Applebee's Grill ...| 6146|
|   Ruby Slipper Cafe| 5882|
+--------------------+-----+
only showing top 20 rows



In [10]:
#2. Analyze business.json to find all businesses that have received 5 stars and that have been reviewed by 500 or more users. The output should be in the form of DataFrame of (name, stars, review count).
business_five = business.filter((business.stars == 5)\
& (business.review_count>=500))\
.select("name","stars","review_count")
business_five.show()

+--------------------+-----+------------+
|                name|stars|review_count|
+--------------------+-----+------------+
|     Blues City Deli|  5.0|         991|
|            Tumerico|  5.0|         705|
|  Free Tours By Foot|  5.0|         769|
|                Yats|  5.0|         623|
|   SUGARED + BRONZED|  5.0|         513|
|Nelson's Green Br...|  5.0|         545|
|Smiling With Hope...|  5.0|         526|
|    Carlillos Cocina|  5.0|         799|
|Barracuda Deli Ca...|  5.0|         521|
+--------------------+-----+------------+



In [11]:
#3. Analyze user.json to find the influencers who have written more than 1000 reviews. The output should be in the form of DataFrame of user id.
influ = users.filter(users.review_count>1000).select('user_id')
influ.show()

+--------------------+
|             user_id|
+--------------------+
|j14WgRoU_-2ZE1aw1...|
|q_QQ5kBBwlCcbL1s4...|
|MGPQVLsODMm9ZtYQW...|
|NIhcRW6DWvk1JQhDh...|
|QJI9OSEn6ujRCtrX0...|
|AkBtT43dYcttxQ3qO...|
|2l0O1EI1m0yWjFo2z...|
|RgDVC3ZUBqpEe6Y1k...|
|lquc6IF6uGIeRomDL...|
|VHdY6oG2JPVNjihWh...|
|om5ZiponkpRqUNa3p...|
|K7thO1n-vZ9PFYiC7...|
|UQFE3BT1rsIYrcDvu...|
|jVYzrVblDFSuL3GHt...|
|3zxy3LVBV3ttxoYbY...|
|0G-QF457q_0Z_jKqh...|
|ITa3vh5ERI90G_WP4...|
|P5bUL3Engv-2z6kKo...|
|ouODopBKF3AqfCkuQ...|
|YMgZqBUAddmFErxLt...|
+--------------------+
only showing top 20 rows



In [12]:
#4. Analyze review.json, business.json, and a view created from your answer to Q3 to find the businesses that have been reviewed by more than 5 influencer users.
bus_rev=business.join(reviews, on='business_id', how='inner')
bus_rev_influ=bus_rev.join(influ, on='user_id', how='inner')
grouped=bus_rev_influ.groupBy('name').count()
grouped.filter(grouped['count']>5).show()

+--------------------+-----+
|                name|count|
+--------------------+-----+
| Brothers III Lounge|    8|
|Applebee's Grill ...|   94|
|          Ohana Cafe|    8|
|City Coffee & Crê...|   17|
|         Jazzy's BBQ|    8|
|Señor Tequila Mex...|    9|
|Woodrow's Sandwic...|   10|
|Straz Center for ...|   12|
|    McAlister's Deli|   60|
| Gran Caffe L'Aquila|   22|
|        Santo Mezcal|    8|
|Suncoast Credit U...|    9|
|     LaScala's Birra|   11|
|Goodyear Auto Ser...|    6|
|Iron Hill Brewery...|   81|
|   Swizzle Stick Bar|   11|
|            Popeye's|    8|
|         Ohana Sushi|    6|
|          Winn Dixie|   23|
|       Buca di Beppo|    9|
+--------------------+-----+
only showing top 20 rows



In [13]:
grouped=bus_rev_influ.groupBy('name').count()
grouped.filter(grouped['count']>5).count()

5506

In [14]:
rev_influ = reviews.join(influ, on='user_id', how='right').groupBy('business_id').count()
rev_influ.filter(rev_influ['count']>5).count()

5119

In [15]:
users.select('user_id').orderBy('average_stars', ascending=False).show()

+--------------------+
|             user_id|
+--------------------+
|uVRumIjYImzUFc6tr...|
|ydeAqVSr2HX9TR2bl...|
|vx3OD6YC3kOk4oneH...|
|VWY7PSIrt-tCyHA-S...|
|_qOk1DJ_RRtXrgs3T...|
|I7KT8qSLXVeOM7ogN...|
|CcXrryd_EJTKWooSL...|
|qyzpifW3ORv-EGV27...|
|W63wG-8u70acKeKFA...|
|Pv6FIXQbWiOZ28lSJ...|
|A9qxTwz7gFhCjix5A...|
|GG9a0eV6w9cKVpVrT...|
|AeKwjnERVN4YbK-Ry...|
|K1WzrU75btX-CMiPk...|
|rAIwdOYiYHIwECXMz...|
|1xmV57LXBAsCWRNDT...|
|PaLjKU7MLLWpLokba...|
|meihVP_0q6SyVLVYW...|
|PQLQocTQpDwc3C0IB...|
|XUMkFho1Ly9A-b5UY...|
+--------------------+
only showing top 20 rows



In [16]:
influ_count = reviews.join(influ, on="user_id", how="right")\
        .groupBy('business_id').count()
influ_count.show()

influ_count.filter(influ_count['count'] > 5)\
        .join(business, on='business_id', how='left').select('name').count()

+--------------------+-----+
|         business_id|count|
+--------------------+-----+
|yMdl4xS-XZLgeFQ9k...|    1|
|h_6ioAoKNLi01kPho...|    2|
|S27cRb1jq0Q7QUYqE...|    3|
|vXb4OWsjPoiBtmarf...|    6|
|QKQnFTZzCXyr0NIi9...|    1|
|NY_sXepAjc83v58_1...|    4|
|3fPvGGLaH29WkXXcO...|   10|
|LmRyuir7orBNN43fy...|    6|
|3VrqxApK-iwfRohlA...|    1|
|2fJ-WxJlUN6azp3bz...|   15|
|iipnazeY9eoANJ37l...|    1|
|Mb7EELlpx1E9hA0pS...|    2|
|2IahpaBR4U2Kdy9HF...|   22|
|bPoU-QnlvgbTZWqsE...|    3|
|bncTqUdA8ZPcUkDDm...|    6|
|0wZJkj-OnZ7Pmubls...|   22|
|WvlCQhLQGza1wj0Ho...|    2|
|2y_CdkxEOJEJGyJAp...|    1|
|E9a5O6ONNgly2IfDf...|    1|
|3FKIev7ZB_KE6XHL9...|    8|
+--------------------+-----+
only showing top 20 rows



5119

In [17]:
users.printSchema()

root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)
 |-- fans: long (nullable = true)
 |-- friends: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)
 |-- yelping_since: string (nullable = true)



In [18]:
reviews.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [None]:
#5. Analyze review.json and user.json to find an ordered list of users based on the average star counts they have given in all their reviews.
avg = users.select('user_id', 'average_stars','review_count')\
.orderBy('average_stars', ascending=False).show() #the easy way
avg.show()
avg.count()

In [20]:
rev_usr=reviews.join(users, on='user_id', how='inner')\
.groupBy('user_id')\
.agg({'stars':'avg', 'review_id':'count'})\
.withColumnRenamed('avg(stars)','avg_stars')
rev_usr.orderBy('avg_stars', ascending=False).show()
rev_usr.count()

+--------------------+----------------+---------+
|             user_id|count(review_id)|avg_stars|
+--------------------+----------------+---------+
|--zU_p1sxo-W-ae_D...|               3|      5.0|
|-0-2SQH-1t7jKTP2i...|               1|      5.0|
|-0-_vUr91ASWssEyv...|               2|      5.0|
|--_Q0PF6V_QhCh2yn...|               1|      5.0|
|-02nZw0C2L13YM1KW...|               1|      5.0|
|--0S2HVJui8bEa2iV...|               1|      5.0|
|-04yLlA6Z6BfAzYEF...|               1|      5.0|
|--SXoMTx1URLFR_9e...|               2|      5.0|
|-06iaHH01RX8se5IH...|               1|      5.0|
|--W95nwm9MMB1Bs1x...|               4|      5.0|
|-0AYXSEmt8tF4jBg_...|               2|      5.0|
|--YWBD-491bjkqJWE...|               1|      5.0|
|-0AhJU76qScGDAQ-m...|               1|      5.0|
|--338aogPBCUUKGRH...|               1|      5.0|
|-0CkKartWFGzSQotp...|               1|      5.0|
|--cTSV869T8KKNIhN...|               1|      5.0|
|-0FDPC6mDeA5k0gR3...|               1|      5.0|


1987897

In [21]:
test = avg.join(rev_usr, on='user_id', how='inner')
test.show()
test.count()

AttributeError: 'function' object has no attribute 'join'

In [None]:
reviews.count()

In [None]:
users.select('review_count').groupBy().sum().collect()

In [None]:
###Include both numbers in the final report. Write that the discrepancy might be because 1)the statistics include empty reviews with star rating while the reviews dataset doesn't 2)reviews might be only from a certain time period,
#and statistics calculated over the whole existence.

## 3.2 Authenticity Study

In [6]:
reviews.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [None]:
business.printSchema()

In [None]:
business.select('categories').show(20, False)

In [None]:
expr = "Restaurants"
bs = business.filter(business["categories"].rlike(expr))
bs.select('categories').show(20, False)

In [None]:
from pyspark.sql.functions import when

In [None]:
bs = bs.withColumn("cuisine", when(bs.categories.rlike("Mexican"), "Mexican")\
         .when(bs.categories.rlike("Thai"), "Thai")\
         .when(bs.categories.rlike("Japanese"), "Japanese")\
         .when(bs.categories.rlike("Chinese"), "Chinese")\
         .when(bs.categories.rlike("French"), "French")\
         .when(bs.categories.rlike("Italian"), "Italian")\
         .when(bs.categories.rlike("Korean"), "Korean")\
         .when(bs.categories.rlike("Indian"), "Indian")\
 #       .when(bs.categories.rlike("American"),"American")\
 #       .when(bs.categories.rlike("Greek"),"Greek")
 #       .when(bs.categories.rlike("Vietnamese"),"Vietnamese")
         .otherwise(""))
bs = bs.filter(bs.cuisine != "")
    
bs.select('categories','cuisine').show(40, False)

In [None]:
##What is the percentage of reviews containing a variant of the word "authentic"? How many reviews contain the string "legitimate" grouped by type of cuisine?

df = bs.join(reviews, on='business_id', how='left')
auth = df.filter(df.text.rlike("auth"))
print(auth.count()/df.count()*100, '% of reviews contain a variant of the word "authentic"')

#add synonyms here, but this is not as easy as adding the | operator unfortunately.

In [None]:
legit = df.filter(df.text.rlike("authentic"))
legit = legit.groupBy("cuisine").count()
total = df.groupBy("cuisine").count()
total.join(legit, on="cuisine").show()


In [None]:
lang = df.withColumn("auth_score",
    when(df.text.rlike("legit") | df.text.rlike("auth"), 1).otherwise(0))

lang.groupBy('state').avg('auth_score').show()             

In [None]:
lang.select('state').distinct().count()

In [None]:
lang.groupBy('cuisine').avg('auth_score').show()

In [None]:
cube = lang.cube('state', 'auth_score').count().orderBy("state", "auth_score")

positive = cube.filter(cube.auth_score==1)
positive.show()

In [None]:
cube2 = lang.cube('state', 'cuisine').avg('auth_score').orderBy('avg(auth_score)')
cube2 = cube2.filter(cube2.state.isNotNull() & cube2.cuisine.isNotNull())
cube2.show()
cube2.orderBy('avg(auth_score)', ascending=False).show()

In [None]:
bad = lang.withColumn("bad_score",
    when(df.text.rlike("kitsch")\
        | df.text.rlike("bad")\
        | df.text.rlike("simple")\
        | df.text.rlike("disgusting")\
        | df.text.rlike("dirty")\
        | df.text.rlike("cheap")\
        | df.text.rlike("rude"), 1).otherwise(0))

bad.select('auth_score','bad_score').show()

In [None]:
cuisines = ['Mexican', 'Thai', 'Indian', 'Chinese', 'Japanese', 'Italian', 'Korean', 'French']
results=[]
for cuisine in cuisines:
        results.append(bad.filter(bad.cuisine == cuisine) \
              .stat.corr("auth_score", "bad_score"))
        
print(results)

## Machine Learning

In [7]:
reviews.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [8]:
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.feature import CountVectorizer
import pyspark.sql.functions as F
import pyspark.sql.types as T


data = reviews.select(F.split(F.col("text")," ").alias("text_array"), 'stars') \
    .drop("text")
data.show()

+--------------------+-----+
|          text_array|stars|
+--------------------+-----+
|[If, you, decide,...|  3.0|
|[I've, taken, a, ...|  5.0|
|[Family, diner., ...|  3.0|
|[Wow!, , Yummy,, ...|  5.0|
|[Cute, interior, ...|  4.0|
|[I, am, a, long, ...|  1.0|
|[Loved, this, tou...|  5.0|
|[Amazingly, amazi...|  5.0|
|[This, easter, in...|  3.0|
|[Had, a, party, o...|  3.0|
|[My, experience, ...|  5.0|
|[Locals, recommen...|  4.0|
|[Love, going, her...|  4.0|
|[Good, food--love...|  4.0|
|[The, bun, makes,...|  4.0|
|[Great, place, fo...|  5.0|
|[Tremendous, serv...|  5.0|
|[The, hubby, and,...|  4.0|
|[I, go, to, blow,...|  5.0|
|[My, absolute, fa...|  5.0|
+--------------------+-----+
only showing top 20 rows



In [None]:
from pyspark.ml.feature import CountVectorizer

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="text_array", outputCol="features")

model = cv.fit(data)

result = model.transform(data)
result.show(truncate=False)

In [None]:
data = result.drop("text_array")

In [None]:
# Prepare training and test data.
train, test = data.randomSplit([0.9, 0.1], seed=1)

In [None]:
from pyspark.ml.regression import LinearRegression

#Train the model
lr = LinearRegression(featuresCol = 'features', labelCol='stars', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

trainingSummary = lr_model.summary
print("r2: %f" % trainingSummary.r2)

In [None]:
#Testing
lr_predictions = lr_model.transform(test)
lr_predictions.select("prediction","stars","features").show(5)

from pyspark.ml.evaluation import RegressionEvaluator

lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="stars",metricName="r2")

print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))