In [1]:
import pandas as pd
import pyspark.pandas as ps
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from sklearn.preprocessing import StandardScaler
import warnings
warnings.filterwarnings("ignore")
from sklearn.feature_extraction.text import TfidfVectorizer
from scipy.sparse import coo_matrix, hstack
from sklearn.metrics.pairwise import linear_kernel
import numpy as np



In [13]:
data = pd.read_csv("/workspaces/IDS706-Final-Project/clean_df/clean_df.csv")

In [16]:
data = data.drop(['Unnamed: 0'],axis = 1)

In [14]:
data = data.dropna(subset=['overview','director','runtime','year']).reset_index(drop=True)

In [24]:
data = data.fillna('')

In [17]:
text_data = ['title','director','actor','overview','genres_list','key','country']
data[text_data] = data[text_data].astype(str)

In [18]:
data["key"] = data["key"].str.encode('ascii', 'ignore').str.decode('ascii')

In [19]:
def to_dummy(col,num = None):
    li = set()
    for i in range(len(data[col])):
        if num is None:
            try:
                num = len(data[col][i].split(','))
            except:
                print(data[col][i])
        for act in data[col][i].split(',')[:num]:
            li.add(act)
    li = list(li)
    for element in li:
        data[element] = data[col].astype(str).str.contains(element, case=False).astype(int)

In [None]:
dum = ['director','actor','genres_list','key','country']
for d in dum:
    if d == 'actor':
        to_dummy(d,num = 4)
    else:
        to_dummy(d)
    print(d)

### sklearn

In [27]:
X = data.drop(['id','title','director','actor', 'overview','genres_list','key','country'], axis=1)

In [29]:
scaler = StandardScaler()
X = scaler.fit_transform(X)

In [30]:
X.shape

(4770, 19228)

In [31]:
tfidf = TfidfVectorizer(stop_words = 'english')  # initialising the TF-IDF Vector object
tfidf_matrix = tfidf.fit_transform(data['overview'])  # Constructing the TF-IDF Matrix (no. of movies x every word in vocabulary)
tfidf_matrix.shape

(4770, 20876)

In [32]:
X = hstack([X,tfidf_matrix]).toarray()

In [33]:
cosine_sim = linear_kernel(X, X)  # Constructing the Cosine Similarity Matrix (no. of movies x no. of movies)
cosine_sim.shape

(4770, 4770)

In [34]:
cosine_sim

array([[ 1.52824867e+04,  1.61380201e+01,  4.54139529e+00, ...,
        -3.05047409e+01, -3.30062328e+01, -3.09524484e+01],
       [ 1.61380201e+01,  1.30060802e+04,  3.91278598e+02, ...,
        -2.15051510e+01, -2.40526719e+01, -3.28775221e+01],
       [ 4.54139529e+00,  3.91278598e+02,  1.21672372e+04, ...,
        -3.15888136e+01, -1.79865360e+01, -2.67401897e+01],
       ...,
       [-3.05047409e+01, -2.15051510e+01, -3.15888136e+01, ...,
         2.05681039e+04, -4.66418446e+00, -1.26694186e+01],
       [-3.30062328e+01, -2.40526719e+01, -1.79865360e+01, ...,
        -4.66418446e+00,  7.96679809e+03,  1.05214226e+01],
       [-3.09524484e+01, -3.28775221e+01, -2.67401897e+01, ...,
        -1.26694186e+01,  1.05214226e+01,  9.39743243e+03]])

In [36]:
with open('cosine_sim.npy', 'wb') as f:
    np.save(f, cosine_sim)

In [37]:
with open('cosine_sim.npy', 'rb') as f:
    cosine_sim = np.load(f)

In [38]:
indices = pd.Series(data.index, index = data['title']).drop_duplicates()

In [39]:
# Function that inputs movie titles and outputs top 10 movies similar to it

def get_recommendations(title, cosine_sim = cosine_sim):
  idx = indices[title]
  
  sim_scores = list(enumerate(cosine_sim[idx]))  # Get the similarity scores of all movies wrt input movie
  sim_scores = sorted(sim_scores, key = lambda x : x[1], reverse = True)
  sim_scores = sim_scores[1:11]
  
  movie_indices = [i[0] for i in sim_scores]
  
  return data['title'].iloc[movie_indices]

In [40]:
get_recommendations('The Dark Knight Rises')

65                     The Dark Knight
119                      Batman Begins
2207                         12 Rounds
186                        Bad Boys II
95                        Interstellar
303                           Catwoman
708     Maze Runner: The Scorch Trials
933                   Shanghai Knights
2721                 Seven Psychopaths
96                           Inception
Name: title, dtype: object

### Use pyspark

In [53]:
spark = SparkSession.builder.getOrCreate()

22/12/13 22:49:54 WARN Utils: Your hostname, codespaces-fdee62 resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
22/12/13 22:49:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/13 22:49:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [55]:
data_spark=spark.createDataFrame(data)

In [None]:
from pyspark.ml.feature import Word2Vec
from pyspark.sql.functions import split
from pyspark.sql.functions import lower, col
data_spark = data_spark.withColumn("overview_splitted", split(lower(col("overview")), " "))

In [None]:
word2Vec = Word2Vec(vectorSize=100, minCount=0, maxIter=100, inputCol="overview_splitted", outputCol="features")
model = word2Vec.fit(data_spark)

In [None]:
result = model.transform(data_spark)

In [None]:
result

In [None]:
from pyspark.ml.feature import VectorAssembler
assemble=VectorAssembler(inputCols=[
 'popularity',
 'vote_average',
 'year'], outputCol='feature')
assembled_data=assemble.transform(data_spark)