# Bienvenido a este Notebook
## Vamos a explorar datos con PySpark (python) para crear un DataMart de ejemplo 

In [None]:
import pandas as pd 

folder_path = "Files"

# Leer todos los Parquets en una carpeta
df = spark.read.format("parquet").option("header", "true").load(folder_path)

# Mostrar los datos cargados
display(df)


## Ahora guardaremos los datos en las tablas del Lakehouse

In [None]:
# Especificar el nombre de la tabla en el Lakehouse
table_name = "LH_Taller.Landing"

# Escribir los datos en el esquema de tablas del Lakehouse con opción mergeSchema
df.write.mode("overwrite").option("mergeSchema", "true").saveAsTable(table_name)

print(f"Los datos se han guardado en la tabla '{table_name}' en el Lakehouse.")


In [None]:
df = spark.sql("SELECT * FROM LH_Taller.landing LIMIT 100")
display(df)

# intentemos ahora con datos de la Web

In [None]:
import requests

# URL de la página
url = "https://si3.bcentral.cl/Indicadoressiete/secure/Serie.aspx?gcode=PRE_EUR&param=cgBnAE8AOQBlAGcAIwBiAFUALQBsAEcAYgBOAEkASQBCAEcAegBFAFkAeABkADgASAA2AG8AdgB2AFMAUgBYADIAQwBzAEEARQBMAG8ASgBWADQATABrAGQAZAB1ADIAeQBBAFAAZwBhADIAbABWAHcAXwBXAGgATAAkAFIAVAB1AEIAbAB3AFoAdQBRAFgAZwA5AHgAdgAwACQATwBZADcAMwAuAGIARwBFAFIASwAuAHQA"

# Hacer la solicitud HTTP
response = requests.get(url)

# Verificar el estado de la respuesta
if response.status_code == 200:
    print("Datos obtenidos exitosamente")
    # Guardar el contenido de la respuesta
    with open("datos.html", "w", encoding="utf-8") as file:
        file.write(response.text)
else:
    print(f"Error al acceder a la URL: {response.status_code}")


In [None]:
import pandas as pd
from pyspark.sql import SparkSession

# Crear sesión de Spark
spark = SparkSession.builder.getOrCreate()

# Leer el archivo HTML guardado
dataframes = pd.read_html("datos.html")  

# Mostrar las tablas encontradas
for i, df in enumerate(dataframes):
    print(f"Tabla {i}:\n", df)

In [None]:
# Convertir la Tabla1 (índice 1) a formato Spark DataFrame
table1_pandas = dataframes[1]
table1_spark = spark.createDataFrame(table1_pandas)

# Definir la carpeta en el Lakehouse
parquet_folder = "Files/eurbcentral"

# Guardar la Tabla1 como archivo Parquet en el Lakehouse
table1_spark.write.mode("overwrite").parquet(parquet_folder)

print(f"Tabla1 guardada exitosamente en formato Parquet en '{parquet_folder}'.")



## Ahora vamos a trabajar esa tabla importada desde la web, la desdinamizaremos y la escribiremos en las tablas del lakehouse

In [None]:
ruta_eurclp = "Files/eurbcentral"

df_eur = spark.read.format("parquet").option("header","true").load(ruta_eurclp)
display(df_eur)

print("Columnas originales:", df_eur.columns)

In [None]:
df_eur = df_eur.toDF(*[col_name.replace("Ã", "i").replace("\xada", "a").replace(" ", "_") for col_name in df_eur.columns])

# Verifica las columnas corregidas
print("Columnas corregidas:", df_eur.columns)


# Ajusta los nombres de las columnas para que no contengan caracteres especiales
columns = [col_name.strip().replace("Dia", "day").replace(" ","_") for col_name in df_eur.columns]

df_eur = df_eur.toDF(*columns)
print("nuevos nombres:", df_eur.columns)

#display(df_eur)

In [None]:
# Realiza la transformación pivot inversa (melt)

from pyspark.sql.functions import col

# Convertir todas las columnas de los meses a STRING
columnas_meses = ['Enero', 'Febrero', 'Marzo', 'Abril', 'Mayo', 'Junio', 'Julio', 'Agosto', 
                  'Septiembre', 'Octubre', 'Noviembre', 'Diciembre']

for columna in columnas_meses:
    df_eur = df_eur.withColumn(columna, col(columna).cast("string"))


melted_df = df_eur.selectExpr(
    "day as dia",
    "stack(12, 'Enero', Enero, 'Febrero', Febrero, 'Marzo', Marzo, 'Abril', Abril, \
                 'Mayo', Mayo, 'Junio', Junio, 'Julio', Julio, 'Agosto', Agosto, \
                 'Septiembre', Septiembre, 'Octubre', Octubre, 'Noviembre', Noviembre, \
                 'Diciembre', Diciembre) as (mes, valor)"
)
# Limpia los datos reemplazando los valores nulos por 0
melted_df = melted_df.fillna({'valor': 0})

# Muestra el resultado final
display(melted_df)

In [None]:
melted_df.select("mes").distinct().show()

In [None]:
# vamos a limpiar estos datos y dejarlos listos para escribirlos en tabla
from pyspark.sql.functions import col, concat_ws, to_date, lit, when , regexp_replace, create_map, lpad

spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# Creamos un diccionario para mapear meses
month_mapping = {
    "Enero": "01", "Febrero": "02", "Marzo": "03", "Abril": "04", "Mayo": "05",
    "Junio": "06", "Julio": "07", "Agosto": "08", "Septiembre": "09",
    "Octubre": "10", "Noviembre": "11", "Diciembre": "12"
}
#crear la expresion para aplicar el diccionario
mapping_expr = create_map([lit(x) for x in sum(month_mapping.items(), ())])

# Transformar el DataFrame
melted_df = melted_df.withColumn("mes", mapping_expr[col("mes")]) \
                           .withColumn("dia", lpad(col("dia"), 2, "0"))
display(melted_df)

# Crear la columna de fecha
df_final = melted_df.withColumn(
    "fecha",
    to_date(concat_ws("-", col("dia"), col("mes"), lit("2024")), "dd-MM-yyyy")
)

# Mostrar el DataFrame transformado

display(df_final)



# Crear una columna de fecha combinando día, mes y año
#melted_df = melted_df.withColumn(
#    "fecha",
#    to_date(concat_ws("-", col("dia"), col("mes"), lit("2024")), "dd-MM-yyyy")
#)

# Filtrar las filas inválidas (fechas nulas)
#melted_df = melted_df.filter(col("fecha").isNotNull())

# Formatear la columna 'valor' a float (manejar separadores de miles)

#melted_df = melted_df.withColumn(
#    "valor",
#    regexp_replace(col("valor"), r"[.,]", "").cast("float")
#)

# Mostrar el DataFrame resultante
#melted_df.show()


In [None]:
#Vamos a escribir en la talba landing_eur

df_export = df_final.select("fecha","valor")
df_export.limit(10).show()

tablafinal = "landing_eur"
df_export.write.mode("overwrite").option("mergeSchema", "true").saveAsTable(tablafinal)

In [None]:
df = spark.sql("SELECT * FROM LH_Taller.landing_eur LIMIT 1000")
display(df)