# Seguimiento

## importaciones
Nota: estas importaciones son necesarias para este notebook, si quiere ver las importaciones necesarias para la función específica vaya al archivo original [seguimiento](../features/seguimiento.py)

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.dataframe import DataFrame
spark = SparkSession.builder.getOrCreate()


## Dataframes y validaciones 
Estos son dataframes de testeo obtenido de internet. 
Las validaciones son una pieza importante de la parte técnica de la librería. Para más detalle vea las [validaciones](../validaciones/validaciones.py)

In [2]:
df_cars = spark.read.csv('csv/used_cars_data.csv' , header=True,inferSchema=True )
df_countries = spark.read.csv('csv/countries.csv' ,header= True ,inferSchema=True )
df_dates = spark.read.csv('csv/US_Holiday_Dates_(2004-2021).csv' , header=True,inferSchema=True )
df_cop = spark.read.csv('csv/eurocup_2020_results.csv', header= True,inferSchema=True)
df_test = spark.read.csv('csv/Countries_usefulFeatures.csv' , header= True ,inferSchema=True)
df_ernigs = spark.read.csv('csv/all_earnings_dates.csv' , header= True ,inferSchema=True)
df_null = spark.read.csv('csv/cars_null.csv' , header= True  ,inferSchema=True)
df_seph = spark.read.csv('csv/sephora_website_dataset.csv', header= True,inferSchema=True)
df_tw = spark.read.csv('csv/most_followed_twitter.csv' , header= True ,inferSchema=True)
df_airlines = spark.read.csv('csv/airlines.csv' , header= True ,inferSchema=True)

def is_dataframe(dataframe):
    try:
        type_df = type(dataframe)
        if not isinstance(dataframe , DataFrame):
            raise TypeError(f"Expected a DataFrame, got {type_df}")
    except Exception as e: 
        print("An error occurred: ", e)

## Seguimiento

In [3]:
def seguimiento(df, tiempo, seguimiento, tolerancia=50):
    """
    La función 'seguimiento' es una función que permite realizar un seguimiento a los valores de una columna mediante otra columna de tiempo.
    Esta función realiza un seguimiento de los valores únicos de la columna seleccionada, calculando la variación neta y variacion porcentual
    respecto a un periodo de tiempo dado. Recibe un parámetro de tolerancia que sirve como referencia para indicar cuando una variación
    porcentual está fuera de los rangos esperados.
    Argumentos:
    dataframe (pyspark.sql.dataframe.DataFrame): Dataframe a procesar.
    Tiempo (str): Nombre de la columna en el dataframe que representa el tiempo.
    Seguimiento (str): Nombre de la columna en el dataframe que representa el seguimiento.
    Tolerancia (int): Valor opcional de tolerancia para los cálculos. Valor por defecto: 50.
    Retorno:
    dataframe (pyspark.sql.dataframe.DataFrame) con los resultados de los cálculos.
    Lanza una excepción con un mensaje de error si ocurre un error durante el procesamiento de los datos.
    """

    # Verificar si es un dataframe
    is_dataframe(df)

    # Verificar el tipo de datos de tiempo
    data_type = df.schema[tiempo].dataType.simpleString()

    # Convertir tiempo a tipo de datos integer
    try:
        df = timeToInteger(data_type, df, tiempo)
    except Exception as e:
        print('Error al convertir el tipo de dato: ', e)
        # Remover espacios y caracteres especiales de seguimiento
    try:
        df = replaceExpChar(df, seguimiento)
    except Exception as e:
        print('error al intentar sacar los caracteres especiales o espacios: ', e)
    # Agrupar por tiempo y pivote seguimiento
    try:
        df = groupByTimePivot(df, seguimiento, tiempo)
    except Exception as e:
        print("Error al intentar agrupar por el pivot: ", e)

    # Rellenar valores nulos con 0
    try:
        for col in df.columns: df = df.fillna(0, col)
    except Exception as e:
        print("Error al rellenar los nulos con 0 ", e)
    # Calcular var_neta y var_porc
    try:
        df = var_neta_porc(df, tiempo, tolerancia)
        return df
    except Exception as e:
        print('Error al procesar los datos: ', e)


def var_neta_porc(df, tiempo, tolerancia):
    window = Window.partitionBy().orderBy(tiempo)
    for col in df.columns:
        df = df.withColumn('var_neta_' + col, df[col] - lag(df[col]).over(window))
        df = df.withColumn('var_porc_' + col,
                           when(lag(df[col]).over(window) == 0, 0)
                           .otherwise((df[col] - lag(df[col]).over(window)) / lag(df[col]).over(window) * 100))
        df = df.withColumn('fuera_de_tolerancia_' + col,
                           when(abs(df['var_porc_' + col]) > tolerancia, '*FUERA DE RANGO*')
                           .otherwise('dentro de rango'))
    df = df.withColumn("tolerancia", lit(tolerancia))
    return df


def groupByTimePivot(df, seguimiento, tiempo):
    df = df.groupBy(tiempo).pivot(seguimiento).agg(count("*"))
    df = df.sort(tiempo)
    return df


def replaceExpChar(df, seguimiento):
    df = df.withColumn(seguimiento, regexp_replace(df[seguimiento], " ", "_"))
    df = df.withColumn(seguimiento, regexp_replace(df[seguimiento], "[^a-zA-Z0-9]+", ""))
    return df


def timeToInteger(data_type, df, tiempo):
    if data_type == TimestampType().simpleString():
        df = df.withColumn(tiempo, df[tiempo].cast(TimestampType()).cast(IntegerType()))
    elif data_type == DateType().simpleString():
        df = df.withColumn(tiempo, df[tiempo].cast(DateType()).cast(IntegerType()))
    elif data_type == 'string':
        df = df.withColumn(tiempo, regexp_replace(df[tiempo], "-", ""))
        df = df.withColumn(tiempo, df[tiempo].cast(IntegerType()))
    return df

## Testing 
Hay que tener ciertas consideraciones con esta función, ya que como se muestra acá es probable que genere dataframes con muchas columnas, lo cual significa un gasto de recursos.
También el ingresar datos erroneos en la función puede generar dataframes que no sean esperados y no tengan sentido.

In [4]:
df_seguimiento1 = seguimiento(df_cars , 'year' , 'fuel' , 43)
df_seguimiento1.show()

+----+-----+--------+---+--------+------+-------------+--------------------+------------------------+--------------+-------------------+-------------------------+-----------------+-----------------+----------------------------+------------+------------+-----------------------+-----------------+-------------------+----------------------------+---------------+------------------+--------------------------+----------+
|year|Disel|Elctrico|GLP|Gasolina|Hbrido|var_neta_year|       var_porc_year|fuera_de_tolerancia_year|var_neta_Disel|     var_porc_Disel|fuera_de_tolerancia_Disel|var_neta_Elctrico|var_porc_Elctrico|fuera_de_tolerancia_Elctrico|var_neta_GLP|var_porc_GLP|fuera_de_tolerancia_GLP|var_neta_Gasolina|  var_porc_Gasolina|fuera_de_tolerancia_Gasolina|var_neta_Hbrido|   var_porc_Hbrido|fuera_de_tolerancia_Hbrido|tolerancia|
+----+-----+--------+---+--------+------+-------------+--------------------+------------------------+--------------+-------------------+-------------------------+--

In [6]:
df_seguimiento2 = seguimiento(df_ernigs , 'filingDate' , 'form')
df_seguimiento2.show()

+----------+---+---+-------------------+--------------------+------------------------------+------------+------------+-----------------------+------------+------------------+-----------------------+----------+
|filingDate|10K|10Q|var_neta_filingDate| var_porc_filingDate|fuera_de_tolerancia_filingDate|var_neta_10K|var_porc_10K|fuera_de_tolerancia_10K|var_neta_10Q|      var_porc_10Q|fuera_de_tolerancia_10Q|tolerancia|
+----------+---+---+-------------------+--------------------+------------------------------+------------+------------+-----------------------+------------+------------------+-----------------------+----------+
| 745214400|  0|  1|               null|                null|               dentro de rango|        null|        null|        dentro de rango|        null|              null|        dentro de rango|        50|
| 753073200|  0|  1|            7858800|   1.054568993835868|               dentro de rango|           0|         0.0|        dentro de rango|           0|     

In [10]:
df_seguimiento3 = seguimiento(df_dates , 'Date' , 'Year')
df_seguimiento3.show()

+----------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-------------+--------------------+------------------------+-------------+-------------+------------------------+-------------+-------------+------------------------+-------------+-------------+------------------------+-------------+-------------+------------------------+-------------+-------------+------------------------+-------------+-------------+------------------------+-------------+-------------+------------------------+-------------+-------------+------------------------+-------------+-------------+------------------------+-------------+-------------+------------------------+-------------+-------------+------------------------+-------------+-------------+------------------------+-------------+-------------+------------------------+-------------+-------------+------------------------+-------------+-------------+------------------------+-------------+-------------+---------------