**Udacity Data Engineering Capstone Project**<br/>
Avraam Marimpis <avraam.marimpis@gmail.com>, October 2020

- - -


# Imports 

In [2]:
import sys
sys.path.append('config/')
sys.path.append('common/')

import config
import data as cnf_data
import aws_dwh
import preprocess_fn

In [3]:
import pyspark
import pyspark.sql.functions as fn
import pyspark.sql.types as t

In [4]:
import datetime
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [5]:
import json

In [6]:
from functools import reduce

In [7]:
def unionAll(*dfs):
    """ Helper utility function to perform `union` on multiple Dataframes.

    The Dataframes are combined left to right with `DataFrame.union`
    (of which `unionAll` is an alias). As with SQL `UNION ALL`, columns are
    matched by position, not by name, and duplicate rows are kept.

    Parameters
    ----------
    *dfs: pyspark.sql.Dataframe
        One or more Dataframes, passed as separate positional arguments
        (unpack a list with `unionAll(*my_list)`).

    Returns
    -------
    df: pyspark.sql.Dataframe
        A new Dataframe that is the result of the unification of the given Dataframes.
        A single argument is returned unchanged.

    Raises
    ------
    TypeError
        If no Dataframes are given.
    """
    if not dfs:
        # `reduce` would otherwise fail with an opaque message on an empty sequence.
        raise TypeError("unionAll() requires at least one Dataframe")
    return reduce(lambda left, right: left.union(right), dfs)

# Local AWS credentials and settings 

In [8]:
# Parse the local AWS/DWH settings; used below as dwh['aws'][...] and dwh['s3'][...].
dwh = aws_dwh.parse_dwh()

# Add AWS/S3 JARs to Spark 

In [9]:
if not config.APP_DEV:
    # Recreate the Spark session with the AWS SDK / hadoop-aws JARs so that
    # `s3a://` paths can be used.
    # SparkSession is not imported at the top of this notebook; import it here
    # instead of relying on the PySpark shell's namespace.
    from pyspark.sql import SparkSession

    # Stop the kernel-provided session, if there is one, so the builder below
    # creates a new session with the extra JARs.
    if "spark" in globals():
        spark.stop()

    spark = SparkSession.builder \
                .appName("my_app") \
                .config('spark.sql.codegen.wholeStage', False) \
                .config("spark.driver.extraClassPath", "/home/vagrant/opt/libs/aws-java-sdk-1.7.4.jar") \
                .config("spark.jars", "/home/vagrant/opt/libs/hadoop-aws-2.7.2.jar") \
                .getOrCreate()

    # S3A filesystem settings: credentials and the regional endpoint come from
    # the local DWH settings; the V4 flag is enabled as well.
    s3a_settings = {
        "fs.s3a.access.key": dwh['aws']['access_key_id'],
        "fs.s3a.secret.key": dwh['aws']['secret_access_key'],
        "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "com.amazonaws.services.s3.enableV4": "true",
        "fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider",
        "fs.s3a.endpoint": f"s3.{dwh['aws']['region']}.amazonaws.com",
    }
    hadoop_conf = spark._jsc.hadoopConfiguration()
    for key, value in s3a_settings.items():
        hadoop_conf.set(key, value)

    sc = spark.sparkContext
    # The same V4 flag is also set as a JVM system property.
    sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

# Load datasets

The datasets "Air Quality", "US droughts" and "Global Temperatures" are read from Parquet files whose schemas we already defined earlier, so there's no need to define them again.

The "Wildfires" dataset, however, is read from CSV, so we define a simple schema that meets our needs.

## Dataset "Wildfires" 

In [None]:
# Resolve the location of the Wildfires dataset.
# NOTE(review): `dataset_wildfiles` (sic) is the function name in config/data.py; keep as-is unless renamed there.
dataset_wildfires = cnf_data.dataset_wildfiles()
print(f"Dataset source: {dataset_wildfires}")

In [None]:
# Explicit schema for the Wildfires CSV, as (column name, Spark type) pairs.
# NOTE(review): Spark applies an explicit CSV schema by column position, so this
# order must match the file's header — confirm against the source CSV.
wildfire_columns = [
    ("OBJECTID", t.StringType()),
    ("FOD_ID", t.StringType()),
    ("FPA_ID", t.StringType()),
    ("FIRE_CODE", t.StringType()),
    ("FIRE_NAME", t.StringType()),
    ("ICS_209_INCIDENT_NUMBER", t.StringType()),
    ("ICS_209_NAME", t.StringType()),
    ("MTBS_ID", t.StringType()),
    ("MTBS_FIRE_NAME", t.StringType()),
    ("COMPLEX_NAME", t.StringType()),
    ("FIRE_YEAR", t.IntegerType()),
    ("DISCOVERY_DATE", t.StringType()),
    ("DISCOVERY_DOY", t.StringType()),
    ("DISCOVERY_TIME", t.StringType()),
    ("STAT_CAUSE_CODE", t.FloatType()),
    ("STAT_CAUSE_DESCR", t.StringType()),
    ("CONT_DATE", t.StringType()),
    ("CONT_DOY", t.StringType()),
    ("CONT_TIME", t.StringType()),
    ("FIRE_SIZE", t.FloatType()),
    ("FIRE_SIZE_CLASS", t.StringType()),
    ("STATE", t.StringType()),
    ("COUNTY", t.StringType()),
    ("FIPS_CODE", t.StringType()),
    ("FIPS_NAME", t.StringType()),
    ("DISCOVERY_DATE_converted", t.DateType()),
    ("CONT_DATE_converted", t.DateType()),
]

# Every field keeps StructField's default nullable=True.
schema = t.StructType([t.StructField(name, dtype) for name, dtype in wildfire_columns])

In [None]:
# Read the Wildfires CSV using the explicit schema above; the first line is treated as a header.
df_wf = spark.read.csv(dataset_wildfires, header=True, schema=schema)

### Exploratory Analysis 

In [None]:
# Per-column summary statistics, transposed so that each column becomes a row.
df_wf.describe().toPandas().transpose()

Let's begin by dropping some unnecessary fields.

In [None]:
# Wildfire columns dropped before export: fire names/codes, ICS-209 and MTBS
# identifiers, the complex name and the raw string dates (the `*_converted`
# date columns are kept instead).
drop_fields = (
    "FIRE_CODE FIRE_NAME "
    "ICS_209_INCIDENT_NUMBER ICS_209_NAME "
    "MTBS_ID MTBS_FIRE_NAME "
    "COMPLEX_NAME DISCOVERY_DATE CONT_DATE"
).split()

In [None]:
# Remove the columns listed in `drop_fields`.
df_wf = df_wf.drop(*drop_fields)

In [None]:
# Missing-value check per column; `threshold` presumably sets the minimum missing share reported — confirm in preprocess_fn.
fields_checked, missing_df = preprocess_fn.missing_fields_perc(df_wf, threshold=0.25)

In [None]:
# Display the fields checked by `missing_fields_perc`.
fields_checked

In [None]:
# Report the missing-value summary, or state that there is nothing missing.
if len(missing_df.columns) == 0:
    print("There are no missing values in the given Dataframe.")
else:
    # `missing_df` is a Spark Dataframe: printing it directly only shows its
    # schema, so convert it (as the other datasets' sections do) to show the values.
    print(missing_df.toPandas().transpose())

In [None]:
# Number of duplicate rows, as reported by preprocess_fn.count_duplicates, versus the total.
r = preprocess_fn.count_duplicates(df_wf)
print(f"There are {r} duplicate rows (out of {df_wf.count()} total records).")

In [None]:
# New partition columns, derived from the converted discovery date
discovery_date = fn.col("DISCOVERY_DATE_converted")
part_cols = {
    "part_year": fn.year(discovery_date),
    "part_month": fn.month(discovery_date),
}

for col_name in part_cols:
    df_wf = df_wf.withColumn(col_name, part_cols[col_name])

In [None]:
# Number of columns after dropping fields and adding the partition columns.
len(df_wf.columns)

In [None]:
# Replace nulls in string columns with empty strings (a str fill value only applies to string columns).
df_wf = df_wf.na.fill("")

In [None]:
# Export the wildfires as JSON in a single partition (coalesce(1)), either locally or to S3.
if cnf_data.DATASET_STORE == "local":
    # Clear any previous artifact, write to TEMP, then move the output into ARTIFACTS.
    !rm -rf {config.ARTIFACTS}/wildfires
    df_wf.coalesce(1).write.mode("overwrite").format('json').save(f"{config.TEMP}/wildfires/")
    !mv {config.TEMP}/wildfires/ {config.ARTIFACTS}/
else:
    df_wf.coalesce(1).write.mode("overwrite").format('json').save(f"{dwh['s3']['bucket-1']['FQN']}/wildfires/")

## Dataset "Air Quality" 

In [None]:
# Resolve the location of the Air Quality dataset.
dataset_air_quality = cnf_data.dataset_air_quality()
print(f"Dataset source: {dataset_air_quality}")

In [None]:
# Read the Air Quality dataset from Parquet (the schema is taken from the files).
df_aq = spark.read.parquet(dataset_air_quality)

### Exploratory Analysis

In [None]:
# Per-column summary statistics, transposed so that each column becomes a row.
df_aq.describe().toPandas().transpose()

In [None]:
# aqi = df_aq.groupby("state_name")\
#             .agg(fn.avg("aqi").alias("avg_aqi"))\
#             .sort(fn.col("avg_aqi").desc())\
#             .toPandas()

Let's check for columns with missing values, and their percentage.

In [None]:
# Missing-value check per column; `threshold` presumably sets the minimum missing share reported — confirm in preprocess_fn.
fields_checked, missing_df = preprocess_fn.missing_fields_perc(df_aq, threshold=0.25)

In [None]:
# Report the missing-value summary (converted to pandas for display), or state that nothing is missing.
if len(missing_df.columns) == 0:
    print("There are no missing values in the given Dataframe.")
else:
    print(missing_df.toPandas().transpose())

Let's try to identify which states are missing values.

In [None]:
# Per-state number of rows whose `aqi` is NaN or null, most affected states first.
aqi_is_missing = fn.isnan("aqi") | fn.col("aqi").isNull()
amount_missing_df = (
    df_aq.groupby("state_name")
         .agg(fn.count(fn.when(aqi_is_missing, "aqi")).alias("missing_values"))
         .sort(fn.col("missing_values").desc())
         .toPandas()
)

In [None]:
# Five states with the most missing AQI values.
amount_missing_df.head(5)

It seems that the state of California has the most missing AQI values, i.e. it reports the least.

Let's find out the average AQI (Air Quality Index) per state.

In [None]:
# Mean AQI for each state, highest first, collected into pandas.
avg_aqi = fn.avg("aqi").alias("avg_aqi")
aqi_per_state = (
    df_aq.groupby("state_name")
         .agg(avg_aqi)
         .sort(fn.col("avg_aqi").desc())
         .toPandas()
)

In [None]:
# Ten states with the highest average AQI.
aqi_per_state.head(10)

In [None]:
# Number of duplicate rows, as reported by preprocess_fn.count_duplicates, versus the total.
r = preprocess_fn.count_duplicates(df_aq)
print(f"There are {r} duplicate rows (out of {df_aq.count()} total records).")

Unfortunately, the most important field in our analysis (the AQI) is also one of the impacted ones.

We could try different methods of imputing (filling in) these missing values, such as the `Imputer` class from the `pyspark.ml.feature` package; however, the AQI is a discrete index, so we do not impute it here.

In [None]:
# New partition columns, derived from the date of last change
part_cols = dict(
    part_year=fn.year(fn.col("date_of_last_change")),
    part_month=fn.month(fn.col("date_of_last_change")),
)

df_aq = df_aq.withColumn("part_year", part_cols["part_year"]) \
             .withColumn("part_month", part_cols["part_month"])

In [None]:
# Replace nulls in `method_code` with the sentinel -1 (string columns are filled in the next cell).
df_aq = df_aq.na.fill(-1, subset=["method_code"])

In [None]:
# Replace remaining nulls in string columns with empty strings.
df_aq = df_aq.na.fill("")

In [None]:
if cnf_data.DATASET_STORE == "local":
    !rm -rf {config.ARTIFACTS}/airquality
#     df_aq.write.mode("overwrite").partitionBy("part_year").csv("/tmp/airquality.csv")
#     df_aq.coalesce(1).write.format('json').save('/tmp/airquality')
    df_aq.coalesce(1).write.mode("overwrite").format('json').save(f"{config.TEMP}/airquality/")
    !mv /tmp/airquality {config.ARTIFACTS}/
else:
    # partitionBy fails, so we have to skip it :-()
    #df_aq.write.mode("overwrite").partitionBy("part_year").parquet(f"{dwh['s3']['bucket-1']['FQN']}/airquality")
#     df_aq.coalesce(1).write.format('json').save(f"{dwh['s3']['bucket-1']['FQN']}/airquality/")
    df_aq.coalesce(1).write.mode("overwrite").format('json').save(f"{dwh['s3']['bucket-1']['FQN']}/airquality/")


## Dataset "US droughts" 

In [10]:
# Resolve the location of the US droughts dataset.
# NOTE(review): `drougts` (sic) matches the function name in config/data.py; keep as-is unless renamed there.
dataset_us_drougts = cnf_data.dataset_us_drougts()
print(f"Dataset source: {dataset_us_drougts}")

Dataset source: /mtgp/UDACITY/artifacts/sample_us_droughts


In [11]:
# Read the US droughts dataset from Parquet (the schema is taken from the files).
df_droughts = spark.read.parquet(dataset_us_drougts)

### Exploratory Analysis 

In [12]:
# Column names of the droughts dataset.
df_droughts.columns

['releaseDate',
 'FIPS',
 'county',
 'state',
 'NONE',
 'D0',
 'D1',
 'D2',
 'D3',
 'D4',
 'validStart',
 'validEnd',
 'domStatisticFormatID',
 'county_cleaned']

In [12]:
# Per-column summary statistics, transposed so that each column becomes a row.
df_droughts.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
FIPS,141751,31418.57105769977,16231.042267302484,1001,72153
county,141751,,,Abbeville County,Ziebach County
state,141751,,,AK,WY
NONE,141751,60.9302231378358,47.10116246375515,0.0,100.0
D0,141751,39.069776861972194,47.10116246283229,0.0,100.0
D1,141751,22.504482648842068,40.30043959972282,0.0,100.0
D2,141751,12.022394056306913,31.143338089766747,0.0,100.0
D3,141751,5.12338346868,20.88409465365992,0.0,100.0
D4,141751,1.3942584530056623,10.906642966203608,0.0,100.0


In [13]:
# Missing-value check per column; `threshold` presumably sets the minimum missing share reported — confirm in preprocess_fn.
fields_checked, missing_df = preprocess_fn.missing_fields_perc(df_droughts, threshold=0.25)

In [14]:
# Report the missing-value summary (converted to pandas for display), or state that nothing is missing.
if len(missing_df.columns) == 0:
    print("There are no missing values in the given Dataframe.")
else:
    print(missing_df.toPandas().transpose())

There no missing values in the given Dataframe.


Even with `threshold` set to `0.0`, it seems that there are no empty records.

In [15]:
# Number of duplicate rows versus the total. The value comes from count_duplicates,
# so it is reported as duplicates (the message previously said "incomplete").
r = preprocess_fn.count_duplicates(df_droughts)
print(f"There are {r} duplicate rows (out of {df_droughts.count()} total records).")

There are 6913 rows incomplete (out of 141751 total records).


In [16]:
# Column(s) removed from the droughts dataset before export.
drop_columns = ["domStatisticFormatID"]

In [17]:
# Remove the columns listed in `drop_columns`.
df_droughts = df_droughts.drop(*drop_columns)

First, let's find out the number of drought records per state and per year.

In [18]:
# Number of drought records per (state, release year), ordered by state then year.
release_year = fn.year(fn.col("releaseDate")).alias("year")
pd_states_year_count = (
    df_droughts.groupby(["state", release_year])
               .agg(fn.count(fn.col("state")).alias("count"))
               .sort([fn.col("state"), fn.col("year")])
               .toPandas()
)

In [19]:
# Display the per-state / per-year record counts.
pd_states_year_count

Unnamed: 0,state,year,count
0,AK,2000,72
1,AK,2001,57
2,AK,2002,72
3,AK,2003,76
4,AK,2004,75
...,...,...,...
876,WY,2012,65
877,WY,2013,61
878,WY,2014,75
879,WY,2015,57


A pivot table will summarize the results better.

In [20]:
# State x year table of record counts. pandas >= 2.0 makes the `pivot` arguments keyword-only.
pivot_tbl = pd_states_year_count.pivot(index="state", columns="year", values="count")

In [21]:
# First five states of the pivot table.
pivot_tbl.head(5)

year,2000,2001,2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016
state,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1
AK,72.0,57.0,72.0,76.0,75.0,60.0,78.0,60.0,71.0,67.0,56.0,70.0,72.0,70.0,81.0,81.0,69.0
AL,164.0,179.0,179.0,178.0,180.0,174.0,168.0,155.0,179.0,168.0,170.0,166.0,168.0,167.0,169.0,175.0,135.0
AR,206.0,224.0,200.0,205.0,174.0,186.0,190.0,179.0,182.0,193.0,217.0,193.0,197.0,184.0,214.0,183.0,176.0
AZ,38.0,39.0,42.0,38.0,28.0,41.0,38.0,39.0,40.0,37.0,38.0,37.0,49.0,41.0,34.0,48.0,39.0
CA,161.0,146.0,175.0,141.0,149.0,157.0,144.0,167.0,148.0,147.0,145.0,150.0,162.0,143.0,133.0,159.0,127.0


In [22]:
# New partition columns, derived from each record's release date
release_date = fn.col("releaseDate")
part_cols = {"part_year": fn.year(release_date), "part_month": fn.month(release_date)}

for name, expr in part_cols.items():
    df_droughts = df_droughts.withColumn(name, expr)

In [26]:
# Defensive: replace nulls in the `NONE` column with -1.0 (the missing-value check above reported none).
df_droughts = df_droughts.na.fill(-1.0, subset=["NONE"])

In [27]:
# Replace remaining nulls in string columns with empty strings.
df_droughts = df_droughts.na.fill("")

In [None]:
# Export the droughts as JSON in a single partition (coalesce(1)), either locally or to S3.
if cnf_data.DATASET_STORE == "local":
    # Clear any previous artifact, write to TEMP, then move the output into ARTIFACTS.
    !rm -rf {config.ARTIFACTS}/droughts
#     df_droughts.write.mode("overwrite").partitionBy("part_year").parquet("/tmp/droughts")
    df_droughts.coalesce(1).write.mode("overwrite").format('json').save(f"{config.TEMP}/droughts/")
    !mv {config.TEMP}/droughts/ {config.ARTIFACTS}/
else:
#     df_droughts.write.mode("overwrite").parquet(f"{dwh['s3']['bucket-1']['FQN']}/droughts")
    df_droughts.coalesce(1).write.mode("overwrite").format('json').save(f"{dwh['s3']['bucket-1']['FQN']}/droughts")

## Dataset "Global Temperatures" 

In [None]:
# Resolve the location of the Global Temperatures dataset.
dataset_global_temps = cnf_data.dataset_global_temps()
print(f"Dataset source: {dataset_global_temps}")

In [None]:
# Read the Global Temperatures dataset from Parquet (the schema is taken from the files).
df_temps = spark.read.parquet(dataset_global_temps)

### Exploratory Analysis

In [None]:
# Column names of the temperatures dataset.
df_temps.columns

In [None]:
# Per-column summary statistics, transposed so that each column becomes a row.
df_temps.describe().toPandas().transpose()

In [None]:
# Missing-value check per column; `threshold` presumably sets the minimum missing share reported — confirm in preprocess_fn.
fields_checked, missing_df = preprocess_fn.missing_fields_perc(df_temps, threshold=0.25)

In [None]:
# Report the missing-value summary (converted to pandas for display), or state that nothing is missing.
if len(missing_df.columns) == 0:
    print("There are no missing values in the given Dataframe.")
else:
    print(missing_df.toPandas().transpose())

Even with `threshold` set to `0.0`, it seems that there are no empty records.

In [None]:
# Number of duplicate rows versus the total. The value comes from count_duplicates,
# so it is reported as duplicates (the message previously said "incomplete").
r = preprocess_fn.count_duplicates(df_temps)
print(f"There are {r} duplicate rows (out of {df_temps.count()} total records).")

Let's check the average temperature per state and per year.

In [None]:
# Mean `AverageTemperature` per (State, year of `dt`), ordered by state then year.
year_of_dt = fn.year(fn.col("dt")).alias("year")
mean_temp = fn.avg(fn.col("AverageTemperature")).alias("avg_temp")
pd_states_year_count = (
    df_temps.groupby(["State", year_of_dt])
            .agg(mean_temp)
            .sort([fn.col("State"), fn.col("year")])
            .toPandas()
)

Again, a pivot table creates a readable summary of the results

In [None]:
# State x year table of mean temperatures. pandas >= 2.0 makes the `pivot` arguments keyword-only.
pivot_tbl = pd_states_year_count.pivot(index="State", columns="year", values="avg_temp")

In [None]:
# First ten states of the pivot table.
pivot_tbl.head(10)

After some exploration, we can conclude that the `NaN`s actually represent missing rows for specific years. So we will insert dummy records, filled with zeros, for the states that do not have these records.

We notice lots of missing values; let's try to impute them using the average temperature per state. We could also take the average temperature for each state per month across all the yearly recordings.

More sophisticated strategies include building a linear regression model (although this depends on the collinearity of the variables), and incorporating more data sources and observations.

In [None]:
# Working Dataframe; replaced below by the filled / imputed versions when those steps are enabled.
df = df_temps

In [None]:
# First, let's fill in the missing records
# Please consult the setting `config/data.py:DATESET_FILLIN_DT`.

In [None]:
if cnf_data.DATESET_FILLIN_DT:
    all_states = df_temps.select("State").distinct().collect()

    max_year, min_year = df_temps.select(fn.year(fn.col("dt")).alias("year"))\
                                .agg(
                                    fn.max(fn.col("year")).alias("max"),
                                    fn.min(fn.col("year")).alias("min")
                                ).collect()[0]

    temps_period = pd.period_range(start=min_year, end=max_year).to_native_types().tolist()

    new_dfs = []

    for state in all_states:
        df_state = df_temps.filter(fn.col("State") == state[0])
        df_dates = df_state.select(fn.date_format(fn.col("dt"), "YYYY-MM-dd").alias("dt_formatted")).sort(fn.col("dt_formatted")).distinct()

        state_periods = list(map(lambda row: row['dt_formatted'], df_dates.collect()))

        # Find the differences between the complete periods in the dataset, and the periods for the current state.
        set_full = set(temps_period)
        set_state = set(state_periods)
        diff = list(set_full - set_state)

        # Generate new rows
        make_rows = list(map(lambda dt: [
            datetime.datetime.strptime(dt, '%Y-%m-%d').date(),
            0.0,
            0.0,
            state[0],
    #         1,
    #         1
            datetime.datetime.strptime(dt, '%Y-%m-%d').year,
            datetime.datetime.strptime(dt, '%Y-%m-%d').month
        ], diff)
        )
        df_new_rows = spark.createDataFrame(make_rows, schema=df_temps.schema)
    #     df_temps = df_temps.union(df_new_rows)

        new_dfs.append(df_new_rows)

    new_df_temps = unionAll(*new_dfs)
    new_df_temps = df_temps.union(new_df_temps)

    if cnf_data.DATASET_STORE == "local":
        new_df_temps.write.mode("overwrite").parquet("/tmp/filled_temperatures")
        !mv /tmp/filled_temperatures/ {config.ARTIFACTS}/
    else:
        new_df_temps.write.mode("overwrite").parquet(f"s3a://{dwh['s3']['init_bucket']}/filled_temperatures")
        
    df = new_df_temps


In [None]:
# Now, let's impute the missing values; you may skip this step if it's taking too long
# Please consult the setting `config/data.py:DATASET_IMPUTE`.

In [None]:
if cnf_data.DATASET_IMPUTE:
    from pyspark.ml.feature import Imputer
    
    all_states = df.select("State").distinct().collect()
    
    new_dfs = []
    for state in all_states:
        imputer = Imputer(strategy="mean", inputCols=["AverageTemperature"], outputCols=["AverageTemperature_imputed"])
        df_state = df.filter(fn.col("State") == state[0])
        imputed_state = imputer.fit(df_state).transform(df_state)

        new_dfs.append(imputed_state)
        
    imputed_dfs = unionAll(*new_dfs)
    
    if cnf_data.DATASET_STORE == "local":
        !rm -rf {config.ARTIFACTS}/imputed_temperatures/
        imputed_dfs.coalesce(1).write.mode("overwrite").format('json').save(f"{config.TEMP}/imputed_temperatures/")
        !mv {config.TEMP}/imputed_temperatures/ {config.ARTIFACTS}/
    else:
        imputed_dfs.coalesce(1).write.mode("overwrite").format('json').save(f"{dwh['s3']['bucket-1']['FQN']}/imputed_temperatures")
                                                                            
    df = imputed_dfs

In [None]:
# Yearly mean temperature per state. `AverageTemperature_imputed` only exists
# when the imputation step (DATASET_IMPUTE) ran, so fall back to the raw column otherwise.
temp_col = "AverageTemperature_imputed" if "AverageTemperature_imputed" in df.columns else "AverageTemperature"
pd_states_year_count = df.groupby(["State", fn.year(fn.col("dt")).alias("year")])\
                    .agg(fn.avg(fn.col(temp_col)).alias("avg_temp"))\
                    .sort([fn.col("State"), fn.col("year")])\
                    .toPandas()

In [None]:
# State x year table of mean temperatures. pandas >= 2.0 makes the `pivot` arguments keyword-only.
pivot_tbl = pd_states_year_count.pivot(index="State", columns="year", values="avg_temp")

In [None]:
# First five states of the pivot table.
pivot_tbl.head(5)

In [None]:
# New partition columns, derived from the measurement date `dt`
measured_on = fn.col("dt")
part_cols = dict([
    ("part_year", fn.year(measured_on)),
    ("part_month", fn.month(measured_on)),
])

df = df.withColumn("part_year", part_cols["part_year"]) \
       .withColumn("part_month", part_cols["part_month"])

In [None]:
# Replace missing temperatures and uncertainties with an extreme sentinel value,
# so that these fields are not left out of the dumped JSON

df = df.na.fill(-10000.0, subset=["AverageTemperatureUncertainty", "AverageTemperature"])

In [None]:
# Export the temperatures as JSON in a single partition (coalesce(1)), either locally or to S3.
if cnf_data.DATASET_STORE == "local":
    # Clear any previous artifact, write to TEMP, then move the output into ARTIFACTS.
    !rm -rf {config.ARTIFACTS}/temperatures
    df.coalesce(1).write.mode("overwrite").format('json').save(f"{config.TEMP}/temperatures/")
    !mv {config.TEMP}/temperatures/ {config.ARTIFACTS}/
else:
    df.coalesce(1).write.mode("overwrite").format('json').save(f"{dwh['s3']['bucket-1']['FQN']}/temperatures")

# Upload to S3

In [None]:
# NOTE(review): redundant — 'common/' is already on sys.path and aws_dwh was imported
# in the Imports cell; presumably kept so this section can run on its own.
sys.path.append("common/")
import aws_dwh

## Upload JSON Paths to S3 

In [None]:
# Upload the Redshift JSON-paths location (config.REDSHIFT_JSON_PATHS) to S3.
dirs = [
    f"{config.REDSHIFT_JSON_PATHS}"
]

for d in dirs:
    aws_dwh.upload_to_s3(d, dwh)

## Upload local artifacts to S3 

In [None]:
if cnf_data.DATASET_STORE == "local":
    import os

    # Local artifact directories produced by the export cells above.
    dirs = [
        f"{config.ARTIFACTS}/wildfires",
        f"{config.ARTIFACTS}/imputed_temperatures",
        f"{config.ARTIFACTS}/temperatures",
        f"{config.ARTIFACTS}/airquality",
        f"{config.ARTIFACTS}/droughts"
    ]

    for d in dirs:
        # `imputed_temperatures` is only written when DATASET_IMPUTE is enabled;
        # skip directories that were not produced rather than passing a missing
        # path to upload_to_s3.
        if not os.path.isdir(d):
            print(f"Skipping {d}: directory not found.")
            continue
        aws_dwh.upload_to_s3(d, dwh)