In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.types import Row
from pyspark.sql import SQLContext

In [2]:
spark = SparkContext(master="local", appName="DataFrames")
sqlContext = SQLContext(spark)

In [3]:
!ls ./data

data.csv	 deportista.csv       juegos.csv	     resultados.csv
deporte.csv	 deportistaError.csv  modelo_relacional.jpg
deportista2.csv  evento.csv	      paises.csv


In [4]:
juegoSchema = StructType([
    StructField("juego_id", IntegerType(), False),
    StructField("anio", StringType(), False),
    StructField("temporada",StringType(),False),
    StructField("ciudad", StringType(), False)
])

juegoDF = sqlContext.read.schema(juegoSchema)\
            .option("header", "true").csv("./data/juegos.csv")

In [5]:
juegoDF.show(4)

+--------+-----------+---------+------+
|juego_id|       anio|temporada|ciudad|
+--------+-----------+---------+------+
|       1|1896 Verano|     1896|Verano|
|       2|1900 Verano|     1900|Verano|
|       3|1904 Verano|     1904|Verano|
|       4|1906 Verano|     1906|Verano|
+--------+-----------+---------+------+
only showing top 4 rows



In [6]:
spark


In [24]:
deportistaOlimpicoRDD = spark.textFile("./data/deportista.csv") \
    .map(lambda l: l.split(","))
deportistaolimpicoRDD2 = spark.textFile("./data/deportista2.csv") \
    .map(lambda l: l.split(","))
# Concatenacion
deportistaOlimpicoRDD = deportistaOlimpicoRDD.union(deportistaolimpicoRDD2)
deportistaOlimpicoRDD.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [25]:
def delete_header(index, iterator):
    return iter(list(iterator)[1:])

In [26]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD \
                        .mapPartitionsWithIndex(delete_header)

In [27]:
deportistaOlimpicoRDD.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

In [32]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l: (
int(l[0]),l[1],int(l[2]),int(l[3]),int(l[4]),float(l[5]),int(l[6])))

In [33]:
schema = StructType([
    StructField("deportista_id", IntegerType(), False),
    StructField("nombre", StringType(), False),
    StructField("genero", IntegerType(), False),
    StructField("edad", IntegerType(), False),
    StructField("altura", IntegerType(), False),
    StructField("peso", FloatType(), False),
    StructField("equipo_id", IntegerType(), False)
])

In [66]:
deportistaOlimpicoDF = sqlContext.createDataFrame(deportistaOlimpicoRDD,schema)

In [67]:
deportistaOlimpicoDF.show(10)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
|            5|Christine Jacoba ...|     2|  21|   185|82.0|      705|
|            6|     Per Knut Aaland|     1|  31|   188|75.0|     1096|
|            7|        John Aalberg|     1|  31|   183|72.0|     1096|
|            8|Cornelia Cor Aalt...|     2|  18|   168| 0.0|      705|
|            9|    Antti Sami Aalto|     1|  26|   186|96.0|      350|
|           10|Einar Ferdinand E...|     1|  26|     0| 0.0|      350|
+-------------+--------------------+------+----+------+----+---------+
only s

In [38]:
!ls ./data

data.csv	 deportista.csv       juegos.csv	     resultados.csv
deporte.csv	 deportistaError.csv  modelo_relacional.jpg
deportista2.csv  evento.csv	      paises.csv


In [53]:
#! cat ./data/evento.csv

In [45]:
schema_evento = StructType([
    StructField("evento_id", IntegerType(), False),
    StructField("evento", StringType(), False),
    StructField("deporte_id", IntegerType(), False),
])

eventoDF = sqlContext.read.format("csv")\
                .option("header", True)\
                .schema(schema_evento)\
                .load("./data/evento.csv")
    
eventoDF.show(5)

+---------+--------------------+----------+
|evento_id|              evento|deporte_id|
+---------+--------------------+----------+
|        1|Basketball Men's ...|         1|
|        2|Judo Men's Extra-...|         2|
|        3|Football Men's Fo...|         3|
|        4|Tug-Of-War Men's ...|         4|
|        5|Speed Skating Wom...|         5|
+---------+--------------------+----------+
only showing top 5 rows



In [52]:
#! cat ./data/paises.csv

In [49]:
schema_paises = StructType([
    StructField("id", IntegerType(), False),
    StructField("equipo", StringType(), False),
    StructField("sigla", StringType(), False),
])

paisesDF = sqlContext.read.format("csv")\
                .option("header", True)\
                .schema(schema_paises)\
                .load("./data/paises.csv")
    
paisesDF.show(5)

+---+--------------------+-----+
| id|              equipo|sigla|
+---+--------------------+-----+
|  1|         30. Februar|  AUT|
|  2|A North American ...|  MEX|
|  3|           Acipactli|  MEX|
|  4|             Acturus|  ARG|
|  5|         Afghanistan|  AFG|
+---+--------------------+-----+
only showing top 5 rows



In [51]:
# ! cat ./data/resultados.csv

In [50]:
schema_resultados = StructType([
    StructField("resultado_id", IntegerType(), False),
    StructField("medalla", StringType(), False),
    StructField("deportista_id", IntegerType(), False),
    StructField("juego_id", IntegerType(), False),
    StructField("evento_id", IntegerType(), False),
])

resultadosDF = sqlContext.read.format("csv")\
                .option("header", True)\
                .schema(schema_resultados)\
                .load("./data/resultados.csv")
    
resultadosDF.show(5)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           1|     NA|            1|      39|        1|
|           2|     NA|            2|      49|        2|
|           3|     NA|            3|       7|        3|
|           4|   Gold|            4|       2|        4|
|           5|     NA|            5|      36|        5|
+------------+-------+-------------+--------+---------+
only showing top 5 rows



In [61]:
schema_deporte = StructType([
    StructField("id", IntegerType(), False),
    StructField("deporte", StringType(), False),
])

deportesDF = sqlContext.read.format("csv")\
                .option("header", True)\
                .schema(schema_deporte)\
                .load("./data/deporte.csv")
    
deportesDF.show(5)

+---+-------------+
| id|      deporte|
+---+-------------+
|  1|   Basketball|
|  2|         Judo|
|  3|     Football|
|  4|   Tug-Of-War|
|  5|Speed Skating|
+---+-------------+
only showing top 5 rows



In [68]:
deporteDF.printSchema()

root
 |-- id: integer (nullable = true)
 |-- deporte: string (nullable = true)



In [69]:
deportistaOlimpicoDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- genero: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- altura: integer (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [70]:
deportistaOlimpicoDF = deportistaOlimpicoDF.withColumnRenamed("genero","sexo").drop("altura")
deportistaOlimpicoDF.printSchema()


root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- sexo: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [72]:
from pyspark.sql.functions import *

deportistaOlimpicoDF = deportistaOlimpicoDF.select("deportista_id", 
                            "nombre", 
                            col("edad").alias("edadAlJugar"),"equipo_id")

In [75]:
deportistaOlimpicoDF.show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



In [80]:
deportistaOlimpicoDF.filter(
    deportistaOlimpicoDF.edadAlJugar != 0).sort("edadAlJugar").show()

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|        71691|  Dimitrios Loundras|         10|      333|
|        70616|          Liu Luyang|         11|      199|
|       118925|Megan Olwen Deven...|         11|      413|
|        52070|        Etsuko Inada|         11|      514|
|        22411|Magdalena Cecilia...|         11|      413|
|        40129|    Luigina Giavotti|         11|      507|
|        47618|Sonja Henie Toppi...|         11|      742|
|        76675|   Marcelle Matthews|         11|      967|
|        37333|Carlos Bienvenido...|         11|      982|
|        51268|      Beatrice Hutiu|         11|      861|
|       126307|        Liana Vicens|         11|      825|
|        48939|             Ho Gang|         12|      738|
|        49142|        Jan Hoffmann|         12|      302|
|        42835|   Werner Grieshofer|         12|       7