In [47]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.types import Row
from pyspark.sql import SQLContext

In [35]:
spark = SparkContext(master='local', appName='DataFrames')
sql_context = SQLContext(spark)

In [7]:
!ls 'files'

evento.csv
juegos.csv
modelo_relacional.jpg
paises.csv
resultados.csv
deporte.csv
deportista.csv
deportista2.csv
deportistaError.csv
evento.csv
juegos.csv
modelo_relacional.jpg
paises.csv
resultados.csv


In [8]:
path = "files/juegos.csv"

In [36]:
juego_schema = StructType([
    StructField("juego_id", IntegerType(), False),
    StructField("anio", StringType(), False),
    StructField("temporada", StringType(), False),
    StructField("ciudad", StringType(), False)
])

juego_df = sql_context.read.schema(juego_schema).option("header", "true").csv(path)
juego_df.show(4)

In [13]:
juego_df.show(4)

+--------+-----------+---------+------+
|juego_id|       anio|temporada|ciudad|
+--------+-----------+---------+------+
|       1|1896 Verano|     1896|Verano|
|       2|1900 Verano|     1900|Verano|
|       3|1904 Verano|     1904|Verano|
|       4|1906 Verano|     1906|Verano|
+--------+-----------+---------+------+
only showing top 4 rows



In [14]:
spark

In [71]:
deportista_olimpico_rdd = spark.textFile('files/deportista2.csv').map(lambda l: l.split(','))

In [38]:
deportista_olimpico_rdd.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [39]:
def elimina_encabezado(indice, iterador):
    return iter(list(iterador)[1:])

In [72]:
deportista_olimpico_rdd = deportista_olimpico_rdd.mapPartitionsWithIndex(elimina_encabezado)

In [41]:
deportista_olimpico_rdd.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

In [73]:
deportista_olimpico_rdd = deportista_olimpico_rdd.map(lambda l: (
    int(l[0]), l[1], int(l[2]), int(l[3]), int(l[4]), float(l[5]), int(l[6])
))
schema = StructType([
    StructField("deportista_id", IntegerType(), False),
    StructField("nombre", StringType(), False),
    StructField("genero", IntegerType(), False),
    StructField("edad", IntegerType(), False),
    StructField("altura", IntegerType(), False),
    StructField("peso", FloatType(), False),
    StructField("equipo_id", IntegerType(), False),
])
deportista_df = sql_context.createDataFrame(deportista_olimpico_rdd, schema)

In [74]:
deportista_df.show(4)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|        67788|            Lee BuTi|     1|  23|   164|54.0|      203|
|        67789|Anthony N. Buddy Lee|     1|  34|   172|62.0|     1096|
|        67790|Alfred A. Butch L...|     1|  19|   186|80.0|      825|
|        67791|        Lee ByeongGu|     1|  22|   175|68.0|      970|
+-------------+--------------------+------+----+------+----+---------+
only showing top 4 rows



In [49]:
paises_rdd = spark.textFile("files/paises.csv").map(lambda l: l.split(","))
paises_rdd = paises_rdd.mapPartitionsWithIndex(elimina_encabezado)
paises_rdd = paises_rdd.map(lambda l: (int(l[0]), l[1], l[2]))

schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("equipo", StringType(), False),
    StructField("sigle", StringType(), False),
])

paises_df = sql_context.createDataFrame(paises_rdd, schema)

In [50]:
paises_df.show(3)

+---+--------------------+-----+
| id|              equipo|sigle|
+---+--------------------+-----+
|  1|         30. Februar|  AUT|
|  2|A North American ...|  MEX|
|  3|           Acipactli|  MEX|
+---+--------------------+-----+
only showing top 3 rows



In [60]:
resultados_schema = StructType([
    StructField("resultado_id", IntegerType(), False),
    StructField("medalla", StringType(), False),
    StructField("deportista_id", IntegerType(), False),
    StructField("juego_id", IntegerType(), False),
    StructField("evento_id", IntegerType(), False)
])

resultados_df = sql_context.read.format("csv").\
            option("header", True).\
            schema(resultados_schema).\
            load("files/resultados.csv")

resultados_df.show(4)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           1|     NA|            1|      39|        1|
|           2|     NA|            2|      49|        2|
|           3|     NA|            3|       7|        3|
|           4|   Gold|            4|       2|        4|
+------------+-------+-------------+--------+---------+
only showing top 4 rows



In [55]:
deporte_schema = StructType([
    StructField("deporte_id", IntegerType(), False),
    StructField("deporte", StringType(), False),
])
deporte_df = sql_context.read.format("csv").\
        option("header", True).\
        schema(deporte_schema).\
        load("files/deporte.csv")
deporte_df.show(4)

+----------+----------+
|deporte_id|   deporte|
+----------+----------+
|         1|Basketball|
|         2|      Judo|
|         3|  Football|
|         4|Tug-Of-War|
+----------+----------+
only showing top 4 rows



In [56]:
evento_schema = StructType([
    StructField("evento_id", IntegerType(), False),
    StructField("evento", StringType(), False), 
    StructField("deporte_id", IntegerType(), False)
])
evento_df = sql_context.read.format("csv").\
        option("header", True).\
        schema(evento_schema).\
        load("files/evento.csv")

evento_df.show(4)

+---------+--------------------+----------+
|evento_id|              evento|deporte_id|
+---------+--------------------+----------+
|        1|Basketball Men's ...|         1|
|        2|Judo Men's Extra-...|         2|
|        3|Football Men's Fo...|         3|
|        4|Tug-Of-War Men's ...|         4|
+---------+--------------------+----------+
only showing top 4 rows



In [61]:
deporte_df.printSchema()

root
 |-- deporte_id: integer (nullable = true)
 |-- deporte: string (nullable = true)



In [62]:
deportista_df.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- genero: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- altura: integer (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [75]:
deportista_df_2 = deportista_df.withColumnRenamed("genero", "sexo").drop("alture")

In [76]:
deportista_df_2.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- sexo: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- altura: integer (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [77]:
from pyspark.sql.functions import *
consulta_deportista = deportista_df_2.select("deportista_id", "nombre", col("edad").alias("edadAlJugar"), "equipo_id")

In [78]:
consulta_deportista.show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|        67788|            Lee BuTi|         23|      203|
|        67789|Anthony N. Buddy Lee|         34|     1096|
|        67790|Alfred A. Butch L...|         19|      825|
|        67791|        Lee ByeongGu|         22|      970|
|        67792|       Lee ByeongGyu|         21|      970|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



In [79]:
consulta_deportista = consulta_deportista.filter((consulta_deportista.edadAlJugar != 0))

In [80]:
consulta_deportista.sort("edadAlJugar").show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|        71691|  Dimitrios Loundras|         10|      333|
|       118925|Megan Olwen Deven...|         11|      413|
|        70616|          Liu Luyang|         11|      199|
|       126307|        Liana Vicens|         11|      825|
|        76675|   Marcelle Matthews|         11|      967|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



In [33]:
spark.stop()