In [1]:
import os
from pyspark import SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    sum as _sum,
    avg as _avg,
    udf
)
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
    FloatType,
    Row
)
from schemas import (
    base,
    data_dir,
    get_schemas,
    load_dataframes_from_schemas,
)

# Spark Context

In [2]:
# Start a local Spark context for this notebook and wrap it in an SQLContext,
# which is the handle used below to load dataframes and run SQL.
# NOTE: despite its name, `spark` is a SparkContext, not a SparkSession.
spark = SparkContext(master="local", appName="Replicacion")
sql_context = SQLContext(spark)

21/08/21 19:47:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Load Dataframes

In [3]:
# Build the dataframes through the project's `schemas` helper; the result is
# indexed by table name throughout the rest of the notebook.
dataframes = load_dataframes_from_schemas(sql_context)

In [4]:
# List the names under which the dataframes were loaded.
dataframes.keys()

dict_keys(['deporte', 'deportista', 'evento', 'juegos', 'paises', 'resultados', 'deportista_error'])

# Class - DF and replication

In [5]:
# Short aliases for the tables taking part in the join.
deportista_df = dataframes["deportista"]
resultados_df = dataframes["resultados"]
juegos_df = dataframes["juegos"]
paises_df = dataframes["paises"]
evento_df = dataframes["evento"]
deporte_df = dataframes["deporte"]

# Start from every athlete and left-join results, games, country, event and
# sport, keeping only the columns needed for the medal analysis.
medallas_x_anio = (
    deportista_df
    .join(
        resultados_df,
        deportista_df.deportista_id == resultados_df.deportista_id,
        "left",
    )
    .join(juegos_df, juegos_df.juego_id == resultados_df.juego_id, "left")
    .join(paises_df, paises_df.pais_id == deportista_df.equipo_id, "left")
    .join(evento_df, evento_df.evento_id == resultados_df.evento_id, "left")
    .join(deporte_df, evento_df.deporte_id == deporte_df.deporte_id, "left")
    .select(
        "sigla",
        "anio",
        "medalla",
        evento_df.evento.alias("Nombre subdisciplina"),
        deporte_df.deporte.alias("Nombre disciplina"),
        deportista_df.nombre,
    )
)


In [6]:
# Preview the joined result to check the selected columns.
medallas_x_anio.show()

21/08/21 19:47:09 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: id, sigla
 Schema: pais_id, sigla
Expected: pais_id but found: id
CSV file: file:///home/jovyan/work/files/paises.csv
21/08/21 19:47:09 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , annio
 Schema: juego_id, anio
Expected: juego_id but found: 
CSV file: file:///home/jovyan/work/files/juegos.csv


+-----+----+-------+--------------------+--------------------+--------------------+
|sigla|anio|medalla|Nombre subdisciplina|   Nombre disciplina|              nombre|
+-----+----+-------+--------------------+--------------------+--------------------+
|  CHN|1992|     NA|Basketball Men's ...|          Basketball|           A Dijiang|
|  CHN|2012|     NA|Judo Men's Extra-...|                Judo|            A Lamusi|
|  DEN|1920|     NA|Football Men's Fo...|            Football| Gunnar Nielsen Aaby|
|  SWE|1900|   Gold|Tug-Of-War Men's ...|          Tug-Of-War|Edgar Lindenau Aabye|
|  NED|1994|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1994|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1992|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1992|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1988|     NA|Speed Skating Wom...|       Speed Skating|Christine Jaco

In [7]:
# Count medals per country, year and sub-discipline, skipping rows where no
# medal was won ("NA").
# No sort is applied before grouping: a grouped aggregation does not preserve
# the order of its input, so sorting first only adds work. Order the final
# result explicitly (e.g. `.orderBy("anio")`) if a sorted display is needed.
medallas_x_anio_2 = (
    medallas_x_anio
    .filter(medallas_x_anio.medalla != "NA")
    .groupBy("sigla", "anio", "Nombre subdisciplina")
    .count()
)

In [8]:
# Inspect the column names and types of the aggregated result.
medallas_x_anio_2.printSchema()

root
 |-- sigla: string (nullable = true)
 |-- anio: integer (nullable = true)
 |-- Nombre subdisciplina: string (nullable = true)
 |-- count: long (nullable = false)



In [9]:
# Preview the first five aggregated rows.
medallas_x_anio_2.show(5)

21/08/21 19:47:10 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , annio
 Schema: juego_id, anio
Expected: juego_id but found: 
CSV file: file:///home/jovyan/work/files/juegos.csv
21/08/21 19:47:10 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: id, sigla
 Schema: pais_id, sigla
Expected: pais_id but found: id
CSV file: file:///home/jovyan/work/files/paises.csv
[Stage 11:>                                                         (0 + 1) / 1]

+-----+----+--------------------+-----+
|sigla|anio|Nombre subdisciplina|count|
+-----+----+--------------------+-----+
|  MEX|1984|Wrestling Men's F...|    1|
|  FIN|1960|Cross Country Ski...|    4|
|  CAN|2010|Snowboarding Men'...|    1|
|  YUG|1984|Water Polo Men's ...|   13|
|  RUS|2012|Athletics Women's...|    6|
+-----+----+--------------------+-----+
only showing top 5 rows



                                                                                

In [10]:
# Roll the per-sub-discipline counts up to country and year: total medals and
# the average number of medals per sub-discipline.
resumen_pais_anio = medallas_x_anio_2.groupBy("sigla", "anio").agg(
    _sum("count").alias("Total de medallas"),
    _avg("count").alias("Promedio medallas"),
)
resumen_pais_anio.show()

21/08/21 19:47:12 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: id, sigla
 Schema: pais_id, sigla
Expected: pais_id but found: id
CSV file: file:///home/jovyan/work/files/paises.csv
21/08/21 19:47:12 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , annio
 Schema: juego_id, anio
Expected: juego_id but found: 
CSV file: file:///home/jovyan/work/files/juegos.csv
                                                                                

+-----+----+-----------------+------------------+
|sigla|anio|Total de medallas| Promedio medallas|
+-----+----+-----------------+------------------+
|  USA|2012|              248|2.7252747252747254|
|  FRA|2006|               15|1.6666666666666667|
|  KOR|2010|               18|               1.5|
|  FIN|1988|               38|               3.8|
|  BLR|2000|               15|               1.5|
|  VEN|2012|                1|               1.0|
|  FRA|1948|               77|2.3333333333333335|
|  GBR|2000|               55|1.9642857142857142|
|  FRG|1994|                6|               1.0|
|  JPN|1932|               31|2.5833333333333335|
|  QAT|2012|                2|               1.0|
|  KOR|1972|                1|               1.0|
|  NED|1972|               15|1.3636363636363635|
|  GER|1932|               57|              2.28|
|  NZL|1988|               24|1.8461538461538463|
|  AUS|1972|               20|1.1764705882352942|
|  THA|1988|                1|               1.0|


# SQL

In [11]:
# Expose the dataframes to Spark SQL under their own names.
# `createOrReplaceTempView` replaces the deprecated `registerTempTable` and is
# safe to re-run because it overwrites any view with the same name.
for table_name in ("resultados", "deportista", "paises"):
    dataframes[table_name].createOrReplaceTempView(table_name)

In [12]:
# Sanity check: read the "deportista" temp table back through the SQL interface.
sql_context.sql("""SELECT * FROM deportista""").show()

+-------------+--------------------+------+----+------+-----+---------+
|deportista_id|              nombre|genero|edad|altura| peso|equipo_id|
+-------------+--------------------+------+----+------+-----+---------+
|            1|           A Dijiang|     1|  24|   180| 80.0|      199|
|            2|            A Lamusi|     1|  23|   170| 60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0|  0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0|  0.0|      278|
|            5|Christine Jacoba ...|     2|  21|   185| 82.0|      705|
|            6|     Per Knut Aaland|     1|  31|   188| 75.0|     1096|
|            7|        John Aalberg|     1|  31|   183| 72.0|     1096|
|            8|Cornelia Cor Aalt...|     2|  18|   168|  0.0|      705|
|            9|    Antti Sami Aalto|     1|  26|   186| 96.0|      350|
|           10|Einar Ferdinand E...|     1|  26|     0|  0.0|      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182| 76.5|   

In [13]:
# Medal-winning results with the athlete's team name and country code,
# ordered by country code from Z to A.
medal_winners_query = """
    SELECT medalla, equipo, sigla
    FROM resultados r
        INNER JOIN deportista d
        ON r.deportista_id = d.deportista_id
        INNER JOIN paises p
        ON p.pais_id = d.equipo_id
    WHERE medalla <> 'NA'
    ORDER BY sigla DESC
"""
sql_context.sql(medal_winners_query).show()

21/08/21 19:47:19 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: id, equipo, sigla
 Schema: pais_id, equipo, sigla
Expected: pais_id but found: id
CSV file: file:///home/jovyan/work/files/paises.csv


+-------+--------+-----+
|medalla|  equipo|sigla|
+-------+--------+-----+
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Bronze|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
+-------+--------+-----+
only showing top 20 rows



In [14]:
# Display the active Spark context (`spark` is the SparkContext created above).
spark

- SQL puede llegar a ser más rápido, pero requiere de un mayor poder de procesamiento.
- Para sacar el mayor provecho de Spark, se recomienda usar las funciones nativas.

# UDF

In [15]:
# Preview the "deportista_error" data, where altura/peso contain nulls.
dataframes["deportista_error"].show()

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|  80|      199|
|            2|            A Lamusi|     1|  23|   170|  60|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|  null|null|      273|
|            4|Edgar Lindenau Aabye|     1|  34|  null|null|      278|
|            5|Christine Jacoba ...|     2|  21|   185|  82|      705|
|            6|     Per Knut Aaland|     1|  31|   188|  75|     1096|
|            7|        John Aalberg|     1|  31|   183|  72|     1096|
|            8|"Cornelia ""Cor""...|     2|  18|   168|null|      705|
|            9|    Antti Sami Aalto|     1|  26|   186|  96|      350|
|           10|"Einar Ferdinand ...|     1|  26|  null|null|      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182|76.5|      350|
|     

In [26]:
def convert_integer(value):
    """Parse a digit-only string into an int, defaulting to 0.

    Returns 0 for None, for empty strings and for any text that is not made
    up solely of digits (for example "76.5").
    """
    return int(value) if value and value.isdigit() else 0


def convert_float(value):
    """Parse a numeric string into a float, defaulting to 0.0.

    Returns 0.0 for None, for empty strings and for text that cannot be
    parsed as a number, mirroring how convert_integer treats bad input.
    """
    if not value:
        # The fallback must be the float 0.0, not the int 0: this function
        # backs a UDF declared with FloatType, and PySpark yields null when
        # the returned Python type does not match the declared return type.
        return 0.0
    try:
        return float(value)
    except ValueError:
        # Unparseable text is treated as missing instead of failing the job.
        return 0.0

In [27]:
# Wrap the Python converters as Spark UDFs with explicit return types.
# The converters are passed directly; an extra `lambda z: f(z)` layer adds
# nothing and hides the function name from Spark.
convert_integer_udf = udf(convert_integer, IntegerType())
convert_float_udf = udf(convert_float, FloatType())

# Register the same UDFs by name so they can also be called from SQL queries.
sql_context.udf.register("convert_integer_udf", convert_integer_udf)
sql_context.udf.register("convert_float_udf", convert_float_udf)

21/08/21 20:59:36 WARN SimpleFunctionRegistry: The function convert_integer_udf replaced a previously registered function.
21/08/21 20:59:36 WARN SimpleFunctionRegistry: The function convert_float_udf replaced a previously registered function.


<function __main__.<lambda>(z)>

In [29]:
# Apply the integer-conversion UDF to the raw "altura" column and show the
# cleaned values under the same column name.
altura_limpia = convert_integer_udf("altura").alias("altura")
dataframes["deportista_error"].select(altura_limpia).show()

+------+
|altura|
+------+
|   180|
|   170|
|     0|
|     0|
|   185|
|   188|
|   183|
|   168|
|   186|
|     0|
|   182|
|   172|
|   159|
|   171|
|     0|
|   184|
|   175|
|   189|
|     0|
|   176|
+------+
only showing top 20 rows



Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError


In [None]:
# Stop the Spark context at the end of the notebook.
spark.stop()