
### Data Engineering Capstone Project

#### Project Summary

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

```python
# Do all imports and installs here
import pandas as pd
from pyspark.sql import functions as F
```

### Step 1: Scope the Project and Gather Data

#### Scope 
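Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

This project uses the Berkeley Earth surface temperature data from Kaggle: monthly land temperatures by city and monthly global temperatures. The data are explored with pandas, then read, cleaned, and transformed with PySpark into two dimension tables (average temperature by country and by city). These tables are written out as CSV and validated with data quality checks.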

#### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information is included?   
Both data sets come from [Climate Change: Earth Surface Temperature Data](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data) by Berkeley Earth on Kaggle, which explores global temperatures since 1750.   
Fact Table Source: [GlobalLandTemperaturesByCity.csv](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data?select=GlobalLandTemperaturesByCity.csv)
- dt (object)  
- AverageTemperature (float64)  
- AverageTemperatureUncertainty (float64)  
- City (object)  
- Country (object)  
- Latitude (object)  
- Longitude (object)

Source: [GlobalTemperatures.csv](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data?select=GlobalTemperatures.csv)
- dt (object)
- LandAverageTemperature (float64)
- LandAverageTemperatureUncertainty (float64)
- LandMaxTemperature (float64)
- LandMaxTemperatureUncertainty (float64)
- LandMinTemperature (float64)
- LandMinTemperatureUncertainty (float64)
- LandAndOceanAverageTemperature (float64)
- LandAndOceanAverageTemperatureUncertainty (float64) 

```python
# Read in the data here
file_name = '../../data2/GlobalLandTemperaturesByCity.csv'
df = pd.read_csv(file_name)
```

```python
df.head()
```

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |
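Latitude and Longitude are stored as text with a hemisphere suffix (57.05N and 10.33E in the sample above), so they cannot be used as numbers directly, and simply stripping the letter would lose the sign of southern and western coordinates. A minimal sketch of a conversion to signed decimal degrees, assuming every value is a number followed by one of N, S, E or W; the helper `parse_coordinate` is hypothetical and not part of the notebook:

```python
def parse_coordinate(value: str) -> float:
    """Convert a value such as '57.05N' or '87.27W' to signed decimal degrees.

    Assumes the format <number><hemisphere letter>; N and E are positive,
    S and W are negative.
    """
    value = value.strip()
    number, hemisphere = float(value[:-1]), value[-1].upper()
    if hemisphere in ("N", "E"):
        return number
    if hemisphere in ("S", "W"):
        return -number
    raise ValueError(f"unexpected hemisphere suffix in {value!r}")
```

In pandas this could be applied with `df['Latitude'].map(parse_coordinate)`; in Spark the same rule would have to be wrapped in a UDF or rewritten as an equivalent column expression.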


```python
df.info()
```

```
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
dt                               object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                             object
Country                          object
Latitude                         object
Longitude                        object
dtypes: float64(2), object(5)
memory usage: 459.2+ MB
```


```python
file_name = 'GlobalTemperatures.csv'
df_temp = pd.read_csv(file_name)
df_temp.head()
```

| | dt | LandAverageTemperature | LandAverageTemperatureUncertainty | LandMaxTemperature | LandMaxTemperatureUncertainty | LandMinTemperature | LandMinTemperatureUncertainty | LandAndOceanAverageTemperature | LandAndOceanAverageTemperatureUncertainty |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1750-01-01 | 3.034 | 3.574 | | | | | | |
| 1 | 1750-02-01 | 3.083 | 3.702 | | | | | | |
| 2 | 1750-03-01 | 5.626 | 3.076 | | | | | | |
| 3 | 1750-04-01 | 8.49 | 2.451 | | | | | | |
| 4 | 1750-05-01 | 11.573 | 2.072 | | | | | | |


```python
df_temp.info()
```

```
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3192 entries, 0 to 3191
Data columns (total 9 columns):
dt                                           3192 non-null object
LandAverageTemperature                       3180 non-null float64
LandAverageTemperatureUncertainty            3180 non-null float64
LandMaxTemperature                           1992 non-null float64
LandMaxTemperatureUncertainty                1992 non-null float64
LandMinTemperature                           1992 non-null float64
LandMinTemperatureUncertainty                1992 non-null float64
LandAndOceanAverageTemperature               1992 non-null float64
LandAndOceanAverageTemperatureUncertainty    1992 non-null float64
dtypes: float64(8), object(1)
memory usage: 224.5+ KB
```


### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.
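- The data contain many missing (NaN) values. In GlobalLandTemperaturesByCity.csv, 364,130 of the 8,599,212 rows have neither an AverageTemperature nor an AverageTemperatureUncertainty value; in GlobalTemperatures.csv, only 1,992 of the 3,192 rows have a value in every column.

#### Cleaning Steps
Document steps necessary to clean the data
- Drop every row that contains a missing value with `dropna()`. This leaves 8,235,082 rows in the city data and 1,992 rows in the global data.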
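The cleaning step drops every row with a missing value, and the exploration prompt also mentions duplicate data, which the notebook does not check. A minimal pandas sketch that counts both before cleaning; the function `quality_summary` and the choice of key columns are hypothetical illustrations, not part of the original pipeline:

```python
import pandas as pd


def quality_summary(df: pd.DataFrame, key_columns: list) -> dict:
    """Count missing values and duplicates in a pandas DataFrame."""
    missing = df.isna().sum()
    return {
        "rows": len(df),
        # Missing values per column, listing only columns that have any
        "missing_per_column": {col: int(n) for col, n in missing.items() if n > 0},
        # Rows that dropna() would remove (at least one missing value)
        "rows_with_any_missing": int(df.isna().any(axis=1).sum()),
        # Rows that repeat an earlier row in every column
        "duplicate_rows": int(df.duplicated().sum()),
        # Rows that repeat an earlier row's key, even if other columns differ
        "duplicate_keys": int(df.duplicated(subset=key_columns).sum()),
    }
```

For example, `quality_summary(df, key_columns=["dt", "City", "Country"])`. Whether that combination is actually unique in this data set is an assumption the summary would test, not something the notebook establishes.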

```python
df.isna().sum()
```

```
dt                                    0
AverageTemperature               364130
AverageTemperatureUncertainty    364130
City                                  0
Country                               0
Latitude                              0
Longitude                             0
dtype: int64
```

```python
# Performing cleaning tasks here
df.dropna(inplace=True)
df.head()
```

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 5 | 1744-04-01 | 5.788 | 3.624 | Århus | Denmark | 57.05N | 10.33E |
| 6 | 1744-05-01 | 10.644 | 1.283 | Århus | Denmark | 57.05N | 10.33E |
| 7 | 1744-06-01 | 14.051 | 1.347 | Århus | Denmark | 57.05N | 10.33E |
| 8 | 1744-07-01 | 16.082 | 1.396 | Århus | Denmark | 57.05N | 10.33E |


```python
df.describe()
```

| | AverageTemperature | AverageTemperatureUncertainty |
|---|---|---|
| count | 8235082.0 | 8235082.0 |
| mean | 16.72743 | 1.028575 |
| std | 10.35344 | 1.129733 |
| min | -42.704 | 0.034 |
| 25% | 10.299 | 0.337 |
| 50% | 18.831 | 0.591 |
| 75% | 25.21 | 1.349 |
| max | 39.651 | 15.396 |


```python
df_temp.dropna(inplace=True)
df_temp.describe()
```

| | LandAverageTemperature | LandAverageTemperatureUncertainty | LandMaxTemperature | LandMaxTemperatureUncertainty | LandMinTemperature | LandMinTemperatureUncertainty | LandAndOceanAverageTemperature | LandAndOceanAverageTemperatureUncertainty |
|---|---|---|---|---|---|---|---|---|
| count | 1992.0 | 1992.0 | 1992.0 | 1992.0 | 1992.0 | 1992.0 | 1992.0 | 1992.0 |
| mean | 8.571583 | 0.276663 | 14.350601 | 0.479782 | 2.743595 | 0.431849 | 15.212566 | 0.128532 |
| std | 4.263193 | 0.22403 | 4.309579 | 0.583203 | 4.155835 | 0.445838 | 1.274093 | 0.073587 |
| min | 0.404 | 0.034 | 5.9 | 0.044 | -5.407 | 0.045 | 12.475 | 0.042 |
| 25% | 4.43 | 0.09975 | 10.212 | 0.142 | -1.3345 | 0.155 | 14.047 | 0.063 |
| 50% | 8.8505 | 0.23 | 14.76 | 0.252 | 2.9495 | 0.279 | 15.251 | 0.122 |
| 75% | 12.8585 | 0.34725 | 18.4515 | 0.539 | 6.77875 | 0.45825 | 16.39625 | 0.151 |
| max | 15.482 | 1.492 | 21.32 | 4.373 | 9.715 | 3.498 | 17.611 | 0.457 |


```python
df_temp.isna().sum()
```

```
dt                                           0
LandAverageTemperature                       0
LandAverageTemperatureUncertainty            0
LandMaxTemperature                           0
LandMaxTemperatureUncertainty                0
LandMinTemperature                           0
LandMinTemperatureUncertainty                0
LandAndOceanAverageTemperature               0
LandAndOceanAverageTemperatureUncertainty    0
dtype: int64
```

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

Fact Table Source: [GlobalLandTemperaturesByCity.csv](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data?select=GlobalLandTemperaturesByCity.csv)
- dt (object)  
- AverageTemperature (float64)  
- AverageTemperatureUncertainty (float64)  
- City (object)  
- Country (object)  
- Latitude (object)  
- Longitude (object)  

Source: [GlobalTemperatures.csv](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data?select=GlobalTemperatures.csv)
- dt (object)
- LandAverageTemperature (float64)
- LandAverageTemperatureUncertainty (float64)
- LandMaxTemperature (float64)
- LandMaxTemperatureUncertainty (float64)
- LandMinTemperature (float64)
- LandMinTemperatureUncertainty (float64)
- LandAndOceanAverageTemperature (float64)
- LandAndOceanAverageTemperatureUncertainty (float64)  

### Dimension Tables  
Dimension_table_GlobalLandTemperaturesByCountry.csv (written by the ETL as `dimension_table_avg_country_temp.csv`)    
- dt (object) 
- year (int) derived from dt  
- month (int) derived from dt
- AverageTemperature (float64)  
- AverageTemperatureUncertainty (float64)    
- Country (object)  

Dimension_table_GlobalLandTemperaturesByCity.csv (written by the ETL as `dimension_table_avg_city_temp.csv`)
- dt (object)  
- AverageTemperature (float64)  
- AverageTemperatureUncertainty (float64)  
- City (object)
- LandAndOceanAverageTemperature (float64) joined from GlobalTemperatures.csv on dt
- year (int) derived from dt  
- month (int) derived from dt
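The column mapping of the two tables above can be checked on a small in-memory sample before running the Spark job. The following is an illustrative pandas sketch, not the notebook's pipeline; the function name `build_dimension_tables` is hypothetical, and it assumes the raw column names listed in Step 1:

```python
import pandas as pd


def build_dimension_tables(city_df: pd.DataFrame, global_df: pd.DataFrame):
    """Build the country and city tables of the Step 3 model from raw pandas frames."""
    # Same cleaning rule as the notebook: drop rows with any missing value
    city_df = city_df.dropna()
    global_df = global_df.dropna()

    # Derive integer calendar year and month from dt (dt itself stays as text)
    dt = pd.to_datetime(city_df["dt"])
    base = city_df.assign(year=dt.dt.year, month=dt.dt.month)

    # Country table: drop the coordinates and the city name
    country_temp = base.drop(columns=["Latitude", "Longitude", "City"])

    # City table: attach the global land-and-ocean average of the same month
    city_temp = base.merge(
        global_df[["dt", "LandAndOceanAverageTemperature"]], on="dt", how="inner"
    ).drop(columns=["Latitude", "Longitude", "Country"])
    return country_temp, city_temp
```

Because of the inner join, the city table only keeps months for which GlobalTemperatures.csv has a complete row after cleaning.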

#### 3.2 Mapping Out Data Pipelines  
<img src="starSchema.png">

```python
# Write code here
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Capstone").getOrCreate()
```

```python
df = spark.read.option("header", True).csv("../../data2/GlobalLandTemperaturesByCity.csv", inferSchema=True)
df_temp = spark.read.option("header", True).csv("GlobalTemperatures.csv", inferSchema=True)
```

```python
df.limit(5).toPandas()
```

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


```python
df_temp.limit(5).toPandas()
```

| | dt | LandAverageTemperature | LandAverageTemperatureUncertainty | LandMaxTemperature | LandMaxTemperatureUncertainty | LandMinTemperature | LandMinTemperatureUncertainty | LandAndOceanAverageTemperature | LandAndOceanAverageTemperatureUncertainty |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1750-01-01 | 3.034 | 3.574 | | | | | | |
| 1 | 1750-02-01 | 3.083 | 3.702 | | | | | | |
| 2 | 1750-03-01 | 5.626 | 3.076 | | | | | | |
| 3 | 1750-04-01 | 8.49 | 2.451 | | | | | | |
| 4 | 1750-05-01 | 11.573 | 2.072 | | | | | | |


```python
from pyspark.sql.functions import col, month, to_timestamp, year

# Preview of the country table: drop the coordinate and city columns, parse dt,
# and derive integer calendar year/month. year()/month() replace date_format(..., "Y"),
# because the "Y" pattern letter is the week-based year, not the calendar year.
(df.drop('Latitude', 'Longitude', 'City')
   .withColumn("dt", to_timestamp(col("dt")))
   .withColumn("year", year(col("dt")))
   .withColumn("month", month(col("dt")))
   .limit(5)
   .toPandas())
```

| | dt | AverageTemperature | AverageTemperatureUncertainty | Country | year | month |
|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Denmark | 1743 | 11 |
| 1 | 1743-12-01 | | | Denmark | 1743 | 12 |
| 2 | 1744-01-01 | | | Denmark | 1744 | 1 |
| 3 | 1744-02-01 | | | Denmark | 1744 | 2 |
| 4 | 1744-03-01 | | | Denmark | 1744 | 3 |


```python
# Preview of the city table: attach the global land-and-ocean average of the same month
(df.join(df_temp, df.dt == df_temp.dt)
   .select(df["*"], df_temp["LandAndOceanAverageTemperature"])
   .drop('Latitude', 'Longitude', 'Country')
   .withColumn("dt", to_timestamp(col("dt")))
   .withColumn("year", year(col("dt")))
   .withColumn("month", month(col("dt")))
   .limit(5)
   .toPandas())
```

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | LandAndOceanAverageTemperature | year | month |
|---|---|---|---|---|---|---|---|
| 0 | 1750-01-01 | 1.699 | 1.013 | Århus | | 1750 | 1 |
| 1 | 1750-02-01 | 3.961 | 2.361 | Århus | | 1750 | 2 |
| 2 | 1750-03-01 | 5.182 | 3.48 | Århus | | 1750 | 3 |
| 3 | 1750-04-01 | 7.197 | 0.732 | Århus | | 1750 | 4 |
| 4 | 1750-05-01 | 10.634 | 1.351 | Århus | | 1750 | 5 |


### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

Read, clean, and transform the data with PySpark.

```python
from pyspark.sql.functions import col, month, to_timestamp, year

def read_data(filename):
    """Read a CSV file with a header row, letting Spark infer the column types."""
    return spark.read.option("header", True).csv(filename, inferSchema=True)

def clean_df(df):
    """Drop every row that contains at least one null value."""
    return df.dropna()

def add_year_month(df):
    """Parse dt as a timestamp and derive integer calendar year and month columns."""
    return (df.withColumn("dt", to_timestamp(col("dt")))
              .withColumn("year", year(col("dt")))
              .withColumn("month", month(col("dt"))))

def create_dimension_tables(df, df_temp):
    # Country table: dt, AverageTemperature, AverageTemperatureUncertainty, Country, year, month
    country_temp = add_year_month(df.drop('Latitude', 'Longitude', 'City'))
    # City table: city rows plus the global LandAndOceanAverageTemperature of the same month
    city_temp = add_year_month(
        df.join(df_temp, df.dt == df_temp.dt)
          .select(df["*"], df_temp["LandAndOceanAverageTemperature"])
          .drop('Latitude', 'Longitude', 'Country'))
    # Write the full tables (not a 5-row preview) with Spark's CSV writer; each path
    # becomes a directory of part files, which spark.read.csv can read back.
    country_temp.write.csv('dimension_table_avg_country_temp.csv', header=True, mode='overwrite')
    city_temp.write.csv('dimension_table_avg_city_temp.csv', header=True, mode='overwrite')
    return country_temp, city_temp
```

```python
def etl(file1, file2):
    """Read both CSV files, drop rows with nulls, and write the two dimension tables."""
    df = read_data(file1)
    df_temp = read_data(file2)
    df = clean_df(df)
    df_temp = clean_df(df_temp)
    create_dimension_tables(df, df_temp)
    print("etl job done")
```

```python
file1 = '../../data2/GlobalLandTemperaturesByCity.csv'
file2 = 'GlobalTemperatures.csv'
etl(file1, file2)
```

```
etl job done
```


#### 4.2 Data Quality Checks  

We plan four data quality checks to ensure the pipeline ran as expected:

- Check that the dataframe is not empty.
- Check that the dataframe has the expected number of columns.
- Check that the given key columns form a unique key in the dataframe.
- Check that the dataframe has the expected dtypes.

The `quality_check` function implements the first two of these checks.

```python
# Perform quality checks here
def quality_check(file, columns_num):
    """Read a written table back with Spark; check it is non-empty and has columns_num columns."""
    df = spark.read.option("header", True).csv(file, inferSchema=True)
    if df.count() == 0:
        raise ValueError("DataFrame is empty!")
    print("The dataframe is not empty.")

    if len(df.columns) != columns_num:
        raise ValueError("The dataframe does not have the expected number of columns!")
    print("The dataframe has the expected number of columns.")

    print(f"File {file}: Quality Check Pass")
```

```python
file = 'dimension_table_avg_country_temp.csv'
quality_check(file, columns_num=6)
```

```
The dataframe is not empty.
The dataframe has the expected number of columns.
File dimension_table_avg_country_temp.csv: Quality Check Pass
```

```python
file = 'dimension_table_avg_city_temp.csv'
quality_check(file, columns_num=7)
```

```
The dataframe is not empty.
The dataframe has the expected number of columns.
File dimension_table_avg_city_temp.csv: Quality Check Pass
```
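The remaining two checks from the list in 4.2, unique keys and expected dtypes, could be sketched as below. Both helpers are hypothetical and rely only on standard Spark DataFrame methods (`count`, `select`, `distinct`, `dtypes`). Which columns actually form a unique key, and which types `inferSchema` produces when a table is read back from CSV, are assumptions to confirm against the real output:

```python
def check_unique_key(df, keys):
    """Raise ValueError unless the key columns identify each row of a Spark DataFrame uniquely."""
    total = df.count()
    distinct_keys = df.select(*keys).distinct().count()
    if total != distinct_keys:
        raise ValueError(f"{total - distinct_keys} rows repeat an existing key for {keys}")
    print(f"Columns {keys} form a unique key.")


def check_dtypes(df, expected):
    """Raise ValueError if any column's Spark type differs from the expected type string."""
    # df.dtypes is a list of (column name, type string) pairs, e.g. ("year", "int")
    actual = dict(df.dtypes)
    mismatches = {
        name: (actual.get(name), wanted)
        for name, wanted in expected.items()
        if actual.get(name) != wanted
    }
    if mismatches:
        raise ValueError(f"Unexpected dtypes (actual, expected): {mismatches}")
    print("The dataframe has the expected dtypes.")
```

For example, after `df = spark.read.option("header", True).csv('dimension_table_avg_city_temp.csv', inferSchema=True)`, one might call `check_dtypes(df, {"year": "int", "month": "int"})` or `check_unique_key(df, ["dt", "City"])`. The latter may well fail, since city names are not guaranteed to be unique across countries.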


#### 4.3 Data dictionary 
The data dictionary is in the README.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
    - Suppose the pipeline runs on an Amazon EMR cluster of .xlarge instances. Because the cluster scales roughly linearly, we can add nodes and adjust the number and size of instances as needed. If the data increased by 100x, we could store it in S3 and switch from .xlarge to larger instances such as .10xlarge, which provide considerably more memory (the exact amount depends on the instance family). Another option is to split the data into smaller chunks that are processed in parallel, which makes the job faster and more efficient.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
    - Apache Airflow can schedule and monitor the pipeline so that it does not run past 7am. The output must be stored in an accessible database or in S3 so that new data can be found and queried every day. Assuming the pipeline takes approximately 1 hour to run, scheduling it every day at 5am leaves a 1-hour buffer. Alternatively, Airflow can be combined with Spark and Apache Livy on the EMR cluster, so that Spark jobs are submitted through Livy's REST API.
  * The database needed to be accessed by 100+ people.
    - Amazon Redshift clusters can be scaled with elastic resize, so whenever the data warehouse risks no longer responding to requests, its capacity can be increased to serve the 100+ authorized users. Another option is Amazon RDS, which can host scalable PostgreSQL databases. AWS IAM can manage user access to the clusters, and Amazon Cognito could manage user accounts, providing a custom sign-up and login page as well as a user-management system.