### Install All Requirements On Colab

In [0]:
!pip install pyspark
import os
import pandas as pd
import pyspark
from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import rand, col
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator



In [0]:
sc = SparkContext()

In [0]:
!apt install openjdk-8-jdk

Reading package lists... Done
Building dependency tree       
Reading state information... Done
openjdk-8-jdk is already the newest version (8u222-b10-1ubuntu1~18.04.1).
The following package was automatically installed and is no longer required:
  libnvidia-common-430
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 7 not upgraded.


In [0]:
!update-alternatives --config java

There are 2 choices for the alternative java (providing /usr/bin/java).

  Selection    Path                                            Priority   Status
------------------------------------------------------------
  0            /usr/lib/jvm/java-11-openjdk-amd64/bin/java      1111      auto mode
  1            /usr/lib/jvm/java-11-openjdk-amd64/bin/java      1111      manual mode
* 2            /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java   1081      manual mode

Press <enter> to keep the current choice[*], or type selection number: 2


In [0]:
!java -version

openjdk version "1.8.0_222"
OpenJDK Runtime Environment (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10)
OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)


In [0]:
filePath = "/content/drive/My Drive/Yelp Project/data/"

In [0]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### Conver ID From String to Integer

In [0]:
seed = 100
file = filePath + "yelp_ratings.csv"

In [0]:
ratings = pd.read_csv(file)

ratings['business_id'] = ratings['business_id'].astype('category')
ratings['business_id'] = ratings['business_id'].cat.codes

ratings['user_id'] = ratings['user_id'].astype('category')
ratings['user_id'] = ratings['user_id'].cat.codes

In [0]:
sqlCtx = SQLContext(sc)
df = sqlCtx.createDataFrame(ratings)
sqlCtx.registerDataFrameAsTable(df, "df")
df = sqlCtx.sql('''
    SELECT 
        user_id AS user, 
        business_id AS item,
        stars AS rating
    FROM df
''')

In [0]:
df.show(n=5)

+-----+----+------+
| user|item|rating|
+-----+----+------+
| 9521|6275|   2.0|
|40439|8653|   4.0|
| 7675|8361|   4.0|
|46733|6357|   4.0|
|16249|1698|   5.0|
+-----+----+------+
only showing top 5 rows



In [0]:
print("(row, col): ", (df.count(), len(df.columns)))

(row, col):  (392471, 3)


### Split Train / Test Spark DataFrame

In [0]:
dftrain = df.where(col('TrainTest') == 1).drop(*["TrainTest", "date"])
dftest = df.where(col('TrainTest') == 0).drop(*["TrainTest", "date"])

In [0]:
print(dftrain.printSchema())
print("(row, col): ", (dftrain.count(), len(dftrain.columns)))
dftrain.show(n=5)

root
 |-- user: long (nullable = true)
 |-- item: long (nullable = true)
 |-- rating: double (nullable = true)

None
(row, col):  (343827, 3)
+-----+----+------+
| user|item|rating|
+-----+----+------+
| 9521|6275|   2.0|
|40439|8653|   4.0|
| 7675|8361|   4.0|
|46733|6357|   4.0|
|16249|1698|   5.0|
+-----+----+------+
only showing top 5 rows



In [0]:
print(dftest.printSchema())
print("(row, col): ", (dftest.count(), len(dftest.columns)))
dftest.show(n=5)

root
 |-- user: long (nullable = true)
 |-- item: long (nullable = true)
 |-- rating: double (nullable = true)

None
(row, col):  (48644, 3)
+-----+----+------+
| user|item|rating|
+-----+----+------+
|27374|6648|   2.0|
|31935| 632|   5.0|
| 4122|2391|   3.0|
|32566|  34|   5.0|
| 7675|8298|   5.0|
+-----+----+------+
only showing top 5 rows



### Model ALS on User-Business Rating Matrix

In [0]:
als = ALS(nonnegative=True, checkpointInterval=3, coldStartStrategy="nan")
paramGrid = ParamGridBuilder()\
    .addGrid(als.rank, [5, 30, 70])\
    .addGrid(als.regParam, [0.1, 1, 10])\
    .build()

In [0]:
rmse = RegressionEvaluator(metricName="rmse", labelCol="rating")
tvs = CrossValidator(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=rmse,
    seed=seed,
    numFolds=3,
    parallelism=5
)

In [0]:
model = tvs.fit(dftrain)
trainPred = model.transform(dftrain)
trainPred.show(n=5)

+-----+----+------+----------+
| user|item|rating|prediction|
+-----+----+------+----------+
| 7168| 148|   5.0|  4.776293|
|47211| 148|   5.0|  3.981956|
|17751| 148|   4.0| 3.6792192|
|20398| 148|   3.0| 2.4057302|
|30519| 148|   3.0| 3.4807475|
+-----+----+------+----------+
only showing top 5 rows



In [0]:
testPred = model.transform(dftest)
testPred.show(5)
rmse.evaluate(testPred)

+-----+----+------+----------+
| user|item|rating|prediction|
+-----+----+------+----------+
|47501| 148|   2.0|  3.451548|
|33762| 148|   5.0| 3.8014915|
|25638| 148|   5.0|  3.383726|
|16506| 148|   5.0| 4.2565937|
|45750| 148|   2.0|  3.510189|
+-----+----+------+----------+
only showing top 5 rows



nan

In [0]:
print("final parameters: \nrank: %2.f" % (model.bestModel.rank)) 

final parameters: 
rank:  5


In [0]:
#  from datetime import datetime
# model_path = filePath + 'ALS_model_' + datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
# model.save(model_path)

In [0]:
# model_path = filePath + 'ALS_model_2019-12-14_01:05:16'
# print(model_path)

### Check / Output Results

In [0]:
trainPredDF = trainPred.toPandas()
testPredDF = testPred.toPandas()

In [0]:
testPredDF.shape

(48644, 4)

In [0]:
testPredDF.isnull().values.sum()

7649

In [0]:
testPredDF.isnull().sum()

user             0
item             0
rating           0
prediction    7649
dtype: int64

In [0]:
testPredDF.head()

Unnamed: 0,user,item,rating,prediction
0,47501,148,2.0,3.451548
1,33762,148,5.0,3.801491
2,25638,148,5.0,3.383726
3,16506,148,5.0,4.256594
4,45750,148,2.0,3.510189


In [0]:
trainRatingMean = ratings.loc[ratings['TrainTest'] == 1]['stars'].mean()
testPredDF.fillna(trainRatingMean, inplace=True)

In [0]:
testPredDF.head()

Unnamed: 0,user,item,rating,prediction
0,47501,148,2.0,3.451548
1,33762,148,5.0,3.801491
2,25638,148,5.0,3.383726
3,16506,148,5.0,4.256594
4,45750,148,2.0,3.510189


In [0]:
testPred.toPandas().to_csv(filePath + 'ALS_testPrediction.csv', index=False)
trainPred.toPandas().to_csv(filePath + 'ALS_trainPrediction.csv', index=False)

### Calculate Model Performance

In [0]:
from sklearn.metrics import mean_squared_error
mean_squared_error(testPredDF['rating'], testPredDF['prediction'])

2.4158066048359124