# Clases - Computación distribuida con Apache Spark.

## Objetivos
- Instalación de librerías necesarias 
- SparkSession
- Creación y manipulación de DataFrames
- Operaciones básicas de DataFrames
- Selección de columnas y filtros
- Carga de datos a un Spark DataFrame


### Instalación de librerías necesarias

Si utilizas Conda para administrar ambientes de desarrollo la mejor vía para asegurar que funcionen correctamente las librerías es instalando directo desde Conda. Adicionalmente se instalan todas las librerías que necesita Spark.
- `conda create -n pyspark-DE`
- `conda activate pyspark-DE`
- `conda install -c conda-forge pyspark python=3.10`

De igual forma se puede instalar a través de PIP, pero sin asegurar funcionamiento correcto ni la instalación de dependencias. Adicionalmente será necesario tener instalado Pandas.
- `conda create -n pyspark-DE python=3.10`
- `conda activate pyspark-DE`
- `pip install pyspark`
- `pip install pandas`.




### SparkSession

SparkSession es una clase en PySpark que existe desde la versión 2.0 (2016) que simplifica la forma de trabajar con Spark, tanto en las configuraciones como en la manipulación de datos estructurados. 

#### Funcionalidades principales de SparkSession:

1. **Configura Spark**: Para profundizar en las configuraciones posibles de Spark, visite https://spark.apache.org/docs/latest/configuration.html.
    - `SparkSession.builder.appName("some name").**config("some config key,value")**.getOrCreate()`
    - `spark.conf.get("some config key")`
2. **Crear DataFrames**: permite leer y escribir (Input/Outpu) diversas fuentes de datos y crear DataFrames para la manipulación de datos.
    - `spark.createDataFrame(data [, schema])`
    - `spark.read.json("path to some json")`
3. **Ejecutar SQL**: facilita la ejecución de consultas en SQL sobre los DataFrames.
    - `spark.sql("query to some view")`
4. **Gestiona contexto de Spark**: facilita la configuración y acceso a diferentes componentes y funcionalidades de Spark
    - `spark.sql.shuffle.partitions`
    - `spark.executor.memory`
    - `spark.catalog.listTables()`
    - `spark.catalog.listColumns("someTable")`
    - `spark.udf.register("someName", someUdf)`

In [27]:
from pyspark.sql import SparkSession

# Crear una SparkSession con configuraciones personalizadas
spark = SparkSession.builder \
    .appName("ConfiguracionEjemplo") \
    .config("spark.sql.shuffle.partitions", "50") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

# Obtener una configuración específica
shuffle_partitions = spark.conf.get("spark.sql.shuffle.partitions")
print(f"Shuffle Partitions: {shuffle_partitions}")

Shuffle Partitions: 50


In [None]:
## Application
# config("spark.driver.memory", "1g")
# spark.app.name
# spark.driver.cores, 1
# spark.executor.memory, "1g" 

## Runtime Environment

## Shuffle Behavior

## Spark UI
# spark.eventLog.enabled, false
# spark.eventLog.buffer.kb, 100k
# spark.ui.enabled, true
# spark.ui.port, 4040

## Compression and Serialization



In [None]:
## CHATGPT

from pyspark.sql import SparkSession

# Crear una SparkSession con configuraciones personalizadas
spark = SparkSession.builder \
    .appName("ConfiguracionEjemplo") \
    .config("spark.sql.shuffle.partitions", "50") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

# Mostrar las configuraciones actuales
spark.conf.getAll()


# Crear una tabla temporal a partir de un DataFrame
data = [("Alice", 34), ("Bob", 45), ("Catherine", 29)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)
df.createOrReplaceTempView("people")

# Listar las tablas del catálogo
spark.catalog.listTables()

# Describir la estructura de una tabla
spark.catalog.listColumns("people")


### UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Definir una función de Python
def increment_by_one(x):
    return x + 1

# Registrar la función como UDF
increment_udf = udf(increment_by_one, IntegerType())
spark.udf.register("increment", increment_udf)

# Usar la UDF en una consulta SQL
spark.sql("SELECT Name, increment(Age) as AgePlusOne FROM people").show()

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/09 08:30:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
SparkSession.active()

### DataFrame en Spark

Un DataFrame es una estructura de datos bidimensional similar a cualquier tabla en una base de datos estructurada. Algunas de las características que tiene un DataFrame en PySpark son:
- **Distribuido**: Los datos están distribuidos en un clúster de nodos, lo que permite el procesamiento paralelo.
- **Inmutable**: Cada transformación produce un nuevo DataFrame.
- **SQL**: Permite operaciones tipo SQL y ofrece una interfaz similar a pandas pero a gran escala.
- **Conexiones diversas**: Puede leer datos de múltiples fuentes como JSON, CSV, Parquet, JDBC y más.
- **Optimización Automática**: Utiliza Catalyst Optimizer para optimizar automáticamente las consultas.


### Comparación entre PySpark DataFrame y Pandas DataFrame


|   |Pyspark   |Pandas|
|----------|---------|----------|
|**Escalabilidad**| Escala horizontalmente, puede manejar terabytes o petabytes de datos distribuidos en múltiples nodos de un clúster.|Diseñado para datos que caben en memoria, generalmente utilizado para análisis de datos pequeños a medianos.|
|**Performance**|Optimizado para el procesamiento paralelo y distribuido. Puede manejar tareas complejas y pesadas con eficiencia.|Muy rápido para operaciones en memoria y datos de tamaño moderado, pero no es adecuado para grandes volúmenes de datos debido a las limitaciones de memoria.|
|**Optimización**|Utiliza Catalyst Optimizer para optimizar automáticamente las consultas.|No tiene optimización automática, el rendimiento depende del diseño del código del usuario.|
|**Uso**|Ideal para grandes volúmenes de datos y procesamiento de big data en un entorno distribuido.| Adecuado para análisis de datos exploratorios, transformación de datos y preparación de datos en el entorno local.


In [10]:
from datetime import date

data = [
    {
        "nombre":"Marcelo",
        "nacimiento":date(1987, 10, 7),
        "mail":"marcelo@correo.cl",
        "x":3232,
        "y":24.36,
        "active":True
    },
    {
        "nombre":"Juan",
        "nacimiento":date(1990, 5, 15),
        "mail":"juan@correo.cl",
        "x":5226,
        "y":26.26,
        "active":False
    },
    {
        "nombre":"Andrea",
        "nacimiento":date(1995, 3, 20),
        "mail":"andrea@correo.cl",
        "x":2258,
        "y":25.75,
        "active":True

    }
]

df1 = spark.createDataFrame(data)
df1.show()

+------+-----------------+----------+-------+----+-----+
|active|             mail|nacimiento| nombre|   x|    y|
+------+-----------------+----------+-------+----+-----+
|  true|marcelo@correo.cl|1987-10-07|Marcelo|3232|24.36|
| false|   juan@correo.cl|1990-05-15|   Juan|5226|26.26|
|  true| andrea@correo.cl|1995-03-20| Andrea|2258|25.75|
+------+-----------------+----------+-------+----+-----+



In [12]:
from pyspark.sql import Row

df2 = spark.createDataFrame([
    Row(nombre="Marcelo", nacimiento=date(1987, 10, 7), mail="marcelo@correo.cl", x=3232, y=24.36, active=True),
    Row(nombre="Juan", nacimiento=date(1990, 5, 15), mail="juan@correo.cl", x=5226, y=26.26, active=False),
    Row(nombre="Andrea", nacimiento=date(1995, 3, 20), mail="andrea@correo.cl", x=2258, y=25.75, active=True),
])

df2.show()

+-------+----------+-----------------+----+-----+------+
| nombre|nacimiento|             mail|   x|    y|active|
+-------+----------+-----------------+----+-----+------+
|Marcelo|1987-10-07|marcelo@correo.cl|3232|24.36|  true|
|   Juan|1990-05-15|   juan@correo.cl|5226|26.26| false|
| Andrea|1995-03-20| andrea@correo.cl|2258|25.75|  true|
+-------+----------+-----------------+----+-----+------+



In [13]:
df1.printSchema()
df2.printSchema()

root
 |-- active: boolean (nullable = true)
 |-- mail: string (nullable = true)
 |-- nacimiento: date (nullable = true)
 |-- nombre: string (nullable = true)
 |-- x: long (nullable = true)
 |-- y: double (nullable = true)

root
 |-- nombre: string (nullable = true)
 |-- nacimiento: date (nullable = true)
 |-- mail: string (nullable = true)
 |-- x: long (nullable = true)
 |-- y: double (nullable = true)
 |-- active: boolean (nullable = true)



In [18]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, FloatType, BooleanType

cols=[StructField('nombre', StringType(),False),
      StructField('nacimiento', DateType(),True),
      StructField('mail', StringType(),True),
      StructField('x', IntegerType(),False),
      StructField('y', FloatType(),True),
      StructField('active', BooleanType(),False),
      ]

schema = StructType(cols)

df3 = spark.createDataFrame([], schema)

df3.show()

+------+----------+----+---+---+------+
|nombre|nacimiento|mail|  x|  y|active|
+------+----------+----+---+---+------+
+------+----------+----+---+---+------+



In [19]:
df3.printSchema()

root
 |-- nombre: string (nullable = false)
 |-- nacimiento: date (nullable = true)
 |-- mail: string (nullable = true)
 |-- x: integer (nullable = false)
 |-- y: float (nullable = true)
 |-- active: boolean (nullable = false)



In [21]:
data = [
    {
        "nombre":"Marcelo",
        "nacimiento":date(1987, 10, 7),
        "mail":"marcelo@correo.cl",
        "x":3232,
        "y":24.36,
        "active":True
    },
    {
        "nombre":"Juan",
        "nacimiento":date(1990, 5, 15),
        "mail":"juan@correo.cl",
        "x":5226,
        "y":26.26,
        "active":False
    },
    {
        "nombre":"Andrea",
        "nacimiento":date(1995, 3, 20),
        "mail":"andrea@correo.cl",
        "x":2258,
        "y":25.75,
        "active":True

    }
]

df3 = spark.createDataFrame(data, schema)
df3.show()

+-------+----------+-----------------+----+-----+------+
| nombre|nacimiento|             mail|   x|    y|active|
+-------+----------+-----------------+----+-----+------+
|Marcelo|1987-10-07|marcelo@correo.cl|3232|24.36|  true|
|   Juan|1990-05-15|   juan@correo.cl|5226|26.26| false|
| Andrea|1995-03-20| andrea@correo.cl|2258|25.75|  true|
+-------+----------+-----------------+----+-----+------+



In [22]:
import pandas as pd

pd_df = pd.DataFrame(data)

pd_df.head()

Unnamed: 0,nombre,nacimiento,mail,x,y,active
0,Marcelo,1987-10-07,marcelo@correo.cl,3232,24.36,True
1,Juan,1990-05-15,juan@correo.cl,5226,26.26,False
2,Andrea,1995-03-20,andrea@correo.cl,2258,25.75,True


In [23]:
df = spark.createDataFrame(pd_df)
df.show()
df.printSchema()
df.show(5,vertical=True)

+-------+----------+-----------------+----+-----+------+
| nombre|nacimiento|             mail|   x|    y|active|
+-------+----------+-----------------+----+-----+------+
|Marcelo|1987-10-07|marcelo@correo.cl|3232|24.36|  true|
|   Juan|1990-05-15|   juan@correo.cl|5226|26.26| false|
| Andrea|1995-03-20| andrea@correo.cl|2258|25.75|  true|
+-------+----------+-----------------+----+-----+------+

root
 |-- nombre: string (nullable = true)
 |-- nacimiento: date (nullable = true)
 |-- mail: string (nullable = true)
 |-- x: long (nullable = true)
 |-- y: double (nullable = true)
 |-- active: boolean (nullable = true)

-RECORD 0-----------------------
 nombre     | Marcelo           
 nacimiento | 1987-10-07        
 mail       | marcelo@correo.cl 
 x          | 3232              
 y          | 24.36             
 active     | true              
-RECORD 1-----------------------
 nombre     | Juan              
 nacimiento | 1990-05-15        
 mail       | juan@correo.cl    
 x          | 

In [24]:
df.columns

['nombre', 'nacimiento', 'mail', 'x', 'y', 'active']

In [11]:
df.collect()

[Row(nombre='Marcelo', nacimiento=datetime.date(1987, 10, 7), mail='marcelo@correo.cl', min_played=180),
 Row(nombre='Juan', nacimiento=datetime.date(1990, 5, 15), mail='juan@correo.cl', min_played=90),
 Row(nombre='Andrea', nacimiento=datetime.date(1995, 3, 20), mail='andrea@correo.cl', min_played=270)]

In [12]:
ps_df = df.toPandas()
ps_df.head()

Unnamed: 0,nombre,nacimiento,mail,min_played
0,Marcelo,1987-10-07,marcelo@correo.cl,180
1,Juan,1990-05-15,juan@correo.cl,90
2,Andrea,1995-03-20,andrea@correo.cl,270


In [13]:
df.nombre

Column<'nombre'>

In [14]:
df.select('nombre').show()
df.select(df.nombre).show()

+-------+
| nombre|
+-------+
|Marcelo|
|   Juan|
| Andrea|
+-------+

+-------+
| nombre|
+-------+
|Marcelo|
|   Juan|
| Andrea|
+-------+



In [15]:
from pyspark.sql.functions import upper
df.withColumn('upper_nombre',upper(df.nombre)).show()

+-------+----------+-----------------+----------+------------+
| nombre|nacimiento|             mail|min_played|upper_nombre|
+-------+----------+-----------------+----------+------------+
|Marcelo|1987-10-07|marcelo@correo.cl|       180|     MARCELO|
|   Juan|1990-05-15|   juan@correo.cl|        90|        JUAN|
| Andrea|1995-03-20| andrea@correo.cl|       270|      ANDREA|
+-------+----------+-----------------+----------+------------+



In [16]:
df.show()

+-------+----------+-----------------+----------+
| nombre|nacimiento|             mail|min_played|
+-------+----------+-----------------+----------+
|Marcelo|1987-10-07|marcelo@correo.cl|       180|
|   Juan|1990-05-15|   juan@correo.cl|        90|
| Andrea|1995-03-20| andrea@correo.cl|       270|
+-------+----------+-----------------+----------+



In [17]:
df = df.withColumn('upper_nombre',upper(df.nombre))
df.show()

+-------+----------+-----------------+----------+------------+
| nombre|nacimiento|             mail|min_played|upper_nombre|
+-------+----------+-----------------+----------+------------+
|Marcelo|1987-10-07|marcelo@correo.cl|       180|     MARCELO|
|   Juan|1990-05-15|   juan@correo.cl|        90|        JUAN|
| Andrea|1995-03-20| andrea@correo.cl|       270|      ANDREA|
+-------+----------+-----------------+----------+------------+



In [18]:
df.filter(df.min_played > 90).show()

df.filter("min_played > 90").show()

+-------+----------+-----------------+----------+------------+
| nombre|nacimiento|             mail|min_played|upper_nombre|
+-------+----------+-----------------+----------+------------+
|Marcelo|1987-10-07|marcelo@correo.cl|       180|     MARCELO|
| Andrea|1995-03-20| andrea@correo.cl|       270|      ANDREA|
+-------+----------+-----------------+----------+------------+

+-------+----------+-----------------+----------+------------+
| nombre|nacimiento|             mail|min_played|upper_nombre|
+-------+----------+-----------------+----------+------------+
|Marcelo|1987-10-07|marcelo@correo.cl|       180|     MARCELO|
| Andrea|1995-03-20| andrea@correo.cl|       270|      ANDREA|
+-------+----------+-----------------+----------+------------+



In [19]:
SparkSession.builder.appName("example").getOrCreate().stop()