#Notebook para la automatización de la carga de datos a la dimensión de Género

###0.Definir listas de errores

In [0]:
log_error_list = []

###1.Importar librerías

In [0]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
import datetime
import calendar
import numpy
import glob
import os
from os import remove
from datetime import datetime
from delta.tables import *
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
from delta.tables import DeltaTable
from delta.tables import *

###2.Leer el csv de la dimensión de Género

In [0]:
##Se lee la sabana inicial de datos 
df_Genero = pd.read_csv("/dbfs/mnt/processed/Genero/Genero.csv", sep=',')

###3.Convertir a Spark

In [0]:
if len(log_error_list)>0:
    print("Previous Error")
else:
    try:
      #Se convierte el dataframe de pandas a spark
      df_Genero = spark.createDataFrame(df_Genero)
      #display(df_Dim_Tiempo)
      
      #Ver el esquema del dataframe
      #df_Dim_Tiempo.printSchema()

    except Exception as e:
        print("Failed! Due to: {}".format(str(e)))
        log_error_list.append("Error: {}".format(str(e)))

###4. Hacer un Upsert a la tabla de Dim_Tiempo

In [0]:
if len(log_error_list)>0:
    print("Previous Error")
else:
    try:
      presentation_folder_path="/mnt/presentation"
      
      ##Declarar la tabla que se va a sobreescribir y a la que se le va a hacer el Upsert
      deltaTable = DeltaTable.forPath(spark, str(presentation_folder_path)+"/"+"Dim_Genero/")

      ###Instrucción para sobreescribir los datos existentes e insertar registros nuevos 
      deltaTable.alias("tgt").merge(
          df_Genero.alias("upd"),
          "tgt.Id_Genero = upd.Id_Genero") \
        .whenMatchedUpdate(set = { "Id_Genero" : "upd.Id_Genero", "Genero" : "upd.Genero"} ) \
        .whenNotMatchedInsert(values =
          { "Id_Genero" : "upd.Id_Genero",
            "Genero" : "upd.Genero"
            
          }
        ) \
        .execute()

    except Exception as e:
        print("Failed! Due to: {}".format(str(e)))
        log_error_list.append("Error: {}".format(str(e)))

###5.Leer y escribir la última versión de la tabla de Dim_Genero

####5.1. Leer y escribir última versión

In [0]:
if len(log_error_list)>0:
    print("Previous Error")
else:
    try:
      df_Genero=spark.read.format("delta").load(str(presentation_folder_path)+"/"+"Dim_Genero/")
      df_Genero.write.mode("overwrite").format("delta").save(str(presentation_folder_path)+"/"+"Dim_Genero/")
      #display(df_Dim_Tiempo)

    except Exception as e:
        print("Failed! Due to: {}".format(str(e)))
        log_error_list.append("Error: {}".format(str(e)))

####5.2. Eliminar las versiones anteriores y dejar solo la última

In [0]:
if len(log_error_list)>0:
    print("Previous Error")
else:
    try:
      ##Desactivar el tiempo de permanencia de las versiones anteriores
      spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled","false")
      
      #Elimnar versiones anteriores 
      deltaTable = DeltaTable.forPath(spark, str(presentation_folder_path)+"/"+"Dim_Genero/")
      deltaTable.vacuum(0)

      ##Leer e imprimir la última versión
      #Dim_Genero=spark.read.format("delta").load(str(processed_folder_path)+"/"+"/Dim_Genero/")
      #display(Dim_Genero)

    except Exception as e:
        print("Failed! Due to: {}".format(str(e)))
        log_error_list.append("Error: {}".format(str(e)))


#6. Estado final del notebook

In [0]:
if len(log_error_list)>0:
    log_status = "Failure"
    log_error_list = list(dict.fromkeys(log_error_list))
    log_error = str(log_error_list)
    rows_read = 0
    rows_written = 0
    print(log_error)
else:
    log_status = "Success"
    log_error = ''
print(log_status)