# Project - 1 : Apache Spark 

Instructor: Ramesh Yerraballi
TA: Madhumitha Sakthi
Semester: Fall 2019
Due Date: 11:59pm, Monday 9/16

This project is based on the Map-Reduce framework. You will work with Spark and learn how Spark works, what functionality it provides, what the Map-Reduce framework does, and why it is useful. 

In this project you will implement a basic song recommender system. You will be given a dataset consisting of multiple CSV files that contain song play counts and song information.

The data you will use is provided in a zip file along with this notebook. The __msd.zip__ archive contains '_kaggle_visible_evaluation_triplets.txt_'. We will use the visible part of the test data to understand how Apache Spark works. Each user's listening history is given as (user, song, play count) triplets. Another file, '_kaggle_songs.txt_', assigns each song an index for easier representation. 



What to turn in:
A zip folder containing:
- The Jupyter Notebook
- A brief report describing the features you used for recommendation and the flow of your code, for example what each RDD does and why it was created.
- A datasets folder with the CSV files used in your notebook.
- The notebook should use relative paths to the CSV files in the datasets folder.
- Name the zip folder <your\_name>\_<your\_partner_name>.zip


This project consists of 4 questions. 
1. Create an RDD with _msd_evaluation_triplets.txt_ and replace the song name with the song index from _msd_songs.txt_. Identify the number of songs that do not have any rating. 
2. Generate song ratings based on the song play count as a normalized score between 0 and 1. 
3. Identify popular songs based on this rating and, given a user ID, recommend songs to that user using the algorithm from the movie recommender system covered in class. 
4. Using a cosine similarity function, compute the similarity between every pair of users and report the top 5 most similar pairs, with no user appearing in more than one pair. 

The list above gives a high-level overview of the questions. 

In [0]:
### Starter code ####

##### These lines are to tell jupyter where to find Apache Spark ####
import findspark
from operator import itemgetter
findspark.init('D:\\Spark')
#findspark.init('C:\\EPrograms\\spark-2.4.4-bin-hadoop2.7')
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("Songs")
# There are two configurable parameters:
# 1. A cluster URL, "local[*]" in this example, which tells Spark how to connect
#    to a cluster. "local[*]" runs Spark on the local machine, without connecting
#    to a cluster, using one worker thread per logical core ("local" would use one thread).
# 2. An application name, "Songs" in this example. This identifies your
#    application on the cluster manager's UI if you connect to a cluster.
sc = SparkContext.getOrCreate(conf = conf)
##### These lines are to tell jupyter where to find Apache Spark ####

In [0]:
## Read triplet file into RDD
triplet_rdd = sc.textFile(r"msdchallenge/kaggle_visible_evaluation_triplets.txt") \
    .map(lambda line: line.split("\t")) 

## Step 1: 
Replace song name with song index and identify the number of songs without user history
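Replacing each song ID with its index is, in plain Python, a dictionary lookup; the RDD `join` in the cells below does the same thing at scale, keyed by song ID. A minimal pure-Python sketch on hypothetical rows (the user and song IDs are borrowed from outputs in this notebook, but the pairings are made up, and `SOUNKNOWN000000000` is a hypothetical ID missing from the index):

```python
# Hypothetical sample rows shaped like triplet_rdd and songs_rdd
triplets = [
    ["fd50c4007b68a3737fe052d5a4f78ce8aa117f3d", "SOAAADD12AB018A9DD", "1"],
    ["fd50c4007b68a3737fe052d5a4f78ce8aa117f3d", "SOAAADF12A8C13DF62", "3"],
    ["32cf63cf65787ce7e72fc7fda6ee585979af6582", "SOUNKNOWN000000000", "2"],
]
songs = [["SOAAADD12AB018A9DD", "1"], ["SOAAADE12A6D4F80CC", "2"], ["SOAAADF12A8C13DF62", "3"]]


def index_triplets(triplets, songs):
    """Return [user, song_index, play_count] rows.

    Like an inner join on song ID, triplets whose song is missing from
    `songs` are dropped.
    """
    song_index = {song_id: idx for song_id, idx in songs}
    return [[user, song_index[song], count]
            for user, song, count in triplets
            if song in song_index]


indexed = index_triplets(triplets, songs)
print(indexed)
```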

In [0]:
triplet_rdd.take(5)

[['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOBONKR12A58A7A7E0', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOEGIYH12A6D4FC0E3', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOFLJQZ12A6D4FADA6', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOHTKMO12AB01843B0', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SODQZCY12A6D4F9D11', '1']]

In [0]:
songs_rdd = sc.textFile(r"msdchallenge/kaggle_songs.txt") \
    .map(lambda line: line.split()) 

In [0]:
songs_rdd.take(5)

[['SOAAADD12AB018A9DD', '1'],
 ['SOAAADE12A6D4F80CC', '2'],
 ['SOAAADF12A8C13DF62', '3'],
 ['SOAAADZ12A8C1334FB', '4'],
 ['SOAAAFI12A6D4F9C66', '5']]

In [0]:
# Create an RDD from kaggle_visible_evaluation_triplets.txt and replace each song ID with
# its index from kaggle_songs.txt: join (song_id, index) with (song_id, (user, play_count)),
# then reshape to [user, song_index, play_count].
# Songs without any rating are identified with a leftOuterJoin further below.
songs_rdd.map(lambda x: (x[0], x[1])) \
         .join(triplet_rdd.map(lambda x: (x[1], (x[0], x[2])))) \
         .map(lambda x: [x[1][1][0], x[1][0], x[1][1][1]]) \
         .take(5)

[['32cf63cf65787ce7e72fc7fda6ee585979af6582', '16', '1'],
 ['d1b88940eabd4fab860edf68f0f0842e4f902c78', '16', '1'],
 ['140be58d1ea6d645ed7b317f61197b97975ac1ba', '29', '1'],
 ['bb2e92130a312d9f9e5754f05b36ca85505656b0', '42', '1'],
 ['12e71abacd59cd98b403ffc0444b19dc3ba0b0d6', '48', '1']]

In [0]:
triplet_rdd.filter(lambda x: x[0] == '32cf63cf65787ce7e72fc7fda6ee585979af6582' ).collect()

[['32cf63cf65787ce7e72fc7fda6ee585979af6582', 'SOKONZI12A6D4FA17B', '1'],
 ['32cf63cf65787ce7e72fc7fda6ee585979af6582', 'SOIINQC12A6D4F73A6', '1'],
 ['32cf63cf65787ce7e72fc7fda6ee585979af6582', 'SOAAAQN12AB01856D3', '1'],
 ['32cf63cf65787ce7e72fc7fda6ee585979af6582', 'SODOQUU12AAF3B33DF', '1'],
 ['32cf63cf65787ce7e72fc7fda6ee585979af6582', 'SOFGDGA12AB017C86B', '1'],
 ['32cf63cf65787ce7e72fc7fda6ee585979af6582', 'SOKVKON12A6D4F6DC8', '1'],
 ['32cf63cf65787ce7e72fc7fda6ee585979af6582', 'SOYJNHO12AB01856DC', '2'],
 ['32cf63cf65787ce7e72fc7fda6ee585979af6582', 'SOOBBJV12A8151CE9B', '1']]

In [0]:
triplet_rdd.filter(lambda x: x[1] == 'SOAAAQN12AB01856D3' ).collect()

[['32cf63cf65787ce7e72fc7fda6ee585979af6582', 'SOAAAQN12AB01856D3', '1'],
 ['d1b88940eabd4fab860edf68f0f0842e4f902c78', 'SOAAAQN12AB01856D3', '1']]

In [0]:
songs_rdd.filter(lambda x: x[1] == '16').collect()

[['SOAAAQN12AB01856D3', '16']]

In [0]:
# Songs from kaggle_songs.txt with no matching triplet, i.e. songs without any rating
songs_rdd.map(lambda x: (x[0], x[1])) \
            .leftOuterJoin(triplet_rdd.map(lambda x: (x[1], (x[0], x[2])))) \
            .filter(lambda x: x[1][1] is None).collect()

[('SOAAALJ12AB01828B4', ('12', None)),
 ('SOAABDX12A58A7E508', ('25', None)),
 ('SOAABRF12AC468803F', ('34', None)),
 ('SOAABYG12AB01876F4', ('40', None)),
 ('SOAACGC12A8C1311C6', ('47', None)),
 ('SOAACNA12AB01867BA', ('51', None)),
 ('SOAACNQ12A8AE4692D', ('53', None)),
 ('SOAACTC12AB0186A20', ('60', None)),
 ('SOAACXA12AB018A864', ('64', None)),
 ('SOAACZY12A6D4F8081', ('67', None)),
 ('SOAADAG12AB018997B', ('69', None)),
 ('SOAADAS12A58A784EC', ('70', None)),
 ('SOAADJU12AB01872C7', ('78', None)),
 ('SOAADVA12A8C130133', ('89', None)),
 ('SOAADWH12A58A7A651', ('91', None)),
 ('SOAAETS12AB0180FAE', ('117', None)),
 ('SOAAEUY12AB017FBA0', ('120', None)),
 ('SOAAEXS12A58A784B3', ('121', None)),
 ('SOAAFBJ12AB018E272', ('125', None)),
 ('SOAAFDI12AB018888E', ('128', None)),
 ('SOAAFEW12A8AE47AE5', ('131', None)),
 ('SOAAFVG12A8C13C107', ('146', None)),
 ('SOAAGBA12AB018126F', ('153', None)),
 ('SOAAGQU12AB01843A8', ('169', None)),
 ('SOAAHMO12A8C13EC21', ('186', None)),
 ('SOAAHOU12A58

In [0]:
triplet_rdd.filter(lambda x: x[1] == 'SOAAALJ12AB01828B4' ).collect()

[]
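The question asks for the *number* of unrated songs, while the `leftOuterJoin` cell above lists them. Conceptually this is a set difference between all song IDs and the song IDs that appear in the triplets. A pure-Python sketch on hypothetical sample data; on the RDDs, an equivalent would be something like `songs_rdd.map(lambda x: (x[0], x[1])).subtractByKey(triplet_rdd.map(lambda x: (x[1], 1))).count()`, which is not run in this notebook, so no result is shown for it.

```python
def count_unrated(song_ids, triplets):
    """Count songs that never appear in any (user, song, play_count) triplet."""
    rated = {song for _, song, _ in triplets}
    return len(set(song_ids) - rated)


# Hypothetical sample data (pairings are illustrative only)
song_ids = ["SOAAADD12AB018A9DD", "SOAAALJ12AB01828B4", "SOAAAQN12AB01856D3"]
triplets = [
    ("32cf63cf65787ce7e72fc7fda6ee585979af6582", "SOAAAQN12AB01856D3", "1"),
    ("d1b88940eabd4fab860edf68f0f0842e4f902c78", "SOAAAQN12AB01856D3", "1"),
    ("fd50c4007b68a3737fe052d5a4f78ce8aa117f3d", "SOAAADD12AB018A9DD", "2"),
]
print(count_unrated(song_ids, triplets))  # 1 (only SOAAALJ12AB01828B4 is unrated)
```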

## Step 2:
Generate song ratings based on the play count. For example, given (song_1, 5; song_2, 10; song_3, 5), i.e., song_1 is played 5 times, song_2 10 times and song_3 5 times, the normalized ratings should be 0.25, 0.5 and 0.25 respectively. 
Similarly, generate ratings for all songs. You may notice that when ratings are computed over all songs, they are almost always very low, so think about the best way to convert play counts to ratings. (Hint: try generating ratings based on each user's song play history.)
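The worked example above (play counts 5, 10 and 5 over a total of 20) can be checked with a small Python function; `song_1` to `song_3` are the placeholder names from the example, and the function name is illustrative.

```python
def global_ratings(play_counts):
    """Normalize each song's play count by the total play count of all songs."""
    total = sum(play_counts.values())
    return {song: count / total for song, count in play_counts.items()}


ratings = global_ratings({"song_1": 5, "song_2": 10, "song_3": 5})
print(ratings)  # {'song_1': 0.25, 'song_2': 0.5, 'song_3': 0.25}
```

With over a hundred thousand songs in the real data, the same division spreads the total across many songs, which is why the global ratings computed below are tiny.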

In [0]:
song_played = triplet_rdd.map(lambda x: (x[1],int(x[2]))) \
                            .reduceByKey(lambda x, y: x + y)

In [0]:
song_played.take(5)

[('SOBONKR12A58A7A7E0', 35432),
 ('SOFLJQZ12A6D4FADA6', 7895),
 ('SOHTKMO12AB01843B0', 10515),
 ('SOXLOQG12AF72A2D55', 4671),
 ('SOZPZGN12A8C135B45', 39)]

In [0]:
total_plays = song_played.map(lambda x: x[1]) \
                            .reduce(lambda x, y: x + y)

In [0]:
print(total_plays)

4624340


In [0]:
song_rating1 = song_played.map(lambda x: (x[0], 1.0*x[1]/total_plays))

In [0]:
song_rating1.collect()

[('SOBONKR12A58A7A7E0', 0.007662066370552338),
 ('SOFLJQZ12A6D4FADA6', 0.0017072706591643348),
 ('SOHTKMO12AB01843B0', 0.0022738379963410994),
 ('SOXLOQG12AF72A2D55', 0.0010100900885315527),
 ('SOZPZGN12A8C135B45', 8.433635935073978e-06),
 ('SOPFVWP12A6D4FC636', 3.222081421348776e-05),
 ('SODSKZZ12AB0188524', 5.838671031974292e-06),
 ('SOACRJG12A8C137A8D', 2.5949649030996856e-06),
 ('SOSOUKN12A8C13AB79', 2.8977108084613156e-05),
 ('SOFRQTD12A81C233C0', 0.004206870602075107),
 ('SOZQIUZ12A8C13CFBE', 2.3787178278413783e-06),
 ('SOAAGFH12A8C13D072', 6.271165182490907e-06),
 ('SOFIPHI12AAF3B3DB2', 9.38512306621053e-05),
 ('SOORFBA12A8C13E49E', 1.8597248472214414e-05),
 ('SOXPJVO12A6D4FCC69', 4.952058023415233e-05),
 ('SOQMFWG12AB0186AD8', 3.978946184752851e-05),
 ('SOUIROO12A8C139C18', 1.5353542343339807e-05),
 ('SOSHJHA12AB0181410', 0.00017775509586232847),
 ('SOSERCK12AB0186462', 9.298624236107207e-06),
 ('SOUAZDS12A8C13BA91', 4.757435655682757e-05),
 ('SOQCTEC12AB0186D7E', 1.18935891392

If a song's rating is its play count divided by the total play count across all users, every rating is a small number, as shown above. The following cells instead rate songs within each user's own play history.

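For each user, we rank songs by play count and assign the min-max rating $\frac{play\, count - min}{max - min}$, where min and max are that user's smallest and largest play counts. If all of a user's songs have the same play count (max = min), each song instead gets $\frac{1}{variety}$, where variety is the number of distinct songs the user played.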
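A minimal sketch of this per-user rating for one user's history, using the same (play_count, song_id) pairs that `song_user_info` holds; the function name and the song IDs "A", "B", ... are placeholders.

```python
def user_song_ratings(history):
    """Min-max scale one user's play counts to [0, 1].

    `history` is a list of (play_count, song_id) pairs. If every count is
    equal (max == min), each song gets 1 / number_of_songs instead.
    """
    counts = [count for count, _ in history]
    max_count, min_count = max(counts), min(counts)
    if max_count > min_count:
        return [(song, (count - min_count) / (max_count - min_count))
                for count, song in history]
    return [(song, 1.0 / len(history)) for _, song in history]


print(user_song_ratings([(1, "A"), (3, "B"), (5, "C")]))
# [('A', 0.0), ('B', 0.5), ('C', 1.0)]
print(user_song_ratings([(2, "A"), (2, "B"), (2, "C"), (2, "D")]))
# [('A', 0.25), ('B', 0.25), ('C', 0.25), ('D', 0.25)]
```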

First, gather basic info (max, min, variety) of each user.

In [0]:
song_user_info = triplet_rdd.map(lambda x: (x[0], (int(x[2]), x[1]))) \
                            .groupByKey() \
                            .mapValues(lambda x: list(x))\
                            .mapValues(lambda x: (x, max(x)[0], min(x)[0], len(x)))

In [0]:
song_user_info.take(5)

[('d7083f5e1d50c264277d624340edaaf3dc16095b',
  ([(1, 'SOUVUHC12A67020E3B'),
    (1, 'SOUQERE12A58A75633'),
    (1, 'SOIPJAX12A8C141A2D'),
    (2, 'SOEFCDJ12AB0185FA0'),
    (1, 'SOATCSU12A8C13393A'),
    (1, 'SOZPZGN12A8C135B45'),
    (1, 'SOPFVWP12A6D4FC636'),
    (1, 'SOHEKND12A8AE481D0'),
    (1, 'SOPSVVG12A8C13B444'),
    (1, 'SODSKZZ12AB0188524'),
    (1, 'SONZTNP12A8C1321DF'),
    (1, 'SOVVLKF12A8C1424F0'),
    (1, 'SOMLKZO12AB017F4AE'),
    (1, 'SOACRJG12A8C137A8D'),
    (1, 'SONJVYU12A8AE44F9E'),
    (1, 'SOSOUKN12A8C13AB79')],
   2,
   1,
   16)),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4',
  ([(1, 'SOFRQTD12A81C233C0'),
    (1, 'SOZQIUZ12A8C13CFBE'),
    (1, 'SOKQNYH12A6D4FA5D3'),
    (1, 'SOQDMED12A67ADE731'),
    (1, 'SOAXGDH12A8C13F8A1'),
    (1, 'SOAAGFH12A8C13D072')],
   1,
   1,
   6)),
 ('fdf6afb5daefb42774617cf223475c6013969724',
  ([(1, 'SOJZBHH12AB017F611'),
    (1, 'SOONUTJ12A6701D7B4'),
    (1, 'SONXYJW12AB018898A'),
    (1, 'SONDFYA12AB018897F'),
    (1, 'SOAT

In [0]:
# Number of distinct songs each user played ("variety"); used later to weight song ratings.
user_variety = song_user_info.map(lambda x: (x[0], x[1][3]))

In [0]:
user_variety.take(5)

[('d7083f5e1d50c264277d624340edaaf3dc16095b', 16),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4', 6),
 ('fdf6afb5daefb42774617cf223475c6013969724', 10),
 ('10cbcd627472477dfbec90fb75017f8df6ce84ec', 8),
 ('30e4a688e6fc9c8bfe55998af3996a909ae34449', 7)]

Calculate the rating of each song for specific user.

In [0]:
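# Key: user, Value: [(song_name, user_song_rating), ...]
# x = (history, max_count, min_count, variety) from song_user_info. Min-max scale each
# play count; if all counts are equal (max == min), give every song 1/variety instead.
song_user_rating = song_user_info.mapValues(lambda x: [
    (item[1], (1.0*item[0] - x[2]) / (x[1] - x[2])) if x[1] > x[2] else (item[1], 1.0/x[3])
    for item in x[0]])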

In [0]:
song_user_rating.take(10)

[('d7083f5e1d50c264277d624340edaaf3dc16095b',
  [('SOUVUHC12A67020E3B', 0.0),
   ('SOUQERE12A58A75633', 0.0),
   ('SOIPJAX12A8C141A2D', 0.0),
   ('SOEFCDJ12AB0185FA0', 1.0),
   ('SOATCSU12A8C13393A', 0.0),
   ('SOZPZGN12A8C135B45', 0.0),
   ('SOPFVWP12A6D4FC636', 0.0),
   ('SOHEKND12A8AE481D0', 0.0),
   ('SOPSVVG12A8C13B444', 0.0),
   ('SODSKZZ12AB0188524', 0.0),
   ('SONZTNP12A8C1321DF', 0.0),
   ('SOVVLKF12A8C1424F0', 0.0),
   ('SOMLKZO12AB017F4AE', 0.0),
   ('SOACRJG12A8C137A8D', 0.0),
   ('SONJVYU12A8AE44F9E', 0.0),
   ('SOSOUKN12A8C13AB79', 0.0)]),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4',
  [('SOFRQTD12A81C233C0', 0.16666666666666666),
   ('SOZQIUZ12A8C13CFBE', 0.16666666666666666),
   ('SOKQNYH12A6D4FA5D3', 0.16666666666666666),
   ('SOQDMED12A67ADE731', 0.16666666666666666),
   ('SOAXGDH12A8C13F8A1', 0.16666666666666666),
   ('SOAAGFH12A8C13D072', 0.16666666666666666)]),
 ('fdf6afb5daefb42774617cf223475c6013969724',
  [('SOJZBHH12AB017F611', 0.1),
   ('SOONUTJ12A6701D7B4', 

In [0]:
total_variety = user_variety.values().sum()  # total (user, song) pairs across all users

In [0]:
total_variety

1450933

## Step 3: 
Given a user_id and a rating threshold, recommend 5 other songs from the list. One way to do this is to find other users who rated a song liked by this user above the given rating, and then recommend the 5 songs those matched users rate highest. 
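A minimal pure-Python sketch of this idea on a tiny hypothetical dataset: `ratings` maps each user to their per-user song ratings, the target user's songs rated above `base_rating` define what they like, any other user who also rates one of those songs above `base_rating` becomes a candidate, and the candidates' other highly rated songs are ranked. The function name and data are illustrative, and ranking by the plain mean rating is a simplification; the notebook's cells below weight raters by their variety instead.

```python
from collections import defaultdict


def recommend(ratings, user_id, base_rating, k=5):
    """Recommend up to k songs liked by users who share a liked song with user_id."""
    liked = {s for s, r in ratings[user_id].items() if r > base_rating}
    # Candidate users: anyone else who also rates a liked song above base_rating
    candidates = [u for u, songs in ratings.items()
                  if u != user_id and any(songs.get(s, 0) > base_rating for s in liked)]
    # Collect the candidates' highly rated songs that the target user has not played
    scores = defaultdict(list)
    for u in candidates:
        for song, r in ratings[u].items():
            if r > base_rating and song not in ratings[user_id]:
                scores[song].append(r)
    # Rank by mean rating, breaking ties by song ID for a deterministic order
    ranked = sorted(scores, key=lambda s: (-sum(scores[s]) / len(scores[s]), s))
    return ranked[:k]


# Hypothetical per-user ratings
ratings = {
    "u1": {"A": 1.0, "B": 0.2},
    "u2": {"A": 0.9, "C": 1.0, "D": 0.8},
    "u3": {"B": 1.0, "E": 1.0},
    "u4": {"A": 0.8, "F": 0.5},
}
print(recommend(ratings, "u1", 0.75))  # ['C', 'D']
```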

In [0]:
song_user_rating.flatMapValues(lambda x: x).take(5)

[('d7083f5e1d50c264277d624340edaaf3dc16095b', ('SOUVUHC12A67020E3B', 0.0)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b', ('SOUQERE12A58A75633', 0.0)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b', ('SOIPJAX12A8C141A2D', 0.0)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b', ('SOEFCDJ12AB0185FA0', 1.0)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b', ('SOATCSU12A8C13393A', 0.0))]

In [0]:
# Example inputs: the user to recommend for and the rating threshold.
user_id = '5b7124d06fe4b027b6dff487da9ac236aa4fc3e4'
base_rating = 0.75

#### 1. Recognize candidate users 

1.1 Generate the list of songs favored by the given user -> song_liked. <br>

In [0]:
# First generate song_liked list.
song_liked = song_user_rating.filter(lambda x: x[0] == user_id) \
    .flatMapValues(lambda x: x) \
    .filter(lambda x: x[1][1] > base_rating) \
    .map(lambda x: (x[1][0], 1))

In [0]:
song_liked.take(5)

[('SOVIZNF12AF72A710A', 1),
 ('SOWQXHG12AB0189D1A', 1),
 ('SOMXAXD12A6D223ACF', 1),
 ('SOZQASV12A6D4F87CB', 1),
 ('SOKNWRZ12A8C13BF62', 1)]

In [0]:
# (user, (song_name, rating))
song_user_rating_flat = song_user_rating.flatMapValues(lambda x: x)

1.2 Check for users who also like these songs -> user_candidate. <br>

In [0]:
# Check for candidate users.
# (user, 1)
user_candidate = song_user_rating_flat.filter(lambda x: x[1][1] > base_rating) \
    .map(lambda x: (x[1][0], x[0])) \
    .join(song_liked) \
    .values()

In [0]:
song_user_rating_flat.filter(lambda x: x[1][0] == 'SOTVZIB12A6D4F694A').collect()

[('afccd1857e16791de9770651bd481becb62ffe4a', ('SOTVZIB12A6D4F694A', 0.0)),
 ('3fd3a7503d3046c7bf775d3b7b1244b061212f74', ('SOTVZIB12A6D4F694A', 0.0)),
 ('414a9c24ad9bf51a32b824587c1e39e7d1c07bf1', ('SOTVZIB12A6D4F694A', 1.0)),
 ('8dce84bbdafcb58e86e3dea18f5cb4ee3381965b', ('SOTVZIB12A6D4F694A', 1.0)),
 ('28a79b7391ed7bcb36b209908e4a65be7efc4ce5', ('SOTVZIB12A6D4F694A', 0.0)),
 ('66ec4e0db08961f93373a32f5a0b6a68fbade90e', ('SOTVZIB12A6D4F694A', 0.0)),
 ('190e25d35301f76015c3025f8f2d3f165d23584f', ('SOTVZIB12A6D4F694A', 0.0)),
 ('589c5685f00f8dc967c493a169157a2f8ec121b4', ('SOTVZIB12A6D4F694A', 0.0)),
 ('a291b8274f0b6a2ba47c2bcc5eaab3b9f73e03aa', ('SOTVZIB12A6D4F694A', 0.0)),
 ('f1ebf3fa52d314cd541f49c5f2b04d75bcb87166', ('SOTVZIB12A6D4F694A', 1.0)),
 ('ec49c8faa6cb87159b6ac97d727c00aacce1b0f5', ('SOTVZIB12A6D4F694A', 0.0)),
 ('2c53544a969938a1dbc08d3a962dbef668cb94bc',
  ('SOTVZIB12A6D4F694A', 0.04819277108433735)),
 ('68449f45b075c2856b4841762505f8e9f265ed36', ('SOTVZIB12A6D4F694A', 0

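1.3 Find candidate songs listened to by candidate users. <br>
To give each song a general rating, each rater's rating is weighted by $\frac{\text{rater's variety}}{\text{total variety}}$, where a user's variety is the number of distinct songs they played and the total variety is summed over all raters of that song.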
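This weighting amounts to a variety-weighted average of the raters' ratings. A small sketch with hypothetical (rating, variety) pairs for a single song; the function name is illustrative. Note that the notebook cell below additionally divides this average by the number of raters (`x[2]`).

```python
def weighted_song_rating(raters):
    """Combine (rating, variety) pairs from one song's raters.

    Each rater's weight is variety / total variety of the song's raters,
    so the result is a variety-weighted average of the ratings.
    """
    total_variety = sum(variety for _, variety in raters)
    return sum(rating * variety / total_variety for rating, variety in raters)


# Hypothetical raters of one song: (rating, variety)
print(weighted_song_rating([(1.0, 3), (0.5, 1)]))  # 0.875
```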

In [0]:
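# (user, 1) joined with (user, variety), then with that user's songs rated above base_rating,
# gives (song, (rating, rater_variety)), grouped by song.
# Score = sum(rating * variety) / sum(variety) / number_of_raters.
song_candidate = user_candidate.join(user_variety)\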
    .join(song_user_rating_flat.filter(lambda x: x[1][1] > base_rating)) \
    .map(lambda x: (x[1][1][0], (x[1][1][1], x[1][0][1]))) \
    .groupByKey() \
    .mapValues(lambda x: (list(x),sum([item[1] for item in list(x)]), len(list(x)))) \
    .mapValues(lambda x: sum([item[1]*item[0] for item in x[0]])/x[1]/x[2])

In [0]:
song_candidate.take(5)

[('SOVIZNF12AF72A710A', 0.015299624830461904),
 ('SOWUCHV12A6D4F99A8', 1.0),
 ('SOQIOXQ12AAF3B2A37', 0.5),
 ('SONUKHF12A67AE0395', 1.0),
 ('SOMNXKM12AB01838BB', 1.0)]

In [0]:
# Recommend 5 songs that are rated highest
song_candidate.subtractByKey(song_liked).takeOrdered(5, key=lambda x: -x[1])

[('SOWUCHV12A6D4F99A8', 1.0),
 ('SONUKHF12A67AE0395', 1.0),
 ('SOMNXKM12AB01838BB', 1.0),
 ('SOBONKR12A58A7A7E0', 1.0),
 ('SOPGVVG12A8C13C4FA', 1.0)]

## Step 4: 
1. Compute cosine similarity between all pairs of users. 
2. Sort the similarity score and print the top-5 similar users. 
3. If the top-5 set contains a user more than once, ignore that pair and take the next best pair from the sorted list. 
4. For a given user_id, identify the top-5 similar users and recommend songs from those users' lists. 
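A pure-Python sketch of items 1 to 3, assuming each user is a {song_id: play_count} dict like the values of `user_songs` below: cosine similarity is computed directly from the sparse dicts (rather than via scikit-learn), followed by a greedy pass that skips any pair reusing an already chosen user. The function names and users are hypothetical; without the overlap rule the second pick here would be (u1, u3), but with it the result is (u3, u4).

```python
import math


def cosine(a, b):
    """Cosine similarity of two sparse {song_id: play_count} vectors."""
    dot = sum(count * b.get(song, 0) for song, count in a.items())
    norm_a = math.sqrt(sum(c * c for c in a.values()))
    norm_b = math.sqrt(sum(c * c for c in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def top_disjoint_pairs(users, k=5):
    """Greedily pick the k most similar user pairs, skipping any pair
    that contains a user who is already in a chosen pair."""
    names = sorted(users)
    pairs = [(cosine(users[u], users[v]), u, v)
             for i, u in enumerate(names) for v in names[i + 1:]]
    pairs.sort(key=lambda p: (-p[0], p[1], p[2]))
    chosen, used = [], set()
    for score, u, v in pairs:
        if u in used or v in used:
            continue
        chosen.append((u, v, score))
        used.update((u, v))
        if len(chosen) == k:
            break
    return chosen


# Hypothetical play counts
users = {
    "u1": {"A": 2, "B": 1},
    "u2": {"A": 2, "B": 1},
    "u3": {"A": 2, "B": 1, "C": 1},
    "u4": {"C": 1, "D": 1},
}
print(top_disjoint_pairs(users, k=2))
```

Enumerating each unordered pair once (`names[i + 1:]`) also avoids the mirrored (x, y)/(y, x) duplicates that a full cartesian product produces.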

In [0]:
from sklearn.metrics.pairwise import cosine_similarity

In [0]:
def makeDict(x):
    """Turn an iterable of (song_id, play_count) into {song_id: int(play_count)}."""
    return {song: int(count) for song, count in x}

def cosineSimilarity(x):
    """Cosine similarity between two {song_id: play_count} dicts, x = (dict0, dict1)."""
    # Align both users on the union of their songs; a missing song counts as 0 plays
    songs = set(x[0]) | set(x[1])
    list0 = [x[0].get(song, 0) for song in songs]
    list1 = [x[1].get(song, 0) for song in songs]
    return cosine_similarity([list0], [list1])[0][0]

In [0]:
# All possible pairwise combinations; user_songs is (user, {song_id: play_count})
user_songs = triplet_rdd.map(lambda x: (x[0],(x[1],x[2]))) \
                .groupByKey().mapValues(list).mapValues(makeDict)
all_user_pairs = user_songs.cartesian(user_songs)

# Create a subset for easier computation: top(1000) keeps the 1000 users whose IDs
# sort last, and the cartesian product gives all pairwise combinations within it
user_songs_subset = sc.parallelize(user_songs.top(1000))
subset_user_pairs = user_songs_subset.cartesian(user_songs_subset)

In [0]:
#compute cosine_similarity across all pairs of users
all_cosine_similarity = all_user_pairs \
                .map(lambda x: ((x[0][0],x[1][0]),(x[0][1],x[1][1]))) \
                .filter(lambda x: x[0][0]!=x[0][1]) \
                .mapValues(cosineSimilarity)

#compute cosine_similarity across all pairs of users [in the subset]
subset_cosine_similarity = subset_user_pairs \
                .map(lambda x: ((x[0][0],x[1][0]),(x[0][1],x[1][1]))) \
                .filter(lambda x: x[0][0]!=x[0][1]) \
                .mapValues(cosineSimilarity)

In [0]:
# Determine top 5 similar users 

# running `all_cosine_similarity` instead of `subset_cosine_similarity` crashes my computer (too many combinations)

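# Take the top 10 scores: each pair appears twice, as (x, y) and (y, x), so key each pair
# by its sorted user IDs and keep one entry per key, leaving (up to) 5 unique pairs.
# This removes mirrored duplicates only; it does not skip pairs that reuse a user.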
sc.parallelize(subset_cosine_similarity.top(10, key = lambda x: x[1])) \
                .map(lambda x: (':'.join(sorted(x[0])),x[1])) \
                .reduceByKey(lambda x,y:x).sortBy(lambda x: -x[1]).collect()

In [0]:
# Give recommended songs based on 5 users most cosine_similar to user inputted
input_user  = input("Input User ID: ")

specific_user = user_songs.filter(lambda x : x[0] == input_user)
specific_pairs = specific_user.cartesian(user_songs) \
                .map(lambda x: ((x[0][0],x[1][0]),(x[0][1],x[1][1]))) \
                .filter(lambda x: x[0][0]!=x[0][1]) \
                .mapValues(cosineSimilarity) \
                .filter(lambda x: x[1]>0)

similar_users_rdd = specific_pairs.top(5, key = lambda x: x[1])

similar_users_list = sc.parallelize(similar_users_rdd).map(lambda x: x[0][1]).collect()

recommended_songs = user_songs.filter(lambda x: x[0] in similar_users_list) \
            .reduce(lambda x, y: (x[0]+y[0],set(x[1]) | set(y[1])))[1]

songs_listened = user_songs.filter(lambda x: x[0] == input_user).map(lambda x: set(x[1])).collect()[0]

recommended_songs -= songs_listened
print("Recommended songs:")
print(recommended_songs)