In [1]:
import plotly.express as px
import plotly.graph_objs as go
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from plotly.subplots import make_subplots
import plotly.offline as py


In [2]:
# Imports used for schema handling (kept in this cell, as in the original notebook).
import json
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    DoubleType,
    DateType,
    TimestampType,
)

# Get the active Spark session, or create one with the application name "flights".
spark = SparkSession.builder.appName("flights").getOrCreate()

# The flights schema is stored on disk as JSON and rebuilt into a StructType.
with open("../matrix/schema.json", "r") as f:
    schema = StructType.fromJson(json.load(f))

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/26 18:16:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Read the cleaned flights CSV with the explicit schema loaded from schema.json;
# the first line of the file is treated as a header row.
df = spark.read.csv("../../data.nosync/cleaned/cleaned_flights.csv",schema=schema, header=True)

In [4]:
# Distinct airport codes seen as a departure point and as an arrival point
origin_airports = df.select("Origin").distinct()
dest_airports = df.select("Dest").distinct()

# Stack the two single-column frames, drop duplicates and collect to pandas.
# The resulting column keeps the name of the first frame's column ("Origin").
airports = (
    origin_airports
    .union(dest_airports)
    .distinct()
    .toPandas()
)

                                                                                

In [6]:
# Convert the single "Origin" column to a plain Python list of airport codes.
# NOTE(review): this rebinds `airports` from a DataFrame to a list, so re-running
# this cell on its own fails; a later cell rebinds `airports` once more.
airports = airports['Origin'].tolist()


In [8]:
# Get the distinct flight dates and collect them to pandas
dates = df.select(df.FlightDate).distinct().toPandas()



Passing unit-less datetime64 dtype to .astype is deprecated and will raise in a future version. Pass 'datetime64[ns]' instead



In [11]:
# Sort the dates in ascending order and renumber the index from 0, then display them
dates = dates.sort_values(by=['FlightDate']).reset_index(drop=True)
dates


Unnamed: 0,FlightDate
0,2013-01-01
1,2013-01-02
2,2013-01-03
3,2013-01-04
4,2013-01-05
...,...
360,2013-12-27
361,2013-12-28
362,2013-12-29
363,2013-12-30


In [None]:
# Load the airports lookup table (column types inferred from the CSV) and collect it to pandas.
# NOTE(review): this rebinds `airports`, replacing the list of codes built earlier;
# the merges further down join on this table's "IATA" column.
airports_sp = spark.read.csv("../../preprocessing/airports.csv", header=True,inferSchema=True)
airports = airports_sp.toPandas()

In [None]:
# get corrispondence between us states abbreviations and full names, from a dataset on internet, puerto rico included
states = pd.read_csv("../../states.csv",delimiter="\t",header=None)
states.columns = ["State","unk","Abbreviation"]
states.head()

In [None]:
def routes_queries(df,date_start,date_end,origin="BOS",query="NumFlights",scope="airports",top_n=100):
    """Aggregate the routes leaving ``origin`` in a date window and keep the top ones.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Flights data. Columns read here: Origin, Dest, FlightDate, ArrDelay,
        ORIGIN_STATE, DEST_STATE and the ORIGIN_/DEST_ LATITUDE/LONGITUDE columns.
    date_start, date_end
        Inclusive lower and upper bounds compared against ``FlightDate``.
    origin : str
        Value matched against the ``Origin`` column (applied for every scope).
    query : str
        Column of the aggregated result used for ranking, e.g. "NumFlights"
        or "AverageArrivalDelay".
    scope : str
        "airports" groups by origin/destination airport and their coordinates;
        any other value groups by origin/destination state and averages the
        coordinates of the flights in each group.
    top_n : int
        Maximum number of routes returned. Defaults to 100, the value that was
        previously hard-coded.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per route with ``AverageArrivalDelay`` and ``NumFlights``,
        sorted by ``query`` in descending order and truncated to ``top_n`` rows.
    """
    in_window = (col("FlightDate") >= date_start) & (col("FlightDate") <= date_end)
    flights = df.filter(col("Origin") == origin).filter(in_window)

    # Spark names dict-style aggregates "avg(<col>)" / "count(1)"; map them to readable names.
    renames = {"avg(ArrDelay)": "AverageArrivalDelay", "count(1)": "NumFlights"}

    if scope == "airports":
        # Coordinates are part of the grouping key, so they pass through unchanged.
        df_aggregated = flights.groupBy(
            "Origin","Dest","ORIGIN_LATITUDE","ORIGIN_LONGITUDE","DEST_LATITUDE","DEST_LONGITUDE"
        ).agg({"ArrDelay": "avg","*":"count"})
    else:
        # State level: a state pair covers several airports, so coordinates are averaged.
        df_aggregated = flights.groupBy("ORIGIN_STATE","DEST_STATE").agg(
            {"ArrDelay": "avg","*":"count","ORIGIN_LATITUDE":"avg","DEST_LATITUDE":"avg","ORIGIN_LONGITUDE":"avg","DEST_LONGITUDE":"avg"}
        )
        renames.update({
            "avg(ORIGIN_LATITUDE)": "ORIGIN_LATITUDE",
            "avg(ORIGIN_LONGITUDE)": "ORIGIN_LONGITUDE",
            "avg(DEST_LATITUDE)": "DEST_LATITUDE",
            "avg(DEST_LONGITUDE)": "DEST_LONGITUDE",
        })

    for generated_name, final_name in renames.items():
        df_aggregated = df_aggregated.withColumnRenamed(generated_name, final_name)

    # Rank by the requested column and keep only the first top_n rows.
    return df_aggregated.orderBy(df_aggregated[query].desc()).limit(top_n)

In [None]:
# Analysis window, built directly as pandas Timestamp objects from ISO date strings
date_start = pd.Timestamp("2013-02-01")
date_end = pd.Timestamp("2013-12-28")


In [None]:
# Top routes for the default origin ("BOS"), ranked by the default query ("NumFlights").
# The result is still a Spark DataFrame at this point.
df_aggregatedp = routes_queries(df,date_start,date_end)


In [None]:
# Collect the aggregated routes to pandas.
# NOTE(review): rebinding the same name means this cell cannot be re-run on its own.
df_aggregatedp=df_aggregatedp.toPandas()

In [None]:
# Join the airports lookup twice: first on the origin code, then on the destination code.
# Columns coming from `airports` end up with pandas' default "_x" (origin) and "_y" (destination) suffixes.
df_aggregatedp = df_aggregatedp.merge(airports, left_on="Origin", right_on="IATA")
df_aggregatedp = df_aggregatedp.merge(airports, left_on="Dest", right_on="IATA")


In [None]:
# Re-rank the collected routes by average arrival delay (largest first) and keep at most 100
df_aggregated = df_aggregatedp.sort_values(by="AverageArrivalDelay", ascending=False).head(100)

In [None]:
# Shift the delays by subtracting the minimum, so the smallest value becomes 0
df_aggregated["AverageArrivalDelay"] = df_aggregated["AverageArrivalDelay"]+ df_aggregated["AverageArrivalDelay"].min()*-1

In [None]:
df_aggregated["AverageArrivalDelay"].min()

In [None]:
states.head()

In [None]:
def _route_marker_sizes(values, shift_to_zero=False):
    """Map a numeric pandas Series to marker sizes between 1 and 21.

    Parameters
    ----------
    values : pandas.Series
        Ranking metric, one entry per route.
    shift_to_zero : bool
        Subtract the minimum first, so a metric that can be negative still
        yields non-negative sizes.

    Returns
    -------
    list of float
        One marker size per entry of ``values``.
    """
    scaled = values - values.min() if shift_to_zero else values
    peak = scaled.max()
    if pd.isna(peak) or peak == 0:
        # Empty input, all-missing values, or every route tied at zero: dividing
        # by the peak would produce NaN sizes, so fall back to the minimum size.
        return [1.0] * len(values)
    # Routes with a missing metric get the minimum size instead of NaN.
    return (scaled / peak * 20 + 1).fillna(1.0).tolist()


def plot_routes(df,date_start,date_to,origin="BOS",query="NumFlights",scope="airports"):
    """Draw the top routes leaving ``origin`` on a map of North America.

    The routes come from ``routes_queries``. Each route is drawn as a red line
    from origin to destination, and each destination gets a blue marker whose
    size grows with ``query`` and whose hover text shows the destination name
    and the value of ``query``.

    Relies on the module-level lookup tables ``airports`` (columns "IATA" and
    "AIRPORT") and ``states`` (columns "State" and "Abbreviation").

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Flights data, passed through to ``routes_queries``.
    date_start, date_to
        Inclusive date window, passed through to ``routes_queries``.
    origin : str
        Origin airport code.
    query : str
        Ranking metric: "NumFlights" or "AverageArrivalDelay".
    scope : str
        "airports" for airport-level routes; any other value for state-level routes.

    Returns
    -------
    plotly.graph_objs.Figure
    """
    df_aggregated = routes_queries(df,date_start,date_to,origin,query,scope).toPandas()
    # Number of routes returned by the query (before the lookup joins).
    print(len(df_aggregated))

    # Attach human-readable names; the second merge yields the "_y" (destination) columns.
    if scope == "airports":
        df_aggregated = df_aggregated.merge(airports, left_on="Origin", right_on="IATA")
        df_aggregated = df_aggregated.merge(airports, left_on="Dest", right_on="IATA")
        label_col = "AIRPORT_y"
    else:
        df_aggregated = df_aggregated.merge(states, left_on="ORIGIN_STATE", right_on="Abbreviation")
        df_aggregated = df_aggregated.merge(states, left_on="DEST_STATE", right_on="Abbreviation")
        label_col = "State_y"

    fig = go.Figure()

    # One line per route between the origin and destination coordinates.
    endpoints = zip(df_aggregated["ORIGIN_LATITUDE"], df_aggregated["DEST_LATITUDE"],
                    df_aggregated["ORIGIN_LONGITUDE"], df_aggregated["DEST_LONGITUDE"])
    for slat, dlat, slon, dlon in endpoints:
        fig.add_trace(go.Scattergeo(
            lat=[slat, dlat],
            lon=[slon, dlon],
            mode='lines',
            line=dict(width=1, color="red"),
            # the lines carry no information of their own; hover is on the markers
            hoverinfo="skip",
        ))

    metric = df_aggregated[query]

    # Hover text uses the real metric value; the shifted/normalized values are
    # only used for sizing the markers.
    hover_text = (df_aggregated[label_col] + "<br>" + query + " : " + metric.astype(str)).values.tolist()

    # Average delays can be negative, so they are shifted to start at 0 before scaling.
    marker_sizes = _route_marker_sizes(metric, shift_to_zero=(query == "AverageArrivalDelay"))

    # Destination markers, sized by the ranking metric.
    fig.add_trace(
        go.Scattergeo(
            lon=df_aggregated["DEST_LONGITUDE"].values.tolist(),
            lat=df_aggregated["DEST_LATITUDE"].values.tolist(),
            hoverinfo='text',
            text=hover_text,
            mode='markers',
            marker=dict(size=marker_sizes, color='blue', opacity=0.9),
        )
    )

    # Layout: the top margin leaves room for the title.
    fig.update_layout(title_text=f"Top routes from {origin} ranked by {query}",
                      height=700, width=900,
                      margin={"t":40,"b":0,"l":0, "r":0, "pad":0},
                      showlegend=False,
                      geo= dict(showland = True, landcolor = 'white', countrycolor = 'grey', bgcolor="lightgrey",scope='north america'))

    return fig

In [None]:
plot_routes(df,date_start,date_end,"BOS","NumFlights","airports").show()

In [None]:
def states_map_query(df,group):
    """Compute the average arrival delay and the number of flights per ``group``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Flights data with an ``ArrDelay`` column.
    group : str or list of str
        Column name(s) passed to ``groupBy``.

    Returns
    -------
    pyspark.sql.DataFrame
        The grouping column(s) plus ``ArrDelay`` (average) and ``count``.
    """
    summary = df.groupBy(group).agg({"ArrDelay": "avg", "*":"count"})
    # Replace Spark's generated aggregate names with plain ones.
    for generated_name, final_name in (("avg(ArrDelay)", "ArrDelay"), ("count(1)", "count")):
        summary = summary.withColumnRenamed(generated_name, final_name)
    return summary

In [None]:
# Average arrival delay and flight count per destination state (Spark DataFrame)
tmp = states_map_query(df,"DEST_STATE")

In [None]:
# Show the 5 destination states with the highest average arrival delay
tmp.sort("ArrDelay",ascending=False).show(5)


In [None]:
def plot_states_map(df,group,query):
    """Build a US choropleth of ``query`` per state.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Flights data, aggregated through ``states_map_query``.
    group : str
        State column used both for grouping and as map location,
        e.g. "ORIGIN_STATE" or "DEST_STATE".
    query : str
        Aggregated column used for the colour scale ("ArrDelay" or "count").

    Returns
    -------
    plotly figure
    """
    per_state = states_map_query(df,group).toPandas()

    # Rows coded "AS" or "GU" are excluded from the map.
    excluded_codes = ["AS", "GU"]
    per_state = per_state[~per_state[group].isin(excluded_codes)]

    fig = px.choropleth(
        locations=per_state[group],
        locationmode="USA-states",
        color=per_state[query],
        scope="usa",
    )

    # Title has the form "<group> : <query>"
    fig.update_layout(title_text=f"{group} : {query}")

    return fig



In [None]:
plot_states_map(df,"ORIGIN_STATE","ArrDelay")

In [None]:
# Group by destination state, compute the average arrival delay (and the flight count),
# and collect the result to pandas
df_avg = states_map_query(df,"DEST_STATE").toPandas()


In [None]:
# order by the average delay
df_avg.sort_values("ArrDelay",ascending=False).head(5)

In [None]:
# Group by (destination state, origin state) pairs, compute the average arrival delay
# (and the flight count), and collect the result to pandas; this rebinds df_avg
df_avg = states_map_query(df,["DEST_STATE","ORIGIN_STATE"]).toPandas()


In [None]:
df_avg.sort_values("ArrDelay",ascending=False).head(5)

In [None]:
def origin_dest_query(df,query="ArrDelay"):
    """Rank origin-state / destination-state pairs over the whole dataset.

    NOTE(review): a later cell redefines ``origin_dest_query`` with a different
    signature (``df, from_date, to_date, query``); once that cell has run, this
    version is shadowed.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Flights data with ORIGIN_STATE, DEST_STATE and ArrDelay columns.
    query : str
        "count" ranks by number of flights; any other value computes the average
        arrival delay into a column named "ArrDelay".

    Returns
    -------
    pyspark.sql.DataFrame
        One row per state pair, sorted by ``query`` in descending order.
    """
    by_state_pair = df.groupBy("ORIGIN_STATE","DEST_STATE")

    if query=="count":
        ranked = by_state_pair.agg({"*": "count"}).withColumnRenamed("count(1)", "count")
    else:
        ranked = by_state_pair.agg({"ArrDelay": "avg"}).withColumnRenamed("avg(ArrDelay)", "ArrDelay")

    # Largest value of the requested column first
    return ranked.orderBy(ranked[query].desc())

In [None]:
# Average arrival delay per (origin state, destination state) pair, collected to pandas.
# NOTE(review): this call matches the two-argument definition above; it must run
# before the later redefinition of origin_dest_query.
df_avg = origin_dest_query(df,"ArrDelay").toPandas()

In [None]:
df_avg.sort_values("ArrDelay",ascending=False).head(5)

In [None]:
def get_dates(df):
    """Return the distinct ``FlightDate`` values in ascending order.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Flights data with a ``FlightDate`` column.

    Returns
    -------
    pandas.Series
        The "FlightDate" column of the collected result.
    """
    distinct_dates = df.select("FlightDate").distinct()
    ordered_dates = distinct_dates.orderBy("FlightDate", ascending=True)
    return ordered_dates.toPandas()["FlightDate"]

In [None]:
# Sorted distinct flight dates as a pandas Series (rebinds the `dates` DataFrame built earlier)
dates = get_dates(df)

In [None]:
def origin_dest_query(df,from_date,to_date,query="ArrDelay"):
    """Rank origin-state / destination-state pairs within a date range.

    This definition replaces the earlier ``origin_dest_query(df, query)`` from
    the cell above.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Flights data with FlightDate, ORIGIN_STATE, DEST_STATE and ArrDelay columns.
    from_date, to_date
        Bounds passed to ``Column.between`` on ``FlightDate``.
    query : str
        "count" ranks by number of flights; any other value computes the average
        arrival delay into a column named "ArrDelay".

    Returns
    -------
    pyspark.sql.DataFrame
        One row per state pair, sorted by ``query`` in descending order.
    """
    # Restrict to flights whose date lies between from_date and to_date
    in_range = df.filter(df["FlightDate"].between(from_date,to_date))
    by_state_pair = in_range.groupBy("ORIGIN_STATE","DEST_STATE")

    if query=="count":
        ranked = by_state_pair.agg({"*": "count"}).withColumnRenamed("count(1)", "count")
    else:
        ranked = by_state_pair.agg({"ArrDelay": "avg"}).withColumnRenamed("avg(ArrDelay)", "ArrDelay")

    # Largest value of the requested column first
    return ranked.orderBy(ranked[query].desc())

In [None]:
def origin_dest_plot(df,from_date,to_date,query="ArrDelay"):
    """Pie chart of the first 20 origin-destination state pairs ranked by ``query``.

    Uses the module-level ``states`` table ("State" / "Abbreviation" columns) to
    turn state abbreviations into full names.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Flights data, passed to ``origin_dest_query``.
    from_date, to_date
        Date range, passed to ``origin_dest_query``.
    query : str
        "ArrDelay" or "count"; used as the pie values and in the title.

    Returns
    -------
    plotly figure
    """
    ranked = origin_dest_query(df,from_date,to_date,query).toPandas()

    # Full name of the origin state
    with_origin = ranked.merge(states, left_on="ORIGIN_STATE", right_on="Abbreviation")
    with_origin = with_origin.rename(columns={"State": "Origin"})

    # Full name of the destination state
    with_both = with_origin.merge(states, left_on="DEST_STATE", right_on="Abbreviation")
    with_both = with_both.rename(columns={"State": "Dest"})

    # Label of the form "<origin> - <destination>"
    with_both["Origin-Dest"] = with_both["Origin"] + " - " + with_both["Dest"]

    top_pairs = with_both.head(20)
    return px.pie(top_pairs, values=query, names='Origin-Dest', title=f'{query} by Origin-Dest')

In [None]:
dates[len(dates)-1]

In [None]:
origin_dest_plot(df,dates[0],dates[len(dates)-1],"ArrDelay")

In [None]:
def taxi_time_query(df,orig_dest):
    """Average taxi-in / taxi-out times and flight count per ``orig_dest`` and day.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Flights data with FlightDate, TaxiIn and TaxiOut columns.
    orig_dest : str
        Column to group by together with ``FlightDate``.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``TaxiIn`` and ``TaxiOut`` (averages) and ``count``, sorted by
        ``TaxiIn`` in descending order.
    """
    aggregations = {"TaxiIn": "avg", "TaxiOut": "avg","*":"count"}
    daily = df.groupBy(orig_dest,"FlightDate").agg(aggregations)

    # Replace Spark's generated aggregate names with plain ones
    generated_to_final = (
        ("avg(TaxiIn)", "TaxiIn"),
        ("avg(TaxiOut)", "TaxiOut"),
        ("count(1)", "count"),
    )
    for generated_name, final_name in generated_to_final:
        daily = daily.withColumnRenamed(generated_name, final_name)

    # Highest average taxi-in time first
    return daily.orderBy(daily["TaxiIn"].desc())


In [None]:
# Columns kept for the modelling part of the notebook
features = [
    "Year",
    "Quarter",
    "Month",
    "DayofMonth",
    "DayOfWeek",
    "Reporting_Airline",
    "Origin",
    "Dest",
    "CRSDepTime",
    "CRSArrTime",
    "ArrDel15",
    "DistanceGroup",
    "ORIGIN_STATE",
    "DEST_STATE",
]

# Keep only the columns listed above.
# NOTE(review): this rebinds `df`; columns such as ArrDelay are gone afterwards,
# so the earlier query/plot cells cannot be re-run without reloading the data.
df = df.select(*features)

In [None]:
# Inspect the column types of df after the column selection
df.dtypes

In [None]:
# Cast the ArrDel15 column to integer
df= df.withColumn("ArrDel15", df["ArrDel15"].cast(IntegerType()))

In [None]:
# Keep a random sample of roughly 50% of the rows, without replacement;
# the fixed seed makes the sample repeatable
df = df.sample(False,0.5,seed=42)

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.feature import OneHotEncoder

# Columns treated as categorical inputs.
# NOTE(review): the other columns kept in `features` (Year, Month, CRSDepTime, ...)
# are not passed to the assembler below, so the model never sees them — confirm intent.
categorical = ['Reporting_Airline', 'Origin', 'Dest', 'ORIGIN_STATE', 'DEST_STATE']

# Step 1: index every categorical column into 'strindexed_<col>',
# and index the target ArrDel15 into 'label'.
stringindexer_stages = []
for column in categorical:
    stringindexer_stages.append(StringIndexer(inputCol=column, outputCol='strindexed_' + column))
stringindexer_stages.append(StringIndexer(inputCol='ArrDel15', outputCol='label'))

# Step 2: one-hot encode each indexed column into 'onehot_<col>'.
onehotencoder_stages = []
feature_columns = []
for column in categorical:
    encoded_name = 'onehot_' + column
    feature_columns.append(encoded_name)
    onehotencoder_stages.append(OneHotEncoder(inputCol='strindexed_' + column, outputCol=encoded_name))

# Step 3: concatenate the one-hot vectors into a single 'features' vector.
vectorassembler_stage = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Indexers first, then encoders, then the assembler.
all_stages = [*stringindexer_stages, *onehotencoder_stages, vectorassembler_stage]
pipeline = Pipeline(stages=all_stages)



In [None]:
# Fit the indexing/encoding pipeline on the whole (sampled) DataFrame.
# NOTE(review): the fit happens before the train/test split further down.
pipeline_model = pipeline.fit(df)

In [None]:
# Apply the fitted pipeline and keep only the encoded columns, the assembled
# 'features' vector and the 'label' column
final_columns = feature_columns + ['features', 'label']
cuse_df = pipeline_model.transform(df).\
            select(final_columns)
            
# Preview the first 5 transformed rows
cuse_df.show(5)

In [None]:
# Random 70/30 train/test split with a fixed seed
training, test = cuse_df.randomSplit([0.7, 0.3], seed=1234)

In [None]:
# Decision tree classifier reading the assembled 'features' vector and the indexed 'label'
# (all other hyperparameters are left at their defaults)
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')

In [None]:
# fit the model on training data
dt_model = dt.fit(training)


In [None]:
# make predictions on test data
predictions = dt_model.transform(test)

# evaluate the model using accuracy (share of rows where prediction equals label)

evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))




In [None]:
# Save the model. A plain save() raises if the "dt_model" path already exists,
# which breaks a second run of the notebook, so the writer is told to overwrite.
dt_model.write().overwrite().save("dt_model")


In [None]:
# import DecisionTreeClassificationModel, the class needed to read a saved decision tree model
from pyspark.ml.classification import DecisionTreeClassificationModel
# load the model back from the "dt_model" path (this rebinds dt_model to the loaded copy)
dt_model = DecisionTreeClassificationModel.load("dt_model")

In [None]:
# make predictions on test data, this time with the model reloaded from disk
predictions = dt_model.transform(test)

# evaluate the reloaded model using accuracy; same steps as the evaluation cell
# above, so the two printed values can be compared

evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy))


In [None]:
# get the confusion matrix
predictions.groupBy("label","prediction").count().show()
