# Consultas en PySpark con Joins

*En este proyecto analizaremos tres datasets relacionales y aplicaremos filtros específicos para responder las preguntas clave del proyecto y así describir los valores de las distintas variables que los conforman.*

En este proyecto trabajamos con tres tablas en PySpark:

- `flights_spark`: contiene información de vuelos (origen, destino, fechas, retrasos, avión utilizado, etc.).
- `airports_spark`: contiene información de aeropuertos (código FAA, nombre, ciudad, estado, latitud, longitud).
- `planes_spark`: contiene información de aviones (número de cola `tailnum`, fabricante, modelo, año de fabricación, etc.).


1. Aeropuertos de destino más frecuentes
2. Mostrar vuelos junto con fabricante del avión
3. Vuelos agrupados por fabricante del avión
4. Contar vuelos por aeropuerto (origen y destino juntos)
5. Promedio de distancia por aeropuerto de origen
6. Vuelos de más de 2000 km con modelo del avión
7. Número de vuelos por año de fabricación del avión
8. Vuelos con origen y destino mostrando ciudad de ambos aeropuertos
9. Promedio de retraso de salida por aeropuerto
10. Promedio de retraso de llegada por modelo de avión
11. Contar vuelos por ciudad origen y destino
12. Vuelos con aviones Boeing
13. Contar vuelos por fabricante
14. Vuelos con más de 2 horas de retraso en salida y llegada
15. Promedio de distancia recorrida por fabricante de avión

In [None]:
pip install findspark

In [None]:
# Install PySpark into the notebook environment (`!` runs the line as a shell command).
!pip install pyspark

In [None]:
# findspark.init() has to run before pyspark is imported, so it can make the
# local Spark installation importable; the pyspark imports therefore follow it.
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession


In [None]:
# Build (or reuse, via getOrCreate) a local Spark session with a single worker
# thread ("local[1]"), then print the SparkContext and the application name.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()
)
print(spark.sparkContext)
print(f"Spark App Name :{spark.sparkContext.appName}")

In [None]:
# Mount Google Drive so the CSV files under MyDrive are readable;
# force_remount=True remounts even if the drive is already mounted.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT, force_remount=True)

In [None]:
import pandas as pd
import numpy as np

# Load the three CSV files with pandas, in the order flights, airports, planes.
# NOTE(review): these pandas frames are not used by the queries in this notebook.
_csv_dir = '/content/drive/MyDrive/Colab Notebooks'
flightsdf, airportsdf, planesdf = (
    pd.read_csv(f'{_csv_dir}/{stem}.csv')
    for stem in ('flights_small', 'airports', 'planes')
)


In [None]:
# Load the three datasets as Spark DataFrames (header row, inferred column types).
# NOTE(review): the CSVs are assumed to mark missing values with the literal
# string "NA" (pandas' read_csv already treats it as missing). Without
# nullValue="NA", inferSchema would type any column containing "NA" as a
# string instead of a number. Confirm against the files.
_spark_csv_dir = '/content/drive/MyDrive/Colab Notebooks'
_csv_options = dict(header=True, inferSchema=True, nullValue='NA')
flights_spark = spark.read.csv(f'{_spark_csv_dir}/flights_small.csv', **_csv_options)
airports_spark = spark.read.csv(f'{_spark_csv_dir}/airports.csv', **_csv_options)
planes_spark = spark.read.csv(f'{_spark_csv_dir}/planes.csv', **_csv_options)

In [None]:
# Preview the first 20 flights with long values truncated (the show() defaults).
flights_spark.show(n=20, truncate=True)

In [None]:
# Preview the first 20 airports with long values truncated (the show() defaults).
airports_spark.show(n=20, truncate=True)

In [None]:
# Preview the first 20 planes with long values truncated (the show() defaults).
planes_spark.show(n=20, truncate=True)

In [None]:
# Register each DataFrame as a temporary view so spark.sql can query it by name.
for _view_name, _frame in (
    ("flights", flights_spark),
    ("airports", airports_spark),
    ("planes", planes_spark),
):
    _frame.createOrReplaceTempView(_view_name)

In [None]:
# List the tables/views in the current Spark catalog. It should include the
# flights, airports and planes temp views. The notebook displays the returned
# list because it is the cell's last expression.
spark.catalog.listTables()

In [None]:
# Sanity check: read the whole flights view back through Spark SQL and preview it.
_select_all = "SELECT * FROM flights"
sqlDF = spark.sql(_select_all)
sqlDF.show(n=20, truncate=True)

In [None]:
# Number of flights for every (origin, dest) route, computed with Spark SQL.
query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"
flights_counts = spark.sql(query)

# Collect the aggregate to the driver as pandas and print its first 5 rows.
# GROUP BY gives no ordering guarantee, so which routes appear is arbitrary.
pd_counts = flights_counts.toPandas()
print(pd_counts.head(5))

In [None]:
# Attach the origin airport's name to each flight and preview ten rows.
# The default join is inner, so flights whose origin has no airports row are dropped.
_origin_match = flights_spark.origin == airports_spark.faa
(
    flights_spark
    .join(airports_spark, _origin_match)
    .select("year", "month", "day", "origin", "dest", "name")
    .show(10)
)

In [None]:
from pyspark.sql import functions as F

# Same per-route counts as the SQL version, built with the DataFrame API:
# group by (origin, dest) and count rows in each group.
flights_counts = flights_spark.groupBy("origin", "dest").agg(F.count("*").alias("N"))
# Collect the aggregate to pandas on the driver.
pd_counts = flights_counts.toPandas()
# Print the first 10 rows. head() defaults to 5 rows, so pass 10 explicitly.
print(pd_counts.head(10))

# Aeropuertos de destino más frecuentes

In [None]:


# Rank destination airports by number of arriving flights and show the top 10.
# Grouping is by airport name only, so airports sharing a name would be merged.
_dest_match = flights_spark.dest == airports_spark.faa
flights_with_dest_airport_names = flights_spark.join(airports_spark, _dest_match)
most_frequent_dest_airports = (
    flights_with_dest_airport_names
    .groupBy(airports_spark.name)
    .agg(F.count("*").alias("count"))
)
most_frequent_dest_airports.orderBy(F.desc("count")).show(10)

# Mostrar vuelos junto con el fabricante del avión

In [None]:
# Show ten flights together with the manufacturer of the aircraft that flew them.
# Inner join on tail number: flights without a matching plane row are dropped.
_tail_match = flights_spark.tailnum == planes_spark.tailnum
flights_spark.join(planes_spark, _tail_match).select(
    "flight", "origin", "dest", "manufacturer"
).show(10)

# Vuelos agrupados por fabricante del avión

In [None]:
from pyspark.sql import functions as F

# Number of flights per aircraft manufacturer, most frequent first.
flights_with_manufacturer = flights_spark.join(
    planes_spark, on=flights_spark.tailnum == planes_spark.tailnum
)
flights_by_manufacturer = (
    flights_with_manufacturer
    .groupBy("manufacturer")
    .agg(F.count("*").alias("count"))
)
flights_by_manufacturer.orderBy(F.col("count").desc()).show()

# Conteo de vuelos por aeropuerto (origen y destino)

In [None]:
# Stack origin and destination codes into a single "airport" column, then count.
# union keeps duplicates (SQL UNION ALL semantics, same as unionAll), so each
# airport's total is its departures plus its arrivals.
_origin_codes = flights_spark.selectExpr("origin as airport")
_dest_codes = flights_spark.selectExpr("dest as airport")
all_airports_df = _origin_codes.union(_dest_codes)
airport_counts_df_api = (
    all_airports_df.groupBy("airport").agg(F.count("*").alias("flight_count"))
)
airport_counts_df_api.orderBy(F.desc("flight_count")).show()

# Promedio de distancia por aeropuerto de origen

In [None]:

# Mean flight distance per origin airport; show the 10 largest averages.
avg_distance_by_origin = flights_spark.groupBy("origin").agg(
    F.mean("distance").alias("average_distance")
)
avg_distance_by_origin.orderBy(F.desc("average_distance")).show(10)

# Vuelos de más de 2000 km con modelo del avión

In [None]:

# Flights with distance > 2000, shown with the aircraft model.
# NOTE(review): the section title says km, but the unit of `distance` is not
# established here (it may be miles). Confirm against the dataset.
long_distance_flights = flights_spark.where(F.col("distance") > 2000)
flights_with_model = long_distance_flights.join(
    planes_spark, long_distance_flights.tailnum == planes_spark.tailnum
)
flights_with_model.select("flight", "distance", "model").show()

# Número de vuelos por año de fabricación del avión

In [None]:

# Flights counted per aircraft manufacturing year (planes.year), ascending.
# Both tables carry a `year` column, so the grouping key is taken explicitly
# from planes_spark. After aggregation only that single `year` column remains.
_tail_match = flights_spark.tailnum == planes_spark.tailnum
flights_with_plane_year = flights_spark.join(planes_spark, _tail_match)
flights_by_plane_year = (
    flights_with_plane_year
    .groupBy(planes_spark.year)
    .agg(F.count("*").alias("flight_count"))
)
flights_by_plane_year.orderBy(F.col("year").asc()).show()

# Vuelos con origen y destino mostrando ciudad de ambos aeropuertos

In [None]:

# Each flight with the names of its origin and destination airports. The
# airports view is joined twice, once per role, under different aliases.
# NOTE(review): the section title asks for cities, but the query uses the
# airport `name` column.
sql_query = """
SELECT
  f.flight,
  f.origin,
  origin_airport.name AS origin_airport_name,
  f.dest,
  dest_airport.name AS dest_airport_name
FROM flights f
JOIN airports origin_airport ON f.origin = origin_airport.faa
JOIN airports dest_airport ON f.dest = dest_airport.faa
"""

_flights_with_airport_names = spark.sql(sql_query)
_flights_with_airport_names.show(n=20, truncate=True)

# Promedio de retraso de salida por aeropuerto de origen

In [None]:
from pyspark.sql import functions as F

# Average departure delay per origin airport, largest first.
# (Spark's avg skips null delays.)
_dep_delay_mean = F.avg("dep_delay").alias("average_departure_delay")
avg_dep_delay_by_origin = flights_spark.groupBy("origin").agg(_dep_delay_mean)
avg_dep_delay_by_origin.orderBy(F.col("average_departure_delay").desc()).show()

# Promedio de retraso de llegada por modelo de avión

In [None]:
# Average arrival delay per aircraft model, largest first.
# Note: this rebinds flights_with_model to the full flights-planes join
# (earlier it held only the long-distance subset).
flights_with_model = flights_spark.join(
    planes_spark, on=flights_spark.tailnum == planes_spark.tailnum
)
avg_arr_delay_by_model = flights_with_model.groupBy(planes_spark.model).agg(
    F.avg("arr_delay").alias("average_arrival_delay")
)
avg_arr_delay_by_model.orderBy(F.desc("average_arrival_delay")).show()

# Contar vuelos por ciudad de origen y destino

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col  # used for alias-qualified column references

# Flights per (origin, destination) pair, labelled with airport names. The
# airports table is joined twice under distinct aliases so each role's columns
# can be addressed unambiguously as "origin_airport.*" / "dest_airport.*".
# NOTE(review): origin_city/dest_city hold the airport `name`, not a city field.
_flights_f = flights_spark.alias("f")
_origin_ap = airports_spark.alias("origin_airport")
_dest_ap = airports_spark.alias("dest_airport")

_with_origin = _flights_f.join(_origin_ap, col("f.origin") == col("origin_airport.faa"))
_with_both = _with_origin.join(_dest_ap, col("f.dest") == col("dest_airport.faa"))

flight_counts_by_cities = _with_both.groupBy(
    col("origin_airport.name").alias("origin_city"),
    col("dest_airport.name").alias("dest_city"),
).agg(F.count("*").alias("flight_count"))

flight_counts_by_cities.orderBy(col("flight_count").desc()).show()

# Vuelos con aviones Boeing

In [None]:

# Flights flown by aircraft whose manufacturer is exactly "BOEING"
# (case-sensitive match), showing tail number, manufacturer and flight number.
_is_boeing = planes_spark.manufacturer == "BOEING"
boeing_flights = (
    flights_spark
    .join(planes_spark, flights_spark.tailnum == planes_spark.tailnum)
    .where(_is_boeing)
    .select(flights_spark.tailnum, planes_spark.manufacturer, flights_spark.flight)
)

boeing_flights.show()

# Contar vuelos por fabricante

In [None]:
# Number of flights per aircraft manufacturer, most frequent first.
flights_with_manufacturer = flights_spark.join(
    planes_spark, flights_spark.tailnum == planes_spark.tailnum
)
_manufacturer_count = F.count("*").alias("count")
flights_by_manufacturer = flights_with_manufacturer.groupBy("manufacturer").agg(_manufacturer_count)
flights_by_manufacturer.sort("count", ascending=False).show()

# Vuelos con más de 2 horas de retraso en salida y llegada

In [None]:
# Flights whose departure delay AND arrival delay both exceed 120.
# NOTE(review): treating 120 as "2 hours" assumes delays are in minutes. Confirm.
_both_late = (F.col("dep_delay") > 120) & (F.col("arr_delay") > 120)
delayed_flights = flights_spark.where(_both_late).select("dep_delay", "arr_delay", "flight")

delayed_flights.show()

# Promedio de distancia recorrida por fabricante de avión

In [None]:

# Mean distance flown per aircraft manufacturer, largest first.
flights_with_manufacturer = flights_spark.join(
    planes_spark, on=flights_spark.tailnum == planes_spark.tailnum
)
avg_distance_by_manufacturer = (
    flights_with_manufacturer
    .groupBy(planes_spark.manufacturer)
    .agg(F.mean("distance").alias("average_distance"))
)
avg_distance_by_manufacturer.orderBy(F.desc("average_distance")).show()