In [1]:
# mapping of covid locations (fips + GPS) to the closest weather station

In [59]:
import configparser
import os
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import udf, col, lit
from pyspark.sql.types import MapType, StringType, FloatType
from pyspark.sql import DataFrame, Window
from collections import OrderedDict
import pandas as pd
import numpy as np


In [3]:
config = configparser.ConfigParser()
config.read("capstone.cfg")

local_data_dir = config["PATH"]["LOCAL_DATA_DIR"]
output_path =  config["PATH"]["STAGE_DATA_DIR"]

os.environ['AWS_ACCESS_KEY_ID']=config['AWS']['ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['AWS']['SECRET_ACCESS_KEY']

In [4]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .appName("covid_DB") \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")\
        .config("fs.s3a.access.key", config['AWS']['ACCESS_KEY_ID'])\
        .config("fs.s3a.secret.key", config['AWS']['SECRET_ACCESS_KEY'])\
        .getOrCreate()
    
    return spark

In [5]:
spark = create_spark_session()

In [6]:
# Load relevant stations with weather element
path = os.path.join(output_path, "filtered_stations")
selected_stations = spark.read.parquet(path)

In [7]:
selected_stations.printSchema()

root
 |-- station_id: string (nullable = true)
 |-- measured: string (nullable = true)



In [8]:
selected_stations.select("measured").distinct().show()

+--------+
|measured|
+--------+
|    TMIN|
|    SNOW|
|    AWND|
|    PRCP|
+--------+



In [9]:
# load all stations, with GPS location
raw_stations = spark.read.csv( os.path.join(local_data_dir, "WEATHER", "ghcnd-stations.txt"))

In [10]:
# parse raw stations into columns
@udf(MapType( StringType(), StringType()))
def ParseStationsUDF(line):
    return{
        "station_id": line[0:11],
        "latitude" : line[13:20], 
        "longitude" : line[21:30], 
        "elevation" : line[31:38], 
        "state" : line[38:40], 
        "station_name" : line[41:]
        
    }

fields = OrderedDict( [
        ( "station_id" , "string"),
        ( "latitude" , "float"), 
        ("longitude" , "float"), 
        ("elevation" , "float"),
        ("state" , "string"), 
        ("station_name" , "string")
] )

#exprs = [ f"parsed['{field}'].cast({fld_type}) as {field}" for field, fld_type in fields.items() ]
exprs = [ f"CAST(parsed['{field}'] AS {fld_type}) AS {field}" for field, fld_type in fields.items() ]

stations = raw_stations.withColumn("parsed", ParseStationsUDF("_c0")).selectExpr( *exprs)

In [11]:
stations.printSchema()

root
 |-- station_id: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- elevation: float (nullable = true)
 |-- state: string (nullable = true)
 |-- station_name: string (nullable = true)



In [12]:
df_stations = stations.join(selected_stations, ["station_id"])

In [13]:
df_stations.count()

23556

In [14]:
#selected_stations.count()

In [15]:
#raw_stations.count()

In [16]:
# some stations have identical latitude and longitude => keep only one within a GPS group
w = Window.partitionBy("latitude", "longitude").orderBy("elevation", "station_id")
df_stations_dedup = df_stations.withColumn("rank_elevation", F.rank().over(w))\
    .where( col("rank_elevation") == 1)\
    .drop("rank_elevation")\
    .cache()

In [17]:
df_stations_dedup.printSchema()

root
 |-- station_id: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- elevation: float (nullable = true)
 |-- state: string (nullable = true)
 |-- station_name: string (nullable = true)
 |-- measured: string (nullable = true)



In [18]:
df_stations_dedup.count()

23531

In [19]:
w = Window.partitionBy("station_id").orderBy("measured")
unique_stations = df_stations_dedup.withColumn("dummy", F.rank().over(w) )\
    .where(col("dummy") == 1) \
    .drop("dummy")


In [20]:
unique_stations.count()

14006

In [21]:
# Load NYT locations (FIPS + GPS)
path = os.path.join(output_path, "nyt_locations_geography")
df_locations = spark.read.parquet(path)
df_locations = df_locations.where( ( ~ F.isnan("latitude") ) | (~ F.isnan("longitude")) )
df_locations = df_locations.withColumn("location_id", F.monotonically_increasing_id())

In [22]:
df_locations.printSchema()

root
 |-- fips: string (nullable = true)
 |-- county: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- location_id: long (nullable = false)
 |-- state: string (nullable = true)



In [23]:
df_locations.count()

3272

In [24]:
def precompute_distance(l_ref : DataFrame) -> DataFrame:
    l_ref = l_ref.withColumnRenamed("latitude", "latitude_degrees")
    l_ref = l_ref.withColumnRenamed( "longitude", "longitude_degrees") 
    @udf( FloatType())
    def degree_to_radian(x):
        return  x* np.pi / 180.
    l_ref = l_ref.withColumn("latitude", degree_to_radian("latitude_degrees") )
    l_ref = l_ref.withColumn("longitude", degree_to_radian("longitude_degrees") )
    l_ref = l_ref.withColumn("cos_latitude", F.cos("latitude") )  
    print(type(l_ref))
    return l_ref

In [25]:
df_stations_precompute =precompute_distance(df_stations_dedup)

<class 'pyspark.sql.dataframe.DataFrame'>


In [26]:
df_locations_precompute = precompute_distance(df_locations)

<class 'pyspark.sql.dataframe.DataFrame'>


In [27]:
df_stations_precompute.where(F.isnan("cos_latitude") ).show()

+----------+----------------+-----------------+---------+-----+------------+--------+--------+---------+------------+
|station_id|latitude_degrees|longitude_degrees|elevation|state|station_name|measured|latitude|longitude|cos_latitude|
+----------+----------------+-----------------+---------+-----+------------+--------+--------+---------+------------+
+----------+----------------+-----------------+---------+-----+------------+--------+--------+---------+------------+



# closest station for all fips

In [28]:
sub_fips = df_locations_precompute\
    .select("location_id", "fips", "county", "state", "latitude", "longitude", "cos_latitude")\
    .withColumnRenamed("latitude", "latitude_fips")\
    .withColumnRenamed("longitude", "longitude_fips")\
    .withColumnRenamed("cos_latitude", "cos_latitude_fips")

In [29]:
sub_stations = df_stations_precompute\
    .select("station_id", "measured","latitude", "longitude", "cos_latitude")\
    .withColumnRenamed("latitude", "latitude_station")\
    .withColumnRenamed("longitude", "longitude_station")\
    .withColumnRenamed("cos_latitude", "cos_latitude_station")

In [30]:
fips_cross_stations = sub_fips.crossJoin(sub_stations)

In [31]:
#%%time
fips_cross_stations.count()
#76993432

76993432

In [32]:
fips_cross_stations.printSchema()

root
 |-- location_id: long (nullable = false)
 |-- fips: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- latitude_fips: float (nullable = true)
 |-- longitude_fips: float (nullable = true)
 |-- cos_latitude_fips: double (nullable = true)
 |-- station_id: string (nullable = true)
 |-- measured: string (nullable = true)
 |-- latitude_station: float (nullable = true)
 |-- longitude_station: float (nullable = true)
 |-- cos_latitude_station: double (nullable = true)



In [33]:
def delta_coord(col1, col2):
    return F.pow( F.sin(0.5* (col1 - col2 ) ), 2 )

In [34]:
def haversine( lat1, long1, cos_lat1, lat2, long2, cos_lat2):
    ''' computation of angular distance between 2 locations given by GPS coordinates,
    using haversine formula.
    exact formulas (maybe overkill), taken from :
    https://www.movable-type.co.uk/scripts/latlong.html
    '''    
    delta_lat = delta_coord(lat1, lat2)
    delta_long = delta_coord(long1, long2)
    a = delta_lat + delta_long * cos_lat1 * cos_lat2
    return  F.atan2( F.sqrt(a), F.sqrt( 1.-a ) )

In [35]:
# compute all pair-wise distances btw fips and weather stations
dist_col = "angle"
df_cross_distance = fips_cross_stations.withColumn(dist_col,                                  
        haversine(col("latitude_fips"), col("longitude_fips"),col("cos_latitude_fips"), 
                    col("latitude_station"), col("longitude_station"), col("cos_latitude_station")) )

In [36]:
# for each measurement and fips, keep only the station with the smallest distance (e.g. smallest angle)
window = Window.partitionBy("measured", "location_id")
df_min_distance = df_cross_distance.withColumn("min_angle", F.min(dist_col).over(window))\
            .filter( col(dist_col) == col("min_angle") ) #\
           # .drop("min_angle")

In [37]:
R_earth = 6371

In [38]:
df_cross_distance.count()

76993432

In [39]:
df_locations_precompute.count()

3272

In [40]:
3274*4

13096

In [41]:
# compute distance from angle
df_min_distance = df_min_distance.withColumn("distance", R_earth * col("angle"))

In [42]:
#%%time
# must be 13088 
df_min_distance.count()

13088

In [43]:
# join with fips and stations DB to get all relevant data
res_detailed = df_min_distance.alias("closest").join( df_locations.alias("fips"), ["location_id"]) \
        .join(unique_stations.alias("station"), "station_id")

In [44]:
#%%time
res_detailed.count()

13088

In [45]:
res_short = res_detailed.select("closest.measured", "location_id", "fips.fips", "fips.county", col("fips.state").alias("fips_state"),
                    "station_id", "station_name", col("station.state").alias("station_state"),
                      "angle", "distance"        )


In [46]:
res_short.printSchema()

root
 |-- measured: string (nullable = true)
 |-- location_id: long (nullable = false)
 |-- fips: string (nullable = true)
 |-- county: string (nullable = true)
 |-- fips_state: string (nullable = true)
 |-- station_id: string (nullable = true)
 |-- station_name: string (nullable = true)
 |-- station_state: string (nullable = true)
 |-- angle: double (nullable = true)
 |-- distance: double (nullable = true)



In [47]:
#%%time
# find fips and measurement with 
res_short.sort("distance", ascending = False).show(30)

+--------+------------+-----+--------------------+--------------------+-----------+--------------------+-------------+--------------------+------------------+
|measured| location_id| fips|              county|          fips_state| station_id|        station_name|station_state|               angle|          distance|
+--------+------------+-----+--------------------+--------------------+-----------+--------------------+-------------+--------------------+------------------+
|    SNOW|695784701974| null|             Unknown|              Hawaii|US1CASM0007|HALF MOON BAY 0.5...|           CA| 0.29661576550557106|1889.7390420359932|
|    SNOW|618475290632|15009|                Maui|              Hawaii|US1CASM0007|HALF MOON BAY 0.5...|           CA|  0.2955648047390893| 1883.043370992738|
|    SNOW|377957122061|15001|              Hawaii|              Hawaii|US1CASM0007|HALF MOON BAY 0.5...|           CA| 0.29521110773626774|1880.7899673877619|
|    SNOW|266287972361|15005|             Kala

In [48]:
#out_path = os.path.join(project_path, "OUT_DATA", "stations_per_fips")
#res_short.write.partitionBy("measured").format("parquet").mode("overwrite").save(out_path)

## Outliers

In [49]:
outliers = res_short.where(col("distance")>100).cache()

In [50]:
#%%time
outliers.count()

23

In [51]:
outliers.groupBy("measured").count().show()

+--------+-----+
|measured|count|
+--------+-----+
|    TMIN|    1|
|    SNOW|   19|
|    AWND|    2|
|    PRCP|    1|
+--------+-----+



In [52]:
#%%time
pd_outliers = outliers.toPandas()

In [53]:
pd_outliers.sort_values("distance", ascending = False)

Unnamed: 0,measured,location_id,fips,county,fips_state,station_id,station_name,station_state,angle,distance
18,SNOW,695784701974,,Unknown,Hawaii,US1CASM0007,HALF MOON BAY 0.5 SSW,CA,0.296616,1889.739042
15,SNOW,618475290632,15009.0,Maui,Hawaii,US1CASM0007,HALF MOON BAY 0.5 SSW,CA,0.295565,1883.043371
12,SNOW,377957122061,15001.0,Hawaii,Hawaii,US1CASM0007,HALF MOON BAY 0.5 SSW,CA,0.295211,1880.789967
1,SNOW,266287972361,15005.0,Kalawao,Hawaii,USC00502587,DUTCH HARBOR,AK,0.292157,1861.329641
14,SNOW,163208757271,15003.0,Honolulu,Hawaii,USC00502587,DUTCH HARBOR,AK,0.2884,1837.397841
10,SNOW,506806140934,15007.0,Kauai,Hawaii,USC00502587,DUTCH HARBOR,AK,0.281888,1795.907502
6,SNOW,4,2016.0,Aleutians West Census Area,Alaska,USC00502587,DUTCH HARBOR,AK,0.081137,516.925004
19,AWND,4,2016.0,Aleutians West Census Area,Alaska,USW00025628,ST GEORGE ISLAND AP,AK,0.073584,468.803797
2,PRCP,4,2016.0,Aleutians West Census Area,Alaska,USW00025711,ST. PAUL 4 NE 70309,AK,0.073424,467.78338
7,TMIN,4,2016.0,Aleutians West Census Area,Alaska,USW00025711,ST. PAUL 4 NE 70309,AK,0.073424,467.78338


In [54]:
# remove mappings where distance from fips to station is more than 600 km
res_short = res_short.where( col("distance") < 600)

In [55]:
res_short.count()

13082

In [56]:
map_location_stations = res_short.select("location_id", "measured", "station_id", "distance")

In [61]:
out_path = os.path.join(output_path, "map_locations_stations" )
map_location_stations\
    .write\
    .partitionBy("measured")\
    .format("parquet")\
    .mode("overwrite")\
    .save(out_path)

# Output selected stations

In [69]:
# keep only selected weather data
col_stations = stations.columns
selected_stations = stations.join(map_location_stations, on = ["station_id"])\
    .select(*col_stations)\
    .distinct()

In [71]:
selected_stations.printSchema()

root
 |-- station_id: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- elevation: float (nullable = true)
 |-- state: string (nullable = true)
 |-- station_name: string (nullable = true)



In [72]:
out_path = os.path.join(output_path, "weather_stations" )
selected_stations.write\
    .format("parquet")\
    .mode("overwrite")\
    .save(out_path)    

## Output weather data

In [74]:
path = os.path.join(local_data_dir, "WEATHER", "2020.csv.gz")
weather = spark.read.csv(path,  
                         schema = "station_id string, date string, measured string, value string, measurement_flag string, quality_flag string, source_flag string, hour string")

In [75]:
unique_stations = map_location_stations\
                .select("measured", "station_id")\
                .distinct()

In [76]:
selected_weather = weather.filter(weather["quality_flag"].isNull())\
    .join( unique_stations, on = ["station_id", "measured"])\
    .select("measured", "station_id", "date", "value")

In [77]:
out_path = os.path.join(output_path, "weather_records")
selected_weather.write\
    .partitionBy("measured", "date")\
    .format("parquet")\
    .mode("overwrite")\
    .save(out_path)

KeyboardInterrupt: 