# Assignment: Scalable Processing
## Yelp Reviews and Authenticity

Big Data Management | by ___ | ____@itu.dk | date

## Connecting to the Spark Cluster job using the two JobParameters.json

To connect this jupyter notebook with your Spark cluster, we need to tell jupyter how it can access the spark cluster. Below code accomplishes that. 

In [1]:
#####################################################################
# DO NOT CHANGE ANYTHING HERE.
# IF YOU HAVE PROBLEMS, CHECK THE ASSIGNMENT GUIDE CAREFULLY 
#####################################################################
    
FIRST_EXECUTION = False
# Only execute this cell once.
if not FIRST_EXECUTION:
    FIRST_EXECUTION = True
    import os, json, pyspark
    from pyspark.conf import SparkConf
    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.functions import avg

    # Two files are automatically read: JobParameters.json for the Spark Cluster job using a temporary spark instance
    # and JobParameters.json for the Jupyter Lab job to extract the hostname of the cluster. 

    MASTER_HOST_NAME = None

    # Open the parameters Jupyter Lab app was launched with
    with open('/work/JobParameters.json', 'r') as file:
        JUPYTER_LAB_JOB_PARAMS = json.load(file)
        # from pprint import pprint; pprint(JUPYTER_LAB_JOB_PARAMS) 
        for resource in JUPYTER_LAB_JOB_PARAMS['request']['resources']:
            if 'hostname' in resource.keys():
                MASTER_HOST_NAME = resource['hostname']

    MASTER_HOST = f"spark://{MASTER_HOST_NAME}:7077"

    conf = SparkConf().setAll([
            ("spark.app.name", 'reading_job_params_app'), 
            ("spark.master", MASTER_HOST),
        ])
    spark = SparkSession.builder.config(conf=conf)\
                                .getOrCreate()

    CLUSTER_PARAMETERS_JSON_DF = spark.read.option("multiline","true").json('/work/JobParameters.json')

    # Extract cluster info from the specific JobParameters.json
    NODES = CLUSTER_PARAMETERS_JSON_DF.select("request.replicas").first()[0]
    CPUS_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.cpu").first()[0] - 1
    MEM_PER_NODE = CLUSTER_PARAMETERS_JSON_DF.select("machineType.memoryInGigs").first()[0]

    CLUSTER_CORES_MAX = CPUS_PER_NODE * NODES
    CLUSTER_MEMORY_MAX = MEM_PER_NODE * NODES 

    EXECUTOR_CORES = CPUS_PER_NODE - 1  # set cores per executor on worker node
    EXECUTOR_MEMORY = int(
        MEM_PER_NODE / (CPUS_PER_NODE / EXECUTOR_CORES) * 0.5
    )  # set executor memory in GB on each worker node

    # Make sure there is a dir for spark logs
    if not os.path.exists('spark_logs'):
        os.mkdir('spark_logs')

    conf = SparkConf().setAll(
        [
            ("spark.app.name", 'spark_assignment'), # Change to your liking 
            ("spark.sql.caseSensitive", False), # Optional: Make queries strings sensitive to captialization
            ("spark.master", MASTER_HOST),
            ("spark.cores.max", CLUSTER_CORES_MAX),
            ("spark.executor.cores", EXECUTOR_CORES),
            ("spark.executor.memory", str(EXECUTOR_MEMORY) + "g"),
            ("spark.eventLog.enabled", True),
            ("spark.eventLog.dir", "spark_logs"),
            ("spark.history.fs.logDirectory", "spark_logs"),
            ("spark.deploy.mode", "cluster"),
        ]
    )

    ## check executor memory, taking into accout 10% of memory overhead (minimum 384 MiB)
    CHECK = (CLUSTER_CORES_MAX / EXECUTOR_CORES) * (
        EXECUTOR_MEMORY + max(EXECUTOR_MEMORY * 0.10, 0.403)
    )

    assert (
        int(CHECK) <= CLUSTER_MEMORY_MAX
    ), "Executor memory larger than cluster total memory!"

    # Stop previous session that was just for loading cluster params
    spark.stop()

    # Start new session with above config, that has better resource handling
    spark = SparkSession.builder.config(conf=conf)\
                                .getOrCreate()
    sc = spark.sparkContext

Click on the "SparkMonitor" tab at the top in Jupyter Lab to see the status of running code on the cluster.

## Loading the data
Here we specify where the yelp datasets are located on UCloud and read then using the spark session.

In [2]:
# Read in the business and review files
# This is the path to the shared datasets provided by adding an the dataset input folder
# when submitting the spark cluster job.
business = spark.read.json('file:////work/yelp/yelp_academic_dataset_business.json') # Use the file:/// prefix to indicate we want to read from the cluster's filesystem
business = business.persist()

# Persist 2 commonly used dataframes since they're used for later computations
# https://sparkbyexamples.com/spark/spark-difference-between-cache-and-persist/

users = spark.read.json("file:////work/yelp/yelp_academic_dataset_user.json")

reviews = spark.read.json('file:////work/yelp/yelp_academic_dataset_review.json')
reviews = reviews.persist()

In [3]:
# Get number of rows with no sampling:
reviews.show()

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|e4Vwtrqf-wpJfwesg...|   1|2017-01-14 20:54:15|    0|Sx8TMOWLNuJBWer-0...|  4.0|Cute inter

In [4]:
# OPTIONAL:
# Reduce resource usage and make queries run faster
# by only using a small sample of the dataframe
# and overwriting previous variable "df".
# Useful while developing, not so much to
# provide final answers. Therefore: Remember to 
# to re-read the df when done developing code using
# df = spark.read etc like above.
reviews = reviews.sample(withReplacement=False, fraction=1/50)

# Get number of rows after sampling:

business.show()


+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+
|             address|          attributes|         business_id|          categories|          city|               hours|is_open|     latitude|     longitude|                name|postal_code|review_count|stars|state|
+--------------------+--------------------+--------------------+--------------------+--------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+
|1616 Chapala St, ...|{null, null, null...|Pns2l4eNsfO8kk83d...|Doctors, Traditio...| Santa Barbara|                null|      0|   34.4266787|  -119.7111968|Abby Rappoport, L...|      93101|           7|  5.0|   CA|
|87 Grasso Plaza S...|{null, null, null...|mpf3x-BjTdTEA3yCZ...|Shipping Centers,...|        Affton|{8:0-18:30, 0:0-0...|      1|   

In [5]:
#3.1.1:

from pyspark.sql.functions import sum

temp = business.agg(sum("review_count").alias("total amount of reviews"))

#temp.show()

#reviewsCount.show()

#3.1.2:


fiveStarReviews = business.select("name","stars","review_count").filter((business["stars"] == 5.0) & (business["review_count"] >= 500))

#fiveStarReviews.show()

#3.1.3:

#Analyze user.json to find the influencers who have written more than 1000 reviews. 
#The output should be in the form of a Spark Table/DataFrame of user id.

specialUsers = users.filter(users["review_count"] > 1000).select("user_id")

#creating view containing the influencers
specialUsers.createOrReplaceTempView("influencers")

#3.1.4:

# Analyze review.json, business.json, and a view created from your answer to Q3 to find the businesses that have been reviewed by more than 5 influencer users.

#joining reviews with influencers on id
influencer_reviews = reviews.join(specialUsers, reviews["user_id"] == specialUsers["user_id"], "inner")

#counting how many influencer reviews each business have:
business_influencerRev_count = influencer_reviews.groupBy("business_id").count().withColumnRenamed("count", "number_of_influencer_reviews")

business_influencerRev_count.show()

#selecting only businesses that have more than five influencer reviews:

more_than_five= business_influencerRev_count.filter(business_influencerRev_count["number_of_influencer_reviews"] > 5)

print("number of businesses with more than 5 influencer reviews: " + str(more_than_five.count()))

#showing which businesses have more than five:

final_result = more_than_five.join(business, more_than_five["business_id"] == business["business_id"], "inner")

final_result.select( "name", "number_of_influencer_reviews").show()


#3.1.5:

# Analyze review.json and user.json to find an ordered list of users based on the average star counts they have given in all their reviews.

from pyspark.sql.functions import avg

first= reviews.join(users, reviews["user_id"] == users["user_id"]).select("name", "stars")

second = first.groupBy("name").agg(avg("stars"))
second = second.withColumnRenamed("avg(stars)", "average") 

final = second.orderBy("average", ascending=False)

#final.show()

#3.2.1





+--------------------+----------------------------+
|         business_id|number_of_influencer_reviews|
+--------------------+----------------------------+
|Ety2Z0CImO6FYDV6L...|                           1|
|kPG6r0h73sPgXBei0...|                           1|
|lXCFcmhoRsyW-mnzz...|                           1|
|8vvMGaJ5biveUVE6-...|                           1|
|fnO03-RX7UDC1TzXE...|                           1|
|r3If7d4wAvYIbltQf...|                           1|
|3u5ri1nLnDyFeGEHE...|                           1|
|2KIDQyTh-HzLxOUED...|                           1|
|ltBBYdNzkeKdCNPDA...|                           1|
|T-uphPk44OFt6sR8F...|                           1|
|z06yuwkwWjgXAxnlZ...|                           1|
|xwmdDSPBvFAM9D74f...|                           1|
|mzZ_WTb2zvyJMBkm8...|                           1|
|qmzHJVxvr63lyybBJ...|                           1|
|tJPaPZwRqFmDSEc8V...|                           2|
|yy3XuGkFIowGr3jL4...|                           1|
|pG5hLBK49Oz

In [6]:
from pyspark.sql.functions import col

#3.2.1: 

# What is the percentage of reviews containing a variant of the word "authentic"? 

#first we define a new column with boolean values called "is_aut", that checks if a review contains either a string or a substring with "authentic"
#this way variants of authentic like "authenticity" is checked:

authentic_variants = ["authentic", "authentik", "authentique"]

authentic = reviews.withColumn("is_aut", col("text").contains("authentic"))

#counting total amount of reviews:
all_reviews = reviews.count()

#finding amount of reviews containing a variant of authentic:
authentic_reviews = authentic.filter(col("is_aut") == True).count()

#finding the percentage by dividing the number of authentic revs with all revs and multiply the result with 100
percentage = (authentic_reviews / all_reviews) * 100

print("percentage of reviews containing a variant of authentic is: " + str(round(percentage, 2)) + "%")


# How many reviews contain the string "legitimate" grouped by type of cuisine?

legitimate = reviews.withColumn("is_legit", col("text").contains("legitimate"))

l_true = legitimate.filter(col("is_legit") == True)

l_true.count()


percentage of reviews containing a variant of authentic is: 1.66%


94

In [7]:
authentic.count()

139887

In [8]:


spark.conf.set("spark.sql.repl.eagerEval.truncate", 200)


from pyspark.sql.functions import col

cuisines = ["African", "American", "British", "Carribbean", "Italian", "Indian", "Korean", "Mexican", "Thai", "Vietnamese", "Indonesian", 
                 "French", "British", "Nordic", "Chinese", "Greek", "Irish", "Portuguese", "Spanish", "Turkish", "Asian", "Middle-Eastern"]

#First i create an empty table
filterCon = col("categories").contains(cuisines[0])


# i use this loop with the I operator to be able to dynamically change the filter condition to each type of cuisine in the list 
for name in cuisines[1:]:
    filterCon = filterCon | col("categories").contains(name)

# using the created condition to filter through businesses and find all the cuisines in the list
f_business = business.filter(filterCon)

# now i join the existing 
l_business = l_true.join(f_business, l_true["business_id"] == f_business["business_id"], "inner")

#l_business.select("name","categories","is_legit").show()

#How many reviews contain the string "legitimate" grouped by type of cuisine?

print("number of legit businesses: " + str(l_business.count()))

#5

number of legit businesses: 36


In [9]:
from pyspark.sql import functions as F


#Is there a difference in the amount of authenticity language used in the different areas? (e.g., by state, north/south, urban/rural)

# Defining a set of authentic keywords

from pyspark.sql.functions import count, col, when

aut_keywords = ["authentic", "genuine", "real", "traditional", "original", "actual", "true", "legitimate", "valid", "reliable", "accurate", "honest"]

#using r.like method that takes a regex expression
#the vertical bar | will act as an or operator, that will match any of the authentic keywords
aut = reviews.withColumn("is_aut", col("text").rlike("|".join(aut_keywords)))

true_count = aut.filter(col("is_aut") == True).count()

print(true_count)

# https://simple.wikipedia.org/wiki/Northern_United_States

# https://www.google.com/maps/place/USA/@35.6618607,-116.9516466,4z/data=!3m1!4b1!4m6!3m5!1s0x54eab584e432360b:0x1c3bb99243deb742!8m2!3d37.09024!4d-95.712891!16zL20vMDljN3cw?entry=ttu

#Very hard to divide business by north or south with information on google. Therefore i took a threshold of 40 on latitude
newbusiness = business.withColumn(
    "north_south",
    when(col("latitude") > 40, "North")
    .when(col("latitude") < 40, "South")
)

#using the categorized businesses in north and south
aut_business = newbusiness.join(aut, on="business_id")

#now i cube  state, city, authenticity and if they are south or north
aut_cube = aut_business.cube("state", "city", "is_aut", "north_south")

counts = aut_cube.agg(count("*").alias("review_count"))

filter_counts = counts.na.drop()


#difference in authentic businesses by each state

temp_aut_b_count = filter_counts.filter(col("is_aut") == True)

# now i group and order by descending states in terms of count
state_aut = temp_aut_b_count.groupBy("state", "is_aut", "north_south").agg(count("*").alias("aut_count"))

state_count = state_aut.orderBy("aut_count", ascending=False)

# Show which state has the highest amount of authentic businesses
state_count.select("state", "aut_count").show()

#showing a percentage column for each state
percentage = temp_aut_b_count.groupBy("state", "is_aut", "north_south").agg(count("*").alias("percentage"))

aut_state = percentage.withColumn(
    "percentage",
    (col("percentage") / true_count * 100)
)

aut_state = aut_state.orderBy("percentage", ascending=False)

aut_state.show()

#now i want to show two rows containing the whole percentage of north and south businesses, that are authentic

north_south = temp_aut_b_count.groupBy("north_south").agg(
    F.count("*").alias("aut_business")
)

#using the total amount of businesses for division
total = temp_aut_b_count.count()

#calculating percentage for north and south
final = north_south.withColumn(
    "percentage", (col("aut_business") / total* 100)
)

final.show()



37674
+-----+---------+
|state|aut_count|
+-----+---------+
|   PA|      157|
|   NJ|      103|
|   FL|       84|
|   PA|       61|
|   MO|       61|
|   TN|       34|
|   IL|       30|
|   NJ|       26|
|   IN|       25|
|   LA|       24|
|   AZ|       14|
|   DE|       14|
|   NV|       14|
|   CA|       12|
|   AB|        7|
|   ID|        6|
|   SD|        1|
|   MA|        1|
+-----+---------+

+-----+------+-----------+--------------------+
|state|is_aut|north_south|          percentage|
+-----+------+-----------+--------------------+
|   PA|  true|      North| 0.41673302542867763|
|   NJ|  true|      South|   0.273398099485056|
|   FL|  true|      South|  0.2229654403567447|
|   PA|  true|      South| 0.16191537930668365|
|   MO|  true|      South| 0.16191537930668365|
|   TN|  true|      South| 0.09024791633487286|
|   IL|  true|      South| 0.07963051441312312|
|   NJ|  true|      North| 0.06901311249137336|
|   IN|  true|      South| 0.06635876201093592|
|   LA|  true|      S

In [10]:
# Can you identify a difference in the relationship between authenticity language and typically negative words ?
# in restaurants serving _south american or asian cuisine_ compared to restaurants serving _european cuisine_? And to what degree?

#Find all the south american and asian cuisine containing authentic and negative words
#Find all the european cuising authentic and negative words

bad_words = ["bad","awful", "nasty", "terrible", "dirty", "kitsch", "cheap", "rude", "simple"]
    
#copied down for overview
authenticity_keywords = ["authentic", "legitimate"]
authenticity = reviews.withColumn("is_aut", col("text").rlike("|".join(authenticity_keywords)))

#combining with negative words

neg_words = authenticity.withColumn("has_neg", col("text").rlike("|".join(bad_words)))

#now i filter on each type of cuisine to find business related to their continent

european = [
    'Italian', 'French', 'Spanish', 'German', 'Greek', 'Portuguese', 'British', 'Russian', 'Hungarian', 
    'Polish', 'Dutch', 'Swedish', 'Belgian', 'Turkish', 'Irish', 'Romanian', 'Austrian', 'Swiss', 
    'Norwegian', 'Czech', 'Finnish', 'Danish', 'Ukrainian', 'Serbian', 'Slovakian', 'Croatian', 'Bulgarian',
    'Lithuanian', 'Latvian', 'Estonian', 'Slovenian', 'Macedonian', 'Maltese', 'Moldovan', 'Luxembourger',
    'Icelandic', 'Albanian', 'Andorran', 'Bosnian', 'Georgian'
]

south_american= [
    'Brazilian', 'Argentine', 'Peruvian', 'Colombian', 'Venezuelan', 'Chilean', 'Ecuadorian', 'Bolivian', 
    'Paraguayan', 'Uruguayan', 'Guyanese', 'Surinamese', 'Columbian', 'French Guianese', 'Falkland Islander',
    'Belizean', 'Nicaraguan', 'Honduran', 'Costa Rican', 'Panamanian', 'Guatemalan', 'Salvadoran', 'Mexican',
    'Cuban', 'Jamaican', 'Haitian', 'Bahamian', 'Trinidadian', 'Tobagonian', 'Barbadian', 'Antiguan',
    'Barbudan', 'Dominican', 'Grenadian', 'Vincentian', 'Saint Lucian', 'Kittitian', 'Nevisian', 'Guyanese',
    'Surinamese'
]

asian= [
    'Chinese', 'Indian', 'Japanese', 'Thai', 'Indonesian', 'Filipino', 'Vietnamese', 'Korean', 'Malaysian', 
    'Sri Lankan', 'Singaporean', 'Bangladeshi', 'Pakistani', 'Nepalese', 'Cambodian', 'Myanmar', 'Afghan', 
    'Laotian', 'Taiwanese', 'Mongolian', 'Bhutanese', 'Bruneian', 'Maldivian', 'Tibetan', 'Burmese', 
    'Kazakhstani', 'Kyrgyz', 'Tajik', 'Turkmen', 'Uzbekistani', 'Kuwaiti', 'Omani', 'Qatari', 'Saudi', 
    'Emirati', 'Yemeni', 'Iranian', 'Iraqi', 'Israeli'
]

businesss = business.filter(col("categories").contains("Restaurants"))

euro_business = business.filter(col("categories").rlike("|".join(european)))
sa_business = business.filter(col("categories").rlike("|".join(south_american)))
asian_business = business.filter(col("categories").rlike("|".join(asian)))

# joining the reviews with aut and bad words
euro = neg_words.join(euro_business, on="business_id")
sa = neg_words.join(sa_business, on="business_id")
asian = neg_words.join(asian_business, on="business_id")

#now i need to count how many authentic and neg reviews each cuisine has

#starting with euro

euro_dict = {
    "cuisine": "European",
    "is_authentic": euro.filter(col("is_aut") == True).count(),
    "has_negative": euro.filter(col("has_neg") == True).count(),
    "total_reviews": euro_business.count()
}

sa_dict = {
    "cuisine": "South American",
    "is_authentic": sa.filter(col("is_aut") == True).count(),
    "has_negative": sa.filter(col("has_neg") == True).count(),
    "total_reviews": sa_business.count()
}

asia_dict = {
    "cuisine": "Asian",
    "is_authentic": asian.filter(col("is_aut") == True).count(),
    "has_negative": asian.filter(col("has_neg") == True).count(),
    "total_reviews": asian_business.count()
}


all_cuisines = [euro_dict, sa_dict, asia_dict]

#creating new dataframe containing each of the cuisines 

final = spark.createDataFrame(all_cuisines)
final = final.withColumn("aut_percentage", (col("is_authentic") / col("total_reviews") * 100))
final = final.withColumn("neg_percentage", (col("has_negative") / col("total_reviews") * 100))

final.show()

+--------------+------------+------------+-------------+------------------+------------------+
|       cuisine|has_negative|is_authentic|total_reviews|    aut_percentage|    neg_percentage|
+--------------+------------+------------+-------------+------------------+------------------+
|      European|        2018|         417|         6676|6.2462552426602755|30.227681246255244|
|South American|        1496|         698|         5286|13.204691638289823|28.301172909572454|
|         Asian|        2083|         749|         7632| 9.813941299790356|27.292976939203356|
+--------------+------------+------------+-------------+------------------+------------------+



In [69]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
#selecting which columns to preprocess: 

new_business = business.drop("stars")

preprocess = new_business.join(neg_words, on="business_id", how="inner")

#first i want to see how much text is a feature importance for rating
preprocess1 = preprocess.select("text", "stars")
#then i want to see how much other features combined 
preprocess2 = preprocess.select("text", "city", "state", "cool", "funny", "useful", "is_aut", "has_neg", "stars", "is_open")


In [49]:
#starting to preprocess my text column as a 

from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator

#preprocessing the text column into a list or words
#removing stopwords like "is, are"... 
tokenizer = Tokenizer(inputCol="text", outputCol="listOfWords")
stopWords = StopWordsRemover(inputCol="listOfWords", outputCol="ListWithoutStopWords")

#converting the list of words into term frequency vectors, that represents each words frequency in the each review
hashingTF = HashingTF(inputCol="ListWithoutStopWords", outputCol="TFvectors", numFeatures=10000)
#using idf to rescale the TF vector on that particular review. It uses TF vectors from all the reviews to compute the idf values for each TF 
#it calculates how frequent each term occurs across all reviews 
idf = IDF(inputCol="TFvectors", outputCol="features")

#using a pipeline to transform / preprocess the data
pipeline = Pipeline(stages=[tokenizer, stopWords, hashingTF, idf])
pipeline_model = pipeline.fit(preprocess1)
rating = pipeline_model.transform(preprocess1)

rating.show()

#appending each stage to a list to print later: 

stages = [] 

for step in pipeline_model.stages:
    name = step.__class__.__name__ 
    stages.append(name) 

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+
|                text|stars|         listOfWords|ListWithoutStopWords|           TFvectors|            features|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+
|If you decide to ...|  3.0|[if, you, decide,...|[decide, eat, her...|(10000,[92,129,27...|(10000,[92,129,27...|
|Say you decide th...|  5.0|[say, you, decide...|[say, decide, nee...|(10000,[453,551,5...|(10000,[453,551,5...|
|Took a while to g...|  4.0|[took, a, while, ...|[took, get, actua...|(10000,[1369,1388...|(10000,[1369,1388...|
|Fantastic sliders...|  5.0|[fantastic, slide...|[fantastic, slide...|(10000,[1285,1369...|(10000,[1285,1369...|
|I was here a few ...|  3.0|[i, was, here, a,...|[times, review., ...|(10000,[750,1221,...|(10000,[750,1221,...|
|Awesome detailing...|  5.0|[awesome, detaili...|[awesome, detaili...|(10000,[871,2007,...|(1000

In [67]:
from pyspark.ml.evaluation import RegressionEvaluator

rating = rating.select("features", "stars")

train, test = rating.randomSplit([0.8, 0.2], seed=42)

lr = LinearRegression(labelCol="stars", maxIter=10)

lr_model = lr.fit(train)

#use our model to make predictions
predictions = lr_model.transform(test)

print("Preprocessing stages in the pipeline: " + str(stages))
print("Using Model: LinearRegression")
print("Max Iterations: ", lr_model.getMaxIter())
print("Input Features: " + str(preprocess1.columns))
#evaluating using metrics such as MSE, RSME, R2 and MAE
evaluator = RegressionEvaluator(labelCol="stars", metricName="mse", predictionCol="prediction")
mse = evaluator.evaluate(predictions)
print("Mean Squared Error:", mse)
evaluator = RegressionEvaluator(labelCol="stars", metricName="rmse", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error:", rmse)
evaluator = RegressionEvaluator(labelCol="stars", metricName="r2", predictionCol="prediction")
r2 = evaluator.evaluate(predictions)
print("R2-score: ", r2)
evaluator = RegressionEvaluator(labelCol="stars", metricName="mae", predictionCol="prediction")
mae = evaluator.evaluate(predictions)
print("Mean Absolute Error:", mae)

Preprocessing stages in the pipeline: ['Tokenizer', 'StopWordsRemover', 'HashingTF', 'IDFModel']
Using Model: LinearRegression
Max Iterations:  10
Input Features: ['text', 'stars']
Mean Squared Error: 1.1116060331452502
Root Mean Squared Error: 1.0543272893865785
R2-score:  0.4867709006636458
Mean Absolute Error: 0.8324936276277384


In [70]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

#using a pipeline to transform / preprocess the data
pipeline2 = Pipeline(stages=[tokenizer, stopWords, hashingTF, idf])
pipeline2_model = pipeline.fit(preprocess2)
rating2 = pipeline2_model.transform(preprocess2)

#appending each stage to a list to print later: 
stages = [] 
for step in pipeline_model.stages:
    name = step.__class__.__name__ 
    stages.append(name) 

#converting state and city into numerical values and categorical data using one hot encoding
stringIndex1 = StringIndexer(inputCol="city", outputCol="city_index", handleInvalid="keep")
indexed = stringIndex1.fit(rating2).transform(rating2)
stringIndex2 = StringIndexer(inputCol="state", outputCol="state_index", handleInvalid="keep")
indexed = stringIndex2.fit(indexed).transform(indexed)
encoder1 = OneHotEncoder(inputCol="city_index", outputCol="city_encoded")
indexed_and_encoded = encoder1.fit(indexed).transform(indexed)
encoder2 = OneHotEncoder(inputCol="state_index", outputCol="state_encoded")
indexed_and_encoded = encoder2.fit(indexed_and_encoded).transform(indexed_and_encoded)

#using the rest of the features aswell
#input_features = ["features","cool", "funny", "useful", "is_aut", "has_neg", "city_encoded", "state_encoded"]
input_features1 = ["features","cool", "funny", "useful", "is_aut", "has_neg", "is_open"]
assembler = VectorAssembler(inputCols=input_features1, outputCol="AllFeatures")
final = assembler.transform(indexed_and_encoded)

final = final.select("AllFeatures", "stars")
final.show()

+--------------------+-----+
|         AllFeatures|stars|
+--------------------+-----+
|(10006,[92,129,27...|  3.0|
|(10006,[453,551,5...|  5.0|
|(10006,[1369,1388...|  4.0|
|(10006,[1285,1369...|  5.0|
|(10006,[750,1221,...|  3.0|
|(10006,[871,2007,...|  5.0|
|(10006,[42,313,47...|  3.0|
|(10006,[627,903,3...|  1.0|
|(10006,[94,198,52...|  1.0|
|(10006,[505,569,6...|  4.0|
|(10006,[1127,1376...|  1.0|
|(10006,[524,533,1...|  3.0|
|(10006,[18,54,269...|  5.0|
|(10006,[199,273,3...|  5.0|
|(10006,[12,15,91,...|  5.0|
|(10006,[199,222,2...|  4.0|
|(10006,[15,59,115...|  4.0|
|(10006,[642,877,1...|  4.0|
|(10006,[646,1004,...|  4.0|
|(10006,[80,374,38...|  3.0|
+--------------------+-----+
only showing top 20 rows



In [71]:
train, test = final.randomSplit([0.8, 0.2], seed=42)

lr = LinearRegression(featuresCol='AllFeatures',labelCol="stars", maxIter=10)

lr_model = lr.fit(train)

#use our model to make predictions
predictions = lr_model.transform(test)

print("Preprocessing stages in the pipeline: " + str(stages))
print("Using Model: LinearRegression")
print("Max Iterations: ", lr_model.getMaxIter())
print("Vector Assember with following features " + str(input_features1))
print("    *NOTE: 'features' contains the idf vectors of 'text' column")
#evaluating using metrics such as MSE, RSME, R2 and MAE to see how our model performs
evaluator = RegressionEvaluator(labelCol="stars", metricName="mse", predictionCol="prediction")
mse = evaluator.evaluate(predictions)
print("Mean Squared Error:", mse)
evaluator = RegressionEvaluator(labelCol="stars", metricName="rmse", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error:", rmse)
evaluator = RegressionEvaluator(labelCol="stars", metricName="r2", predictionCol="prediction")
r2 = evaluator.evaluate(predictions)
print("R2-score: ", r2)
evaluator = RegressionEvaluator(labelCol="stars", metricName="mae", predictionCol="prediction")
mae = evaluator.evaluate(predictions)
print("Mean Absolute Error:", mae)

Preprocessing stages in the pipeline: ['Tokenizer', 'StopWordsRemover', 'HashingTF', 'IDFModel']
Using Model: LinearRegression
Max Iterations:  10
Vector Assember with following features ['features', 'cool', 'funny', 'useful', 'is_aut', 'has_neg', 'is_open']
    *NOTE: 'features' contains the idf vectors of 'text' column
Mean Squared Error: 1.090713962736012
Root Mean Squared Error: 1.0443725210555916
R2-score:  0.49641678073238416
Mean Absolute Error: 0.8240674081593888


In [68]:
from pyspark.ml.classification import LogisticRegression, NaiveBayes, RandomForestClassifier

from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

input_features2 = ["features","cool", "funny", "useful", "is_aut", "has_neg", "is_open", "city_encoded", "state_encoded"]
assembler = VectorAssembler(inputCols=input_features2, outputCol="AllFeatures")
final = assembler.transform(indexed_and_encoded)

final = final.select("AllFeatures", "stars")

train, test = final.randomSplit([0.8, 0.2], seed=42)

lg = LogisticRegression(featuresCol='AllFeatures', labelCol='stars', maxIter=10, standardization=True)

lg_model = lg.fit(train)

predictions = lg_model.transform(test)

eval = MulticlassClassificationEvaluator(labelCol="stars", predictionCol="prediction", metricName="accuracy")
accuracy = eval.evaluate(predictions)
eval = MulticlassClassificationEvaluator(labelCol="stars", predictionCol="prediction", metricName="weightedPrecision")
precision = eval.evaluate(predictions)
eval = MulticlassClassificationEvaluator(labelCol="stars", predictionCol="prediction", metricName="f1")
f1 = eval.evaluate(predictions)

print("Preprocessing stages in the pipeline: " + str(stages))
print("Using Model: LogisticRegression")
print("Max Iterations: ", lg_model.getMaxIter())
print("Standardization: True")
print("Vector Assember with following features " + str(input_features2))
print("    *NOTE: 'features' contains the idf vectors of 'text' column")
print("Accuracy: ", accuracy)
print("Precision:", precision)
print("F1-score: ", f1)


Preprocessing stages in the pipeline: ['Tokenizer', 'StopWordsRemover', 'HashingTF', 'IDFModel']
Using Model: LogisticRegression
Max Iterations:  10
Standardization: True
Vector Assember with following features ['features', 'cool', 'funny', 'useful', 'is_aut', 'has_neg', 'is_open', 'city_encoded', 'state_encoded']
    *NOTE: 'features' contains the idf vectors of 'text' column
Accuracy:  0.6311200057179616
Precision: 0.6127413049165751
F1-score:  0.6199410689182556
