# Feature Engineering con PySpark
**Por: Naren Castellon**


<center><img src="https://www.hogarmania.com/archivos/202209/curiosidades-del-oso-panda-1280x720x80xX.jpg" width="550" height="850"></center>


Este modulo vamos a cubrir los patrones de diseño para trabajar con características de datos (cualquier atributo medible, desde precios de automóviles hasta valores genéticos, recuentos de hemoglobina o niveles educativos) al crear modelos de **Machine learning** (también conocido como ingeniería de características). Estos procesos (extracción, transformación y selección de características) son esenciales para construir modelos efectivos de **Machine learning**. La ingeniería de características es una de las más importantes temas en el **Machine learning**, porque el éxito o el fracaso de un modelo en la predicción del futuro depende principalmente de las funciones que elija.

**Spark** proporciona una API integral de **Machine learning** para muchos algoritmos conocidos, incluidos la *regresión lineal, la regresión logística y los árboles de decisión*. El objetivo de este modulo es presentar herramientas y técnicas fundamentales en **PySpark** que puede usar para crear todo tipo de **pipelines** de **Machine learning**. Vamos a presentar las poderosas herramientas y utilidades de **Machine learning** de Spark y proporciona ejemplos usando la API de PySpark. Las habilidades que aprenda aquí serán útiles para un aspirante a científico de datos o ingeniero de datos. Mi objetivo este cuaderno no es familiarizarlo con los famosos algoritmos de aprendizaje automático, como la regresión lineal, el análisis de componentes principales o las máquinas de vectores de soporte, ya que estos lo vamos a cubrir en otro modulo, sino equiparlo con herramientas (normalización, estandarización, indexación de cadenas, etc. .) que puede usar para limpiar datos y crear modelos para una amplia gama de algoritmos de **Machine learning**.

Independientemente del algoritmo que vayamos a utilizar, la ingeniería de características es importante. En **Machine learning** nos permite encontrar patrones en los datos: encontramos los patrones construyendo modelos, luego usamos los modelos construidos para hacer predicciones sobre nuevos puntos de datos (es decir, consultar datos). Para obtener esas predicciones correctas, debemos construir el conjunto de datos y transformar los datos correctamente. Este capítulo cubre estos dos pasos clave.

# Contenido

* Agregar nuevas caracteristicas
* Creando y aplicando UDFs
* Creando pipelines
* Binarizing data
* Imputación de Datos
* Tokenization
* Estandarización
* Normalizacion
* String indexing
* Vector assembly
* Bucketing
* Logarithm transformation
* One-hot encoding
* TF-IDF
* Feature hashing
* Applying SQL transformations


## Introducción a Feature Engineering
**Feature Engineering** es la forma de definir *"el proceso de transformar datos sin procesar en funciones que representen mejor el problema subyacente de los modelos predictivos, lo que da como resultado una mayor precisión del modelo en datos ocultos"*. En este apartado, mi objetivo es presentar técnicas genéricas de ingeniería de características disponibles en PySpark que puede usar para crear mejores modelos predictivos.

Digamos que sus datos están representados en una matriz de filas y columnas. En **Machine Learning**, las columnas se denominan características (como edad, sexo, educación, frecuencia cardíaca o presión arterial) y cada fila representa una instancia del conjunto de datos (es decir, un registro). Las características de sus datos influirán directamente en los modelos predictivos que cree y use y en los resultados que pueda lograr. Los científicos de datos dedicamos alrededor de la mitad de nuestro tiempo en la preparación de datos, y la ingeniería de características es una parte importante de esto.

¿Dónde encaja la **ingeniería de características** con la construcción de modelos de **Machine Learning**? ¿Cuándo aplica estas técnicas a sus datos? Echemos un vistazo a los pasos clave para construir y usar un modelo de aprendizaje automático:

1. Reúna los requisitos para los datos de **Machine Learning** y defina el problema.
2. Seleccionar datos (recopilar e integrar los datos, luego desnormalizarlos en un conjunto de datos).
3. Preprocesar datos (formatear, limpiar y muestrear los datos para poder trabajar con ellos).
4. Transformar datos (realizar ingeniería de características).
5. Datos del modelo (divida los datos en conjuntos de entrenamiento y prueba, use los datos de entrenamiento para crear modelos, luego use los datos de prueba para evaluar los modelos y ajustarlos).
6. Use el modelo construido para hacer predicciones sobre los datos de la consulta.

**Feature Engineering** ocurre justo antes de construir un modelo a partir de sus datos. Después de seleccionar y limpiar los datos (por ejemplo, asegurarse de que los valores nulos se reemplacen con los valores adecuados), transforme los datos realizando ingeniería de características: esto podría implicar convertir cadenas en datos numéricos, categorizar los datos, normalizar o estandarizar los datos, etc.

<center><img src="https://assets-global.website-files.com/620d42e86cb8ec4d0839e59d/6230e9ee021b250dd3710f8e_61ca4fbcc80819e696ba0ee9_Feature-Engineering-Machine-Learning-Diagram.png" width="800" height="500"></center>

La API de Spark proporciona varios algoritmos para trabajar con funciones, que se dividen aproximadamente en estos grupos:
* Extracción (algoritmos para extraer características de datos "sin procesar")
* Transformación (algoritmos para escalar, convertir o modificar características)
* Selección (algoritmos para seleccionar un subconjunto de un conjunto más grande de características)
* Hashing sensible a la localidad (LSH); algoritmos para agrupar elementos similares)

Puede haber muchas razones para la transformación de datos y la ingeniería de funciones, ya sean obligatorias u opcionales:

Transformaciones obligatorias

Estas transformaciones son necesarias para resolver un problema (como construir un modelo de **Machine Learning** por razones de compatibilidad de datos. Ejemplos incluyen:
* Conversión de características no numéricas en características numéricas. Por ejemplo, si una característica tiene valores no numéricos, los cálculos de promedio, suma y mediana serán imposibles; del mismo modo, no podemos realizar la multiplicación de matrices en una cadena, sino que primero debemos convertirla en alguna representación numérica.
* Cambiar el tamaño de las entradas a un tamaño fijo. Algunos modelos lineales y redes neuronales feed-forward tienen un número fijo de nodos de entrada, por lo que sus datos de entrada siempre deben tener el mismo tamaño. Por ejemplo, los modelos de imagen necesitan remodelar las imágenes en su conjunto de datos a un tamaño fijo.

Transformaciones opcionales

Las transformaciones de datos opcionales pueden ayudar a que el modelo de **Machine Learning** funcione mejor. Estas transformaciones pueden incluir:
* Cambiar el texto a minúsculas antes de aplicar otras transformaciones de datos
* Tokenización y eliminación de palabras no esenciales, como "de", "un", "y", "el" y "entonces"
* Normalización de características numéricas

Examinaremos ambos tipos en las siguientes secciones. Profundicemos en nuestro primer tema, agregando una nueva característica.

## Agregar nuevas caracteristicas
A veces deseamos agregar una nueva característica (porque necesita esa característica derivada en su algoritmo de **Machine Learning**) en el conjunto de datos, para agregar una nueva columna o característica en el conjunto de datos, se puede usar la función `DataFrame.withColumn()`. Este concepto se demuestra a continuación:

In [None]:
# importar SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.types import *


In [None]:
# crear objeto de sesión spark
spark=SparkSession.builder.appName("Feature engineering").getOrCreate()
    

In [None]:
column_names = ["emp_id", "salario"]
records = [(100, 120000), (200, 170000), (300, 150000)]
df = spark.createDataFrame(records, schema=column_names)
df.show()

+------+-------+
|emp_id|salario|
+------+-------+
|   100| 120000|
|   200| 170000|
|   300| 150000|
+------+-------+



Puede usar `DataFrame.withColumn()` de Spark para agregar una nueva columna/característica:

In [None]:
df2 = df.withColumn("bonos", df.salario * 0.05)
df2.show()

+------+-------+------+
|emp_id|salario| bonos|
+------+-------+------+
|   100| 120000|6000.0|
|   200| 170000|8500.0|
|   300| 150000|7500.0|
+------+-------+------+



## Creando y aplicando UDFs

**¿Qué es UDF?**
Las funciones definidas por el usuario de UDF, también conocidas como funciones definidas por el usuario, si proviene de SQL, las UDF no son nada nuevo para usted, ya que la mayoría de las bases de datos RDBMS tradicionales admiten funciones definidas por el usuario, estas funciones deben registrarse en la biblioteca de la base de datos y usarlas en SQL como funciones regulares.

Las UDF de PySpark son similares a las UDF de las bases de datos tradicionales. En PySpark, crea una función en una sintaxis de Python y la envuelve con PySpark SQL udf() o la registra como udf y la usa en DataFrame y SQL respectivamente.
Si PySpark no proporciona la función que necesita, puede definir sus propias funciones de Python y registrarlas como funciones definidas por el usuario (UDF) con el DSL de Spark SQL usando `spark.udf.register()`. Luego puede aplicar estas funciones en sus transformaciones de datos.

Para que sus funciones de Python sean compatibles con los marcos de datos de Spark, debe convertirlas en UDF de PySpark pasándolas a la función `pyspark.sql.func tions.udf()`. Alternativamente, puede crear su UDF en un solo paso usando anotaciones, como se muestra aquí. Agregue udf@ como un "decorador" de su función de Python y especifique su tipo de devolución como argumento:

**¿Por qué necesitamos un UDF?**

Los UDF se utilizan para ampliar las funciones del marco y reutilizar estas funciones en múltiples DataFrame. Por ejemplo, desea convertir cada primera letra de una palabra en una cadena de nombre a mayúsculas; Las funciones integradas de PySpark no tienen esta función, por lo tanto, puede crearla como UDF y reutilizarla según sea necesario en muchos marcos de datos. Una vez creados, los UDF se pueden reutilizar en varios DataFrame y expresiones SQL.

Antes de crear cualquier UDF, investigue para verificar si la función similar que deseaba ya está disponible en Spark SQL Functions. PySpark SQL proporciona varias funciones comunes predefinidas y se agregan muchas más funciones nuevas con cada versión. por lo tanto, es mejor verificar antes de reinventar la rueda.

Cuando crea UDF, debe diseñarlos con mucho cuidado; de lo contrario, se encontrará con problemas de optimización y rendimiento.

La función `tripled()` es una UDF y su tipo de retorno es entero.

In [None]:
# Cargamo l función udf
from pyspark.sql.functions import udf

# Creamos una función 
@udf("integer")
def tripled(num):
    return 3*int(num)

# Agregamo una nueva variable DAtaFrame
df2 = df.withColumn('tripled_col', tripled(df.salario))
df2.show()

+------+-------+-----------+
|emp_id|salario|tripled_col|
+------+-------+-----------+
|   100| 120000|     360000|
|   200| 170000|     510000|
|   300| 150000|     450000|
+------+-------+-----------+



### Crear un UDF PySpark 
#### Creando un DataFrame
Antes de comenzar a crear un UDF, primero creemos un PySpark DataFrame.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

columns = ["Seqno","Nombre"]
data = [("1", "juan Jones"),
    ("2", "tracey aguilar"),
    ("3", "amy castellon")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

+-----+--------------+
|Seqno|Nombre        |
+-----+--------------+
|1    |juan Jones    |
|2    |tracey aguilar|
|3    |amy castellon |
+-----+--------------+



### Crear una función de Python
El primer paso para crear una UDF es crear una función de Python. El siguiente fragmento crea una función `convertCase()` que toma un parámetro de cadena y convierte la primera letra de cada palabra en mayúscula. Las UDF toman los parámetros de su elección y devuelven un valor.

In [None]:
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 


Tenga en cuenta que podría haber una mejor manera de escribir esta función. Pero por el bien de este artículo, no me preocupa mucho el rendimiento y las mejores formas.



### Convertir una función de Python a PySpark UDF
Ahora convierta esta función convertCase() a UDF pasando la función a PySpark SQL udf(), esta función está disponible en el paquete org.apache.spark.sql.functions.udf. Asegúrese de importar este paquete antes de usarlo.

La función PySpark SQL udf() devuelve el objeto de clase org.apache.spark.sql.expressions.UserDefinedFunction.

In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Convertiendo la función a UDF 
convertUDF = udf(lambda z: convertCase(z),StringType())


Nota: El tipo predeterminado de udf() es StringType, por lo tanto, también puede escribir la declaración anterior sin tipo de retorno.

### Usando UDF con DataFrame
#### Usando UDF con PySpark DataFrame select()
Ahora puede usar `convertUDF()` en una columna DataFrame como una función incorporada normal.

In [None]:
df.show()

+-----+--------------+
|Seqno|        Nombre|
+-----+--------------+
|    1|    juan Jones|
|    2|tracey aguilar|
|    3| amy castellon|
+-----+--------------+



In [None]:
df.select(col("Seqno"), \
    convertUDF(col("nombre")).alias("Nombre_Mayus") ) \
   .show(truncate=False)


+-----+---------------+
|Seqno|Nombre_Mayus   |
+-----+---------------+
|1    |Juan Jones     |
|2    |Tracey Aguilar |
|3    |Amy Castellon  |
+-----+---------------+



Tenga en cuenta que si sus funciones se representan como un RDD (donde cada elemento de RDD representa una instancia de sus funciones), puede usar la función `RDD.map()` para agregar una nueva función a su conjunto de funciones.

https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/

## Creando Pipelines
En los algoritmos de **Machine Learning**, puede unirse varias etapas y ejecutarlas en orden. Considere tres etapas, llamadas {Etapa-1, Etapa-2, Etapa-3}, donde la salida de la Etapa-1 se usa como entrada para la Etapa-2 y la salida de la Etapa-2 se usa como entrada para la Etapa- 3. Estas tres etapas forman una tubería simple. Supongamos que tenemos que transformar los datos en el orden que se muestra en la Tabla

|Stage| Description|
|-----|------------|
|Stage-1 |Label encode or string index the column dept * (create dept_index column).
|Stage-2 |Label encode or string index the column education (create education_index column).
|Stage-3| One-hot encode the indexed column education_index (create education_OHE column).

<center><img src="https://www.qubole.com/wp-content/uploads/2018/12/image15.png" width="1000" height="850"></center>

Spark proporciona una API de **Pipeline**, definida como `pyspark.ml.Pipeline(*,stags=None)`, que actúa como un estimador (una abstracción de un algoritmo de aprendizaje que se ajusta a un modelo en un conjunto de datos). Según la documentación de Spark:
> Un Pipeline se especifica como una secuencia de etapas, y cada etapa es un Transformador o un Estimador. Estas etapas se ejecutan en orden y el DataFrame de entrada se transforma a medida que pasa por cada etapa. Para las etapas de Transformer, se llama al método `transform()` en el DataFrame. Para las etapas de Estimator, se llama al método `fit()` para producir un Transformador (que se convierte en parte de PipelineModel, o Pipeline ajustado), y se llama al método `transform()` de ese Transformador en el DataFrame.

>
<center><img src="https://spark.apache.org/docs/latest/img/ml-Pipeline.png" width="800" height="850"></center>


> Arriba, la fila superior representa un Pipeline con tres etapas. Los dos primeros (Tokenizer y HashingTF) son Transformers (azul) y el tercero (LogisticRegression) es un Estimator (rojo). La fila inferior representa los datos que fluyen a través de la canalización, donde los cilindros indican tramas de datos. Se llama al método Pipeline.fit() en el DataFrame original, que tiene etiquetas y documentos de texto sin formato. El método Tokenizer.transform() divide los documentos de texto sin procesar en palabras, agregando una nueva columna con palabras al DataFrame. El método HashingTF.transform() convierte la columna de palabras en vectores de características, agregando una nueva columna con esos vectores al DataFrame. Ahora, dado que LogisticRegression es un Estimador, Pipeline primero llama a LogisticRegression.fit() para producir un LogisticRegressionModel. Si Pipeline tuviera más Estimadores, llamaría al método transform() de LogisticRegressionModel en el DataFrame antes de pasar el DataFrame a la siguiente etapa.

> Un Pipeline es un Estimador. Por lo tanto, después de que se ejecuta el método fit() de Pipeline, produce un PipelineModel, que es un Transformador. Este PipelineModel se usa en el momento de la prueba; la siguiente figura ilustra este uso.

<center><img src="https://spark.apache.org/docs/latest/img/ml-PipelineModel.png" width="550" height="850"></center>

En la figura anterior, el `PipelineModel` tiene la misma cantidad de etapas que el Pipeline original, pero todos los Estimadores en el Pipeline original se han convertido en Transformadores. Cuando se llama al método `transform()` de PipelineModel en un conjunto de datos de prueba, los datos se pasan a través de la canalización ajustada en orden. El método `transform()` de cada etapa actualiza el conjunto de datos y lo pasa a la siguiente etapa.

`Pipelines` y `PipelineModels` ayudan a garantizar que los datos de entrenamiento y prueba pasen por pasos de procesamiento de características idénticos.

Para ilustrar el concepto de **Pipelines**, primero crearemos un marco de datos de muestra con tres columnas para usar como datos de entrada, como se muestra aquí, luego crearemos una canalización simple usando`pyspark.ml.Pipeline()`:

In [None]:
# spark: Una instancia en SparkSession
# creamos un DataFrame
df = spark.createDataFrame([
(1, 'CS', 'MS'),
(2, 'MATH', 'PHD'),
(3, 'MATH', 'MS'),
(4, 'CS', 'MS'),
(5, 'CS', 'PHD'),
(6, 'ECON', 'BS'), (7, 'ECON', 'BS'),], ['id', 'dept', 'education'])

Podemos ver nuestros datos de muestra con `df.show()`:

In [None]:
df.show()

+---+----+---------+
| id|dept|education|
+---+----+---------+
|  1|  CS|       MS|
|  2|MATH|      PHD|
|  3|MATH|       MS|
|  4|  CS|       MS|
|  5|  CS|      PHD|
|  6|ECON|       BS|
|  7|ECON|       BS|
+---+----+---------+



Ahora que hemos creado el DataFrame, supongamos que queremos transformar los datos a través de tres etapas definidas, {etapa_1, etapa_2, etapa_3}. En cada etapa, pasaremos los nombres de las columnas de entrada y salida, y configuraremos el **Pipeline** pasando las etapas definidas al objeto **Pipeline** como una lista.

El modelo de **Pipeline** de Spark luego realiza pasos específicos uno por uno en una secuencia y nos brinda el resultado final deseado.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

In [None]:
# Stage 1: transforma el `dept` columna a numerica
stage_1 = StringIndexer(inputCol= 'dept', outputCol= 'dept_index')
#
# Stage 2: transforma la `education` a columna numérica
stage_2 = StringIndexer(inputCol= 'education', outputCol= 'education_index')
#


In [None]:
# Aplicar la transformación
stage_1.fit(df).transform(df).show()



+---+----+---------+----------+
| id|dept|education|dept_index|
+---+----+---------+----------+
|  1|  CS|       MS|       0.0|
|  2|MATH|      PHD|       2.0|
|  3|MATH|       MS|       2.0|
|  4|  CS|       MS|       0.0|
|  5|  CS|      PHD|       0.0|
|  6|ECON|       BS|       1.0|
|  7|ECON|       BS|       1.0|
+---+----+---------+----------+



In [None]:
# Aplicar la transformación
stage_2.fit(df).transform(df).show()

+---+----+---------+---------------+
| id|dept|education|education_index|
+---+----+---------+---------------+
|  1|  CS|       MS|            0.0|
|  2|MATH|      PHD|            2.0|
|  3|MATH|       MS|            0.0|
|  4|  CS|       MS|            0.0|
|  5|  CS|      PHD|            2.0|
|  6|ECON|       BS|            1.0|
|  7|ECON|       BS|            1.0|
+---+----+---------+---------------+



In [None]:
# Etapa 3: codificar one-hot la columna numérica `education_index`
stage_3 = OneHotEncoder(inputCols=['education_index'],outputCols=['education_OHE'])


A continuación, definiremos nuestro pipeline con estas tres etapas:

In [None]:
# configurar la canalización: pegar las etapas juntas
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3 ])# Entubar todas las etapas

# ajuste el modelo de tubería y transforme los datos como se define
pipeline_model = pipeline.fit(df)

In [None]:
# Se realiza la tranformación
final_df = pipeline_model.transform(df) # Se aplica nuestro datos
final_df.show(truncate=False)

+---+----+---------+----------+---------------+-------------+
|id |dept|education|dept_index|education_index|education_OHE|
+---+----+---------+----------+---------------+-------------+
|1  |CS  |MS       |0.0       |0.0            |(2,[0],[1.0])|
|2  |MATH|PHD      |2.0       |2.0            |(2,[],[])    |
|3  |MATH|MS       |2.0       |0.0            |(2,[0],[1.0])|
|4  |CS  |MS       |0.0       |0.0            |(2,[0],[1.0])|
|5  |CS  |PHD      |0.0       |2.0            |(2,[],[])    |
|6  |ECON|BS       |1.0       |1.0            |(2,[1],[1.0])|
|7  |ECON|BS       |1.0       |1.0            |(2,[1],[1.0])|
+---+----+---------+----------+---------------+-------------+



## Binarizing Data
Binarizar los datos significa establecer los valores de las características en 0 o 1 según algún umbral. Los valores mayores que el umbral se asignan a 1, mientras que los valores menores o iguales al umbral se asignan a 0. Con el umbral predeterminado de 0, solo los valores positivos se asignan a 1. La binarización es, por lo tanto, el proceso de umbralización de características numéricas a binario {0, 1} características.

`Binarizer()` de Spark toma los parámetros *inputCol* y *outputCol*, así como el umbral para la binarización. Los valores de características superiores al umbral se binarizan a 1,0; los valores iguales o inferiores al umbral se binarizan a 0,0. 

Primero, creemos un DataFrame con una sola función:

In [None]:
from pyspark.ml.feature import Binarizer
raw_df = spark.createDataFrame([
(1, 0.1),
(2, 0.2),
(3, 0.5),
(4, 0.8),
(5, 0.9),
(6, 1.1)
], ["id", "feature"])

raw_df.show()

+---+-------+
| id|feature|
+---+-------+
|  1|    0.1|
|  2|    0.2|
|  3|    0.5|
|  4|    0.8|
|  5|    0.9|
|  6|    1.1|
+---+-------+



A continuación, crearemos un Binarizer con `threshold=0.5`, por lo que cualquier valor inferior o igual a 0.5 se asignará a 0.0 y cualquier valor superior a 0.5 se asignará a 1.0:

In [None]:

from pyspark.ml.feature import Binarizer
binarizer = Binarizer(threshold=0.5, inputCol="feature",outputCol="binarized_feature")


Finalmente, aplicamos el Binarizer definido a una columna de características:

In [None]:
binarized_df = binarizer.transform(raw_df)
print("Salida del binarizador con Threshold = %f" % binarizer.getThreshold())


Salida del binarizador con Threshold = 0.500000


In [None]:
#binarized_df = binarizer.transform(raw_df)
binarized_df.show(truncate=False)

+---+-------+-----------------+
|id |feature|binarized_feature|
+---+-------+-----------------+
|1  |0.1    |0.0              |
|2  |0.2    |0.0              |
|3  |0.5    |0.0              |
|4  |0.8    |1.0              |
|5  |0.9    |1.0              |
|6  |1.1    |1.0              |
+---+-------+-----------------+



## Imputación de Datos
La función `Imputer()` de Spark's es un transformador de imputación para completar los valores faltantes. Los conjuntos de datos del mundo real suelen contener valores faltantes, a menudo codificados como valores nulos, espacios en blanco, **NaNs** u otros marcadores de posición. Hay muchos métodos para manejar estos valores, incluidos los siguientes:
* Elimine instancias si falta alguna característica (esto podría no ser una buena idea ya que se perderá información importante de otras características).
* Para una característica faltante, encuentre el valor promedio de esa característica y complete ese valor.
* Imputar los valores faltantes (es decir, inferirlos de la parte conocida de los datos).

Esta suele ser la mejor estrategia.

`class pyspark.ml.feature.Imputer(*, strategy='mean', missingValue=nan, 
inputCols=None, outputCols=None,inputCol=None, outputCol=None, relativeError=0.001)`

Utiliza la media o la mediana de las columnas en las que se encuentran los valores faltantes. Las columnas de entrada deben ser de tipo numérico; actualmente `Imputer` no admite funciones categóricas y puede crear valores incorrectos para una función categórica.

Tenga en cuenta que el valor de la media/mediana/moda se calcula después de filtrar los valores faltantes. Todos los valores nulos en las columnas de entrada se tratan como perdidos y, por lo tanto, también se imputan. Para calcular la mediana, se utiliza la función `pyspark.sql.DataFrame.approxQuantile()` con un error relativo de 0,001.

Puede indicarle al imputador que impute valores personalizados que no sean **NaN** usando ``.set MissingValue(custom_value)``. Por ejemplo, .setMissingValue(0) le indica que impute todas las apariciones de 0 (nuevamente, los valores nulos en las columnas de entrada se tratarán como faltantes y también se imputarán).

El siguiente ejemplo muestra cómo se puede utilizar un imputer. Supongamos que tenemos un DataFrame con tres columnas, id, col1 y col2:

In [None]:
df = spark.createDataFrame([(1, 12.0, 5.0),(2, 7.0, 10.0),(3, 10.0, 12.0),(4, 5.0, float("nan")),(5, 6.0, None),
                            (6, float("nan"), float("nan")),(7, None, None)], ["id", "col1", "col2"])
df.show(truncate=False)

+---+----+----+
|id |col1|col2|
+---+----+----+
|1  |12.0|5.0 |
|2  |7.0 |10.0|
|3  |10.0|12.0|
|4  |5.0 |NaN |
|5  |6.0 |null|
|6  |NaN |NaN |
|7  |null|null|
+---+----+----+



A continuación, creemos un imputer y apliquémoslo a nuestros datos creados:

In [None]:
# LLamamos a la función Imputer
from pyspark.ml.feature import Imputer

# Por defauld usa la estrategia de la media
imputer = Imputer(inputCols=["col1", "col2"],outputCols=["col1_out", "col2_out"])

# Realiza la imputación
model = imputer.fit(df)

transformed = model.transform(df)

# Muestra el resultado
transformed.show(truncate=False)

+---+----+----+--------+--------+
|id |col1|col2|col1_out|col2_out|
+---+----+----+--------+--------+
|1  |12.0|5.0 |12.0    |5.0     |
|2  |7.0 |10.0|7.0     |10.0    |
|3  |10.0|12.0|10.0    |12.0    |
|4  |5.0 |NaN |5.0     |9.0     |
|5  |6.0 |null|6.0     |9.0     |
|6  |NaN |NaN |8.0     |9.0     |
|7  |null|null|8.0     |9.0     |
+---+----+----+--------+--------+



¿Cómo conseguimos los números para los valores faltantes (8,0 para col1 y 9,0 para col2)? Es fácil; dado que la estrategia predeterminada es "media", simplemente calculamos los promedios para cada columna y los usamos para los valores faltantes:

$$col1: (12.0+7.0+10.0+5.0+6.0) / 5 = 40 / 5 = 8.0$$
$$col2: (5.0+10.0+12.0) / 3 = 27.0 / 3 = 9.0$$

En función de sus requisitos de datos, es posible que desee utilizar una estrategia diferente para completar los valores que faltan. Puede indicarle a la computadora que use la mediana de los valores de características disponibles en su lugar de la siguiente manera:

In [None]:
# Estrategia con la mediana
imputer.setStrategy("median")

# Rellena con la mediana
model = imputer.fit(df)
transformed = model.transform(df)

# Muestra el resultado
transformed.show(truncate=False)

+---+----+----+--------+--------+
|id |col1|col2|col1_out|col2_out|
+---+----+----+--------+--------+
|1  |12.0|5.0 |12.0    |5.0     |
|2  |7.0 |10.0|7.0     |10.0    |
|3  |10.0|12.0|10.0    |12.0    |
|4  |5.0 |NaN |5.0     |10.0    |
|5  |6.0 |null|6.0     |10.0    |
|6  |NaN |NaN |7.0     |10.0    |
|7  |null|null|7.0     |10.0    |
+---+----+----+--------+--------+



Para obtener estos valores (7,0 para col1 y 10,0 para col2), solo calculamos el valor de la mediana para cada columna:

median(col1) =
median(12.0, 7.0, 10.0, 5.0, 6.0) =
median(5.0, 6.0, 7.0, 10.0, 12.0) =
7.0

median(col2) =
median(5.0, 10.0, 12.0) =
10.0

## Tokenization
Los algoritmos de tokenización se utilizan para dividir una frase, una oración, un párrafo o un documento de texto completo en unidades más pequeñas, como palabras individuales, bigramas o términos. Estas unidades más pequeñas se llaman fichas. Por ejemplo, el analizador léxico (un algoritmo utilizado en la escritura del compilador) divide el código de programación en una serie de tokens eliminando cualquier espacio en blanco o comentarios. Por lo tanto, puede pensar en la tokenización de manera más general como el proceso de dividir una cadena en cualquier tipo de tokens significativos.

En Spark, puede usar `Tokenizer` y `RegexTokenizer` (que le permite definir estrategias de tokenización personalizadas a través de expresiones regulares) para tokenizar cadenas.

### Tokenizer
Spark's Tokenizer es un tokenizador que convierte la cadena de entrada a minúsculas y luego la divide por espacios en blanco. Para mostrar cómo funciona esto, creemos algunos datos de muestra:

In [None]:
from pyspark.ml.feature import Tokenizer

In [None]:
docs = [(1, "a Fox jumped over FOX"),(2, "RED of fox jumped")]
df = spark.createDataFrame(docs, ["id", "texto"])
df.show(truncate=False)

+---+---------------------+
|id |texto                |
+---+---------------------+
|1  |a Fox jumped over FOX|
|2  |RED of fox jumped    |
+---+---------------------+



Luego aplica el Tokenizer:

In [None]:
# Le decimos cual es la columna o variable que desea Tokenizer y la salida
tokenizer = Tokenizer(inputCol="texto", outputCol="tokens")

# Se realiza el Tokenizer
tokenized = tokenizer.transform(df)

tokenized.show(truncate=False)

+---+---------------------+---------------------------+
|id |texto                |tokens                     |
+---+---------------------+---------------------------+
|1  |a Fox jumped over FOX|[a, fox, jumped, over, fox]|
|2  |RED of fox jumped    |[red, of, fox, jumped]     |
+---+---------------------+---------------------------+



In [None]:
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

countTokens = udf(lambda words: len(words), IntegerType())
tokenized.select("texto", "tokens").withColumn("tokens_length",countTokens(col("tokens"))).show(truncate=False)

+---------------------+---------------------------+-------------+
|texto                |tokens                     |tokens_length|
+---------------------+---------------------------+-------------+
|a Fox jumped over FOX|[a, fox, jumped, over, fox]|5            |
|RED of fox jumped    |[red, of, fox, jumped]     |4            |
+---------------------+---------------------------+-------------+



### RegexTokenizer
RegexTokenizer de Spark es un tokenizador basado en expresiones regulares que extrae tokens usando el patrón de expresiones regulares proporcionado para dividir el texto (el valor predeterminado) o haciendo coincidir repetidamente la expresión regular (si el parámetro de espacios opcional, que es Verdadero de manera predeterminada, es Falso). Aquí hay un ejemplo:

In [None]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer

regexTokenizer = RegexTokenizer(inputCol="texto", outputCol="tokens",
pattern="\\W", minTokenLength=3)

# Relizame la tranformación
regex_tokenized = regexTokenizer.transform(df)
regex_tokenized.select("texto", "tokens").withColumn("tokens_length", countTokens(col("tokens"))).show(truncate=False)

+---------------------+------------------------+-------------+
|texto                |tokens                  |tokens_length|
+---------------------+------------------------+-------------+
|a Fox jumped over FOX|[fox, jumped, over, fox]|4            |
|RED of fox jumped    |[red, fox, jumped]      |3            |
+---------------------+------------------------+-------------+



### Tokenization con Pipeline
También podemos realizar la tokenización como parte de un pipeline. Aquí, creamos un DataFrame con dos columnas:

In [None]:
docs = [(1, "a Fox jumped, over, the fence?"),(2, "a RED, of fox?"),(3, "Curso Pyspark Bolivia")]
df = spark.createDataFrame(docs, ["id", "text"])
df.show(truncate=False)

+---+------------------------------+
|id |text                          |
+---+------------------------------+
|1  |a Fox jumped, over, the fence?|
|2  |a RED, of fox?                |
|3  |Curso Pyspark Bolivia         |
+---+------------------------------+



A continuación, aplicamos la función `RegexTokenizer()` a este DataFrame:

In [None]:
from pyspark.ml.feature import StopWordsRemover

tk = RegexTokenizer(pattern=r'(?:\p{Punct}|\s)+', inputCol="text",outputCol='text2')
sw = StopWordsRemover(inputCol='text2', outputCol='text3')

# Creamo nuestro Pipline
pipeline = Pipeline(stages=[tk, sw])
df4 = pipeline.fit(df).transform(df)
df4.show(truncate=False)

+---+------------------------------+----------------------------------+-------------------------+
|id |text                          |text2                             |text3                    |
+---+------------------------------+----------------------------------+-------------------------+
|1  |a Fox jumped, over, the fence?|[a, fox, jumped, over, the, fence]|[fox, jumped, fence]     |
|2  |a RED, of fox?                |[a, red, of, fox]                 |[red, fox]               |
|3  |Curso Pyspark Bolivia         |[curso, pyspark, bolivia]         |[curso, pyspark, bolivia]|
+---+------------------------------+----------------------------------+-------------------------+



## Estandarización
Una de las técnicas más populares para escalar datos numéricos antes de construir un modelo es la estandarización. La estandarización de un conjunto de datos implica volver a escalar la distribución de valores para que la media de los valores observados (como característica) sea 0,00 y la desviación estándar sea 1,00.

Muchos algoritmos de aprendizaje automático funcionan mejor cuando las variables de entrada numéricas (características) se escalan a un rango estándar. Por ejemplo, los algoritmos como la regresión lineal que usan una suma ponderada de la entrada y los algoritmos como los k-vecinos más cercanos que usan medidas de distancia requieren valores estandarizados, ya que, de lo contrario, los modelos construidos podrían ajustarse por debajo o por encima de los datos de entrenamiento y tener un rendimiento inferior.

Un valor se estandariza de la siguiente manera:

$$y = \frac{(x – mean)}{ standard_deviation}$$

Donde la media se calcula como:

$$mean = sum(x) / count(x)$$

$$\hat x=\frac{1}{N} \sum_{i=1}^{N} x_i$$

Y la desviación estándar se calcula como:

$$standard_deviation = \sqrt(sum( (x – mean)^2 )/count(x))$$

$$sd=\sqrt{\frac{1}{N} \sum_{i=1}^{N} (x_i - \bar x)^2} $$

Por ejemplo, si $X = (1, 3, 6, 10)$, la media/promedio se calcula como:

$$mean = (1+2+6+10)/4 = 20/4 = 5.0$$

y la desviación estándar se calcula como:

desviación estándar

$$= \sqrt {(((1-5)^2 + (3-5)^2 + (6-5)^2 + (10-5)^2)) / 4)}$$
$$= \sqrt {((16+4+1+25)/4)}$$
$$= \sqrt{(46/4)}$$
$$= \sqrt(11.5) = 3.39116$$

Entonces, los nuevos valores estandarizados serán:

$$y = (y_1, y_2, y_3, y_4) = (-1.1795, -0.5897, 0.2948, 1.4744)$$

donde
$$y_1 = (1 – 5.0) / 3.39116$$
$$y_2 = (3 - 5.0) / 3.39116$$
$$y_3 = (6 - 5.0) / 3.39116$$
$$y_4 = (10 - 5.0) / 3.39116$$

Como puede ver, la media de los valores estandarizados (y) es 0,00 y la desviación estándar es 1,00.

Repasemos cómo realizar la estandarización en PySpark. Digamos que estamos tratando de estandarizar (media = 0.00, stddev = 1.00) una columna en un DataFrame. Primero crearemos un DataFrame de muestra, luego le mostraré dos formas de estandarizar la columna de edad:

In [None]:
features = [('alex', 1), ('jans', 3), ('ali', 6), ('bruno', 10)]
columns = ("nombre", "edad")
samples = spark.createDataFrame(features, columns)
samples.show()

+------+----+
|nombre|edad|
+------+----+
|  alex|   1|
|  jans|   3|
|   ali|   6|
| bruno|  10|
+------+----+



**El método 1:** es usar funciones de DataFrame:

In [None]:
from pyspark.sql.functions import stddev, mean, col

(samples.select(mean("edad").alias("mean_edad"),
                stddev("edad").alias("stddev_edad")).crossJoin(samples).withColumn("edad_scaled",
                                                                                 (col("edad") - col("mean_edad")) / col("stddev_edad"))) .show(truncate=False)

+---------+------------------+------+----+-------------------+
|mean_edad|stddev_edad       |nombre|edad|edad_scaled        |
+---------+------------------+------+----+-------------------+
|5.0      |3.9157800414902435|alex  |1   |-1.0215078369104984|
|5.0      |3.9157800414902435|jans  |3   |-0.5107539184552492|
|5.0      |3.9157800414902435|ali   |6   |0.2553769592276246 |
|5.0      |3.9157800414902435|bruno |10  |1.276884796138123  |
+---------+------------------+------+----+-------------------+



o alternativamente, podemos escribir esto como:

In [None]:
mean_age, sttdev_age = samples.select(mean("edad"), stddev("edad")).first()
samples.withColumn("edad_scaled",(col("edad") - mean_age) / sttdev_age).show(truncate=False)

+------+----+-------------------+
|nombre|edad|edad_scaled        |
+------+----+-------------------+
|alex  |1   |-1.0215078369104984|
|jans  |3   |-0.5107539184552492|
|ali   |6   |0.2553769592276246 |
|bruno |10  |1.276884796138123  |
+------+----+-------------------+



**Método 2** es usar funciones del paquete ml de PySpark. Aquí, usamos `pyspark.ml.feature.VectorAssembler()` para transformar la columna de edad en un vector, luego estandarizamos los valores con `StandardScaler` de Spark:

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

vecAssembler = VectorAssembler(inputCols=['edad'], outputCol="edad_vector")
samples2 = vecAssembler.transform(samples)
samples2.show()

+------+----+-----------+
|nombre|edad|edad_vector|
+------+----+-----------+
|  alex|   1|      [1.0]|
|  jans|   3|      [3.0]|
|   ali|   6|      [6.0]|
| bruno|  10|     [10.0]|
+------+----+-----------+



In [None]:
scaler = StandardScaler(inputCol="edad_vector", outputCol="edad_scaled",withStd=True, withMean=True)
scalerModel = scaler.fit(samples2)
scaledData = scalerModel.transform(samples2)
scaledData.show(truncate=False)

+------+----+-----------+---------------------+
|nombre|edad|edad_vector|edad_scaled          |
+------+----+-----------+---------------------+
|alex  |1   |[1.0]      |[-1.0215078369104984]|
|jans  |3   |[3.0]      |[-0.5107539184552492]|
|ali   |6   |[6.0]      |[0.2553769592276246] |
|bruno |10  |[10.0]     |[1.276884796138123]  |
+------+----+-----------+---------------------+



A diferencia de la normalización, que veremos a continuación, la estandarización puede ser útil en los casos en que los datos siguen una distribución gaussiana. Tampoco tiene un rango límite, por lo que si tiene valores atípicos en sus datos, no se verán afectados por la estandarización.

## Normalización
La normalización es una técnica de escalado que a menudo se aplica como parte de la preparación de datos para el aprendizaje automático. El objetivo de la normalización es cambiar los valores de las columnas numéricas en el conjunto de datos para usar una escala común, sin distorsionar las diferencias en los rangos de valores ni perder información. La normalización escala cada variable de entrada numérica por separado al rango [0,1], que es el rango para los valores de coma flotante, donde tenemos la mayor precisión. En otras palabras, los valores de las características se desplazan y reescalan para que terminen oscilando entre 0,00 y 1,00. Esta técnica también se conoce como escalado mínimo-máximo, y Spark proporciona un transformador para este propósito llamado MinMaxScaler.

Aquí está la fórmula para la normalización:

$$\bar X=\frac{X_i-X_{min}}{X_{max}-X_{min}}$$

Tenga en cuenta que $X_{max}$ y $X_{min}$ son los valores máximo y mínimo de la característica dada, $X_i$, respectivamente.

Para ilustrar el proceso de normalización, creemos un DataFrame con tres características:

In [None]:
df = spark.createDataFrame([ (100, 77560, 45),(200, 41560, 23),(300, 30285, 20),
                            (400, 10345, 6),(500, 88000, 50)], 
                           ["user_id", "revenue","num_days"])
print("Antes de Scaling :")
df.show()

Antes de Scaling :
+-------+-------+--------+
|user_id|revenue|num_days|
+-------+-------+--------+
|    100|  77560|      45|
|    200|  41560|      23|
|    300|  30285|      20|
|    400|  10345|       6|
|    500|  88000|      50|
+-------+-------+--------+



A continuación, aplicaremos `MinMaxScaler` a nuestras características:

In [None]:
from pyspark.ml.feature import MinMaxScaler # Para la normalización
from pyspark.ml.feature import VectorAssembler#  convierte las variables en vectr
from pyspark.ml import Pipeline # Entuva los procesos
from pyspark.sql.functions import udf # Función definiada por el usuario
from pyspark.sql.types import DoubleType

# UDF para convertir el tipo de columna de vector a tipo doble
unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

In [None]:
# Iterando sobre las columnas a escalar
for i in ["revenue","num_days"]:
    # Transformación VectorAssembler: conversión de columna a tipo vectorial
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")
    # MinMaxScaler transformation
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")
    # Pipeline y VectorAssembler y MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])
    # Fitting pipeline on DataFrame
df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")
    

df.show(5)

+-------+-------+--------+---------------+
|user_id|revenue|num_days|num_days_Scaled|
+-------+-------+--------+---------------+
|    100|  77560|      45|          0.886|
|    200|  41560|      23|          0.386|
|    300|  30285|      20|          0.318|
|    400|  10345|       6|            0.0|
|    500|  88000|      50|            1.0|
+-------+-------+--------+---------------+



La normalización es una buena técnica para usar cuando sabe que sus datos no siguen una distribución gaussiana. Esto puede ser útil en algoritmos que no asumen ninguna distribución de los datos, como la regresión lineal, los k vecinos más cercanos y las redes neuronales. En las siguientes secciones, veremos algunos ejemplos más.

### Scaling a Column Using a Pipeline
Al igual que con la tokenización, podemos aplicar la normalización en una canalización. Primero, definamos un conjunto de características:

In [None]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
triplets = [(0, 1, 100), (1, 2, 200), (2, 5, 1000)]
df = spark.createDataFrame(triplets, ['x', 'y', 'z'])
df.show()

+---+---+----+
|  x|  y|   z|
+---+---+----+
|  0|  1| 100|
|  1|  2| 200|
|  2|  5|1000|
+---+---+----+



Ahora podemos aplicar MinMaxScaler en una canalización de la siguiente manera para normalizar los valores de la función (columna) $x$:

In [None]:
assembler = VectorAssembler(inputCols=["x"], outputCol="x_vector")
scaler = MinMaxScaler(inputCol="x_vector", outputCol="x_scaled")
pipeline = Pipeline(stages=[assembler, scaler])
scalerModel = pipeline.fit(df)
scaledData = scalerModel.transform(df)
scaledData.show(truncate=False)

+---+---+----+--------+--------+
|x  |y  |z   |x_vector|x_scaled|
+---+---+----+--------+--------+
|0  |1  |100 |[0.0]   |[0.0]   |
|1  |2  |200 |[1.0]   |[0.5]   |
|2  |5  |1000|[2.0]   |[1.0]   |
+---+---+----+--------+--------+



### Using MinMaxScaler on Multiple Columns
También podemos aplicar un escalador (como MinMaxScaler) en varias columnas:

In [None]:
triplets = [(0, 1, 100), (1, 2, 200), (2, 5, 1000)]
df = spark.createDataFrame(triplets, ['x', 'y', 'z'])
df.show()

+---+---+----+
|  x|  y|   z|
+---+---+----+
|  0|  1| 100|
|  1|  2| 200|
|  2|  5|1000|
+---+---+----+



In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler

columns_to_scale = ["x", "y", "z"]
assemblers = [VectorAssembler(inputCols=[col],outputCol=col + "_vector") for col in columns_to_scale]
scalers = [MinMaxScaler(inputCol=col + "_vector",outputCol=col + "_scaled") for col in columns_to_scale]
pipeline = Pipeline(stages=assemblers + scalers)
scalerModel = pipeline.fit(df)
scaledData = scalerModel.transform(df)
scaledData.show(truncate=False)

+---+---+----+--------+--------+--------+--------+--------+--------------------+
|x  |y  |z   |x_vector|y_vector|z_vector|x_scaled|y_scaled|z_scaled            |
+---+---+----+--------+--------+--------+--------+--------+--------------------+
|0  |1  |100 |[0.0]   |[1.0]   |[100.0] |[0.0]   |[0.0]   |[0.0]               |
|1  |2  |200 |[1.0]   |[2.0]   |[200.0] |[0.5]   |[0.25]  |[0.1111111111111111]|
|2  |5  |1000|[2.0]   |[5.0]   |[1000.0]|[1.0]   |[1.0]   |[1.0]               |
+---+---+----+--------+--------+--------+--------+--------+--------------------+



Puede realizar un procesamiento posterior para recuperar los nombres de las columnas originales:

In [None]:
from pyspark.sql import functions as f

names = {x + "_scaled": x for x in columns_to_scale}
scaledData = scaledData.select([f.col(c).alias(names[c]) for c in names.keys()])
scaledData.show()

+-----+------+--------------------+
|    x|     y|                   z|
+-----+------+--------------------+
|[0.0]| [0.0]|               [0.0]|
|[0.5]|[0.25]|[0.1111111111111111]|
|[1.0]| [1.0]|               [1.0]|
+-----+------+--------------------+



### Normalización usando Normalizer
Normalizer es un transformador que transforma un conjunto de datos de filas de vectores, normalizando cada vector para que tenga una norma de unidad. Toma el parámetro p, que especifica la norma p utilizada para la normalización. (p=2 por defecto). Esta normalización puede ayudar a estandarizar sus datos de entrada y mejorar el comportamiento de los algoritmos de aprendizaje.

El Normalizador de Spark transforma un conjunto de datos de filas de vectores, normalizando cada vector para que tenga una norma de unidad (es decir, una longitud de 1). Toma un parámetro $p$ del usuario, que representa la $p-norma$. Por ejemplo, puede configurar $p=1$ para usar la norma de Manhattan (o la distancia de Manhattan) o $p=2$ para usar la norma euclidiana:

$$L_1: z = || x ||_1 = sum(|x_i|) for i=1, \cdots, n$$

$$L_2: z = || x ||_2 = sqrt(sum(x_i^2)) for i=1,\cdots, n$$

In [None]:
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

data = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])
data.show()

+---+--------------+
| id|      features|
+---+--------------+
|  0|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]|
|  2|[4.0,10.0,2.0]|
+---+--------------+



In [None]:
# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(data)
print("Normalized usando la norma L^1")
l1NormData.show()

Normalized usando la norma L^1
+---+--------------+------------------+
| id|      features|      normFeatures|
+---+--------------+------------------+
|  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
|  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
|  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+



In [None]:
# Normalize each Vector using $L^2$ norm.
normalizer = Normalizer(inputCol="features", outputCol="norma2", p=2.0)
l2NormData = normalizer.transform(data)
print("Normalizando usando la norma L^2")
l2NormData.show()

Normalizando usando la norma L^2
+---+--------------+--------------------+
| id|      features|              norma2|
+---+--------------+--------------------+
|  0|[1.0,0.5,-1.0]|[0.66666666666666...|
|  1| [2.0,1.0,1.0]|[0.81649658092772...|
|  2|[4.0,10.0,2.0]|[0.36514837167011...|
+---+--------------+--------------------+



In [None]:
# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(data, {normalizer.p: float("inf")})
print("Normalizando usando la norma L^inf")
lInfNormData.show()

Normalizando usando la norma L^inf
+---+--------------+--------------+
| id|      features|        norma2|
+---+--------------+--------------+
|  0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
|  2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
+---+--------------+--------------+



In [None]:
from pyspark.ml.feature import Normalizer
# Creando un object de la class Normalizer
ManhattanDistance=Normalizer().setP(1).setInputCol("features").setOutputCol("Manhattan Distance")

EuclideanDistance=Normalizer().setP(2).setInputCol("features").setOutputCol("Euclidean Distance")


## String Indexing (Indexación de cadenas)
La mayoría de los algoritmos de **Machine Learning** requieren la conversión de características categóricas (como cadenas) en numéricas. La indexación de cadenas es el proceso de convertir cadenas en valores numéricos.

`StringIndexer` de **Spark** es un indexador de etiquetas que asigna una columna de cadenas de etiquetas a una columna de índices de etiquetas. Si la columna de entrada es numérica, la convertimos en cadena e indexamos los valores de cadena. Los índices están en el rango [0, numLabels). De forma predeterminada, se ordenan por frecuencia de etiqueta en orden descendente, por lo que la etiqueta más frecuente obtiene el índice 0. El comportamiento de ordenación se controla configurando la opción `stringOrderType`.

StringIndexer codifica una columna de cadenas de etiquetas en una columna de índices de etiquetas. StringIndexer puede codificar varias columnas. Los índices están en $[0, numLabels)$ y se admiten cuatro opciones de ordenación: `frequencyDesc`: orden descendente por frecuencia de etiqueta (etiqueta más frecuente asignada 0), `frequencyAsc`: orden ascendente por frecuencia de etiqueta (etiqueta menos frecuente asignada 0) , `alphabetDesc`: orden alfabético descendente y `alphabetAsc`: orden alfabético ascendente (predeterminado = `frequencyDesc`). Tenga en cuenta que en el caso de la misma frecuencia en `frequencyDesc`/`frequencyAsc`, las cadenas se ordenan alfabéticamente.

Las etiquetas invisibles se colocarán en el índice numLabels si el usuario decide conservarlas. Si la columna de entrada es numérica, la convertimos en cadena e indexamos los valores de cadena. Cuando los componentes de pipeline posteriores, como Estimator o `Transformer`, utilizan esta etiqueta indexada por cadena, debe establecer la columna de entrada del componente en este nombre de columna indexada por cadena. En muchos casos, puede configurar la columna de entrada con `setInputCol`.

In [None]:
df = spark.createDataFrame(
    [(1111111,20151122045510, "Yin","gre"), (1111111,20151122045501, "Yin","gre"), (1111111,20151122045500, "Yln","gra")
     , (1111112,20151122065832, "Yun","ddd"), (1111113,20160101003221, "Yan","fdf"), (1111111,20160703045231, "Yin","gre"),
    (1111114,20150419134543, "Yin","fdf"), (1111115,20151123174302, "Yen","ddd"),(2111115, 20123192, "Yen","gre")],
    ["address", "date","name","food"])
df.show()

+-------+--------------+----+----+
|address|          date|name|food|
+-------+--------------+----+----+
|1111111|20151122045510| Yin| gre|
|1111111|20151122045501| Yin| gre|
|1111111|20151122045500| Yln| gra|
|1111112|20151122065832| Yun| ddd|
|1111113|20160101003221| Yan| fdf|
|1111111|20160703045231| Yin| gre|
|1111114|20150419134543| Yin| fdf|
|1111115|20151123174302| Yen| ddd|
|2111115|      20123192| Yen| gre|
+-------+--------------+----+----+



Si queremos transformarlo para usarlo con `pyspark.ml`, podemos usar `StringIndexer` de Spark para convertir la columna de nombre en una columna numérica, como se muestra aquí:

In [None]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="name", outputCol="name_index").fit(df)
df_ind = indexer.transform(df)
df_ind.show()

+-------+--------------+----+----+----------+
|address|          date|name|food|name_index|
+-------+--------------+----+----+----------+
|1111111|20151122045510| Yin| gre|       0.0|
|1111111|20151122045501| Yin| gre|       0.0|
|1111111|20151122045500| Yln| gra|       3.0|
|1111112|20151122065832| Yun| ddd|       4.0|
|1111113|20160101003221| Yan| fdf|       2.0|
|1111111|20160703045231| Yin| gre|       0.0|
|1111114|20150419134543| Yin| fdf|       0.0|
|1111115|20151123174302| Yen| ddd|       1.0|
|2111115|      20123192| Yen| gre|       1.0|
+-------+--------------+----+----+----------+



### Aplicar StringIndexer a varias columnas
¿Qué pasa si queremos aplicar `StringIndexer` a varias columnas a la vez? La manera simple de hacer esto es combinar varios `StringIndexes` en una función `list()`` y usar un Pipeline para ejecutarlos todos:

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [ StringIndexer(inputCol=column, outputCol=column+"_index").fit(df)
            for column in list(set(df.columns)-set(['date'])) ]
pipeline = Pipeline(stages=indexers)
df_indexed = pipeline.fit(df).transform(df)

df_indexed.show()

+-------+--------------+----+----+----------+----------+-------------+
|address|          date|name|food|food_index|name_index|address_index|
+-------+--------------+----+----+----------+----------+-------------+
|1111111|20151122045510| Yin| gre|       0.0|       0.0|          0.0|
|1111111|20151122045501| Yin| gre|       0.0|       0.0|          0.0|
|1111111|20151122045500| Yln| gra|       3.0|       3.0|          0.0|
|1111112|20151122065832| Yun| ddd|       1.0|       4.0|          1.0|
|1111113|20160101003221| Yan| fdf|       2.0|       2.0|          2.0|
|1111111|20160703045231| Yin| gre|       0.0|       0.0|          0.0|
|1111114|20150419134543| Yin| fdf|       2.0|       0.0|          3.0|
|1111115|20151123174302| Yen| ddd|       1.0|       1.0|          4.0|
|2111115|      20123192| Yen| gre|       0.0|       1.0|          5.0|
+-------+--------------+----+----+----------+----------+-------------+



%md
## Vector Assembly
La función principal de `VectorAssembler` es concatenar un conjunto de características en un solo vector que se puede pasar al estimador o al algoritmo de aprendizaje automático. En otras palabras, es un transformador de funciones que fusiona múltiples columnas en una sola columna vectorial.

También `VectorAssembler` es un transformador que combina una lista dada de columnas en una sola columna vectorial. Es útil para combinar características sin procesar y características generadas por diferentes transformadores de características en un solo vector de características, para entrenar modelos ML como regresión logística y árboles de decisión. VectorAssembler acepta los siguientes tipos de columnas de entrada: todos los tipos numéricos, tipo booleano y tipo vectorial. En cada fila, los valores de las columnas de entrada se concatenarán en un vector en el orden especificado.

Veamos el proceso de datos de alto nivel en el modelado de PySpark. Como se muestra en la siguiente esquema. A continuación, comienza con la ingestión de datos seguida del análisis exploratorio de datos, la ingeniería de características, la creación de datos finales y su división en entrenamiento, prueba y validación con fines de modelado. En este artículo, nuestro enfoque es cómo podemos pasar de la ingeniería de características a los pasos de VectorAssembler.

<center><img src="https://miro.medium.com/max/1400/1*BkIwDNUo9tnJB_QfxdeC2A.webp " width="550" height="850"></center>





Antes de entrar en los detalles, echemos un vistazo al proceso de alto nivel para ensamblar una variable individual en una columna vectorial de "características". En la siguiente figura se muestra que dos variables numéricas X1 y X2 se combinan usando `VectorAssembler` en una sola columna de vector llamada **"características"**. 

<center><img src="https://miro.medium.com/max/720/1*bfI5xK7OZ-_uSJCvtXFPUA.webp" width="550" height="850"></center>


In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),(1, 20, 1.0, Vectors.dense([0.0, 15.0, 0.2]), 5.0),
    (1, 15, 3.0, Vectors.dense([0.0, 11.0, 0.8]), 7.0), (0, 10, 5.0, Vectors.dense([0.3, 11.0, 0.6]), 4.0),
    (5, 3, 5.0, Vectors.dense([0.7, 19.0, 0.1]), 3.3)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

dataset.show()

+---+----+------+--------------+-------+
| id|hour|mobile|  userFeatures|clicked|
+---+----+------+--------------+-------+
|  0|  18|   1.0|[0.0,10.0,0.5]|    1.0|
|  1|  20|   1.0|[0.0,15.0,0.2]|    5.0|
|  1|  15|   3.0|[0.0,11.0,0.8]|    7.0|
|  0|  10|   5.0|[0.3,11.0,0.6]|    4.0|
|  5|   3|   5.0|[0.7,19.0,0.1]|    3.3|
+---+----+------+--------------+-------+



Podemos aplicar `VectorAssembler` a estas tres funciones (id, hour,useFeatures y clicked) y fusionarlas en una columna vectorial denominada features, como se muestra aquí:

In [None]:
assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="vector")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("vector", "clicked").show(truncate=False)

Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|vector                 |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
|[20.0,1.0,0.0,15.0,0.2]|5.0    |
|[15.0,3.0,0.0,11.0,0.8]|7.0    |
|[10.0,5.0,0.3,11.0,0.6]|4.0    |
|[3.0,5.0,0.7,19.0,0.1] |3.3    |
+-----------------------+-------+



Si desea omitir filas que tienen **NaN** o valores nulos, puede hacerlo usando `VectorAssembler.setParams(handleInvalid="skip")`:

In [None]:
# Creamos el vector
assembler2 = VectorAssembler(inputCols=["hour", "mobile", "userFeatures"], outputCol="features").setParams(handleInvalid="skip")# omita los valore nulos
assembler2.transform(dataset).show()

+---+----+------+--------------+-------+--------------------+
| id|hour|mobile|  userFeatures|clicked|            features|
+---+----+------+--------------+-------+--------------------+
|  0|  18|   1.0|[0.0,10.0,0.5]|    1.0|[18.0,1.0,0.0,10....|
|  1|  20|   1.0|[0.0,15.0,0.2]|    5.0|[20.0,1.0,0.0,15....|
|  1|  15|   3.0|[0.0,11.0,0.8]|    7.0|[15.0,3.0,0.0,11....|
|  0|  10|   5.0|[0.3,11.0,0.6]|    4.0|[10.0,5.0,0.3,11....|
|  5|   3|   5.0|[0.7,19.0,0.1]|    3.3|[3.0,5.0,0.7,19.0...|
+---+----+------+--------------+-------+--------------------+



El modelado en PySpark requiere preparar datos usando `VectorAssembler` que contiene todas las características numéricas y características categóricas convertidas en vectores. `StringIndexer` y `OneHotEncoder` disponibles en `pyspark.ml.feature` son pasos importantes para convertir variables categóricas en una forma vectorizada que luego se puede usar para el trabajo de modelado posterior.

### Bucketing
Data binning— El agrupamiento de datos, también llamado agrupamiento discreto o agrupamiento, es una técnica de preprocesamiento de datos que se utiliza para reducir los efectos de errores de observación menores. Con esta técnica, los valores de datos originales que caen en un pequeño intervalo determinado (un contenedor) se reemplazan por un valor representativo de ese intervalo, a menudo el valor central. Por ejemplo, si tiene datos sobre los precios de los automóviles en los que los valores están muy dispersos, es posible que prefiera utilizar la segmentación en lugar de los precios reales de los automóviles individuales.

El Bucketizer de Spark transforma una columna de características continuas en una columna de cubos de características, donde el usuario especifica los cubos.

Considere este ejemplo: no existe una relación lineal entre la latitud y los valores de las viviendas, pero puede sospechar que las latitudes individuales y los valores de las viviendas están relacionados. Para explorar esto, puede clasificar las latitudes en cubos, creando cubos como:

Bin-1: 32 < latitude <= 33 \
Bin-2: 33 < latitude <= 34

La técnica de binning se puede aplicar tanto en datos categóricos como numéricos. La Tabla 12-2 muestra un ejemplo de clasificación numérica y la Tabla 12-3 muestra un ejemplo de clasificación categórica.

Tabla 12-2. Ejemplo de agrupación numérica

|Value| Bin|
|-----|----|
|0-10 |Very low|
|11-30| Low|
|31-70 |Mid|
|71-90| High|
|91-100| Very high|


Tabla 12-3. Ejemplo de agrupación categórica
|Value| Bin|
|-----|-----|
|India| Asia|
|China| Asia|
|Japan| Asia|
|Spain| Europe|
|Italy| Europe|
|Chile |South America|
|Brazil| South America|


El agrupamiento también se usa con datos genómicos: agrupamos los cromosomas del genoma humano (1, 2, 3, …, 22, X, Y, MT). Por ejemplo, el cromosoma 1 tiene 250 millones de posiciones, que podemos agrupar en 101 cubos.

### Bucketizer
Bucketing es el enfoque más directo para convertir variables continuas en variables categóricas. Para ilustrar, veamos un ejemplo. En PySpark, la tarea de creación de depósitos se puede realizar fácilmente utilizando la clase `Bucketizer`. El primer paso es definir los bordes del cubo; luego creamos un objeto de la clase `Bucketizer` y aplicamos el método `transform()` a nuestro DataFrame.

Bucketizer transforma una columna de funciones continuas en una columna de depósitos de funciones, donde los usuarios especifican los depósitos. Toma un parámetro:

* **splits:** Parámetro para mapear entidades continuas en cubos. Con n+1 divisiones, hay n cubos. Un cubo definido por las divisiones x, y contiene valores en el rango [x, y) excepto el último cubo, que también incluye y. Las divisiones deben ser estrictamente crecientes. Los valores en -inf, inf se deben proporcionar explícitamente para cubrir todos los valores de Double; De lo contrario, los valores fuera de las divisiones especificadas se tratarán como errores. Dos ejemplos de divisiones son Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity) y Array(0.0, 1.0, 2.0).

Tenga en cuenta que si no tiene idea de los límites superior e inferior de la columna de destino, debe agregar Double.NegativeInfinity y Double.PositiveInfinity como los límites de sus divisiones para evitar una posible excepción fuera de los límites de Bucketizer.

Tenga en cuenta también que las divisiones que proporcionó deben estar en orden estrictamente creciente, es decir, $s_0 < s_1 < s_2 < \cdots < s_n$.

In [None]:
data = [('A', -99.99), ('B', -0.5), ('C', -0.3),('D', 0.0), ('E', 0.7), ('F', 99.99)]

dataframe = spark.createDataFrame(data, ["id", "features"])
dataframe.show()                                                

+---+--------+
| id|features|
+---+--------+
|  A|  -99.99|
|  B|    -0.5|
|  C|    -0.3|
|  D|     0.0|
|  E|     0.7|
|  F|   99.99|
+---+--------+



A continuación, definimos los bordes de nuestros cubos y aplicamos el Bucketizer para crear cubos:

In [None]:
from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataframe)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))
bucketedData.show()

Bucketizer output with 4 buckets
+---+--------+----------------+
| id|features|bucketedFeatures|
+---+--------+----------------+
|  A|  -99.99|             0.0|
|  B|    -0.5|             1.0|
|  C|    -0.3|             1.0|
|  D|     0.0|             2.0|
|  E|     0.7|             3.0|
|  F|   99.99|             3.0|
+---+--------+----------------+



## QuantileDiscretizer
El `QuantileDiscretizer` de Spark toma una columna con características continuas y genera una columna con características categóricas agrupadas. El número de bandejas se establece mediante el parámetro `numBuckets` y las divisiones de las bandejas se determinan en función de los datos. Es posible que la cantidad de cubos utilizados sea menor que el valor especificado, por ejemplo, si hay muy pocos valores distintos en la entrada para crear suficientes cuantiles distintos (es decir, segmentos del conjunto de datos).

Puede usar `Bucketizer` y `QuantileDiscretizer` juntos, así:

In [None]:
from pyspark.ml.feature import Bucketizer
from pyspark.ml.feature import QuantileDiscretizer

In [None]:
data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])
print(df.show())

+---+----+
| id|hour|
+---+----+
|  0|18.0|
|  1|19.0|
|  2| 8.0|
|  3| 5.0|
|  4| 2.2|
+---+----+

None


In [None]:
qds = QuantileDiscretizer(numBuckets=5, inputCol="hour",outputCol="buckets", relativeError=0.01, handleInvalid="error")
bucketizer = qds.fit(df)
bucketizer.setHandleInvalid("skip").transform(df).show()

+---+----+-------+
| id|hour|buckets|
+---+----+-------+
|  0|18.0|    4.0|
|  1|19.0|    4.0|
|  2| 8.0|    3.0|
|  3| 5.0|    2.0|
|  4| 2.2|    1.0|
+---+----+-------+



## Transformación de logaritmos
En pocas palabras, la transformación de logaritmos (comúnmente denotada por log) comprime el rango de números grandes y expande el rango de números pequeños. En matemáticas, el logaritmo es la función inversa de la exponenciación y se define como (donde b se llama el número base):

$$log_b x = y \rightarrow  b^y = x$$

En feature engineering, la transformación logarítmica es una de las transformaciones matemáticas más utilizadas. Nos ayuda a manejar datos sesgados forzando los valores atípicos más cerca de la media, haciendo que la distribución de datos se aproxime más a la normalidad (por ejemplo, el logaritmo natural/base e del número 4000 es 8,2940496401). Esta normalización reduce el efecto de los valores atípicos, lo que ayuda a que los modelos de aprendizaje automático sean más sólidos.

El logaritmo solo se define para valores positivos distintos de 1 (0, 1 y los valores negativos no pueden ser la base de una función de potencia de manera confiable). Una técnica común para manejar valores negativos y cero es agregar una constante a los datos antes de aplicar la transformación logarítmica (por ejemplo, $log(x+1)$).

Spark proporciona la función logarítmica en cualquier base, definida de la siguiente manera:

`pyspark.sql.functions.log(arg1, arg2=None)`

>Descripción: Devuelve el primer logaritmo basado en argumento del segundo argumento. Si solo hay un argumento, entonces este toma el logaritmo natural del argumento. Su uso se ilustra en el siguiente ejemplo. Primero, creamos un DataFrame:

In [None]:
data = [('gene1', 1.2), ('gene2', 3.4), ('gene1', 3.5), ('gene2', 12.6)]
df = spark.createDataFrame(data, ["gene", "value"])
df.show()

+-----+-----+
| gene|value|
+-----+-----+
|gene1|  1.2|
|gene2|  3.4|
|gene1|  3.5|
|gene2| 12.6|
+-----+-----+



Luego aplicamos la transformación logarítmica en un valor etiquetado como característica:

In [None]:
from pyspark.sql.functions import log
df.withColumn("base-10", log(10.0, df.value)).withColumn("base-e", log(df.value)).show()

+-----+-----+------------------+------------------+
| gene|value|           base-10|            base-e|
+-----+-----+------------------+------------------+
|gene1|  1.2|0.0791812460476248|0.1823215567939546|
|gene2|  3.4| 0.531478917042255|1.2237754316221157|
|gene1|  3.5|0.5440680443502756| 1.252762968495368|
|gene2| 12.6|1.1003705451175627| 2.533696813957432|
+-----+-----+------------------+------------------+



## One-Hot Encoding
Los modelos de aprendizaje automático requieren que todas las funciones de entrada y las predicciones de salida sean numéricas. Esto implica que si sus datos contienen características categóricas, como el título de educación {BS, MBA, MS, MD, PHD}, debe codificarlos numéricamente antes de poder construir y evaluar un modelo.

La figura 12-3 ilustra el concepto de codificación one-hot, un esquema de codificación en el que cada valor categórico se convierte en un vector binario.

Un codificador one-hot asigna los índices de etiqueta a una representación vectorial binaria con un único valor 1 como máximo que indica la presencia de un valor de característica específico del conjunto de todos los valores de característica posibles. Este método es útil cuando necesita usar características categóricas pero el algoritmo espera características continuas. Para entender este método de codificación, considere una característica llamada nivel_seguridad que tiene cinco valores categóricos (representados en la Tabla 12-4). La primera columna muestra los valores de las características y el resto de las columnas muestran representaciones vectoriales binarias codificadas en caliente de esos valores.

Para los datos de entrada de tipo cadena, es común codificar características categóricas usando `StringIndexer` primero. El OneHotEncoder de Spark luego toma la etiqueta indexada por cadena y la codifica en un vector disperso. Veamos un ejemplo para ver cómo funciona esto. Primero crearemos un DataFrame con dos características categóricas:

In [None]:
from pyspark.sql.types import *

schema = StructType().add("id","integer").add("safety_level","string").add("engine_type","string")
schema
#StructType(list(StructField(id,IntegerType,True),StructField(safety_level,StringType,True),StructField(engine_type,StringType,True)))

data = [(1,'Very-Low','v4'),(2,'Very-Low','v6'),(3,'Low','v6'),(4,'Low','v6'),(5,'Medium','v4'),
        (6,'High','v6'),(7,'High','v6'),(8,'Very-High','v4'),(9,'Very-High','v6')]

df = spark.createDataFrame(data, schema=schema)
df.show(truncate=False)

+---+------------+-----------+
|id |safety_level|engine_type|
+---+------------+-----------+
|1  |Very-Low    |v4         |
|2  |Very-Low    |v6         |
|3  |Low         |v6         |
|4  |Low         |v6         |
|5  |Medium      |v4         |
|6  |High        |v6         |
|7  |High        |v6         |
|8  |Very-High   |v4         |
|9  |Very-High   |v6         |
+---+------------+-----------+



In [None]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- safety_level: string (nullable = true)
 |-- engine_type: string (nullable = true)



A continuación, aplicaremos la transformación `OneHotEncoder` a las características de nivel de seguridad y tipo de motor. En Spark, no podemos aplicar `OneHotEncoder` directamente a las columnas de cadenas; primero debemos convertirlos a valores numéricos, lo que podemos hacer con `StringIndexer` de Spark.

Primero, aplicamos `StringIndexer` a la característica de nivel de seguridad:

In [None]:
from pyspark.ml.feature import StringIndexer

safety_level_indexer = StringIndexer(inputCol="safety_level",outputCol="safety_level_index")
df1 = safety_level_indexer.fit(df).transform(df)
df1.show()

+---+------------+-----------+------------------+
| id|safety_level|engine_type|safety_level_index|
+---+------------+-----------+------------------+
|  1|    Very-Low|         v4|               3.0|
|  2|    Very-Low|         v6|               3.0|
|  3|         Low|         v6|               1.0|
|  4|         Low|         v6|               1.0|
|  5|      Medium|         v4|               4.0|
|  6|        High|         v6|               0.0|
|  7|        High|         v6|               0.0|
|  8|   Very-High|         v4|               2.0|
|  9|   Very-High|         v6|               2.0|
+---+------------+-----------+------------------+



A continuación, aplicamos StringIndexer a la característica engine_type:

In [None]:
engine_type_indexer = StringIndexer(inputCol="engine_type",outputCol="engine_type_index")
df2 = engine_type_indexer.fit(df).transform(df)
df2.show()

+---+------------+-----------+-----------------+
| id|safety_level|engine_type|engine_type_index|
+---+------------+-----------+-----------------+
|  1|    Very-Low|         v4|              1.0|
|  2|    Very-Low|         v6|              0.0|
|  3|         Low|         v6|              0.0|
|  4|         Low|         v6|              0.0|
|  5|      Medium|         v4|              1.0|
|  6|        High|         v6|              0.0|
|  7|        High|         v6|              0.0|
|  8|   Very-High|         v4|              1.0|
|  9|   Very-High|         v6|              0.0|
+---+------------+-----------+-----------------+



Ahora podemos aplicar **OneHotEncoder** al safety_level_index y a la columna engine_type_index

In [None]:
from pyspark.ml.feature import OneHotEncoder
onehotencoder_safety_level = OneHotEncoder(inputCol="safety_level_index",outputCol="safety_level_vector")
#                                Ajusta     Transforma 
df11 = onehotencoder_safety_level.fit(df1).transform(df1)
df11.show(truncate=False)

+---+------------+-----------+------------------+-------------------+
|id |safety_level|engine_type|safety_level_index|safety_level_vector|
+---+------------+-----------+------------------+-------------------+
|1  |Very-Low    |v4         |3.0               |(4,[3],[1.0])      |
|2  |Very-Low    |v6         |3.0               |(4,[3],[1.0])      |
|3  |Low         |v6         |1.0               |(4,[1],[1.0])      |
|4  |Low         |v6         |1.0               |(4,[1],[1.0])      |
|5  |Medium      |v4         |4.0               |(4,[],[])          |
|6  |High        |v6         |0.0               |(4,[0],[1.0])      |
|7  |High        |v6         |0.0               |(4,[0],[1.0])      |
|8  |Very-High   |v4         |2.0               |(4,[2],[1.0])      |
|9  |Very-High   |v6         |2.0               |(4,[2],[1.0])      |
+---+------------+-----------+------------------+-------------------+



In [None]:
onehotencoder_engine_type = OneHotEncoder(inputCol="engine_type_index",outputCol="engine_type_vector")
df12 = onehotencoder_engine_type.fit(df2).transform(df2)
df12.show(truncate=False)

+---+------------+-----------+-----------------+------------------+
|id |safety_level|engine_type|engine_type_index|engine_type_vector|
+---+------------+-----------+-----------------+------------------+
|1  |Very-Low    |v4         |1.0              |(1,[],[])         |
|2  |Very-Low    |v6         |0.0              |(1,[0],[1.0])     |
|3  |Low         |v6         |0.0              |(1,[0],[1.0])     |
|4  |Low         |v6         |0.0              |(1,[0],[1.0])     |
|5  |Medium      |v4         |1.0              |(1,[],[])         |
|6  |High        |v6         |0.0              |(1,[0],[1.0])     |
|7  |High        |v6         |0.0              |(1,[0],[1.0])     |
|8  |Very-High   |v4         |1.0              |(1,[],[])         |
|9  |Very-High   |v6         |0.0              |(1,[0],[1.0])     |
+---+------------+-----------+-----------------+------------------+



También podemos aplicar esta codificación a varias columnas al mismo tiempo:

In [None]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in list(set(df.columns)-set(['id'])) ]

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=indexers)
df_indexed = pipeline.fit(df).transform(df)
df_indexed.show()

+---+------------+-----------+------------------+-----------------+
| id|safety_level|engine_type|safety_level_index|engine_type_index|
+---+------------+-----------+------------------+-----------------+
|  1|    Very-Low|         v4|               3.0|              1.0|
|  2|    Very-Low|         v6|               3.0|              0.0|
|  3|         Low|         v6|               1.0|              0.0|
|  4|         Low|         v6|               1.0|              0.0|
|  5|      Medium|         v4|               4.0|              1.0|
|  6|        High|         v6|               0.0|              0.0|
|  7|        High|         v6|               0.0|              0.0|
|  8|   Very-High|         v4|               2.0|              1.0|
|  9|   Very-High|         v6|               2.0|              0.0|
+---+------------+-----------+------------------+-----------------+



In [None]:
encoder = OneHotEncoder(inputCols=[indexer.getOutputCol() for indexer in indexers],
                        outputCols=["{0}_encoded".format(indexer.getOutputCol()) for indexer in indexers])

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=encoder.getOutputCols(),outputCol="features")

pipeline = Pipeline(stages=indexers + [encoder, assembler])

pipeline.fit(df).transform(df).show()

+---+------------+-----------+------------------+-----------------+--------------------------+-------------------------+-------------------+
| id|safety_level|engine_type|safety_level_index|engine_type_index|safety_level_index_encoded|engine_type_index_encoded|           features|
+---+------------+-----------+------------------+-----------------+--------------------------+-------------------------+-------------------+
|  1|    Very-Low|         v4|               3.0|              1.0|             (4,[3],[1.0])|                (1,[],[])|      (5,[3],[1.0])|
|  2|    Very-Low|         v6|               3.0|              0.0|             (4,[3],[1.0])|            (1,[0],[1.0])|(5,[3,4],[1.0,1.0])|
|  3|         Low|         v6|               1.0|              0.0|             (4,[1],[1.0])|            (1,[0],[1.0])|(5,[1,4],[1.0,1.0])|
|  4|         Low|         v6|               1.0|              0.0|             (4,[1],[1.0])|            (1,[0],[1.0])|(5,[1,4],[1.0,1.0])|
|  5|      Me

Hay otra forma de hacer todas las transformaciones de datos: podemos usar un pipeline para simplificar el proceso. Primero, creamos las etapas requeridas:

In [None]:
safety_level_indexer = StringIndexer(inputCol="safety_level",outputCol="safety_level_index")
engine_type_indexer = StringIndexer(inputCol="engine_type", outputCol="engine_type_index")
onehotencoder_safety_level = OneHotEncoder(inputCol="safety_level_index",outputCol="safety_level_vector")
onehotencoder_engine_type = OneHotEncoder(inputCol="engine_type_index",outputCol="engine_type_vector")

Luego creamos un pipeline y le pasamos todas las etapas definidas:

In [None]:
pipeline = Pipeline(stages=[safety_level_indexer,
                            engine_type_indexer, onehotencoder_safety_level, onehotencoder_engine_type])

df_transformed = pipeline.fit(df).transform(df)
df_transformed.show(truncate=False)

+---+------------+-----------+------------------+-----------------+-------------------+------------------+
|id |safety_level|engine_type|safety_level_index|engine_type_index|safety_level_vector|engine_type_vector|
+---+------------+-----------+------------------+-----------------+-------------------+------------------+
|1  |Very-Low    |v4         |3.0               |1.0              |(4,[3],[1.0])      |(1,[],[])         |
|2  |Very-Low    |v6         |3.0               |0.0              |(4,[3],[1.0])      |(1,[0],[1.0])     |
|3  |Low         |v6         |1.0               |0.0              |(4,[1],[1.0])      |(1,[0],[1.0])     |
|4  |Low         |v6         |1.0               |0.0              |(4,[1],[1.0])      |(1,[0],[1.0])     |
|5  |Medium      |v4         |4.0               |1.0              |(4,[],[])          |(1,[],[])         |
|6  |High        |v6         |0.0               |0.0              |(4,[0],[1.0])      |(1,[0],[1.0])     |
|7  |High        |v6         |0.0    

## TF-IDF
Term frequency–inverse document frequency (TF-IDF) is a measure of the originality of a word (a.k.a. term) based on the number of times it appears in a document and the number of documents in a collection that it appears in. In other words, it’s a feature vectorization method used in text mining to reflect the importance of a term to a document in a corpus (set of documents). The TF-IDF technique is commonly used in document analysis, search engines, recommender systems, and other natural language processing (NLP) applications.

Term frequency TF(t,d) is the number of times that term t appears in document d, while document frequency DF(t, D) is the number of documents that contain term t. If a term appears very often across the corpus, it means it does not carry special information about a particular document—usually these kinds of words (such as “of,” “the,” and “as”) may be dropped from the text analysis. Before we go deeper into the TF-IDF transformation, let’s define the terms used in the following equations (Table 12-5).

Table 12-5. TF-IDF notation

|Notation| Description|
|--------|------------|
|t| Term|
|d| Document|
|D| Corpus (set of finite documents)|
|D| The number of documents in the corpus|
|TF(t, d)| Term Frequency: the number of times that term t appears in document d|
|DF(t, D)| Document Frequency: the number of documents that contain term t|
|IDF(t, D)| Inverse Document Frequency: a numerical measure of how much information a term provides|

La frecuencia de documento inversa (IDF) se define como:

$$IDF(t,D)=log(\frac{|D|+1} {DF(t,D)+1}) $$

Digamos que $N$ es el número de documentos en un corpus. Dado que se utiliza el logaritmo, si un término aparece en todos los documentos, su valor IDF se convierte en 0:

$$IDF (t,D) = log \frac{N + 1}{N + 1} = log 1 = 0$$

Tenga en cuenta que se aplica un término de suavizado (+1) para evitar dividir por cero los términos que no aparecen en el corpus. La medida TF-IDF es simplemente el producto de TF e IDF:

$$TF − IDF (t, d,D) = TF (t, d) \times  IDF (t,D)$$

dónde:
* t denota el(los) término(s)
* d denota un documento
* D denota el corpus
* TF(t,d) denota el número de veces que el término t aparece en el documento d

Nosotros podemos expresar TF como:

$$TF_{i,j}= \frac{n_{i,j}{\sum_k n_{k,j}} IDF_i =log \frac{|D|}{d:t_i \in d}$$

Antes, le muestro cómo Spark implementa TF-IDF, veamos un ejemplo simple con dos documentos (el tamaño del corpus es 2 y D = {doc1, doc2}). Comenzamos calculando la frecuencia del término y la frecuencia del documento:

In [None]:
documents = spark.createDataFrame([("doc1", "Ada Ada Spark Spark Spark"),("doc2", "Ada SQL")],["id", "document"])

In [None]:
documents.show()

+----+--------------------+
|  id|            document|
+----+--------------------+
|doc1|Ada Ada Spark Spa...|
|doc2|             Ada SQL|
+----+--------------------+



In [None]:
TF(Ada, doc1) = 2
TF(Spark, doc1) = 3
TF(Ada, doc2) = 1
TF(SQL, doc2) = 1
DF(Ada, D) = 2
DF(Spark, D) = 1
DF(SQL, D) = 1

[0;36m  File [0;32m<command-1716065181506094>:1[0;36m[0m
[0;31m    TF(Ada, doc1) = 2[0m
[0m    ^[0m
[0;31mSyntaxError[0m[0;31m:[0m cannot assign to function call


Luego calculamos el IDF y el TF-IDF (tenga en cuenta que la base del logaritmo es e para todos los cálculos):

In [None]:
IDF(Ada, D) = log ( (|D|+1) / (DF(t,D)+1) )
= log ( (2+1) / (DF(Ada, D)+1) )
= log ( 3 / (2+1)) = log(1)
= 0.00

IDF(Spark, D) = log ( (|D|+1) / (DF(t,D)+1) )
= log ( (2+1) / (DF(Spark, D)+1) )
= log ( 3 / (1+1) )
= log (1.5)
= 0.40546510811

TF-IDF(Ada, doc1, D) = TF(Ada, doc1) x IDF(Ada, D)
= 2 x 0.0
= 0.0

TF-IDF(Spark, doc1, D) = TF(Spark, doc1) x IDF(Spark, D)
= 3 x 0.40546510811
= 1.21639532433

[0;36m  File [0;32m<command-1716065181506096>:1[0;36m[0m
[0;31m    IDF(Ada, D) = log ( (|D|+1) / (DF(t,D)+1) )[0m
[0m    ^[0m
[0;31mSyntaxError[0m[0;31m:[0m cannot assign to function call


En Spark, HashingTF y CountVectorizer son los dos algoritmos utilizados para generar vectores de frecuencia de términos. El siguiente ejemplo muestra cómo realizar las transformaciones requeridas. Primero, creamos nuestro DataFrame de muestra:

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentences = spark.createDataFrame([ (0.0, "we heard about Spark and Java"),(0.0, "Does Java use case classes"),
                                   (1.0, "fox jumped over fence"),(1.0, "red fox jumped over")], ["label", "text"])

sentences.show(truncate=False)

+-----+-----------------------------+
|label|text                         |
+-----+-----------------------------+
|0.0  |we heard about Spark and Java|
|0.0  |Does Java use case classes   |
|1.0  |fox jumped over fence        |
|1.0  |red fox jumped over          |
+-----+-----------------------------+



In [None]:
tokenizer = Tokenizer(inputCol="text", outputCol="words")
words_data = tokenizer.transform(sentences)
words_data.show(truncate=False)

+-----+-----------------------------+------------------------------------+
|label|text                         |words                               |
+-----+-----------------------------+------------------------------------+
|0.0  |we heard about Spark and Java|[we, heard, about, spark, and, java]|
|0.0  |Does Java use case classes   |[does, java, use, case, classes]    |
|1.0  |fox jumped over fence        |[fox, jumped, over, fence]          |
|1.0  |red fox jumped over          |[red, fox, jumped, over]            |
+-----+-----------------------------+------------------------------------+



A continuación, creamos características sin procesar:

In [None]:
hashingTF = HashingTF(inputCol="words", outputCol="raw_features",numFeatures=16)
featurized_data = hashingTF.transform(words_data)
featurized_data.select("label", "raw_features").show(truncate=False)

+-----+-----------------------------------------------+
|label|raw_features                                   |
+-----+-----------------------------------------------+
|0.0  |(16,[1,4,6,11,12,15],[1.0,1.0,1.0,1.0,1.0,1.0])|
|0.0  |(16,[2,6,11,13,15],[1.0,1.0,1.0,1.0,1.0])      |
|1.0  |(16,[0,1,6,8],[1.0,1.0,1.0,1.0])               |
|1.0  |(16,[1,4,6,8],[1.0,1.0,1.0,1.0])               |
+-----+-----------------------------------------------+



Luego aplicamos la transformación IDF():

In [None]:
idf = IDF(inputCol="raw_features", outputCol="features")
idf_model = idf.fit(featurized_data)
rescaled_data = idf_model.transform(featurized_data)
rescaled_data.select("label", "features").show(truncate=False)

+-----+---------------------------------------------------------------------------------------------------------------------------+
|label|features                                                                                                                   |
+-----+---------------------------------------------------------------------------------------------------------------------------+
|0.0  |(16,[1,4,6,11,12,15],[0.22314355131420976,0.5108256237659907,0.0,0.5108256237659907,0.9162907318741551,0.5108256237659907])|
|0.0  |(16,[2,6,11,13,15],[0.9162907318741551,0.0,0.5108256237659907,0.9162907318741551,0.5108256237659907])                      |
|1.0  |(16,[0,1,6,8],[0.9162907318741551,0.22314355131420976,0.0,0.5108256237659907])                                             |
|1.0  |(16,[1,4,6,8],[0.22314355131420976,0.5108256237659907,0.0,0.5108256237659907])                                             |
+-----+---------------------------------------------------------------------

El siguiente ejemplo muestra cómo hacer TF-IDF utilizando CountVectorizer, que extrae un vocabulario de una colección de documentos y genera un CountVectorizerModel. En este ejemplo, cada fila del DataFrame representa un documento:

In [None]:
df = spark.createDataFrame([(0, ["a", "b", "c"]), (1, ["a", "b", "b", "c", "a"])], ["label", "raw"] )
df.show()

+-----+---------------+
|label|            raw|
+-----+---------------+
|    0|      [a, b, c]|
|    1|[a, b, b, c, a]|
+-----+---------------+



In [None]:
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer().setInputCol("raw").setOutputCol("features")
model = cv.fit(df)
transformed = model.transform(df)
transformed.show(truncate=False)

+-----+---------------+-------------------------+
|label|raw            |features                 |
+-----+---------------+-------------------------+
|0    |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1    |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+-----+---------------+-------------------------+



En la columna de características, tomando el ejemplo de la segunda fila:
* 3 es la longitud del vector.
* [0, 1, 2] son los índices vectoriales (índice(a)=0, índice(b)=1, índice(c)=2).
* [2.0,2.0,1.0] son los valores vectoriales.

HashingTF() convierte documentos en vectores de tamaño fijo:

In [None]:
hashing_TF = HashingTF(inputCol="raw", outputCol="features", numFeatures=128)
result = hashing_TF.transform(df)
result.show(truncate=False)

+-----+---------------+-------------------------------+
|label|raw            |features                       |
+-----+---------------+-------------------------------+
|0    |[a, b, c]      |(128,[40,99,117],[1.0,1.0,1.0])|
|1    |[a, b, b, c, a]|(128,[40,99,117],[1.0,2.0,2.0])|
+-----+---------------+-------------------------------+



Tenga en cuenta que el tamaño del vector generado a través de CountVectorizer depende del corpus de entrenamiento y del documento, mientras que el generado a través de HashingTF tiene un tamaño fijo (lo configuramos en 128). Esto significa que cuando se usa CountVectorizer, cada función sin procesar se asigna a un índice, pero HashingTF puede sufrir colisiones de hash, donde dos o más términos se asignan al mismo índice. Para evitar esto, podemos aumentar la dimensión de la característica de destino.

## Word2Vec
`Word2Vec` es un Estimador que toma secuencias de palabras que representan documentos y entrena un `Word2VecModel`. El modelo asigna cada palabra a un vector único de tamaño fijo. Word2VecModel transforma cada documento en un vector usando el promedio de todas las palabras en el documento; este vector se puede usar como características para predicción, cálculos de similitud de documentos, etc. Consulte la guía del usuario de MLlib en `Word2Vec` para obtener más detalles.

En el siguiente ejemplo de código, comenzamos con un conjunto de documentos, cada uno de los cuales se representa como una secuencia de palabras. Para cada documento, lo transformamos en un vector de características. Este vector de características podría luego pasarse a un algoritmo de aprendizaje.

In [None]:
from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

documentDF.show()

+--------------------+
|                text|
+--------------------+
|[Hi, I, heard, ab...|
|[I, wish, Java, c...|
|[Logistic, regres...|
+--------------------+



In [None]:
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [Hi, I, heard, about, Spark] => 
Vector: [0.012264367192983627,-0.06442034244537354,-0.007622340321540833]

Text: [I, wish, Java, could, use, case, classes] => 
Vector: [0.05160687722465289,0.025969027541577816,0.02736483487699713]

Text: [Logistic, regression, models, are, neat] => 
Vector: [-0.06564115285873413,0.02060299552977085,-0.08455150425434113]



## ContarVectorizador
`CountVectorizer` y `CountVectorizerModel` tienen como objetivo ayudar a convertir una colección de documentos de texto en vectores de recuentos de tokens. Cuando no se dispone de un diccionario a priori, `CountVectorizer` se puede utilizar como Estimador para extraer el vocabulario y genera un `CountVectorizerModel`. El modelo produce representaciones dispersas para los documentos sobre el vocabulario, que luego se pueden pasar a otros algoritmos como LDA.

Durante el proceso de ajuste, `CountVectorizer` seleccionará las principales palabras de VocabSize ordenadas por frecuencia de términos en todo el corpus. Un parámetro opcional minDF también afecta el proceso de ajuste al especificar el número mínimo (o fracción si < 1.0) de documentos en los que debe aparecer un término para ser incluido en el vocabulario. Otro parámetro de alternancia binaria opcional controla el vector de salida. Si se establece en verdadero, todos los recuentos distintos de cero se establecen en 1. Esto es especialmente útil para modelos probabilísticos discretos que modelan recuentos binarios, en lugar de enteros.

In [None]:
from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)

+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+



## FeatureHasher
El hashing de características proyecta un conjunto de características categóricas o numéricas en un vector de características de dimensión específica (normalmente sustancialmente más pequeño que el del espacio de características original). Se utiliza un truco de hashing para asignar características a índices en el vector de características. FeatureHasher de Spark opera en múltiples columnas, que pueden contener características numéricas o categóricas. Para las características numéricas, el hash del nombre de la columna es FeatureHasher El hash de características proyecta un conjunto de características categóricas o numéricas en un vector de características de dimensión específica (normalmente, sustancialmente más pequeño que el del espacio de características original). Se utiliza un truco de hashing para asignar características a índices en el vector de características.

FeatureHasher de Spark opera en múltiples columnas, que pueden contener características numéricas o categóricas. Para funciones numéricas, el hash del nombre de la columna se usa para asignar el valor de la función a su índice en el vector de funciones. Para características categóricas y booleanas, se utiliza el hash de la cadena "column_name=value", con un valor de indicador de 1.0. Aquí hay un ejemplo:

El hashing de características proyecta un conjunto de características categóricas o numéricas en un vector de características de dimensión específica (normalmente sustancialmente más pequeño que el del espacio de características original). Esto se hace usando el truco de hashing para mapear características a índices en el vector de características.

El transformador FeatureHasher opera en varias columnas. Cada columna puede contener características numéricas o categóricas. El comportamiento y manejo de los tipos de datos de columna es el siguiente:

* **Columnas numéricas:** para las funciones numéricas, el valor hash del nombre de la columna se usa para asignar el valor de la función a su índice en el vector de funciones. De forma predeterminada, las características numéricas no se tratan como categóricas (incluso cuando son números enteros). Para tratarlos como categóricos, especifique las columnas relevantes mediante el parámetro categoricalCols.
* **Columnas de cadena(string):** para características categóricas, el valor hash de la cadena "column_name=value" se usa para mapear el índice vectorial, con un valor de indicador de 1.0. Por lo tanto, las características categóricas están codificadas en "one-hot" (de manera similar al uso de OneHotEncoder con dropLast=false).
* **Columnas booleanas:** los valores booleanos se tratan de la misma manera que las columnas de cadenas. Es decir, las características booleanas se representan como "column_name=true" o "column_name=false", con un valor de indicador de 1,0.
Los valores nulos (ausentes) se ignoran (implícitamente cero en el vector de características resultante).

La función hash utilizada aquí también es MurmurHash 3 utilizada en HashingTF. Dado que se usa un módulo simple en el valor hash para determinar el índice del vector, es recomendable usar una potencia de dos como el parámetro numFeatures; de lo contrario, las características no se asignarán uniformemente a los índices vectoriales.

In [None]:
from pyspark.ml.feature import FeatureHasher
df = spark.createDataFrame([(2.1, True, "1", "fox"), (2.1, False, "2", "gray"), (3.3, False, "2", "red"),
                            (4.4, True, "4", "fox")], ["number", "boolean", "string_number", "string"])

df.show()

+------+-------+-------------+------+
|number|boolean|string_number|string|
+------+-------+-------------+------+
|   2.1|   true|            1|   fox|
|   2.1|  false|            2|  gray|
|   3.3|  false|            2|   red|
|   4.4|   true|            4|   fox|
+------+-------+-------------+------+



In [None]:
input_columns = ["number", "boolean", "string_number", "string"]
hasher = FeatureHasher(inputCols=input_columns, outputCol="features")
#hasher.setInputCols(input_columns)
featurized = hasher.transform(df)
featurized.show(truncate=False)

+------+-------+-------------+------+--------------------------------------------------------+
|number|boolean|string_number|string|features                                                |
+------+-------+-------------+------+--------------------------------------------------------+
|2.1   |true   |1            |fox   |(262144,[102440,112150,135239,185244],[1.0,1.0,2.1,1.0])|
|2.1   |false  |2            |gray  |(262144,[43117,93531,135239,210818],[1.0,1.0,2.1,1.0])  |
|3.3   |false  |2            |red   |(262144,[93531,110541,135239,210818],[1.0,1.0,3.3,1.0]) |
|4.4   |true   |4            |fox   |(262144,[75860,102440,135239,185244],[1.0,1.0,4.4,1.0]) |
+------+-------+-------------+------+--------------------------------------------------------+



## Transformador SQL
SQLTransformer de Spark implementa las transformaciones definidas por una instrucción SQL. En lugar de registrar su DataFrame como una tabla y luego consultar la tabla, puede aplicar directamente las transformaciones de SQL a sus datos representados como DataFrame. Actualmente, SQLTransformer tiene una funcionalidad limitada y se puede aplicar a un solo DataFrame como __ THIS __, que representa la tabla subyacente del conjunto de datos de entrada.

SQLTransformer admite declaraciones como:
* SELECT a, a + b AS a_b FROM __ THIS __
* SELECT a, SQRT(b) AS b_sqrt FROM _ THIS _ where a > 5
* SELECT a, b, SUM(c) AS c_sum FROM __ THIS __ GROUP BY a, b

In [None]:
from pyspark.ml.feature import SQLTransformer
df = spark.createDataFrame([(10, "d1", 27000),(20, "d1", 29000),
                                (40, "d2", 31000),(50, "d2", 39000)], 
                               ["id", "dept", "salary"])

df.show()

+---+----+------+
| id|dept|salary|
+---+----+------+
| 10|  d1| 27000|
| 20|  d1| 29000|
| 40|  d2| 31000|
| 50|  d2| 39000|
+---+----+------+



In [None]:
query = "SELECT dept, SUM(salary) AS sum_of_salary FROM __THIS__ GROUP BY dept"
sqlTrans = SQLTransformer(statement=query)
sqlTrans.transform(df).show()

+----+-------------+
|dept|sum_of_salary|
+----+-------------+
|  d1|        56000|
|  d2|        70000|
+----+-------------+



## Resumen
El objetivo de los algoritmos de aprendizaje automático es utilizar datos de entrada para crear modelos utilizables que puedan ayudarnos a responder preguntas. Los datos de entrada comprenden características (como el nivel educativo, el precio del automóvil, el nivel de glucosa, etc.) que se encuentran en forma de columnas estructuradas. En la mayoría de los casos, los algoritmos requieren características con algunas características específicas para funcionar correctamente, lo que plantea la necesidad de ingeniería de características. La biblioteca de aprendizaje automático de Spark, MLlib (incluida en PySpark), tiene un conjunto de API de alto nivel que hacen posible la ingeniería de funciones. La ingeniería de características adecuada ayuda a construir modelos de aprendizaje automático semánticamente adecuados y correctos.


