# Full Dataset: Extraction, Transformation, Feature Engineering, and Joins

In [0]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from datetime import datetime
from datetime import timedelta
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql import SQLContext
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, NullType, ShortType, DateType, BooleanType, BinaryType, TimestampType
from pyspark.ml.feature import Imputer


# SQLContext handle built from `sc`, which this notebook does not define
# (presumably injected by the notebook runtime — confirm).
# NOTE(review): no visible cell references `sqlContext`; they all go through `spark`.
sqlContext = SQLContext(sc)

In [0]:
# Configuration for Blob Storage
# Only names are stored here; the secret itself is fetched from the secret scope at run time.

blob_container = "container1" # The name of your container created in https://portal.azure.com
storage_account = "w261sp22team12" # The name of your Storage account created in https://portal.azure.com
secret_scope = "s1" # The name of the scope created in your local computer using the Databricks CLI
secret_key = "k1" # The name of the secret key created in your local computer using the Databricks CLI
# Base URL for the team's container; the airlines input and the weather checkpoint live under it.
blob_url = f"wasbs://{blob_container}@{storage_account}.blob.core.windows.net"
# Mount point prefix; the dataset paths read by the loading cells sit under this prefix.
mount_path = "/mnt/mids-w261"

In [0]:
# SAS Token
# Registers the token for this container/storage account in the Spark session config.
# The value is looked up via dbutils.secrets, so it never appears in the notebook source.
spark.conf.set(
  f"fs.azure.sas.{blob_container}.{storage_account}.blob.core.windows.net",
  dbutils.secrets.get(scope = secret_scope, key = secret_key)
)

In [0]:
# Set partitions
# 1000 is the same partition count used by the repartition(1000, ...) calls and the
# REPARTITION(1000) SQL hints elsewhere in this notebook.
spark.conf.set("spark.sql.shuffle.partitions", 1000)
spark.conf.set("spark.sql.files.minPartitionNum", 1000)

In [0]:
#Spark details
spark

In [0]:
def sparkShape(dataFrame):
    """Return the shape of a DataFrame as ``(row_count, column_count)``.

    The row count is obtained by calling ``dataFrame.count()``; the column
    count is the length of ``dataFrame.columns``.
    """
    n_rows = dataFrame.count()
    n_cols = len(dataFrame.columns)
    return (n_rows, n_cols)

## Data

In [0]:
# List the project datasets under the mounted storage root.
# Uses `mount_path` from the configuration cell instead of repeating the literal prefix.
display(dbutils.fs.ls(f"{mount_path}/datasets_final_project"))

path,name,size
dbfs:/mnt/mids-w261/datasets_final_project/airlines/,airlines/,0
dbfs:/mnt/mids-w261/datasets_final_project/airlines_data/,airlines_data/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data/,parquet_airlines_data/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data_3m/,parquet_airlines_data_3m/,0
dbfs:/mnt/mids-w261/datasets_final_project/parquet_airlines_data_6m/,parquet_airlines_data_6m/,0
dbfs:/mnt/mids-w261/datasets_final_project/stations_data/,stations_data/,0
dbfs:/mnt/mids-w261/datasets_final_project/weather_data/,weather_data/,0
dbfs:/mnt/mids-w261/datasets_final_project/weather_data_6_hr/,weather_data_6_hr/,0
dbfs:/mnt/mids-w261/datasets_final_project/weather_data_single/,weather_data_single/,0


### Weather Data

In [0]:
# Load the full Weather data
# - the path is built from `mount_path` (configuration cell) rather than a repeated literal
# - `fl_date` is the date part of the observation's `date` column; it is the repartition key
# - functions are referenced through `F` so the cell does not depend on the star import
df_weather = (
    spark.read.parquet(f"{mount_path}/datasets_final_project/weather_data/*")
        .withColumn('fl_date', F.to_date(F.col('date')))
        .repartition(1000, 'fl_date')
        .persist()
)

In [0]:
# Expose the raw weather frame to SQL under the name `weather`.
# NOTE: a later cell registers `df_weather_split` under this same view name, so what
# `weather` resolves to depends on how far the notebook has been executed.
df_weather.createOrReplaceTempView("weather")

In [0]:
print("weather", sparkShape(df_weather))

### Station Data

In [0]:
# Weather Station Supplementary Data
# The path is built from `mount_path` (configuration cell) rather than a repeated literal.
df_stations = spark.read.parquet(f"{mount_path}/datasets_final_project/stations_data/*").persist()

# Create a tempview so we can use SQL
df_stations.createOrReplaceTempView("stations")

In [0]:
display(df_stations)

usaf,wban,station_id,lat,lon,neighbor_id,neighbor_name,neighbor_state,neighbor_call,neighbor_lat,neighbor_lon,distance_to_neighbor
690020,93218,69002093218,36.0,-121.233,69002093218,JOLON HUNTER LIGGETT MIL RES,CA,KHGT,36.0,-121.233,0.0
690020,93218,69002093218,36.0,-121.233,69007093217,FRITZSCHE AAF,CA,KOAR,36.683,-121.767,55.73024537916726
690020,93218,69002093218,36.0,-121.233,69014093101,EL TORO MCAS,CA,KNZJ,33.667,-117.733,255.49106220353931
690020,93218,69002093218,36.0,-121.233,70027127506,BARROW POINT BARROW,AK,KPBA,71.333,-156.65,2750.4353299559803
690020,93218,69002093218,36.0,-121.233,70045027512,LONELY,AK,LNI,70.917,-153.25,2676.3554370627157
690020,93218,69002093218,36.0,-121.233,70063027403,OLIKTOK POW 2,AK,POLI,70.5,-149.883,2604.050248854232
690020,93218,69002093218,36.0,-121.233,70063526465,GALBRAITH LAKE AIRPORT,AK,PAGB,68.479,-149.49,2490.975609447228
690020,93218,69002093218,36.0,-121.233,70063627405,PRUDHOE BAY,AK,PAUD,70.25,-148.333,2568.180281844432
690020,93218,69002093218,36.0,-121.233,70104626418,CENTRAL AIRPORT,AK,PACE,65.567,-144.765,2254.558489129194
690020,93218,69002093218,36.0,-121.233,70119526625,SHISHMAREF/NEW AIRPORT,AK,PASH,66.25,-166.089,2743.6881828292408


In [0]:
#print("stations", sparkShape(df_stations))

In [0]:
# Read cleaned and transformed airlines data
# Source is the team blob container (`blob_url`); rows are repartitioned on `fl_date`
# and the frame is persisted because several later queries read from it.
df_airports_pagerank = spark.read.parquet(f"{blob_url}/airlines_full_airport_airline_ripple_pagerank") \
                                    .repartition(1000, 'fl_date') \
                                    .persist()

# Create a tempview so we can use SQL
# NOTE: the view is called `airlines`, not `airports_pagerank`.
df_airports_pagerank.createOrReplaceTempView("airlines")

In [0]:
display(df_airports_pagerank)

In [0]:
'''
Step 1: 
    Find the distinct origin airport codes (ICAO)
    These will be used to join with station data to filter
    for weather station close to the airports
'''


qs_distinct_airports = '''
    SELECT         
        DISTINCT origin_icao AS distinct_airports
    FROM 
        airlines
'''

# Run the DISTINCT query against the `airlines` view and persist the result,
# since it is counted, displayed and joined against in the following cells.
df_distinct_airports = spark.sql(qs_distinct_airports).persist()

# The view exposes a single column, also named `distinct_airports`.
df_distinct_airports.createOrReplaceTempView("distinct_airports")



In [0]:
print(df_distinct_airports.count())

In [0]:
%sql 

SELECT *
FROM distinct_airports

distinct_airports
KRFD
KACV
KBRO
KGGG
KYNG
KBGM
KMRY
KACK
KSTC
KABR


In [0]:
'''
Step 2: 
    Join distinct airport origin ICAOs with station data
'''

# Upper bound on `distance_to_neighbor` for a station to be considered for an airport.
max_distance = 500
# How many of the closest stations to keep per airport.
max_stations_per_airport = 2

# Rank, per airport call sign, every station within `max_distance` by distance and keep
# the closest ones. `station_id` is a secondary sort key so that stations at exactly the
# same distance are always ranked in the same order; ordering by distance alone leaves
# ROW_NUMBER() free to pick either station on a tie, which makes reruns non-reproducible.
qs_join_airport_stations = f'''
    WITH ranking_table AS (
      SELECT
          /*+ REPARTITION(1000) */
          neighbor_call,
          station_id,
          distance_to_neighbor,
          ROW_NUMBER() OVER(
            PARTITION BY neighbor_call
            ORDER BY distance_to_neighbor ASC, station_id ASC
          ) AS rank_neighbor
      FROM distinct_airports
      INNER JOIN stations
        ON distinct_airports.distinct_airports = stations.neighbor_call
      WHERE distance_to_neighbor <= {max_distance}
    )

SELECT
    /*+ REPARTITION(1000) */
    *
FROM
    ranking_table
WHERE
    rank_neighbor <= {max_stations_per_airport}
'''

# Create df using query string
df_join_airport_stations = spark.sql(qs_join_airport_stations).persist()

# Create view for SQL
df_join_airport_stations.createOrReplaceTempView("join_airport_stations")

# Find shape
print(sparkShape(df_join_airport_stations)) # 654, 4

### Functions & columns

In [0]:
# Filter US only data and the requested report types (FM-15 only by default)
def transform_weather_data(weather_data, weather_columns_of_interest, report_types=("FM-15",)):
    """Keep US observations of the requested report types and project the chosen columns.

    Args:
        weather_data: Spark DataFrame with at least ``NAME`` and ``REPORT_TYPE`` columns,
            plus every column named in ``weather_columns_of_interest`` other than
            ``COUNTRY`` (which is added here).
        weather_columns_of_interest: Column names to keep in the result.
        report_types: Report type codes to keep, compared against the trimmed
            ``REPORT_TYPE`` value. Defaults to ``("FM-15",)``, which is what this
            function has always filtered on; pass ``("FM-15", "FM-16")`` to also keep
            FM-16 rows. A single string is treated as one code.

    Returns:
        A DataFrame with a ``COUNTRY`` column (the last two characters of ``NAME``),
        restricted to ``COUNTRY = 'US'`` and the requested report types, containing only
        ``weather_columns_of_interest``.
    """
    # Guard against a bare string being iterated character by character.
    if isinstance(report_types, str):
        report_types = (report_types,)

    return (
        weather_data
            .withColumn("COUNTRY", F.substring(F.col("NAME"), -2, 2))
            .filter("COUNTRY = 'US'")
            .filter(F.trim(F.col("REPORT_TYPE")).isin(list(report_types)))
            .select(weather_columns_of_interest)
    )

### Missing Data
For unsigned columns, the values "99", "999", "9999", "99999", or "999999" mark missing data. For signed columns, the values "+99", "+9999", or "+99999" mark missing data. Missing values are also indicated by a condition code, discrepancy code, or quality code of "9".

In [0]:
# Split the comma separated values for columns of interest
'''
NOAA Source: 
https://www.ncei.noaa.gov/data/global-hourly/doc/isd-format-document.pdf
'''

def split_weather_data_features(weather_data):
    """Split the comma-separated NOAA weather fields into individual feature columns.

    Each packed field (WND, CIG, VIS, TMP, DEW, SLP, AA1, MA1, GD1, GE1, IA1, AJ1, AT1)
    is split on ``,`` and its parts are written to named columns. Measurement parts are
    cast to double; code/quality parts stay as strings. A measurement is then set to
    null when it equals the field's missing-value sentinel or when its quality-code
    column holds one of the rejected codes.

    Args:
        weather_data: Spark DataFrame containing the packed columns listed above
            (and optionally ``GA1``, which is only dropped).

    Returns:
        The input DataFrame with the feature columns appended and the packed source
        columns (including ``GA1``) dropped.

    Notes:
        * The equivalent-water-depth part of AJ1 is six characters wide and is checked
          against ``999999`` here; it was previously checked against ``9999``, which is
          the sentinel of the four-character snow depth part, so missing water depths
          passed through as real values.
        * NOTE(review): ``lp_depth_dimension`` (AA1 part 1) is left as an uncast string
          and gets no sentinel handling, unlike the other measurements — confirm intended.
    """
    # Quality codes whose measurements are discarded. "999" is kept from the original
    # list for parity; NOTE(review): it does not look like a real quality code — confirm.
    rejected_qc = ("3", "7", "999")

    def part(field, index, as_double=False):
        """Return element ``index`` of the comma-separated ``field``, optionally as double."""
        piece = F.split(weather_data[field], ',').getItem(index)
        return piece.cast(DoubleType()) if as_double else piece

    def null_if_missing(value_col, sentinel, qc_col):
        """Null ``value_col`` where it equals ``sentinel`` or ``qc_col`` is a rejected code."""
        value = F.col(value_col)
        unusable = (value == sentinel) | F.col(qc_col).isin(*rejected_qc)
        return F.when(unusable, None).otherwise(value)

    return (
        weather_data
            # WND (page 7 NOAA)
            .withColumn("wind_directional_angle", part('WND', 0, as_double=True))
            .withColumn("wind_directional_qc", part('WND', 1))
            .withColumn("wind_directional_type_code", part('WND', 2))
            .withColumn("wind_directional_speed_rate", part('WND', 3, as_double=True))
            .withColumn("wind_directional_speed_qc", part('WND', 4))
            .withColumn("wind_directional_angle", null_if_missing("wind_directional_angle", 999.0, "wind_directional_qc"))
            .withColumn("wind_directional_speed_rate", null_if_missing("wind_directional_speed_rate", 9999.0, "wind_directional_speed_qc"))

            # CIG (99999 = missing)
            .withColumn("sky_ceiling_height_dimension", part('CIG', 0, as_double=True))
            .withColumn("sky_ceiling_qc", part('CIG', 1))
            .withColumn("sky_ceiling_determination_code", part('CIG', 2))
            .withColumn("sky_ceiling_cavok_code", part('CIG', 3))
            .withColumn("sky_ceiling_height_dimension", null_if_missing("sky_ceiling_height_dimension", 99999.0, "sky_ceiling_qc"))

            # VIS (page 10 NOAA) ; vis means Visibility Observation ; qc means quality code
            .withColumn("vis_distance", part('VIS', 0, as_double=True))
            .withColumn("vis_distance_qc", part('VIS', 1))
            .withColumn("vis_variability_code", part('VIS', 2))
            .withColumn("vis_variability_qc", part('VIS', 3))
            .withColumn("vis_distance", null_if_missing("vis_distance", 999999.0, "vis_distance_qc"))

            # TMP (page 10 NOAA)
            .withColumn("air_temperature", part('TMP', 0, as_double=True))
            .withColumn("air_temperature_qc", part('TMP', 1))
            .withColumn("air_temperature", null_if_missing("air_temperature", 9999.0, "air_temperature_qc"))

            # DEW (page 11 NOAA)
            .withColumn("dew_point_temperature", part('DEW', 0, as_double=True))
            .withColumn("dew_point_qc", part('DEW', 1))
            .withColumn("dew_point_temperature", null_if_missing("dew_point_temperature", 9999.0, "dew_point_qc"))

            # SLP (sea level pressure; page 12 NOAA)
            .withColumn("sea_level_pressure", part('SLP', 0, as_double=True))
            .withColumn("sea_level_pressure_qc", part('SLP', 1))
            .withColumn("sea_level_pressure", null_if_missing("sea_level_pressure", 99999.0, "sea_level_pressure_qc"))

            # AA1 (page 13 NOAA) ; lp means liquid precipitation ; qc means quality code
            .withColumn("lp_period_qty", part('AA1', 0, as_double=True))
            .withColumn("lp_depth_dimension", part('AA1', 1))
            .withColumn("lp_condition_code", part('AA1', 2))
            .withColumn("lp_quality_code", part('AA1', 3))
            .withColumn("lp_period_qty", null_if_missing("lp_period_qty", 99.0, "lp_quality_code"))

            # MA1 (page 88 NOAA) ; ap means atmospheric pressure ; qc means quality code
            .withColumn("ap_altimeter_setting_rate", part('MA1', 0, as_double=True))
            .withColumn("ap_altimeter_qc", part('MA1', 1))
            .withColumn("ap_station_pressure_rate", part('MA1', 2, as_double=True))
            .withColumn("ap_station_pressure_qc", part('MA1', 3))
            .withColumn("ap_altimeter_setting_rate", null_if_missing("ap_altimeter_setting_rate", 99999.0, "ap_altimeter_qc"))
            .withColumn("ap_station_pressure_rate", null_if_missing("ap_station_pressure_rate", 99999.0, "ap_station_pressure_qc"))

            # GD1 Sky Cover Summation State Identifiers (page 55 NOAA) ; qc means quality code
            .withColumn("sky_coverage_code", part('GD1', 0))
            .withColumn("sky_coverage_code_2", part('GD1', 1))
            .withColumn("sky_coverage_qc", part('GD1', 2))
            .withColumn("sky_height_dimension", part('GD1', 3, as_double=True))
            .withColumn("sky_height_dimension_qc", part('GD1', 4))
            .withColumn("sky_characteristic_code", part('GD1', 5))
            .withColumn("sky_height_dimension", null_if_missing("sky_height_dimension", 99999.0, "sky_height_dimension_qc"))

            # GE1 / SKY Condition Observation (page 9 and 57 NOAA)
            .withColumn("sky_convective_cloud_attribute", part('GE1', 0, as_double=True))
            .withColumn("sky_vertical_datum_attribute", part('GE1', 1))
            .withColumn("sky_base_height_upper_range_attribute", part('GE1', 2))
            .withColumn("sky_base_height_lower_range_attribute", part('GE1', 3))
            # GE1 has no quality-code part: the value is nulled on its own sentinel (9)
            # or when the vertical datum part is the all-nines placeholder.
            .withColumn(
                "sky_convective_cloud_attribute",
                F.when(
                    (F.col("sky_convective_cloud_attribute") == 9.0)
                    | F.col("sky_vertical_datum_attribute").isin("999999"),
                    None,
                ).otherwise(F.col("sky_convective_cloud_attribute")),
            )

            # GF1 -- not very useful, lots of missing values

            # IA1 Ground Surface Observation (page 76 NOAA)
            .withColumn("ground_observation_code", part('IA1', 0))
            .withColumn("ground_observation_qc", part('IA1', 1))

            # AJ1 Snow Depth Identifier (page 21 NOAA)
            .withColumn("snow_depth_dimension", part('AJ1', 0, as_double=True))
            .withColumn("snow_depth_condition_code", part('AJ1', 1))
            .withColumn("snow_depth_qc", part('AJ1', 2))
            .withColumn("snow_depth_equivalent_water_depth", part('AJ1', 3, as_double=True))
            .withColumn("snow_depth_water_condition_code", part('AJ1', 4))
            .withColumn("snow_depth_water_qc", part('AJ1', 5))
            .withColumn("snow_depth_dimension", null_if_missing("snow_depth_dimension", 9999.0, "snow_depth_qc"))
            # Six-character field: its missing sentinel is 999999, not 9999.
            .withColumn("snow_depth_equivalent_water_depth", null_if_missing("snow_depth_equivalent_water_depth", 999999.0, "snow_depth_water_qc"))

            # AT1 Daily Present Weather Observation (page 27 NOAA)
            .withColumn("weather_obs_source_element", part('AT1', 0))
            .withColumn("weather_obs_weather_type_num", part('AT1', 1))
            .withColumn("weather_obs_weather_type_abb", part('AT1', 2))
            .withColumn("weather_obs_qc", part('AT1', 3))
            .drop("WND", "CIG", "VIS", "TMP", "DEW", "SLP", "GA1", "GE1", "GD1", "AA1", "AJ1", "AT1", "IA1", "MA1")
    )

In [0]:
# Columns of interest based on EDA
# - "COUNTRY" is not a raw column: transform_weather_data derives it before selecting.
# - "fl_date" is added when the weather data is loaded.
# - NOTE: "GA1" is selected here but never split into features;
#   split_weather_data_features only drops it.
weather_columns_of_interest = [
    "STATION", "COUNTRY", "DATE", "LATITUDE", "LONGITUDE", "REPORT_TYPE", "CALL_SIGN",
    "WND", "CIG", "VIS", "TMP", "DEW", "SLP", 
    "GA1", "GE1", "GD1", 
    "AA1", "AJ1", "AT1", 
    "IA1", "MA1",
    "fl_date"
]

In [0]:
def impute_missing_values(df , cols_to_impute):
    """Add a median-imputed companion column for each column in ``cols_to_impute``.

    For every input column ``c`` the result carries an extra column ``c_imputed``.
    The imputer is fitted on ``df`` itself and then applied to it; the input columns
    are left in place.
    """
    imputed_names = ["{}_imputed".format(name) for name in cols_to_impute]
    median_imputer = Imputer(inputCols=cols_to_impute, outputCols=imputed_names)
    median_imputer.setStrategy("median")
    fitted_model = median_imputer.fit(df)
    return fitted_model.transform(df)

In [0]:
# Numeric feature columns to pass to the median imputer.
# sky_convective_cloud_attribute is not listed: all of its values are Null.
cols_to_impute = ['wind_directional_angle', 'wind_directional_speed_rate', 'sky_ceiling_height_dimension', 'vis_distance', 'air_temperature', 'dew_point_temperature', 'sea_level_pressure', 'lp_period_qty', 'ap_altimeter_setting_rate', 'sky_height_dimension', 'snow_depth_dimension', 'snow_depth_equivalent_water_depth', 'ap_station_pressure_rate']


In [0]:
def update_missing_with_imputed(df):
    """Fill nulls in each feature column from its ``<name>_imputed`` companion.

    Every column ``x`` that has a companion column ``x_imputed`` in ``df`` is replaced
    by ``coalesce(x, x_imputed)`` — the original value where present, otherwise the
    imputed one — and the companion column is then dropped. Column order is unchanged.

    This covers all pairs present in ``df``. The previous hand-written list dropped
    ``wind_directional_angle_imputed`` without ever merging it, so the imputed wind
    angle was silently discarded.

    Args:
        df: DataFrame carrying ``<name>_imputed`` columns, e.g. as produced by
            ``impute_missing_values``.

    Returns:
        ``df`` with nulls filled and the ``_imputed`` companions removed. If ``df`` has
        no such pairs it is returned unchanged.
    """
    suffix = "_imputed"
    existing = set(df.columns)

    # (base column, imputed companion) pairs, in the order they appear in df.
    pairs = [
        (name[:-len(suffix)], name)
        for name in df.columns
        if name.endswith(suffix) and name[:-len(suffix)] in existing
    ]
    if not pairs:
        return df

    filled = df
    for base_name, imputed_name in pairs:
        filled = filled.withColumn(
            base_name, F.coalesce(F.col(base_name), F.col(imputed_name))
        )

    return filled.drop(*[imputed_name for _, imputed_name in pairs])

In [0]:
# Filter the columns and US only
# Step 1: apply transform_weather_data to the raw weather frame.
df_weather_filtered = transform_weather_data(df_weather, weather_columns_of_interest) 
# Step 2: rename every column to lower case, then cache the result.
# The name `df_weather_filtered` is rebound, so after this cell it refers to the lower-cased frame.
df_weather_filtered = df_weather_filtered.select([col(c).alias(c.lower()) for c in df_weather_filtered.columns]).cache()

In [0]:
#Split the columns
df_weather_split = split_weather_data_features(df_weather_filtered).persist()


In [0]:
#df_weather_split_with_impute = impute_missing_values(df_weather_split,cols_to_impute )

In [0]:
#df_weather_split = update_missing_with_imputed(df_weather_split_with_impute )

In [0]:
# Create view for SQL
# NOTE: this replaces the `weather` view registered earlier for the raw `df_weather`;
# from here on, SQL against `weather` sees the split feature columns.
df_weather_split.createOrReplaceTempView("weather")

In [0]:
print(df_weather_split.count())

In [0]:
qs_join_weather_distinct_airport_stations = '''
    SELECT 
        /*+ REPARTITION(1000) */
        
        join_airport_stations.neighbor_call AS airport_icao,
        join_airport_stations.distance_to_neighbor AS airport_distance_to_weather_station, 
        join_airport_stations.rank_neighbor AS rank,        
        
        weather.*
    FROM 
        join_airport_stations
    INNER JOIN weather ON join_airport_stations.station_id = weather.station
'''

# Run the airport/station <-> weather join and persist it: the following cells count it,
# register it as a view and write it out.
df_weather_join_final = spark.sql(qs_join_weather_distinct_airport_stations).persist()

In [0]:
print(df_weather_join_final.count())

In [0]:
print(sparkShape(df_weather_join_final))

In [0]:

# Create view for SQL
df_weather_join_final.createOrReplaceTempView("weather_final")

### Summary

In [0]:
# Checkpoint final dataset: distinct airport <--> station <--> weather
# Written as parquet to `weather_full` in the team blob container; mode("overwrite")
# replaces whatever is already stored at that path.
df_weather_join_final.write.mode("overwrite").parquet(f"{blob_url}/weather_full")

In [0]:
# Reload the checkpointed join from blob storage and keep only the 2019 observations.
df_weather_full = spark.read.parquet(f"{blob_url}/weather_full")
# year() yields an integer, so compare against an integer literal instead of relying on
# an implicit cast of the string "2019".
df_weather_2019 = df_weather_full.filter(F.year(F.col('DATE')) == 2019)
df_weather_2019.createOrReplaceTempView('weather2019')

In [0]:
display(df_weather_2019)

In [0]:
# Names of the columns whose Spark dtype string is exactly "int" or "double".
numeric_features = [
    name
    for name, dtype in df_weather_2019.dtypes
    if dtype in ("int", "double")
]
print(numeric_features)

In [0]:
# Correlation heatmap of the numeric columns for observations dated before 2019-04-01.
# sns.set(...) changes the figure size globally, not just for this plot.
# NOTE: toPandas() brings every filtered row to the driver before pandas computes .corr().
sns.set(rc = {'figure.figsize':(10,10)})
sns.heatmap(df_weather_2019.filter(col('DATE') < "2019-04-01T00:00:00.000").select(numeric_features).toPandas().corr(), cmap = "Blues")