In [0]:
#Fetch all data from imdb table

df = spark.read.table('imdb_movies.default.imdb_top_1000_dataset')
display(df)

In [0]:
#Fetch only the name and release year for all movies.
df2=df.select('Series_Title','Released_Year')
display(df2)

In [0]:
#Fetch the name, release year and imdb rating of movies which are UA certified.
df3 = df.filter(df['Certificate'] == 'UA').select('Series_Title','Released_Year','IMDB_Rating','Certificate')
display(df3)

In [0]:
#Fetch the name and genre of movies which are UA certified and have a Imdb rating of over 8.
df4 = df.filter((df['Certificate'] == 'UA') | (df['IMDB_Rating'] > 8)).select('Series_Title','Genre','Certificate','IMDB_Rating')
display(df4)

In [0]:
#Find out how many movies are of Drama genre.
from pyspark.sql.functions import explode, split
df5 = df.select('Series_Title', explode(split(df['Genre'], ',')))
df6 = df5.filter(df5['col'] == 'Drama').count()

display(df5)

display(df6)

In [0]:
#How many movies are directed by "Quentin Tarantino", "Steven Spielberg","Christopher Nolan" and "Rajkumar Hirani".

df7 = df.groupBy('Director').count().filter((df['Director'] == 'Quentin Tarantino') | (df['Director'] == 'Steven Spielberg') | (df['Director'] == 'Christopher Nolan') | (df['Director'] == 'Rajkumar Hirani'))
display(df7)




In [0]:
#What is the highest imdb rating given so far?
df8 = df.select('IMDB_Rating').orderBy(df['IMDB_Rating'].desc()).limit(1)
display(df8)

In [0]:
#What is the highest and lowest imdb rating given so far?

df9 = df.select('IMDB_Rating').orderBy(df['IMDB_Rating'].desc()).limit(1)
df10 = df.select('IMDB_Rating').orderBy(df['IMDB_Rating'].asc()).limit(1)
display(df9)
display(df10)

In [0]:
#Solve the above problem but display the results in different rows. And have a column which indicates the value as lowest and highest.
from pyspark.sql.functions import lit
df11 = df.select('IMDB_Rating').orderBy(df['IMDB_Rating'].desc()).limit(1)
df12 = df.select('IMDB_Rating').orderBy(df['IMDB_Rating'].asc()).limit(1)
df13 = df11.withColumn('Rating',lit('Highest'))
df14 = df12.withColumn('Rating', lit('Lowest'))
df15 = df13.union(df14)
display(df15)


In [0]:
#Find out the total business done by movies staring "Aamir Khan".
from pyspark.sql.functions import sum, regexp_replace, col

df16 = df.filter(df['Star1'] == 'Aamir Khan') \
         .withColumn('Gross', regexp_replace(col('Gross'), ',', '').cast('double')) \
         .select('Gross') \
         .agg(sum('Gross').alias('Total_Gross'))

display(df16)

In [0]:
#Find out the average imdb rating of movies which are neither directed by "Quentin Tarantino", "Steven Spielberg", "Christopher Nolan" and are not acted by any of these stars "Christian Bale", "Liam Neeson", "Heath Ledger", "Leonardo DiCaprio", "Anne Hathaway".

from pyspark.sql.functions import avg

df17 = df.filter(~((df['Director'] == 'Quentin Tarantino') | (df['Director'] == 'Steven Spielberg') | (df['Director'] == 'Christopher Nolan'))) \
         .filter(~((df['Star1'] == 'Christian Bale') | (df['Star1'] == 'Liam Neeson') | (df['Star1'] == 'Heath Ledger') | (df['Star1'] == 'Leonardo DiCaprio') | (df['Star1'] == 'Anne Hathaway'))) \
         .filter(~((df['Star2'] == 'Christian Bale') | (df['Star2'] == 'Liam Neeson') | (df['Star2'] == 'Heath Ledger') | (df['Star2'] == 'Leonardo DiCaprio') | (df['Star2'] == 'Anne Hathaway'))) \
         .filter(~((df['Star3'] == 'Christian Bale') | (df['Star3'] == 'Liam Neeson') | (df['Star3'] == 'Heath Ledger') | (df['Star3'] == 'Leonardo DiCaprio') | (df['Star3'] == 'Anne Hathaway'))) \
         .filter(~((df['Star4'] == 'Christian Bale') | (df['Star4'] == 'Liam Neeson') | (df['Star4'] == 'Heath Ledger') | (df['Star4'] == 'Leonardo DiCaprio') | (df['Star4'] == 'Anne Hathaway'))) \
         .select('IMDB_Rating') \
         .agg(avg('IMDB_Rating').alias('Average_IMDB_Rating'))

display(df17)

In [0]:
#Mention the movies involving both "Steven Spielberg" and "Tom Cruise".

from pyspark.sql.functions import array_contains

df18 = df.filter((df['Director'] == 'Steven Spielberg')) \
         .filter((df['Star1'] == 'Tom Cruise') |(df['Star2'] == 'Tom Cruise') | (df['Star3'] == 'Tom Cruise') | (df['Star4'] == 'Tom Cruise')) \
         .select('Series_Title')

display(df18)

In [0]:
#Display the movie name and watch time (in both mins and hours) which have over 9 imdb rating.
from pyspark.sql.functions import col, round, regexp_replace

df19 = df.filter(df['IMDB_Rating'] > 9) \
         .withColumn('runtime', regexp_replace(col('runtime'), ' min', '').cast('int')) \
         .withColumn('Watch_Time_Min', col('runtime') * 10) \
         .withColumn('Watch_Time_Hours', round(col('runtime') * 10 / 60, 2)) \
         .select('Series_Title', 'Watch_Time_Min', 'Watch_Time_Hours')
display(df19)


In [0]:
#What is the average imdb rating of movies which are released in the last 10 years and have less than 2 hrs of runtime.
from pyspark.sql.functions import avg, current_date, year, col, round, regexp_replace ,try_cast

current_year = year(current_date())

df20 = df.withColumn('current_year', current_year) \
         .filter((col('Released_Year') >= (current_year - 10))) \
         .withColumn('runtime', try_cast(regexp_replace(col('runtime'), ' min', ''), 'double')) \
         .withColumn('Watch_Time_Hours', round(col('runtime') / 60, 2)) \
         .filter(col('Watch_Time_Hours') < 2) \
         .select('IMDB_Rating') \
         .agg(avg('IMDB_Rating').alias('Average_IMDB_Rating'))

display(df20)

In [0]:
#Identify the Batman movie which is not directed by "Christopher Nolan".

df21 = df. filter(df['Series_Title'].contains('Batman')) \
         .filter(~(df['Director'] == 'Christopher Nolan')) \
         .select('Series_Title')

display(df21)

In [0]:
#Display all the A and UA certified movies which are either directed by "Steven Spielberg", "Christopher Nolan" or which are directed by other directors but have a rating of over 8.
from pyspark.sql.functions import col

df22 = df.filter((df['Certificate'] == 'A') | (df['Certificate'] == 'UA')) \
         .filter((df['Director'] == 'Steven Spielberg') | (df['Director'] == 'Christopher Nolan') | (df['IMDB_Rating'] > 8)) \
         .select('Series_Title', 'Director', 'IMDB_Rating', 'Certificate').show()

In [0]:
#What are the different certificates given to movies?
df23 = df.select('Certificate').distinct()

display(df23)

In [0]:
#Display all the movies acted by Tom Cruise in the order of their release.Consider only movies which have a meta score.
from pyspark.sql.functions import col

df24 = df.filter((df['Star1'] == 'Tom Cruise') | (df['Star2'] == 'Tom Cruise') | (df['Star3'] == 'Tom Cruise') | (df['Star4'] == 'Tom Cruise')) \
         .filter(df['Meta_score'].isNotNull()) \
         .orderBy(col('Released_Year').asc()) \
         .select('Series_Title', 'Released_Year','Meta_score')

display(df24)

In [0]:
#Segregate all the Drama and Comedy movies released in the last 10 years as per their runtime. Movies shorter than 1 hour should be termed as short film. Movies longer than 2 hrs should be termed as longer movies. All others can be termed as Good watch time.

from pyspark.sql.functions import col, when, regexp_replace
current_year = 2025
df27 = df.withColumn('runtime', (regexp_replace(col('runtime'), ' min', '')))
df28 = df27.filter((df['Genre'].contains('Drama') | df27['Genre'].contains('Comedy')) & (df27['Released_Year'] >= (current_year - 10)))
df29 = df28.withColumn('Runtime_Category', 
                                     when(col('Runtime') < 60, 'Short film')
                                     .when(col('Runtime') > 120, 'Longer movies')
                                     .otherwise('Good watch time')).select('Series_Title', 'Runtime', 'Runtime_Category')

display(df29)

In [0]:
#Write a query to display the "Christian Bale" movies which released in odd year and even year. Sort the data as per Odd year at the top.
from pyspark.sql.functions import col, lit, when

df30 = df.filter(df['Star1'] == 'Christian Bale') \
         .withColumn('Released_Year', col('Released_Year').cast('int')) \
         .withColumn('Year_Type', when(col('Released_Year') % 2 == 0, lit('Even')).otherwise(lit('Odd'))) \
         .orderBy(col('Year_Type').asc()) \
         .select('Series_Title', 'Released_Year', 'Year_Type')

display(df30)

In [0]:
df.select('Released_Year').show()