In [1]:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import *

In [2]:
# Build the Spark entry points explicitly so the notebook also runs on a plain
# Python kernel, where `spark` and `SQLContext` are not pre-defined.
conf = SparkConf().set("spark.cores.max", "10")
sc = SparkContext.getOrCreate(conf)
# `spark` is the session the data-loading cells read from.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
# Legacy handle kept for any code that still expects `sqlContext`.
sqlContext = SQLContext(sc)

In [3]:
# Load the Yelp review collection from MongoDB through the Spark connector.
df = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://54.245.171.238/yelp.review")
    .load()
)

In [4]:
# Narrow projection of the review frame: ids, star rating and usefulness votes.
df_final = df.select(['business_id', 'stars', 'user_id', 'useful'])

In [4]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
# Single analyzer instance used to score review/tip text.
analyser = SentimentIntensityAnalyzer()
# analyser.polarity_scores(text) is indexed with 'compound' by the rating UDF.

In [5]:
# Spark UDF that scores free text with the VADER analyzer and returns its
# 'compound' value as a float column.
# A null text yields a null score instead of being handed to the analyzer.
rating_udf = udf(
    lambda x: None if x is None else analyser.polarity_scores(x)['compound'],
    FloatType(),
)

In [23]:
# Attach the sentiment score of each review text as a new `textrating` column.
df_rated = df.withColumn("textrating", rating_udf(df["text"]))

In [24]:
# Preview the scored reviews, including the new `textrating` column.
df_rated.show()

+--------------------+--------------------+----+----------+-----+--------------------+-----+--------------------+------+--------------------+----------+
|                 _id|         business_id|cool|      date|funny|           review_id|stars|                text|useful|             user_id|textrating|
+--------------------+--------------------+----+----------+-----+--------------------+-----+--------------------+------+--------------------+----------+
|[5a5e9699bf78f846...|uYHaNptLzDLoV_JZ_...|   0|2016-07-12|    0|VfBHSwC5Vz_pbFluy...|    5|My girlfriend and...|     0|cjpdDjZyprfyDG3Rl...|    0.9855|
|[5a5e9699bf78f846...|uYHaNptLzDLoV_JZ_...|   0|2016-10-02|    0|3zRpneRKDsOPq92tq...|    3|If you need an in...|     0|bjTcT8Ty4cJZhEOEo...|    0.8921|
|[5a5e9699bf78f846...|uYHaNptLzDLoV_JZ_...|   0|2015-09-17|    0|ne5WhI1jUFOcRn-b-...|    3|Mittlerweile gibt...|     0|AXgRULmWcME7J6Ix3...|   -0.9837|
|[5a5e9699bf78f846...|uYHaNptLzDLoV_JZ_...|   0|2016-08-21|    0|llmdwOgDReucVoWEr

In [27]:
# Map the string business/user ids to numeric index columns; these are the
# columns later passed to ALS as itemCol/userCol.
# df_rated is reassigned in place, so a re-run of this cell would start from
# the already-indexed frame.
for id_col in ("business_id", "user_id"):
    indexer = StringIndexer(inputCol=id_col, outputCol=id_col + "_indexed")
    df_rated = indexer.fit(df_rated).transform(df_rated)

# Keep only the (user, item, score) triple.
df_input = df_rated.select(['user_id_indexed', 'business_id_indexed', 'textrating'])

In [37]:
# Cache the (user, item, score) frame; the cells below read it more than once.
df_input.cache()

DataFrame[user_id_indexed: double, business_id_indexed: double, textrating: float]

In [42]:
# Smallest sentiment score in the data, used to sanity-check the rescaling below.
df_input.agg({"textrating": "min"}).first()[0]

-0.9998000264167786

In [48]:
# Rescale the sentiment score with (score + 1.5) * 2 to form the ALS rating column.
df_input2 = df_input.select(
    'user_id_indexed',
    'business_id_indexed',
    ((df_input['textrating'] + 1.5) * 2.0).alias("final_rating"),
)

In [49]:
# Preview the rescaled ratings that will be fed to ALS.
df_input2.show()

+---------------+-------------------+------------------+
|user_id_indexed|business_id_indexed|      final_rating|
+---------------+-------------------+------------------+
|        95597.0|            10690.0|4.9709999561309814|
|       119624.0|            10690.0| 4.784199953079224|
|       102416.0|            10690.0|1.0326000452041626|
|       100157.0|            10690.0| 4.919399976730347|
|       393296.0|            10690.0|2.7439999878406525|
|       197307.0|            10690.0| 4.198799967765808|
|        29765.0|            10690.0| 4.763200044631958|
|        83123.0|            10690.0| 4.115399956703186|
|        75523.0|            10690.0| 4.958600044250488|
|       127977.0|            10690.0| 4.904600024223328|
|        29808.0|            10690.0| 4.866999983787537|
|       246605.0|            10690.0| 4.924999952316284|
|       116579.0|            10690.0| 4.714400053024292|
|         1071.0|            10690.0| 4.993600010871887|
|        71793.0|            10

In [50]:
# Split the review ratings roughly 80/20 into training and validation sets.
# The split is seeded so that re-running the notebook reproduces the same
# partitions (and therefore comparable RMSE figures).
train, valid = df_input2.randomSplit([0.8, 0.2], seed=42)
# Cache both splits to improve speed; each is read again during fit/evaluation.
train.cache()
valid.cache()

DataFrame[user_id_indexed: double, business_id_indexed: double, final_rating: double]

In [51]:
# Fit ALS on the training split and report RMSE on both splits.
# No coldStartStrategy is configured on the estimator; rows whose prediction
# is missing are dropped explicitly before the validation RMSE is computed.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="final_rating",
    predictionCol="prediction",
)
als = ALS(
    userCol="user_id_indexed",
    itemCol="business_id_indexed",
    ratingCol="final_rating",
    rank=10,
    maxIter=10,
    regParam=1,
    nonnegative=True,
)
model = als.fit(train)

# Training error
pred_trn = model.transform(train)
rmse = evaluator.evaluate(pred_trn)
print("RMSE of training data = " + str(rmse))

# Validation error (rows without a prediction are removed first)
pred_vld = model.transform(valid).dropna(how="all", subset=['prediction'])
rmse = evaluator.evaluate(pred_vld)
print("RMSE of validation data = " + str(rmse))

RMSE of training data = 1.17257359146
RMSE of validation data = 1.52146479465


In [6]:
# Load the Yelp tip collection from MongoDB through the Spark connector.
tips = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://54.245.171.238/yelp.tip")
    .load()
)

In [54]:
# Cache the raw tips frame; it is scored, shown and joined in later cells.
tips.cache()

DataFrame[_id: struct<oid:string>, business_id: string, date: string, likes: int, text: string, user_id: string]

In [55]:
# Attach the sentiment score of each tip text as a new `textrating` column.
tips_rated = tips.withColumn("textrating", rating_udf(tips["text"]))

In [56]:
# Preview the scored tips, including the new `textrating` column.
tips_rated.show()

+--------------------+--------------------+----------+-----+--------------------+--------------------+----------+
|                 _id|         business_id|      date|likes|                text|             user_id|textrating|
+--------------------+--------------------+----------+-----+--------------------+--------------------+----------+
|[5a5eb6e107002a9b...|jH19V2I9fIslnNhDz...|2015-08-12|    0|Great breakfast l...|ZcLKXikTHYOnYt5VY...|    0.8777|
|[5a5eb6e107002a9b...|tJRDll5yqpZwehenz...|2012-07-15|    0|Get here early en...|zcTZk7OG8ovAmh_fe...|       0.0|
|[5a5eb6e107002a9b...|dAa0hB2yrnHzVmsCk...|2014-06-20|    0|Nice place. Great...|oaYhjqBbh18ZhU0bp...|    0.7845|
|[5a5eb6e107002a9b...|dAa0hB2yrnHzVmsCk...|2016-10-12|    0|Happy hour 5-7 Mo...|ulQ8Nyj7jCUR8M83S...|    0.5719|
|[5a5eb6e107002a9b...|k7WRPbDd7rztjHcGG...|2017-02-25|    0|Homemade pasta is...|ulQ8Nyj7jCUR8M83S...|    0.6369|
|[5a5eb6e107002a9b...|k7WRPbDd7rztjHcGG...|2017-04-08|    0|Excellent service...|ulQ8Nyj

In [57]:
# Map the string business/user ids of the tips to numeric index columns, then
# keep only the (user, item, score) triple.
# tips_rated is reassigned in place, so a re-run of this cell would start from
# the already-indexed frame.
for id_col in ("business_id", "user_id"):
    indexer = StringIndexer(inputCol=id_col, outputCol=id_col + "_indexed")
    tips_rated = indexer.fit(tips_rated).transform(tips_rated)
tips_input = tips_rated.select(['user_id_indexed', 'business_id_indexed', 'textrating'])

In [58]:
# Rescale the tip sentiment score with (score + 1.5) * 2 to form the ALS rating column.
tips_input2 = tips_input.select(
    'user_id_indexed',
    'business_id_indexed',
    ((tips_input['textrating'] + 1.5) * 2.0).alias("final_rating"),
)

In [60]:
# Split the tip ratings roughly 80/20 into training and validation sets.
# The split is seeded so that re-running the notebook reproduces the same
# partitions (and therefore comparable RMSE figures).
train, valid = tips_input2.randomSplit([0.8, 0.2], seed=42)
# Cache both splits to improve speed; each is read again during fit/evaluation.
train.cache()
valid.cache()

DataFrame[user_id_indexed: double, business_id_indexed: double, final_rating: double]

In [61]:
# Fit ALS on the tips training split and report RMSE on both splits.
# No coldStartStrategy is configured on the estimator; rows whose prediction
# is missing are dropped explicitly before the validation RMSE is computed.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="final_rating",
    predictionCol="prediction",
)
als = ALS(
    userCol="user_id_indexed",
    itemCol="business_id_indexed",
    ratingCol="final_rating",
    rank=10,
    maxIter=10,
    regParam=1,
    nonnegative=True,
)
model = als.fit(train)

# Training error
pred_trn = model.transform(train)
rmse = evaluator.evaluate(pred_trn)
print("RMSE of training data = " + str(rmse))

# Validation error (rows without a prediction are removed first)
pred_vld = model.transform(valid).dropna(how="all", subset=['prediction'])
rmse = evaluator.evaluate(pred_vld)
print("RMSE of validation data = " + str(rmse))

RMSE of training data = 1.17356664921
RMSE of validation data = 1.32288001362


In [62]:
# Number of training rows that came back from model.transform.
pred_trn.count()

823560

In [63]:
# Size of the training split, for comparison with the prediction count above.
train.count()

823560

In [64]:
# Validation rows that kept a prediction after the drop vs. the full validation split.
pred_vld.count(), valid.count()

(167880, 205242)

In [8]:
# Preview the raw tips frame.
tips.show()

+--------------------+--------------------+----------+-----+--------------------+--------------------+
|                 _id|         business_id|      date|likes|                text|             user_id|
+--------------------+--------------------+----------+-----+--------------------+--------------------+
|[5a5eb6e107002a9b...|jH19V2I9fIslnNhDz...|2015-08-12|    0|Great breakfast l...|ZcLKXikTHYOnYt5VY...|
|[5a5eb6e107002a9b...|tJRDll5yqpZwehenz...|2012-07-15|    0|Get here early en...|zcTZk7OG8ovAmh_fe...|
|[5a5eb6e107002a9b...|dAa0hB2yrnHzVmsCk...|2014-06-20|    0|Nice place. Great...|oaYhjqBbh18ZhU0bp...|
|[5a5eb6e107002a9b...|dAa0hB2yrnHzVmsCk...|2016-10-12|    0|Happy hour 5-7 Mo...|ulQ8Nyj7jCUR8M83S...|
|[5a5eb6e107002a9b...|k7WRPbDd7rztjHcGG...|2017-02-25|    0|Homemade pasta is...|ulQ8Nyj7jCUR8M83S...|
|[5a5eb6e107002a9b...|k7WRPbDd7rztjHcGG...|2017-04-08|    0|Excellent service...|ulQ8Nyj7jCUR8M83S...|
|[5a5eb6e107002a9b...|ESzO3Av0b1_TzKOiq...|2017-01-28|    0|Parking is a 

In [5]:
# Count review/tip row pairs that share both the business and the user.
df.join(
    tips,
    [df.business_id == tips.business_id, df.user_id == tips.user_id],
).count()

72722

In [7]:
# Reduce the reviews to the four columns they share with the tips frame.
df_filtered = df.select(['business_id', 'date', 'text', 'user_id'])

In [8]:
# Reduce the tips to the same four columns, in the same order as the reviews.
tips_filtered = tips.select(['business_id', 'date', 'text', 'user_id'])

In [9]:
# Stack reviews and tips into one frame; both inputs were selected with the
# same four columns in the same order.
tipsdf=df_filtered.union(tips_filtered)

In [10]:
# Cache the merged reviews + tips frame before counting and grouping it.
tipsdf.cache()

DataFrame[business_id: string, date: string, text: string, user_id: string]

In [11]:
# Check that the cache flag is set on the merged frame.
tipsdf.is_cached

True

In [12]:
# Total number of rows in the merged reviews + tips frame.
tipsdf.count()

1891802

In [13]:
from pyspark.sql.functions import col, max as max_

In [14]:
# Collapse rows with the same (business, user, text), keeping the latest date.
tipsdf2 = (
    tipsdf
    .groupBy('business_id', 'user_id', 'text')
    .agg({"date": "max"})
)

In [15]:
# Preview the grouped frame with its max(date) column.
tipsdf2.show()

+--------------------+--------------------+--------------------+----------+
|         business_id|             user_id|                text| max(date)|
+--------------------+--------------------+--------------------+----------+
|-050d_XIor1NpCuWk...|BI43zsVnKZHtMI_4G...|Thanks to Guy Fie...|2009-03-21|
|-050d_XIor1NpCuWk...|CF4Rd4pCBFMgI0HWs...|This great breakf...|2011-04-20|
|-050d_XIor1NpCuWk...|Ju2dV32vmx-epovrG...|Food is good. Sim...|2008-11-23|
|-050d_XIor1NpCuWk...|sq0qvPb5g5Se_W0Oa...|Try the coffee. N...|2012-09-03|
|-1UMR00eXtwaeh59p...|2esLgxJoVXd1CzrHf...|Quick service all...|2014-07-17|
|-1UMR00eXtwaeh59p...|CY9iLsE2z_yLhLqJd...|FLY THRU BREAKFAS...|2015-04-18|
|-1UMR00eXtwaeh59p...|YfRKZvZUYw-RlRejE...|Delicious breakfa...|2015-03-25|
|-1xuC540Nycht_iWF...|Mt7q9U48KiAJKivYR...|It's back open!!!...|2013-07-18|
|-5L8zOxibac-vBrsY...|UdKvY0ux2D3EQuyf_...|This is a total s...|2008-01-21|
|-5XuRAfrjEiMN77J4...|DsWg3leomfasGs3j0...|Can you say... sm...|2011-11-04|
|-5fK4OgKir0

In [17]:
# Row count after grouping by (business, user, text).
tipsdf2.count()

1887312

In [18]:
# Map the string business/user ids of the merged frame to numeric index
# columns, then keep the indexed ids together with the raw text for scoring.
# tipsdf2 is reassigned in place, so a re-run of this cell would start from
# the already-indexed frame.
for id_col in ("business_id", "user_id"):
    indexer = StringIndexer(inputCol=id_col, outputCol=id_col + "_indexed")
    tipsdf2 = indexer.fit(tipsdf2).transform(tipsdf2)
tips_input = tipsdf2.select(['user_id_indexed', 'business_id_indexed', 'text'])

In [19]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
# Re-create the analyzer and the scoring UDF, then score the merged texts.
analyser = SentimentIntensityAnalyzer()
# The UDF returns the analyzer's 'compound' value as a float column.
# A null text yields a null score instead of being handed to the analyzer.
rating_udf = udf(
    lambda x: None if x is None else analyser.polarity_scores(x)['compound'],
    FloatType(),
)
tips_input_rated = tips_input.withColumn("textrating", rating_udf(tips_input.text))

In [20]:
# Cache the scored merged frame before it is rescaled and split.
tips_input_rated.cache()

DataFrame[user_id_indexed: double, business_id_indexed: double, text: string, textrating: float]

In [23]:
# Preview the current training split.
# NOTE(review): this cell sits above the cell that rebuilds `train` from the
# merged frame, so on a top-to-bottom run it shows the split from the earlier
# tips-only cell - confirm which split was intended.
train.show()

+---------------+-------------------+------------------+
|user_id_indexed|business_id_indexed|      final_rating|
+---------------+-------------------+------------------+
|            0.0|             4792.0| 4.971199989318848|
|            0.0|            14871.0| 4.351400017738342|
|            0.0|            24227.0| 4.520400047302246|
|            0.0|            35297.0| 3.636399984359741|
|            0.0|            54252.0|               3.0|
|            0.0|            62700.0|               3.0|
|            0.0|            64113.0|               3.0|
|            0.0|            66888.0| 4.222800016403198|
|            0.0|            82623.0| 4.843600034713745|
|            0.0|            97297.0| 3.617799997329712|
|            0.0|           107340.0|               3.0|
|            0.0|           118292.0|               3.0|
|            1.0|             3834.0| 3.542800009250641|
|            1.0|             4302.0| 4.583999991416931|
|            1.0|             6

In [22]:
# Rescale the sentiment score with (score + 1.5) * 2 to form the ALS rating column.
# tips_input_rated is overwritten and loses its `textrating` column here, so
# the scoring cell must be re-run before this cell can be executed again.
tips_input_rated = tips_input_rated.select(
    'user_id_indexed',
    'business_id_indexed',
    ((tips_input_rated.textrating + 1.5) * 2.0).alias("final_rating"),
)
# Split roughly 80/20; seeded so that re-running the notebook reproduces the
# same partitions (and therefore comparable RMSE figures).
train, valid = tips_input_rated.randomSplit([0.8, 0.2], seed=42)
# Cache both splits to improve speed; each is read again during fit/evaluation.
train.cache()
valid.cache()

DataFrame[user_id_indexed: double, business_id_indexed: double, final_rating: double]

In [24]:
# Fit ALS on the merged reviews + tips training split and report RMSE on both splits.
# No coldStartStrategy is configured on the estimator; rows whose prediction
# is missing are dropped explicitly before the validation RMSE is computed.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="final_rating",
    predictionCol="prediction",
)
als = ALS(
    userCol="user_id_indexed",
    itemCol="business_id_indexed",
    ratingCol="final_rating",
    rank=10,
    maxIter=10,
    regParam=1,
    nonnegative=True,
)
model = als.fit(train)

# Training error
pred_trn = model.transform(train)
rmse = evaluator.evaluate(pred_trn)
print("RMSE of training data = " + str(rmse))

# Validation error (rows without a prediction are removed first)
pred_vld = model.transform(valid).dropna(how="all", subset=['prediction'])
rmse = evaluator.evaluate(pred_vld)
print("RMSE of validation data = " + str(rmse))

RMSE of training data = 1.21339711028
RMSE of validation data = 1.40226129455


In [25]:
# Validation rows that kept a prediction after the drop vs. the full validation split.
pred_vld.count(), valid.count()

(302879, 376934)