## Prepare evaluation and test datasets

In the previous notebook, we prepared the training dataset. Now we will apply the same preprocessing steps to prepare the evaluation and test datasets.

The evaluation and test datasets are built from the competition dataset, which has the following layout:
<p>round2_competition_data/<br>
&ensp;&ensp;&ensp;&ensp; ├── round2_competition.csv<br>
&ensp;&ensp;&ensp;&ensp; └── round2_sensors.csv</p>

The evaluation and test datasets are going to be saved in a single file called <i>eval_test.csv</i>. Before evaluating the machine learning model, we will specify the proportion of this file to include in the evaluation split and in the test split.
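
As a rough illustration of how a single file can later be divided by a proportion, the sketch below shuffles row indices and cuts them at a chosen fraction. The function name `split_indices`, the `eval_fraction` parameter, and the seed are assumptions made for this example; they are not taken from the original notebooks.

```python
import random


def split_indices(n_rows, eval_fraction=0.5, seed=42):
    """Return (eval_idx, test_idx), a shuffled partition of range(n_rows).

    eval_fraction and seed are illustrative defaults, not notebook values.
    """
    if not 0.0 <= eval_fraction <= 1.0:
        raise ValueError("eval_fraction must be between 0 and 1")
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)  # deterministic shuffle for a given seed
    cut = int(round(n_rows * eval_fraction))
    return indices[:cut], indices[cut:]


eval_idx, test_idx = split_indices(10, eval_fraction=0.3)
print(len(eval_idx), len(test_idx))
```

Every row index lands in exactly one of the two lists, so the evaluation and test splits never overlap.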

We will begin by importing all libraries that will be used in this notebook.

In [1]:
import os
import operator
import pandas as pd
import numpy as np
import json
import urllib
import time
import pickle
import matplotlib.pyplot as plt
import findspark
findspark.init()
import pyspark
import pyspark.sql.functions as F
from pyspark.sql import types
from pyspark.sql import SparkSession
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

Now we can instantiate the <i>SparkSession</i>. For testing, the application will be running locally with 2 cores, and 4 GB of memory for the driver process. 

In [2]:
spark = SparkSession.builder \
    .master("local[2]") \
    .appName("ads-b data processing") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

In [3]:
# Set the number of partitions used when shuffling data (joins, aggregations)
spark.conf.set("spark.sql.shuffle.partitions", 100)

# Set log level
spark.sparkContext.setLogLevel("ERROR")

Below we will load the CSV file and print its schema.

In [4]:
df = spark.read.format("csv") \
    .options(header='True', inferSchema='True') \
    .load("round2_competition_data/round2_competition.csv") 

In [5]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- timeAtServer: double (nullable = true)
 |-- aircraft: integer (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- baroAltitude: double (nullable = true)
 |-- geoAltitude: double (nullable = true)
 |-- numMeasurements: integer (nullable = true)
 |-- measurements: string (nullable = true)



### Explode the measurements data

In this section we are going to do the following:
- explode the measurements JSON array, sort it by sensor serial number, and limit the number of measurements
- extract the sensor, timestamp, and RSSI values from the array of measurements
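
Before running these steps in Spark, it may help to see the same logic on a single record in plain Python. This is only a sketch: the helper name `explode_measurements` is hypothetical, and the input string is a shortened, illustrative version of a `measurements` value.

```python
import json
from operator import itemgetter

MAX_MEASUREMENTS = 6  # same limit as max_measurements in the notebook


def explode_measurements(raw, max_measurements=MAX_MEASUREMENTS):
    """Parse a JSON array of [sensor, timestamp, RSSI] triples, sort it by
    sensor serial number, and pad or truncate it to a fixed number of slots."""
    triples = sorted(json.loads(raw), key=itemgetter(0))[:max_measurements]
    row = {}
    for i in range(max_measurements):
        # Slots beyond the available measurements stay empty (None)
        sensor, tmp, rssi = triples[i] if i < len(triples) else (None, None, None)
        row['sensor_{}'.format(i)] = sensor
        row['tmp_{}'.format(i)] = tmp
        row['RSSI_{}'.format(i)] = rssi
    return row


# Shortened, illustrative input in the shape of the `measurements` column
example = '[[208,962354640,98],[197,5770127167,109]]'
row = explode_measurements(example)
print(row['sensor_0'], row['tmp_0'], row['RSSI_0'])
```

After sorting, sensor 197 occupies slot 0 even though it appears second in the raw string.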

In [6]:
df = df.limit(df.count())

In [7]:
max_measurements = 6
json_schema = types.ArrayType(types.ArrayType(types.DoubleType()))

In [8]:
df = df.withColumn('meas', F.from_json('measurements', schema=json_schema))

In [9]:
# Sort arrays of measurements according to sensor's serial number
df = df.rdd.map(lambda x: list(x[:9]) + [sorted(x[9], key=operator.itemgetter(0))]) \
           .toDF(df.schema.names)


In [10]:
df.select('id', 'measurements', 'meas').show(10)

+---+--------------------+--------------------+
| id|        measurements|                meas|
+---+--------------------+--------------------+
|  1|[[208,962354640,9...|[[197.0, 5.770127...|
|  2|[[150,968341093,8...|[[150.0, 9.683410...|
|  3|[[470,982753933,3...|[[470.0, 9.827539...|
|  4|[[203,79215063833...|[[23.0, 6.1095513...|
|  5|[[203,79216918750...|[[203.0, 7.921691...|
|  6|[[150,971816968,8...|[[150.0, 9.718169...|
|  7|[[166,33743812166...|[[26.0, -5.1503E8...|
|  8|[[625,-1965785000...|[[215.0, 4.459894...|
|  9|[[607,26027857583...|[[461.0, 2.740584...|
| 10|[[166,33767674666...|[[166.0, 3.376767...|
+---+--------------------+--------------------+
only showing top 10 rows



In [11]:
col_names = ['sensor', 'tmp', 'RSSI']
col_types = ['int', 'Decimal(16,0)', 'int']

for i in range(max_measurements):
    for j, col_name in enumerate(col_names): 
        df = df.withColumn('{}_{}'.format(col_name, i), F.col('meas')[i][j].cast(col_types[j]))

df = df.drop('meas', 'measurements') 

In [12]:
df.select('id', 'sensor_0', 'tmp_0', 'RSSI_0', 'sensor_1', 'tmp_1', 'RSSI_1').show(10, False)

+---+--------+-----------+------+--------+-----------+------+
|id |sensor_0|tmp_0      |RSSI_0|sensor_1|tmp_1      |RSSI_1|
+---+--------+-----------+------+--------+-----------+------+
|1  |197     |5770127167 |109   |208     |962354640  |98    |
|2  |150     |968341093  |82    |434     |3229609750 |63    |
|3  |470     |982753933  |38    |499     |6026974083 |30    |
|4  |23      |61095513667|190   |134     |940042921  |62    |
|5  |203     |79216918750|8     |315     |31656195917|154   |
|6  |150     |971816968  |80    |434     |3233068917 |177   |
|7  |26      |-515030000 |219   |166     |33743812167|78    |
|8  |215     |44598943167|94    |352     |22189829750|65    |
|9  |461     |2740584833 |20    |607     |26027857583|62    |
|10 |166     |33767674667|32    |346     |65133682333|29    |
+---+--------+-----------+------+--------+-----------+------+
only showing top 10 rows



In [13]:
df.write.save('round2_competition_data/eval_test', format='csv', header=True, mode='overwrite')

In [14]:
file_name = [name for name in os.listdir('round2_competition_data/eval_test') if name.startswith('part')]

os.rename(os.path.join('round2_competition_data/eval_test', file_name[0]),
          'round2_competition_data/eval_test/eval_test.csv')
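
The rename step above takes the first `part*` file in the output directory, which is only safe when Spark wrote exactly one. A minimal sketch of a stricter lookup follows; the helper name `single_part_file` is hypothetical, and the demonstration uses a temporary directory rather than the real output folder.

```python
import os
import tempfile


def single_part_file(output_dir):
    """Return the path of the only 'part*' file in a Spark output directory.

    Raises if there are zero or several part files, since renaming just the
    first one would silently drop the rest of the data.
    """
    parts = sorted(name for name in os.listdir(output_dir) if name.startswith('part'))
    if len(parts) != 1:
        raise RuntimeError('expected exactly one part file, found {}'.format(len(parts)))
    return os.path.join(output_dir, parts[0])


# Demonstration on a temporary directory that mimics a Spark output folder
with tempfile.TemporaryDirectory() as tmp_dir:
    for name in ('_SUCCESS', 'part-00000-demo.csv'):
        open(os.path.join(tmp_dir, name), 'w').close()
    found = os.path.basename(single_part_file(tmp_dir))

print(found)
```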

### Data casting and filtering

Next, we will cast the data to proper data types and filter out rows with incorrect timestamps or without coordinates.
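
The two filters can be sketched on plain Python records as follows. This is an illustration of the rule only, assuming that an "incorrect" timestamp means a value of exactly zero, as in the cells below; the helper name `keep_row` is hypothetical.

```python
import math

MAX_MEASUREMENTS = 6


def keep_row(row, max_measurements=MAX_MEASUREMENTS):
    """Return True if the row has no zero timestamp and has a known latitude."""
    # Missing timestamps (None) are not treated as zero, so they do not trigger removal
    has_zero_tmp = any(row.get('tmp_{}'.format(i)) == 0 for i in range(max_measurements))
    lat = row.get('latitude')
    has_position = lat is not None and not math.isnan(lat)
    return not has_zero_tmp and has_position


rows = [
    {'latitude': 52.1, 'tmp_0': 5770127167.0, 'tmp_1': 962354640.0},  # kept
    {'latitude': 52.1, 'tmp_0': 0.0, 'tmp_1': 962354640.0},           # zero timestamp
    {'latitude': float('nan'), 'tmp_0': 5770127167.0},                # no coordinates
]
kept = [keep_row(r) for r in rows]
print(kept)
```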

In [2]:
flights = pd.read_csv('round2_competition_data/eval_test/eval_test.csv')

max_measurements = 6

In [3]:
types = {'id': 'int32', 'timeAtServer': 'float32', 'aircraft': 'int16', 'latitude': 'float32', 'longitude': 'float32',
        'baroAltitude': 'float32', 'geoAltitude': 'float32', 'numMeasurements': 'int16'}

for i in range(max_measurements):
    types['tmp_{}'.format(i)] = 'float64'
    types['RSSI_{}'.format(i)] = 'int16'
    
flights.fillna(value={i:0 for i in list(types.keys()) if i.startswith('RSSI')}, inplace=True)

flights = flights.astype(types, errors='ignore')

In [4]:
flights['tmp_0'] = flights['tmp_0'].abs()

In [5]:
idx = []

for i in range(max_measurements):
    idx.extend(flights.loc[flights['tmp_{}'.format(i)] == 0].index)
        
flights.drop(idx, inplace=True)

Some data points don't contain aircraft coordinates. These are the locations that our model is supposed to predict, so they cannot be used to measure its accuracy and we remove them from the evaluation and test datasets.

In [6]:
print('Number of rows without aircraft localization: ', len(flights.loc[flights.latitude.isna(), 'latitude']))

Number of rows without aircraft localization:  632932


In [7]:
flights.dropna(subset=['latitude'], inplace=True)

In [8]:
print('Number of rows in dataset: ', len(flights))

Number of rows in dataset:  5824608


In [9]:
flights.to_csv('round2_competition_data/eval_test/eval_test.csv', index=False)

### Timestamp synchronization

To synchronize the timestamps in the evaluation and test datasets, we have to load the coefficient dictionary of the linear regression models created in the previous notebook. The correction coefficients let us estimate the timestamp correction that needs to be applied at a given time for a specific pair of sensors.
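
The correction for one measurement can be sketched as a single function: look up the slope and intercept for the sensor pair, evaluate the line at `timeAtServer`, and subtract the result from the raw timestamp. The key format and the `(0, 0)` fallback follow the synchronization loop later in this section; the function name `correct_timestamp` and the toy coefficients are assumptions for illustration only.

```python
def correct_timestamp(tmp, time_at_server, s1, s2, coeff_dict):
    """Subtract the linear correction m * t + b for the sensor pair (s1, s2).

    Pairs missing from coeff_dict fall back to (0, 0), i.e. no correction.
    """
    m, b = coeff_dict.get('{}_{}'.format(int(s1), int(s2)), (0, 0))
    return tmp - (m * time_at_server + b)


# Hypothetical coefficients; the real values come from coeff_dict.pickle
toy_coeffs = {'197_208': (2.0, 100.0)}

corrected = correct_timestamp(1000.0, 10.0, 197, 208, toy_coeffs)
unchanged = correct_timestamp(1000.0, 10.0, 197, 999, toy_coeffs)
print(corrected, unchanged)
```

With the toy values, the known pair is shifted by 2.0 * 10.0 + 100.0 = 120.0, while the unknown pair keeps its original timestamp.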

In [2]:
max_measurements = 6

types = {'id': 'int32', 'timeAtServer': 'float32', 'aircraft': 'int16', 'latitude': 'float32', 'longitude': 'float32',
        'baroAltitude': 'float32', 'geoAltitude': 'float32', 'numMeasurements': 'int16'}

for i in range(max_measurements):
    types['tmp_{}'.format(i)] = 'float64'
    types['RSSI_{}'.format(i)] = 'int16'

In [3]:
flights = pd.read_csv('round2_competition_data/eval_test/eval_test.csv', dtype=types)

In [4]:
sensors = pd.read_csv('round2_training/round2/round2_sensors.csv')

In [5]:
with open(r"coeff_dict.pickle", "rb") as input_file:
    coeff_dict = pickle.load(input_file)

Below we will join the flights dataset with the sensors dataframe in order to extract the receivers' coordinates.

In [6]:
sensors.set_index('serial', inplace=True)

In [7]:
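# Join the sensor coordinates once per measurement slot. The sensors' latitude and
# longitude clash with the aircraft columns, so they get the suffix '_{i}'.
for i in range(max_measurements):
    flights = flights.join(sensors.loc[:, ['latitude', 'longitude', 'height']], on='sensor_{}'.format(i),
                           rsuffix='_{}'.format(i))
    
# 'height' has no clash in the first join, so it is not suffixed automatically
flights.rename({'height': 'height_0'}, axis=1, inplace=True)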

In [8]:
types_coords = {}

for i in range(max_measurements):
    types_coords['latitude_{}'.format(i)] = 'float32'
    types_coords['longitude_{}'.format(i)] = 'float32'
    types_coords['height_{}'.format(i)] = 'float32'
    
flights = flights.astype(types_coords, errors='ignore')

In [9]:
flights.loc[:10, ['sensor_0', 'sensor_1', 'tmp_0', 'tmp_1', 'tmp_2', 'tmp_3']]

| | sensor_0 | sensor_1 | tmp_0 | tmp_1 | tmp_2 | tmp_3 |
|---|---|---|---|---|---|---|
| 0 | 197 | 208 | 5770127000.0 | 962354600.0 | 37145250000.0 | 17125900000.0 |
| 1 | 150 | 434 | 968341100.0 | 3229610000.0 | 29137900000.0 | 26839110000.0 |
| 2 | 470 | 499 | 982753900.0 | 6026974000.0 | NaN | NaN |
| 3 | 23 | 134 | 61095510000.0 | 940042900.0 | 49067580000.0 | -144603300.0 |
| 4 | 203 | 315 | 79216920000.0 | 31656200000.0 | 67289770000.0 | NaN |
| 5 | 150 | 434 | 971817000.0 | 3233069000.0 | 29141420000.0 | 26842950000.0 |
| 6 | 26 | 166 | 515030000.0 | 33743810000.0 | 38922700000.0 | NaN |
| 7 | 215 | 352 | 44598940000.0 | 22189830000.0 | -1965785000.0 | 59469080000.0 |
| 8 | 461 | 607 | 2740585000.0 | 26027860000.0 | 24526930000.0 | 45961970000.0 |
| 9 | 166 | 346 | 33767670000.0 | 65133680000.0 | NaN | NaN |


Now we will begin the timestamp synchronization.

In [10]:
corrected_tmp = {}

for row in tqdm(flights.itertuples(), total=len(flights)):
    
    s1 = row.sensor_0
    timeAtServer = row.timeAtServer
    
    for i in range(1, max_measurements):
        
        corrected_tmp.setdefault(i, [])
        
        s2 = getattr(row, 'sensor_{}'.format(i))
        tmp_2 = getattr(row, 'tmp_{}'.format(i))
        
        if np.isnan(tmp_2):
            corrected_tmp[i].append(np.nan)
            continue
        
        # Return the correction coefficients (default=(0,0))
        m, b = coeff_dict.get('{}_{}'.format(int(s1), int(s2)), (0, 0))
        
        corr = m * timeAtServer + b
        
        corrected_tmp[i].append(tmp_2 - corr)      
         

100%|█████████████████████████████████████████████████████████████████████| 5824608/5824608 [02:27<00:00, 39621.83it/s]


In [13]:
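# Align on the dataframe index explicitly instead of relying on a default RangeIndex
for i in range(1, max_measurements):
    flights['tmp_{}'.format(i)] = pd.Series(corrected_tmp[i], index=flights.index)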

In [14]:
flights.loc[:10, ['sensor_0', 'sensor_1', 'tmp_0', 'tmp_1', 'tmp_2', 'tmp_3']]

| | sensor_0 | sensor_1 | tmp_0 | tmp_1 | tmp_2 | tmp_3 |
|---|---|---|---|---|---|---|
| 0 | 197 | 208 | 5770127000.0 | 26627320000.0 | 5770716000.0 | 5771402000.0 |
| 1 | 150 | 434 | 968341100.0 | -19889120000.0 | -19890660000.0 | -19888080000.0 |
| 2 | 470 | 499 | 982753900.0 | -19880080000.0 | NaN | NaN |
| 3 | 23 | 134 | 61095510000.0 | 82250270000.0 | 61406790000.0 | 61398810000.0 |
| 4 | 203 | 315 | 79216920000.0 | -10458930000.0 | 79089620000.0 | NaN |
| 5 | 150 | 434 | 971817000.0 | -19885660000.0 | -19887140000.0 | -19884240000.0 |
| 6 | 26 | 166 | 515030000.0 | -523003000.0 | -524690500.0 | NaN |
| 7 | 215 | 352 | 44598940000.0 | 44587250000.0 | 44596690000.0 | 44595350000.0 |
| 8 | 461 | 607 | 2740585000.0 | 3700265000.0 | 3508737000.0 | 3509040000.0 |
| 9 | 166 | 346 | 33767670000.0 | 130348100000.0 | NaN | NaN |


Synchronized data can be saved into a CSV file.

In [16]:
flights.to_csv('round2_competition_data/eval_test/eval_test.csv', index=False)

### Feature extraction and filtering

In this section we are going to focus on extracting the following additional features:
- timestamp differences
- the mean latitude and longitude location of the receivers
- the weighted mean of sensors coordinates

First, we will instantiate the <i>SparkSession</i> and set the number of shuffle partitions and the log level.

In [2]:
spark = SparkSession.builder \
    .master("local[2]") \
    .appName("ads-b data processing") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

In [3]:
# Set the number of partitions used when shuffling data (joins, aggregations)
spark.conf.set("spark.sql.shuffle.partitions", 100)

# Set log level
spark.sparkContext.setLogLevel("ERROR")

In [4]:
df = spark.read.format("csv") \
    .options(header='True', inferSchema='True') \
    .load('round2_competition_data/eval_test/eval_test.csv') 

Next, we will remove the columns that are no longer useful.

In [5]:
max_measurements = 6

# Select columns to drop. We will keep timeAtServer and aircraft fields for visualization purposes
cols = ['id', 'numMeasurements'] + ['sensor_{}'.format(i) for i in range(max_measurements)] 

In [6]:
df = df.drop(*cols)

Below we will calculate the timestamp differences that are going to be used as additional features.

In [7]:
# Specify the number of measurements to be taken into account while calculating differences
n_meas_diff = 4

for col_1 in range(1, n_meas_diff):
    for col_2 in range(col_1):
        df = df.withColumn('diff_{}_{}'.format(col_1, col_2), F.col('tmp_{}'.format(col_1)) - \
                           F.col('tmp_{}'.format(col_2)))
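
To make the naming of the difference columns explicit, here is a plain Python sketch of the same pairing rule applied to one row. The helper name `timestamp_diffs` and the toy timestamps are assumptions for illustration; the column names follow the `diff_{i}_{j}` pattern used above.

```python
from itertools import combinations


def timestamp_diffs(tmps, n_meas_diff=4):
    """Return {'diff_i_j': tmps[i] - tmps[j]} for all j < i < n_meas_diff.

    A missing timestamp (None) makes every difference involving it None,
    mirroring how null propagates through Spark column arithmetic.
    """
    diffs = {}
    for j, i in combinations(range(n_meas_diff), 2):  # yields pairs with j < i
        a, b = tmps[i], tmps[j]
        diffs['diff_{}_{}'.format(i, j)] = None if a is None or b is None else a - b
    return diffs


diffs = timestamp_diffs([10.0, 13.0, 11.5, None])
print(sorted(diffs))
```

With `n_meas_diff = 4` this yields six columns, one per pair of the first four measurements.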

In [8]:
df.select('tmp_0', 'tmp_1', 'tmp_2', 'diff_1_0', 'diff_2_0').show(5)

+---------------+--------------------+--------------------+--------------------+--------------------+
|          tmp_0|               tmp_1|               tmp_2|            diff_1_0|            diff_2_0|
+---------------+--------------------+--------------------+--------------------+--------------------+
|  5.770127167E9|2.662732089412679...| 5.770716371589668E9|2.085719372712679...|   589204.5896682739|
|   9.68341093E8|-1.98891237046369...|-1.98906594110398...|-2.08574647976369...|-2.08590005040398...|
|   9.82753933E8|-1.98800833742654...|                null|-2.08628373072654...|                null|
|6.1095513667E10|8.225027432537793E10|6.140679142674698E10|2.115476065837793E10|3.1127775974697876E8|
| 7.921691875E10|-1.04589263600445...| 7.90896239496978E10|-8.96758451100445...|-1.27294800302200...|
+---------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



The next feature to extract is the mean latitude and longitude location of the sensors receiving the signal.

In [9]:
cols = [col for col in df.columns if 'latitude_' in col]

df = df.withColumn('mean_lat',\
     sum([F.when(F.col(col).isNull(), 0).otherwise(F.col(col)) for col in cols])/ \
     sum([F.when(F.col(col).isNull(), 0).otherwise(1) for col in cols])). \
     fillna(0, 'mean_lat')

cols = [col for col in df.columns if 'longitude_' in col]

df = df.withColumn('mean_lon',\
     sum([F.when(F.col(col).isNull(), 0).otherwise(F.col(col)) for col in cols])/ \
     sum([F.when(F.col(col).isNull(), 0).otherwise(1) for col in cols])). \
     fillna(0, 'mean_lon')
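
The Spark expression above averages only the sensors that are present: missing coordinates add 0 to both the sum and the count, and a row with no sensors at all falls back to 0. A minimal plain Python sketch of the same rule, with a hypothetical helper name, looks like this:

```python
def mean_ignoring_missing(values, default=0.0):
    """Average the non-missing entries; return `default` if all are missing."""
    present = [v for v in values if v is not None]
    return sum(present) / len(present) if present else default


# Two receivers present, the remaining slots empty
mean_lat = mean_ignoring_missing([46.762405, 46.761898, None, None, None, None])
empty = mean_ignoring_missing([None, None])
print(mean_lat, empty)
```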

In [10]:
df.select('latitude_0', 'latitude_1', 'latitude_2', 'latitude_3', 'mean_lat').show(5)

+----------+----------+----------+----------+------------------+
|latitude_0|latitude_1|latitude_2|latitude_3|          mean_lat|
+----------+----------+----------+----------+------------------+
|   53.0433| 52.780922|  52.02494|  52.34244|        52.5479005|
| 43.571663| 43.339207|  43.34309|   41.4643|42.929565000000004|
| 46.762405| 46.761898|      null|      null|        46.7621515|
|  52.47304|  50.93708|  52.53275|  52.53275| 51.90161383333333|
| 50.883423|   50.8633|  51.32772|      null| 51.02481433333333|
+----------+----------+----------+----------+------------------+
only showing top 5 rows



We will also use the weighted mean of the sensor coordinates as an additional feature. In this calculation, the weight of each sensor is the inverse of its timestamp measurement.

In [11]:
cols = [col for col in df.columns if 'latitude_' in col]

df = df.withColumn('w_mean_lat',\
     sum([F.when(F.col(col).isNull(), 0).otherwise(F.col(col)*(1/F.col('tmp_{}'.format(i)))) \
          for i, col in enumerate(cols)])/ \
     sum([F.when(F.col(col).isNull(), 0).otherwise(1/F.col('tmp_{}'.format(i))) \
          for i, col in enumerate(cols)])). \
     fillna(0, 'w_mean_lat')

cols = [col for col in df.columns if 'longitude_' in col]

df = df.withColumn('w_mean_lon',\
     sum([F.when(F.col(col).isNull(), 0).otherwise(F.col(col)*(1/F.col('tmp_{}'.format(i)))) \
          for i, col in enumerate(cols)])/ \
     sum([F.when(F.col(col).isNull(), 0).otherwise(1/F.col('tmp_{}'.format(i))) \
          for i, col in enumerate(cols)])). \
     fillna(0, 'w_mean_lon')
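
The weighting can be sketched in plain Python as below. This is a simplified illustration, not the Spark expression itself: the helper name is hypothetical, and, unlike the code above, the sketch also skips sensors whose timestamp is missing or zero so that it never divides by zero.

```python
def weighted_mean_inverse_tmp(coords, tmps, default=0.0):
    """Weighted mean of coords with weights 1 / tmp, skipping unusable sensors."""
    # Assumption of this sketch: a sensor is skipped if its coordinate is
    # missing or its timestamp is missing or zero
    pairs = [(c, 1.0 / t) for c, t in zip(coords, tmps) if c is not None and t]
    total_weight = sum(w for _, w in pairs)
    if not pairs or total_weight == 0:
        return default
    return sum(c * w for c, w in pairs) / total_weight


equal = weighted_mean_inverse_tmp([50.0, 52.0], [1.0, 1.0])
skewed = weighted_mean_inverse_tmp([50.0, 52.0, None], [1.0, 3.0, None])
print(equal, skewed)
```

Equal timestamps reduce to the ordinary mean, while a smaller timestamp pulls the result toward its sensor.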

In [12]:
df.select('latitude_0', 'latitude_1', 'tmp_0', 'tmp_1', 'w_mean_lat').show(5)

+----------+----------+---------------+--------------------+-----------------+
|latitude_0|latitude_1|          tmp_0|               tmp_1|       w_mean_lat|
+----------+----------+---------------+--------------------+-----------------+
|   53.0433| 52.780922|  5.770127167E9|2.662732089412679...|52.49118230290629|
| 43.571663| 43.339207|   9.68341093E8|-1.98891237046369...|43.71810384249567|
| 46.762405| 46.761898|   9.82753933E8|-1.98800833742654...|46.76243136648981|
|  52.47304|  50.93708|6.1095513667E10|8.225027432537793E10|52.06661720099436|
| 50.883423|   50.8633| 7.921691875E10|-1.04589263600445...|50.77621312581878|
+----------+----------+---------------+--------------------+-----------------+
only showing top 5 rows



As the last step, we will fill in missing values and save the data frame.

In [13]:
df = df \
    .fillna(-90, subset=[col for col in df.columns if 'latitude' in col]) \
    .fillna(-180, subset=[col for col in df.columns if 'longitude' in col]) \
    .fillna(-90, subset=[col for col in df.columns if 'mean_lat' in col]) \
    .fillna(-180, subset=[col for col in df.columns if 'mean_lon' in col]) \
    .fillna(0)

In [14]:
df.repartition(1).write.save('round2_competition_data/tmp_evaltest/', format='csv', header=True)

In [15]:
import shutil

list_dir = os.listdir('round2_competition_data/tmp_evaltest/')

# Only replace the previous file if Spark finished the write successfully
if '_SUCCESS' in list_dir:
    file_name = [name for name in list_dir if name.startswith('part')]
    os.rename(os.path.join('round2_competition_data/tmp_evaltest', file_name[0]),
              'round2_competition_data/tmp_evaltest/eval_test.csv')
    os.replace('round2_competition_data/tmp_evaltest/eval_test.csv', 'round2_competition_data/eval_test/eval_test.csv')

    shutil.rmtree('round2_competition_data/tmp_evaltest/')