# Import Data

In [1]:
import os
import databricks.koalas as ks
import math
from haversine import haversine



In [2]:
fileList = os.listdir("./mini_open_sky")
csvYearly = [i for i in fileList if i.startswith("mini_flightlist_2019")]
csvYearly.sort()

In [3]:
# dfList = [ks.read_csv("/user/hadoop/mini_open_sky/" + i) for i in csvYearly]
# dfList = [ks.read_csv("file:///home/hadoop/mini_open_sky/" + i) for i in csvYearly]
dfList = [ks.read_csv("./mini_open_sky/" + i) for i in csvYearly]
openSkyDf = ks.concat(dfList)

In [4]:
# icaoDf = ks.read_csv("/user/hadoop/mini_icao/mini_ICAO.csv")
# icaoDf = ks.read_csv("file:///home/hadoop/mini_icao/mini_ICAO.csv")
icaoDf = ks.read_csv("./mini_icao/mini_ICAO.csv")

In [5]:
outputDf = ks.DataFrame(icaoDf["icao_code"])

# Domestic and International

In [6]:
# dropna
df2 = openSkyDf.dropna(subset = ["origin", "destination"], how = "any")

# select column from icao data
df3_1 = icaoDf[["ident", "iso_country"]]

# use 'origin' field to map the combined opensky dataset and icao table to get airport country
df3_2 = ks.merge(df2, df3_1, left_on = "origin", right_on = "ident", how = "left") \
          .drop(["ident"], axis = 1) \
          .rename(columns={"iso_country": "origin_iso_country"})

# use 'destination' field to map the combined opensky dataset and icao table to get airport country
df3_2 = ks.merge(df3_2, df3_1, left_on = "destination", right_on = "ident", how = "left") \
          .drop(["ident"], axis = 1) \
          .rename(columns={"iso_country": "destination_iso_country"})

# create a field to determine whether it is a domestic line
df3_2["domestic"] = df3_2["origin_iso_country"] == df3_2["destination_iso_country"]

# count domestic_origin
df3_3 = df3_2[df3_2["domestic"] == True].groupby("origin").count()
se3_3 = df3_3.iloc[:, -1]
se3_3.name = "domestic_origin"

# count domestic_destination
df3_4 = df3_2[df3_2["domestic"] == True].groupby("destination").count()
se3_4 = df3_4.iloc[:, -1]
se3_4.name = "domestic_destination"

# count international_origin
df3_5 = df3_2[df3_2["domestic"] == False].groupby("origin").count()
se3_5 = df3_5.iloc[:, -1]
se3_5.name = "international_origin"

# count international_destination
df3_6 = df3_2[df3_2["domestic"] == False].groupby("destination").count()
se3_6 = df3_6.iloc[:, -1]
se3_6.name = "international_destination"

# icao merge domestic_origin
df4 = ks.merge(df3_1, se3_3, left_on = "ident", right_index = True, how = "outer")
df4["domestic_origin"] = df4["domestic_origin"].fillna(0)

# icao merge domestic_destination
df4 = ks.merge(df4, se3_4, left_on = "ident", right_index = True, how = "outer")
df4["domestic_destination"] = df4["domestic_destination"].fillna(0)

# icao merge international_origin
df4 = ks.merge(df4, se3_5, left_on = "ident", right_index = True, how = "outer")
df4["international_origin"] = df4["international_origin"].fillna(0)

# icao merge international_destination
df4 = ks.merge(df4, se3_6, left_on = "ident", right_index = True, how = "outer")
df4["international_destination"] = df4["international_destination"].fillna(0)

# prepare to output
df5 = df4.rename(columns = {"ident": "icao_code"}).drop("iso_country", axis = 1)

# integrate to output dataframe
outputDf = ks.merge(outputDf, df5, left_on = "icao_code", right_on = "icao_code", how = "left")

# delete temporary objects
del df2, df3_1, df3_2, df3_3, se3_3, df3_4, se3_4, df3_5, se3_5, df3_6, se3_6, df4, df5

# Day and Night

In [7]:
"""
-180   <= zone 13 (return -12) < -172.5 | 18 -  5 day |  6 - 17 night
-172.5 <= zone 24 (return -11) < -157.5 | 17 -  4 day |  5 - 16 night
-157.5 <= zone 23 (return -10) < -142.5 | 16 -  3 day |  4 - 15 night
-142.5 <= zone 22 (return  -9) < -127.5 | 15 -  2 day |  3 - 14 night
-127.5 <= zone 21 (return  -8) < -112.5 | 14 -  1 day |  2 - 13 night
-112.5 <= zone 20 (return  -7) <  -97.5 | 13 -  0 day |  1 - 12 night
 -97.5 <= zone 19 (return  -6) <  -82.5 | 12 - 23 day |  0 - 11 night
 -82.5 <= zone 18 (return  -5) <  -67.5 | 11 - 22 day | 23 - 10 night
 -67.5 <= zone 17 (return  -4) <  -52.5 | 10 - 21 day | 22 -  9 night
 -52.5 <= zone 16 (return  -3) <  -37.5 |  9 - 20 day | 21 -  8 night
 -37.5 <= zone 15 (return  -2) <  -22.5 |  8 - 19 day | 20 -  7 night
 -22.5 <= zone 14 (return  -1) <   -7.5 |  7 - 18 day | 19 -  6 night
  -7.5 <= zone  1 (return   0) <    7.5 |  6 - 17 day | 18 -  5 night
   7.5 <= zone  2 (return   1) <   22.5 |  5 - 16 day | 17 -  4 night
  22.5 <= zone  3 (return   2) <   37.5 |  4 - 15 day | 16 -  3 night
  37.5 <= zone  4 (return   3) <   52.5 |  3 - 14 day | 15 -  2 night
  52.5 <= zone  5 (return   4) <   67.5 |  2 - 13 day | 14 -  1 night
  67.5 <= zone  6 (return   5) <   82.5 |  1 - 12 day | 13 -  0 night
  82.5 <= zone  7 (return   6) <   97.5 |  0 - 11 day | 12 - 23 night
  97.5 <= zone  8 (return   7) <  112.5 | 23 - 10 day | 11 - 22 night
 112.5 <= zone  9 (return   8) <  127.5 | 22 -  9 day | 10 - 21 night
 127.5 <= zone 10 (return   9) <  142.5 | 21 -  8 day |  9 - 20 night
 142.5 <= zone 11 (return  10) <  157.5 | 20 -  7 day |  8 - 19 night
 157.5 <= zone 12 (return  11) <  172.5 | 19 -  6 day |  7 - 18 night
 172.5 <= zone 13 (return  12) <  180   | 18 -  5 day |  6 - 17 night
"""

# prepare dataframe, dropna, rename, and convert to datetime
ori_first = openSkyDf[["origin", "firstseen"]].dropna(subset = ["origin", "firstseen"], how = "any")
ori_first = ori_first.rename({"origin": "ident"}, axis = 1)
ori_first["firstseen"] = ks.to_datetime(ori_first["firstseen"])

des_last = openSkyDf[["destination", "lastseen"]].dropna(subset = ["destination", "lastseen"], how = "any")
des_last = des_last.rename({"destination": "ident"}, axis = 1)
des_last["lastseen"] = ks.to_datetime(des_last["lastseen"])

air = icaoDf[["ident", "longitude_deg"]]

# label different time zones
ks.set_option("compute.ops_on_diff_frames", True)

def zone_classifier(row):
    return math.floor((7.5 + row["longitude_deg"]) / 15)

air["zone_tag"] = air.apply(lambda x: zone_classifier(x), axis = 1)

# ready to count origin day and night
ori_df = ks.merge(ori_first, air, on = "ident" ,how = "left")

# label day, night of different time zones
def ori_day_night_classifier(row):
    if (-12 <= row["zone_tag"]) and (row["zone_tag"] <= -7):
        if ((-6 - row["zone_tag"]) <= row["firstseen"].hour) and (row["firstseen"].hour <= (5 - row["zone_tag"])):
            return "night_origin"
        else:
            return "day_origin"
    elif (-6 <= row["zone_tag"]) and (row["zone_tag"] <= 6):
        if ((6 - row["zone_tag"]) <= row["firstseen"].hour) and (row["firstseen"].hour <= (17 - row["zone_tag"])):
            return "day_origin"
        else:
            return "night_origin"
    elif (7 <= row["zone_tag"]) and (row["zone_tag"] <= 12):
        if ((18 - row["zone_tag"]) <= row["firstseen"].hour) and (row["firstseen"].hour <= (29 - row["zone_tag"])):
            return "night_origin"
        else:
            return "day_origin"

ori_df["day_night_tag"] = ori_df.apply(lambda x: ori_day_night_classifier(x), axis = 1)

# count origin day and night
firstseen_count = ori_df.groupby(["ident", "day_night_tag"])["ident"].count().unstack()
firstseen_count = firstseen_count.reset_index().rename(columns={"ident": "icao_code"})

# ready to count destination day and night
des_df = ks.merge(des_last, air, on = "ident" ,how = "left")

# label day, night of different time zones
def des_day_night_classifier(row):
    if (-12 <= row["zone_tag"]) and (row["zone_tag"] <= -7):
        if ((-6 - row["zone_tag"]) <= row["lastseen"].hour) and (row["lastseen"].hour <= (5 - row["zone_tag"])):
            return "night_destination"
        else:
            return "day_destination"
    elif (-6 <= row["zone_tag"]) and (row["zone_tag"] <= 6):
        if ((6 - row["zone_tag"]) <= row["lastseen"].hour) and (row["lastseen"].hour <= (17 - row["zone_tag"])):
            return "day_destination"
        else:
            return "night_destination"
    elif (7 <= row["zone_tag"]) and (row["zone_tag"] <= 12):
        if ((18 - row["zone_tag"]) <= row["lastseen"].hour) and (row["lastseen"].hour <= (29 - row["zone_tag"])):
            return "night_destination"
        else:
            return "day_destination"

des_df["day_night_tag"] = des_df.apply(lambda x: des_day_night_classifier(x), axis = 1)

# count destination day and night
lastseen_count = des_df.groupby(["ident", "day_night_tag"])["ident"].count().unstack()
lastseen_count = lastseen_count.reset_index().rename(columns={"ident": "icao_code"})

# prepare to output
together = ks.DataFrame(icaoDf["icao_code"])
together = ks.merge(together, firstseen_count, on = "icao_code", how = "left")
together = ks.merge(together, lastseen_count, on = "icao_code", how = "left").fillna(0)

# integrate to output dataframe
outputDf = ks.merge(outputDf, together, left_on = "icao_code", right_on = "icao_code", how = "left")

# delete temporary objects
del ori_first, des_last, air, ori_df, firstseen_count, des_df, lastseen_count, together



# Month

In [8]:
# creat year and month column
df_combine = openSkyDf[["origin", "destination", "day"]]
df_combine["time"] = ks.to_datetime(df_combine["day"], format = "%Y-%m-%d %H:%M:%S%z")
df_combine["year"] = df_combine["time"].dt.year
df_combine["month"] = df_combine["time"].dt.month

# prepare jan to dec column names
mon_syntax = ["jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec"]
mon_ind_ori = [i + "_origin" for i in mon_syntax]
mon_ind_dest = [i + "_destination" for i in mon_syntax]

# by month flights - origin
ori_g = df_combine[["origin", "month"]].dropna().groupby("origin")["month"].value_counts().unstack().fillna(0)
# rename column name
ori_g.columns = mon_ind_ori

# by month flights - destination
dest_g = df_combine[["destination", "month"]].dropna().groupby("destination")["month"].value_counts().unstack().fillna(0)
# rename column name
dest_g.columns = mon_ind_dest

# prepare to output
ori_g_temp = ks.DataFrame(icaoDf["icao_code"])
ori_g_temp = ks.merge(ori_g_temp, ori_g, left_on = "icao_code", right_index = True, how = "left").fillna(0)
dest_g_temp = ks.DataFrame(icaoDf["icao_code"])
dest_g_temp = ks.merge(dest_g_temp, dest_g, left_on = "icao_code", right_index = True, how = "left").fillna(0)

# calculate min/max
ks.set_option("compute.ops_on_diff_frames", True)
# origin
ks_ori = ks.DataFrame(ori_g.apply(lambda x: min(x), axis = 1))
ks_ori["max_month_flights_origin"] = ori_g.apply(lambda x: max(x), axis = 1)
ks_ori = ks_ori.reset_index().rename(columns = {0: "min_month_flights_origin"})
# destination
ks_dest = ks.DataFrame(dest_g.apply(lambda x: min(x), axis = 1))
ks_dest["max_month_flights_destination"] = dest_g.apply(lambda x: max(x), axis = 1)
ks_dest = ks_dest.reset_index().rename(columns = {0: "min_month_flights_destination"})

# integrate to output dataframe
outputDf = ks.merge(outputDf, ori_g_temp, left_on = "icao_code", right_on = "icao_code", how = "left")
outputDf = ks.merge(outputDf, dest_g_temp, left_on = "icao_code", right_on = "icao_code", how = "left")
outputDf = ks.merge(outputDf, ks_ori, left_on = "icao_code", right_on = "origin", how = "left").drop(["origin"], axis = 1)
outputDf = ks.merge(outputDf, ks_dest, left_on = "icao_code", right_on = "destination", how = "left").drop(["destination"], axis = 1)

# delete temporary objects
del df_combine, mon_syntax, mon_ind_ori, mon_ind_dest, ori_g, dest_g, ori_g_temp, dest_g_temp, ks_ori, ks_dest

# Long, Medium and Short

In [9]:
# dropna
df1 = openSkyDf.dropna(subset = ["origin", "destination"], how = "any")

# merge latitude and longitude points table about origin and destination
df2 = icaoDf[["ident","latitude_deg","longitude_deg"]]

df3 = ks.merge(df2, df1, left_on = "ident", right_on = "origin", how = "left") \
        .rename(columns = {"latitude_deg": "airport_o_latitude", "longitude_deg": "airport_o_longitude"})

df4 = ks.merge(df2, df3, left_on = "ident", right_on = "destination", how = "left") \
        .rename(columns = {"latitude_deg": "airport_d_latitude", "longitude_deg": "airport_d_longitude"})

# calculate the distance between two latitude and longitude points
ks.set_option("compute.ops_on_diff_frames", True)

def cal_distance(row):
    lat1 = row["airport_o_latitude"]
    long1 = row["airport_o_longitude"]
    lat2 = row["airport_d_latitude"]
    long2 = row["airport_d_longitude"]
    g1 = (lat1, long1)
    g2 = (lat2, long2)
    ret = haversine(g1, g2)
    result = "%.7f" % ret
    return result

df4["distance"] = df4.apply(lambda x: cal_distance(x), axis = 1)
df4["distance"] = df4["distance"].astype(float)

# df4["distance_group"] = pd.cut(df2["distance"], bins = [0, 1300, 4450, 1.979048e+07], labels = ["short", "medium", "long"])
def categorizer(row):
    if row["distance"] < 1300:
        return "short"
    elif row["distance"] < 4450:
        return "medium"
    elif row["distance"] >= 4450:
        return "long"

df4["distance_group"] = df4.apply(categorizer, axis = 1)

# group origin with categories
df5 = df4.groupby(["origin", "distance_group"])["origin"].count().unstack()
df5 = df5.rename(columns = {"short": "short_origin", "medium": "medium_origin", "long": "long_origin"})

# group destination with categories
df6 = df4.groupby(["destination", "distance_group"])["destination"].count().unstack()
df6 = df6.rename(columns = {"short": "short_destination", "medium": "medium_destination", "long": "long_destination"})

# prepare to output
df7 = ks.DataFrame(icaoDf["icao_code"])
df7 = ks.merge(df7, df5, left_on = "icao_code", right_index = True, how = "left")
df7 = ks.merge(df7, df6, left_on = "icao_code", right_index = True, how = "left").fillna(0)

# integrate to output dataframe
outputDf = ks.merge(outputDf, df7, left_on = "icao_code", right_on = "icao_code", how = "left")

# delete temporary objects
del df1, df2, df3, df4, df5, df6, df7

# Export Data

In [10]:
# outputDf.to_csv("/user/hadoop/mini_2019_airport_table", index = False)
# outputDf.to_csv("file:///home/hadoop/mini_2019_airport_table", index = False)
outputDf.to_csv("./mini_2019_airport_table", index = False)