## Day 79 Lecture 2 Assignment

In this assignment, we will learn about collaborative filtering in Spark.

Run the cells below to start a spark session.

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-eu.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

In [0]:
!pip install -q findspark
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
[K     |████████████████████████████████| 215.7MB 59kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 50.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=3db730d65e5c88f41250bc1161e6622970138e5f92e09e57778765aefd9b3f2e
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4


In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/gdrive


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [0]:
APP_NAME = "Day79"

In [0]:
spark = SparkSession.builder.appName(APP_NAME).getOrCreate()

We will define the schema of our Amazon Video ratings dataset and load the data into a dataframe. 

In [0]:
videoSchema = StructType([StructField('user', StringType(), True),
                     StructField('item', StringType(), True),
                     StructField('rating', FloatType(), True),
                     StructField('timestamp', StringType(), True)])

In [0]:
video = spark.read.format("csv").option("header", "false").schema(videoSchema).load("/content/drive/My Drive/Thinkful Datasets/ratings_Amazon_Instant_Video.csv")

# New Section

In [0]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [0]:
video.show()

+--------------+----------+------+----------+
|          user|      item|rating| timestamp|
+--------------+----------+------+----------+
|A1EE2E3N7PW666|B000GFDAUG|   5.0|1202256000|
| AGZ8SM1BGK3CK|B000GFDAUG|   5.0|1198195200|
|A2VHZ21245KBT7|B000GIOPK2|   4.0|1215388800|
| ACX8YW2D5EGP6|B000GIOPK2|   4.0|1185840000|
| A9RNMO9MUSMTJ|B000GIOPK2|   2.0|1281052800|
|A3STFVPM8NHJ7B|B000GIOPK2|   5.0|1203897600|
|A2582KMXLK2P06|B000GIOPK2|   5.0|1205884800|
|A1TZCLCW9QGGBH|B000GIOPK2|   4.0|1209427200|
|A2E2I6B878CRMA|B000GIOPK2|   5.0|1378684800|
| AD5MZA8SOVMPJ|B000GIOPK2|   5.0|1218240000|
|A3IE1M3QVUKIJN|B000GIOPK2|   5.0|1251763200|
| AZ1MUCW76BDL1|B000GIOPK2|   5.0|1361145600|
|A2XNOB1T796Y6B|B000GIOPK2|   5.0|1233532800|
|A12DO7F3TT123V|B000GIOPK2|   2.0|1189987200|
|A2UN6AL460C8J4|B000GIOPK2|   1.0|1391299200|
| AVYBQU4XX5QR4|B000GIOPK2|   4.0|1363046400|
| AVE3EF44DFS0C|B000GIOPK2|   5.0|1190937600|
|A27AWN5G5GT6RP|B000GIOPK2|   5.0|1328745600|
|A35KJPLBWHF5GJ|B000GIOPK2|   5.0|

In [0]:
type(video)

pyspark.sql.dataframe.DataFrame

Since we have a timestamp column, this might mean that some users reviewed a video more than once. To make sure we have one review per video per user, we will compute the average rating per user and video.

In [0]:
from pyspark.sql.functions import desc

In [0]:
# Answer below:
video.groupBy('item', 'user').count().orderBy('count', ascending=False).show()

+----------+--------------+-----+
|      item|          user|count|
+----------+--------------+-----+
|B000H0YRNY|A293B44ZW3VU9W|    1|
|B000H3RG4I|A3E1R7VPFBXDHQ|    1|
|B000H3S8Z4| AAXE8XFYC35UI|    1|
|B000H4YNM0| AL143IAR7DSN2|    1|
|B000HAB4NK|A2BLL8BM5I5RNL|    1|
|B000HAB4NK|A2KAL73T9ME7SV|    1|
|B000HAB4NK|A1XCB9U0YL5KS5|    1|
|B000HL0KWU|A1PXYAD8AX7G6S|    1|
|B000HVITQ4|A2F7CCPK4QL359|    1|
|B000HVITQ4| AOK72ELBI86GF|    1|
|B000HZEHL6| A64I4IGQ1IUX8|    1|
|B000I62JFU| A6AXHB4NFGSNF|    1|
|B000I66JMY|A3A4QMP6HJ3AOC|    1|
|B000I67MD4| AWM2XN8BG9DTZ|    1|
|B000I8EO4W|A1SD8Y9Q1KESLB|    1|
|B000I9S5BE|A2K0ZFY81CGGMF|    1|
|B000I9S5BE|A36Y6140C5ZPON|    1|
|B000IBUIS0|A1NGSVPWLWJT7B|    1|
|B000IBUIS0| AUKDWZ381NIT0|    1|
|B000IBUP6A| AD094AU9RA7NE|    1|
+----------+--------------+-----+
only showing top 20 rows



In [0]:
avg_rating = video.groupBy('user', 'item').mean('rating')

Rename the average rating column to `mean_rating`

In [0]:
# Answer below:
video1 = avg_rating.withColumnRenamed('avg(rating)', 'mean_rating')


In [0]:
video1.show()

+--------------+----------+-----------+
|          user|      item|mean_rating|
+--------------+----------+-----------+
| AXB3UAIB8DHBM|B000GUTWLW|        5.0|
|A1I4YAA9N1VVU7|B000H0YRNY|        5.0|
|A14XT5GHSYRBPH|B000H0YRNY|        5.0|
|A1ZUW4BLFXBNDB|B000H3S8Z4|        4.0|
| AAXE8XFYC35UI|B000H3S8Z4|        5.0|
|A1PQ7T604TOLIP|B000H4YNM0|        4.0|
|A3ULK1Y4XZMS45|B000H4YNM0|        5.0|
|A1NI2F0VYALIV6|B000HAB4NK|        5.0|
|A1L1DACA4D43TW|B000HKWE3O|        4.0|
| AL8EFRSR81UNI|B000HKWE3O|        5.0|
|A2B73CL3QSYWLB|B000HMPU0Q|        4.0|
|A3QEPSZ62L6BU5|B000HVITQ4|        5.0|
|A2TXR85WQLE32N|B000HZEHL6|        5.0|
|A2BN1JYWB25MS5|B000HZEHL6|        4.0|
| AZEFMBE6SFZ7E|B000I01R40|        5.0|
|A3DM1MV8Z71JC1|B000I01R40|        5.0|
|A1CXYJJ4W0FPAK|B000I5O6XO|        3.0|
|A3A4TEVCALESHQ|B000I5O6Y8|        5.0|
|A3ETF68HRRCLJF|B000I5RENI|        5.0|
|A2969H8MYNF60C|B000I5RENI|        4.0|
+--------------+----------+-----------+
only showing top 20 rows



Now assign an index to each user and each item. You may use the `monotonically_increasing_id` function in spark or use a sql query with the row number.

In [0]:
video1_user_distinct = video1.select('user').distinct()
video1_user_distinct.createOrReplaceTempView('video')
new_user_id = spark.sql('select row_number() over (order by "user") as userIdInt, * from video')

In [0]:
new_user_id.show(5)

+---------+--------------+
|userIdInt|          user|
+---------+--------------+
|        1| A7O8X1JIUQQCV|
|        2|A2EOXFBPW3U8Q0|
|        3|A28W4EJPEPAU1L|
|        4|A1JNNGRKUF47PP|
|        5| AWN6ZMWWMUWLX|
+---------+--------------+
only showing top 5 rows



In [0]:
video1_item_distinct = video1.select('item').distinct()
video1_item_distinct.createOrReplaceTempView('video')
new_item_id = spark.sql('select row_number() over (order by "item") as itemIdInt, * from video')

In [0]:
new_item_id.show(5)

+---------+----------+
|itemIdInt|      item|
+---------+----------+
|        1|B000OC3FZQ|
|        2|B001MVN03K|
|        3|B002AL4A4E|
|        4|B003MSR2OE|
|        5|B0084TG2LU|
+---------+----------+
only showing top 5 rows



Now split the data into 70% for training and 30% for testing.

In [0]:
# Answer below:
new_video = video1.join(new_user_id, "user", "inner").join(new_item_id, "item", "inner")


In [0]:
new_video.show(10)

+----------+--------------+-----------+---------+---------+
|      item|          user|mean_rating|userIdInt|itemIdInt|
+----------+--------------+-----------+---------+---------+
|B000OC3FZQ|A1V97PGLKZTSTC|        5.0|     2954|        1|
|B000OC3FZQ|A2EQSWOECBXBDK|        5.0|     2992|        1|
|B000OC3FZQ| AB87XW2MYPFTI|        5.0|     9406|        1|
|B000OC3FZQ| AL73H4RE0YWD2|        5.0|    10010|        1|
|B000OC3FZQ|A3G5JEXHQ6ZG96|        5.0|    12316|        1|
|B000OC3FZQ|A1ZW6A1XVDS8JN|        4.0|    20981|        1|
|B000OC3FZQ|A1F93NA8HQ47IU|        5.0|    21765|        1|
|B000OC3FZQ|A1OE9Z1KSQ0TMX|        4.0|    27465|        1|
|B000OC3FZQ|A3980H9VD0B4ZO|        5.0|    25619|        1|
|B000OC3FZQ|A2R88HK6AC5PWR|        4.0|    33052|        1|
+----------+--------------+-----------+---------+---------+
only showing top 10 rows



In [0]:
# Answer below:
train, test = new_video.randomSplit([0.7,0.3], seed = 1)
train.count()


408782

Generate an alternating least squares model using the training data and predict the ratings for the test data using the model.

In [0]:
# Answer below:
from pyspark.ml.recommendation import ALS
als = ALS()

In [0]:
als.setMaxIter(5)\
  .setSeed(1)\
  .setItemCol("itemIdInt")\
  .setRatingCol("mean_rating")\
  .setUserCol("userIdInt")

model = als.fit(train)
predict_df = model.transform(test)

In [0]:
predict_df = predict_df.dropna()

In [0]:
predict_df.show()

+----------+--------------+-----------+---------+---------+-----------+
|      item|          user|mean_rating|userIdInt|itemIdInt| prediction|
+----------+--------------+-----------+---------+---------+-----------+
|B0053ZU4P8| ABCTCW3IEF0SS|        5.0|   339670|      148|   9.203648|
|B0053ZU4P8| AXKV0W2E2RR6A|        5.0|   226533|      148|  6.7197256|
|B0053ZU4P8|A1ASQLM5XCBT94|        2.0|   185996|      148|  2.2752872|
|B0053ZU4P8|A1NRRU43MYI9S0|        3.0|   282404|      148|  -6.537208|
|B0053ZU4P8|A3KI8TII3LJ2L3|        5.0|    28222|      148|  2.2137299|
|B0053ZU4P8| AZ9GD4PTC99U4|        2.0|   407829|      148|-0.18189467|
|B00DCLZMQG|A3TKUCZT8698JR|        4.0|   214905|      496|  0.3798763|
|B00DCLZMQG|A2ZJLHHS4O7VHT|        4.0|   217968|      496|0.013990566|
|B00DCLZMQG| ALMPUJX6BXAR1|        4.0|     2399|      496| 0.76679295|
|B00DCLZMQG|A1W6DWSDVJS43Z|        4.0|   295284|      496|  0.5270287|
|B00DCLZMQG|A2X6TZ69E4WX57|        4.0|    81809|      496| -2.8

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="mean_rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predict_df)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 3.820727839787174
