# Item-based CF
Author: [MengChiiehLiu](https://github.com/MengChiehLiu)  

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sqrt, pow, sum

Create SparkSession connection

In [2]:
spark = SparkSession.builder.appName("Item-based CF").enableHiveSupport().getOrCreate()

your 131072x1 screen size is bogus. expect trouble
23/09/15 14:50:25 WARN Utils: Your hostname, LAPTOP-FE47OAAK resolves to a loopback address: 127.0.1.1; using 172.17.128.64 instead (on interface eth0)
23/09/15 14:50:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/15 14:50:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 1. Calculate Similarity

Read ratings data and write into Hive Table

In [3]:
ratings = spark.read.csv("../../data/ml-latest-small/ratings.csv", header=True, inferSchema=True)
ratings.write.saveAsTable("ratings")
ratings = spark.table("ratings")
ratings.show(5)

[Stage 0:>                                                          (0 + 1) / 1]

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



                                                                                

Generate item-item pairs based on same user appearance

In [4]:
joined_df = ratings.alias("df1") \
    .join(
        ratings.alias("df2"),
        col("df1.userId") == col("df2.userId"),
        "inner"
    ).where(
        col("df1.movieId") != col("df2.movieId")
    ).select(
        col("df1.movieId").alias("movieId1"),
        col("df2.movieId").alias("movieId2"),
        col("df1.rating").alias("rating1"),
        col("df2.rating").alias("rating2"),
    )

In [5]:
joined_df.show(5)

+--------+--------+-------+-------+
|movieId1|movieId2|rating1|rating2|
+--------+--------+-------+-------+
|       1|    5060|    4.0|    5.0|
|       1|    4006|    4.0|    4.0|
|       1|    3809|    4.0|    4.0|
|       1|    3793|    4.0|    5.0|
|       1|    3744|    4.0|    4.0|
+--------+--------+-------+-------+
only showing top 5 rows



Calculate item-item similarity and save write into Hive table

In [6]:
similarity_df = joined_df.groupBy("movieId1", "movieId2").agg(
    (sum(col("rating1")*col("rating2")) / (sqrt(sum(pow("rating1", 2))) * sqrt(sum(pow("rating2", 2)))))
    .alias("similarity")
)

In [7]:
similarity_df.write.saveAsTable("similarity")
similarity_df = spark.table("similarity")
similarity_df.show(5)

+--------+--------+------------------+
|movieId1|movieId2|        similarity|
+--------+--------+------------------+
|       1|       8|   0.9887212379691|
|       1|      16|0.9557904330681696|
|       1|      19|0.9488521373055432|
|       1|      23|0.9578211900303278|
|       1|      36|0.9594882760610837|
+--------+--------+------------------+
only showing top 5 rows



## 2. Generate recommendations

In [8]:
def recommender(userId):
    # Read Hive Tables
    similarity_df = spark.table("similarity")
    user_df = spark.table("ratings").where(col('userId') == userId)

    # similarity * user ratings
    user_recommend_df = user_df \
        .join(similarity_df, user_df.movieId == similarity_df.movieId1, "left") \
        .withColumn("weighted_rating", col("similarity") * col("rating")) \
        .groupBy("movieId2") \
        .agg(sum("weighted_rating").alias('score')) \
        .orderBy("score", ascending=False) \
        .limit(10)
    return user_recommend_df

In [9]:
recommends = recommender(1)
recommends.show()

23/09/15 14:50:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/09/15 14:50:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/09/15 14:50:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/09/15 14:50:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/09/15 14:50:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/09/15 14:50:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/09/15 14:50:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/09/15 14:50:44 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/09/15 14:50:45 WARN RowBasedKeyValueBatch: Calling spill() on

+--------+------------------+
|movieId2|             score|
+--------+------------------+
|    1473|            1009.0|
|    1793|1004.1748699341041|
|    1927| 995.9786210295769|
|    2644| 990.1995143555372|
|    2033| 990.1273869979696|
|     943| 987.8737306485951|
|    2048| 987.2934345500367|
|    2099| 987.2232397564562|
|    3729| 987.1887638488214|
|    2093| 986.8429330002949|
+--------+------------------+



                                                                                