## Capstone Project : Global Airport Ticket (GAT)
###  Udacity Data Engineering Nanodegree 
#### Data Warehouse Modeling use Postgre SQL with Start Schema

#### Project Summary
The objective of this project was to create an ETL pipeline for the Global Airport Ticket Data Contains view in 1993 (Q1:Q4) summary characteristics of each domestic itinerary on the Origin and Destination Survey, including the reporting carrier, itinerary fare, a number of passengers, originating airport, roundtrip indicator, and miles flown, global temperatures , global airport, and global ISO countries datasets to form an analytics database on travel events and find travel patterns the globe.  

##### The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

We need to create ETL for converting numbers of CSV files to Data Warehouse Modeling using Postgre SQL with Star Schema for Airport Ticket Data Contains view in 1991-Q1 for date analytics database on travel events and find travel patterns the globe. 

### DataSet

##### I  ) Global ISO Countries
file name: Countries.csv

source file: https://public.opendatasoft.com/explore/dataset/world-administrative-boundaries-countries/export/

#####  II ) Global Airport
file name: Airports.csv

source file: https://www.transtats.bts.gov/Download_Lookup.asp?Y11x72=Y_NVecbeg
#####  III  ) Global Airport Ticket 1993 [Q1,Q2,Q3,4]
file name:  Ticket1993Q1.csv,Ticket1993Q2.csv,
            Ticket1993Q3.csv,Ticket1993Q4.csv

source file: https://www.transtats.bts.gov/DL_SelectFields.asp?gnoyr_VQ=FKF&QO_fu146_anzr=b4vtv0%20n0q%20Qr56v0n6v10%20f748rB

In [1]:
# Do all imports and installs here
import psycopg2 # PostgreSQL database adapter for Python
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT # <-- ADD THIS LINE
import configparser # to work with the configuration file
import pandas as pd
from datetime import datetime
import os

config = configparser.ConfigParser()
config.read('dwh.cfg') # edit this file to include your own values.

pd.options.display.max_columns = None # allow the user to see all of the data requested on screen
pd.options.display.max_rows = 100 # allow the user to see all of the data requested on screen           

In [2]:
import DAL as dal # database access layer
import UDF as udf # user-defined functions

In [3]:
# read config values
HOST=config['database']['HOST']
DB_NAME=config['database']['DB_NAME']
DB_USER=config['database']['DB_USER']
DB_PASSWORD=config['database']['DB_PASSWORD']
csv_path=config['other']['csv_path']

#### Connect Database in  postger

In [None]:
"""
Connect to postgres database, execute query,close connection.
DDL,DML
"""

try:
    dal.create_database()
except Exception as e:
    print('Error: ',e)    

### Step 2: Explore and Assess the Data
#### Explore Sample Data top 1000 rows
Identify data quality issues, like missing values, duplicate data, etc.

In [6]:
# Countries 
df= pd.read_csv("{}Countries.csv".format(csv_path),sep=";", nrows=1000)
df.head()

Unnamed: 0,Geo Point,Geo Shape,ISO 3 country code,Region Code,Region Name,Preferred Term,French Name,English Term,Spanish Term,Chinese Term,Sub-region Code,ISO 2 country code,Russian Term,Sub-region Name,Arabic Term
0,"61.9461348064,96.577692508","{""coordinates"": [[[[146.68274, 43.70777], [146...",RUS,150,Europe,Russian Federation,Fédération de Russie (la),Russia,Federación de Rusia (la),俄罗斯联邦,151,RU,Российская Федерация,Eastern Europe,الاتحاد الروسي
1,"7.50188241413,134.568686878","{""coordinates"": [[[[134.51614, 7.34326], [134....",PLW,9,Oceania,Palau,Palaos (les),Palau,Palau,帕劳,57,PW,Палау,Micronesia,بالاو
2,"-8.82297620856,125.853675172","{""coordinates"": [[[[124.0461, -9.34], [124.066...",TLS,142,Asia,Timor-Leste,Timor-Leste (le),Timor-Leste,Timor-Leste,东帝汶,35,TL,Тимор-Лешти,South-eastern Asia,تيمور- ليشتي
3,"-20.402920373,-174.836286272","{""coordinates"": [[[[-175.24524, -21.12978], [-...",TON,9,Oceania,Tonga,Tonga (les) [fém.],Tonga,Tonga,汤加,61,TO,Тонга,Polynesia,تونغا
4,"-17.2593235217,35.5514219184","{""coordinates"": [[[[32.89043, -26.84714], [32....",MOZ,2,Africa,Mozambique,Mozambique (le),Mozambique,Mozambique,莫桑比克,202,MZ,Мозамбик,Sub-Saharan Africa,موزامبيق


In [7]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 203 entries, 0 to 202
Data columns (total 15 columns):
Geo Point             203 non-null object
Geo Shape             203 non-null object
ISO 3 country code    203 non-null object
Region Code           203 non-null int64
Region Name           203 non-null object
Preferred Term        203 non-null object
French Name           195 non-null object
English Term          203 non-null object
Spanish Term          195 non-null object
Chinese Term          195 non-null object
Sub-region Code       203 non-null int64
ISO 2 country code    202 non-null object
Russian Term          195 non-null object
Sub-region Name       203 non-null object
Arabic Term           195 non-null object
dtypes: int64(2), object(13)
memory usage: 23.9+ KB


step:-

-dimensions convert grouping columns: [(Region_Code,Region_Name)]

In [8]:
# Airports 
df= pd.read_csv("{}airports.csv".format(csv_path), nrows=1000)
df.head()

Unnamed: 0,Code,Description
0,01A,"Afognak Lake, AK: Afognak Lake Airport"
1,03A,"Granite Mountain, AK: Bear Creek Mining Strip"
2,04A,"Lik, AK: Lik Mining Camp"
3,05A,"Little Squaw, AK: Little Squaw Airport"
4,06A,"Kizhuyak, AK: Kizhuyak Bay"


In [9]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 2 columns):
Code           1000 non-null object
Description    1000 non-null object
dtypes: object(2)
memory usage: 15.7+ KB


step:-

-spilt Description column to city,cuntry, airport name

-convert to dimensions

In [10]:
# Ticket 
df= pd.read_csv("{}Ticket/Ticket1993Q1.csv".format(csv_path), nrows=1000)
df.head()
    

Unnamed: 0,ItinID,Coupons,Year,Quarter,Origin,OriginAirportID,OriginAirportSeqID,OriginCityMarketID,OriginCountry,OriginStateFips,OriginState,OriginStateName,OriginWac,RoundTrip,OnLine,DollarCred,FarePerMile,RPCarrier,Passengers,ItinFare,BulkFare,Distance,DistanceGroup,MilesFlown,ItinGeoType,Unnamed: 25
0,199311407485,4,1993,1,OMA,13871,1387101,33316,US,31,NE,Nebraska,65,1.0,1.0,1,0.2215,UA,1.0,509.0,0.0,2298.0,5,2298.0,2,
1,199311407486,4,1993,1,OMA,13871,1387101,33316,US,31,NE,Nebraska,65,1.0,1.0,1,0.131,UA,2.0,301.0,0.0,2298.0,5,2298.0,2,
2,199311407487,4,1993,1,OMA,13871,1387101,33316,US,31,NE,Nebraska,65,1.0,1.0,1,0.1171,UA,1.0,269.0,0.0,2298.0,5,2298.0,2,
3,199311407488,4,1993,1,OMA,13871,1387101,33316,US,31,NE,Nebraska,65,1.0,1.0,1,0.0013,UA,1.0,3.0,0.0,2298.0,5,2298.0,2,
4,199311407489,4,1993,1,OMA,13871,1387101,33316,US,31,NE,Nebraska,65,1.0,1.0,1,0.1088,UA,1.0,250.0,0.0,2298.0,5,2298.0,2,


In [11]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 26 columns):
ItinID                1000 non-null int64
Coupons               1000 non-null int64
Year                  1000 non-null int64
Quarter               1000 non-null int64
Origin                1000 non-null object
OriginAirportID       1000 non-null int64
OriginAirportSeqID    1000 non-null int64
OriginCityMarketID    1000 non-null int64
OriginCountry         1000 non-null object
OriginStateFips       1000 non-null int64
OriginState           1000 non-null object
OriginStateName       1000 non-null object
OriginWac             1000 non-null int64
RoundTrip             1000 non-null float64
OnLine                1000 non-null float64
DollarCred            1000 non-null int64
FarePerMile           1000 non-null float64
RPCarrier             1000 non-null object
Passengers            1000 non-null float64
ItinFare              1000 non-null float64
BulkFare              1000 non-null flo

step:-

-dimensions convert grouping columns: [(Ryear, quarter)]
                            
-fact [year, quarter,Origin,origincountry,
      originstate,Passengers,RoundTrip]                            

### we need to create analytics for  travel patterns around the globe for these columns  :
    [number_passengers, round_trip,online,dollar_cred,bulk_fare ]

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
-Data Warehouse with star schema dimensions and fact table

-Chose that model it's easy to work and  low cost

#### 3.2 Mapping Out Data Pipelines
-copy all csv file to storage tables on DB

-cleaning tasks after ETL on storage tables

-create dimensions and fact table

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model for storage
Build the data pipelines to create the data model for storage.
##### Create and copy from CSV to storage

In [12]:
try:
    print('create Airports')
    col_list=udf.create_schema_from_csv('{}airports.csv'.format(csv_path), \
                                        'stg_airports', \
                                        True, \
                                        ",")
    print('copy Airports.csv')
    udf.copy_data_from_csv('{}airports.csv'.format(csv_path), \
                           'stg_airports', \
                           col_list, \
                           ",")
    ###############################
    print('create Countries')
    col_list=udf.create_schema_from_csv('{}Countries.csv'.format(csv_path), \
                                        'stg_Countries', \
                                        True, \
                                        ";")
    print('copy Countries.csv')
    udf.copy_data_from_csv('{}Countries.csv'.format(csv_path), \
                           'stg_Countries', \
                           col_list, \
                           ";")
    #############################
    table_name='stg_ticket'
    
    # specify your path of directory
    path = r"{}Ticket/".format(csv_path)
    
    # call listdir() method
    # path is a directory of which you want to list
    director = os.listdir( path )
    
    # get first file name
    file='{}{}'.format(path, director[0])
    #print(file)
    print('create Ticket')
    col_list=udf.create_schema_from_csv(file, \
                                        table_name, \
                                        True, \
                                        ",")
    
    # This would print all the files and directories
    for file in director:
        print("copy {}".format(file))
        
        file='{}{}'.format(path, file)
    
        udf.copy_data_from_csv(file, \
                               table_name, \
                               col_list,\
                               ",")
    
    print('copy scomplete success') 
    
except Exception as e:
    print('Error: ',e)        

create Airports
copy Airports.csv
create Countries
copy Countries.csv
create Ticket
copy Ticket1993Q1.csv
copy Ticket1993Q2.csv
copy Ticket1993Q3.csv
copy Ticket1993Q4.csv
copy scomplete success


##### Cleaning Tasks  

Countries:

-dimensions convert grouping columns: [(Region_Code,Region_Name)]

Airports:

-spilt Description column to city,cuntry, airport name


-convert to dimensions


Ticket:

-dimensions convert grouping columns: [(year, quarter)]

-create fact table [year, quarter,Origin,origincountry, originstate,Passengers,RoundTrip,OnLine,DollarCred,BulkFare] 
  
drop storage tables

#### 4.2 Create the data model for dimensions and fact
Build the data pipelines to create the data model for dimensions and fact.
#### 4.2.1 Create dimensions tables
Countries:

-dimensions convert grouping columns: 

    -Region:(Region_Code,Region_Name),

    -Country:(country_code,country_name,country_code_iso_2)    

In [None]:
# Create dimension Region 
udf.exec_sql("""create table if not exists dim_region
                (
                  region_code smallint NOT NULL,
                  region_name character varying NOT NULL,
                  CONSTRAINT dim_region_pkey PRIMARY KEY (region_code)
                );
                
                insert into dim_region
                select  DISTINCT
                        cast(region_code as smallint ), 
                        region_name
                from 
                        stg_countries;                        
            """)

In [30]:
# Create dimension Country 
udf.exec_sql("""
                CREATE TABLE if not exists dim_country
                (
                  country_code character varying NOT NULL,
                  country_name character varying NOT NULL,
                  country_code_iso_2 character varying,
                  CONSTRAINT country_pk PRIMARY KEY (country_code)                 
                );
                
                insert into dim_country
                SELECT  DISTINCT
                        iso_3_country_code as country_code, 
                        english_term as country_name, 
                        iso_2_country_code
                FROM 
                        stg_countries;
            """)

Airports:
-spilt Description column to cuntry,city, airport name
-convert to dimensions

In [31]:
# spilt Description column to city,cuntry, airport name
# Create dimension Country 
udf.exec_sql("""
                CREATE TABLE if not exists dim_airport
                (
                  airport_code character varying NOT NULL,
                  country_name character varying NOT NULL,
                  city_name character varying NOT NULL,
                  airport_name character varying NOT NULL,
                  airport_description character varying NOT NULL,
                  CONSTRAINT airport_pk PRIMARY KEY (airport_code)                  
                );
                
                insert into dim_airport
                select  DISTINCT
                        code as airport_code,
                        SPLIT_PART(SPLIT_PART(description, ':', 1), ', ', 2)  as country_name,
                        SPLIT_PART(description, ',', 1)  as city_name,
                        SPLIT_PART(description, ': ', 2)  as airport_name,
                        description
                from 
                        stg_airports;
            """)

Ticket:

-dimensions convert grouping columns: [(year, quarter)]

-create fact table 
                [year,quarter,Origin,origincountry,originstate,Passengers,RoundTrip,OnLine,DollarCred,BulkFare] 

In [32]:
# Create dimension year_quarter 
udf.exec_sql("""
                CREATE TABLE if not exists dim_year_quarter
                (
                  year_quarter_code character varying NOT NULL,
                  year character varying NOT NULL,
                  quarter character varying NOT NULL,                  
                  CONSTRAINT year_quarter_pk PRIMARY KEY (year_quarter_code)                  
                );
                
                insert into dim_year_quarter
                select  DISTINCT
                        CONCAT(year,'Q',quarter ) as year_quarter,
                        year,
                        quarter
                from 
                        stg_ticket;
            """)

In [47]:
# create fact table [year,quarter,Origin,origincountry,originstate,Passengers,RoundTrip,OnLine,DollarCred,BulkFare] 
udf.exec_sql("""
            CREATE TABLE  if not exists fact_ticket
            (
              itinid bigint NOT NULL,
              year_quarter_code character varying,
              year character varying,
              quarter character varying,
              region_code smallint,
              region_name character varying,
              country_code character varying,
              country_name character varying,
              city_name character varying,
              airport_code character varying,
              airport_name character varying,
              number_passengers smallint,
              round_trip smallint,
              online smallint,
              dollar_cred smallint,
              bulk_fare smallint,
              CONSTRAINT fact_ticket_pk PRIMARY KEY (itinid),
              CONSTRAINT airport_fk FOREIGN KEY (airport_code)
                  REFERENCES public.dim_airport (airport_code) MATCH SIMPLE
                  ON UPDATE NO ACTION ON DELETE NO ACTION,
              CONSTRAINT country_fk FOREIGN KEY (country_code)
                  REFERENCES public.dim_airport (country_code) MATCH SIMPLE
                  ON UPDATE NO ACTION ON DELETE NO ACTION,
              CONSTRAINT region_fk FOREIGN KEY (region_code)
                  REFERENCES public.dim_region (region_code) MATCH SIMPLE
                  ON UPDATE NO ACTION ON DELETE NO ACTION,
              CONSTRAINT year_quarter_fk FOREIGN KEY (year_quarter_code)
                  REFERENCES public.dim_year_quarter (year_quarter_code) MATCH SIMPLE
                  ON UPDATE NO ACTION ON DELETE NO ACTION);
            """)

In [49]:
print ("insert fact_ticket")            
udf.exec_sql("""
            insert into fact_ticket
            select  
                    cast(stg_ticket.itinid as bigint),
                    CONCAT(stg_ticket.year,'Q',stg_ticket.quarter ) as year_quarter,
                    stg_ticket.year,
                    stg_ticket.quarter,

                    dim_region.region_code, 
                    dim_region.region_name,

                    dim_country.country_code, 
                    dim_country.country_name,

                    dim_airport.city_name,

                    dim_airport.airport_code,
                    dim_airport.airport_name,           

                    stg_ticket.Passengers ::numeric:: smallint as number_passengers ,
                    stg_ticket.RoundTrip ::numeric:: smallint as round_trip,
                    stg_ticket.OnLine ::numeric:: smallint as online,
                    stg_ticket.DollarCred::numeric:: smallint as dollar_cred,
                    stg_ticket.BulkFare ::numeric:: smallint as bulk_fare
            from 
                    public.stg_ticket
                inner join
                    public.dim_airport 
                on
                    stg_ticket.origin=dim_airport.airport_code
                left join
                    public.dim_country
                on
                    dim_country.country_name = dim_airport.country_name 
                or
                    dim_country.country_code = dim_airport.country_name 
                or
                    dim_country.country_code_iso_2 = dim_airport.country_name
                left join
                    public.dim_region
                on
                    dim_region.region_code=dim_country.region_code;
        """)

insert fact_ticket


#### 4.2 Data Quality Checks
-Using  Integrity constraints on the relational database (Constraint Primary Key, Foreign Key , Data Type , Not Null And Inner Join, etc.) .

-use inner join in the fact table to get complete data .

##### Run Quality Checks

In [51]:
%load_ext sql
connection_string = "postgresql://{DB_USER}:{DB_PASSWORD}@{HOST}/{DB_NAME}".format(HOST=HOST,DB_NAME=DB_NAME,DB_USER=DB_USER,DB_PASSWORD=DB_PASSWORD)
%sql $connection_string

The sql extension is already loaded. To reload it, use:
  %reload_ext sql


'Connected: postgres@airport_ticket'

In [52]:
%sql select count(0) from stg_ticket

 * postgresql://postgres:***@localhost/airport_ticket
1 rows affected.


count
6985303


In [50]:
%sql select count(0) from fact_ticket

 * postgresql://postgres:***@localhost/airport_ticket
1 rows affected.


count
6985303


after integrity constraints we see all data is moved

#### 4.3 Data dictionary 
all URLs have description for each field setp 1 > DataSet

#### Step 5: Complete Project Write Up

Run the ETL every quarter,we need compare full quarter.

Recommend add partitioned table for fact_ticket on year_quarter column.

##### drop storage tables

In [15]:
try:
    udf.exec_sql("drop TABLE stg_countries")
    udf.exec_sql("drop TABLE stg_airports")
    udf.exec_sql("drop TABLE stg_ticket")
        
except Exception as e:
    print('Error: ',e)                   