Import libraries

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 37 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 53.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=7077b686a15d895a49c499da30b08617bdb3a85e50cca4a913aae5aad3b01582
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [None]:
import pandas as pd
import numpy as np
import time
import pyspark
from pyspark import SparkConf, SparkContext
from sklearn.feature_extraction.text import CountVectorizer
import math
from sklearn.feature_extraction.text import TfidfVectorizer
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): force_remount=True presumably re-mounts even if the drive is
# already attached — confirm against the Colab documentation.
# NOTE(review): the dataset is later read from the relative path 'movies.csv',
# not from /content/drive — confirm whether this mount is actually required.
from google.colab import drive
drive.mount('/content/drive',force_remount=True)

Mounted at /content/drive


Read file

In [None]:
# Load the dataset with every column kept as text.
# dtype=str replaces the previous per-column-index converter dict (built from a
# hard-coded count), and keep_default_na=False keeps empty fields as '' instead
# of NaN so the feature strings can be concatenated without type errors.
df = pd.read_csv('movies.csv', dtype=str, keep_default_na=False)

Add a Movie_id column

In [None]:
# Give every row a sequential integer id: 0, 1, ..., number_of_rows - 1.
df['Movie_id'] = list(range(len(df)))

Combine all important features into a single string

In [None]:
def transform_important_features(data):
  """Build one space-separated feature string per row of ``data``.

  The string for a row is ``genre year director writer star``, in that order.

  Rows are read positionally, so the result does not depend on the frame's
  index labels (a filtered or re-indexed frame works the same as a fresh one).

  Parameters:
    data: pandas DataFrame containing the columns 'genre', 'year',
      'director', 'writer' and 'star', all holding strings.

  Returns:
    list of str, one entry per row, in row order.

  Raises:
    KeyError: if one of the required columns is missing.
    TypeError: if a value in one of the columns is not a string.
  """
  feature_columns = ('genre', 'year', 'director', 'writer', 'star')
  # Iterating a column yields its values in row order regardless of index
  # labels; zip walks the five columns in lockstep, one tuple per row.
  per_row_values = zip(*(data[column] for column in feature_columns))
  return [' '.join(values) for values in per_row_values]

Function to calculate the Euclidean distance

In [None]:
def euclidean_dis(a,b):
  """Return the Euclidean distance between two indexable sequences.

  Only the first ``len(a)`` positions are compared; ``b`` must be at least as
  long as ``a`` (otherwise indexing ``b`` raises IndexError).

  Parameters:
    a: sequence of numbers supporting ``len`` and integer indexing.
    b: sequence of numbers supporting integer indexing.

  Returns:
    float: square root of the summed squared element-wise differences.
  """
  squared_total = 0
  for position in range(len(a)):
    gap = a[position] - b[position]
    squared_total += gap**2
  return math.sqrt(squared_total)

In [None]:
def Recommendation_System_Content_Based_Parallelized(data,movie_name,expectation):
  """Recommend the movies whose content features are closest to ``movie_name``.

  Every row's feature string (see ``transform_important_features``) is turned
  into a TF-IDF vector; the Euclidean distance from the query movie's vector
  to every other vector is computed on a Spark RDD, and the titles with the
  smallest non-zero distances are returned.

  Parameters:
    data: pandas DataFrame with the columns 'name', 'Movie_id', 'genre',
      'year', 'director', 'writer' and 'star'. 'Movie_id' is used as a row
      position into the vector matrix, so it must equal each row's position
      in ``data``.
    movie_name: title to look up in ``data['name']``; when several rows share
      the title, the first one is used.
    expectation: maximum number of recommendations to return.

  Returns:
    list of movie names, ordered from nearest to farthest.

  Raises:
    IndexError: if ``movie_name`` does not occur in ``data['name']``.
  """
  # Resolve the query movie first so an unknown title fails before any Spark
  # or TF-IDF work is started.
  matching_ids = data.loc[data['name'] == movie_name, 'Movie_id'].values
  if len(matching_ids) == 0:
    raise IndexError("movie {!r} not found in data['name']".format(movie_name))
  movie_id = matching_ids[0]

  # Create (or reuse) the SparkContext.
  conf = SparkConf().setMaster("local").setAppName("content-based-recommendation-system").set("spark.executor.memory", "15g")
  sc = SparkContext.getOrCreate(conf=conf)

  # Build one feature string per movie and vectorize them with TF-IDF.
  important_features = transform_important_features(data)
  ndarray_standardize_vectors = TfidfVectorizer().fit_transform(important_features).toarray()

  # Pull out the query vector up front so the map closure below captures this
  # single row rather than the whole dense matrix.
  target_vector = ndarray_standardize_vectors[movie_id]

  # Distribute the vectors as an RDD.
  vector_rdd = sc.parallelize(ndarray_standardize_vectors)

  # Distance from the query movie to every movie, paired with the row index,
  # sorted ascending by distance.
  # NOTE: the `> 0.0` filter removes the query movie itself, and also any
  # other movie whose feature vector is identical to it.
  distance = (
    vector_rdd
    .map(lambda x: euclidean_dis(x, target_vector))
    .zipWithIndex()
    .filter(lambda x: x[0] > 0.0)
    .sortByKey(ascending=True)
  )

  # Take the row indices of the n = expectation shortest distances.
  movie_ids = distance.values().take(expectation)

  # Map the row indices back to movie names, preserving the distance order.
  return [
    data.loc[data['Movie_id'] == nearest_id, 'name'].values[0]
    for nearest_id in movie_ids
  ]

testing

In [None]:
# Time one end-to-end recommendation query (top 10 matches for "Serial").
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# elapsed intervals; it is not affected by system clock adjustments.
start = time.perf_counter()
print(Recommendation_System_Content_Based_Parallelized(df,"Serial",10))
print("Execute time: ", time.perf_counter() - start)

['Comfort and Joy', 'A Thin Line Between Love and Hate', 'My Bodyguard', 'Three Amigos!', 'L.A. Story', 'The Long Riders', 'Being Human', 'Loving Couples', 'Local Hero', 'Hero at Large']
Execute time:  20.89415669441223


In [None]:
# Time one end-to-end recommendation query (top 10 matches for "Venom").
# perf_counter() is a monotonic, high-resolution clock meant for measuring
# elapsed intervals; it is not affected by system clock adjustments.
start = time.perf_counter()
print(Recommendation_System_Content_Based_Parallelized(df,"Venom",10))
print("Execute time: ", time.perf_counter() - start)

['The Fiendish Plot of Dr. Fu Manchu', 'A Summer Story', 'The Four Seasons', "King Solomon's Mines", 'Allan Quatermain and the Lost City of Gold', 'Alive', 'Cat People', 'Exposed', 'DuckTales the Movie: Treasure of the Lost Lamp', 'Come See the Paradise']
Execute time:  12.797738313674927
