# <p style="text-align: center;">Apache Spark - Song Recommendation System</p>


This project is based on the MapReduce framework.  

This project implements a basic song recommender system based on a Kaggle dataset.


The project consists of the following steps:  

1. Create an RDD with _kaggle_visible_evaluation_triplets.txt_ and replace the song name with the song index from _kaggle_songs.txt_. Identify the number of songs that do not have any rating. 
2. Generate song ratings based on the song play count as a normalized score between 0 and 1. 
3. Identify popular songs based on this rating and, given a user id, recommend songs to that user using the algorithm from the movie recommender system covered in class. 
4. Using the cosine similarity function, compute the pairwise similarity between users and generate the top 5 most similar users, with no user appearing more than once. 


In [1]:
import findspark
findspark.init('D:\\apachespark')
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("Songs")
sc = SparkContext(conf = conf)

In [297]:
#sc.stop()

In [3]:
## Read kaggle_visible_evaluation_triplets.txt file into RDD
triplet_rdd = sc.textFile(r"msd\kaggle_visible_evaluation_triplets.txt") \
    .map(lambda line: line.split("\t")) 
triplet_rdd.take(5)

[['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOBONKR12A58A7A7E0', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOEGIYH12A6D4FC0E3', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOFLJQZ12A6D4FADA6', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOHTKMO12AB01843B0', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SODQZCY12A6D4F9D11', '1']]

## Step 1: 
Replace song name with song index and identify the number of songs without user history
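
The two joins in this step can be sketched without Spark on a tiny in-memory sample. The song names, indices, and users below are hypothetical stand-ins for the contents of the two Kaggle files, and plain Python collections stand in for the RDDs; this is only meant to illustrate the idea, not to reproduce the notebook's cells.

```python
# Hypothetical miniature of kaggle_songs.txt: song name -> song index
song_index = {"SONG_A": "1", "SONG_B": "2", "SONG_C": "3"}

# Hypothetical miniature of the triplets file: (user, song name, play count)
triplets = [
    ("user_1", "SONG_A", "1"),
    ("user_2", "SONG_A", "4"),
    ("user_2", "SONG_C", "2"),
]

# Inner join on song name: swap each song name for its index
indexed = [
    (user, (song_index[song], count))
    for user, song, count in triplets
    if song in song_index
]

# Left outer join from the song list: keep songs with no matching triplet
played = {song for _, song, _ in triplets}
never_played = [song for song in song_index if song not in played]

print(indexed)
print("Songs without user history:", len(never_played))
```

In the sample, only `SONG_B` has no triplet, so it is the one song counted as having no user history.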

In [300]:
#read kaggle_songs.txt file into RDD
songs_rdd = sc.textFile(r"msd\kaggle_songs.txt") \
    .map(lambda line: line.split(" ")) 

#replacing the song name with song index
song_index_rdd = triplet_rdd.map(lambda x: (x[1], (x[0], x[2]))) \
    .join(songs_rdd)\
    .map(lambda x: (x[1][0][0],(x[1][1], x[1][0][1])))
song_index_rdd.take(10)

[('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', ('25150', '1')),
 ('c34670d9c1718361feb93068a853cead3c95b76a', ('25150', '1')),
 ('c5006d9f41f68ccccbf5ee29212b6af494110c5e', ('25150', '1')),
 ('e4332e11f4df6dd26673bb6b085e9a2bbdc9b8a5', ('25150', '2')),
 ('baf2fe5885ab93fbbdb7fecc6691788e70afb6c8', ('25150', '4')),
 ('f6e34f0a68d5ea1344511e33486f956de361db78', ('25150', '1')),
 ('e326c4b9fe3659ec1dc3af53fd7e0893809dafbc', ('25150', '25')),
 ('00f7c493ee64884998ea98d9f5bed87bc4a0afcf', ('25150', '5')),
 ('daa9e7e53ae787ab4f1b5518b695198947d821a2', ('25150', '1')),
 ('cd4321d8fd42ba44996e7f34c2f6404cf5884696', ('25150', '1'))]

In [301]:
#change triplet columns into Key-value pairs of song name and user history
triplet_pairs_rdd = triplet_rdd.map(lambda x: (x[1], (x[0],x[2])))
triplet_pairs_rdd.take(5)

[('SOBONKR12A58A7A7E0', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')),
 ('SOEGIYH12A6D4FC0E3', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')),
 ('SOFLJQZ12A6D4FADA6', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')),
 ('SOHTKMO12AB01843B0', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')),
 ('SODQZCY12A6D4F9D11', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1'))]

In [302]:
#performing a left outer join to find the number of songs without any user history
never_played_songs_rdd = songs_rdd.leftOuterJoin(triplet_pairs_rdd).filter(lambda x: x[1][1] is None)
print("Songs without User History: ",never_played_songs_rdd.count())

Songs without User History:  223007


## Step 2:
Generate song ratings based on the play_count. For example, if (song_1, 5; song_2, 10; song_3, 5) i.e., song_1 is played 5 times, song_2 is played 10 times and song_3 is played 5 times, the normalized rating score should be 0.25, 0.5 and 0.25 respectively. Similarly, generate the rating for all the songs.
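
As a quick check of the arithmetic in the example above, here is a minimal sketch in plain Python. It assumes, as the cells that follow do, that play counts are normalized per user; the function name `normalize_play_counts` and the sample ids are hypothetical.

```python
from collections import defaultdict

def normalize_play_counts(triplets):
    """Divide each play count by the total play count of the same user."""
    totals = defaultdict(int)
    for user, _, count in triplets:
        totals[user] += count
    return [(user, song, count / totals[user]) for user, song, count in triplets]

# The example from the text: play counts of 5, 10 and 5 for one user
sample = [
    ("user_1", "song_1", 5),
    ("user_1", "song_2", 10),
    ("user_1", "song_3", 5),
]
ratings = normalize_play_counts(sample)
print(ratings)
```

The total play count is 20, so the three ratings come out as 5/20, 10/20 and 5/20, and the ratings of any one user always sum to 1.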

In [303]:
#changing the play count column from str to int
triplet_int_rdd = triplet_rdd.map(lambda x: (x[0], int(x[2])))
triplet_int_rdd.take(10)

[('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 1),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 1),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 1),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 1),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 1),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 1),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b', 1),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b', 1),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b', 1),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b', 2)]

In [304]:
#grouping the above rdd by user to get the total play count of songs for each user
play_count_rdd = triplet_int_rdd.groupByKey().map(lambda x: (x[0],sum(list(x[1]))))
play_count_rdd.take(10)

[('d7083f5e1d50c264277d624340edaaf3dc16095b', 17),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4', 6),
 ('fdf6afb5daefb42774617cf223475c6013969724', 10),
 ('10cbcd627472477dfbec90fb75017f8df6ce84ec', 9),
 ('30e4a688e6fc9c8bfe55998af3996a909ae34449', 8),
 ('da7bc0ec91a21a54f0b209bcc9ec5b4b49613a68', 129),
 ('3a613180775197cd08c154abe4e3f67af238a632', 50),
 ('4b9b549e84fb29bfbcf7ab34f01c0bcd1bbf93a5', 97),
 ('5b7124d06fe4b027b6dff487da9ac236aa4fc3e4', 31),
 ('6530c4fc41b9110de5d39fe0355fa103c66385f0', 44)]

In [305]:
#calculating the user rating for each song for each user by joining the above rdd to triplet rdd 
#and dividing each song's play count by total play count of songs for that user
user_rating_rdd = triplet_rdd.map(lambda x: (x[0], (x[1], int(x[2])))).join(play_count_rdd) \
                            .map(lambda x: (x[0], x[1][0][0], float(x[1][0][1] / x[1][1])))
user_rating_rdd.take(5)

[('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOUVUHC12A67020E3B',
  0.058823529411764705),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOUQERE12A58A75633',
  0.058823529411764705),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOIPJAX12A8C141A2D',
  0.058823529411764705),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOEFCDJ12AB0185FA0',
  0.11764705882352941),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOATCSU12A8C13393A',
  0.058823529411764705)]

In [306]:
#extracting only the song name and rating columns from the above rdd
song_rating_rdd = user_rating_rdd.map(lambda x: (x[1], x[2]))
song_rating_rdd.take(5)

[('SOUVUHC12A67020E3B', 0.058823529411764705),
 ('SOUQERE12A58A75633', 0.058823529411764705),
 ('SOIPJAX12A8C141A2D', 0.058823529411764705),
 ('SOEFCDJ12AB0185FA0', 0.11764705882352941),
 ('SOATCSU12A8C13393A', 0.058823529411764705)]

In [307]:
#grouping the above rdd by song name to get the total ratings for each song by all users 
#and the number of user who listened to that song
ratingBySong_rdd = song_rating_rdd.groupByKey().map(lambda x: (x[0],sum(list(x[1])), len(list(x[1]))))
ratingBySong_rdd.takeOrdered(10, key=lambda x: -x[1])


[('SOAUWYT12A81C206F1', 663.1184586134272, 4483),
 ('SOBONKR12A58A7A7E0', 632.049051196204, 4136),
 ('SOFRQTD12A81C233C0', 506.4467776053348, 5043),
 ('SOSXLTC12AF72A7F54', 464.59200022879946, 3672),
 ('SOEGIYH12A6D4FC0E3', 388.0158216161193, 3272),
 ('SOAXGDH12A8C13F8A1', 358.16819549257224, 3780),
 ('SONYKOW12AB01849C9', 311.4989355677909, 3430),
 ('SOFLJQZ12A6D4FADA6', 245.6546023378127, 2668),
 ('SODJWHY12A8C142CCE', 224.21609735194653, 2791),
 ('SOHTKMO12AB01843B0', 212.61485716048583, 2097)]

In [308]:
#computing the normalized rating for each song by calculating its average rating 
norm_song_rating_rdd = ratingBySong_rdd.map(lambda x: (x[0],x[1]/x[2]))
norm_song_rating_rdd.takeOrdered(10, key=lambda x: -x[1])

[('SOANRDA12A6D4F685F', 0.8936170212765957),
 ('SODMVPE12A6D4F79BD', 0.8901098901098901),
 ('SOCQPKP12A8C13BF2D', 0.8898305084745762),
 ('SOJTJXD12A6D4F9834', 0.8837209302325582),
 ('SOAXPKC12A8C13EB18', 0.8783783783783784),
 ('SOFMWXD12A6D4F64B0', 0.8727272727272727),
 ('SODCJCU12AB0182516', 0.8620689655172413),
 ('SODCOLK12A6D4F8424', 0.8571428571428571),
 ('SOMTDHR12A8C136863', 0.8518518518518519),
 ('SORCZBH12AF729ED8E', 0.8382352941176471)]

## Step 3: 
For a given user_id and rating, recommend 5 other songs. Find other users who rated a song from the chosen user's playlist at or above the given rating, then recommend the 5 highest-rated songs from those matching users' playlists that the chosen user has not already played. 
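
The matching logic for this step can be sketched on a small list of `(user, song, rating)` tuples. The function name `recommend_from_matching_users` and the sample data are hypothetical, and plain Python lists stand in for the RDD filters used in the cells below.

```python
def recommend_from_matching_users(ratings, user_id, song, min_rating, k=5):
    """ratings is a list of (user, song, rating) tuples."""
    # Songs the chosen user has already played
    own_songs = {s for u, s, _ in ratings if u == user_id}
    # Other users who rated the chosen song at or above the threshold
    matching = {
        u for u, s, r in ratings
        if s == song and r >= min_rating and u != user_id
    }
    # Songs of the matching users that the chosen user has not played
    candidates = [
        (s, r) for u, s, r in ratings
        if u in matching and s not in own_songs
    ]
    return sorted(candidates, key=lambda pair: -pair[1])[:k]

sample_ratings = [
    ("u1", "s1", 0.6), ("u1", "s2", 0.4),
    ("u2", "s1", 0.7), ("u2", "s3", 0.2), ("u2", "s4", 0.1),
    ("u3", "s1", 0.1), ("u3", "s5", 0.9),
]
recommended = recommend_from_matching_users(sample_ratings, "u1", "s1", 0.4)
print(recommended)
```

Here `u3` also played `s1` but rated it below the threshold, so only the songs of `u2` are candidates. Note that this sketch, like the cells below, does not merge duplicates when two matching users share a song.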

In [309]:
#Picked a user id and a song rating as given user and given rating
givenUserID = 'f7d9497bdc9d6cd84be59f984eb652f101ac32dd' 
givenRating = 0.4

In [310]:
#fetching the given user's song playlist and rating which is already calculated in step2
givenUserHistory_RDD = user_rating_rdd.filter(lambda x: x[0] == givenUserID)
givenUserHistory_RDD.collect()

[('f7d9497bdc9d6cd84be59f984eb652f101ac32dd',
  'SOEVXKS12A6D4F85BD',
  0.14285714285714285),
 ('f7d9497bdc9d6cd84be59f984eb652f101ac32dd',
  'SOBYSSP12AAF3B32CA',
  0.42857142857142855),
 ('f7d9497bdc9d6cd84be59f984eb652f101ac32dd',
  'SOENZDW12AB017C8B4',
  0.14285714285714285),
 ('f7d9497bdc9d6cd84be59f984eb652f101ac32dd',
  'SODVXIB12AF72A37F3',
  0.07142857142857142),
 ('f7d9497bdc9d6cd84be59f984eb652f101ac32dd',
  'SOJKQSF12A6D4F5EE9',
  0.07142857142857142),
 ('f7d9497bdc9d6cd84be59f984eb652f101ac32dd',
  'SOAXSAC12AF729F4EB',
  0.07142857142857142),
 ('f7d9497bdc9d6cd84be59f984eb652f101ac32dd',
  'SOEKKYU12A67AD8520',
  0.07142857142857142)]

In [311]:
#making a list of given user's song playlist
givenUserPlaylist = givenUserHistory_RDD.map(lambda x: x[1]).collect()
givenUserPlaylist

['SOEVXKS12A6D4F85BD',
 'SOBYSSP12AAF3B32CA',
 'SOENZDW12AB017C8B4',
 'SODVXIB12AF72A37F3',
 'SOJKQSF12A6D4F5EE9',
 'SOAXSAC12AF729F4EB',
 'SOEKKYU12A67AD8520']

In [312]:
#picked a song(SOBYSSP12AAF3B32CA) from given user's playlist. 
#Used that song name and given rating as filters to find matching users for the given user
matchingUsers_RDD = user_rating_rdd.filter(lambda x: x[1] == 'SOBYSSP12AAF3B32CA' and x[2] >= givenRating and x[0] != givenUserID)
matchingUsers_RDD.collect()

[('718a56b289d85f9d41dc64372927636c75cad5f4',
  'SOBYSSP12AAF3B32CA',
  0.7058823529411765),
 ('040d011849daecc14894f99211eb74599a29ba93',
  'SOBYSSP12AAF3B32CA',
  0.5675675675675675)]

In [313]:
#making a list of matching user's ids
matchingUserList = matchingUsers_RDD.map(lambda x: x[0]).collect()
matchingUserList

['718a56b289d85f9d41dc64372927636c75cad5f4',
 '040d011849daecc14894f99211eb74599a29ba93']

In [314]:
#fetching the matching users' song playlist and rating which is already calculated in step2
matchingUserHistory_RDD = user_rating_rdd.filter(lambda x: x[0] in matchingUserList )
matchingUserHistory_RDD.collect()

[('718a56b289d85f9d41dc64372927636c75cad5f4',
  'SOBYSSP12AAF3B32CA',
  0.7058823529411765),
 ('718a56b289d85f9d41dc64372927636c75cad5f4', 'SOSNJDE12A8C134AFB', 0.2),
 ('718a56b289d85f9d41dc64372927636c75cad5f4',
  'SOWNJXI12A58A7D50B',
  0.011764705882352941),
 ('718a56b289d85f9d41dc64372927636c75cad5f4',
  'SOKWYXC12A8C145DD2',
  0.011764705882352941),
 ('718a56b289d85f9d41dc64372927636c75cad5f4',
  'SOWKMZU12A81C2165A',
  0.023529411764705882),
 ('718a56b289d85f9d41dc64372927636c75cad5f4',
  'SOKSNTY12A8C1380FB',
  0.047058823529411764),
 ('040d011849daecc14894f99211eb74599a29ba93',
  'SOJSXJY12A8C13E32E',
  0.10810810810810811),
 ('040d011849daecc14894f99211eb74599a29ba93',
  'SOYHWWR12A8C142EC7',
  0.02702702702702703),
 ('040d011849daecc14894f99211eb74599a29ba93',
  'SOUFPNI12A8C142D19',
  0.02702702702702703),
 ('040d011849daecc14894f99211eb74599a29ba93',
  'SOBYSSP12AAF3B32CA',
  0.5675675675675675),
 ('040d011849daecc14894f99211eb74599a29ba93',
  'SOWGCMN12A8C136E44',
  0.0270

In [316]:
#recommending top 5 songs from the matching users' playlist, filtering out the ones that are already in given user's playlist
recommended_songs = matchingUserHistory_RDD.map(lambda x: (x[1], x[2])).filter(lambda x: x[0] not in givenUserPlaylist) \
                    .takeOrdered(5, key=lambda x: -x[1])

recommended_songs

[('SOGJXYW12A6D4F5E37', 0.24324324324324326),
 ('SOSNJDE12A8C134AFB', 0.2),
 ('SOJSXJY12A8C13E32E', 0.10810810810810811),
 ('SOKSNTY12A8C1380FB', 0.047058823529411764),
 ('SOYHWWR12A8C142EC7', 0.02702702702702703)]

## Step 4: 
1. Compute cosine similarity between all pairs of users. 
2. Sort the similarity score and print the top-5 similar users. 
3. If a user already appears in the top-5 set, ignore that pair and take the next best pair from the sorted list. 
4. For a given user_id, identify the top-5 similar users and hence song recommendations from other user's list. 
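
Item 3 of the list above amounts to a greedy pass over the sorted pairs. The sketch below assumes the pairwise scores are already available as `(user_a, user_b, score)` tuples; the function name `top_disjoint_pairs` and the sample scores are hypothetical and not taken from the notebook.

```python
def top_disjoint_pairs(scored_pairs, k=5):
    """Pick the k best-scoring pairs such that no user appears twice."""
    used, result = set(), []
    for user_a, user_b, score in sorted(scored_pairs, key=lambda p: -p[2]):
        # Skip the pair if either user is already part of a chosen pair
        if user_a in used or user_b in used:
            continue
        result.append((user_a, user_b, score))
        used.update((user_a, user_b))
        if len(result) == k:
            break
    return result

scored = [
    ("a", "b", 0.9), ("a", "c", 0.8), ("c", "d", 0.7),
    ("b", "d", 0.6), ("e", "f", 0.5),
]
pairs = top_disjoint_pairs(scored, k=3)
print(pairs)
```

In the sample, `("a", "c")` and `("b", "d")` are skipped because one of their users was already selected in a higher-scoring pair.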

In [317]:
#extracting a section(10000 rows) of the triplet file
triplet_sec_rdd = sc.parallelize(triplet_rdd.take(10000))
triplet_sec_rdd.take(5)

[['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOBONKR12A58A7A7E0', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOEGIYH12A6D4FC0E3', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOFLJQZ12A6D4FADA6', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOHTKMO12AB01843B0', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SODQZCY12A6D4F9D11', '1']]

In [318]:
#extracting only the user id and song name columns from the above rdd
triplet_sec_songs_rdd = triplet_sec_rdd.map(lambda x: (x[0], (x[1])))
triplet_sec_songs_rdd.take(10)

[('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOBONKR12A58A7A7E0'),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOEGIYH12A6D4FC0E3'),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOFLJQZ12A6D4FADA6'),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOHTKMO12AB01843B0'),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SODQZCY12A6D4F9D11'),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOXLOQG12AF72A2D55'),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b', 'SOUVUHC12A67020E3B'),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b', 'SOUQERE12A58A75633'),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b', 'SOIPJAX12A8C141A2D'),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b', 'SOEFCDJ12AB0185FA0')]

In [319]:
#grouping the above rdd by user to make key-value pair of users and their song lists
triplet_sec_users_rdd = triplet_sec_songs_rdd.groupByKey().map(lambda x: (x[0],list(x[1])))
triplet_sec_users_rdd.take(10)

[('d7083f5e1d50c264277d624340edaaf3dc16095b',
  ['SOUVUHC12A67020E3B',
   'SOUQERE12A58A75633',
   'SOIPJAX12A8C141A2D',
   'SOEFCDJ12AB0185FA0',
   'SOATCSU12A8C13393A',
   'SOZPZGN12A8C135B45',
   'SOPFVWP12A6D4FC636',
   'SOHEKND12A8AE481D0',
   'SOPSVVG12A8C13B444',
   'SODSKZZ12AB0188524',
   'SONZTNP12A8C1321DF',
   'SOVVLKF12A8C1424F0',
   'SOMLKZO12AB017F4AE',
   'SOACRJG12A8C137A8D',
   'SONJVYU12A8AE44F9E',
   'SOSOUKN12A8C13AB79']),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4',
  ['SOFRQTD12A81C233C0',
   'SOZQIUZ12A8C13CFBE',
   'SOKQNYH12A6D4FA5D3',
   'SOQDMED12A67ADE731',
   'SOAXGDH12A8C13F8A1',
   'SOAAGFH12A8C13D072']),
 ('fdf6afb5daefb42774617cf223475c6013969724',
  ['SOJZBHH12AB017F611',
   'SOONUTJ12A6701D7B4',
   'SONXYJW12AB018898A',
   'SONDFYA12AB018897F',
   'SOATBYQ12AB0188962',
   'SOWDRCH12A8159E8B4',
   'SOJNPGW12AB018896E',
   'SOTFFRI12AB0188953',
   'SORCKFG12A6D4F8F04',
   'SOPWKOX12A8C139D43']),
 ('10cbcd627472477dfbec90fb75017f8df6ce84ec',
  ['SOTNWC

In [320]:
#removing the user id column and making a list of song lists of users
song_interests_by_user = triplet_sec_users_rdd.map(lambda x: (x[1])).collect()
song_interests_by_user

[['SOUVUHC12A67020E3B',
  'SOUQERE12A58A75633',
  'SOIPJAX12A8C141A2D',
  'SOEFCDJ12AB0185FA0',
  'SOATCSU12A8C13393A',
  'SOZPZGN12A8C135B45',
  'SOPFVWP12A6D4FC636',
  'SOHEKND12A8AE481D0',
  'SOPSVVG12A8C13B444',
  'SODSKZZ12AB0188524',
  'SONZTNP12A8C1321DF',
  'SOVVLKF12A8C1424F0',
  'SOMLKZO12AB017F4AE',
  'SOACRJG12A8C137A8D',
  'SONJVYU12A8AE44F9E',
  'SOSOUKN12A8C13AB79'],
 ['SOFRQTD12A81C233C0',
  'SOZQIUZ12A8C13CFBE',
  'SOKQNYH12A6D4FA5D3',
  'SOQDMED12A67ADE731',
  'SOAXGDH12A8C13F8A1',
  'SOAAGFH12A8C13D072'],
 ['SOJZBHH12AB017F611',
  'SOONUTJ12A6701D7B4',
  'SONXYJW12AB018898A',
  'SONDFYA12AB018897F',
  'SOATBYQ12AB0188962',
  'SOWDRCH12A8159E8B4',
  'SOJNPGW12AB018896E',
  'SOTFFRI12AB0188953',
  'SORCKFG12A6D4F8F04',
  'SOPWKOX12A8C139D43'],
 ['SOTNWCI12AAF3B2028',
  'SOJGKMM12A6D4F98CD',
  'SONMWXV12AB01803B4',
  'SOZKNSB12A8C140F11',
  'SOGPLBE12A58A80442',
  'SOBWSGV12AB018B5E0',
  'SOWINIH12AB018CC51',
  'SOXHVRT12A81C2320D'],
 ['SOCHADN12A6310ED94',
  'SOXVVSM12

In [321]:
#removing the song name column and making a list of users
users_sec = triplet_sec_users_rdd.map(lambda x: (x[0])).collect()
users_sec

['d7083f5e1d50c264277d624340edaaf3dc16095b',
 'd68dc6fc25248234590d7668a11e3335534ae4b4',
 'fdf6afb5daefb42774617cf223475c6013969724',
 '10cbcd627472477dfbec90fb75017f8df6ce84ec',
 '6530c4fc41b9110de5d39fe0355fa103c66385f0',
 '47bf07bcb932cf88175ba3eb218401f9fa15fe6b',
 '5a68f7886f7e778490c6f13807039ff4152bcd62',
 '6493c305190b52657d4ea3f4adf367ffcf3427af',
 'baf2fe5885ab93fbbdb7fecc6691788e70afb6c8',
 '766369a79641ed084b8e6c1e1096dde15eed0fc1',
 '041d75b5ec1dc50634cce4a19f6b29f61fc5d2bf',
 'e6533f7fcb62fe305d4e241adec7475c62c15ce5',
 'a81d6d15d534a34e01fe67bc4d4945deef40ca7e',
 '8035a56d59f07ea7388ab32465015649fc7a0e51',
 '5c55f6e9b0f88600df3683e510a6153e1cbc5b29',
 '51e09f874aa261fc8106a091bb2ae21eca3cbcf3',
 'be581a4302a0d6d0431478b1ddb3550f3263ea07',
 '0c65a060faaf2c3f9a6aed6c9732131709c33d55',
 'ba182bf7c5c69c98f7fe46e5d1e3a5729faef94a',
 '22bf06f92d946c15224227c95a66f9f9ba64c500',
 '0936940c167dcd3cc49d92007c7871668f42e5ff',
 'f5e8f430212e751c4860a6ec488f2964235f678e',
 '9e4fbd5d

In [322]:
#converting the user list to a data frame
import pandas as pd
df = pd.DataFrame(users_sec, columns = ["UserId"])
df.head()

| | UserId |
|---|---|
| 0 | d7083f5e1d50c264277d624340edaaf3dc16095b |
| 1 | d68dc6fc25248234590d7668a11e3335534ae4b4 |
| 2 | fdf6afb5daefb42774617cf223475c6013969724 |
| 3 | 10cbcd627472477dfbec90fb75017f8df6ce84ec |
| 4 | 6530c4fc41b9110de5d39fe0355fa103c66385f0 |


In [323]:
#finding the unique songs from the song list above
unique_songs = sorted(list({ song
                                 for user_interests in song_interests_by_user
                                 for song in user_interests }))

unique_songs

['SOAAFYH12A8C13717A',
 'SOAAGFH12A8C13D072',
 'SOAAGRT12AF72A2A6C',
 'SOAALJB12A8C13C4B6',
 'SOAAMPN12A6310D812',
 'SOAAQCS12AB01826DD',
 'SOAAROC12A6D4FA420',
 'SOAAUKC12AB017F868',
 'SOAAVUV12AB0186646',
 'SOAAWEE12A6D4FBEC8',
 'SOABBYY12AB0181CB0',
 'SOABCBI12A67AE1F27',
 'SOABESS12A67ADD315',
 'SOABHYV12A6D4F6D0F',
 'SOABJBU12A8C13F63F',
 'SOABLRG12A67ADA723',
 'SOABOLQ12AC468BDDD',
 'SOABRVJ12A8C13E5FD',
 'SOABTTR12A6D4FC2EB',
 'SOABWOG12AB017C97A',
 'SOABXAH12AB018654E',
 'SOABXYN12A8C137D1C',
 'SOACCGT12AB018DDEE',
 'SOACKIA12A6D4F73D7',
 'SOACNRN12A8C142771',
 'SOACRBY12AB017C757',
 'SOACRJG12A8C137A8D',
 'SOACXGP12A8C1323AF',
 'SOACYIR12A58A7A89A',
 'SOADCQJ12AB018546A',
 'SOADFMR12A6701FB5F',
 'SOADGAI12A6D4F64A3',
 'SOADJGO12AB0185D5B',
 'SOADJQJ12A8C141D38',
 'SOADKOB12AB017DB36',
 'SOADMKP12A8C13BAA4',
 'SOADOIO12A8C13DFB2',
 'SOADOQJ12A8AE45C7E',
 'SOADOWZ12AB01887EF',
 'SOADQMO12A8C136FF9',
 'SOADQPP12A67020C82',
 'SOADTRU12AB0186063',
 'SOADVMT12AB0188983',
 'SOADVUP12

In [324]:
#defining a function to create the user interest vector for one user
#1 represents that the song is in the user's playlist
#0 represents that the song is not in the user's playlist
def make_user_interest_vector(user_interests):
    return [1 if song in user_interests else 0
            for song in unique_songs]

user_interest_matrix = list(map(make_user_interest_vector, song_interests_by_user))

user_interest_matrix[0]

[0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 1,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 1,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,


In [325]:
#defining a function to compute cosine similarity between two user interest vectors
import numpy as np

def cosine_similarity(vec1, vec2):
    #dot product of the two vectors divided by the product of their norms
    return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
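
Because the interest vectors only contain 0s and 1s, the cosine similarity reduces to the number of shared songs divided by the square root of the product of the two playlist sizes. The sketch below, with hypothetical helper names and sample playlists, compares that set-based form with the vector form using only the standard library.

```python
from math import sqrt

def cosine_from_sets(songs_a, songs_b):
    """Cosine similarity of two playlists treated as 0/1 vectors."""
    if not songs_a or not songs_b:
        return 0.0
    return len(songs_a & songs_b) / sqrt(len(songs_a) * len(songs_b))

def cosine_from_vectors(vec_a, vec_b):
    """Cosine similarity of two equal-length numeric vectors."""
    dot = sum(x * y for x, y in zip(vec_a, vec_b))
    norm = sqrt(sum(x * x for x in vec_a)) * sqrt(sum(y * y for y in vec_b))
    return dot / norm if norm else 0.0

catalogue = ["s1", "s2", "s3", "s4"]
playlist_a = {"s1", "s2"}
playlist_b = {"s2", "s3", "s4"}

# Build the 0/1 interest vectors over the shared catalogue
vec_a = [1 if song in playlist_a else 0 for song in catalogue]
vec_b = [1 if song in playlist_b else 0 for song in catalogue]

set_score = cosine_from_sets(playlist_a, playlist_b)
vector_score = cosine_from_vectors(vec_a, vec_b)
print(set_score, vector_score)
```

Both forms give 1/sqrt(6) for the sample, up to floating-point rounding. The set-based form avoids building one vector per user over every unique song, which may matter once more than a 10000-row section of the data is used.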

In [326]:
#computing cosine similarity between all users by passing the user interest matrices
user_similarities = [[cosine_similarity(interest_vector_i, interest_vector_j)
                      for interest_vector_j in user_interest_matrix]
                     for interest_vector_i in user_interest_matrix]

user_similarities[0]

[1.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0,
 0.0

In [327]:
#converting these user similarities to a data frame
df1 = pd.DataFrame(user_similarities)
df1.head()

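| | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | ... | 743 | 744 | 745 | 746 | 747 | 748 | 749 | 750 | 751 | 752 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 1 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.089087 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.068041 | 0.0 |
| 2 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 3 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.083333 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 4 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.051031 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |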


In [328]:
#joining the user similarities df with the user id df created before to include a user id column for the user similarities
sim_df = pd.concat([df, df1], axis=1, join='inner')
sim_df.head()


| | UserId | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... | 743 | 744 | 745 | 746 | 747 | 748 | 749 | 750 | 751 | 752 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | d7083f5e1d50c264277d624340edaaf3dc16095b | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 1 | d68dc6fc25248234590d7668a11e3335534ae4b4 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.089087 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.068041 | 0.0 |
| 2 | fdf6afb5daefb42774617cf223475c6013969724 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 3 | 10cbcd627472477dfbec90fb75017f8df6ce84ec | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.083333 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |
| 4 | 6530c4fc41b9110de5d39fe0355fa103c66385f0 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.051031 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 |


In [329]:
#defining a function to find the most similar users for any given user id and sorting by the high to low cosine similarity values

def most_similar_users_to(user_id):
    pairs = [[other_user_id, similarity]              
             for other_user_id, similarity in         
                enumerate(user_similarities[user_id]) 
             if user_id != other_user_id and similarity > 0] 

    return sorted(pairs,                              
                  key=lambda pair: pair[1],          
                  reverse=True) 

In [330]:
most_similar_users_to(5)

[[739, 0.06933752452815364],
 [343, 0.06537204504606134],
 [302, 0.062017367294604234],
 [335, 0.05661385170722978],
 [741, 0.05661385170722978],
 [195, 0.04499212706658475],
 [600, 0.038461538461538464],
 [212, 0.030261376633440123]]

In [331]:
#defining a function that fetches the actual user id for each row index in the output above,
#skipping any user id that has already been added
def fetch_userId(similarUserList):
    top_similar_users = []
    seen_user_ids = set()
    for row_index, similarity in similarUserList:
        user_id = sim_df.loc[row_index, 'UserId']
        if user_id not in seen_user_ids:
            seen_user_ids.add(user_id)
            top_similar_users.append([user_id, similarity])
    return top_similar_users
            

In [332]:
#making a list of the similar users
similarUserList = fetch_userId(most_similar_users_to(5))
similarUserList

[['dc102ddaf347b4d995a2ab1769ed632ab6827fe3', 0.06933752452815364],
 ['d9f9e0c524c4345052933df914433b6c80293aa8', 0.06537204504606134],
 ['9a8a50492b6dfe7ea2d7991dfd4b8b59d10b4235', 0.062017367294604234],
 ['4da1223b4df257578c124c07edaeee4b0e5fb101', 0.05661385170722978],
 ['809e4bc83c049cd89524ced9caa22304b23a0100', 0.05661385170722978],
 ['6ab53dc4361a4c4c29c579e777644d1213e26a3d', 0.04499212706658475],
 ['c48985d93d590dff33d20094eebc863b0cb455e8', 0.038461538461538464],
 ['d58fe04a5336173938a7123df91f4395a482d011', 0.030261376633440123]]

In [333]:
#Picking a user id and finding its row index
givenUser1 = 'd9f9e0c524c4345052933df914433b6c80293aa8'
givenUserIndex = int(df.index[df['UserId'] == givenUser1][0])
givenUserIndex

343

In [334]:
#finding the top 5 most similar users for the given user by calling the respective functions  
top_similar_users = fetch_userId(most_similar_users_to(givenUserIndex))[:5]
top_similar_users

[['4da1223b4df257578c124c07edaeee4b0e5fb101', 0.09622504486493763],
 ['809e4bc83c049cd89524ced9caa22304b23a0100', 0.09622504486493763],
 ['a96a7c7b9890ee18cd850ec8e8f1ef81cffa9e72', 0.0890870806374748],
 ['bdbf8ddd82fa83ef4538a15298dfca19bfc4a3ca', 0.07856742013183861],
 ['6ab53dc4361a4c4c29c579e777644d1213e26a3d', 0.07647191129018725]]

In [335]:
#fetching the given user's song playlist and rating which is already calculated in step2
givenUserHistory_RDD1 = user_rating_rdd.filter(lambda x: x[0] == givenUser1)
givenUserHistory_RDD1.collect()

[('d9f9e0c524c4345052933df914433b6c80293aa8',
  'SONBBTB12A6D4F7898',
  0.034482758620689655),
 ('d9f9e0c524c4345052933df914433b6c80293aa8',
  'SOGPGNX12A8C1423BF',
  0.034482758620689655),
 ('d9f9e0c524c4345052933df914433b6c80293aa8',
  'SOXSMGP12A6310DFA6',
  0.13793103448275862),
 ('d9f9e0c524c4345052933df914433b6c80293aa8',
  'SODBAHE12A8C132D34',
  0.3448275862068966),
 ('d9f9e0c524c4345052933df914433b6c80293aa8',
  'SOKRWJJ12AB018DE11',
  0.13793103448275862),
 ('d9f9e0c524c4345052933df914433b6c80293aa8',
  'SOUQPHL12AB018C1F2',
  0.034482758620689655),
 ('d9f9e0c524c4345052933df914433b6c80293aa8',
  'SOMGIYR12AB0187973',
  0.10344827586206896),
 ('d9f9e0c524c4345052933df914433b6c80293aa8',
  'SORRZCX12A8C1423DA',
  0.10344827586206896),
 ('d9f9e0c524c4345052933df914433b6c80293aa8',
  'SOPKGJJ12A8AE4883C',
  0.06896551724137931)]

In [336]:
#making a list of given user's song playlist
givenUserPlaylist1 = givenUserHistory_RDD1.map(lambda x: x[1]).collect()
givenUserPlaylist1

['SONBBTB12A6D4F7898',
 'SOGPGNX12A8C1423BF',
 'SOXSMGP12A6310DFA6',
 'SODBAHE12A8C132D34',
 'SOKRWJJ12AB018DE11',
 'SOUQPHL12AB018C1F2',
 'SOMGIYR12AB0187973',
 'SORRZCX12A8C1423DA',
 'SOPKGJJ12A8AE4883C']

In [337]:
#making a list of the most similar user's ids
top_similar_users_list = list(users[0] for users in top_similar_users)
top_similar_users_list

['4da1223b4df257578c124c07edaeee4b0e5fb101',
 '809e4bc83c049cd89524ced9caa22304b23a0100',
 'a96a7c7b9890ee18cd850ec8e8f1ef81cffa9e72',
 'bdbf8ddd82fa83ef4538a15298dfca19bfc4a3ca',
 '6ab53dc4361a4c4c29c579e777644d1213e26a3d']

In [338]:
#fetching the similar users' song playlist and rating which is already calculated in step2
topSimilarUserHistory_RDD = user_rating_rdd.filter(lambda x: x[0] in top_similar_users_list )
topSimilarUserHistory_RDD.collect()

[('bdbf8ddd82fa83ef4538a15298dfca19bfc4a3ca',
  'SOHTKMO12AB01843B0',
  0.01694915254237288),
 ('bdbf8ddd82fa83ef4538a15298dfca19bfc4a3ca',
  'SOWONPQ12A8BED02A8',
  0.05084745762711865),
 ('bdbf8ddd82fa83ef4538a15298dfca19bfc4a3ca',
  'SOXKCEX12AF729CFB5',
  0.05084745762711865),
 ('bdbf8ddd82fa83ef4538a15298dfca19bfc4a3ca',
  'SOSXLTC12AF72A7F54',
  0.05084745762711865),
 ('bdbf8ddd82fa83ef4538a15298dfca19bfc4a3ca',
  'SOJCNNO12A8C132705',
  0.06779661016949153),
 ('bdbf8ddd82fa83ef4538a15298dfca19bfc4a3ca',
  'SOQWYAQ12A6D4FB9A3',
  0.09322033898305085),
 ('bdbf8ddd82fa83ef4538a15298dfca19bfc4a3ca',
  'SOKPSFX12A58A7B5CF',
  0.05084745762711865),
 ('bdbf8ddd82fa83ef4538a15298dfca19bfc4a3ca',
  'SOFRQTD12A81C233C0',
  0.09322033898305085),
 ('bdbf8ddd82fa83ef4538a15298dfca19bfc4a3ca',
  'SOMEUED12A6701DBEA',
  0.05084745762711865),
 ('bdbf8ddd82fa83ef4538a15298dfca19bfc4a3ca',
  'SOYFMNU12AB0181435',
  0.09322033898305085),
 ('bdbf8ddd82fa83ef4538a15298dfca19bfc4a3ca',
  'SOUKMKR12A8

In [339]:
#recommending songs from the most similar users' playlist, filtering out the ones that are already in given user's playlist
recommended_songs1 = topSimilarUserHistory_RDD.map(lambda x: (x[1], x[2])).filter(lambda x: x[0] not in givenUserPlaylist1) \
                    .takeOrdered(5, key=lambda x: -x[1])

recommended_songs1

[('SOXOZIX12AF72A4EA0', 0.3888888888888889),
 ('SONGYFE12AB018562D', 0.30935251798561153),
 ('SOXVYWK12A6D4F8641', 0.16666666666666666),
 ('SOWVBDQ12A8C13503D', 0.15625),
 ('SOQZYQH12A8AE468E5', 0.13333333333333333)]