### La tarea debe modelar una tubería (pipeline) de datos que ejecuta los siguientes pasos
##### Paso 1 (ejecución en paralelo)
- Verificar el esquema de datos;
- Verificar datos perdidos/faltantes;
- Verificar datos erróneos/anómalos.

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import mean as mean_func

from pyspark.ml import Pipeline

# Inicializar la sesión de Spark
spark = SparkSession.builder.appName("DataPipeline").getOrCreate()

# Cargar los datos en un DataFrame
df = spark.read.csv("backend-dev-data-dataset.txt", header=True, inferSchema=True)

#### PASO 1: 

# Función para verificar el esquema de datos
def check_schema(df):
    print("Data schema:")
    df.printSchema()    

# Verificar datos perdidos/faltantes 
def missing_data(df):
    for c in df.columns:
        df = df.withColumn(c, \
                      when(col(c)=="na", None) \
                      .otherwise(col(c))) 

    missing_data_count = [df.filter(col(c).isNull()).count() for c in df.columns]
    print("Number of rows with missing data per column: ", missing_data_count)
    
    # Acá me quedo duda con la letra, ya que dice "VERIFICAR" no pide corregirlo.
    # Si hubiera que corregirlo utilizaría una estrategia de Mediana o usar percentil 50 y retornaría el df.
    
# Verificar datos erróneos/anómalos
def find_outliers(df, interval):
    for c in df.columns:
        col_type = df.schema[c].dataType

        if col_type == DoubleType():   
            mean = df.select(mean_func(col(c))).first()[0]
            std = df.select(stddev(col(c))).first()[0]
            
            # Se propone un estudio de outliers, pueden hacerse otros como box-plots, etc. 
            # Filtro las filas que tengan valor fuera de 3 standard deviations de la media
            outliers = df.filter((col(c) < (mean - interval * std)) | (col(c) > (mean + interval * std)))
            print(f'Total outliers for column {c} : {outliers.count()}')

                                                                                

In [3]:
# Ejecutar las funciones en paralelo

import concurrent.futures

""" concurrent.futures ThreadPoolExecutor nos permite ejecutar funciones en paralelo en distintos hilos o procesos. """

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = [executor.submit(check_schema, df), 
               executor.submit(missing_data, df),
               executor.submit(find_outliers, df, 3)]

Data schema:
root
 |-- key_1: string (nullable = true)
 |-- date_2: timestamp (nullable = true)
 |-- cont_3: double (nullable = true)
 |-- cont_4: double (nullable = true)
 |-- disc_5: integer (nullable = true)
 |-- disc_6: string (nullable = true)
 |-- cat_7: string (nullable = true)
 |-- cat_8: string (nullable = true)
 |-- cont_9: double (nullable = true)
 |-- cont_10: string (nullable = true)



                                                                                

Total outliers for column cont_3 : 17977


                                                                                

Total outliers for column cont_4 : 2732


                                                                                

Number of rows with missing data per column:  [0, 0, 0, 0, 0, 100327, 0, 0, 0, 100266]


[Stage 58:>                                                         (0 + 2) / 2]

Total outliers for column cont_9 : 2752


                                                                                

#### Paso 2 (ejecución secuencial)
- Normalizar una columna (cualquiera de valores continuos);
- Filtrar una columna por cierto valor (cualquiera de valores categóricos);
- Agrupar ciertas columnas (cualesquiera que correspondan a fechas).

In [4]:
#### PASO 2: 

def normalize_column(df, column):
    col_mean = df.agg(mean(f"{column}")).first()[0]
    col_std = df.agg(stddev(f"{column}")).first()[0]
    scaled_df = df.select("*", ((col(f"{column}") - col_mean) / col_std).alias(f"normalized_{column}"))
    return scaled_df
    
def filter_column(df, column, filter_value):
    df = df.filter(col(column) == filter_value)
    return df

def grouped_avg_data_monthly(df, group_column, date_column):
    grouped_df = df.groupBy(month(col(date_column)).alias("month"))
    grouped_df = grouped_df.agg(avg(group_column).alias(f"avg_{group_column}"))
    return grouped_df

In [5]:
scaled_df = normalize_column(df, "cont_9")
filtered_df = filter_column(df, "cat_7", "frequent")
grouped_avg_monthly_df = grouped_avg_data_monthly(df,"cont_3", "date_2")


scaled_df.show(10)
filtered_df.show(10)
grouped_avg_monthly_df.show(10)

                                                                                

+------+-------------------+-------+------+------+------+--------+---------+------+-------+--------------------+
| key_1|             date_2| cont_3|cont_4|disc_5|disc_6|   cat_7|    cat_8|cont_9|cont_10|   normalized_cont_9|
+------+-------------------+-------+------+------+------+--------+---------+------+-------+--------------------+
|HC2030|2016-11-16 00:00:00| 622.27| -2.36|     2|     6|frequent|    happy|  0.24|   0.25|  0.2397046450812019|
|sP8147|2004-02-18 00:00:00|1056.16| 59.93|     2|     8|   never|    happy|  1.94|   2.29|  1.9409134599100886|
|Cq3823|2007-03-25 00:00:00| 210.73|-93.94|     1|     1|   never|    happy| -0.11|   -0.1|-0.11054422856003945|
|Hw9428|2013-12-28 00:00:00|1116.48| 80.58|     3|    10|   never|surprised|  1.27|   1.15|  1.2704370446539979|
|xZ0360|2003-08-25 00:00:00| 1038.3| 12.37|     6|    17|   never|    happy|  1.76|   1.76|  1.7607854677517358|
|IK2721|2012-10-19 00:00:00| 835.17|  16.3|     4|    11|frequent|surprised|  2.04|    2.3|   2.

[Stage 69:>                                                         (0 + 2) / 2]

+-----+------------------+
|month|        avg_cont_3|
+-----+------------------+
|   12|1644.4190835669908|
|    1|1653.6455568870767|
|    6|1654.4757854043357|
|    3|1636.5632992579451|
|    5|   1642.2685611918|
|    9|1669.9463364770413|
|    4|1653.8981577273562|
|    8|1648.1477843510443|
|    7|1648.7407461333883|
|   10| 1652.369164142287|
+-----+------------------+
only showing top 10 rows



                                                                                

#### Paso 3 (ejecución secuencial)
 - Transformar una variable y agregarla al conjunto de datos. (Aplique la función x^3 + exp(y) sobre cualquier tupla de variables continuas);
 - Agregación - Conteo de registros únicos (sobre cualquier columna devalores categóricos).

In [6]:
#### PASO 3: 

def transformed_column(df,x,y):
    transformed_column = pow(df[x], 3) + exp(df[y])
    df_transformed = df.withColumn(f"transformed_value_{x}_{y}", transformed_column)
    return df_transformed

def unique(df, column):
    unique_count = df.agg(countDistinct(col(f"{column}")).alias(f"unique_{column}_count"))
    return unique_count

In [7]:
df_transformed = transformed_column(df, "cont_4", "cont_9")
df_unique = unique(df, "cat_8")

df_transformed.show(10)
df_unique.show(10)

+------+-------------------+-------+------+------+------+--------+---------+------+-------+-------------------------------+
| key_1|             date_2| cont_3|cont_4|disc_5|disc_6|   cat_7|    cat_8|cont_9|cont_10|transformed_value_cont_4_cont_9|
+------+-------------------+-------+------+------+------+--------+---------+------+-------+-------------------------------+
|HC2030|2016-11-16 00:00:00| 622.27| -2.36|     2|     6|frequent|    happy|  0.24|   0.25|            -11.873006849678594|
|sP8147|2004-02-18 00:00:00|1056.16| 59.93|     2|     8|   never|    happy|  1.94|   2.29|             215251.84040797062|
|Cq3823|2007-03-25 00:00:00| 210.73|-93.94|     1|     1|   never|    happy| -0.11|   -0.1|             -828993.6391498647|
|Hw9428|2013-12-28 00:00:00|1116.48| 80.58|     3|    10|   never|surprised|  1.27|   1.15|             523220.49196456233|
|xZ0360|2003-08-25 00:00:00| 1038.3| 12.37|     6|    17|   never|    happy|  1.76|   1.76|             1898.6314903944024|
|IK2721|



+------------------+
|unique_cat_8_count|
+------------------+
|                 4|
+------------------+



                                                                                

#### Si quisieramos conectarnos directo con Kinesis Data Stream 

In [None]:
"""from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Step 1: Create a SparkSession
spark = SparkSession.builder.appName("StreamingInference").getOrCreate()

# Step 2: Read the data in real-time
df = spark.readStream \
    .format("kinesis") \
    .option("streamName", "kinesis_stream_name") \
    .option("awsAccessKey", "your_aws_access_key") \
    .option("awsSecretKey", "your_aws_secret_key") \
    .option("region", "us-west-2") \
    .load()

# Step 3: Transform the data
df_transformed = df.selectExpr("cast (data as string) as data") \
    .groupBy(window(df.timestamp, "10 minutes", "5 minutes")) \
    .agg(count("data").alias("count"))

# Step 4: Store the results
query = df_transformed.writeStream \
    .outputMode("complete") \
    .format("parquet") \
    .option("path", "/tmp/streaming_data") \
    .option("checkpointLocation", "/tmp/streaming_checkpoints") \
    .start()

# Step 5: Start the streaming process
query.awaitTermination()"""

#### Paso 4: 

###### check model_pipeline.ipynb