In [0]:
# List the working directory to confirm the CSV inputs read below are present.
!ls

 accommodations.csv	     implicit_ratings.csv
 drive			     ratings.csv
'implicit_ratings (1).csv'   recommender_model01_0116_selected.csv
'implicit_ratings (2).csv'  'recommender_model_0116 (1).csv'
'implicit_ratings (3).csv'   recommender_model_0116.csv
'implicit_ratings (4).csv'   sample_data


I plan to build a recommendation system on Google Cloud. I want to make sure all of the code works before moving it there, so I set up Spark in Google Colab with a TPU runtime.

The data and code originally come from: https://github.com/GoogleCloudPlatform/spark-recommendation-engine

**Part 2**: **Use the best parameters found for the ALS model to make predictions**

In [0]:
import os
import warnings
warnings.filterwarnings('ignore')
import pandas as pd

import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import Bucketizer, MinMaxScaler, VectorAssembler
from pyspark.sql.types import DoubleType
from pyspark.sql.types import FloatType, StringType, StructField, StructType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml import Transformer
from pyspark.sql import Window

from pyspark.sql import SparkSession
from pyspark import SparkContext
# Start (or reuse) a Spark session running in local mode, then fetch the
# active SparkContext.
spark = (
    SparkSession.builder
    .master("local")
    .getOrCreate()
)
sc = SparkContext.getOrCreate()

In [0]:
# Disabled: JDBC-based loading of the two tables. Neither jdbcUrl nor the
# TABLE_* names are defined in this notebook; the CSV files are read instead.
# dfAccos = sqlContext.read.jdbc(url=jdbcUrl, table=TABLE_ITEMS)
# dfRates = sqlContext.read.jdbc(url=jdbcUrl, table=TABLE_RATINGS)

In [0]:
# Read the raw accommodations file. It has no header row, so Spark names the
# columns _c0.._c6; reading with header=True (tried previously) would consume
# the first record as column names. Schema inference stays off, so every
# column is loaded as a string.
dfAccos = spark.read.csv('accommodations.csv', header=False, inferSchema=False)

In [0]:
# Preview the first five raw accommodation rows as a pandas table.
dfAccos.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6
0,1,Comfy Quiet Chalet,Vancouver,50,3,3.1,cottage
1,2,Cozy Calm Hut,London,65,2,4.1,cottage
2,3,Agreable Calm Place,London,65,4,4.8,house
3,4,Colossal Quiet Chateau,Paris,3400,16,2.7,castle
4,5,Homy Quiet Shack,Paris,50,1,1.1,cottage


In [0]:
# Give the positional _c0.._c6 columns meaningful names in a single call.
# toDF assigns names by position, so this list must follow the file's column
# order. This also removes the dangling backslash continuation the previous
# rename chain ended with.
ACCO_COLUMNS = ['id', 'title', 'location', 'price', 'rooms', 'rating', 'type']
dfAccos = dfAccos.toDF(*ACCO_COLUMNS)


In [74]:
# Preview the first five accommodations again, now with named columns.
dfAccos.limit(5).toPandas()

Unnamed: 0,id,title,location,price,rooms,rating,type
0,1,Comfy Quiet Chalet,Vancouver,50,3,3.1,cottage
1,2,Cozy Calm Hut,London,65,2,4.1,cottage
2,3,Agreable Calm Place,London,65,4,4.8,house
3,4,Colossal Quiet Chateau,Paris,3400,16,2.7,castle
4,5,Homy Quiet Shack,Paris,50,1,1.1,cottage


In [0]:
# Sanity check: number of distinct accommodation ids.
distinct_ids = F.countDistinct('id')
dfAccos.agg(distinct_ids).show()

+------------------+
|count(DISTINCT id)|
+------------------+
|                99|
+------------------+



In [0]:
# Read the raw ratings file (no header row, no schema inference), so the
# columns arrive as _c0.._c2 with string values.
dfRates = spark.read.csv('ratings.csv', header=False, inferSchema=False)

In [0]:
# Preview the first five raw rating rows as a pandas table.
dfRates.limit(5).toPandas()

Unnamed: 0,_c0,_c1,_c2
0,10,1,1
1,18,1,2
2,13,1,1
3,7,2,2
4,4,2,2


In [0]:
# Name the three positional columns in a single call. toDF assigns names by
# position: user, accommodation, rating. This also removes the dangling
# backslash continuation the previous rename chain ended with.
RATING_COLUMNS = ['userId', 'accoId', 'rating']
dfRates = dfRates.toDF(*RATING_COLUMNS)

In [0]:
# Preview the first five ratings again, now with named columns.
dfRates.limit(5).toPandas()

Unnamed: 0,userId,accoId,rating
0,10,1,1
1,18,1,2
2,13,1,1
3,7,2,2
4,4,2,2


In [0]:
# Sanity check: how many distinct users and accommodations appear in the ratings.
distinct_counts = [F.countDistinct(name) for name in ('userId', 'accoId')]
dfRates.agg(*distinct_counts).show()

+----------------------+----------------------+
|count(DISTINCT userId)|count(DISTINCT accoId)|
+----------------------+----------------------+
|                    25|                    99|
+----------------------+----------------------+



In [0]:
# Number of ratings per user, most active users first (up to 25 rows shown).
ratings_per_user = dfRates.groupBy('userId').count()
ratings_per_user.orderBy(F.col('count').desc()).show(25)

+------+-----+
|userId|count|
+------+-----+
|    16|   75|
|     4|   70|
|     7|   69|
|    23|   66|
|    10|   66|
|    21|   63|
|     8|   58|
|    13|   48|
|    19|   48|
|     1|   46|
|    12|   46|
|    20|   44|
|     2|   43|
|     3|   42|
|    18|   42|
|    17|   42|
|     6|   42|
|    11|   41|
|     5|   39|
|    24|   39|
|     9|   39|
|    15|   37|
|    14|   37|
|    22|   35|
|     0|    9|
+------+-----+



In [0]:
# Expose the ratings DataFrame to Spark SQL under the view name 'ratings'.
dfRates.createOrReplaceTempView(name='ratings')

In [76]:
# Show every rating recorded for user 0.
user0_ratings = spark.sql("SELECT * FROM ratings WHERE userId==0")
user0_ratings.show()

+------+------+------+
|userId|accoId|rating|
+------+------+------+
|     0|     3|     4|
|     0|     6|     5|
|     0|    12|     5|
|     0|    16|     4|
|     0|    22|     4|
|     0|    28|     4|
|     0|    30|     5|
|     0|    33|     4|
|     0|    38|     4|
+------+------+------+



In [0]:
# The user we want recommendations for in the cells that follow.
USER_ID = 0

In [0]:
# Collect the ids of every accommodation this user has already rated.
# Despite its name, dfUserRatings is a plain Python list of accoId strings.
user_rows = dfRates.where(dfRates.userId == USER_ID).select('accoId').collect()
dfUserRatings = [row.accoId for row in user_rows]
print(dfUserRatings)

['3', '6', '12', '16', '22', '28', '30', '33', '38']


In [0]:
# Keep only the accommodations the user has not rated yet, then pair each one
# with the user id to form the (user, product) tuples passed to predictAll.
# The rated ids and the user id are bound as lambda defaults so these lazy
# RDDs cannot pick up a later reassignment of dfUserRatings / USER_ID, and the
# ids are held in a set so each membership test is O(1).
ratedAccoIds = set(dfUserRatings)
rddPotential  = dfAccos.rdd.filter(lambda x, rated=ratedAccoIds: x[0] not in rated)
pairsPotential = rddPotential.map(lambda x, uid=USER_ID: (uid, x[0]))

In [0]:
#[START split_sets]
# Split the ratings into training / validation / test RDDs with 6:2:2 weights.
# A fixed seed keeps the split stable between runs; without it the
# recommendations change every time the cell is executed.
rddTraining, rddValidating, rddTesting = dfRates.rdd.randomSplit([6, 2, 2], seed=42)

In [0]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [0]:
#[START predict]
# Build our model with the best found values (rank, iterations, regularization).
# ALS here is the RDD-based pyspark.mllib implementation imported in the cell
# above. Hyper-parameters are passed by keyword so their meaning is explicit,
# and a fixed seed makes the factorization repeatable.
# NOTE(review): only the training split is used to fit the model - confirm
# that fitting on the full ratings set is not what is wanted here.
model = ALS.train(rddTraining, rank=10, iterations=5, lambda_=0.1, seed=42)


In [0]:
# Score every candidate (user, accommodation) pair and normalise each result
# to a (userId string, accoId string, predicted rating float) tuple.
def _as_prediction_row(pred):
    """Convert one prediction into a (str, str, float) tuple."""
    user, product, score = pred[0], pred[1], pred[2]
    return (str(user), str(product), float(score))

predictions = model.predictAll(pairsPotential).map(_as_prediction_row)

In [0]:
# Keep the five highest-scoring predictions. Ordering by the negated score
# makes takeOrdered return them in descending order.
def _negated_score(pred):
    """Sort key: negated predicted rating."""
    return -pred[2]

topPredictions = predictions.takeOrdered(5, key=_negated_score)
print(topPredictions)

[('0', '49', 4.896520089114775), ('0', '76', 4.754601059938514), ('0', '66', 4.707847057758075), ('0', '75', 4.698511918681345), ('0', '72', 4.254802137130806)]


In [0]:
# Schema for the top predictions: both ids are strings, the score is a float,
# and all three fields are nullable.
schema = StructType([
    StructField("userId", StringType(), True),
    StructField("accoId", StringType(), True),
    StructField("prediction", FloatType(), True),
])

In [0]:
# Turn the top predictions into a DataFrame. The SparkSession created at the
# top of the notebook is used because no sqlContext is defined in this notebook.
dfToSave = spark.createDataFrame(topPredictions, schema)

In [0]:
# Display the DataFrame's repr (column names and types only, no rows).
dfToSave

DataFrame[userId: string, accoId: string, prediction: float]

In [0]:
# Print the recommended accommodations for the user (show's defaults spelled out).
dfToSave.show(n=20, truncate=True)

+------+------+----------+
|userId|accoId|prediction|
+------+------+----------+
|     0|    49|   4.89652|
|     0|    76|  4.754601|
|     0|    66|  4.707847|
|     0|    75|  4.698512|
|     0|    72|  4.254802|
+------+------+----------+



In [77]:
# Show the ratings recorded for user 22 (show prints at most 20 rows).
user22_ratings = spark.sql("SELECT * FROM ratings WHERE userId==22")
user22_ratings.show()

+------+------+------+
|userId|accoId|rating|
+------+------+------+
|    22|     4|     3|
|    22|     7|     3|
|    22|     8|     2|
|    22|    11|     1|
|    22|    14|     3|
|    22|    18|     3|
|    22|    19|     1|
|    22|    21|     2|
|    22|    24|     3|
|    22|    26|     2|
|    22|    34|     2|
|    22|    37|     2|
|    22|    40|     3|
|    22|    43|     1|
|    22|    46|     3|
|    22|    48|     4|
|    22|    52|     2|
|    22|    54|     1|
|    22|    55|     3|
|    22|    56|     3|
+------+------+------+
only showing top 20 rows



In [0]:
# Switch to a second user and repeat the recommendation steps.
USER_ID = 22

# Collect the ids of every accommodation this user has already rated.
# Despite its name, dfUserRatings is a plain Python list of accoId strings.
user_rows = dfRates.where(dfRates.userId == USER_ID).select('accoId').collect()
dfUserRatings = [row.accoId for row in user_rows]
print(dfUserRatings)



['4', '7', '8', '11', '14', '18', '19', '21', '24', '26', '34', '37', '40', '43', '46', '48', '52', '54', '55', '56', '58', '65', '67', '68', '69', '73', '78', '81', '85', '87', '88', '91', '94', '95', '98']


In [0]:
# Keep only the accommodations that have not been rated by our user.
# The rated ids are bound as a lambda default (as a frozenset) so this lazy
# RDD cannot pick up a later reassignment of dfUserRatings, and each
# membership test is O(1).
rddPotential  = dfAccos.rdd.filter(lambda x, rated=frozenset(dfUserRatings): x[0] not in rated)




In [79]:
# Display the RDD handle; this shows only its repr, not the filtered rows.
rddPotential

PythonRDD[766] at RDD at PythonRDD.scala:53

In [0]:
# Pair every candidate accommodation id with the user id, forming the
# (user, product) tuples passed to predictAll. USER_ID is bound as a lambda
# default so this lazy RDD keeps the value current when the cell runs.
pairsPotential = rddPotential.map(lambda x, uid=USER_ID: (uid, x[0]))

In [0]:
#[START split_sets]
# Split the ratings into training / validation / test RDDs with 6:2:2 weights.
# A fixed seed keeps the split stable between runs; without it the
# recommendations change every time the cell is executed.
rddTraining, rddValidating, rddTesting = dfRates.rdd.randomSplit([6, 2, 2], seed=42)

In [0]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [0]:
#[START predict]
# Build our model with the best found values (rank, iterations, regularization).
# ALS here is the RDD-based pyspark.mllib implementation imported in the cell
# above. Hyper-parameters are passed by keyword so their meaning is explicit,
# and a fixed seed makes the factorization repeatable.
model = ALS.train(rddTraining, rank=10, iterations=5, lambda_=0.1, seed=42)

In [0]:
# Score every candidate pair, then normalise each prediction to a
# (userId string, accoId string, predicted rating float) tuple.
raw_predictions = model.predictAll(pairsPotential)
predictions = raw_predictions.map(
    lambda rating: (str(rating[0]), str(rating[1]), float(rating[2]))
)

In [85]:
# Keep the five highest-scoring predictions. Ordering by the negated score
# makes takeOrdered return them in descending order.
topPredictions = predictions.takeOrdered(
    5,
    key=lambda scored: -scored[2],
)
print(topPredictions)

[('22', '59', 4.301565494196462), ('22', '99', 4.009579216783245), ('22', '44', 3.904668072091046), ('22', '38', 3.8908555766838733), ('22', '72', 3.7541565994779083)]


In [0]:
# Schema for the top predictions: both ids are strings, the score is a float,
# and all three fields are nullable.
prediction_fields = [
    StructField("userId", StringType(), True),
    StructField("accoId", StringType(), True),
    StructField("prediction", FloatType(), True),
]
schema = StructType(prediction_fields)

In [0]:
# Turn the top predictions into a DataFrame and print it. The SparkSession
# created at the top of the notebook is used because no sqlContext is defined
# in this notebook.
dfToSave = spark.createDataFrame(topPredictions, schema)
dfToSave.show()

+------+------+----------+
|userId|accoId|prediction|
+------+------+----------+
|    22|    90|  4.733117|
|    22|    76|  4.542928|
|    22|     6|   4.49832|
|    22|    33| 4.4753304|
|    22|    66|  4.411006|
+------+------+----------+

