# People Names and Storms, Any Correlation?
### Data Engineering Capstone Project

#### Project Summary
Build an ETL pipeline that pulls publicly available state, baby name, and storm datasets into an S3 bucket, processes them with Spark, and writes the results back to S3 as a set of dimension and fact tables.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
sc.install_pypi_package("pandas==0.23.2")
sc.install_pypi_package("geopy==2.2.0")
sc.install_pypi_package("certifi==2019.11.28")
sc.install_pypi_package("xlrd==0.9.4")
sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")

Starting Spark application

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1666040033804_0003,pyspark,idle,Link,Link,✔

SparkSession available as 'spark'.

Collecting pandas==0.23.2
  Using cached pandas-0.23.2-cp36-cp36m-manylinux1_x86_64.whl (8.9 MB)
Collecting python-dateutil>=2.5.0
  Using cached python_dateutil-2.8.2-py2.py3-none-any.whl (247 kB)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-0.23.2 python-dateutil-2.8.2

Collecting geopy==2.2.0
  Using cached geopy-2.2.0-py3-none-any.whl (118 kB)
Collecting geographiclib<2,>=1.49
  Using cached geographiclib-1.52-py3-none-any.whl (38 kB)
Installing collected packages: geographiclib, geopy
Successfully installed geographiclib-1.52 geopy-2.2.0

Collecting certifi==2019.11.28
  Using cached certifi-2019.11.28-py2.py3-none-any.whl (156 kB)
Installing collected packages: certifi
Successfully installed certifi-2019.11.28

Collecting xlrd==0.9.4
  Downloading xlrd-0.9.4-py3-none-any.whl (143 kB)
Installing collected packages: xlrd
Successfully installed xlrd-0.9.4

In [2]:
import configparser
import datetime
import os
import time
import logging
from pyspark.sql import SparkSession, DataFrame
from pyspark.context import SparkContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.window import *
from functools import reduce
import pandas as pd
import numpy as np
from pyspark.sql.types import StructType as R, StructField as Fld

bucket = 's3a://jccapstonedata/'

### Step 1: Scope the Project and Gather Data

#### Scope 
* Extract data on various storms and join it with the SSA baby name and state datasets. The storm data undergoes some initial transformations with pandas to make it readily available for further processing in PySpark.
* The raw datasets are uploaded to S3, explored in greater detail with Apache Spark and pandas, and then re-uploaded to S3 as a set of dimension and fact tables.
* The end goal is to provide enough information for an analyst to be able to answer questions regarding naming patterns between people and storms, popular names by state over a range of years, and general information regarding storms through the same range of years. 

#### Describe and Gather Data 
1. HURDAT2: Comes from the Hurricane Research Division (HRD) of the Atlantic Oceanographic and Meteorological Laboratory (AOML) and provides storm name, status, maximum sustained wind speed, location(s) over a storm's course, and other information (~55,000 rows)
2. Name: Comes from the Social Security Administration (SSA) and provides birth name, birth year, birth state, sex, and name popularity (~6.3 million rows)
3. State: Comes from Kaggle and provides state name, code, region, and division (51 rows)
4. Saffir-Simpson Hurricane Wind Scale: Comes from the National Hurricane Center and provides a hurricane's category based on its maximum sustained wind speed (5 rows)

### Create Saffir-Simpson Hurricane Wind Scale

In [3]:
data = [{'category': '1', 'min_sustained_wind_kt': 64, 'max_sustained_wind_kt': 82, 'brief_damage_description': 'Power outages that could last a few to several days.'}, 
            {'category': '2', 'min_sustained_wind_kt': 83, 'max_sustained_wind_kt': 95, 'brief_damage_description': 'Near-total power loss is expected with outages that could last from several days to weeks.'},
            {'category': '3-MAJOR', 'min_sustained_wind_kt': 96, 'max_sustained_wind_kt': 112, 'brief_damage_description': 'Electricity and water will be unavailable for several days to weeks after the storm passes.'},
            {'category': '4-MAJOR', 'min_sustained_wind_kt': 113, 'max_sustained_wind_kt': 136, 'brief_damage_description': 'Catastrophic damage will occur; most of the area will be uninhabitable for weeks or months.'},
            {'category': '5-MAJOR', 'min_sustained_wind_kt': 137, 'max_sustained_wind_kt': 1000000, 'brief_damage_description': 'Catastrophic damage will occur; most of the area will be uninhabitable for weeks or months.'}]

In [4]:
pd_wind_scale = pd.DataFrame(data)

wind_scale = pd_wind_scale[['category', 'min_sustained_wind_kt', 'max_sustained_wind_kt', 'brief_damage_description']]

In [5]:
spark_wind_scale = spark.createDataFrame(data)

saffir_simpson_hurricane_wind_scale = spark_wind_scale.select('category', 'min_sustained_wind_kt', 'max_sustained_wind_kt', 
                                                             'brief_damage_description')

In [6]:
saffir_simpson_hurricane_wind_scale.write.mode('overwrite').parquet(os.path.join(bucket, 'transformed_data/dim_tables/wind_scale.parquet'))
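
The wind-scale table uses inclusive knot ranges with no gaps between categories. As a minimal stdlib sketch of how a single wind speed maps onto it (the helper name `saffir_simpson_category` is hypothetical and not part of the pipeline), a binary search over the lower bounds is enough; speeds below 64 kt return `None`, whereas the pipeline keeps the storm status for those rows.

```python
from bisect import bisect_right

# Lower bounds (kt) and labels copied from the wind-scale table above
MIN_WIND_KT = [64, 83, 96, 113, 137]
CATEGORIES = ['1', '2', '3-MAJOR', '4-MAJOR', '5-MAJOR']


def saffir_simpson_category(max_sustained_wind_kt):
    """Hypothetical helper: Saffir-Simpson category for a wind speed in knots, or None below 64 kt."""
    # bisect_right returns how many lower bounds are <= the wind speed
    idx = bisect_right(MIN_WIND_KT, max_sustained_wind_kt)
    return CATEGORIES[idx - 1] if idx > 0 else None


print(saffir_simpson_category(80))    # 1
print(saffir_simpson_category(96))    # 3-MAJOR
print(saffir_simpson_category(-999))  # None (HURDAT2 missing-value sentinel)
```

Searching the lower bounds only works because the bands are contiguous; the 1,000,000 kt upper bound in the table is just a sentinel for "no upper limit".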

### Read Storm Data

In [7]:
raw_storm_data = spark.read.csv(os.path.join(bucket, '*', 'raw_storm_data.csv'), header=True)
raw_storm_data.show(5)

+---+--------+-------------------+-------+----+------+-------+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|_c0|       0|                  1|      2|   3|     4|      5|   6|    7|    8|    9|   10|   11|   12|   13|   14|   15|   16|   17|   18|   19|   20|
+---+--------+-------------------+-------+----+------+-------+----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|  0|AL011851|            UNNAMED|     14|null|  null|   null|null| null| null| null| null| null| null| null| null| null| null| null| null| null| null|
|  1|18510625|               0000|       |  HU| 28.0N|  94.8W|  80| -999| -999| -999| -999| -999| -999| -999| -999| -999| -999| -999| -999| -999| -999|
|  2|18510625|               0600|       |  HU| 28.0N|  95.4W|  80| -999| -999| -999| -999| -999| -999| -999| -999| -999| -999| -999| -999| -999| -999|
|  3|18510625|               1200|       |  HU| 28.0N|  96.0W|  80| -999| -999| -999| -9

### Read Names by State Data

In [8]:
raw_name_data = spark.read.text(os.path.join(bucket, '*', '*', '*.TXT'))
raw_name_data.show(5)

+--------------------+
|               value|
+--------------------+
|  CA,F,1910,Mary,295|
| CA,F,1910,Helen,239|
|CA,F,1910,Dorothy...|
|CA,F,1910,Margare...|
|CA,F,1910,Frances...|
+--------------------+
only showing top 5 rows

### Read State Data

In [9]:
raw_state_data = spark.read.json(os.path.join(bucket, '*', 'states.json'))
raw_state_data.show(5)

+------------------+------+----------+----------+
|          Division|Region|     State|State Code|
+------------------+------+----------+----------+
|           Pacific|  West|    Alaska|        AK|
|East South Central| South|   Alabama|        AL|
|West South Central| South|  Arkansas|        AR|
|          Mountain|  West|   Arizona|        AZ|
|           Pacific|  West|California|        CA|
+------------------+------+----------+----------+
only showing top 5 rows

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues.

##### Check for missing values

In [10]:
def missing_values(df):
    """Show a one-row table with the number of null or NaN values in each column of df."""
    df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c)
               for c in df.columns]).show()

##### Check for duplicates

In [11]:
def duplicates(df):
    """Return the number of distinct rows in df that occur more than once."""
    return df.groupBy(df.columns).count().filter("count > 1").count()

##### Check for missing values and duplicates in name data

In [12]:
print('Missing values in baby name dataset')
missing_values(raw_name_data)

Missing values in baby name dataset
+-----+
|value|
+-----+
|    0|
+-----+

In [13]:
print('Total duplicates in baby name dataset:', duplicates(raw_name_data))

Total duplicates in baby name dataset: 0

##### Check for missing values and duplicates in state data

In [14]:
print('Missing values in states dataset')
missing_values(raw_state_data)

Missing values in states dataset
+--------+------+-----+----------+
|Division|Region|State|State Code|
+--------+------+-----+----------+
|       0|     0|    0|         0|
+--------+------+-----+----------+

In [15]:
print('Total duplicates in states dataset:', duplicates(raw_state_data))

Total duplicates in states dataset: 0

##### Check for missing values and duplicates in raw storm data

In [16]:
print('Missing values in storm dataset')
missing_values(raw_storm_data)

Missing values in storm dataset
+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|_c0|  0|  1|  2|   3|   4|   5|   6|   7|   8|   9|  10|  11|  12|  13|  14|  15|  16|  17|  18|  19|  20|
+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  0|  0|  0|  0|1933|1936|1936|1936|1936|1936|1936|1936|1936|1936|1936|1936|1936|1936|1936|1936|1936|1936|
+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+

In [17]:
print('Total duplicates in storm dataset:', duplicates(raw_storm_data))

Total duplicates in storm dataset: 0
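
Because `duplicates` groups on every column and counts the groups with `count > 1`, it reports how many distinct rows are duplicated, not how many redundant copies exist. A small stdlib sketch of both measures (the helper name `duplicate_stats` and the sample rows are hypothetical) makes the difference concrete:

```python
from collections import Counter


def duplicate_stats(rows):
    """Hypothetical helper: (distinct rows that repeat, total redundant copies)."""
    counts = Counter(rows)
    repeated = [n for n in counts.values() if n > 1]
    # len(repeated) mirrors groupBy(...).count().filter("count > 1").count()
    return len(repeated), sum(n - 1 for n in repeated)


rows = [('CA', 'F', '1910', 'Mary', '295')] * 3 + [('AK', 'M', '1910', 'Carl', '5')]
print(duplicate_stats(rows))  # (1, 2)
```

Both numbers are 0 whenever no group repeats, so for the three datasets above the distinction does not change the conclusion.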

#### Cleaning Steps
Very little cleanup is necessary. A closer look shows that the missing values in the storm dataset come from the header rows, which have fewer columns than the data rows.

In [18]:
print(raw_storm_data.filter(raw_storm_data['3'].isNull()).count())
raw_storm_data.filter(raw_storm_data['3'].isNull()).show(10)

1933
+---+--------+-------------------+-------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|_c0|       0|                  1|      2|   3|   4|   5|   6|   7|   8|   9|  10|  11|  12|  13|  14|  15|  16|  17|  18|  19|  20|
+---+--------+-------------------+-------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  0|AL011851|            UNNAMED|     14|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|
| 15|AL021851|            UNNAMED|      1|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|
| 17|AL031851|            UNNAMED|      1|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|
| 19|AL041851|            UNNAMED|     49|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|
| 69|AL051851|            UNNAMED|     16|null|null|null|null|nu
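
In HURDAT2, each storm starts with a short header line (storm id, name, number of best-track entries) followed by that many data lines with many more comma-separated fields, which is why the header rows show nulls from column 3 onward. A stdlib sketch that groups raw lines under their headers, assuming the line layout shown above (the `parse_hurdat2` name is hypothetical and the sample lines are abbreviated):

```python
def parse_hurdat2(lines):
    """Hypothetical helper: group raw HURDAT2 lines by storm id.

    Returns {storm_id: {'name': str, 'entries': int, 'rows': [list of fields]}}.
    """
    storms = {}
    current = None
    for line in lines:
        fields = [f.strip() for f in line.split(',')]
        if fields[-1] == '':
            fields = fields[:-1]  # drop the empty field left by the trailing comma
        if fields[0][:2].isalpha():
            # header line: storm id (e.g. AL011851), name, number of data lines
            current = fields[0]
            storms[current] = {'name': fields[1], 'entries': int(fields[2]), 'rows': []}
        elif current is None:
            raise ValueError('data line found before any header line')
        else:
            storms[current]['rows'].append(fields)
    return storms


# abbreviated sample: real data lines carry more fields after the pressure
sample = [
    'AL011851,            UNNAMED,     14,',
    '18510625, 0000,  , HU, 28.0N,  94.8W,  80, -999,',
    '18510625, 0600,  , HU, 28.0N,  95.4W,  80, -999,',
]
storm = parse_hurdat2(sample)['AL011851']
print(storm['name'], storm['entries'], len(storm['rows']))  # UNNAMED 14 2
```

On the full file, `len(storm['rows'])` should equal `storm['entries']` for every storm, which makes a cheap integrity check.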

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

![Schema](Capstone_Schema.png)

#### 3.2 Mapping Out Data Pipelines
At a high level, the transformations are done in both pandas and Spark, because Spark performed poorly in certain cases.

As a data engineer, I need to:
1. Combine storm data row-wise and remove unnecessary columns
   * Make separate header and statistics dataframes and join them to create unique rows for storm data
2. Filter out unnamed storms for storm and baby name analysis
3. Break down the storm id to get years for storm and baby name analysis
4. Create a wind scale table to give storms typed as hurricanes a category for further analysis
5. Combine storm and name data on name to analyze potential trends
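
Step 3 relies on the fixed-width HURDAT2 storm id, for example `AL011851`: a two-letter basin, a two-digit ATCF cyclone number, and a four-digit year. A plain-Python sketch of that split (the `split_storm_id` name is hypothetical; the Spark pipeline does the same with 1-based `F.substring` calls):

```python
def split_storm_id(storm_id):
    """Hypothetical helper: split 'AL011851' into (basin, cyclone number, year)."""
    storm_id = storm_id.strip()
    if len(storm_id) != 8:
        raise ValueError(f'unexpected storm id: {storm_id!r}')
    basin = storm_id[0:2]             # F.substring(storm_id, 1, 2)
    cyclone_num = int(storm_id[2:4])  # F.substring(storm_id, 3, 2)
    year = int(storm_id[4:8])         # F.substring(storm_id, 5, 4)
    return basin, cyclone_num, year


print(split_storm_id('AL011851'))  # ('AL', 1, 1851)
```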

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

### Baby Names by State Table

In [19]:
split_col = F.split(raw_name_data.value, ',')

# each line is "state_code,sex,birth_year,birth_name,count"
names_by_state = raw_name_data \
    .withColumn('state_code', split_col.getItem(0)) \
    .withColumn('sex', split_col.getItem(1)) \
    .withColumn('birth_year', split_col.getItem(2).cast(IntegerType())) \
    .withColumn('birth_name', split_col.getItem(3)) \
    .withColumn('count', split_col.getItem(4).cast(IntegerType())) \
    .drop('value')

# surrogate key ordered by year, state, and name; a window without partitionBy
# columns moves all rows into a single partition, so a prior orderBy is not needed
names_by_state = names_by_state \
    .withColumn('name_id', F.row_number().over(Window.orderBy('birth_year', 'state_code', 'birth_name'))) \
    .withColumn('decade', (F.floor(F.col('birth_year') / 10) * 10).cast('int')) \
    .withColumn('birth_name', F.upper(F.col('birth_name')))

baby_names_by_state = names_by_state.select('name_id', 'birth_name', 'birth_year', 'state_code', 'sex', 'count', 'decade')

In [20]:
baby_names_by_state.write.partitionBy('state_code', 'birth_year').mode(
        'overwrite').parquet(os.path.join(bucket, 'transformed_data/dim_tables/names_by_state.parquet'))

In [21]:
baby_names_by_state.show(5)

+-------+----------+----------+----------+---+-----+------+
|name_id|birth_name|birth_year|state_code|sex|count|decade|
+-------+----------+----------+----------+---+-----+------+
|      1|      ANNA|      1910|        AK|  F|   10|  1910|
|      2|     ANNIE|      1910|        AK|  F|   12|  1910|
|      3|      CARL|      1910|        AK|  M|    5|  1910|
|      4|   DOROTHY|      1910|        AK|  F|    5|  1910|
|      5|    EDWARD|      1910|        AK|  M|    5|  1910|
+-------+----------+----------+----------+---+-----+------+
only showing top 5 rows
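
Each line of the SSA state files is a comma-separated record `state,sex,year,name,count`, which is what the `split`/`getItem` chain above unpacks. A stdlib sketch of the same parsing plus the decade bucket, handy for unit-testing the logic outside Spark (the `parse_name_record` name is hypothetical):

```python
def parse_name_record(line):
    """Hypothetical helper: parse an SSA state-file line such as 'CA,F,1910,Mary,295'."""
    state_code, sex, birth_year, birth_name, count = line.strip().split(',')
    year = int(birth_year)
    return {
        'birth_name': birth_name.upper(),  # upper-cased to match storm names
        'birth_year': year,
        'state_code': state_code,
        'sex': sex,
        'count': int(count),
        'decade': (year // 10) * 10,       # same as floor(birth_year / 10) * 10
    }


print(parse_name_record('CA,F,1910,Mary,295'))
```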

### States table

In [22]:
state_data = raw_state_data.withColumnRenamed('State', 'state') \
.withColumnRenamed('State Code', 'state_code') \
.withColumnRenamed('Region', 'region') \
.withColumnRenamed('Division', 'division') \
.withColumn('state_id', F.row_number().over(Window.partitionBy().orderBy('state', 'state_code')))

state_data = state_data.select('state_id', 'state_code', 'state', 'region', 'division')

In [23]:
state_data.write.mode('overwrite').parquet(os.path.join(bucket, 'transformed_data/dim_tables/state_data.parquet'))

In [24]:
state_data.show(5)

+--------+----------+----------+------+------------------+
|state_id|state_code|     state|region|          division|
+--------+----------+----------+------+------------------+
|       1|        AL|   Alabama| South|East South Central|
|       2|        AK|    Alaska|  West|           Pacific|
|       3|        AZ|   Arizona|  West|          Mountain|
|       4|        AR|  Arkansas| South|West South Central|
|       5|        CA|California|  West|           Pacific|
+--------+----------+----------+------+------------------+
only showing top 5 rows

### Atlantic Storms Table

Atlantic storms header table (helper)

In [25]:
# header rows carry a storm id such as "AL011851" in the first column
identified_storms = raw_storm_data.filter(raw_storm_data['0'].contains('AL')) \
    .withColumnRenamed('0', 'storm_id') \
    .withColumnRenamed('1', 'storm_name') \
    .withColumnRenamed('2', 'entries') \
    .drop(*[str(i) for i in range(3, 21)])

# storm_id layout: 2-letter basin, 2-digit ATCF cyclone number, 4-digit year
atlantic_storms_by_year = identified_storms.withColumn('storm_id', F.trim(identified_storms.storm_id)) \
    .withColumn('storm_name', F.trim(F.upper(F.col('storm_name')))) \
    .withColumn('entries', F.trim(identified_storms.entries).cast(IntegerType())) \
    .withColumn('basin', F.substring(F.col('storm_id'), 1, 2)) \
    .withColumn('atcf_cyclone_num', F.substring(F.col('storm_id'), 3, 2).cast(IntegerType())) \
    .withColumn('storm_year', F.substring(F.col('storm_id'), 5, 4).cast(IntegerType())) \
    .withColumn('header_id', identified_storms._c0.cast(IntegerType())) \
    .drop('_c0') \
    .dropDuplicates()

atlantic_storms_header = atlantic_storms_by_year.orderBy(F.asc('header_id')) \
    .select('header_id', 'storm_id', 'storm_name', 'entries', 'basin', 'atcf_cyclone_num', 'storm_year')

In [26]:
atlantic_storms_header.write.mode('overwrite').parquet(os.path.join(bucket, 'transformed_data/dim_tables/storm_data/storm_headers.parquet'))

In [27]:
atlantic_storms_header.show(5)

+---------+--------+----------+-------+-----+----------------+----------+
|header_id|storm_id|storm_name|entries|basin|atcf_cyclone_num|storm_year|
+---------+--------+----------+-------+-----+----------------+----------+
|        0|AL011851|   UNNAMED|     14|   AL|               1|      1851|
|       15|AL021851|   UNNAMED|      1|   AL|               2|      1851|
|       17|AL031851|   UNNAMED|      1|   AL|               3|      1851|
|       19|AL041851|   UNNAMED|     49|   AL|               4|      1851|
|       69|AL051851|   UNNAMED|     16|   AL|               5|      1851|
+---------+--------+----------+-------+-----+----------------+----------+
only showing top 5 rows

Atlantic storms stats table (helper)

In [28]:
spark_storm_stats = raw_storm_data.filter(~raw_storm_data['0'].contains('AL')) \
.withColumnRenamed('0', 'date') \
.withColumnRenamed('1', 'time') \
.withColumnRenamed('2', 'record_identifier') \
.withColumnRenamed('3', 'storm_status') \
.withColumnRenamed('4', 'latitude') \
.withColumnRenamed('5', 'longitude') \
.withColumnRenamed('6', 'max_sustained_wind_kt')\
.withColumnRenamed('7', 'min_pressure_mbar') \
.drop('8', '9', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20')

# trim the padded fields; HURDAT2 dates are stored as yyyyMMdd
spark_storm_stats = spark_storm_stats.withColumn('storm_date', F.to_date(F.trim(spark_storm_stats.date), 'yyyyMMdd')) \
    .withColumn('storm_time', F.trim(spark_storm_stats.time)) \
    .withColumn('record_identifier', F.trim(spark_storm_stats.record_identifier)) \
    .withColumn('storm_status', F.trim(spark_storm_stats.storm_status)) \
    .withColumn('max_sustained_wind_kt', F.trim(spark_storm_stats['max_sustained_wind_kt']).cast(IntegerType())) \
    .withColumn('min_pressure_mbar', F.trim(spark_storm_stats['min_pressure_mbar']).cast(IntegerType())) \
    .withColumn('stat_id', spark_storm_stats._c0.cast(IntegerType()))

# extract year, month, and day from the parsed date
spark_storm_stats = spark_storm_stats.withColumn('storm_year', F.year(spark_storm_stats.storm_date).cast(IntegerType())) \
    .withColumn('storm_month', F.month(spark_storm_stats.storm_date).cast(IntegerType())) \
    .withColumn('storm_day', F.dayofmonth(spark_storm_stats.storm_date).cast(IntegerType()))

spark_storm_stats = spark_storm_stats.select('stat_id', 'storm_date', 'storm_year', 'storm_month', 'storm_day', 'storm_time', 'record_identifier', 
                           'storm_status', 'latitude', 'longitude', 'max_sustained_wind_kt', 'min_pressure_mbar')

atlantic_storm_stats = spark_storm_stats.toPandas()

# transform storm_status and record_identifier in atlantic_storm_stats
atlantic_storm_stats.storm_status = atlantic_storm_stats.storm_status.str.strip() \
.map({'HU': 'hurricane', 'TS': 'tropical storm', 'EX': 'extratropical cyclone',
     'TD': 'tropical depression', 'LO': 'low pressure system', 'DB': 'disturbance', 
      'SD': 'subtropical depression', 'SS': 'subtropical storm', 'WV': 'tropical wave'})

atlantic_storm_stats.record_identifier = atlantic_storm_stats.record_identifier.str.strip()  \
.map({'': '', 'L': 'landfall', 'R': 'intensity details with rapid changes',
      'I': 'pressure and wind intensity peak', 'P': 'min central pressure',
      'T': 'clarify track detail', 'W': 'max sustained wind speed',
      'C': 'approach to coast, no landfall', 'S': 'status change in system',
      'G': 'genesis of the system'})

# convert "28.0N" / "94.8W" strings to signed decimal degrees (S and W are negative)
lat_values = pd.to_numeric(atlantic_storm_stats['latitude'].str[:-1])
long_values = pd.to_numeric(atlantic_storm_stats['longitude'].str[:-1])

atlantic_storm_stats['latitude'] = np.where(
    atlantic_storm_stats['latitude'].str[-1:] == 'N', lat_values, -lat_values)
atlantic_storm_stats['longitude'] = np.where(
    atlantic_storm_stats['longitude'].str[-1:] == 'E', long_values, -long_values)

# get hurricane category by max sustained wind (kt): hurricane-strength winds get a
# Saffir-Simpson category, weaker systems keep their storm status as the category
atlantic_storm_stats['storm_category'] = np.where(
    atlantic_storm_stats['max_sustained_wind_kt'] < wind_scale['min_sustained_wind_kt'][0],
    atlantic_storm_stats['storm_status'], 'uncategorized')

for _, band in wind_scale.iterrows():
    # Series.between is inclusive on both ends, matching the scale's kt ranges
    in_band = atlantic_storm_stats['max_sustained_wind_kt'].between(
        band['min_sustained_wind_kt'], band['max_sustained_wind_kt'])
    atlantic_storm_stats['storm_category'] = np.where(
        in_band, band['category'], atlantic_storm_stats['storm_category'])

pd_storm_stats = atlantic_storm_stats[['stat_id', 'storm_date', 'storm_year', 'storm_month', 'storm_day', 'storm_time', 
      'record_identifier', 'storm_status', 'storm_category', 'latitude', 'longitude', 'max_sustained_wind_kt', 'min_pressure_mbar']]
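
HURDAT2 stores positions as unsigned values with a hemisphere suffix (`28.0N`, `94.8W`); the `np.where` calls above negate southern latitudes and western longitudes. The same rule for a single value, as a stdlib sketch (the `to_signed_degrees` name is hypothetical):

```python
def to_signed_degrees(value):
    """Hypothetical helper: convert '28.0N' or ' 94.8W' to signed decimal degrees."""
    value = value.strip()
    number, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere not in 'NSEW':
        raise ValueError(f'unexpected hemisphere in {value!r}')
    # south and west are negative by convention
    return -number if hemisphere in 'SW' else number


print(to_signed_degrees('28.0N'), to_signed_degrees(' 94.8W'))  # 28.0 -94.8
```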

#### Define some schemas for storm data

In [29]:
from pyspark.sql.types import StructType as R, StructField as Fld

storm_stats_schema = StructType([
    StructField('index', StringType()),
    StructField('storm_date', DateType()),
    StructField('storm_year', IntegerType()),
    StructField('storm_month', IntegerType()),
    StructField('storm_day', IntegerType()),
    StructField('storm_time', StringType()),
    StructField('record_identifier', StringType()),
    StructField('storm_status', StringType()),
    StructField('category', StringType()),
    StructField('latitude', DoubleType()),
    StructField('longitude', DoubleType()),
    StructField('max_sustained_wind_kt', IntegerType()),
    StructField('min_pressure_mbar', IntegerType())
])

storm_schema = R([
    Fld('storm_id', StringType()),
    Fld('basin', StringType()),
    Fld('atcf_cyclone_num', IntegerType()),
    Fld('storm_name', StringType()),
    Fld('storm_date', DateType()),
    Fld('storm_year', IntegerType()),
    Fld('storm_month', IntegerType()),
    Fld('storm_day', IntegerType()),
    Fld('storm_time', StringType()),
    Fld('record_identifier', StringType()),
    Fld('storm_status', StringType()),
    Fld('storm_category', StringType()),
    Fld('latitude', DoubleType()),
    Fld('longitude', DoubleType()),
    Fld('max_sustained_wind_kt', IntegerType()),
    Fld('min_pressure_mbar', IntegerType())
])

In [30]:
atlantic_storm_stats = spark.createDataFrame(pd_storm_stats, storm_stats_schema)

In [31]:
atlantic_storm_stats.write.mode('overwrite').parquet(os.path.join(bucket, 'transformed_data/dim_tables/storm_data/storm_stats.parquet'))

In [32]:
atlantic_storm_stats.show(5)

+-----+----------+----------+-----------+---------+----------+-----------------+------------+--------+--------+---------+---------------------+-----------------+
|index|storm_date|storm_year|storm_month|storm_day|storm_time|record_identifier|storm_status|category|latitude|longitude|max_sustained_wind_kt|min_pressure_mbar|
+-----+----------+----------+-----------+---------+----------+-----------------+------------+--------+--------+---------+---------------------+-----------------+
|    1|1851-06-25|      1851|          6|       25|      0000|                 |   hurricane|       1|    28.0|    -94.8|                   80|             -999|
|    2|1851-06-25|      1851|          6|       25|      0600|                 |   hurricane|       1|    28.0|    -95.4|                   80|             -999|
|    3|1851-06-25|      1851|          6|       25|      1200|                 |   hurricane|       1|    28.0|    -96.0|                   80|             -999|
|    4|1851-06-25|      1851

### Atlantic Storms Table (PANDAS)

In [33]:
data = []

storms_by_year = atlantic_storms_header.toPandas()

storm_ids = list(storms_by_year.storm_id)
names = list(storms_by_year.storm_name)
basins = list(storms_by_year.basin)
cyclone_nums = list(storms_by_year.atcf_cyclone_num)
entries = storms_by_year.entries.apply(lambda x: int(x)).tolist()

cols = ['storm_id', 'storm_name', 'basin', 'atcf_cyclone_num', 'storm_date', 'storm_year', 'storm_time', 'record_identifier',
    'storm_status', 'storm_category', 'latitude', 'longitude', 'max_sustained_wind_kt', 'min_pressure_mbar']

for i, (header_id, nrows) in enumerate(zip(storms_by_year['header_id'], entries)):
    # a storm's data rows directly follow its header row: header_id + 1 ... header_id + nrows
    # .copy() avoids pandas' SettingWithCopyWarning when adding columns below
    temp_df = pd_storm_stats.loc[pd_storm_stats['stat_id'].between(header_id + 1, header_id + nrows)].copy()
    temp_df['storm_id'] = storm_ids[i]
    temp_df['storm_name'] = names[i]
    temp_df['basin'] = basins[i]
    temp_df['atcf_cyclone_num'] = cyclone_nums[i]
    data.append(temp_df[cols])

pd_atlantic_storms = pd.concat(
        data).reset_index(drop=True)[['storm_id', 'basin', 'atcf_cyclone_num', 'storm_name', 'storm_date', 'storm_year', 'storm_time', 'record_identifier',
        'storm_status', 'storm_category', 'latitude', 'longitude', 'max_sustained_wind_kt', 'min_pressure_mbar']]
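
The loop above relies on the raw file's layout: a header at row index `header_id` with `entries = n` owns rows `header_id + 1` through `header_id + n`. A stdlib sketch of the same assignment that builds one lookup table instead of filtering the whole stats table once per storm (the `assign_rows_to_storms` name is hypothetical; the header values are taken from the header table output above):

```python
def assign_rows_to_storms(headers, stat_ids):
    """Hypothetical helper: map each data-row id to the storm id that owns it.

    headers: iterable of (header_id, storm_id, entries) tuples
    stat_ids: iterable of data-row ids (the original CSV row index)
    """
    owner = {}
    for header_id, storm_id, entries in headers:
        for row_id in range(header_id + 1, header_id + entries + 1):
            owner[row_id] = storm_id
    # rows not covered by any header map to None
    return {row_id: owner.get(row_id) for row_id in stat_ids}


headers = [(0, 'AL011851', 14), (15, 'AL021851', 1), (17, 'AL031851', 1)]
mapping = assign_rows_to_storms(headers, [1, 14, 16, 18])
print(mapping)  # {1: 'AL011851', 14: 'AL011851', 16: 'AL021851', 18: 'AL031851'}
```

Building the lookup once keeps the work proportional to the number of rows, whereas the per-storm `.loc` filter rescans the whole stats table for every header.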

In [34]:
atlantic_storms = spark.createDataFrame(pd_atlantic_storms)

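# order by storm and observation time so atl_id is deterministic within each storm
atlantic_storms = atlantic_storms.withColumn('atl_id', F.row_number().over(Window.partitionBy().orderBy('storm_id', 'storm_date', 'storm_time'))) \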
.select('atl_id', 'storm_id', 'basin', 'atcf_cyclone_num', 'storm_name', 'storm_date', 'storm_year', 'storm_time', 'record_identifier',
    'storm_status', 'storm_category', 'latitude', 'longitude', 'max_sustained_wind_kt', 'min_pressure_mbar')

In [35]:
atlantic_storms.write.mode('overwrite').parquet(os.path.join(bucket, 'transformed_data/dim_tables/storm_data/atlantic_storms.parquet'))

In [36]:
atlantic_storms.show(5)

+------+--------+-----+----------------+----------+----------+----------+----------+-----------------+------------+--------------+--------+---------+---------------------+-----------------+
|atl_id|storm_id|basin|atcf_cyclone_num|storm_name|storm_date|storm_year|storm_time|record_identifier|storm_status|storm_category|latitude|longitude|max_sustained_wind_kt|min_pressure_mbar|
+------+--------+-----+----------------+----------+----------+----------+----------+-----------------+------------+--------------+--------+---------+---------------------+-----------------+
|     1|AL011851|   AL|               1|   UNNAMED|1851-06-25|      1851|      0000|                 |   hurricane|             1|    28.0|    -94.8|                   80|             -999|
|     2|AL011851|   AL|               1|   UNNAMED|1851-06-25|      1851|      0600|                 |   hurricane|             1|    28.0|    -95.4|                   80|             -999|
|     3|AL011851|   AL|               1|   UNNAMED

### Named Atlantic Storms with US Landfall Table

Use geopy to reverse-geocode each landfall's latitude and longitude to a US state

In [37]:
import ssl
import certifi
import pandas as pd
import geopy.geocoders
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter

# CA bundle used by geopy for HTTPS requests
geopy.geocoders.options.default_ssl_context = ssl.create_default_context(cafile=certifi.where())

geolocator = Nominatim(user_agent="capstone", scheme='http', timeout=None)
# Nominatim's usage policy allows at most one request per second; re-raise errors instead of hiding them
reverse = RateLimiter(geolocator.reverse, min_delay_seconds=1, swallow_exceptions=False)

pd_atlantic_storms = atlantic_storms.toPandas()
pd_state_data = state_data.toPandas()

# Keep only named storms, then only their landfall records
named_atlantic_storms = pd_atlantic_storms.loc[~pd_atlantic_storms['storm_name'].str.contains('UNNAMED')]
records = named_atlantic_storms.loc[named_atlantic_storms['record_identifier'] == 'landfall'].reset_index(drop=True)

states_with_codes = dict(zip(pd_state_data['state'], pd_state_data['state_code']))

location_data = []
no_state = 0  # landfall points that could not be placed in a US state

for row in records.itertuples(index=False):
    location = reverse(f'{row.latitude}, {row.longitude}')
    address = (location.raw.get('address') or {}) if location is not None else {}
    storm_state = address.get('state')
    if storm_state not in states_with_codes:
        no_state += 1
        continue
    location_data.append({
        'storm_id': row.storm_id,
        'storm_name': row.storm_name,
        'storm_year': row.storm_year,
        'storm_status': row.storm_status,
        'storm_category': row.storm_category,
        'latitude': row.latitude,
        'longitude': row.longitude,
        'state': storm_state,
    })

# Build the DataFrame once instead of concatenating many one-row frames
storms_with_landfall = pd.DataFrame(location_data, columns=['storm_id', 'storm_name', 'storm_year', 'storm_status',
                                                           'storm_category', 'latitude', 'longitude', 'state']).drop_duplicates()
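The loop above relies on every landfall record ending up in exactly one bucket: a row with a US state, or the `no_state` counter. The completeness check in section 4.2 depends on that. The sketch below isolates the classification step with a stand-in geocoder so it can run without network access; `classify_landfalls`, `fake_reverse`, and the sample records are hypothetical, and `fake_reverse` only imitates the `.raw['address']['state']` shape that geopy's `Location` objects expose.

```python
from types import SimpleNamespace


def classify_landfalls(records, reverse_geocode, states_with_codes):
    """Split landfall records into US-state matches and a count of non-US points."""
    matched, no_state = [], 0
    for rec in records:
        location = reverse_geocode(f"{rec['latitude']}, {rec['longitude']}")
        # Missing locations or addresses fall through to the no_state bucket
        address = (location.raw.get('address') or {}) if location is not None else {}
        state = address.get('state')
        if state in states_with_codes:
            matched.append({**rec, 'state': state})
        else:
            no_state += 1
    # Every record is counted exactly once
    assert len(matched) + no_state == len(records)
    return matched, no_state


def fake_reverse(query):
    # Hypothetical stand-in for RateLimiter(geolocator.reverse, min_delay_seconds=1)
    state = {'29.5, -85.0': 'Florida'}.get(query)
    return SimpleNamespace(raw={'address': {'state': state}}) if state else None


records = [
    {'storm_id': 'AL000001', 'latitude': 29.5, 'longitude': -85.0},
    {'storm_id': 'AL000002', 'latitude': 30.0, 'longitude': -70.0},  # open water
]
matched, no_state = classify_landfalls(records, fake_reverse, {'Florida': 'FL'})
print(len(matched), no_state)  # 1 1
```

Removing duplicate rows afterwards (as `drop_duplicates` does) can make `with_state + no_state` smaller than the number of records if a landfall point appears twice, so that step is worth keeping in mind when the check fails.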

In [38]:
swl = spark.createDataFrame(storms_with_landfall)

swl = swl.withColumn('swl_id', F.row_number().over(Window.partitionBy().orderBy('storm_id', 'storm_name')))

named_atlantic_storms_with_us_landfall = swl.join(state_data, 'state', 'inner').select(swl.swl_id, swl.storm_id, swl.storm_name, swl.storm_year,
                                 swl.storm_status, swl.storm_category, state_data.state_code)

In [39]:
named_atlantic_storms_with_us_landfall.write.mode('overwrite').parquet(os.path.join(bucket, 'transformed_data/fact_tables/storms_with_us_landfall.parquet'))

In [40]:
named_atlantic_storms_with_us_landfall.show(5)

+------+--------+----------+----------+--------------+--------------+----------+
|swl_id|storm_id|storm_name|storm_year|  storm_status|storm_category|state_code|
+------+--------+----------+----------+--------------+--------------+----------+
|     1|AL011953|     ALICE|      1953|tropical storm|tropical storm|        FL|
|     2|AL011955|    BRENDA|      1955|tropical storm|tropical storm|        LA|
|     3|AL011959|    ARLENE|      1959|tropical storm|tropical storm|        LA|
|     4|AL011966|      ALMA|      1966|     hurricane|             1|        FL|
|     5|AL011968|      ABBY|      1968|tropical storm|tropical storm|        FL|
+------+--------+----------+----------+--------------+--------------+----------+
only showing top 5 rows

### Names by Person and Storm

In [41]:
storms = named_atlantic_storms_with_us_landfall.alias('storms')
names = names_by_state.alias('names')
winds_scale = saffir_simpson_hurricane_wind_scale.alias('winds_scale')

names_by_person_and_storm = storms.join(names, storms.storm_name == names.birth_name) \
.join(winds_scale, storms.storm_category == winds_scale.category, 'left') \
.select(storms.swl_id, storms.storm_id, names.name_id, storms.storm_name, names.birth_name.alias('baby_name'), storms.storm_year, 
        names.birth_year.alias('baby_birth_year'), storms.state_code, names.sex.alias('baby_sex'), storms.storm_status, 
        storms.storm_category, winds_scale.brief_damage_description.alias('storm_damage_description'), names['count'].alias('name_count'))

names_by_person_and_storm = names_by_person_and_storm.withColumn('bsn_id', F.row_number().over(Window.partitionBy().orderBy(names_by_person_and_storm.name_count)))

names_by_person_and_storm = names_by_person_and_storm.select('bsn_id', 'swl_id', 'storm_id', 'name_id', 'storm_name', 'baby_name', 'storm_year', 'baby_birth_year', 
                                                             'state_code', 'storm_status', 'storm_category', 'storm_damage_description', 'baby_sex', 'name_count')
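The two joins behave differently: the inner join on `storm_name == birth_name` drops storms whose name was never given to a baby, while the `left` join on the wind scale keeps every row and leaves `storm_damage_description` null when the storm category (for example `subtropical storm`) has no Saffir-Simpson entry. That is why the LEE rows in the output have a null description. The pure-Python sketch below reproduces those semantics; `join_storms_and_names` and all input rows are hypothetical, and the description text is a placeholder.

```python
def join_storms_and_names(storms, names, wind_scale):
    """Inner-join storms to names on storm_name == birth_name, then left-join the
    wind-scale descriptions on storm_category == category."""
    damage_by_category = {w['category']: w['brief_damage_description'] for w in wind_scale}
    joined = []
    for s in storms:
        for n in names:
            if s['storm_name'] == n['birth_name']:  # inner join: unmatched storms drop out
                joined.append({
                    'storm_name': s['storm_name'],
                    'baby_birth_year': n['birth_year'],
                    'storm_category': s['storm_category'],
                    # left join: categories missing from the scale become None (null in Spark)
                    'storm_damage_description': damage_by_category.get(s['storm_category']),
                })
    return joined


storms = [
    {'storm_name': 'LEE', 'storm_category': 'subtropical storm'},
    {'storm_name': 'ANDREW', 'storm_category': '5-MAJOR'},
    {'storm_name': 'ZETA', 'storm_category': '1'},  # no matching baby name
]
names = [{'birth_name': 'LEE', 'birth_year': 1910}, {'birth_name': 'ANDREW', 'birth_year': 1913}]
wind_scale = [{'category': '5-MAJOR', 'brief_damage_description': 'placeholder description'}]

rows = join_storms_and_names(storms, names, wind_scale)
print([(r['storm_name'], r['storm_damage_description']) for r in rows])
# [('LEE', None), ('ANDREW', 'placeholder description')]
```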

In [42]:
names_by_person_and_storm.write.mode('overwrite').parquet(os.path.join(bucket, 'transformed_data/fact_tables/names_by_person_and_storm_fact.parquet'))

In [43]:
names_by_person_and_storm.show(5)

+------+------+--------+-------+----------+---------+----------+---------------+----------+-----------------+-----------------+------------------------+--------+----------+
|bsn_id|swl_id|storm_id|name_id|storm_name|baby_name|storm_year|baby_birth_year|state_code|     storm_status|   storm_category|storm_damage_description|baby_sex|name_count|
+------+------+--------+-------+----------+---------+----------+---------------+----------+-----------------+-----------------+------------------------+--------+----------+
|     1|   245|AL132011|   3429|       LEE|      LEE|      2011|           1910|        LA|subtropical storm|subtropical storm|                    null|       M|        10|
|     2|   245|AL132011|   9787|       LEE|      LEE|      2011|           1910|        LA|subtropical storm|subtropical storm|                    null|       M|        10|
|     3|   245|AL132011|  17667|       LEE|      LEE|      2011|           1911|        LA|subtropical storm|subtropical storm|        

#### 4.2 Data Quality Checks
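The following checks verify that the pipeline ran as expected:
 * Source/count checks compare the row counts of the transformed storm, name, and state tables with the raw source counts to confirm completeness.
 * A format check on the Atlantic storm table confirms that latitude and longitude no longer carry the `N`/`W` hemisphere suffixes of the raw data.
 * A completeness check on the landfall table confirms that every named landfall record was counted as either inside or outside the US.

Run Quality Checks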
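The source/count checks below share one pattern: the rows in the target tables should add up to the rows read from the source. A small reusable helper makes that explicit; `completeness_check` is hypothetical and not part of the notebook, and the example reuses the storm counts reported by the first check (55,437 raw rows, 53,501 track rows, 1,936 header rows).

```python
def completeness_check(label, source_count, *target_counts):
    """Return True when the target row counts add up to the source row count."""
    total = sum(target_counts)
    passed = total == source_count
    status = 'passed!' if passed else f'failed, {source_count - total} rows unaccounted for.'
    print(f'{label} completeness check {status}')
    return passed


# atlantic_storms + atlantic_storms_header should equal raw_storm_data
storm_ok = completeness_check('Storms data', 55437, 53501, 1936)  # prints "... passed!"
```

With Spark DataFrames, the counts would come from `.count()` calls, ideally computed once and reused, since each call triggers a full job.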

In [44]:
raw_storm_count = raw_storm_data.count()  # count once, reuse below
storms_count, header_count = atlantic_storms.count(), atlantic_storms_header.count()

print('raw_storm_data count before transformations:', raw_storm_count)
print('atlantic_storms count after transformations:', storms_count)
print('atlantic_storms_header count after transformations:', header_count)
print('atlantic_storm_stats count after transformations:', atlantic_storm_stats.count())

# Each raw row should become either a storm header row or a storm track row
if storms_count + header_count == raw_storm_count:
    print('\nStorms data completeness check passed!')
else:
    print('\nStorms data completeness check failed, some rows may be MIA.')

raw_storm_data count before transformations: 55437
atlantic_storms count after transformations: 53501
atlantic_storms_header count after transformations: 1936
atlantic_storm_stats count after transformations: 53501

Storms data completeness check passed!

In [45]:
raw_name_count, names_count = raw_name_data.count(), names_by_state.count()
print('raw_name_data count before transformations:', raw_name_count)
print('names_by_state count after transformations:', names_count)

if raw_name_count == names_count:
    print('\nNames data completeness check passed!')
else:
    print('\nNames data completeness check failed, some rows may be MIA.')

raw_name_data count before transformations: 6311504
names_by_state count after transformations: 6311504

Names data completeness check passed!

In [46]:
raw_state_count, state_count = raw_state_data.count(), state_data.count()
print('raw_state_data count before transformations:', raw_state_count)
print('state_data count after transformations:', state_count)

if raw_state_count == state_count:
    print('\nState data completeness check passed!')
else:
    print('\nState data completeness check failed, some rows may be MIA.')

raw_state_data count before transformations: 51
state_data count after transformations: 51

State data completeness check passed!

Atlantic Storm table latitude and longitude quality check
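This check presumes the storm transformation converted the raw hemisphere-suffixed coordinates (such as `28.0N` and `94.8W`) into signed decimal degrees, which matches the `28.0` / `-94.8` values in the `atlantic_storms` sample. The pure-Python sketch below shows that conversion and the "no suffix left" test; `to_signed_degrees` and `has_suffix` are hypothetical helpers, not the notebook's own transformation code.

```python
def to_signed_degrees(raw):
    """Convert a coordinate like '28.0N' or '94.8W' to a float.

    South and West become negative; values without a suffix are parsed as-is.
    """
    raw = raw.strip()
    hemisphere = raw[-1].upper()
    if hemisphere in ('N', 'S', 'E', 'W'):
        value = float(raw[:-1])
        return -value if hemisphere in ('S', 'W') else value
    return float(raw)


def has_suffix(values, suffix):
    """True if any value, rendered as a string, still ends with the given suffix."""
    return any(str(v).endswith(suffix) for v in values)


latitudes = [to_signed_degrees(v) for v in ['28.0N', '28.0N']]
longitudes = [to_signed_degrees(v) for v in ['94.8W', '95.4W']]
print(latitudes, longitudes)  # [28.0, 28.0] [-94.8, -95.4]
print(has_suffix(latitudes, 'N'), has_suffix(longitudes, 'W'))  # False False
```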

In [47]:
# The raw coordinates carry hemisphere suffixes (e.g. '28.0N', '94.8W'); none should remain
lat_bool = atlantic_storms.filter(F.col('latitude').cast('string').endswith('N')).count() > 0
long_bool = atlantic_storms.filter(F.col('longitude').cast('string').endswith('W')).count() > 0

print('storm_data latitude bool:', lat_bool)
print('storm_data longitude bool:', long_bool)
if not lat_bool and not long_bool:
    print('Latitude and longitude quality check passed!')
else:
    print('Uh oh, some coordinates still carry a hemisphere suffix.')

storm_data latitude bool: False
storm_data longitude bool: False
Latitude and longitude quality check passed!

Quality check for storms with names and landfall in the US

In [48]:
with_state = named_atlantic_storms_with_us_landfall.count()
total = len(records)
print('Storms with landfall:', total)
print('Storms with landfall in the US:', with_state)
print('Storms with landfall not in the US:', no_state)

# Every landfall record should be counted either in or outside the US
if with_state + no_state == total:
    print('Named Atlantic Storms with US Landfall completeness check passed!')
else:
    print('Uh oh, your filtering for Named Atlantic Storms with US Landfall may have excluded some rows.')

Storms with landfall: 663
Storms with landfall in the US: 271
Storms with landfall not in the US: 392
Named Atlantic Storms with US Landfall completeness check passed!

#### 4.3 Data Dictionary
The data dictionary is provided in `capstone_data_dictionary.xlsx`.

#### 4.4 Data Validation

In [50]:
names_by_person_and_storm_fact = spark.read.parquet(os.path.join(bucket, '*', '*', 'names_by_person_and_storm_fact.parquet'))
names_by_person_and_storm_fact.show(5)

+------+------+--------+-------+----------+---------+----------+---------------+----------+------------+--------------+------------------------+--------+----------+
|bsn_id|swl_id|storm_id|name_id|storm_name|baby_name|storm_year|baby_birth_year|state_code|storm_status|storm_category|storm_damage_description|baby_sex|name_count|
+------+------+--------+-------+----------+---------+----------+---------------+----------+------------+--------------+------------------------+--------+----------+
|     1|    83|AL041992|  71592|    ANDREW|   ANDREW|      1992|           1913|        FL|   hurricane|       5-MAJOR|    Catastrophic dama...|       M|        10|
|     2|    83|AL041992|  77047|    ANDREW|   ANDREW|      1992|           1913|        FL|   hurricane|       5-MAJOR|    Catastrophic dama...|       M|        10|
|     3|    83|AL041992|  77918|    ANDREW|   ANDREW|      1992|           1913|        FL|   hurricane|       5-MAJOR|    Catastrophic dama...|       M|        10|
|     4|  

In [51]:
# Make sure name_count is numeric so max() compares values, not strings
fact_table = names_by_person_and_storm_fact.withColumn('name_count', F.col('name_count').cast('int'))

In [56]:
# Allison (2001) in Louisiana, compared with name counts for nearby birth years
df1 = fact_table.filter((F.col('storm_year') == 2001) & (F.col('storm_name') == 'ALLISON') & (F.col('state_code') == 'LA')
                        & F.col('baby_birth_year').isin(1999, 2000, 2001, 2003))
qry1 = df1.groupBy('storm_name', 'storm_year', 'storm_status', 'storm_category', 'baby_name', 'baby_birth_year',
                   'state_code').agg({'name_count': 'max'}).sort('baby_birth_year')
qry1.show()

+----------+----------+--------------------+--------------------+---------+---------------+----------+---------------+
|storm_name|storm_year|        storm_status|      storm_category|baby_name|baby_birth_year|state_code|max(name_count)|
+----------+----------+--------------------+--------------------+---------+---------------+----------+---------------+
|   ALLISON|      2001|subtropical depre...|subtropical depre...|  ALLISON|           1999|        LA|            572|
|   ALLISON|      2001|subtropical depre...|subtropical depre...|  ALLISON|           2000|        LA|            538|
|   ALLISON|      2001|subtropical depre...|subtropical depre...|  ALLISON|           2001|        LA|            539|
|   ALLISON|      2001|subtropical depre...|subtropical depre...|  ALLISON|           2003|        LA|            545|
+----------+----------+--------------------+--------------------+---------+---------------+----------+---------------+
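Several `name_id` rows can share one `baby_name` and `baby_birth_year` (the three ANDREW rows for 1913 in the fact table sample show this), so each query collapses them with `max(name_count)`. The same grouping in plain Python is sketched below on hypothetical rows; `max_count_by_group` is an illustrative helper, not a Spark API.

```python
def max_count_by_group(rows, group_keys, value_key):
    """Plain-Python analogue of df.groupBy(*group_keys).agg({value_key: 'max'}),
    returned as (group, max) pairs sorted by group."""
    best = {}
    for row in rows:
        key = tuple(row[k] for k in group_keys)
        best[key] = max(best.get(key, row[value_key]), row[value_key])
    return sorted(best.items())


# Hypothetical rows: two name_id entries share ALLISON/2000
rows = [
    {'baby_name': 'ALLISON', 'baby_birth_year': 2000, 'name_count': 12},
    {'baby_name': 'ALLISON', 'baby_birth_year': 2000, 'name_count': 30},
    {'baby_name': 'ALLISON', 'baby_birth_year': 1999, 'name_count': 25},
]
print(max_count_by_group(rows, ['baby_name', 'baby_birth_year'], 'name_count'))
# [(('ALLISON', 1999), 25), (('ALLISON', 2000), 30)]
```

`max` keeps only the single largest row per group; if the goal were a total across those rows, `sum` would be the aggregate to use instead.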

In [53]:
# Isaias (2020), compared with name counts for the years around the storm
df2 = fact_table.filter((F.col('storm_year') == 2020) & (F.col('storm_name') == 'ISAIAS')
                        & F.col('baby_birth_year').isin(2019, 2020, 2021))
qry2 = df2.groupBy('storm_name', 'storm_year', 'storm_status', 'storm_category', 'baby_name', 'baby_birth_year',
                   'state_code').agg({'name_count': 'max'}).sort('baby_birth_year')
qry2.show()

+----------+----------+------------+--------------+---------+---------------+----------+---------------+
|storm_name|storm_year|storm_status|storm_category|baby_name|baby_birth_year|state_code|max(name_count)|
+----------+----------+------------+--------------+---------+---------------+----------+---------------+
|    ISAIAS|      2020|   hurricane|             1|   ISAIAS|           2019|        NC|            129|
|    ISAIAS|      2020|   hurricane|             1|   ISAIAS|           2020|        NC|            139|
|    ISAIAS|      2020|   hurricane|             1|   ISAIAS|           2021|        NC|            127|
+----------+----------+------------+--------------+---------+---------------+----------+---------------+

In [54]:
# Dennis (2005), compared with name counts for nearby birth years
df3 = fact_table.filter((F.col('storm_year') == 2005) & (F.col('storm_name') == 'DENNIS')
                        & F.col('baby_birth_year').isin(2004, 2005, 2006, 2010))
qry3 = df3.groupBy('storm_name', 'storm_year', 'storm_status', 'storm_category', 'baby_name', 'baby_birth_year',
                   'state_code').agg({'name_count': 'max'}).sort('baby_birth_year')
qry3.show()

+----------+----------+------------+--------------+---------+---------------+----------+---------------+
|storm_name|storm_year|storm_status|storm_category|baby_name|baby_birth_year|state_code|max(name_count)|
+----------+----------+------------+--------------+---------+---------------+----------+---------------+
|    DENNIS|      2005|   hurricane|       3-MAJOR|   DENNIS|           2004|        FL|            114|
|    DENNIS|      2005|   hurricane|       3-MAJOR|   DENNIS|           2005|        FL|            106|
|    DENNIS|      2005|   hurricane|       3-MAJOR|   DENNIS|           2006|        FL|            118|
|    DENNIS|      2005|   hurricane|       3-MAJOR|   DENNIS|           2010|        FL|             76|
+----------+----------+------------+--------------+---------+---------------+----------+---------------+

In [55]:
# Harvey (2017) in Louisiana, compared with name counts for nearby birth years
df4 = fact_table.filter((F.col('storm_year') == 2017) & (F.col('storm_name') == 'HARVEY') & (F.col('state_code') == 'LA')
                        & F.col('baby_birth_year').isin(2015, 2016, 2017, 2018))
qry4 = df4.groupBy('storm_name', 'storm_year', 'storm_status', 'storm_category', 'baby_name', 'baby_birth_year',
                   'state_code').agg({'name_count': 'max'}).sort('baby_birth_year')
qry4.show()

+----------+----------+--------------+--------------+---------+---------------+----------+---------------+
|storm_name|storm_year|  storm_status|storm_category|baby_name|baby_birth_year|state_code|max(name_count)|
+----------+----------+--------------+--------------+---------+---------------+----------+---------------+
|    HARVEY|      2017|tropical storm|tropical storm|   HARVEY|           2015|        LA|             95|
|    HARVEY|      2017|tropical storm|tropical storm|   HARVEY|           2016|        LA|             83|
|    HARVEY|      2017|tropical storm|tropical storm|   HARVEY|           2017|        LA|             91|
|    HARVEY|      2017|tropical storm|tropical storm|   HARVEY|           2018|        LA|             53|
+----------+----------+--------------+--------------+---------+---------------+----------+---------------+

#### Step 5: Complete Project Write Up
**Rationale for the choice of tools and technologies**<br>
Initially, Spark and Hadoop were chosen to carry out the big-data tasks. They were used in combination because Spark is faster than Hadoop MapReduce, while Hadoop provides a distributed file system (HDFS) that Spark lacks. The Udacity lessons focused on using Spark with Hadoop, so this stack was more familiar; however, Spark proved to be a poor choice for data of this size. When comparable functionality was timed in Pandas and Spark, Spark took on average 7 times longer than Pandas to execute. For this reason, transformations are executed in Pandas where applicable, and the data is then converted to a Spark dataframe.<br>
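A timing comparison like the one above can be made with the standard library's `timeit`. The sketch below is a minimal, hypothetical harness (`speed_ratio` is not from the notebook), run here on stand-in workloads. Because Spark evaluates lazily, the Spark callable must end in an action such as `collect()` or `count()`; otherwise only the query plan is timed and Spark looks artificially fast.

```python
import timeit


def speed_ratio(slow_fn, fast_fn, number=3):
    """Return how many times longer slow_fn takes than fast_fn, summed over `number` runs."""
    slow = timeit.timeit(slow_fn, number=number)
    fast = timeit.timeit(fast_fn, number=number)
    return slow / fast


# Stand-in workloads. In the notebook these might be, hypothetically,
# lambda: spark_df.groupBy('state').count().collect() versus
# lambda: pandas_df.groupby('state').size()
ratio = speed_ratio(lambda: sum(range(200_000)), lambda: sum(range(2_000)))
print(f'slow/fast ratio: {ratio:.1f}x')
```

Repeating each workload a few times smooths out one-off costs such as Spark's job scheduling on the first run.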
**How often should the data be updated, and why?**<br>
The name data and storm data would most likely be updated yearly; the state data and wind scale data are static for the time being. Updating more frequently than annually would be a gross misuse of resources.<br>

#### Write a description of how you would approach the problem differently under the following scenarios:
**The data was increased by 100x.** <br>
I would reconsider an all-Spark pipeline running on more nodes, as I believe Spark would outperform Pandas on data of that size, which may no longer fit in a single machine's memory. <br>
**The data populates a dashboard that must be updated on a daily basis by 7am every day.** <br>
I would use Apache Airflow to schedule a daily run that completes before 7am, while strongly discouraging refreshing the data on that schedule, since the underlying data changes only about once a year. <br>
**The database needed to be accessed by 100+ people.** <br>
As the data is stored in S3, I don't foresee this being an issue. Per Amazon's documentation, S3 supports at least 5,500 GET requests per second per prefix and allows concurrent access. Running queries with Spark directly against the data in S3 should work well in this situation. If a team of analysts wants to work with the data in a database, the onus would be on them to set one up using the transformed data from S3.