## Computación de Alto Desempeño

##### Autor: **Sebastian Acosta Lasso**
##### Tema: **Procesamiento de Datos a Gran Escala**

<p><strong>Objetivo: </strong> El objetivo de este cuaderno es aprender sentencias pyspark para el preprocesamiento de los datos:</p>

In [1]:
import findspark

findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

### Configuracion Spark

A continuación se realiza la configuracion de spark 

cuántos cores usar, cuánta memoria, y a qué master conectarnos. Para este caso se utizará un core y 4Gb de memoria


Se deja listo el entorno para empezar a trabajar

In [2]:
config = (
    SparkConf()
        .set("spark.scheduler.mode", "FAIR")
        .set("spark.executor.cores", "1")
        .set("spark.executor.memory", "4g")
        .set("spark.cores.max", "4")
        #.setMaster("spark://10.43.100.119:8080")
        .setMaster("spark://10.43.100.119:7077")
    )
config.setAppName("sebas_spark")
spark = SparkSession.builder.config(conf=config).getOrCreate()

SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
contextoSpark = spark.sparkContext.getOrCreate()

spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/19 20:51:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Identificación y tratamiento de valores faltantes

Se crea un DataFrame con valores faltantes

Se arma un DataFrame con varios None 

Se usará para practicar cómo detectar y tratar nulos sin usar un dataset real

In [3]:
df = spark.createDataFrame(
[
('Store 1',1,448),
('Store 1',2,None),
('Store 1',3,499),
('Store 1',44,432),
(None,None,None),
('Store 2',1,355),
('Store 2',1,355),
('Store 2',None,345),
('Store 2',3,387),
('Store 2',4,312),
],
['Store','WeekInMonth','Revenue']
)

### Indentificación
1. Se mira qué filas tienen Revenue = NULL
2. Se hace un conteo por cada columna para saber cuántos nulos tiene cada una

In [4]:
df.filter(df.Revenue.isNull()).show()

                                                                                

+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          2|   NULL|
|   NULL|       NULL|   NULL|
+-------+-----------+-------+



Contando nulos por columna

In [5]:
from pyspark.sql.functions import count, when, isnull
df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]).show()



+-----+-----------+-------+
|Store|WeekInMonth|Revenue|
+-----+-----------+-------+
|    1|          2|      2|
+-----+-----------+-------+



                                                                                

Eliminado registros con valores faltantes

In [6]:
df2 = df.dropna()
df2.show()

+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          1|    448|
|Store 1|          3|    499|
|Store 1|         44|    432|
|Store 2|          1|    355|
|Store 2|          1|    355|
|Store 2|          3|    387|
|Store 2|          4|    312|
+-------+-----------+-------+



In [7]:
df2 = df.dropna('all')
df2.show()

+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          1|    448|
|Store 1|          2|   NULL|
|Store 1|          3|    499|
|Store 1|         44|    432|
|Store 2|          1|    355|
|Store 2|          1|    355|
|Store 2|       NULL|    345|
|Store 2|          3|    387|
|Store 2|          4|    312|
+-------+-----------+-------+



There is one important thing to note about fillna – it’ll only do the exchange
operation for matching column types. So if you use a numeric value for a string column
or the other way around, it won’t work.

### Sustituir valores faltantes (fillna)

1.Rellenar los valores faltantes

2.Rellenar con 0 en todas las columnas

3.Rellenar solo en columnas específicas

4.Rellenar cada columna con un valor distinto usando un diccionario

In [8]:
df.fillna(0).show()
df.fillna(0, ['Revenue']).show()
df.fillna({'WeekInMonth' : 2, 'Revenue' : 3}).show()

+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          1|    448|
|Store 1|          2|      0|
|Store 1|          3|    499|
|Store 1|         44|    432|
|   NULL|          0|      0|
|Store 2|          1|    355|
|Store 2|          1|    355|
|Store 2|          0|    345|
|Store 2|          3|    387|
|Store 2|          4|    312|
+-------+-----------+-------+

+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          1|    448|
|Store 1|          2|      0|
|Store 1|          3|    499|
|Store 1|         44|    432|
|   NULL|       NULL|      0|
|Store 2|          1|    355|
|Store 2|          1|    355|
|Store 2|       NULL|    345|
|Store 2|          3|    387|
|Store 2|          4|    312|
+-------+-----------+-------+

+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          1|    448|
|Store 1|          2|      3|
|Store 1

Sustituyendo con la media

In [9]:
from pyspark.sql.functions import mean
df.select(mean(df.Revenue)).show()

+------------+
|avg(Revenue)|
+------------+
|     391.625|
+------------+



In [10]:
df.fillna(391.625, ['Revenue']).show()

+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          1|    448|
|Store 1|          2|    391|
|Store 1|          3|    499|
|Store 1|         44|    432|
|   NULL|       NULL|    391|
|Store 2|          1|    355|
|Store 2|          1|    355|
|Store 2|       NULL|    345|
|Store 2|          3|    387|
|Store 2|          4|    312|
+-------+-----------+-------+



## Eliminando duplicados

Se quitan todos los duplicados de la tabla.

In [11]:
df.dropDuplicates().show()



+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          3|    499|
|Store 1|         44|    432|
|Store 1|          1|    448|
|Store 2|          1|    355|
|Store 2|          4|    312|
|Store 2|          3|    387|
|Store 1|          2|   NULL|
|   NULL|       NULL|   NULL|
|Store 2|       NULL|    345|
+-------+-----------+-------+



                                                                                

Se quitan duplicados de ciertas columnas especificas.

In [12]:
df.dropDuplicates(['Store','WeekInMonth']).show()

+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          2|   NULL|
|Store 1|          1|    448|
|Store 2|          3|    387|
|Store 2|          1|    355|
|Store 2|       NULL|    345|
|Store 2|          4|    312|
|Store 1|          3|    499|
|Store 1|         44|    432|
|   NULL|       NULL|   NULL|
+-------+-----------+-------+



## Eliminando columnas

In [13]:
df.drop('Revenue').show()
df.drop('Revenue','Store').show()

+-------+-----------+
|  Store|WeekInMonth|
+-------+-----------+
|Store 1|          1|
|Store 1|          2|
|Store 1|          3|
|Store 1|         44|
|   NULL|       NULL|
|Store 2|          1|
|Store 2|          1|
|Store 2|       NULL|
|Store 2|          3|
|Store 2|          4|
+-------+-----------+

+-----------+
|WeekInMonth|
+-----------+
|          1|
|          2|
|          3|
|         44|
|       NULL|
|          1|
|          1|
|       NULL|
|          3|
|          4|
+-----------+



## Identificando y resolviendo valores inconsistentes

In [14]:
df.show()

+-------+-----------+-------+
|  Store|WeekInMonth|Revenue|
+-------+-----------+-------+
|Store 1|          1|    448|
|Store 1|          2|   NULL|
|Store 1|          3|    499|
|Store 1|         44|    432|
|   NULL|       NULL|   NULL|
|Store 2|          1|    355|
|Store 2|          1|    355|
|Store 2|       NULL|    345|
|Store 2|          3|    387|
|Store 2|          4|    312|
+-------+-----------+-------+



describe() para ver stats como media, mínima, máximo

In [15]:
df.filter(df.Store == 'Store 1').describe().show()



+-------+-------+-----------------+-----------------+
|summary|  Store|      WeekInMonth|          Revenue|
+-------+-------+-----------------+-----------------+
|  count|      4|                4|                3|
|   mean|   NULL|             12.5|459.6666666666667|
| stddev|   NULL|21.01586702153082|34.99047489436708|
|    min|Store 1|                1|              432|
|    max|Store 1|               44|              499|
+-------+-------+-----------------+-----------------+



                                                                                

Esto dará el valor en un cuantil dado, en el intervalo de 0 a 1. Por lo tanto, si establece el segundo argumento en 0.0, obtendrá el valor más bajo para la columna. Con 1.0 obtienes el valor más alto. En el medio tienes la mediana, que es lo que se está buscando. Luego se usa approxQuantile() para sacar valores como la mediana sin matar la memoria.


In [16]:
print(df.approxQuantile('Revenue', [0.5], 0))

[355.0]


## Pivot

A veces, desea cambiar sus datos de filas a columnas. La función se llama pivotar y está disponible en Pyspark.

Básicamente, estás rotando los datos alrededor de un eje determinado, de ahí el nombre.

En este caso, ese eje son los datos en una de sus columnas.

In [17]:
df_pivoted = df.groupBy('WeekInMonth').pivot('Store').sum('Revenue').orderBy('WeekInMonth')
df_pivoted.show()

+-----------+----+-------+-------+
|WeekInMonth|null|Store 1|Store 2|
+-----------+----+-------+-------+
|       NULL|NULL|   NULL|    345|
|          1|NULL|    448|    710|
|          2|NULL|   NULL|   NULL|
|          3|NULL|    499|    387|
|          4|NULL|   NULL|    312|
|         44|NULL|    432|   NULL|
+-----------+----+-------+-------+



El pivot es muy útil cuando se quiere ver métricas por categoría (por ejemplo, por tienda)

Acá se agrupa por store y semana y se hace la suma de revenue para esos grupos

In [18]:
(df.groupBy('Store','WeekInMonth').sum('Revenue').orderBy('WeekInMonth')).show()

+-------+-----------+------------+
|  Store|WeekInMonth|sum(Revenue)|
+-------+-----------+------------+
|   NULL|       NULL|        NULL|
|Store 2|       NULL|         345|
|Store 2|          1|         710|
|Store 1|          1|         448|
|Store 1|          2|        NULL|
|Store 2|          3|         387|
|Store 1|          3|         499|
|Store 2|          4|         312|
|Store 1|         44|         432|
+-------+-----------+------------+



Se deshace el pivot usando stack, queda df como antes

In [20]:
(df_pivoted.withColumnRenamed('Store 1','Store1')
        .withColumnRenamed('Store 2','Store2')
        .selectExpr('WeekInMonth',"stack(2, 'Store 1', Store1, 'Store 2', Store2) as (Store,Revenue)").show())

+-----------+-------+-------+
|WeekInMonth|  Store|Revenue|
+-----------+-------+-------+
|       NULL|Store 1|   NULL|
|       NULL|Store 2|    345|
|          1|Store 1|    448|
|          1|Store 2|    710|
|          2|Store 1|   NULL|
|          2|Store 2|   NULL|
|          3|Store 1|    499|
|          3|Store 2|    387|
|          4|Store 1|   NULL|
|          4|Store 2|    312|
|         44|Store 1|    432|
|         44|Store 2|   NULL|
+-----------+-------+-------+



## Explode

Hay otra situación con la que te encontrarás de vez en cuando. A veces llegan varios puntos de datos juntos en una columna. Esto usual cuando JSON es el formato de origen.

Puede resolver este problema utilizando el comando de Explode. Tomará la cadena con varios valores y los colocará en una fila cada uno.

In [21]:
from pyspark.sql.functions import explode
df = spark.createDataFrame([
(1, ['Rolex','Patek','Jaeger']),
(2, ['Omega','Heuer']),
(3, ['Swatch','Rolex'])],
('id','watches'))
(df.withColumn('watches',explode(df.watches))).show()

+---+-------+
| id|watches|
+---+-------+
|  1|  Rolex|
|  1|  Patek|
|  1| Jaeger|
|  2|  Omega|
|  2|  Heuer|
|  3| Swatch|
|  3|  Rolex|
+---+-------+



## Normalización

In [24]:
!pip install scikit-learn



Se carga el dataset Wine que contiene análisis químico de vinos con 13 características como alcohol, ácido málico, magnesio, etc.

In [25]:
# Importaciones necesarias
from sklearn.datasets import load_wine
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, StandardScaler
# Cargar dataset Wine desde sklearn
wine = load_wine()
wine_df = pd.DataFrame(wine.data, columns=wine.feature_names)
wine_df['target'] = wine.target

In [26]:
# Convertir a Spark DataFrame
spark_wine_df = spark.createDataFrame(wine_df)

In [27]:
# Mostrar las primeras filas
print("Dataset Wine - Primeras filas:")
spark_wine_df.show(5)

Dataset Wine - Primeras filas:
+-------+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+----------------------------+-------+------+
|alcohol|malic_acid| ash|alcalinity_of_ash|magnesium|total_phenols|flavanoids|nonflavanoid_phenols|proanthocyanins|color_intensity| hue|od280/od315_of_diluted_wines|proline|target|
+-------+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+----------------------------+-------+------+
|  14.23|      1.71|2.43|             15.6|    127.0|          2.8|      3.06|                0.28|           2.29|           5.64|1.04|                        3.92| 1065.0|     0|
|   13.2|      1.78|2.14|             11.2|    100.0|         2.65|      2.76|                0.26|           1.28|           4.38|1.05|                         3.4| 1050.0|     0|
|  13.16|      2.36|2.67|             18.6|    101.0|          2

### Conclusión
El notebook demuestra que PySpark es una herramienta versátil y poderosa para el preprocesamiento de datos, permitiendo desde la limpieza básica hasta transformaciones avanzadas sobre grandes conjuntos de datos. Ofrece métodos eficientes, sintaxis clara e integración con otras herramientas del ecosistema Python. Esto facilita la preparación de datos para modelos de machine learning en ambientes donde la escalabilidad y el desempeño son clave.