#EJERCICIO
- Cargar los datos desde el archivo CSV.
- Eliminar duplicados.
- Imputar valores faltantes utilizando la media para las columnas numéricas.
- Detectar y filtrar outliers en la columna "altura" utilizando el rango intercuartílico.
- Mostrar el DataFrame resultante.

In [37]:
schema = StructType([
    StructField("id", IntegerType(), nullable=True),
    StructField("edad", IntegerType(), nullable=True),
    StructField("altura", IntegerType(), nullable=True),
    StructField("peso", IntegerType(), nullable=True)
])

# Definir los datos
data = [
    (1, 25, 170, 70),
    (2, 30, 165, 65),
    (3, 40, None, 80),
    (4, None, 160, 55),
    (5, 35, 175, 75),
    (6, 45, 185, 85),
    (7, 28, 172, 72),
    (8, 25, None, 68),
    (9, 32, 198, 78),
    (10, 38, 176, 76),
    (11, 22, 158, 53),
    (12, None, 173, 71),
    (13, 33, 166, 77),
    (14, 37, 174, 74),
    (15, 42, 182, 83),
    (1, None, 169, 69),
    (2, 29, 164, 64),
    (3, 41, 179, 79),
    (16, 45, 185, 85),
    (17, 28, 172, 72)
]
# Crear el DataFrame
df_edad = spark.createDataFrame(data, schema)

# Mostrar el DataFrame
df_edad.show()


+---+----+------+----+
| id|edad|altura|peso|
+---+----+------+----+
|  1|  25|   170|  70|
|  2|  30|   165|  65|
|  3|  40|  NULL|  80|
|  4|NULL|   160|  55|
|  5|  35|   175|  75|
|  6|  45|   185|  85|
|  7|  28|   172|  72|
|  8|  25|  NULL|  68|
|  9|  32|   198|  78|
| 10|  38|   176|  76|
| 11|  22|   158|  53|
| 12|NULL|   173|  71|
| 13|  33|   166|  77|
| 14|  37|   174|  74|
| 15|  42|   182|  83|
|  1|NULL|   169|  69|
|  2|  29|   164|  64|
|  3|  41|   179|  79|
| 16|  45|   185|  85|
| 17|  28|   172|  72|
+---+----+------+----+



## Eliminar duplicados

In [44]:
# filas totales duplicadas
df_edad.count(), df_edad.distinct().count()

(20, 20)

No tenemos ninguna fila total duplicada, vemos sin ids

In [45]:
# quitamos ids
no_ids = (
    df_edad
    .select([col for col in df_edad.columns if col != 'id'])
)

no_ids.count(), no_ids.distinct().count()

(20, 18)

Vemos que sin tener en cuenta los ids, tenemos dos filas duplicadas.
Son las siguientes:

In [46]:
(
    df_edad
    .groupby([col for col in df_edad.columns if col != 'id'])
    .count()
    .filter('count > 1')
    .show()
)

+----+------+----+-----+
|edad|altura|peso|count|
+----+------+----+-----+
|  28|   172|  72|    2|
|  45|   185|  85|    2|
+----+------+----+-----+



In [47]:
# quitamos las observaciones duplicadas

id_removed = df_edad.dropDuplicates(
    subset = [col for col in df_edad.columns if col != 'id']
)

In [48]:
#Nos quedamos con:
id_removed.count()
#observaciones

18

Comprobamos IDs Duplicados

In [49]:
import pyspark.sql.functions as fn

id_removed.agg(
      fn.count('id').alias('CountOfIDs')
    , fn.countDistinct('id').alias('CountOfDistinctIDs')
).show()

+----------+------------------+
|CountOfIDs|CountOfDistinctIDs|
+----------+------------------+
|        18|                15|
+----------+------------------+



Tenemos tres IDs duplicados, veamos cuales son:

In [50]:
# what's duplicated?
(
    id_removed
    .groupby('Id')
    .count()
    .filter('count > 1')
    .show()
)

+---+-----+
| Id|count|
+---+-----+
|  1|    2|
|  3|    2|
|  2|    2|
+---+-----+



In [52]:
id_repetidos = [1,2,3]
(
    id_removed
    .filter(id_removed["id"].isin(id_repetidos))
    .show()
)

+---+----+------+----+
| id|edad|altura|peso|
+---+----+------+----+
|  1|  25|   170|  70|
|  3|  40|  NULL|  80|
|  2|  30|   165|  65|
|  2|  29|   164|  64|
|  1|NULL|   169|  69|
|  3|  41|   179|  79|
+---+----+------+----+



In [53]:
#Actualizamos los ids

new_id = (
    id_removed
    .select(
        [fn.monotonically_increasing_id().alias('id')] +
        [col for col in id_removed.columns if col != 'id'])
)

new_id.show()

+---+----+------+----+
| id|edad|altura|peso|
+---+----+------+----+
|  0|  25|   170|  70|
|  1|  32|   198|  78|
|  2|  38|   176|  76|
|  3|  35|   175|  75|
|  4|  28|   172|  72|
|  5|  30|   165|  65|
|  6|  45|   185|  85|
|  7|  37|   174|  74|
|  8|  29|   164|  64|
|  9|  22|   158|  53|
| 10|  33|   166|  77|
| 11|  42|   182|  83|
| 12|  41|   179|  79|
| 13|  25|  NULL|  68|
| 14|  40|  NULL|  80|
| 15|NULL|   160|  55|
| 16|NULL|   173|  71|
| 17|NULL|   169|  69|
+---+----+------+----+



Esto crea un nuevo df, cogiendo todas las columnas anteriores y añadiéndoles una columna id con la función monotonically_increasing_id

## Imputar valores missings

Usando la media para las columnas numéricas

In [None]:
#Veo el porcentaje de missings que tiene por columnas

In [54]:
for k, v in sorted(
    new_id.agg(*[
               (1 - (fn.count(c) / fn.count('*')))
                    .alias(c + '_miss')
               for c in new_id.columns
           ])
        .collect()[0]
        .asDict()
        .items()
    , key=lambda el: el[1]
    , reverse=True
):
    print(k, v)

edad_miss 0.16666666666666663
altura_miss 0.11111111111111116
id_miss 0.0
peso_miss 0.0


In [None]:
#Vemos que solo tienen missings Edad y Altura y ninguna de ellas tiene tantos missings como para eliminarlas completamente

In [55]:
#Vemos las medias de estas dos columnas
multipliers = (
    new_id
    .agg(
          fn.mean(
              fn.col('Edad')
          ).alias('Edad')
        ,           fn.mean(
              fn.col('Altura')
          ).alias('Altura')
    )
).toPandas().to_dict('records')[0]

multipliers

{'Edad': 33.46666666666667, 'Altura': 172.875}

In [56]:
imputed = (
    new_id
    .withColumn('Edad', fn.col('Edad'))
    .fillna(multipliers)
    .withColumn('Altura', fn.col('Altura'))
    .fillna(multipliers)
)

imputed.show()

+---+----+------+----+
| id|Edad|Altura|peso|
+---+----+------+----+
|  0|  25|   170|  70|
|  1|  32|   198|  78|
|  2|  38|   176|  76|
|  3|  35|   175|  75|
|  4|  28|   172|  72|
|  5|  30|   165|  65|
|  6|  45|   185|  85|
|  7|  37|   174|  74|
|  8|  29|   164|  64|
|  9|  22|   158|  53|
| 10|  33|   166|  77|
| 11|  42|   182|  83|
| 12|  41|   179|  79|
| 13|  25|   172|  68|
| 14|  40|   172|  80|
| 15|  33|   160|  55|
| 16|  33|   173|  71|
| 17|  33|   169|  69|
+---+----+------+----+



## Detectar y filtrar outliers en la columna "altura" utilizando el rango intercuartílico

In [None]:
features = ['Displacement', 'Cylinders', 'FuelEconomy']
quantiles = [0.25, 0.75]

cut_off_points = []

for feature in features:
    quants = imputed.approxQuantile(feature, quantiles, 0.05)

    IQR = quants[1] - quants[0]
    cut_off_points.append((feature, [
        quants[0] - 1.5 * IQR,
        quants[1] + 1.5 * IQR,
    ]))

cut_off_points = dict(cut_off_points)

outliers = imputed.select(*['id'] + [
       (
           (imputed[f] < cut_off_points[f][0]) |
           (imputed[f] > cut_off_points[f][1])
       ).alias(f + '_o') for f in features
  ])
outliers.show()