In [2]:
import math
import csv
from pyspark.sql import SparkSession

In [3]:
# Start a Spark session for this notebook (or attach to one that is already
# running) and keep a handle to its SparkContext for the RDD-based cells below.
spark = (
    SparkSession.builder
    .appName("final project")
    .getOrCreate()
)
sc = spark.sparkContext

25/04/19 17:28:51 WARN Utils: Your hostname, Tappns-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 141.215.217.245 instead (on interface en0)
25/04/19 17:28:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/19 17:28:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/19 17:28:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
# Read both CSV files as RDDs of raw text lines. The header rows are still
# present here; they are filtered out when each file is parsed.
movies = sc.textFile('./movies.csv')
ratings = sc.textFile('./ratings.csv')

In [5]:
def user_rating_mapper(line):
    """Turn one data row of ratings.csv into a key-value pair keyed by user.

    Args:
        line: A comma-separated string with exactly four fields, in the order
            userId, movieId, rating, timestamp.

    Returns:
        (userId, (movieId, rating)). Both ids stay strings, the rating is
        converted to float, and the timestamp is discarded.

    Raises:
        ValueError: If the line does not split into exactly four fields, or
            the rating field is not a valid float.
    """
    user_id, movie_id, raw_rating, _timestamp = line.split(',')
    return (user_id, (movie_id, float(raw_rating)))

# Drop the CSV header row, then parse every remaining line into
# (userId, (movieId, rating)).
# This parsed RDD is the starting point of several independent lineages in the
# notebook (per-user averages, centred ratings, watched-movie sets). Without
# caching, ratings.csv would be re-read and re-parsed for each of them, so the
# parsed records are kept in memory after the first computation.
ratings_rdd = (
    ratings
    .filter(lambda line: not line.startswith("userId"))
    .map(user_rating_mapper)
    .cache()
)

def cal_avg_rating(group):
    """Compute the arithmetic mean of one user's ratings.

    Args:
        group: A sized iterable of (movieId, rating) pairs, e.g. the values
            gathered for one user by groupByKey(). Must not be empty.

    Returns:
        The mean of the rating components as a float.

    Raises:
        ZeroDivisionError: If group is empty.
    """
    n_ratings = len(group)
    running_sum = 0.0
    # Accumulate left to right; the movie id of each pair is not needed.
    for _movie_id, score in group:
        running_sum += score
    return running_sum / n_ratings

# Pair RDD of (userId, mean rating): gather each user's (movieId, rating)
# pairs under their id, then reduce every group to its average.
user_avg = (
    ratings_rdd
    .groupByKey()
    .mapValues(cal_avg_rating)
)

def optimized(x):
    """Centre a single rating on its user's mean rating.

    Args:
        x: (userId, ((movieId, rating), user_average)), the record shape
            produced by joining the ratings with the per-user averages,
            e.g. ('21', (('799', 5.0), 3.8285714285714287)).

    Returns:
        (userId, (movieId, rating - user_average)).
    """
    user_id = x[0]
    rated_movie, user_mean = x[1][0], x[1][1]
    return (user_id, (rated_movie[0], rated_movie[1] - user_mean))

# Attach each user's mean to every one of their ratings, then subtract it so
# that ratings are centred on zero per user: (userId, (movieId, centred rating)).
centered_ratings = (
    ratings_rdd
    .join(user_avg)
    .map(optimized)
)



25/04/19 17:29:03 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [6]:
def movie_genre_mapper(line):
    """Parse one data row of movies.csv into (movieId, (title, [genres])).

    A plain split(',') is not enough here: some titles contain a comma and are
    therefore quoted, so the row is parsed with the csv module instead.

    Args:
        line: One CSV line with at least three fields: movieId, title, and a
            '|'-separated genre string.

    Returns:
        (movieId, (title, genres)) where genres is the list obtained by
        splitting the third field on '|'.

    Raises:
        IndexError: If the line has fewer than three fields.
    """
    row = next(csv.reader([line]))
    return (row[0], (row[1], row[2].split('|')))


# Drop the header row and parse each movie line into (movieId, (title, [genres])).
# movies_rdd is consumed by several separate lineages in this notebook (the join
# with the ratings, the genre vocabulary, and the item profiles). Caching it
# avoids re-reading movies.csv and re-running the csv parser for each of them.
movies_rdd = (
    movies
    .filter(lambda line: not line.startswith("movieId"))
    .map(movie_genre_mapper)
    .cache()
)

# Re-key the centred ratings by movie so they can be joined with movies_rdd:
# (userId, (movieId, rating)) -> (movieId, (userId, rating)).
movie_rating_rdd = centered_ratings.map(lambda x: (x[1][0], (x[0], x[1][1])))

def genre_and_rating_mapper(x):
    """Fan one rated movie out into one (userId, (genre, rating)) pair per genre.

    Args:
        x: (movieId, ((userId, rating), (title, [genres]))), e.g.
            ('327', (('72', -1.0238095238095237),
                     ('Tank Girl (1995)', ['Action', 'Comedy', 'Sci-Fi']))).

    Returns:
        A list [(userId, (genre, rating)), ...] with one entry per genre of
        the movie, in the order the genres are listed. The movie id and title
        are not carried over.
    """
    user_and_rating, title_and_genres = x[1][0], x[1][1]
    user_id, rating = user_and_rating[0], user_and_rating[1]
    return [(user_id, (genre, rating)) for genre in title_and_genres[1]]


# Join the centred ratings with the movie metadata on movieId, then emit one
# (userId, (genre, rating)) record for every genre of every rated movie.
user_genre_scores = (
    movie_rating_rdd
    .join(movies_rdd)
    .flatMap(genre_and_rating_mapper)
)


def avg_per_genre(group):
    """Build a user profile: the mean (centred) rating the user gave per genre.

    Args:
        group: Iterable of (genre, rating) pairs belonging to a single user.

    Returns:
        A dict {genre: mean rating} containing only the genres that occur in
        group, in first-seen order. An empty group yields an empty dict.
    """
    # genre -> [running total, number of ratings]
    stats = {}
    for genre, rating in group:
        entry = stats.setdefault(genre, [0.0, 0])
        entry[0] += rating
        entry[1] += 1

    return {genre: total / n for genre, (total, n) in stats.items()}


# Pair RDD of (userId, {genre: mean centred rating}): gather each user's
# (genre, rating) pairs and reduce them to a per-genre average.
user_profile = (
    user_genre_scores
    .groupByKey()
    .mapValues(avg_per_genre)
)

In [7]:
# create item profile

# Build the genre vocabulary: flatten every movie's genre list, de-duplicate,
# and bring the result back to the driver as a plain Python list.
all_genres = (
    movies_rdd
    .flatMap(lambda movie: movie[1][1])
    .distinct()
    .collect()
)

# Item profile vector: one entry per known genre, 1.0 if the movie has that
# genre and 0.0 otherwise, e.g. {"genre1": 0.0, "genre2": 1.0, ...}.
def genres_to_dict_vector(genres, all_genres):
    """Encode a movie's genres as a 0/1 vector over the full genre vocabulary.

    Args:
        genres: Container of the genres the movie belongs to.
        all_genres: Iterable of every genre that should appear as a key.

    Returns:
        A dict with one key per entry of all_genres (in that order), valued
        1.0 when the genre is in genres and 0.0 otherwise. Genres that are
        not in all_genres are ignored.
    """
    return {genre: (1.0 if genre in genres else 0.0) for genre in all_genres}
# Pair RDD of (movieId, genre vector): replace each movie's (title, [genres])
# value with its 0/1 vector over all_genres.
item_profile = movies_rdd.mapValues(
    lambda title_and_genres: genres_to_dict_vector(title_and_genres[1], all_genres)
)

                                                                                

In [8]:
# Collect the user profiles and each user's watched movies as dictionaries on
# the driver, then register both as broadcast variables.

# All user profiles as one dict,
# e.g. {"user1": {profile}, "user2": {profile}, ...}
all_user_profile = user_profile.collectAsMap()

# For every user, the set of movie ids they have rated,
# e.g. {"user1": {m1, m2, ..., mn}, "user2": {m1, m2, ..., mn}, ...}
watched_movies = (
    ratings_rdd
    .map(lambda rec: (rec[0], rec[1][0]))
    .groupByKey()
    .mapValues(set)
    .collectAsMap()
)

# Wrap both dictionaries in broadcast variables (read-only values that Spark
# makes available to the worker nodes).
user_profile_broadcast = sc.broadcast(all_user_profile)
watched_movies_broadcast = sc.broadcast(watched_movies)

def cosine_similarity(user_vector, item_vector):
    """Cosine similarity between a user profile and an item profile.

    Both vectors are dicts keyed by genre. A genre that is missing from
    user_vector contributes 0 to the dot product.

    Args:
        user_vector: {genre: weight} for the user.
        item_vector: {genre: weight} for the movie.

    Returns:
        dot(user, item) / (|user| * |item|), or 0.0 when either vector has
        zero magnitude (including an empty vector).
    """
    user_norm = math.sqrt(sum(weight ** 2 for weight in user_vector.values()))
    item_norm = math.sqrt(sum(weight ** 2 for weight in item_vector.values()))
    # Only genres present in the item vector can contribute to the dot product.
    overlap = sum(user_vector.get(genre, 0) * item_vector.get(genre, 0) for genre in item_vector)

    # Guard against dividing by zero for an all-zero (or empty) vector.
    return 0.0 if user_norm == 0 or item_norm == 0 else overlap / (user_norm * item_norm)


user_id = '1'  # change as needed

# Genre profile of the selected user, e.g.
# {'Drama': 0.017104714226116226, 'Comedy': -0.19858156028368773, ..., 'Film-Noir': 1.4680851063829787}
# An unknown user id falls back to an empty profile, for which every
# similarity below evaluates to 0.0.
profile = user_profile_broadcast.value.get(user_id, {})

# Ids of every movie the selected user has already rated, e.g.
# {'1041', '1056', '1060', ..., '1090', '1094', '110'}
watched = watched_movies_broadcast.value.get(user_id, set())

# Skip the movies the user has already rated, score the rest against the user
# profile with cosine similarity, and keep the 10 highest-scoring movies.
recommendations = (
    item_profile
    .filter(lambda item: item[0] not in watched)
    .map(lambda item: (item[0], cosine_similarity(profile, item[1])))
    .takeOrdered(10, key=lambda scored: -scored[1])
)

for movieId, score in recommendations:
    print(f"Movie {movieId} -> similarity: {score:.4f}")



                                                                                

Movie 746 -> similarity: 0.7051
Movie 1153 -> similarity: 0.7051
Movie 1154 -> similarity: 0.7051
Movie 2066 -> similarity: 0.7051
Movie 3292 -> similarity: 0.7051
Movie 3380 -> similarity: 0.7051
Movie 4426 -> similarity: 0.7051
Movie 5169 -> similarity: 0.7051
Movie 5795 -> similarity: 0.7051
Movie 7335 -> similarity: 0.7051


In [9]:
# Display the selected user's genre profile: a dict mapping each genre the user
# has rated to the mean of their mean-centred ratings for that genre.
profile

{'Drama': 0.017104714226116226,
 'Comedy': -0.19858156028368773,
 'Romance': -0.2819148936170211,
 'Sci-Fi': 0.4092615769712142,
 'Thriller': -0.03191489361702127,
 'Crime': 0.34308510638297873,
 'Mystery': 0.16808510638297874,
 'Adventure': -0.06132665832290365,
 'War': -0.21373307543520295,
 'Documentary': -0.5319148936170213,
 'Children': -0.03191489361702127,
 'Fantasy': -1.0319148936170213,
 'Horror': 0.1347517730496454,
 'Action': 0.5733482642777159,
 'Western': -0.03191489361702127,
 'Film-Noir': 1.4680851063829787}

In [10]:
# Show how many movies the selected user has rated, followed by their ids.
print("move he watched: ", len(watched))
watched

move he watched:  141


{'1041',
 '1056',
 '1060',
 '1080',
 '1090',
 '1094',
 '110',
 '111',
 '1120',
 '1136',
 '1150',
 '1172',
 '1178',
 '1183',
 '1196',
 '1197',
 '1199',
 '1200',
 '1203',
 '1204',
 '1208',
 '1210',
 '1211',
 '1213',
 '1217',
 '1221',
 '1225',
 '1228',
 '1233',
 '1234',
 '1236',
 '1242',
 '1247',
 '1259',
 '1262',
 '1263',
 '1270',
 '1272',
 '1276',
 '1288',
 '1296',
 '1297',
 '1304',
 '1357',
 '1392',
 '1406',
 '161',
 '1653',
 '166',
 '1693',
 '17',
 '1719',
 '1721',
 '1748',
 '176',
 '1784',
 '1810',
 '1885',
 '1923',
 '1939',
 '1944',
 '1952',
 '1960',
 '1961',
 '1965',
 '1968',
 '2020',
 '2025',
 '2028',
 '2064',
 '2067',
 '2109',
 '2125',
 '223',
 '2232',
 '2243',
 '2247',
 '2268',
 '2312',
 '2313',
 '232',
 '2324',
 '2329',
 '2336',
 '2352',
 '2396',
 '2407',
 '2424',
 '25',
 '2502',
 '2520',
 '2529',
 '2599',
 '260',
 '2640',
 '2690',
 '2712',
 '2724',
 '2797',
 '2882',
 '2890',
 '29',
 '2918',
 '2944',
 '2966',
 '2973',
 '2985',
 '2997',
 '30',
 '302',
 '3030',
 '306',
 '307',
 '

In [None]:
# Stop the Spark application by shutting down its SparkContext. Values already
# collected to the driver (plain Python objects) remain usable afterwards.
sc.stop()

In [None]:
# Display the selected user's profile again. It is a plain dict looked up from
# the collected user-profile map.
profile