# Bayesian Spatio-Temporal Graph Transformer Network (B-STAR) for Multi-Aircraft Trajectory Prediction
Author: Yutian Pang, Arizona State University


Email: yutian.pang@asu.edu

# Part 1: IFF ASDE-X Flight Track Data Processing with PySpark and Hadoop
This is a demonstration of using PySpark and Hadoop for large-scale processing of IFF ASDE-X data. In practice, this data processing would be performed on a server or high-performance cluster via ssh.

## Module Requirements

This Jupyter notebook has been tested with:
- Ubuntu 20.04 LTS (and 18.04 LTS)
- Python 3.8.5 (and 3.8.10)
- Spark 3.1.1 with Hadoop3.2 (and Spark 3.2.1 with Hadoop3.2)

The software in parenthesis were tested together. Other combinations of Ubuntu, Python, and Spark should be verified for compatibility. See [this](https://stackoverflow.com/questions/58700384/how-to-fix-typeerror-an-integer-is-required-got-type-bytes-error-when-tryin) article for further guidance.

### Instructions for Windows 10 Users

1. **Install Ubuntu on Windows 10 with Windows Subsystem for LInux (WSL)**
    - Windows 10 users with admin privileges can enable Windows Subsystem for Linux (WSL) following [these](https://docs.microsoft.com/en-us/windows/wsl/install-win10) directions.
        
        
2. **Install Anaconda in the Ubuntu terminal**
    - A user can then install Anaconda on their WSL Ubuntu distribution following [these](https://gist.github.com/kauffmanes/5e74916617f9993bc3479f401dfec7da) instructions. 
        
        
3. **Download and unzip Spark on WSL**
    - Identify the distribution of Spark and Hadoop you require [here](https://spark.apache.org/downloads.html). 
    - In your Ubuntu terminal window execute the ```wget``` command followed by the download link in your chosen download directory (likely the ```HOME``` directory). 
    - Then, unzip the downloaded .tgz file with ```tar -xvzf [fname]```.



## Installing the required Python packages
The required Python packages for this module are:
- **[```pyspark```]**(http://spark.apache.org/docs/latest/api/python/getting_started/index.html)
    - This is the Python API for Apache Spark. We will be using the distributed processing features and backend SQL queries for structured data.
- **[```apache-sedona```]**(https://sedona.apache.org/)
    - Formerly Geospark, Apache Sedona extends the Resilient Distributed Dataset (RDD), the core data structure in Apache Spark, to accommodate big geospatial data in a cluster environment.
    
In the Ubuntu or Anaconda terminal, execute ```pip install pyspark apache-sedona```. This will install both the ```pyspark``` and ```apache-sedona``` packages. 

## Setting Environment Variables
The Spark codes (note: Improve this description) retrieve the ```SPARK_HOME```, ```PYTHONPATH```, ```PYSPARK_PYTHON```, and ```PYSPARK_DRIVER_PYTHON``` system variables. Either (Option 1) these are set in the shell environment in the ```.bash_profile``` script or (Option 2) in the Python script prior to calling the ```pyspark``` module.
- ### Option 1: Add environment variables to the ```.bash_profile``` script

    Open the ```.bash_profile``` script in your text editor. On Ubuntu systems, this script is usually found in your ```HOME``` directory ```~/```. If this file does not yet exist (or is empty) you can create one. Then add the following ```export``` statements for each variable you want to add and add them to the path. For example:

    ```export SPARK_HOME="$HOME/spark-X.X.X-bin-hadoopX.X"```

    ```export PYTHONPATH="$HOME/anacond3/bin/python3.8"```

    ```export PYSPARK_PYTHON="$HOME/anacond3/bin/python3.8"```

    ```export PYSPARK_DRIVER_PYTHON="$HOME/anacond3/bin/python3.8"```

    ```export PATH="$SPARK_HOME/bin:$PATH"```

- ### Option 2: Add the environment variables in the Python script using the ```os``` package

    ```import os```
       
    ```os.environ["SPARK_HOME"] = '~/spark-3.1.1-bin-hadoop3.2'```

    ```os.environ["PYTHONPATH"] = '~/anaconda3/bin/python3.8'```

    ```os.environ['PYSPARK_PYTHON'] = '~/anaconda3/bin/python3.8'```

    ```os.environ['PYSPARK_DRIVER_PYTHON'] = '~/anaconda3/bin/python3.8'```


## Procedure 1: Loading IFF ASDE-X Data into the Python Environment
### Step 1a: Use ```sedona``` to register ```SparkSession``` with geospatial packages

In [1]:
from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer

spark = SparkSession.\
        builder.\
        master("local[*]").\
        appName("Sector_IFF_Parser").\
        config("spark.serializer", KryoSerializer.getName).\
        config("spark.kryo.registrator", SedonaKryoRegistrator.getName) .\
        config('spark.jars.packages',
           'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.1.1-incubating,'
           'org.datasyslab:geotools-wrapper:1.1.0-25.2'). \
        getOrCreate()

SedonaRegistrator.registerAll(spark)
sc = spark.sparkContext

22/02/02 11:22:29 WARN Utils: Your hostname, EDECARLO-LT-1 resolves to a loopback address: 127.0.1.1; using 172.30.15.228 instead (on interface eth0)
22/02/02 11:22:29 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/edecarlo/spark-3.2.0-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/edecarlo/.ivy2/cache
The jars for the packages stored in: /home/edecarlo/.ivy2/jars
org.apache.sedona#sedona-python-adapter-3.0_2.12 added as a dependency
org.datasyslab#geotools-wrapper added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0050eea1-420d-4628-952a-b187f1c4e12f;1.0
	confs: [default]
	found org.apache.sedona#sedona-python-adapter-3.0_2.12;1.1.1-incubating in central
	found org.locationtech.jts#jts-core;1.18.0 in central
	found org.wololo#jts2geojson;0.16.1 in central
	found org.apache.sedona#sedona-core-3.0_2.12;1.1.1-incubating in central
	found org.scala-lang.modules#scala-collection-compat_2.12;2.5.0 in central
	found org.apache.sedona#sedona-sql-3.0_2.12;1.1.1-incubating in central
	found org.datasyslab#geotools-wrapper;1.1.0-25.2 in central
:: resolution report :: resolve 677ms :: artifacts dl 56ms
	:: modules in use:
	org.apache.sedona#sedona-core-3.0_2.12;1.1.1-incubating from central in [default]
	

### Step 1b: Define SQL Schema for IFF ASDE-X Data with ```pyspark``` Structures
- Retrieve requried SQL types from ```pyspark```.
- Create ```load_schema``` function which returns variable ```myschema``` specifically for IFF recType=3. 

In [2]:
# Import required SQL types from pyspark
from pyspark.sql.types import (ShortType, StringType, StructType,StructField,LongType, IntegerType, DoubleType)

#Create load_schema function that returns variable 'myschema' specifically for IFF recType=3.
def load_schema():
    myschema = StructType([
        StructField("recType", ShortType(), True),  # 1  //track point record type number
        StructField("recTime", StringType(), True),  # 2  //seconds since midnigght 1/1/70 UTC
        StructField("fltKey", LongType(), True),  # 3  //flight key
        StructField("bcnCode", IntegerType(), True),  # 4  //digit range from 0 to 7
        StructField("cid", IntegerType(), True),  # 5  //computer flight id
        StructField("Source", StringType(), True),  # 6  //source of the record
        StructField("msgType", StringType(), True),  # 7
        StructField("acId", StringType(), True),  # 8  //call sign
        StructField("recTypeCat", IntegerType(), True),  # 9
        StructField("lat", DoubleType(), True),  # 10
        StructField("lon", DoubleType(), True),  # 11
        StructField("alt", DoubleType(), True),  # 12  //in 100s of feet
        StructField("significance", ShortType(), True),  # 13 //digit range from 1 to 10
        StructField("latAcc", DoubleType(), True),  # 14
        StructField("lonAcc", DoubleType(), True),  # 15
        StructField("altAcc", DoubleType(), True),  # 16
        StructField("groundSpeed", IntegerType(), True),  # 17 //in knots
        StructField("course", DoubleType(), True),  # 18  //in degrees from true north
        StructField("rateOfClimb", DoubleType(), True),  # 19  //in feet per minute
        StructField("altQualifier", StringType(), True),  # 20  //Altitude qualifier (the “B4 character”)
        StructField("altIndicator", StringType(), True),  # 21  //Altitude indicator (the “C4 character”)
        StructField("trackPtStatus", StringType(), True),  # 22  //Track point status (e.g., ‘C’ for coast)
        StructField("leaderDir", IntegerType(), True),  # 23  //int 0-8 representing the direction of the leader line
        StructField("scratchPad", StringType(), True),  # 24
        StructField("msawInhibitInd", ShortType(), True),  # 25 // MSAW Inhibit Indicator (0=not inhibited, 1=inhibited)
        StructField("assignedAltString", StringType(), True),  # 26
        StructField("controllingFac", StringType(), True),  # 27
        StructField("controllingSec", StringType(), True),  # 28
        StructField("receivingFac", StringType(), True),  # 29
        StructField("receivingSec", StringType(), True),  # 30
        StructField("activeContr", IntegerType(), True),  # 31  // the active control number
        StructField("primaryContr", IntegerType(), True),
        # 32  //The primary(previous, controlling, or possible next)controller number
        StructField("kybrdSubset", StringType(), True),  # 33  //identifies a subset of controller keyboards
        StructField("kybrdSymbol", StringType(), True),  # 34  //identifies a keyboard within the keyboard subsets
        StructField("adsCode", IntegerType(), True),  # 35  //arrival departure status code
        StructField("opsType", StringType(), True),  # 36  //Operations type (O/E/A/D/I/U)from ARTS and ARTS 3A data
        StructField("airportCode", StringType(), True),  # 37
        StructField("trackNumber", IntegerType(), True),  # 38
        StructField("tptReturnType", StringType(), True),  # 39
        StructField("modeSCode", StringType(), True)  # 40
    ])
    return myschema

### Step 1c: Load dataframe structured by ```iff_schema``` with ```spark```

In [3]:
fname = "../../miscellaneous/gnats-fpgen/IFF_SFO_ASDEX_ABC456.csv"
iff_schema = load_schema()
df = spark.read.csv(fname, header=False, sep=",", schema=iff_schema)

## Procedure 2: Build geospatial dataframe with ```sedona``` 

### Step 2a: Downselect features of ```recType```, ```recTime```, ```acId```, ```lat```,```lon```,```alt```
In this work, we are simply interested in flight ID, timestamps, and coordinates (latitude, longitude, and altitude).

In [4]:
cols = ['recType', 'recTime', 'acId', 'lat', 'lon', 'alt']
df = df.select(*cols).filter(df['recType']==3).withColumn("recTime", df['recTime'].cast(IntegerType()))
df.show(5)

+-------+----------+------+--------+----------+----+
|recType|   recTime|  acId|     lat|       lon| alt|
+-------+----------+------+--------+----------+----+
|      3|1546302315|ABC123|37.61867|-122.38173|0.06|
|      3|1546302316|ABC123| 37.6187|-122.38171|0.06|
|      3|1546302318|ABC123|37.61874|-122.38169|0.06|
|      3|1546302319|ABC123|37.61876|-122.38172|0.06|
|      3|1546302320|ABC123|37.61878|-122.38173|0.06|
+-------+----------+------+--------+----------+----+
only showing top 5 rows



### Step 2b: Register geospatial dataframe in SQL
We need to register an SQL table that runs in the backend of the system for fast querying. 

In [5]:
df.registerTempTable("pointtable")
df.show(5, truncate=False)

22/02/02 11:22:52 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+----------+------+--------+----------+----+
|recType|recTime   |acId  |lat     |lon       |alt |
+-------+----------+------+--------+----------+----+
|3      |1546302315|ABC123|37.61867|-122.38173|0.06|
|3      |1546302316|ABC123|37.6187 |-122.38171|0.06|
|3      |1546302318|ABC123|37.61874|-122.38169|0.06|
|3      |1546302319|ABC123|37.61876|-122.38172|0.06|
|3      |1546302320|ABC123|37.61878|-122.38173|0.06|
+-------+----------+------+--------+----------+----+
only showing top 5 rows



## Procedure 3: Perform fast SQL queries to retrieve data subsets
Now can use ```spark.sql``` commands to query from new registered table ```pointtable``` and create new data frames and register them as SQL tables. 
### Step 3a: Temporal queries of IFF ASDE-X data
- Define desired time window from a starting timestamp (e.g. Monday, December 31, 2018 at 4:25pm PST)
- Query returning all flight records within **1 hour** time window
- Register query as SQL table

In [6]:
## Define desired time window
duration = 1 #hour
t_start = 1546302340 #Monday, December 31, 2018 at 4:25pm in PST
t_end = t_start + 3600*duration

In [7]:
## Query returning all flight records within 1 hour time window
temporal_df = spark.sql(
  """
  SELECT ST_Point(CAST(lat AS Decimal(24, 20)), CAST(lon AS Decimal(24, 20))) AS geom, recTime, acId, alt
  FROM pointtable
  WHERE recTime>={} AND recTime<={}
  """.format(t_start, t_end))

In [8]:
## Register query as SQL table
temporal_df.createOrReplaceTempView("temporaldf")

### Step 3b: Spatial queries of IFF ASDE-X data
- Define desired spatial rectangle around a central point (e.g. KSFO airport) in degrees and vertical feet
- Query returning all **flight IDs** within registered ```temporaldf``` SQL table around KSFO airport using ```ST_PolygonFromEnvelope``` function from ```sedona```
- Query returning all **flight records** again using ```ST_PolygonFromEnvelope``` function from ```sedona```

In [9]:
## Define desired spatial rectangle around a central point (e.g. KSFO airport) 
apt_coords = [37.6188056,-122.3754167, 0]  # from https://www.airnav.com/airport/ksfo
r = 0.2 # rectangular query range unit: degrees
vs = 0.3 # vertical threshold unit: x100 feet

In [10]:
## Query returning all flight IDs within temporal_df around KSFO
range_query_result = spark.sql(
  """
    SELECT DISTINCT acId
    FROM temporaldf
    WHERE ST_Contains(ST_PolygonFromEnvelope({}, {}, {}, {}), geom) AND alt>{}
  """.format(apt_coords[0]-r, apt_coords[1]-r, apt_coords[0]+r, apt_coords[1]+r, apt_coords[2]+vs))

In [11]:
range_query_result.show()

[Stage 5:>                                                          (0 + 1) / 1]

+------+
|  acId|
+------+
|ABC123|
+------+



                                                                                

In [15]:
## Query returning all flight records within temporal_df around KSFO
df_result = spark.sql(
  """
    SELECT *
    FROM temporaldf
    WHERE ST_Contains(ST_PolygonFromEnvelope({}, {}, {}, {}), geom) AND alt>{}
  """.format(apt_coords[0]-r, apt_coords[1]-r, apt_coords[0]+r, apt_coords[1]+r, apt_coords[2]+vs))

# Count the number of points after the second query
df_result.show(5)

+--------------------+----------+------+----+
|                geom|   recTime|  acId| alt|
+--------------------+----------+------+----+
|POINT (37.62887 -...|1546303252|ABC123|1.13|
|POINT (37.62963 -...|1546303253|ABC123|1.44|
|POINT (37.63041 -...|1546303254|ABC123|1.63|
|POINT (37.63116 -...|1546303255|ABC123|2.19|
|POINT (37.63192 -...|1546303256|ABC123|2.69|
+--------------------+----------+------+----+
only showing top 5 rows



## Procedure 4: Data Anonymization and Sanitation

Anonymization is a type of data sanitization technique to remove identifiable information from sensitive data. Here, we perform two operations to anonymize the data while retaining useful geo-spatial features.

- Normalize the timestamp by the earliest time in the current dataframe.
- Mask the real flight IDs into integers

In [None]:
## Normalize the timestamp by the earlierst time in the current dataframe
df = spark.sql(
"""
    SELECT acId, recTime-{} AS t, geom, alt
    FROM spatialdf
""".format(t_start)
)
df.show(5, False)

In [13]:
##Mask the real flight IDs into integers
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="acId", outputCol="FacId")
df_new = indexer.fit(df).transform(df).drop('acId')
df_new.show(5)

+-------+----------+--------+----------+----+-----+
|recType|   recTime|     lat|       lon| alt|FacId|
+-------+----------+--------+----------+----+-----+
|      3|1546302315|37.61867|-122.38173|0.06|  1.0|
|      3|1546302316| 37.6187|-122.38171|0.06|  1.0|
|      3|1546302318|37.61874|-122.38169|0.06|  1.0|
|      3|1546302319|37.61876|-122.38172|0.06|  1.0|
|      3|1546302320|37.61878|-122.38173|0.06|  1.0|
+-------+----------+--------+----------+----+-----+
only showing top 5 rows



# Further Manipulations of the Data for BSTAR Modeling

## Manipulation 1. Resample the time series with an interval ```dt```

In [14]:
dt = 10
t_start = 0
t_end = 3600 * duration

t_interval = list(range(t_start, t_end, dt))
df = df[df.t.isin(t_interval)]

AttributeError: 'DataFrame' object has no attribute 't'

In [15]:
df.show(5)

+-------+----------+------+--------+----------+----+
|recType|   recTime|  acId|     lat|       lon| alt|
+-------+----------+------+--------+----------+----+
|      3|1546302315|ABC123|37.61867|-122.38173|0.06|
|      3|1546302316|ABC123| 37.6187|-122.38171|0.06|
|      3|1546302318|ABC123|37.61874|-122.38169|0.06|
|      3|1546302319|ABC123|37.61876|-122.38172|0.06|
|      3|1546302320|ABC123|37.61878|-122.38173|0.06|
+-------+----------+------+--------+----------+----+
only showing top 5 rows



## Manipulation 2. Change the origin of the coordinate system to the airport center

In [None]:
df2 = spark.sql(
"""
    SELECT t, FacId, lat-{} AS Lat, lon-{} AS Lon, alt
    FROM spatialdf
""".format(apt_coords[0], apt_coords[1])
)

## Manipulation 3: Save dataframe into csv
- Convert dataframe to ```pandas```
- Save data using the ```to_csv``` function in ```pandas```

In [None]:
# Make sure the data types are correct
df.toPandas()
df.dtypes #Make sure the data types are correct

In [None]:
# Save data with altitude
csv_name = 'KATL_r_{}_date_{}_range_{}_wAltitude.csv'.format(r, t_start, duration)
df.toPandas().T.to_csv(csv_name, sep=',', index=False, header=False)

# Save data without altitude
csv_name = 'KATL_r_{}_date_{}_range_{}.csv'.format(r, t_start, duration)
df.drop('alt').toPandas().T.to_csv(csv_name, sep=',', index=False, header=False)