In [0]:
from pathlib import Path
import pandas as pd
import numpy as np

In [0]:
%sql
DROP DATABASE IF EXISTS workspace.silver CASCADE; 

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS workspace.silver
COMMENT 'Capa Silver procesados'


In [0]:
spark.sql("DROP TABLE IF EXISTS workspace.silver.tvmaze")

In [0]:

spark.sql("""
CREATE TABLE IF NOT EXISTS workspace.silver.tvmaze (
  episode_id BIGINT,
  url STRING,
  episode_name STRING,
  season BIGINT,
  episode_number DOUBLE,
  episode_type STRING,
  airdate STRING,
  airtime STRING,
  airstamp STRING,
  episode_runtime DOUBLE,
  image DOUBLE,
  episode_summary STRING,
  episode_rating DOUBLE,
  links_self_href STRING,
  links_show_href STRING,
  links_show_name STRING,
  embedded_show_id DOUBLE,
  embedded_show_url STRING,
  embedded_show_name STRING,
  embedded_show_type STRING,
  embedded_show_language STRING,
  embedded_show_genres STRING,
  embedded_show_status STRING,
  embedded_show_runtime DOUBLE,
  embedded_show_averageruntime DOUBLE,
  embedded_show_premiered STRING,
  embedded_show_ended STRING,
  embedded_show_officialsite STRING,
  embedded_show_schedule_time STRING,
  embedded_show_schedule_days STRING,
  embedded_show_rating_average DOUBLE,
  embedded_show_weight DOUBLE,
  embedded_show_webchannel_id DOUBLE,
  embedded_show_webchannel_name STRING,
  embedded_show_webchannel_country_name STRING,
  embedded_show_webchannel_country_code STRING,
  embedded_show_webchannel_country_timezone STRING,
  embedded_show_webchannel_officialsite STRING,
  embedded_show_dvdcountry_name STRING,
  embedded_show_dvdcountry_code STRING,
  embedded_show_dvdcountry_timezone STRING,
  embedded_show_externals_tvrage DOUBLE,
  embedded_show_externals_thetvdb DOUBLE,
  embedded_show_externals_imdb STRING,
  embedded_show_image_medium STRING,
  embedded_show_image_original STRING,
  embedded_show_summary STRING,
  embedded_show_updated DOUBLE,
  embedded_show_links_self_href STRING,
  embedded_show_links_previousepisode_href STRING,
  embedded_show_links_previousepisode_name STRING,
  image_medium STRING,
  image_original STRING,
  embedded_show_links_nextepisode_href STRING,
  embedded_show_links_nextepisode_name STRING,
  embedded_show_network_id DOUBLE,
  embedded_show_network_name STRING,
  embedded_show_network_country_name STRING,
  embedded_show_network_country_code STRING,
  embedded_show_network_country_timezone STRING,
  embedded_show_network_officialsite STRING,
  _links_self_href STRING,
  _links_show_href STRING,
  _links_show_name STRING,
  show_id BIGINT,
  _embedded_show_url STRING,
  show_name STRING,
  show_type STRING,
  show_language STRING,
  show_genres STRING,
  show_status STRING,
  _embedded_show_runtime DOUBLE,
  _embedded_show_averageruntime DOUBLE,
  show_premiered STRING,
  _embedded_show_ended STRING,
  _embedded_show_officialsite STRING,
  show_schedule_time STRING,
  show_schedule_days STRING,
  show_rating_average DOUBLE,
  _embedded_show_weight BIGINT,
  _embedded_show_network DOUBLE,
  show_webchannel_id DOUBLE,
  show_webchannel_name STRING,
  show_webchannel_country_name STRING,
  show_webchannel_country_code STRING,
  _embedded_show_webchannel_country_timezone STRING,
  _embedded_show_webchannel_officialsite STRING,
  _embedded_show_dvdcountry DOUBLE,
  _embedded_show_externals_tvrage DOUBLE,
  _embedded_show_externals_thetvdb DOUBLE,
  _embedded_show_externals_imdb STRING,
  _embedded_show_image_medium STRING,
  _embedded_show_image_original STRING,
  _embedded_show_summary STRING,
  _embedded_show_updated BIGINT,
  _embedded_show__links_self_href STRING,
  _embedded_show__links_previousepisode_href STRING,
  _embedded_show__links_previousepisode_name STRING,
  _embedded_show__links_nextepisode_href STRING,
  _embedded_show__links_nextepisode_name STRING,
  _embedded_show_webchannel_country DOUBLE,
  show_network_id DOUBLE,
  show_network_name STRING,
  show_network_country_name STRING,
  show_network_country_code STRING,
  _embedded_show_network_country_timezone STRING,
  _embedded_show_network_officialsite STRING,
  _embedded_show_webchannel DOUBLE,
  _embedded_show_dvdcountry_name STRING,
  _embedded_show_dvdcountry_code STRING,
  _embedded_show_dvdcountry_timezone STRING,
  _embedded_show_image DOUBLE
)
""")

In [0]:
# Limpia nombres de columnas para estandarizarlos:
# - quita espacios al inicio/fin
# - convierte a minúsculas
# - reemplaza espacios, puntos y guiones por "_"
def clean_column_name(name: str) -> str:
    return name.strip().lower().replace(" ", "_").replace(".", "_").replace("-", "_")

# Convierte valores complejos a formatos comparables:
# - list  -> tuple
# - ndarray -> tuple
# - dict -> tuple ordenada de pares (key, value)
def normalize_cell(value):
    if isinstance(value, list):
        return tuple(value)
    if isinstance(value, np.ndarray):
        return tuple(value.tolist())
    if isinstance(value, dict):
        return tuple(sorted(value.items()))
    return value

# Si el valor es lista/tupla, lo convierte a string separado por comas.
# Si no, lo deja igual.
# Se usa para columnas como géneros o días de programación.
def join_if_sequence(value):
    if isinstance(value, (list, tuple)):
        return ",".join(str(item) for item in value)
    return value

In [0]:
# Leer bronze y pasar a pandas
df_spark = spark.table("workspace.bronze.tvmaze")
df = df_spark.toPandas()

In [0]:
# Limpiar nombres de columnas
df.columns = [clean_column_name(col) for col in df.columns]

In [0]:
# Renombrar columnas clave
rename_map = {
    "id": "episode_id",
    "name": "episode_name",
    "number": "episode_number",
    "type": "episode_type",
    "runtime": "episode_runtime",
    "summary": "episode_summary",
    "rating_average": "episode_rating",
    "_embedded_show_id": "show_id",
    "_embedded_show_name": "show_name",
    "_embedded_show_type": "show_type",
    "_embedded_show_language": "show_language",
    "_embedded_show_status": "show_status",
    "_embedded_show_genres": "show_genres",
    "_embedded_show_premiered": "show_premiered",
    "_embedded_show_rating_average": "show_rating_average",
    "_embedded_show_webchannel_id": "show_webchannel_id",
    "_embedded_show_webchannel_name": "show_webchannel_name",
    "_embedded_show_webchannel_country_name": "show_webchannel_country_name",
    "_embedded_show_webchannel_country_code": "show_webchannel_country_code",
    "_embedded_show_network_id": "show_network_id",
    "_embedded_show_network_name": "show_network_name",
    "_embedded_show_network_country_name": "show_network_country_name",
    "_embedded_show_network_country_code": "show_network_country_code",
    "_embedded_show_schedule_days": "show_schedule_days",
    "_embedded_show_schedule_time": "show_schedule_time",
}

df = df.rename(columns=rename_map, errors="ignore")

In [0]:
# 5) Normalizar tipos complejos para poder deduplicar
df = df.apply(lambda col: col.map(normalize_cell))

In [0]:
# 6) Quitar duplicados
df = df.drop_duplicates()

In [0]:
# 7) Convertir tuplas a texto en columnas específicas

df["show_genres"] = df["show_genres"].map(join_if_sequence)

df["show_schedule_days"] = df["show_schedule_days"].map(join_if_sequence)

In [0]:
# 8) Eliminar columnas duplicadas (si las hubiera)
df = df.loc[:, ~df.columns.duplicated()]

# 9) Regresar a Spark DataFrame
df_spark = spark.createDataFrame(df)


In [0]:
# Usamos overwrite para reemplazar completamente los datos existentes
df_spark.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("workspace.silver.tvmaze")