# Manipulación de Datos con `PySpark`


# Contenido
---
**1.**  [**Introducción**](#Step1)<br>
**2.**  [**Operaciones**](#Step2)<br>
**3.**  [**Creating SparkSession**](#Step3)<br>
**4.**  [**Reading Data**](#Step4)<br>
**5.**  [**Understanding Data**](#Step5)<br>
**6.**  [**Selecting Columns**](#Step6)<br>
**7.**  [**Data Filtering**](#Step7)<br>
**8.**  [**Adding New Columns**](#Step8)<br>
**9.**  [**Grouping Data**](#Step9)<br>
**10.**  [**Applying User-Defined Functions**](#Step10)<br>
**11.**  [**Deleting Data**](#Step11)<br>
**12.** [**Writing Data**](#Step12)<br>

# 1. Introducción

La manipulación de datos es un proceso fundamental en el ámbito de la ciencia de datos y el análisis de datos. Consiste en la `transformación, limpieza, reorganización o modificación de conjuntos de datos` con el objetivo de prepararlos para su análisis, visualización o modelado. 

### Importancia de la manipulación de datos:

1. **Preparación de datos**:
   - Antes de poder realizar un análisis significativo, es crucial preparar los datos de manera adecuada, lo que implica limpiarlos, transformarlos y estructurarlos de manera que sean aptos para el análisis.

2. **Calidad de los datos**:
   - La calidad de los datos influye directamente en la calidad de los resultados obtenidos. La manipulación de datos incluye la identificación y corrección de errores, valores atípicos, datos faltantes y otros problemas que puedan afectar la integridad de los datos.

3. **Toma de decisiones informadas**:
   - Una manipulación efectiva de los datos proporciona una base sólida para la toma de decisiones informadas. Los datos limpios y bien estructurados permiten extraer información significativa y generar conocimientos útiles.

### Procesos comunes de manipulación de datos:

1. **Limpieza de datos**:
   - Esto implica identificar y corregir errores en los datos, eliminar duplicados, tratar valores faltantes y valores atípicos, y asegurarse de que los datos estén en el formato correcto.

2. **Transformación de datos**:
   - Incluye convertir datos a un formato adecuado, realizar cálculos, aplicar funciones matemáticas o estadísticas, normalizar datos y crear nuevas variables derivadas.

3. **Filtrado y selección de datos**:
   - Consiste en seleccionar columnas o filas específicas, filtrar datos basados en condiciones específicas y eliminar datos irrelevantes para el análisis.

4. **Agregación y agrupación de datos**:
   - Implica combinar datos en grupos, calcular estadísticas resumidas como sumas, promedios, máximos, mínimos, contar valores, etc., dentro de esos grupos.

5. **Unión y combinación de datos**:
   - Permite combinar múltiples conjuntos de datos en uno solo, ya sea mediante uniones por columnas comunes o a través de concatenación.

### Herramientas para la manipulación de datos:

- **Lenguajes de programación** como Python, R, SQL, que ofrecen bibliotecas y funciones especializadas para la manipulación de datos.
- **Bibliotecas y herramientas** como pandas, NumPy, dplyr, SQL, PySpark, que facilitan operaciones específicas de manipulación de datos.
- **Herramientas de visualización de datos** que pueden ayudar a comprender mejor los datos durante el proceso de manipulación.

En resumen, la manipulación de datos es una etapa crítica en el proceso de análisis de datos que implica limpiar, transformar y preparar los datos para su posterior análisis, visualización y modelado. Una manipulación efectiva de los datos es esencial para garantizar la calidad y la integridad de los resultados obtenidos a partir de ellos.

# 2. Operaciones con Pyspark

La manipulación de datos con `PySpark` es una parte fundamental del análisis de datos a gran escala en entornos distribuidos. `PySpark` proporciona una interfaz de alto nivel para trabajar con conjuntos de datos distribuidos a través de la API `DataFrame`, que permite manipular datos de manera eficiente y escalable. A continuación, se presentan algunas de las operaciones comunes de manipulación de datos que se pueden realizar con PySpark:

### Lectura y escritura de datos:
- PySpark permite leer y escribir datos desde y hacia una variedad de fuentes de datos como archivos CSV, JSON, Parquet, bases de datos, etc., utilizando métodos como `spark.read.format()` y `df.write.format()`.

### Selección y filtrado de datos:
- Puedes seleccionar columnas específicas de un DataFrame utilizando `df.select()` o filtrar filas basadas en una condición utilizando `df.filter()` o `df.where()`.

### Agregaciones y agrupaciones:
- PySpark permite realizar operaciones de agregación como `groupby()`, `agg()` para realizar cálculos estadísticos en grupos de datos, y funciones de agregación como `sum()`, `mean()`, `max()`, `min()`.

### Creación y modificación de columnas:
- Puedes añadir, renombrar o eliminar columnas en un DataFrame utilizando métodos como `withColumn()`, `withColumnRenamed()`, `drop()`, y también aplicar transformaciones a columnas existentes.

### Unión de DataFrames:
- PySpark permite unir DataFrames utilizando diferentes tipos de uniones como `join()`, `union()`, `intersect()`, `subtract()`, etc., para combinar datos de múltiples fuentes.

### Operaciones de ventana:
- Puedes realizar operaciones de ventana como `rank()`, `lead()`, `lag()`, `row_number()` para realizar cálculos en grupos de filas en un DataFrame.

### Manejo de valores nulos:
- PySpark proporciona funciones para manejar valores nulos como `fillna()`, `dropna()`, `na.drop()`, `na.fill()` para limpiar los datos antes de realizar análisis.


La manipulación de datos en `PySpark` es una parte esencial del flujo de trabajo de análisis de datos a gran escala, permitiendo realizar transformaciones complejas y operaciones estadísticas en conjuntos de datos distribuidos de manera eficiente.


# 3. Creating SparkSession


Para trabajar con `PySpark`, primero debes crear `SparkSession`. `SparkSession` es un punto de entrada a la funcionalidad de `PySpark`.

Cargamos las librerias que vamos a usar, para luego crear nuestro `SparkSession`

In [0]:
from pyspark.sql import SparkSession
import requests
import pandas as pd
import io
import warnings
warnings.filterwarnings("ignore")


In [0]:
# Creating spar session object
spark = SparkSession.builder.appName('manipulacion_data').getOrCreate()


# 4. Reading Data


`PySpark` ofrece dos estructuras principales para almacenar datos al realizar manipulaciones: el RDD y el DataFrame. Puedes pensar en el RDD como una colección distribuida de objetos (o filas). Puedes pensar en el DataFrame como si fuera una tabla. Leamos nuestro conjunto de datos como DataFrame.

In [0]:
df_url = "https://raw.githubusercontent.com/narencastellon/Mi-Dataset/refs/heads/main/diabetes_data.csv"

In [0]:
# Download the CSV file using requests
response = requests.get(df_url)
response.raise_for_status()  # Raise an exception for bad responses

# Read the CSV data into a Pandas DataFrame
diabetes = pd.read_csv(io.StringIO(response.text), sep=',')

# Convert the Pandas DataFrame to a Spark DataFrame
df = spark.createDataFrame(diabetes)

df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|



# 5. Understanding Data

Echemos un vistazo a las primeras diez filas del conjunto de datos con el método `show`.

In [0]:
# looking at the first rows the dataset
df.show(10)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


Puede imprimir los nombres de las columnas de su conjunto de datos con el método `columns`.

In [0]:
# Columns of dataframe
df.columns

Out[7]: ['Pregnancies',
 'Glucose',
 'BloodPressure',
 'SkinThickness',
 'Insulin',
 'BMI',
 'DiabetesPedigreeFunction',
 'Age',
 'Outcome']

Puede utilizar el método `count` para obtener la cantidad total de registros en el DataFrame. El método `len` le permite ver la cantidad de columnas en el DataFrame. Echemos un vistazo a la forma de nuestro conjunto de datos con los métodos `count` y `len`.

In [0]:
# Shape of dataset
print((df.count(),len(df.columns)))

(768, 9)


Para obtener la información del esquema del conjunto de datos, puede utilizar el método `printSchema` que a menudo se utiliza para comprender datos con el método `show` en el análisis de datos.

In [0]:
# printSchema
df.printSchema()

root
 |-- Pregnancies: long (nullable = true)
 |-- Glucose: long (nullable = true)
 |-- BloodPressure: long (nullable = true)
 |-- SkinThickness: long (nullable = true)
 |-- Insulin: long (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: long (nullable = true)
 |-- Outcome: long (nullable = true)



Puedes usar `describe().show()` para ver las estadísticas de descripción del conjunto de datos. Voy a usar el parámetro `truncate` para ver solo 8 caracteres.

In [0]:
# description statistics
df.describe().show(truncate=8)

+-------+-----------+--------+-------------+-------------+--------+--------+------------------------+--------+--------+
|summary|Pregnancies| Glucose|BloodPressure|SkinThickness| Insulin|     BMI|DiabetesPedigreeFunction|     Age| Outcome|
+-------+-----------+--------+-------------+-------------+--------+--------+------------------------+--------+--------+
|  count|        768|     768|          768|          768|     768|     768|                     768|     768|     768|
|   mean|   3.845...|120.8...|     69.10...|     20.53...|79.79...|31.99...|                0.471...|33.24...|0.348...|
| stddev|   3.369...|31.97...|     19.35...|     15.95...|115.2...|7.884...|                0.331...|11.76...|0.476...|
|    min|          0|       0|            0|            0|       0|     0.0|                   0.078|      21|       0|
|    max|         17|     199|          122|           99|     846|    67.1|                    2.42|      81|       1|
+-------+-----------+--------+----------


# 6. Selecting Columns

Puede utilizar el método `select` para seleccionar columnas específicas. Tomemos las columnas Embarazos y Edad del conjunto de datos con el método `select`.

In [0]:
# Selecting
df.select("Pregnancies", "Age").show(10)

+-----------+---+
|Pregnancies|Age|
+-----------+---+
|          6| 50|
|          1| 31|
|          8| 32|
|          1| 21|
|          0| 33|
|          5| 30|
|          3| 26|
|         10| 29|
|          2| 53|
|          8| 54|
+-----------+---+
only showing top 10 rows



También puedes usar la función `col` en el módulo pyspark.sql.functions para seleccionar columnas. Te lo mostraré.

In [0]:
# Selecting the col method
import pyspark.sql.functions as F
df.select(F.col("Pregnancies"), F.col("Age")).show(10)

+-----------+---+
|Pregnancies|Age|
+-----------+---+
|          6| 50|
|          1| 31|
|          8| 32|
|          1| 21|
|          0| 33|
|          5| 30|
|          3| 26|
|         10| 29|
|          2| 53|
|          8| 54|
+-----------+---+
only showing top 10 rows




# 7. Data Filtering

Para limpiar el conjunto de datos y conservar solo los registros que desee, puede filtrar los registros en función de las condiciones. Hay dos métodos para filtrar datos: `filter()` y `where()`. Filtremos los datos en los que el valor de la columna de edad sea inferior a 40 con el método `filter`.

In [0]:
# Filtering
df.filter(df['Age'] < 40).show(10)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|
|         10|    115|            0|            0|      0|35.3|                   0.134| 29|      0|


Puede realizar un filtrado adicional utilizando el método "seleccionar" para ver solo columnas específicas.

In [0]:
# Filtering for two columns
df.filter(df['age'] < 40).select('Insulin','Outcome').show(10)

+-------+-------+
|Insulin|Outcome|
+-------+-------+
|      0|      0|
|      0|      1|
|     94|      0|
|    168|      1|
|      0|      0|
|     88|      1|
|      0|      0|
|      0|      0|
|      0|      1|
|      0|      1|
+-------+-------+
only showing top 10 rows



También puedes aplicar filtros a los registros según las condiciones. Buscamos registros con más de 60 años y personas que solo están enfermas.

In [0]:
# Filtering with multiple conditions
df.filter(df['age'] > 60).filter(df['Outcome'] == '1').show(10)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          4|    146|           92|            0|      0|31.2|                   0.539| 61|      1|
|          0|    105|           84|            0|      0|27.9|                   0.741| 62|      1|
|          2|    158|           90|            0|      0|31.6|                   0.805| 66|      1|
|          4|    146|           78|            0|      0|38.5|                    0.52| 67|      1|
|          2|    197|           70|           99|      0|34.7|                   0.575| 62|      1|
|          4|    145|           82|           18|      0|32.5|                   0.235| 70|      1|
|          6|    190|           92|            0|      0|35.5|                   0.278| 66|      1|


Puede utilizar operadores como el signo `&` y `|` para aplicar múltiples condiciones de filtro. Filtremos a las personas que están enfermas y tienen 10 o más embarazos utilizando &.

In [0]:
# Filtering with multiple conditions
df.filter((df['Outcome']==1) & (df['Pregnancies'] >=9)).show(10)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|         10|    168|           74|            0|      0|38.0|                   0.537| 34|      1|
|          9|    119|           80|           35|      0|29.0|                   0.263| 29|      1|
|         11|    143|           94|           33|    146|36.6|                   0.254| 51|      1|
|         10|    125|           70|           26|    115|31.1|                   0.205| 41|      1|
|          9|    102|           76|           37|      0|32.9|                   0.665| 46|      1|
|          9|    171|          110|           24|    240|45.4|                   0.721| 54|      1|
|         13|    126|           90|            0|      0|43.4|                   0.583| 42|      1|


Para encontrar un recuento de la cantidad de registros después del filtrado, puede utilizar el método 'count'.

In [0]:
# Counting after filtering
df.filter(df['age']>40).count()

Out[17]: 194

Puedes filtrar datos con el método `where` como con el método `filter`. Te lo mostraré.

In [0]:
# Filtering with the where method
df.where((df['Outcome']==1) & (df['Pregnancies'] >=9)).show(10)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|         10|    168|           74|            0|      0|38.0|                   0.537| 34|      1|
|          9|    119|           80|           35|      0|29.0|                   0.263| 29|      1|
|         11|    143|           94|           33|    146|36.6|                   0.254| 51|      1|
|         10|    125|           70|           26|    115|31.1|                   0.205| 41|      1|
|          9|    102|           76|           37|      0|32.9|                   0.665| 46|      1|
|          9|    171|          110|           24|    240|45.4|                   0.721| 54|      1|
|         13|    126|           90|            0|      0|43.4|                   0.583| 42|      1|


También puedes utilizar el método `where` junto con el método `count`.

In [0]:
# Filtering the where method
df.where(df['age']>40).count()

Out[19]: 194


# 8. Adding New Columns

Puede utilizar el método `withColumn` para agregar una nueva columna. Vamos a crear una nueva columna utilizando la columna de edad. Para ello, voy a añadir los valores de edad a diez valores.

In [0]:
# Creating a new column
df.withColumn('New_Age',df['age']+10).show(10)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|New_Age|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|     60|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|     41|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|     42|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|     31|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|     43|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|     40|
|          3|     78|       


# 9. Grouping Data

Cuando trabajamos con grandes cantidades de datos, a menudo utilizamos el método `groupBy` para resumir los datos. Después de agrupar los datos, puede aplicar una función de agregación a cada uno de ellos. Echemos un vistazo a la suma de cada valor categórico de la columna de resultados.

In [0]:
# Grouping data
df.groupBy('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      0|  500|
|      1|  268|
+-------+-----+



También puede utilizar los métodos `distinct` y `count` para buscar valores distintos en una columna. Echemos un vistazo a los valores de distrito en la columna Embarazos.

In [0]:
# Finding distinct values
df.select('Pregnancies').distinct().count()

Out[22]: 17

Puede utilizar otras funciones agregadas, como suma, media o mínima. Busquemos la media de la edad después de agrupar la columna de resultados. Tenga en cuenta que se utiliza el método "alias" para nombrar la nueva columna.

In [0]:
# Applying the aggregate functions
df.groupBy('Outcome').agg(F.mean("age").alias("age_mean")).show(10)

+-------+-----------------+
|Outcome|         age_mean|
+-------+-----------------+
|      0|            31.19|
|      1|37.06716417910448|
+-------+-----------------+




# 10. Applying User-Defined Functions

También puede aplicar su propia función a los datos agrupados con UDF (funciones definidas por el usuario) en el módulo pyspark.sql.functions. Para demostrarlo, primero crearemos una función denominada diabetes.

In [0]:
# Creating a function
def diabete(case):
    if case == 1 :
        return "diabete"
    else:
        return 'no diabete'

Ahora, declaremos la UDF y su tipo de retorno (StringType en este ejemplo). Después de eso, usaré withColumn para crear una nueva columna y luego pasaré la columna Dataframe relevante (Resultado):

In [0]:
# Applying your own function
from pyspark.sql.types import *
diabete_udf = F.udf(diabete, StringType())
df.withColumn('diabete_case', diabete_udf(df['Outcome'])).show(10)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|diabete_case|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+------------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|     diabete|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|  no diabete|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|     diabete|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|  no diabete|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|     diabete|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|     


# 11. Deleting Data

Para eliminar una o varias columnas, puede utilizar el método `drop` en PySpark. Eliminemos la columna `Insulin` con el método `drop`.

In [0]:
# Deleting a column
df.drop('Insulin').show(10)

+-----------+-------+-------------+-------------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+----+------------------------+---+-------+
|          6|    148|           72|           35|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|31.0|                   0.248| 26|      1|
|         10|    115|            0|            0|35.3|                   0.134| 

Para eliminar los registros duplicados del Dataframe, puede utilizar el método `dropDuplicates`.

In [0]:
# Deleting the duplicate records
print("The number of records: ", df.count())
df=df.dropDuplicates()
print("The number of records after removing the duplicate : ", df.count())

The number of records:  768
The number of records after removing the duplicate :  768


Como puede ver, no hay registros duplicados en el conjunto de datos.


# 12. Writing Data

Después de realizar la manipulación de datos, a menudo querrás exportar los resultados. Puedes escribir el Dataframe limpio en la ubicación deseada en el formato requerido con el método `write`.

Escribamos nuestros resultados en archivos CSV.

In [0]:
# Guardar el DataFrame como CSV
df.write.format("csv").option("header", "true").save("./Curso PySpark/prueba")


In [0]:
# Exporting data
df.write.csv("./data/my_dataset.csv")