# Pyspark

### Agenda 
 
1. Introduccion
2. Arquitectura
3. Tipos de Clusters
4. Modulos de Spark
5. Spark RDD
6. Spark DataFrame
7. Spark SQL
8. Spark Streaming
9. Spark GrpahFrame


## Introduccion

PySpark es una biblioteca de Spark escrita en Python para ejecutar aplicaciones de Python usando las capacidades de Apache Spark, con PySpark podemos ejecutar aplicaciones en paralelo en el clúster distribuido (múltiples nodos).
En otras palabras, PySpark es una API de Python para Apache Spark. Apache Spark es un motor de procesamiento analítico para potentes aplicaciones de aprendizaje automático y procesamiento de datos distribuidos a gran escala.

##### ¿Quién utiliza Pyspark?
PySpark se usa muy bien en la comunidad de ciencia de datos y aprendizaje automático, ya que hay muchas bibliotecas de ciencia de datos ampliamente utilizadas escritas en Python, incluidas NumPy, TensorFlow. También se utiliza debido a su procesamiento eficiente de grandes conjuntos de datos. PySpark ha sido utilizado por muchas organizaciones como Walmart, Trivago, Sanofi, Runtastic y muchas más.

##### caracterísicas importantes de Spark
![](https://sparkbyexamples.com/wp-content/uploads/2020/08/pyspark-features-1.png)
# 
- Cálculo en memoria
- Procesamiento distribuido usando paralelizar
- Se puede usar con muchos administradores de clústeres (Spark, Yarn, Mesos, etc.)
- Tolerante a fallos
- Inmutable
- Evaluación perezosa
- Caché y persistencia
- Optimización incorporada al usar DataFrames
- Soporta ANSI SQL

##### Ventajas
#
- Spark es un motor de procesamiento distribuido, en memoria y de uso general que le permite procesar datos de manera eficiente y distribuida.
- Obtendrá grandes beneficios al usar Spark para canalizaciones de ingesta de datos.
- Con Spark podemos procesar datos de Hadoop HDFS, AWS S3 y muchos sistemas de archivos.
- PySpark también se usa para procesar datos en tiempo real usando Streaming y Kafka.
- Con spark streaming, también puede transmitir archivos desde el sistema de archivos y también desde el socket.

## Arquitectura
# 
Apache Spark funciona en una arquitectura master-slave donde el master se llama "driver" y los esclavos se llaman "Workers". Cuando ejecuta una aplicación Spark, Spark Driver crea un contexto que es un punto de entrada a su aplicación, y todas las operaciones (transformaciones y acciones) se ejecutan en los workers y los recursos son administrados por el Administrador de clústeres.

![](https://sparkbyexamples.com/wp-content/uploads/2020/02/spark-cluster-overview.png)

## Tipos de clusters
#
- Standalone –  un administrador de clúster simple incluido con Spark que facilita la configuración de un clúster.
- Apache Mesos – Mesos es un administrador de clústeres que también puede ejecutar aplicaciones Hadoop MapReduce y PySpark.
- Hadoop YARN –  el administrador de recursos en Hadoop 2. Esto se usa principalmente como administrador de clústeres.
- Kubernetes – un sistema de código abierto para automatizar la implementación, el escalado y la gestión de aplicaciones en contenedores.

## Módulos de Spark

![](https://tse1.mm.bing.net/th?id=OIP.IH66oypTYnWQjFkFHoBMCgHaD2&pid=Api&P=0)

## Spark RDD

RDD es una estructura de datos fundamental de Spark que es una colección de objetos distribuidos inmutables y tolerantes a fallas, lo que significa que una vez que crea un RDD no puede cambiarlo. Cada conjunto de datos en RDD se divide en particiones lógicas, que se pueden calcular en diferentes nodos del clúster.

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-4.0.1.tar.gz (434.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.2/434.2 MB[0m [31m575.1 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.9
  Downloading py4j-0.10.9.9-py2.py3-none-any.whl (203 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m203.0/203.0 kB[0m [31m21.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-4.0.1-py2.py3-none-any.whl size=434813814 sha256=7b916f0514b719b7128a6fbc805aff5e7e919fbf27a83aaa4ce2d1e91d0c20f0
  Stored in directory: /root/.cache/pip/wheels/10/e6/6b/c50eb601fa827dd56a5272db5d5db360e559e527a80a665b1d
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.9 pyspark-4.0.1
[0m
[1m[[0m[34;49mnotic

In [3]:
# Import SparkSession
from pyspark.sql import SparkSession
# usa solo 1 hilo local en mi proceso
# Create SparkSession 
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("RDD") \
      .getOrCreate() 

In [3]:
# Creacion de un RDD utilizando parallelize
# Se guarda en la variable dataList 
dataList = [("Java", 20000), ("Python", 100000), ("Scala", 3000)] # 60M
# se llama a la instacia spark seguido paralleize 1 2 3
rdd = spark.sparkContext.parallelize(dataList)
rdd.collect() #accion

[('Java', 20000), ('Python', 100000), ('Scala', 3000)]

In [4]:
# se realiza conteo (suma, conteo, seleccion, seleccion con una condicion -PROCESO 
rdd.count()

                                                                                

3

In [5]:
print("Número de particiones:", rdd.getNumPartitions())

Número de particiones: 1


In [6]:
print(rdd.glom().collect())

[[('Java', 20000), ('Python', 100000), ('Scala', 3000)]]


In [10]:
rdd2 = spark.sparkContext.parallelize(dataList,3)
print("Particiones:", rdd2.getNumPartitions())
print(rdd2.glom().collect())

Particiones: 3
[[('Java', 20000)], [('Python', 100000)], [('Scala', 3000)]]


In [12]:
rdd2.count()

3

In [11]:
!pip install kagglehub

Collecting kagglehub
  Downloading kagglehub-0.3.13-py3-none-any.whl (68 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m68.3/68.3 kB[0m [31m6.1 MB/s[0m eta [36m0:00:00[0m
Collecting tqdm
  Downloading tqdm-4.67.1-py3-none-any.whl (78 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.5/78.5 kB[0m [31m8.0 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tqdm, kagglehub
Successfully installed kagglehub-0.3.13 tqdm-4.67.1
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [12]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("janiobachmann/math-students")

print("Path to dataset files:", path)

  from .autonotebook import tqdm as notebook_tqdm


Downloading from https://www.kaggle.com/api/v1/datasets/download/janiobachmann/math-students?dataset_version_number=1...


100%|██████████| 7.16k/7.16k [00:00<00:00, 1.21MB/s]

Extracting files...
Path to dataset files: /root/.cache/kagglehub/datasets/janiobachmann/math-students/versions/1





In [None]:
path.

In [None]:
# lazy operaciones 

In [9]:
# Creación de un RDD utilizando un archivo
rdd2 = spark.sparkContext.textFile("hdfs://namenode:8020/user/raw/mysql/bd_vanessa/t_student_mat")
rdd2.collect() #accion

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://namenode:8020/user/raw/mysql/bd_vanessa/t_student_mat
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:306)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:245)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:334)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:233)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:301)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:297)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:301)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:297)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2549)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1057)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:417)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1056)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:203)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:75)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:184)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:108)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.io.IOException: Input path does not exist: hdfs://namenode:8020/user/raw/mysql/bd_vanessa/t_student_mat
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:279)
	... 30 more



##### Operaciones RDD
En PySpark RDD, puede realizar dos tipos de operaciones.

Transformaciones RDD:  las transformaciones son operaciones perezosas. Cuando ejecuta una transformación (por ejemplo, una actualización), en lugar de actualizar un RDD actual, estas operaciones devuelven otro RDD.

Acciones de RDD  : operaciones que activan el cálculo y devuelven valores de RDD al controlador.

##### Transformaciones RDD
Las transformaciones en Spark RDD  devuelven otro RDD y las transformaciones son perezosas, lo que significa que no se ejecutan hasta que llamas a una acción en RDD. Algunas transformaciones en los RDD son flatMap(), map(), reduceByKey(), filter(), sortByKey() y devuelven un nuevo RDD en lugar de actualizar el actual.

##### Acciones de RDD
La operación Acción de RDD devuelve los valores de un RDD a un nodo de controlador. En otras palabras, cualquier función RDD que no devuelva RDD[T] se considera una acción. 

Algunas acciones en RDD son count(), collect(), first(), max()y reduce() y más.

## Spark DataFrame

DataFrame es una colección distribuida de datos organizados en columnas con nombre. Es conceptualmente equivalente a una tabla en una base de datos relacional o un marco de datos en R/Python, pero con optimizaciones más ricas bajo el capó. Los marcos de datos se pueden construir a partir de una amplia gama de fuentes, como archivos de datos estructurados, tablas en Hive, bases de datos externas o RDD existentes.

In [3]:
# Creación de un dataframe utilizando createDataFrame
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]

df = spark.createDataFrame(data=data, schema = columns)

NameError: name 'spark' is not defined

In [None]:
#display(df) 
df.printSchema()

In [6]:
# Creación de un dataframe utilizando una fuente externa
df = spark.read.csv("/tmp/resources/student-mat.csv") # spark.read.json/ .parquet ( spark.read.format().load())
df.printSchema()

25/09/15 23:05:25 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: /tmp/resources/zipcodes.csv.
java.io.FileNotFoundException: File /tmp/resources/zipcodes.csv does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:917)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1238)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:907)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:462)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:56)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:381)
	at org.apache.spark.sql.catalyst.analysis.ResolveDataSource.org$apache$spark$sql$catalyst$analysis$ResolveDataSource$$loadV1BatchSource(ResolveDataSource.scala:143)
	at org.apache.spark.sql.catalyst.analys

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/tmp/resources/zipcodes.csv. SQLSTATE: 42K03

In [5]:
!pip install kagglehub

Collecting kagglehub
  Downloading kagglehub-0.3.13-py3-none-any.whl (68 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m68.3/68.3 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
Collecting tqdm
  Downloading tqdm-4.67.1-py3-none-any.whl (78 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.5/78.5 kB[0m [31m10.0 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tqdm, kagglehub
Successfully installed kagglehub-0.3.13 tqdm-4.67.1
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [6]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("janiobachmann/math-students")

print("Path to dataset files:", path)

  from .autonotebook import tqdm as notebook_tqdm


Downloading from https://www.kaggle.com/api/v1/datasets/download/janiobachmann/math-students?dataset_version_number=1...


100%|██████████| 7.16k/7.16k [00:00<00:00, 7.81MB/s]

Extracting files...
Path to dataset files: /root/.cache/kagglehub/datasets/janiobachmann/math-students/versions/1





## Spark SQL

Spark SQL en Apache Spark es uno de los módulos para el procesamiento de la información que ofrece Apache Spark y que trabaja con datos estructurados.
Una vez que haya creado un DataFrame, puede interactuar con los datos utilizando la sintaxis SQL.

In [None]:
df.createOrReplaceTempView("PERSON_DATA")
df.cache()
df.count()
df2 = spark.sql("SELECT * from PERSON_DATA")
df2.printSchema()
df2.show()

In [None]:
# Leer el CSV con encabezados
df_math = spark.read.csv("/ruta/math_student.csv", header=True, inferSchema=True)

# Mostrar los datos
df_math.show(5)

In [None]:
df_math.createOrReplaceTempView("math_student")


In [None]:
spark.sql("SELECT COUNT(*) AS total_estudiantes FROM math_student").show()


In [None]:
spark.sql("SELECT AVG(G3) AS promedio_final FROM math_student").show()


In [None]:
spark.sql("""
    SELECT sex, COUNT(*) AS cantidad
    FROM math_student
    GROUP BY sex
""").show()


In [None]:
spark.sql("""
    SELECT school, ROUND(AVG(G3),2) AS promedio
    FROM math_student
    GROUP BY school
    ORDER BY promedio DESC
""").show()


In [None]:
spark.sql("""
    SELECT absences, ROUND(AVG(G3),2) AS promedio_final
    FROM math_student
    GROUP BY absences
    ORDER BY absences
""").show()


In [None]:
df_math.groupBy("school").avg("G3").show()

## Spark Streaming
Spark Streaming es un sistema de procesamiento de transmisión escalable, de alto rendimiento y tolerante a fallas que admite cargas de trabajo tanto por lotes como de transmisión. Se utiliza para procesar datos en tiempo real de fuentes como la carpeta del sistema de archivos, el socket TCP, S3 , Kafka , Flume , Twitter y Amazon Kinesis , por nombrar algunos. Los datos procesados se pueden enviar a bases de datos, Kafka, dashboard, etc.

![](https://spark.apache.org/docs/latest/img/streaming-arch.png)

In [None]:
df = spark.readStream
      .format("socket")
      .option("host","localhost")
      .option("port","9090")
      .load()
        
query = df.writeStream
      .format("console")
      .outputMode("updated")
      .start()
      .awaitTermination()

In [None]:
df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "192.168.1.100:9092")
        .option("subscribe", "json_topic")
        .option("startingOffsets", "earliest") // From starting
        .load()
        
df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
   .writeStream
   .format("kafka")
   .outputMode("append")
   .option("kafka.bootstrap.servers", "192.168.1.100:9092")
   .option("topic", "josn_data_topic")
   .start()
   .awaitTermination()

## Spark GrpahFrame

GraphFrames es un paquete para Apache Spark que proporciona gráficos basados ​​en DataFrame. Proporciona API de alto nivel en Scala, Java y Python. Su objetivo es proporcionar tanto la funcionalidad de GraphX como la funcionalidad extendida aprovechando Spark DataFrames. Esta funcionalidad ampliada incluye búsqueda de motivos, serialización basada en DataFrame y consultas gráficas altamente expresivas.