# Open Water Swims
### Data Engineering Capstone Project

#### Project Summary
**Open Water Swims** is an application for open water swimmers. Swimmers can "check-in or check-out" to different swims. In the context of this project, we only track open water swims in the United States but it could be extended worldwide in the future.

The purpose of the project is to *extract* data from a swims log which records all events where swimmers have checked in or checked out for swims. The project will also connect to publicly available data about buoys and water conditions (both provided by NOAA) to track the conditions associated with those swims, when available. In this project, we also have access to information about the available swims for which swimmers checked in/out.

The project follows these steps:
* **Step 1:** Scope the Project and Gather Data
* **Step 2:** Explore and Assess the Data
* **Step 3:** Define the Data Model
* **Step 4:** Run ETL to Model the Data
* **Step 5:** Run ETL on AWS EMR
* **Step 6:** Other considerations
* **Step 7:** Final Thoughts

The project will rely on the following Python packages.

In [1]:
# Packages to parse data files
from bs4 import BeautifulSoup
import requests
import pandas as pd
import xml.etree.ElementTree as ET
import urllib.request  # urllib.request must be imported explicitly before calling urlopen
from datetime import datetime

# spark packages
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType as R, StructField as Fld, ShortType as Short, \
    StringType as Str, FloatType as Float, DoubleType as Dbl, LongType as Long, TimestampType as TStamp


### Step 1: Scope the Project and Gather Data

#### Scope
The goal of this project is to create a database model for an open water swims application. The project extracts data from a swimming events log and leverages publicly available data provided by NOAA. The data is organized in a **STAR** database schema, so it can be accessed by the application quickly and efficiently.

#### Technologies
As part of this project, we use the following technologies:

- **Pandas**: Pandas is a great tool to explore small data sets; it allows us to explore the structure and volume of the data.
- **Apache Spark**: Apache Spark is used to process *big data* sets. We first use it in *local* mode to validate our ETL process; then we use Apache Spark on an EMR cluster to process big data sets.
- **AWS EMR**: AWS clusters that we use to run Apache Spark on the big data set.
- **AWS S3**: used to store big data sets and fact/dimension tables created by this project.
- **AWS Redshift**: data warehouse where we load our dimension tables so they can be explored using visualization tools such as Tableau.

#### Describe and Gather Data 
We used the following public data sets to support our project:

- ***NOAA Buoy Data*** -- an .xml file which lists all the available NOAA stations along with their location, owner, etc. This is a fairly small data set.
- ***NOAA Log file*** -- ocean-related data (water temp, air temp, wave height, wind direction, wind speed, ...) going back to 1970. In the context of this project, we'll only work with data since 2000. 

**NOAA** provided a great document to describe how to access their data. It's available [here](https://www.ndbc.noaa.gov/docs/ndbc_web_data_guide.pdf)

- ***Swims*** -- a "fake" generated database which includes information on all the open water swims in the United States. Each swim is scheduled to happen at a certain time and location. It is associated with a specific NOAA buoy, which provides information on the water conditions. We have access to a history of swims since 2000. The data is organized by month and year. The dataset includes 5,000 swims per month since 2000, for a total of **1.2 Million** swims.

- ***SwimsLog*** -- another artificially generated file which I created to simulate the events associated with the swims. It tracks when swimmers check in or check out for a swim. Each SwimsLog record provides the following information: an eventID, the timestamp of the event, the swimmer's first and last names, the status (whether the swimmer checked in or out), the location (latitude/longitude) of the swimmer at check-in/check-out and the swimID for which the swimmer checked in/out. There are **2.4 Million** SwimsLog records.

The Swims and SwimsLog datasets were created by using http://www.generatedata.com/ pseudo-data provider.

### Step 2: Explore and Assess the Data
In this section, we'll explore the data sets to identify their structure, fields and potential quality issues.

#### Active Buoys Data
**NOAA** provides two key data sets:
1. an XML file with the active buoys on their website, and
2. historical data associated with each buoy.

Let's first identify the active buoys. NOAA provides the information in a fairly small XML file. Since this is a small data set, we'll work with Pandas data frames.

In [2]:
buoys_url='https://www.ndbc.noaa.gov/activestations.xml'

response = urllib.request.urlopen(buoys_url).read()
root = ET.fromstring(response)

# Element.getchildren() was removed in Python 3.9; list(root) returns the same child elements
stations=list(root)
print('There are {} active stations'.format(len(stations)))
d = []
for station in stations:
    d.append(station.attrib)
    
df_buoys = pd.DataFrame(d)
df_buoys.head(5)

There are 1435 active stations


Unnamed: 0,currents,dart,elev,id,lat,lon,met,name,owner,pgm,seq,type,waterquality
0,n,n,,922,30,-90,n,OTN201 - 4800922,Dalhousie University,IOOS Partners,,other,n
1,n,n,,923,30,-90,n,OTN200 - 4800923,Dalhousie University,IOOS Partners,,other,n
2,n,n,,1500,30,-90,n,SP031 - 3801500,SCRIPPS,IOOS Partners,,other,n
3,n,n,,1502,30,-90,n,Penobscot - 4801502,University of Maine,IOOS Partners,,other,n
4,n,n,,1503,30,-90,n,Saul - 4801503,Woods Hole Oceanographic Institution,IOOS Partners,,other,n


#### Buoy historical data
**NOAA** also provides historical data for its buoys. Data is available at the following URL: https://www.ndbc.noaa.gov/data/historical/stdmet/.
The filename starts with the buoy ID followed by the letter 'h' and the year for which the data is available. In the example below, we're accessing the 2000 data for buoy 42036. By looking this ID up in the active buoy data (from the previous step), we can find out who owns the buoy and where it is located.
Note that the buoy ID is only included in the filename (and not in the data), so we extract it and add it to the table as a buoyID column.
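The naming convention described above can also be captured with a regular expression. The snippet below is only a sketch: the pattern and the helper name `parse_stdmet_url` are assumptions made for illustration and are not part of the notebook or of etl.py.

```python
import re

# Assumed pattern for stdmet file names: <buoyID>h<4-digit year>.txt.gz
STDMET_NAME = re.compile(r'(?P<buoy>[^/]+)h(?P<year>\d{4})\.txt\.gz$')

def parse_stdmet_url(url):
    """Return (buoy_id, year) parsed from a stdmet file URL, or None if it does not match."""
    match = STDMET_NAME.search(url)
    if match is None:
        return None
    return match.group('buoy'), int(match.group('year'))

sample = 'https://www.ndbc.noaa.gov/data/historical/stdmet/42036h2000.txt.gz'
print(parse_stdmet_url(sample))
```

Compared with splitting on the last 'h', this variant also returns the year and rejects names that do not follow the convention.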

In [3]:
# Read a sample buoy data log file to check the format/content
url = 'https://www.ndbc.noaa.gov/data/historical/stdmet/42036h2000.txt.gz'
ext = '.txt.gz'

# raw string avoids Python's invalid-escape warning for '\s'
df = pd.read_csv(url, sep=r'\s+')

# extract the buoyID from the file path then add it as a column to the Dataframe.
buoyID=url.rsplit('h', 1)[0].split('/')[-1]
df['buoyID'] = buoyID
df.head()

Unnamed: 0,YYYY,MM,DD,hh,WD,WSPD,GST,WVHT,DPD,APD,MWD,BAR,ATMP,WTMP,DEWP,VIS,TIDE,buoyID
0,2000,1,1,1,141,1.8,2.2,99.0,99.0,99.0,999,1020.3,20.4,20.7,14.6,99.0,,42036
1,2000,1,1,2,123,3.5,4.0,99.0,99.0,99.0,999,1020.3,20.4,20.7,15.5,99.0,,42036
2,2000,1,1,3,133,3.8,4.6,99.0,99.0,99.0,999,1020.5,20.4,20.7,16.3,99.0,,42036
3,2000,1,1,4,134,4.2,4.9,99.0,99.0,99.0,999,1020.5,20.3,20.6,16.6,99.0,,42036
4,2000,1,1,5,145,4.3,4.9,99.0,99.0,99.0,999,1020.3,20.4,20.6,17.4,99.0,,42036


In [4]:
df.count()

YYYY      8783
MM        8783
DD        8783
hh        8783
WD        8783
WSPD      8783
GST       8783
WVHT      8783
DPD       8783
APD       8783
MWD       8783
BAR       8783
ATMP      8783
WTMP      8783
DEWP      8783
VIS       8783
TIDE      3672
buoyID    8783
dtype: int64

Above, we can see the data provided for each buoy. When information is unavailable, NOAA provides 999 or 99 as an entry.
This varies by buoy and time. For example, some buoys did not collect the water temperature for a period of time.
Note that the file format varied over time. We also came across table formats where the header and the first row are commented out (using the # character). In those cases, the first row provides the units associated with each column. We also discovered that some buoy tables were empty. See below the "commented out" format for a buoy with no data. Our ETL pipeline will need to address those situations.
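One way to cope with these three situations (sentinel values, commented-out unit rows and empty tables) is sketched below in plain Python. This is an illustration under assumptions, not the logic used in etl.py: the function name `parse_stdmet` is hypothetical, and the sentinel map only covers the columns whose placeholder values are visible in the notebook outputs.

```python
# Placeholder values observed in the notebook outputs; other columns are left untouched.
MISSING = {'WVHT': 99.0, 'DPD': 99.0, 'APD': 99.0, 'VIS': 99.0,
           'TIDE': 99.0, 'MWD': 999.0, 'DEWP': 999.0}

def parse_stdmet(text):
    """Parse stdmet text into a list of dicts, one per measurement row."""
    lines = [ln for ln in text.splitlines() if ln.strip()]
    if not lines:
        return []
    # The first line is always the header, even when it starts with '#'.
    columns = lines[0].split()
    records = []
    for line in lines[1:]:
        if line.startswith('#'):
            continue  # units row found in the commented-out format
        record = {}
        # zip stops at the shorter sequence, so rows without a TIDE value are tolerated
        for name, raw in zip(columns, line.split()):
            value = float(raw)
            record[name] = None if MISSING.get(name) == value else value
        records.append(record)
    return records

header_only = "#YY MM DD hh mm WDIR WSPD\n#yr mo dy hr mn degT m/s\n"
print(parse_stdmet(header_only))  # a file with no measurements yields an empty list
```

Returning an empty list for header-only files lets the caller skip such buoys without special-casing them.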

In [5]:
# Read a sample buoy data log file to check the format/content
url = 'https://www.ndbc.noaa.gov/data/historical/stdmet/43WSLh2019.txt.gz'
ext = '.txt.gz'

# raw string avoids Python's invalid-escape warning for '\s'
df2 = pd.read_csv(url, sep=r'\s+')

# extract the buoyID from the file path then add it as a column to the Dataframe.
buoyID=url.rsplit('h', 1)[0].split('/')[-1]
df2['buoyID'] = buoyID

In [6]:
df2.head()

Unnamed: 0,#YY,MM,DD,hh,mm,WDIR,WSPD,GST,WVHT,DPD,APD,MWD,PRES,ATMP,WTMP,DEWP,VIS,TIDE,buoyID
0,#yr,mo,dy,hr,mn,degT,m/s,m/s,m,sec,sec,degT,hPa,degC,degC,degC,mi,ft,43WSL


NOAA provides all historical information for all its buoys in a specific folder. Let's check how many files we'll need to process.

In [7]:
# Read the buoy data log here
url = 'https://www.ndbc.noaa.gov/data/historical/stdmet/'
ext = '.txt.gz'

# get the list of data files
page = requests.get(url).text
soup = BeautifulSoup(page, 'html.parser')
fileList = [url + '/' + node.get('href') for node in soup.find_all('a') if node.get('href').endswith(ext)]

#create year filter
yearFilter=range(2000, 2025)
yearFilter=['h' + str(s) for s in yearFilter]
filteredFileList = [n for n in fileList if any(m in n for m in yearFilter)]
print('There are {} files since year 2000.'.format(len(filteredFileList)))

There are 11167 files since year 2000.


NOAA collects data on an hourly basis for each buoy. This means that we could have 24\*365 (**8,760**) records included in each file. We counted **11,167** txt.gz files post 2000 in this folder, which could translate into **98M** records!
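The estimate above can be reproduced with a few lines of arithmetic. It is a rough figure that assumes exactly one reading per hour and a full year of data in every file.

```python
HOURS_PER_DAY = 24
DAYS_PER_YEAR = 365
files_since_2000 = 11167  # file count printed by the cell above

records_per_file = HOURS_PER_DAY * DAYS_PER_YEAR
estimated_records = records_per_file * files_since_2000

print(records_per_file)    # 8760
print(estimated_records)   # 97822920, i.e. roughly 98M
```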

In the context of this Jupyter Notebook, we'll just handle the files in 2000. We'll use the full fileset once we run the ETL process on AWS EMR.

In [8]:
# extract filelist in 2000 since this is the year for our test data.
smallerList=[n for n in filteredFileList if '2000.txt.gz' in n]
print(len(smallerList))

130


Let's create our **Spark session** to investigate our larger datasets.

In [9]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0") \
        .enableHiveSupport()\
        .getOrCreate()
# use the command below to improve efficiency
spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
# only display Warning messages and above. Skips the INFO messages.
spark.sparkContext.setLogLevel('WARN')

**Important Note:** Spark can load data directly from csv files. However, the Spark CSV reader expects a fixed delimiter, whereas the fields in our data are separated by a variable number of spaces. This applies to the buoy historical dataset only.
To avoid this issue, we'll load the data into a pandas dataframe first, then load that data frame into a Spark dataframe.
Also, we decided to keep only the noon reading for each day, so we can reduce our dataset before it gets processed by Spark. This accelerates our data ingestion process.

Below is an example on how Spark loads a Pandas dataframe.

In [10]:
# load the pandas DataFrame into Spark (the schema is inferred from the pandas dtypes)
dfBuoy = spark.createDataFrame(df)

dfBuoy.printSchema()
dfBuoy.show(5)

root
 |-- YYYY: long (nullable = true)
 |-- MM: long (nullable = true)
 |-- DD: long (nullable = true)
 |-- hh: long (nullable = true)
 |-- WD: long (nullable = true)
 |-- WSPD: double (nullable = true)
 |-- GST: double (nullable = true)
 |-- WVHT: double (nullable = true)
 |-- DPD: double (nullable = true)
 |-- APD: double (nullable = true)
 |-- MWD: long (nullable = true)
 |-- BAR: double (nullable = true)
 |-- ATMP: double (nullable = true)
 |-- WTMP: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- VIS: double (nullable = true)
 |-- TIDE: double (nullable = true)
 |-- buoyID: string (nullable = true)

+----+---+---+---+---+----+---+----+----+----+---+------+----+----+----+----+----+------+
|YYYY| MM| DD| hh| WD|WSPD|GST|WVHT| DPD| APD|MWD|   BAR|ATMP|WTMP|DEWP| VIS|TIDE|buoyID|
+----+---+---+---+---+----+---+----+----+----+---+------+----+----+----+----+----+------+
|2000|  1|  1|  1|141| 1.8|2.2|99.0|99.0|99.0|999|1020.3|20.4|20.7|14.6|99.0| NaN| 42036|
|2000|  1|

In [11]:
for idx,f in enumerate(smallerList):
    url = f
    ext = '.txt.gz'
    print('Processing file:', url)
    df = pd.read_csv(url, sep=r'\s+')  # raw string avoids the invalid-escape warning
    # extract the buoyID from the file path then add it as a column to the Dataframe.
    buoyID=url.rsplit('h', 1)[0].split('/')[-1]
    df['buoyID'] = buoyID
    df_filtered = df[df['hh'] == 12]  #only keeping 12 o'clock records.
    dftemp = spark.createDataFrame(df_filtered)
    if idx == 0 :
        dfBuoy=dftemp
    else:
        dfBuoy=dfBuoy.union(dftemp)
dfBuoy.count()

Processing file: https://www.ndbc.noaa.gov/data/historical/stdmet//42a02h2000.txt.gz
Processing file: https://www.ndbc.noaa.gov/data/historical/stdmet//42a03h2000.txt.gz
Processing file: https://www.ndbc.noaa.gov/data/historical/stdmet//42otph2000.txt.gz
Processing file: https://www.ndbc.noaa.gov/data/historical/stdmet//41001h2000.txt.gz
Processing file: https://www.ndbc.noaa.gov/data/historical/stdmet//41002h2000.txt.gz
Processing file: https://www.ndbc.noaa.gov/data/historical/stdmet//41004h2000.txt.gz
Processing file: https://www.ndbc.noaa.gov/data/historical/stdmet//41008h2000.txt.gz
Processing file: https://www.ndbc.noaa.gov/data/historical/stdmet//41009h2000.txt.gz
Processing file: https://www.ndbc.noaa.gov/data/historical/stdmet//41010h2000.txt.gz
Processing file: https://www.ndbc.noaa.gov/data/historical/stdmet//42001h2000.txt.gz
Processing file: https://www.ndbc.noaa.gov/data/historical/stdmet//42002h2000.txt.gz
Processing file: https://www.ndbc.noaa.gov/data/historical/stdmet

42039

In [12]:
dfBuoy.show(5)

+----+---+---+---+---+----+----+----+----+----+---+------+----+----+-----+----+----+------+
|YYYY| MM| DD| hh| WD|WSPD| GST|WVHT| DPD| APD|MWD|   BAR|ATMP|WTMP| DEWP| VIS|TIDE|buoyID|
+----+---+---+---+---+----+----+----+----+----+---+------+----+----+-----+----+----+------+
|2000| 12|  1| 12|121| 5.2| 6.9|1.21|5.88|4.76| 78|1018.9|23.5|24.6|999.0|99.0|99.0| 42a02|
|2000| 12|  2| 12|356| 3.9| 5.2|0.95|7.69| 5.8|105|1021.0|23.2|24.5|999.0|99.0|99.0| 42a02|
|2000| 12|  3| 12| 29|10.5|13.1|2.48|7.69|6.05|  5|1025.8|17.6|24.5|999.0|99.0|99.0| 42a02|
|2000| 12|  4| 12| 43| 6.3| 7.2|1.38|6.67|5.27| 55|1025.2|19.1|24.5|999.0|99.0|99.0| 42a02|
|2000| 12|  5| 12| 21| 8.8|11.1|1.36|5.88| 5.1| 51|1026.0|19.0|24.9|999.0|99.0|99.0| 42a02|
+----+---+---+---+---+----+----+----+----+----+---+------+----+----+-----+----+----+------+
only showing top 5 rows



#### Swimslog data
We artificially created a swims log data set to simulate the way swimmers would check in/out to a swim.
The data is provided as one JSON file per month since 2000. As mentioned before, we have **2+ Million** records. Let's see if the files can be loaded by Spark locally and display the first few records.
In the context of this notebook, we'll explore data in the data/swimslog/2000 folder. In production (on AWS EMR), we'll process 20 years of data.

In [13]:
dfSwimslog =spark.read.json('data/swimslog/2000/*.json', multiLine=True)
dfSwimslog.printSchema()

root
 |-- eventID: string (nullable = true)
 |-- eventLocation: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- swimID: long (nullable = true)
 |-- swimStatus: string (nullable = true)
 |-- swimmer: string (nullable = true)
 |-- swimmerEmail: string (nullable = true)



In [14]:
dfSwimslog.show(5)
dfSwimslog.count()

+--------------------+--------------------+---------+------+----------+------------------+--------------------+
|             eventID|       eventLocation|eventTime|swimID|swimStatus|           swimmer|        swimmerEmail|
+--------------------+--------------------+---------+------+----------+------------------+--------------------+
|68EB407E-9F70-B21...| -60.64933, 99.93537|973537720| 53711|        No|     Quon Mcclure |egestas.a.dui@atp...|
|150E9B7E-56BE-1E3...|  -54.64544, 78.4572|975202729| 51218|        No|     Hyatt Mooney |nostra@luctussit....|
|B290039C-60AB-55A...|-76.63623, 104.18138|974452207| 50427|       Yes| Penelope Delgado |Nullam@Donecnibh....|
|DF54C0CD-1A47-235...| 35.92507, -39.74448|974657087| 50239|        No|     Chaney Reese |pede@SeddictumPro...|
|6A2A4DD6-5EE8-8B8...|-13.06251, 166.54162|975074884| 52405|        No|    Charity Gould |Donec.egestas.Dui...|
+--------------------+--------------------+---------+------+----------+------------------+--------------

120000

As seen above, the swimslog records are (in theory) uniquely identified by an eventID. They include an eventTime (expressed as a Unix timestamp in seconds) and a location (expressed as latitude/longitude coordinates) recorded when the swimmer checked in/out. The swimslog records also refer to a swimID. The swimmer's full name and email address are included along with the status, which indicates whether or not the swimmer participated in this swim. Finally, in the case above, we only processed 120K records, which is what we have for the year 2000.
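To make the two raw string fields easier to work with, they can be converted to typed values. The sketch below uses only the standard library; the helper name `parse_event` is hypothetical, and it assumes that eventTime is in seconds since the Unix epoch (UTC) and that eventLocation is ordered as latitude, longitude.

```python
from datetime import datetime, timezone

def parse_event(event_time, event_location):
    """Convert a swimslog eventTime/eventLocation pair into typed values."""
    # Assumption: eventTime counts seconds since 1970-01-01 UTC
    when = datetime.fromtimestamp(int(event_time), tz=timezone.utc)
    # Assumption: the coordinates are ordered as "latitude, longitude"
    latitude, longitude = (float(part) for part in event_location.split(','))
    return when, latitude, longitude

# First record shown in the output above
when, latitude, longitude = parse_event('973537720', '-60.64933, 99.93537')
print(when.isoformat(), latitude, longitude)
```

In Spark, the same conversion could be expressed with column functions rather than a Python helper; the plain-Python version is shown here only to make the assumptions explicit.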

#### Swims data set
We also have access to all the swims that happened since 2000. This is another artificially generated file.
As shown below, the data is structured using a JSON format. The BuoyID is a key aspect of the swim because it allows us to understand the swimming conditions provided by the NOAA data.
This swims data set includes about **1.2M** records (5,000 swims per month since 2000).

In [16]:
dfSwims =spark.read.json('data/swims/*/*.json', multiLine=True)
dfSwims.printSchema()
dfSwims.show(5)

root
 |-- BuoyID: string (nullable = true)
 |-- Coordinates: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- swimID: long (nullable = true)

+------+--------------------+----------+--------------+------+
|BuoyID|         Coordinates|      Date|      Location|swimID|
+------+--------------------+----------+--------------+------+
| 44089|43.10171, -161.63003|08/25/2000|   Blairgowrie| 38026|
| mxxa2|  46.54514, 96.71354|08/08/2000| Cercemaggiore| 36701|
| gbcl1|  6.87131, 151.53133|08/16/2000|           Pau| 38616|
| dpxc1|-30.22349, 161.03706|08/22/2000|     Wałbrzych| 39207|
| 18cy3| 78.00928, 160.68506|08/19/2000|Neubrandenburg| 37999|
+------+--------------------+----------+--------------+------+
only showing top 5 rows



In [17]:
# Some swims are associated with two different buoys.
# In theory, this shouldn't happen, so we drop the duplicates.
dfSwims=dfSwims.dropDuplicates(['swimID'])

#### Cleaning & Formatting Steps

- dfSwims was provided with a simple date format. To align better with other data sets, we *split* this date into its main components (year, month, day). See below.

- Buoy data format changed over time. We need to accommodate both old and new data formats. We'll manage this problem through some transformations in our etl.py script.
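As an illustration of the second point, the two header variants shown earlier differ in a few column names (`#YY` vs `YYYY`, `WDIR` vs `WD`, `PRES` vs `BAR`) and in the presence of a minute column (`mm`). The sketch below maps a record from either format onto one set of names. It is not the code from etl.py; the helper name `to_common_columns` and the sample values are made up for the example.

```python
# Newer header names (left) mapped to the older names used in the 2000 files (right)
RENAMES = {'#YY': 'YYYY', 'WDIR': 'WD', 'PRES': 'BAR'}

def to_common_columns(record):
    """Return a copy of a buoy record keyed by the older column names."""
    common = {RENAMES.get(name, name): value for name, value in record.items()}
    # Older files have no minute column; default it so both formats share one shape
    common.setdefault('mm', 0)
    return common

# Hypothetical records, one per format
new_style = {'#YY': 2019, 'MM': 6, 'DD': 1, 'hh': 12, 'mm': 30, 'WDIR': 120, 'PRES': 1015.2}
old_style = {'YYYY': 2000, 'MM': 1, 'DD': 1, 'hh': 1, 'WD': 141, 'BAR': 1020.3}

print(to_common_columns(new_style))
print(to_common_columns(old_style))
```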

In [18]:
# parse the MM/dd/yyyy date string and derive the Year, Month and Day columns
dfSwims=dfSwims.withColumn("Year", date_format(to_date(col("Date"),"MM/dd/yyyy"), "yyyy"))\
                .withColumn("Month", date_format(to_date(col("Date"), "MM/dd/yyyy"), "MM"))\
                .withColumn("Day", date_format(to_date(col("Date"), "MM/dd/yyyy"), "dd"))
dfSwims.show(5)

+------+--------------------+----------+---------------+------+----+-----+---+
|BuoyID|         Coordinates|      Date|       Location|swimID|Year|Month|Day|
+------+--------------------+----------+---------------+------+----+-----+---+
| 52212| -84.22602, 65.75303|01/26/2000|            Sim|    26|2000|   01| 26|
| 14041| 79.0899, -117.79272|01/14/2000|      Rochester|    29|2000|   01| 14|
| 32069| 24.44656, 128.04441|01/02/2000|   Ponta Grossa|   474|2000|   01| 02|
| hplm2| 44.26266, -175.5417|01/19/2000|Tione di Trento|   964|2000|   01| 19|
| ducn7|-66.48567, -147.2...|01/16/2000|       Zlatoust|  1677|2000|   01| 16|
+------+--------------------+----------+---------------+------+----+-----+---+
only showing top 5 rows



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
We currently have disconnected databases and our goal is to create a **STAR** database schema where all relevant information is connected together. The STAR schema is an efficient architecture to record all swims and access data efficiently.

We have a **FACT** table which includes all the swims and some **DIMENSION** tables which provide more information on those swims. Dimension tables are: *Swimmers*, *Locations*, *Conditions* and *Time* tables.

A Database diagram can be found [here](./Documentation/DataModel.pdf).
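The relationship between the fact table and a dimension table can be sketched with plain Python data classes. The field names follow the columns produced by the pipeline in Step 4, but the class names and the sample values (including the email address) are hypothetical; the actual model is the one in the diagram.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class Swimmer:                      # dimension row
    swimmer_idx: int
    first_name: str
    last_name: str
    email: str

@dataclass(frozen=True)
class SwimFact:                     # fact row: keys pointing at the dimensions
    swimslog_idx: int
    swimmer_idx: int
    location_idx: int
    conditions_idx: Optional[int]   # None when no buoy data exists for that day
    date: str
    swim_status: str

# Hypothetical sample rows
swimmers = {0: Swimmer(0, 'Cedric', 'Bradshaw', 'cedric@example.org')}
fact = SwimFact(swimslog_idx=0, swimmer_idx=0, location_idx=7,
                conditions_idx=None, date='01/26/2000', swim_status='Yes')

# Resolving a dimension is a single key lookup from the fact row
print(swimmers[fact.swimmer_idx].last_name)
```

Keeping only keys and a few measures in the fact row is what lets queries join to just the dimensions they need.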

#### 3.2 Mapping Out Data Pipelines
Here are the steps included in our data pipeline to build our STAR database. Each table will be written as Parquet files under the './data/' folder as part of this Jupyter Notebook. Tables can also be written to AWS S3 by using the etl.py script and setting the LOCATION parameter in the dl.cfg file to S3 (as opposed to LOCAL).

Using the Swimslog, Swims, Buoy data available, we take on the following actions: 

1. Create and write Swimmers table
2. Create and write Locations table
3. Create and write Conditions table
4. Create and write Time table
5. Create and write Swims table

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [19]:
# create swimmers_table using dfSwimslog data
df2=dfSwimslog.dropDuplicates(['swimmer'])
# swimmer names start with a space, so item 0 of the split is empty and the names are items 1 and 2
swimmers_table=df2.select(split(col("swimmer"),r'\s+').getItem(1).alias("firstName"), \
                    split(col("swimmer"),r'\s+').getItem(2).alias("lastName"), col("swimmerEmail").alias("Email") )\
                  .withColumn("swimmerIdx", monotonically_increasing_id())

swimmers_table.printSchema()
swimmers_table.show(5)

# write swimmers_table
swimmers_table.write.mode('overwrite').parquet('./data/swimmers/swimmers_table.parquet')

# create location table using locations data from swims dataframe 
# Coordinates appear to be stored as "latitude, longitude" (the second value exceeds 90 in the samples above)
locations_table=dfSwims.select(col("Location"), trim(split(col("Coordinates"), ',').getItem(1)).alias("Longitude"), \
                                trim(split(col("Coordinates"), ',').getItem(0)).alias("Latitude"))\
                       .dropDuplicates(['Location']).withColumn("locationIdx", monotonically_increasing_id())
locations_table.printSchema()
locations_table.show(5)

# write locations_table
locations_table.write.mode('overwrite').parquet('./data/locations/locations_table.parquet')

# create conditions table using buoys and swims data
conditions_table=dfSwims.join(dfBuoy, ((dfBuoy['buoyID'] == dfSwims['BuoyID']) & \
                                        (dfBuoy['YYYY'] == dfSwims['Year']) & \
                                       (dfBuoy['MM'] == dfSwims["Month"]) & \
                                        (dfBuoy['DD'] == dfSwims['Day']))) \
                         .select(dfBuoy.buoyID,
                                    dfBuoy.ATMP, 
                                    dfBuoy.WTMP,
                                    dfBuoy.TIDE,
                                    dfBuoy.WVHT,
                                    dfBuoy.WD,
                                    dfBuoy.WSPD,
                                    dfSwims.Year,
                                    dfSwims.Month,
                                    dfSwims.Day)\
                         .withColumn("conditionsIdx", monotonically_increasing_id())

conditions_table.printSchema()
conditions_table.show(5)

# write conditions_table
conditions_table.write.partitionBy("Year", "Month", "BuoyID").mode('overwrite').parquet('./data/conditions/conditions_table.parquet')

# extract columns to create time table
time_table = dfSwims.select(col('Year').alias('YYYY'), col('Month').alias('MM'), \
                            col('Day').alias('DD'),col('Date')).dropDuplicates()
time_table.printSchema()
time_table.show(5)

# write time table to parquet files partitioned by year and month
time_table.write.partitionBy("YYYY", "MM").mode('overwrite').parquet('./data/time/time_table.parquet')

# create and write swims_table
swims_table = dfSwimslog.join(dfSwims, (dfSwimslog.swimID == dfSwims.swimID))\
                        .join(swimmers_table, (dfSwimslog.swimmerEmail == swimmers_table.Email)) \
                        .join(time_table, (dfSwims.Date == time_table.Date)) \
                        .join(conditions_table, ((dfSwims.BuoyID == conditions_table.buoyID) & \
                                                (conditions_table.Year == time_table.YYYY) & \
                                                (conditions_table.Month == time_table.MM) & \
                                                (conditions_table.Day == time_table.DD)), 'left_outer') \
                        .join(locations_table, (dfSwims.Location == locations_table.Location)) \
                        .select(swimmers_table.firstName,swimmers_table.lastName, \
                                swimmers_table.swimmerIdx,locations_table.Location,locations_table.locationIdx,\
                                conditions_table.WTMP, conditions_table.conditionsIdx, time_table.YYYY, time_table.MM, time_table.Date, dfSwimslog.swimStatus)\
                        .dropDuplicates()\
                        .withColumn("SwimslogIdx", monotonically_increasing_id())

swims_table.printSchema()
swims_table.show(5)

# write swims_table
swims_table.write.partitionBy('YYYY','MM','locationIdx').mode('overwrite').parquet('./data/swims/swims_table.parquet')


root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- swimmerIdx: long (nullable = false)

+---------+--------+--------------------+----------+
|firstName|lastName|               Email|swimmerIdx|
+---------+--------+--------------------+----------+
|   Cedric|Bradshaw|Suspendisse@Proin...|         0|
|  Charity|  Willis|     et@primisin.org|         1|
|     Clio| Russell|ante.lectus.conva...|         2|
|   Denton|  Cotton|lobortis.Class@li...|         3|
|   Sierra| Skinner|consectetuer.maur...|         4|
+---------+--------+--------------------+----------+
only showing top 5 rows

root
 |-- Location: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- locationIdx: long (nullable = false)

+-----------------+---------+----------+-----------+
|         Location|Longitude|  Latitude|locationIdx|
+-----------------+---------+----------+-----------+
| Alt

The table above shows some valid results. We can see that the first two records are almost identical. This is because NOAA increased the frequency at which it records measurements: initially hourly, and now multiple times per hour. We can also observe some null conditions. This means that the buoy didn't provide any data on that day and time. Buoys are sometimes non-operational or under maintenance for a few weeks.
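If the near-identical rows are unwanted, one option is to keep a single reading per buoy and hour. The sketch below is a plain-Python illustration with made-up sample values; the helper name `one_reading_per_hour` is hypothetical and this step is not part of the notebook's pipeline.

```python
def one_reading_per_hour(records):
    """Keep the earliest reading for each buoy and hour."""
    kept = {}
    for rec in records:
        key = (rec['buoyID'], rec['YYYY'], rec['MM'], rec['DD'], rec['hh'])
        # Records from the older format have no 'mm' field; treat them as minute 0
        if key not in kept or rec.get('mm', 0) < kept[key].get('mm', 0):
            kept[key] = rec
    return list(kept.values())

# Hypothetical readings: two within the same hour, one in the next hour
readings = [
    {'buoyID': '42036', 'YYYY': 2019, 'MM': 6, 'DD': 1, 'hh': 12, 'mm': 30, 'WTMP': 24.6},
    {'buoyID': '42036', 'YYYY': 2019, 'MM': 6, 'DD': 1, 'hh': 12, 'mm': 0, 'WTMP': 24.5},
    {'buoyID': '42036', 'YYYY': 2019, 'MM': 6, 'DD': 1, 'hh': 13, 'mm': 0, 'WTMP': 24.7},
]
print(len(one_reading_per_hour(readings)))
```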

#### 4.2 Data Quality Checks
Once the data runs through the pipeline, we can perform some quality checks. We decided to check the following conditions:
- verify the number of rows in each table
- verify that there are no duplicate keys

Here is the code to check the validity of the data. We'll change those parameters in production as the number of records will increase.

In [20]:
def test_min_numb_rows(area, df, target):
    print("Testing number of rows on:", area)
    assert df.count() >= target, "Number of rows in {} is lower than expected".format(area)

def test_key_uniqueness(area, df, primaryKey):
    print("Testing key uniqueness on:", area)
    assert df.select(primaryKey).distinct().count() == df.count(), "Multiple rows use the same primary key in {}".format(area)

# Testing min number of rows for each key table
test_min_numb_rows('swimmers table', swimmers_table, 1001)
test_min_numb_rows('buoys', dfBuoy, 40000)
test_min_numb_rows('conditions table', conditions_table, 100)
test_min_numb_rows('swims table', swims_table, 1)
test_min_numb_rows('time table', time_table, 30)
test_min_numb_rows('locations table', locations_table, 100)

# Testing primary key uniqueness
test_key_uniqueness ('swimmers table', swimmers_table, 'swimmerIdx')
test_key_uniqueness ('swims table', swims_table, 'SwimslogIdx')
test_key_uniqueness ('conditions table', conditions_table, 'conditionsIdx')
test_key_uniqueness ('time table', time_table, 'Date')
test_key_uniqueness ('locations table', locations_table, 'locationIdx')

print("All tests completed!")

Testing number of rows on: swimmers table
Testing number of rows on: buoys
Testing number of rows on: conditions table
Testing number of rows on: swims table
Testing number of rows on: time table
Testing number of rows on: locations table
Testing key uniqueness on: swimmers table
Testing key uniqueness on: swims table
Testing key uniqueness on: conditions table
Testing key uniqueness on: time table
Testing key uniqueness on: locations table
All tests completed!


#### 4.3 Data dictionary 
A data dictionary is available [here](./data/Documentation/OpenWaterSwimDataDictionary.pdf). It provides a description for each data element of our model.

### Step 5: Running the Pipeline on AWS EMR
The ultimate goal is to run the pipeline on AWS EMR. This requires several steps:
- Set up AWS EMR instance
- Copy the **etl.py** script and the **dl.cfg** configuration file to AWS EMR
- Launch the **etl.py** script on AWS EMR

This also assumes that all our data is available on AWS S3.
We made our [S3 bucket](s3://openwaterswimming/input/) public in case anyone would like to use/test the data. We will make it private again once this project is approved.

#### Step 5.1: Set up AWS EMR instance
Using the AWS Console, we launch an AWS EMR cluster. We opted for the 5.28 release with Spark.
We picked a standard instance type (m5.xlarge) with 4 nodes (1 master and 3 workers). We don't need very powerful machines because our transformations are not compute-intensive. However, it's good to have multiple machines since we process a lot of data.

Once we launch AWS EMR, we can access it via the terminal using the following command:
ssh -i openwaterswim.pem hadoop@IPaddress where IPaddress is the AWS EMR IP address.

We need to install a few Python packages that are used by our scripts:
- configparser
- requests
- pandas

We install those packages by using the following command:
* sudo pip install *eachpackage*

#### Step 5.2: Copy etl.py and dl.cfg
To copy the files to AWS EMR, we use the scp command as follows:

* scp -i openwater.pem etl.py hadoop@IPaddress:/home/hadoop/
* scp -i openwater.pem dl.cfg hadoop@IPaddress:/home/hadoop/

where IPaddress is the AWS EMR IPaddress.

#### Step 5.3: Launch etl.py script via spark-submit
To launch the etl.py script on the AWS EMR cluster, we run the following commands:
* **export PYTHONIOENCODING=utf-8** (this avoids an encoding issue with our data)
* **spark-submit etl.py**

#### Step 5.4: Verify the results
The **FACT** and **DIMENSION** tables are written to AWS S3 as expected. The ETL process handles roughly 15M records across our 3 key datasets: buoys, swims and swimslog.
The process also performs quality checks defined in section 4.2 of this document.

#### Important Note
- The ETL process can also be run locally (with Spark). However, due to the size of the dataset, we recommend running it only on the local data (year 2000). We tried to connect to S3 from the provided environment, but that environment seemed to be missing some packages. The connection to S3 works with the EMR configuration described above.
- When running locally, we strongly recommend setting the buoy range to 2000. This can be set in the dl.cfg file in the TIMEFRAME section with the MIN_RANGE and MAX_RANGE parameters.
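The settings mentioned above could be read with `configparser` along the following lines. This is a sketch, not an excerpt of etl.py: the `STORAGE` section name is an assumption (the document only names the LOCATION parameter), and treating MAX_RANGE as inclusive is also an assumption.

```python
import configparser

# Hypothetical dl.cfg content; the [STORAGE] section name is assumed
SAMPLE_CFG = """
[STORAGE]
LOCATION = LOCAL

[TIMEFRAME]
MIN_RANGE = 2000
MAX_RANGE = 2000
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_CFG)

location = config['STORAGE']['LOCATION']
min_year = config.getint('TIMEFRAME', 'MIN_RANGE')
max_year = config.getint('TIMEFRAME', 'MAX_RANGE')

# Assumption: MAX_RANGE is inclusive
years = list(range(min_year, max_year + 1))
print(location, years)
```

With a real file, `config.read('dl.cfg')` would replace `read_string`.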


### Step 6: Other Considerations
In this section, we'll cover a few more topics such as:
- Technology choices
- Data refresh frequency issue
- Scalability issues

#### 6.1 Technology Choices
For this project, we decided to use Pandas, Spark, AWS EMR and AWS S3 for the following reasons:
1. **Pandas** - We couldn't directly process the buoy dataset with Spark because its fields are separated by a variable number of spaces. This is a known limitation of the Spark CSV reader in the version we used. So, we decided to first process the data with Pandas and then load those data frames into Spark. This is a temporary workaround, as it slows down the processing of all buoy data.

2. **Spark** - We needed a tool capable of processing multi-million records datasets. Spark is built for that purpose.
3. **AWS EMR** - While we can run Spark locally (on a single machine), it's better to run it on a cluster. We decided to set up an AWS EMR cluster and immediately saw the benefit from a performance perspective.
4. **AWS S3** - We work with big data sets. Also, the dataset keeps growing every hour, so we need an elastic storage solution. Storing on AWS S3 also allows us to access the data from multiple tools. For example, we can write to S3 and access the data from a tool like AWS Redshift.

#### 6.2 Data Refresh Frequency
Open Water Swimming activities happen frequently. Also, buoy data gets updated on an hourly basis. If we only want to capture insights about open water swimming activities, we can update the data on a monthly basis. If we'd like to use the latest data, then we suggest updating on an hourly basis. That said, it would probably be better to leverage a technology like Kafka, which offers a subscription mechanism. That way, data would be available in real time.

#### 6.3 Scalability issues
How would we address the following scalability scenarios?
1. Data was increased by 100%
We don't anticipate too many problems with such a volume increase. We use elastic technologies that allow us to scale rapidly. For example, AWS S3 would scale immediately. AWS EMR didn't show any performance issues with our current load. If it started to slow down, we would recommend adding a node to our cluster.

2. Data populates a dashboard which needs to be refreshed daily by 7 AM
In such a situation, we recommend creating a DAG in Apache Airflow. The process can be scheduled and verified on a daily basis. If a problem occurs, we would immediately be notified.

3. Database needs to be accessed by 100+ people
If the database needs to be accessed by many people, possibly geographically dispersed around the world, we recommend using a Data Warehouse technology like AWS Redshift or Snowflake. We also recommend using a non-relational database like Cassandra, so that the data can be partitioned geographically, for example.



### Step 7: Final Thoughts
I enjoyed this project very much despite the numerous challenges. This project was challenging because I needed to research information on publicly available datasets. I also chose to generate my own data, which came with its own challenges. For example, the initial data sets did not truly conform to the JSON format; some generated data included unexpected characters, etc.
NOAA data also presented some challenges with the data formats and the fact that they change over time. When we look at data that is 20+ years old, things have changed. A data pipeline needs to address those aspects.
Running the ETL process on the entire dataset takes multiple hours. The buoy data is currently loaded sequentially (due to the Pandas dependency). This is something that can be improved in the future.

All in all, the **Udacity Nano-degree in Data Engineering** provided a comprehensive view of many data engineering activities, challenges, technologies and best practices.