# Environment

In [2]:
# !pip install pyspark==3.3.1
!pwd

/content


In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


In [4]:
!wget -q https://dlcdn.apache.org/spark/spark-3.4.3/spark-3.4.3-bin-hadoop3.tgz
!ls

sample_data  spark-3.4.3-bin-hadoop3.tgz


In [5]:
! file spark-3.4.3-bin-hadoop3.tgz
!tar xf spark-3.4.3-bin-hadoop3.tgz


spark-3.4.3-bin-hadoop3.tgz: gzip compressed data, from Unix, original size modulo 2^32 431595520


In [6]:
!pip install -q findspark

In [7]:
# add airports-database.zip to instanc's storage"
!echo "y" | unzip "airports-database.zip"

unzip:  cannot find or open airports-database.zip, airports-database.zip.zip or airports-database.zip.ZIP.


In [8]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.3-bin-hadoop3"

import findspark
findspark.init()

# Q & A

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

csv_file_path = "airports-database.csv"

df = spark.read.csv(csv_file_path, header=True, inferSchema=True)
df.show()

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
|  0|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|United Air Lines ...|
|  1|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|    1416|   5|    29|2013-01-01 05:00:00|United Air Lines ...|
|  2|2013|    1|  1|   54

In [12]:
# 1. Qual é o número total de voos no conjunto de dados?
print(f"Q1 - Numero total de voos: {df.count()}")

Q1 - Numero total de voos: 336776


In [13]:
# 2. Quantos voos foram cancelados? (Considerando que voos cancelados têm dep_time e arr_time nulos)?
ans = df.filter(df.dep_time.isNull() & df.arr_time.isNull()).count()
print(f"Q2 - Numero de voos cancelados: {ans}")

Q2 - Numero de voos cancelados: 8255


In [14]:
# 3. Qual é o atraso médio na partida dos voos (dep_delay)?
from pyspark.sql import functions as F

ans = df.agg(F.mean("dep_delay")).collect()[0][0]
print(f"Q3 - Atraso medio: {ans} minutos")

Q3 - Atraso medio: 12.639070257304708 minutos


In [15]:
# 4. Quais são os 5 aeroportos com maior número de pousos?
dest_counts = df.groupBy("dest").count()
ans = dest_counts.orderBy(F.desc("count")).limit(5)

print(f"Q4 - Top 5 aeroportos por pouso:")
ans.show()

Q4 - Top 5 aeroportos por pouso:
+----+-----+
|dest|count|
+----+-----+
| ORD|17283|
| ATL|17215|
| LAX|16174|
| BOS|15508|
| MCO|14082|
+----+-----+



In [16]:
# 5. Qual é a rota mais frequente (par origin-dest)?
origin_dest_counts = df.groupBy("origin", "dest").count()
ans = origin_dest_counts.orderBy(F.desc("count")).limit(1)

print(f"Q5 - Rota mais frequente:")
ans.show()

Q5 - Rota mais frequente:
+------+----+-----+
|origin|dest|count|
+------+----+-----+
|   JFK| LAX|11262|
+------+----+-----+



In [17]:
# 6. Quais são as 5 companhias aéreas com maior tempo médio de atraso na chegada?
carrier_delay_mean = df.groupBy("name").agg(F.mean("arr_delay").alias("mean_arr_delay"))
ans = carrier_delay_mean.orderBy(F.desc("mean_arr_delay")).limit(5)

print(f"Q6 - Top 5 companhias com maior atraso medio na chegada:")
ans.show()

Q6 - Top 5 companhias com maior atraso medio na chegada:
+--------------------+------------------+
|                name|    mean_arr_delay|
+--------------------+------------------+
|Frontier Airlines...|21.920704845814978|
|AirTran Airways C...|20.115905511811025|
|ExpressJet Airlin...| 15.79643108710965|
|  Mesa Airlines Inc.|15.556985294117647|
|SkyWest Airlines ...|11.931034482758621|
+--------------------+------------------+



In [18]:
# 7. Qual é o dia da semana com maior número de voos?
df_with_week_day = df.withColumn("week_day", F.dayofweek("time_hour"))

week_day_counts = df_with_week_day.groupBy("week_day").count()
ans = week_day_counts.orderBy(F.desc("count")).limit(1)

print(f"Q7 - Dia da semana com mais voos:")
ans.show()

Q7 - Dia da semana com mais voos:
+--------+-----+
|week_day|count|
+--------+-----+
|       2|50690|
+--------+-----+



In [19]:
# 8. Qual o percentual mensal dos voos tiveram atraso na partida superior a 30 minutos?
df_with_month = df.withColumn("month", F.month("time_hour"))

df_delayed = df_with_month.filter(df_with_month.dep_delay > 30)
delayed_count_per_month = df_delayed.groupBy("month").count().alias("delayed_count")
total_count_per_month = df_with_month.groupBy("month").count().alias("total_count")

ans = delayed_count_per_month.join(
    total_count_per_month, "month"
).withColumn(
    "percent_delayed (%)",
    (F.col("delayed_count.count") / F.col("totaL_count.count")) * 100
).select("month", "percent_delayed (%)")

print(f"Q8 - Percentual mensal dos voos que tiveram atraso na partida superior a 30 minutos:")
ans.orderBy("month").show()

Q8 - Percentual mensal dos voos que tiveram atraso na partida superior a 30 minutos:
+-----+-------------------+
|month|percent_delayed (%)|
+-----+-------------------+
|    1| 12.405569545252556|
|    2| 12.752995871908942|
|    3| 14.944163140736629|
|    4| 15.993646311330744|
|    5| 15.335463258785943|
|    6|  20.24218390397621|
|    7|  20.97875955819881|
|    8| 14.450847342039758|
|    9|   8.77275694494814|
|   10|  9.335733324102598|
|   11|  8.757517969781428|
|   12| 17.312955393637818|
+-----+-------------------+



In [20]:
# 9. Qual a origem mais comum para voos que pousaram em Seattle (SEA)?
flights_to_sea = df.filter(df.dest == "SEA")
origin_counts = flights_to_sea.groupBy("origin").count()

ans = origin_counts.orderBy(F.desc("count"))

print(f"Q8 - Origem mais comum para voos que pousaram em Seattle (SEA):")
ans.show(1)

Q8 - Origem mais comum para voos que pousaram em Seattle (SEA):
+------+-----+
|origin|count|
+------+-----+
|   JFK| 2092|
+------+-----+
only showing top 1 row



In [21]:
# 10. Qual é a média de atraso na partida dos voos (dep_delay) para cada dia da semana?
df_with_week_day = df.withColumn("weekday", F.dayofweek("time_hour"))

mean_dep_delay_per_weekday = df_with_week_day.groupBy("weekday").agg(
    F.mean("dep_delay").alias("mean_dep_delay")
)
ans = mean_dep_delay_per_weekday.orderBy("weekday")

print(f"Q10 - Média de atraso na partida dos voos por dia da semana:")
ans.show()

Q10 - Média de atraso na partida dos voos por dia da semana:
+-------+------------------+
|weekday|    mean_dep_delay|
+-------+------------------+
|      1|11.589531801152422|
|      2|14.778936729330908|
|      3|10.631682565455652|
|      4|11.803512219083876|
|      5|16.148919990957108|
|      6| 14.69605749486653|
|      7| 7.650502333676133|
+-------+------------------+



In [22]:
# 11. Qual é a rota que teve o maior tempo de voo médio (air_time)?

average_air_time_per_route = df.groupBy("origin", "dest").agg(
    F.mean("air_time").alias("average_air_time")
)
ans = average_air_time_per_route.orderBy(F.desc("average_air_time"))

print(f"Q11 - Rota com maior air_time medio:")
ans.show(1)

Q11 - Rota com maior air_time medio:
+------+----+-----------------+
|origin|dest| average_air_time|
+------+----+-----------------+
|   JFK| HNL|623.0877192982456|
+------+----+-----------------+
only showing top 1 row



In [23]:
# 12. Para cada aeroporto de origem, qual é o aeroporto de destino mais comum?
from pyspark.sql.window import Window

origin_dest_counts = df.groupBy("origin", "dest").count()
window_spec = Window.partitionBy("origin").orderBy(F.desc("count"))

ans = origin_dest_counts.withColumn(
    "rank", F.row_number().over(window_spec)
).filter(F.col("rank") == 1).drop("rank")

print(f"Q12 - Destino mais comum por origem:")
ans.show()

Q12 - Destino mais comum por origem:
+------+----+-----+
|origin|dest|count|
+------+----+-----+
|   EWR| ORD| 6100|
|   JFK| LAX|11262|
|   LGA| ATL|10263|
+------+----+-----+



In [24]:
# 13. Quais são as 3 rotas que tiveram a maior variação no tempo médio de voo (air_time) ?
route_stats = df.groupBy("origin", "dest").agg(
    F.mean("air_time").alias("average_air_time")
)
route_variation = route_stats.groupBy("origin").agg(
    F.max("average_air_time").alias("max_air_time"),
    F.min("average_air_time").alias("min_air_time")
).withColumn(
    "variation",
    F.col("max_air_time") - F.col("min_air_time")
)

ans = route_variation.orderBy(F.desc("variation"))

print(f"Q13 - 3 rotas que tiveram a maior variação no tempo de voo:")
ans.show(3)

Q13 - 3 rotas que tiveram a maior variação no tempo de voo:
+------+------------------+------------------+------------------+
|origin|      max_air_time|      min_air_time|         variation|
+------+------------------+------------------+------------------+
|   JFK| 623.0877192982456| 30.83687150837989| 592.2508477898657|
|   EWR|  612.075208913649|25.466019417475728| 586.6091894961733|
|   LGA|227.51599671862184| 35.43518518518518|192.08081153343664|
+------+------------------+------------------+------------------+



In [25]:
# 14. Qual é a média de atraso na chegada para voos que tiveram atraso na partida superior a 1 hora?
flights_with_high_dep_delay = df.filter(df.dep_delay > 60)  # 60 minutos = 1 hora
ans = flights_with_high_dep_delay.agg(
    F.mean("arr_delay").alias("mean_arr_delay")
).collect()[0][0]

print(f"Q14 - Media de atraso de chegada em voos com atraso de partida acima de 1 hora: {ans}")

Q14 - Media de atraso de chegada em voos com atraso de partida acima de 1 hora: 119.04880549963919


In [26]:
# 15. Qual é a média de voos diários para cada mês do ano?
df_with_month_and_day = df.withColumn("month", F.month("time_hour")) \
                          .withColumn("day_of_year", F.dayofyear("time_hour"))

daily_flight_counts = df_with_month_and_day.groupBy("month", "day_of_year").count()

ans = daily_flight_counts.groupBy("month").agg(
    F.mean("count").alias("average_daily_flights")
)

print(f"Q15 - Media de voos diarios por mes:")
ans.show()

Q15 - Media de voos diarios por mes:
+-----+---------------------+
|month|average_daily_flights|
+-----+---------------------+
|   12|    907.5806451612904|
|    1|    871.0967741935484|
|    6|    941.4333333333333|
|    3|    930.1290322580645|
|    5|    928.9032258064516|
|    9|    919.1333333333333|
|    4|    944.3333333333334|
|    8|    946.0322580645161|
|    7|    949.1935483870968|
|   10|    931.9032258064516|
|   11|    908.9333333333333|
|    2|    891.1071428571429|
+-----+---------------------+



In [27]:
# 16. Quais são as 3 rotas mais comuns que tiveram atrasos na chegada superiores a 30 minutos?
delayed_arrivals = df.filter(df.arr_delay > 30)
route_counts = delayed_arrivals.groupBy("origin", "dest").count()

ans = route_counts.orderBy(F.desc("count"))

print(f"Q16 - 3 rotas mais comuns com atraso de chegada > 30 minutos:")
ans.show(3)

Q16 - 3 rotas mais comuns com atraso de chegada > 30 minutos:
+------+----+-----+
|origin|dest|count|
+------+----+-----+
|   LGA| ATL| 1563|
|   JFK| LAX| 1286|
|   LGA| ORD| 1188|
+------+----+-----+
only showing top 3 rows



In [28]:
# 17. Para cada origem, qual o principal destino?
route_counts = df.groupBy("origin", "dest").count()

window_spec = Window.partitionBy("origin").orderBy(F.desc("count"))

ranked_routes = route_counts.withColumn("rank", F.row_number().over(window_spec))

main_destinations = ranked_routes.filter(F.col("rank") == 1).drop("rank")

print(f"Q17 - Para cada origem, qual o principal destino?:")
main_destinations.show()

Q17 - Para cada origem, qual o principal destino?:
+------+----+-----+
|origin|dest|count|
+------+----+-----+
|   EWR| ORD| 6100|
|   JFK| LAX|11262|
|   LGA| ATL|10263|
+------+----+-----+



# Enrich

In [121]:
import requests
import datetime

airportdb_key = "API_KEY_HERE" # "a5fcdbee03ccf965a0ea9047924c7e81dd1f266899309b561f16a63b22b2fc018a0bad0b5bf36ef638ea54afeb1c18ce"
weatherbit = "API_KEY_HERE"  # "08bf41a2898c4fd9a17ff1785f119774"

In [104]:
top_delayed_flights = df.orderBy(F.desc("arr_delay")).limit(5)

all_origins = list(top_delayed_flights.select('origin').distinct().toPandas()['origin'])
all_destinations = list(top_delayed_flights.select('dest').distinct().toPandas()['dest'])
all_origins, all_destinations

In [138]:
fields = ["origin", "dest", "time_hour"]
flight_data = top_delayed_flights.select(*fields).collect()

for origin, dest, time_hour in flight_data:
    url = f"https://airportdb.io/api/v1/airport/K{origin}?apiToken={airportdb_key}"
    response = requests.get(url)
    data = response.json()

    url = f"https://api.weatherbit.io/v2.0/history/daily"
    lat, lon = data["latitude_deg"], data["longitude_deg"]
    start_date, end_date = time_hour.date(), time_hour.date() + datetime.timedelta(days=1)

    params = {"lat": lat, "lon": lon, "start_date": start_date, "end_date": end_date, "key": weatherbit}
    headers = {"Accept": "application/json"}
    response = requests.get(url, params=params, headers=headers)
    data = response.json()
    wind_spd = data["data"][0]["wind_spd"]

    print(
        {"origin" : origin,
        "dest" : dest,
        "lat" : lat,
        "lon": lon,
        "start_date": str(start_date),
        "end_date": str(end_date),
        "wind_spd": wind_spd}
    )


{'origin': 'JFK', 'dest': 'HNL', 'lat': 40.639801, 'lon': -73.7789, 'start_date': '2013-01-09', 'end_date': '2013-01-10', 'wind_spd': 3.5}
{'origin': 'JFK', 'dest': 'CMH', 'lat': 40.639801, 'lon': -73.7789, 'start_date': '2013-06-15', 'end_date': '2013-06-16', 'wind_spd': 4.1}
{'origin': 'EWR', 'dest': 'ORD', 'lat': 40.692501, 'lon': -74.168701, 'start_date': '2013-01-10', 'end_date': '2013-01-11', 'wind_spd': 4.1}
{'origin': 'JFK', 'dest': 'SFO', 'lat': 40.639801, 'lon': -73.7789, 'start_date': '2013-09-20', 'end_date': '2013-09-21', 'wind_spd': 3.7}
{'origin': 'JFK', 'dest': 'CVG', 'lat': 40.639801, 'lon': -73.7789, 'start_date': '2013-07-22', 'end_date': '2013-07-23', 'wind_spd': 3.2}


# Model Manipulation

In [11]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: double (nullable = true)
 |-- sched_dep_time: integer (nullable = true)
 |-- dep_delay: double (nullable = true)
 |-- arr_time: double (nullable = true)
 |-- sched_arr_time: integer (nullable = true)
 |-- arr_delay: double (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: double (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- time_hour: timestamp (nullable = true)
 |-- name: string (nullable = true)



In [12]:
df = df.drop('time_hour')

pdf = df.toPandas()
pdf = pdf.sample(frac=0.1)

In [13]:
# Tratar valores nulos (exemplo: preencher com a média para colunas numéricas e com a moda para colunas categóricas)
pdf.fillna({
    'dep_time': 0,
    'sched_dep_time': pdf['sched_dep_time'].mode()[0],
    'dep_delay': pdf['dep_delay'].mean(),
    'arr_time': pdf['arr_time'].mean(),
    'sched_arr_time': pdf['sched_arr_time'].mode()[0],
    'arr_delay': pdf['arr_delay'].mean(),
    'carrier': pdf['carrier'].mode()[0],
    'flight': pdf['flight'].mode()[0],
    'tailnum': pdf['tailnum'].mode()[0],
    'origin': pdf['origin'].mode()[0],
    'dest': pdf['dest'].mode()[0],
    'air_time': pdf['air_time'].mean(),
    'distance': pdf['distance'].mean(),
    'hour': pdf['hour'].mode()[0],
    'minute': pdf['minute'].mode()[0],
    'name': pdf['name'].mode()[0]
}, inplace=True)

In [14]:
from sklearn.model_selection import train_test_split
import pandas as pd

# Definir features (X) e target (Y)
X = pdf[['year', 'month', 'day', 'dep_time', 'sched_dep_time', 'dep_delay', 'arr_time', 'sched_arr_time', 'carrier', 'flight', 'tailnum', 'origin', 'dest', 'air_time', 'distance', 'hour', 'minute']]
Y = pdf['arr_delay']

# Converter variáveis categóricas para numéricas usando one-hot encoding
X = pd.get_dummies(X, columns=['carrier', 'origin', 'dest', 'tailnum'])

# Dividir os dados em conjuntos de treinamento e teste
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)


In [16]:
from sklearn.ensemble import RandomForestRegressor

# Inicializar e treinar o modelo
model = RandomForestRegressor(n_estimators=10, random_state=42)
model.fit(X_train, Y_train)

In [17]:
from sklearn.metrics import mean_squared_error, r2_score

# Fazer previsões
Y_pred = model.predict(X_test)

# Calcular métricas
mse = mean_squared_error(Y_test, Y_pred)
r2 = r2_score(Y_test, Y_pred)

print(f"Mean Squared Error: {mse}")
print(f"R2 Score: {r2}")

Mean Squared Error: 251.9870202637964
R2 Score: 0.8737426533168348


In [18]:
import joblib

# Salvar o modelo
joblib.dump(model, 'random_forest_model.pkl')

['random_forest_model.pkl']

In [27]:
loaded_model = joblib.load('random_forest_model.pkl')

prediction = loaded_model.predict(X_test)
print(f"Atraso de chegada previsto: {prediction[0]} minutos")

Atraso de chegada previsto: 28.1 minutos


In [1]:
X_test

NameError: name 'X_test' is not defined