# Bronze to silver notebook

En esta notebook, los datos financieros que fueron inicialmente almacenados en la capa Bronze son leídos y sometidos a una serie de transformaciones, tales como la limpieza de los datos, la imputación de valores nulos y la creación de nuevas columnas. 

## Parámetros
- **Asset (str)**: Indica el activo financiero que se desea analizar. Es el nombre del directorio del que se recibe la información delta y en el que se guardará. Es el nombre del directorio en el que se guardará. 'apple' es el valor por defecto.

In [None]:
# Paramaters
asset = 'apple' 

In [None]:
import sys
import pandas as pd
from delta.tables import DeltaTable 
from pyspark.sql import DataFrame 
from pyspark.sql.functions import col, last, max, min, year, month, dayofmonth, dayofweek, hour
from pyspark.sql.window import Window
from typing import List


In [None]:
def valida_nulos(df: DataFrame, lista_columnas: list[str]) -> None:
    """
    Valida que no existan valores nulos en las columnas especificadas del DataFrame.

    Esta función recorre una lista de columnas y verifica si hay valores nulos
    en cada una de ellas. Si se encuentran valores nulos en alguna columna de la lista, se lanza una
    excepción con un mensaje que indica la columna en la que se encontraron los valores nulos.

    Parámetros:
    df (DataFrame): El DataFrame de Spark que contiene los datos a verificar.
    lista_columnas (list[str]): LA lista de nombres de columnas que se desean verificar.

    Excepciones:
    AssertionError: Si se encuentran valores nulos en alguna de las columnas especificadas,
                    se lanza una excepción con un mensaje indicando la columna con valores nulos.
    """
    for i in lista_columnas:
        conteo_nulos = df.filter(df[i].isNull()).count()
        assert conteo_nulos == 0, f"Existen valores nulos en la columna {i}"

In [None]:
storage_account_name = 'nticmasterstg' 
 
data_lake_container = f'abfss://datalake@{storage_account_name}.dfs.core.windows.net' 
bronze_folder = 'bronze'  # Carpeta de ingesta de datos (raw) 
silver_folder = 'silver'  # Carpeta donde se almacenarán las tablas resultantes

# Ruta de los archivos de origen 
source_path = f"{data_lake_container}/{bronze_folder}/{asset}" 
 
# Ruta de la tabla Delta de guardado
delta_table_path = f"{data_lake_container}/{silver_folder}/{asset}" 

In [None]:
# Carga de datos de entrada
data_input = spark.read.format('delta').option("recursiveFileLookup", "true").option("header", 
"true").load(source_path) 

In [None]:
# Comprobaciones de datos

# Mostrar las primeras filas del DataFrame
print("Primeras filas del DataFrame:")
print(data_input.head())

# Verificar la columna de fechas
print("\nColumnas del DataFrame:")
print(data_input.columns)

# Comprobar el formato de la columna de fechas
print("\nTipo de datos de la columna 'Date':")
print(data_input['Date'].dtype)

# Mostrar el rango de fechas disponibles
date_range = data_input.select(min('Date').alias('min_date'), max('Date').alias('max_date')).collect()

# Mostrar el rango de fechas disponibles
min_date = date_range[0]['min_date']
max_date = date_range[0]['max_date']
print(f"\nRango de fechas disponibles: {min_date} al {max_date}")

# Mostrar la primera fila del dataframe
print("\nPrimeras filas del DataFrame:")
print(data_input.head())

In [None]:

silver_df = data_input.withColumn("Date", col("Date").cast("timestamp"))

# Eliminamos duplicados respecto a las columnas 'Date' y 'Close'.
silver_df = silver_df.dropDuplicates(['Date', 'Close'])

# Creamos nuevas columnas basadas en la fecha
silver_df = silver_df.withColumn("year", year(col("Date")))
silver_df = silver_df.withColumn("month", month(col("Date")))
silver_df = silver_df.withColumn("day_of_month", dayofmonth(col("Date")))
silver_df = silver_df.withColumn("day_of_week", dayofweek(col("Date")))

# Imputamos valores nulos, el volumen de movimiento lo sustituimos por 0, mientras que el resto de valores utilizamos el último valor disponible (forward fill)
silver_df = silver_df.fillna({"Volume": 0}) 
window_spec = Window.orderBy('Date').rowsBetween(-sys.maxsize, 0)

columns_to_ffill = ['Open', 'High', 'Low', 'Close', 'Adj_Close']

for column in columns_to_ffill:
    silver_df = silver_df.withColumn(column, last(silver_df[column], ignorenulls=True).over(window_spec))

# Estandarizamos formatos de datos 
silver_df = silver_df.withColumn("Open", col("Open").cast("double"))
silver_df = silver_df.withColumn("High", col("High").cast("double"))
silver_df = silver_df.withColumn("Low", col("Low").cast("double"))
silver_df = silver_df.withColumn("Close", col("Close").cast("double"))
silver_df = silver_df.withColumn("Adj_Close", col("Adj_Close").cast("double"))

# Lista de columnas a verificar
columnas_a_revisar = ['Date', 'Open', 'High', 'Low', 'Close', 'Adj_Close', 'Volume']

# Llamada a la función para verificar nulos en las columnas especificadas
valida_nulos(silver_df, columnas_a_revisar)

# Filtrar y eliminar registros correspondientes a sábados y domingos
silver_df = silver_df.filter(~col("day_of_week").isin([6, 7]))

In [None]:
silver_df.write.format("delta").mode("overwrite").save(delta_table_path)

print('La tabla {table_name} se ha guardado en la ruta {delta_table_path}')

delta_df = spark.read.format("delta").load(delta_table_path)
print('Mostrando 5 líneas del contenido de la tabla Delta...')
delta_df.show(5)

conteo_filas = delta_df.count()

print(f'La tabla Delta tiene {conteo_filas} filas en total')