# FDM Departure and Arrival
Find the departure and arrival coordinates for each FDM file using PySpark, then create the `fdm_location` dataset from that information.

## Import

In [1]:
import pandas as pd
import seaborn as sns
import numpy as np
import gc
import os

#### Initialize Spark

In [2]:
import findspark
findspark.init()

import pyspark
import random

# Resource settings for the local PySpark session.
driver_memory = '4g'
# Kept for reference: these two are not part of the submit arguments built below.
num_executors = 2
executor_memory = '1g'

# Build the submit arguments and export them through the environment so they
# are in place when PySpark starts up.
pyspark_submit_args = f' --driver-memory {driver_memory} pyspark-shell'
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

#Import Modules Needed for PySpark
from pyspark.sql import SparkSession

In [3]:
import pyspark.sql.functions as F 
from pyspark.sql.types import *

In [63]:
# Obtain the Spark session for this notebook (getOrCreate returns an existing
# session when one is already active).
spark = (
    SparkSession.builder
    .appName("Data Exploration")
    .getOrCreate()
)

# Extract FDM Data

#### 1. Read FDM Management Data

In [5]:
# Load the FDM file inventory from CSV into a pandas DataFrame.
fdm_database_path = '../dataset/database/fdm_files.csv'
fdm_files = pd.read_csv(fdm_database_path)
# Preview the first rows and print the table dimensions as (rows, columns).
display(fdm_files.head())
print(fdm_files.shape)

Unnamed: 0,fname,tail_id,fsize_mb,fullpath
0,686200104111724.csv,tail_686_1,50.99,/Volumes/My Passport/desy/tail_686_1/686200104...
1,686200104120602.csv,tail_686_1,24.43,/Volumes/My Passport/desy/tail_686_1/686200104...
2,686200104120802.csv,tail_686_1,40.53,/Volumes/My Passport/desy/tail_686_1/686200104...
3,686200104121013.csv,tail_686_1,43.39,/Volumes/My Passport/desy/tail_686_1/686200104...
4,686200104121245.csv,tail_686_1,61.52,/Volumes/My Passport/desy/tail_686_1/686200104...


(9316, 4)


#### 2. Create FDM Spark DF 

In [64]:
# Root folder of the FDM CSV files (alternative location: '../dataset/fdm').
fdm_folder_path = '/Volumes/My Passport/desy'

# Read every CSV found one directory level below the root into a single Spark
# DataFrame, taking column names from each file's header line, and tag every
# row with the path of the file it was read from.
fdm_data = (
    spark.read
    .option("header", "true")
    .csv(f'{fdm_folder_path}/*/*.csv')
    .withColumn("fname", F.input_file_name())
)

In [49]:
# Pull the first five rows into pandas to preview the raw FDM data.
fdm_data.limit(5).toPandas()

Unnamed: 0,airbrk_pos_rad,ail_l_rad,ail_r_rad,hbaro_m,hdot_1_mps,aoa_1_rad,aoa_2_rad,aoac_rad,aoai_rad,auto_thr_status,...,temp_total_degC,psi_rad,psi_mag_selected,chi_rad,chi_mag_rad,az_mps2,wdir_rad,wow,ws_mps,fname
0,2.09410904426956,1.82175343497467,1.79283149314117,337.7184,-0.08128,0.151096090453625,0.322900797260857,0.0,0.236998452179623,0.0,...,32.0,0.814423972265244,1.0,-0.523647275167336,-0.386839432914986,9.7546836659193,0.0,0.0,0.0,file:/Volumes/My%20Passport/desy/tail_686_3/68...
1,,,,,,,,,,,...,,,,,,,,,,file:/Volumes/My%20Passport/desy/tail_686_3/68...
2,,,,,,,,,,,...,,,,,,9.7546836659193,,,,file:/Volumes/My%20Passport/desy/tail_686_3/68...
3,,,,,,,,,,,...,,,,,,,,,,file:/Volumes/My%20Passport/desy/tail_686_3/68...
4,,,,337.7184,-0.16256,0.150329116375732,0.322900797260857,0.0,0.236998452179623,,...,,0.814423972265244,,-0.499296052092614,-0.362488209840264,9.77712930648327,0.0,,0.0,file:/Volumes/My%20Passport/desy/tail_686_3/68...


# Transform FDM Data

### Data Cleaning

#### 1. Change fname column to filename only

In [65]:
# Spark UDF that strips the directory part of a path and keeps the file name.
extract_fname_udf = F.udf(os.path.basename, returnType=StringType())

# Overwrite the full input-file path in `fname` with just the file name.
fdm_fname = fdm_data.withColumn("fname", extract_fname_udf(F.col("fname")))

In [51]:
# Preview the rewritten `fname` column for the first five rows.
fdm_fname.select('fname').limit(5).toPandas()

Unnamed: 0,fname
0,686200203171255.csv
1,686200203171255.csv
2,686200203171255.csv
3,686200203171255.csv
4,686200203171255.csv


#### 2. Remove Errors in Location Columns
A row is treated as erroneous if its latitude or longitude is 0 or NaN.

In [66]:
# Cast lat_rad and lon_rad to float
fdm_float = fdm_fname.withColumn("lat_rad", F.col("lat_rad").cast(FloatType()))\
                     .withColumn("lon_rad", F.col("lon_rad").cast(FloatType()))

In [67]:
# Keep only rows with a usable position: both coordinates non-zero and not NaN.
# The conditions use F.col so they bind to the numeric columns of fdm_float;
# the earlier version built them from fdm_fname, i.e. the DataFrame from
# before the cast, so the zero check was not applied to the cast values.
fdm_cleaned_loc = (
    fdm_float
    .filter((F.col("lat_rad") != 0) & (F.col("lon_rad") != 0))
    .filter(~F.isnan("lat_rad") & ~F.isnan("lon_rad"))
)

#### Finalize Transformation

In [68]:
# The location-cleaned DataFrame is the final output of the transformation stage.
fdm_transformed = fdm_cleaned_loc

## Find Departure and Arrival Coordinates

#### 1. Create Helper Function

In [69]:
def get_first_loc(fdm_df):
    """Return the (lat_rad, lon_rad) pair taken from the first row of *fdm_df*.

    An empty DataFrame yields ``(np.nan, np.nan)``.
    """
    if not fdm_df.empty:
        first_row = fdm_df[['lat_rad', 'lon_rad']].iloc[0]
        return tuple(first_row)
    return (np.nan, np.nan)

In [70]:
def get_last_loc(fdm_df):
    """Return the (lat_rad, lon_rad) pair taken from the last row of *fdm_df*.

    An empty DataFrame yields ``(np.nan, np.nan)``.
    """
    if not fdm_df.empty:
        last_row = fdm_df[['lat_rad', 'lon_rad']].iloc[-1]
        return tuple(last_row)
    return (np.nan, np.nan)

In [71]:
def get_departure_loc(fdm):
    """Estimate the departure location of the flight recorded in *fdm*.

    The estimate is the position of the first on-ground row (``wow == 0``)
    found up to the first off-ground row (``wow == 1``). When the recording
    has no off-ground row, the first on-ground row is used.

    Args:
        fdm: pandas DataFrame with ``wow``, ``lat_rad`` and ``lon_rad``
            columns (rows are assumed to be in recording order).

    Returns:
        ``(lat_rad, lon_rad)`` tuple, or ``(np.nan, np.nan)`` when there is
        no on-ground row to take the position from.
    """
    # `wow` may arrive as text such as '0.0' / '1.0' (the Spark CSV read uses
    # no schema and only lat/lon are cast), and text never equals the integers
    # 0 and 1. Coerce to numbers first; blanks and unparsable values become NaN
    # and match neither state.
    wow = pd.to_numeric(fdm['wow'], errors='coerce')
    plane_in_ground = fdm[wow == 0]
    plane_off_ground = fdm[wow == 1]

    if plane_in_ground.empty:
        return (np.nan, np.nan)

    if plane_off_ground.empty:
        # Never left the ground in this recording: use the first ground row.
        return tuple(plane_in_ground[['lat_rad', 'lon_rad']].iloc[0])

    # Label-based slice: on-ground rows up to the first off-ground row.
    first_takeoff_idx = plane_off_ground.index[0]
    departure_df = plane_in_ground.loc[:first_takeoff_idx]
    return get_first_loc(departure_df)

In [72]:
def get_arrival_loc(fdm):
    """Estimate the arrival location of the flight recorded in *fdm*.

    The estimate is the position of the last on-ground row (``wow == 0``)
    found from the first off-ground row (``wow == 1``) onwards.

    Args:
        fdm: pandas DataFrame with ``wow``, ``lat_rad`` and ``lon_rad``
            columns (rows are assumed to be in recording order).

    Returns:
        ``(lat_rad, lon_rad)`` tuple, or ``(np.nan, np.nan)`` when the
        recording has no on-ground row, no off-ground row, or no on-ground
        row after the first off-ground one.
    """
    # `wow` may arrive as text such as '0.0' / '1.0' (the Spark CSV read uses
    # no schema and only lat/lon are cast), and text never equals the integers
    # 0 and 1. Coerce to numbers first; blanks and unparsable values become NaN
    # and match neither state.
    wow = pd.to_numeric(fdm['wow'], errors='coerce')
    plane_in_ground = fdm[wow == 0]
    plane_off_ground = fdm[wow == 1]

    # Without both a ground phase and an off-ground phase there is no arrival.
    if plane_in_ground.empty or plane_off_ground.empty:
        return (np.nan, np.nan)

    # Label-based slice: on-ground rows from the first off-ground row onwards.
    first_takeoff_idx = plane_off_ground.index[0]
    arrival_df = plane_in_ground.loc[first_takeoff_idx:]
    return get_last_loc(arrival_df)

In [73]:
def get_loc_from_fdm(fdm):
    """Return ``(dep_lat, dep_lon, arr_lat, arr_lon)`` for one FDM DataFrame.

    The departure pair comes from ``get_departure_loc`` and the arrival pair
    from ``get_arrival_loc``.
    """
    departure = get_departure_loc(fdm)
    arrival = get_arrival_loc(fdm)
    dep_lat, dep_lon, arr_lat, arr_lon = (*departure, *arrival)
    return (dep_lat, dep_lon, arr_lat, arr_lon)

#### Test That the Helper Functions Work on a DataFrame

In [None]:
# Pull the first five cleaned rows into pandas for a quick look.
fdm_transformed.limit(5).toPandas()

In [None]:
# Spot-check the helpers on a single file: collect that file's rows into
# pandas and compute its (dep_lat, dep_lon, arr_lat, arr_lon) tuple.
fname_1 = '686200104130652.csv'
df_1 = fdm_transformed.filter(fdm_transformed['fname'] == fname_1).toPandas()
get_loc_from_fdm(df_1)

#### 2. Generate Location for All FDM using Spark

In [60]:
# Repartition by file name ahead of the per-file grouping below.
fdm_transformed = fdm_transformed.repartition("fname")

In [None]:
# Count the rows of each file and collect the counts into pandas.
fdm_transformed.groupBy('fname').agg(F.count('*').alias('count')).toPandas()

In [75]:
# Output schema of the per-file location table: the file name followed by the
# four coordinate columns, all stored as doubles.
fdm_location_schema = StructType(
    [StructField("fname", StringType())]
    + [
        StructField(coord_name, DoubleType())
        for coord_name in ("dep_lat", "dep_lon", "arr_lat", "arr_lon")
    ]
)

In [None]:
# result = df_1.groupby(df_1.fname).apply(get_loc_from_fdm)
# loc_columns = ['dep_lat', 'dep_lon', 'arr_lat', 'arr_lon']
# result_df = pd.DataFrame(result.index)
# result_df[loc_columns] = pd.DataFrame(result.values.tolist(), index=result_df.index)
# result_df.reset_index(inplace=True, drop=True)

In [80]:
@F.pandas_udf(fdm_location_schema, functionType=F.PandasUDFType.GROUPED_MAP)
def find_fdm_location(df):
    """Grouped-map pandas UDF that reduces FDM rows to per-file locations.

    Args:
        df: pandas DataFrame with the rows of one Spark group; it must
            contain the ``fname`` column and the columns read by
            ``get_loc_from_fdm``.

    Returns:
        pandas DataFrame with one row per distinct ``fname`` in *df* and the
        columns dep_lat, dep_lon, arr_lat, arr_lon added next to the file name.
    """
    # Series keyed by fname whose values are the 4-tuples from get_loc_from_fdm.
    result = df.groupby(df.fname).apply(get_loc_from_fdm)
    loc_columns = ['dep_lat', 'dep_lon', 'arr_lat', 'arr_lon']
    # Start the output from the group keys (presumably this yields the `fname`
    # column expected by the schema — confirm), then spread each tuple over the
    # four coordinate columns, aligned on the same row index.
    result_df = pd.DataFrame(result.index)
    result_df[loc_columns] = pd.DataFrame(result.values.tolist(), index=result_df.index)
    result_df.reset_index(inplace=True, drop=True)
    return result_df

In [81]:
# Apply the grouped-map UDF to the rows of each file name.
fdm_locations = fdm_transformed.groupby("fname").apply(find_fdm_location)

# fdm_locations.limit(10).toPandas()



In [82]:
# Collect the per-file locations into a pandas DataFrame and save them as CSV
# without the index column.
fdm_locations_df = fdm_locations.toPandas()
fdm_locations_df.to_csv('../dataset/database/fdm_locations.csv', index=False)