# Algorithm with Spark SQL

Using the files pilotos.csv and vuelos.csv create datesets and retrieve data

In [None]:
#pip install unidecode

In [1]:
"""
Import Libraries
"""
import findspark
findspark.init()
from unidecode import unidecode
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession  # SparkSession representa la conexión para trabajar con SQL
from pyspark import SparkConf, SparkContext # SparkContext configuración de spark


"""
# start variables
"""
sparkConf = SparkConf().setAppName("My SparkQL Application")
sc = SparkContext(conf=sparkConf)
spark = SparkSession(sc)
spark.sparkContext.setLogLevel("ERROR")

In [18]:

"""----- Create dataframes----"""
vuelos_ = spark.read.load(
    "vuelos.csv", format="csv", sep=",", inferSchema="true", header="true",
)

pilotos = spark.read.load(
    "pilotos.csv", format="csv", sep=",", inferSchema="true", header="true",
)
pilotos.show(5)

vuelos=vuelos_.withColumnRenamed("Aerolínea","Aerolinea")  #Se modifica este campo para no tener problemas de caracteres extraños

vuelos.show(10)

+-------------+---------------+
|Codigo Piloto|         Piloto|
+-------------+---------------+
|        43556|       John Max|
|        43557|   Jilles Vlank|
|        43558|    Jorge Mej�a|
|        43559|David Colindres|
|        43560|Maximilian Call|
+-------------+---------------+
only showing top 5 rows

+-------------+---------+------+-------+------------------+------+
|Codigo Piloto|Aerolinea|Origen|Destino|Minutos de retraso|OnTime|
+-------------+---------+------+-------+------------------+------+
|        43556|       10|   SAP|    HAJ|                40|  null|
|        43557|        5|   MIA|    MIA|                30|  null|
|        43558|        3|   FLL|    FLL|                20|  null|
|        43559|        1|   TEG|    SAP|                 0|  null|
|        43560|        1|   HAJ|    SAP|                50|  null|
|        43561|        8|   HHN|    SAP|                20|  null|
|        43562|        2|   SAP|    HAJ|               -30|  null|
|        43563|  

In [5]:

"""------function to take away strange characters---"""
def correccion (x):
    return unidecode(x)
ascii_udf = udf(correccion)

In [11]:
"""-----#Create a new dataframe with the column pilot witout strange Characters---"""
df_pilotos = pilotos.withColumn("Piloto_", ascii_udf('Piloto'))
df_pilotos.show(10)


+-------------+---------------+---------------+
|Codigo Piloto|         Piloto|        Piloto_|
+-------------+---------------+---------------+
|        43556|       John Max|       John Max|
|        43557|   Jilles Vlank|   Jilles Vlank|
|        43558|    Jorge Mej�a|     Jorge Meja|
|        43559|David Colindres|David Colindres|
|        43560|Maximilian Call|Maximilian Call|
|        43561|   Muilin Mills|   Muilin Mills|
|        43562|    Gianni Falk|    Gianni Falk|
|        43563|       Hous Hih|       Hous Hih|
|        43564|       Cho Ming|       Cho Ming|
|        43565|        Chao Ma|        Chao Ma|
+-------------+---------------+---------------+
only showing top 10 rows



In [7]:
"""----- create Temp View to work with SQL -----"""
vuelos.createOrReplaceTempView("vw_vuelos")  # este es el nombre de la tabla
df_pilotos.createOrReplaceTempView("vw_pilotos")  # este es el nombre de la tabla

In [26]:
# Easiest solution
spark.sql("select  count(Aerolinea) cant_flights,aerolinea from vw_vuelos group by aerolinea ORDER BY 1 DESC LIMIT 1").show()

+------------+---------+
|cant_flights|aerolinea|
+------------+---------+
|         457|        7|
+------------+---------+



In [25]:
print("\n Which airline has more fligths?\n")

# Dos formas de dar solución a la pregunta:

# Con dense_rank
spark.sql("with n_vuelos(select count(Aerolinea) cant_origen,aerolinea from vw_vuelos group by aerolinea)\
            ,max_aer(SELECT DENSE_RANK() OVER (ORDER BY cant_origen DESC) rank\
                ,aerolinea,cant_origen as n_viajes from n_vuelos) \
                    select * from max_aer  WHERE RANK = 1").show()




 Which airline has more fligths?

+----+---------+--------+
|rank|aerolinea|n_viajes|
+----+---------+--------+
|   1|        7|     457|
+----+---------+--------+



In [13]:

# Con subconsultas
spark.sql(" WITH vw_max_vuelos as\
                (SELECT Aerolinea, COUNT(Aerolinea) AS cant_vuelos FROM vw_vuelos\
                 GROUP BY Aerolinea)\
                        SELECT Aerolinea, cant_vuelos FROM vw_max_vuelos\
                        WHERE cant_vuelos >= (SELECT MAX(cant_vuelos) FROM vw_max_vuelos )").show()

+---------+-----------+
|Aerolinea|cant_vuelos|
+---------+-----------+
|        7|        457|
+---------+-----------+



In [27]:
print("\nWhich origin has more frecuency?\n")
#dos formas de resolverlo
# Con dense_rank
spark.sql("WITH CANTIDAD AS (SELECT COUNT(1) CANTIDAD,ORIGEN FROM vw_vuelos group by origen)\
            , MAX_ORIG AS (SELECT DENSE_RANK() OVER (ORDER BY CANTIDAD DESC)RANK,ORIGEN,CANTIDAD FROM CANTIDAD)\
                 SELECT * FROM MAX_ORIG WHERE RANK = 1").show()


# Con subconsultas
spark.sql(" WITH vw_max_origen as\
                (SELECT Origen, COUNT(Origen) AS cant_origen FROM vw_vuelos\
                 GROUP BY Origen)\
                        SELECT Origen, cant_origen FROM vw_max_origen\
                        WHERE cant_origen>= (SELECT MAX(cant_origen) FROM vw_max_origen )")


Which origin has more frecuency?

+----+------+--------+
|RANK|ORIGEN|CANTIDAD|
+----+------+--------+
|   1|   SAP|    1404|
+----+------+--------+



DataFrame[Origen: string, cant_origen: bigint]

In [32]:
spark.sql("select  count(Origen) cant_origin ,Origen from vw_vuelos group by Origen ORDER BY 1 DESC LIMIT 1").show()

+-----------+------+
|cant_origin|Origen|
+-----------+------+
|       1404|   SAP|
+-----------+------+



In [33]:

print("\nFrom where the airline 8 flights with more frecuency\n")
#Dos formas de resolverlo
# Con dense_rank
spark.sql("with num_ori_aer as(select count(1) cant_origen,origen,aerolinea from vw_vuelos where aerolinea = 8 \
             group by origen,aerolinea)\
                , max_aer as (select dense_rank() over (order by cant_origen desc) rank, cant_origen,origen,aerolinea from num_ori_aer)\
                    select * from max_aer where rank =1").show()

# simple
spark.sql("select count(1) cant_origen,origen,aerolinea from vw_vuelos where aerolinea = 8 \
             group by origen,aerolinea order by 1 desc limit 1").show()


From where the airline 8 flights with more frecuency

+----+-----------+------+---------+
|rank|cant_origen|origen|aerolinea|
+----+-----------+------+---------+
|   1|        134|   SAP|        8|
+----+-----------+------+---------+

+-----------+------+---------+
|cant_origen|origen|aerolinea|
+-----------+------+---------+
|        134|   SAP|        8|
+-----------+------+---------+



In [35]:

print("\nWich pilot has more flights?\n")

spark.sql("SELECT count(Origen) cant_vuelos,b.Piloto FROM vw_vuelos a \
           inner join vw_pilotos b on a.`codigo piloto` = b.`codigo piloto` \
             group by a.`codigo piloto`,b.piloto order by 1 desc limit 1 ").show()


spark.sql("SELECT P.Piloto, COUNT(*) AS Vuelos FROM vw_vuelos V \
    INNER JOIN vw_pilotos P ON V.`Codigo Piloto` = P.`Codigo Piloto`\
        GROUP BY P.Piloto ORDER BY 2 DESC  LIMIT 1;").show()


Wich pilot has more flights?

+-----------+------------+
|cant_vuelos|      Piloto|
+-----------+------------+
|       1028|Jonh Pierson|
+-----------+------------+

+------------+------+
|      Piloto|Vuelos|
+------------+------+
|Jonh Pierson|  1028|
+------------+------+

