In [1]:
# Install findpark
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [143]:
!pip install openpyxl

Collecting openpyxl
  Downloading openpyxl-3.1.1-py2.py3-none-any.whl (249 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m249.8/249.8 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting et-xmlfile
  Downloading et_xmlfile-1.1.0-py3-none-any.whl (4.7 kB)
Installing collected packages: et-xmlfile, openpyxl
Successfully installed et-xmlfile-1.1.0 openpyxl-3.1.1


In [2]:
# Import libs
import findspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

In [4]:
# Create SparkSession
sparkConf = SparkConf().setAppName("My Spark Application for IBM test")
sc = SparkContext(conf=sparkConf)
spark = SparkSession(sc)

23/02/17 12:20:27 WARN Utils: Your hostname, jolver-ardila resolves to a loopback address: 127.0.1.1; using 192.168.0.10 instead (on interface wlp0s20f3)
23/02/17 12:20:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/17 12:20:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [58]:
# Create dataframe from vuelos
vuelos_ = spark.read.load("./sources/vuelos.csv", format="csv", sep=",", inferSchema="true", header="true")
vuelos_.show()

+-------------+---------+------+-------+------------------+------+
|Codigo Piloto|Aerolínea|Origen|Destino|Minutos de retraso|OnTime|
+-------------+---------+------+-------+------------------+------+
|        43556|       10|   SAP|    HAJ|                40|  null|
|        43557|        5|   MIA|    MIA|                30|  null|
|        43558|        3|   FLL|    FLL|                20|  null|
|        43559|        1|   TEG|    SAP|                 0|  null|
|        43560|        1|   HAJ|    SAP|                50|  null|
|        43561|        8|   HHN|    SAP|                20|  null|
|        43562|        2|   SAP|    HAJ|               -30|  null|
|        43563|        8|   MIA|    HHN|               -30|  null|
|        43564|       10|   FLL|    SAP|               -30|  null|
|        43565|        7|   TEG|    TEG|                 0|  null|
|        43566|        4|   HAJ|    HAJ|               -70|  null|
|        43567|        6|   HHN|    HHN|               -40|  n

In [59]:
# Create dataframe from pilots
pilots = spark.read.load("./sources/pilotos.csv", format="csv", sep=",", inferSchema="true", header="true")
pilots.show()

+-------------+---------------+
|Codigo Piloto|         Piloto|
+-------------+---------------+
|        43556|       John Max|
|        43557|   Jilles Vlank|
|        43558|    Jorge Mej�a|
|        43559|David Colindres|
|        43560|Maximilian Call|
|        43561|   Muilin Mills|
|        43562|    Gianni Falk|
|        43563|       Hous Hih|
|        43564|       Cho Ming|
|        43565|        Chao Ma|
|        43566|     Jos� Perez|
|        43567|    Davie Mixal|
|        43568|     Filg Gills|
|        43569|       Hung Cho|
|        43570|     Filen Filg|
|        43571| Arthur Maxwell|
|        43572| Carlos Miranda|
|        43573|  Denis Tercero|
|        43574|   David German|
|        43575|   Charles Moll|
+-------------+---------------+
only showing top 20 rows



In [60]:
# Rename column
# Se renombran para un uso mas comodo en las operaciones de consulta (se eliminan los espacios)
vuelos = vuelos_.withColumnRenamed("Aerolínea", "Aerolinea")
vuelos = vuelos.withColumnRenamed("Codigo Piloto", "Codigo_Piloto")
vuelos = vuelos.withColumnRenamed("Minutos de retraso", "Minutos_de_retraso")
pilots = pilots.withColumnRenamed("Codigo Piloto", "Cod_Piloto")

In [63]:
# Transform columns types for vuelos
# Se cambian el tipo a integer de algunas columnas para facilitar operaciones de comparacion
vuelos = vuelos.withColumn("Codigo_Piloto", vuelos.Codigo_Piloto.cast("integer"))
vuelos = vuelos.withColumn("Aerolinea", vuelos.Aerolinea.cast("integer"))
vuelos = vuelos.withColumn("Minutos_de_retraso", vuelos.Minutos_de_retraso.cast("integer"))

In [64]:
vuelos.printSchema()

root
 |-- Codigo_Piloto: integer (nullable = true)
 |-- Aerolinea: integer (nullable = true)
 |-- Origen: string (nullable = true)
 |-- Destino: string (nullable = true)
 |-- Minutos_de_retraso: integer (nullable = true)
 |-- OnTime: string (nullable = true)



In [67]:
# ETL steps
# Step 1 -> Add pilot name to flights df
# 
# Se ejecuta un join entre los 2 dtaframes para linkear los nombres de los pilotos
# Luego se elimina la columna duplicada que viene del join
fligths_ = vuelos.join(pilots, vuelos.Codigo_Piloto == pilots.Cod_Piloto, "left")
fligths_ = fligths_.drop("Cod_Piloto")
fligths_.show()

+-------------+---------+------+-------+------------------+------+---------------+
|Codigo_Piloto|Aerolinea|Origen|Destino|Minutos_de_retraso|OnTime|         Piloto|
+-------------+---------+------+-------+------------------+------+---------------+
|        43556|       10|   SAP|    HAJ|                40|  null|       John Max|
|        43557|        5|   MIA|    MIA|                30|  null|   Jilles Vlank|
|        43558|        3|   FLL|    FLL|                20|  null|    Jorge Mej�a|
|        43559|        1|   TEG|    SAP|                 0|  null|David Colindres|
|        43560|        1|   HAJ|    SAP|                50|  null|Maximilian Call|
|        43561|        8|   HHN|    SAP|                20|  null|   Muilin Mills|
|        43562|        2|   SAP|    HAJ|               -30|  null|    Gianni Falk|
|        43563|        8|   MIA|    HHN|               -30|  null|       Hous Hih|
|        43564|       10|   FLL|    SAP|               -30|  null|       Cho Ming|
|   

In [69]:
fligths_.count()

4302

In [75]:
# Step 2 -> Remove same origin and destination
fligths_ = fligths_[fligths_["Origen"] != fligths_["Destino"]]
fligths_.show()

+-------------+---------+------+-------+------------------+------+---------------+
|Codigo_Piloto|Aerolinea|Origen|Destino|Minutos_de_retraso|OnTime|         Piloto|
+-------------+---------+------+-------+------------------+------+---------------+
|        43556|       10|   SAP|    HAJ|                40|  null|       John Max|
|        43559|        1|   TEG|    SAP|                 0|  null|David Colindres|
|        43560|        1|   HAJ|    SAP|                50|  null|Maximilian Call|
|        43561|        8|   HHN|    SAP|                20|  null|   Muilin Mills|
|        43562|        2|   SAP|    HAJ|               -30|  null|    Gianni Falk|
|        43563|        8|   MIA|    HHN|               -30|  null|       Hous Hih|
|        43564|       10|   FLL|    SAP|               -30|  null|       Cho Ming|
|        43568|        4|   FLL|    SAP|                 0|  null|     Filg Gills|
|        43569|        9|   TEG|    MIA|                 0|  null|       Hung Cho|
|   

In [118]:
from pyspark.sql.functions import col,lit,when, count, desc

In [99]:
# Step 3 -> fill OnTime column
# Se genera un nuevo df con las columnas necesarias y modificando el valor del campo "OnTime"
# comparando el campo Minutos_de_retraso usando la funcion when de spark
# Finalmente nombramos la columna en el nuevo df como OnTime
fligths_ = fligths_.select("Codigo_Piloto", "Piloto", "Aerolinea", "Origen", "Destino", "Minutos_de_retraso", \
                    when(col("Minutos_de_retraso") <= 30, lit("A")) \
                    .when((col("Minutos_de_retraso") > 30) & (col("Minutos_de_retraso") <= 50), lit("B")) \
                    .otherwise(lit("C")) \
                    .alias("OnTime"))
fligths_.show()

+-------------+---------------+---------+------+-------+------------------+------+
|Codigo_Piloto|         Piloto|Aerolinea|Origen|Destino|Minutos_de_retraso|OnTime|
+-------------+---------------+---------+------+-------+------------------+------+
|        43556|       John Max|       10|   SAP|    HAJ|                40|     B|
|        43559|David Colindres|        1|   TEG|    SAP|                 0|     A|
|        43560|Maximilian Call|        1|   HAJ|    SAP|                50|     B|
|        43561|   Muilin Mills|        8|   HHN|    SAP|                20|     A|
|        43562|    Gianni Falk|        2|   SAP|    HAJ|               -30|     A|
|        43563|       Hous Hih|        8|   MIA|    HHN|               -30|     A|
|        43564|       Cho Ming|       10|   FLL|    SAP|               -30|     A|
|        43568|     Filg Gills|        4|   FLL|    SAP|                 0|     A|
|        43569|       Hung Cho|        9|   TEG|    MIA|                 0|     A|
|   

In [100]:
# Se crean las vistas temporales para las soluciones sql
fligths_.createOrReplaceTempView("vv_vuelos")
pilots.createOrReplaceTempView("vv_pilotos")

In [124]:
spark.sql("select * from vv_vuelos").show()

+-------------+---------------+---------+------+-------+------------------+------+
|Codigo_Piloto|         Piloto|Aerolinea|Origen|Destino|Minutos_de_retraso|OnTime|
+-------------+---------------+---------+------+-------+------------------+------+
|        43556|       John Max|       10|   SAP|    HAJ|                40|     B|
|        43559|David Colindres|        1|   TEG|    SAP|                 0|     A|
|        43560|Maximilian Call|        1|   HAJ|    SAP|                50|     B|
|        43561|   Muilin Mills|        8|   HHN|    SAP|                20|     A|
|        43562|    Gianni Falk|        2|   SAP|    HAJ|               -30|     A|
|        43563|       Hous Hih|        8|   MIA|    HHN|               -30|     A|
|        43564|       Cho Ming|       10|   FLL|    SAP|               -30|     A|
|        43568|     Filg Gills|        4|   FLL|    SAP|                 0|     A|
|        43569|       Hung Cho|        9|   TEG|    MIA|                 0|     A|
|   

In [145]:
# Export result to csv
df_pd = fligths_.toPandas()
df_pd.to_excel("results.xlsx", sheet_name='sheet_1')

In [128]:
# 1) Quien es el piloto con mas vuelos A
f_sql = spark.sql("SELECT Piloto, count(*) from vv_vuelos where OnTime = 'A' group by Piloto order by 2 desc limit 1")
f_sql.show()

+------------+--------+
|      Piloto|count(1)|
+------------+--------+
|Jonh Pierson|     668|
+------------+--------+



In [129]:
# 2) Cual es la aerolinea con mas vuelos C
f2_sql = spark.sql("SELECT Aerolinea, count(*) from vv_vuelos where OnTime = 'C' group by Aerolinea order by 2 desc limit 1")
f2_sql.show()

+---------+--------+
|Aerolinea|count(1)|
+---------+--------+
|        4|      32|
+---------+--------+



In [152]:
# 3) Hung Cho vuela para las aerolineas
f3_sql = spark.sql("SELECT distinct(Piloto), Aerolinea from vv_vuelos where Piloto = 'Hung Cho'")
f3_sql.show()

+--------+---------+
|  Piloto|Aerolinea|
+--------+---------+
|Hung Cho|        9|
|Hung Cho|        3|
|Hung Cho|        8|
|Hung Cho|        4|
+--------+---------+



In [153]:
# 4) Cuántos vuelos A, B, C tiene Chao Ma
f3_sql = spark.sql("SELECT Piloto, OnTime, count(*) from vv_vuelos where Piloto = 'Chao Ma' and OnTime in ('A', 'B', 'C') group by 1,2")
f3_sql.show()

+-------+------+--------+
| Piloto|OnTime|count(1)|
+-------+------+--------+
|Chao Ma|     B|       1|
|Chao Ma|     C|       2|
|Chao Ma|     A|       7|
+-------+------+--------+

