In [136]:
import json
import re
from math import sqrt
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf,col,split, explode
import pandas as pd

In [137]:
sqlContext = SQLContext(sc)

# Part 1

## Q1：Read data from dataset

In [138]:
#Read movies data
movies = sqlContext.read.options(header='True',inferSchema='True', delimiter=',').csv('./dataset/ml-latest/movies.csv')
movies.take(5)

[Row(movieId=1, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId=2, title='Jumanji (1995)', genres='Adventure|Children|Fantasy'),
 Row(movieId=3, title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId=4, title='Waiting to Exhale (1995)', genres='Comedy|Drama|Romance'),
 Row(movieId=5, title='Father of the Bride Part II (1995)', genres='Comedy')]

In [139]:
#Read ratings data
ratings = sqlContext.read.options(header='True',inferSchema='True', delimiter=',').csv('./dataset/ml-latest/ratings.csv')
ratings.take(5)

[Row(userId=1, movieId=1, rating=4.0, timestamp=964982703),
 Row(userId=1, movieId=3, rating=4.0, timestamp=964981247),
 Row(userId=1, movieId=6, rating=4.0, timestamp=964982224),
 Row(userId=1, movieId=47, rating=5.0, timestamp=964983815),
 Row(userId=1, movieId=50, rating=5.0, timestamp=964982931)]

In [140]:
watchs = ratings.join(movies, ["movieId"], "left")

In [141]:
watchs.take(5)

[Row(movieId=1, userId=1, rating=4.0, timestamp=964982703, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId=3, userId=1, rating=4.0, timestamp=964981247, title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId=6, userId=1, rating=4.0, timestamp=964982224, title='Heat (1995)', genres='Action|Crime|Thriller'),
 Row(movieId=47, userId=1, rating=5.0, timestamp=964983815, title='Seven (a.k.a. Se7en) (1995)', genres='Mystery|Thriller'),
 Row(movieId=50, userId=1, rating=5.0, timestamp=964982931, title='Usual Suspects, The (1995)', genres='Crime|Mystery|Thriller')]

In [142]:
watchs_rdd = sc.parallelize(watchs.collect())

In [143]:
watchs_rdd.take(5)

[Row(movieId=1, userId=1, rating=4.0, timestamp=964982703, title='Toy Story (1995)', genres='Adventure|Animation|Children|Comedy|Fantasy'),
 Row(movieId=3, userId=1, rating=4.0, timestamp=964981247, title='Grumpier Old Men (1995)', genres='Comedy|Romance'),
 Row(movieId=6, userId=1, rating=4.0, timestamp=964982224, title='Heat (1995)', genres='Action|Crime|Thriller'),
 Row(movieId=47, userId=1, rating=5.0, timestamp=964983815, title='Seven (a.k.a. Se7en) (1995)', genres='Mystery|Thriller'),
 Row(movieId=50, userId=1, rating=5.0, timestamp=964982931, title='Usual Suspects, The (1995)', genres='Crime|Mystery|Thriller')]

## Q2：Storage dataset

In [144]:
watchs_df = watchs_rdd.toDF()
watchs_df.write.save("csvFile.parquet", format="parquet")
watchs_df.show()

+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|
|     47|     1|   5.0|964983815|Seven (a.k.a. Se7...|    Mystery|Thriller|
|     50|     1|   5.0|964982931|Usual Suspects, T...|Crime|Mystery|Thr...|
|     70|     1|   3.0|964982400|From Dusk Till Da...|Action|Comedy|Hor...|
|    101|     1|   5.0|964980868|Bottle Rocket (1996)|Adventure|Comedy|...|
|    110|     1|   4.0|964982176|   Braveheart (1995)|    Action|Drama|War|
|    151|     1|   5.0|964984041|      Rob Roy (1995)|Action|Drama|Roma...|
|    157|     1|   5.0|964984100|Canadian Bacon (1...|          Comedy|War|
|    163|   

## Q3:The numbers of movies that users has watched 

In [145]:
user_watch_movies = watchs_df.groupby('userId').agg({'movieId': 'count'})
user_watch_movies.show()

+------+--------------+
|userId|count(movieId)|
+------+--------------+
|    26|            21|
|    29|            81|
|   474|          2108|
|    65|            34|
|   191|            85|
|   418|            93|
|   541|            87|
|   558|            56|
|   222|           250|
|   270|            40|
|   293|            21|
|   243|            36|
|   278|            20|
|   367|           185|
|   442|            20|
|    19|           703|
|    54|            33|
|   296|            27|
|   277|            28|
|   287|           152|
+------+--------------+
only showing top 20 rows



In [146]:
user_genres_df = watchs_df.select('userId','genres')
replace_udf = udf(lambda x: x.replace('|',' '))
user_genres_df = user_genres_df.withColumn("genres", replace_udf(col("genres")))
user_genres_df = user_genres_df.withColumn("genres", explode(split(user_genres_df['genres'], ' ')))
# Deduplication
user_genres_df.dropDuplicates().show()

+------+---------+
|userId|   genres|
+------+---------+
|    10|    Drama|
|    12|Animation|
|    80|Animation|
|   105|Adventure|
|   108|   Sci-Fi|
|   119|  Fantasy|
|   140|  Musical|
|   151|  Mystery|
|   162|Adventure|
|   166| Thriller|
|   176|    Crime|
|   177|  Romance|
|   187|  Western|
|   199| Children|
|   207|  Fantasy|
|   224| Children|
|   230|   Sci-Fi|
|   253|   Action|
|   261|    Crime|
|   261|  Romance|
+------+---------+
only showing top 20 rows



## Q3:The numbers of genre that users has watched 

In [147]:
user_watch_genres = user_genres_df.groupby('userId').agg({'genres': 'count'})
user_watch_genres.show()

+------+-------------+
|userId|count(genres)|
+------+-------------+
|    26|           59|
|    29|          231|
|   474|         4739|
|    65|          109|
|   191|          205|
|   418|          278|
|   541|          243|
|   558|          152|
|   222|          622|
|   270|          105|
|   293|           59|
|   243|          112|
|   278|           42|
|   367|          435|
|   442|           46|
|    19|         1855|
|    54|           96|
|   296|           87|
|   277|           79|
|   287|          385|
+------+-------------+
only showing top 20 rows



## Q3: List of users of each movie

In [148]:
watchs_df.select('movieId','userId').rdd.reduceByKey(lambda x,y: str(x)+','+str(y)).take(5)

[(216,
  '1,6,19,22,40,45,58,68,103,109,136,151,169,177,217,226,240,274,280,284,298,307,310,353,356,365,369,373,380,411,414,428,435,438,448,474,477,484,495,498,500,510,555,561,583,590,599,600,608'),
 (260,
  '1,4,7,15,16,17,18,19,21,25,27,28,30,32,39,42,44,45,51,52,57,59,62,63,64,66,68,69,71,72,73,75,76,77,78,79,82,84,86,90,91,93,95,96,103,104,105,112,113,114,115,120,122,123,124,128,129,131,132,135,137,139,140,141,149,152,155,156,160,166,167,171,177,180,182,186,187,193,195,198,199,200,201,202,205,210,211,212,215,217,219,220,223,224,226,227,228,231,232,234,239,246,247,248,249,254,256,262,265,266,267,268,274,275,276,277,279,282,288,290,292,294,298,302,303,304,305,307,308,312,313,314,317,318,323,328,330,334,337,343,344,348,350,352,354,357,361,362,363,364,365,367,368,369,370,372,376,380,381,382,385,387,389,391,399,400,407,408,409,410,414,415,416,422,425,428,432,434,437,438,441,443,445,448,450,452,453,456,457,459,461,462,464,466,469,474,475,477,479,480,483,487,492,493,494,497,500,509,513,51

## Q4:The average rating of each movie

In [149]:
movies_info = watchs_df.select('movieId','title','rating').groupby('movieId','title').agg({'rating': 'mean'})
movies_info.show()

+-------+--------------------+------------------+
|movieId|               title|       avg(rating)|
+-------+--------------------+------------------+
|   3448|Good Morning, Vie...| 3.802325581395349|
|   3702|      Mad Max (1979)| 3.486842105263158|
|    502|Next Karate Kid, ...|2.3666666666666667|
|  54286|Bourne Ultimatum,...| 3.697530864197531|
|  50872|  Ratatouille (2007)|3.8680555555555554|
|   3070|Adventures of Buc...|3.5476190476190474|
|  58025|       Jumper (2008)|               3.0|
|   1458|        Touch (1997)|               4.0|
| 171759| The Beguiled (2017)|               3.0|
|   1301|Forbidden Planet ...|3.8076923076923075|
|  33646|Longest Yard, The...| 3.111111111111111|
|  53000|28 Weeks Later (2...|3.5952380952380953|
|   6193|Poolhall Junkies ...|3.8333333333333335|
|  84772|         Paul (2011)|               3.2|
|     55|      Georgia (1995)|               4.0|
|  82854|Gulliver's Travel...|1.8333333333333333|
| 126921|The Fox and the H...|               5.0|


## A list of movies of each genre

In [150]:
movies_genre_df = watchs_df.select('genres','movieId','title')
replace_udf = udf(lambda x: x.replace('|',' '))
movies_genre_df = movies_genre_df.withColumn("genres", replace_udf(col("genres")))
movies_genre_df = movies_genre_df.withColumn("genres", explode(split(movies_genre_df['genres'], ' ')))
#Deduplication
movies_genre = movies_genre_df.dropDuplicates()
movies_genre.show()

+--------+-------+--------------------+
|  genres|movieId|               title|
+--------+-------+--------------------+
|  Action|   1291|Indiana Jones and...|
|   Drama|   1597|Conspiracy Theory...|
|  Comedy|   1641|Full Monty, The (...|
|Thriller|   2467|Name of the Rose,...|
|   Drama|    304|    Roommates (1995)|
|   Drama|    587|        Ghost (1990)|
|Children|    631|All Dogs Go to He...|
| Romance|   1064|Aladdin and the K...|
|  Comedy|   4558|        Twins (1988)|
|Thriller|  54286|Bourne Ultimatum,...|
|   Drama|  97938|   Life of Pi (2012)|
|     War|   1619|Seven Years in Ti...|
|  Sci-Fi|  60069|       WALL·E (2008)|
|    IMAX| 104841|      Gravity (2013)|
|  Action| 120466|      Chappie (2015)|
|   Drama|   2194|Untouchables, The...|
|  Action|   6709|Once Upon a Time ...|
| Mystery|  55290|Gone Baby Gone (2...|
|   Drama|  69951|Imaginarium of Do...|
|   Drama|  99728|Gangster Squad (2...|
+--------+-------+--------------------+
only showing top 20 rows



## Q5:A list of genres and all movies belonging to each genre

In [151]:
movies_genre.select('genres','movieId').rdd.reduceByKey(lambda x,y: str(x)+','+str(y)).take(5)

[('Fantasy',
  '108932,2876,26686,92637,104074,45722,31696,86721,84942,60937,8015,96430,2367,4902,174141,2327,72921,72982,1199,27790,47774,78772,653,26340,48043,182297,1265,837,188301,26693,8620,26741,122886,106072,74553,243,3000,159811,80834,69951,6624,73804,86864,1275,1346,7380,7302,51939,52712,3393,62956,3153,83132,107953,49649,80615,95149,122924,3466,4153,166203,184987,79895,1241,65577,100163,4993,6958,33681,122902,4985,2253,80748,47721,1881,2316,533,8492,594,80083,8965,6773,4392,1278,94015,79702,1464,5214,43289,3358,148978,108795,2161,1702,73854,2148,88125,72731,47124,661,76175,63239,5768,2414,2,4591,27731,184641,91273,74095,135534,7099,70946,53121,82854,65359,83349,139855,393,4294,137517,6936,121007,2138,32554,1681,2123,1835,26258,55259,31184,137857,4911,8580,367,1848,103042,54259,1011,32456,119655,114627,54686,139655,606,4564,7757,3516,5419,26606,3687,1126,26590,136864,6686,4130,170827,27266,5039,5127,78105,53140,32031,2931,885,82169,69606,36289,130050,126142,27251,179819,7067,1

## Q6:List of movive by year

In [152]:
movies_year = watchs_df.select('movieId','title')
movies_year.show()

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      3|Grumpier Old Men ...|
|      6|         Heat (1995)|
|     47|Seven (a.k.a. Se7...|
|     50|Usual Suspects, T...|
|     70|From Dusk Till Da...|
|    101|Bottle Rocket (1996)|
|    110|   Braveheart (1995)|
|    151|      Rob Roy (1995)|
|    157|Canadian Bacon (1...|
|    163|    Desperado (1995)|
|    216|Billy Madison (1995)|
|    223|       Clerks (1994)|
|    231|Dumb & Dumber (Du...|
|    235|      Ed Wood (1994)|
|    260|Star Wars: Episod...|
|    296| Pulp Fiction (1994)|
|    316|     Stargate (1994)|
|    333|    Tommy Boy (1995)|
|    349|Clear and Present...|
+-------+--------------------+
only showing top 20 rows



In [153]:
year_udf = udf(lambda x: re.findall('\((\d+)\)',x)[0])
movies_year = movies_year.withColumn("year", year_udf(col("title")))
movies_year.show()

+-------+--------------------+----+
|movieId|               title|year|
+-------+--------------------+----+
|      1|    Toy Story (1995)|1995|
|      3|Grumpier Old Men ...|1995|
|      6|         Heat (1995)|1995|
|     47|Seven (a.k.a. Se7...|1995|
|     50|Usual Suspects, T...|1995|
|     70|From Dusk Till Da...|1996|
|    101|Bottle Rocket (1996)|1996|
|    110|   Braveheart (1995)|1995|
|    151|      Rob Roy (1995)|1995|
|    157|Canadian Bacon (1...|1995|
|    163|    Desperado (1995)|1995|
|    216|Billy Madison (1995)|1995|
|    223|       Clerks (1994)|1994|
|    231|Dumb & Dumber (Du...|1994|
|    235|      Ed Wood (1994)|1994|
|    260|Star Wars: Episod...|1977|
|    296| Pulp Fiction (1994)|1994|
|    316|     Stargate (1994)|1994|
|    333|    Tommy Boy (1995)|1995|
|    349|Clear and Present...|1994|
+-------+--------------------+----+
only showing top 20 rows



## Q7:List of movies by rating from high to low

In [154]:
movies_info.orderBy(movies_info['avg(rating)'].desc()).show()

+-------+--------------------+-----------+
|movieId|               title|avg(rating)|
+-------+--------------------+-----------+
| 138966|Nasu: Summer in A...|        5.0|
|  96832|  Holy Motors (2012)|        5.0|
|  94810|          Eva (2011)|        5.0|
| 117531|    Watermark (2014)|        5.0|
| 131610|  Willy/Milly (1986)|        5.0|
|  26928|Summer's Tale, A ...|        5.0|
|   4402|Dr. Goldfoot and ...|        5.0|
|  31522|Marriage of Maria...|        5.0|
| 175293|Gena the Crocodil...|        5.0|
|  95311|       Presto (2008)|        5.0|
| 173351|Wow! A Talking Fi...|        5.0|
|   5889|Cruel Romance, A ...|        5.0|
|   5059|Little Dieter Nee...|        5.0|
| 172705|Tickling Giants (...|        5.0|
|  26366|Harlan County U.S...|        5.0|
|  69860|     Eichmann (2007)|        5.0|
|  95149|Superman/Batman: ...|        5.0|
|   1151| Lesson Faust (1994)|        5.0|
|   3496|Madame Sousatzka ...|        5.0|
| 126921|The Fox and the H...|        5.0|
+-------+--

## Q8:List of movies with highest number of watches from high to low

In [155]:
watchs_info = watchs_df.groupby('movieId').agg({'userId': 'count'})
watchs_info.show()

+-------+-------------+
|movieId|count(userId)|
+-------+-------------+
|   2529|           56|
|  60756|           28|
|    474|           70|
|     26|           13|
|  72011|           32|
|   1806|            8|
|   2040|            7|
|   2453|            3|
| 102993|            4|
|     29|           38|
|   6721|            7|
|  96829|           10|
|   3764|            5|
| 106100|           17|
| 106002|           16|
|   4823|           25|
|  45726|            6|
|  51709|            7|
|  69720|            1|
|  91261|            1|
+-------+-------------+
only showing top 20 rows



In [156]:
watchs_info.orderBy(watchs_info['count(userId)'].desc()).show()

+-------+-------------+
|movieId|count(userId)|
+-------+-------------+
|    356|          329|
|    318|          317|
|    296|          307|
|    593|          279|
|   2571|          278|
|    260|          251|
|    480|          238|
|    110|          237|
|    589|          224|
|    527|          220|
|   2959|          218|
|      1|          215|
|   1196|          211|
|   2858|          204|
|     50|          204|
|     47|          203|
|    780|          202|
|    150|          201|
|   1198|          200|
|   4993|          198|
+-------+-------------+
only showing top 20 rows



# part2

## Q1: List the favourite genre of users. (The highest average rating of genre was considered as the favourite. )

In [157]:
#Here consider finding the average scores of each user for all movie genres, and then finding top1 which means his/her favourate genre. 
user_genre_df = watchs_df.select('userId','genres','rating')
replace_udf = udf(lambda x: x.replace('|',' '))
user_genre_df = user_genre_df.withColumn("genres", replace_udf(col("genres")))
user_genre_df = user_genre_df.withColumn("genres", explode(split(user_genre_df['genres'], ' ')))
user_genre_df.show()

+------+---------+------+
|userId|   genres|rating|
+------+---------+------+
|     1|Adventure|   4.0|
|     1|Animation|   4.0|
|     1| Children|   4.0|
|     1|   Comedy|   4.0|
|     1|  Fantasy|   4.0|
|     1|   Comedy|   4.0|
|     1|  Romance|   4.0|
|     1|   Action|   4.0|
|     1|    Crime|   4.0|
|     1| Thriller|   4.0|
|     1|  Mystery|   5.0|
|     1| Thriller|   5.0|
|     1|    Crime|   5.0|
|     1|  Mystery|   5.0|
|     1| Thriller|   5.0|
|     1|   Action|   3.0|
|     1|   Comedy|   3.0|
|     1|   Horror|   3.0|
|     1| Thriller|   3.0|
|     1|Adventure|   5.0|
+------+---------+------+
only showing top 20 rows



In [158]:
user_genre_avg = user_genre_df.groupBy('userId','genres').agg({'rating': 'mean'})
user_genre_avg.show()

+------+---------+------------------+
|userId|   genres|       avg(rating)|
+------+---------+------------------+
|    10|    Drama|3.1527777777777777|
|    12|Animation|               3.0|
|    80|Animation|               4.0|
|   105|Adventure| 3.932142857142857|
|   108|   Sci-Fi| 4.230769230769231|
|   119|  Fantasy|               4.0|
|   140|  Musical| 3.576923076923077|
|   151|  Mystery|               3.0|
|   162|Adventure|               4.0|
|   166| Thriller| 4.014705882352941|
|   176|    Crime| 3.857142857142857|
|   177|  Romance| 3.517167381974249|
|   187|  Western|               3.5|
|   199| Children|2.3333333333333335|
|   207|  Fantasy|2.3333333333333335|
|   224| Children| 4.333333333333333|
|   230|   Sci-Fi|            3.0625|
|   253|   Action|              3.25|
|   261|    Crime| 4.026315789473684|
|   261|  Romance|               5.0|
+------+---------+------------------+
only showing top 20 rows



In [159]:
user_genre_avg_rdd = user_genre_avg.rdd.map(lambda x: (x.userId,(x.genres,x['avg(rating)']))).reduceByKey(lambda x,y: x if x[1] > y[1] else y)
user_genre_avg_rdd.collect()

[(400, ('Crime', 4.777777777777778)),
 (200, ('Film-Noir', 4.666666666666667)),
 (600, ('Documentary', 3.8333333333333335)),
 (401, ('Horror', 4.5)),
 (601, ('listed)', 4.75)),
 (1, ('Film-Noir', 5.0)),
 (201, ('War', 5.0)),
 (2, ('Romance', 4.5)),
 (202, ('Documentary', 4.666666666666667)),
 (402, ('Horror', 5.0)),
 (602, ('Musical', 4.0)),
 (3, ('Mystery', 5.0)),
 (603, ('Film-Noir', 4.260869565217392)),
 (203, ('War', 4.75)),
 (403, ('Crime', 4.0)),
 (4, ('Horror', 4.25)),
 (204, ('War', 4.428571428571429)),
 (404, ('Children', 3.6666666666666665)),
 (604, ('Mystery', 3.75)),
 (205, ('Documentary', 5.0)),
 (605, ('IMAX', 3.6785714285714284)),
 (405, ('IMAX', 4.5)),
 (5, ('Musical', 4.4)),
 (606, ('Film-Noir', 3.8125)),
 (6, ('IMAX', 4.666666666666667)),
 (406, ('IMAX', 4.0)),
 (206, ('Western', 5.0)),
 (207, ('Crime', 3.5)),
 (7, ('Horror', 4.0)),
 (407, ('Mystery', 5.0)),
 (607, ('IMAX', 5.0)),
 (8, ('Animation', 5.0)),
 (608, ('IMAX', 4.0)),
 (408, ('Animation', 5.0)),
 (208, ('Wa

## Q2:Compare the movie taste of two users (using the pearson distance to show justify between -1 and 1)

In [160]:
ratings_dict = {}
for userId,movieId,rating in watchs_df.select('userId','movieId','rating').rdd.map(lambda x: (x.userId,x.movieId,x.rating)).collect():
    if userId not in ratings_dict.keys():
        ratings_dict[userId] = {}
    ratings_dict[userId][movieId] = int(rating)

# Definie the distance function
def euclidean_dis(rating1, rating2):
    """
    Calculate the Euclidean distance of two rating sequences.The input rating1 and rating2 are both scoring dict，{'1':1.0,'2':5.0}
    """
    distance = 0
    commonRatings = False
    for key in rating1:
        if key in rating2:
            distance += (rating1[key] - rating2[key]) ** 2
            commonRatings = True
    # There is a same movie between the two sequences
    if commonRatings:
        return sqrt(distance)
    # no same movie 
    else:
        return -1

def cos_dis(rating1, rating2):
    """
    Calculate the cos distance between two rating sequences.
    """
    distance = 0
    dot_product_1 = 0
    dot_product_2 = 0
    commonRatings = False

    for score in rating1.values():
        dot_product_1 += score ** 2
    for score in rating2.values():
        dot_product_2 += score ** 2

    for key in rating1:
        if key in rating2:
            distance += rating1[key] * rating2[key]
            commonRatings = True

    # There is a same movie between the two sequences
    if commonRatings:
        return 1 - distance / sqrt(dot_product_1 * dot_product_2)
    #  no same movie 
    else:
        return -1


def pearson_dis(rating1, rating2):
    """
    Calculate the pearson distance between two ratings
    """
    sum_xy = 0
    sum_x = 0
    sum_y = 0
    sum_x2 = 0
    sum_y2 = 0
    n = 0
    for key in rating1:
        if key in rating2:
            n += 1
            x = rating1[key]
            y = rating2[key]
            sum_xy += x * y
            sum_x += x
            sum_y += y
            sum_x2 += x ** 2
            sum_y2 += y ** 2

    if n == 0:
        return 0
    denominator = sqrt(sum_x2 - (sum_x ** 2 / n)) * sqrt(sum_y2 - (sum_y ** 2 / n))
    if denominator == 0:
        return 0
    else:
        return (sum_xy - (sum_x * sum_y) / n) / denominator

#Using the following code to campare any two users tastes.
#pearson_dis(ratings_dict[userId1],ratings_dict[userId2])

In [161]:
pearson_dis(ratings_dict[55],ratings_dict[286])

0.9999999999999998

In [162]:
pearson_dis(ratings_dict[56],ratings_dict[146])

-0.9999999999999998

# Part3

## Through the distaces to find similar-taste users

In [163]:
def computerNearestNeighbor(userid, users):
    """
    Give the userid, calculate the distance between other users and it, and sort them.
    """
    distances = []
    for user in users:
        if user != userid:
            distance = pearson_dis(users[user], users[userid])
            distances.append((distance, user))

    # Sort by distance 
    distances.sort()
    return distances

## Use collaborative filtering algorithm to recommend movies to users

In [164]:
#  nearest/top K neighbors
def recommend(userid, users):
 
    nearest = computerNearestNeighbor(userid, users)[0][1]
    recommendations = []
    # Find movies that the nearest neighbor has watched but the user hasn’t watched, and calculate recommendations
    neighborRatings = users[nearest]
    userRatings = users[userid]
    for artist in neighborRatings:
        if not artist in userRatings:
            recommendations.append((artist, neighborRatings[artist]))
    results = sorted(recommendations, key=lambda artistTuple: artistTuple[1], reverse=True)
    df = {"UserID": [], "MovieID": [], "Rating": []}
    for result in results:
        df["UserID"].append(userid)
        df["MovieID"].append(result[0])
        df["Rating"].append(result[1])
    return pd.DataFrame(df)

def make_df(users):
    df = pd.DataFrame({"UserID":[],"MovieID":[],"Rating":[]})
    for user in users:
        df = pd.concat([df,recommend(user,users)],ignore_index=True)
    return df

df = make_df(ratings_dict)

In [165]:
df.head(50)

Unnamed: 0,UserID,MovieID,Rating
0,1.0,30816.0,5.0
1,1.0,40629.0,5.0
2,1.0,98491.0,5.0
3,1.0,4308.0,4.0
4,1.0,4896.0,4.0
5,1.0,5816.0,4.0
6,1.0,8368.0,4.0
7,1.0,31658.0,4.0
8,1.0,40815.0,4.0
9,1.0,44191.0,4.0
