In [1]:
# Proyecto: Análisis de Datos de Vuelos con PySpark

# En este ejercicio, usarás el conjunto de datos de vuelos 
# de la Administración Federal de Aviación de los Estados 
# Unidos (FAA) para realizar un análisis básico de los vuelos.


In [2]:
# Obtén los Datos

# Descarga un conjunto de datos de vuelos de la FAA. 
# Aquí está un enlace a los datos: 
# http://stat-computing.org/dataexpo/2009/the-data.html

# Para este ejercicio, puedes descargar el conjunto de datos de 2008.


In [3]:
import numpy as np
import matplotlib.pyplot as plt

from keras.models import Sequential
from keras.layers import Input, Dense
from keras.utils import to_categorical
from keras.models import Model

import warnings
warnings.filterwarnings('ignore')


In [4]:
import pyspark 
from pyspark import SparkContext
sc =SparkContext()

from pyspark import SQLContext
sqlContext = SQLContext(sc)

In [5]:
#
# MUY IMPORTANTE:
#
# Para que funcione el ejercicio, debes descargar el archivo
# 2008.rar y guardar y descomprimir el archivo 2008.csv en la
# misma carpeta donde se encuentra este archivo .ipynb


# Lee los datos con PySpark. 
df = sqlContext.read.csv("2008.csv", sep = ",", inferSchema = True, header = True)

In [6]:
# Explorar los Datos
# Explora los datos para familiarizarte con ellos. Aquí hay algunas cosas que podrías hacer:

# Mostrar las primeras filas del objeto con los datos.
df.show(5)


+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|TailNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|CancellationCode|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+----------------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1|         3|        4|   1343|      1325|   1451|      1435|           WN|      588

In [7]:
# Ver la cantidad de filas y columnas del objeto con los datos.

# get rows
num_filas = df.count()
print("----------------------------")
print("Número de filas:", num_filas)


# get columns
num_columnas = len(df.columns)
print("Número de columnas:", num_columnas)
print("----------------------------")


----------------------------
Número de filas: 2389217
Número de columnas: 29
----------------------------


In [8]:
# Mostrar el esquema del objeto con los datos. Qué tipo de objeto es?

df.printSchema()


root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

In [9]:
# Limpieza de Datos

# Asegúrate de que los datos estén limpios. Puedes necesitar hacer cosas como:

# Eliminar filas con datos faltantes.
from pyspark.sql.functions import col

columnas = df.columns

# Revisar las cantidad de filas con NA para cada columna
#============================================================================
col_drop = []
for columna in columnas:
    df_na = df.filter(col(columna).isNull()).count()
    if df_na > 0:
        col_drop.append(columna) # agregar a la lista de columnas a eliminar
        print(f"Filas con valores nulos en la columna '{columna}': {df_na}")


# drop rows with NA
print ("----------------------------")
print ("Eliminando filas con valores nulos...", col_drop)

df_clean = df = df.drop(*col_drop)
print(df_clean.columns)

Filas con valores nulos en la columna 'TailNum': 42452
Filas con valores nulos en la columna 'CancellationCode': 2324775
----------------------------
Eliminando filas con valores nulos... ['TailNum', 'CancellationCode']
['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiIn', 'TaxiOut', 'Cancelled', 'Diverted', 'CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']


In [10]:

columnas = df_clean.columns
# check rows with NA as string
for columna in columnas:
    df_na = df_clean.filter(col(columna).like("%NA%")).count()
    if df_na > 0:
        print(f"Filas con valores nulos como texto en la columna '{columna}': {df_na}")



Filas con valores nulos como texto en la columna 'DepTime': 64442
Filas con valores nulos como texto en la columna 'ArrTime': 70096
Filas con valores nulos como texto en la columna 'ActualElapsedTime': 70096
Filas con valores nulos como texto en la columna 'CRSElapsedTime': 407
Filas con valores nulos como texto en la columna 'AirTime': 70096
Filas con valores nulos como texto en la columna 'ArrDelay': 70096
Filas con valores nulos como texto en la columna 'DepDelay': 64442
Filas con valores nulos como texto en la columna 'Origin': 40481
Filas con valores nulos como texto en la columna 'Dest': 40497
Filas con valores nulos como texto en la columna 'TaxiIn': 70096
Filas con valores nulos como texto en la columna 'TaxiOut': 64442
Filas con valores nulos como texto en la columna 'CarrierDelay': 1804634
Filas con valores nulos como texto en la columna 'WeatherDelay': 1804634
Filas con valores nulos como texto en la columna 'NASDelay': 1804634
Filas con valores nulos como texto en la column

In [11]:
columnas = ['ActualElapsedTime', 'Dest']

# check rows with NA as string
for columna in columnas:
    if df_clean.schema[columna].dataType == 'string':
        df_clean = df_clean.filter(~col(columna).like("%NA%"))

df_clean.show(5)

+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+------------+------------+--------+-------------+-----------------+
|Year|Month|DayofMonth|DayOfWeek|DepTime|CRSDepTime|ArrTime|CRSArrTime|UniqueCarrier|FlightNum|ActualElapsedTime|CRSElapsedTime|AirTime|ArrDelay|DepDelay|Origin|Dest|Distance|TaxiIn|TaxiOut|Cancelled|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|
+----+-----+----------+---------+-------+----------+-------+----------+-------------+---------+-----------------+--------------+-------+--------+--------+------+----+--------+------+-------+---------+--------+------------+------------+--------+-------------+-----------------+
|2008|    1|         3|        4|   1343|      1325|   1451|      1435|           WN|      588|               68|            70|     55|      16|      18|   HOU| LIT|   

In [12]:
# Convertir columnas a los tipos de datos correctos.

from pyspark.sql import types

df_clean = df_clean.withColumn('ActualElapsedTime', df_clean['ActualElapsedTime'].cast(types.IntegerType()))

df_clean.printSchema()


root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: string (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- CarrierDelay: string (nullable = true)
 |-- WeatherDelay: string (nullable = true)
 |-- N

In [13]:
# Análisis de Datos

#¿Cuál es el vuelo más largo? Que unidad de tiempo es más apropiada? De qué depende? Seleccione una.
from pyspark.sql import functions as F

max_vuelo = df_clean.select(F.max("ActualElapsedTime")).first()[0]
print(f"El tiempo máximo de vuelo es: {max_vuelo} minutos")

df_with_hours = df_clean.withColumn("ActualElapsedTime_hours", F.col("ActualElapsedTime") / 60)

max_vuelo_hrs = df_with_hours.select(F.max("ActualElapsedTime_hours")).first()[0]
print(f"El tiempo máximo de vuelo es: {max_vuelo_hrs:.2f} horas **")

print("Es mas apropiado en horas, ya que es mas facil de interpretar...")


El tiempo máximo de vuelo es: 905 minutos
El tiempo máximo de vuelo es: 15.08 horas **
Es mas apropiado en horas, ya que es mas facil de interpretar...


In [14]:
#¿Cuál es la aerolínea con más vuelos?

# get flights by airline
df_vuelos_aerolinas = df_with_hours.groupBy("UniqueCarrier").count()

# get max flights
max_aerolinea = df_vuelos_aerolinas.select(F.max("count")).first()[0]

#get airline with max flights
df_max_aerolinea = df_vuelos_aerolinas.filter(F.col("count") == max_aerolinea)
aerolinea = df_max_aerolinea.collect()[0][0]
vuelos = df_max_aerolinea.collect()[0][1]

print(f"La aerolínea con más vuelos es: {aerolinea}, con {vuelos} viajes")

La aerolínea con más vuelos es: WN, con 398966 viajes


In [15]:
#¿Cuál es el destino más popular?

# get flights by destination
df_vuelos_destino = df_with_hours.groupBy("Dest").count()

# get max flights
max_destino = df_vuelos_destino.select(F.max("count")).first()[0]

# get destination with max flights
df_max_destino = df_vuelos_destino.filter(F.col("count") == max_destino)

destino = df_max_destino.collect()[0][0]
vuelos = df_max_destino.collect()[0][1]

print(f"El destino con más vuelos es: {destino}, con {vuelos} viajes")

El destino con más vuelos es: ATL, con 136950 viajes


In [16]:
#¿Qué día de la semana tiene más vuelos?

# get flights by day of week
df_vuelos_dia = df_with_hours.groupBy("DayOfWeek").count()

# get max flights
max_dia = df_vuelos_dia.select(F.max("count")).first()[0]

# get day with max flights
df_max_dia = df_vuelos_dia.filter(F.col("count") == max_dia)

# change day number to day name
df_max_dia = df_max_dia.withColumn("DayOfWeek", F.when(F.col("DayOfWeek") == 1, "Lunes")\
    .when(F.col("DayOfWeek") == 2, "Martes")\
    .when(F.col("DayOfWeek") == 3, "Miércoles")\
    .when(F.col("DayOfWeek") == 4, "Jueves")\
    .when(F.col("DayOfWeek") == 5, "Viernes")\
    .when(F.col("DayOfWeek") == 6, "Sábado")\
    .when(F.col("DayOfWeek") == 7, "Domingo"))




# get day with max flights
dia = df_max_dia.collect()[0][0]
vuelos = df_max_dia.collect()[0][1]

print(f"El día con más vuelos es: {dia}, con {vuelos} viajes")


El día con más vuelos es: Miércoles, con 365560 viajes


In [17]:
# close spark session
sc.stop()