# In [None]:
import sys
from collections import defaultdict
from itertools import combinations
import numpy as np
from pyspark import SparkContext

def user_pairs(movie_id,users_with_rating_for_movie):
    """Pair up users who rated the same movie.

    Args:
        movie_id: key of the grouped record; not used in the body.
        users_with_rating_for_movie: iterable of 2-item sequences; element
            [0] is used as the user id and [1] as that user's rating
            (presumably the ``[user_id, rating]`` lists built in __main__ —
            verify against the caller).

    Returns:
        ``((u1, u2), (r1, r2))`` for ONLY the first pair produced by
        ``itertools.combinations``, or ``None`` when fewer than two entries
        are supplied (the loop body never runs).
    """
    # NOTE(review): the ``return`` inside the loop exits after the first
    # combination, so every other user pair for this movie is dropped.
    # Emitting all pairs needs a generator here plus ``flatMap`` at the call
    # site instead of ``map``.
    for u1,u2 in combinations(users_with_rating_for_movie,2):
        return (u1[0],u2[0]),(u1[1],u2[1])

def cos_similarity(u_pairs,r_pairs):
    """Compute the cosine similarity of one user pair over its co-rated items.

    Args:
        u_pairs: the user-pair key; returned unchanged.
        r_pairs: iterable of 2-item rating pairs. Element [0] is treated as
            the first user's rating and element [1] as the second user's.

    Returns:
        ``(u_pairs, (cos, n))``. ``cos`` is the cosine between the two rating
        vectors; it is 0.0 when either vector has zero norm, as handled by
        ``cosine_eva``. ``n`` is the number of rating pairs consumed.
    """
    xx, xy, yy, n = 0.0, 0.0, 0.0, 0

    for r_pair in r_pairs:
        # Builtin float(): the ``np.float`` alias was removed in NumPy 1.24
        # and raises AttributeError there.
        x = float(r_pair[0])
        y = float(r_pair[1])
        xx += x * x
        yy += y * y
        xy += x * y
        n += 1

    cos = cosine_eva(xy, np.sqrt(xx), np.sqrt(yy))
    return u_pairs, (cos, n)

def cosine_eva(product,rating1,rating2):
    """Divide a dot product by the product of two vector norms.

    Given ``product = dot(A, B)``, ``rating1 = |A|`` and ``rating2 = |B|``,
    return ``dot(A, B) / (|A| * |B|)``. When ``rating1 * rating2`` is falsy
    (e.g. zero), return 0.0 instead of dividing.
    """
    norms = rating1 * rating2
    if not norms:
        return 0.0
    return product / float(norms)

def user_p(u_pair,movie_sim_data):
    """Re-key a user-pair record by the pair's first user.

    ``((a, b), data)`` becomes ``(a, (b, data))`` so that records can be
    grouped per user.
    """
    first_user, second_user = u_pair
    keyed_value = (second_user, movie_sim_data)
    return first_user, keyed_value

def NN(u,u_sims,n):
    """Select the ``n`` most similar neighbours of user ``u``.

    Args:
        u: the user id; returned unchanged.
        u_sims: iterable of ``(neighbor_id, (similarity, count))`` entries.
            Any iterable is accepted, and the caller's object is not
            modified.
        n: maximum number of neighbours to keep.

    Returns:
        ``(u, neighbours)``: at most ``n`` entries ordered by descending
        similarity. The sort is stable, so ties keep their input order.
    """
    # sorted() instead of list.sort(): same ordering, but it does not
    # mutate the argument and does not require a list.
    ranked = sorted(u_sims, key=lambda entry: entry[1][0], reverse=True)
    return u, ranked[:n]

def TopTenMovieRecommendation(u_id,u_sims,users_with_rating,n):
    """Recommend up to ``n`` movies for one user from similarity-weighted ratings.

    A candidate movie's score is the similarity-weighted average of the
    ratings its neighbours gave it: ``sum(sim * rating) / sum(sim)``. Movies
    the target user has already rated are never recommended.

    Args:
        u_id: id of the user to recommend for.
        u_sims: iterable of ``(neighbor_id, (similarity, count))`` entries.
        users_with_rating: mapping of user id to an iterable of
            ``(movie_id, rating)`` 2-item sequences.
        n: maximum number of movie ids to return.

    Returns:
        ``(u_id, movie_ids)``, where ``movie_ids`` is ordered by descending
        score. Ties go to the larger movie id first.
    """
    totals = defaultdict(float)
    sim_sums = defaultdict(float)

    # Movies the target user already rated are excluded from the candidates.
    already_rated = {movie for (movie, _rating) in users_with_rating.get(u_id, ())}

    for (neighbor, (sim, _count)) in u_sims:
        neighbor_ratings = users_with_rating.get(neighbor)
        if not neighbor_ratings:
            continue
        for (movie, rating) in neighbor_ratings:
            if movie in already_rated:
                continue
            # Accumulate per MOVIE so the final ranking is of movie ids.
            totals[movie] += sim * rating
            sim_sums[movie] += sim

    # Normalise the scores. Movies whose similarity weights sum to zero are
    # skipped to avoid ZeroDivisionError.
    scored_items = [
        (total / sim_sums[movie], movie)
        for movie, total in totals.items()
        if sim_sums[movie]
    ]

    # Highest score first.
    scored_items.sort(reverse=True)

    return u_id, [movie for (_score, movie) in scored_items[:n]]

if __name__ == "__main__":
    import csv

    # getOrCreate() reuses a notebook's active SparkContext and starts a new
    # one when run as a plain script, where no global ``sc`` exists.
    sc = SparkContext.getOrCreate()

    ratings_path = "/Users/ashokvardhan/Desktop/ml-latest-small/ratings.csv"
    movies_path = "/Users/ashokvardhan/Desktop/ml-latest-small/movies.csv"

    # ---- Ratings ------------------------------------------------------------
    complete_ratings_raw_data = sc.textFile(ratings_path)
    complete_ratings_raw_data_header = complete_ratings_raw_data.take(1)[0]

    # Drop the header and split each line once; both groupings below reuse it.
    complete_ratings_data_split = (
        complete_ratings_raw_data
        .filter(lambda line: line != complete_ratings_raw_data_header)
        .map(lambda line: line.split(","))
        .cache()
    )

    # Token 1 is used as the movie id, token 0 as the user id and token 2 as
    # the rating: movie_id -> [user_id, rating].
    complete_ratings_data = complete_ratings_data_split.map(
        lambda tokens: (int(tokens[1]), [int(tokens[0]), float(tokens[2])])
    ).cache()

    # ---- Movies -------------------------------------------------------------
    movies_raw_data = sc.textFile(movies_path)
    movies_raw_data_header = movies_raw_data.take(1)[0]

    # Parse with csv.reader so quoted titles containing commas stay whole;
    # a plain split(",") would truncate them at the first comma.
    movies_data = (
        movies_raw_data
        .filter(lambda line: line != movies_raw_data_header)
        .map(lambda line: next(csv.reader([line]), []))
        .filter(lambda fields: len(fields) >= 2)
        .map(lambda fields: (fields[0], fields[1]))
        .cache()
    )

    # movie_id (as str) -> title
    movies_broadcast = sc.broadcast(movies_data.collectAsMap())

    # ---- User-user cosine similarity -----------------------------------------
    def _all_user_pairs(users_with_rating):
        """Yield ``((u1, u2), (r1, r2))`` for every pair of users of one movie.

        Users are sorted by id first, so a given pair always gets the same
        key (smaller id first) however each movie's group happens to be
        ordered.
        """
        ordered = sorted(users_with_rating, key=lambda user_rating: user_rating[0])
        for (u1, r1), (u2, r2) in combinations(ordered, 2):
            yield (u1, u2), (r1, r2)

    # movie_id -> all [user_id, rating] entries for that movie
    movie_user_pairs = complete_ratings_data.groupByKey()

    # Only movies with at least two raters can produce a user pair.
    pairwise_users_more = movie_user_pairs.filter(lambda p: len(p[1]) > 1)

    # flatMap emits EVERY user pair of every movie, then groups the co-ratings
    # per pair.
    pairwise_users = pairwise_users_more.flatMap(
        lambda p: _all_user_pairs(p[1])
    ).groupByKey()

    user_similarity_cosine = pairwise_users.map(lambda p: cos_similarity(p[0], p[1]))

    # Cosine similarity is symmetric, so each result is filed under both users.
    # Otherwise the higher-id user would never see the other as a neighbour.
    user_similarity_pairs = user_similarity_cosine.flatMap(
        lambda p: [user_p(p[0], p[1]), user_p((p[0][1], p[0][0]), p[1])]
    ).groupByKey()

    # Keep the 50 most similar neighbours per user.
    user_similarity = user_similarity_pairs.map(lambda p: NN(p[0], list(p[1]), 50))

    # ---- Recommendations -----------------------------------------------------
    # user_id -> list of [movie_id, rating], shipped to the workers once.
    ui_dict = (
        complete_ratings_data_split
        .map(lambda tokens: (int(tokens[0]), [int(tokens[1]), float(tokens[2])]))
        .groupByKey()
        .mapValues(list)
        .collectAsMap()
    )
    uib = sc.broadcast(ui_dict)

    # Top 10 movie recommendations for each user.
    user_item_recs = user_similarity.map(
        lambda p: TopTenMovieRecommendation(p[0], p[1], uib.value, 10)
    ).collect()

    # Print users 10-19 of the collected results. Slicing tolerates fewer than
    # 20 users; movie ids with no known title are skipped explicitly.
    movie_titles = movies_broadcast.value
    for rec_user_id, rec_movie_ids in user_item_recs[10:20]:
        a = [movie_titles[str(j)] for j in rec_movie_ids if str(j) in movie_titles]
        print("User Id=", rec_user_id, "Movie List=", a, "\n")