# Introduction to Apache Spark and PySpark

## Apache Spark
Apache Spark is an open-source data analytics engine designed to process large volumes of data quickly and efficiently. It is especially well suited to processing data on clusters, where the work runs in parallel across nodes. Spark supports several programming languages, including Java, Scala, R, and Python.

## PySpark
PySpark is the Python interface to Apache Spark. It lets Python developers use Spark for large-scale data processing without switching languages. With PySpark you manipulate large datasets through Spark's DataFrame API from ordinary Python code, which eases the transition for anyone already familiar with the language.

## Key Features of Spark and PySpark
- Speed: Spark can analyze data in memory, which significantly reduces processing time compared with systems that write intermediate results to disk.
- Flexibility: You can work with different kinds of data, including structured, semi-structured, and unstructured data.
- Hadoop compatibility: Spark integrates well with the Hadoop ecosystem, so it can use HDFS and other Hadoop tools.
- Built-in modules: Spark includes modules for SQL (Spark SQL), real-time data processing (Spark Streaming), machine learning (MLlib), and graph analysis (GraphX).

In [1]:
# !pip install pyspark #install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 4.4 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=0400882de39b32d6769b187984157a6c1f0756b4fdd3390881037094002a73e9
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


# **CREATE A SPARK SESSION**

PySpark requires a SparkSession because it is the main entry point for interacting with the Spark engine. The SparkSession gives access to all of Spark's functionality, such as loading data, manipulating DataFrames, and using built-in tools like Spark SQL, MLlib (machine learning), and Spark Streaming.

**Java must be installed. On WSL, run `sudo apt install default-jre` and verify the installation with `java -version`.**

In [3]:
from pyspark.sql import SparkSession

# Create the session, or reuse one that is already running.
SpSession = SparkSession.builder.appName("Demo Spark").getOrCreate()

24/10/22 16:32:31 WARN Utils: Your hostname, DESKTOP-SO3FMKM resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
24/10/22 16:32:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/22 16:32:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
SpContext = SpSession.sparkContext

**ETL (Extract, Transform, Load) is a crucial process when preparing data for analysis. The sections below describe how these steps can be implemented with Spark and PySpark.**

# **LOADING DATA**

Data sources: Spark can connect to a variety of data sources, including relational databases (through JDBC), HDFS, Amazon S3, Apache Kafka, and more.

Reading data: PySpark exposes a reader on the session (`SpSession.read` in this notebook) for loading data in several formats (CSV, JSON, Parquet, etc.).

In [5]:
data = SpSession.read.csv('cars.csv', header=True, sep=";")
data.show(5)

                                                                                

+--------------------+------+---------+------------+----------+------+------------+-----+------+
|                 Car|   MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+------+---------+------------+----------+------+------------+-----+------+
|              STRING|DOUBLE|      INT|      DOUBLE|    DOUBLE|DOUBLE|      DOUBLE|  INT|   CAT|
|Chevrolet Chevell...|  18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|
|   Buick Skylark 320|  15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|
|  Plymouth Satellite|  18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|
|       AMC Rebel SST|  16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|
+--------------------+------+---------+------------+----------+------+------------+-----+------+
only showing top 5 rows



# **Exploring the data**

In [6]:
data.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: string (nullable = true)
 |-- Cylinders: string (nullable = true)
 |-- Displacement: string (nullable = true)
 |-- Horsepower: string (nullable = true)
 |-- Weight: string (nullable = true)
 |-- Acceleration: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Origin: string (nullable = true)



In [7]:
data.columns

['Car',
 'MPG',
 'Cylinders',
 'Displacement',
 'Horsepower',
 'Weight',
 'Acceleration',
 'Model',
 'Origin']

In [8]:
data.dtypes

[('Car', 'string'),
 ('MPG', 'string'),
 ('Cylinders', 'string'),
 ('Displacement', 'string'),
 ('Horsepower', 'string'),
 ('Weight', 'string'),
 ('Acceleration', 'string'),
 ('Model', 'string'),
 ('Origin', 'string')]

# **COLUMN SELECTION**

In [9]:
# METHOD 1: attribute access

data.select(data.Car).show(truncate=False)

+--------------------------------+
|Car                             |
+--------------------------------+
|STRING                          |
|Chevrolet Chevelle Malibu       |
|Buick Skylark 320               |
|Plymouth Satellite              |
|AMC Rebel SST                   |
|Ford Torino                     |
|Ford Galaxie 500                |
|Chevrolet Impala                |
|Plymouth Fury iii               |
|Pontiac Catalina                |
|AMC Ambassador DPL              |
|Citroen DS-21 Pallas            |
|Chevrolet Chevelle Concours (sw)|
|Ford Torino (sw)                |
|Plymouth Satellite (sw)         |
|AMC Rebel SST (sw)              |
|Dodge Challenger SE             |
|Plymouth 'Cuda 340              |
|Ford Mustang Boss 302           |
|Chevrolet Monte Carlo           |
+--------------------------------+
only showing top 20 rows



In [10]:
# METHOD 2: bracket indexing (column names are matched case-insensitively by default)

data.select(data['car']).show(truncate=False)

+--------------------------------+
|car                             |
+--------------------------------+
|STRING                          |
|Chevrolet Chevelle Malibu       |
|Buick Skylark 320               |
|Plymouth Satellite              |
|AMC Rebel SST                   |
|Ford Torino                     |
|Ford Galaxie 500                |
|Chevrolet Impala                |
|Plymouth Fury iii               |
|Pontiac Catalina                |
|AMC Ambassador DPL              |
|Citroen DS-21 Pallas            |
|Chevrolet Chevelle Concours (sw)|
|Ford Torino (sw)                |
|Plymouth Satellite (sw)         |
|AMC Rebel SST (sw)              |
|Dodge Challenger SE             |
|Plymouth 'Cuda 340              |
|Ford Mustang Boss 302           |
|Chevrolet Monte Carlo           |
+--------------------------------+
only showing top 20 rows



In [11]:
# METHOD 3: the col() function
from pyspark.sql.functions import col
data.select(col('car')).show(truncate=False)

+--------------------------------+
|car                             |
+--------------------------------+
|STRING                          |
|Chevrolet Chevelle Malibu       |
|Buick Skylark 320               |
|Plymouth Satellite              |
|AMC Rebel SST                   |
|Ford Torino                     |
|Ford Galaxie 500                |
|Chevrolet Impala                |
|Plymouth Fury iii               |
|Pontiac Catalina                |
|AMC Ambassador DPL              |
|Citroen DS-21 Pallas            |
|Chevrolet Chevelle Concours (sw)|
|Ford Torino (sw)                |
|Plymouth Satellite (sw)         |
|AMC Rebel SST (sw)              |
|Dodge Challenger SE             |
|Plymouth 'Cuda 340              |
|Ford Mustang Boss 302           |
|Chevrolet Monte Carlo           |
+--------------------------------+
only showing top 20 rows



In [12]:
# Method 1 for selecting multiple columns

data.select(data.Car, data.Cylinders).show(truncate=False)

+--------------------------------+---------+
|Car                             |Cylinders|
+--------------------------------+---------+
|STRING                          |INT      |
|Chevrolet Chevelle Malibu       |8        |
|Buick Skylark 320               |8        |
|Plymouth Satellite              |8        |
|AMC Rebel SST                   |8        |
|Ford Torino                     |8        |
|Ford Galaxie 500                |8        |
|Chevrolet Impala                |8        |
|Plymouth Fury iii               |8        |
|Pontiac Catalina                |8        |
|AMC Ambassador DPL              |8        |
|Citroen DS-21 Pallas            |4        |
|Chevrolet Chevelle Concours (sw)|8        |
|Ford Torino (sw)                |8        |
|Plymouth Satellite (sw)         |8        |
|AMC Rebel SST (sw)              |8        |
|Dodge Challenger SE             |8        |
|Plymouth 'Cuda 340              |8        |
|Ford Mustang Boss 302           |8        |
|Chevrolet

In [13]:
# Method 2 for selecting multiple columns

data.select(data['car'], data['cylinders']).show(truncate=False)

+--------------------------------+---------+
|car                             |cylinders|
+--------------------------------+---------+
|STRING                          |INT      |
|Chevrolet Chevelle Malibu       |8        |
|Buick Skylark 320               |8        |
|Plymouth Satellite              |8        |
|AMC Rebel SST                   |8        |
|Ford Torino                     |8        |
|Ford Galaxie 500                |8        |
|Chevrolet Impala                |8        |
|Plymouth Fury iii               |8        |
|Pontiac Catalina                |8        |
|AMC Ambassador DPL              |8        |
|Citroen DS-21 Pallas            |4        |
|Chevrolet Chevelle Concours (sw)|8        |
|Ford Torino (sw)                |8        |
|Plymouth Satellite (sw)         |8        |
|AMC Rebel SST (sw)              |8        |
|Dodge Challenger SE             |8        |
|Plymouth 'Cuda 340              |8        |
|Ford Mustang Boss 302           |8        |
|Chevrolet

In [14]:
# Method 3 for selecting multiple columns
from pyspark.sql.functions import col
data.select(col('CAR'), col('CYLINDERS')).show(truncate=False)

+--------------------------------+---------+
|CAR                             |CYLINDERS|
+--------------------------------+---------+
|STRING                          |INT      |
|Chevrolet Chevelle Malibu       |8        |
|Buick Skylark 320               |8        |
|Plymouth Satellite              |8        |
|AMC Rebel SST                   |8        |
|Ford Torino                     |8        |
|Ford Galaxie 500                |8        |
|Chevrolet Impala                |8        |
|Plymouth Fury iii               |8        |
|Pontiac Catalina                |8        |
|AMC Ambassador DPL              |8        |
|Citroen DS-21 Pallas            |4        |
|Chevrolet Chevelle Concours (sw)|8        |
|Ford Torino (sw)                |8        |
|Plymouth Satellite (sw)         |8        |
|AMC Rebel SST (sw)              |8        |
|Dodge Challenger SE             |8        |
|Plymouth 'Cuda 340              |8        |
|Ford Mustang Boss 302           |8        |
|Chevrolet

# **Filtering data (data cleaning)**

In [25]:
from pyspark.sql.functions import col
data.filter(col("horsepower") > 200).show(5)

+--------------------+----+---------+------------+----------+------+------------+-----+------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
|    Chevrolet Impala|14.0|        8|       454.0|     220.0| 4354.|         9.0|   70|    US|
|   Plymouth Fury iii|14.0|        8|       440.0|     215.0| 4312.|         8.5|   70|    US|
|    Pontiac Catalina|14.0|        8|       455.0|     225.0| 4425.|        10.0|   70|    US|
|Buick Estate Wago...|14.0|        8|       455.0|     225.0| 3086.|        10.0|   70|    US|
|           Ford F250|10.0|        8|       360.0|     215.0| 4615.|        14.0|   70|    US|
+--------------------+----+---------+------------+----------+------+------------+-----+------+
only showing top 5 rows



# **ADDING NEW COLUMNS**

In [26]:
# CASE 1: add a single constant column

from pyspark.sql.functions import lit

df = data.withColumn('First_Column',lit(1))
df.show(5, truncate=False)

+-------------------------+------+---------+------------+----------+------+------------+-----+------+------------+
|Car                      |MPG   |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|First_Column|
+-------------------------+------+---------+------------+----------+------+------------+-----+------+------------+
|STRING                   |DOUBLE|INT      |DOUBLE      |DOUBLE    |DOUBLE|DOUBLE      |INT  |CAT   |1           |
|Chevrolet Chevelle Malibu|18.0  |8        |307.0       |130.0     |3504. |12.0        |70   |US    |1           |
|Buick Skylark 320        |15.0  |8        |350.0       |165.0     |3693. |11.5        |70   |US    |1           |
|Plymouth Satellite       |18.0  |8        |318.0       |150.0     |3436. |11.0        |70   |US    |1           |
|AMC Rebel SST            |16.0  |8        |304.0       |150.0     |3433. |12.0        |70   |US    |1           |
+-------------------------+------+---------+------------+----------+------+-----

In [27]:
# CASE 2: add several constant columns by chaining withColumn

from pyspark.sql.functions import lit

df = data.withColumn('Second_Column',lit(2)) \
          .withColumn('third_column',lit('Third Column'))
df.show(5, truncate=False)

+-------------------------+------+---------+------------+----------+------+------------+-----+------+-------------+------------+
|Car                      |MPG   |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|Second_Column|third_column|
+-------------------------+------+---------+------------+----------+------+------------+-----+------+-------------+------------+
|STRING                   |DOUBLE|INT      |DOUBLE      |DOUBLE    |DOUBLE|DOUBLE      |INT  |CAT   |2            |Third Column|
|Chevrolet Chevelle Malibu|18.0  |8        |307.0       |130.0     |3504. |12.0        |70   |US    |2            |Third Column|
|Buick Skylark 320        |15.0  |8        |350.0       |165.0     |3693. |11.5        |70   |US    |2            |Third Column|
|Plymouth Satellite       |18.0  |8        |318.0       |150.0     |3436. |11.0        |70   |US    |2            |Third Column|
|AMC Rebel SST            |16.0  |8        |304.0       |150.0     |3433. |12.0        |70   |US 

# **GROUPING**

In [28]:
df.groupBy('Horsepower').count().show(5)

+----------+-----+
|Horsepower|count|
+----------+-----+
|     102.0|    1|
|     68.00|    6|
|     116.0|    1|
|     145.0|    7|
|     90.00|   20|
+----------+-----+
only showing top 5 rows



                                                                                

In [29]:
df.groupBy('Origin','Model').count().show(10)

+------+-----+-----+
|Origin|Model|count|
+------+-----+-----+
| Japan|   76|    4|
|    US|   81|   13|
|    US|   80|    7|
|    US|   76|   22|
| Japan|   70|    2|
|    US|   78|   22|
|Europe|   76|    8|
|    US|   70|   27|
| Japan|   75|    4|
|Europe|   80|    9|
+------+-----+-----+
only showing top 10 rows



# **Dropping columns**

In [30]:
df = df.drop('Second_Column','third_column')
df.show(5)

+--------------------+------+---------+------------+----------+------+------------+-----+------+
|                 Car|   MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------+------+---------+------------+----------+------+------------+-----+------+
|              STRING|DOUBLE|      INT|      DOUBLE|    DOUBLE|DOUBLE|      DOUBLE|  INT|   CAT|
|Chevrolet Chevell...|  18.0|        8|       307.0|     130.0| 3504.|        12.0|   70|    US|
|   Buick Skylark 320|  15.0|        8|       350.0|     165.0| 3693.|        11.5|   70|    US|
|  Plymouth Satellite|  18.0|        8|       318.0|     150.0| 3436.|        11.0|   70|    US|
|       AMC Rebel SST|  16.0|        8|       304.0|     150.0| 3433.|        12.0|   70|    US|
+--------------------+------+---------+------------+----------+------+------------+-----+------+
only showing top 5 rows



In [31]:
df

DataFrame[Car: string, MPG: string, Cylinders: string, Displacement: string, Horsepower: string, Weight: string, Acceleration: string, Model: string, Origin: string]

In [32]:
total_count = df.count()
print("Total records:", total_count)

# Count the rows whose Origin is "US".
us_count = df.filter(col('Origin') == "US").count()
print("Total records in US:", us_count)

df.filter(col('Origin') == "US").show(truncate=False)

Total records: 407
Total records in US: 254
+--------------------------------+----+---------+------------+----------+------+------------+-----+------+
|Car                             |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+--------------------------------+----+---------+------------+----------+------+------------+-----+------+
|Chevrolet Chevelle Malibu       |18.0|8        |307.0       |130.0     |3504. |12.0        |70   |US    |
|Buick Skylark 320               |15.0|8        |350.0       |165.0     |3693. |11.5        |70   |US    |
|Plymouth Satellite              |18.0|8        |318.0       |150.0     |3436. |11.0        |70   |US    |
|AMC Rebel SST                   |16.0|8        |304.0       |150.0     |3433. |12.0        |70   |US    |
|Ford Torino                     |17.0|8        |302.0       |140.0     |3449. |10.5        |70   |US    |
|Ford Galaxie 500                |15.0|8        |429.0       |198.0     |4341. |10.0      

In [33]:
total_count = df.count()
print("Total records:", total_count)

# Horsepower is a string column, so it is compared with the string "175.0".
us_175 = df.filter((col('Origin') == "US") & (col('Horsepower') == "175.0")).count()
print("US records with Horsepower 175.0:", us_175)

df.filter((col('Origin') == "US") & (col('Horsepower') == "175.0")).show(truncate=False)

Total records: 407
US records with Horsepower 175.0: 7
+-------------------------+----+---------+------------+----------+------+------------+-----+------+
|Car                      |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+-------------------------+----+---------+------------+----------+------+------------+-----+------+
|Plymouth Satellite (sw)  |0   |8        |383.0       |175.0     |4166. |10.5        |70   |US    |
|AMC Rebel SST (sw)       |0   |8        |360.0       |175.0     |3850. |11.0        |70   |US    |
|Pontiac Catalina Brougham|14.0|8        |400.0       |175.0     |4464. |11.5        |71   |US    |
|Pontiac Safari (sw)      |13.0|8        |400.0       |175.0     |5140. |12.0        |71   |US    |
|Pontiac Catalina         |14.0|8        |400.0       |175.0     |4385. |12.0        |72   |US    |
|Buick Century 350        |13.0|8        |350.0       |175.0     |4100. |13.0        |73   |US    |
|AMC Ambassador Brougham  |13.0|8        |360.0       

# **SORTING ROWS**

In [34]:
df.orderBy('Cylinders').show(truncate=False)

+---------------------------+----+---------+------------+----------+------+------------+-----+------+
|Car                        |MPG |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+---------------------------+----+---------+------------+----------+------+------------+-----+------+
|Mazda RX2 Coupe            |19.0|3        |70.00       |97.00     |2330. |13.5        |72   |Japan |
|Mazda RX3                  |18.0|3        |70.00       |90.00     |2124. |13.5        |73   |Japan |
|Mazda RX-4                 |21.5|3        |80.00       |110.0     |2720. |13.5        |77   |Japan |
|Mazda RX-7 GS              |23.7|3        |70.00       |100.0     |2420. |12.5        |80   |Japan |
|Ford Pinto Runabout        |21.0|4        |122.0       |86.00     |2226. |16.5        |72   |US    |
|Chevrolet Vega (sw)        |22.0|4        |140.0       |72.00     |2408. |19.0        |71   |US    |
|Toyota Corolla Mark ii     |24.0|4        |113.0       |95.00     |2372. |15.0   

In [35]:
df.orderBy('Cylinders',ascending=False).show(truncate=False)

+-------------------------+------+---------+------------+----------+------+------------+-----+------+
|Car                      |MPG   |Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|
+-------------------------+------+---------+------------+----------+------+------------+-----+------+
|STRING                   |DOUBLE|INT      |DOUBLE      |DOUBLE    |DOUBLE|DOUBLE      |INT  |CAT   |
|Plymouth 'Cuda 340       |14.0  |8        |340.0       |160.0     |3609. |8.0         |70   |US    |
|Chevrolet Chevelle Malibu|18.0  |8        |307.0       |130.0     |3504. |12.0        |70   |US    |
|Ford Mustang Boss 302    |0     |8        |302.0       |140.0     |3353. |8.0         |70   |US    |
|Buick Skylark 320        |15.0  |8        |350.0       |165.0     |3693. |11.5        |70   |US    |
|Chevrolet Monte Carlo    |15.0  |8        |400.0       |150.0     |3761. |9.5         |70   |US    |
|AMC Rebel SST            |16.0  |8        |304.0       |150.0     |3433. |12.0   

In [36]:
df.groupBy('Origin').count().orderBy('count', ascending=False).show(10)

+------+-----+
|Origin|count|
+------+-----+
|    US|  254|
| Japan|   79|
|Europe|   73|
|   CAT|    1|
+------+-----+

