<a href="https://colab.research.google.com/github/siddharth1608/datascience/blob/master/recommender_systems/item_based/collaborative_filtering_using_RDDs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import pandas as pd
import numpy as np
from tqdm import tqdm
import sys
import pprint
# Seed NumPy's global random number generator so random draws are reproducible.
# (Assigning to `np.seed` only attaches an unused attribute to the numpy module
# and seeds nothing.)
seed = 24
np.random.seed(seed)

# Pretty-printer for nested results; structures nested deeper than 4 levels are elided.
pp = pprint.PrettyPrinter(depth=4)


In [2]:
# Spark Way

## Uncomment below line to install pyspark
!pip install pyspark
from pyspark.sql import SparkSession



Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/ef/88/8e5f4cfb99a4186b4b7f06aa1294353e0be6b05b25802a82f3d16cb30b79/pyspark-2.4.2.tar.gz (193.9MB)
[K     |████████████████████████████████| 193.9MB 52kB/s 
[?25hCollecting py4j==0.10.7 (from pyspark)
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 44.4MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Stored in directory: /root/.cache/pip/wheels/dc/0e/02/e9fdf0bf3ad20284175307d4ab31afcf967604f25f3b4f1d96
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.2


In [0]:
# Obtain the SparkSession for this notebook (an existing one is reused if present).
spark = (
    SparkSession.builder
    .appName("Collaborative filtering - Item based")
    .getOrCreate()
)

# Handle to the underlying SparkContext, used below for the RDD API.
sc = spark.sparkContext

In [0]:
# Load only the first two columns of the pipe-separated item file and index
# the resulting frame by the first one (named 'id').
movieNames = pd.read_csv(
    'u.item',
    sep="|",
    encoding='latin',
    header=None,
    names=['id', 'name'],
    usecols=[0, 1],
    index_col=0,
)

# Shape: {'name': {id: name}} -- a plain dict used later to look names up by id.
movieNamesDict = movieNames.to_dict()

# Sanity check: display the name stored for id 51.
movieNamesDict['name'][51]

In [0]:
def deriveMovieNames(x):
  """Replace the two movie ids in a record with their names.

  Args:
    x: a two-part record whose first element is passed through unchanged and
      whose second element holds the two movie ids at positions 0 and 1.

  Returns:
    A 3-tuple ``(x[0], name_of_first_id, name_of_second_id)``, with names
    looked up in the module-level ``movieNamesDict['name']`` mapping.
  """
  idPair = x[1]
  firstId = idPair[0]
  secondId = idPair[1]

  nameById = movieNamesDict['name']
  return (x[0], nameById[firstId], nameById[secondId])

In [0]:
# Read the raw ratings file 'u.data' into an RDD via the SparkContext.
reviews_raw = sc.textFile('u.data')

Next, we map each line of the data file to a key–value pair, with the user_id as the key and a (movie_id, rating) tuple as the value.

In [0]:
# Split each line on whitespace, then build (user_id, (movie_id, rating)).
# The user id stays a string; the movie id becomes an int and the rating a float.
reviews = (
    reviews_raw
    .map(lambda line: line.split())
    .map(lambda fields: (fields[0], (int(fields[1]), float(fields[2]))))
)

In [8]:
# Peek at the first two (user_id, (movie_id, rating)) records.
reviews.take(2)

[('196', (242, 3.0)), ('186', (302, 3.0))]

In [0]:
# Self-join on user id: every pair of ratings made by the same user, including
# a rating paired with itself and both orderings of each pair. Cached for reuse.
reviews_pairs_by_user = reviews.join(reviews).cache()

In [10]:
# Inspect two joined records: (user_id, ((movie, rating), (movie, rating))).
reviews_pairs_by_user.take(2)

[('244', ((51, 2.0), (51, 2.0))), ('244', ((51, 2.0), (815, 4.0)))]

In [0]:
def deDuplicate( item ):
  """Predicate that keeps exactly one ordering of each movie pair.

  Args:
    item: ``(user, ratings)`` where ``ratings`` holds two ``(movie, rating)``
      entries at positions 0 and 1.

  Returns:
    True only when the first movie id is strictly smaller than the second, so
    self-pairs and mirrored duplicates are rejected.
  """
  _user, ratingPair = item
  firstMovie = ratingPair[0][0]
  secondMovie = ratingPair[1][0]
  return firstMovie < secondMovie

# Drop self-pairs and mirrored duplicates, then cache the filtered RDD.
unique_reviews_pairs_by_user = reviews_pairs_by_user.filter(deDuplicate).cache()

In [18]:
# Inspect two records after de-duplication (first movie id < second movie id).
unique_reviews_pairs_by_user.take(2)

[('244', ((51, 2.0), (815, 4.0))), ('244', ((51, 2.0), (756, 2.0)))]

In [0]:
def makeMoviePairAsKey( item ):
  """Re-key a per-user rating pair by its movie pair.

  Args:
    item: ``(user, ratings)`` where ``ratings`` holds two ``(movie, rating)``
      entries at positions 0 and 1.

  Returns:
    ``((movie1, movie2), (rating1, rating2))``; the user is dropped.
  """
  _user, ratingPair = item
  first = ratingPair[0]
  second = ratingPair[1]

  moviePair = (first[0], second[0])
  scorePair = (first[1], second[1])
  return (moviePair, scorePair)

# Switch the key from user to (movie1, movie2) and cache the result.
reviews_by_movies_pairs = unique_reviews_pairs_by_user.map(makeMoviePairAsKey).cache()

In [29]:
# Inspect two records keyed by movie pair: ((movie1, movie2), (rating1, rating2)).
reviews_by_movies_pairs.take(2)

[((51, 815), (2.0, 4.0)), ((51, 756), (2.0, 2.0))]

In [0]:
# Collect every (rating1, rating2) tuple recorded for the same movie pair
# under a single key, then cache the grouped RDD.
reviews_grpd_by_movies_pairs = reviews_by_movies_pairs.groupByKey().cache()

In [32]:
# Inspect one grouped record; the grouped values display as a ResultIterable
# object rather than as a list of rating tuples.
reviews_grpd_by_movies_pairs.take(1)

[((51, 815), <pyspark.resultiterable.ResultIterable at 0x7f57da6eeac8>)]

In [0]:
import math
def cosine_similarity(ratingPairs):
  
  sumX2 = 0
  sumY2 = 0
  sumXY = 0
  pairsCount = 0
  for x, y in ratingPairs:
    sumX2 += x * x
    sumY2 += y * y
    sumXY += x * y
    pairsCount += 1
  
  return ( sumXY / (math.sqrt(sumX2) * math.sqrt(sumY2) ),  pairsCount)

In [0]:
# Score every movie pair: (movie1, movie2) -> (cosine similarity, number of rating pairs).
moviePairSimilarities = (
    reviews_grpd_by_movies_pairs
    .mapValues(cosine_similarity)
    .cache()
)

In [0]:
# Save the results if desired.
# sortByKey() returns a new RDD (it does not sort in place), so chain it into the save:
# moviePairSimilarities.sortByKey().saveAsTextFile("movie-sims")

Find the movies most similar to the movie we are interested in.

In [0]:
# Parameters for the similarity lookup performed in the following cell.
# Id of the movie whose most similar movies we want to list.
movie = 60
# Pairs are kept only when their similarity score is strictly greater than this.
similarityThreshold = 0.90
# Pairs are kept only when their rating-pair count is strictly greater than this.
coOccurenceThreshold = 20

In [0]:
# Keep only the pairs that involve `movie` (in either position) and whose
# score and rating-pair count both exceed their thresholds.
results = moviePairSimilarities.filter(
    lambda pair: (pair[0][0] == movie or pair[0][1] == movie)
    and pair[1][0] > similarityThreshold
    and pair[1][1] > coOccurenceThreshold
)

In [119]:
# Print the ten best matches, highest score first, with movie ids replaced by names.
pp.pprint(
    results
    .map(lambda kv: (kv[1], kv[0]))  # make (score, count) the sort key
    .sortByKey(ascending=False)
    .map(deriveMovieNames)
    .take(10)
)

[((0.9869656221961467, 55),
  'Three Colors: Red (1994)',
  'Three Colors: Blue (1993)'),
 ((0.985960931836453, 31), 'Three Colors: Blue (1993)', 'Henry V (1989)'),
 ((0.982611776170406, 45),
  'Three Colors: Blue (1993)',
  'Three Colors: White (1994)'),
 ((0.9797996983004086, 22),
  'Three Colors: Blue (1993)',
  'Manon of the Spring (Manon des sources) (1986)'),
 ((0.9791633088404823, 21),
  'Three Colors: Blue (1993)',
  'Jean de Florette (1986)'),
 ((0.9782308349399984, 39),
  'Three Colors: Blue (1993)',
  'Nikita (La Femme Nikita) (1990)'),
 ((0.9774049313180511, 22),
  'Three Colors: Blue (1993)',
  'Sex, Lies, and Videotape (1989)'),
 ((0.976513770199478, 28), 'Three Colors: Blue (1993)', 'Big Night (1996)'),
 ((0.9752943944662508, 29), 'Three Colors: Blue (1993)', 'Chinatown (1974)'),
 ((0.9744415217567001, 24), 'Three Colors: Blue (1993)', 'True Romance (1993)')]


'Legends of the Fall (1994)'