# Data Pipeline of Global World Temperatures
## Data Engineering Capstone Project
---

### Project Summary
> This project describes how the whole pipeline will look in production. This notebook is the development environment used to test the code that will run in production.

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

---

### Installations and Imports

In [5]:
# Do all imports and installs here
!pip install kaggle 
!pip install twine
import pandas as pd
from zipfile import ZipFile
import os
import subprocess

# pyspark imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# starting spark application
spark = SparkSession.builder.enableHiveSupport().getOrCreate()


---

### Step 1: Scope the Project and Gather Data

#### Scope 
<hr style="border-top: 3px double">
Explain what you plan to do in the project in more detail. 

##### <u>Development Scope</u>
In this development notebook, I will use the [World Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) from Kaggle. The data will be stored on the local machine under a folder structure that mirrors the S3 bucket and keys.

##### <u>Production Scope</u>

In the production phase, I will use the raw data from the source of truth, which is berkeleyearth.org. We will use an AWS Lambda function to gather the data, EMR to run the cleaning and transformation, and Redshift Spectrum to read the S3 parquet files so that they can be connected to either Power BI or QuickSight for downstream consumption.

#### Description of Data
<hr style="border-top: 2px double">

As they explain in Kaggle, Berkeley Earth Surface Temperature Study combines 1.6 billion temperature reports from 16 pre-existing archives.

Global Land and Ocean-and-Land Temperatures (GlobalTemperatures.csv):

- Date: starts in 1750 for average land temperature and 1850 for max and min land temperatures and global ocean and land temperatures
- LandAverageTemperature: global average land temperature in celsius
- LandAverageTemperatureUncertainty: the 95% confidence interval around the average
- LandMaxTemperature: global average maximum land temperature in celsius
- LandMaxTemperatureUncertainty: the 95% confidence interval around the maximum land temperature
- LandMinTemperature: global average minimum land temperature in celsius
- LandMinTemperatureUncertainty: the 95% confidence interval around the minimum land temperature
- LandAndOceanAverageTemperature: global average land and ocean temperature in celsius
- LandAndOceanAverageTemperatureUncertainty: the 95% confidence interval around the global average land and ocean temperature

Other files are provided in the following groupings:
- Country
- State
- Major City
- City

#### Data Gathering 
<hr style="border-top: 2px double">

##### <u>Development Data Gathering</u>
As mentioned above, for this development process the data will be downloaded from Kaggle.com through the Kaggle CLI API. 

To connect to the API, we must create a token. This token allows programmatic download of the data as a zip archive.

> An API token was created generating a kaggle.json file. This file was uploaded to /home/workspace

##### <u>Production Data Gathering</u>

The production version of data gathering will be in an AWS Lambda function that will request the text file from http://berkeleyearth.org.
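
A minimal sketch of what such a Lambda handler could look like. The source URL, the bucket name, the key layout, and all function names are placeholders, since the notebook does not specify the exact file to request. The sketch uses the standard library's `urllib.request` only to stay dependency-free, and the fetch and upload steps are passed in as callables so the logic can be exercised without network or AWS access.

```python
import datetime
import urllib.request

# Hypothetical placeholders: the exact file and bucket layout are not specified.
SOURCE_URL = "http://berkeleyearth.org/<path-to-text-file>"
BUCKET = "world-temp-data"


def fetch_text(url):
    """Download the raw text file and return its bytes."""
    with urllib.request.urlopen(url, timeout=30) as response:
        return response.read()


def build_key(run_date, filename="GlobalTemperatures.txt"):
    """Build an S3 key partitioned by extraction date (assumed layout)."""
    return f"raw/dt={run_date.isoformat()}/{filename}"


def upload_to_s3(bucket, key, body):
    """Upload bytes with boto3, imported lazily so the module loads without it."""
    import boto3
    boto3.client("s3").put_object(Bucket=bucket, Key=key, Body=body)


def lambda_handler(event, context, fetch=fetch_text, upload=upload_to_s3):
    """Entry point: fetch the source file and store it under a dated key."""
    run_date = datetime.date.today()
    body = fetch(SOURCE_URL)
    if not body:
        raise ValueError("Source returned an empty payload")
    key = build_key(run_date)
    upload(BUCKET, key, body)
    return {"bucket": BUCKET, "key": key, "bytes": len(body)}
```

Lambda only passes `event` and `context`, so the two extra parameters fall back to their defaults in production; a local test can swap in fakes for both.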

### Gathering the Data for Development
<hr style="border-top: 2px double">

In [6]:
# reset the extracted data directory and the downloaded archive
!rm -rf s3/world-temp-data
!rm -f climate-change-earth-surface-temperature-data.zip

In [7]:
# create credential folder (no error if it already exists)
!mkdir -p ~/.kaggle

In [8]:
# the file with the token was manually uploaded
!mv /home/workspace/kaggle.json ~/.kaggle/kaggle.json

In [9]:
# securing file to only owner read and write
!chmod 600 ~/.kaggle/kaggle.json

In [10]:
# listing datasets that match the search term "climate change"
!kaggle datasets list -s "climate change"

ref                                                          title                                               size  lastUpdated          downloadCount  voteCount  usabilityRating  
-----------------------------------------------------------  -------------------------------------------------  -----  -------------------  -------------  ---------  ---------------  
berkeleyearth/climate-change-earth-surface-temperature-data  Climate Change: Earth Surface Temperature Data      85MB  2017-05-01 17:29:10          38220       1075  0.7647059        
sumanthvrao/daily-climate-time-series-data                   Daily Climate time series data                      22KB  2019-08-23 09:22:09           1629         35  1.0              
theworldbank/world-bank-climate-change-data                  World Bank Climate Change Data                      42MB  2019-05-16 20:00:44           1650         66  0.7058824        
jsphyg/weather-dataset-rattle-package                        Rain in Australia  

In [11]:
# downloading the zip file
!kaggle datasets download -d berkeleyearth/climate-change-earth-surface-temperature-data 

Downloading climate-change-earth-surface-temperature-data.zip to /home/workspace
 83%|███████████████████████████████▍      | 70.0M/84.7M [00:00<00:00, 66.0MB/s]
100%|███████████████████████████████████████| 84.7M/84.7M [00:00<00:00, 109MB/s]


In [12]:
# unzipping the downloaded file into s3/world-temp-data/csv-files
with ZipFile("climate-change-earth-surface-temperature-data.zip", mode="r") as zip_file:
    zip_file.extractall("s3/world-temp-data/csv-files")

# Read all Flat File Datasets
---

In [13]:
df_global = spark.read.csv("s3/world-temp-data/csv-files/GlobalTemperatures.csv", header=True, inferSchema=True)
df_global.printSchema()

root
 |-- dt: timestamp (nullable = true)
 |-- LandAverageTemperature: double (nullable = true)
 |-- LandAverageTemperatureUncertainty: double (nullable = true)
 |-- LandMaxTemperature: double (nullable = true)
 |-- LandMaxTemperatureUncertainty: double (nullable = true)
 |-- LandMinTemperature: double (nullable = true)
 |-- LandMinTemperatureUncertainty: double (nullable = true)
 |-- LandAndOceanAverageTemperature: double (nullable = true)
 |-- LandAndOceanAverageTemperatureUncertainty: double (nullable = true)



In [14]:
df_bycity = spark.read.csv("s3/world-temp-data/csv-files/GlobalLandTemperaturesByCity.csv", header=True, inferSchema=True)
df_bycountry = spark.read.csv("s3/world-temp-data/csv-files/GlobalLandTemperaturesByCountry.csv", header=True, inferSchema=True)
df_bymajorcity = spark.read.csv("s3/world-temp-data/csv-files/GlobalLandTemperaturesByMajorCity.csv", header=True, inferSchema=True)
df_bystate = spark.read.csv("s3/world-temp-data/csv-files/GlobalLandTemperaturesByState.csv", header=True, inferSchema=True)

## Showing and Explaining the Individual Datasets
<hr style="border-top: 2px double">

### <ol style="display: inline-block"><li> Global Content

In [15]:
df_global.limit(10).toPandas().T

| |0|1|2|3|4|5|6|7|8|9|
|-|-|-|-|-|-|-|-|-|-|-|
|dt|1750-01-01 00:00:00|1750-02-01 00:00:00|1750-03-01 00:00:00|1750-04-01 00:00:00|1750-05-01 00:00:00|1750-06-01 00:00:00|1750-07-01 00:00:00|1750-08-01 00:00:00|1750-09-01 00:00:00|1750-10-01 00:00:00|
|LandAverageTemperature|3.034|3.083|5.626|8.49|11.573|12.937|15.868|14.75|11.413|6.367|
|LandAverageTemperatureUncertainty|3.574|3.702|3.076|2.451|2.072|1.724|1.911|2.231|2.637|2.668|
|LandMaxTemperature|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
|LandMaxTemperatureUncertainty|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
|LandMinTemperature|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
|LandMinTemperatureUncertainty|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
|LandAndOceanAverageTemperature|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|
|LandAndOceanAverageTemperatureUncertainty|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|NaN|


> This dataset describes the global temperatures on a time series basis. It also includes descriptive and inferential statistics about the global average. It is useful to compare global temperatures with temperatures by country to find countries that are outliers in our analysis of climate change.
<hr style="border-top: 2px dotted; background: white;">

### <ol start="2" style="display: inline-block"><li> By City Content

In [16]:
df_bycity.show(5)

+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01 00:00:00|             6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01 00:00:00|              null|                         null|Århus|Denmark|  57.05N|   10.33E|
+-------------------+------------------+-----------------------------+-----+-------+--------+---------+
only showing top 5 rows



> This dataset describes the global temperatures by city. It can be used to forecast the temperatures by city or to investigate global temperature anomalies by city.
<hr style="border-top: 2px dotted; background: white;">

### <ol start="3" style="display: inline-block"><li>By Country Content

In [17]:
df_bycountry.show(5)

+-------------------+------------------+-----------------------------+-------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|Country|
+-------------------+------------------+-----------------------------+-------+
|1743-11-01 00:00:00|4.3839999999999995|                        2.294|  Åland|
|1743-12-01 00:00:00|              null|                         null|  Åland|
|1744-01-01 00:00:00|              null|                         null|  Åland|
|1744-02-01 00:00:00|              null|                         null|  Åland|
|1744-03-01 00:00:00|              null|                         null|  Åland|
+-------------------+------------------+-----------------------------+-------+
only showing top 5 rows



> This dataset describes the global temperatures by country. It can be used to forecast the temperatures by country or to investigate global temperature anomalies by country.
<hr style="border-top: 2px dotted; background: white;">

### <ol start="4" style="display: inline-block"><li>By Major City Content

In [18]:
df_bymajorcity.show(5)

+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|   City|      Country|Latitude|Longitude|
+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+
|1849-01-01 00:00:00|            26.704|                        1.435|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-02-01 00:00:00|            27.434|                        1.362|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-03-01 00:00:00|            28.101|                        1.612|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-04-01 00:00:00|             26.14|           1.3869999999999998|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
|1849-05-01 00:00:00|            25.427|                          1.2|Abidjan|Côte D'Ivoire|   5.63N|    3.23W|
+-------------------+------------------+-----------------------------+-------+-------------+--------+---------+

> This dataset describes the global temperatures by major city. It can be used to forecast the temperatures by major city or to investigate global temperature anomalies by major city.
<hr style="border-top: 2px dotted; background: white;">

### <ol start="5" style="display: inline-block"><li>By State Content

In [19]:
df_bystate.show(5)

+-------------------+------------------+-----------------------------+-----+-------+
|                 dt|AverageTemperature|AverageTemperatureUncertainty|State|Country|
+-------------------+------------------+-----------------------------+-----+-------+
|1855-05-01 00:00:00|            25.544|                        1.171| Acre| Brazil|
|1855-06-01 00:00:00|            24.228|                        1.103| Acre| Brazil|
|1855-07-01 00:00:00|            24.371|                        1.044| Acre| Brazil|
|1855-08-01 00:00:00|            25.427|                        1.073| Acre| Brazil|
|1855-09-01 00:00:00|            25.675|                        1.014| Acre| Brazil|
+-------------------+------------------+-----------------------------+-----+-------+
only showing top 5 rows



> This dataset describes the global temperatures by state. It can be used to forecast the temperatures by state or to investigate global temperature anomalies by state.
<hr style="border-top: 2px dotted; background: white;">

# Converting and Reading Parquet
---

#### Converting CSV to Parquet 
Apache Parquet is a columnar file format that provides optimizations to speed up queries and is a far more efficient file format than CSV or JSON.

In [20]:
df_global.write.parquet("s3/world-temp-data/parquet-staging/GlobalTemperatures", mode="overwrite")
df_bycity.write.parquet("s3/world-temp-data/parquet-staging/GlobalLandTemperaturesByCity", mode="overwrite")
df_bycountry.write.parquet("s3/world-temp-data/parquet-staging/GlobalLandTemperaturesByCountry", mode="overwrite")
df_bymajorcity.write.parquet("s3/world-temp-data/parquet-staging/GlobalLandTemperaturesByMajorCity", mode="overwrite")
df_bystate.write.parquet("s3/world-temp-data/parquet-staging/GlobalLandTemperaturesByState", mode="overwrite")

In [21]:
df_global = spark.read.parquet("s3/world-temp-data/parquet-staging/GlobalTemperatures")
df_bycity = spark.read.parquet("s3/world-temp-data/parquet-staging/GlobalLandTemperaturesByCity")
df_bycountry = spark.read.parquet("s3/world-temp-data/parquet-staging/GlobalLandTemperaturesByCountry")
df_bymajorcity = spark.read.parquet("s3/world-temp-data/parquet-staging/GlobalLandTemperaturesByMajorCity")
df_bystate = spark.read.parquet("s3/world-temp-data/parquet-staging/GlobalLandTemperaturesByState")

# Exploring Parquet Data
---
Identify data quality issues, like missing values, duplicate data, etc.

In [22]:
def explore(dataframe, show_affected=False):
    """
    Display the null count and null percentage of every column,
    then print the schema. If show_affected is True, also show the
    summed temperatures for the dates in the global `affected_dates`
    list (defined in the next cell).
    """
    df = dataframe
    total_rows = df.count()  # computed once and reused for the percentage
    null_expr = [f"SUM(CASE WHEN {x} IS NULL THEN 1 ELSE 0 END) AS null_{x}" for x in df.columns]
    df_pd = (
        df.selectExpr(null_expr)
            .toPandas()
            .transpose()
            .rename(columns={0: 'nulls'})
            .assign(null_pct=lambda x: x.nulls / total_rows)
    )
    display(df_pd.style.format({'nulls': '{:d}', 'null_pct': '{:2.2%}'}))
    df.printSchema()

    if show_affected:
        # dates with a missing global average; a null sum means every row for that date is null
        (df.where(F.col('dt').isin(affected_dates))
             .groupBy('dt')
             .agg(
                F.expr('sum(AverageTemperature) AS affectedAverageTemperature'),
                F.expr('sum(AverageTemperatureUncertainty) AS affectedAverageTemperatureUncertainty')
             ).orderBy(F.desc('dt'))
        ).show()

In [23]:
affected_dates = df_global.where(F.col('LandAverageTemperature').isNull()).select('dt').collect()
affected_dates = [x.asDict().get('dt') for x in affected_dates]

## <ol start="1" style="display: inline-block"><li> Exploring Global Dataset

In [24]:
explore(df_global)

| |nulls|null_pct|
|-|-|-|
|null_dt|0|0.00%|
|null_LandAverageTemperature|12|0.38%|
|null_LandAverageTemperatureUncertainty|12|0.38%|
|null_LandMaxTemperature|1200|37.59%|
|null_LandMaxTemperatureUncertainty|1200|37.59%|
|null_LandMinTemperature|1200|37.59%|
|null_LandMinTemperatureUncertainty|1200|37.59%|
|null_LandAndOceanAverageTemperature|1200|37.59%|
|null_LandAndOceanAverageTemperatureUncertainty|1200|37.59%|


root
 |-- dt: timestamp (nullable = true)
 |-- LandAverageTemperature: double (nullable = true)
 |-- LandAverageTemperatureUncertainty: double (nullable = true)
 |-- LandMaxTemperature: double (nullable = true)
 |-- LandMaxTemperatureUncertainty: double (nullable = true)
 |-- LandMinTemperature: double (nullable = true)
 |-- LandMinTemperatureUncertainty: double (nullable = true)
 |-- LandAndOceanAverageTemperature: double (nullable = true)
 |-- LandAndOceanAverageTemperatureUncertainty: double (nullable = true)



## <ol start="2" style="display: inline-block"><li> Exploring By City Dataset

In [25]:
explore(df_bycity, True)

| |nulls|null_pct|
|-|-|-|
|null_dt|0|0.00%|
|null_AverageTemperature|364130|4.23%|
|null_AverageTemperatureUncertainty|364130|4.23%|
|null_City|0|0.00%|
|null_Country|0|0.00%|
|null_Latitude|0|0.00%|
|null_Longitude|0|0.00%|


root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

+-------------------+--------------------------+-------------------------------------+
|                 dt|affectedAverageTemperature|affectedAverageTemperatureUncertainty|
+-------------------+--------------------------+-------------------------------------+
|1752-09-01 00:00:00|                      null|                                 null|
|1752-08-01 00:00:00|                      null|                                 null|
|1752-07-01 00:00:00|                      null|                                 null|
|1752-06-01 00:00:00|                      null|                                 null|
|1752-05-01 00:00:00|                      null|                                 

## <ol start="3" style="display: inline-block"><li> Exploring By Country

In [26]:
explore(df_bycountry, True)

| |nulls|null_pct|
|-|-|-|
|null_dt|0|0.00%|
|null_AverageTemperature|32651|5.65%|
|null_AverageTemperatureUncertainty|31912|5.53%|
|null_Country|0|0.00%|


root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- Country: string (nullable = true)

+-------------------+--------------------------+-------------------------------------+
|                 dt|affectedAverageTemperature|affectedAverageTemperatureUncertainty|
+-------------------+--------------------------+-------------------------------------+
|1752-09-01 00:00:00|                      null|                                 null|
|1752-08-01 00:00:00|                      null|                                 null|
|1752-07-01 00:00:00|                      null|                                 null|
|1752-06-01 00:00:00|                      null|                                 null|
|1752-05-01 00:00:00|                      null|                                 null|
|1752-02-01 00:00:00|                      null|                                 null|
|1751-12-01 00:00:00|   

## <ol start="4" style="display: inline-block"><li> Exploring By Major City

In [27]:
explore(df_bymajorcity, True)

| |nulls|null_pct|
|-|-|-|
|null_dt|0|0.00%|
|null_AverageTemperature|11002|4.60%|
|null_AverageTemperatureUncertainty|11002|4.60%|
|null_City|0|0.00%|
|null_Country|0|0.00%|
|null_Latitude|0|0.00%|
|null_Longitude|0|0.00%|


root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

+-------------------+--------------------------+-------------------------------------+
|                 dt|affectedAverageTemperature|affectedAverageTemperatureUncertainty|
+-------------------+--------------------------+-------------------------------------+
|1752-09-01 00:00:00|                      null|                                 null|
|1752-08-01 00:00:00|                      null|                                 null|
|1752-07-01 00:00:00|                      null|                                 null|
|1752-06-01 00:00:00|                      null|                                 null|
|1752-05-01 00:00:00|                      null|                                 

## <ol start="5" style="display: inline-block"><li> Exploring By State

In [28]:
explore(df_bystate, True)

| |nulls|null_pct|
|-|-|-|
|null_dt|0|0.00%|
|null_AverageTemperature|25648|3.97%|
|null_AverageTemperatureUncertainty|25648|3.97%|
|null_State|0|0.00%|
|null_Country|0|0.00%|


root
 |-- dt: timestamp (nullable = true)
 |-- AverageTemperature: double (nullable = true)
 |-- AverageTemperatureUncertainty: double (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)

+-------------------+--------------------------+-------------------------------------+
|                 dt|affectedAverageTemperature|affectedAverageTemperatureUncertainty|
+-------------------+--------------------------+-------------------------------------+
|1752-09-01 00:00:00|                      null|                                 null|
|1752-08-01 00:00:00|                      null|                                 null|
|1752-07-01 00:00:00|                      null|                                 null|
|1752-06-01 00:00:00|                      null|                                 null|
|1752-05-01 00:00:00|                      null|                                 null|
|1752-02-01 00:00:00|                      null|                          

**Analysis**
> In the global dataset, every column other than LandAverageTemperature and LandAverageTemperatureUncertainty has too many nulls (37.59%). Our analysis has no use for those columns either, so they will be ignored.

> Imputing the missing global values from the grouped datasets will not work, because the grouped datasets are missing data on the same affected dates. Even if they were complete, adding up their averages would not equal the true missing global average.

> For LandAverageTemperature and LandAverageTemperatureUncertainty, we will impute each null with the average of the surrounding values, using a window wide enough to span the affected null ranges. E.g. if the current null value is followed by 2 nulls, we will use the next 3 values and the last 3 values to calculate the average.
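
The idea can be illustrated on a plain Python list. This is only a sketch and not the notebook's implementation, which relies on a Spark SQL window: each missing value is replaced by the mean of the non-null values found within three positions on either side. The function name and the sample values are made up for illustration.

```python
def impute_with_window_mean(values, half_window=3):
    """Fill None entries with the mean of the non-null neighbours
    located within +/- half_window positions."""
    filled = []
    for i, value in enumerate(values):
        if value is not None:
            filled.append(value)
            continue
        start = max(0, i - half_window)
        stop = min(len(values), i + half_window + 1)
        neighbours = [v for v in values[start:stop] if v is not None]
        # Stay None when the whole window is empty.
        filled.append(sum(neighbours) / len(neighbours) if neighbours else None)
    return filled


temps = [1.0, 2.0, 3.0, None, 5.0, 6.0, 7.0]
print(impute_with_window_mean(temps))  # [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
```

Because nulls are skipped rather than counted as zero, a run of consecutive nulls is filled from whatever real values fall inside the window, which is why the window has to be at least as wide as the longest null run.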


# Cleaning Stage
---

The GlobalTemperatures dataset seems to be fairly clean. However, these are averages, and the question remains whether they are representative of all countries and of the states within those countries.


## Imputation of Missing Values

In [29]:
df_global.createOrReplaceTempView('globaltemps')

df_global_clean = spark.sql("""
WITH cte_null_impute AS (
SELECT
    dt
    ,LandAverageTemperature
    ,AVG(LandAverageTemperature) OVER(ORDER BY dt ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING) AS preced_follow_avgtemp
    ,LandAverageTemperatureUncertainty
    ,AVG(LandAverageTemperatureUncertainty) OVER(ORDER BY dt ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING) AS preced_follow_avgunctemp
FROM globaltemps
)
SELECT 
    dt
    ,COALESCE(LandAverageTemperature, preced_follow_avgtemp) AS LandAverageTemperature
    ,COALESCE(LandAverageTemperatureUncertainty, preced_follow_avgunctemp) AS LandAverageTemperatureUncertainty
FROM cte_null_impute
""")

In [30]:
explore(df_global_clean)

Unnamed: 0,nulls,null_pct
null_dt,0,0.00%
null_LandAverageTemperature,0,0.00%
null_LandAverageTemperatureUncertainty,0,0.00%


root
 |-- dt: timestamp (nullable = true)
 |-- LandAverageTemperature: double (nullable = true)
 |-- LandAverageTemperatureUncertainty: double (nullable = true)



### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
![conceptual data model](resources/udacity-dend-datamodel.png)

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

1. Data is crawled from the original http://berkeleyearth.org website using AWS Lambda
2. Data is dumped into an S3 bucket in CSV format in s3://world-temp-data/csv-files
3. Data is read by a Spark cluster using PySpark and converted into parquet in s3://world-temp-data/parquet-staging
4. Data is read by a Spark cluster using PySpark and transformed/cleaned
5. Data is checked for null values and duplicates using a Spark cluster
6. Quality checks are conducted:
    a. If the data passes all quality checks, it is dumped into s3://world-temp-data/parquet-clean as parquet
    b. If the data does not pass the quality checks, the error is sent by email to users and the process stops
7. A Glue crawler catalogs the parquet files in s3://world-temp-data/parquet-clean in the Glue global_temp database
8. Redshift Spectrum exposes the clean-data table of the Glue global_temp database in a global_temp_db Redshift database
9. Power BI connects to Redshift and extracts the clean-data table from the global_temp_db using a Redshift driver.
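
The branching in step 6 can be sketched in plain Python. This is an illustration under simplifying assumptions, not the production code: rows are modeled as a list of dicts, and `write_clean` and `notify` are hypothetical callables standing in for the parquet write and the email alert.

```python
def check_nulls(rows):
    """Return an error message if any row holds a null value."""
    missing = [r for r in rows if any(v is None for v in r.values())]
    return [f"{len(missing)} row(s) contain nulls"] if missing else []


def check_duplicates(rows, key="dt"):
    """Return an error message if any key value appears more than once."""
    seen, dupes = set(), set()
    for r in rows:
        (dupes if r[key] in seen else seen).add(r[key])
    return [f"{len(dupes)} duplicated {key} value(s)"] if dupes else []


def publish_if_clean(rows, checks, write_clean, notify):
    """Run every check; write on success, notify and stop on failure."""
    errors = [message for check in checks for message in check(rows)]
    if errors:
        summary = "; ".join(errors)
        notify(summary)            # step 6b: alert the users...
        raise ValueError(summary)  # ...and stop the process
    write_clean(rows)              # step 6a: publish the clean data
    return len(rows)
```

Raising after the notification is what stops the run: in an orchestrator such as Airflow, the failed task would keep the downstream cataloguing steps from executing.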

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

##### Dag Task 1: AWS Lambda Crawler
> We are extracting data from the website using the Python requests package. The outputs will be parsed into CSV documents that will be stored in S3.

##### Dag Task 2: Glue Spark Job or EMR Spark Job to Read Csv
> The CSV data will be read using spark into a spark dataframe in EMR memory.

##### Dag Task 3: Glue Spark Job or EMR Spark Job to Convert to Parquet
> The EMR dataframe in memory is then converted to parquet so that the transformation and cleaning activities consume fewer resources. This step is separated from the rest because we would like to know whether the conversion was successful before the cleaning script runs. It would also let us know if we have already extracted the same date by the partition provided.
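
One way to detect an already-extracted date is to write each run into a date-named partition folder and look for it before converting again. The sketch below assumes a Hive-style `extract_date=YYYY-MM-DD` folder name and works on the local folder that stands in for S3 in this notebook; the function names and the layout are illustrative, not part of the project. Against real S3 the same check would list object keys under the prefix instead.

```python
import datetime
from pathlib import Path


def partition_path(base_dir, dataset, extract_date):
    """Return the folder for one dataset and extraction date (assumed layout)."""
    return Path(base_dir) / dataset / f"extract_date={extract_date.isoformat()}"


def already_extracted(base_dir, dataset, extract_date):
    """True if the partition folder exists and holds at least one parquet file."""
    folder = partition_path(base_dir, dataset, extract_date)
    return folder.is_dir() and any(folder.glob("*.parquet"))


if __name__ == "__main__":
    today = datetime.date.today()
    staging = "s3/world-temp-data/parquet-staging"
    if already_extracted(staging, "GlobalTemperatures", today):
        print("Partition already present, skipping conversion")
    else:
        print("Partition missing, conversion should run")
```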

##### Dag Task 4: EMR Job to Read Parquet and Clean Data
> We read the parquet files and run the cleaning on the parquet data since it is more efficient. This task combines reading parquet and cleaning because we do not expect the reading part to fail, whereas we can expect the cleaning part to fail. A failure of this task therefore tells us that the cleaning script needs to be reviewed.

In [31]:
## Reading parquet file
df = spark.read.parquet("s3/world-temp-data/parquet-staging/GlobalTemperatures")

## Cleaning parquet file
df.createOrReplaceTempView('staging')

df = spark.sql("""
WITH cte_null_impute AS (
SELECT
    dt
    ,LandAverageTemperature
    ,AVG(LandAverageTemperature) OVER(ORDER BY dt ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING) AS preced_follow_avgtemp
    ,LandAverageTemperatureUncertainty
    ,AVG(LandAverageTemperatureUncertainty) OVER(ORDER BY dt ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING) AS preced_follow_avgunctemp
FROM staging
)
SELECT 
    dt
    ,COALESCE(LandAverageTemperature, preced_follow_avgtemp) AS LandAverageTemperature
    ,COALESCE(LandAverageTemperatureUncertainty, preced_follow_avgunctemp) AS LandAverageTemperatureUncertainty
FROM cte_null_impute
""")

##### Dag Task 5: Glue Spark Job or EMR Spark Job to Check for Nulls and Duplicates
> In this task we determine whether the data is ready for downstream consumption and meets the agreed quality requirements. It is a separate task because we need to make sure that our downstream users are not consuming the wrong data.

> The data quality checks performed below will be converted to Redshift (PostgreSQL-compatible) SQL to be used in an Airflow DAG.

In [32]:
## conduct null data quality: count nulls across all rows, not only the first one
null_counts = df.selectExpr(
    "SUM(CASE WHEN LandAverageTemperature IS NULL THEN 1 ELSE 0 END) AS null_LandAverageTemperature",
    "SUM(CASE WHEN LandAverageTemperatureUncertainty IS NULL THEN 1 ELSE 0 END) AS null_LandAverageTemperatureUncertainty").collect()[0]
# SUM returns null on an empty dataframe, so treat a missing count as 0
column_wide_null_count = sum(count or 0 for count in null_counts)

## conduct duplicate data quality
duplicate_count = df.groupBy('dt').agg((F.count('dt')>1).alias('group_count')).where(F.col('group_count')).count()

error_list = []
if column_wide_null_count > 0:
    error_list.append("Nulls were found in the data")

if duplicate_count > 0:
    error_list.append("Duplicates were found in the data")
    
if len(error_list) > 0:
    raise ValueError(' and '.join(error_list))

##### Dag Task 6: Glue Spark Job or EMR Spark Job to Dump Clean Data to Be Crawled for Downstream
> Here we are dumping the data that passes the quality checks into the final bucket, which will be catalogued using Glue and read using Redshift Spectrum as the endpoint of the visualization tool.

> A Glue crawler will catalog the clean data after it is dumped in the destination s3 object/key.

In [34]:
## Dumping clean dataset into the clean key
df.write.parquet("s3/world-temp-data/parquet-clean/global-temperatures", mode="overwrite")

# Done! Data is ready for visualization and forecasting model ingestion

#### 4.2 Data Quality Checks

> The data will be checked right after the data is cleaned. We are preventing bad data from being dumped in the s3://udacity-dend-samuel/clean-data/global-temperatures file key.

These checks include:
* Null checks for both LandAverageTemperature and LandAverageTemperatureUncertainty
* Duplicate checks on dt

> Another worthwhile quality check is to make sure the data is catalogued in Glue. This check will run in Redshift Spectrum and will count the rows in the prod.global_temperatures table that Redshift Spectrum reads from the s3://world-temp-data/parquet-clean/global-temperatures file key using the Glue catalog.
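
A minimal sketch of that row-count check, assuming it receives a `get_records` callable that runs a SQL string and returns a list of row tuples (for example the `get_records` method of Airflow's `PostgresHook`). The function name and the `minimum_rows` parameter are illustrative.

```python
def check_row_count(get_records, table_name, minimum_rows=1):
    """Raise if the catalogued table returns fewer rows than expected."""
    records = get_records(f"SELECT COUNT(1) FROM {table_name}")
    if not records or not records[0]:
        raise ValueError(f"No result returned for {table_name}; is it catalogued?")
    row_count = records[0][0]
    if row_count is None or row_count < minimum_rows:
        raise ValueError(
            f"{table_name} has {row_count} rows; expected at least {minimum_rows}"
        )
    return row_count
```

Passing the query runner in as an argument keeps the check independent of Airflow, so it can be tested with a stub before being wrapped in an operator.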

Here is an example of how the data quality operator will look:

airflow/operators.py
```python
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class DataQualityOperator(BaseOperator):
    ui_color = '#89DA59'

    @apply_defaults
    def __init__(self,
                 redshift_conn_id,
                 table_name,
                 grouping,
                 *args, **kwargs):

        super(DataQualityOperator, self).__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        self.table_name = table_name
        self.grouping = grouping

    def execute(self, context):
        redshift = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        table_name = self.table_name
        grouping = self.grouping
        null_exist_query = f"""
            SELECT 
                SUM(CASE WHEN LandAverageTemperature IS NULL THEN 1 ELSE 0 END) AS null_LandAverageTemperature
            FROM {table_name}
            """
        null_exist_query2 = f"""
            SELECT 
                SUM(CASE WHEN LandAverageTemperatureUncertainty IS NULL THEN 1 ELSE 0 END) AS null_LandAverageTemperatureUncertainty
            FROM {table_name}
            """
        
        duplicate_exist_query = f"""
            WITH duplicates AS (
                SELECT 
                    dt
                FROM {table_name}
                GROUP BY {grouping}
                HAVING COUNT(1) > 1
            )
            SELECT COUNT(1) AS duplicate_count
            FROM duplicates
        """
        
        self.log.info(f"Conducting data quality on table {table_name}")
        records = redshift.get_records(null_exist_query)
        records2 = redshift.get_records(null_exist_query2)
        duplicate = redshift.get_records(duplicate_exist_query)
        
        if len(records) < 1 or len(records[0]) < 1:
            raise ValueError(f"Data quality check failed. The {table_name} table might have no data.")
        
        num_records = records[0][0]        
        if num_records > 0:
            raise ValueError(f'There are LandAverageTemperature nulls in the {table_name} table.') 
            
        if len(records2) < 1 or len(records2[0]) < 1:
            raise ValueError(f"Data quality check failed. The {table_name} table might have no data.")
        
        num_records = records2[0][0]        
        if num_records > 0:
            raise ValueError(f'There are LandAverageTemperatureUncertainty nulls in the {table_name} table.')   
            
        if len(duplicate) < 1 or len(duplicate[0]) < 1:
            raise ValueError(f"Data quality check failed. The {table_name} table might have no data.")
        
        num_records = duplicate[0][0]        
        if num_records > 0:
            raise ValueError(f'There are duplicates in the {table_name} table.') 
```

Here is an example of how the Airflow dag will be populated with the task.

```python
from airflow.operators import DataQualityOperator     

run_global_quality_checks = DataQualityOperator(
    task_id='global_check_nulls_and_duplicates',
    dag=dag,
    redshift_conn_id="redshift",
    table_name="spectrum.GlobalTemperatures",
    grouping="dt"
)

run_country_quality_checks = DataQualityOperator(
    task_id='country_check_nulls_and_duplicates',
    dag=dag,
    redshift_conn_id="redshift",
    table_name="spectrum.GlobalLandTemperaturesByCountry",
    grouping="dt, Country"
)
```

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

**Data Dictionary:**

- Glue database: production 
- Glue and Redshift table name: global_temperatures
- Redshift database: operations
- Redshift schema: prod

|column|details|
|-|-|
|dt|the date the sensor measured the temperature|
|LandAverageTemperature|the global average land temperature|
|LandAverageTemperatureUncertainty|the uncertainty around the global average land temperature|

---
- Glue database: production
- Glue and Redshift table name: bycountry_temperatures
- Redshift database: operations
- Redshift schema: prod

|column|details|
|-|-|
|dt|the date the sensor measured the temperature|
|Country|the country in which the temperature was taken|
|LandAverageTemperature|the average land temperature of the country|
|LandAverageTemperatureUncertainty|the uncertainty around the average land temperature of the country|

---

In [36]:
!rm sas_data -r -f

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
    * The data was increased by 100x.
    * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    * The database needed to be accessed by 100+ people.