### <a name="index"></a> Index

### [Introducción a Apache Spark](#mark_01)

### [Introducción RDD y DataFrames.](#mark_02)

### [SparkContext - Spark Session](#mark_03)

### [Leer CSV a un DataFrame con SparkSession.](#mark_03.1)

### [RDD Transformaciones y Acciones](#mark_04)

### [RDD Acciones de modificación](#mark_05)

### [ Por Qué No Usar "collect()"](#mark_06)

### [Acciones de conteo sobre 2 o más RDDs](#mark_07)

### [Operaciones numéricas](#mark_08)

### [DataFrames](#mark_09)

### [Inferencia tipos de datos](#mark_10)

### [Operaciones sobre DF](#mark_11)

   - ### [printSchema()](#mark_12)

   - ### [withColumnRenamed("current_name", "new_name").drop("column_name")](#mark_13)

   - ### [select(), col("column_name").alias("new_name")](#mark_14)

   - ### [sort("column_name")](#mark_15)

   - ### [filter(df.column_name logic_condition)](#mark_16)

### [Agrupaciones y operaciones join, multiple joins, select() sobre DF](#mark_17)

### [Funciones de agrupación](#mark_18)

   - ### [groupBy()](#mark_19)

   - ### [ejemplo con withColumn("nombre_columna", "operacion_con_cast")](#mark_20)

   - ### [.agg() la forma correcta de hacer agregaciones](#mark_21)

### [SQL](#mark_22)

### [UDF User-defined function](#mark_23)

### [Comprendiendo la persistencia y particionado](#mark_24)

### [Preguntamos si un DF está en Cache](#mark_25)

### [Que tipo de persistencia tiene un DF? "getStorageLevel()"](#mark_26)

### [unpersist()](#mark_27)

### [Cambiando el tipo de "StorageLevel"](#mark_28)

### [Creando mi propio "StorageLevel", persistencia de datos](#mark_29)

### [Particionando datos, RDD o DF.](#mark_30)

### [Ver cantidad de particiones getNumPartitions()](#mark_31)

### [Guardando los rdd o df con saveAsTextFile()](#mark_32)

### [Reconstruyendo rdd desde las particiones wholeTextFiles()](#mark_33)

### [Reconstruyendo rdd desde las particiones textFile() en un solo paso](#mark_34)

### [Data Masking](#mark_35)

### [SHA-256 (Secure Hash Algorithm 256-bit)](#mark_36)

### [](#mark_3)

**-------------------------------------------------------------------------------------------------------------------------------------------------**

### <a name="mark_01"></a> Introducción a Apache Spark.

### [Index](#index)

![](img_01.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

![](img_02.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

Apache Spark es un framework de trabajo para el desarrollo de grandes datos o big data y se preocupa de la velocidad y continuidad del procesamiento de datos, en contraparte de Hadoop que se preocupa por un almacenamiento grande de datos.

Podemos utilizar multiples lenguajes

- Java
- Scala (Spark corre nativamente aquí)
- Python
- R
¿Que nos es Apache Spark? No es una base de datos

**OLAP (Online analytical processing)**: Es un sistema de recuperación de datos y análisis de datos en linea.

**OLTP (Online Transaction Processing)**: Es un sistema transaccional en línea y gestiona la modificación de la base de datos.

Spark debe estar conectado a un Data warehouse para poder aprovechar toda su funcionalidad.

![](img_03.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

![](img_04.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

![](img_05.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

### <a name="mark_02"></a> Introducción RDD y DataFrames.

### [Index](#index)

![](img_06.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

![](img_07.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

- Características de los RDD:

Principal abstracción de datos: Es la unidad básica, existen desde su inicio hasta su versión 3.0.

Distribución: Los RDD se dritribuyen y particionan a lo largo del clúster.

Creación simple: Al no poseer estructura formalmente, adoptan las más intuitiva.

Inmutabilidad: Posterior a su creación no se pueden modificar

Ejecución perezosa: A menos que se realice una acción, todo lo que se escribió de código, no corre.

Por ejemplo, si estoy cargando un archivo que no existe no me voy a dar cuenta, también puede pasar que si el archivo existe pero posee error tampoco me voy a dar cuenta hasta que todo el archivo sea cargado.

- Aquí nacen dos diferencias Transformaciones y Acciones:

![](img_09.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

- Todas las operaciones que sean "Transformaciones" las podrémos escribir sin problema.
- Una vez que realizo una "acción" estoy dandole vida a lo que hago, con lo cual hay que ser cuidadoso con las operaciones que realicemos.

- Para el siguiente ejemplo:
![](img_10.png)

1. Cargamos un texto.
2. Genero 2 nuevos RDD a partir del RDD existente, uno tendrá todas las palabras "Comala" y el otro las palabras "Páramo"
3. Luego realizamos una intersección de ambos.
- Hasta este momento el archivo que estoy cargando pdría no existir, incluso tener algunos errores en las filtros, "pero" hasta que no ejecute la "Acción COUNT" no nos daremos cuenta que tenemos un error de fondo, y debemos tener cuidado al momento de trabajar con los RDD.

**-------------------------------------------------------------------------------------------------------------------------------------------------------**

![](img_08.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

- Algunos problemas a lo largo de la vida de nuestro ETL:

Muchos RDD conviviendo

RDD que ya son basura y tenemos que hacer acciones para que Garbage Collector se ejecute.

- Características de los DataFrame:

Son una capa superior que existe sobre los RDD.

Poseen estructura (una columna).

![](img_11.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

![](img_12.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

Formato: A diferencia de un RDD poseen columnas, las cuales pueden ser string, double, int, float, date, etc.

Optimización: Poseen una mejor implementación, lo cual los hace preferibles.

Facilidad de creación: Se pueden crear desde una base de datos externa, archivo o RDD existente.

- ¿Cuándo usar un RDD?

![](img_13.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

Cuando te interesa controlar el flujo de Spark, para saltearte algunas cosas que sabes que Spark no está optimizando bien.

Si eres usuario Python, convertir a RDD un conjunto permite mejor control de datos.

Estás conectándote a versiones antiguas de spark.

- ¿Cuándo usar DataFrames?

![](img_14.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

Si poseemos semánticas de datos complicadas.

Vamos a realizar tareas de alto nivel como filtros, mapeos, agregaciones, promedios o sumas.

Si vamos a usar sentencias SQL.

### Todas las aplicaciones en Spark poseen un manejador central de programa (Driver) y varios ejecutores que se crean a lo largo del clúster, estas son las computadoras que realizarán las tareas en paralelo y finalmente devolverán los valores al driver, la aplicación central.

Para nuestros fines, debido a que se usa un modelo stand-alone, solo se contará con un driver y un ejecutor, ambos alojados en la misma computadora.

- RDD

Para poder realizar estas tareas, Spark posee desde su versión 1.0 los RDD (Resilient Distributed Dataset), los cuales son tolerantes a fallos y pueden ser distribuidos a lo largo de los nodos del clúster.

Los RDD pueden ser creados al cargar datos de manera distribuida, como es desde un HDFS, Cassanda, Hbase o cualquier sistema de datos soportado por Hadoop, pero también por colecciones de datos de Scala o Python, además de poder ser leídos desde archivos en el sistema local.

En visión general, un RDD puede ser visto como un set de datos los cuales soportan solo dos tipos de operaciones: **transformaciones y acciones**.

Las transformaciones permiten crear un nuevo RDD a partir de uno previamente existente, mientras que las acciones retornan un valor al driver de la aplicación. El núcleo de operación del paradigma de Spark es la ejecución perezosa (Lazy), es decir que las transformaciones solo serán calculadas posterior a una llamada de acción.

Además, los RDD poseen una familiaridad con el paradigma orientado a objetos, lo cual permite que podamos realizar operaciones de bajo nivel a modo. **Map, filter y reduce** son tres de las operaciones más comunes.

Una de las grandes ventajas que ofrecen los RDD es la compilación segura; por su particularidad de ejecución perezosa, se calcula si se generará un error o no antes de ejecutarse, lo cual permite identificar problemas antes de lanzar la aplicación. 

El peor que podemos encontrar con los RDD es que no son correctamente tratados por el Garbage collector y cuando las lógicas de operación se hacen complejas, su uso puede resultar poco práctico, aquí entran los DataFrames.

- DataFrames

Esos componentes fueron agregados en la versión 1.3 de Spark y pueden ser invocados con el contexto espacial de Spark SQL. Como indica su nombre, es un módulo especialmente desarrollado para ser ejecutado con instrucciones parecidas al SQL estándar.

De la misma forma, como los RDD, estos pueden ser creados a partir de archivos, tablas tipo Hive, bases de datos externas y RDD o DataFrames existentes.

El primer detalle que salta cuando creamos un DataFrame es que poseen columnas nombradas, lo que a nivel conceptual es como trabajar con un DataFrame de Pandas. Con la excepción que a nivel interno Spark trabaja con Scala, lo cual le asigna a cada columna el tipo de dato Row, un tipo especial de objeto sin tipo definido.

Pero no es todo, los DataFrames implementan un sistema llamado Catalyst, el cual es un motor de optimización de planes de ejecución, parecido al que usan las bases de datos, pero diseñado para la cantidad de datos propia de Spark, aunado a eso, se tiene implementado un optimizador de memoria y consumo de CPU llamado Tungsten, el cual determina cómo se convertirán los planes lógicos creados por Catalyst a un plan físico.

### <a name="mark_03"></a> SparkContext - SparkSession.

### [Index](#index)

In [1]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable # ver --> "solucion python worker versiones"
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable # ver --> "solucion python worker versiones"


### Ver --> [solucion python worker versiones](#solucion_python_worker_versiones)

In [2]:
from pyspark import SparkContext

In [3]:
from pyspark.sql import SparkSession

In [8]:
#Creando la primer sesión
'''spark = SparkSession.builder \
    .master("local")\ #Donde va estar viviendo mi sesión
    .appName("mi_primer_sesion")\ #Nombre de la sesión.
    .getOrCreate() #creación'''

spark = SparkSession.builder.master("local").appName("mi_primer_sesion").getOrCreate()

### Differencias entre "context" y "session".

basicamente es asi.
SparkContext = versiones 1 y 2 de Spark, funciones, metodos para interractuar con Spark

```py
sc = SparkContext(master="local", appName="mi_primer_contexto")
```

SparkSession = SparkContext + nuevas funciones, mejores formas de configuraciones que no están presentes en las versiones anteriores de "SparkContext"
- Desde SparkSession podemos invocar Contextos y convertirlos en Sesiones (esto es útil si estamos utilizando versiones antigüas, con lo cual no hay que reconstruir nada).

### Pasar de SparkContext --> SparkSession:
```py
sc_a_ss = SparkSession(sc)
```

In [9]:
spark #Spark cuenta con un UI, monitor gráfico "Spark UI"

### Spark UI.

- Jobs --> De tener DF o RDD ejecutados podemos ver los momentos de despliegue.
- Stages --> El paso a paso de los DF o RDD cargados.
- Environments --> checkeo del ambiente

In [10]:
# Al finalizar la sesión debe ser cerrada para que no siga consumiendo recursos.
spark.stop()

### <a name="mark_03.1"></a> Leer CSV a un DataFrame con SparkSession.

### [Index](#index)

In [None]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, FloatType

In [None]:
spark=SparkSession.builder\
    .master('local')\
    .appName('test_session_01')\
    .getOrCreate()

In [None]:
!pwd
!ls -lac ./curso_spark_git_clone/curso-apache-spark-platzi/files

In [None]:
general_path='./curso_spark_git_clone/curso-apache-spark-platzi/files/'

In [None]:
deporte_df=spark.read.options(inferSchema=True, header=True, delimiter=',')\
    .csv(general_path + 'deporte.csv')

deportista_df=spark.read.options(inferSchema=True, header=True, delimiter=',')\
    .csv(general_path+'deportista.csv')

deportista2_df=spark.read.options(inferSchema=True, header=False, delimiter=',')\
    .csv(general_path+'deportista2.csv')

evento_df=spark.read.options(inferSchema=True, header=True, delimiter=',')\
    .csv(general_path+'evento.csv')

juegos_df=spark.read.options(inferSchema=True, header=True, delimiter=',')\
    .csv(general_path+'juegos.csv')

paises_df=spark.read.options(inferSchema=True, header=True, delimiter=',')\
    .csv(general_path+'paises.csv')

resultados_df=spark.read.options(inferSchema=True, header=True, delimiter=',')\
    .csv(general_path+'resultados.csv')

print('deporte_df')
deporte_df.printSchema()
print('deportista_df')
deportista_df.printSchema()
print('deportista2_df')
deportista2_df.printSchema()
print('evento_df')
evento_df.printSchema()
print('juegos_df')
juegos_df.printSchema()
print('paises_df')
paises_df.printSchema()
print('resultados_df')
resultados_df.printSchema()

In [None]:
spark.stop()

### <a name="mark_04"></a> RDD Transformaciones y Acciones.

### [Index](#index)

In [11]:
from pyspark import SparkContext

In [12]:
sc_01 = SparkContext(master="local", appName="transformaciones_y_acciones")

In [13]:
#los rdd son distribuidos y paralelos, y podemos crearlos de la siguiente forma
rdd_01=sc_01.parallelize([1,2,3]) #datos distribuidos en el sistema

In [14]:
#visualización
rdd_01.collect()

[1, 2, 3]

In [15]:
sc_01

### Primer Job realizados

![](img_15.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

- Inspeccionando el trabajo.

![](img_16.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

In [16]:
pwd

'/home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos'

In [21]:
#Visualizando los archivos clonados desde GitHub.
!ls -lac ./curso_spark_git_clone/curso-apache-spark-platzi/files 

total 14520
drwxr-xr-x 2 compu_dell_ubuntu_01 compu_dell_ubuntu_01    4096 Oct  9 09:38 .
drwxr-xr-x 5 compu_dell_ubuntu_01 compu_dell_ubuntu_01    4096 Oct  7 13:11 ..
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01     946 Oct  7 13:11 deporte.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 2764536 Oct  7 13:11 deportista.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 2807091 Oct  7 13:11 deportista2.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 2776782 Oct  7 13:11 deportistaError.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   33262 Oct  7 13:11 evento.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01    1978 Oct  7 13:11 juegos.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01  255853 Oct  7 13:11 modelo_relacional.jpg
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   23606 Oct  7 13:11 paises.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 6172796 Oct  7 13:11 resultados.csv


In [22]:
path="./curso_spark_git_clone/curso-apache-spark-platzi/files/"

#creand el rdd equipos_olimplicos_rdd
equipos_olimpicos_rdd=sc_01.textFile(path+"paises.csv")\
    .map(lambda line: line.split(","))

In [23]:
#Tomamos las primeras 5 líneas.
equipos_olimpicos_rdd.take(5)

                                                                                

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG']]

In [24]:
sc_01.stop()

### <a name="mark_05"></a> RDD Acciones de modificación.

### [Index](#index)

In [25]:
from pyspark import SparkContext

In [26]:
sc_02=SparkContext(master="local", appName="acciones_de_modificacion")

In [27]:
path="./curso_spark_git_clone/curso-apache-spark-platzi/files/"

#creand el rdd equipos_olimplicos_rdd
equipos_olimpicos_rdd_02=sc_02.textFile(path+"paises.csv")\
    .map(lambda line: line.split(","))

In [28]:
equipos_olimpicos_rdd_02.take(15)

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG'],
 ['5', 'Afghanistan', 'AFG'],
 ['6', 'Akatonbo', 'IRL'],
 ['7', 'Alain IV', 'SUI'],
 ['8', 'Albania', 'ALB'],
 ['9', 'Alcaid', 'POR'],
 ['10', 'Alcyon-6', 'FRA'],
 ['11', 'Alcyon-7', 'FRA'],
 ['12', 'Aldebaran', 'ITA'],
 ['13', 'Aldebaran II', 'ITA'],
 ['14', 'Aletta', 'IRL']]

In [29]:
#lista de los países, tomo el index 2
lista_paises=equipos_olimpicos_rdd_02.map(lambda x: x[2])


In [30]:
lista_paises.take(15)#como vemos acá hay duplicados.

['sigla',
 'AUT',
 'MEX',
 'MEX',
 'ARG',
 'AFG',
 'IRL',
 'SUI',
 'ALB',
 'POR',
 'FRA',
 'FRA',
 'ITA',
 'ITA',
 'IRL']

In [31]:
#quitando los duplicados
lista_paises_distinct=lista_paises.distinct()

In [32]:
lista_paises_distinct.take(15)

['sigla',
 'AUT',
 'MEX',
 'ARG',
 'AFG',
 'IRL',
 'SUI',
 'ALB',
 'POR',
 'FRA',
 'ITA',
 'ALG',
 'SWE',
 'URS',
 'AUS']

In [33]:
#contando la cantidad de países
lista_paises_count=lista_paises_distinct.count()

In [34]:
print(f"cantidad total de paises: {lista_paises_count}")

cantidad total de paises: 231


In [35]:
equipos_por_pais=equipos_olimpicos_rdd_02.map(lambda x: (x[2],x[1]))

In [36]:
equipos_por_pais.take(15)

[('sigla', 'equipo'),
 ('AUT', '30. Februar'),
 ('MEX', 'A North American Team'),
 ('MEX', 'Acipactli'),
 ('ARG', 'Acturus'),
 ('AFG', 'Afghanistan'),
 ('IRL', 'Akatonbo'),
 ('SUI', 'Alain IV'),
 ('ALB', 'Albania'),
 ('POR', 'Alcaid'),
 ('FRA', 'Alcyon-6'),
 ('FRA', 'Alcyon-7'),
 ('ITA', 'Aldebaran'),
 ('ITA', 'Aldebaran II'),
 ('IRL', 'Aletta')]

In [45]:
equipos_por_pais=equipos_olimpicos_rdd_02.map(lambda x: (x[2],x[1]))
equipos_por_pais.groupByKey()

#observar que se agruparon por "sigla"

PythonRDD[38] at RDD at PythonRDD.scala:53

In [42]:
equipos_por_pais.take(15)

[('sigla', 'equipo'),
 ('AUT', '30. Februar'),
 ('MEX', 'A North American Team'),
 ('MEX', 'Acipactli'),
 ('ARG', 'Acturus'),
 ('AFG', 'Afghanistan'),
 ('IRL', 'Akatonbo'),
 ('SUI', 'Alain IV'),
 ('ALB', 'Albania'),
 ('POR', 'Alcaid'),
 ('FRA', 'Alcyon-6'),
 ('FRA', 'Alcyon-7'),
 ('ITA', 'Aldebaran'),
 ('ITA', 'Aldebaran II'),
 ('IRL', 'Aletta')]

In [46]:
equipos_por_pais=equipos_olimpicos_rdd_02\
    .map(lambda x: (x[2],x[1]))\
    .groupByKey()\
    .mapValues(len)
'''en este caso "mapValues()" genera una lista con un país y su lista de equipos, luego
"len" nos dá la longitud de esa lista, o sea la cantidad de equipos por país'''

'en este caso "mapValues()" genera una lista con un país y su lista de equipos, luego\n"len" nos dá la longitud de esa lista, o sea la cantidad de equipos por país'

In [47]:
equipos_por_pais.take(15)

[('sigla', 1),
 ('AUT', 11),
 ('MEX', 9),
 ('ARG', 18),
 ('AFG', 1),
 ('IRL', 7),
 ('SUI', 17),
 ('ALB', 1),
 ('POR', 21),
 ('FRA', 155),
 ('ITA', 36),
 ('ALG', 1),
 ('SWE', 52),
 ('URS', 16),
 ('AUS', 23)]

In [48]:
equipos_por_pais=equipos_olimpicos_rdd_02\
    .map(lambda x: (x[2],x[1]))\
    .groupByKey()\
    .mapValues(list)

In [49]:
equipos_por_pais.take(15)# acá se ve la lista de equipos por paises.

[('sigla', ['equipo']),
 ('AUT',
  ['30. Februar',
   'Austria',
   'Austria-1',
   'Austria-2',
   'Breslau',
   'Brigantia',
   'Donar III',
   'Evita VI',
   'May-Be 1960',
   '"R.-V. Germania; Leitmeritz"',
   'Surprise']),
 ('MEX',
  ['A North American Team',
   'Acipactli',
   'Chamukina',
   'Mexico',
   'Mexico-1',
   'Mexico-2',
   'Nausikaa 4',
   'Tlaloc',
   'Xolotl']),
 ('ARG',
  ['Acturus',
   'Antares',
   'Arcturus',
   'Ardilla',
   'Argentina',
   'Argentina-1',
   'Argentina-2',
   'Blue Red',
   'Covunco III',
   'Cupidon III',
   'Djinn',
   'Gullvinge',
   'Matrero II',
   'Mizar',
   'Pampero',
   'Rampage',
   'Tango',
   'Wiking']),
 ('AFG', ['Afghanistan']),
 ('IRL',
  ['Akatonbo',
   'Aletta',
   'Ireland',
   'Ireland-1',
   'Ireland-2',
   'The Cloud',
   'Three Leaves']),
 ('SUI',
  ['Alain IV',
   'Ali-Baba IV',
   'Ali-Baba IX',
   'Ali-Baba VI',
   'Baccara',
   'Ballerina IV',
   'Fantasio III',
   'Kln',
   'Lerina',
   'Pousse-Moi Pas VII',
   'Switz

In [50]:
equipos_arg=equipos_olimpicos_rdd_02.filter(lambda line: "ARG" in line)

In [51]:
equipos_arg.collect()

[['4', 'Acturus', 'ARG'],
 ['37', 'Antares', 'ARG'],
 ['42', 'Arcturus', 'ARG'],
 ['43', 'Ardilla', 'ARG'],
 ['45', 'Argentina', 'ARG'],
 ['46', 'Argentina-1', 'ARG'],
 ['47', 'Argentina-2', 'ARG'],
 ['119', 'Blue Red', 'ARG'],
 ['238', 'Covunco III', 'ARG'],
 ['252', 'Cupidon III', 'ARG'],
 ['288', 'Djinn', 'ARG'],
 ['436', 'Gullvinge', 'ARG'],
 ['644', 'Matrero II', 'ARG'],
 ['672', 'Mizar', 'ARG'],
 ['774', 'Pampero', 'ARG'],
 ['843', 'Rampage', 'ARG'],
 ['1031', 'Tango', 'ARG'],
 ['1162', 'Wiking', 'ARG']]

In [52]:
sc_02.stop()

### <a name="mark_06"></a> Por Qué No Usar "collect()".

### [Index](#index)

- "collect()" toma todos los RDD distribuidos, que contengan los datos que se buscan, son enviados a la computadora que ejecutó el "collect()", esto en Big Data va a generar problemas al momento de intentar procesar grandes cantidades de datos.

- Una buena alternativa para poder contar la cantidad de datos que tenemos es usar un countApprox(milisegundos), esta instrucción solo cuenta hasta 20 miliseg de proceso y arroja el valor.
```python
cant_aprox_datos = equipos_olimpicos_rdd_02.countApprox(20)
```

### <a name="mark_07"></a> Acciones de conteo sobre 2 o más RDDs.

### [Index](#index)

In [53]:
#visualizando los RDDs que tenemos
!ls -lac ./curso_spark_git_clone/curso-apache-spark-platzi/files

total 14520
drwxr-xr-x 2 compu_dell_ubuntu_01 compu_dell_ubuntu_01    4096 Oct  9 09:38 .
drwxr-xr-x 5 compu_dell_ubuntu_01 compu_dell_ubuntu_01    4096 Oct  7 13:11 ..
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01     946 Oct  7 13:11 deporte.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 2764536 Oct  7 13:11 deportista.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 2807091 Oct  7 13:11 deportista2.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 2776782 Oct  7 13:11 deportistaError.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   33262 Oct  7 13:11 evento.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01    1978 Oct  7 13:11 juegos.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01  255853 Oct  7 13:11 modelo_relacional.jpg
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   23606 Oct  7 13:11 paises.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 6172796 Oct  7 13:11 resultados.csv


In [None]:
#"deportista.csv" contiene el encabezado, y "deportista2.csv" sin encabezado

In [69]:
from pyspark import SparkContext

In [70]:
sc_03=SparkContext(master="local", appName="acciones_en_2_rdd")

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=new_rdd, master=local) created by __init__ at /tmp/ipykernel_489/6268609.py:1 

In [68]:
path="./curso_spark_git_clone/curso-apache-spark-platzi/files/"
deportista_encabezado=sc_03.textFile(path+"deportista.csv")\
    .map(lambda line: line.split(","))

deportista2_datos=sc_03.textFile(path+"deportista2.csv")\
    .map(lambda line: line.split(","))

paises=sc_03.textFile(path+"paises.csv")\
    .map(lambda line: line.split(","))

resultados=sc_03.textFile(path+"resultados.csv")\
    .map(lambda line: line.split(","))

NameError: name 'sc_03' is not defined

In [57]:
deportis_union=deportista_encabezado.union(deportista2_datos)

In [58]:
#validar por cantidades

print(f"cantidad en deportista.csv: {deportista_encabezado.count()}")
print(f"cantidad en deportista2.csv: {deportista2_datos.count()}")
suma=deportista_encabezado.count() + deportista2_datos.count()
print(f"suma de deportista.csv + deportista2.csv: {suma}")
print(f"total en deportis_union: {deportis_union.count()}")

cantidad en deportista.csv: 67787
cantidad en deportista2.csv: 67785
suma de deportista.csv + deportista2.csv: 135572
total en deportis_union: 135572


In [59]:
deportista_encabezado.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [60]:
deportista2_datos.take(5)

[['67787', 'Lee BongJu', '1', '27', '167', '56', '970'],
 ['67788', 'Lee BuTi', '1', '23', '164', '54', '203'],
 ['67789', 'Anthony N. Buddy Lee', '1', '34', '172', '62', '1096'],
 ['67790', 'Alfred A. Butch Lee Porter', '1', '19', '186', '80', '825'],
 ['67791', 'Lee ByeongGu', '1', '22', '175', '68', '970']]

In [61]:
deportis_union.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [62]:
paises.take(5)

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG']]

### Realizamos un join entre "paises.csv" con su "id" y "deportis_union" con su "equipo_id"

In [69]:
paises_join_deportis=deportis_union\
    .map(lambda line:[line[-1],line[:-1]])

In [70]:
'''muestra "equipo_id" line[-1], el resto line[:-1]'''
paises_join_deportis.take(15)

[['equipo_id',
  ['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso']],
 ['199', ['1', 'A Dijiang', '1', '24', '180', '80']],
 ['199', ['2', 'A Lamusi', '1', '23', '170', '60']],
 ['273', ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0']],
 ['278', ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0']],
 ['705', ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82']],
 ['1096', ['6', 'Per Knut Aaland', '1', '31', '188', '75']],
 ['1096', ['7', 'John Aalberg', '1', '31', '183', '72']],
 ['705', ['8', 'Cornelia Cor Aalten Strannood ', '2', '18', '168', '0']],
 ['350', ['9', 'Antti Sami Aalto', '1', '26', '186', '96']],
 ['350', ['10', 'Einar Ferdinand Einari Aalto', '1', '26', '0', '0']],
 ['350', ['11', 'Jorma Ilmari Aalto', '1', '22', '182', '76.5']],
 ['350', ['12', 'Jyri Tapani Aalto', '1', '31', '172', '70']],
 ['350', ['13', 'Minna Maarit Aalto', '2', '30', '159', '55.5']],
 ['350', ['14', 'Pirjo Hannele Aalto Mattila ', '2', '32', '171', '65']]]

In [71]:
#Continuamos con el join
paises_join_deportis=deportis_union\
    .map(lambda line:[line[-1],line[:-1]])\
    .join(paises.map(lambda line:[line[0],line[2]]))#quiero el "id" y "sigla"

#Notar que se pierde el encabezado.

In [77]:
paises_join_deportis.take(5)

[('199', (['1', 'A Dijiang', '1', '24', '180', '80'], 'CHN')),
 ('199', (['2', 'A Lamusi', '1', '23', '170', '60'], 'CHN')),
 ('199', (['602', 'Abudoureheman', '1', '22', '182', '75'], 'CHN')),
 ('199', (['1463', 'Ai Linuer', '1', '25', '160', '62'], 'CHN')),
 ('199', (['1464', 'Ai Yanhan', '2', '14', '168', '54'], 'CHN'))]

## Intento de re-organizar "paises_join_deportis" en una lista, sin listas adentro.

In [91]:
deportis_ordenados=deportis_union\
    .map(lambda line: (line[-1],*line[:-1]))#logré colocar todo en una tupla

paises_ordenados=paises.map(lambda line: (line[0], line[2]))

join_final=paises_ordenados.join(deportis_ordenados)#no puedo unirlos

In [79]:
deportis_ordenados.take(15)

[('equipo_id', 'deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso'),
 ('199', '1', 'A Dijiang', '1', '24', '180', '80'),
 ('199', '2', 'A Lamusi', '1', '23', '170', '60'),
 ('273', '3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0'),
 ('278', '4', 'Edgar Lindenau Aabye', '1', '34', '0', '0'),
 ('705', '5', 'Christine Jacoba Aaftink', '2', '21', '185', '82'),
 ('1096', '6', 'Per Knut Aaland', '1', '31', '188', '75'),
 ('1096', '7', 'John Aalberg', '1', '31', '183', '72'),
 ('705', '8', 'Cornelia Cor Aalten Strannood ', '2', '18', '168', '0'),
 ('350', '9', 'Antti Sami Aalto', '1', '26', '186', '96'),
 ('350', '10', 'Einar Ferdinand Einari Aalto', '1', '26', '0', '0'),
 ('350', '11', 'Jorma Ilmari Aalto', '1', '22', '182', '76.5'),
 ('350', '12', 'Jyri Tapani Aalto', '1', '31', '172', '70'),
 ('350', '13', 'Minna Maarit Aalto', '2', '30', '159', '55.5'),
 ('350', '14', 'Pirjo Hannele Aalto Mattila ', '2', '32', '171', '65')]

In [80]:
paises_ordenados.take(15)

[('id', 'sigla'),
 ('1', 'AUT'),
 ('2', 'MEX'),
 ('3', 'MEX'),
 ('4', 'ARG'),
 ('5', 'AFG'),
 ('6', 'IRL'),
 ('7', 'SUI'),
 ('8', 'ALB'),
 ('9', 'POR'),
 ('10', 'FRA'),
 ('11', 'FRA'),
 ('12', 'ITA'),
 ('13', 'ITA'),
 ('14', 'IRL')]

In [92]:
join_final.take(15)



[('5', ('AFG', '502')),
 ('5', ('AFG', '1076')),
 ('5', ('AFG', '1101')),
 ('5', ('AFG', '1745')),
 ('5', ('AFG', '4628')),
 ('5', ('AFG', '5285')),
 ('5', ('AFG', '5582')),
 ('5', ('AFG', '5678')),
 ('5', ('AFG', '5679')),
 ('5', ('AFG', '5841')),
 ('5', ('AFG', '5844')),
 ('5', ('AFG', '6261')),
 ('5', ('AFG', '6280')),
 ('5', ('AFG', '6282')),
 ('5', ('AFG', '6323'))]

## Fin intento

### takeSample()

In [93]:
#takeSample(repetidos,cantidad_datos,semilla_aleatorea), la semilla_aleatorea es la muestra a tomar
paises_join_deportis.takeSample(False,15,25)

[('45', (['8843', 'Walter Antonio Bauza', '1', '44', '174', '85'], 'ARG')),
 ('80', (['21890', 'Eldece ClarkeLewis', '2', '19', '165', '58'], 'BAH')),
 ('66', (['12968', 'Thomas Patrick Tom Bolger', '1', '23', '0', '0'], 'AUS')),
 ('970', (['67911', 'Lee HakJa', '2', '23', '165', '58'], 'KOR')),
 ('982',
  (['98732', 'Enrique Quique Ramos Gonzlez', '1', '24', '0', '0'], 'ESP')),
 ('413',
  (['5551', 'Henry Sherard Osborn Ashington', '1', '20', '0', '0'], 'GBR')),
 ('705',
  (['44456', 'Paul Vincent Nicholas Haarhuis', '1', '26', '188', '80'],
   'NED')),
 ('1096',
  (['78838', 'Eugene Leroy Roy Mercer', '1', '23', '180', '80'], 'USA')),
 ('742', (['14449', 'Knut Tore Br', '1', '23', '187', '68'], 'NOR')),
 ('66', (['8326', 'Clive Barton', '1', '28', '189', '85'], 'AUS')),
 ('199', (['110045', 'Shu Siyao', '2', '23', '167', '52'], 'CHN')),
 ('413', (['15576', 'Robert Harry Brown', '1', '34', '0', '0'], 'GBR')),
 ('656', (['37457', 'Rosa Fuentes', '2', '18', '165', '60'], 'MEX')),
 ('810

In [94]:
paises_join_deportis.top(2)#Ojo se perdió el encabezado.

[('999', (['92679', 'Trygve Bjarne Pedersen', '1', '35', '0', '0'], 'NOR')),
 ('999', (['1144', 'Henrik Agersborg', '1', '47', '0', '0'], 'NOR'))]

In [95]:
resultados.take(15)

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['1', 'NA', '1', '39', '1'],
 ['2', 'NA', '2', '49', '2'],
 ['3', 'NA', '3', '7', '3'],
 ['4', 'Gold', '4', '2', '4'],
 ['5', 'NA', '5', '36', '5'],
 ['6', 'NA', '5', '36', '6'],
 ['7', 'NA', '5', '38', '5'],
 ['8', 'NA', '5', '38', '6'],
 ['9', 'NA', '5', '40', '5'],
 ['10', 'NA', '5', '40', '6'],
 ['11', 'NA', '6', '38', '7'],
 ['12', 'NA', '6', '38', '8'],
 ['13', 'NA', '6', '38', '9'],
 ['14', 'NA', '6', '38', '10']]

In [96]:
resultados=resultados.filter(lambda line: "NA" not in line[1])

In [97]:
resultados.take(15)

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['4', 'Gold', '4', '2', '4'],
 ['38', 'Bronze', '15', '7', '19'],
 ['39', 'Bronze', '15', '7', '20'],
 ['41', 'Bronze', '16', '50', '14'],
 ['42', 'Bronze', '17', '17', '21'],
 ['43', 'Gold', '17', '17', '22'],
 ['45', 'Gold', '17', '17', '24'],
 ['49', 'Gold', '17', '17', '28'],
 ['51', 'Bronze', '17', '19', '22'],
 ['61', 'Gold', '20', '38', '32'],
 ['62', 'Bronze', '20', '38', '33'],
 ['64', 'Silver', '20', '40', '31'],
 ['65', 'Bronze', '20', '40', '32'],
 ['68', 'Silver', '20', '40', '35']]

In [98]:
resultados.map(lambda line: [line[2]])\
    .take(15)

[['deportista_id'],
 ['4'],
 ['15'],
 ['15'],
 ['16'],
 ['17'],
 ['17'],
 ['17'],
 ['17'],
 ['17'],
 ['20'],
 ['20'],
 ['20'],
 ['20'],
 ['20']]

In [99]:
'''deportista_medallas=paises_join_deportis\
    .map(lambda line: [line[1][0][0]])\
    .join(resultados.map(lambda line: [line[2]]))'''

deportista_medallas=paises_join_deportis\
    .join(resultados)

In [100]:
deportista_medallas.take(15)

                                                                                

[('1090',
  ((['9150', 'Yelena Yuryevna Bechke Petrova Ellis ', '2', '26', '158', '48'],
    'EUN'),
   'Bronze')),
 ('1090',
  ((['9819', 'Gennady Vladimirovich Belyakov', '1', '23', '171', '84'], 'EUN'),
   'Bronze')),
 ('1090',
  ((['41896', 'Andrey Vladimirovich Gorokhov', '1', '23', '185', '92'], 'EUN'),
   'Bronze')),
 ('1090',
  ((['64345', 'Sergey Valeryevich Kruglov', '1', '31', '0', '0'], 'EUN'),
   'Bronze')),
 ('1090',
  ((['70906', 'Igor Vladimirovich Lobanov', '1', '22', '181', '78'], 'EUN'),
   'Bronze')),
 ('1090',
  ((['90775', 'Irina Vladimirovna Palina', '2', '22', '162', '60'], 'EUN'),
   'Bronze')),
 ('1090',
  ((['91898', 'Aleksandr Pashkov', '1', '38', '190', '90'], 'EUN'), 'Bronze')),
 ('1090',
  ((['94109', 'Denis Alekseyevich Petrov', '1', '23', '182', '77'], 'EUN'),
   'Bronze')),
 ('1090',
  ((['94120', 'Oleg Yuryevich Petrov', '1', '24', '175', '95'], 'EUN'),
   'Bronze')),
 ('1090',
  ((['116299', 'Oleg Borisovich Sukhoruchenko', '1', '26', '180', '83'],
 

### <a name="mark_08"></a> Operaciones numéricas.

### [Index](#index)

In [101]:
valores_por_medallas={
    'Gold': 7,
    'Silver': 5,
    'Bronze':4
}

In [109]:
#realizando extracción:
paises_medallas=deportista_medallas.map(lambda line: (line[1][0][1], valores_por_medallas[line[1][1]]))

In [110]:
paises_medallas.take(15)

[('EUN', 4),
 ('EUN', 4),
 ('EUN', 4),
 ('EUN', 4),
 ('EUN', 4),
 ('EUN', 4),
 ('EUN', 4),
 ('EUN', 4),
 ('EUN', 4),
 ('EUN', 4),
 ('EUN', 4),
 ('EUN', 4),
 ('BDI', 4),
 ('BDI', 4),
 ('BDI', 4)]

In [111]:
from operator import add #nos permite sumar

In [112]:
conclusion=paises_medallas.reduceByKey(add)\
    .sortBy(lambda line: line[1], ascending=False)

'''reduceByKey(add): Aplicar reduceByKey con add significa que deseas agregar los valores (números de medallas) que tienen 
la misma clave (país). En este caso, add es una función que simplemente suma dos números.'''

                                                                                

'reduceByKey(add): Aplicar reduceByKey con add significa que deseas agregar los valores (números de medallas) que tienen \nla misma clave (país). En este caso, add es una función que simplemente suma dos números.'

In [113]:
conclusion.take(10)

[('CAN', 32538),
 ('ARG', 12520),
 ('HUN', 10860),
 ('MEX', 6124),
 ('RSA', 3788),
 ('BLR', 3580),
 ('LTU', 1535),
 ('MGL', 1460),
 ('USA', 1342),
 ('AZE', 1218)]

In [114]:
sc_03.stop()

### <a name="mark_09"></a> DataFrames.

### [Index](#index)

![](img_19.png)
**-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------**

In [115]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType,FloatType
from pyspark.sql.types import Row

from pyspark.sql import SQLContext

In [116]:
spark=SparkContext(master="local", appName="DataFrames")
sqlContext=SQLContext(spark)



In [117]:
!ls -lac ./curso_spark_git_clone/curso-apache-spark-platzi/files

total 14520
drwxr-xr-x 2 compu_dell_ubuntu_01 compu_dell_ubuntu_01    4096 Oct  9 09:38 .
drwxr-xr-x 5 compu_dell_ubuntu_01 compu_dell_ubuntu_01    4096 Oct  7 13:11 ..
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01     946 Oct  7 13:11 deporte.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 2764536 Oct  7 13:11 deportista.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 2807091 Oct  7 13:11 deportista2.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 2776782 Oct  7 13:11 deportistaError.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   33262 Oct  7 13:11 evento.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01    1978 Oct  7 13:11 juegos.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01  255853 Oct  7 13:11 modelo_relacional.jpg
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   23606 Oct  7 13:11 paises.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 6172796 Oct  7 13:11 resultados.csv


In [118]:
#Analizando archivo desde Linux
!head -n 5 ./curso_spark_git_clone/curso-apache-spark-platzi/files/juegos.csv

,nombre_juego,annio,temporada,ciudad
1,1896 Verano,1896,Verano,Athina
2,1900 Verano,1900,Verano,Paris
3,1904 Verano,1904,Verano,St. Louis
4,1906 Verano,1906,Verano,Athina


### Antes de cargar el archivo primero creamos el Schema, utilizamos StructFile() para definir el nombre y tipo de columna dentro de StructType().
- StructFile("nombre_columna", tipo_dato, ¿es_obligatorio?) 

In [119]:
juegoSchema = StructType([
    StructField("juego_id", IntegerType(), False),
    StructField("nombre_juego", StringType(), False),
    StructField("anio", StringType(), False),
    StructField("temporada", StringType(), False),
    StructField("ciudad", StringType(), False)
])

#Cargando el archivo
path="./curso_spark_git_clone/curso-apache-spark-platzi/files/"

juegoDF=sqlContext.read.schema(juegoSchema)\
    .option("header", "true").csv(path+"juegos.csv")

In [120]:
juegoDF.take(5)#La vista no es muy agradable, por eso usamos show()

23/10/23 14:17:36 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , nombre_juego, annio, temporada, ciudad
 Schema: juego_id, nombre_juego, anio, temporada, ciudad
Expected: juego_id but found: 
CSV file: file:///home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark-platzi/files/juegos.csv


[Row(juego_id=1, nombre_juego='1896 Verano', anio='1896', temporada='Verano', ciudad='Athina'),
 Row(juego_id=2, nombre_juego='1900 Verano', anio='1900', temporada='Verano', ciudad='Paris'),
 Row(juego_id=3, nombre_juego='1904 Verano', anio='1904', temporada='Verano', ciudad='St. Louis'),
 Row(juego_id=4, nombre_juego='1906 Verano', anio='1906', temporada='Verano', ciudad='Athina'),
 Row(juego_id=5, nombre_juego='1908 Verano', anio='1908', temporada='Verano', ciudad='London')]

In [121]:
juegoDF.show(15)

+--------+-------------+----+---------+--------------------+
|juego_id| nombre_juego|anio|temporada|              ciudad|
+--------+-------------+----+---------+--------------------+
|       1|  1896 Verano|1896|   Verano|              Athina|
|       2|  1900 Verano|1900|   Verano|               Paris|
|       3|  1904 Verano|1904|   Verano|           St. Louis|
|       4|  1906 Verano|1906|   Verano|              Athina|
|       5|  1908 Verano|1908|   Verano|              London|
|       6|  1912 Verano|1912|   Verano|           Stockholm|
|       7|  1920 Verano|1920|   Verano|           Antwerpen|
|       8|1924 Invierno|1924| Invierno|            Chamonix|
|       9|  1924 Verano|1924|   Verano|               Paris|
|      10|1928 Invierno|1928| Invierno|        Sankt Moritz|
|      11|  1928 Verano|1928|   Verano|           Amsterdam|
|      12|1932 Invierno|1932| Invierno|         Lake Placid|
|      13|  1932 Verano|1932|   Verano|         Los Angeles|
|      14|1936 Invierno|

23/10/23 14:17:44 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , nombre_juego, annio, temporada, ciudad
 Schema: juego_id, nombre_juego, anio, temporada, ciudad
Expected: juego_id but found: 
CSV file: file:///home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark-platzi/files/juegos.csv


In [71]:
spark

NameError: name 'spark' is not defined

### Si revisamos los "jobs --> DAGs" podemos ver como el "Agente" de Spark (optimizador) realizó distintas acciones.

![](img_20.png)

- Scan csv --> Escanea el archivo
- WholeStageCodegen --> Generación de código interna (chequeo de código)
- mapPartitionsInternal --> Escaneo de las particiones

**-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------**

In [123]:
spark.stop()

### <a name="mark_10"></a> Inferencia tipos de datos.

### [Index](#index)

- Vamos a trabajar a partir de un RDD pasandolo a un DF.

In [1]:
from pyspark import SparkContext

In [2]:
sc_04=SparkContext(master="local", appName="new_rdd")

23/10/26 09:53:08 WARN Utils: Your hostname, CompuDell01 resolves to a loopback address: 127.0.1.1; using 192.168.29.145 instead (on interface eth0)
23/10/26 09:53:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/26 09:53:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
path="./curso_spark_git_clone/curso-apache-spark-platzi/files/"

deportis_encabezado=sc_04.textFile(path+"deportista.csv")\
    .map(lambda line: line.split(","))

deportis_datos=sc_04.textFile(path+"deportista2.csv")\
    .map(lambda line: line.split(","))

datos_deportis=deportis_encabezado.union(deportis_datos)

In [5]:
datos_deportis.take(5)

                                                                                

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [4]:
#Creamos una función para eliminar el encabezado.
def elimna_encabezado(indice, iterador):
    return iter(list(iterador)[1:])
'''
iter() devuelve valor a valor lo que procesamos
list(iterador)[:1] tomará todo el rdd menos la primer lista
'''

'\niter() devuelve valor a valor lo que procesamos\nlist(iterador)[:1] tomará todo el rdd menos la primer lista\n'

In [7]:
datos_deportis=datos_deportis.mapPartitionsWithIndex(elimna_encabezado)
'''
mapPartitionsWithIndex() a la función aplicada (elimna_encabezado), se le va a pasar 2 parámetros,
primero, toda la columna y segundo, un valor por índice
'''

'\nmapPartitionsWithIndex() a la función aplicada (elimna_encabezado), se le va a pasar 2 parámetros,\nprimero, toda la columna y segundo, un valor por índice\n'

In [8]:
#rdd sin encabezado
datos_deportis.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

In [9]:
#Antes de transformar el RDD hay que transformar los datos del RDD.
datos_deportis=datos_deportis.map(lambda line: (
    int(line[0]),
    line[1], #queda como str
    int(line[2]),
    int(line[3]),
    float(line[4]),#es la columna de altura
    float(line[5]),#es la columna de peso
    int(line[6])
))

In [8]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType,FloatType
from pyspark.sql.types import Row

from pyspark.sql import SQLContext

In [11]:
#creando el schema 
schema = StructType([
    StructField("deportista_id", IntegerType(), False),
    StructField("nombre", StringType(),False),
    StructField("genero", IntegerType(),False),
    StructField("edad", IntegerType(),False),
    StructField("altura",FloatType(),False),
    StructField("peso",FloatType(),False),
    StructField("equipo_id",IntegerType(),False)
    
])

In [12]:
#aplicando el schema al rdd, SQLContext() nos permite crear un DF a partir de nuestro RDD y schema
sqlContext=SQLContext(sc_04)



In [13]:
deportis_df=sqlContext.createDataFrame(datos_deportis, schema)

In [14]:
deportis_df.show(15)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24| 180.0|80.0|      199|
|            2|            A Lamusi|     1|  23| 170.0|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|   0.0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|   0.0| 0.0|      278|
|            5|Christine Jacoba ...|     2|  21| 185.0|82.0|      705|
|            6|     Per Knut Aaland|     1|  31| 188.0|75.0|     1096|
|            7|        John Aalberg|     1|  31| 183.0|72.0|     1096|
|            8|Cornelia Cor Aalt...|     2|  18| 168.0| 0.0|      705|
|            9|    Antti Sami Aalto|     1|  26| 186.0|96.0|      350|
|           10|Einar Ferdinand E...|     1|  26|   0.0| 0.0|      350|
|           11|  Jorma Ilmari Aalto|     1|  22| 182.0|76.5|      350|
|     

### Otra forma muy sencilla de inferir un schema en una línea que da un resultado muy parecido.

In [15]:
deportis_ts = sqlContext.read.csv(path+"deportista.csv", inferSchema=True, header=True)

In [16]:
deportis_ts.show(15)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
|            5|Christine Jacoba ...|     2|  21|   185|82.0|      705|
|            6|     Per Knut Aaland|     1|  31|   188|75.0|     1096|
|            7|        John Aalberg|     1|  31|   183|72.0|     1096|
|            8|Cornelia Cor Aalt...|     2|  18|   168| 0.0|      705|
|            9|    Antti Sami Aalto|     1|  26|   186|96.0|      350|
|           10|Einar Ferdinand E...|     1|  26|     0| 0.0|      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182|76.5|      350|
|     

### Comparando los schemas podemos observar esas diferencias.

In [16]:
deportis_df.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- genero: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- altura: float (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [17]:
deportis_ts.printSchema()

root
 |-- deportista_id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- genero: integer (nullable = true)
 |-- edad: integer (nullable = true)
 |-- altura: integer (nullable = true)
 |-- peso: double (nullable = true)
 |-- equipo_id: integer (nullable = true)



### Pasando de RDD a DF los demas archivos.
- deporte.csv
- deportistaError.csv
- evento.csv
- juegos.csv
- paises.csv
- resultados.csv

In [18]:
!ls -lac ./curso_spark_git_clone/curso-apache-spark-platzi/files

total 14520
drwxr-xr-x 2 compu_dell_ubuntu_01 compu_dell_ubuntu_01    4096 Oct  9 09:38 .
drwxr-xr-x 5 compu_dell_ubuntu_01 compu_dell_ubuntu_01    4096 Oct  7 13:11 ..
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01     946 Oct  7 13:11 deporte.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 2764536 Oct  7 13:11 deportista.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 2807091 Oct  7 13:11 deportista2.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 2776782 Oct  7 13:11 deportistaError.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   33262 Oct  7 13:11 evento.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01    1978 Oct  7 13:11 juegos.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01  255853 Oct  7 13:11 modelo_relacional.jpg
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   23606 Oct  7 13:11 paises.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 6172795 Oct 24 09:55 resultados.csv


### deportes.csv desde RDD a DF

In [19]:
#!head -n 5 ./curso_spark_git_clone/curso-apache-spark-platzi/files/deporte.csv
#!head -n 5 ./curso_spark_git_clone/curso-apache-spark-platzi/files/deportistaError.csv
#!head -n 5 ./curso_spark_git_clone/curso-apache-spark-platzi/files/evento.csv
#!head -n 5 ./curso_spark_git_clone/curso-apache-spark-platzi/files/juegos.csv
#!head -n 5 ./curso_spark_git_clone/curso-apache-spark-platzi/files/paises.csv
!head -n 5 ./curso_spark_git_clone/curso-apache-spark-platzi/files/resultados.csv

resultado_id,medalla,deportista_id,juego_id,evento_id
1,NA,1,39,1
2,NA,2,49,2
3,NA,3,7,3
4,Gold,4,2,4


In [17]:
#creando los rdd
deporte_rdd=sc_04.textFile(path+"deporte.csv")\
    .map(lambda line: line.split(","))

deportistaError_rdd=sc_04.textFile(path+"deportistaError.csv")\
    .map(lambda line: line.split(","))

evento_rdd=sc_04.textFile(path+"evento.csv")\
    .map(lambda line: line.split(","))

juegos_rdd=sc_04.textFile(path+"juegos.csv")\
    .map(lambda line: line.split(","))

paises_rdd=sc_04.textFile(path+"paises.csv")\
    .map(lambda line: line.split(","))

resultados_rdd=sc_04.textFile(path+"resultados.csv")\
    .map(lambda line: line.split(","))

In [18]:
resultados_rdd.take(5)

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['1', 'NA', '1', '39', '1'],
 ['2', 'NA', '2', '49', '2'],
 ['3', 'NA', '3', '7', '3'],
 ['4', 'Gold', '4', '2', '4']]

In [19]:
#utilizando la función eliminar_encabezado

deporte_rdd_sin_encabezado=deporte_rdd.mapPartitionsWithIndex(elimna_encabezado)

deportistaError_rdd_sin_encabezado=deportistaError_rdd.mapPartitionsWithIndex(elimna_encabezado)

evento_rdd_sin_encabezado=evento_rdd.mapPartitionsWithIndex(elimna_encabezado)

juegos_rdd_sin_encabezado=juegos_rdd.mapPartitionsWithIndex(elimna_encabezado)

paises_rdd_sin_encabezado=paises_rdd.mapPartitionsWithIndex(elimna_encabezado)

resultados_rdd_sin_encabezado=resultados_rdd.mapPartitionsWithIndex(elimna_encabezado)

'''
mapPartitionsWithIndex() a la función aplicada se le paso 2 parámetros, toda la columna y un valor
por índice
'''

'\nmapPartitionsWithIndex() a la función aplicada se le paso 2 parámetros, toda la columna y un valor\npor índice\n'

In [20]:
#Antes de transformar el RDD hay que transformar los datos del RDD.

deporte_rdd_sin_encabezado=deporte_rdd_sin_encabezado.map(lambda line: (
    int(line[0]),
    line[1] #queda como str
))

#En line[4] y line[5] se transforman los valores vacios '' en 0
deportistaError_rdd_sin_encabezado=deportistaError_rdd_sin_encabezado.map(lambda line:( 
    int(line[0]),
    line[1],
    int(line[2]),
    int(line[3]),
    float(line[4]) if line[4]!='' else 0.0,
    float(line[5]) if line[5]!='' else 0.0,
    int(line[6]) 
))
 #line[1]+line[2] if len(line) == 4 else (line[1]+line[2]+line[3] if len(line) == 5 else line[1]),
evento_rdd_sin_encabezado=evento_rdd_sin_encabezado.map(lambda line:(
    int(line[0]),
    line[1]+line[2] if len(line) > 3 else line[1],
    int(line[3]) if len(line) > 3 else int(line[2])
    
))
'''
Hay un error en el archivo original "evento_rdd"
[['evento_id', 'evento', 'deporte_id'],
 ['1', "Basketball Men's Basketball", '1'],
 ['2', "Judo Men's Extra-Lightweight", '2'],
 ['3', "Football Men's Football", '3'],
 ['4', "Tug-Of-War Men's Tug-Of-War", '4'],
 ['5', "Speed Skating Women's 500 metres", '5'],
 ['6', '"Speed Skating Women\'s 1', '000 metres"', '5'], esta última lista contiene una "," de más
 lo que no es correcto, entonces el código identifica si la lista tiene una logitud mayor a 3 
 , si es así une line[1] con line[2], y luego para la tercer columna utiliza un indice line[3] 
 para completar correctamente el RDD.
'''
juegos_rdd_sin_encabezado=juegos_rdd_sin_encabezado.map(lambda line:(
    int(line[0]),
    line[1],
    int(line[2]),
    line[3],
    line[4]
))

paises_rdd_sin_encabezado=paises_rdd_sin_encabezado.map(lambda line:(
    int(line[0]),
    line[1],
    line[2]    
))

resultados_rdd_sin_encabezado=resultados_rdd_sin_encabezado.map(lambda line:(
    int(line[0]),
    line[1],
    int(line[2]),
    int(line[3]),
    int(line[4])    
))

In [21]:
resultados_rdd_sin_encabezado.take(5)

[(1, 'NA', 1, 39, 1),
 (2, 'NA', 2, 49, 2),
 (3, 'NA', 3, 7, 3),
 (4, 'Gold', 4, 2, 4),
 (5, 'NA', 5, 36, 5)]

In [22]:
#creando el schema 
schema_deporte_rdd = StructType([
    StructField("deporte_id", IntegerType(), False),
    StructField("deporte", StringType(),False)    
])

schema_deportistaError_rdd=StructType([
    StructField("deportista_id", IntegerType(),False),
    StructField("nombre", StringType(),False),
    StructField("genero", IntegerType(),False),
    StructField("edad",IntegerType(),False),
    StructField("altura",FloatType(),False),
    StructField("peso",FloatType(),False),
    StructField("equipo_id",IntegerType(),False)
])

schema_evento_rdd=StructType([
    StructField("evento_id",IntegerType(),False),
    StructField("evento",StringType(),False),
    StructField("deporte_id",IntegerType(),False)
])

schema_juegos_rdd=StructType([
    StructField("juego_id",IntegerType(),False),
    StructField("nombre_juego",StringType(),False),
    StructField("anio",IntegerType(),False),
    StructField("temporada",StringType(),False),
    StructField("ciudad",StringType(),False)
])

schema_paises_rdd=StructType([
    StructField("pais_id",IntegerType(),False),
    StructField("equipo",StringType(),False),
    StructField("sigla",StringType(),False)
])

schema_resultados_rdd=StructType([
    StructField("resultado_id", IntegerType(),False),
    StructField("medalla",StringType(),False),
    StructField("deportista_id",IntegerType(),False),
    StructField("juego_id",IntegerType(),False),
    StructField("evento_id",IntegerType(),False)
])

In [23]:
deporte_df=sqlContext.createDataFrame(deporte_rdd_sin_encabezado, schema_deporte_rdd)

deportistaError_df=sqlContext.createDataFrame(deportistaError_rdd_sin_encabezado, schema_deportistaError_rdd)

evento_df=sqlContext.createDataFrame(evento_rdd_sin_encabezado, schema_evento_rdd)

juegos_df=sqlContext.createDataFrame(juegos_rdd_sin_encabezado, schema_juegos_rdd)

paises_df=sqlContext.createDataFrame(paises_rdd_sin_encabezado, schema_paises_rdd)

resultados_df=sqlContext.createDataFrame(resultados_rdd_sin_encabezado, schema_resultados_rdd)

In [24]:
#deportis_df.show(5)
#deporte_df.show(5)
#deportistaError_df.show(5)
#evento_df.show(5)
#juegos_df.show(5)
#paises_df.show(5)
resultados_df.show(5)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           1|     NA|            1|      39|        1|
|           2|     NA|            2|      49|        2|
|           3|     NA|            3|       7|        3|
|           4|   Gold|            4|       2|        4|
|           5|     NA|            5|      36|        5|
+------------+-------+-------------+--------+---------+
only showing top 5 rows



### <a name="mark_11"></a> Operaciones sobre DF.

### <a name="mark_12"></a> printSchema().

### [Index](#index)


In [28]:
deportis_df.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- genero: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- altura: float (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



### <a name="mark_13"></a> withColumnRenamed("current_name", "new_name").drop("column_name")

### [Index](#index)

- Nota_01: Se combinan "withColumnRenamed()" y "drop()"
- Nota_02: Esto genera un nuevo df, no modifica el original.

In [29]:
#quiero reemplazar los nombre de ciertas columnas.

deportis_df_01=deportis_df.withColumnRenamed("genero", "sexo").drop("altura")

In [30]:
deportis_df_01.printSchema() 

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- sexo: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



### <a name="mark_14"></a> select(), col("column_name").alias("new_name")

### [Index](#index)

- Para poder utilizar col() necesitamos importar desde sql.functions

In [25]:
from pyspark.sql import functions as f

In [32]:
deportis_df_02=deportis_df_01.select(
    "deportista_id", 
    "nombre",
    f.col("edad").alias("edad_jugador"),
    "equipo_id"
)

In [33]:
deportis_df_02.show(15)

+-------------+--------------------+------------+---------+
|deportista_id|              nombre|edad_jugador|equipo_id|
+-------------+--------------------+------------+---------+
|            1|           A Dijiang|          24|      199|
|            2|            A Lamusi|          23|      199|
|            3| Gunnar Nielsen Aaby|          24|      273|
|            4|Edgar Lindenau Aabye|          34|      278|
|            5|Christine Jacoba ...|          21|      705|
|            6|     Per Knut Aaland|          31|     1096|
|            7|        John Aalberg|          31|     1096|
|            8|Cornelia Cor Aalt...|          18|      705|
|            9|    Antti Sami Aalto|          26|      350|
|           10|Einar Ferdinand E...|          26|      350|
|           11|  Jorma Ilmari Aalto|          22|      350|
|           12|   Jyri Tapani Aalto|          31|      350|
|           13|  Minna Maarit Aalto|          30|      350|
|           14|Pirjo Hannele Aal...|    

### <a name="mark_15"></a> sort("column_name")

### [Index](#index)

In [34]:
deportis_df_02.sort("edad_jugador").show(5)



+-------------+--------------------+------------+---------+
|deportista_id|              nombre|edad_jugador|equipo_id|
+-------------+--------------------+------------+---------+
|          133|           Franz Abb|           0|      399|
|        67900|      Lee GyeongSeop|           0|      970|
|          167|Ould Lamine Abdallah|           0|      362|
|        67991|        Lee JeongGyu|           0|      970|
|           66|     Mohamed Abakkar|           0|     1003|
+-------------+--------------------+------------+---------+
only showing top 5 rows



                                                                                

### <a name="mark_16"></a> filter(df.column_name logic_condition)

### [Index](#index)

In [35]:
deportis_df_02.filter(deportis_df_02.edad_jugador != 0).show(5)

+-------------+--------------------+------------+---------+
|deportista_id|              nombre|edad_jugador|equipo_id|
+-------------+--------------------+------------+---------+
|            1|           A Dijiang|          24|      199|
|            2|            A Lamusi|          23|      199|
|            3| Gunnar Nielsen Aaby|          24|      273|
|            4|Edgar Lindenau Aabye|          34|      278|
|            5|Christine Jacoba ...|          21|      705|
+-------------+--------------------+------------+---------+
only showing top 5 rows



In [36]:
deportis_df_02.filter(
    (deportis_df_02.edad_jugador != 0) & (deportis_df_02.edad_jugador == 24)
).show(5)

+-------------+-------------------+------------+---------+
|deportista_id|             nombre|edad_jugador|equipo_id|
+-------------+-------------------+------------+---------+
|            1|          A Dijiang|          24|      199|
|            3|Gunnar Nielsen Aaby|          24|      273|
|           24|   Nils Egil Aaness|          24|      742|
|           25|   Alf Lied Aanning|          24|      742|
|           31|    Evald rma rman |          24|      331|
+-------------+-------------------+------------+---------+
only showing top 5 rows



### <a name="mark_17"></a> Agrupaciones y operaciones join, multiple joins, select() sobre DF

### [Index](#index)

- Analizamos los schemas para realizar los joins.


In [37]:
deportis_df.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- genero: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- altura: float (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [38]:
resultados_df.printSchema()

root
 |-- resultado_id: integer (nullable = false)
 |-- medalla: string (nullable = false)
 |-- deportista_id: integer (nullable = false)
 |-- juego_id: integer (nullable = false)
 |-- evento_id: integer (nullable = false)



In [79]:
resultados_df_mod=resultados_df.withColumn("medalla", f.when(resultados_df["medalla"] == 'NA', 'sin medalla').otherwise(resultados_df["medalla"]))


In [80]:
resultados_df_mod.show(5)

+------------+-----------+-------------+--------+---------+
|resultado_id|    medalla|deportista_id|juego_id|evento_id|
+------------+-----------+-------------+--------+---------+
|           1|sin medalla|            1|      39|        1|
|           2|sin medalla|            2|      49|        2|
|           3|sin medalla|            3|       7|        3|
|           4|       Gold|            4|       2|        4|
|           5|sin medalla|            5|      36|        5|
+------------+-----------+-------------+--------+---------+
only showing top 5 rows



In [81]:
print(f'cantidad "deportis_df:" {deportis_df.count()}')
print(f'cantidad "resultados_df:" {resultados_df_mod.count()}') 

                                                                                

cantidad "deportis_df:" 135570


[Stage 66:>                                                         (0 + 1) / 1]

cantidad "resultados_df:" 271116


                                                                                

'\nAl momento de asignar los tipos de datos a rdd, "resultados_rdd" se rompió para algunas funciones de agregación,\ncomo count()\n'

In [84]:
'''
Una prueba realizando un join desde "resultados_df_mod" hacia "deportis_df" muestra que fuciona correctamente 
con un "left" pero no con un "right", se revisaron los csv encontrando valores "#N/A" en varias posiciones, reemplazando
esto valores, se solucionó el inconveniente.
'''

resultados_df_mod.join(deportis_df, resultados_df_mod.deportista_id==deportis_df.deportista_id, "right").show(15)

[Stage 80:>                                                         (0 + 1) / 2]

+------------+-----------+-------------+--------+---------+-------------+--------------------+------+----+------+----+---------+
|resultado_id|    medalla|deportista_id|juego_id|evento_id|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+------------+-----------+-------------+--------+---------+-------------+--------------------+------+----+------+----+---------+
|           1|sin medalla|            1|      39|        1|            1|           A Dijiang|     1|  24| 180.0|80.0|      199|
|           2|sin medalla|            2|      49|        2|            2|            A Lamusi|     1|  23| 170.0|60.0|      199|
|           3|sin medalla|            3|       7|        3|            3| Gunnar Nielsen Aaby|     1|  24|   0.0| 0.0|      273|
|           4|       Gold|            4|       2|        4|            4|Edgar Lindenau Aabye|     1|  34|   0.0| 0.0|      278|
|           5|sin medalla|            5|      36|        5|            5|Christine Jacoba ...|   



In [46]:
juegos_df.printSchema()

root
 |-- juego_id: integer (nullable = false)
 |-- nombre_juego: string (nullable = false)
 |-- anio: integer (nullable = false)
 |-- temporada: string (nullable = false)
 |-- ciudad: string (nullable = false)



In [47]:
evento_df.printSchema()

root
 |-- evento_id: integer (nullable = false)
 |-- evento: string (nullable = false)
 |-- deporte_id: integer (nullable = false)



In [26]:
#Primer join
deportis_df.join(resultados_df, deportis_df.deportista_id == resultados_df.deportista_id, "left")\
.show(15)

[Stage 10:>                                                         (0 + 1) / 1]

+-------------+--------------------+------+----+------+----+---------+------------+-------+-------------+--------+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|resultado_id|medalla|deportista_id|juego_id|evento_id|
+-------------+--------------------+------+----+------+----+---------+------------+-------+-------------+--------+---------+
|            1|           A Dijiang|     1|  24| 180.0|80.0|      199|           1|     NA|            1|      39|        1|
|            2|            A Lamusi|     1|  23| 170.0|60.0|      199|           2|     NA|            2|      49|        2|
|            3| Gunnar Nielsen Aaby|     1|  24|   0.0| 0.0|      273|           3|     NA|            3|       7|        3|
|            4|Edgar Lindenau Aabye|     1|  34|   0.0| 0.0|      278|           4|   Gold|            4|       2|        4|
|            5|Christine Jacoba ...|     2|  21| 185.0|82.0|      705|           5|     NA|            5|      36|        5|


                                                                                

In [27]:
#Segundo join
deportis_df.join(resultados_df, deportis_df.deportista_id == resultados_df.deportista_id, "left")\
.join(juegos_df, resultados_df.juego_id == juegos_df.juego_id, "left")\
.show(15)



[Stage 16:>                                                         (0 + 1) / 1]

+-------------+--------------------+------+----+------+----+---------+------------+-------+-------------+--------+---------+--------+-------------+----+---------+-----------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|resultado_id|medalla|deportista_id|juego_id|evento_id|juego_id| nombre_juego|anio|temporada|     ciudad|
+-------------+--------------------+------+----+------+----+---------+------------+-------+-------------+--------+---------+--------+-------------+----+---------+-----------+
|            5|Christine Jacoba ...|     2|  21| 185.0|82.0|      705|           9|     NA|            5|      40|        5|      40|1994 Invierno|1994| Invierno|Lillehammer|
|            5|Christine Jacoba ...|     2|  21| 185.0|82.0|      705|          10|     NA|            5|      40|        6|      40|1994 Invierno|1994| Invierno|Lillehammer|
|            6|     Per Knut Aaland|     1|  31| 188.0|75.0|     1096|          15|     NA|            6|      40|        7| 

                                                                                

In [28]:
#Tercer join
deportis_df.join(resultados_df, deportis_df.deportista_id == resultados_df.deportista_id, "left")\
.join(juegos_df, resultados_df.juego_id == juegos_df.juego_id, "left")\
.join(evento_df, resultados_df.evento_id == evento_df.evento_id, "left")\
.show(15)

[Stage 27:====(1 + 0) / 1][Stage 28:>   (0 + 1) / 1][Stage 29:>   (0 + 0) / 1]

+-------------+--------------------+------+----+------+----+---------+------------+-------+-------------+--------+---------+--------+-------------+----+---------+-----------+---------+--------------------+----------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|resultado_id|medalla|deportista_id|juego_id|evento_id|juego_id| nombre_juego|anio|temporada|     ciudad|evento_id|              evento|deporte_id|
+-------------+--------------------+------+----+------+----+---------+------------+-------+-------------+--------+---------+--------+-------------+----+---------+-----------+---------+--------------------+----------+
|            5|Christine Jacoba ...|     2|  21| 185.0|82.0|      705|           9|     NA|            5|      40|        5|      40|1994 Invierno|1994| Invierno|Lillehammer|        5|Speed Skating Wom...|         5|
|            5|Christine Jacoba ...|     2|  21| 185.0|82.0|      705|          10|     NA|            5|      40|        6|      40

                                                                                

In [29]:
#Realizando el select
deportis_df.join(resultados_df, deportis_df.deportista_id == resultados_df.deportista_id, "left")\
.join(juegos_df, resultados_df.juego_id == juegos_df.juego_id, "left")\
.join(evento_df, resultados_df.evento_id == evento_df.evento_id, "left")\
.select(deportis_df.nombre, f.col("edad").alias("edad_jugador"), "medalla", 
        f.col("anio").alias("año"), evento_df.evento.alias("nombre_disciplina"))\
.show(15)

                                                                                

+--------------------+------------+-------+----+--------------------+
|              nombre|edad_jugador|medalla| año|   nombre_disciplina|
+--------------------+------------+-------+----+--------------------+
|Christine Jacoba ...|          21|     NA|1994|Speed Skating Wom...|
|Christine Jacoba ...|          21|     NA|1994|"Speed Skating Wo...|
|     Per Knut Aaland|          31|     NA|1994|Cross Country Ski...|
|     Per Knut Aaland|          31|     NA|1994|Cross Country Ski...|
|           A Dijiang|          24|     NA|1992|Basketball Men's ...|
|            A Lamusi|          23|     NA|2012|Judo Men's Extra-...|
| Gunnar Nielsen Aaby|          24|     NA|1920|Football Men's Fo...|
|Christine Jacoba ...|          21|     NA|1992|Speed Skating Wom...|
|Christine Jacoba ...|          21|     NA|1992|"Speed Skating Wo...|
|     Per Knut Aaland|          31|     NA|1992|Cross Country Ski...|
|     Per Knut Aaland|          31|     NA|1992|Cross Country Ski...|
|     Per Knut Aalan

### join para armar df con "medallas ganadoras" + "paises" + "equipos"

In [91]:
#deportis_df.show(5)
#deporte_df.show(5)
#deportistaError_df.show(5)
#evento_df.show(5)
#juegos_df.show(5)
paises_df.show(5)
#esultados_df.show(5)

+-------+--------------------+-----+
|pais_id|              equipo|sigla|
+-------+--------------------+-----+
|      1|         30. Februar|  AUT|
|      2|A North American ...|  MEX|
|      3|           Acipactli|  MEX|
|      4|             Acturus|  ARG|
|      5|         Afghanistan|  AFG|
+-------+--------------------+-----+
only showing top 5 rows



In [36]:
resultados_df.printSchema()

root
 |-- resultado_id: integer (nullable = false)
 |-- medalla: string (nullable = false)
 |-- deportista_id: integer (nullable = false)
 |-- juego_id: integer (nullable = false)
 |-- evento_id: integer (nullable = false)



In [37]:
paises_df.printSchema()

root
 |-- pais_id: integer (nullable = false)
 |-- equipo: string (nullable = false)
 |-- sigla: string (nullable = false)



In [60]:
deportis_df.printSchema()
#Para que las relaciones sean más claras, vamos a renombrar "equipo_id" por "pais_id"

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- genero: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- altura: float (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [64]:
#!ls -lac ./curso_spark_git_clone/curso-apache-spark-platzi/files

In [65]:
#!head -n 5 ./curso_spark_git_clone/curso-apache-spark-platzi/files/deportista.csv

In [82]:
#Para deportis_df renombramos su columna "equipo_id" por "pais_id"
deportis_df_renamed = deportis_df.withColumnRenamed("equipo_id", "pais_id")
deportis_df_renamed.show(5)

#Pra "resultados_df" quitamos los resultados sin medallas "NA"
resultados_sin_NA_df = resultados_df.filter(f.col("medalla") != 'NA')
resultados_sin_NA_df.show(15)

+-------------+--------------------+------+----+------+----+-------+
|deportista_id|              nombre|genero|edad|altura|peso|pais_id|
+-------------+--------------------+------+----+------+----+-------+
|            1|           A Dijiang|     1|  24| 180.0|80.0|    199|
|            2|            A Lamusi|     1|  23| 170.0|60.0|    199|
|            3| Gunnar Nielsen Aaby|     1|  24|   0.0| 0.0|    273|
|            4|Edgar Lindenau Aabye|     1|  34|   0.0| 0.0|    278|
|            5|Christine Jacoba ...|     2|  21| 185.0|82.0|    705|
+-------------+--------------------+------+----+------+----+-------+
only showing top 5 rows

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           4|   Gold|            4|       2|        4|
|          38| Bronze|           15|       7|       19|
|          39| Bronze|           15|       7|       20|
|          41| Bro

In [94]:
resultados_sin_NA_df.join(deportis_df_renamed, resultados_sin_NA_df.deportista_id==deportis_df_renamed.deportista_id, "left")\
.join(paises_df, deportis_df_renamed.pais_id ==  paises_df.pais_id, "left")\
.select("medalla", "sigla", "equipo")\
.sort(f.col("sigla").desc())\
.show(15)



+-------+-----+--------+
|medalla|sigla|  equipo|
+-------+-----+--------+
|   Gold|  ZIM|Zimbabwe|
|   Gold|  ZIM|Zimbabwe|
|   Gold|  ZIM|Zimbabwe|
|   Gold|  ZIM|Zimbabwe|
| Silver|  ZIM|Zimbabwe|
|   Gold|  ZIM|Zimbabwe|
|   Gold|  ZIM|Zimbabwe|
|   Gold|  ZIM|Zimbabwe|
|   Gold|  ZIM|Zimbabwe|
| Silver|  ZIM|Zimbabwe|
|   Gold|  ZIM|Zimbabwe|
|   Gold|  ZIM|Zimbabwe|
|   Gold|  ZIM|Zimbabwe|
|   Gold|  ZIM|Zimbabwe|
|   Gold|  ZIM|Zimbabwe|
+-------+-----+--------+
only showing top 15 rows



                                                                                

### <a name="mark_18"></a> Funciones de agrupación

### [Index](#index)


In [99]:
deportis_df_renamed.show(5)
#deporte_df.show(5)
#deportistaError_df.show(5)
#evento_df.show(5)
#juegos_df.show(5)
#paises_df.show(5)
resultados_df.show(5)

+-------------+--------------------+------+----+------+----+-------+
|deportista_id|              nombre|genero|edad|altura|peso|pais_id|
+-------------+--------------------+------+----+------+----+-------+
|            1|           A Dijiang|     1|  24| 180.0|80.0|    199|
|            2|            A Lamusi|     1|  23| 170.0|60.0|    199|
|            3| Gunnar Nielsen Aaby|     1|  24|   0.0| 0.0|    273|
|            4|Edgar Lindenau Aabye|     1|  34|   0.0| 0.0|    278|
|            5|Christine Jacoba ...|     2|  21| 185.0|82.0|    705|
+-------------+--------------------+------+----+------+----+-------+
only showing top 5 rows

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           1|     NA|            1|      39|        1|
|           2|     NA|            2|      49|        2|
|           3|     NA|            3|       7|        3|
|           4|   G

[Stage 169:>                                                        (0 + 1) / 1]                                                                                

In [109]:
medallistas_df = deportis_df_renamed\
.join(resultados_df, deportis_df_renamed.deportista_id == resultados_df.deportista_id, "left")\
.join(juegos_df, resultados_df.juego_id == juegos_df.juego_id, "left")\
.join(paises_df, deportis_df_renamed.pais_id == paises_df.pais_id, "left")\
.join(evento_df, resultados_df.evento_id == evento_df.evento_id, "left")\
.join(deporte_df, evento_df.deporte_id ==  deporte_df.deporte_id, "left")\
.select("sigla", "anio", "medalla", 
        evento_df.evento.alias("nombre_subdiciplina"),
        deporte_df.deporte.alias("nombre_disciplina"),
        deportis_df.nombre
       )

medallistas_df.show(5)

[Stage 266:>                (0 + 1) / 1][Stage 267:>                (0 + 0) / 1]

+-----+----+-------+--------------------+-----------------+--------------------+
|sigla|anio|medalla| nombre_subdiciplina|nombre_disciplina|              nombre|
+-----+----+-------+--------------------+-----------------+--------------------+
|  CHN|1992|     NA|Basketball Men's ...|       Basketball|           A Dijiang|
|  CHN|2012|     NA|Judo Men's Extra-...|             Judo|            A Lamusi|
|  DEN|1920|     NA|Football Men's Fo...|         Football| Gunnar Nielsen Aaby|
|  SWE|1900|   Gold|Tug-Of-War Men's ...|       Tug-Of-War|Edgar Lindenau Aabye|
|  NED|1988|     NA|Speed Skating Wom...|    Speed Skating|Christine Jacoba ...|
+-----+----+-------+--------------------+-----------------+--------------------+
only showing top 5 rows



                                                                                

### <a name="mark_19"></a> groupBy()

### [Index](#index)

In [130]:
#descartamos los que no tienen medallas y agrupamos
medallistas_df_01=medallistas_df\
.filter(medallistas_df.medalla != 'NA')\
.groupBy("sigla", "anio", "nombre_subdiciplina")\
.count()\
.sort("sigla", "anio", "nombre_subdiciplina", "count")

medallistas_df_01.show(25)

                                                                                

+-----+----+--------------------+-----+
|sigla|anio| nombre_subdiciplina|count|
+-----+----+--------------------+-----+
|  AFG|2008|Taekwondo Men's F...|    1|
|  AFG|2012|Taekwondo Men's F...|    1|
|  AHO|1988|Sailing Mixed Win...|    1|
|  ALB|2000|Athletics Women's...|    1|
|  ALB|2004|Athletics Women's...|    1|
|  ALB|2016|Judo Women's Half...|    1|
|  ALG|1984|Boxing Men's Ligh...|    1|
|  ALG|1984|Boxing Men's Midd...|    1|
|  ALG|1992|"Athletics Women'...|    1|
|  ALG|1992|Boxing Men's Feat...|    1|
|  ALG|1996|"Athletics Men's ...|    1|
|  ALG|1996|Boxing Men's Ligh...|    1|
|  ALG|1996|Boxing Men's Midd...|    1|
|  ALG|2000|"Athletics Men's ...|    1|
|  ALG|2000|"Athletics Women'...|    1|
|  ALG|2000|Athletics Men's 8...|    1|
|  ALG|2000|Athletics Men's H...|    1|
|  ALG|2000|Boxing Men's Ligh...|    1|
|  ALG|2008|Judo Men's Middle...|    1|
|  ALG|2008|Judo Women's Half...|    1|
|  ALG|2012|"Athletics Men's ...|    1|
|  ALG|2016|"Athletics Men's ...|    1|


### <a name="mark_20"></a> ejemplo con withColumn("nombre_columna", "operacion_con_cast")

### [Index](#index)

In [144]:
medallistas_df_01.printSchema()

#Casteando columna "count" a "int"
medallistas_df_02=medallistas_df_01.withColumn("count", medallistas_df_01["count"].cast(IntegerType()))

medallistas_df_02.printSchema()

root
 |-- sigla: string (nullable = true)
 |-- anio: integer (nullable = true)
 |-- nombre_subdiciplina: string (nullable = true)
 |-- count: long (nullable = false)

root
 |-- sigla: string (nullable = true)
 |-- anio: integer (nullable = true)
 |-- nombre_subdiciplina: string (nullable = true)
 |-- count: integer (nullable = false)



### <a name="mark_21"></a> .agg() la forma correcta de hacer agregaciones

### [Index](#index)

- Ahora vamos a realizar una agrupación por "sigla" y por "anio" de medallista_df_01

- Documentacion: https://sparkbyexamples.com/pyspark/pyspark-groupby-agg-aggregate-explained/

In [165]:
#Recordar que "sum", "avg", "count", "max", etc provenientes de "function" fueron importadas "as f"
medallistas_df_03=medallistas_df_01.groupBy('sigla', 'anio')\
    .agg(f.sum('count').alias('medallas_totales'),\
         f.avg('count').alias('promedio_medallas'))
medallistas_df_03.orderBy(f.desc("medallas_totales")).show(15)

                                                                                

+-----+----+----------------+------------------+
|sigla|anio|medallas_totales| promedio_medallas|
+-----+----+----------------+------------------+
|  URS|1980|             496| 2.883720930232558|
|  USA|1904|             398|4.4222222222222225|
|  URS|1988|             366| 2.772727272727273|
|  USA|1984|             361|2.5069444444444446|
|  GBR|1908|             354| 4.597402597402597|
|  URS|1976|             342|          2.671875|
|  USA|2008|             318|3.4565217391304346|
|  GDR|1980|             303| 2.443548387096774|
|  USA|2016|             263| 2.481132075471698|
|  USA|2004|             262| 2.977272727272727|
|  USA|1996|             259|2.9101123595505616|
|  URS|1972|             259|2.5145631067961167|
|  FRA|1900|             250| 4.310344827586207|
|  USA|2012|             248|2.7252747252747254|
|  USA|2000|             242|             3.025|
+-----+----+----------------+------------------+
only showing top 15 rows



### <a name="mark_22"></a> SQL

### [Index](#index)

In [166]:
#deportis_df_renamed.show(5)
#deporte_df.show(5)
#deportistaError_df.show(5)
#evento_df.show(5)
#juegos_df.show(5)
#paises_df.show(5)
resultados_df.show(5)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           1|     NA|            1|      39|        1|
|           2|     NA|            2|      49|        2|
|           3|     NA|            3|       7|        3|
|           4|   Gold|            4|       2|        4|
|           5|     NA|            5|      36|        5|
+------------+-------+-------------+--------+---------+
only showing top 5 rows



[Stage 823:>                                                        (0 + 1) / 1]                                                                                

In [173]:

resultados_df.createOrReplaceTempView("resultados_table")

deportis_df_renamed.createOrReplaceTempView("deportis_table")

paises_df.createOrReplaceTempView("paises_table")

In [167]:
from pyspark.sql import SQLContext

In [174]:
#Hay que instanciar SQLContext con el contexto que ya estamos usando "sc_04"
sqlContext = SQLContext(sc_04)

In [176]:
sqlContext.sql("""
SELECT * FROM resultados_table
""").show(5)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           1|     NA|            1|      39|        1|
|           2|     NA|            2|      49|        2|
|           3|     NA|            3|       7|        3|
|           4|   Gold|            4|       2|        4|
|           5|     NA|            5|      36|        5|
+------------+-------+-------------+--------+---------+
only showing top 5 rows



In [None]:
#repetimos el mismo join pero en SQL
"""
resultados_sin_NA_df.join(deportis_df_renamed, resultados_sin_NA_df.deportista_id==deportis_df_renamed.deportista_id, "left")\
.join(paises_df, deportis_df_renamed.pais_id ==  paises_df.pais_id, "left")\
.select("medalla", "sigla", "equipo")\
.sort(f.col("sigla").desc())\
.show(15)
"""

In [189]:
sqlContext.sql("""
SELECT medalla, sigla, equipo FROM resultados_table rt
LEFT JOIN deportis_table dt
ON rt.deportista_id = dt.deportista_id
LEFT JOIN paises_table pt
ON dt.pais_id = pt.pais_id
WHERE medalla != "NA"
ORDER BY sigla DESC
""").show(5)



+-------+-----+--------+
|medalla|sigla|  equipo|
+-------+-----+--------+
|   Gold|  ZIM|Zimbabwe|
|   Gold|  ZIM|Zimbabwe|
|   Gold|  ZIM|Zimbabwe|
|   Gold|  ZIM|Zimbabwe|
|   Gold|  ZIM|Zimbabwe|
+-------+-----+--------+
only showing top 5 rows



                                                                                

In [188]:
sc_04

### <a name="mark_23"></a> UDF "User-defined function"

### [Index](#index)

- Las funciones definidas por el usuario o UDF, por sus siglas en inglés, son una funcionalidad agregada en Spark para definir funciones basadas en columnas las cuales permiten extender las capacidades de Spark al momento de transformar el set de datos.

- Este tipo de implementaciones son convenientes cuando tenemos un desarrollo extenso donde hemos identificado la periodicidad de tareas repetitivas como suele ser en pasos de limpieza de datos, transformación o renombrado dinámico de columnas.

- Por lo anterior es común encontrar en un proyecto de Spark una librería independiente donde existen todas estas funciones agregadas para que los desarrolladores involucrados en el proyecto puedan usarlas a conveniencia.

- El uso de UDF no implica que las funciones que podemos crear nativamente con Python, Scala, R o Java no sean útiles. Una UDF tiene el objetivo de ofrecer un estándar interno en el proyecto que nos encontremos realizando. Además, en caso de ser necesario, una UDF puede ser modificada con ayuda de decoradores para que sea más extensible en diversos escenarios a los cuales nos podemos enfrentar.

- Otro motivo para usar UDF es que en el módulo de Spark MLlib, la librería nativa de Spark para operaciones de Machine Learning, las UDF juegan un papel vital al momento de hacer transformaciones. Por lo cual tener un uso familiar de estas ampliará considerablemente la curva de aprendizaje de Spark MLlib.

In [5]:
!ls -lac ./curso_spark_git_clone/curso-apache-spark-platzi/files

total 14520
drwxr-xr-x 2 compu_dell_ubuntu_01 compu_dell_ubuntu_01    4096 Oct  9 09:38 .
drwxr-xr-x 5 compu_dell_ubuntu_01 compu_dell_ubuntu_01    4096 Oct  7 13:11 ..
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01     946 Oct  7 13:11 deporte.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 2764536 Oct  7 13:11 deportista.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 2807091 Oct  7 13:11 deportista2.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 2776782 Oct  7 13:11 deportistaError.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   32942 Oct 24 13:40 evento.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01    1978 Oct  7 13:11 juegos.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01  255853 Oct  7 13:11 modelo_relacional.jpg
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   23606 Oct  7 13:11 paises.csv
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01 6172768 Oct 24 14:13 resultados.csv


In [10]:
sqlContext = SQLContext(sc_04)
deportistaError_df = sqlContext.read.csv(path+"deportistaError.csv", inferSchema=True, header=True)

In [11]:
deportistaError_df.show(9)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|  null|null|      273|
|            4|Edgar Lindenau Aabye|     1|  34|  null|null|      278|
|            5|Christine Jacoba ...|     2|  21|   185|82.0|      705|
|            6|     Per Knut Aaland|     1|  31|   188|75.0|     1096|
|            7|        John Aalberg|     1|  31|   183|72.0|     1096|
|            8|"Cornelia ""Cor""...|     2|  18|   168|null|      705|
|            9|    Antti Sami Aalto|     1|  26|   186|96.0|      350|
+-------------+--------------------+------+----+------+----+---------+
only showing top 9 rows



In [12]:
from pyspark.sql.functions import udf

### Creando la Función que utilizaremos como UDF

In [27]:
#Transformamos valores "null" a "0"
def conversion_nulls(valor):
    return 0 if valor == None else valor

### Creamos la función que va ser tratada como UDF

In [28]:
conversion_a_cero_udf = udf(lambda z: conversion_nulls(z), IntegerType())

### Damos de alta "conversion_a_cero_udf"

In [29]:
#El primer parámetro es el nombre que Spark le va a dar.
#El segundo parámetro es la función que va registrar.
sqlContext.udf.register("conversion_a_cero_udf", conversion_a_cero_udf)

23/10/26 10:11:20 WARN SimpleFunctionRegistry: The function conversion_a_cero_udf replaced a previously registered function.


<function __main__.<lambda>(z)>

### Aplicando la UDF

- Cual sería el benficio de estas UDF.
    - Trabajar nativamente.
    - Invocarlas y utilizarlas con sql sin problemas.

In [33]:
deportistaError_df.select(conversion_a_cero_udf("altura").alias("altura_udf")).show()

+----------+
|altura_udf|
+----------+
|       180|
|       170|
|         0|
|         0|
|       185|
|       188|
|       183|
|       168|
|       186|
|         0|
|       182|
|       172|
|       159|
|       171|
|         0|
|       184|
|       175|
|       189|
|         0|
|       176|
+----------+
only showing top 20 rows



### <a name="mark_24"></a> Comprendiendo la persistencia y particionado

### [Index](#index)

- Como se ha descrito en clases pasadas, los RDD son la capa de abstracción primaria para poder interactuar con los datos que viven en nuestro ambiente de Spark. Aunque estos puedan ser enmascarados con un esquema dotándolos de las facultades propias de los DataFrames, la información de fondo sigue operando como RDD.

- Por lo tanto, la información, como indica el nombre de los RDD, se maneja de forma distribuida a lo largo del clúster, facilitando las operaciones que se van a ejecutar, ya que segmentos de información pueden encontrarse en diferentes ejecutores reduciendo el tiempo necesario para acceder a la información y poder así realizar los cálculos necesarios.

- Cuando un RDD o Dataframe es creado, según las especificaciones que se indiquen a la aplicación de Spark, creará un esquema de particionado básico, el cual distribuirá los datos a lo largo del clúster. Siendo así que al momento de ejecutar una acción, esta se ejecutará entre los diversos fragmentos de información que existan para poder así realizar de la forma más rápida las operaciones. Es por eso que un correcto esquema de particionado es clave para poder tener aplicaciones rápidas y precisas que además consuman pocos recursos de red.

- Otra de las tareas fundamentales es la replicación de componentes y sus fragmentos, ya que al aumentar la disponibilidad de estos podremos asegurar una tolerancia a fallos, mientras más se replique un valor es más probable que no se pierda si existe un fallo de red o energía, además de permitir una disponibilidad casi inmediata del archivo buscado.

- La partición y replicación son elementos que deben ser analizados según el tipo de negocio o requerimientos que se tengan en el desarrollo que se encuentre en progreso, por lo cual la cantidad de datos replicados o granularidad de datos existentes en los fragmentos dependerá en función de las reglas de negocio.

![](img_21.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

![](img_22.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

![](img_23.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

![](img_24.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

![](img_25.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

In [34]:
from pyspark.storagelevel import StorageLevel

### <a name="mark_25"></a> Preguntamos si "deportistaError_df" está en Cache

### [Index](#index)

In [35]:
deportistaError_df.is_cached

"""
Como se puede ver la respuesta es "false", con lo cual cada vez que lo llame Spark lo tiene que
reconstruir.
"""

False

In [36]:
deportistaError_df.rdd.cache()

"""
Aquí utilizamos las funciones primitivas "rdd" aplicadas al df y su guardado en cache, y para
este ejemplo obtenemos la siguiente salida.

"MapPartitionsRDD[91] at javaToPython at NativeMethodAccessorImpl.java:0"
"""

MapPartitionsRDD[91] at javaToPython at NativeMethodAccessorImpl.java:0

### <a name="mark_26"></a> Que tipo de persistencia tiene un DF? "getStorageLevel()"
### [Index](#index) 

- Documentación https://spark.apache.org/docs/2.4.6/api/python/pyspark.html#pyspark.StorageLevel


![](img_26.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

In [37]:
deportistaError_df.rdd.getStorageLevel()

"""
Para este ejemplo vemos la siguiente salida.

StorageLevel(False, True, False, False, 1)

lo que nos indica un "MEMORY_ONLY"
"""

StorageLevel(False, True, False, False, 1)

### <a name="mark_27"></a> unpersist()
### [Index](#index) 

- Si deseo cambiar el tipo de "StorageLevel" primero tengo que dejar de persistir los datos que estan en cache. 

In [38]:
deportistaError_df.rdd.unpersist()

MapPartitionsRDD[91] at javaToPython at NativeMethodAccessorImpl.java:0

### <a name="mark_28"></a> Cambiando el tipo de "StorageLevel"
### [Index](#index) 

In [None]:
deportistaError_df.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

"""
DF persistido para que pueda utilizar desde Memoria o Disk, según sea la necesidad de Spark,
y replicado 2 veces.

Nota: Si hubieramos creado un particionada, previo, este particionado sería el que va a ser 
replicado 2 veces.
"""

### <a name="mark_29"></a> Creando mi propio "StorageLevel", persistencia de datos.
### [Index](#index) 

In [39]:
#Creación
StorageLevel.MEMORY_AND_DISK_3 = StorageLevel(True,True,False,False,3)

"""
Se usa el nombre "MEMORY_AND_DISK_3" solo como una buena práctica, pero podría ser cualquier
otro, también elegimos 3 replicaciones que persisten los datos en 3 "lugares" diferentes para
no perderlos en ocación de algún fallo.
"""

'\nSe usa el nombre "MEMORY_AND_DISK_3" solo como una buena práctica, pero podría ser cualquier\notro, también elegimos 3 replicaciones que persisten los datos en 3 "lugares" diferentes para\nno perderlos en ocación de algún fallo.\n'

In [42]:
#Aplicación del nuevo StorageLevel.

deportistaError_df.rdd.persist(StorageLevel.MEMORY_AND_DISK_3)

MapPartitionsRDD[91] at javaToPython at NativeMethodAccessorImpl.java:0

In [45]:
deportistaError_df.rdd.getStorageLevel()

StorageLevel(True, True, False, False, 3)

In [46]:
deportistaError_df.rdd.unpersist()

MapPartitionsRDD[91] at javaToPython at NativeMethodAccessorImpl.java:0

In [47]:
deportistaError_df.rdd.getStorageLevel()

StorageLevel(False, False, False, False, 1)

In [48]:
sc_04.stop()

### <a name="mark_30"></a> Particionando datos, RDD o DF.
### [Index](#index) 


In [49]:
from pyspark.sql import SparkSession

In [52]:
spark = SparkSession.builder.appName("particionado").master("local[5]").getOrCreate()

# "local[5]" indica por defecto hacer un particionado

In [53]:
# creamos un DF
df = spark.range(0,20)
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+



### <a name="mark_31"></a> Ver cantidad de particiones getNumPartitions()
### [Index](#index) 

In [55]:
df.rdd.getNumPartitions()
#Por defecto construyó 5 particiones

5

### Un ejemplo similar al anterior pero utilizando parámetros manuales

In [60]:
rdd_01 = spark.sparkContext.parallelize((0,20),12)

"""
Como SparkSession contiene también los sparkContext, puedo crear ese rdd, con 12 particiones.
"""

In [62]:
rdd_01.take(22)

[0, 20]

In [64]:
rdd_01.getNumPartitions()

12

### Segundo ejemplo

In [76]:
#creamos el rdd, con 12 particiones

deporte_rdd = spark\
    .sparkContext\
    .textFile(path+"deporte.csv", 13)

In [68]:
deporte_rdd.getNumPartitions() #el valor es una partición más de la seteada.

14

### <a name="mark_32"></a> Guardando los rdd o df con saveAsTextFile()
### [Index](#index) 

In [69]:
path='./curso_spark_git_clone/curso-apache-spark-platzi/files/'

'./curso_spark_git_clone/curso-apache-spark-platzi/files/'

In [71]:
deporte_rdd.saveAsTextFile(path+'deporte_partitions')
"""
Guardamos "deporte_rdd" dentro de una nueva carpeta "deporte_partitions", lo cual como vemos al
inspeccionar la carpeta todas las particiones generadas y archivo _SUCCESS, este archivo siempre
debe venir vacios, si no es así tal vez halla habido errores.
"""



In [75]:
!ls -lac ./curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions

total 120
drwxr-xr-x 2 compu_dell_ubuntu_01 compu_dell_ubuntu_01 4096 Oct 26 12:13 .
drwxr-xr-x 3 compu_dell_ubuntu_01 compu_dell_ubuntu_01 4096 Oct 26 12:13 ..
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01    8 Oct 26 12:13 ._SUCCESS.crc
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   12 Oct 26 12:13 .part-00000.crc
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   12 Oct 26 12:13 .part-00001.crc
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   12 Oct 26 12:13 .part-00002.crc
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   12 Oct 26 12:13 .part-00003.crc
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   12 Oct 26 12:13 .part-00004.crc
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   12 Oct 26 12:13 .part-00005.crc
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   12 Oct 26 12:13 .part-00006.crc
-rw-r--r-- 1 compu_dell_ubuntu_01 compu_dell_ubuntu_01   12 Oct 26 12:13 .part-00007.crc
-rw-r--r-- 1 compu_dell_ubun

### Viendo las particiones.

- Acá se puede ver como fue particionado el rdd.

In [80]:
!head -n 7 ./curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/part-00000

deporte_id,deporte
1,Basketball
2,Judo
3,Football
4,Tug-Of-War
5,Speed Skating


In [81]:
!head -n 7 ./curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/part-00001

6,Cross Country Skiing
7,Athletics
8,Ice Hockey
9,Swimming
10,Badminton


### <a name="mark_33"></a> Reconstruyendo rdd desde las particiones wholeTextFiles() Muchos pasos inecesarios.
### [Index](#index) 

In [84]:
#Creamos el rdd cargando todos las particiones.

deporte_partitions_path = './curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/'

all_partitions_rdd = spark.sparkContext.wholeTextFiles(deporte_partitions_path+'*')

'\nComo vemos la salida no es muy funcional, por lo cual de\n'

In [85]:
all_partitions_rdd.take(5)

"""
Como vemos la salida no es muy funcional, por lo cual de hay que hacer varios pasos extra.
"""

[('file:/home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/part-00000',
  'deporte_id,deporte\n1,Basketball\n2,Judo\n3,Football\n4,Tug-Of-War\n5,Speed Skating\n'),
 ('file:/home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/part-00001',
  '6,Cross Country Skiing\n7,Athletics\n8,Ice Hockey\n9,Swimming\n10,Badminton\n'),
 ('file:/home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/part-00002',
  '11,Sailing\n12,Biathlon\n13,Gymnastics\n14,Art Competitions\n15,Alpine Skiing\n'),
 ('file:/home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark

In [86]:
#Primero creamos una lista con los componentes de rdd.
lista = all_partitions_rdd.mapValues(lambda x: x.split()).collect()

In [88]:
lista

[('file:/home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/part-00000',
  ['deporte_id,deporte',
   '1,Basketball',
   '2,Judo',
   '3,Football',
   '4,Tug-Of-War',
   '5,Speed',
   'Skating']),
 ('file:/home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/part-00001',
  ['6,Cross',
   'Country',
   'Skiing',
   '7,Athletics',
   '8,Ice',
   'Hockey',
   '9,Swimming',
   '10,Badminton']),
 ('file:/home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/part-00002',
  ['11,Sailing',
   '12,Biathlon',
   '13,Gymnastics',
   '14,Art',
   'Competitions',
   '15,Alpine',
   'Skiing']),
 ('file:/home/compu_dell_ubuntu_01/platzi/pip_y_e

In [89]:
lista_01 = all_partitions_rdd.mapValues(lambda x: x.split(',')).collect()
"""
Una prueba usando split(',') muestra que no es lo más conveniente, aparece el salto de carro '\n'
"""

In [91]:
lista_01

[('file:/home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/part-00000',
  ['deporte_id',
   'deporte\n1',
   'Basketball\n2',
   'Judo\n3',
   'Football\n4',
   'Tug-Of-War\n5',
   'Speed Skating\n']),
 ('file:/home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/part-00001',
  ['6',
   'Cross Country Skiing\n7',
   'Athletics\n8',
   'Ice Hockey\n9',
   'Swimming\n10',
   'Badminton\n']),
 ('file:/home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/part-00002',
  ['11',
   'Sailing\n12',
   'Biathlon\n13',
   'Gymnastics\n14',
   'Art Competitions\n15',
   'Alpine Skiing\n']),
 ('file:/home/compu_dell_ubuntu_01/platzi/pip_y_

In [94]:
#recuperamos los todos los paths
lista_path = [ l[0] for l in lista]
lista_path.sort()
lista_path

['file:/home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/part-00000',
 'file:/home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/part-00001',
 'file:/home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/part-00002',
 'file:/home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/part-00003',
 'file:/home/compu_dell_ubuntu_01/platzi/pip_y_entornos_virtuales/py-project/39_Spark_y_Big_Data_Fundamentos/curso_spark_git_clone/curso-apache-spark-platzi/files/deporte_partitions/part-00004',
 'file:/home/compu_dell_u

In [100]:
#Creamos un nuevo rdd utilizando todos los paths recuperados

rdd_recuperado = spark\
    .sparkContext\
    .textFile(','.join(lista_path),9).map(lambda line: line.split(','))
"""
Esta parte "textFile(','.join(lista_path),9)" toma todos los elementos de "lista_path" y los une
con una coma, luego genera 9 particiones nuevas.
"""

'\nEsta parte "textFile(\',\'.join(lista_path),9)" toma todos los elementos de "lista_path" y los une\ncon una coma, luego genera 9 particiones nuevas.\n'

In [101]:
rdd_recuperado.take(9) #Acá tenemos el rdd reconstruido a partir de las particiones.

[['deporte_id', 'deporte'],
 ['1', 'Basketball'],
 ['2', 'Judo'],
 ['3', 'Football'],
 ['4', 'Tug-Of-War'],
 ['5', 'Speed Skating'],
 ['6', 'Cross Country Skiing'],
 ['7', 'Athletics'],
 ['8', 'Ice Hockey']]

### <a name="mark_34"></a> Reconstruyendo rdd desde las particiones textFile() en un solo paso.
### [Index](#index) 

In [102]:
all_partitions_rdd_01 = spark.sparkContext.textFile(deporte_partitions_path+'*')\
    .map(lambda line: line.split(','))

In [103]:
all_partitions_rdd_01.take(9)

[['deporte_id', 'deporte'],
 ['1', 'Basketball'],
 ['2', 'Judo'],
 ['3', 'Football'],
 ['4', 'Tug-Of-War'],
 ['5', 'Speed Skating'],
 ['6', 'Cross Country Skiing'],
 ['7', 'Athletics'],
 ['8', 'Ice Hockey']]

In [None]:
spark.stop()

In [36]:
conda list 


# packages in environment at /home/compu_dell_ubuntu_01/anaconda3/envs/data_trans_env:
#
# Name                    Version                   Build  Channel
brotlipy                  0.7.0           py311h5eee18b_1002  
contourpy                 1.0.5           py311hdb19cb5_0  
debugpy                   1.5.1           py311h6a678d5_0  
ipykernel                 6.19.2          py311h6410fe4_0  
ipython                   8.12.0          py311h06a4308_0  
jupyter_client            8.1.0           py311h06a4308_0  
jupyter_core              5.3.0           py311h06a4308_0  
numpy                     1.24.3          py311h08b1b3b_1  
numpy-base                1.24.3          py311hf175353_1  
py4j                      0.10.9.7        py311h06a4308_0  
pyarrow                   11.0.0          py311hd8e8d9b_1  
pycosat                   0.6.4           py311h5eee18b_0  
pycparser                 2.21               pyhd3eb1b0_0  
pygments                  2.15.1          py311h06a4308_1  
p

In [4]:
connexion_string = 'mongodb+srv://mendezleonardom:Mongodb_3967@awscluster.bupqt8b.mongodb.net/'

In [28]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb+srv://mendezleonardom:Mongodb_3967@awscluster.bupqt8b.mongodb.net/sample_guides.planets") \
    .config("spark.mongodb.output.uri", "mongodb+srv://mendezleonardom:Mongodb_3967@awscluster.bupqt8b.mongodb.net/sample_guides.planets") \
    .getOrCreate()

In [29]:
df = spark.read.format("mongodb").load()

Py4JJavaError: An error occurred while calling o240.load.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: mongodb. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:738)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:208)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.ClassNotFoundException: mongodb.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 15 more


In [27]:
spark.stop()

### <a name="mark_35"></a> Data Masking.
### [Index](#index) 


1. Enmascaramiento con Funciones de Spark SQL:

- En este ejemplo, la columna sensible se enmascara utilizando la función sha2, que aplica la función de hash SHA-256 a los valores de la columna. El resultado es una versión enmascarada de los datos originales.

- En este caso, si has aplicado una función hash como sha2 de Spark SQL para enmascarar los datos, **la recuperación de los datos originales no es directa**. Las funciones hash son unidireccionales, lo que significa que no se pueden revertir. Para recuperar los datos originales, generalmente necesitas comparar el hash de un valor proporcionado con el hash almacenado. Si hay una coincidencia, has encontrado el valor original. Sin embargo, esto no es práctico para datos sensibles.

2. Enmascaramiento con Funciones Personalizadas:

- En este caso, se utiliza una función personalizada (custom_masking_function) para enmascarar los valores de la columna sensible. Aunque el ejemplo usa una función simple de sustitución de caracteres, puedes personalizar la lógica de enmascaramiento según tus necesidades.

- **La recuperación** en este caso depende de la lógica específica de enmascaramiento que hayas implementado. Si la función de enmascaramiento es reversible, podrías aplicar la función inversa para recuperar los datos originales. Sin embargo, muchas técnicas de enmascaramiento buscan ser irreversibles para mejorar la seguridad.

3. Enmascaramiento con Anonimización de Datos:

- Aquí, la columna sensible se anonimiza al reemplazar los valores con identificadores aleatorios generados mediante la función rand(). La anonimización implica la sustitución de datos originales con datos no identificables o pseudónimos.

- La anonimización implica reemplazar datos con identificadores aleatorios o pseudónimos. **La recuperación** implica mantener una tabla de mapeo que asocia los identificadores aleatorios con los datos originales. La recuperación se realiza mediante la consulta de esta tabla de mapeo.

### Enmascaramiento de Columnas con Funciones de Spark SQL

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sha2

# Inicializa una sesión de Spark
spark = SparkSession.builder.appName("DataMaskingExample").getOrCreate()

# Carga los datos desde una fuente (por ejemplo, CSV)
df = spark.read.csv("path/to/your/data.csv", header=True, inferSchema=True)

# Enmascara una columna usando SHA-256
df_masked = df.withColumn("sensitive_column_masked", sha2("sensitive_column", 256))

# Muestra el DataFrame resultante
df_masked.show()


### Enmascaramiento con Funciones Personalizadas

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Función de enmascaramiento personalizada (por ejemplo, sustitución de caracteres)
def custom_masking_function(value):
    # Implementa tu lógica de enmascaramiento aquí
    return "*****"

# Registra la función como una UDF (User Defined Function)
masking_udf = udf(custom_masking_function, StringType())

# Inicializa una sesión de Spark
spark = SparkSession.builder.appName("CustomMaskingExample").getOrCreate()

# Carga los datos desde una fuente
df = spark.read.csv("path/to/your/data.csv", header=True, inferSchema=True)

# Enmascara una columna usando la función personalizada
df_masked = df.withColumn("sensitive_column_masked", masking_udf("sensitive_column"))

# Muestra el DataFrame resultante
df_masked.show()


### Enmascaramiento con Anonimización de Datos

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand

# Inicializa una sesión de Spark
spark = SparkSession.builder.appName("DataAnonymizationExample").getOrCreate()

# Carga los datos desde una fuente
df = spark.read.csv("path/to/your/data.csv", header=True, inferSchema=True)

# Anonimiza una columna (por ejemplo, reemplaza valores con identificadores aleatorios)
df_anonymized = df.withColumn("sensitive_column_anonymized", (rand() * 1000).cast("int"))

# Muestra el DataFrame resultante
df_anonymized.show()

### Otro ejemplo con withColumn()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Crear una sesión de Spark
spark = SparkSession.builder.appName("ejemplo").getOrCreate()

# Crear un DataFrame de ejemplo
data = [("Juan", "juan@example.com"),
        ("María", "maria@example.com"),
        ("Pedro", "pedro@example.com")]

columns = ["Nombre", "Correo"]
df = spark.createDataFrame(data, columns)

# Enmascarar el nombre de la columna de correo utilizando alias
df_enmascarado = df.withColumn("Correo_enmascarado", col("Correo").alias("Correo"))

# Seleccionar solo las columnas deseadas
df_enmascarado = df_enmascarado.select("Nombre", "Correo_enmascarado")

# Mostrar el DataFrame resultante
df_enmascarado.show()


23/11/18 10:44:41 WARN Utils: Your hostname, CompuDell01 resolves to a loopback address: 127.0.1.1; using 192.168.29.145 instead (on interface eth0)
23/11/18 10:44:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/18 10:44:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+------+------------------+
|Nombre|Correo_enmascarado|
+------+------------------+
|  Juan|  juan@example.com|
| María| maria@example.com|
| Pedro| pedro@example.com|
+------+------------------+



In [3]:
spark.stop()

### <a name="mark_36"></a> SHA-256 (Secure Hash Algorithm 256-bit).
### [Index](#index) 

El algoritmo de hash SHA-256 (Secure Hash Algorithm 256-bit) es una función criptográfica de hash que produce un valor hash de 256 bits, generalmente expresado como una cadena hexadecimal de 64 caracteres. Este algoritmo pertenece a la familia de funciones hash SHA-2 y es ampliamente utilizado en aplicaciones de seguridad y criptografía.

En el contexto de Spark o cualquier otro entorno de procesamiento de datos, SHA-256 a menudo se utiliza para la generación de resúmenes hash de datos sensibles o identificadores. Algunos casos de uso típicos incluyen:

1. **Enmascaramiento de Datos Sensibles:**
   - Se puede aplicar SHA-256 para generar hashes de valores sensibles como direcciones de correo electrónico, números de tarjetas de crédito, etc.
   - Los hashes resultantes se almacenan en lugar de los datos originales, lo que proporciona una capa adicional de seguridad.

2. **Detección de Cambios en Datos:**
   - SHA-256 se utiliza para generar huellas dactilares (hashes) de conjuntos de datos.
   - Si los datos cambian, los nuevos hashes serán diferentes, lo que permite detectar cambios de manera eficiente.

3. **Integridad de Datos:**
   - Para verificar la integridad de los datos almacenados, especialmente en sistemas distribuidos como Spark, donde los datos pueden estar distribuidos en clústeres.

In [5]:
'''Aquí hay un ejemplo simple en PySpark que utiliza SHA-256 para generar un hash de una 
columna en un DataFrame'''

from pyspark.sql import SparkSession
from pyspark.sql.functions import sha2
from pyspark.sql.types import StringType

# Crear una sesión de Spark
spark = SparkSession.builder.appName("SHA256Example").getOrCreate()

# Crear un DataFrame de ejemplo
data = [("John",), ("Doe",), ("Alice",)]
columns = ["Name"]
schema = [StringType()]

df = spark.createDataFrame(data, schema=columns)

# Aplicar SHA-256 a la columna 'Name'
df_hashed = df.withColumn("Name_Hash", sha2(df["Name"], 256))

# Mostrar el DataFrame resultante
df_hashed.show(truncate=False)


'''En este ejemplo, se crea un nuevo DataFrame (`df_hashed`) que contiene una columna adicional 
("Name_Hash") que almacena los valores hash SHA-256 de la columna original "Name". 
Este proceso ayuda a preservar la privacidad y seguridad de los datos originales.'''

+-----+----------------------------------------------------------------+
|Name |Name_Hash                                                       |
+-----+----------------------------------------------------------------+
|John |a8cfcd74832004951b4408cdb0a5dbcd8c7e52d43f7fe244bf720582e05241da|
|Doe  |fd53ef835b15485572a6e82cf470dcb41fd218ae5751ab7531c956a2a6bcd3c7|
|Alice|3bc51062973c458d5a6f2d8d64a023246354ad7e064b1e4e009ec8a0699a3043|
+-----+----------------------------------------------------------------+



'En este ejemplo, se crea un nuevo DataFrame (`df_hashed`) que contiene una columna adicional \n("Name_Hash") que almacena los valores hash SHA-256 de la columna original "Name". \nEste proceso ayuda a preservar la privacidad y seguridad de los datos originales.'

In [6]:
spark.stop()

### <a name="solucion_python_worker_versiones"></a>solucion python worker dif. versiones
### [Index](#index)


### Inconveniente_01 --> RuntimeError: Python in worker has different version 3.10 than that in driver 3.11, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

Solución:


Para solucionar el siguiente error:

RuntimeError: Python in worker has different version 3.10 than that in driver 3.11, PySpark cannot run with different minor versions. Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

Debes asegurarte de que la versión de Python que está utilizando el controlador de PySpark sea la misma que la versión de Python que están utilizando los trabajadores de PySpark. Para hacer esto, puedes configurar las variables de entorno PYSPARK_PYTHON y PYSPARK_DRIVER_PYTHON.

- Puedes configurar estas variables de entorno en el archivo ~/.bashrc o en el archivo spark-env.sh.

- Para configurar las variables de entorno en el archivo ~/.bashrc, agrega las siguientes líneas al final del archivo:

- export PYSPARK_PYTHON=/ruta/a/python3.11

- export PYSPARK_DRIVER_PYTHON=/ruta/a/python3.11

- Para configurar las variables de entorno en el archivo spark-env.sh, agrega las siguientes líneas al final del archivo:

- PYSPARK_PYTHON=/ruta/a/python3.11

- PYSPARK_DRIVER_PYTHON=/ruta/a/python3.11

Una vez que hayas configurado las variables de entorno, guarda los cambios y reinicia tu sesión de terminal.

- Mi elección fue ir por ".bashrc"

![](img_17.png)
**-------------------------------------------------------------------------------------------------------------------------------------------------------**

Con el comando "nano .bashrc"

![](img_18.png)

ctr+x para salir y "yes" para salir y guardar los cambios.
**-------------------------------------------------------------------------------------------------------------------------------------------------------**