## Creacion de DF

Importamos todas las librerias que vamos a utilizar

In [33]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import StructField, StructType, StringType, IntegerType, StringType, FloatType, Row
from pyspark.sql.functions import sum, avg

El SQLContext nos permite acceder a las diferentes funciones del sql context y poder diferenciarlo en caso de usar diferentes versiones de Spark.

In [3]:
spark = SparkContext(master="local", appName="DataFrames")
sql_context = SQLContext(spark)

21/08/09 14:56:50 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 192.168.100.109 instead (on interface enp0s31f6)
21/08/09 14:56:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/08/09 14:56:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Mostramos la cabecera del archivo juegos.csv

In [13]:
!head -n 4 files/juegos.csv

,nombre_juego,annio,temporada,ciudad
1,1896 Verano,1896,Verano,Athina
2,1900 Verano,1900,Verano,Paris
3,1904 Verano,1904,Verano,St. Louis


## Usando un Schema

Creamos el esquema para nuestro DF.

In [4]:
juego_schema = StructType([
  StructField('juego_id', IntegerType(), False),
  StructField('anio', StringType(), False),
  StructField('temporada', StringType(), False),
  StructField('ciudad', StringType(), False),
])

juego_df = sql_context.read.schema(juego_schema) \
    .option("header", True) \
    .csv("files/juegos.csv")

El DF no se crea hasta que ejecutemos una operacion.

In [17]:
juego_df.show(4)

+--------+-----------+---------+------+
|juego_id|       anio|temporada|ciudad|
+--------+-----------+---------+------+
|       1|1896 Verano|     1896|Verano|
|       2|1900 Verano|     1900|Verano|
|       3|1904 Verano|     1904|Verano|
|       4|1906 Verano|     1906|Verano|
+--------+-----------+---------+------+
only showing top 4 rows



21/08/09 11:19:36 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 5, schema size: 4
CSV file: file:///home/will/Projects/spark/files/juegos.csv


In [18]:
spark

## Usando un RDD
Hay que tomar en cuenta:
- Eliminar el encabezado
- Castear valores a sus respectivos tipos de datos
- Crear el esquema a usar

### Funcion para eliminar el header

In [5]:
def dropFirstRow(index,iterator):
     return iter(list(iterator)[1:])

Primero cargamos los rdds de deportista

In [6]:
deportista = spark.textFile("files/deportista.csv").map(lambda l: l.split(','))
deportista2 = spark.textFile("files/deportista2.csv").map(lambda l: l.split(','))
deportista = deportista.union(deportista2)
deportista.take(4)



[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273']]

Despues procedemos a eliminar el encabezado

In [7]:
deportista = deportista.mapPartitionsWithIndex( dropFirstRow ) # mapPartitionsWithIndex permite particionar un rdd
deportista.take(4)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

Ahora tranformamos cada valor a su respectivo tipo de dato.

In [8]:
deportista = deportista.map(lambda l: (
   int(l[0]), l[1], int(l[2]), int(l[3]), int(l[4]), float(l[5]), int(l[6])
))
deportista.take(4)

[(1, 'A Dijiang', 1, 24, 180, 80.0, 199),
 (2, 'A Lamusi', 1, 23, 170, 60.0, 199),
 (3, 'Gunnar Nielsen Aaby', 1, 24, 0, 0.0, 273),
 (4, 'Edgar Lindenau Aabye', 1, 34, 0, 0.0, 278)]

Creamos el Schema respectivo.

In [9]:
deportista_schema = StructType([
  StructField('deportista_id', IntegerType(), False),
  StructField('nombre', StringType(), False),
  StructField('genero', IntegerType(), False),
  StructField('edad', IntegerType(), True),
  StructField('altura', IntegerType(), True),
  StructField('peso', FloatType(), True),
  StructField('equipo_id', IntegerType(), True)
])

deportista = sql_context.createDataFrame(deportista, deportista_schema)
deportista.show(4)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
+-------------+--------------------+------+----+------+----+---------+
only showing top 4 rows



## Inferir esquema

In [13]:
resultados = sql_context.read.csv("files/resultados.csv", header=True)
paises = sql_context.read.csv("files/paises.csv", header=True)
eventos = sql_context.read.csv("files/evento.csv", header=True)
deporte = sql_context.read.csv("files/deporte.csv", header=True)


## Funciones de Agrupacion

In [21]:
medallistaXAnio = deportista \
    .join(
        resultados, 
        deportista.deportista_id == resultados.deportista_id, 
        "left"
    ) \
    .join(
        juego_df, 
        juego_df.juego_id == resultados.juego_id, 
        "left"
    ) \
    .join(
        paises, 
        deportista.equipo_id == paises.id, 
        "left"
    ) \
    .join(
        eventos, 
        eventos.evento_id == resultados.evento_id, 
        "left"
    ) \
    .join(
        deporte, 
        eventos.deporte_id == deporte.deporte_id, 
        "left"
    ) \
    .select(
        "sigla",
        "anio",
        "medalla",
        eventos.evento.alias("Nombre subdisciplina"),
        deporte.deporte.alias("Nombre disciplina"),
        deportista.nombre
    )

medallistaXAnio.show() 

21/08/09 15:11:03 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , nombre_juego
 Schema: juego_id, anio
Expected: juego_id but found: 
CSV file: file:///home/will/Projects/spark/files/juegos.csv


+-----+-------------+-------+--------------------+--------------------+--------------------+
|sigla|         anio|medalla|Nombre subdisciplina|   Nombre disciplina|              nombre|
+-----+-------------+-------+--------------------+--------------------+--------------------+
|  CHN|  1992 Verano|     NA|Basketball Men's ...|          Basketball|           A Dijiang|
|  CHN|  2012 Verano|     NA|Judo Men's Extra-...|                Judo|            A Lamusi|
|  DEN|  1920 Verano|     NA|Football Men's Fo...|            Football| Gunnar Nielsen Aaby|
|  SWE|  1900 Verano|   Gold|Tug-Of-War Men's ...|          Tug-Of-War|Edgar Lindenau Aabye|
|  NED|1994 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1994 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1992 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1992 Invierno|     NA|Speed Skating Wom...|       Speed Skating

De esta manera podemos realizar operaciones de agrupación pero no es la mas recomendable.

In [30]:
medallistaXAnio2 = medallistaXAnio.filter(medallistaXAnio.medalla != 'NA') \
    .sort('anio') \
    .groupBy("sigla", "anio", "Nombre subdisciplina") \
    .count()

In [None]:
medallistaXAnio2.printSchema()

root
 |-- sigla: string (nullable = true)
 |-- anio: string (nullable = true)
 |-- Nombre subdisciplina: string (nullable = true)
 |-- count: long (nullable = false)



Con la funcion agg podemos realizar multiples funciones de agregacion.

In [46]:
medallistaXAnio2.groupBy("sigla", "anio") \
  .agg(sum("count").alias("total de medallas"), avg("count").alias("Medallas promedio")) \
  .show()

21/08/09 15:35:21 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , nombre_juego
 Schema: juego_id, anio
Expected: juego_id but found: 
CSV file: file:///home/will/Projects/spark/files/juegos.csv


+-----+-------------+-----------------+------------------+
|sigla|         anio|total de medallas| Medallas promedio|
+-----+-------------+-----------------+------------------+
|  NED|1992 Invierno|                4|1.3333333333333333|
|  BEL|  2000 Verano|                7|               1.4|
|  MAS|  2012 Verano|                2|               1.0|
|  MGL|  2008 Verano|                5|              1.25|
|  SWE|  1976 Verano|               10|               2.0|
|  SUI|2014 Invierno|               29|3.2222222222222223|
|  ETH|  2004 Verano|                7|              1.75|
|  AUT|  1928 Verano|                5|              1.25|
|  SYR|  1984 Verano|                1|               1.0|
|  ITA|  1996 Verano|               69| 2.225806451612903|
|  THA|  2008 Verano|                4|               1.0|
|  URS|1984 Invierno|               56|               2.8|
|  DEN|  1896 Verano|                6|               1.0|
|  GRN|  2016 Verano|                1|               1.



## Uso de SQL

Podemos usar comandos de sql registrando una DF como una tabla temporal de la siguiente manera

In [42]:
resultados.registerTempTable("resultados")
deportista.registerTempTable("deportistas")
paises.registerTempTable("paises")

Ahora podemos usar el SQLContext para ejecutar comandos de SQL

In [37]:
sql_context.sql("SELECT * FROM resultados").show(5)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           1|     NA|            1|      39|        1|
|           2|     NA|            2|      49|        2|
|           3|     NA|            3|       7|        3|
|           4|   Gold|            4|       2|        4|
|           5|     NA|            5|      36|        5|
+------------+-------+-------------+--------+---------+
only showing top 5 rows



Podemos replicar el ejercicio anterior pero ahora con SQL.

In [47]:
sql_context.sql("""
  SELECT medalla, equipo, sigla FROM resultados r
  JOIN deportistas d ON d.deportista_id = r.deportista_id
  JOIN paises p ON p.id = d.equipo_id
  WHERE medalla <> "NA"
  ORDER BY sigla DESC
              """).show()



+-------+--------+-----+
|medalla|  equipo|sigla|
+-------+--------+-----+
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
| Bronze|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
+-------+--------+-----+
only showing top 20 rows





In [44]:
spark

In [10]:
# spark.stop()
