In [None]:
""" !pip install -U pyspark
from pyspark.sql import SparkSession
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable """

# StreamQuest Movie Plot Searching System
This application is created for movie studios to check the latest trends in the movie industry......

## Section 1. Word2vec Movie Recommender Model Training
The first part of this notebook is dedicated to data cleaning and trainning the word2vec model to create the following three tools for the studio writers and executives: 
1. Basic movie recommender: The user input one movie; and the system recommends 10 other movies with similar plotlines. 
2. Advance movie recommender: The user input two movies; and the system recommends 10 other movies with plotlines that are similar to the combination of these two movies. 
3. Duplicate plot checker: The user input his/her script for a new movie idea, and the system checks if his/her idea has already been produced in a previous movie. 

### 1.1 Data cleaning
There are three data sources used in this section: 
1. IMDB: used for matching movie title & ID
2. Details: contains plots&movie ID, used for trainning
3. Wiki_Plot: contains plots& movie name, used for trainning

In [1]:
path_to_imdb_dataset = '/Users/yupan/Library/CloudStorage/OneDrive-Personal/Academic/5430/data/title.basics.tsv.gz'
path_to_reviews_dataset = '/Users/yupan/Library/CloudStorage/OneDrive-Personal/Academic/5430/data/IMDB_reviews.json'
path_to_plots_dataset = '/Users/yupan/Library/CloudStorage/OneDrive-Personal/Academic/5430/data/wiki_movie_plots_deduped.csv'
path_to_details_dataset = '/Users/yupan/Library/CloudStorage/OneDrive-Personal/Academic/5430/data/IMDB_movie_details.json'

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
print("Using Apache Spark Version", spark.version)

In [None]:
# clean & combine the IMDB dataset with details dataset 
# reading the IMDB dataset
imdb = spark.read.options(header = True, inferSchema = True, delimiter = "\t")\
  .csv(path_to_imdb_dataset)
# filter the imdb dataset so that only movies are included
imdb = imdb.filter("titleType = 'movie'")\
  .select('tconst', 'primaryTitle', 'startYear')\
    .withColumnRenamed('startYear', 'Year')\
      .withColumnRenamed('primaryTitle', 'Title')\
        .dropDuplicates(['Title', 'Year'])


# reading the details dataset, preserving only three important variables
details_summary = spark.read.json(path_to_details_dataset)
details_summary = details_summary\
  .select('movie_id','plot_summary')\
    .withColumnRenamed('plot_summary','Plot')

# reading the details dataset, preserving only three important variables
details_synopsis = spark.read.json(path_to_details_dataset)
details_synopsis = details_synopsis.select('movie_id','plot_synopsis')\
  .filter("plot_synopsis != ''")\
    .withColumnRenamed('plot_synopsis', 'Plot')

details = details_summary.union(details_synopsis)
print('there is a total of ', details.count(), ' plot descriptions in the details dataset')


from pyspark.sql.functions import lit
# join the imdb with details by matching the unique identifier(e.g. tt0000000)
imdb_join_details = imdb.join(details, imdb.tconst == details.movie_id, 'inner')\
  .withColumnRenamed('tconst', 'id')\
    .select('id', 'Title', 'Plot')\
      .withColumn("Source", lit("imdb_details"))

print("The joined dataset has ", imdb_join_details.count(), " entries")
# inspect the joined dataset
imdb_join_details.show(3)

In [None]:
# clean and combine wiki plot dataset with imdb dataset
from pyspark.sql.functions import length
# reading the plot dataset, preserving only three important variables
wiki_plot = spark.read.options(header = True, inferSchema = True, quote = '"', escape = '"', multiLine = True).csv(path_to_plots_dataset)
wiki_plot = wiki_plot.select('Title', 'Release Year','Plot')\
  .withColumnRenamed('Release Year', 'Year')\
    .filter(length(wiki_plot['Plot']) >= 200) # filter out the very short plot descriptions


# join the imdb with the plot dataset by matching movie titles and release year
imdb_join_plot = imdb.join(wiki_plot, ["Title", "Year"], 'inner')\
  .withColumnRenamed('tconst', 'id')\
    .select('id', 'Title', 'Plot')\
      .withColumn("Source", lit("wiki_plot"))

print("The joined dataset has ", imdb_join_plot.count(), " entries")
# inspect the joined dataset
imdb_join_plot.show(1)

In [None]:
# combine the above two dataset to get the dataset that we will train the model on
df = imdb_join_plot.union(imdb_join_details)

print('after merging & cleaning, there is a total of ', df.count(), ' movie plot entries left in the merged dataset')
# inspect the combined new dataset
df.show(1)

### 1.2 Trainning Word2vec Model 

In [None]:
# tokenize and remove stop words in this cell
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, Word2Vec

# create a new field by copying Plot
df = df.withColumn('inputText', F.col('Plot')) 

# regular expression tokenizer to tokenize inputText into individual tokens (words)
regextok = RegexTokenizer(gaps = False, pattern = '\w+', inputCol = 'inputText', outputCol = 'tokens')

# StopWordsRemover to remove stopwords in the list of tokens
stopwrmv = StopWordsRemover(inputCol = 'tokens', outputCol = 'tokens_sw_removed')
df = regextok.transform(df)
df = stopwrmv.transform(df)
df.show(1)

In [None]:
# train word2vec model, the parameters here can be changed to optimize the model
word2vec = Word2Vec(vectorSize = 100, minCount = 5, inputCol = 'tokens_sw_removed', outputCol = 'wordvectors')
model = word2vec.fit(df)

# using transform to add wordvectors column to dataframe
df = model.transform(df)
chunks = df.select('id', 'Title','wordvectors', 'Plot', 'Source').limit(30000).collect()

### 1.3 Create Basic Recommender

In [None]:
# writing a function to obtain the plot string from the plot dataset
def acquire_plot(base_movie: str): 
  # input: a movie name (precise) or a movie id 
  # output: the movie's plot

  if base_movie.startswith("tt"):   # search by movie name
    base_movie_row = df.filter(df.id == base_movie).collect()
  else:                             # search by movie id
    base_movie_row = df.filter(df.Title == base_movie).collect()

  if base_movie_row: 
    movie_plot = base_movie_row[0]['Plot']
    return movie_plot
  else: 
    print("Sorry, ", base_movie, " is not found in the database")

### 1.4 Create Advanced Recommender

### 1.5 Create Duplicate Plot Checker

### a few things to pay attention to: 
1. the data takes a few minutes to clean and the word2vec takes 2.5 minutes to train(aparently there is a way to save the trained model, I'll figure it out ASAP), so we can implement the cleaning code before the flask code so that it can be run before we do the demo
2. 

In [None]:
## Search Engine Template -> uses spark df to query based on user input
### need to different html -> one is the home page (user input) then the second one is the output screen
#### there is a way to only use one html for simplicity 

In [None]:
from flask import Flask, request, jsonify, redirect, url_for, render_template
import numpy as np

app = Flask("JSON_OUTPUT")

@app.route('/')
def form():
    return render_template('moreedits.html')
        
@app.route('/submit', methods=['GET','POST'])
def submit():
    if request.method == 'POST':
        types_user = request.form['q']
        price_level_user = request.form['price_level']
        neighborhood_user = request.form['neighborhoods']
        rating_user = request.form['rating']
        
        def cossim(v1, v2):
            dot_product = np.sum(v1 * v2)
            mag_v1 = np.sqrt(np.sum(np.power(v1, 2)))
            mag_v2 = np.sqrt(np.sum(np.power(v2, 2)))
            return dot_product / (mag_v1 * mag_v2 + 0.1)

        query_txt = types_user
        query_df = sc.parallelize([(1,query_txt)]).toDF(['index','Types'])
        query_tok = regexTokFilter.transform(query_df)
        query_vec = model_type.transform(query_tok)
        query_vec = query_vec.select('wordvectors_type').collect()[0][0]
        
        sim_rdd = sc.parallelize((i[0],i[1], i[2],i[3],i[4],i[5],i[6],float(cossim(query_vec, i[7])), i[8]) for i in sparkDF_wv_final)
        sim_df = spark.createDataFrame(sim_rdd).\
            withColumnRenamed('_1', 'Name').\
            withColumnRenamed('_2', 'Address').\
            withColumnRenamed('_3', 'Latitude').\
            withColumnRenamed('_4', 'Longitude').\
            withColumnRenamed('_5', 'Types').\
            withColumnRenamed('_6', 'Price_level').\
            withColumnRenamed('_7', 'Rating').\
            withColumnRenamed('_8', 'Similarity').\
            withColumnRenamed('_9', 'Neighborhood').\
            orderBy("Similarity", ascending=False)

    
    pandas_df = sim_df.toPandas()
    df_filtered = pandas_df[pandas_df['Price_level'] == int(price_level_user)]
    df_filtered1 = df_filtered[df_filtered['Rating'] == int(rating_user)]
    df_filtered2 = df_filtered1[df_filtered1['Neighborhood'] == neighborhood_user]
    html_table = df_filtered2.head(10).to_html(classes='table')
    return render_template('onlytable.html', table=html_table)

@app.route('/output')
def output():
    # render the output HTML page
    return render_template('onlytable.html',table=processed_data)

app.run(host='localhost', port=7030)