# <p style="text-align: center;">MIS 285N: Big Data and Distributed Programming</p>
# <p style="text-align: center;">Project - 1 : Apache Spark</p>
## <p style="text-align: center;">Instructor: Dr. Ramesh Yerraballi</p>
## <p style="text-align: center;">Due: Monday, September 21st submitted via Canvas by 11:59 pm</p>

Your homework should be written in a **Jupyter notebook**.   

Also, please make sure your code runs in your notebook before submitting.

**Note:**

This project is based on the MapReduce framework. You will work with Spark and learn how Spark works, 
what functionality it provides, what the MapReduce framework does, and why it is useful. 

In this project you will be implementing a basic song recommender system. You will be given a dataset where there are multiple csv files. These csv files have data corresponding to song play count and song information.

The data you will be using is provided in a zip file along with this notebook. The __msd.zip__ archive contains:
1. **'kaggle_visible_evaluation_triplets.txt'**. We will use the visible part of the testing data to understand the workings of Apache Spark.  The user's listening history is provided as: (user, song, play count).  
2. **'kaggle_songs.txt'**, in which each song is marked with an index for easier representation of songs.  
3. **'kaggle_users.txt'**, the canonical list of user identifiers.
4. **'MSDChallengeGettingstarted.pdf'**, which you should take as your reference.



### **What to turn in?**  

A zip folder which will have:
1. Jupyter Notebook
2. A brief report in PDF format on what features you used for recommendation, with a brief explanation of the flow of your code. For example, which RDD does what, or why it was created.
3. datasets folder with the csv files you are using in your notebook.
4. Notebook should use relative path to the csv files in datasets folder.
5. Name of the zip folder - <your\_name>\_<your\_partner_name>.zip

This project consists of 4 questions:  

1. Create an RDD with _kaggle_visible_evaluation_triplets.txt_ and replace the song name with the song index from _kaggle_songs.txt_. Identify the number of songs that do not have any rating. 
2. Generate song ratings based on the song play count as a normalized score between 0 and 1. 
3. Identify popular songs based on this rating and, given a user id, recommend songs to that user using the algorithm from the movie recommender system covered in class. 
4. Using the cosine similarity function, compute the pairwise similarity between users and generate the top 5 most similar user pairs, with no user appearing in more than one pair. 

The list above gives the high-level idea of the questions. 

In [1]:
### Starter code ####
import findspark
findspark.init('/Users/Gautam/apachespark')
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("Songs")
sc = SparkContext(conf = conf)
#### These lines are to tell jupyter where to find Apache Spark ####

In [None]:
#sc.stop()

In [2]:
## Read triplet file into RDD
triplet_rdd = sc.textFile(r"kaggle_visible_evaluation_triplets.txt") \
    .map(lambda line: line.split("\t")) 

## Step 1: 
Replace song name with song index and identify the number of songs without user history

In [3]:
triplet_rdd.take(5)

[['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOBONKR12A58A7A7E0', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOEGIYH12A6D4FC0E3', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOFLJQZ12A6D4FADA6', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOHTKMO12AB01843B0', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SODQZCY12A6D4F9D11', '1']]

In [4]:
songs_rdd = sc.textFile(r"kaggle_songs.txt") \
    .map(lambda line: line.split(" "))
songs_rdd.take(5)

[['SOAAADD12AB018A9DD', '1'],
 ['SOAAADE12A6D4F80CC', '2'],
 ['SOAAADF12A8C13DF62', '3'],
 ['SOAAADZ12A8C1334FB', '4'],
 ['SOAAAFI12A6D4F9C66', '5']]

In [5]:
# Map each row of songs_rdd to a key-value pair in which the key is the song name
# and the value is the song index.
songs = songs_rdd.map(lambda z: (z[0], z[1]))
songs.take(5)

[('SOAAADD12AB018A9DD', '1'),
 ('SOAAADE12A6D4F80CC', '2'),
 ('SOAAADF12A8C13DF62', '3'),
 ('SOAAADZ12A8C1334FB', '4'),
 ('SOAAAFI12A6D4F9C66', '5')]

In [6]:
# Map triplet_rdd to key-value pairs in which the song name is the key
# and the value is the pair (user name, number of times the song was played).
# The result is stored in temp.
temp = triplet_rdd.map(lambda z: (z[1], (z[0], z[2])))
temp.take(5)

[('SOBONKR12A58A7A7E0', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')),
 ('SOEGIYH12A6D4FC0E3', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')),
 ('SOFLJQZ12A6D4FADA6', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')),
 ('SOHTKMO12AB01843B0', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1')),
 ('SODQZCY12A6D4F9D11', ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '1'))]

In [7]:
### Join the 'temp' RDD with the 'songs' RDD on the song name.
### Each joined row is (song name, ((user, play count), song index)); it is rearranged to
### (user, song index, play count) and saved in final_triplet_rdd.
final = temp.join(songs)
final_triplet_rdd = final.map(lambda z: (z[1][0][0], z[1][1], z[1][0][1]))
final_triplet_rdd.take(10)

[('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', '25150', '1'),
 ('c34670d9c1718361feb93068a853cead3c95b76a', '25150', '1'),
 ('c5006d9f41f68ccccbf5ee29212b6af494110c5e', '25150', '1'),
 ('e4332e11f4df6dd26673bb6b085e9a2bbdc9b8a5', '25150', '2'),
 ('baf2fe5885ab93fbbdb7fecc6691788e70afb6c8', '25150', '4'),
 ('f6e34f0a68d5ea1344511e33486f956de361db78', '25150', '1'),
 ('e326c4b9fe3659ec1dc3af53fd7e0893809dafbc', '25150', '25'),
 ('00f7c493ee64884998ea98d9f5bed87bc4a0afcf', '25150', '5'),
 ('daa9e7e53ae787ab4f1b5518b695198947d821a2', '25150', '1'),
 ('cd4321d8fd42ba44996e7f34c2f6404cf5884696', '25150', '1')]

In [8]:
# To count the songs without user history, we need every song that is present in
# 'kaggle_songs.txt' but absent from 'kaggle_visible_evaluation_triplets.txt'.
# We therefore take the song name column from both 'triplet_rdd' and 'songs_rdd'.

songs_users = triplet_rdd.map(lambda z: z[1])
songs_total = songs_rdd.map(lambda z: z[0])

In [9]:
# Multiple users may have listened to the same song, so we first reduce 'songs_users' to its distinct songs.
# Subtracting those distinct songs from the 'songs_total' RDD leaves the songs
# that do not have any user history, and count() returns how many there are.
 
songs_users=songs_users.distinct()
songs_without_userhistory = songs_total.subtract(songs_users).count()
print("The number of songs without user history: " +str(songs_without_userhistory))

The number of songs without user history: 223007
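
The logic of Step 1 can be checked on a tiny example without Spark. The following is a minimal plain-Python sketch, assuming made-up song names and triplets (`song_index` and `triplets` are hypothetical toy data, not the real files): a dictionary lookup plays the role of `join`, a set plays the role of `distinct`, and a set difference plays the role of `subtract`.

```python
# Toy stand-ins for the two input files (hypothetical data, not the real dataset).
song_index = {"SOAAA": "1", "SOBBB": "2", "SOCCC": "3", "SODDD": "4"}
triplets = [("u1", "SOAAA", "1"), ("u2", "SOAAA", "3"), ("u2", "SOCCC", "2")]

# join(): look up each song name and emit (user, song index, play count).
indexed_triplets = [(user, song_index[song], count) for user, song, count in triplets]

# distinct(): a set keeps each played song once.
songs_with_history = {song for _user, song, _count in triplets}

# subtract(): songs in the catalogue that never appear in a triplet.
songs_without_history = set(song_index) - songs_with_history

print(indexed_triplets)
print(len(songs_without_history))
```

On the real data the same three operations run as distributed RDD transformations; the sketch is only meant to make the expected result easy to verify by hand.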


## Step 2:
Generate song ratings based on the play_count. For example, if (song_1, 5; song_2, 10; song_3, 5) i.e., song_1 is played 5 times, song_2 is played 10 times and song_3 is played 5 times, the normalized rating score should be 0.25, 0.5 and 0.25 respectively. 
Similarly, generate the rating for all the songs. You may notice that based on all songs, the rating is almost always very low. So, think of the best way to convert song count to ratings. (Hint: Try generating ratings based on each user's song play history)
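
The worked example above (5, 10, and 5 plays giving 0.25, 0.5, and 0.25) can be reproduced with a short plain-Python sketch. It assumes the per-user normalization suggested in the hint; the helper name `per_user_ratings` and the toy triplets are hypothetical, and the sketch keeps the raw fraction so that every rating stays between 0 and 1.

```python
from collections import defaultdict


def per_user_ratings(triplets):
    """Divide each play count by the total play count of the same user."""
    totals = defaultdict(int)
    for user, _song, count in triplets:
        totals[user] += count
    return [(user, song, count / totals[user]) for user, song, count in triplets]


# Hypothetical listening history of a single user.
triplets = [("u1", "song_1", 5), ("u1", "song_2", 10), ("u1", "song_3", 5)]
ratings = per_user_ratings(triplets)
print(ratings)
```

Because each user's ratings sum to 1, a heavy listener and a light listener become comparable, which is one reason to prefer this over dividing by the play count of the whole dataset.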

In [10]:
# First we take the song name and play count from triplet_rdd, then use reduceByKey
# to add up the number of times each song was played by all users, and save the result in 'temp'.
songs_played = triplet_rdd.map(lambda z: (z[1], int(z[2])))
temp = songs_played.reduceByKey(lambda a, v: a + v)
### Compute the total number of plays across all songs.
total = temp.map(lambda z: z[1]).reduce(lambda a, v: a + v)

In [11]:
# Compute each song's rating by dividing its play count by the total play count, and save it in the
# 'songs_rating' RDD. The rating is multiplied by 100000 for readability, so the values shown are not in [0, 1].
songs_rating = temp.map(lambda z: (z[0],(int(z[1])/total)*100000)).sortBy(lambda z: -z[1])
songs_rating.take(10)

[('SOBONKR12A58A7A7E0', 766.2066370552338),
 ('SOAUWYT12A81C206F1', 717.4861709995373),
 ('SOSXLTC12AF72A7F54', 526.7562506217104),
 ('SOFRQTD12A81C233C0', 420.68706020751074),
 ('SOEGIYH12A6D4FC0E3', 370.1068693045927),
 ('SOAXGDH12A8C13F8A1', 308.7791987613368),
 ('SONYKOW12AB01849C9', 267.97337566009423),
 ('SOVDSJC12A58A7A271', 251.06285437489456),
 ('SOUFTBI12AB0183F65', 233.4170930338167),
 ('SOHTKMO12AB01843B0', 227.38379963410995)]

In [12]:
# Now we compute ratings based on each user's play history. For that we need the total number of
# plays per user: take the user name and play count columns from triplet_rdd,
# apply reduceByKey, and sort in descending order.
# Note that this rebinds 'total', which held the global play count above, to an RDD of (user, total plays).
user_played_songs = triplet_rdd.map(lambda z: (z[0],int(z[2])))
total = user_played_songs.reduceByKey(lambda x, y: int(x)+int(y)).sortBy(lambda x:-x[1])
total.take(10)

[('090b841eaf56d343a26625c2c6d08b823927bc4f', 1305),
 ('938c2632d43eeadb8a83a7cc254d014f9cea6afe', 1267),
 ('1c5aa998482a40abfd020759e7d757eb6c510e72', 1200),
 ('c6150292374fb1dad89982367b3245dd5004c718', 1192),
 ('5a9375e46a7e9b869058c7bc0e820e00d77f3e0b', 1184),
 ('d1d845a92cd34456423e781512bdb502ca385b51', 1180),
 ('957440a77858369fb7a6bcc6fa408fc187d5bd7b', 1150),
 ('315103a41c2ced1143de0c2ba20de224800e6d89', 1148),
 ('22bb29714137fa47083963c30e1a26f1bf517e7d', 1141),
 ('bda891a59a96252cc0f5b1f63f2630692b490e37', 1140)]

In [13]:
# Rearrange triplet_rdd into key-value pairs in 'temp_triplet_rdd', then join it with the 'total' RDD.
# Each joined row holds four fields: user name, song name, play count, and the user's total play count.
temp_triplet_rdd = triplet_rdd.map(lambda z: (z[0],(z[1],z[2])))
temp = temp_triplet_rdd.join(total)
temp.take(10)

[('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOUVUHC12A67020E3B', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOUQERE12A58A75633', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOIPJAX12A8C141A2D', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOEFCDJ12AB0185FA0', '2'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOATCSU12A8C13393A', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOZPZGN12A8C135B45', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOPFVWP12A6D4FC636', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOHEKND12A8AE481D0', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SOPSVVG12A8C13B444', '1'), 17)),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  (('SODSKZZ12AB0188524', '1'), 17))]

In [14]:
# To obtain a rating per song per user, divide the number of times the user played each song
# by the total number of plays for that user. The factor of 100 expresses the rating as a percentage.

rating = temp.map(lambda z: (z[0],z[1][0][0],(int(z[1][0][1])/z[1][1])*100))
rating.take(10)

[('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOUVUHC12A67020E3B',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOUQERE12A58A75633',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOIPJAX12A8C141A2D',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOEFCDJ12AB0185FA0',
  11.76470588235294),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOATCSU12A8C13393A',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOZPZGN12A8C135B45',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOPFVWP12A6D4FC636',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOHEKND12A8AE481D0',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOPSVVG12A8C13B444',
  5.88235294117647),
 ('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SODSKZZ12AB0188524',
  5.88235294117647)]

## Step 3: 
For a given user_id (choose one yourself) and rating, recommend 5 other songs from the list. One way to do this is to find other users who rated a song liked by this user higher than the given rating, and then recommend 5 songs based on those matched users' ratings. 
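
The approach described above can be sketched in plain Python before it is written with RDDs. This is a minimal illustration under stated assumptions: `ratings` is a hypothetical list of (user, song, rating) tuples, `recommend` is a hypothetical helper name, and the chosen user is assumed to have rated the chosen song. Like the notebook cells below, it does not remove duplicates or songs the chosen user already knows.

```python
def recommend(ratings, user_id, song_id, k=5):
    """Recommend up to k songs from users who rated song_id higher than user_id did."""
    # Rating that the chosen user gave to the chosen song.
    target = next(r for u, s, r in ratings if u == user_id and s == song_id)
    # Other users who rated the same song higher, with their rating of it.
    liked_more = {u: r for u, s, r in ratings
                  if u != user_id and s == song_id and r > target}
    # Songs those users rated even higher than the shared song.
    candidates = [(s, r) for u, s, r in ratings
                  if u in liked_more and r > liked_more[u]]
    return sorted(candidates, key=lambda sr: -sr[1])[:k]


# Hypothetical toy ratings.
ratings = [
    ("me", "A", 0.2), ("me", "B", 0.8),
    ("u1", "A", 0.3), ("u1", "C", 0.7),
    ("u2", "A", 0.1), ("u2", "D", 0.9),
    ("u3", "A", 0.5), ("u3", "E", 0.4), ("u3", "F", 0.6),
]
print(recommend(ratings, "me", "A"))
```

In this toy data `u2` is ignored because it rated song `A` lower than `me` did, and song `E` is dropped because `u3` rated it below `A`.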

In [15]:
# As an example, consider the following user_id and song_id.

user_id="f6e34f0a68d5ea1344511e33486f956de361db78"
song_id="SOMDBJK12A6D4F873A"

In [16]:
# Repeat the computation from Step 2, since we want to recommend songs that were rated higher by other users
# who liked the same song. 'total' contains the total number of plays across all users and songs.

songs_played_total = triplet_rdd.map(lambda z: (z[1],int(z[2]))).reduceByKey(lambda a, v: a + v)
total = songs_played_total.map(lambda z: int(z[1])).reduce(lambda a,v:int(a)+int(v))
print("\nUser played total no of songs: ",total,"\n")


User played total no of songs:  4624340 



In [17]:
# As in Step 2, compute the rating of every (user, song) pair and store it in 'result'.
# Here each play count is divided by the global play total and scaled by 100000.
# 'result' contains the columns user name, song name, and rating.
result = triplet_rdd.map(lambda x: (x[0],x[1],(int(x[2])/total)*100000))
result.take(5)

[('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  'SOBONKR12A58A7A7E0',
  0.021624707525830716),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  'SOEGIYH12A6D4FC0E3',
  0.021624707525830716),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  'SOFLJQZ12A6D4FADA6',
  0.021624707525830716),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  'SOHTKMO12AB01843B0',
  0.021624707525830716),
 ('fd50c4007b68a3737fe052d5a4f78ce8aa117f3d',
  'SODQZCY12A6D4F9D11',
  0.021624707525830716)]

In [18]:
# Filter the 'result' RDD down to the row in which the song name equals the given song_id and the
# user name equals the given user_id, then convert its rating to a float.
x=result.filter(lambda z: (z[0]==user_id and z[1]==song_id)).map(lambda z: z[2])
num=float(x.collect()[0])
print("\nThe song rating for the given User Name and Song Name is: ",num)


The song rating for the given User Name and Song Name is:  0.10812353762915357


In [19]:
# Keep the records of 'result' in which the user name differs from the given user_id, the song name equals
# the given song_id, and the rating is greater than the given user's rating. These are the users who rated
# the same song higher than the given user; each record is rearranged to (user name, rating).

users=result.filter(lambda z: (z[0]!=user_id and z[1]==song_id and float(z[2])>num)).map(lambda z: (z[0],(z[2])))
users.take(5)

[('f26dd0f6f3831009dc603c0f8a78be5cfdda08d4', 0.1297482451549843),
 ('ac161501199a1b984f8deff58ef592b9ebf67483', 0.8433635935073978),
 ('c87a0f11929453f3fb3c75ee26d24183901602b1', 0.151372952680815),
 ('38184d13862f7466d8318557b921e91720fdef40', 0.1297482451549843),
 ('7c78aa227e6cf1db5dd944225079cb4ed9ce5b44', 0.151372952680815)]

In [20]:
# Next we need all the other songs heard by these users. Rearrange 'result' into key-value pairs in 'final'
# and join it with 'users'. The join yields every song heard by the selected users who like our song.
final=result.map(lambda z: (z[0],(z[1],z[2])))
r=final.join(users)
r.take(5)

[('38184d13862f7466d8318557b921e91720fdef40',
  (('SOTIYYT12AC9072CA7', 0.08649883010332286), 0.1297482451549843)),
 ('38184d13862f7466d8318557b921e91720fdef40',
  (('SORAOUS12A8C13950A', 0.06487412257749214), 0.1297482451549843)),
 ('38184d13862f7466d8318557b921e91720fdef40',
  (('SORHKKL12AC9072CB6', 0.08649883010332286), 0.1297482451549843)),
 ('38184d13862f7466d8318557b921e91720fdef40',
  (('SOMDBJK12A6D4F873A', 0.1297482451549843), 0.1297482451549843)),
 ('38184d13862f7466d8318557b921e91720fdef40',
  (('SOTIGPL12AC9072CBB', 0.08649883010332286), 0.1297482451549843))]

In [21]:
# Finally, keep the songs that each of these users rated higher than they rated our song.
# The filtered rows are mapped to (song name, rating)
# and ordered by rating in descending order
# to obtain the top 5 recommendations for the given user.

r.filter(lambda z: (z[1][0][1]>z[1][1])).map(lambda z: (z[1][0][0],z[1][0][1])).takeOrdered(5,key=lambda z:-z[1])

[('SOHTKMO12AB01843B0', 1.059610668765705),
 ('SOSUIAK12AB01850CB', 0.6487412257749214),
 ('SOPLDTD12AB0184B2F', 0.4108694429907836),
 ('SOLGPOU12A58A7EA20', 0.36762002793912213),
 ('SOWFGOB12A58A7A7FD', 0.34599532041329145)]

## Step 4: 
1. Compute cosine similarity between all pairs of users. 
2. Sort the similarity score and print the top-5 similar users. 
3. If the top-5 user set has a user appearing more than once, ignore that pair and take the next best pair from the sorted list. 
4. For a given user_id, identify the top-5 similar users and hence song recommendations from the other users' lists. 
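
As a reference point for the steps above, cosine similarity between two users can be computed directly from their play counts. The sketch below is plain Python rather than Spark and uses raw play counts instead of the TF-IDF weights used later in this notebook; the user dictionaries and the helper name `cosine_similarity` are hypothetical.

```python
import math
from itertools import combinations


def cosine_similarity(a, b):
    """Cosine similarity of two sparse vectors stored as {song: play count}."""
    dot = sum(a[s] * b[s] for s in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


# Hypothetical users: u2 plays the same songs as u1 twice as often, u3 shares nothing.
users = {
    "u1": {"A": 1, "B": 2},
    "u2": {"A": 2, "B": 4},
    "u3": {"C": 3},
}

# Every unordered pair of users, sorted by similarity in descending order.
pairs = sorted(
    ((x, y, cosine_similarity(users[x], users[y])) for x, y in combinations(users, 2)),
    key=lambda p: -p[2],
)
print(pairs)
```

Users with proportional play counts score close to 1 and users with no songs in common score 0, which is the behaviour expected from the sorted pair list in parts 2 and 3.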

In [22]:
import pyspark.sql.functions as psf
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import Normalizer
from pyspark.sql.functions import desc
from pyspark.sql.types import DoubleType
from pyspark.sql import SparkSession

spark = SparkSession(sc)

triplet_rdd=sc.parallelize(triplet_rdd.take(10000))
triplet_rdd.take(10)

[['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOBONKR12A58A7A7E0', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOEGIYH12A6D4FC0E3', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOFLJQZ12A6D4FADA6', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOHTKMO12AB01843B0', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SODQZCY12A6D4F9D11', '1'],
 ['fd50c4007b68a3737fe052d5a4f78ce8aa117f3d', 'SOXLOQG12AF72A2D55', '1'],
 ['d7083f5e1d50c264277d624340edaaf3dc16095b', 'SOUVUHC12A67020E3B', '1'],
 ['d7083f5e1d50c264277d624340edaaf3dc16095b', 'SOUQERE12A58A75633', '1'],
 ['d7083f5e1d50c264277d624340edaaf3dc16095b', 'SOIPJAX12A8C141A2D', '1'],
 ['d7083f5e1d50c264277d624340edaaf3dc16095b', 'SOEFCDJ12AB0185FA0', '2']]

In [23]:
#Part 1: Compute cosine similarity between all pairs of users.
#Each song ID is repeated as many times as the song was played, separated by commas.
#For example, for UserID 'fdf6afb5daefb42774617cf223475c6013969724', SongID 'SOYPQYA12A6D4FB8F4', and times played '3',
#the row becomes (user ID, 'SOYPQYA12A6D4FB8F4,SOYPQYA12A6D4FB8F4,SOYPQYA12A6D4FB8F4'). reduceByKey then concatenates these strings.
#As a result, all the songs a user listened to are present in a single row of the temp RDD.
X = triplet_rdd.map(lambda z: (z[0],((z[1]+',')*int(z[2]))[:-1]))
temp = X.reduceByKey(lambda a, v: a + "," + v)
temp.take(2)

[('d7083f5e1d50c264277d624340edaaf3dc16095b',
  'SOUVUHC12A67020E3B,SOUQERE12A58A75633,SOIPJAX12A8C141A2D,SOEFCDJ12AB0185FA0,SOEFCDJ12AB0185FA0,SOATCSU12A8C13393A,SOZPZGN12A8C135B45,SOPFVWP12A6D4FC636,SOHEKND12A8AE481D0,SOPSVVG12A8C13B444,SODSKZZ12AB0188524,SONZTNP12A8C1321DF,SOVVLKF12A8C1424F0,SOMLKZO12AB017F4AE,SOACRJG12A8C137A8D,SONJVYU12A8AE44F9E,SOSOUKN12A8C13AB79'),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4',
  'SOFRQTD12A81C233C0,SOZQIUZ12A8C13CFBE,SOKQNYH12A6D4FA5D3,SOQDMED12A67ADE731,SOAXGDH12A8C13F8A1,SOAAGFH12A8C13D072')]

In [24]:
# Convert the temp RDD into a Spark DataFrame with toDF. It has two columns, User_id and Songs,
# and the comma-separated Songs string is split into an array of song IDs.
Data_frame = temp.toDF(["User_id", "Songs"]).withColumn("Songs", psf.split(psf.regexp_replace("Songs", " ", ""), ','))
Data_frame.head(2)

[Row(User_id='d7083f5e1d50c264277d624340edaaf3dc16095b', Songs=['SOUVUHC12A67020E3B', 'SOUQERE12A58A75633', 'SOIPJAX12A8C141A2D', 'SOEFCDJ12AB0185FA0', 'SOEFCDJ12AB0185FA0', 'SOATCSU12A8C13393A', 'SOZPZGN12A8C135B45', 'SOPFVWP12A6D4FC636', 'SOHEKND12A8AE481D0', 'SOPSVVG12A8C13B444', 'SODSKZZ12AB0188524', 'SONZTNP12A8C1321DF', 'SOVVLKF12A8C1424F0', 'SOMLKZO12AB017F4AE', 'SOACRJG12A8C137A8D', 'SONJVYU12A8AE44F9E', 'SOSOUKN12A8C13AB79']),
 Row(User_id='d68dc6fc25248234590d7668a11e3335534ae4b4', Songs=['SOFRQTD12A81C233C0', 'SOZQIUZ12A8C13CFBE', 'SOKQNYH12A6D4FA5D3', 'SOQDMED12A67ADE731', 'SOAXGDH12A8C13F8A1', 'SOAAGFH12A8C13D072'])]

NOTES:
#Term frequency TF(t,d) is the number of times that term t appears in document d,
#while document frequency DF(t,D) is the number of documents that contain term t.
#Here each user is a document and each song ID is a term.
#HashingTF needs only a single pass over the data; its output is stored in "term_frequency".
#IDF needs two passes: the first to compute the IDF vector
#and the second to scale the term frequencies by IDF.
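
To make these notes concrete, here is a minimal plain-Python sketch of TF-IDF in which each user's song list is a document. It assumes the smoothed formula documented for Spark MLlib's `IDF`, namely log((m + 1) / (DF + 1)) for m documents, and it skips the hashing step that `HashingTF` performs; the helper name `tf_idf` and the toy documents are hypothetical.

```python
import math
from collections import Counter


def tf_idf(docs):
    """Return one {term: tf * idf} dictionary per document."""
    m = len(docs)
    # Document frequency: number of documents that contain each term.
    df = Counter(term for doc in docs for term in set(doc))
    # Smoothed inverse document frequency (assumed to match Spark MLlib's IDF).
    idf = {t: math.log((m + 1) / (df[t] + 1)) for t in df}
    return [{t: tf * idf[t] for t, tf in Counter(doc).items()} for doc in docs]


# Hypothetical song lists of three users; song "B" is heard by everyone.
docs = [["A", "A", "B"], ["B", "C"], ["B"]]
weights = tf_idf(docs)
print(weights)
```

A song that every user has played gets a weight of 0, so it does not contribute to the similarity between users, while a song played twice by only one user is weighted twice as heavily as a single play.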

In [25]:
### Next we compute the hashed term frequency (TF) and store it in the 'term_frequency' column.
### We then fit the inverse document frequency (IDF) on it and store the transformed DataFrame as 'tfidf'.

hashingTF = HashingTF(inputCol="Songs", outputCol="term_frequency")
term_frequency = hashingTF.transform(Data_frame)
idf = IDF(inputCol="term_frequency", outputCol="feature").fit(term_frequency)
tfidf = idf.transform(term_frequency)
tfidf.head(2)

[Row(User_id='d7083f5e1d50c264277d624340edaaf3dc16095b', Songs=['SOUVUHC12A67020E3B', 'SOUQERE12A58A75633', 'SOIPJAX12A8C141A2D', 'SOEFCDJ12AB0185FA0', 'SOEFCDJ12AB0185FA0', 'SOATCSU12A8C13393A', 'SOZPZGN12A8C135B45', 'SOPFVWP12A6D4FC636', 'SOHEKND12A8AE481D0', 'SOPSVVG12A8C13B444', 'SODSKZZ12AB0188524', 'SONZTNP12A8C1321DF', 'SOVVLKF12A8C1424F0', 'SOMLKZO12AB017F4AE', 'SOACRJG12A8C137A8D', 'SONJVYU12A8AE44F9E', 'SOSOUKN12A8C13AB79'], term_frequency=SparseVector(262144, {5585: 1.0, 33749: 1.0, 50952: 1.0, 52168: 1.0, 53458: 1.0, 63492: 1.0, 77828: 1.0, 86816: 1.0, 94885: 2.0, 103956: 1.0, 112693: 1.0, 170827: 1.0, 214706: 1.0, 229499: 1.0, 230458: 1.0, 234410: 1.0}), feature=SparseVector(262144, {5585: 5.2444, 33749: 5.2444, 50952: 5.2444, 52168: 5.2444, 53458: 4.1458, 63492: 5.2444, 77828: 5.2444, 86816: 5.2444, 94885: 10.4888, 103956: 5.2444, 112693: 4.8389, 170827: 5.2444, 214706: 5.2444, 229499: 5.2444, 230458: 5.2444, 234410: 5.2444})),
 Row(User_id='d68dc6fc25248234590d7668a11e33

In [26]:
# Use Normalizer to L2-normalize the 'feature' vector of every row in 'tfidf'
# and store the result as 'normalizer_data'. Each output vector in 'normalized' has unit length.

normalizer = Normalizer(inputCol="feature", outputCol="normalized")
normalizer_data = normalizer.transform(tfidf)
normalizer_data.head(2)

[Row(User_id='d7083f5e1d50c264277d624340edaaf3dc16095b', Songs=['SOUVUHC12A67020E3B', 'SOUQERE12A58A75633', 'SOIPJAX12A8C141A2D', 'SOEFCDJ12AB0185FA0', 'SOEFCDJ12AB0185FA0', 'SOATCSU12A8C13393A', 'SOZPZGN12A8C135B45', 'SOPFVWP12A6D4FC636', 'SOHEKND12A8AE481D0', 'SOPSVVG12A8C13B444', 'SODSKZZ12AB0188524', 'SONZTNP12A8C1321DF', 'SOVVLKF12A8C1424F0', 'SOMLKZO12AB017F4AE', 'SOACRJG12A8C137A8D', 'SONJVYU12A8AE44F9E', 'SOSOUKN12A8C13AB79'], term_frequency=SparseVector(262144, {5585: 1.0, 33749: 1.0, 50952: 1.0, 52168: 1.0, 53458: 1.0, 63492: 1.0, 77828: 1.0, 86816: 1.0, 94885: 2.0, 103956: 1.0, 112693: 1.0, 170827: 1.0, 214706: 1.0, 229499: 1.0, 230458: 1.0, 234410: 1.0}), feature=SparseVector(262144, {5585: 5.2444, 33749: 5.2444, 50952: 5.2444, 52168: 5.2444, 53458: 4.1458, 63492: 5.2444, 77828: 5.2444, 86816: 5.2444, 94885: 10.4888, 103956: 5.2444, 112693: 4.8389, 170827: 5.2444, 214706: 5.2444, 229499: 5.2444, 230458: 5.2444, 234410: 5.2444}), normalized=SparseVector(262144, {5585: 0.2326

In [27]:
#The cosine similarity of two users is the dot product of their L2-normalized TF-IDF vectors. The join condition
#a.User_id < b.User_id keeps each unordered pair of users once. The result is stored in the 'final_result'
#DataFrame, which has three columns: User1, User2, and Cosine_Similarity.

dot_product = psf.udf(lambda a,b: float(a.dot(b)), DoubleType())
temp  = normalizer_data.alias("a").join(normalizer_data.alias("b"), psf.col("a.User_id") < psf.col("b.User_id"))
final_result = temp.select(psf.col("a.User_id").alias("User1"),psf.col("b.User_id").alias("User2"), 
                dot_product("a.normalized", "b.normalized").alias("Cosine_Similarity")).sort(desc("Cosine_Similarity"))

final_result.take(10)

[Row(User1='53f02466e42999daf6ab54b38a532b5435024f22', User2='eb3bfe7f67fb59d4427cca56932a4d83caaaaa76', Cosine_Similarity=0.6664200920551405),
 Row(User1='00f7c493ee64884998ea98d9f5bed87bc4a0afcf', User2='eb3bfe7f67fb59d4427cca56932a4d83caaaaa76', Cosine_Similarity=0.5739153330964452),
 Row(User1='00f7c493ee64884998ea98d9f5bed87bc4a0afcf', User2='a344d1f94e0f6d5783860f62d8bc8ba2fec3d530', Cosine_Similarity=0.5545182603991983),
 Row(User1='62eb207ebff66e00a42ab80b06e0e3a2a2c13554', User2='8c9f3dd16c120e68bdd1dd34bfb7c5fd3d6dc487', Cosine_Similarity=0.5543085145384308),
 Row(User1='00f7c493ee64884998ea98d9f5bed87bc4a0afcf', User2='53f02466e42999daf6ab54b38a532b5435024f22', Cosine_Similarity=0.5426874103603517),
 Row(User1='07260aa795b86382fbddea37b7b8f6f7d5a1db0a', User2='bcb1e6d620cf522390d5c92bae26936928e0b588', Cosine_Similarity=0.5295798772858137),
 Row(User1='3a151b699ce726e1cff56a5bb069aec50efcba45', User2='fbcafcd35212137638b34ce4d78981b994e2aefa', Cosine_Similarity=0.50959088808

In [28]:
# Part 2: Sort the similarity scores and print the top 5 most similar user pairs.
# 'final_result' is sorted by Cosine_Similarity in descending order and the top 5 rows are displayed.

resultt = final_result.sort(desc("Cosine_Similarity"))
resultt.take(5)

[Row(User1='53f02466e42999daf6ab54b38a532b5435024f22', User2='eb3bfe7f67fb59d4427cca56932a4d83caaaaa76', Cosine_Similarity=0.6664200920551405),
 Row(User1='00f7c493ee64884998ea98d9f5bed87bc4a0afcf', User2='eb3bfe7f67fb59d4427cca56932a4d83caaaaa76', Cosine_Similarity=0.5739153330964452),
 Row(User1='00f7c493ee64884998ea98d9f5bed87bc4a0afcf', User2='a344d1f94e0f6d5783860f62d8bc8ba2fec3d530', Cosine_Similarity=0.5545182603991983),
 Row(User1='62eb207ebff66e00a42ab80b06e0e3a2a2c13554', User2='8c9f3dd16c120e68bdd1dd34bfb7c5fd3d6dc487', Cosine_Similarity=0.5543085145384308),
 Row(User1='00f7c493ee64884998ea98d9f5bed87bc4a0afcf', User2='53f02466e42999daf6ab54b38a532b5435024f22', Cosine_Similarity=0.5426874103603517)]

In [29]:
#Part 3: If the top-5 user set has a user appearing more than once,
#ignore that pair and take the next best pair from the sorted list.

# Convert the DataFrame into an RDD and rearrange it into key-value pairs (User1, (User2, Cosine_Similarity)).
temp = resultt
final_result_rdd = temp.rdd
final_result_rdd = final_result_rdd.map(lambda z: (z[0],(z[1],z[2])))
final_result_rdd.take(5)

[('53f02466e42999daf6ab54b38a532b5435024f22',
  ('eb3bfe7f67fb59d4427cca56932a4d83caaaaa76', 0.6664200920551405)),
 ('00f7c493ee64884998ea98d9f5bed87bc4a0afcf',
  ('eb3bfe7f67fb59d4427cca56932a4d83caaaaa76', 0.5739153330964452)),
 ('00f7c493ee64884998ea98d9f5bed87bc4a0afcf',
  ('a344d1f94e0f6d5783860f62d8bc8ba2fec3d530', 0.5545182603991983)),
 ('62eb207ebff66e00a42ab80b06e0e3a2a2c13554',
  ('8c9f3dd16c120e68bdd1dd34bfb7c5fd3d6dc487', 0.5543085145384308)),
 ('00f7c493ee64884998ea98d9f5bed87bc4a0afcf',
  ('53f02466e42999daf6ab54b38a532b5435024f22', 0.5426874103603517))]

In [30]:
# final_result_rdd has the form (User1, (User2, Rating)). reduceByKey concatenates the value tuples of each User1
# into one long tuple, and the map keeps only its first two elements, so every User1 appears exactly once.
# Note: Spark does not guarantee the order in which values are combined, so the retained pair may not be the
# highest-scoring one for that user. The map also swaps the User1 and User2 columns, i.e. User2, (User1, Rating).

user_1 = final_result_rdd.reduceByKey(lambda a,v:a+v).map(lambda z: (z[1][0],(z[0],z[1][1])))
user_1.take(5)

[('eb3bfe7f67fb59d4427cca56932a4d83caaaaa76',
  ('53f02466e42999daf6ab54b38a532b5435024f22', 0.6664200920551405)),
 ('bcb1e6d620cf522390d5c92bae26936928e0b588',
  ('3d28488a659d1cacc598d17074b233985271c65e', 0.304061936030354)),
 ('d8c425c171d9d67dc33240f605f6beed96e5e6ff',
  ('18ce1da0e1017e31baaa5f80afa64ee3c7fab379', 0.29226509591111255)),
 ('ef0d21935a2f8ae90571dbfab800f87fa5b38769',
  ('6e6b836e0a82b8d77865041d83f6b35bcc491f36', 0.22141713324973636)),
 ('8061f61372876878c2d67bc49b3ddbd2c83d69e2',
  ('0c65a060faaf2c3f9a6aed6c9732131709c33d55', 0.20766124966037036))]

In [31]:
# Repeat the same step on the swapped pairs to obtain distinct User2 values.
# The first five User2 values are saved in user_2_list.
user_2 = user_1.reduceByKey(lambda a,v:a+v).map(lambda z: (z[0],z[1][0],z[1][1]))
user_2_list = user_2.map(lambda z:z[0]).take(5)

In [32]:
# Keep only the rows whose User1 (now in the second column) does not appear in
# user_2_list, then sort by cosine similarity in descending order.
final_result = user_2.filter(lambda z:z[1] not in user_2_list).sortBy(lambda z:-float(z[2]))
# Cosine similarity top 5 pairs.
final_result.take(5)

[('eb3bfe7f67fb59d4427cca56932a4d83caaaaa76',
  '53f02466e42999daf6ab54b38a532b5435024f22',
  0.6664200920551405),
 ('fbcafcd35212137638b34ce4d78981b994e2aefa',
  '3a151b699ce726e1cff56a5bb069aec50efcba45',
  0.5095908880845609),
 ('9d6aec26a2b0e0f10a58fe2ab64190263c2585ca',
  '7b104f995943ca5f5df3d6b71aba712cd6e81632',
  0.4384438636975086),
 ('c34670d9c1718361feb93068a853cead3c95b76a',
  '49bf7109eba57fa27b82242ba8e058e4f57d71c4',
  0.40097671623245884),
 ('773afdd8317e10701e81008881e78e6966f031d9',
  '42a9daa28f605e4f269711946cdbe0498a172706',
  0.3325825924802534)]
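
Part 3 can also be read as a greedy selection over the sorted pair list: walk down the list and skip any pair that shares a user with a pair already chosen. The plain-Python sketch below illustrates that reading; it is an alternative to the reduceByKey approach used in this notebook, not a description of it, and it assumes the candidate pairs are small enough to collect on the driver. The helper name `top_disjoint_pairs` and the toy pairs are hypothetical.

```python
def top_disjoint_pairs(pairs, k=5):
    """Pick the k best (user1, user2, score) pairs such that no user appears twice."""
    seen = set()
    chosen = []
    for user1, user2, score in sorted(pairs, key=lambda p: -p[2]):
        if user1 in seen or user2 in seen:
            continue  # this pair overlaps with a better pair, take the next one
        chosen.append((user1, user2, score))
        seen.update((user1, user2))
        if len(chosen) == k:
            break
    return chosen


# Hypothetical similarity scores.
pairs = [("a", "b", 0.9), ("a", "c", 0.8), ("c", "d", 0.7), ("b", "d", 0.6), ("e", "f", 0.5)]
print(top_disjoint_pairs(pairs, k=3))
```

With the real data, the input could for example be a bounded prefix of the sorted DataFrame, since only the highest-scoring pairs can end up in the result.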

In [33]:
#Part 4: For a given user_id, identify the top-5 similar users and
# hence song recommendations from the other users' lists.

# Convert the sorted DataFrame 'resultt' obtained in Part 2 of Step 4 into an RDD of plain tuples.
final = resultt 
final_rdd = final.rdd
final_rdd=final_rdd.map(lambda z: (z[0],z[1],z[2]))
final_rdd.take(5)

[('53f02466e42999daf6ab54b38a532b5435024f22',
  'eb3bfe7f67fb59d4427cca56932a4d83caaaaa76',
  0.6664200920551405),
 ('00f7c493ee64884998ea98d9f5bed87bc4a0afcf',
  'eb3bfe7f67fb59d4427cca56932a4d83caaaaa76',
  0.5739153330964452),
 ('00f7c493ee64884998ea98d9f5bed87bc4a0afcf',
  'a344d1f94e0f6d5783860f62d8bc8ba2fec3d530',
  0.5545182603991983),
 ('62eb207ebff66e00a42ab80b06e0e3a2a2c13554',
  '8c9f3dd16c120e68bdd1dd34bfb7c5fd3d6dc487',
  0.5543085145384308),
 ('00f7c493ee64884998ea98d9f5bed87bc4a0afcf',
  '53f02466e42999daf6ab54b38a532b5435024f22',
  0.5426874103603517)]

In [34]:
# Consider a user_id as an example and select all the rows of 'final_rdd' in which that user appears as
# either User1 or User2. The result is stored in 'similar_users', and the first 5 rows are displayed.

user_id="6493c305190b52657d4ea3f4adf367ffcf3427af"

similar_users=final_rdd.filter(lambda z: (z[0]==user_id or z[1]==user_id))
similar_users.take(5)

[('6493c305190b52657d4ea3f4adf367ffcf3427af',
  'a344d1f94e0f6d5783860f62d8bc8ba2fec3d530',
  0.035400549259528534),
 ('6493c305190b52657d4ea3f4adf367ffcf3427af',
  'c732f882aa8d6db3bfaf8037d6418f27d3e07fc8',
  0.030611838009092972),
 ('00f7c493ee64884998ea98d9f5bed87bc4a0afcf',
  '6493c305190b52657d4ea3f4adf367ffcf3427af',
  0.025400266694165983),
 ('36a35718d262b62cf00f038d76c4920912501b8a',
  '6493c305190b52657d4ea3f4adf367ffcf3427af',
  0.020937179591105817),
 ('6493c305190b52657d4ea3f4adf367ffcf3427af',
  'a5d92e23cf3f711dfc473f1c3b296492ec02effd',
  0.018188662563497538)]

In [35]:
# If the chosen user_id is in the User1 column, take the User2 column and the cosine similarity and save them in 'a'.
# If the chosen user_id is in the User2 column, take the User1 column and the cosine similarity and save them in 'b'.
# The union of 'a' and 'b' lists every other user together with its similarity to the chosen user.

a = similar_users.filter(lambda z: z[0]==user_id).map(lambda z: (z[1],z[2]))
b = similar_users.filter(lambda z: z[1]==user_id).map(lambda z: (z[0],z[2]))
similar = a.union(b)
similar.take(10)

[('a344d1f94e0f6d5783860f62d8bc8ba2fec3d530', 0.035400549259528534),
 ('c732f882aa8d6db3bfaf8037d6418f27d3e07fc8', 0.030611838009092972),
 ('a5d92e23cf3f711dfc473f1c3b296492ec02effd', 0.018188662563497538),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4', 0.01608824411530434),
 ('bdbf8ddd82fa83ef4538a15298dfca19bfc4a3ca', 0.013944172374947189),
 ('acbc046380b5b7bb4c3bdb4f2acd2bda76f31553', 0.013358874628789546),
 ('c759e740af57c477fe358e62ad7b3b1f2f113a2f', 0.009018871203308977),
 ('c1b38ae42a5c3846a2d7bffb107729c561f57da0', 0.008515212434300503),
 ('c5a705e3d17dfd5d8322ac5ec2a534469794bd7e', 0.00799113658452152),
 ('97507276e3739cc868d2b29d06eb3605cfb9b583', 0.007713955812924308)]
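
The two filters and the union above amount to collecting, for one user, the partner from every pair in which that user appears. A minimal plain-Python sketch of the same idea, assuming a hypothetical list of (User1, User2, similarity) tuples and a hypothetical helper name `most_similar_users`:

```python
def most_similar_users(pairs, user_id, k=5):
    """Return the k users most similar to user_id as (other user, similarity)."""
    neighbours = []
    for user1, user2, score in pairs:
        if user1 == user_id:
            neighbours.append((user2, score))  # user_id is in the User1 column
        elif user2 == user_id:
            neighbours.append((user1, score))  # user_id is in the User2 column
    return sorted(neighbours, key=lambda n: -n[1])[:k]


# Hypothetical pairwise similarities.
pairs = [("a", "b", 0.9), ("b", "c", 0.4), ("a", "c", 0.7), ("c", "d", 0.2)]
print(most_similar_users(pairs, "c", k=2))
```

Unlike `similar` in the cell above, the sketch truncates the list to the k best neighbours, which is what limits the later song recommendations to the top-5 similar users.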

In [36]:
# As in Step 3, compute a rating for every (user, song) row of triplet_rdd (here the 10000-row sample, scaled
# by 10000 rather than 100000) and rearrange it into key-value pairs stored in 'Final'.
# Joining 'Final' with 'similar' yields the songs heard by the users who are similar to the chosen user.

Final = triplet_rdd.map(lambda z: (z[0],z[1],(int(z[2])/total)*10000)).map(lambda z: (z[0],(z[1],z[2])))
result=Final.join(similar)
result.take(10)

[('d68dc6fc25248234590d7668a11e3335534ae4b4',
  (('SOFRQTD12A81C233C0', 0.0021624707525830715), 0.01608824411530434)),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4',
  (('SOZQIUZ12A8C13CFBE', 0.0021624707525830715), 0.01608824411530434)),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4',
  (('SOKQNYH12A6D4FA5D3', 0.0021624707525830715), 0.01608824411530434)),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4',
  (('SOQDMED12A67ADE731', 0.0021624707525830715), 0.01608824411530434)),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4',
  (('SOAXGDH12A8C13F8A1', 0.0021624707525830715), 0.01608824411530434)),
 ('d68dc6fc25248234590d7668a11e3335534ae4b4',
  (('SOAAGFH12A8C13D072', 0.0021624707525830715), 0.01608824411530434)),
 ('6530c4fc41b9110de5d39fe0355fa103c66385f0',
  (('SOCHADN12A6310ED94', 0.010812353762915358), 0.0)),
 ('6530c4fc41b9110de5d39fe0355fa103c66385f0',
  (('SOXVVSM12A8C142224', 0.0021624707525830715), 0.0)),
 ('6530c4fc41b9110de5d39fe0355fa103c66385f0',
  (('SOQLUTQ12A8AE48037', 0.0021624

In [37]:
# Map 'result' to (song name, rating) so that only the songs from the similar users' lists remain,
# and sort by rating in descending order. The first 5 rows are the recommended songs.

top_similar_users=result.map(lambda z: (z[1][0][0],z[1][0][1])).sortBy(lambda z: -z[1])
top_similar_users.take(5)

[('SOJSXJY12A8C13E32E', 0.2703088440728839),
 ('SOTWNDJ12A8C143984', 0.19245989697989332),
 ('SOAKBZX12AB017C912', 0.17083518945406262),
 ('SOFGJCW12AF72A812D', 0.1578603649385642),
 ('SOMUGJA12A67020841', 0.151372952680815)]