# <p style="text-align: center;">MIS 285N: Big Data and Distributed Programming</p>
# <p style="text-align: center;">Project - 1 : Apache Spark</p>
## <p style="text-align: center;">Instructor: Dr. Ramesh Yerraballi</p>
## <p style="text-align: center;">Due: Tuesday, September 14th submitted via Canvas by 11:59 pm</p>

Your work should be written in a **Jupyter notebook**.   

Also, please make sure your code runs in your notebook before submitting.

**Note:**

This project is based on Map-Reduce Framework. In these you will get to work with Spark and will get to know how 
does spark work, what functionalities does spark provide, what does map-reduce framework do and why is it useful. 

In this project you will be implementing a basic song recommender system. You will be given a dataset where there are multiple csv files. These csv files have data corresponding to song play count and song information.

The data you will use is provided in a zip file along with this notebook. The __msd.zip__ archive contains:
1. **'kaggle_visible_evaluation_triplets.txt'**. We will be using the visible part of the testing data to understand the working on Apache Spark.  The user's listening history is provided as: (user, song, play count).  
2. In **'kaggle_songs.txt'** file, each song is marked using an index for easier representation of songs.  
3. And **'kaggle_users.txt'** file is the canonical list of user identifiers.
4. Take **'MSDChallengeGettingstarted.pdf'** as your reference.



### **What to turn in?**  

A zip folder which will have:
1. Jupyter Notebook
2. A brief report in PDF format on what features you used for recommendation. And a brief explanation of flow of your code. For example,  what RDD does what or, why it was created.
3. datasets folder with the csv files you are using in your notebook.
4. Notebook should use relative path to the csv files in datasets folder.
5. Name of the zip folder - `<your_name>_<your_partner_name>.zip`

This project consists of 4 questions:  

1. Create an RDD with _kaggle_visible_evaluation_triplets.txt_ and replace the song name with the song index from _kaggle_songs.txt_. Identify the number of songs that do not have any rating. 
2. Generate song ratings based on the song play count as a normalized score between 0 and 1. 
3. Identify the popular song based on this rating and recommend songs to user, given user id based on the algorithm used in Movie recommender system from class. 
4. Using Cosine similarity function, identify pair-wise similarity between each pair of users and generate the top 5 most similar users without an overlap in users. 

The above list is the high-level idea about the questions. 

In [38]:
### Starter code ####
import findspark
import math
findspark.init()
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local[*]").setAppName("Songs")
sc = SparkContext.getOrCreate(conf = conf)
#### These lines are to tell jupyter where to find Apache Spark ####

In [39]:
#sc.stop()

In [40]:
## Read triplet file into RDD
# User, song, playcount
triplet_rdd = sc.textFile(r"msd/kaggle_visible_evaluation_triplets.txt") \
    .map(lambda line: line.split("\t")) 

## Step 1: 
Replace song name with song index and identify the number of songs without user history

In [41]:
from operator import index


f = open('msd/kaggle_songs.txt', 'r')
song_to_index = dict(map(lambda line: line.strip().split(' '), f.readlines())) 
f.close()

indexed_triplet_rdd = triplet_rdd.map(lambda x: (x[0], song_to_index[x[1]], x[2])) # switch songs for song indexes

songs_rdd = indexed_triplet_rdd.map(lambda x: x[1]) # remove user and playcount
songs_rdd = songs_rdd.distinct() # deduplicate songs

all_songs_rdd = sc.parallelize(song_to_index.values()) # create rdd with all songs

no_history_rdd = all_songs_rdd.subtract(songs_rdd) # subtract songs with history from all songs

print(no_history_rdd.take(10))
print(no_history_rdd.count())

                                                                                

['24', '25', '28', '33', '56', '61', '70', '75', '84', '114']
223007


## Step 2:
Generate song ratings based on the play_count. For example, if (song_1, 5; song_2, 10; song_3, 5) i.e., song_1 is played 5 times, song_2 is played 10 times and song_3 is played 5 times, the normalized rating score should be 0.25, 0.5 and 0.25 respectively. 
Similarly, generate the rating for all the songs. You may notice that based on all songs, the rating is almost always very low. So, think of the best way to convert song count to ratings. (Hint: Try generating ratings based on each user's song play history)

In [42]:
dict_rdd = indexed_triplet_rdd.map(lambda x : (x[0], [x[1], int(x[2])]))
grouped_rdd = dict_rdd.groupByKey().map(lambda x : (x[0], list(x[1]))) # These lines get the user and song history in dictionary format

def rating(x): #this function gives a rating for each song for each user specific to that user's history. rating = song plays / total song plays per user
    result = []
    totalPlays = 0 # counts up the total plays per user
    for topIndex in range(len(x)):
        totalPlays += x[topIndex][1]
        x[topIndex].append(0) # add a new value in the list, later to be changed to the rating
    for topIndex in range(len(x)):
        temp = []
        for value in x[topIndex]:
            if x[topIndex].index(value) == 2: # if the index is the newly added '0', then the rating is calculated and added
                newValue = int(x[topIndex][1])/totalPlays
            else:
                newValue = value
            temp.append(newValue)
        result.append(temp)
    return result

ratings_rdd = grouped_rdd.mapValues(lambda x: rating(x))

print(ratings_rdd.take(10))



[('d7083f5e1d50c264277d624340edaaf3dc16095b', [['315822', 1, 0.058823529411764705], ['312787', 1, 0.058823529411764705], ['136848', 1, 0.058823529411764705], ['67457', 2, 0.11764705882352941], ['11841', 1, 0.058823529411764705], ['381063', 1, 0.058823529411764705], ['236368', 1, 0.058823529411764705], ['114601', 1, 0.058823529411764705], ['243463', 1, 0.058823529411764705], ['59733', 1, 0.058823529411764705], ['218288', 1, 0.058823529411764705], ['329557', 1, 0.058823529411764705], ['195315', 1, 0.058823529411764705], ['1640', 1, 0.058823529411764705], ['209058', 1, 0.058823529411764705], ['283892', 1, 0.058823529411764705]]), ('d68dc6fc25248234590d7668a11e3335534ae4b4', [['91177', 1, 0.16666666666666666], ['381270', 1, 0.16666666666666666], ['168118', 1, 0.16666666666666666], ['249250', 1, 0.16666666666666666], ['14397', 1, 0.16666666666666666], ['157', 1, 0.16666666666666666]]), ('fdf6afb5daefb42774617cf223475c6013969724', [['157983', 1, 0.1], ['226229', 1, 0.1], ['217191', 1, 0.1], 

                                                                                

## Step 3: 
For a given user_id (choose one by yourselves), rating, recommend 5 other songs from the list. One way to do this is based on another user who liked the same song liked by this user with rating more than the given rating and recommend the 5 songs based on the matched user's rating. 

In [43]:
user_input = '4b9b549e84fb29bfbcf7ab34f01c0bcd1bbf93a5'

def IDSearch(row):
    if row[0] != user_input:
        return False
    return True

search_rdd = ratings_rdd.filter(IDSearch).values() # create an RDD with only our selected user and song history
sortedsearch = search_rdd.collect()
sortedsearch = sortedsearch[0]
sortedsearch.sort(key=lambda x: x[2], reverse=True) # sort the song history for our selected user so the top reated song is first
topSong = sortedsearch[0][0] #get the top rated song for the user, this will be the rating to match against
topSongRating = sortedsearch[0][2] # get teh rating fro the top rated song
allSongs = []

for topIndex in range(len(sortedsearch)):
    for value in sortedsearch[topIndex]:
        if sortedsearch[topIndex].index(value) == 0:
            allSongs.append(value) # gather all the songs that are in the history of the target user

def compare(x):
    for topIndex in range(len(x)):
        temp = []
        for value in x[topIndex]:
            if x[topIndex].index(value) == 2:
                if value > topSongRating and str(topSong) == str(x[topIndex][x[topIndex].index(value) - 2]):
                    return x # if the song matches and the rating is greater, keep the song history for this user, otherwise the history is dropped

suggestions_rdd = ratings_rdd.mapValues(lambda x: compare(x)) # this line only keeps songs frome users who match
suggestions_rdd = suggestions_rdd.filter(lambda x: x[1] != None) # remove all users who didn't match
suggestions_values_rdd = suggestions_rdd.values() # We only want to see a list of songs, users are removed
suggestions_values_rdd = suggestions_values_rdd.flatMap(lambda x : x) # move all songs to a single list




print(suggestions_values_rdd.take(5))


[['265366', 1, 0.07142857142857142], ['136139', 3, 0.21428571428571427], ['179939', 6, 0.42857142857142855], ['62063', 1, 0.07142857142857142], ['169295', 3, 0.21428571428571427]]


## Step 4: 
1. Compute cosine similarity between all pairs of users. 
2. Sort the similarity score and print the top-5 similar users. 
3. If the top-5 user set has an user appearing more than once, ignore that pair and take the next best pair from the sorted list. 
4. For a given user_id, identify the top-5 similar users and hence song recommendations from other user's list. 

In [44]:
from operator import itemgetter
import copy

songs_list = songs_rdd.collect()

def toDict(x): #this finction was used to change an rdd into dictionary format
    result = []
    for I1 in range(len(x)):
        if I1 == 1:   
            for value in x[I1]:
                new = [x[0], value]
                result.append(new)
    return result

rest_rdd = ratings_rdd.map(lambda x: toDict(x))
new_rating_rdd = rest_rdd.map(lambda x: x[1]) # create an rdd from ratings_rdd that is in dictionary format

user_subset = ratings_rdd.map(lambda x: [x[0], list(map(lambda y: y[0], x[1]))]).take(200) # put the rdd into list format in order to calculate cosine similarity

user_dictionary = {line[0]: line[1] for line in user_subset} # take the new list and put it into dictionary format with user and song history

index_list = list(range(len(user_dictionary))) # create a list of indexes to later create user_index_dictionary

user_index_dictionary = {}
for i in range(len(user_dictionary.keys())):
    user_index_dictionary[list(user_dictionary.keys())[i]]=index_list[i]
user_index_dictionary = dict([(value, key) for key, value in user_index_dictionary.items()]) # see below for user_index_dictionary purpose

only_songs = list(user_dictionary.values()) # create a list of lists for all users song history in the subset
all_songs = [item for sublist in only_songs for item in sublist]
all_songs = list(dict.fromkeys(all_songs)) # create a list of unique songs for our subset of data

# user_dictionary is [user, [all song in history]]
# user_index_dictionary is [user, index of user] used to map back user and song later
# only_songs is each user's song history without user, to be turned into rdd
# all_songs is all availbe songs to be used in cosine operations

def dot(v,w): # dot product function
    """ v_1*w_1 + ... v_n*w_n"""
    return sum(v_i *w_i
              for v_i,w_i in zip(v,w))

def cosine_similarity(v, w): # calculate the cosine similarity
    return dot(v, w) / math.sqrt(dot(v, v) * dot(w, w))

def make_user_interest_vector(user_songs):
    """given a list of interests, produce a vector whose i-th element is 1
    if unique_interests[i] is in the list, 0 otherwise"""
    return [1 if song in user_songs else 0
            for song in all_songs]

user_interest_matrix = list(map(make_user_interest_vector, only_songs)) # put the user song history through the cosine process

user_similarities = [[cosine_similarity(interest_vector_i, interest_vector_j)
                      for interest_vector_j in user_interest_matrix]
                     for interest_vector_i in user_interest_matrix]


def most_similar_users_to(user_id):
    pairs = [[other_user_id, similarity]              # find other
             for other_user_id, similarity in         # users with
                enumerate(user_similarities[user_id]) # nonzero
             if user_id != other_user_id and similarity > 0]  # similarity

    return sorted(pairs,                              # sort them
                  key=lambda pair: pair[1],           # most similar
                  reverse=True)                       # first

all_user_similarity = list(map(most_similar_users_to, index_list)) # create the similarity matrix for all users 

for topIndex in range(len(all_user_similarity)):
    for bottomIndex in range(len(all_user_similarity[topIndex])):
        all_user_similarity[topIndex][bottomIndex].insert(0, topIndex) # inject the index of the user to the similarity matrix for identification

all_user_similarity = [item for sublist in all_user_similarity for item in sublist]
all_user_similarity = sorted(all_user_similarity, key=itemgetter(2), reverse=True) # unnest the list and sort the values from greatest to least

for x in all_user_similarity: # replace the index we injected with the given user ID from the dictionary we created earlier
    x[0] = user_index_dictionary[x[0]]
    x[1] = user_index_dictionary[x[1]]

unreduced_user_similarity = copy.deepcopy(all_user_similarity) # create a copy to answer part 4 of the question below question

all_user_similarity = all_user_similarity[1::2] # since the values are sorted, the inverse pairs of the matrix are next to eachother, we remove every other pair to remove duplication


##### part 1 print statement
print(all_user_similarity) # Cosine Similarity of all Users with the duplicate pairings removed

similarity_pairs = copy.deepcopy(all_user_similarity) # create a copy to answer part 2 of the question

def removeSimilarity(x): # we are only returning the similar pairs, so the cosine similarity gets removed
    for x in similarity_pairs:
        del x[2]
        
removeSimilarity(similarity_pairs)

def printFive(x): # print the top 5 similar users without duplicates
    counter = 0 # counter to only print 5
    checker =[] # collection of printed values to ignore duplicates
    for index in range(len(x)):
        if (counter < 5) and (x[index][0] not in checker) and (x[index][1] not in checker): # statement to ensure no duplicates and correct amount are printed
            counter += 1
            print(x[index])
            checker.append(x[index][0])
            checker.append(x[index][1])

#### part 2 and 3 print statement
printFive(similarity_pairs) # Print the top 5 most similar users without repeats

given_user_id = '4fbde29be137e7179046c148f9e15db17c3278be' # input user for part 4 of this question

def similarRecs(list, id): # function to print recommened songs for a specific user
    result = [] # to collect matching user pairs
    result2 = [] # to collect the songs of the matched pair
    counter = 0 # to ensure we are only getting 5
    for x in list:
        if (x[0] == id) and (counter < 5):
            counter += 1
            result.append(x)
    for x in result:
        result2.append(user_dictionary[x[1]])
    result2 = [item for sublist in result2 for item in sublist] # these lines are to adjust the format of the songs collected
    result2 = set(result2)
    return result2

#### part 4 print statement
print(similarRecs(unreduced_user_similarity, given_user_id)) # this will print all the recommended songs from the top 5 most similar users 



[['07046c62a9dcf4eebd2c979a9847b12fc624d23a', 'c1d24ce8cd80e40aa8d803d5ddfceb91a6b5d75d', 0.3651483716701107], ['c1d24ce8cd80e40aa8d803d5ddfceb91a6b5d75d', 'dd87b5cc43b72578298a6ffadfe8155c567f31b8', 0.3380617018914066], ['92c8f48344247891247a8a24192f19676074d8ea', 'b923b4fe22f3891e10d2d7c8614b74c404e06885', 0.3321819194149599], ['5495dccfae0b78a23c60c95db39e1e1d6e99ba6d', 'dd87b5cc43b72578298a6ffadfe8155c567f31b8', 0.30304576336566325], ['07046c62a9dcf4eebd2c979a9847b12fc624d23a', 'eead1f1e3ad91575346f9ce826ddaea19bff80ed', 0.2581988897471611], ['bc5369909007708d3cc7d62d0e2819b1ced29c5e', '23d7cde4945a1f4fc62f2b78b31502aa6e59ede8', 0.2519763153394848], ['23d7cde4945a1f4fc62f2b78b31502aa6e59ede8', '6530c4fc41b9110de5d39fe0355fa103c66385f0', 0.25], ['cbf111c324aa93ddb602dd9ea26b4ffcf5b0279d', 'c1d24ce8cd80e40aa8d803d5ddfceb91a6b5d75d', 0.24806946917841693], ['5495dccfae0b78a23c60c95db39e1e1d6e99ba6d', 'c1d24ce8cd80e40aa8d803d5ddfceb91a6b5d75d', 0.23904572186687872], ['07046c62a9dcf4eebd

22/09/13 20:17:36 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 634507 ms exceeds timeout 120000 ms
22/09/13 20:17:36 WARN SparkContext: Killing executors is not supported by current scheduler.
