In [1]:
import json
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql.functions import avg, sum as _sum

import logging
logging.basicConfig(level=logging.INFO)


with open('director_data_new111.json', 'r') as f:
    director_dict = json.load(f)

try:
    logging.info("Starting Spark session...")
    spark = SparkSession.builder \
        .appName("OMDB Data Analysis") \
        .getOrCreate()
    logging.info("Spark session started successfully.")
except Exception as e:
    logging.error(f"Error starting Spark session: {e}")
    raise e

schema = StructType([
    StructField("Title", StringType(), True),
    StructField("Director", StringType(), True),
    StructField("IMDb_Rating", FloatType(), True),
    StructField("RT_Rating", FloatType(), True),
    StructField("BoxOffice", FloatType(), True)
])

parquet_data = []
for director, movies in director_dict.items():
    for title, info in movies.items():
        imdb_rating = float(info['imdb_rating']) if 'imdb_rating' in info else None
        rt_rating = float(info['rt_rating']) / 10 if 'rt_rating' in info else None 
        box_office = float(info['box_office']) if 'box_office' in info else None
        parquet_data.append((title, director, imdb_rating, rt_rating, box_office))


try:
    logging.info("Creating Spark DataFrame...")
    parquet_df = spark.createDataFrame(parquet_data, schema=schema)
    logging.info("Spark DataFrame created successfully.")
except Exception as e:
    logging.error(f"Error creating Spark DataFrame: {e}")
    raise e


parquet_df.printSchema()
parquet_df.show(truncate=False)


aggregated_df = parquet_df.groupBy('Director').agg(
    _sum('BoxOffice').alias('Total_BoxOffice'),
    avg('IMDb_Rating').alias('Avg_IMDb_Rating'),
    avg('RT_Rating').alias('Avg_RT_Rating')
)

aggregated_df.show(truncate=False)

print("Data processing and aggregation completed.")


INFO:root:Starting Spark session...
INFO:root:Spark session started successfully.
INFO:root:Creating Spark DataFrame...
INFO:root:Spark DataFrame created successfully.


root
 |-- Title: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- IMDb_Rating: float (nullable = true)
 |-- RT_Rating: float (nullable = true)
 |-- BoxOffice: float (nullable = true)

+-----------------------------+---------------+-----------+---------+------------+
|Title                        |Director       |IMDb_Rating|RT_Rating|BoxOffice   |
+-----------------------------+---------------+-----------+---------+------------+
|Raging Bull                  |Martin Scorsese|8.1        |0.92     |2.3383988E7 |
|The King of Comedy           |Martin Scorsese|7.8        |0.89     |2536242.0   |
|After Hours                  |Martin Scorsese|7.6        |0.9      |1.0609321E7 |
|The Color of Money           |Martin Scorsese|7.0        |0.88     |5.2293984E7 |
|The Last Temptation of Christ|Martin Scorsese|7.5        |0.82     |8373585.0   |
|Goodfellas                   |Martin Scorsese|8.7        |0.95     |4.690972E7  |
|Cape Fear                    |Martin Scorsese|7

In [6]:
aggregated_df.write.mode('overwrite').csv('aggregated_directors.csv', header=True)

print("Aggregated data exported to CSV.")



Aggregated data exported to CSV.
