# Load silver table with distinct Genres identified by Primary Keys

## Declare Catalog and Schema for future use

Since we are using the Medallion Architecture here, we will have Bronze, Silver and Gold schemas. This code block declares the catalog and schema variables, which later cells reference to avoid hardcoding values.

Refer to the notebook Movie-Data-Engineering-Using-Databricks/Create medallion Architecture - movie/Create medallion Architecture - movie/create medellion architecture for movies.ipynb to see how they are created in the system.

In [0]:
catalog = "movie"
b_schema = "movie_bronze"
s_schema = "movie_silver"
g_schema = "movie_gold"
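
The variables above are meant to be combined into three-part table names of the form catalog.schema.table. A minimal sketch of that composition follows; the helper `qualified_name` is hypothetical and not part of the original notebook, which builds the names inline with f-strings.

```python
# Hypothetical helper (not in the original notebook): builds the
# three-part name catalog.schema.table from the declared variables.
catalog = "movie"
s_schema = "movie_silver"


def qualified_name(catalog: str, schema: str, table: str) -> str:
    """Return a fully qualified table name."""
    return f"{catalog}.{schema}.{table}"


genre_master = qualified_name(catalog, s_schema, "movie_genre_master")
print(genre_master)  # movie.movie_silver.movie_genre_master
```

Keeping the catalog and schema in one place means a rename only has to be made in this cell.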


## Select all the Genres from the bronze table

In [0]:
df_genre = spark.sql(f"""SELECT Genre FROM {catalog}.{b_schema}.movies order by Genre""")


## Create a Delta Table to Store Genre

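- Creates a Delta table named movie_genre_master in the specified catalog and schema, if it does not already exist.
- Defines two columns (Genre_id, Genre) and declares a primary key on Genre_id. In Databricks, primary key constraints are informational and not enforced, so the uniqueness of Genre_id has to be guaranteed by the load logic in the following cells.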

In [0]:
spark.sql(f"""CREATE TABLE IF NOT EXISTS {catalog}.{s_schema}.movie_genre_master (
    Genre_id STRING,
    Genre STRING,
    PRIMARY KEY (Genre_id)) USING DELTA""").display()


## Clean and find Distinct Genres from the Dataframe

In [0]:

distinct_genre = df_genre.select("Genre").dropna().dropDuplicates()

## Select existing Genres from movie_genre_master

In [0]:
genre_table = spark.table(f"""{catalog}.{s_schema}.movie_genre_master""")

## Create a new Dataframe containing only the new genres

Since this is a primary-keyed master table, a Genre that is not yet in it needs a new primary key id. The new_genres dataframe is therefore a left-anti join of the incoming distinct genres against the existing table: it keeps only the genres that have no match in movie_genre_master.

In [0]:
new_genres = distinct_genre.join(genre_table, distinct_genre.Genre == genre_table.Genre, "left_anti")
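
To make the left-anti join semantics concrete, here is a plain-Python illustration that needs no Spark session. The genre names come from the output at the end of this notebook; which of them already exist in the master table is an assumption made only for this example.

```python
# Plain-Python illustration of left-anti join semantics (no Spark needed).
incoming = ["Horror", "Thriller", "Anime"]  # distinct genres from bronze
existing = {"Horror", "Thriller"}           # assumed to be in the master table already

# Keep only the incoming genres that have no match on the right side.
new_only = [g for g in incoming if g not in existing]
print(new_only)  # ['Anime']
```

On a first run the master table is empty, so every distinct genre passes through the join.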

## Assign key values to Genre_id for new Genres

In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

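# Genre_id is stored as STRING, so cast before taking the max;
# max() on strings compares lexicographically ("9" sorts after "10").
max_id = genre_table.agg(F.max(F.col("Genre_id").cast("int")).alias("max_id")).collect()[0]["max_id"]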

if max_id is None:
    max_id = 0

window = Window.orderBy(F.monotonically_increasing_id())
new_genres = new_genres.withColumn(
    "Genre_id", (F.row_number().over(window) + max_id).cast("int")
)

new_genres.select("Genre_id", "Genre").createOrReplaceTempView("new_genres")
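
The order of the cast and the max matters when ids are stored as strings, as Genre_id is in this table. The plain-Python sketch below shows the difference and then mirrors the row_number() + max_id numbering; the id list and the two new genre names are hypothetical values chosen for illustration.

```python
# Plain-Python sketch (no Spark): ids stored as strings should be
# converted to integers before taking the maximum.
existing_ids = ["1", "2", "9", "10"]  # illustrative ids stored as strings

max_as_text = max(existing_ids)                 # lexicographic comparison
max_as_int = max(int(i) for i in existing_ids)  # numeric comparison
print(max_as_text, max_as_int)  # 9 10

# New genres continue numbering after the numeric maximum,
# mirroring row_number() + max_id in the cell above.
new_genre_names = ["Western", "Musical"]  # hypothetical new genres
assigned = {g: max_as_int + n for n, g in enumerate(new_genre_names, start=1)}
print(assigned)  # {'Western': 11, 'Musical': 12}
```

Taking the maximum of the text values would restart numbering at 10 here and collide with an existing id.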



## Insert Data into movie_genre_master in silver schema

In [0]:
spark.sql(
    f"""
    INSERT INTO {catalog}.{s_schema}.movie_genre_master (Genre_id, Genre)
    SELECT
      COALESCE(
        TRY_CAST(regexp_replace(Genre_id, '[^0-9]', '') AS BIGINT),
        0
      ) AS Genre_id,
      Genre
    FROM new_genres
    WHERE Genre_id IS NOT NULL
    """
)

DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

## Check the Data

In [0]:
%sql
SELECT * FROM movie.movie_silver.movie_genre_master

Genre_id,Genre
1,Horror
2,Thriller
3,Romance
4,Comedy
5,Fantasy
6,Crime
7,Animation
8,Mystery
9,Biography
10,Anime
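
Databricks documents primary key constraints as informational rather than enforced, so it can be worth confirming that no Genre_id repeats after a load. A minimal plain-Python sketch of that check, using the rows from the query output above copied in by hand:

```python
from collections import Counter

# Rows copied from the query output above as (Genre_id, Genre) pairs.
rows = [
    (1, "Horror"), (2, "Thriller"), (3, "Romance"), (4, "Comedy"),
    (5, "Fantasy"), (6, "Crime"), (7, "Animation"), (8, "Mystery"),
    (9, "Biography"), (10, "Anime"),
]

# Count how often each id occurs and keep the ones seen more than once.
id_counts = Counter(genre_id for genre_id, _ in rows)
duplicate_ids = [i for i, c in id_counts.items() if c > 1]
print(duplicate_ids)  # []
```

An empty list means every Genre_id in the sample is unique.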
