In [0]:
df_automobile = spark.read.format("delta").table("bronze.automobile")
df_motor      = spark.read.format("delta").table("bronze.motor")
df_dimensoes  = spark.read.format("delta").table("bronze.dimensoes")
df_origin     = spark.read.format("delta").table("bronze.origem")
df_ano_modelo = spark.read.format("delta").table("bronze.ano_modelo")


In [0]:
from pyspark.sql.functions import current_timestamp, lit

df_automobile = df_automobile.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_tabela", lit("automobile"))
df_motor      = df_motor.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_tabela", lit("motor"))
df_dimensoes  = df_dimensoes.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_tabela", lit("dimensoes"))
df_origin     = df_origin.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_tabela", lit("origem"))
df_ano_modelo = df_ano_modelo.withColumn("data_hora_silver", current_timestamp()).withColumn("nome_tabela", lit("ano_modelo"))


In [0]:
from pyspark.sql import functions as F

# ---------- Helpers ----------
def _apply_name_rules(colname: str) -> str:
    n = colname.upper()
    n = n.replace("CD_", "CODIGO_")
    n = n.replace("VL_", "VALOR_")
    n = n.replace("DT_", "DATA_")
    n = n.replace("NM_", "NOME_")
    n = n.replace("DS_", "DESCRICAO_")
    n = n.replace("NR_", "NUMERO_")
    n = n.replace("_UF", "_UNIDADE_FEDERATIVA")
    return n

def _safe_drop(df, cols):
    existing = set(df.columns)
    to_drop = [c for c in cols if c in existing]
    return df.drop(*to_drop) if to_drop else df

# ---------- Core ----------
def renomear_colunas_managed(src_fqn: str, dest_fqn: str = None):
    dest_fqn = dest_fqn or src_fqn
    df = spark.read.format("delta").table(src_fqn)
    new_cols = [_apply_name_rules(c) for c in df.columns]
    df = df.toDF(*new_cols)
    df = _safe_drop(df, ["DATA_HORA_BRONZE", "NOME_ARQUIVO"])
    df = (df
          .withColumn("NOME_ARQUIVO_BRONZE", F.lit(src_fqn))
          .withColumn("DATA_ARQUIVO_SILVER", F.current_timestamp())
         )
    (df.write
       .format("delta")
       .mode("overwrite")
       .option("mergeSchema", "true")
       .saveAsTable(dest_fqn))
    return dest_fqn


In [0]:
renomear_colunas_managed("bronze.automobile", "silver.automobile")
renomear_colunas_managed("bronze.motor", "silver.motor")
renomear_colunas_managed("bronze.dimensoes", "silver.dimensoes")
renomear_colunas_managed("bronze.origem", "silver.origem")
renomear_colunas_managed("bronze.ano_modelo", "silver.ano_modelo")
