In [9]:
# Importando las librerias para el ejercicio.
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType, FloatType
from pyspark.sql.types import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import *

In [10]:
# Creando el contexto que funciona como punto de conexion para todo spark.
spark = SparkContext(master="local", appName="DataFrames")
sqlContext = SQLContext(spark)

In [11]:
# Creando una variable que guarde la ruta de los archivos de datos.
#path = "/home/jovyan/work/files/"
path = "/Users/giova/OneDrive/Escritorio/Curses/platzi/fundamentos-spark-big-data/files/"

In [12]:
# Consultando la ruta para ubicar los archivos.
#!ls /home/jovyan/work/files

In [13]:
# Creando Schema para los campos del RDD.
Schema_juegosRDD = StructType([
    StructField("juego_id", IntegerType(),False),
    StructField("nombre_juego", StringType(),False),
    StructField("anio", IntegerType(),False),
    StructField("temporada", StringType(),False),
    StructField("ciudad", StringType(),False)
])


# Indicando lectura para cargar el archivo.
juegoDF = sqlContext.read.schema(Schema_juegosRDD).option("header","true").csv(path + "juegos.csv")

# Visualizando el dataframe con show().
juegoDF.show(4)

+--------+------------+----+---------+---------+
|juego_id|nombre_juego|anio|temporada|   ciudad|
+--------+------------+----+---------+---------+
|       1| 1896 Verano|1896|   Verano|   Athina|
|       2| 1900 Verano|1900|   Verano|    Paris|
|       3| 1904 Verano|1904|   Verano|St. Louis|
|       4| 1906 Verano|1906|   Verano|   Athina|
+--------+------------+----+---------+---------+
only showing top 4 rows



In [14]:
# Creando DataFrames de archivos .data
autoDataRDD = spark.textFile(path+"auto-mpg.data").map(lambda l : l.replace("\\s",""))
imports85RDD = spark.textFile(path+"imports-85.data").map(lambda l : l.replace("\\s",""))
synthetic_controlRDD = spark.textFile(path+"synthetic_control.data").map(lambda l : l.replace("\\s",""))

In [15]:
#Consultando el schema de un Dataframe.
juegoDF.printSchema()

root
 |-- juego_id: integer (nullable = true)
 |-- nombre_juego: string (nullable = true)
 |-- anio: integer (nullable = true)
 |-- temporada: string (nullable = true)
 |-- ciudad: string (nullable = true)



In [16]:
# Creando funcion para eliminar encabezados
def eliminaEncabezado(indice, iterador):
    return iter(list(iterador)[1:])

In [17]:
# Creando el Dataframes a partir de los RDD correspondiente al archivo deportista.csv.
deportistaOlimpicoRDD = spark.textFile(path+"deportista.csv").map(lambda l : l.split(","))

# Pasando El RDD a la funcion creada para eliminar el encabezado.
deportistaOlimpicoRDD = deportistaOlimpicoRDD.mapPartitionsWithIndex(eliminaEncabezado)

# Transformando los valores del RDD.
deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l : (
int(l[0]),
l[1],
int(l[2]),
int(l[3]),
int(l[4]),
float(l[5]),
int(l[6])
))

# Creando el Schema para los campos del RDD
schemadeportistaOlimpicoRDD = StructType([
    StructField("deportista_id",IntegerType(),False),
    StructField("nombre",StringType(),False),
    StructField("genero",IntegerType(),False),
    StructField("edad",IntegerType(),False),
    StructField("altura",IntegerType(),False),
    StructField("peso",FloatType(),False),
    StructField("equipo_id",IntegerType(),False)
])

# Visualizando la transformación del Dataframe deportistaDF.
#deportistaDF = sqlContext.createDataFrame(deportistaOlimpicoRDD,schemadeportistaOlimpicoRDD)
deportistaOlimpicoDF = sqlContext.read.schema(schemadeportistaOlimpicoRDD).option("header","true").csv(path + "deportista.csv")

# Visualizando el dataframe con show().
deportistaOlimpicoDF.show(4)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
+-------------+--------------------+------+----+------+----+---------+
only showing top 4 rows



In [18]:
# Creando el Dataframes a partir de los RDD correspondiente al archivo data.csv
dataRDD = spark.textFile(path+"data.csv").map(lambda l : l.split(","))

# Pasando El RDD a la funcion creada para eliminar el encabezado.
dataRDD = dataRDD.mapPartitionsWithIndex(eliminaEncabezado)

# Transformando los valores del RDD.
dataRDD = dataRDD.map(lambda l : (
l[0],
l[1],
int(l[2])
))

# Creando Schema para los campos del RDD.
schemadataRDD = StructType([
    StructField("Estado",StringType(),False),
    StructField("color",StringType(),False),
    StructField("contador",IntegerType(),False)
])

# Visualizando la transformación del Dataframe.
#dataDF = sqlContext.createDataFrame(dataRDD,schemadataRDD)
dataDF = sqlContext.read.schema(schemadataRDD).option("header","true").csv(path + "data.csv")

In [19]:
dataDF.show(4)

+------+-----+--------+
|Estado|color|contador|
+------+-----+--------+
|    TX|  Red|      20|
|    NV| Blue|      66|
|    CO| Blue|      79|
|    OR| Blue|      71|
+------+-----+--------+
only showing top 4 rows



In [20]:
# Creando el Dataframes a partir de los RDD correspondiente al archivo deporte.csv
deporteRDD = spark.textFile(path+"deporte.csv").map(lambda l : l.split(","))

# Pasando El RDD a la funcion creada para eliminar el encabezado.
deporteRDD = deporteRDD.mapPartitionsWithIndex(eliminaEncabezado)

# Transformando los valores del RDD.
deporteRDD = deporteRDD.map(lambda l : (
int(l[0]),
l[1]
))

# Creando Schema para los campos del RDD.
schema_deporteRDD = StructType([
    StructField("deporte_Id",IntegerType(),False),
    StructField("deporte",StringType(),False)
])

# Visualizando la transformación del Dataframe.
#deporteDF = sqlContext.createDataFrame(deporteRDD,schema_deporteRDD)
deporteDF = sqlContext.read.schema(schema_deporteRDD).option("header","true").csv(path + "deporte.csv")

In [21]:
deporteDF.show(4)

+----------+----------+
|deporte_Id|   deporte|
+----------+----------+
|         1|Basketball|
|         2|      Judo|
|         3|  Football|
|         4|Tug-Of-War|
+----------+----------+
only showing top 4 rows



In [22]:
# Creando el Dataframes a partir de los RDD correspondiente al archivo deportista2.csv
deportista2RDD = spark.textFile(path+"deportista2.csv").map(lambda l : l.split(","))

# Pasando El RDD a la funcion creada para eliminar el encabezado.
deportista2RDD = deportista2RDD.mapPartitionsWithIndex(eliminaEncabezado)

# Transformando los valores del RDD.
deportista2RDD = deportista2RDD.map(lambda l : (
int(l[0]),
l[1],
int(l[2]),
int(l[3]),
int(l[4]),
float(l[5]),
int(l[6])
))

# Creando Schema para los campos del RDD.
schema_deportista2RDD = StructType([
    StructField("deportista_id",IntegerType(),False),
    StructField("nombre",StringType(),False),
    StructField("genero",IntegerType(),False),
    StructField("edad",IntegerType(),False),
    StructField("altura",IntegerType(),False),
    StructField("peso",FloatType(),False),
    StructField("equipo_id",IntegerType(),False)
])

# Visualizando la transformación del Dataframe.
#deportista2DF = sqlContext.createDataFrame(deportista2RDD,schema_deportista2RDD)
deportista2DF = sqlContext.read.schema(schema_deportista2RDD).option("header","true").csv(path + "deportista2.csv")

In [23]:
deportista2DF.show(4)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|        67788|            Lee BuTi|     1|  23|   164|54.0|      203|
|        67789|Anthony N. Buddy Lee|     1|  34|   172|62.0|     1096|
|        67790|Alfred A. Butch L...|     1|  19|   186|80.0|      825|
|        67791|        Lee ByeongGu|     1|  22|   175|68.0|      970|
+-------------+--------------------+------+----+------+----+---------+
only showing top 4 rows



In [24]:
# Creando el Dataframes a partir de los RDD correspondiente al archivo evento.csv
eventoRDD = spark.textFile(path+"evento.csv").map(lambda l : l.split(","))

# Pasando El RDD a la funcion creada para eliminar el encabezado.
eventoRDD = eventoRDD.mapPartitionsWithIndex(eliminaEncabezado)

# Transformando los valores del RDD.
eventoRDD = eventoRDD.map(lambda l : (
int(l[0]),
l[1],
l[2]
))

# Creando Schema para los campos del RDD.
schema_eventoRDD = StructType([
    StructField("evento_id",IntegerType(),False),
    StructField("tipo_evento",StringType(),False),
    StructField("id_deporte",StringType(),False)
])
# Visualizando la transformación del Dataframe.
#eventoRDD = sqlContext.createDataFrame(eventoRDD,schema_eventoRDD)
deportesOlimpicosDF = sqlContext.read.schema(schema_eventoRDD).option("header","true").csv(path + "evento.csv")

In [25]:
deportesOlimpicosDF.show(4)

+---------+--------------------+----------+
|evento_id|         tipo_evento|id_deporte|
+---------+--------------------+----------+
|        1|Basketball Men's ...|         1|
|        2|Judo Men's Extra-...|         2|
|        3|Football Men's Fo...|         3|
|        4|Tug-Of-War Men's ...|         4|
+---------+--------------------+----------+
only showing top 4 rows



In [26]:
# Creando el Dataframes a partir de los RDD correspondiente al archivo juegos.csv
juegosRDD = spark.textFile(path+"juegos.csv").map(lambda l : l.split(","))

# Pasando El RDD a la funcion creada para eliminar el encabezado
juegosRDD = juegosRDD.mapPartitionsWithIndex(eliminaEncabezado)

# Transformando los valores del RDD.
juegosRDD = juegosRDD.map(lambda l : (
int(l[0]),
l[1],
int(l[2]),
l[3],
l[4]
))

# Creando Schema para los campos del RDD.
Schema_juegosRDD = StructType([
    StructField("juego_id", IntegerType(),False),
    StructField("nombre_juego", StringType(),False),
    StructField("anio", IntegerType(),False),
    StructField("temporada", StringType(),False),
    StructField("ciudad", StringType(),False)
])

# Visualizando la transformación del Dataframe.
#juegosRDD = sqlContext.createDataFrame(juegosRDD,Schema_juegosRDD)
juegosDF = sqlContext.read.schema(Schema_juegosRDD).option("header","true").csv(path + "juegos.csv")

In [27]:
juegosDF.show(4)

+--------+------------+----+---------+---------+
|juego_id|nombre_juego|anio|temporada|   ciudad|
+--------+------------+----+---------+---------+
|       1| 1896 Verano|1896|   Verano|   Athina|
|       2| 1900 Verano|1900|   Verano|    Paris|
|       3| 1904 Verano|1904|   Verano|St. Louis|
|       4| 1906 Verano|1906|   Verano|   Athina|
+--------+------------+----+---------+---------+
only showing top 4 rows



In [28]:
# Creando el Dataframes a partir de los RDD correspondiente al archivo paises.csv
paisesRDD = spark.textFile(path+"paises.csv").map(lambda l : l.split(","))

# Pasando El RDD a la funcion creada para eliminar el encabezado
paisesRDD = paisesRDD.mapPartitionsWithIndex(eliminaEncabezado)

# Transformando los valores del RDD.
paisesRDD = paisesRDD.map(lambda l : (
int(l[0]),
l[1],
l[2]
))

# Creando Schema para los campos del RDD.
Schema_paisesRDD = StructType([
    StructField("pais_id", IntegerType(),False),
    StructField("nombre_equipo", StringType(),False),
    StructField("sigla_equipo", StringType(),False)
])

# Visualizando la transformación del Dataframe.
#paisesRDD = sqlContext.createDataFrame(paisesRDD,Schema_paisesRDD)
paisesDF = sqlContext.read.schema(Schema_paisesRDD).option("header","true").csv(path + "paises.csv")

In [29]:
paisesDF.show(4)

+-------+--------------------+------------+
|pais_id|       nombre_equipo|sigla_equipo|
+-------+--------------------+------------+
|      1|         30. Februar|         AUT|
|      2|A North American ...|         MEX|
|      3|           Acipactli|         MEX|
|      4|             Acturus|         ARG|
+-------+--------------------+------------+
only showing top 4 rows



In [30]:
# Creando el Dataframes a partir de los RDD correspondiente al archivo resultados.csv
resultadosRDD = spark.textFile(path+"resultados.csv").map(lambda l : l.split(","))

# Pasando El RDD a la funcion creada para eliminar el encabezado
resultadosRDD = resultadosRDD.mapPartitionsWithIndex(eliminaEncabezado)

# Transformando los valores del RDD.
resultadosRDD = resultadosRDD.map(lambda l : (
int(l[0]),
l[1],
int(l[2]),
int(l[3]),
int(l[4])
))

# Creando Schema para los campos del RDD.
schema_resultadosRDD = StructType([
    StructField("resultado_id",IntegerType(),False),
    StructField("medalla",StringType(),False),
    StructField("deportista_id",IntegerType(),False),
    StructField("juego_id",IntegerType(),False),
    StructField("evento_id",IntegerType(),False)
])

# Visualizando la transformación del Dataframe.
#resultadosRDD = sqlContext.createDataFrame(resultadosRDD,schema_resultadosRDD)
resultadosDF = sqlContext.read.schema(schema_resultadosRDD).option("header","true").csv(path + "resultados.csv")

In [31]:
resultadosDF.show(4)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           1|     NA|            1|      39|        1|
|           2|     NA|            2|      49|        2|
|           3|     NA|            3|       7|        3|
|           4|   Gold|            4|       2|        4|
+------------+-------+-------------+--------+---------+
only showing top 4 rows



In [32]:
# La funcion printschema() permite visualizar el esquema que tiene el DF.
resultadosDF.printSchema()

root
 |-- resultado_id: integer (nullable = true)
 |-- medalla: string (nullable = true)
 |-- deportista_id: integer (nullable = true)
 |-- juego_id: integer (nullable = true)
 |-- evento_id: integer (nullable = true)



In [33]:
# La funcion withColumnRenamed("","").drop("") permite borrar o renombrar columnas.
#resultadosDF = resultadosDF.withColumnRenamed("medalla", "medallota")
#resultadosDF.printSchema()

In [34]:
# Generando consultas con funcion select
#deportistaOlimpicoDF = deportistaOlimpicoDF.select("deportista_id", "nombre", col("edad").alias("edadAlJugar"),"equipo_id")
#deportistaOlimpicoDF.show(5)

In [35]:
# Generando consultas con funcion filter
#deportistaOlimpicoDF = deportistaOlimpicoDF.filter(deportistaOlimpicoDF.edadAlJugar != 0)
#deportistaOlimpicoDF.show(5)

In [36]:
# Consultando el deportista mas joven de los juegos olimpicos
#deportistaOlimpicoDF.sort("edadAlJugar").show()

In [37]:
resultadosDF.printSchema()

root
 |-- resultado_id: integer (nullable = true)
 |-- medalla: string (nullable = true)
 |-- deportista_id: integer (nullable = true)
 |-- juego_id: integer (nullable = true)
 |-- evento_id: integer (nullable = true)



In [38]:
# Creando consultas usando Join para identificar los deportistas que participaron en las olimpiadas con sus disciplinas.
deportistaOlimpicoDF.join(resultadosDF, deportistaOlimpicoDF.deportista_id == resultadosDF.deportista_id, "left") \
    .join(juegosDF, juegosDF.juego_id == resultadosDF.juego_id, "left") \
    .join(deportesOlimpicosDF, deportesOlimpicosDF.evento_id == resultadosDF.evento_id, "left") \
    .select(deportistaOlimpicoDF.nombre, col("edad").alias("Edad al jugar"), 
            "medalla", col("anio").alias("Año de Juego"), 
            deportesOlimpicosDF.tipo_evento.alias("Nombre disciplina")).show()

+--------------------+-------------+-------+------------+--------------------+
|              nombre|Edad al jugar|medalla|Año de Juego|   Nombre disciplina|
+--------------------+-------------+-------+------------+--------------------+
|           A Dijiang|           24|     NA|        1992|Basketball Men's ...|
|            A Lamusi|           23|     NA|        2012|Judo Men's Extra-...|
| Gunnar Nielsen Aaby|           24|     NA|        1920|Football Men's Fo...|
|Edgar Lindenau Aabye|           34|   Gold|        1900|Tug-Of-War Men's ...|
|Christine Jacoba ...|           21|     NA|        1994|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|        1994|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|        1992|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|        1992|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|        1988|Speed Skating Wom...|
|Christine Jacoba ...|           21|     NA|        

In [39]:
# Reto de consulta para obtener como resultado los deportistas
resultadosDF.filter(resultadosDF.medalla != "NA") \
    .join(deportistaOlimpicoDF, deportistaOlimpicoDF.deportista_id == resultadosDF.deportista_id, "left") \
    .join(paisesDF, paisesDF.pais_id == deportistaOlimpicoDF.equipo_id, "left") \
    .select("medalla", "nombre_equipo", "sigla_equipo") \
    .sort(col("sigla_equipo").desc()).show()

+-------+-------------+------------+
|medalla|nombre_equipo|sigla_equipo|
+-------+-------------+------------+
| Silver|     Zimbabwe|         ZIM|
| Silver|     Zimbabwe|         ZIM|
|   Gold|     Zimbabwe|         ZIM|
|   Gold|     Zimbabwe|         ZIM|
|   Gold|     Zimbabwe|         ZIM|
|   Gold|     Zimbabwe|         ZIM|
|   Gold|     Zimbabwe|         ZIM|
|   Gold|     Zimbabwe|         ZIM|
|   Gold|     Zimbabwe|         ZIM|
| Silver|     Zimbabwe|         ZIM|
|   Gold|     Zimbabwe|         ZIM|
| Silver|     Zimbabwe|         ZIM|
|   Gold|     Zimbabwe|         ZIM|
|   Gold|     Zimbabwe|         ZIM|
| Bronze|     Zimbabwe|         ZIM|
| Bronze|   Yugoslavia|         YUG|
|   Gold|   Yugoslavia|         YUG|
| Silver|   Yugoslavia|         YUG|
| Silver|   Yugoslavia|         YUG|
| Silver|   Yugoslavia|         YUG|
+-------+-------------+------------+
only showing top 20 rows



In [40]:
# Funciones de Agregación. reto hacer los respectivos cruces para obtener todos los medallistas que han jugado en las olimpiadas, 
# mostrar equipo, medalla, el nombre del jugador y la sub disiplina de competencia.

medallistaXAnio = deportistaOlimpicoDF \
    .join(resultadosDF, deportistaOlimpicoDF.deportista_id == resultadosDF.deportista_id, "left") \
    .join(juegosDF, juegosDF.juego_id == resultadosDF.juego_id, "left") \
    .join(paisesDF, deportistaOlimpicoDF.equipo_id == paisesDF.pais_id, "left") \
    .join(deportesOlimpicosDF, deportesOlimpicosDF.evento_id == resultadosDF.evento_id, "left") \
    .join(deporteDF, deportistaOlimpicoDF.deportista_id == deporteDF.deporte_Id, "left") \
    .select("sigla_equipo", "anio", "medalla", deportesOlimpicosDF.tipo_evento.alias("Nombre subdisciplina"),
        deporteDF.deporte.alias("Nombre disciplina"), deportistaOlimpicoDF.nombre)

medallistaXAnio.show()

+------------+----+-------+--------------------+--------------------+--------------------+
|sigla_equipo|anio|medalla|Nombre subdisciplina|   Nombre disciplina|              nombre|
+------------+----+-------+--------------------+--------------------+--------------------+
|         CHN|1992|     NA|Basketball Men's ...|          Basketball|           A Dijiang|
|         CHN|2012|     NA|Judo Men's Extra-...|                Judo|            A Lamusi|
|         DEN|1920|     NA|Football Men's Fo...|            Football| Gunnar Nielsen Aaby|
|         SWE|1900|   Gold|Tug-Of-War Men's ...|          Tug-Of-War|Edgar Lindenau Aabye|
|         NED|1994|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|         NED|1994|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|         NED|1992|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|         NED|1992|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|

In [41]:
# Realizando una agrupacion en el DF creado en el punto anterior y guardando resultado en medallistaXAnio2.
medallistaXAnio2 = medallistaXAnio.filter(medallistaXAnio.medalla != "NA") \
    .sort("anio") \
    .groupBy("sigla_equipo", "anio", "Nombre subdisciplina") \
    .count()

In [42]:
medallistaXAnio2.printSchema()

root
 |-- sigla_equipo: string (nullable = true)
 |-- anio: integer (nullable = true)
 |-- Nombre subdisciplina: string (nullable = true)
 |-- count: long (nullable = false)



In [43]:
# Realizando funciones de agrupaciones para mostrar el total de medallas y promedio.
medallistaXAnio2.groupBy("sigla_equipo", "anio") \
    .agg(sum("count").alias("Total medallas"), \
        avg("count").alias("Medallas promedio")).show()

+------------+----+--------------+------------------+
|sigla_equipo|anio|Total medallas| Medallas promedio|
+------------+----+--------------+------------------+
|         USA|2012|           121|1.9836065573770492|
|         FRA|2006|            12|1.3333333333333333|
|         BLR|2000|             9|               1.8|
|         FIN|1988|            10|               2.5|
|         KOR|2010|             3|               1.5|
|         FRA|1948|            52|              2.08|
|         GBR|2000|            30|1.5789473684210527|
|         QAT|2012|             2|               1.0|
|         JPN|1932|            11|               2.2|
|         FRG|1994|             2|               1.0|
|         NED|1972|             7|               1.4|
|         GER|1932|            35|1.8421052631578947|
|         NZL|1988|            11|             1.375|
|         AUS|1972|            13|1.1818181818181819|
|         BAH|2008|             2|               2.0|
|         SWE|1968|         

In [44]:
# Registrar Data Fraames como SQL para interactuar con SQL.
resultadosDF.registerTempTable("resultado")
deportistaOlimpicoDF.registerTempTable("deportista")
paisesDF.registerTempTable("paises")

In [45]:
# Consultando los Data Frame transformados en tablas con los nombres de (resultado, deportista, paises)
sqlContext.sql("SELECT * FROM deportista").show(5)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
|            5|Christine Jacoba ...|     2|  21|   185|82.0|      705|
+-------------+--------------------+------+----+------+----+---------+
only showing top 5 rows



In [46]:
sqlContext.sql("SELECT * FROM resultado").show(5)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           1|     NA|            1|      39|        1|
|           2|     NA|            2|      49|        2|
|           3|     NA|            3|       7|        3|
|           4|   Gold|            4|       2|        4|
|           5|     NA|            5|      36|        5|
+------------+-------+-------------+--------+---------+
only showing top 5 rows



In [47]:
sqlContext.sql("SELECT * FROM paises").show(5)

+-------+--------------------+------------+
|pais_id|       nombre_equipo|sigla_equipo|
+-------+--------------------+------------+
|      1|         30. Februar|         AUT|
|      2|A North American ...|         MEX|
|      3|           Acipactli|         MEX|
|      4|             Acturus|         ARG|
|      5|         Afghanistan|         AFG|
+-------+--------------------+------------+
only showing top 5 rows



In [48]:
# Realizando consultas con SQL.
sqlContext.sql("""
                SELECT medalla,nombre_equipo,sigla_equipo FROM resultado r
                JOIN deportista d
                ON r.deportista_id = d.equipo_id
                JOIN paises p
                ON p.pais_id = d.equipo_id
                WHERE medalla <> "NA"
                ORDER BY sigla_equipo DESC
                """).show()

+-------+-------------+------------+
|medalla|nombre_equipo|sigla_equipo|
+-------+-------------+------------+
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
|   Gold|       Zambia|         ZAM|
+-------+-------------+------------+
only showing top 20 rows



In [65]:
# Generando RDD y con la funcion lambda separamos los valores por coma ","
deportistaError = spark.textFile(path+"deportistaError.csv") \
    .map( lambda l : l.split(","))

In [68]:
# Eliminamos los encabezados con la funcion eliminaEncabezado.
deportistaError  = deportistaError.mapPartitionsWithIndex(eliminaEncabezado)

# Mapeando los valores de deportistaError
deportistaError = deportistaError.map(lambda l : (
l[0],
l[1],
l[2],
l[3],
l[4],
l[5],
l[6]))

# Creando el Schema para los campos del RDD
schemadeportistaError = StructType([
    StructField("deportista_id",StringType(),False),
    StructField("nombre",StringType(),False),
    StructField("genero",StringType(),False),
    StructField("edad",StringType(),False),
    StructField("altura",StringType(),False),
    StructField("peso",StringType(),False),
    StructField("equipo_id",StringType(),False)
])

# Creando Dataframe con los parametros de mapeo y schema.
#deportistaErrorDF = sqlContext.createDataFrame(deportistaError, schemadeportistaError)
deportistaErrorDF = sqlContext.read.schema(schemadeportistaError).option("header","true").csv(path + "deportistaError.csv")

# Visualizando el dataframe con show().
deportistaErrorDF.show()

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|  80|      199|
|            2|            A Lamusi|     1|  23|   170|  60|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|  null|null|      273|
|            4|Edgar Lindenau Aabye|     1|  34|  null|null|      278|
|            5|Christine Jacoba ...|     2|  21|   185|  82|      705|
|            6|     Per Knut Aaland|     1|  31|   188|  75|     1096|
|            7|        John Aalberg|     1|  31|   183|  72|     1096|
|            8|"Cornelia ""Cor""...|     2|  18|   168|null|      705|
|            9|    Antti Sami Aalto|     1|  26|   186|  96|      350|
|           10|"Einar Ferdinand ...|     1|  26|  null|null|      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182|76.5|      350|
|     

In [72]:
# Generando funcion UDF y registrando la funcion.
from pyspark.sql.functions import udf

def conversionEnteros(valor):
    return int(valor) if len(valor) > 0 else None

conversionEnteros_udf = udf(lambda z: conversionEnteros(z),IntegerType())
sqlContext.udf.register("conversionEnteros_udf",conversionEnteros_udf)

<function __main__.<lambda>(z)>

In [74]:
deportistaErrorDF.select(conversionEnteros_udf("altura")).show()

Py4JJavaError: An error occurred while calling o428.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 1 times, most recent failure: Lost task 0.0 in stage 50.0 (TID 250) (host.docker.internal executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:145)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:130)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:189)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
	... 24 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
	at sun.reflect.GeneratedMethodAccessor70.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:107)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:145)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:130)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:189)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
	... 24 more
