In [0]:
# ============================================================
# PROYECTO: Amazon Prime Titles - Arquitectura Medallion
# CAPA: Bronze -> Silver -> Gold
# AUTOR: Yonathan Montenegro
# STACK: Azure Databricks, PySpark, Delta Lake, ADLS Gen2, ADF, CI/CD
# ============================================================


# ============================================================
# GENERAL CONFIGURATION
# ============================================================

# Notebook widget used to pick the target environment (dev / prod)
dbutils.widgets.dropdown("ENV", "dev", ["dev", "prod"])
env = dbutils.widgets.get("ENV")

# Secret scope for the selected environment; any value other than "dev"
# resolves to the prod scope
if env == "dev":
    SCOPE = "kv-dev-scope"
else:
    SCOPE = "kv-prod-scope"

print("ENV:", env, "| SCOPE:", SCOPE)

In [0]:
# List the secrets registered in the scope selected for the current environment.
# Uses SCOPE (derived from the ENV widget) instead of a hardcoded dev scope so
# that a prod run inspects the prod scope.
dbutils.secrets.list(SCOPE)


In [0]:
# ============================================================
# READ SECRETS FROM AZURE KEY VAULT
# ============================================================

# Credentials are fetched from the secret scope (nothing hardcoded);
# the secrets are read in the same order as the target variables.
tenant_id, client_id, client_secret, storage = (
    dbutils.secrets.get(SCOPE, secret_name)
    for secret_name in ("tenant-id", "sp-client-id", "sp-client-secret", "storage-account")
)
print("Secretos cargados correctamente")

In [0]:
# Configure OAuth (client credentials) access to the ADLS storage account
account_host = f"{storage}.dfs.core.windows.net"
oauth_settings = {
    f"fs.azure.account.auth.type.{account_host}": "OAuth",
    f"fs.azure.account.oauth.provider.type.{account_host}":
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    f"fs.azure.account.oauth2.client.id.{account_host}": client_id,
    f"fs.azure.account.oauth2.client.secret.{account_host}": client_secret,
    f"fs.azure.account.oauth2.client.endpoint.{account_host}":
        f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}
# Apply every setting to the current Spark session, in declaration order
for conf_key, conf_value in oauth_settings.items():
    spark.conf.set(conf_key, conf_value)

In [0]:
# Connectivity check: list the root of the bronze container
bronze_listing = dbutils.fs.ls(f"abfss://bronze@{storage}.dfs.core.windows.net/")
display(bronze_listing)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:
# Root URI of the bronze container for the configured storage account.
# Note: the value ends with a trailing "/".
bronze_path = f"abfss://bronze@{storage}.dfs.core.windows.net/"


In [0]:
# List the bronze container root (the storage account comes from the scope
# selected by ENV, so this points at DEV or PROD accordingly)
dbutils.fs.ls(bronze_path)


In [0]:
# ============================================================
# READ RAW DATA FROM BRONZE
# ============================================================

# bronze_path already ends with "/", so strip it before joining to avoid
# building a URI with a double slash ("...windows.net//amazon_prime_titles.csv").
bronze_file = f"{bronze_path.rstrip('/')}/amazon_prime_titles.csv"

# CSV with a header row; column types are inferred by Spark.
df_bronze = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(bronze_file)
)

df_bronze.display()


In [0]:
# Number of rows read from Bronze
df_bronze.count()

In [0]:
# Inspect the column data types of the Bronze DataFrame
df_bronze.printSchema()


In [0]:
# Quick preview: first 10 rows of Bronze
df_bronze.limit(10).display()

In [0]:
# Number of records (same count as the earlier df_bronze.count() cell)
df_bronze.count()

In [0]:
# Count NULL values per column: one aggregate per column, aliased with the
# column's own name
null_counters = []
for column_name in df_bronze.columns:
    is_missing = col(column_name).isNull()
    null_counters.append(count(when(is_missing, column_name)).alias(column_name))

nulls = df_bronze.select(null_counters)

display(nulls)


In [0]:
# Replace NULLs with sentinel values
string_cols = ['rating','country','cast','director','description',
               'date_added','duration','listed_in','title','type','show_id']

# 'rating' is also listed in string_cols, so its dedicated default must be
# applied FIRST; otherwise the generic 'Unknown' fill would already have
# replaced the NULLs and 'Unrated' would never be used.
df_bronze = df_bronze.na.fill({'rating': 'Unrated'})
# Generic default for the remaining text columns
df_bronze = df_bronze.na.fill('Unknown', subset=string_cols)
# Sentinel for a missing release year
df_bronze = df_bronze.na.fill({'release_year': -1})

display(df_bronze)


In [0]:
# Duplicate check: show_id values that appear more than once in Bronze.
# A bare DataFrame expression is lazy and only shows its schema, so the result
# is passed to display() to actually run the query and show the offending ids.
display(df_bronze.groupBy("show_id").count().filter("count > 1"))


In [0]:

# Silver projection: trim text columns, lower-case the title type, cast the
# release year to int and expose listed_in under the name "category"
silver_columns = [
    trim(col("show_id")).alias("show_id"),
    lower(trim(col("type"))).alias("type"),
    trim(col("title")).alias("title"),
    col("release_year").cast("int"),
    trim(col("rating")).alias("rating"),
    trim(col("country")).alias("country"),
    trim(col("listed_in")).alias("category"),
    trim(col("description")).alias("description"),
    trim(col("date_added")).alias("date_added"),
    trim(col("duration")).alias("duration"),
    trim(col("director")).alias("director"),
    trim(col("cast")).alias("cast"),
]
df_silver = df_bronze.select(*silver_columns)
df_silver.limit(10).display()


In [0]:

# Parse date_added from text using the "MMMM d, yyyy" pattern
date_added_parsed = to_date(col("date_added"), "MMMM d, yyyy")
df_silver = df_silver.withColumn("date_added", date_added_parsed)


In [0]:

# Split duration on a space: first token -> integer value, second token -> unit
duration_parts = split(col("duration"), " ")
df_silver = (
    df_silver
    .withColumn("duration_value", duration_parts.getItem(0).cast("int"))
    .withColumn("duration_unit", duration_parts.getItem(1))
)

In [0]:
# Inspect the Silver schema and its rows after the transformations
df_silver.printSchema()
display(df_silver)


In [0]:
# Remove rows with a duplicated show_id (one row is kept per show_id).
# NOTE(review): no ordering is applied before this call, so which of the
# duplicated rows survives is not controlled here — confirm this is acceptable.
df_silver = df_silver.dropDuplicates(["show_id"])


In [0]:
# Final column order of the Silver table
orden = [
    "show_id", "type", "title", "release_year", "rating", "country",
    "category", "description", "date_added",
    "duration", "duration_value", "duration_unit",
    "director", "cast",
]
df_silver = df_silver.select(orden)
df_silver.limit(10).display()


In [0]:

from delta.tables import DeltaTable

# Physical location of the Silver Delta table in ADLS
silver_path = f"abfss://silver@{storage}.dfs.core.windows.net/amazon_prime_titles_silver"

# Bootstrap only: create the Delta table the first time the notebook runs.
# On later runs the table already exists and must not be overwritten; if it
# were, the MERGE (SCD Type 1 upsert) in the following cells would always run
# against a table that was just replaced with the same data.
if DeltaTable.isDeltaTable(spark, silver_path):
    print("Silver ya existe, se actualizará con MERGE:", silver_path)
else:
    (
        df_silver
        .write
        .format("delta")
        .mode("overwrite")
        .save(silver_path)
    )  # creates the physical Delta table in ADLS
    print("OK guardado en:", silver_path)

In [0]:
from delta.tables import DeltaTable

# Handle to the Delta table stored at silver_path; used as the MERGE target below
silver_delta = DeltaTable.forPath(spark, silver_path)

In [0]:
# Upsert df_silver into the Silver table, matching rows on show_id
silver_upsert = silver_delta.alias("t").merge(
    df_silver.alias("s"),
    "t.show_id = s.show_id",
)
# Matched rows are overwritten with the incoming values (SCD Type 1);
# unmatched rows are inserted
silver_upsert = silver_upsert.whenMatchedUpdateAll().whenNotMatchedInsertAll()
silver_upsert.execute()

In [0]:
# Verify Silver: read the Delta table back from storage and show it
display(spark.read.format("delta").load(silver_path))

In [0]:
# Check the gold container: list its root
dbutils.fs.ls(f"abfss://gold@{storage}.dfs.core.windows.net/")

In [0]:
# List the gold container root (same call as the previous cell)
dbutils.fs.ls(f"abfss://gold@{storage}.dfs.core.windows.net/")

In [0]:
# Gold: persist the complete dataset
gold_root = f"abfss://gold@{storage}.dfs.core.windows.net"
gold_path = f"{gold_root}/amazon_prime_titles"

# This Gold table is a straight pass-through of Silver
df_gold = df_silver

# Write Gold as Delta ("overwrite" replaces the table; "append" would be the
# choice for an incremental Gold)
gold_writer = df_gold.write.format("delta").mode("overwrite")
gold_writer.save(gold_path)

print("✅ Gold escrito en:", gold_path)

# To verify the result, read it back:
#   spark.read.format("delta").load(gold_path).display()

In [0]:
# Gold aggregate: number of titles per type
path_type = f"{gold_root}/titles_by_type"

titles_per_type = count("*").alias("total_titles")
df_gold_type = df_silver.groupBy("type").agg(titles_per_type)

# Persist the aggregate as a Delta table, replacing any previous content
df_gold_type.write.format("delta").mode("overwrite").save(path_type)

df_gold_type.display()

In [0]:
# Gold aggregate: number of titles per release year, newest year first
path_year = f"{gold_root}/total_titles"

titles_per_year = df_silver.groupBy("release_year").agg(count("*").alias("total_titles"))
df_gold_year = titles_per_year.orderBy(desc("release_year"))

# Persist the aggregate as a Delta table, replacing any previous content
df_gold_year.write.format("delta").mode("overwrite").save(path_year)

df_gold_year.display()

In [0]:
# Gold aggregate: number of titles per country, largest count first
path_country = f"{gold_root}/total_country"

# A title may list several countries separated by ", ": emit one row per country
country_items = explode(split(col("country"), ", "))
titles_per_country = (
    df_silver
    .withColumn("country", country_items)
    .groupBy("country")
    .agg(count("*").alias("total_titles"))
)
df_gold_country = titles_per_country.orderBy(desc("total_titles"))

# Persist the aggregate as a Delta table, replacing any previous content
df_gold_country.write.format("delta").mode("overwrite").save(path_country)

df_gold_country.display()