# <center> PSTAT 235 Group Project: Internet Service Providers' Review Ratings Predictions </center>

## Preprocessing Steps
* Drop unnecessary columns
* Convert ratings into integers
* <span style = "color:red"> **(TO DO)** Remove duplicate reviews (use rev_ids)</span>
* Natural Language Processing (NLP)
    * Remove Punctuations
    * Tokenize words
    * Remove stop words
    * Word2Vec
    * <span style = "color:red"> **(TO DO)** Scale Features with StandardScaler</span>
* Sentiment Extraction (use package vaderSentiment on full reviews)

#### Considerations
* Optimize pipeline using RDDs. Currently only using pyspark dataframes because they are easier to use for structured data, but when applying transformations such as Word2Vec it takes super long. And this is only with half the data. MLib has Word2Vec for RDDs, but the results are not per row like with the ml version (which requires a dataframe). 

In [5]:
!pip install vaderSentiment

from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, Word2Vec
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType
import pandas as pd
import os
import re
import time
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer



In [6]:
spark = SparkSession.builder \
        .master("local") \
        .appName("isp_analysis") \
        .getOrCreate()

In [7]:
sc = spark.sparkContext

In [8]:
#businesses_dataFile = 'businesses.csv'
reviews_dataFile = 'Yelp_Data/businesses_reviews.csv'

In [9]:
# First read in as dataframe to drop unnecessary columns

#businessesRDD = sc.textFile(businesses_dataFile)
reviews_df = spark.read.csv(reviews_dataFile, header = True)

In [6]:
drop_columns = ['time_create','url','rev_id']

In [10]:
# Drop unnecessary columns to reduce overhead later on. 
# Maybe drop location from business reviews and put lat/long coordinates from businesses_df
reviews_df2 = reviews_df.drop('time_created','url','rev_id')

In [11]:
# Convert the ratings column to an integer
reviews_df2 = reviews_df2.withColumn('rating', reviews_df2['rating'].cast(IntegerType()))

# Rename text column to reviews
reviews_df2 = reviews_df2.withColumnRenamed('text','reviews')

In [12]:
# Remove puncutation. Have to do this before tokenization because of noninterpretable errors
removePunctuation_udf = udf(lambda x: re.sub('[^\w\s]','',str(x)))

reviews_noPunc = reviews_df2.withColumn('non_punc', removePunctuation_udf('reviews'))

In [13]:
# Tokenize the reviews column
tokenizer = Tokenizer(inputCol = 'non_punc', outputCol = 'words')
reviews_tokenized = tokenizer.transform(reviews_noPunc)

In [14]:
# Remove stop words
remover = StopWordsRemover(inputCol = 'words', outputCol = 'filtered')
reviews_stopsRemoved = remover.transform(reviews_tokenized)

In [35]:
# select necessary columns and convert to RDD.
# saving raw reviews for sentiment conversion
reviews_df3 = reviews_stopsRemoved.select('rating','ISP_name','filtered','reviews')

reviews_df3.show(5)

+------+--------------------+--------------------+--------------------+
|rating|            ISP_name|            filtered|             reviews|
+------+--------------------+--------------------+--------------------+
|     5|Comcast Service C...|[review, service,...|This review is fo...|
|     1|Comcast Service C...|[goes, xfinity, c...|This goes for all...|
|     1|Comcast Service C...|[im, giving, one,...|I'm only giving o...|
|     1|ClearView Communi...|[clearview, disho...|ClearView is a di...|
|     1|             CONX TV|[dish, tv, , disc...|Dish tv  discount...|
+------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [14]:
# Convert reviews to numerical features using Word2Vec
# This step will take a while
#start = time.time()
# set vector size arbitrarily. we dont want to make the model too large 
#word2vec = Word2Vec(vectorSize = 5, inputCol = 'filtered', outputCol = 'word_vecs')
#model = word2vec.fit(reviews_df3)

#reviews_wordVec = model.transform(reviews_df3)

#print(f'Conversion of Reviews Took: {round(time.time()-start,2)} Seconds')

Conversion of Reviews Took: 13.5 Seconds


In [224]:
# Sentiment Analysis
analyzer = SentimentIntensityAnalyzer()
polarity_udf = udf(lambda x: analyzer.polarity_scores(x))

In [225]:
#reviews_final = reviews_wordVec.withColumn('valence', polarity_udf('reviews'))
reviews_final = reviews_df3.withColumn('valence', polarity_udf(reviews_df3['filtered']))

In [17]:
# Convert to RDD for faster modeling?
#processed_reviewsRDD = reviews_final.rdd.map(list)

In [226]:
pd.set_option('display.max_colwidth', -1)

reviews_final.select('valence').show(10, False)

+-----------------------------------------+
|valence                                  |
+-----------------------------------------+
|{neg=0.0, pos=0.0, compound=0.0, neu=1.0}|
|{neg=0.0, pos=0.0, compound=0.0, neu=1.0}|
|{neg=0.0, pos=0.0, compound=0.0, neu=1.0}|
|{neg=0.0, pos=0.0, compound=0.0, neu=1.0}|
|{neg=0.0, pos=0.0, compound=0.0, neu=1.0}|
|{neg=0.0, pos=0.0, compound=0.0, neu=1.0}|
|{neg=0.0, pos=0.0, compound=0.0, neu=1.0}|
|{neg=0.0, pos=0.0, compound=0.0, neu=1.0}|
|{neg=0.0, pos=0.0, compound=0.0, neu=1.0}|
|{neg=0.0, pos=0.0, compound=0.0, neu=1.0}|
+-----------------------------------------+
only showing top 10 rows



In [16]:
# Convert reviews to numerical features using Word2Vec
# This step will take a while
start = time.time()
# set vector size arbitrarily. we dont want to make the model too large 
word2vec = Word2Vec(vectorSize = 100, inputCol = 'filtered', outputCol = 'word_vecs')
model = word2vec.fit(reviews_df3)

reviews_wordVec = model.transform(reviews_df3)

print(f'Conversion of Reviews Took: {round(time.time()-start,2)} Seconds')

Conversion of Reviews Took: 17.61 Seconds


In [17]:
reviews_wordVec.show(10)

+------+--------------------+--------------------+--------------------+--------------------+
|rating|            ISP_name|            filtered|             reviews|           word_vecs|
+------+--------------------+--------------------+--------------------+--------------------+
|     5|Comcast Service C...|[review, service,...|This review is fo...|[-0.0855886761564...|
|     1|Comcast Service C...|[goes, xfinity, c...|This goes for all...|[-0.0182209065055...|
|     1|Comcast Service C...|[im, giving, one,...|I'm only giving o...|[-0.0729191299962...|
|     1|ClearView Communi...|[clearview, disho...|ClearView is a di...|[-0.0362489023669...|
|     1|             CONX TV|[dish, tv, , disc...|Dish tv  discount...|[0.02664551820213...|
|     3|            Spectrum|[hallelujah, open...|***HALLELUJAH THE...|[-0.1190661154105...|
|     1|            Spectrum|[trust, door, doo...|DO NOT TRUST the ...|[-0.0548134035430...|
|     4|            Spectrum|[looking, charter...|If you are lookin...

In [227]:
# Model
from pyspark.sql.functions import length
import pyspark.sql.functions as f
from pyspark.ml.regression import LinearRegression # note this is from the ML package
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline  

## PREPROCESSNG
# Remove ratings == null
reviews_df4 = reviews_df3.filter(reviews_df3['rating'].isNotNull())

In [228]:
## FEATURE ENGINEERING
# Create column for length of review
reviews_df4 = reviews_df4.withColumn('reviewLength', length(reviews_df4.reviews))

# Create a column for the count of a specific word, here an "!"
word = '!'
wordCount_udf = udf(lambda col: len([x for x in col if x == word]), IntegerType())
reviews_df4 = reviews_df4.withColumn('exCount', wordCount_udf(reviews_df4.reviews))

reviews_df4.show(15)

# One hot encoding for ISP_name
SI = StringIndexer(inputCol="ISP_name", outputCol="nameIndex")
OHE = OneHotEncoder(inputCol="nameIndex", outputCol="nameDummy")

+------+--------------------+--------------------+--------------------+------------+-------+
|rating|            ISP_name|            filtered|             reviews|reviewLength|exCount|
+------+--------------------+--------------------+--------------------+------------+-------+
|     5|Comcast Service C...|[review, service,...|This review is fo...|         152|      0|
|     1|Comcast Service C...|[goes, xfinity, c...|This goes for all...|         159|      0|
|     1|Comcast Service C...|[im, giving, one,...|I'm only giving o...|         154|      2|
|     1|ClearView Communi...|[clearview, disho...|ClearView is a di...|         154|      0|
|     1|             CONX TV|[dish, tv, , disc...|Dish tv  discount...|         159|      0|
|     3|            Spectrum|[hallelujah, open...|***HALLELUJAH THE...|         157|      0|
|     1|            Spectrum|[trust, door, doo...|DO NOT TRUST the ...|         159|      5|
|     4|            Spectrum|[looking, charter...|If you are lookin...

In [229]:
# Configure pipeline stages 
# I only separated them to show features we created versus those we derive with functions
VA = VectorAssembler(inputCols=['reviewLength', 'exCount']+['nameDummy'], outputCol="features")  

In [230]:
## MODEL BUILDING
# Create parameters of model
seed = 314
train_test = [0.6, 0.4]

train_data1, test_data1 = reviews_df4.randomSplit(train_test, seed)

maxIter=10
regParam=0.3
elasticNetParam=0.8

LR1 = LinearRegression(featuresCol='features', labelCol='rating',
                       maxIter=maxIter, regParam=regParam, elasticNetParam=elasticNetParam)

In [231]:
# Build pipeline
pipeline = Pipeline(stages=[SI, OHE, VA, LR1])

# Create model
model1 = pipeline.fit(train_data1)

# Make predicions
preds1 = model1.transform(test_data1)

preds1.show(5)

+------+--------------------+--------------------+--------------------+------------+-------+---------+-----------------+--------------------+------------------+
|rating|            ISP_name|            filtered|             reviews|reviewLength|exCount|nameIndex|        nameDummy|            features|        prediction|
+------+--------------------+--------------------+--------------------+------------+-------+---------+-----------------+--------------------+------------------+
|     1|1 On 1 Communicat...|[terrible, custom...|Terrible customer...|         156|      0|    444.0|(817,[444],[1.0])|(819,[0,446],[156...|3.6500602312182635|
|     1|995 Web Internet ...|[couldnt, provide...|Couldn't provide ...|         157|      0|    686.0|(817,[686],[1.0])|(819,[0,688],[157...|3.6500602312182635|
|     1|         ABS-CBNnow!|[im, sure, got, n...|I'm not sure how ...|         159|      0|    543.0|(817,[543],[1.0])|(819,[0,545],[159...|3.6500602312182635|
|     1|         ABS-CBNnow!|[im, 