<a href="https://colab.research.google.com/github/pullz6/Netflix-Show-Classification-Pyspark/blob/main/Netflix_Shows_Classification_with_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [44]:
#Setting up the pyspark
from google.colab import drive
drive.mount('/content/drive')
#Installing Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Unpack Spark from google drive
!tar xzf /content/drive/MyDrive/spark-3.3.0-bin-hadoop3.tgz
# Set up environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.3.0-bin-hadoop3"
# Install findspark, which helps python locate the psyspark module files
!pip install -q findspark
import findspark
findspark.init()
# Finally, we initialse a "SparkSession", which handles the computations
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [45]:
### Load the California Housing Prices Dataset
# .read, .option, .csv

usersCsvPath = "/content/drive/MyDrive/sample_data/Projects/Netflix-Data/netflix_titles.csv"

netflixDF = (spark
             .read
             .option('header', True)
             .option('inferSchema', True)
             .csv(usersCsvPath))

netflixDF.show(5)
netflixDF.printSchema()

+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+
|show_id|   type|               title|       director|                cast|      country|        date_added|release_year|rating| duration|           listed_in|         description|
+-------+-------+--------------------+---------------+--------------------+-------------+------------------+------------+------+---------+--------------------+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|Kirsten Johnson|                null|United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|
|     s2|TV Show|       Blood & Water|           null|Ama Qamata, Khosi...| South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|
|     s3|TV Show|           Ganglands|Julien Leclercq|Sami Bouajila, Tr...|         null|Septem

In [46]:
from pyspark.sql.functions import to_date, col
netflixDF = netflixDF.withColumn("date", to_date(col("date_added"), "yyyy-MM-dd"))
netflixDF.show()

+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+----+
|show_id|   type|               title|            director|                cast|             country|        date_added|release_year|rating| duration|           listed_in|         description|date|
+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+----+
|     s1|  Movie|Dick Johnson Is Dead|     Kirsten Johnson|                null|       United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|null|
|     s2|TV Show|       Blood & Water|                null|Ama Qamata, Khosi...|        South Africa|September 24, 2021|        2021| TV-MA|2 Seasons|International TV ...|After crossing pa...|null|
|     s3|T

In [47]:
from pyspark.sql.functions import split
netflixDF = netflixDF.withColumn("date_list", split(netflixDF.date_added, " "))
netflixDF.show()

+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+----+--------------------+
|show_id|   type|               title|            director|                cast|             country|        date_added|release_year|rating| duration|           listed_in|         description|date|           date_list|
+-------+-------+--------------------+--------------------+--------------------+--------------------+------------------+------------+------+---------+--------------------+--------------------+----+--------------------+
|     s1|  Movie|Dick Johnson Is Dead|     Kirsten Johnson|                null|       United States|September 25, 2021|        2020| PG-13|   90 min|       Documentaries|As her father nea...|null|[September, 25,, ...|
|     s2|TV Show|       Blood & Water|                null|Ama Qamata, Khosi...|        South Africa|September 24, 2021|    

In [48]:
date_listing = netflixDF.select(
    netflixDF.date_list.getItem(0).alias('month'),
    netflixDF.date_list.getItem(1).alias('date'),
    netflixDF.date_list.getItem(2).alias('year'),
)

date_listing.show()

+---------+----+----+
|    month|date|year|
+---------+----+----+
|September| 25,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 23,|2021|
|September| 23,|2021|
|September| 22,|2021|
|September| 22,|2021|
|September| 22,|2021|
|September| 22,|2021|
|September| 22,|2021|
|September| 22,|2021|
|September| 22,|2021|
+---------+----+----+
only showing top 20 rows



In [49]:
import pyspark.sql.functions as f
date_corrected = date_listing.select("date", f.regexp_replace(f.col("date"), ",", "").alias("date_corrected")).show()

+----+--------------+
|date|date_corrected|
+----+--------------+
| 25,|            25|
| 24,|            24|
| 24,|            24|
| 24,|            24|
| 24,|            24|
| 24,|            24|
| 24,|            24|
| 24,|            24|
| 24,|            24|
| 24,|            24|
| 24,|            24|
| 23,|            23|
| 23,|            23|
| 22,|            22|
| 22,|            22|
| 22,|            22|
| 22,|            22|
| 22,|            22|
| 22,|            22|
| 22,|            22|
+----+--------------+
only showing top 20 rows



In [50]:
date_listing.show()

+---------+----+----+
|    month|date|year|
+---------+----+----+
|September| 25,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 24,|2021|
|September| 23,|2021|
|September| 23,|2021|
|September| 22,|2021|
|September| 22,|2021|
|September| 22,|2021|
|September| 22,|2021|
|September| 22,|2021|
|September| 22,|2021|
|September| 22,|2021|
+---------+----+----+
only showing top 20 rows



In [54]:
import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
def convert_datetime(month_name):
  print(month_name)
  """This function is to create string month into correct numerical form"""
  datetime_object = datetime.datetime.strptime(month_name, "%B")
  month_number = datetime_object.month
  return month_number

numericalCaseUDF = udf(lambda x:convert_datetime(x),IntegerType())

In [55]:
date_listing.withColumn("Month",col("Month").cast('string'))
print(date_listing.schema)
date_listing.dropna().show(truncate=False)
#

StructType([StructField('month', StringType(), True), StructField('date', StringType(), True), StructField('year', StringType(), True)])
+---------+----+----+
|month    |date|year|
+---------+----+----+
|September|25, |2021|
|September|24, |2021|
|September|24, |2021|
|September|24, |2021|
|September|24, |2021|
|September|24, |2021|
|September|24, |2021|
|September|24, |2021|
|September|24, |2021|
|September|24, |2021|
|September|24, |2021|
|September|23, |2021|
|September|23, |2021|
|September|22, |2021|
|September|22, |2021|
|September|22, |2021|
|September|22, |2021|
|September|22, |2021|
|September|22, |2021|
|September|22, |2021|
+---------+----+----+
only showing top 20 rows



In [56]:
date_listing.select(f.col("month"),numericalCaseUDF(f.col("month"))).show(truncate=False)

PythonException: ignored