In [None]:
# The similarity step needs spaCy's large English model.
# Download it once from a terminal (e.g. Git Bash) with:
# python -m spacy download en_core_web_lg

In [1]:
import numpy as np
import pandas as pd
import spacy
import pyspark
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql import SparkSession

In [2]:
# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('pandasToSparkDF')
    .getOrCreate()
)

In [3]:
# Read the movie metadata CSV (path is relative to the notebook's working directory).
file = pd.read_csv(filepath_or_buffer="master_split_100.csv")

In [4]:
# pd.read_csv already returns a DataFrame, so `file` is not re-wrapped in
# pd.DataFrame(...) here; just preview the first rows.
file.head()

Unnamed: 0,Poster,imdb_id,adult,belongs_to_collection,budget,genres,homepage,id,original_language,original_title,...,release_date,revenue,runtime,spoken_languages,status,tagline,title,video,vote_average,vote_count
0,https://images-na.ssl-images-amazon.com/images...,tt6029122,False,,0,"[{'id': 27, 'name': 'Horror'}]",https://www.facebook.com/Siccin3/,414827,tr,Siccin 3: C√ºrm√º Ask,...,2016-09-02,0,87.0,"[{'iso_639_1': 'tr', 'name': 'T√ºrk√ße'}]",Released,,Siccin 3: C√ºrm√º Ask,False,10.0,2
1,,tt6032170,False,,0,"[{'id': 99, 'name': 'Documentary'}]",http://www.americananarchistmovie.com/,413049,en,American Anarchist,...,2016-09-02,0,80.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,,American Anarchist,False,6.7,7
2,https://images-na.ssl-images-amazon.com/images...,tt3903852,False,,4000000,"[{'id': 28, 'name': 'Action'}]",,403232,en,Vigilante Diaries,...,2016-09-01,0,107.0,"[{'iso_639_1': 'fr', 'name': 'Fran√ßais'}, {'i...",Released,Crime Never Stops. Neither Do They.,Vigilante Diaries,False,4.7,27
3,https://images-na.ssl-images-amazon.com/images...,tt3991412,False,,0,"[{'id': 14, 'name': 'Fantasy'}, {'id': 53, 'na...",http://www.louis-drax-movie.com/,294795,en,The 9th Life of Louis Drax,...,2016-09-01,0,108.0,"[{'iso_639_1': 'en', 'name': 'English'}]",Released,A mystery beyond reality.,The 9th Life of Louis Drax,False,6.5,128
4,,tt4102722,False,,0,"[{'id': 35, 'name': 'Comedy'}, {'id': 27, 'nam...",https://www.facebook.com/TNWMovie,398798,en,The Night Watchmen,...,2016-09-01,0,90.0,[],Released,Some men were born heroes... It wasn't these g...,The Night Watchmen,False,4.4,19


In [5]:
# Non-null value count per column, taken before duplicate rows are dropped.
file.count()

Poster                   4724
imdb_id                  6001
adult                    6001
belongs_to_collection     451
budget                   6001
genres                   6001
homepage                 2005
id                       6001
original_language        5998
original_title           6001
overview                 5852
popularity               6001
poster_path              5964
production_companies     6001
production_countries     6001
release_date             6001
revenue                  6001
runtime                  5947
spoken_languages         6001
status                   6001
tagline                  2493
title                    6001
video                    6001
vote_average             6001
vote_count               6001
dtype: int64

In [6]:
# Remove rows that are exact duplicates of an earlier row (the first occurrence is kept).
file = file.drop_duplicates(keep='first')


In [7]:
# Non-null value count per column again, now that duplicate rows have been dropped.
file.count()

Poster                   4723
imdb_id                  5999
adult                    5999
belongs_to_collection     451
budget                   5999
genres                   5999
homepage                 2005
id                       5999
original_language        5996
original_title           5999
overview                 5850
popularity               5999
poster_path              5962
production_companies     5999
production_countries     5999
release_date             5999
revenue                  5999
runtime                  5945
spoken_languages         5999
status                   5999
tagline                  2493
title                    5999
video                    5999
vote_average             5999
vote_count               5999
dtype: int64

In [8]:
# Keep only the id, title and overview columns, and discard every row in which
# any of the three is missing.
overview_df2 = file.loc[:, ["imdb_id", "title", "overview"]].dropna()
overview_df2.count()

imdb_id     5850
title       5850
overview    5850
dtype: int64

In [9]:
# Sanity check: reports, per column, whether any missing value is left after dropna.
overview_df2.isna().any()

imdb_id     False
title       False
overview    False
dtype: bool

In [10]:
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [11]:
# Build the Spark DataFrame from the cleaned pandas frame, casting every column
# to str first, then preview it.
sentenceData = spark.createDataFrame(data=overview_df2.astype(str))
sentenceData.show()

+---------+--------------------+--------------------+
|  imdb_id|               title|            overview|
+---------+--------------------+--------------------+
|tt6029122|Siccin 3: C√ºrm√º...|After a terrible ...|
|tt6032170|  American Anarchist|The story of one ...|
|tt3903852|   Vigilante Diaries|The Vigilante Dia...|
|tt3991412|The 9th Life of L...|A psychologist wh...|
|tt4102722|  The Night Watchmen|Three inept night...|
|tt4424228|              Mahana|It is the 1960s. ...|
|tt4520364|              Morgan|A corporate risk-...|
|tt4650738|Much Ado About No...|After a night of ...|
|tt5221584|            Aquarius|Clara, a 65-year-...|
|tt5722234|       Chasing Great|Chasing Great is ...|
|tt6001712|       Teli and Toli|This down-to-eart...|
|tt6798722|Ashley Madison: S...|This film details...|
|tt4551314|             Odd Job|An unemployed fac...|
|tt4730986|             Divines|In a ghetto where...|
|tt4795546|           Nocturama|The new film by B...|
|tt4895668|        Saint Geo

In [12]:
# Tokenizer stage: reads the "overview" column and writes its tokens to "words".
tokenizer = Tokenizer(
    inputCol="overview",
    outputCol="words",
)
tokenizer

Tokenizer_b9f958511dc3

In [13]:
# Tokenise the overviews into two separate (initially identical) frames; both
# `tokenized` and `tokenized_2` are read by the similarity cell further down.
tokenized_2 = tokenizer.transform(sentenceData)
tokenized = tokenizer.transform(sentenceData)
# Print the rows without truncating the cell contents.
tokenized.show(truncate=False)

+---------+-------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [14]:
# Stop-word filter stage: reads the "words" column and writes to "nostopwords".
remover = StopWordsRemover(
    inputCol="words",
    outputCol="nostopwords",
)

In [15]:
# Apply the stop-word filter to both tokenised frames, adding the "nostopwords"
# column to each, then preview one of them with long cells truncated.
tokenized_2 = remover.transform(tokenized_2)
tokenized = remover.transform(tokenized)
tokenized.show(truncate=True)

+---------+--------------------+--------------------+--------------------+--------------------+
|  imdb_id|               title|            overview|               words|         nostopwords|
+---------+--------------------+--------------------+--------------------+--------------------+
|tt6029122|Siccin 3: C√ºrm√º...|After a terrible ...|[after, a, terrib...|[terrible, car, a...|
|tt6032170|  American Anarchist|The story of one ...|[the, story, of, ...|[story, one, infa...|
|tt3903852|   Vigilante Diaries|The Vigilante Dia...|[the, vigilante, ...|[vigilante, diari...|
|tt3991412|The 9th Life of L...|A psychologist wh...|[a, psychologist,...|[psychologist, be...|
|tt4102722|  The Night Watchmen|Three inept night...|[three, inept, ni...|[three, inept, ni...|
|tt4424228|              Mahana|It is the 1960s. ...|[it, is, the, 196...|[1960s., two, mao...|
|tt4520364|              Morgan|A corporate risk-...|[a, corporate, ri...|[corporate, risk-...|
|tt4650738|Much Ado About No...|After a 

In [16]:
# Preview the second tokenised frame (long cells truncated) to confirm it has the
# same columns as `tokenized`.
tokenized_2.show(truncate=True)

+---------+--------------------+--------------------+--------------------+--------------------+
|  imdb_id|               title|            overview|               words|         nostopwords|
+---------+--------------------+--------------------+--------------------+--------------------+
|tt6029122|Siccin 3: C√ºrm√º...|After a terrible ...|[after, a, terrib...|[terrible, car, a...|
|tt6032170|  American Anarchist|The story of one ...|[the, story, of, ...|[story, one, infa...|
|tt3903852|   Vigilante Diaries|The Vigilante Dia...|[the, vigilante, ...|[vigilante, diari...|
|tt3991412|The 9th Life of L...|A psychologist wh...|[a, psychologist,...|[psychologist, be...|
|tt4102722|  The Night Watchmen|Three inept night...|[three, inept, ni...|[three, inept, ni...|
|tt4424228|              Mahana|It is the 1960s. ...|[it, is, the, 196...|[1960s., two, mao...|
|tt4520364|              Morgan|A corporate risk-...|[a, corporate, ri...|[corporate, risk-...|
|tt4650738|Much Ado About No...|After a 

# Starting spaCy similarity code

In [17]:
# Load the spaCy English pipeline (tokenizer, tagger, parser, NER and word vectors).
# This must be the large model, en_core_web_lg; install it beforehand with
# `python -m spacy download en_core_web_lg`.

nlp = spacy.load("en_core_web_lg")



## Adding columns for similarities

In [18]:
from pyspark.sql.functions import lit

In [19]:
# Preview `tokenized` once more before the similarity step.
tokenized.show()

+---------+--------------------+--------------------+--------------------+--------------------+
|  imdb_id|               title|            overview|               words|         nostopwords|
+---------+--------------------+--------------------+--------------------+--------------------+
|tt6029122|Siccin 3: C√ºrm√º...|After a terrible ...|[after, a, terrib...|[terrible, car, a...|
|tt6032170|  American Anarchist|The story of one ...|[the, story, of, ...|[story, one, infa...|
|tt3903852|   Vigilante Diaries|The Vigilante Dia...|[the, vigilante, ...|[vigilante, diari...|
|tt3991412|The 9th Life of L...|A psychologist wh...|[a, psychologist,...|[psychologist, be...|
|tt4102722|  The Night Watchmen|Three inept night...|[three, inept, ni...|[three, inept, ni...|
|tt4424228|              Mahana|It is the 1960s. ...|[it, is, the, 196...|[1960s., two, mao...|
|tt4520364|              Morgan|A corporate risk-...|[a, corporate, ri...|[corporate, risk-...|
|tt4650738|Much Ado About No...|After a 

In [20]:
# Preview `tokenized_2` once more before the similarity step.
tokenized_2.show()

+---------+--------------------+--------------------+--------------------+--------------------+
|  imdb_id|               title|            overview|               words|         nostopwords|
+---------+--------------------+--------------------+--------------------+--------------------+
|tt6029122|Siccin 3: C√ºrm√º...|After a terrible ...|[after, a, terrib...|[terrible, car, a...|
|tt6032170|  American Anarchist|The story of one ...|[the, story, of, ...|[story, one, infa...|
|tt3903852|   Vigilante Diaries|The Vigilante Dia...|[the, vigilante, ...|[vigilante, diari...|
|tt3991412|The 9th Life of L...|A psychologist wh...|[a, psychologist,...|[psychologist, be...|
|tt4102722|  The Night Watchmen|Three inept night...|[three, inept, ni...|[three, inept, ni...|
|tt4424228|              Mahana|It is the 1960s. ...|[it, is, the, 196...|[1960s., two, mao...|
|tt4520364|              Morgan|A corporate risk-...|[a, corporate, ri...|[corporate, risk-...|
|tt4650738|Much Ado About No...|After a 

In [21]:
# Recommend, for every movie in `tokenized_2`, the TOP_N most similar *other* movies
# from `tokenized`, judged by the spaCy vectors of their stop-word-free overviews.
#
# What changed compared with the previous version of this cell:
#   * The per-movie score list was created once, outside the outer loop, so scores
#     from earlier movies leaked into every later ranking. Scores are now per movie.
#   * `tokenized.collect()` ran once per outer iteration and every overview was
#     re-parsed by spaCy for every pair. Each frame is now collected once and each
#     overview is parsed once per frame.
#   * The movie itself was skipped by assuming it always ranks first; it is now
#     excluded explicitly by imdb_id.
#   * Recommendations were read by column position (`iloc[..., 2]`), which depends on
#     how pandas orders the columns. The recommended imdb_id is now stored explicitly.
#     NOTE(review): confirm that imdb_id (rather than title or score) is the wanted value.
#   * spaCy was fed `str(list)`, i.e. brackets, quotes and commas included. Tokens are
#     now joined with single spaces, so scores differ slightly from the old ones.
#   * Fewer than TOP_N candidates no longer raises; missing slots are filled with None.
#   * The whole `result` list is no longer printed on every iteration.
TOP_N = 3


def _unit_overview_vectors(rows):
    """Return one L2-normalised spaCy document vector per row.

    Parameters
    ----------
    rows : list of pyspark Row
        Collected rows that carry a ``nostopwords`` list of tokens.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(len(rows), nlp.vocab.vectors_length)``. Rows whose document
        vector is all zeros stay all zeros, so their similarity to anything is 0.
    """
    texts = [" ".join(row.nostopwords) for row in rows]
    vectors = np.zeros((len(texts), nlp.vocab.vectors_length), dtype="float32")
    for position, doc in enumerate(nlp.pipe(texts)):
        vectors[position] = doc.vector
    norms = np.linalg.norm(vectors, axis=1, keepdims=True)
    norms[norms == 0] = 1.0  # avoid dividing by zero for empty / out-of-vocabulary overviews
    return vectors / norms


# One Spark job per frame instead of one per movie.
query_rows = tokenized_2.collect()
candidate_rows = tokenized.collect()

query_units = _unit_overview_vectors(query_rows)
candidate_units = _unit_overview_vectors(candidate_rows)
candidate_ids = np.array([row.imdb_id for row in candidate_rows], dtype=object)

result = []
for query_row, query_unit in zip(query_rows, query_units):
    # Dot product of unit vectors == cosine similarity of the document vectors.
    scores = candidate_units.dot(query_unit)
    # Never recommend a movie to itself.
    scores[candidate_ids == query_row.imdb_id] = -np.inf
    # Stable sort so that ties keep the candidates' original order.
    ranked = np.argsort(-scores, kind="mergesort")[:TOP_N]
    picks = [candidate_rows[i].imdb_id for i in ranked if np.isfinite(scores[i])]
    picks += [None] * (TOP_N - len(picks))

    record = {'imdb_id': query_row.imdb_id, 'Title': query_row.title}
    for rank, pick in enumerate(picks, start=1):
        record['Recom{}'.format(rank)] = pick
    result.append(record)

Py4JJavaError: An error occurred while calling o111.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost, executor driver): TaskResultLost (result lost from block manager)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3257)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3254)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)


# Exporting recommendations to CSV

In [None]:
# Turn the list of per-movie recommendation records into a DataFrame and show the
# non-null count of each column.
df_recomm = pd.DataFrame(data=result)
df_recomm.count()

In [None]:
# First run only: use this to create the CSV file (it writes the header row)
# df_recomm.to_csv('/Users/ericH/Desktop/moviesDashboard/raw/recom.csv', index = None, header=True)

In [None]:
# Later runs: use this to append to the existing CSV file (no header row)
# df_recomm.to_csv('/Users/ericH/Desktop/moviesDashboard/raw/recom.csv', mode='a', index = None, header=False)

In [None]:
# Append this batch of recommendations to the recommendations CSV.
# NOTE(review): absolute, machine-specific path; consider making the output
# directory configurable.
file_path = '/Users/ericH/Desktop/moviesDashboard/raw/recom.csv'
# encoding='utf-8' matches the default pandas uses when it writes a CSV from a path,
# so appended rows use the same encoding as a file created that way (titles contain
# non-ASCII characters). newline='' leaves line endings entirely to the CSV writer.
with open(file_path, mode='a', newline='', encoding='utf-8') as f:
    # In append mode the stream is positioned at the end of the file, so a position
    # of 0 means the file is new or empty and still needs its header row.
    df_recomm.to_csv(f, index=False, header=(f.tell() == 0))