In [1]:
# Como usar instrucciones del lenguaje SQL con SparkSession

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.getOrCreate()

In [6]:
# es una transformacion -> map
datosEstudiantesDF = spark.read.csv('file:///c:/archivos/datos_estudiantes.csv',header=True)

In [7]:
# es una accion -> reduce
# datosEstudiantesDF.show()

+------------+---------+------+------------+
|idEstudiante|   Nombre|Genero|FechaIngreso|
+------------+---------+------+------------+
|          s1|  Soledad|     F|  01-03-2019|
|          s2|   Sandra|     F|  01-07-2019|
|          s3| Benjamin|     M|  01-03-2017|
|          s4|     Juan|     M|  01-07-2018|
|          s5|Sebastian|     M|  01-03-2018|
|          s6|    Julia|     F|  01-03-2020|
+------------+---------+------+------------+



In [8]:
lenguajesEstudianteDF = spark.read.csv('file:///c:/archivos/lenguajes_estudiantes.csv',header=True)

In [9]:
# lenguajesEstudianteDF.show()

+------------+--------+----+
|idEstudiante|Lenguaje|Nota|
+------------+--------+----+
|          s1|  Python|  90|
|          s3|    Java|  62|
|          s1|    Java|  78|
|          s2|  Python|  69|
|          s3|       R|  80|
|          s4|     C++|  78|
|          s5|   Scala|  68|
|          s4|  Python|  74|
|          s2|    Java|  89|
|          s6|    Java|  85|
+------------+--------+----+



In [13]:
# crear una vista asociada al DF datosEstudiantesDF
datosEstudiantesDF.createOrReplaceTempView('vDatosEstudiantesDF')
lenguajesEstudianteDF.createOrReplaceTempView('vLenguajesEstudianteDF')

In [15]:
spark.sql("select * from vDatosEstudiantesDF").show()

+------------+---------+------+------------+
|idEstudiante|   Nombre|Genero|FechaIngreso|
+------------+---------+------+------------+
|          s1|  Soledad|     F|  01-03-2019|
|          s2|   Sandra|     F|  01-07-2019|
|          s3| Benjamin|     M|  01-03-2017|
|          s4|     Juan|     M|  01-07-2018|
|          s5|Sebastian|     M|  01-03-2018|
|          s6|    Julia|     F|  01-03-2020|
+------------+---------+------+------------+



In [14]:
spark.sql("select * from vLenguajesEstudianteDF").show()

+------------+--------+----+
|idEstudiante|Lenguaje|Nota|
+------------+--------+----+
|          s1|  Python|  90|
|          s3|    Java|  62|
|          s1|    Java|  78|
|          s2|  Python|  69|
|          s3|       R|  80|
|          s4|     C++|  78|
|          s5|   Scala|  68|
|          s4|  Python|  74|
|          s2|    Java|  89|
|          s6|    Java|  85|
+------------+--------+----+



In [17]:
sqlJoinDF = spark.sql("select * from vDatosEstudiantesDF de join vLenguajesEstudianteDF le \
on (de.idEstudiante = le.idEstudiante)")

In [18]:
# sqlJoinDF.show()

+------------+---------+------+------------+------------+--------+----+
|idEstudiante|   Nombre|Genero|FechaIngreso|idEstudiante|Lenguaje|Nota|
+------------+---------+------+------------+------------+--------+----+
|          s1|  Soledad|     F|  01-03-2019|          s1|    Java|  78|
|          s1|  Soledad|     F|  01-03-2019|          s1|  Python|  90|
|          s2|   Sandra|     F|  01-07-2019|          s2|    Java|  89|
|          s2|   Sandra|     F|  01-07-2019|          s2|  Python|  69|
|          s3| Benjamin|     M|  01-03-2017|          s3|       R|  80|
|          s3| Benjamin|     M|  01-03-2017|          s3|    Java|  62|
|          s4|     Juan|     M|  01-07-2018|          s4|  Python|  74|
|          s4|     Juan|     M|  01-07-2018|          s4|     C++|  78|
|          s5|Sebastian|     M|  01-03-2018|          s5|   Scala|  68|
|          s6|    Julia|     F|  01-03-2020|          s6|    Java|  85|
+------------+---------+------+------------+------------+-------

In [19]:
sqlGroupByDF = spark.sql("select nombre, avg(nota) promedio from vDatosEstudiantesDF de join vLenguajesEstudianteDF \
le on (de.idEstudiante = le.idEstudiante) group by nombre")

In [20]:
# sqlGroupByDF.show()

+---------+--------+
|   nombre|promedio|
+---------+--------+
| Benjamin|    71.0|
|    Julia|    85.0|
|   Sandra|    79.0|
|Sebastian|    68.0|
|  Soledad|    84.0|
|     Juan|    76.0|
+---------+--------+



In [21]:
sqlCountDF = spark.sql("select lenguaje, count(nombre) numeroAlumnos from vDatosEstudiantesDF de join vLenguajesEstudianteDF \
le on (de.idEstudiante = le.idEstudiante) group by lenguaje")

In [22]:
# sqlCountDF.show()

+--------+-------------+
|lenguaje|numeroAlumnos|
+--------+-------------+
|     C++|            1|
|       R|            1|
|   Scala|            1|
|  Python|            3|
|    Java|            4|
+--------+-------------+



In [23]:
# funciones windows ... funcion Rank

In [24]:
spark.sql('select nombre, lenguaje, nota, rank() over (order by nombre) \
as ranking from vDatosEstudiantesDF de join vLenguajesEstudianteDF le on (de.idEstudiante = le.idEstudiante)').show()

+---------+--------+----+-------+
|   nombre|lenguaje|nota|ranking|
+---------+--------+----+-------+
| Benjamin|    Java|  62|      1|
| Benjamin|       R|  80|      1|
|     Juan|     C++|  78|      3|
|     Juan|  Python|  74|      3|
|    Julia|    Java|  85|      5|
|   Sandra|  Python|  69|      6|
|   Sandra|    Java|  89|      6|
|Sebastian|   Scala|  68|      8|
|  Soledad|  Python|  90|      9|
|  Soledad|    Java|  78|      9|
+---------+--------+----+-------+



In [26]:
# partition by
spark.sql('select nombre, lenguaje, nota, rank() over (PARTITION BY  lenguaje \
order by nota desc) as ranking from vDatosEstudiantesDF de join vLenguajesEstudianteDF le \
on (de.idestudiante = le.idestudiante)').show()

+---------+--------+----+-------+
|   nombre|lenguaje|nota|ranking|
+---------+--------+----+-------+
|     Juan|     C++|  78|      1|
| Benjamin|       R|  80|      1|
|Sebastian|   Scala|  68|      1|
|  Soledad|  Python|  90|      1|
|     Juan|  Python|  74|      2|
|   Sandra|  Python|  69|      3|
|   Sandra|    Java|  89|      1|
|    Julia|    Java|  85|      2|
|  Soledad|    Java|  78|      3|
| Benjamin|    Java|  62|      4|
+---------+--------+----+-------+



In [30]:
mejorRankingDF = spark.sql('select nombre, lenguaje, nota, ranking from (select nombre,lenguaje,nota, rank() \
over (PARTITION BY lenguaje order by nota desc) as ranking from vDatosEstudiantesDF de join vLenguajesEstudianteDF le \
on (de.idestudiante = le.idestudiante)) where ranking <=2')

In [31]:
mejorRankingDF.show()

+---------+--------+----+-------+
|   nombre|lenguaje|nota|ranking|
+---------+--------+----+-------+
|     Juan|     C++|  78|      1|
| Benjamin|       R|  80|      1|
|Sebastian|   Scala|  68|      1|
|  Soledad|  Python|  90|      1|
|     Juan|  Python|  74|      2|
|   Sandra|    Java|  89|      1|
|    Julia|    Java|  85|      2|
+---------+--------+----+-------+



In [32]:
mejorRankingDF.show()

+---------+--------+----+-------+
|   nombre|lenguaje|nota|ranking|
+---------+--------+----+-------+
|     Juan|     C++|  78|      1|
| Benjamin|       R|  80|      1|
|Sebastian|   Scala|  68|      1|
|  Soledad|  Python|  90|      1|
|     Juan|  Python|  74|      2|
|   Sandra|    Java|  89|      1|
|    Julia|    Java|  85|      2|
+---------+--------+----+-------+



In [34]:
spark.sql("cache table estudiantesCache AS select * from vDatosEstudiantesDF").show()

++
||
++
++



In [35]:
spark.sql("select idestudiante,nombre from estudiantesCache").show()

+------------+---------+
|idestudiante|   nombre|
+------------+---------+
|          s1|  Soledad|
|          s2|   Sandra|
|          s3| Benjamin|
|          s4|     Juan|
|          s5|Sebastian|
|          s6|    Julia|
+------------+---------+



In [36]:
spark.sql("select idestudiante,nombre from estudiantesCache").show()

+------------+---------+
|idestudiante|   nombre|
+------------+---------+
|          s1|  Soledad|
|          s2|   Sandra|
|          s3| Benjamin|
|          s4|     Juan|
|          s5|Sebastian|
|          s6|    Julia|
+------------+---------+

