### Dataset = Movies Daily Update Dataset (movies.csv) - 357.53 MB

## Pre-processing
1. The dataset is directly imported from kaggle using opendatasets
2. Duplicate rows are dropped
3. Rows containing any null values are dropped
4. Columns that are not needed for doing the tasks are dropped
5. Data types of columns are changed to appropriate type
6. Cleaned Data is saved as cleaned_movies_dataset.csv

In [2]:
import opendatasets as od

In [3]:
od.download("https://www.kaggle.com/datasets/akshaypawar7/millions-of-movies?select=movies.csv")

Skipping, found downloaded files in "./millions-of-movies" (use force=True to force download)


In [4]:
import pyspark
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.appName("Spark_Final_Project_Movies").getOrCreate()

22/11/02 23:22:53 WARN Utils: Your hostname, pallavi-xps resolves to a loopback address: 127.0.1.1; using 192.168.1.81 instead (on interface wlp2s0)
22/11/02 23:22:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/02 23:22:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
movies_df = spark.read.csv('millions-of-movies/movies.csv',inferSchema=True,header=True)

                                                                                

In [7]:
movies_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nullable = true)
 |-- credits: string (nullable = true)
 |-- keywords: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- backdrop_path: string (nullable = true)
 |-- recommendations: string (nullable = true)



In [8]:
movies_df.columns

['id',
 'title',
 'genres',
 'original_language',
 'overview',
 'popularity',
 'production_companies',
 'release_date',
 'budget',
 'revenue',
 'runtime',
 'status',
 'tagline',
 'vote_average',
 'vote_count',
 'credits',
 'keywords',
 'poster_path',
 'backdrop_path',
 'recommendations']

In [9]:
movies_df.count()

[Stage 2:>                                                          (0 + 8) / 8]                                                                                

739953

In [10]:
movies_df = movies_df.dropDuplicates()

In [11]:
movies_df.count()

                                                                                

739825

In [12]:
movies_df=movies_df.na.drop("any")

In [13]:
movies_df.count()

                                                                                

15900

In [14]:
# movies_df.show(10)

In [16]:
movies_df=movies_df.drop("overview")

In [17]:
movies_df=movies_df.drop("credits")

In [19]:
movies_df=movies_df.drop("recommendations")

In [25]:
movies_df=movies_df.drop("backdrop_path")

In [26]:
movies_df=movies_df.drop("poster_path")

In [27]:
movies_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- vote_count: string (nullable = true)
 |-- keywords: string (nullable = true)



In [29]:
#all columns have data type string. So, we need to convert the datatypes using cast method

from pyspark.sql.types import StringType, FloatType, IntegerType, DateType
from pyspark.sql.functions import col


movies_df = movies_df\
            .withColumn('popularity', col('popularity').cast(FloatType()))\
            .withColumn('release_date', col('release_date').cast(DateType()))\
            .withColumn('budget', col('budget').cast(IntegerType()))\
            .withColumn('revenue', col('revenue').cast(IntegerType()))\
            .withColumn('runtime', col('runtime').cast(IntegerType()))\
            .withColumn('vote_average', col('vote_average').cast(FloatType()))\
            .withColumn('vote_count', col('vote_count').cast(IntegerType()))


movies_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- budget: integer (nullable = true)
 |-- revenue: integer (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- vote_count: integer (nullable = true)
 |-- keywords: string (nullable = true)



In [31]:
movies_df.toPandas().to_csv('millions-of-movies/cleaned_movies_dataset.csv',index= False)