In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 71kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 40.4MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=621837c70244325e56ca9089444bc175ee0ee3c344b4c2847f3c6d489961973f
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1
The 

In [None]:
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

# Setting up PySpark environment

In [None]:
# Serve the Spark UI on port 4050.
conf = SparkConf().set("spark.ui.port", "4050")
# Reuse the running context when this cell is executed again: constructing a
# second SparkContext in the same kernel raises an error, getOrCreate() does not.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
# The session attaches to the context obtained above.
spark = SparkSession.builder.getOrCreate()

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive; the dataset
# is read from a folder under this mount point in the following cells.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os

# Make the project folder on the mounted Drive the working directory, then
# echo the resulting directory as confirmation.
cur_path = "/content/drive/MyDrive/bigdata_final"
os.chdir(cur_path)
print(os.getcwd())

/content/drive/MyDrive/bigdata_final


In [None]:
# Load the Netflix titles CSV with pandas; the relative path resolves against
# the working directory set with os.chdir earlier in the notebook.
df = pd.read_csv("netflix_titles.csv")

In [None]:
# Read the same CSV into a Spark DataFrame. The path is built from cur_path
# (the project folder defined earlier) instead of repeating the absolute Drive
# path, so the location is configured in one place; the result is still absolute.
# header=True takes the column names from the first line; no schema is passed.
df_spark = spark.read.csv(os.path.join(cur_path, "netflix_titles.csv"), header=True)

# Data preprocessing

In [None]:
from pyspark.sql.functions import lower, col

## Extracting useful features:

In [None]:
# Keep show_id as-is and lower-case each text column under its original name.
lowercased_columns = [
    lower(col(name)).alias(name)
    for name in ('director', 'description', 'title', 'listed_in', 'type')
]
df_low = df_spark.select('show_id', *lowercased_columns)

In [None]:
# Print a preview of the Spark DataFrame as loaded from the CSV.
# NOTE(review): this shows df_spark, not the lower-cased df_low selected in the
# previous cell — confirm which one was meant.
df_spark.show()

+-------+-------+------+--------------------+--------------------+--------------------+-----------------+------------+------+---------+--------------------+--------------------+
|show_id|   type| title|            director|                cast|             country|       date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+------+--------------------+--------------------+--------------------+-----------------+------------+------+---------+--------------------+--------------------+
|     s1|TV Show|    3%|                null|João Miguel, Bian...|              Brazil|  August 14, 2020|        2020| TV-MA|4 Seasons|International TV ...|In a future where...|
|     s2|  Movie|  7:19|   Jorge Michel Grau|Demián Bichir, Hé...|              Mexico|December 23, 2016|        2016| TV-MA|   93 min|Dramas, Internati...|After a devastati...|
|     s3|  Movie| 23:59|        Gilbert Chan|Tedd Chan, Stella...|           Singapore|December 20, 2018|     

## Apply the tokenizer to each column

In [None]:
from pyspark.ml.feature import Tokenizer

In [None]:
# Spark ML Tokenizer configured to read 'director' and write to 'director_words'.
# NOTE(review): the section heading says "each column", but only 'director'
# is configured here.
tokenizer = Tokenizer(inputCol="director", outputCol="director_words")

In [None]:
# Apply the tokenizer to the lower-cased Spark frame.
# NOTE(review): `t` is not referenced by any later cell visible in this notebook.
t = tokenizer.transform(df_low)

In [None]:
# Require values only in the columns that are used downstream. director, cast,
# country, date_added, release_year, rating and duration are dropped in the next
# step, so a gap in one of them must not discard the title (most TV shows, for
# example, have no director listed).
df = df.dropna(subset=['show_id', 'type', 'title', 'listed_in', 'description'])

In [None]:
# Keep only show_id, type, title, listed_in and description.
# The result is assigned (no inplace=True) and errors='ignore' is set so that
# re-running this cell after the columns are already gone does not raise KeyError.
df = df.drop(
    columns=['director', 'cast', 'country', 'date_added', 'release_year', 'rating', 'duration'],
    errors='ignore',
)

In [None]:
# Download the NLTK 'punkt' and 'stopwords' packages — presumably the data
# behind word_tokenize and stopwords.words, which later cells import from nltk.
import nltk
nltk.download('punkt')
nltk.download('stopwords')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [None]:
from nltk.tokenize import word_tokenize

# Lower-case and tokenize the text features.
# 'title' itself is left untouched (it is the lookup key later on); its tokens
# go into a new 'title_words' column appended at the end of the frame.
df['title_words'] = df['title'].str.lower().apply(word_tokenize)

# The remaining features are replaced in place by their token lists.
for column in ('listed_in', 'description', 'type'):
    df[column] = df[column].str.lower().apply(word_tokenize)

In [None]:
# Display the frame after tokenization (the rendered output elides the middle rows).
df

Unnamed: 0,show_id,type,title,listed_in,description,title_words
0,s1,"[tv, show]",3%,"[international, tv, shows, ,, tv, dramas, ,, t...","[in, a, future, where, the, elite, inhabit, an...","[3, %]"
1,s2,[movie],7:19,"[dramas, ,, international, movies]","[after, a, devastating, earthquake, hits, mexi...",[7:19]
2,s3,[movie],23:59,"[horror, movies, ,, international, movies]","[when, an, army, recruit, is, found, dead, ,, ...",[23:59]
3,s4,[movie],9,"[action, &, adventure, ,, independent, movies,...","[in, a, postapocalyptic, world, ,, rag-doll, r...",[9]
4,s5,[movie],21,[dramas],"[a, brilliant, group, of, students, become, ca...",[21]
...,...,...,...,...,...,...
7782,s7783,[movie],Zozo,"[dramas, ,, international, movies]","[when, lebanon, 's, civil, war, deprives, zozo...",[zozo]
7783,s7784,[movie],Zubaan,"[dramas, ,, international, movies, ,, music, &...","[a, scrappy, but, poor, boy, worms, his, way, ...",[zubaan]
7784,s7785,[movie],Zulu Man in Japan,"[documentaries, ,, international, movies, ,, m...","[in, this, documentary, ,, south, african, rap...","[zulu, man, in, japan]"
7785,s7786,"[tv, show]",Zumbo's Just Desserts,"[international, tv, shows, ,, reality, tv]","[dessert, wizard, adriano, zumbo, looks, for, ...","[zumbo, 's, just, desserts]"


In [None]:
from nltk.corpus import stopwords
from string import punctuation

# English stop words plus every single punctuation character, as one set.
list_stopwords = set(stopwords.words('english')).union(punctuation)

# Remove those tokens from each tokenized column.
for column in ('title_words', 'listed_in', 'description', 'type'):
    df[column] = df[column].apply(
        lambda tokens: [token for token in tokens if token not in list_stopwords]
    )

In [None]:
import string

# Delete punctuation characters from inside every description token, then drop
# the tokens that end up empty. The translation table is built once up front.
strip_table = str.maketrans('', '', string.punctuation)
df['description'] = df['description'].apply(
    lambda tokens: [
        cleaned
        for cleaned in (token.translate(strip_table) for token in tokens)
        if cleaned
    ]
)

In [None]:
# De-duplicate each token list while keeping first-occurrence order.
# dict.fromkeys is used instead of list(set(...)) because the iteration order of
# a set of strings differs between kernel sessions, which makes the stored token
# order (and the displayed frame) change from run to run.
for column in ('title_words', 'listed_in', 'description', 'type'):
    df[column] = df[column].apply(lambda tokens: list(dict.fromkeys(tokens)))

In [None]:
# Display the frame after stop-word removal, punctuation stripping and de-duplication.
df

Unnamed: 0,show_id,type,title,listed_in,description,title_words
1,s2,[movie],7:19,"[dramas, international, movies]","[alive, mexico, rescued, devastating, city, wa...",[7:19]
2,s3,[movie],23:59,"[international, horror, movies]","[haunting, dead, found, jungle, island, army, ...",[23:59]
3,s4,[movie],9,"[adventure, independent, action, sci-fi, fanta...","[postapocalyptic, machines, joins, world, fear...",[9]
4,s5,[movie],21,[dramas],"[experts, intent, students, become, brilliant,...",[21]
5,s6,"[tv, show]",46,"[dramas, tv, mysteries, international, shows]","[medical, unlocks, shocking, genetics, blends,...",[46]
...,...,...,...,...,...,...
7778,s7779,[movie],Zombieland,"[comedies, horror, movies]","[zombies, urban, sisters, world, roughneck, co...",[zombieland]
7780,s7781,[movie],Zoo,"[dramas, independent, international, movies]","[addictions, rappers, brother, trade, two, slu...",[zoo]
7781,s7782,[movie],Zoom,"[comedies, family, children, movies]","[familiar, preps, military, villain, youthful,...",[zoom]
7782,s7783,[movie],Zozo,"[dramas, international, movies]","[zozo, sweden, little, war, civil, means, left...",[zozo]


# Download the pretrained word2vec model

In [None]:
!wget -c "https://s3.amazonaws.com/dl4j-distribution/GoogleNews-vectors-negative300.bin.gz"
!gunzip GoogleNews-vectors-negative300.bin.gz

--2021-04-25 22:02:27--  https://s3.amazonaws.com/dl4j-distribution/GoogleNews-vectors-negative300.bin.gz
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.141.174
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.141.174|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1647046227 (1.5G) [application/x-gzip]
Saving to: ‘GoogleNews-vectors-negative300.bin.gz’


2021-04-25 22:03:06 (41.4 MB/s) - ‘GoogleNews-vectors-negative300.bin.gz’ saved [1647046227/1647046227]

gzip: GoogleNews-vectors-negative300.bin already exists; do you wish to overwrite (y or n)? n
	not overwritten


In [None]:
# Load the pretrained GoogleNews vectors (binary word2vec format) from the
# unpacked file in the working directory.
import gensim
wv = gensim.models.KeyedVectors.load_word2vec_format("GoogleNews-vectors-negative300.bin", binary=True)

In [None]:
# Vocabulary lookup that works on gensim >= 4 (`key_to_index`) as well as on
# older releases (`vocab`, which was removed in gensim 4).
w2v_vocab = wv.key_to_index if hasattr(wv, "key_to_index") else wv.vocab

# Column positions in df at this point:
#   0 show_id, 1 type, 2 title, 3 listed_in, 4 description, 5 title_words
# Positions 1, 3, 4 and 5 hold token lists; keep only tokens the model knows.
matrix_vocab = []
for row in df.to_numpy():
    for position in (1, 3, 4, 5):
        row[position] = [word for word in row[position] if word in w2v_vocab]
    matrix_vocab.append(row)

# Same data as a DataFrame with the original column names.
df_vocab = pd.DataFrame(matrix_vocab, columns=df.columns)

In [None]:
from tqdm import tqdm

def recommendation(title):
    matrix_netflix_title_vocab = []
    for list_ in df[df['title'] == title].to_numpy():
        list_[1] = [word for word in list_[1] if word in wv.vocab]
        list_[3] = [word for word in list_[3] if word in wv.vocab]
        list_[4] = [word for word in list_[4] if word in wv.vocab]
        list_[5] = [word for word in list_[5] if word in wv.vocab]
        matrix_netflix_title_vocab.append(list_)

    matrix_similarity = []
    
    for list1 in matrix_vocab:
        for list2 in matrix_netflix_title_vocab:
            #score_title = wv.n_similarity(list1[5], list2[5])
            score_type = wv.n_similarity(list1[1], list2[1])
            score_catg = wv.n_similarity(list1[3], list2[3])
            score_desc = wv.n_similarity(list1[4], list2[4])
            #print(score_type)
            try:
                score_title = 0.5*wv.n_similarity(list1[5], list2[5])
            except:
                score_title = 0
            if ((list1[2] != list2[2]) & (score_catg > 0.85)):
                matrix_similarity.append([list1[2], list2[2], score_title, score_type, score_catg, score_desc])
        #pbar.update()
    #pbar.close()
    df_netflix_similarity = pd.DataFrame(matrix_similarity, columns = ['recommendation', 'title','score_type','score_title', 'score_category', 'score_description'])
    df_netflix_similarity['final_score'] = df_netflix_similarity['score_title'] + df_netflix_similarity['score_category'] + df_netflix_similarity['score_description']+ df_netflix_similarity['score_type']
    return (df_netflix_similarity.sort_values(by=['final_score', 'score_category', 'score_description', 'score_title', 'score_type'], ascending=False).head(10))

# Making recommendations

In [None]:
# Example query: the (up to ten) best-scoring recommendations for "Supergirl".
recommendation("Supergirl")

Unnamed: 0,recommendation,title,score_type,score_title,score_category,score_description,final_score
17,DC's Legends of Tomorrow,Supergirl,0.092983,1.0,1.0,0.587867,2.68085
48,Miraculous: Tales of Ladybug & Cat Noir,Supergirl,0.211503,1.0,0.865795,0.602604,2.679902
73,The Flash,Supergirl,0.097387,1.0,0.904568,0.596589,2.598544
66,Superman Returns,Supergirl,0.226345,0.347143,0.90935,0.63876,2.121598
57,Scorpion King 5: Book of Souls,Supergirl,0.175592,0.347143,0.90935,0.574693,2.006778
42,La Leyenda del Diamante,Supergirl,0.153008,0.347143,0.87453,0.620473,1.995153
13,Chappie,Supergirl,0.196643,0.347143,0.90935,0.523589,1.976725
43,Legend of the Naga Pearls,Supergirl,0.177256,0.347143,0.87453,0.575657,1.974586
46,Marvel's Iron Man & Hulk: Heroes United,Supergirl,0.180753,0.347143,0.90935,0.526424,1.96367
69,The Book of Eli,Supergirl,0.13593,0.347143,0.90935,0.565626,1.958049
