In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType 

In [0]:
# Read every file matching the netflix_titles_dirty* glob as headerless, tab-delimited text
raw_reader = spark.read.option("header", False).option("sep", "\t")
titles_df = raw_reader.csv("dbfs:/FileStore/netflix_titles_dirty*.csv")
# Known problems at this point: no schema, mixed data types within columns, empty columns
titles_df.printSchema()


root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)



In [0]:
# Peek at the first five parsed rows
titles_df.take(5)

Out[3]: [Row(_c0='80044126', _c1='Movie', _c2='D.L. Hughley: Clear', _c3='Jay Chapman', _c4='D.L. Hughley', _c5='United States', _c6='July 13, 2017', _c7='2014', _c8='TV-MA', _c9='59 min', _c10='Stand-Up Comedy', _c11='In this 2014 standup special filmed in San Francisco, comedic genius D.L. Hughley entertains with his hilarious take on current affairs and more.'),
 Row(_c0='80148179', _c1='Movie', _c2='My Scientology Movie', _c3='John Dower', _c4='Louis Theroux', _c5='United Kingdom', _c6='July 13, 2017', _c7='2015', _c8='TV-MA', _c9='99 min', _c10='Documentaries', _c11='After speaking with former Scientology members and being stonewalled by higher-ups, filmmaker Louis Theroux hires actors to re-create alleged events.'),
 Row(_c0='70301023', _c1='Movie', _c2='Tom Segura: Completely Normal', _c3='Jay Chapman', _c4='Tom Segura', _c5='United States', _c6='July 13, 2017', _c7='2014', _c8='TV-MA', _c9='74 min', _c10='Stand-Up Comedy', _c11='Levelheaded stand-up Tom Segura shares offhand ap

In [0]:
# Split the rows by whether the first field (_c0) can be cast to an integer.
# Rows where the cast produces null are the ones worth inspecting for problems.
first_field_as_int = F.col("_c0").cast("int")

# Rows whose first field casts cleanly to an integer
notnull_titles_df = titles_df.filter(first_field_as_int.isNotNull())
print(notnull_titles_df.count())

# Rows whose first field does not cast to an integer.
# show() returns None, so the DataFrame is assigned first and displayed separately.
isnull_titles_df = titles_df.filter(first_field_as_int.isNull())
isnull_titles_df.show(truncate=True)

# ISSUES:
#     Comment rows - These begin with a # character in the first column, and all other columns are null
#     Missing first column - We have a few rows that reference TV Show or Movie, which should be the 2nd column.
#     Odd columns - There are a few rows included where the columns seem out of sync
#     (ie, a content type in the ID field, dates in the wrong column, etc).



6173
+--------------------+--------------------+--------------------+-----------------+--------------+-----+--------+--------------------+--------------------+--------------------+----+----+
|                 _c0|                 _c1|                 _c2|              _c3|           _c4|  _c5|     _c6|                 _c7|                 _c8|                 _c9|_c10|_c11|
+--------------------+--------------------+--------------------+-----------------+--------------+-----+--------+--------------------+--------------------+--------------------+----+----+
|# (87.8 percent B...|                null|                null|             null|          null| null|    null|                null|                null|                null|null|null|
|               Movie|                null|This fun, charmin...|             null|          null| null|    null|                null|                null|                null|null|null|
|                null|Kaycie Chase, Dav...|                null| 

In [0]:
# Re-read the same files with a separator ("{") that does not occur in the dataset,
# so every line of the files is loaded into a single string column.
single_column_reader = spark.read.option("sep", "{")
titles_single_df = single_column_reader.csv("dbfs:/FileStore/netflix_titles_dirty*.csv")

titles_single_df.printSchema()

root
 |-- _c0: string (nullable = true)



In [0]:
# Total number of rows read into the single-column DataFrame (comment rows included)
titles_single_df.count()

Out[25]: 6238

In [0]:
# Show the first three raw lines; the tab characters are still embedded in the text
titles_single_df.take(3)

Out[27]: [Row(_c0='80044126\tMovie\tD.L. Hughley: Clear\tJay Chapman\tD.L. Hughley\tUnited States\tJuly 13, 2017\t2014\tTV-MA\t59 min\tStand-Up Comedy\tIn this 2014 standup special filmed in San Francisco, comedic genius D.L. Hughley entertains with his hilarious take on current affairs and more.'),
 Row(_c0='80148179\tMovie\tMy Scientology Movie\tJohn Dower\tLouis Theroux\tUnited Kingdom\tJuly 13, 2017\t2015\tTV-MA\t99 min\tDocumentaries\tAfter speaking with former Scientology members and being stonewalled by higher-ups, filmmaker Louis Theroux hires actors to re-create alleged events.'),
 Row(_c0='70301023\tMovie\tTom Segura: Completely Normal\tJay Chapman\tTom Segura\tUnited States\tJuly 13, 2017\t2014\tTV-MA\t74 min\tStand-Up Comedy\tLevelheaded stand-up Tom Segura shares offhand appraisals on hotels and opens up about his hobbies and digestive ailments in this comedy special.')]

In [0]:
# Collect the rows whose text begins with "#" (comment rows) into their own DataFrame
is_comment_row = F.col("_c0").startswith("#")
comments_df = titles_single_df.where(is_comment_row)

comments_df.show()

+--------------------+
|                 _c0|
+--------------------+
|# (87.8 percent B...|
|# On exteriority,...|
|# Roosevelt Unive...|
|# An "or". a conv...|
|# 1611–1613 Kalma...|
|# A legally scien...|
|# Reparenting, ma...|
| # USA South is done|
|# The wave for is...|
|# Which was, Chan...|
|# Five 40 dangero...|
|# Heisei Hyakkei ...|
|# (or will advanc...|
|# Procedures. Kow...|
|# 3 million. part...|
|# And 1985, by bo...|
|# Their decision,...|
|# 82 (4): birds h...|
|# Viva was surfac...|
|# Of Monte-Cristo...|
+--------------------+
only showing top 20 rows



In [0]:
# Number of comment rows (rows starting with "#")
comments_df.count()

Out[29]: 47

In [0]:
# The comment rows have been inspected, so unbind the name.
# del only removes the Python reference to the DataFrame object.
del comments_df

In [0]:
# Keep only the rows that are NOT comments.
# The ~ operator negates the boolean column expression.
not_comment_row = ~F.col("_c0").startswith("#")
titles_single_df = titles_single_df.where(not_comment_row)
titles_single_df.take(3)


Out[31]: [Row(_c0='80044126\tMovie\tD.L. Hughley: Clear\tJay Chapman\tD.L. Hughley\tUnited States\tJuly 13, 2017\t2014\tTV-MA\t59 min\tStand-Up Comedy\tIn this 2014 standup special filmed in San Francisco, comedic genius D.L. Hughley entertains with his hilarious take on current affairs and more.'),
 Row(_c0='80148179\tMovie\tMy Scientology Movie\tJohn Dower\tLouis Theroux\tUnited Kingdom\tJuly 13, 2017\t2015\tTV-MA\t99 min\tDocumentaries\tAfter speaking with former Scientology members and being stonewalled by higher-ups, filmmaker Louis Theroux hires actors to re-create alleged events.'),
 Row(_c0='70301023\tMovie\tTom Segura: Completely Normal\tJay Chapman\tTom Segura\tUnited States\tJuly 13, 2017\t2014\tTV-MA\t74 min\tStand-Up Comedy\tLevelheaded stand-up Tom Segura shares offhand appraisals on hotels and opens up about his hobbies and digestive ailments in this comedy special.')]

In [0]:
# Row count after the comment rows have been removed
titles_single_df.count()

Out[17]: 6191

In [0]:
# Count the tab-separated items in each row and store the result as "fieldcount"
fields_per_row = F.size(F.split(F.col("_c0"), "\t"))
titles_single_df = titles_single_df.withColumn("fieldcount", fields_per_row)
titles_single_df


Out[50]: DataFrame[_c0: string, fieldcount: int]

In [0]:
# How many rows split into more than 12 fields?
too_many_fields = titles_single_df.filter("fieldcount > 12")
over_columned_df = too_many_fields.select("fieldcount", "_c0")
over_columned_df.count()



Out[51]: 42

In [0]:
# How many rows split into fewer than 12 fields?
too_few_fields = titles_single_df.filter("fieldcount < 12")
under_columned_df = too_few_fields.select("fieldcount", "_c0")
under_columned_df.count()



Out[52]: 36

In [0]:
# Retain only the rows with exactly 12 fields, discarding the uneven ones
titles_single_df = titles_single_df.filter("fieldcount == 12")
titles_single_df.count()

Out[53]: 6113

In [0]:
# Split each line on tabs into an array column named "splitcolumn"
split_fields = F.split(F.col("_c0"), "\t").alias("splitcolumn")
titles_clean_df = titles_single_df.select(split_fields)

titles_clean_df.show(5)

+--------------------+
|         splitcolumn|
+--------------------+
|[80044126, Movie,...|
|[80148179, Movie,...|
|[70301023, Movie,...|
|[80221109, Movie,...|
|[80221942, TV Sho...|
+--------------------+
only showing top 5 rows



In [0]:

# Promote each element of `splitcolumn` to its own named, typed column.
# The list order is the positional order of the tab-separated fields; a type of
# None leaves the field as the string produced by split().
title_fields = [
    ("show_id", IntegerType()),
    ("type", None),
    ("title", None),
    ("director", None),
    ("cast", None),
    ("country", None),
    ("date_added", None),
    ("release_year", IntegerType()),
    ("rating", None),
    ("duration", None),
    ("listed_in", None),
    ("description", None),
]

typed_columns = []
for position, (field_name, field_type) in enumerate(title_fields):
    field = F.col("splitcolumn").getItem(position)
    if field_type is not None:
        field = field.cast(field_type)
    typed_columns.append(field.alias(field_name))

# A single select replaces twelve chained withColumn calls. `splitcolumn` is kept
# in front so the result has the same columns, in the same order, as before.
titles_clean_df = titles_clean_df.select("splitcolumn", *typed_columns)



In [0]:
# Remove the helper columns now that every field has its own typed column.
# drop() ignores names that are not present in the DataFrame.
helper_columns = ["_c0", "splitcolumn", "fieldcount"]
titles_clean_df = titles_clean_df.drop(*helper_columns)

titles_clean_df.printSchema()


root
 |-- show_id: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- director: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- rating: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- description: string (nullable = true)



In [0]:
# List the distinct values present in the type column
distinct_types = titles_clean_df.select("type").distinct()
distinct_types.show()

+-------+
|   type|
+-------+
|TV Show|
|  Movie|
|       |
+-------+



#### Issue -> The type column should contain only `Movie` or `TV Show` and should not have any missing values.

In [0]:
# Show the rows whose type is the empty string ""
has_empty_type = F.col("type") == ""
titles_clean_df.filter(has_empty_type).show()

+--------+----+--------------------+--------------+--------------------+----------------+----------------+------------+------+--------+--------------------+--------------------+
| show_id|type|               title|      director|                cast|         country|      date_added|release_year|rating|duration|           listed_in|         description|
+--------+----+--------------------+--------------+--------------------+----------------+----------------+------------+------+--------+--------------------+--------------------+
|80156212|    |           Wakefield| Robin Swicord|Bryan Cranston, J...|   United States|   March 2, 2019|        2016|     R| 109 min|              Dramas|An unhappy father...|
|80219180|    |Darr Sabko Lagta Hai|              |        Bipasha Basu|           India|   March 1, 2018|        2015| TV-MA|1 Season|International TV ...|In this chilling ...|
|80032097|    |        Romeo Ranjha|Navaniat Singh|Jazzy B, Garry Sa...|   India, Canada|November 1, 2017|    

In [0]:

# create a UDF
def set_deriveType(showtype, showduration):
    """Return a valid show type, deriving it from the duration when needed.

    Args:
        showtype: The value of the type field; may be empty, None or any
            other string.
        showduration: The value of the duration field, e.g. "59 min" or
            "1 Season"; may be None.

    Returns:
        `showtype` unchanged when it is already "Movie" or "TV Show".
        Otherwise "Movie" when the duration ends with "min" and "TV Show"
        for any other duration. None when the type is not valid and the
        duration is None, because there is nothing to derive the type from.
    """
    if showtype in ("Movie", "TV Show"):
        return showtype
    # A missing duration used to raise AttributeError on .endswith().
    if showduration is None:
        return None
    return "Movie" if showduration.endswith("min") else "TV Show"



# Wrap the Python function as a Spark UDF that returns a string.
# `udf` is not imported in the visible notebook cells, so it is reached through
# the imported `F` alias rather than relying on an ambient name.
udfDeriveType = F.udf(set_deriveType, returnType=StringType())

# create a new derived column, passing in the columns 'type' and 'duration' as args
titles_clean_df = titles_clean_df.withColumn(
    "derivedType", udfDeriveType(F.col("type"), F.col("duration"))
)

# drop the original type column and rename derivedType to type
# (the renamed column ends up as the last column of the DataFrame)
titles_clean_df = titles_clean_df.drop("type").withColumnRenamed("derivedType", "type")

# verify that the type column now contains only Movie and TV Show
titles_clean_df.select("type").distinct().show()





+-------+
|   type|
+-------+
|TV Show|
|  Movie|
+-------+



In [0]:
# Deriving the type must not add or remove rows: the count should still be 6113,
# the number of rows kept after filtering to exactly 12 fields.
titles_clean_df.count()

Out[63]: 6113

In [0]:
# Collapse to one partition so a single part file is produced, then write
# the cleaned titles as tab-separated CSV with a header row.
single_partition_writer = titles_clean_df.coalesce(1).write
single_partition_writer.csv(
    path="/processed-data/cleaned_netflix_titles.csv",
    mode="overwrite",
    sep="\t",
    header=True,
)

In [0]:
# Locate the part file written into the output directory and move it to its final name.
# The part file name embeds a generated id (tid-...), so it is looked up with
# dbutils.fs.ls instead of being hard-coded, which keeps this cell re-runnable.
processed_dir = "dbfs:/processed-data/cleaned_netflix_titles.csv"
part_files = [
    entry.path
    for entry in dbutils.fs.ls(processed_dir)
    if entry.name.startswith("part-") and entry.name.endswith(".csv")
]
# The write used coalesce(1), so exactly one part file is expected.
if len(part_files) != 1:
    raise RuntimeError(
        f"expected exactly one part file in {processed_dir}, found {len(part_files)}"
    )
dbutils.fs.mv(part_files[0], "/cleaned-data/cleaned_netflix_titles.csv")


Out[67]: True

In [0]:
# Delete the cleaned_netflix_titles.csv output directory and everything inside it
dbutils.fs.rm("/processed-data/cleaned_netflix_titles.csv/", recurse=True)


Out[74]: False

+--------------------+----------+
|               title|cast_count|
+--------------------+----------+
|        Black Mirror|        50|
|COMEDIANS of the ...|        47|
|         Creeped Out|        47|
|    Arthur Christmas|        44|
|              Narcos|        42|
|Dolly Parton's He...|        41|
|Michael Bolton's ...|        41|
|American Horror S...|        40|
|Love, Death & Robots|        40|
|            Movie 43|        39|
+--------------------+----------+
only showing top 10 rows



In [0]:
""" Analysis 3: Filter out TV shows to a seperate DF """

# NOTE(review): `clean_titles_df` is not defined in the visible cells (the cells
# above build `titles_clean_df`) - confirm it is created before this cell runs.

# 1) keep only the titles whose duration mentions "Season"
seasons_only_df = clean_titles_df.filter("duration LIKE '%Season%'")

print(seasons_only_df.count())

# 2) take the text before the first space of the duration and cast it to an integer
season_count = F.split("duration", " ").getItem(0).cast(IntegerType())
seasons_only_df = seasons_only_df.withColumn("season_volume", season_count)

# 3) drop the columns that are no longer needed and let the integer column
#    take over the name "duration"
seasons_only_df = (
    seasons_only_df
    .drop("duration", "type")
    .withColumnRenamed("season_volume", "duration")
)

# 4) write the result as a single tab-separated CSV part file with a header
single_partition_writer = seasons_only_df.coalesce(1).write
single_partition_writer.csv(
    path="/processed_data/seasons_only_titles.csv",
    mode="overwrite",
    sep="\t",
    header=True,
)


1926


In [0]:
# Locate the part file written into the output directory and move it to its final name.
# The part file name embeds a generated id (tid-...), so it is looked up with
# dbutils.fs.ls instead of being hard-coded, which keeps this cell re-runnable.
seasons_dir = "/processed_data/seasons_only_titles.csv"
part_files = [
    entry.path
    for entry in dbutils.fs.ls(seasons_dir)
    if entry.name.startswith("part-") and entry.name.endswith(".csv")
]
# The write used coalesce(1), so exactly one part file is expected.
if len(part_files) != 1:
    raise RuntimeError(
        f"expected exactly one part file in {seasons_dir}, found {len(part_files)}"
    )
dbutils.fs.mv(part_files[0], "/cleaned-data/seasons_only_titles.csv")

# recursively remove the seasons_only_titles.csv directory and its remaining contents
dbutils.fs.rm("/processed_data/seasons_only_titles.csv/", True)

Out[61]: True