In [16]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import Row

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# Build (or reuse, if one already exists) a Hive-enabled Spark session.
spark = (
    SparkSession.builder
    .master("local")
    .appName("read_hive")
    .enableHiveSupport()
    .getOrCreate()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Check which Hive databases are visible to this session.
df=spark.sql("show databases")
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+
|databaseName|
+------------+
|     default|
+------------+

In [19]:
# The reviews table has already been loaded from S3 into Hive (via Hue) as an
# external table.
# DataFrame.show() only prints and returns None, so keep the DataFrame itself
# in `tables` and display it as a separate step.
tables = spark.sql("show tables")
tables.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default|  reviews|      false|
+--------+---------+-----------+

In [20]:
# "review" is the text column used for sentiment analysis; take a glance at 20 rows first.
# The filter skips rows whose review text is literally 'review' (presumably a
# header line that was loaded as data -- TODO confirm).
review = spark.sql("select review from default.reviews where review!= 'review' limit 20")
review.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|              review|
+--------------------+
|One of the best R...|
|good story good g...|
|             dis gud|
|favorite game of ...|
|Why wouldn't you ...|
|            it is ok|
|               worth|
|Isn't Geralt hot ...|
|Very Fun Would pl...|
|The game is enjoy...|
|The only thing bi...|
|better than cyber...|
|We all know you'l...|
|best open world g...|
|I don't think any...|
|It's surprisingly...|
| best game ever made|
|Still better than...|
|If you are a fan ...|
|There needs to be...|
+--------------------+

In [21]:
# Data cleaning on the review text:
#   1. lowercase everything so tokens can match the positive/negative word lists
#   2. collapse whitespace runs (tabs, newlines, repeated spaces) to one space
#   3. drop every character that is not a digit, a lowercase letter or a space
#   4. trim and split on runs of spaces to get the list of words
from pyspark.sql import functions as F

review = (
    review
    .withColumn('review_cleaned', F.lower(F.col('review')))
    .withColumn('review_cleaned', F.regexp_replace('review_cleaned', r"\s+", " "))
    # No "|" inside the class: within [...] it is a literal pipe, not an
    # alternation, so the previous pattern let "|" characters into the tokens.
    .withColumn('review_cleaned', F.regexp_replace('review_cleaned', "[^0-9a-z ]", ""))
    # Trimming and splitting on " +" avoids empty-string tokens, which would
    # otherwise inflate F.size() and pull the sentiment score toward zero.
    .withColumn('review_cleaned', F.split(F.trim(F.col('review_cleaned')), " +"))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# A CSV file containing the positive word list has already been uploaded to S3.
# Each line of the file is treated as one entry and the result is a plain
# Python list. textFile() already yields one string per line, so no round-trip
# through a DataFrame is needed. The context is taken from `spark` so the cell
# does not depend on a pre-defined `sc` variable.
lines = spark.sparkContext.textFile('s3://bdpositivewords/positive_wordlist.csv')
# Normalise entries the same way review tokens are normalised (lowercase, no
# surrounding whitespace or stray "\r") and skip blank lines, so that
# `word in pos_words` compares like with like.
pos_words = (
    lines
    .map(lambda line: line.strip().lower())
    .filter(lambda word: word != "")
    .collect()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# Also read the negative word list from S3 into a plain Python list.
# textFile() already yields one string per line, so no round-trip through a
# DataFrame is needed. The context is taken from `spark` so the cell does not
# depend on a pre-defined `sc` variable.
lines = spark.sparkContext.textFile('s3://bdnegativewords/negative_wordlist.csv')
# Normalise entries the same way review tokens are normalised (lowercase, no
# surrounding whitespace or stray "\r") and skip blank lines, so that
# `word in neg_words` compares like with like.
neg_words = (
    lines
    .map(lambda line: line.strip().lower())
    .filter(lambda word: word != "")
    .collect()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Check the Python type of the collected word list (expected: a plain list).
type(pos_words)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<class 'list'>

In [25]:
def count_pos(review):
    """Count how many tokens of a review appear in the positive word list.

    Args:
        review: iterable of word tokens (the ``review_cleaned`` array), or
            ``None`` when the review is missing.

    Returns:
        int: number of tokens found in the global ``pos_words`` collection.
        A word that occurs several times is counted every time. ``0`` is
        returned for ``None``.
    """
    if review is None:
        # A null array column reaches a Python UDF as None; treat it as
        # "no words" instead of raising TypeError on iteration.
        return 0
    return sum(1 for word in review if word in pos_words)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
def count_neg(review):
    """Count how many tokens of a review appear in the negative word list.

    Args:
        review: iterable of word tokens (the ``review_cleaned`` array), or
            ``None`` when the review is missing.

    Returns:
        int: number of tokens found in the global ``neg_words`` collection.
        A word that occurs several times is counted every time. ``0`` is
        returned for ``None``.
    """
    if review is None:
        # A null array column reaches a Python UDF as None; treat it as
        # "no words" instead of raising TypeError on iteration.
        return 0
    return sum(1 for word in review if word in neg_words)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
# Wrap the Python counters as Spark UDFs. F.udf() defaults to a string return
# type, which would leave the counts as strings and make the later arithmetic
# rely on implicit casts; declare the integer result type explicitly.
# NOTE: the names are rebound from plain functions to UDFs, so re-run the two
# `def` cells before re-running this one.
count_pos = F.udf(count_pos, "int")
count_neg = F.udf(count_neg, "int")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Testing: calculate the word counts and two candidate sentiment scores.
review_test = (
    review
    .withColumn("pos_cnt", count_pos('review_cleaned'))
    .withColumn("neg_cnt", count_neg('review_cleaned'))
    # share of matched words that are positive
    .withColumn("simple_sentiment", F.col("pos_cnt") / (F.col("pos_cnt") + F.col("neg_cnt")))
    .withColumn("length", F.size("review_cleaned"))
    # net positive words per token of the review
    .withColumn("advanced_sentiment", (F.col("pos_cnt") - F.col("neg_cnt")) / F.col("length"))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
# Display the test rows with both candidate sentiment scores side by side.
review_test.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+-------+-------+------------------+------+--------------------+
|              review|      review_cleaned|pos_cnt|neg_cnt|  simple_sentiment|length|  advanced_sentiment|
+--------------------+--------------------+-------+-------+------------------+------+--------------------+
|One of the best R...|[one, of, the, be...|      2|      0|               1.0|    12| 0.16666666666666666|
|good story good g...|[good, story, goo...|      2|      0|               1.0|     7|  0.2857142857142857|
|             dis gud|          [dis, gud]|      0|      0|              null|     2|                 0.0|
|favorite game of ...|[favorite, game, ...|      1|      0|               1.0|    11| 0.09090909090909091|
|Why wouldn't you ...|[why, wouldnt, yo...|      0|      0|              null|     5|                 0.0|
|            it is ok|        [it, is, ok]|      0|      0|              null|     3|                 0.0|
|               worth|             [w

In [30]:
# The second approach (net positive words divided by review length) gives a
# sentiment score between -1 and 1, with 0 as neutral. That is easier to
# interpret, so only that score is kept from here on.

# Run on all data with simplified columns.

# Read the Hive table, skipping rows whose review text is literally 'review'.
reviews = spark.sql("select * from default.reviews where review != 'review'")

# Data cleaning: lowercase, collapse whitespace runs, drop everything that is
# not a digit / lowercase letter / space, then trim and split into words.
reviews = (
    reviews
    .withColumn('review_cleaned', F.lower(F.col('review')))
    .withColumn('review_cleaned', F.regexp_replace('review_cleaned', r"\s+", " "))
    # No "|" inside the class: within [...] it is a literal pipe, not an
    # alternation, so the previous pattern let "|" characters into the tokens.
    .withColumn('review_cleaned', F.regexp_replace('review_cleaned', "[^0-9a-z ]", ""))
    # Trimming and splitting on " +" avoids empty-string tokens, which would
    # otherwise inflate F.size() and pull the sentiment score toward zero.
    .withColumn('review_cleaned', F.split(F.trim(F.col('review_cleaned')), " +"))
)

# Calculate sentiment score: (positive - negative) / number of words.
reviews = reviews.withColumn(
    "sentiment",
    (count_pos('review_cleaned') - count_neg('review_cleaned')) / F.size("review_cleaned"),
).drop("review_cleaned")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
# List the column names to confirm `sentiment` was added and the temporary
# `review_cleaned` column was dropped.
reviews.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['review_id', 'app_id', 'author_steamid', 'review', 'timestamp_created', 'timestamp_updated', 'recommended', 'votes_helpful', 'votes_funny', 'weighted_vote_score', 'comment_count', 'sentiment']

In [32]:
# View just the id and the new score as a quick sanity check of the full-table run.
reviews.select('review_id','sentiment').show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+--------------------+
|review_id|           sentiment|
+---------+--------------------+
| 85184605| 0.16666666666666666|
| 85184171|  0.2857142857142857|
| 85184064|                 0.0|
| 85180436| 0.09090909090909091|
| 85179753|                 0.0|
| 85179400|                 0.0|
| 85179341|                 1.0|
| 85178164| 0.10526315789473684|
| 85177892|                 0.2|
| 85174926|-0.01123595505617...|
| 85174737|                 0.0|
| 85172984|  0.3333333333333333|
| 85172769|               0.125|
| 85170596|  0.2857142857142857|
| 85170453| 0.14285714285714285|
| 85169788|              0.0875|
| 85169685|                0.25|
| 85169128| 0.18181818181818182|
| 85169052| 0.07017543859649122|
| 85167135| 0.08333333333333333|
+---------+--------------------+
only showing top 20 rows

In [18]:
# reviews is now the full table containing all review information and a new column for sentiment score. 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# Run ALS recommendation model
# test with small data 


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Import the required functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# Code for cross validation for reference.
# Create ALS model
#als = ALS(
#         userCol="author_steamid", 
#         itemCol="app_id",
#         ratingCol="sentiment", 
#         nonnegative = False, 
#         implicitPrefs = False,
#         coldStartStrategy="drop"
#)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# Add hyperparameters and their respective values to param_grid
#param_grid = ParamGridBuilder() \
#            .addGrid(als.rank, [10, 50, 100, 150]) \
#            .addGrid(als.regParam, [.01, .05, .1, .15]) \
#            .build()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# Define evaluator as RMSE and print length of evaluator
#evaluator = RegressionEvaluator(
#           metricName="rmse", 
#           labelCol="sentiment",
#           predictionCol="prediction") 
#print ("Num models to be tested: ", len(param_grid))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Build cross validation using CrossValidator
#cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# Smaller table for testing.

# Read the Hive table. NOTE: LIMIT without ORDER BY does not guarantee which
# 100000 rows come back, so the sample may differ between runs.
reviews_small = spark.sql("select * from default.reviews where review != 'review' limit 100000")

# Data cleaning: lowercase, collapse whitespace runs, drop everything that is
# not a digit / lowercase letter / space, then trim and split into words.
reviews_small = (
    reviews_small
    .withColumn('review_cleaned', F.lower(F.col('review')))
    .withColumn('review_cleaned', F.regexp_replace('review_cleaned', r"\s+", " "))
    # No "|" inside the class: within [...] it is a literal pipe, not an
    # alternation, so the previous pattern let "|" characters into the tokens.
    .withColumn('review_cleaned', F.regexp_replace('review_cleaned', "[^0-9a-z ]", ""))
    # Trimming and splitting on " +" avoids empty-string tokens, which would
    # otherwise inflate F.size() and pull the sentiment score toward zero.
    .withColumn('review_cleaned', F.split(F.trim(F.col('review_cleaned')), " +"))
)

# Calculate sentiment score: (positive - negative) / number of words.
reviews_small = reviews_small.withColumn(
    "sentiment",
    (count_pos('review_cleaned') - count_neg('review_cleaned')) / F.size("review_cleaned"),
).drop("review_cleaned")
# Alternative that was considered: a positive-only score,
# count_pos('review_cleaned') / F.size("review_cleaned").

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
# Preview the test subset, including the computed sentiment column.
reviews_small.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+------+-----------------+--------------------+-----------------+-----------------+-----------+-------------+-----------+-------------------+-------------+--------------------+
|review_id|app_id|   author_steamid|              review|timestamp_created|timestamp_updated|recommended|votes_helpful|votes_funny|weighted_vote_score|comment_count|           sentiment|
+---------+------+-----------------+--------------------+-----------------+-----------------+-----------+-------------+-----------+-------------------+-------------+--------------------+
| 85184605|292030|76561199054755373|One of the best R...|       1611379970|       1611379970|       true|            0|          0|                0.0|            0| 0.16666666666666666|
| 85184171|292030|76561198170193529|good story good g...|       1611379264|       1611379264|       true|            0|          0|                0.0|            0|  0.2857142857142857|
| 85184064|292030|76561198119302812|             dis gud|       1

In [27]:
# small dataset

# Subset the data to the three columns the model needs: user, item and rating.
als_data = reviews_small.select("author_steamid","app_id","sentiment")

# Abandoned experiments (not run): dropping the first row, casting
# author_steamid to int / double / long, casting app_id to int, and dropna().

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# re-run with all data
#als_data = reviews.select("author_steamid","app_id","sentiment")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
# Preview the user / item / rating triples that will feed the recommender.
als_data.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------+------+--------------------+
|   author_steamid|app_id|           sentiment|
+-----------------+------+--------------------+
|76561199054755373|292030| 0.16666666666666666|
|76561198170193529|292030|  0.2857142857142857|
|76561198119302812|292030|                 0.0|
|76561198065591528|292030| 0.09090909090909091|
|76561198996835044|292030|                 0.0|
|76561198284845223|292030|                 0.0|
|76561198370568524|292030|                 1.0|
|76561198040150323|292030| 0.10526315789473684|
|76561198040190687|292030|                 0.2|
|76561198020027165|292030|-0.01123595505617...|
|76561199001673284|292030|                 0.0|
|76561198033304276|292030|  0.3333333333333333|
|76561198116559597|292030|               0.125|
|76561198299050034|292030|  0.2857142857142857|
|76561198080678414|292030| 0.14285714285714285|
|76561198352439354|292030|              0.0875|
|76561198868423923|292030|                0.25|
|76561197993334693|292030| 0.18181818181

In [30]:
# Inspect the column types before modelling.
als_data.printSchema()
# author_steamid comes back as a string column, so it cannot be used as the
# ALS user id as-is.

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- author_steamid: string (nullable = true)
 |-- app_id: integer (nullable = true)
 |-- sentiment: double (nullable = true)

In [31]:
# ALS needs numeric ids. In earlier testing, casting the Steam id to integer
# or long did not work (Spark's ALS documentation requires ids within the
# integer value range -- presumably the long Steam ids exceed it), so map each
# distinct id string to a numeric index with StringIndexer instead.
from pyspark.ml.feature import StringIndexer

stringindexer = StringIndexer(inputCol="author_steamid", outputCol="author_steamid_num")
modelc = stringindexer.fit(als_data)
als_data = modelc.transform(als_data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
# Check the schema again: the new author_steamid_num column should be numeric.
als_data.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- author_steamid: string (nullable = true)
 |-- app_id: integer (nullable = true)
 |-- sentiment: double (nullable = true)
 |-- author_steamid_num: double (nullable = false)

In [33]:
# Split the indexed data 80/20 into training and test sets (fixed seed).
train, test = als_data.randomSplit([0.8, 0.2], seed=42)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
# Confirm the training split keeps the same columns and types as als_data.
train.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- author_steamid: string (nullable = true)
 |-- app_id: integer (nullable = true)
 |-- sentiment: double (nullable = true)
 |-- author_steamid_num: double (nullable = false)

In [35]:
# Build the ALS model with the user (indexed steamid), item (game app_id) and
# rating (sentiment score) columns.
als = ALS(
    userCol="author_steamid_num",
    itemCol="app_id",
    ratingCol="sentiment",
    nonnegative=False,       # sentiment can be negative, so factors must not be clamped at 0
    implicitPrefs=False,     # treat sentiment as an explicit rating
    coldStartStrategy="drop",
    seed=42,                 # fixed seed so the fit is repeatable, matching the seeded split
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
# Fit the ALS model on the training split only; the test split is held out.
model = als.fit(train)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
# Score the held-out test rows with the fitted model. The ALS estimator was
# configured with coldStartStrategy="drop" (presumably so that users/items
# unseen in training do not produce NaN predictions -- confirm against docs).
predictions = model.transform(test)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
# Recommend the single top-scoring game for every user known to the model.
# Increase the argument to get more games per user.
userRecs = model.recommendForAllUsers(1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
# Preview the recommendations. Users are keyed by the author_steamid_num
# index, not by the original Steam id string.
userRecs.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+--------------------+
|author_steamid_num|     recommendations|
+------------------+--------------------+
|                12|[[292030, 0.01316...|
|                76|     [[292030, 0.0]]|
|                81|[[292030, 0.21949...|
|               126|[[292030, 0.13169...|
|               133|[[292030, 0.08231...|
|               148|[[292030, 0.65849...|
|               177|[[292030, 0.02993...|
|               192|[[292030, 0.09407...|
|               209|[[292030, 0.03658...|
|               243|     [[292030, 0.0]]|
|               300|[[292030, 0.32924...|
|               333|[[292030, 0.01431...|
|               406|[[292030, 0.21949...|
|               417|[[292030, 0.06931...|
|               444|     [[292030, 0.0]]|
|               481|[[292030, 0.08496...|
|               496|[[292030, 0.10130...|
|               539|[[292030, 0.03135...|
|               548|[[292030, 0.02235...|
|               577|[[292030, 0.13169...|
+------------------+--------------