In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType, StringType
from pyspark.sql.functions import col, date_format, date_add
from pyspark.sql import functions as F
import sys
import os

parent_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
sys.path.insert(0, parent_dir)

from apis.airportdb import AirportDB
from apis.weatherbit import WeatherBitDB

spark = SparkSession.builder.appName("EDA with PySpark").getOrCreate()

In [3]:
df = spark.read.csv("../data/airports-database.csv", header=True, inferSchema=True)

In [4]:
df.show(5)

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
|  0|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|United Air Lines ...|
|  1|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|    1416|   5|    29|2013-01-01 05:00:00|United Air Lines ...|
|  2|2013|    1|  1|   54

In [5]:
#get size of the data
print((df.count(), len(df.columns)))

(336776, 21)


In [6]:
#get flight unique values 
df.select("flight").distinct().count()

3844

In [7]:
df[df["flight"] == 1959].show(5)

+----+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
|  id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|
+----+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
| 110|2013|    1|  1|   804.0|           810|     -6.0|  1103.0|          1116|    -13.0|     DL|  1959| N947DL|   JFK| MCO|   147.0|     944|   8|    10|2013-01-01 08:00:00|Delta Air Lines Inc.|
| 991|2013|    1|  2|   800.0|           810|    -10.0|  1102.0|          1116|    -14.0|     DL|  1959| N995DL|   JFK| MCO|   143.0|     944|   8|    10|2013-01-02 08:00:00|Delta Air Lines Inc.|
|1935|2013|    1|  3

In [8]:
df.groupBy("flight").count().show(5)

+------+-----+
|flight|count|
+------+-----+
|   496|   10|
|  1959|  330|
|  4935|   32|
|   471|  129|
|  1580|   92|
+------+-----+
only showing top 5 rows



Temos 336776 registros de voos. <br/>
Temos 3844 registros único de voos, ou seja, 3844 voos que foram feitos mais de uma vez e com diferentes aeronaves, atrasos, número da cauda, etc. <br/>

In [9]:
#filter df dep_time = null and arr_time = 
df.filter(df["dep_time"].isNull() & df["arr_time"].isNull()).count() 

8255

8255 registros de voos foram cancelados. <br/>

In [10]:
df.select("dep_delay").describe().show()

+-------+------------------+
|summary|         dep_delay|
+-------+------------------+
|  count|            328521|
|   mean|12.639070257304708|
| stddev| 40.21006089212993|
|    min|             -43.0|
|    max|            1301.0|
+-------+------------------+



In [11]:
not_cancelled_flights = df.filter(df["dep_time"].isNotNull() | df["arr_time"].isNotNull())

In [12]:
not_cancelled_flights.groupBy("dest").count().orderBy("count", ascending=False).show(5)

+----+-----+
|dest|count|
+----+-----+
| ATL|16898|
| ORD|16642|
| LAX|16076|
| BOS|15049|
| MCO|13982|
+----+-----+
only showing top 5 rows



In [13]:
df.groupBy(["origin", "dest"]).count().orderBy("count", ascending=False).show(5)

+------+----+-----+
|origin|dest|count|
+------+----+-----+
|   JFK| LAX|11262|
|   LGA| ATL|10263|
|   LGA| ORD| 8857|
|   JFK| SFO| 8204|
|   LGA| CLT| 6168|
+------+----+-----+
only showing top 5 rows



Maior tempo médio de atraso

In [14]:
df.groupBy("dest") \
    .mean("dep_delay") \
    .withColumnRenamed("avg(dep_delay)", "avg_dep_delay") \
    .orderBy("avg_dep_delay", ascending=False) \
    .show(5)

+----+------------------+
|dest|     avg_dep_delay|
+----+------------------+
| CAE|35.570093457943926|
| TUL| 34.90635451505017|
| OKC|30.568807339449542|
| BHM| 29.69485294117647|
| TYS|28.493955094991364|
+----+------------------+
only showing top 5 rows



Dia da semana com maior número de voos

In [15]:
df.select(['time_hour']).show()
df_with_weekday = df.withColumn(
    "day_of_week",
    date_format(col("time_hour"), "EEEE")
)

+-------------------+
|          time_hour|
+-------------------+
|2013-01-01 05:00:00|
|2013-01-01 05:00:00|
|2013-01-01 05:00:00|
|2013-01-01 05:00:00|
|2013-01-01 06:00:00|
|2013-01-01 05:00:00|
|2013-01-01 06:00:00|
|2013-01-01 06:00:00|
|2013-01-01 06:00:00|
|2013-01-01 06:00:00|
|2013-01-01 06:00:00|
|2013-01-01 06:00:00|
|2013-01-01 06:00:00|
|2013-01-01 06:00:00|
|2013-01-01 06:00:00|
|2013-01-01 05:00:00|
|2013-01-01 06:00:00|
|2013-01-01 06:00:00|
|2013-01-01 06:00:00|
|2013-01-01 06:00:00|
+-------------------+
only showing top 20 rows



In [16]:
df_with_weekday.select(["time_hour", "day_of_week"]).show(5)

+-------------------+-----------+
|          time_hour|day_of_week|
+-------------------+-----------+
|2013-01-01 05:00:00|    Tuesday|
|2013-01-01 05:00:00|    Tuesday|
|2013-01-01 05:00:00|    Tuesday|
|2013-01-01 05:00:00|    Tuesday|
|2013-01-01 06:00:00|    Tuesday|
+-------------------+-----------+
only showing top 5 rows



In [17]:
df_with_weekday.groupBy("day_of_week").count().orderBy("count", ascending=False).show()

+-----------+-----+
|day_of_week|count|
+-----------+-----+
|     Monday|50690|
|    Tuesday|50422|
|     Friday|50308|
|   Thursday|50219|
|  Wednesday|50060|
|     Sunday|46357|
|   Saturday|38720|
+-----------+-----+



Data Split for Model Training

In [18]:
df_with_weekday.show(2)

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+-----------+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|day_of_week|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+-----------+
|  0|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|United Air Lines ...|    Tuesday|
|  1|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|    1416|   5|    29|2013-01-01 05:00:00

In [19]:
X_columns = ["distance", "dep_delay", "arr_delay"]
y_columns = "air_time"

In [20]:
X = df.select(X_columns)
y = df.select(y_columns)

In [21]:
y.show()

+--------+
|air_time|
+--------+
|   227.0|
|   227.0|
|   160.0|
|   183.0|
|   116.0|
|   150.0|
|   158.0|
|    53.0|
|   140.0|
|   138.0|
|   149.0|
|   158.0|
|   345.0|
|   361.0|
|   257.0|
|    44.0|
|   337.0|
|   152.0|
|   134.0|
|   147.0|
+--------+
only showing top 20 rows



### Dados lat long de aeroportos

In [24]:
origins = df.select('origin').distinct()
dests = df.select('dest').distinct()
dests = dests.withColumnRenamed('dest', 'origin')
unique_locations = origins.union(dests).distinct()
location_list = [row['origin'] for row in unique_locations.collect()]

#### Fazendo request para a airportdb API para obter os dados de latitude e longitude dos aeroportos

In [25]:
location_dict = AirportDB.get_airport_data(location_list)

In [68]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

def get_location(airport_code):
    return location_dict.get(airport_code, None)

get_location_udf = udf(get_location,  ArrayType(FloatType()))

df_with_coordinates = df.withColumn("origin_coordinates", get_location_udf(df["origin"]))
df_with_coordinates.show(truncate=False)

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+------------------------+-------------------+
|id |year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|time_hour          |name                    |origin_coordinates |
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+------------------------+-------------------+
|0  |2013|1    |1  |517.0   |515           |2.0      |830.0   |819           |11.0     |UA     |1545  |N14228 |EWR   |IAH |227.0   |1400    |5   |15    |2013-01-01 05:00:00|United Air Lines Inc.   |[40.6925, -74.1687]|
|1  |2013|1    |1  |533.0   |529           |4.0      |850.0   |830           |20.0     |UA     |1714  |N24211 |LGA   |IAH |2

### Dados Wind Speed dos aeroportos

In [84]:
df_unique_weather_requests = df_with_coordinates.select([
    col("origin_coordinates"),
    date_format(col("time_hour"), "yyyy-MM-dd").alias("start_date"),
    date_format(date_add(col("time_hour"), 1), "yyyy-MM-dd").alias("end_date")
]).dropDuplicates()

In [85]:
df_unique_weather_requests.show(2)

+-------------------+----------+----------+
| origin_coordinates|start_date|  end_date|
+-------------------+----------+----------+
|[40.6925, -74.1687]|2013-01-13|2013-01-14|
|[40.6925, -74.1687]|2013-01-15|2013-01-16|
+-------------------+----------+----------+
only showing top 2 rows



In [86]:
lat_lon_dates = df_unique_weather_requests.rdd.map(lambda row: (
    row['origin_coordinates'][0],  
    row['origin_coordinates'][1],  
    row['start_date'],             
    row['end_date']                
)).collect()

#### Fazendo request de Wind Speed

In [89]:
weather_data = WeatherBitDB.get_weather_list(lat_lon_dates)

In [99]:
weather_list = [(lat, lon, date, float(wind_speed)) for (lat, lon, date, _), wind_speed in weather_data.items()]

[(40.692501068115234, -74.168701171875, '2013-01-13', 1.5),
 (40.692501068115234, -74.168701171875, '2013-01-15', 3.9),
 (40.777198791503906, -73.87259674072266, '2013-10-04', 2.7),
 (40.639801025390625, -73.77890014648438, '2013-10-01', 3.6),
 (40.777198791503906, -73.87259674072266, '2013-01-10', 5.3),
 (40.777198791503906, -73.87259674072266, '2013-01-25', 4.0),
 (40.777198791503906, -73.87259674072266, '2013-01-27', 5.0),
 (40.692501068115234, -74.168701171875, '2013-01-05', 4.3),
 (40.777198791503906, -73.87259674072266, '2013-01-08', 3.8),
 (40.639801025390625, -73.77890014648438, '2013-10-04', 2.8),
 (40.639801025390625, -73.77890014648438, '2013-01-15', 3.9),
 (40.777198791503906, -73.87259674072266, '2013-01-28', 3.4),
 (40.777198791503906, -73.87259674072266, '2013-10-06', 4.0),
 (40.639801025390625, -73.77890014648438, '2013-01-11', 3.0),
 (40.639801025390625, -73.77890014648438, '2013-01-12', 2.8),
 (40.692501068115234, -74.168701171875, '2013-01-17', 4.2),
 (40.77719879150

In [100]:
schema = StructType([
    StructField("lat", DoubleType(), False),
    StructField("lon", DoubleType(), False),
    StructField("date", StringType(), False),
    StructField("wind_speed", DoubleType(), False)
])

In [101]:
weather_df = spark.createDataFrame(weather_list, schema)

In [103]:
df_with_coordinates = df_with_coordinates.withColumn("date", F.date_format(F.col("time_hour"), "yyyy-MM-dd"))

In [106]:
df_with_coordinates.show(2)

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+-------------------+----------+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name| origin_coordinates|      date|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+-------------------+----------+
|  0|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|United Air Lines ...|[40.6925, -74.1687]|2013-01-01|
|  1|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|

In [107]:
weather_df.show(2)

+------------------+----------------+----------+----------+
|               lat|             lon|      date|wind_speed|
+------------------+----------------+----------+----------+
|40.692501068115234|-74.168701171875|2013-01-13|       1.5|
|40.692501068115234|-74.168701171875|2013-01-15|       3.9|
+------------------+----------------+----------+----------+
only showing top 2 rows



In [108]:
df_with_lat_lon = df_with_coordinates.withColumn("lat", F.col("origin_coordinates")[0]) \
                                     .withColumn("lon", F.col("origin_coordinates")[1])

In [109]:
df_final = df_with_lat_lon.join(weather_df, on=["lat", "lon", "date"], how="left")

#### Dataframe compondo dados de vento!

In [112]:
df_final.show(5)

+-------+--------+----------+------+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+-------------------+----------+
|    lat|     lon|      date|    id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name| origin_coordinates|wind_speed|
+-------+--------+----------+------+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+-------------------+----------+
|40.6398|-73.7789|2013-01-01|     2|2013|    1|  1|   542.0|           540|      2.0|   923.0|           850|     33.0|     AA|  1141| N619AA|   JFK| MIA|   160.0|    1089|   5|    40|2013-01-01 05:00:00|American Airlines...|[40.