In [10]:
import sys
import os
# Make the parent of the current working directory importable so that
# `from src import ...` resolves. Note that, despite its name, `src_path`
# holds that parent directory, not the `src` folder itself.
src_path = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
already_registered = src_path in sys.path
if not already_registered:
    sys.path.append(src_path)

In [11]:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from src import schema as sc   # your custom schema function
from src import datacleaning as dcp
from src import kpi_analysis as kpi
from src import franchise_analysis as franch
from pyspark.sql.types import *
from pyspark.sql.functions import * 



### Fetching the data

In [12]:

# Start (or reuse) the Spark session used by every DataFrame in this notebook.
spark = SparkSession.builder.appName("TMDB Movies").getOrCreate()

# Location of the raw movies JSON. The default is the author's machine-specific
# absolute path; set the TMDB_MOVIES_JSON environment variable to point the
# notebook at the file on any other machine without editing this cell.
json_path = os.environ.get(
    "TMDB_MOVIES_JSON",
    "/Users/gyauk/github/labs/Pyspark_IMBD_movie_analysis/data/raw/movies.json",
)

# Read with the explicit project schema rather than relying on inference.
df = spark.read.schema(sc.schema_build()).json(json_path)


In [13]:
# Quick preview of the loaded data: show() with no arguments prints the first
# 20 rows and truncates long cell values.
df.show()

+------+--------------------+--------------------+------------+-----------------+---------+----------+----------+------------+----------+-------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|    id|               title|             tagline|release_date|original_language|   budget|   revenue|vote_count|vote_average|popularity|runtime|            overview|         poster_path|belongs_to_collection|              genres|production_companies|production_countries|    spoken_languages|             credits|
+------+--------------------+--------------------+------------+-----------------+---------+----------+----------+------------+----------+-------+--------------------+--------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|299534|   Avengers: Endgame|  Avenge the fallen.|  201

## Data Preparation & Cleaning

### Drop Irrelevant Columns

In [14]:
# Fields that are not needed by the analysis. DataFrame.drop() ignores names
# that are not present in the schema, so this is safe to re-run.
cols_to_drop = [
    'adult',
    'imdb_id',
    'original_title',
    'video',
    'homepage',
]
df = df.drop(*cols_to_drop)

# Display the remaining column names.
df.columns

['id',
 'title',
 'tagline',
 'release_date',
 'original_language',
 'budget',
 'revenue',
 'vote_count',
 'vote_average',
 'popularity',
 'runtime',
 'overview',
 'poster_path',
 'belongs_to_collection',
 'genres',
 'production_companies',
 'production_countries',
 'spoken_languages',
 'credits']

In [15]:
# Inspect two untruncated rows of the nested fields before they are flattened.
(
    df.select(
        "genres",
        "production_companies",
        "spoken_languages",
        "credits.cast",
        "credits.crew",
    )
    .show(2, truncate=False)
)

+----------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------+-------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Evaluate JSON-like columns, then extract and clean their data points

In [16]:
# Hand the JSON-like columns to the project's cleaning helper and keep its result.
df = dcp.extract_and_clean_json_columns(df)

### Handling Missing & Incorrect Data

In [17]:
# Optional exploratory checks, currently disabled: each line would show the
# value counts of one column via the project's `dcp.value_counts` helper.
# dcp.value_counts(df, "genre_names").show(truncate=False)
# dcp.value_counts(df, "spoken_languages").show(truncate=False)
# dcp.value_counts(df, "collection_name").show(truncate=False)

- Convert column datatypes

In [18]:
# Cast columns to their analysis types via the project helper.
# NOTE(review): the schema printed under this cell presumably comes from the
# helper itself — confirm in src/datacleaning.
df = dcp.convert_column_types(df)


root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- original_language: string (nullable = true)
 |-- budget: double (nullable = true)
 |-- revenue: long (nullable = true)
 |-- vote_count: integer (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- popularity: double (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- overview: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- belongs_to_collection: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- genres: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: integer (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- production_companies: string (nullable = false)
 |-- production_countries: string (nullable = false)
 |-- spoken_languages: string (nullable = false)
 |-- credits

- Replace unrealistic values:

In [19]:
# Replace unrealistic values using the project helper; the exact rules live in
# src/datacleaning and are not visible from this notebook.
df = dcp.replace_unrealistic_data(df)

- Remove duplicates

In [20]:
# De-duplicate rows and handle missing data through the project helper.
# NOTE(review): which columns define a duplicate is decided inside the helper.
df = dcp.clean_duplicates_and_missing_data(df)

- Keep only non-null (non-NaN) values

In [21]:
# Apply the project's non-null filter and keep the filtered DataFrame.
df = dcp.filter_non_null(df)

- Filter to include only 'Released' movies

In [22]:
# NOTE(review): this step is disabled, so the notebook currently does NOT
# restrict the data to 'Released' movies — re-enable or drop the section.
# df=dcp.released_movies(df)

- Extract 'cast', 'cast_size', 'director', 'crew_size' from the credits column

In [23]:
# Derive the cast/crew summary columns from `credits` via the project helper.
df = dcp.extract_credits_info(df)

### Reorder & Finalize DataFrame

In [24]:
# Desired column order for the final dataset.
new_order = [
    'id', 'title', 'tagline', 'release_date', 'genres', 'belongs_to_collection',
    'original_language', 'budget_musd', 'revenue_musd', 'production_companies',
    'production_countries', 'vote_count', 'vote_average', 'popularity', 'runtime',
    'overview', 'spoken_languages', 'poster_path', 'cast', 'cast_size', 'director',
    'crew_size',
]

# Select the requested columns in that order, silently skipping any that the
# cleaned DataFrame does not contain. The loop variable is deliberately not
# called `col`, which is also the name of a Spark column function.
df_reordered = df.select(*[name for name in new_order if name in df.columns])

In [25]:
# List the column names of the reordered DataFrame to confirm the final order.
df_reordered.columns

['id',
 'title',
 'tagline',
 'release_date',
 'genres',
 'belongs_to_collection',
 'original_language',
 'budget_musd',
 'revenue_musd',
 'production_companies',
 'production_countries',
 'vote_count',
 'vote_average',
 'popularity',
 'runtime',
 'overview',
 'spoken_languages',
 'poster_path',
 'cast',
 'cast_size',
 'director',
 'crew_size']

In [26]:
# Disabled export of the cleaned data as JSON (overwrite mode).
# NOTE(review): the target is an absolute, machine-specific path — make it
# configurable before re-enabling.
# df_reordered.write.mode("overwrite").json("/Users/gyauk/github/labs/Pyspark_IMBD_movie_analysis/data/processed")

In [27]:
# Attach a row identifier. monotonically_increasing_id() yields unique,
# increasing 64-bit values, but they are NOT guaranteed to be consecutive,
# so "index_column" must not be treated as a 0..n-1 positional index.
row_id = F.monotonically_increasing_id()
df_reordered = df_reordered.withColumn("index_column", row_id)

In [None]:
# Preview the first five rows of the final, reordered DataFrame.
df_reordered.show(5)

                                                                                

### KPI Implementation & Analysis

- Highest Revenue

In [None]:
# Rank by the revenue column (in million USD) and report the top title.
kpi.highest_revenue_movie(df_reordered, 'title', 'revenue_musd')
   

- Highest Budget

In [None]:
# NOTE(review): disabled. If re-enabled, the value column passed below is
# 'revenue_musd'; for a budget ranking it presumably should be 'budget_musd'
# — confirm against kpi.highest_budget_movie before running.
# kpi.highest_budget_movie(df_reordered,'title','revenue_musd')

- Highest Profit (Revenue - Budget)



In [None]:
# Profit is derived by the helper from the revenue and budget columns passed here.
kpi.highest_profit_movie(df_reordered, 'title', 'revenue_musd', 'budget_musd')

- Lowest Profit (Revenue - Budget)


In [None]:
# Same inputs as the highest-profit query, ranked from the other end.
kpi.lowest_profit_movie(df_reordered, 'title', 'revenue_musd', 'budget_musd')

- Highest ROI (Revenue / Budget) (only movies with Budget ≥ 10M)
- Lowest ROI (only movies with Budget ≥ 10M)


In [None]:
# Highest return on investment.
# NOTE(review): the "budget >= 10M" restriction is not visible in this call —
# presumably it is applied inside kpi.highest_roi; confirm.
kpi.highest_roi(df_reordered, 'title', 'revenue_musd', 'budget_musd')

In [None]:
# Lowest return on investment.
# NOTE(review): the "budget >= 10M" restriction is not visible in this call —
# presumably it is applied inside kpi.lowest_roi; confirm.
kpi.lowest_roi(df_reordered, 'title', 'revenue_musd', 'budget_musd')


- Most Voted Movie

In [None]:
# Report the title with the largest vote count.
kpi.most_voted(df_reordered, 'title', 'vote_count')

- Highest Rated Movie

In [None]:
# Best average rating. The vote-count column is passed as well — presumably so
# the helper can ignore movies with too few votes; confirm the threshold there.
kpi.highest_rated(df_reordered, 'title', 'vote_count', 'vote_average')


- Lowest Rated Movie

In [None]:
# Worst average rating. The vote-count column is passed as well — presumably so
# the helper can ignore movies with too few votes; confirm the threshold there.
kpi.lowest_rated(df_reordered, 'title', 'vote_count', 'vote_average')

- Most Popular

In [None]:
# Report the title with the highest popularity score.
kpi.most_popular(df_reordered, 'title', 'popularity')


### Advanced Movie Filtering & Search Queries

- Search 1: Find the best-rated Science Fiction Action movies starring Bruce Willis (sorted by Rating - highest to lowest)

In [None]:
# Search 1: best-rated movies that are BOTH Science Fiction and Action and that
# star the target actor, sorted by rating (highest first).
#
# Fixes relative to the previous version of this cell:
#   * the actor name was lower-cased on one side only ("Chris evans" contains a
#     capital letter), so the filter could never match any row;
#   * the actor did not match the stated query (Bruce Willis);
#   * genres were combined with OR, which also returned movies that are only
#     Science Fiction or only Action.
TARGET_ACTOR = "Bruce Willis"

# `genres` is an array of structs, so "genres.name" is the array of genre names
# of each movie; require both genres to be present in that array.
genre_names = F.col("genres.name")
is_scifi_action = (
    F.array_contains(genre_names, "Science Fiction")
    & F.array_contains(genre_names, "Action")
)
scifi_action_df = df_reordered.filter(is_scifi_action)

# One row per (movie, cast member).
# NOTE(review): like the original query, this assumes `cast` is an array of
# structs with a `name` field — confirm against dcp.extract_credits_info.
cast_exploded = scifi_action_df.withColumn("actor", F.explode(F.col("cast")))

# Case-insensitive match: both sides of the comparison are lower-cased.
starring_df = cast_exploded.filter(
    F.lower(F.col("actor.name")) == TARGET_ACTOR.lower()
)

# Exploding can repeat a movie; keep a single row per movie id.
unique_movies_df = starring_df.dropDuplicates(["id"])

# Highest rating first, keeping only the columns of interest.
best_rated_movies = (
    unique_movies_df
    .orderBy(F.col("vote_average").desc())
    .select("title", "vote_average")
)

best_rated_movies.show(truncate=False)


- Search 2: Find movies starring Uma Thurman, directed by Quentin Tarantino (sorted by runtime - shortest to longest).

### Franchise vs. Standalone Movie Performance

- Creating and populating an "is_franchise" column

In [None]:
# Add the franchise flag via the project helper, then preview title + flag for
# five rows.
# NOTE(review): the input here is `df`, not `df_reordered`, so the reordered
# column layout and "index_column" are not part of this frame — confirm intent.
df_with_franchise_flag = franch.add_is_franchise_column(df)
df_with_franchise_flag.select("title", "is_franchise").show(5)

- Mean revenue

In [None]:
# Mean revenue computed by the project helper from the franchise-flagged frame
# (presumably grouped by `is_franchise` — confirm in src/franchise_analysis).
mean_revenue_df = franch.mean_revenue_by_franchise(df_with_franchise_flag)
mean_revenue_df.show()

Median ROI

In [None]:
# Median ROI computed by the project helper from the franchise-flagged frame.
median_roi = franch.median_roi_by_franchise(df_with_franchise_flag)
median_roi.show()

Mean Popularity

In [None]:
# Mean popularity computed by the project helper from the franchise-flagged frame.
mean_popularity = franch.mean_popularity_by_franchise(df_with_franchise_flag)
mean_popularity.show()

Mean rating (note: a "mean budget raised" comparison is not computed by the cell below)

In [None]:
# The helper called here is `mean_rating_by_franchise`, so the result is named
# for what it holds (it was previously stored in a variable called
# `mean_budget`, which misdescribed the contents).
# NOTE(review): no mean-budget comparison is run anywhere in the visible
# notebook — add one if it is required.
mean_rating = franch.mean_rating_by_franchise(df_with_franchise_flag)
mean_rating.show()

### Most Successful Franchises & Directors

In [None]:
# Build the per-franchise summary table used by the ranking cells that follow.
franchise_summary = franch.generate_franchise_summary(df_with_franchise_flag)
franchise_summary.show()

In [None]:
# Rank franchises by their mean budget.
franch.sort_mean_budget(franchise_summary, 'collection_name', 'mean_budget')

In [None]:
# Rank franchises by their total budget.
franch.sort_total_budget(franchise_summary, 'collection_name', 'total_budget')

In [None]:
# Rank franchises by their total revenue.
franch.sort_total_revenue(franchise_summary, 'collection_name', 'total_revenue')

In [None]:
# Rank franchises by their mean revenue.
franch.sort_mean_revenue(franchise_summary, 'collection_name', 'mean_revenue')


In [None]:
# Rank franchises by their mean rating.
franch.sort_mean_rating(franchise_summary, 'collection_name', 'mean_rating')

In [None]:
# Rank franchises by the number of movies they contain.
franch.sort_most_successful_movieinfranchise(franchise_summary, 'collection_name', 'movie_count')


In [None]:
# NOTE(review): the result of this call is not stored, and the following cell
# repeats the same call and keeps it as `director_df` — this cell is redundant.
franch.generate_director_df(df)

In [None]:
# Build the per-director summary from the cleaned frame and display it.
director_df = franch.generate_director_df(df)
director_df.show()

In [None]:
# Rank directors by the number of movies they directed.
franch.most_movies_directed(director_df, 'director', 'movie_count')

In [None]:
# Rank directors by the total revenue of their movies.
franch.most_successful_director_by_revenue(director_df, 'director', 'total_revenue')


In [None]:
# Rank directors by the mean rating of their movies.
franch.successful_director_meanrating(director_df, 'director', 'mean_rating')


### Data Visualization

- Revenue vs. Budget Trends

In [None]:
# Revenue vs. budget plot. The argument was previously `reordered_df`, a name
# that is never defined in this notebook (NameError); the reordered frame is
# called `df_reordered`.
# NOTE(review): `visn` is not imported in any cell visible here — import the
# project's visualisation module under this alias before running this section.
visn.revenue_vs_budget(df_reordered)

- ROI Distribution by Genre

In [None]:
# ROI distribution by genre, drawn by the project's plotting helper from `df`.
# NOTE(review): `visn` is not imported in any cell visible here.
visn.roi_distribution_by_genre(df)

- Popularity vs. Rating

In [None]:
# Popularity vs. rating, drawn by the project's plotting helper from `df`.
# NOTE(review): `visn` is not imported in any cell visible here.
visn.popularity_vs_rating(df)

- Yearly Trends in Box Office Performance

In [None]:
# Yearly box-office trend, drawn by the project's plotting helper from `df`.
# NOTE(review): `visn` is not imported in any cell visible here.
visn.yearly_box_office_performance(df)

- Comparison of Franchise vs. Standalone Success

In [None]:
# Franchise vs. standalone comparison, drawn by the project's plotting helper.
# NOTE(review): `visn` is not imported in any cell visible here, and the input
# is `df`, which has no `is_franchise` column unless the helper adds it
# (the flagged frame is `df_with_franchise_flag`) — confirm.
visn.franchise_vs_standalone_success(df)