In [None]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('explore_data_1').getOrCreate()
import pyspark.sql.functions as F
from pyspark.sql.functions import col,isnan, when, count
from pyspark.sql.types import FloatType, TimestampType
import plotly.express as px
from functools import reduce
import pickle

# O. IMPORTING DATA

In [None]:
df = spark.read.csv("../data/weatherAUS.csv", header=True, inferSchema=True)
df = df.na.replace('NA', None)
# df = spark.read.csv("../data/penguins.csv", header=True, inferSchema=True)

# 1. TAMAÑO DE DATA - PRIMEROS VISTAZOS GENERALES.

A partir de aquí, se prepará la data de tal forma que pueda soportar un problema de clasificación binaria, es decir, para entrenar un modelo supervisado. Siendo así, las variables de localización y meteorológicas se considerarán como __features__ o __predictoras__. La __variable objetivo__ o __respuesta__ será `RainTomorrow`.

In [None]:
num_rows = df.count()
num_columns = len(df.columns)
print("Número de filas:", num_rows)
print("Número de columnas:", num_columns)

Se revisa los tipos de variables que se tienen.

In [None]:
df.printSchema()

Hay variables cuya naturaleza es en verdad float. Se cambia los tipos de variables.

In [None]:
float_names_columns = ['MinTemp', 'MaxTemp', 'Rainfall',
                       'WindGustSpeed', 'WindSpeed9am',
                       'WindSpeed3pm', 'Humidity9am', 'Humidity3pm', 'Pressure9am',
                       'Pressure3pm', 'Temp9am', 'Temp3pm']

In [None]:
for column in float_names_columns:
    df = df.withColumn(column,
                       col(column).cast(FloatType()))

In [None]:
df.toPandas().head(5)

Veamos qué variables poseen valores perdidos y los porcentajes de missing.

In [None]:
columns_names = list(set(df.columns)-set(['Date']))

In [None]:
d = df.select([
    count(
        when(
            col(c).contains('None') |
            col(c).contains('NULL') |
            (col(c) == '') |
            col(c).isNull() |
            isnan(c), c
        )
    ).alias(c)
    for c in columns_names
    ])
d.toPandas().T

In [None]:
total_rows = df.count()
d_per = df.select([(count(when(col(c).contains('None') | \
                           col(c).contains('NULL') | \
                           (col(c) == '') | \
                           col(c).isNull() | \
                           isnan(c), c)) / total_rows * 100).alias(c)
               for c in columns_names])
d_per.toPandas().T

Debido a la presencia de missing en la variable objetivo `RainTomorrow`, eliminamos las filas que no cuenten con información de esta variable.

In [None]:
df_NOT_MISS_IN_OBJECTIVE =  df.dropna(subset=['RainTomorrow'])

In [None]:
df_NOT_MISS_IN_OBJECTIVE.toPandas()

Debido a que hay algunas variables con un porcentaje importante de valores perdidos, establecemos un umbral de filtrado de variables. Las variables arriba del 11 % de valores perdidos, no serán consideradas.

Siendo así, se procede a eliminar aquellas variables.

In [None]:
low_missing_names = [c for c in columns_names if d_per.select(col(c)).first()[c] < 11]
low_missing_names

In [None]:
df_NOT_HIGH_MISSING =  df_NOT_MISS_IN_OBJECTIVE.select(low_missing_names + ['Date'])

In [None]:
df_NOT_HIGH_MISSING.toPandas()

Se exhiben los nuevos porcentajes de missing.

In [None]:
columns_names = list(set(df_NOT_HIGH_MISSING.columns)-set(['Date']))
total_rows = df_NOT_HIGH_MISSING.count()
d_per = df_NOT_HIGH_MISSING.select([(count(when(col(c).contains('None') | \
                                       col(c).contains('NULL') | \
                                       (col(c) == '') | \
                                       col(c).isNull() | \
                                       isnan(c), c)) / total_rows * 100).alias(c)
                           for c in columns_names])
d_per.toPandas().T

In [None]:
df_NOT_HIGH_MISSING.count()

# 2. TRATAMIENTO DE VARIABLES NUMÉRICAS.

## 2.1 Visualización y Control de outliers.

In [None]:
columnas_float = [col_name for col_name, col_type in df_NOT_HIGH_MISSING.dtypes if col_type == 'float']
columnas_float

Al contar con un número no tan grande de variables numéricas, es posible una exploración individual de la distribuciones usando histogramas y box-plots.

In [None]:
dp_NOT_HIGH_MISSING = df_NOT_HIGH_MISSING.toPandas() 
for name in columnas_float:
    print(f'################################## VARIABLE: {name} ################################################')
    fig = px.histogram(dp_NOT_HIGH_MISSING,
                       x=name,
                       nbins=50,
                       marginal='box')
    fig.show()

Podemos observar lo siguiente:
* Algunas variables tiene cierto grado de concentración en el centro y otras están sesgadas.
* Variables sesgadas como `Rainfall` y  `WindSpeed9am` cuentan con claros outliers, es decir, puntos que se alejan no solo de la concentración de la información, sino de la cola natural de la distribución. Estos valores son extremos (valores positivos muy grandes).
* Algunas variables tienen valores negativos, la mayoría de ellas se refieren a la temperatura. Debido a que no se tiene mayor contexto de ellas, no se tomará ninguna acción al respecto.

Para ejercer cierto control de outliers en variables sesgadas a la izquierda, se impondrán cotas superiores a estas variables, más precisamente a `Rainfall` y  `WindSpeed9am`.

In [None]:
columnas = ['Rainfall', 'WindSpeed9am']
umbrales = [300, 100]  

condiciones = []

for columna, umbral in zip(columnas, umbrales):
    condicion = (
        isnan(col(columna)) | (col(columna) == '') |
         col(columna).isNull() | col(columna).contains('None') |
         col(columna).contains('NULL') |
         (col(columna) <= umbral))
    condiciones.append(condicion)

df_SKEW_UPPER_OUTLIERS = df_NOT_HIGH_MISSING.filter(
                             reduce(lambda a, b: a & b, condiciones)
)

# Mostrar el DataFrame resultante
df_SKEW_UPPER_OUTLIERS.toPandas()

In [None]:
skew_num_names = ['Rainfall', 'WindSpeed9am']
dp_NOT_HIGH_MISSING = df_NOT_HIGH_MISSING.toPandas()
dp_SKEW_UPPER_OUTLIERS = df_SKEW_UPPER_OUTLIERS.toPandas()
for name in skew_num_names:
    print(f'################################## VARIABLE: {name} ################################################')
    print(f'Antes de outliers.')
    fig = px.histogram(dp_NOT_HIGH_MISSING,
                       x=name,
                       nbins=50,
                       marginal='box')
    fig.show()
    print(f'Después de outliers.')
    fig2 = px.histogram(dp_SKEW_UPPER_OUTLIERS,
                        x=name,
                        nbins=50,
                        marginal='box' )
    fig2.show()

Se comparan los tamaños de data antes y después de outliers. Se puede ver que el porcentaje de data eliminada es ínfimo. Si esto fuera lo contrario, es decir, si data eliminada fuera considerble, se tendría que pensar en otras estrategias para control de outliers y sesgos extremos, como por ejemplo las tranformaciones yeo-johnson o box-cox.

In [None]:
print('Porcentaje remanente de la data después de eliminar outliers:', (dp_SKEW_UPPER_OUTLIERS.shape[0] / dp_NOT_HIGH_MISSING.shape[0]) * 100, '%')

In [None]:
type(df_SKEW_UPPER_OUTLIERS)

In [None]:
(df_SKEW_UPPER_OUTLIERS.write
                       .mode("overwrite")
                       .parquet("../outputs/df_SKEW_UPPER_OUTLIERS.parquet"))