### 1. Recolha e Preparação de dados

In [1]:
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import functions as F

# Peek at one raw document from each collection to see the field layout
# before the data is loaded through Spark.
# The client is used as a context manager so the connection is closed even
# if one of the lookups raises.
with MongoClient('mongodb://mongodb:27017/') as client:
    db = client.projeto
    c_atrasos = db.atrasos
    c_voos = db.voos

    print(c_atrasos.find_one())
    print(c_voos.find_one())

{'_id': ObjectId('676ed62b177c65a442d95605'), 'year': 2023, 'month': 8, 'carrier': '9E', 'carrier_name': 'Endeavor Air Inc.', 'airport': 'ABE', 'airport_name': 'Allentown/Bethlehem/Easton, PA: Lehigh Valley International', 'arr_flights': 89.0, 'arr_del15': 13.0, 'carrier_ct': 2.25, 'weather_ct': 1.6, 'nas_ct': 3.16, 'security_ct': 0.0, 'late_aircraft_ct': 5.99, 'arr_cancelled': 2.0, 'arr_diverted': 1.0, 'arr_delay': 1375.0, 'carrier_delay': 71.0, 'weather_delay': 761.0, 'nas_delay': 118.0, 'security_delay': 0.0, 'late_aircraft_delay': 425.0}
{'_id': ObjectId('676ef5e70a6bf8a76f22671a'), 'Unnamed: 0': 6256560, 'ItinID': 201835057271, 'MktID': 20183505727101, 'MktCoupons': 1, 'Quarter': 3, 'Origin': 'MCO', 'OriginWac': 33, 'Dest': 'BWI', 'DestWac': 35, 'Miles': 787.0, 'ContiguousUSA': 2, 'NumTicketsOrdered': 2.0, 'AirlineCompany': 'WN', 'PricePerTicket': 96.0}


In [2]:
# Create (or reuse) the Spark session, pointing both the MongoDB input and
# output URIs at the `projeto` database.
spark = (
    SparkSession.builder
    .appName("Voos e atrasos em 2018")
    .config("spark.mongodb.input.uri", "mongodb://mongodb:27017/projeto")
    .config("spark.mongodb.output.uri", "mongodb://mongodb:27017/projeto")
    .getOrCreate()
)

# Load each collection into its own DataFrame through the "mongo" source.
df_atrasos, df_voos = (
    spark.read.format("mongo").option("collection", collection_name).load()
    for collection_name in ("atrasos", "voos")
)

In [3]:
# printSchema() writes the schema tree to stdout itself and returns None,
# so it is called directly; wrapping it in print() adds a stray "None" line.
df_atrasos.printSchema()
df_voos.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- airport: string (nullable = true)
 |-- airport_name: string (nullable = true)
 |-- arr_cancelled: double (nullable = true)
 |-- arr_del15: double (nullable = true)
 |-- arr_delay: double (nullable = true)
 |-- arr_diverted: double (nullable = true)
 |-- arr_flights: double (nullable = true)
 |-- carrier: string (nullable = true)
 |-- carrier_ct: double (nullable = true)
 |-- carrier_delay: double (nullable = true)
 |-- carrier_name: string (nullable = true)
 |-- late_aircraft_ct: double (nullable = true)
 |-- late_aircraft_delay: double (nullable = true)
 |-- month: integer (nullable = true)
 |-- nas_ct: double (nullable = true)
 |-- nas_delay: double (nullable = true)
 |-- security_ct: double (nullable = true)
 |-- security_delay: double (nullable = true)
 |-- weather_ct: double (nullable = true)
 |-- weather_delay: double (nullable = true)
 |-- year: integer (nullable = true)

None
root
 |-- AirlineC

In [4]:
# Reduce the delays data to the 2018 records, without rows containing
# missing values and without duplicate rows.
#
# Only columns that exist in the `atrasos` schema are listed in drop():
# DataFrame.drop() silently ignores unknown names, so listing columns that
# are not in the schema has no effect.
# NOTE(review): `weather_delay` is dropped here because the earlier spelling
# "WEATHER_DELAY" matched it under Spark's default case-insensitive column
# resolution (the column is absent from the recorded output of this cell).
# Confirm that removing it is intended.
df_atrasos = (
    df_atrasos
    .drop("carrier", "airport_name", "weather_delay", "_id")
    .filter(F.col("year") == 2018)
    .dropna()
    .dropDuplicates()
)

# Sanity check: count nulls per column; every count should be 0 after dropna().
null_atrasos = df_atrasos.select(
    [
        F.count(F.when(F.col(column_name).isNull(), column_name)).alias(column_name)
        for column_name in df_atrasos.columns
    ]
)

null_atrasos.show()

+-------+-------------+---------+---------+------------+-----------+----------+-------------+------------+----------------+-------------------+-----+------+---------+-----------+--------------+----------+----+
|airport|arr_cancelled|arr_del15|arr_delay|arr_diverted|arr_flights|carrier_ct|carrier_delay|carrier_name|late_aircraft_ct|late_aircraft_delay|month|nas_ct|nas_delay|security_ct|security_delay|weather_ct|year|
+-------+-------------+---------+---------+------------+-----------+----------+-------------+------------+----------------+-------------------+-----+------+---------+-----------+--------------+----------+----+
|      0|            0|        0|        0|           0|          0|         0|            0|           0|               0|                  0|    0|     0|        0|          0|             0|         0|   0|
+-------+-------------+---------+---------+------------+-----------+----------+-------------+------------+----------------+-------------------+-----+------+----

In [5]:
# Trim the flights data: remove the listed columns, then discard rows with
# missing values and duplicate rows.
df_voos = (
    df_voos
    .drop(
        "ItinID",
        "MktID",
        "MktCoupons",
        "ContiguousUSA",
        "NumTicketsOrdered",
        "AirlineCompany",
        "_id",
        "Unnamed: 0",
    )
    .dropna()
    .dropDuplicates()
)

# Sanity check: count nulls per remaining column and display the result.
null_voos = df_voos.select(
    [
        F.count(F.when(F.col(column_name).isNull(), column_name)).alias(column_name)
        for column_name in df_voos.columns
    ]
)

null_voos.show()

+----+-------+-----+------+---------+--------------+-------+
|Dest|DestWac|Miles|Origin|OriginWac|PricePerTicket|Quarter|
+----+-------+-----+------+---------+--------------+-------+
|   0|      0|    0|     0|        0|             0|      0|
+----+-------+-----+------+---------+--------------+-------+

