# Set Up JAVA y Spark:

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3-scala2.13.tgz
!tar -xvf /content/spark-3.4.0-bin-hadoop3-scala2.13.tgz
!pip install -q findspark
!pip install pyspark

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3-scala2.13"

# Clase 13 / Mod 4

In [5]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField

from pyspark.sql.types import IntegerType, StringType, FloatType
from pyspark.sql.types import Row

# Para ver que operaciones son propias de SQLcontext, cuando tengamos varias versiones de SPARK ejecutandoce.
from pyspark.sql import SQLContext

In [6]:
spark = SparkContext(master='local', appName="DataFrames")
sqlContext = SQLContext(spark)



In [7]:
# Podemos crear DF mediantes dos vías:
# 1. Desde RDD desde CERO.
# 2. Archivo.
path = '/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/'

In [8]:
!head -n 5 /content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/juegos.csv

,nombre_juego,annio,temporada,ciudad
1,1896 Verano,1896,Verano,Athina
2,1900 Verano,1900,Verano,Paris
3,1904 Verano,1904,Verano,St. Louis
4,1906 Verano,1906,Verano,Athina


In [9]:
# Paso_1: Creación del Schema (Esquema de la tabla a crear)

juegoSchema = StructType([
    StructField('juego_id', IntegerType(), False),
    StructField('anio', StringType(), False),
    StructField('temporada', StringType(), False),
    StructField('ciudad', StringType(), False)
])

# Paso_2: Crea DataFrame con la lectura del Schema
juegoDF = sqlContext.read.schema(juegoSchema) \
            .option('header', 'true').csv(path+'juegos.csv')

In [10]:
juegoDF.show(4)

+--------+-----------+---------+------+
|juego_id|       anio|temporada|ciudad|
+--------+-----------+---------+------+
|       1|1896 Verano|     1896|Verano|
|       2|1900 Verano|     1900|Verano|
|       3|1904 Verano|     1904|Verano|
|       4|1906 Verano|     1906|Verano|
+--------+-----------+---------+------+
only showing top 4 rows



In [11]:
spark


# Clase 14 / Mod 4

In [12]:
# Creación de un DF a partir de un RDD:
deportistaOlimpicoRDD = spark.textFile(path+'deportista.csv') \
    .map(lambda line: line.split(","))

deportistaOlimpicoRDD2 = spark.textFile(path+'deportista2.csv') \
    .map(lambda line: line.split(","))

deportistaOlimpicoRDD = deportistaOlimpicoRDD.union(deportistaOlimpicoRDD2)

In [13]:
deportistaOlimpicoRDD.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [14]:
# Retirar encabezado:
def eliminarEncabezado(indice, iterador):
    return iter(list(iterador)[1:])

In [15]:
# Otra forma de eliminar el encabezado:
def without_header(rdd):
    header = rdd.first()
    rdd = rdd.filter( lambda l: l != header)
    return rdd

In [16]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.\
        mapPartitionsWithIndex(eliminarEncabezado)

In [17]:
deportistaOlimpicoRDD.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

In [18]:
# Asignar data type a cada columna del RDD:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l: (
    int(l[0]),
    l[1], # No se transforma (Sigue siendo de tipo String)
    int(l[2]),
    int(l[3]),
    int(l[4]),
    float(l[5]),
    int(l[6])
))

In [19]:
schema = StructType([
    StructField('deportista_id', IntegerType(), False),
    StructField('nombre', StringType(), False),
    StructField('genero', IntegerType(), False),
    StructField('edad', IntegerType(), False),
    StructField('altura', IntegerType(), False),
    StructField('peso', FloatType(), False),
    StructField('equipo_id', IntegerType(), False)
])

In [20]:
# SQL context, nos permite crear un DF desde un RDD:
deportistaDF = sqlContext.createDataFrame(deportistaOlimpicoRDD, schema)

In [21]:
deportistaDF.show(5)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
|            5|Christine Jacoba ...|     2|  21|   185|82.0|      705|
+-------------+--------------------+------+----+------+----+---------+
only showing top 5 rows



# Clase 15 / Mod 3

## Reto crear los dataframes restantes a partir de los RDD creados en anteriores clases:

In [22]:
# Paises/Equipos
equiposOlimpicosRDD = spark.textFile(path+'paises.csv') \
    .map(lambda line: line.split(",")) \
    .mapPartitionsWithIndex(eliminarEncabezado) \
    .map(lambda l: (
    int(l[0]),
    l[1],
    l[2]
    ))

equiposSchema = StructType([
    StructField('id', IntegerType(), False),
    StructField('equipo', StringType(), False),
    StructField('sigla', StringType(), False),
])

paisesDF = sqlContext.createDataFrame(equiposOlimpicosRDD, equiposSchema)

paisesDF.show(5)

+---+--------------------+-----+
| id|              equipo|sigla|
+---+--------------------+-----+
|  1|         30. Februar|  AUT|
|  2|A North American ...|  MEX|
|  3|           Acipactli|  MEX|
|  4|             Acturus|  ARG|
|  5|         Afghanistan|  AFG|
+---+--------------------+-----+
only showing top 5 rows



In [23]:
# Deportes
deporteSchema = StructType([
    StructField('deporte_id', IntegerType(), False),
    StructField('deporte', StringType(), False)
])

# Paso 2 Crea DataFrame con la lectura del Schema
deportesDF = sqlContext.read.schema(deporteSchema) \
            .option('header', 'true').csv(path+'deporte.csv')

deportesDF.show(5)

+----------+-------------+
|deporte_id|      deporte|
+----------+-------------+
|         1|   Basketball|
|         2|         Judo|
|         3|     Football|
|         4|   Tug-Of-War|
|         5|Speed Skating|
+----------+-------------+
only showing top 5 rows



In [24]:
# Evento
eventoSchema = StructType([
    StructField('evento_id', IntegerType(), False),
    StructField('nombre', StringType(), False),
    StructField('deporte_id', IntegerType(), False),

])

# Paso 2 Crea DataFrame con la lectura del Schema
deportesOlimpicosDF = sqlContext.read.schema(eventoSchema) \
            .option('header', 'true').csv(path+'evento.csv')

deportesOlimpicosDF.show(5)

+---------+--------------------+----------+
|evento_id|              nombre|deporte_id|
+---------+--------------------+----------+
|        1|Basketball Men's ...|         1|
|        2|Judo Men's Extra-...|         2|
|        3|Football Men's Fo...|         3|
|        4|Tug-Of-War Men's ...|         4|
|        5|Speed Skating Wom...|         5|
+---------+--------------------+----------+
only showing top 5 rows



In [25]:
# Resultados
resultadosSchema = StructType([
    StructField('resultado_id', IntegerType(), False),
    StructField('medalla', StringType(), False),
    StructField('deportista_id', IntegerType(), False),
    StructField('juego_id', IntegerType(), False),
    StructField('evento_id', IntegerType(), False),

])

# Paso 2 Crea DataFrame con la lectura del Schema
resultadoDF = sqlContext.read.schema(resultadosSchema) \
            .option('header', 'true').csv(path+'resultados.csv')

resultadoDF.show(5)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           1|     NA|            1|      39|        1|
|           2|     NA|            2|      49|        2|
|           3|     NA|            3|       7|        3|
|           4|   Gold|            4|       2|        4|
|           5|     NA|            5|      36|        5|
+------------+-------+-------------+--------+---------+
only showing top 5 rows



In [26]:
# juegos
juegoSchema = StructType([
    StructField('juego_id', IntegerType(), False),
    StructField('anio', StringType(), False),
    StructField('temporada', StringType(), False),
    StructField('ciudad', StringType(), False)
])

# Paso 2 Crea DataFrame con la lectura del Schema
juegoDF = sqlContext.read.schema(juegoSchema) \
            .option('header', 'true').csv(path+'juegos.csv')

juegoDF.show(5)

+--------+-----------+---------+------+
|juego_id|       anio|temporada|ciudad|
+--------+-----------+---------+------+
|       1|1896 Verano|     1896|Verano|
|       2|1900 Verano|     1900|Verano|
|       3|1904 Verano|     1904|Verano|
|       4|1906 Verano|     1906|Verano|
|       5|1908 Verano|     1908|Verano|
+--------+-----------+---------+------+
only showing top 5 rows



## Operaciones claves sobre DF:

In [27]:
# Visualizar el schema de X DF:
deportesDF.printSchema()

root
 |-- deporte_id: integer (nullable = true)
 |-- deporte: string (nullable = true)



In [28]:
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- genero: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- altura: integer (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [29]:
# Renombrado de columnas:
# Los elementos de spark son inmutables, por lo tanto los debemos SOBRE ESCRIBIR.
deportistaOlimpicoDF = deportistaDF \
    .withColumnRenamed("genero", 'sexo') \
    .drop('altura')

In [30]:
deportistaOlimpicoDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- sexo: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [31]:
# Usando consultas SQL (SELECT)
from pyspark.sql.functions import *

deportistaOlimpicoDF = deportistaOlimpicoDF.select("deportista_id", "nombre",
                           col("edad").alias("edadAlJugar"), # Función col genera una lista en vivo que tiene todos los valores de la columna y hacer operaciones sobre la X columna.
                           "equipo_id"
                          )

In [32]:
deportistaOlimpicoDF.show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



In [33]:
# Usando filer:
deportistaOlimpicoDF = deportistaOlimpicoDF.filter(
                        (deportistaOlimpicoDF.edadAlJugar != 0) # Se hace funciones anidadas (condicionales con & y //)
                    )

In [34]:
deportistaOlimpicoDF.sort("edadAlJugar").show()

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|        71691|  Dimitrios Loundras|         10|      333|
|        22411|Magdalena Cecilia...|         11|      413|
|        70616|          Liu Luyang|         11|      199|
|        37333|Carlos Bienvenido...|         11|      982|
|        76675|   Marcelle Matthews|         11|      967|
|        40129|    Luigina Giavotti|         11|      507|
|       118925|Megan Olwen Deven...|         11|      413|
|        47618|Sonja Henie Toppi...|         11|      742|
|       126307|        Liana Vicens|         11|      825|
|        51268|      Beatrice Hutiu|         11|      861|
|        52070|        Etsuko Inada|         11|      514|
|        72854|      Licia Macchini|         12|      507|
|         5291|Marcia Arriaga La...|         12|      656|
|        74712|     Carla Marangoni|         12|      50

# Clase 16 / Mod 4

## Agrupaciones y operaciones join sobre DF:

In [35]:
# Visualizamos los Schemas de los DataFrames
deportistaOlimpicoDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- edadAlJugar: integer (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [36]:
resultadoDF.printSchema()

root
 |-- resultado_id: integer (nullable = true)
 |-- medalla: string (nullable = true)
 |-- deportista_id: integer (nullable = true)
 |-- juego_id: integer (nullable = true)
 |-- evento_id: integer (nullable = true)



In [37]:
juegoDF.printSchema()

root
 |-- juego_id: integer (nullable = true)
 |-- anio: string (nullable = true)
 |-- temporada: string (nullable = true)
 |-- ciudad: string (nullable = true)



In [38]:
deportesOlimpicosDF.printSchema()

root
 |-- evento_id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- deporte_id: integer (nullable = true)



In [39]:
paisesDF.printSchema()

root
 |-- id: integer (nullable = false)
 |-- equipo: string (nullable = false)
 |-- sigla: string (nullable = false)



In [40]:
# JOINS
deportistaOlimpicoDF \
    .join(
    resultadoDF,
    deportistaOlimpicoDF.deportista_id == resultadoDF.deportista_id,
    "left"
    ) \
    .join(
    juegoDF,
    juegoDF.juego_id == resultadoDF.juego_id,
    "left"
    ) \
    .join(
    deportesOlimpicosDF,
    deportesOlimpicosDF.evento_id == resultadoDF.evento_id,
    "left"
    ) \
    .select(deportistaOlimpicoDF.nombre,
            "edadAlJugar",
            "medalla",
            col("anio").alias("Anio de Juego"),
            deportesOlimpicosDF.nombre.alias("Nombre de disciplina")
           ).show()

+--------------------+-----------+-------+-------------+--------------------+
|              nombre|edadAlJugar|medalla|Anio de Juego|Nombre de disciplina|
+--------------------+-----------+-------+-------------+--------------------+
|           A Dijiang|         24|     NA|  1992 Verano|Basketball Men's ...|
|            A Lamusi|         23|     NA|  2012 Verano|Judo Men's Extra-...|
| Gunnar Nielsen Aaby|         24|     NA|  1920 Verano|Football Men's Fo...|
|Edgar Lindenau Aabye|         34|   Gold|  1900 Verano|Tug-Of-War Men's ...|
|Christine Jacoba ...|         21|     NA|1994 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|         21|     NA|1994 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|         21|     NA|1992 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|         21|     NA|1992 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|         21|     NA|1988 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|         21|     NA|1988 Invierno|Speed Sk

# Clase 17 / Mod 4

## RETO:
Hacer un join con todas las medallas ganadoras unidas con el pais y el equipo al que pertenecen estas medallas.

In [41]:
resultadoDF \
    .join(
    deportistaOlimpicoDF,
    resultadoDF.deportista_id == deportistaOlimpicoDF.deportista_id,
    "left"
    ) \
    .join(
    paisesDF,
    deportistaOlimpicoDF.equipo_id == paisesDF.id,
    "left"
    ) \
    .select("medalla",
            paisesDF.equipo,
            paisesDF.sigla
    ) \
    .where(resultadoDF.medalla != "NA") \
    .sort(resultadoDF.medalla.desc()).show(20)

+-------+-------------+-----+
|medalla|       equipo|sigla|
+-------+-------------+-----+
| Silver| Soviet Union|  URS|
| Silver|United States|  USA|
| Silver|United States|  USA|
| Silver|       Norway|  NOR|
| Silver|       France|  FRA|
| Silver|       Brazil|  BRA|
| Silver|      Senegal|  SEN|
| Silver|   Yugoslavia|  YUG|
| Silver|      Germany|  GER|
| Silver|      Germany|  GER|
| Silver|       France|  FRA|
| Silver|         null| null|
| Silver|       France|  FRA|
| Silver|      Austria|  AUT|
| Silver|       France|  FRA|
| Silver|       Brazil|  BRA|
| Silver|       France|  FRA|
| Silver|      Nigeria|  NGR|
| Silver|      Austria|  AUT|
| Silver|       Sweden|  SWE|
+-------+-------------+-----+
only showing top 20 rows



# Clase 18 / Mod 4

## Funciones de agrupación:

In [42]:
medallistaXAnio = deportistaOlimpicoDF \
    .join(
        resultadoDF,
        deportistaOlimpicoDF.deportista_id == resultadoDF.deportista_id,
        "left"
    ) \
    .join(
        juegoDF,
        juegoDF.juego_id == resultadoDF.juego_id,
        "left"
    ) \
    .join(
        paisesDF,
        deportistaOlimpicoDF.equipo_id == paisesDF.id,
        "left"
    ) \
    .join(
        deportesOlimpicosDF,
        deportesOlimpicosDF.evento_id == resultadoDF.evento_id,
        "left"
    ) \
    .join(
        deportesDF,
        deportesOlimpicosDF.deporte_id == deportesDF.deporte_id,
        "left"
    ) \
    .select(
        "sigla",
        "anio",
        "medalla",
        deportesOlimpicosDF.nombre.alias("Nombre subdisciplina"),
        deportesDF.deporte.alias("Nombre disciplina"),
        deportistaOlimpicoDF.nombre
    )

In [43]:
medallistaXAnio.show(10)

+-----+-------------+-------+--------------------+-----------------+--------------------+
|sigla|         anio|medalla|Nombre subdisciplina|Nombre disciplina|              nombre|
+-----+-------------+-------+--------------------+-----------------+--------------------+
|  CHN|  1992 Verano|     NA|Basketball Men's ...|       Basketball|           A Dijiang|
|  CHN|  2012 Verano|     NA|Judo Men's Extra-...|             Judo|            A Lamusi|
|  DEN|  1920 Verano|     NA|Football Men's Fo...|         Football| Gunnar Nielsen Aaby|
|  SWE|  1900 Verano|   Gold|Tug-Of-War Men's ...|       Tug-Of-War|Edgar Lindenau Aabye|
|  NED|1994 Invierno|     NA|Speed Skating Wom...|    Speed Skating|Christine Jacoba ...|
|  NED|1994 Invierno|     NA|Speed Skating Wom...|    Speed Skating|Christine Jacoba ...|
|  NED|1992 Invierno|     NA|Speed Skating Wom...|    Speed Skating|Christine Jacoba ...|
|  NED|1992 Invierno|     NA|Speed Skating Wom...|    Speed Skating|Christine Jacoba ...|
|  NED|198

In [44]:
# descartamos deportistas sin medalla
medallistaXAnio2 = medallistaXAnio.filter(resultadoDF.medalla != "NA") \
                    .sort("anio") \
                    .groupBy("sigla", "anio", "Nombre subdisciplina") \
                    .count()

medallistaXAnio2.show()

+-----+-------------+--------------------+-----+
|sigla|         anio|Nombre subdisciplina|count|
+-----+-------------+--------------------+-----+
|  KOR|2006 Invierno|Short Track Speed...|    2|
|  RUS|  2004 Verano|Wrestling Men's S...|    1|
|  USA|  2004 Verano|Synchronized Swim...|    8|
|  RUS|  2016 Verano|Shooting Women's ...|    1|
|  RUS|  2008 Verano|Wrestling Men's L...|    1|
|  SWE|  1936 Verano|Art Competitions ...|    1|
|  EUN|2002 Invierno|Cross Country Ski...|    1|
|  GBR|  1956 Verano|Sailing Mixed 5.5...|    4|
|  USA|  1988 Verano|Volleyball Men's ...|   12|
|  FRA|  1932 Verano|Cycling Men's Tea...|    4|
|  USA|  1996 Verano|Equestrianism Mix...|    4|
|  USA|  1996 Verano|Athletics Men's H...|    1|
|  GBR|  2008 Verano|Boxing Men's Midd...|    1|
|  FRA|  1984 Verano|Fencing Men's Sab...|    5|
|  USA|  2012 Verano|Gymnastics Women'...|    1|
|  FRA|  2004 Verano|Rowing Men's Ligh...|    2|
|  NOR|1932 Invierno|Speed Skating Men...|    1|
|  FRG|  1992 Verano

In [45]:
medallistaXAnio2.printSchema()

root
 |-- sigla: string (nullable = true)
 |-- anio: string (nullable = true)
 |-- Nombre subdisciplina: string (nullable = true)
 |-- count: long (nullable = false)



In [46]:
medallistaXAnio2.groupBy("Sigla", "anio") \
    .agg(sum("count").alias("Total de medallas"), \
        avg("count").alias("Medallas promedio")).show()

# .agg() o agregate es la forma recomendada por la documentacion para realizar las AGRUPACIONES.

+-----+-------------+-----------------+------------------+
|Sigla|         anio|Total de medallas| Medallas promedio|
+-----+-------------+-----------------+------------------+
|  SWE|  1976 Verano|               10|               2.0|
|  MGL|  2008 Verano|                5|              1.25|
|  SUI|2014 Invierno|               29|3.2222222222222223|
|  ETH|  2004 Verano|                7|              1.75|
|  BEL|  2000 Verano|                7|               1.4|
|  AUT|  1928 Verano|                5|              1.25|
|  SYR|  1984 Verano|                1|               1.0|
|  NED|1992 Invierno|                4|1.3333333333333333|
|  MAS|  2012 Verano|                2|               1.0|
|  ITA|  1996 Verano|               69| 2.225806451612903|
|  THA|  2008 Verano|                4|               1.0|
|  URS|1984 Invierno|               56|               2.8|
|  DEN|  1896 Verano|                6|               1.0|
|  GRN|  2016 Verano|                1|               1.

# Clase 19 / Mod 4

In [47]:
## Forma de trabajar en Spark de forma nativa SQL:
# registro de DataFrames como tablas SQL:
resultadoDF.registerTempTable("resultado")
deportistaOlimpicoDF.registerTempTable("deportista")
paisesDF.registerTempTable("paises")

# En terminos de procesamiento, trabajar las tablas en forma NATIVA SQL, representa mayor recurso de computación gastado para llevar a cabo
# las operaciones.
# El verdadero poder de Spark se nota con las sentencias nativas de Spark.



In [48]:
sqlContext.sql("SELECT * FROM deportista").show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



In [49]:
sqlContext.sql("""
                SELECT medalla, equipo, sigla FROM resultado as r
                JOIN deportista as d
                ON r.deportista_id = d.deportista_id
                JOIN paises as p
                ON p.id = d.equipo_id
                WHERE r.medalla <> "NA"
                ORDER BY p.sigla DESC
                """).show(15)

+-------+--------+-----+
|medalla|  equipo|sigla|
+-------+--------+-----+
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Bronze|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
+-------+--------+-----+
only showing top 15 rows



# Clase 20 / Mod 4

In [50]:
!ls /content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/

deporte.csv	 deportistaError.csv  modelo_relacional.jpg
deportista2.csv  evento.csv	      paises.csv
deportista.csv	 juegos.csv	      resultados.csv


In [51]:
!head -n 5 /content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/deportistaError.csv

deportista_id,nombre,genero,edad,altura,peso,equipo_id
1,A Dijiang,1,24,180,80,199
2,A Lamusi,1,23,170,60,199
3,Gunnar Nielsen Aaby,1,24,,,273
4,Edgar Lindenau Aabye,1,34,,,278


In [52]:
from pyspark import SparkContext

deportistaError = spark.textFile(path+"deportistaError.csv") \
  .map( lambda l: l.split(","))

In [53]:
deportistaError = deportistaError.mapPartitionsWithIndex(eliminarEncabezado)

In [54]:
deportistaError.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '', '', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '', '', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

In [55]:
deportistaError = deportistaError.map(lambda l : (
    l[0],
    l[1],
    l[2],
    l[3],
    l[4],
    l[5],
    l[6]
))

In [56]:
deportistaError.take(5)

[('1', 'A Dijiang', '1', '24', '180', '80', '199'),
 ('2', 'A Lamusi', '1', '23', '170', '60', '199'),
 ('3', 'Gunnar Nielsen Aaby', '1', '24', '', '', '273'),
 ('4', 'Edgar Lindenau Aabye', '1', '34', '', '', '278'),
 ('5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705')]

In [57]:
DeportistaError_schema = StructType([
    StructField("deportista_id", StringType(), False),
    StructField("nombre", StringType(), False),
    StructField("genero", StringType(), False),
    StructField("edad", StringType(), False),
    StructField("altura", StringType(), False),
    StructField("peso", StringType(), False),
    StructField("equipo_id", StringType(), False)
])

In [58]:
deportistaErrorDF = sqlContext.createDataFrame(deportistaError, schema=DeportistaError_schema)

In [59]:
deportistaErrorDF.show()
# Si analizamos la columna altura nosotros queremos que sean enteros y en ausencia del valor deberia aparecer
# un null, aqui entran en el juego las UDF, las cuales trabajaran de forma nativa

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|  80|      199|
|            2|            A Lamusi|     1|  23|   170|  60|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|      |    |      273|
|            4|Edgar Lindenau Aabye|     1|  34|      |    |      278|
|            5|Christine Jacoba ...|     2|  21|   185|  82|      705|
|            6|     Per Knut Aaland|     1|  31|   188|  75|     1096|
|            7|        John Aalberg|     1|  31|   183|  72|     1096|
|            8|"Cornelia ""Cor""...|     2|  18|   168|    |      705|
|            9|    Antti Sami Aalto|     1|  26|   186|  96|      350|
|           10|"Einar Ferdinand ...|     1|  26|      |    |      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182|76.5|      350|
|     

In [60]:
from pyspark.sql.functions import udf

# Crear función para la UDF:
# Si NO esta vacio poner Num.Entero y si lo esta NONE.
def conversionEnteros(valor):
    try:
        return int(valor) if len(valor) > 0 else 0
    except TypeError as e:
        return 0

# Crear la función como UDF:
# La función conversionEnteros pasara el valor a un IntegerType.
conversionEnteros_udf = udf(lambda z: conversionEnteros(z), IntegerType())

# Darla formalmente de ALTA:
# Nombre con la cual Spark va a procesar la función
sqlContext.udf.register("conversionEnteros_udf", conversionEnteros_udf)

deportistaErrorDF.select(conversionEnteros_udf("altura") \
                         .alias("alturaUDF")).show()

+---------+
|alturaUDF|
+---------+
|      180|
|      170|
|        0|
|        0|
|      185|
|      188|
|      183|
|      168|
|      186|
|        0|
|      182|
|      172|
|      159|
|      171|
|        0|
|      184|
|      175|
|      189|
|        0|
|      176|
+---------+
only showing top 20 rows



# Clase 23 - 24 / Mod 5

In [61]:
from pyspark.storagelevel import StorageLevel

In [62]:
medallistaXAnio.is_cached

False

### Cada vez que necesitamos el recurso, Spark necesita "incovarlo" de nuevo, pero podemos indicarle a spark que lo guarde en cache.

In [63]:
# Creamos la persisttencia
medallistaXAnio.rdd.cache()

MapPartitionsRDD[204] at javaToPython at NativeMethodAccessorImpl.java:0

In [64]:
# Saber el tipo de persistencia y nivel de replicacion
medallistaXAnio.rdd.getStorageLevel()

StorageLevel(False, True, False, False, 1)

In [65]:
# eliminando la persistencia
medallistaXAnio.rdd.unpersist()

MapPartitionsRDD[204] at javaToPython at NativeMethodAccessorImpl.java:0

In [66]:
# Usando persistencia MEMORY AND DISK con replicacion 2
medallistaXAnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

MapPartitionsRDD[204] at javaToPython at NativeMethodAccessorImpl.java:0

In [71]:
# Saber el tipo de persistencia y nivel de replicacion
medallistaXAnio.rdd.getStorageLevel()

StorageLevel(True, True, False, False, 3)

In [68]:
# eliminando la persistencia
medallistaXAnio.rdd.unpersist()

MapPartitionsRDD[204] at javaToPython at NativeMethodAccessorImpl.java:0

In [69]:
# Definiendo un StorageLevel custom
StorageLevel.MEMORY_AND_DISK_3 = StorageLevel(True, True, False, False, 3)

In [70]:
# Persistimos el DataFrame
medallistaXAnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_3)

MapPartitionsRDD[204] at javaToPython at NativeMethodAccessorImpl.java:0

In [72]:
# Saber el tipo de persistencia y nivel de replicacion
medallistaXAnio.rdd.getStorageLevel()

StorageLevel(True, True, False, False, 3)

### Particionado de datos:

In [74]:
from pyspark.sql import SparkSession
# Creación de la sesión y cambiando el puerto de conexión:
spark = SparkSession.builder.appName("Particionado") \
    .master("local[5]").getOrCreate()

In [76]:
df = spark.range(0,20)
df.rdd.getNumPartitions()

1

In [77]:
# El primer parametro es el set de datos que generamos (rango entre 0 y 20) y la replicación/partición:
rdd1 = spark.sparkContext.parallelize((0,20), 10)
rdd1.getNumPartitions()

10

In [78]:
rddDesdeArchivo = spark \
    .sparkContext \
    .textFile(path+"deporte.csv", 10)

In [79]:
rddDesdeArchivo.getNumPartitions()

10

In [None]:
rddDesdeArchivo.saveAsTextFile("content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark")
# SUCCESS es un archivo tipo log que si no hubo error siempre debe venir vacio

In [88]:
#  Cargando los archivos particionados en un RDD
# * para cargar todos los archivos que estan en una carpeta
rdd = spark.sparkContext.wholeTextFiles('/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/*')

In [107]:
rdd.take(4)

[('file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/part-00000',
  'deporte_id,deporte\n1,Basketball\n2,Judo\n3,Football\n4,Tug-Of-War\n5,Speed Skating\n6,Cross Country Skiing\n'),
 ('file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/part-00001',
  '7,Athletics\n8,Ice Hockey\n9,Swimming\n10,Badminton\n11,Sailing\n12,Biathlon\n13,Gymnastics\n14,Art Competitions\n'),
 ('file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/part-00002',
  '15,Alpine Skiing\n16,Handball\n17,Weightlifting\n18,Wrestling\n19,Luge\n20,Water Polo\n'),
 ('file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/part-00003',
  '21,Hockey\n22,Rowing\n23,Bobsleigh\n24,Fencing\n25,Equestrianism\n26,Shooting\n27,Boxing\n28,Taekwondo\n')]

In [108]:
# forma sencilla cargar RDD:
lista = rdd.mapValues(lambda x: x.split()).collect() # Es necesario hacer un collet.

In [109]:
lista

[('file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/part-00000',
  ['deporte_id,deporte',
   '1,Basketball',
   '2,Judo',
   '3,Football',
   '4,Tug-Of-War',
   '5,Speed',
   'Skating',
   '6,Cross',
   'Country',
   'Skiing']),
 ('file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/part-00001',
  ['7,Athletics',
   '8,Ice',
   'Hockey',
   '9,Swimming',
   '10,Badminton',
   '11,Sailing',
   '12,Biathlon',
   '13,Gymnastics',
   '14,Art',
   'Competitions']),
 ('file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/part-00002',
  ['15,Alpine',
   'Skiing',
   '16,Handball',
   '17,Weightlifting',
   '18,Wrestling',
   '19,Luge',
   '20,Water',
   'Polo']),
 ('file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/part-00003',
  ['21,Hockey',
   '22,Rowing',
   '23,Bobsleigh',
   '24,Fencing',
 

In [110]:
# Hacemos el sort para que el registro numero 0000 quede de primero, ya que es el que contiene
# Encabezado de la tabla:
lista = [l[0] for l in lista]
lista.sort()

In [111]:
lista

['file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/part-00000',
 'file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/part-00001',
 'file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/part-00002',
 'file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/part-00003',
 'file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/part-00004',
 'file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/part-00005',
 'file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/part-00006',
 'file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark/Datos/files/Exportación Spark/part-00007',
 'file:/content/content/drive/MyDrive/Personal/Educación/Platzi/PySpark

In [112]:
# Particionado de 10 = len(lista)
rdd2 = spark \
    .sparkContext \
    .textFile(','.join(lista), len(lista) ).map(lambda l: l.split(","))

In [113]:
rdd2.take(4)

[['deporte_id', 'deporte'],
 ['1', 'Basketball'],
 ['2', 'Judo'],
 ['3', 'Football']]