# Datasets Spark

Esos componentes fueron agregados en la versión 1.3 de Spark y pueden ser invocados con el contexto espacial de Spark SQL. Como indica su nombre, es un módulo especialmente desarrollado para ser ejecutado con instrucciones parecidas al SQL estándar.

De la misma forma, como los RDD, estos pueden ser creados a partir de archivos, tablas tipo Hive, bases de datos externas y RDD o DataFrames existentes.

El primer detalle que salta cuando creamos un DataFrame es que poseen columnas nombradas, lo que a nivel conceptual es como trabajar con un DataFrame de Pandas. Con la excepción que a nivel interno Spark trabaja con Scala, lo cual le asigna a cada columna el tipo de dato Row, un tipo especial de objeto sin tipo definido.

Pero no es todo, los DataFrames implementan un sistema llamado Catalyst, el cual es un motor de optimización de planes de ejecución, parecido al que usan las bases de datos, pero diseñado para la cantidad de datos propia de Spark, aunado a eso, se tiene implementado un optimizador de memoria y consumo de CPU llamado Tungsten, el cual determina cómo se convertirán los planes lógicos creados por Catalyst a un plan físico.

###### Mini resumen de características:

<ol>
    <li><p>Permite procesar como un tabla de database</p></li>
    <li><p>Poseen una estructura y pueden ser creados como Dataframes</p></li>
    <li><p>Mejor optimización (Sistema de consulta Catalyst y motor de ejecución Tungsten)</p></li>
</ol>

In [7]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.types import Row
from pyspark.sql import SQLContext

In [8]:
spark = SparkContext(master="local", appName="Dataframes")
sqlContext = SQLContext(spark)

### Creando DF desde cero.

In [9]:
!ls ./data/

deporte.csv	 deportistaError.csv  modelo_relacional.jpg
deportista2.csv  evento.csv	      paises.csv
deportista.csv	 juegos.csv	      resultados.csv


In [10]:
!head -n 5 ./data/juegos.csv

,nombre_juego,annio,temporada,ciudad
1,1896 Verano,1896,Verano,Athina
2,1900 Verano,1900,Verano,Paris
3,1904 Verano,1904,Verano,St. Louis
4,1906 Verano,1906,Verano,Athina


In [11]:
path = "./data/"

game_schema = StructType([
    StructField("game_id", IntegerType(), False),
    StructField("year", StringType(), False),
    StructField("season", StringType(), False),
    StructField("city", StringType(), False)
])

game_df = sqlContext.read.schema(game_schema) \
    .option("header", "true").csv(path+"juegos.csv")

In [12]:
game_df.show(4) #Como tenemos un formato creado, podemos usar Show()

+-------+-----------+------+------+
|game_id|       year|season|  city|
+-------+-----------+------+------+
|      1|1896 Verano|  1896|Verano|
|      2|1900 Verano|  1900|Verano|
|      3|1904 Verano|  1904|Verano|
|      4|1906 Verano|  1906|Verano|
+-------+-----------+------+------+
only showing top 4 rows



In [13]:
spark

### Creando dataframes a partir de un RDD

In [14]:
deportistaOlimpicoRDD = spark.textFile(path+"deportista.csv")\
    .map(lambda l : l.split(","))

In [15]:
deportistaOlimpicoRDD.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [16]:
# Retiramos encabezado
def removeHeader(index, iterator):
    return iter(list(iterator)[1:])

In [17]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.mapPartitionsWithIndex(removeHeader)

In [18]:
deportistaOlimpicoRDD.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

In [19]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l : (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4]),
    float(l[5]),
    int(l[6])
))

In [20]:
schema = StructType([
    StructField("sport_player", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("gender", IntegerType(), False),
    StructField("age", IntegerType(), False),
    StructField("height", IntegerType(), False),
    StructField("weight", FloatType(), False),
    StructField("team_id", IntegerType(), False)
])

In [21]:
sqlContext.createDataFrame(deportistaOlimpicoRDD,schema).show(5)

+------------+--------------------+------+---+------+------+-------+
|sport_player|                name|gender|age|height|weight|team_id|
+------------+--------------------+------+---+------+------+-------+
|           1|           A Dijiang|     1| 24|   180|  80.0|    199|
|           2|            A Lamusi|     1| 23|   170|  60.0|    199|
|           3| Gunnar Nielsen Aaby|     1| 24|     0|   0.0|    273|
|           4|Edgar Lindenau Aabye|     1| 34|     0|   0.0|    278|
|           5|Christine Jacoba ...|     2| 21|   185|  82.0|    705|
+------------+--------------------+------+---+------+------+-------+
only showing top 5 rows



In [22]:
sports_players_df = sqlContext.createDataFrame(deportistaOlimpicoRDD,schema)

In [23]:
!ls ./curso-apache-spark-platzi/files/

ls: no se puede acceder a './curso-apache-spark-platzi/files/': No existe el archivo o el directorio


###### Hasta ahora tenemos los df:
<ul>
    <li><p>Juegos</p></li>
    <li><p>Deportistas</p></li>
    <h5>Pendientes:</h5>
    <br></br>
    <li><p>Paises</p></li>
    <li><p>Resultados</p></li>
    <li><p>Deportistas pero haciendo el join primero</p></li>
</ul>

## Dataframe deportistas unidos

In [24]:
deportistaOlimpicoRDD = spark.textFile(path+"deportista.csv") \
    .map(lambda l: l.split(","))
deportistaOlimpicoRDD2 = spark.textFile(path+"deportista2.csv") \
    .map(lambda l: l.split(","))

In [25]:
deportistaOlimpicoRDD.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [26]:
deportistaOlimpicoRDD.count()

67787

In [27]:
deportistaOlimpicoRDD2.take(5)

[['67787', 'Lee BongJu', '1', '27', '167', '56', '970'],
 ['67788', 'Lee BuTi', '1', '23', '164', '54', '203'],
 ['67789', 'Anthony N. Buddy Lee', '1', '34', '172', '62', '1096'],
 ['67790', 'Alfred A. Butch Lee Porter', '1', '19', '186', '80', '825'],
 ['67791', 'Lee ByeongGu', '1', '22', '175', '68', '970']]

In [28]:
deportistaOlimpicoRDD2.count()

67785

In [29]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD \
    .union(deportistaOlimpicoRDD2)

In [30]:
#Removemos header primero y luego si procesamos
deportistaOlimpicoRDD = deportistaOlimpicoRDD.mapPartitionsWithIndex(removeHeader)

In [31]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l : (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4]),
    float(l[5]),
    int(l[6])
))

In [32]:
schema = StructType([
    StructField("sport_player", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("gender", IntegerType(), False),
    StructField("age", IntegerType(), False),
    StructField("height", IntegerType(), False),
    StructField("weight", FloatType(), False),
    StructField("team_id", IntegerType(), False)
])

In [33]:
sqlContext.createDataFrame(deportistaOlimpicoRDD,schema).show(5)

+------------+--------------------+------+---+------+------+-------+
|sport_player|                name|gender|age|height|weight|team_id|
+------------+--------------------+------+---+------+------+-------+
|           1|           A Dijiang|     1| 24|   180|  80.0|    199|
|           2|            A Lamusi|     1| 23|   170|  60.0|    199|
|           3| Gunnar Nielsen Aaby|     1| 24|     0|   0.0|    273|
|           4|Edgar Lindenau Aabye|     1| 34|     0|   0.0|    278|
|           5|Christine Jacoba ...|     2| 21|   185|  82.0|    705|
+------------+--------------------+------+---+------+------+-------+
only showing top 5 rows



In [34]:
sports_players_df = sqlContext.createDataFrame(deportistaOlimpicoRDD,schema)

## Dataframe países

In [35]:
paises_RDD = spark.textFile(path + "paises.csv") \
    .map(lambda l: l.split(","))

In [36]:
paises_RDD.take(5)

[['id', 'equipo', 'sigla'],
 ['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG']]

Recordando:

| id (int) | team (string) | initials (string) |

In [37]:
#Removemos header
paises_RDD = paises_RDD.mapPartitionsWithIndex(removeHeader)

In [38]:
paises_RDD.take(5)

[['1', '30. Februar', 'AUT'],
 ['2', 'A North American Team', 'MEX'],
 ['3', 'Acipactli', 'MEX'],
 ['4', 'Acturus', 'ARG'],
 ['5', 'Afghanistan', 'AFG']]

In [39]:
#Mapeo:
paises_RDD = paises_RDD.map(lambda l : (
    int(l[0]),
    l[1],
    l[2]
))

In [40]:
schema = StructType([
    StructField("team_id",IntegerType(),False),
    StructField("team_name",StringType(),False),
    StructField("initials",StringType(),False)
])

In [41]:
sqlContext.createDataFrame(paises_RDD,schema).show(5)

+-------+--------------------+--------+
|team_id|           team_name|initials|
+-------+--------------------+--------+
|      1|         30. Februar|     AUT|
|      2|A North American ...|     MEX|
|      3|           Acipactli|     MEX|
|      4|             Acturus|     ARG|
|      5|         Afghanistan|     AFG|
+-------+--------------------+--------+
only showing top 5 rows



In [42]:
teams_df = sqlContext.createDataFrame(deportistaOlimpicoRDD,schema)

## Dataframe Resultados

In [45]:
!ls ./data/

deporte.csv	 deportistaError.csv  modelo_relacional.jpg
deportista2.csv  evento.csv	      paises.csv
deportista.csv	 juegos.csv	      resultados.csv


In [46]:
results_RDD = spark.textFile(path + "resultados.csv") \
    .map(lambda l: l.split(","))

In [47]:
results_RDD.take(5)

[['resultado_id', 'medalla', 'deportista_id', 'juego_id', 'evento_id'],
 ['1', 'NA', '1', '39', '1'],
 ['2', 'NA', '2', '49', '2'],
 ['3', 'NA', '3', '7', '3'],
 ['4', 'Gold', '4', '2', '4']]

In [48]:
#Removemos header
results_RDD = results_RDD.mapPartitionsWithIndex(removeHeader)

In [49]:
results_RDD = results_RDD.filter(lambda l : 'NA' not in l[1]) #Quitamos los que no ganaron

In [50]:
results_RDD.take(2)

[['4', 'Gold', '4', '2', '4'], ['38', 'Bronze', '15', '7', '19']]

###### Mapeo:
|result_id (int) | medal (str) | sport_player_id (int) | game_id(int) | event_id (int)|

In [51]:
results_RDD = results_RDD.map(lambda l : (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4])
))

In [52]:
results_RDD.take(3)

[(4, 'Gold', 4, 2, 4), (38, 'Bronze', 15, 7, 19), (39, 'Bronze', 15, 7, 20)]

In [55]:
schema = StructType([
    StructField("result_id ",IntegerType(),False),
    StructField("medal",StringType(),False),
    StructField("sport_player_id",IntegerType(),False),
    StructField("game_id",IntegerType(),False),
    StructField("event_id", IntegerType(), False)
])

In [56]:
sqlContext.createDataFrame(results_RDD,schema).show(5)

+----------+------+---------------+-------+--------+
|result_id | medal|sport_player_id|game_id|event_id|
+----------+------+---------------+-------+--------+
|         4|  Gold|              4|      2|       4|
|        38|Bronze|             15|      7|      19|
|        39|Bronze|             15|      7|      20|
|        41|Bronze|             16|     50|      14|
|        42|Bronze|             17|     17|      21|
+----------+------+---------------+-------+--------+
only showing top 5 rows



In [57]:
results_df = sqlContext.createDataFrame(results_RDD,schema)