In [0]:
# Service-principal credentials are read from the Key Vault-backed secret scope
# so that no secret value appears in the notebook itself.
SECRET_SCOPE = "movieverse-keyvault-scope"
client_id = dbutils.secrets.get(scope=SECRET_SCOPE, key="clientid")
tenant_id = dbutils.secrets.get(scope=SECRET_SCOPE, key="tenantid")
client_secret = dbutils.secrets.get(scope=SECRET_SCOPE, key="secretvalue")

# OAuth (client-credentials) settings passed to every mount in this notebook.
configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": client_id,
  "fs.azure.account.oauth2.client.secret": client_secret,
  "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
}

# Mount the raw-data container only when it is not mounted yet.
# dbutils.fs.mount raises if the mount point already exists, which would make
# this cell fail whenever the notebook is re-run against the same workspace.
raw_mount_point = "/mnt/moviedata"
if not any(m.mountPoint == raw_mount_point for m in dbutils.fs.mounts()):
  dbutils.fs.mount(
    source = "abfss://rawdata@movieversedata.dfs.core.windows.net/",
    mount_point = raw_mount_point,
    extra_configs = configs
  )


In [0]:
# Mount one container per medallion layer (bronze, silver, gold) at /mnt/<layer>.
# Mount points that already exist are skipped: dbutils.fs.mount raises on an
# existing mount point, which would break a re-run of the notebook.
already_mounted = {m.mountPoint for m in dbutils.fs.mounts()}

for layer in ("bronze", "silver", "gold"):
  layer_mount_point = f"/mnt/{layer}"
  if layer_mount_point in already_mounted:
    continue
  dbutils.fs.mount(
    source = f"abfss://{layer}@movieversedata.dfs.core.windows.net/",
    mount_point = layer_mount_point,
    extra_configs = configs
  )


In [0]:
# List every mount currently registered in the workspace.
current_mounts = dbutils.fs.mounts()
display(current_mounts)

In [0]:

# Load the raw CSV data from the mounted raw-data container.
# The reader treats the first line as a header, infers column types, uses a
# double quote as both quote and escape character, and allows quoted values to
# span several lines.
csv_reader = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("quote", '"')
    .option("escape", '"')
    .option("multiLine", True)
)
df = csv_reader.csv("/mnt/moviedata/raw")

display(df)

In [0]:
# Row count of the raw DataFrame, followed by its column names.
row_total = df.count()
print(row_total)

column_names = df.columns
print(column_names)


In [0]:
# Persist the raw DataFrame as a Delta dataset in the Bronze container,
# replacing whatever was written there before.
bronze_writer = df.write.format("delta").mode("overwrite")
bronze_writer.save("/mnt/bronze/raw_delta")


In [0]:
# Read the Bronze Delta dataset back and show it.
bronze_df = (
    spark.read
    .format("delta")
    .load("/mnt/bronze/raw_delta")
)
display(bronze_df)

In [0]:
# Ensure the bronze schema exists, then register the raw DataFrame as the
# managed Delta table bronze.raw_data (overwriting any previous contents).
spark.sql("create schema if not exists bronze")
(
    df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("bronze.raw_data")
)

In [0]:
# Print the column names and data types of bronze_df.
bronze_df.printSchema()

## Checking null values

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when

# One aggregate per column: the number of rows in which that column is null.
# F.sum is used rather than importing `sum` by name, because that import would
# shadow Python's built-in sum() in the notebook namespace.
null_counts = bronze_df.select([
    F.sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in bronze_df.columns
])

# The aggregation yields exactly one row; turn it into {column: null_count}.
nulls_dict = null_counts.first().asDict()

# Print one "column: count" line per column.
for col_name, null_count in nulls_dict.items():
    print(f"{col_name}: {null_count}")


## Dropping Columns

In [0]:
from pyspark.sql import functions as F

# Reload the Delta dataset written to the Bronze layer.
df = spark.read.format("delta").load("/mnt/bronze/raw_delta")

# Columns removed before any further processing.
cols_to_drop = [
    "status",
    "adult",
    "backdrop_path",
    "homepage",
    "imdb_id",
    "overview",
    "poster_path",
    "tagline",
    "keywords",
]

# Drop the listed columns, parse release_date with the yyyy-MM-dd pattern and
# derive release_year from the parsed date.
df_cleaned = (
    df.drop(*cols_to_drop)
    .withColumn("release_date", F.to_date("release_date", "yyyy-MM-dd"))
    .withColumn("release_year", F.year("release_date"))
)

display(df_cleaned)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when

# One aggregate per column: the number of rows in which that column is null.
# F.sum is used rather than importing `sum` by name, because that import would
# shadow Python's built-in sum() in the notebook namespace.
null_counts = df_cleaned.select([
    F.sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df_cleaned.columns
])

# The aggregation yields exactly one row; turn it into {column: null_count}.
nulls_dict = null_counts.first().asDict()

# Print one "column: count" line per column.
for col_name, null_count in nulls_dict.items():
    print(f"{col_name}: {null_count}")


In [0]:
columns_to_fill = ["genres", "production_companies", "production_countries", "spoken_languages"]

# Replace nulls in each listed column with that column's mode, i.e. its most
# frequent NON-NULL value.
for col_name in columns_to_fill:
    # Nulls are filtered out before counting: groupBy keeps null as its own
    # group, so if null were the most frequent "value" it would be picked as
    # the mode and the fill below would silently do nothing.
    # The secondary sort on the value itself makes the choice deterministic
    # when several values share the highest count.
    mode_row = (
        df_cleaned
        .filter(F.col(col_name).isNotNull())
        .groupBy(col_name)
        .count()
        .orderBy(F.desc("count"), F.asc(col_name))
        .first()
    )

    # first() returns None when the column has no non-null value at all; there
    # is nothing to fill with in that case, so the column is left unchanged.
    if mode_row is not None:
        mode_value = mode_row[col_name]
        df_cleaned = df_cleaned.withColumn(
            col_name,
            F.coalesce(F.col(col_name), F.lit(mode_value)),
        )

display(df_cleaned)


In [0]:
# Number of rows in df_cleaned (shown as the cell output).
df_cleaned.count()

In [0]:
# Column names of df_cleaned (shown as the cell output).
df_cleaned.columns

In [0]:
# Print the column names and data types of df_cleaned.
df_cleaned.printSchema()

In [0]:
# Persist the cleaned data as a Delta dataset in the Silver container.
# overwriteSchema lets the overwrite succeed when the cleaned schema differs
# from what a previous run wrote (e.g. a different set of dropped columns);
# without it Delta rejects the write on a schema mismatch. This matches the
# option already used for the silver.transformed_data table write.
# NOTE(review): this targets the root of the /mnt/silver mount, whereas the
# Bronze and Gold writes use a sub-folder — confirm that is intentional.
df_cleaned.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save("/mnt/silver/")
display(df_cleaned)


In [0]:
# Make sure the silver schema exists before registering the table.
spark.sql("CREATE SCHEMA IF NOT EXISTS silver")

# Replace both the data and the schema of silver.transformed_data with the
# contents of df_cleaned.
silver_writer = (
    df_cleaned.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
)
silver_writer.saveAsTable("silver.transformed_data")


In [0]:
# Load the Silver table back into a DataFrame and show it.
silver_reader = spark.read.format("delta")
silver_df = silver_reader.table("silver.transformed_data")

display(silver_df)



In [0]:
# Remove the spoken_languages column from the Silver data and show the result.
columns_to_remove = ["spoken_languages"]
silver_df_cleaned = silver_df.drop(*columns_to_remove)
display(silver_df_cleaned)

In [0]:
from pyspark.sql.functions import split

# Turn the comma-separated text columns into arrays (a comma followed by any
# amount of whitespace is the separator).
multi_valued_columns = ['genres', 'production_companies', 'production_countries']

# silver_df_cleaned is reassigned in place, so re-running this cell would call
# split() on columns that are already arrays and fail with a type error.
# Skipping columns that are already arrays makes the cell safe to re-run.
current_types = dict(silver_df_cleaned.dtypes)
for column_name in multi_valued_columns:
    if current_types[column_name].startswith('array'):
        continue
    silver_df_cleaned = silver_df_cleaned.withColumn(column_name, split(column_name, ',\\s*'))


In [0]:
# Render silver_df_cleaned for inspection.
display(silver_df_cleaned)

In [0]:
# Step 2: Explode first (no trim here yet)
from pyspark.sql.functions import col,explode,trim

genres_exploded = silver_df_cleaned.select('id', explode('genres').alias('genre_name_raw'))
companies_exploded = silver_df_cleaned.select('id', explode('production_companies').alias('company_name_raw'))
countries_exploded = silver_df_cleaned.select('id', explode('production_countries').alias('country_name_raw'))

# Step 3: Now apply trim separately
genres_exploded = genres_exploded.withColumn('genre_name', trim(col('genre_name_raw'))).drop('genre_name_raw')
companies_exploded = companies_exploded.withColumn('company_name', trim(col('company_name_raw'))).drop('company_name_raw')
countries_exploded = countries_exploded.withColumn('country_name', trim(col('country_name_raw'))).drop('country_name_raw')


In [0]:
# Show the three exploded DataFrames, in genre / company / country order.
for exploded_frame in (genres_exploded, companies_exploded, countries_exploded):
    display(exploded_frame)

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number


def _with_surrogate_key(names_df, name_col, id_col):
    """Return the distinct values of name_col with a deterministic id_col.

    Keys are assigned with row_number() over the names in sorted order, so the
    same set of names always gets the same ids. monotonically_increasing_id()
    is not used because its values depend on how the data is partitioned, and
    these DataFrames are lazy: they are re-evaluated for every join and every
    write, so the ids stored in the bridge tables would not be guaranteed to
    match the ids stored in the dimension tables.

    The id is cast to long, the type monotonically_increasing_id() produced.
    The un-partitioned window moves all distinct names to a single partition,
    which is acceptable for dimension-sized data.
    """
    ordered_by_name = Window.orderBy(name_col)
    return (
        names_df.select(name_col)
        .distinct()
        .withColumn(id_col, row_number().over(ordered_by_name).cast("long"))
    )


# Genre Dimension
genres_dim = _with_surrogate_key(genres_exploded, 'genre_name', 'genre_id')

# Companies Dimension
companies_dim = _with_surrogate_key(companies_exploded, 'company_name', 'company_id')

# Countries Dimension
countries_dim = _with_surrogate_key(countries_exploded, 'country_name', 'country_id')


In [0]:
# Show the three dimension DataFrames, in genre / company / country order.
for dimension_frame in (genres_dim, companies_dim, countries_dim):
    display(dimension_frame)

In [0]:
# Movie-Genre Bridge
movie_genres_bridge = genres_exploded.join(genres_dim, on='genre_name', how='inner') \
                                     .select('id', 'genre_id') \
                                     .withColumnRenamed('id', 'movie_id')

# Movie-Company Bridge
movie_companies_bridge = companies_exploded.join(companies_dim, on='company_name', how='inner') \
                                           .select('id', 'company_id') \
                                           .withColumnRenamed('id', 'movie_id')

# Movie-Country Bridge
movie_countries_bridge = countries_exploded.join(countries_dim, on='country_name', how='inner') \
                                           .select('id', 'country_id') \
                                           .withColumnRenamed('id', 'movie_id')


In [0]:
# Show the three bridge DataFrames, in genre / company / country order.
for bridge_frame in (movie_genres_bridge, movie_companies_bridge, movie_countries_bridge):
    display(bridge_frame)

In [0]:
# Drop multi-valued columns from silver to create fact
movies_fact = silver_df_cleaned.drop('genres', 'production_companies', 'production_countries')

# Rename 'id' to 'movie_id'
movies_fact = movies_fact.withColumnRenamed('id', 'movie_id')


In [0]:
# Save Fact Table
movies_fact.write.format('delta').mode('overwrite').save('/mnt/gold/movies_fact')

# Save Dimension Tables
genres_dim.write.format('delta').mode('overwrite').save('/mnt/gold/genres_dim')
companies_dim.write.format('delta').mode('overwrite').save('/mnt/gold/companies_dim')
countries_dim.write.format('delta').mode('overwrite').save('/mnt/gold/countries_dim')

# Save Bridge Tables
movie_genres_bridge.write.format('delta').mode('overwrite').save('/mnt/gold/movie_genres_bridge')
movie_companies_bridge.write.format('delta').mode('overwrite').save('/mnt/gold/movie_companies_bridge')
movie_countries_bridge.write.format('delta').mode('overwrite').save('/mnt/gold/movie_countries_bridge')


In [0]:
# Make sure the gold schema exists before registering any table in it.
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")

# Every Gold DataFrame paired with the table it is saved as:
# the fact table first, then the dimensions, then the bridges.
gold_table_targets = [
    (movies_fact, "gold.movies_fact"),
    (genres_dim, "gold.genres_dim"),
    (companies_dim, "gold.companies_dim"),
    (countries_dim, "gold.countries_dim"),
    (movie_genres_bridge, "gold.movie_genres_bridge"),
    (movie_companies_bridge, "gold.movie_companies_bridge"),
    (movie_countries_bridge, "gold.movie_countries_bridge"),
]

# Overwrite each table with the current contents of its DataFrame.
for gold_frame, table_name in gold_table_targets:
    gold_frame.write.format("delta").mode("overwrite").saveAsTable(table_name)


In [0]:
# Show the bridge, fact and dimension DataFrames one after another.
frames_to_show = (
    movie_countries_bridge,
    movie_genres_bridge,
    movie_companies_bridge,
    movies_fact,
    genres_dim,
    companies_dim,
)
for shown_frame in frames_to_show:
    display(shown_frame)

In [0]:
# Fact Table
movies_fact_df = spark.read.table("gold.movies_fact")

# Dimension Tables
genres_dim_df = spark.read.table("gold.genres_dim")
companies_dim_df = spark.read.table("gold.companies_dim")
countries_dim_df = spark.read.table("gold.countries_dim")

# Bridge Tables
movie_genres_bridge_df = spark.read.table("gold.movie_genres_bridge")
movie_companies_bridge_df = spark.read.table("gold.movie_companies_bridge")
movie_countries_bridge_df = spark.read.table("gold.movie_countries_bridge")


## Top 10 Movies by Revenue

In [0]:
# Titles and revenue of the ten movies with the highest revenue.
title_and_revenue = movies_fact_df.select("title", "revenue")
top_movies = title_and_revenue.orderBy("revenue", ascending=False).limit(10)

display(top_movies)


Databricks visualization. Run in Databricks to view.

## Average Revenue by Genre


In [0]:
from pyspark.sql import functions as F

# Attach genre names to movies through the movie-genre bridge.
movies_with_genres = (
    movies_fact_df.alias("m")
    .join(movie_genres_bridge_df.alias("mg"), "movie_id")
    .join(genres_dim_df.alias("g"), "genre_id")
)

# Mean revenue per genre, keeping the ten genres with the highest average.
avg_revenue_by_genre = (
    movies_with_genres
    .groupBy("g.genre_name")
    .agg(F.avg("m.revenue").alias("average_revenue"))
    .orderBy(F.col("average_revenue").desc())
    .limit(10)
)

display(avg_revenue_by_genre)


Databricks visualization. Run in Databricks to view.

In [0]:
# Attach company names to movies through the movie-company bridge.
movies_with_companies = (
    movies_fact_df.alias("m")
    .join(movie_companies_bridge_df.alias("mc"), "movie_id")
    .join(companies_dim_df.alias("c"), "company_id")
)

# Number of joined rows per company, keeping the ten largest counts.
top_companies = (
    movies_with_companies
    .groupBy("c.company_name")
    .count()
    .orderBy("count", ascending=False)
    .limit(10)
)

display(top_companies)


In [0]:
#  Budget vs Revenue (Scatter)
budget_vs_revenue = spark.sql("""
  SELECT
    title,
    budget,
    revenue
  FROM gold.movies_fact
  WHERE budget IS NOT NULL AND revenue IS NOT NULL
  ORDER BY revenue DESC
  LIMIT 10
""")
display(budget_vs_revenue)  # → Choose Scatter Chart (X=budget, Y=revenue, Details=title)


Databricks visualization. Run in Databricks to view.

In [0]:
# Movies whose budget exceeds their revenue: the ten with the largest budget.
flop_movies_query = """
  SELECT
    title,
    budget,
    revenue
  FROM gold.movies_fact
  WHERE budget > revenue
  ORDER BY budget DESC
  LIMIT 10
"""
flop_movies = spark.sql(flop_movies_query)

# Suggested chart: Table or Bar Chart (X=title, Y=budget, Y=revenue)
display(flop_movies)


Databricks visualization. Run in Databricks to view.

In [0]:
# Top 5 Countries by Movie Revenue
top_countries_by_revenue = spark.sql("""
  SELECT
    c.country_name,
    SUM(f.revenue) AS total_revenue
  FROM gold.movies_fact f
  JOIN gold.movie_countries_bridge mcb ON f.movie_id = mcb.movie_id
  JOIN gold.countries_dim c               ON mcb.country_id = c.country_id
  GROUP BY c.country_name
  ORDER BY total_revenue DESC
  LIMIT 5
""")
display(top_countries_by_revenue)  # → Choose Bar/Column Chart (X=country_name, Y=total_revenue)


Databricks visualization. Run in Databricks to view.

In [0]:
#Top 5 Movies with Highest Vote Count
# The five movies with the highest vote_count.
top_voted_query = """
  SELECT
    title,
    vote_count
  FROM gold.movies_fact
  ORDER BY vote_count DESC
  LIMIT 5
"""
top_voted_movies = spark.sql(top_voted_query)

# Suggested chart: Bar/Column Chart (X=title, Y=vote_count)
display(top_voted_movies)


Databricks visualization. Run in Databricks to view.