<a href="https://colab.research.google.com/github/ulises1229/DataAnalisysBigData_BLOQUE/blob/main/demos/demo_dia_5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Demo día 5 - PySpark

Curso Análisis de Datos y Big Data - 20 de agosto 2024

Contenidos:
- Demo Instalación Spark
- Demo Uso Spark

## Demo Instalación Spark

1. Verifica que tengas un JDK de Java instalado y que la variable de entorno *JAVA_HOME apunte* al directorio donde se encuentre.
2. Descarga Spark [Descarga](https://spark.apache.org/downloads.html).
3. Descomprime el archivo descargado en tu directorio de preferencia.
4. Registra la variable de entorno *SPARK_HOME* con la direccion del directorio de Spark.
5. Registra el directorio Spark en la variable de entorno *PATH*.
6. Instala PySpark en python

In [27]:
!pip install pyspark



## Demo uso PySpark

Usando un dataset de un cuestionario de percpcion de inseguridad del inegi, se plantea hacer una clusterización, se van a preprocesar los datos de manera previa a la clusterización.


In [28]:
from pyspark.sql import SparkSession

In [29]:
spark = SparkSession.builder.getOrCreate()

In [30]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [31]:
df = spark.read.csv('/content/drive/MyDrive/conjunto_de_datos_TPer_Vic1_ENVIPE_2019.csv',header=True,escape="\"")

In [32]:
#Eliminamos las columnas que no son necesarias para la clusterización.
df = df.drop("ID_VIV","ID_HOG","ID_PER","UPM","VIV_SEL","HOGAR","RESUL_H","R_SEL","AREAM","FAC_HOG","FAC_ELE","FAC_HOG_AM","FAC_ELE_AM","EST_DIS","UPM_DIS","NOM_ENT","CVE_MUN","NOM_MUN")

In [33]:
df.show(5,0)

+----+----+-------+-----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+-------+-------+-------+--------+---

In [34]:
#Ajustamos las variables numericas
df = df.withColumn("EDAD",df["EDAD"].cast('float'))

In [35]:
df.printSchema()

root
 |-- SEXO: string (nullable = true)
 |-- EDAD: float (nullable = true)
 |-- CVE_ENT: string (nullable = true)
 |-- AP4_1: string (nullable = true)
 |-- AP4_2_01: string (nullable = true)
 |-- AP4_2_02: string (nullable = true)
 |-- AP4_2_03: string (nullable = true)
 |-- AP4_2_04: string (nullable = true)
 |-- AP4_2_05: string (nullable = true)
 |-- AP4_2_06: string (nullable = true)
 |-- AP4_2_07: string (nullable = true)
 |-- AP4_2_08: string (nullable = true)
 |-- AP4_2_09: string (nullable = true)
 |-- AP4_2_10: string (nullable = true)
 |-- AP4_2_11: string (nullable = true)
 |-- AP4_2_12: string (nullable = true)
 |-- AP4_2_99: string (nullable = true)
 |-- AP4_3_1: string (nullable = true)
 |-- AP4_3_2: string (nullable = true)
 |-- AP4_3_3: string (nullable = true)
 |-- AP4_4_01: string (nullable = true)
 |-- AP4_4_02: string (nullable = true)
 |-- AP4_4_03: string (nullable = true)
 |-- AP4_4_04: string (nullable = true)
 |-- AP4_4_05: string (nullable = true)
 |-- AP4_4_

### String Indexer

tranformar columnas string categoricas a numericas categoricas, para eliminar NANs

In [36]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(df) for column in list(set(df.columns)-set(['EDAD'])) ]


pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(df).transform(df)

In [37]:
df_r.show(5,0)

+----+----+-------+-----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+-------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+-------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+-------+-------+-------+-------+--------+---

In [38]:
df_r=df_r.drop(*list(set(df.columns)-set(['EDAD'])))

In [39]:
df_r.show(5,0)

+----+-------------+---------------+--------------+--------------+--------------+--------------+---------------+----------------+--------------+--------------+---------------+-------------+--------------+----------------+---------------+---------------+---------------+--------------+--------------+-------------+--------------+--------------+---------------+--------------+-----------+--------------+---------------+--------------+--------------+--------------+--------------+---------------+-------------+--------------+--------------+-------------+--------------+-------------+----------------+---------------+--------------+---------------+--------------+--------------+--------------+--------------+--------------+--------------+----------------+---------------+--------------+--------------+----------------+--------------+-------------+-----------+--------------+--------------+--------------+--------------+-------------+--------------+-------------+---------------+---------------+----------

###  One Hot Encoder

Queremos usar K-means para clusterizar los datos, para eso nuestras variables categoricas se tienen que transformar a variables continuas, eso se logra generando un vector unitario que represente a que categoria pertenece

ejemplo

SEXO = 2

después del One Hot Encoder

SEXO = [0,1]


In [40]:
from pyspark.ml.feature import OneHotEncoder
encoders = [OneHotEncoder(inputCol=column, outputCol=column+"_vector").fit(df_r) for column in list(set(df_r.columns)-set(['EDAD'])) ]
pipeline = Pipeline(stages=encoders)
encoded = pipeline.fit(df_r).transform(df_r)
encoded = encoded.drop(*list(set(df_r.columns)-set(['EDAD'])))


In [41]:
encoded.show(5,0)

+----+--------------------+-----------------+--------------------+---------------------+---------------------+---------------------+--------------------+---------------------+---------------------+---------------------+---------------------+--------------------+---------------------+---------------------+---------------------+--------------------+---------------------+----------------------+---------------------+----------------------+---------------------+---------------------+----------------------+-----------------------+----------------------+---------------------+---------------------+---------------------+---------------------+----------------------+----------------------+---------------------+---------------------+---------------------+--------------------+---------------------+---------------------+----------------------+--------------------+--------------------+-----------------------+----------------------+---------------------+--------------------+--------------------+------

In [42]:
encoded.columns

['EDAD',
 'AP4_8_4_index_vector',
 'SEXO_index_vector',
 'AP5_2_4_index_vector',
 'AP4_2_09_index_vector',
 'AP5_3_09_index_vector',
 'AP5_5_09_index_vector',
 'AP4_6_1_index_vector',
 'AP5_4_08_index_vector',
 'AP4_5_09_index_vector',
 'AP5_6_10_index_vector',
 'AP4_2_04_index_vector',
 'AP4_9_3_index_vector',
 'AP5_1_07_index_vector',
 'AP5_3_07_index_vector',
 'AP4_2_08_index_vector',
 'AP4_8_3_index_vector',
 'AP4_5_06_index_vector',
 'AP4_10_09_index_vector',
 'AP4_5_01_index_vector',
 'AP4_11_11_index_vector',
 'AP5_1_01_index_vector',
 'AP4_5_11_index_vector',
 'AP4_11_09_index_vector',
 'AP5_4_07_C_index_vector',
 'AP4_10_02_index_vector',
 'AP4_2_06_index_vector',
 'AP5_3_02_index_vector',
 'AP5_3_03_index_vector',
 'AP5_6_03_index_vector',
 'AP4_10_12_index_vector',
 'AP4_10_05_index_vector',
 'AP4_2_05_index_vector',
 'AP4_5_15_index_vector',
 'AP5_5_02_index_vector',
 'AP4_6_2_index_vector',
 'AP5_5_06_index_vector',
 'AP5_5_05_index_vector',
 'AP4_11_06_index_vector',
 'AP

### Vector Assembler

Combina columnas en una sola en forma de vector

In [43]:
from pyspark.ml.feature import VectorAssembler

vs = VectorAssembler(inputCols=df_r.columns, outputCol="features")
df_features = vs.transform(df_r)
#df_features = df_features.drop(*df_r.columns)


In [44]:
df_features.show(5,0)

+----+-------------+---------------+--------------+--------------+--------------+--------------+---------------+----------------+--------------+--------------+---------------+-------------+--------------+----------------+---------------+---------------+---------------+--------------+--------------+-------------+--------------+--------------+---------------+--------------+-----------+--------------+---------------+--------------+--------------+--------------+--------------+---------------+-------------+--------------+--------------+-------------+--------------+-------------+----------------+---------------+--------------+---------------+--------------+--------------+--------------+--------------+--------------+--------------+----------------+---------------+--------------+--------------+----------------+--------------+-------------+-----------+--------------+--------------+--------------+--------------+-------------+--------------+-------------+---------------+---------------+----------

In [45]:
df_features.printSchema()

root
 |-- EDAD: float (nullable = true)
 |-- AP4_3_1_index: double (nullable = false)
 |-- AP4_11_09_index: double (nullable = false)
 |-- AP4_5_16_index: double (nullable = false)
 |-- AP4_4_05_index: double (nullable = false)
 |-- AP5_5_09_index: double (nullable = false)
 |-- AP5_6_08_index: double (nullable = false)
 |-- AP4_10_05_index: double (nullable = false)
 |-- AP5_4_10_C_index: double (nullable = false)
 |-- AP4_5_12_index: double (nullable = false)
 |-- AP5_1_10_index: double (nullable = false)
 |-- AP4_11_01_index: double (nullable = false)
 |-- AP4_7_1_index: double (nullable = false)
 |-- AP4_4_10_index: double (nullable = false)
 |-- AP5_4_08_C_index: double (nullable = false)
 |-- AP4_10_04_index: double (nullable = false)
 |-- AP4_10_09_index: double (nullable = false)
 |-- AP4_10_10_index: double (nullable = false)
 |-- AP5_3_06_index: double (nullable = false)
 |-- AP5_4_08_index: double (nullable = false)
 |-- AP4_6_3_index: double (nullable = false)
 |-- AP5_6_05