<a href="https://colab.research.google.com/github/jeraldflowers/Spark-OlimpicData/blob/main/DataFrames_Management.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
! sudo apt-get update
! sudo mkdir -p /usr/share/man/man1
! sudo apt-get install -y openjdk-11-jdk
! pip install pyspark

# DataFrames Management

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.sql.types import StructField, IntegerType, StringType, FloatType
from pyspark.sql.types import Row, StructType
from pyspark.sql import SQLContext

In [3]:
spark = SparkContext(master="local", appName="DataFrames")
sqlContext = SQLContext(spark)



In [4]:
from google.colab import drive
drive.mount('/gdrive')

Mounted at /gdrive


In [5]:
%cd '/gdrive/My Drive'

/gdrive/My Drive


In [6]:
!ls

'200.60.47.73 ReportePagos ReportePagos Inicio.pdf'
 Classroom
'Colab Notebooks'
'CURRICULUM_VITAE_JERALD[1].docx'
 curso-apache-spark-platzi
 dl-pytorch
'Documento sin título (1).gdoc'
'Documento sin título.gdoc'
'fuente-de-datos-platzi_6a20a26e-6a1d-4b4d-8fe9-d61f9689cc6e (1).xlsx'
 fuente-de-datos-platzi.gsheet
 INGRESOS_PASIVOS_EL_SECRETO_DE_LOS_MILONARIOS_.pdf
'Mas de 1000 libros de Educacion financiera - Google Drive'
'Mas de 1000 libros de Educacion financiera - Google Drive.gdoc'
 PyTorch-CycleGAN
'TEORIA ACTUALIZ. PROC.I-100 (2).docx'


In [7]:
%cd '/gdrive/My Drive/curso-apache-spark-platzi/files'

/gdrive/My Drive/curso-apache-spark-platzi/files


In [8]:
!head -n 5 '/gdrive/My Drive/curso-apache-spark-platzi/files/juegos.csv'

,nombre_juego,annio,temporada,ciudad
1,1896 Verano,1896,Verano,Athina
2,1900 Verano,1900,Verano,Paris
3,1904 Verano,1904,Verano,St. Louis
4,1906 Verano,1906,Verano,Athina


In [9]:
path = '/gdrive/My Drive/curso-apache-spark-platzi/files/'

In [10]:
juegoSchema = StructType([
    StructField("juego_id", IntegerType(), False),
    StructField("anio", StringType(), False),
    StructField("temporada", StringType(), False),
    StructField("cuidad", StringType(), False)
])

juegoDF = sqlContext.read.schema(juegoSchema).option("header", "true").csv(path + "juegos.csv" )

In [11]:
juegoDF.show(4)

+--------+-----------+---------+------+
|juego_id|       anio|temporada|cuidad|
+--------+-----------+---------+------+
|       1|1896 Verano|     1896|Verano|
|       2|1900 Verano|     1900|Verano|
|       3|1904 Verano|     1904|Verano|
|       4|1906 Verano|     1906|Verano|
+--------+-----------+---------+------+
only showing top 4 rows



In [12]:
deporteRDD = spark.textFile(path + "deporte.csv").map(lambda l: l.split(","))
deporteRDD.take(5)

[['deporte_id', 'deporte'],
 ['1', 'Basketball'],
 ['2', 'Judo'],
 ['3', 'Football'],
 ['4', 'Tug-Of-War']]

In [13]:
deporte_schema = StructType([
    StructField("deporte_id", IntegerType(), False),
    StructField("deporte", StringType(), False)
])

In [14]:
deporteDF = sqlContext.read.schema(deporte_schema).option("header", "true").csv(path + "deporte.csv" )
deporteDF.show(5)

+----------+-------------+
|deporte_id|      deporte|
+----------+-------------+
|         1|   Basketball|
|         2|         Judo|
|         3|     Football|
|         4|   Tug-Of-War|
|         5|Speed Skating|
+----------+-------------+
only showing top 5 rows



In [15]:
deportistaOlimpicoRDD = spark.textFile(path + "deportista.csv").map(lambda l: l.split(","))
deportistaOlimpicoRDD2 = spark.textFile(path + "deportista2.csv").map(lambda l: l.split(","))

deportistaOlimpicoRDD =  deportistaOlimpicoRDD.union(deportistaOlimpicoRDD2)

In [16]:
deportistaOlimpicoRDD.take(5)

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]

In [17]:
def removeHeader(index, iterator):
  return iter(list(iterator)[1:])

In [18]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.mapPartitionsWithIndex(removeHeader)

In [19]:
deportistaOlimpicoRDD.take(5)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

In [20]:
deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l: (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4]),
    float(l[5]),
    int(l[6])
))

In [21]:
schema = StructType([
    StructField("deportista_id", IntegerType(), False),
    StructField("nombre", StringType(), False),
    StructField("genero", IntegerType(), False),
    StructField("edad", IntegerType(), False),
    StructField("altura", IntegerType(), False),
    StructField("peso", FloatType(), False),
    StructField("equipo_id", IntegerType(), False)
])

In [22]:
deportistaDF = sqlContext.createDataFrame(deportistaOlimpicoRDD, schema)
deportistaDF.show(5)

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|80.0|      199|
|            2|            A Lamusi|     1|  23|   170|60.0|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|     0| 0.0|      273|
|            4|Edgar Lindenau Aabye|     1|  34|     0| 0.0|      278|
|            5|Christine Jacoba ...|     2|  21|   185|82.0|      705|
+-------------+--------------------+------+----+------+----+---------+
only showing top 5 rows



In [23]:
evento_schema = StructType([
    StructField("evento_id", IntegerType(), False),
    StructField("evento", StringType(), False), 
    StructField("deporte_id", IntegerType(), False)
])

In [24]:
eventoDF = sqlContext.read.schema(evento_schema).option("header", "true").csv(path + "evento.csv" )

In [25]:
eventoDF.show(5)

+---------+--------------------+----------+
|evento_id|              evento|deporte_id|
+---------+--------------------+----------+
|        1|Basketball Men's ...|         1|
|        2|Judo Men's Extra-...|         2|
|        3|Football Men's Fo...|         3|
|        4|Tug-Of-War Men's ...|         4|
|        5|Speed Skating Wom...|         5|
+---------+--------------------+----------+
only showing top 5 rows



In [26]:
paises_schema = StructType([
    StructField("paises_id", IntegerType(), False),
    StructField("equipo", StringType(), False), 
    StructField("sigla", StringType(), False)
])

In [27]:
paisesDF = sqlContext.read.schema(paises_schema).option("header", "true").csv(path + "paises.csv" )
paisesDF.show(5)

+---------+--------------------+-----+
|paises_id|              equipo|sigla|
+---------+--------------------+-----+
|        1|         30. Februar|  AUT|
|        2|A North American ...|  MEX|
|        3|           Acipactli|  MEX|
|        4|             Acturus|  ARG|
|        5|         Afghanistan|  AFG|
+---------+--------------------+-----+
only showing top 5 rows



In [28]:
resultados_schema = StructType([
    StructField("resultado_id", IntegerType(), False),
    StructField("medalla", StringType(), False),
    StructField("deportista_id", IntegerType(), False),
    StructField("juego_id", IntegerType(), False),
    StructField("evento_id", IntegerType(), False)
])

In [29]:
resultadosDF = sqlContext.read.schema(resultados_schema).option("header", "true").csv(path + "resultados.csv" )
resultadosDF.show(5)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           1|     NA|            1|      39|        1|
|           2|     NA|            2|      49|        2|
|           3|     NA|            3|       7|        3|
|           4|   Gold|            4|       2|        4|
|           5|     NA|            5|      36|        5|
+------------+-------+-------------+--------+---------+
only showing top 5 rows



In [30]:
deportistaDF = deportistaDF.withColumnRenamed("genero", "sexo").drop("altura")
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- sexo: integer (nullable = false)
 |-- edad: integer (nullable = false)
 |-- peso: float (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [31]:
from pyspark.sql.functions import *

deportistaDF = deportistaDF.select("deportista_id", "nombre",
                    col("edad").alias("edadAlJugar"), "equipo_id")

deportistaDF.show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



In [32]:
deportistaDF = deportistaDF.filter((deportistaDF.edadAlJugar != 0))

In [33]:
deportistaDF.sort("edadAlJugar").show()

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|        71691|  Dimitrios Loundras|         10|      333|
|        70616|          Liu Luyang|         11|      199|
|       118925|Megan Olwen Deven...|         11|      413|
|        52070|        Etsuko Inada|         11|      514|
|        22411|Magdalena Cecilia...|         11|      413|
|        40129|    Luigina Giavotti|         11|      507|
|        47618|Sonja Henie Toppi...|         11|      742|
|        76675|   Marcelle Matthews|         11|      967|
|        37333|Carlos Bienvenido...|         11|      982|
|        51268|      Beatrice Hutiu|         11|      861|
|       126307|        Liana Vicens|         11|      825|
|        48939|             Ho Gang|         12|      738|
|        49142|        Jan Hoffmann|         12|      302|
|        42835|   Werner Grieshofer|         12|       7

In [34]:
deportistaDF.printSchema()

root
 |-- deportista_id: integer (nullable = false)
 |-- nombre: string (nullable = false)
 |-- edadAlJugar: integer (nullable = false)
 |-- equipo_id: integer (nullable = false)



In [35]:
resultadosDF.printSchema()

root
 |-- resultado_id: integer (nullable = true)
 |-- medalla: string (nullable = true)
 |-- deportista_id: integer (nullable = true)
 |-- juego_id: integer (nullable = true)
 |-- evento_id: integer (nullable = true)



In [36]:
juegoDF.printSchema()

root
 |-- juego_id: integer (nullable = true)
 |-- anio: string (nullable = true)
 |-- temporada: string (nullable = true)
 |-- cuidad: string (nullable = true)



In [37]:
deporte_olimpicoDF = eventoDF.withColumnRenamed("evento", "nombre")

In [38]:
deporte_olimpicoDF.printSchema()

root
 |-- evento_id: integer (nullable = true)
 |-- nombre: string (nullable = true)
 |-- deporte_id: integer (nullable = true)



In [39]:
deportistaDF.join(resultadosDF, deportistaDF.deportista_id == resultadosDF.deportista_id, "left")\
            .join(juegoDF, juegoDF.juego_id == resultadosDF.juego_id, "left")\
            .join(deporte_olimpicoDF, deporte_olimpicoDF.evento_id == resultadosDF.evento_id, "left")\
            .select(deportistaDF.nombre, col("edadAlJugar").alias("Edad del jugador"),
                    "medalla", col("anio").alias("Año de juego"),
                    deporte_olimpicoDF.nombre.alias("Nombre de disciplina")).show()

+--------------------+----------------+-------+-------------+--------------------+
|              nombre|Edad del jugador|medalla| Año de juego|Nombre de disciplina|
+--------------------+----------------+-------+-------------+--------------------+
|           A Dijiang|              24|     NA|  1992 Verano|Basketball Men's ...|
|            A Lamusi|              23|     NA|  2012 Verano|Judo Men's Extra-...|
| Gunnar Nielsen Aaby|              24|     NA|  1920 Verano|Football Men's Fo...|
|Edgar Lindenau Aabye|              34|   Gold|  1900 Verano|Tug-Of-War Men's ...|
|Christine Jacoba ...|              21|     NA|1994 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|              21|     NA|1994 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|              21|     NA|1992 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|              21|     NA|1992 Invierno|Speed Skating Wom...|
|Christine Jacoba ...|              21|     NA|1988 Invierno|Speed Skating Wom...|
|Chr

In [40]:
paisesDF.printSchema()

root
 |-- paises_id: integer (nullable = true)
 |-- equipo: string (nullable = true)
 |-- sigla: string (nullable = true)



In [41]:
resultadosDF.filter(resultadosDF.medalla != "NA").join(deportistaDF, deportistaDF.deportista_id == resultadosDF.deportista_id, "left")\
                                                       .join(paisesDF, paisesDF.paises_id == deportistaDF.equipo_id, "left")\
                                                       .select("medalla", "equipo", "sigla").sort(col("sigla").desc()).show()

+-------+--------+-----+
|medalla|  equipo|sigla|
+-------+--------+-----+
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Bronze|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
+-------+--------+-----+
only showing top 20 rows



In [42]:
deporteDF.printSchema()

root
 |-- deporte_id: integer (nullable = true)
 |-- deporte: string (nullable = true)



In [43]:
medallistaXAnio = deportistaDF.join(resultadosDF, deportistaDF.deportista_id == resultadosDF.deportista_id, "left") \
    .join(juegoDF, juegoDF.juego_id == resultadosDF.juego_id, "left") \
    .join(paisesDF, deportistaDF.equipo_id == paisesDF.paises_id, "left") \
    .join(deporte_olimpicoDF, deporte_olimpicoDF.evento_id == resultadosDF.evento_id, "left") \
    .join(deporteDF, deporte_olimpicoDF.deporte_id == deporteDF.deporte_id, "left") \
    .select(
        "sigla",
        "anio",
        "medalla",
        deporte_olimpicoDF.nombre.alias("Nombre subdisciplina"),
        deporteDF.deporte.alias("Nombre disciplina"),
        deportistaDF.nombre    
    )

In [44]:
medallistaXAnio.show()

+-----+-------------+-------+--------------------+--------------------+--------------------+
|sigla|         anio|medalla|Nombre subdisciplina|   Nombre disciplina|              nombre|
+-----+-------------+-------+--------------------+--------------------+--------------------+
|  CHN|  1992 Verano|     NA|Basketball Men's ...|          Basketball|           A Dijiang|
|  CHN|  2012 Verano|     NA|Judo Men's Extra-...|                Judo|            A Lamusi|
|  DEN|  1920 Verano|     NA|Football Men's Fo...|            Football| Gunnar Nielsen Aaby|
|  SWE|  1900 Verano|   Gold|Tug-Of-War Men's ...|          Tug-Of-War|Edgar Lindenau Aabye|
|  NED|1994 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1994 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1992 Invierno|     NA|Speed Skating Wom...|       Speed Skating|Christine Jacoba ...|
|  NED|1992 Invierno|     NA|Speed Skating Wom...|       Speed Skating

In [45]:
medallistaXAnio2 = medallistaXAnio.filter(medallistaXAnio.medalla !="NA").sort("anio")\
               .groupBy("sigla", "anio", "Nombre subdisciplina").count()

In [46]:
medallistaXAnio2.show()

+-----+-------------+--------------------+-----+
|sigla|         anio|Nombre subdisciplina|count|
+-----+-------------+--------------------+-----+
|  KOR|2006 Invierno|Short Track Speed...|    2|
|  RUS|  2004 Verano|Wrestling Men's S...|    1|
|  USA|  2004 Verano|Synchronized Swim...|    8|
|  RUS|  2016 Verano|Shooting Women's ...|    1|
|  RUS|  2008 Verano|Wrestling Men's L...|    1|
|  SWE|  1936 Verano|Art Competitions ...|    1|
|  EUN|2002 Invierno|Cross Country Ski...|    1|
|  GBR|  1956 Verano|Sailing Mixed 5.5...|    4|
|  USA|  1988 Verano|Volleyball Men's ...|   12|
|  FRA|  1932 Verano|Cycling Men's Tea...|    4|
|  USA|  1996 Verano|Equestrianism Mix...|    4|
|  USA|  1996 Verano|Athletics Men's H...|    1|
|  GBR|  2008 Verano|Boxing Men's Midd...|    1|
|  FRA|  1984 Verano|Fencing Men's Sab...|    5|
|  USA|  2012 Verano|Gymnastics Women'...|    1|
|  FRA|  2004 Verano|Rowing Men's Ligh...|    2|
|  NOR|1932 Invierno|Speed Skating Men...|    1|
|  FRG|  1992 Verano

In [47]:
medallistaXAnio2.printSchema()

root
 |-- sigla: string (nullable = true)
 |-- anio: string (nullable = true)
 |-- Nombre subdisciplina: string (nullable = true)
 |-- count: long (nullable = false)



In [48]:
medallistaXAnio2.groupBy("sigla", "anio").agg(sum("count").alias("Total de medallas"),\
                                              avg("count").alias("Medallas promedio")).show()

+-----+-------------+-----------------+------------------+
|sigla|         anio|Total de medallas| Medallas promedio|
+-----+-------------+-----------------+------------------+
|  SWE|  1976 Verano|               10|               2.0|
|  MGL|  2008 Verano|                5|              1.25|
|  SUI|2014 Invierno|               29|3.2222222222222223|
|  ETH|  2004 Verano|                7|              1.75|
|  BEL|  2000 Verano|                7|               1.4|
|  AUT|  1928 Verano|                5|              1.25|
|  SYR|  1984 Verano|                1|               1.0|
|  NED|1992 Invierno|                4|1.3333333333333333|
|  MAS|  2012 Verano|                2|               1.0|
|  ITA|  1996 Verano|               69| 2.225806451612903|
|  THA|  2008 Verano|                4|               1.0|
|  URS|1984 Invierno|               56|               2.8|
|  DEN|  1896 Verano|                6|               1.0|
|  GRN|  2016 Verano|                1|               1.

In [49]:
resultadosDF.registerTempTable("resultado")
deportistaDF.registerTempTable("deportista")
paisesDF.registerTempTable("paises")



In [50]:
sqlContext.sql("SELECT * FROM deportista").show(5)

+-------------+--------------------+-----------+---------+
|deportista_id|              nombre|edadAlJugar|equipo_id|
+-------------+--------------------+-----------+---------+
|            1|           A Dijiang|         24|      199|
|            2|            A Lamusi|         23|      199|
|            3| Gunnar Nielsen Aaby|         24|      273|
|            4|Edgar Lindenau Aabye|         34|      278|
|            5|Christine Jacoba ...|         21|      705|
+-------------+--------------------+-----------+---------+
only showing top 5 rows



In [51]:
sqlContext.sql(""" 
                SELECT medalla, equipo, sigla FROM resultado r
                JOIN deportista d
                ON r.deportista_id = d.deportista_id
                JOIN paises p
                ON p.paises_id = d.equipo_id
                WHERE medalla <> "NA"
                ORDER BY sigla DESC
                """).show()

+-------+--------+-----+
|medalla|  equipo|sigla|
+-------+--------+-----+
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
| Bronze|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
| Silver|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
|   Gold|Zimbabwe|  ZIM|
+-------+--------+-----+
only showing top 20 rows



#UDF

In [52]:
deportista_error = spark.textFile(path + "deportistaError.csv").map(lambda l: l.split(","))

In [53]:
deportista_error = deportista_error.mapPartitionsWithIndex(removeHeader)

In [54]:
deportista_error.take(3)

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '', '', '273']]

In [55]:
deportista_error = deportista_error.map(lambda l:(
                     l[0],
                     l[1],
                     l[2],
                     l[3],
                     l[4],
                     l[5],
                     l[6]))

In [56]:
schema = StructType([
    StructField("deportista_id", StringType(), False),
    StructField("nombre", StringType(), False),
    StructField("genero", StringType(), False),
    StructField("edad", StringType(), False),
    StructField("altura", StringType(), False),
    StructField("peso", StringType(), False),
    StructField("equipo_id", StringType(), False)
])

deportistaErrorDF = sqlContext.createDataFrame(deportista_error, schema)

In [57]:
deportistaErrorDF.show()

+-------------+--------------------+------+----+------+----+---------+
|deportista_id|              nombre|genero|edad|altura|peso|equipo_id|
+-------------+--------------------+------+----+------+----+---------+
|            1|           A Dijiang|     1|  24|   180|  80|      199|
|            2|            A Lamusi|     1|  23|   170|  60|      199|
|            3| Gunnar Nielsen Aaby|     1|  24|      |    |      273|
|            4|Edgar Lindenau Aabye|     1|  34|      |    |      278|
|            5|Christine Jacoba ...|     2|  21|   185|  82|      705|
|            6|     Per Knut Aaland|     1|  31|   188|  75|     1096|
|            7|        John Aalberg|     1|  31|   183|  72|     1096|
|            8|"Cornelia ""Cor""...|     2|  18|   168|    |      705|
|            9|    Antti Sami Aalto|     1|  26|   186|  96|      350|
|           10|"Einar Ferdinand ...|     1|  26|      |    |      350|
|           11|  Jorma Ilmari Aalto|     1|  22|   182|76.5|      350|
|     

In [58]:
from pyspark.sql.functions import udf

def conversionEnteros(valor):
  return int(valor) if len(valor) > 0 else None

conversion_enteros_udf = udf(lambda z: conversionEnteros(z), IntegerType())
sqlContext.udf.register("conversion_enteros_udf", conversion_enteros_udf)

<function __main__.<lambda>(z)>

In [59]:
deportistaErrorDF.select(conversion_enteros_udf("altura").alias("alturaDF")).show()

+--------+
|alturaDF|
+--------+
|     180|
|     170|
|    null|
|    null|
|     185|
|     188|
|     183|
|     168|
|     186|
|    null|
|     182|
|     172|
|     159|
|     171|
|    null|
|     184|
|     175|
|     189|
|    null|
|     176|
+--------+
only showing top 20 rows



# Persistence and Partitioning

In [60]:
from pyspark.storagelevel import StorageLevel

In [62]:
medallistaXAnio.is_cached

False

In [64]:
medallistaXAnio.rdd.cache()

MapPartitionsRDD[198] at javaToPython at NativeMethodAccessorImpl.java:0

In [65]:
medallistaXAnio.rdd.getStorageLevel()

StorageLevel(False, True, False, False, 1)

In [66]:
medallistaXAnio.rdd.unpersist()

MapPartitionsRDD[198] at javaToPython at NativeMethodAccessorImpl.java:0

In [67]:
medallistaXAnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_2)

MapPartitionsRDD[198] at javaToPython at NativeMethodAccessorImpl.java:0

In [68]:
StorageLevel.MEMORY_AND_DISK_3 = StorageLevel(True, True, False, False, 3)

In [69]:
medallistaXAnio.rdd.unpersist()

MapPartitionsRDD[198] at javaToPython at NativeMethodAccessorImpl.java:0

In [70]:
medallistaXAnio.rdd.persist(StorageLevel.MEMORY_AND_DISK_3)

MapPartitionsRDD[198] at javaToPython at NativeMethodAccessorImpl.java:0