**Setting up the envieomence**

In [1]:
# Mount with Google Drive
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
!wget -q https://www-us.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz

In [4]:
!tar xf spark-3.1.1-bin-hadoop2.7.tgz

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop2.7"

In [6]:
!pip install -q findspark

In [7]:
import findspark
findspark.init()
findspark.find()

'/content/spark-3.1.1-bin-hadoop2.7'

**Importing Movie Data**

In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName('MovieDataMining').config('spark.ui.port', '4050').getOrCreate()

In [9]:
# Read movie data
df = spark.read.csv("/content/gdrive/Shareddrives/CSC522 Project/Data/Netflix_with_IMDB_with_customerIDcsv.csv", header=True, inferSchema=True)

In [10]:
df.show()

+-------+-----------+-------------+-------+-----------+------------+---------+------+------+------+--------+----------+----------+-------------------+-----------+-------------+--------------+---------------+--------+------+
| Actor1|     Actor2|       Actor3|Country|Customer Id|   Director1|Director2|Genre1|Genre2|Genre3|Language|Movie Id11|Movie ID12| Production Company|      Title|      Writer1|       Writer2|Year Of Release|Duration|Rating|
+-------+-----------+-------------+-------+-----------+------------+---------+------+------+------+--------+----------+----------+-------------------+-----------+-------------+--------------+---------------+--------+------+
|Ja Rule|Tim Meadows|Jenifer Lewis|    USA|     966140|Lance Rivera|     null|Comedy|  null|  null| English|       225|       225|Cookout Productions|The Cookout|Queen Latifah|Shakim Compere|           2004|      97|     1|
|Ja Rule|Tim Meadows|Jenifer Lewis|    USA|    2439493|Lance Rivera|     null|Comedy|  null|  null| Engl

In [11]:
df.printSchema()

root
 |-- Actor1: string (nullable = true)
 |-- Actor2: string (nullable = true)
 |-- Actor3: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Customer Id: integer (nullable = true)
 |-- Director1: string (nullable = true)
 |-- Director2: string (nullable = true)
 |-- Genre1: string (nullable = true)
 |-- Genre2: string (nullable = true)
 |-- Genre3: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- Movie Id11: integer (nullable = true)
 |-- Movie ID12: integer (nullable = true)
 |-- Production Company: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Writer1: string (nullable = true)
 |-- Writer2: string (nullable = true)
 |-- Year Of Release: integer (nullable = true)
 |-- Duration: integer (nullable = true)
 |-- Rating: integer (nullable = true)



In [12]:
# Cast integer to string
from pyspark.sql.types import StringType
df_chType = df.withColumn("Customer Id",df['Customer Id'].cast(StringType())).withColumn("Movie ID",df['Movie Id11'].cast(StringType())).withColumn("Year",df['Year Of Release'].cast(StringType()))

In [13]:
df_chType.printSchema()

root
 |-- Actor1: string (nullable = true)
 |-- Actor2: string (nullable = true)
 |-- Actor3: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Customer Id: string (nullable = true)
 |-- Director1: string (nullable = true)
 |-- Director2: string (nullable = true)
 |-- Genre1: string (nullable = true)
 |-- Genre2: string (nullable = true)
 |-- Genre3: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- Movie Id11: integer (nullable = true)
 |-- Movie ID12: integer (nullable = true)
 |-- Production Company: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Writer1: string (nullable = true)
 |-- Writer2: string (nullable = true)
 |-- Year Of Release: integer (nullable = true)
 |-- Duration: integer (nullable = true)
 |-- Rating: integer (nullable = true)
 |-- Movie ID: string (nullable = true)
 |-- Year: string (nullable = true)



In [14]:
df_chType.columns

['Actor1',
 'Actor2',
 'Actor3',
 'Country',
 'Customer Id',
 'Director1',
 'Director2',
 'Genre1',
 'Genre2',
 'Genre3',
 'Language',
 'Movie Id11',
 'Movie ID12',
 'Production Company',
 'Title',
 'Writer1',
 'Writer2',
 'Year Of Release',
 'Duration',
 'Rating',
 'Movie ID',
 'Year']

In [15]:
# Ignore movie Id and Title
my_cols = df_chType.select(['Customer Id', 
 'Actor1',
 'Actor2',
 'Actor3',
 'Country',
 'Director1',
 'Director2',
 'Genre1',
 'Genre2',
 'Genre3',
 'Language',
 'Production Company',
 'Writer1',
 'Writer2',
 'Duration',
 'Year',
 'Rating'])

In [16]:
# Filling NULL
my_fill_cols = my_cols.na.fill('No Name',subset=['Actor1', 'Actor2', 'Actor3', 'Director1', 'Director2', 'Writer1', 'Writer2'])
my_fill_cols = my_fill_cols.na.fill('No Genre',subset=['Genre2', 'Genre3'])
my_fill_cols = my_fill_cols.na.fill('No Country',subset=['Country'])
my_fill_cols = my_fill_cols.na.fill('No Language',subset=['Language'])
my_fill_cols = my_fill_cols.na.fill('No Company',subset=['Production Company'])
my_fill_cols = my_fill_cols.na.fill(0,subset=['Duration'])
my_fill_cols = my_fill_cols.na.fill('No Year',subset=['Year'])

In [17]:
# num_rows = my_cols.count() # 22996094

In [18]:
#from pyspark.sql.functions import monotonically_increasing_id 

#my_cols_index = my_cols.select("*").withColumn("id", monotonically_increasing_id())

In [19]:
#my_cols_index.printSchema()

In [20]:
# dataSchema = my_cols.schema

In [21]:
# from pyspark.sql import Row
# head_5_df = spark.createDataFrame(head_5, schema=dataSchema)

In [22]:
'''
# Split to train, valitation, and test sets
training_size = int(num_row * 0.6)
validataion_size = int(num_row * 0.2)
training_data = my_cols_index.filter(my_cols_index['id'] < training_size)
validation_data = my_cols_index.filter((my_cols_index['id'] >= training_size) & (my_cols_index['id'] < (training_size + validataion_size)))
testing_data = my_cols_index.filter(my_cols_index['id'] >= (training_size + validataion_size))
'''

"\n# Split to train, valitation, and test sets\ntraining_size = int(num_row * 0.6)\nvalidataion_size = int(num_row * 0.2)\ntraining_data = my_cols_index.filter(my_cols_index['id'] < training_size)\nvalidation_data = my_cols_index.filter((my_cols_index['id'] >= training_size) & (my_cols_index['id'] < (training_size + validataion_size)))\ntesting_data = my_cols_index.filter(my_cols_index['id'] >= (training_size + validataion_size))\n"

**Working with Categorical Columns**

In [23]:
from pyspark.ml.feature import (VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer)

In [24]:
# Transform strings to indexes
customer_indexer = StringIndexer(inputCol='Customer Id', outputCol='CustomerIndex')
actors_indexer = StringIndexer(inputCols=['Actor1', 'Actor2', 'Actor3'], outputCols=['Actor1Index', 'Actor2Index', 'Actor3Index'])
country_indexer = StringIndexer(inputCol='Country', outputCol='CountryIndex')
directors_indexer = StringIndexer(inputCols=['Director1', 'Director2'], outputCols=['Director1Index', 'Director2Index'])
genres_indexer = StringIndexer(inputCols=['Genre1','Genre2','Genre3'], outputCols=['Genre1Index','Genre2Index','Genre3Index'])
language_indexer = StringIndexer(inputCol='Language', outputCol='LanguageIndex')
pc_indexer = StringIndexer(inputCol='Production Company', outputCol='PCIndex')
writers_indexer = StringIndexer(inputCols=['Writer1', 'Writer2'], outputCols=['Writer1Index', 'Writer2Index'])
year_indexer = StringIndexer(inputCol='Year', outputCol='YearIndex')

In [25]:
from pyspark.ml import Pipeline
pipeline_indexer = Pipeline(stages=[customer_indexer, 
                            actors_indexer, 
                            country_indexer, 
                            directors_indexer, 
                            genres_indexer, 
                            language_indexer, 
                            pc_indexer, 
                            writers_indexer,
                            year_indexer])

In [26]:
my_indexer_cols = pipeline_indexer.fit(my_fill_cols).transform(my_fill_cols)

In [27]:
my_indexer_cols.printSchema()

root
 |-- Customer Id: string (nullable = true)
 |-- Actor1: string (nullable = false)
 |-- Actor2: string (nullable = false)
 |-- Actor3: string (nullable = false)
 |-- Country: string (nullable = false)
 |-- Director1: string (nullable = false)
 |-- Director2: string (nullable = false)
 |-- Genre1: string (nullable = true)
 |-- Genre2: string (nullable = false)
 |-- Genre3: string (nullable = false)
 |-- Language: string (nullable = false)
 |-- Production Company: string (nullable = false)
 |-- Writer1: string (nullable = false)
 |-- Writer2: string (nullable = false)
 |-- Duration: integer (nullable = true)
 |-- Year: string (nullable = false)
 |-- Rating: integer (nullable = true)
 |-- CustomerIndex: double (nullable = false)
 |-- Actor1Index: double (nullable = false)
 |-- Actor2Index: double (nullable = false)
 |-- Actor3Index: double (nullable = false)
 |-- CountryIndex: double (nullable = false)
 |-- Director1Index: double (nullable = false)
 |-- Director2Index: double (nullabl

**Feature Selection**

In [28]:
# Vector Assemble
# 13 features
# Set CustomerIndex as default feature
assembler_index = VectorAssembler(inputCols=['Actor1Index', 
 'Actor2Index', 
 'Actor3Index',
 'CountryIndex',
 'Director1Index', 
 'Director2Index',
 'Genre1Index',
 'Genre2Index',
 'Genre3Index',
 'LanguageIndex',
 'PCIndex',
 'Writer1Index', 
 'Writer2Index'],outputCol='features')

In [29]:
my_feature_cols = assembler_index.transform(my_indexer_cols)

In [30]:
my_feature_cols.printSchema()

root
 |-- Customer Id: string (nullable = true)
 |-- Actor1: string (nullable = false)
 |-- Actor2: string (nullable = false)
 |-- Actor3: string (nullable = false)
 |-- Country: string (nullable = false)
 |-- Director1: string (nullable = false)
 |-- Director2: string (nullable = false)
 |-- Genre1: string (nullable = true)
 |-- Genre2: string (nullable = false)
 |-- Genre3: string (nullable = false)
 |-- Language: string (nullable = false)
 |-- Production Company: string (nullable = false)
 |-- Writer1: string (nullable = false)
 |-- Writer2: string (nullable = false)
 |-- Duration: integer (nullable = true)
 |-- Year: string (nullable = false)
 |-- Rating: integer (nullable = true)
 |-- CustomerIndex: double (nullable = false)
 |-- Actor1Index: double (nullable = false)
 |-- Actor2Index: double (nullable = false)
 |-- Actor3Index: double (nullable = false)
 |-- CountryIndex: double (nullable = false)
 |-- Director1Index: double (nullable = false)
 |-- Director2Index: double (nullabl

In [31]:
my_feature_cols.columns

['Customer Id',
 'Actor1',
 'Actor2',
 'Actor3',
 'Country',
 'Director1',
 'Director2',
 'Genre1',
 'Genre2',
 'Genre3',
 'Language',
 'Production Company',
 'Writer1',
 'Writer2',
 'Duration',
 'Year',
 'Rating',
 'CustomerIndex',
 'Actor1Index',
 'Actor2Index',
 'Actor3Index',
 'CountryIndex',
 'Director1Index',
 'Director2Index',
 'Genre1Index',
 'Genre2Index',
 'Genre3Index',
 'LanguageIndex',
 'PCIndex',
 'Writer1Index',
 'Writer2Index',
 'YearIndex',
 'features']

In [32]:
feature_data = my_feature_cols.select('features', 'Rating')
feature_data.show()

+--------------------+------+
|            features|Rating|
+--------------------+------+
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
|[371.0,444.0,478....|     1|
+--------------------+------+
only showing top 20 rows



In [33]:
# Applying Chi-Square Selector to select top 7 features
from pyspark.ml.feature import ChiSqSelector

selector = ChiSqSelector(numTopFeatures=7,
                         featuresCol="features",
                         outputCol="selectedFeatures", 
                         labelCol="Rating")
selected_feature_data = selector.fit(feature_data).transform(feature_data)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
selected_feature_data.head(1)

ChiSqSelector output with top 7 features selected


[Row(features=DenseVector([371.0, 444.0, 478.0, 0.0, 387.0, 0.0, 1.0, 4.0, 0.0, 0.0, 233.0, 460.0, 346.0]), Rating=1, selectedFeatures=DenseVector([371.0, 444.0, 478.0, 0.0, 387.0, 0.0, 1.0]))]

In [34]:
my_final_cols = my_feature_cols.select(['CustomerIndex',
 'Actor1Index',
 'Actor2Index',
 'Actor3Index',
 'CountryIndex',
 'Director1Index',
 'Director2Index',
 'Genre1Index',
 'Duration',
 'Rating'])

In [35]:
my_final_cols.printSchema()

root
 |-- CustomerIndex: double (nullable = false)
 |-- Actor1Index: double (nullable = false)
 |-- Actor2Index: double (nullable = false)
 |-- Actor3Index: double (nullable = false)
 |-- CountryIndex: double (nullable = false)
 |-- Director1Index: double (nullable = false)
 |-- Director2Index: double (nullable = false)
 |-- Genre1Index: double (nullable = false)
 |-- Duration: integer (nullable = true)
 |-- Rating: integer (nullable = true)



**Split the training, validation, and teststing sets**

In [47]:
# Split to train, valitation, and test sets
training_data, vali_data = my_final_cols.randomSplit([0.6, 0.4], seed=24) # 13802178
validation_data, testing_data = vali_data.randomSplit([0.5, 0.5], seed=24) # 4597448, 4596468

In [48]:
training_data.printSchema()

root
 |-- CustomerIndex: double (nullable = false)
 |-- Actor1Index: double (nullable = false)
 |-- Actor2Index: double (nullable = false)
 |-- Actor3Index: double (nullable = false)
 |-- CountryIndex: double (nullable = false)
 |-- Director1Index: double (nullable = false)
 |-- Director2Index: double (nullable = false)
 |-- Genre1Index: double (nullable = false)
 |-- Duration: integer (nullable = true)
 |-- Rating: integer (nullable = true)



In [71]:
# Save data
training_data.write.format('csv').option('header',True).mode('overwrite').option('sep',',').save("/content/gdrive/MyDrive/Colab Notebooks/CSC 522/training.csv")
validation_data.write.format('csv').option('header',True).mode('overwrite').option('sep',',').save("/content/gdrive/MyDrive/Colab Notebooks/CSC 522/validation.csv")
testing_data.write.format('csv').option('header',True).mode('overwrite').option('sep',',').save("/content/gdrive/MyDrive/Colab Notebooks/CSC 522/testing.csv")