# Project Title
### WORLD CITIES LAND TEMPERATURE ETL & DATA WAREHOUSE

#### Project Summary
ETL Pipeline to Data Warehouse in AWS Redshift 

The project follows the steps below:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
from helpers.get_kaggle_data import kaggle_download
from helpers.upload_to_s3 import upload_file
from helpers.data_quality_checks import data_quality_check, data_integrity_check
from helpers.sql import sql_create_tables, staging_insert, sql_drop_tables, sql_insert_tables
import pandas as pd
import os
import psycopg2
from configparser import ConfigParser

### Step 1: Scope the Project and Gather Data

#### Scope 

A Data Warehouse will be created on an AWS Redshift cluster to store clean, organized data that is accessible to multiple users.

The ETL Pipeline will work as described below:

- The Kaggle API will be used to download the data
- Pandas will be used to explore and clean the data
- The DataFrame will be saved as a CSV file and uploaded to an AWS S3 bucket
- The data will be imported into AWS Redshift
- Data quality tests will be carried out to make sure the data exists and is available

#### The Data:

The data used for this project comes from the Kaggle [Climate Change: Earth Surface Temperature Data Set](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

"Some say climate change is the biggest threat of our age while others say it’s a myth based on dodgy science. We are turning some of the data over to you so you can form your own view."

Date: starts in 1750 for average land temperature and 1850 for max and min land temperatures and global ocean and land temperatures

Files Included:
- Global Land Temperatures By Major City (GlobalLandTemperaturesByMajorCity.csv)
- Global Land Temperatures By City (GlobalLandTemperaturesByCity.csv)


The raw data comes from the [Berkeley Earth data page](http://berkeleyearth.org/data/). 

In [2]:
# Download Data from Kaggle API
# You must have an account with Kaggle
kaggle_download('berkeleyearth/climate-change-earth-surface-temperature-data')

In [12]:
# Read the data
directory = os.path.realpath("..") + '/global-land-temp-dw/'
tables = ['GlobalLandTemperaturesByMajorCity.csv', 'GlobalLandTemperaturesByCity.csv']

df_major_city = pd.read_csv(directory + "data/" + tables[0])
df_city = pd.read_csv(directory + "data/" + tables[1])

In [4]:
print("DF for info major cities")
df_major_city.info()

print("\nDF info for cities")
df_city.info()

DF for info major cities
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 239177 entries, 0 to 239176
Data columns (total 7 columns):
 #   Column                         Non-Null Count   Dtype  
---  ------                         --------------   -----  
 0   dt                             239177 non-null  object 
 1   AverageTemperature             228175 non-null  float64
 2   AverageTemperatureUncertainty  228175 non-null  float64
 3   City                           239177 non-null  object 
 4   Country                        239177 non-null  object 
 5   Latitude                       239177 non-null  object 
 6   Longitude                      239177 non-null  object 
dtypes: float64(2), object(5)
memory usage: 12.8+ MB

DF info for cities
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8599212 entries, 0 to 8599211
Data columns (total 7 columns):
 #   Column                         Dtype  
---  ------                         -----  
 0   dt                             object 

### Step 2: Data Wrangling

In [5]:
# Count null/missing values per column

print("Null/Missing Values for major cities")
print(df_major_city.isnull().sum())

print("\nNull/Missing Values for cities")
print(df_city.isnull().sum())

Null/Missing Values for major cities
dt                                   0
AverageTemperature               11002
AverageTemperatureUncertainty    11002
City                                 0
Country                              0
Latitude                             0
Longitude                            0
dtype: int64

Null/Missing Values for cities
dt                                    0
AverageTemperature               364130
AverageTemperatureUncertainty    364130
City                                  0
Country                               0
Latitude                              0
Longitude                             0
dtype: int64


In [6]:
# Drop NA/missing values. This data requires all values to be available.
def drop_na(pd_df):
    """Drop rows with missing values in place and print the row counts."""

    print(f'Number of rows before removing NA: {pd_df.shape[0]:,}')

    pd_df.dropna(axis=0, inplace=True)

    print(f'Number of rows after removing NA: {pd_df.shape[0]:,}')


drop_na(df_major_city)
drop_na(df_city)


Number of rows before removing NA: 239,177
Number of rows after removing NA: 228,175
Number of rows before removing NA: 8,599,212
Number of rows after removing NA: 8,235,082


In [7]:
# Add a major_city flag column (stored as the strings 'true'/'false')
df_major_city = df_major_city.assign(major_city='true')
df_city = df_city.assign(major_city='false')



In [8]:
# Concatenate both datasets
dframes = [df_major_city, df_city]
df = pd.concat(dframes, ignore_index=True)

In [9]:
df.head()

Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,major_city
0,1849-01-01,26.704,1.435,Abidjan,Côte D'Ivoire,5.63N,3.23W,True
1,1849-02-01,27.434,1.362,Abidjan,Côte D'Ivoire,5.63N,3.23W,True
2,1849-03-01,28.101,1.612,Abidjan,Côte D'Ivoire,5.63N,3.23W,True
3,1849-04-01,26.14,1.387,Abidjan,Côte D'Ivoire,5.63N,3.23W,True
4,1849-05-01,25.427,1.2,Abidjan,Côte D'Ivoire,5.63N,3.23W,True


In [10]:
# Sort data by date (dt) and reset index
df.sort_values(by='dt',inplace=True)

df.reset_index(drop=True, inplace=True)

In [11]:
df.head(1000)


Unnamed: 0,dt,AverageTemperature,AverageTemperatureUncertainty,City,Country,Latitude,Longitude,major_city
0,1743-11-01,7.067,1.839,West Bromwich,United Kingdom,52.24N,2.63W,false
1,1743-11-01,2.840,1.887,Lvov,Ukraine,50.63N,24.08E,false
2,1743-11-01,7.807,1.816,Venice,Italy,45.81N,12.69E,false
3,1743-11-01,6.176,2.264,Stara Zagora,Bulgaria,42.59N,26.18E,false
4,1743-11-01,7.541,1.753,Luton,United Kingdom,52.24N,0.00W,false
...,...,...,...,...,...,...,...,...
995,1744-04-01,8.810,2.688,Mazyr,Belarus,52.24N,28.91E,false
996,1744-04-01,12.943,2.166,Edirne,Turkey,42.59N,26.18E,false
997,1744-04-01,11.596,2.044,Aix En Provence,France,44.20N,4.47E,false
998,1744-04-01,8.296,2.501,Luton,United Kingdom,52.24N,0.00W,false


In [12]:
# Confirm Data types are as expected
df.dtypes

dt                                object
AverageTemperature               float64
AverageTemperatureUncertainty    float64
City                              object
Country                           object
Latitude                          object
Longitude                         object
major_city                        object
dtype: object

In [13]:
# Save data to CSV so it can be loaded into Redshift
df.to_csv(directory + "data/dataset.csv")

In [14]:
# Get Bucket Name from configurations & upload file to S3
config = ConfigParser()
config.read('DW.cfg')
file_log = config.get('S3', 'log_data')

upload_file(directory + "data/dataset.csv", file_log, "logs/log_data.csv")


True

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model

For this project, a Star Schema was selected because of its simple structure and its effectiveness at handling simple queries.

![Data Model](media/data_model.png 'Data Model')
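As a rough sketch of how this schema could be declared, the statements below mirror the field names and data types listed in the data dictionary in section 4.3. The real definitions live in `helpers/sql.py` and are not shown in this notebook, so the variable names, the `IF NOT EXISTS` clause, and the column order are assumptions.

```python
# Hypothetical DDL for the star schema. Field names, types and the primary
# keys follow the data dictionary in section 4.3; the rest is assumed.
create_cities = """
CREATE TABLE IF NOT EXISTS cities (
    city_id    BIGINT IDENTITY(0,1) PRIMARY KEY,
    city       TEXT,
    country    TEXT,
    latitude   TEXT,
    longitude  TEXT,
    major_city BOOLEAN
);
"""

create_time = """
CREATE TABLE IF NOT EXISTS time (
    dt      DATE,
    day     SMALLINT,
    month   SMALLINT,
    week    SMALLINT,
    weekday SMALLINT,
    year    SMALLINT
);
"""

create_readings_by_city = """
CREATE TABLE IF NOT EXISTS readings_by_city (
    by_city_id           BIGINT IDENTITY(0,1) PRIMARY KEY,
    city_id              NUMERIC,
    avg_temp             NUMERIC(7,3),
    avg_temp_uncertainty NUMERIC(7,3),
    date                 DATE,
    major_city           BOOLEAN
);
"""

# One fact table surrounded by two dimension tables.
example_create_tables = [create_cities, create_time, create_readings_by_city]
```

The fact table `readings_by_city` points at `cities` through `city_id` and at `time` through its `date` column, which is what makes the layout a star.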

#### 3.2 Mapping Out Data Pipelines
The steps needed to load the data into the chosen data model:

- The Kaggle API is used to download the data
- Pandas is used to explore and clean the data
- The DataFrame is saved as a CSV file and uploaded to an AWS S3 bucket
- The data is imported into AWS Redshift
- Data quality tests are carried out to make sure the data exists and is available

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model

In [2]:
# Get Database Variables
config = ConfigParser()
config.read('DW.cfg')

print("{}, {}, {}, {}, {}".format(*config['CLUSTER'].values()))


# Connect to the Redshift database
conn = psycopg2.connect(
    "host={} dbname={} user={} password={} port={}".format(
        *config['CLUSTER'].values()
    )
)

# conn.set_session(autocommit=True)
cur = conn.cursor()


dwhcluster.cedlo7g6palf.eu-west-1.redshift.amazonaws.com, landtempdb, dwhuser, Passw0rd, 5439
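The cell above prints every value in the `CLUSTER` section, including the password, and depends on the order of the keys in `DW.cfg`. A minimal sketch of an alternative reads the values by name and masks the password before printing. The key names `host`, `dbname`, `user`, `password` and `port`, the helper names, and all sample values are hypothetical; only the section name and `dwh_region` appear in this notebook.

```python
from configparser import ConfigParser

# Hypothetical stand-in for DW.cfg; the real file is not shown in this notebook.
SAMPLE_CFG = """
[CLUSTER]
host = example-cluster.abc123.eu-west-1.redshift.amazonaws.com
dbname = exampledb
user = exampleuser
password = not-a-real-password
port = 5439
dwh_region = eu-west-1
"""

CONNECTION_KEYS = ("host", "dbname", "user", "password", "port")


def build_dsn(cluster):
    """Build a 'key=value ...' connection string from named keys."""
    return " ".join(f"{key}={cluster[key]}" for key in CONNECTION_KEYS)


def masked(cluster):
    """Return the connection settings with the password hidden, for logging."""
    return {
        key: ("***" if key == "password" else cluster[key])
        for key in CONNECTION_KEYS
    }


sample_config = ConfigParser()
sample_config.read_string(SAMPLE_CFG)

dsn = build_dsn(sample_config["CLUSTER"])
print(masked(sample_config["CLUSTER"]))
```

The resulting `dsn` string has the same `key=value` form that is passed to `psycopg2.connect` above, and extra keys such as `dwh_region` no longer affect it.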


In [3]:
# Drop all tables if they exist
for query in sql_drop_tables:
    cur.execute(query)
    conn.commit()

In [4]:
# Create tables
for query in sql_create_tables:
    cur.execute(query)
    conn.commit()


In [5]:
# Copy data to Staging
s3_bucket = "s3://{}/logs/".format(config.get('S3','log_data'))

sql_query = staging_insert.format(
    s3_bucket,
    config.get('IAM_ROLE','ARN'),
    config.get('CLUSTER','dwh_region')
    )
# print(sql_query)
cur.execute(sql_query)
conn.commit()
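`staging_insert` is defined in `helpers/sql.py` and is not shown here. As an illustration only, a Redshift `COPY` template with three positional placeholders (S3 path, IAM role ARN, region) might look like the sketch below. The table name `staging_events` is taken from the data dictionary in section 4.3; the `CSV` and `IGNOREHEADER 1` options, the bucket name and the ARN are assumptions.

```python
# Hypothetical COPY template; the real one lives in helpers/sql.py.
example_staging_insert = """
COPY staging_events
FROM '{}'
IAM_ROLE '{}'
REGION '{}'
CSV
IGNOREHEADER 1;
"""

# Placeholder values for illustration only.
example_query = example_staging_insert.format(
    "s3://example-bucket/logs/",
    "arn:aws:iam::123456789012:role/example-redshift-role",
    "eu-west-1",
)
print(example_query)
```

Because the `FROM` value is a key prefix rather than a single object, `COPY` loads every file under `logs/` in the bucket.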

In [6]:
# Insert data from staging to facts and dimensions
for query in sql_insert_tables:
    # print(query)
    cur.execute(query)
    conn.commit()

#### 4.2 Data Quality Checks

The data pipeline enforces the following data integrity constraints:
- Each column has a data type constraint.
- Duplicates are avoided by running a subquery that checks the value is not already in the table, e.g. `WHERE dt NOT IN (SELECT DISTINCT dt FROM time)`.

Each table in the schema is checked to confirm that it exists and has rows.
The data types of all columns are tested to confirm they are as expected.
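
The duplicate-avoidance pattern above can be tried locally. The sketch below uses Python's built-in `sqlite3` as a stand-in for Redshift, with a hypothetical one-column `staging` table, and shows that running the same insert twice adds no rows the second time.

```python
import sqlite3

dedupe_conn = sqlite3.connect(":memory:")
dedupe_cur = dedupe_conn.cursor()

# Hypothetical minimal tables; the real schema has more columns.
dedupe_cur.execute("CREATE TABLE staging (dt TEXT)")
dedupe_cur.execute("CREATE TABLE time (dt TEXT)")
dedupe_cur.executemany(
    "INSERT INTO staging (dt) VALUES (?)",
    [("1849-01-01",), ("1849-01-01",), ("1849-02-01",)],
)

insert_time = """
INSERT INTO time (dt)
SELECT DISTINCT dt
FROM staging
WHERE dt NOT IN (SELECT DISTINCT dt FROM time)
"""

# Run the insert twice; the second run finds nothing new to add.
dedupe_cur.execute(insert_time)
dedupe_cur.execute(insert_time)
dedupe_conn.commit()

time_rows = dedupe_cur.execute("SELECT dt FROM time ORDER BY dt").fetchall()
print(time_rows)
```

One caveat of `NOT IN`: if the subquery ever returns a NULL, the filter rejects every row, so the target column should not contain NULLs.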
 


In [7]:
# Expected Python data types for each table, used by the data integrity check
reading_by_city_data_types = [
    'int',
    'datetime.date',
    'decimal.Decimal',
    'decimal.Decimal',
    'decimal.Decimal',
    'bool'
]

cities_data_types = [
    'int',
    'str',
    'str',
    'str',
    'str',
    'bool'
]

dt_data_types = [
    'datetime.date',
    'int',
    'int',
    'int',
    'int',
    'int'
]

# Tables Data Types List 
tables_data_types = [
    reading_by_city_data_types,
    cities_data_types, 
    dt_data_types
]

# Tables to Check
tables = ['readings_by_city', 'cities', 'time']


In [8]:

for table in tables:

    data_quality_check(conn, table)

Data Quality SUCCESS for table: readings_by_city. Total rows: 8609236
Data Quality SUCCESS for table: cities. Total rows: 3610
Data Quality SUCCESS for table: time. Total rows: 3167
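
`data_quality_check` is imported from `helpers/data_quality_checks.py`, whose source is not included here. A minimal sketch of such a check follows, assuming it only counts rows and fails on an empty table. The function name `row_count_check` is hypothetical and `sqlite3` stands in for the Redshift connection.

```python
import sqlite3


def row_count_check(conn, table):
    """Return the row count of `table`, raising ValueError if it is empty.

    `table` is interpolated into the SQL text, so it must come from a
    trusted list of table names, never from user input.
    """
    cur = conn.cursor()
    # A missing table makes this query raise a database error.
    cur.execute(f"SELECT COUNT(*) FROM {table}")
    total = cur.fetchone()[0]
    if total < 1:
        raise ValueError(f"Data Quality FAILED for table: {table}. No rows found")
    print(f"Data Quality SUCCESS for table: {table}. Total rows: {total}")
    return total


# Local demonstration with an in-memory database.
quality_conn = sqlite3.connect(":memory:")
quality_conn.execute("CREATE TABLE cities (city TEXT)")
quality_conn.execute("INSERT INTO cities VALUES ('Abidjan')")
total_rows = row_count_check(quality_conn, "cities")
```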


In [9]:
# Data Integrity Check for Data Types

for table, data_types in zip(tables, tables_data_types):

    data_integrity_check(conn, table, data_types)


Data Integrity SUCCESS for table: readings_by_city
Data Integrity SUCCESS for table: cities
Data Integrity SUCCESS for table: time
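
The expected types above are given as strings such as `'datetime.date'` and `'decimal.Decimal'`. One way a check like `data_integrity_check` could compare them against a fetched row is sketched below. The helper names `type_label` and `row_matches` are hypothetical, and the real implementation in `helpers/data_quality_checks.py` may differ.

```python
import datetime
from decimal import Decimal


def type_label(value):
    """Return 'int' style names for builtins and 'module.Class' otherwise."""
    cls = type(value)
    if cls.__module__ == "builtins":
        return cls.__name__
    return f"{cls.__module__}.{cls.__name__}"


def row_matches(row, expected_types):
    """True if every value in `row` has the expected type label."""
    if len(row) != len(expected_types):
        return False
    return all(type_label(v) == t for v, t in zip(row, expected_types))


# A row shaped like the `time` dimension: a date followed by five integers.
sample_row = (datetime.date(2018, 2, 7), 7, 2, 6, 2, 2018)
expected = ["datetime.date", "int", "int", "int", "int", "int"]
print(row_matches(sample_row, expected))
```

Because the comparison uses the exact type, a `bool` value is labelled `'bool'` rather than `'int'`, and a NULL (Python `None`) in the sampled row would count as a mismatch.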


#### 4.3 Data dictionary

![Data Dictionary](media/data_dict.JPG 'Data Dictionary')

In [10]:
data_dictionary = [
    {   
        'table':'Dimension cities',
        'field name': 'city_id',
        'data type': 'BIGINT IDENTITY(0,1)',
        'data format': 'NNNNNNN',
        'field size': 'VARIABLE',
        'description':'Primary Key Index number for each row',
        'example':'03'
    },
    {
        'table':'Dimension cities',
        'field name': 'city',
        'data type': 'TEXT',
        'data format': '',
        'field size': 'VARIABLE',
        'description':'city name, comes from staging_events',
        'example':'Barcelona'
    },
    {
        'table':'Dimension cities',
        'field name': 'country',
        'data type': 'TEXT',
        'data format': '',
        'field size': 'VARIABLE',
        'description':'country name, comes from staging_events',
        'example':'Venezuela'
    },
    {
        'table':'Dimension cities',
        'field name': 'latitude',
        'data type': 'TEXT',
        'data format': 'NN.NT',
        'field size': 'VARIABLE',
        'description':'city latitude, comes from staging_events',
        'example':'12.5N'
    },
    {
        'table':'Dimension cities',
        'field name': 'longitude',
        'data type': 'TEXT',
        'data format': 'NN.NT',
        'field size': 'VARIABLE',
        'description':'city longitude, comes from staging_events',
        'example':'12.5E'
    },
    {
        'table':'Dimension cities',
        'field name': 'major_city',
        'data type': 'BOOLEAN',
        'data format': 'true/false',
        'field size': '5',
        'description':'Boolean that identifies if city is major or not from staging_events',
        'example':'true'
    },
    {
        'table':'Dimension time',
        'field name': 'dt',
        'data type': 'DATE',
        'data format': 'YYYY-MM-DD',
        'field size': '10',
        'description':'date of temperature measure, comes from staging_events',
        'example':'2018-02-07'
    },
    {
        'table':'Dimension time',
        'field name': 'day',
        'data type': 'SMALLINT',
        'data format': 'NN',
        'field size': '2',
        'description':'day of temperature measure, comes from column dt',
        'example':'07'
    },
    {
        'table':'Dimension time',
        'field name': 'month',
        'data type': 'SMALLINT',
        'data format': 'NN',
        'field size': '2',
        'description':'month of temperature measure, comes from column dt',
        'example':'02'
    },
    {
        'table':'Dimension time',
        'field name': 'week',
        'data type': 'SMALLINT',
        'data format': 'NN',
        'field size': '2',
        'description':'week of temperature measure, comes from column dt',
        'example':'02'
    },
    {
        'table':'Dimension time',
        'field name': 'weekday',
        'data type': 'SMALLINT',
        'data format': 'NN',
        'field size': '2',
        'description':'week day number of temperature measure, comes from column dt',
        'example':'02'
    },
    {
        'table':'Dimension time',
        'field name': 'year',
        'data type': 'SMALLINT',
        'data format': 'NNNN',
        'field size': '4',
        'description':'year of temperature measure, comes from column dt',
        'example':'2018'
    },
    {
        'table':'Fact readings_by_city',
        'field name': 'by_city_id',
        'data type': 'BIGINT IDENTITY(0,1)',
        'data format': 'NNNNNN',
        'field size': 'VARIABLE',
        'description':'Primary Key Index number for each row',
        'example':'2'
    },
    {
        'table':'Fact readings_by_city',
        'field name': 'city_id',
        'data type': 'NUMERIC',
        'data format': 'NNNNNN',
        'field size': 'VARIABLE',
        'description':'Foreign Key Index number from cities table',
        'example':'4525'
    },
    {
        'table':'Fact readings_by_city',
        'field name': 'avg_temp',
        'data type': 'NUMERIC(7,3)',
        'data format': 'NNNN.NNN',
        'field size': '7',
        'description':'temperature reading comes from staging_events',
        'example':'25.025'
    },
    {
        'table':'Fact readings_by_city',
        'field name': 'avg_temp_uncertainty',
        'data type': 'NUMERIC(7,3)',
        'data format': 'NNNN.NNN',
        'field size': '7',
        'description':'temperature uncertainty reading comes from staging_events',
        'example':'1.025'
    },
    {
        'table':'Fact readings_by_city',
        'field name': 'date',
        'data type': 'DATE',
        'data format': 'YYYY-MM-DD',
        'field size': '10',
        'description':'date of temperature measure, comes from staging_events',
        'example':'2018-02-07'
    },
    {
        'table':'Fact readings_by_city',
        'field name': 'major_city',
        'data type': 'BOOLEAN',
        'data format': '',
        'field size': '5',
        'description':'Boolean that identifies if city is major or not from staging_events',
        'example':'true'
    },
    
    
       
]

In [13]:
data_dic_df = pd.DataFrame(data_dictionary)

data_dic_df.to_csv(directory + "media/data_dict.csv")

data_dic_df

Unnamed: 0,table,field name,data type,data format,field size,description,example
0,Dimension cities,city_id,"BIGINT IDENTITY(0,1)",NNNNNNN,VARIABLE,Primary Key Index number for each row,03
1,Dimension cities,city,TEXT,,VARIABLE,"city name, comes from staging_events",Barcelona
2,Dimension cities,country,TEXT,,VARIABLE,"country name, comes from staging_events",Venezuela
3,Dimension cities,latitude,TEXT,NN.NT,VARIABLE,"city latitude, comes from staging_events",12.5N
4,Dimension cities,longitude,TEXT,NN.NT,VARIABLE,"city longitude, comes from staging_events",12.5E
5,Dimension cities,major_city,BOOLEAN,true/false,5,Boolean that identifies if city is major or no...,true
6,Dimension time,dt,DATE,YYYY-MM-DD,10,"date of temperature measure, comes from stagin...",2018-02-07
7,Dimension time,day,SMALLINT,NN,2,"day of temperature measure, comes from column dt",07
8,Dimension time,month,SMALLINT,NN,2,"month of temperature measure, comes from colum...",02
9,Dimension time,week,SMALLINT,NN,2,"week of temperature measure, comes from column dt",02
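
The `time` dimension fields in the dictionary are all derived from `dt`. A small sketch of that derivation in plain Python follows, assuming ISO week numbers and Monday as weekday 0. The SQL in `helpers/sql.py` is not shown and may number weeks and weekdays differently; the function name `time_dimension_row` is hypothetical.

```python
from datetime import date


def time_dimension_row(dt_text):
    """Split a 'YYYY-MM-DD' string into the time dimension fields."""
    dt = date.fromisoformat(dt_text)
    return {
        "dt": dt,
        "day": dt.day,
        "month": dt.month,
        "week": dt.isocalendar()[1],  # ISO 8601 week number
        "weekday": dt.weekday(),      # Monday == 0 ... Sunday == 6
        "year": dt.year,
    }


example_time_row = time_dimension_row("2018-02-07")
print(example_time_row)
```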


### Step 5: Complete Project Write Up
Clearly state the rationale for the choice of tools and technologies for the project.

- Kaggle API: This allows the pipeline to run entirely from scripts.

- AWS Redshift & PostgreSQL: Redshift makes it easy to import data from S3 and maintains fast query performance.

- AWS S3: Provides object storage for the data.

- Anaconda3: Used to manage Python dependencies and the working virtual environment.


Propose how often the data should be updated and why.
- According to the documentation for the Climate Change: Earth Surface Temperature Data Set, the data can be updated monthly, as this is how often the source is updated.

Write a description of how you would approach the problem differently under the following scenarios:
- The data was increased by 100x.

    - If the data grew by a factor of 100, using pandas to manipulate it would no longer be practical. In that case Spark would be the better choice, and the data should be partitioned.

- The data populates a dashboard that must be updated on a daily basis by 7am every day.

    - This data set does not change daily, but assuming it did, once the data is available in the DW it can be queried (e.g. by a Python script that fetches the data and builds the dashboard). For daily updates, a scheduler such as Apache Airflow can be used.

- The database needed to be accessed by 100+ people.

    - Depending on the settings used to create the AWS Redshift cluster, the Data Warehouse can be configured to allow access to as many users as required.