# ETL and data warehouse design

In [1]:
!ls

20220205_dump.sql      ETL.ipynb
airport-codes_csv.csv  generate-data-quality-report.py
create_table.sql       immigration_data_sample.csv
data_dim_airport.csv   mytests.py
data_dim_state.csv     __pycache__
data_dim_time.csv      test2.py
data_dim_visa.csv      test.py
data_fact_immi.csv     us-cities-demographics.csv
dataqualitychecks.py


In [2]:
import pandas as pd
airport_data = pd.read_csv("airport-codes_csv.csv")
immigration_data = pd.read_csv("immigration_data_sample.csv")
demographics_data = pd.read_csv("us-cities-demographics.csv", delimiter = ';')


In [3]:
airport_data.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [4]:
airport_data.describe().transpose()

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
elevation_ft,48069.0,1240.789677,1602.363459,-1266.0,205.0,718.0,1497.0,22000.0


In [5]:
airport_data.shape

(55075, 12)

In [6]:
airport_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   ident         55075 non-null  object 
 1   type          55075 non-null  object 
 2   name          55075 non-null  object 
 3   elevation_ft  48069 non-null  float64
 4   continent     27356 non-null  object 
 5   iso_country   54828 non-null  object 
 6   iso_region    55075 non-null  object 
 7   municipality  49399 non-null  object 
 8   gps_code      41030 non-null  object 
 9   iata_code     9189 non-null   object 
 10  local_code    28686 non-null  object 
 11  coordinates   55075 non-null  object 
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


# I. Extract and transform airport data
## 1. Drop columns: continent, gps_code, iata_code, local_code
## 2. Handle null value for columns: iso_country, elevation_tf, municipality
## 3. Enriching data for coordinates column to longitude and latitude
## 4. Delete dupplicate

In [7]:
#1. drop columns
airport_droped = airport_data.drop(columns = ['continent', 'gps_code', 'iata_code', 'local_code'])
airport_droped.head()                       

Unnamed: 0,ident,type,name,elevation_ft,iso_country,iso_region,municipality,coordinates
0,00A,heliport,Total Rf Heliport,11.0,US,US-PA,Bensalem,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,US,US-KS,Leoti,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,US,US-AK,Anchor Point,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,US,US-AL,Harvest,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,US,US-AR,Newport,"-91.254898, 35.6087"


In [8]:
# 2.1 Handle null by mean of elevation_ft
mean_elevation = airport_droped['elevation_ft'].mean()
airport_droped['elevation_ft'].fillna(value=mean_elevation, inplace=True)
airport_droped.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 8 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   ident         55075 non-null  object 
 1   type          55075 non-null  object 
 2   name          55075 non-null  object 
 3   elevation_ft  55075 non-null  float64
 4   iso_country   54828 non-null  object 
 5   iso_region    55075 non-null  object 
 6   municipality  49399 non-null  object 
 7   coordinates   55075 non-null  object 
dtypes: float64(1), object(7)
memory usage: 3.4+ MB


In [9]:
# 2.2 drop null values
airport_drop_na = airport_droped.dropna()
airport_drop_na.info()

<class 'pandas.core.frame.DataFrame'>
Index: 49254 entries, 0 to 55074
Data columns (total 8 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   ident         49254 non-null  object 
 1   type          49254 non-null  object 
 2   name          49254 non-null  object 
 3   elevation_ft  49254 non-null  float64
 4   iso_country   49254 non-null  object 
 5   iso_region    49254 non-null  object 
 6   municipality  49254 non-null  object 
 7   coordinates   49254 non-null  object 
dtypes: float64(1), object(7)
memory usage: 3.4+ MB


In [10]:
airport_drop_na.head()

Unnamed: 0,ident,type,name,elevation_ft,iso_country,iso_region,municipality,coordinates
0,00A,heliport,Total Rf Heliport,11.0,US,US-PA,Bensalem,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,US,US-KS,Leoti,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,US,US-AK,Anchor Point,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,US,US-AL,Harvest,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,US,US-AR,Newport,"-91.254898, 35.6087"


In [11]:
# 3 enriching data for coordinates column
coordinates = airport_drop_na['coordinates'].str.split(',', n=1, expand=True)
coordinates

Unnamed: 0,0,1
0,-74.93360137939453,40.07080078125
1,-101.473911,38.704022
2,-151.695999146,59.94919968
3,-86.77030181884766,34.86479949951172
4,-91.254898,35.6087
...,...,...
55070,122.3586,40.542524
55071,123.49600219726562,41.784400939941406
55072,1.4825,51.894444
55073,47.296388888900005,-11.584277777799999


In [None]:
airport_drop_na['longitude'] = pd.to_numeric(coordinates[0],errors='coerce')
airport_drop_na['latitude'] = pd.to_numeric(coordinates[1],errors='coerce')

In [None]:
airport_drop_na.drop(columns = ['coordinates'], inplace=True)

In [14]:
airport_drop_na.head()

Unnamed: 0,ident,type,name,elevation_ft,iso_country,iso_region,municipality,longitude,latitude
0,00A,heliport,Total Rf Heliport,11.0,US,US-PA,Bensalem,-74.933601,40.070801
1,00AA,small_airport,Aero B Ranch Airport,3435.0,US,US-KS,Leoti,-101.473911,38.704022
2,00AK,small_airport,Lowell Field,450.0,US,US-AK,Anchor Point,-151.695999,59.9492
3,00AL,small_airport,Epps Airpark,820.0,US,US-AL,Harvest,-86.770302,34.864799
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,US,US-AR,Newport,-91.254898,35.6087


In [15]:
airport_drop_na.describe().transpose()

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
elevation_ft,49254.0,1239.339442,1523.657036,-1266.0,250.0,820.0,1350.0,14965.0
longitude,49254.0,-42.934712,75.101361,-179.876999,-93.588026,-75.339203,7.898816,179.9757
latitude,49254.0,26.887504,25.966877,-80.318298,13.623582,37.065448,43.409652,82.75


In [16]:
airport_drop_na.info()

<class 'pandas.core.frame.DataFrame'>
Index: 49254 entries, 0 to 55074
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   ident         49254 non-null  object 
 1   type          49254 non-null  object 
 2   name          49254 non-null  object 
 3   elevation_ft  49254 non-null  float64
 4   iso_country   49254 non-null  object 
 5   iso_region    49254 non-null  object 
 6   municipality  49254 non-null  object 
 7   longitude     49254 non-null  float64
 8   latitude      49254 non-null  float64
dtypes: float64(3), object(6)
memory usage: 3.8+ MB


In [17]:
## 4. Delete dupplicate
airport_transformed = airport_drop_na.drop_duplicates()
airport_transformed.info()

<class 'pandas.core.frame.DataFrame'>
Index: 49254 entries, 0 to 55074
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   ident         49254 non-null  object 
 1   type          49254 non-null  object 
 2   name          49254 non-null  object 
 3   elevation_ft  49254 non-null  float64
 4   iso_country   49254 non-null  object 
 5   iso_region    49254 non-null  object 
 6   municipality  49254 non-null  object 
 7   longitude     49254 non-null  float64
 8   latitude      49254 non-null  float64
dtypes: float64(3), object(6)
memory usage: 3.8+ MB


# II. Extract and transform immigration data
## 1. Drop columns: insnum, visapost, occup, entdepu, insnum
## 2. Handle null value for columns: i94addr, depdate, entdepd, matflag, gender, airline, fltno
## 3. Delete dupplicate

In [18]:
immigration_data.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,...,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,...,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,...,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,...,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,...,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [19]:
immigration_data.describe().transpose()

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
Unnamed: 0,1000.0,1542097.0,915287.9,10925.0,721442.2,1494568.0,2360901.0,3095749.0
cicid,1000.0,3040461.0,1799818.0,13208.0,1412170.0,2941176.0,4694151.0,6061994.0
i94yr,1000.0,2016.0,0.0,2016.0,2016.0,2016.0,2016.0,2016.0
i94mon,1000.0,4.0,0.0,4.0,4.0,4.0,4.0,4.0
i94cit,1000.0,302.928,206.4853,103.0,135.0,213.0,438.0,746.0
i94res,1000.0,298.262,202.1204,103.0,131.0,213.0,438.0,696.0
arrdate,1000.0,20559.68,8.995027,20545.0,20552.0,20560.0,20567.25,20574.0
i94mode,1000.0,1.078,0.4859549,1.0,1.0,1.0,1.0,9.0
depdate,951.0,20575.04,24.21123,20547.0,20561.0,20570.0,20580.0,20715.0
i94bir,1000.0,42.382,17.90342,1.0,30.75,42.0,55.0,93.0


In [20]:
immigration_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   Unnamed: 0  1000 non-null   int64  
 1   cicid       1000 non-null   float64
 2   i94yr       1000 non-null   float64
 3   i94mon      1000 non-null   float64
 4   i94cit      1000 non-null   float64
 5   i94res      1000 non-null   float64
 6   i94port     1000 non-null   object 
 7   arrdate     1000 non-null   float64
 8   i94mode     1000 non-null   float64
 9   i94addr     941 non-null    object 
 10  depdate     951 non-null    float64
 11  i94bir      1000 non-null   float64
 12  i94visa     1000 non-null   float64
 13  count       1000 non-null   float64
 14  dtadfile    1000 non-null   int64  
 15  visapost    382 non-null    object 
 16  occup       4 non-null      object 
 17  entdepa     1000 non-null   object 
 18  entdepd     954 non-null    object 
 19  entdepu     0 non-null      

In [21]:
#1. drop columns Unnamed: 0, insnum, visapost, occup, entdepu
immigration_droped = immigration_data.drop(columns = ['Unnamed: 0', 'insnum', 'visapost', 'occup', 'entdepu']) 
immigration_droped.head()                  

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,...,G,O,M,1955.0,7202016,F,JL,56582670000.0,00782,WT
1,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,...,G,R,M,1990.0,10222016,M,*GA,94362000000.0,XBLNG,B2
2,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,...,G,O,M,1940.0,7052016,M,LH,55780470000.0,00464,WT
3,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,...,G,O,M,1991.0,10272016,M,QR,94789700000.0,00739,B2
4,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,...,Z,K,M,1997.0,7042016,F,,42322570000.0,LAND,WT


In [22]:
#2. Drop null value
immigration_droped_na = immigration_droped.dropna()
immigration_droped_na.info()

<class 'pandas.core.frame.DataFrame'>
Index: 757 entries, 0 to 999
Data columns (total 24 columns):
 #   Column    Non-Null Count  Dtype  
---  ------    --------------  -----  
 0   cicid     757 non-null    float64
 1   i94yr     757 non-null    float64
 2   i94mon    757 non-null    float64
 3   i94cit    757 non-null    float64
 4   i94res    757 non-null    float64
 5   i94port   757 non-null    object 
 6   arrdate   757 non-null    float64
 7   i94mode   757 non-null    float64
 8   i94addr   757 non-null    object 
 9   depdate   757 non-null    float64
 10  i94bir    757 non-null    float64
 11  i94visa   757 non-null    float64
 12  count     757 non-null    float64
 13  dtadfile  757 non-null    int64  
 14  entdepa   757 non-null    object 
 15  entdepd   757 non-null    object 
 16  matflag   757 non-null    object 
 17  biryear   757 non-null    float64
 18  dtaddto   757 non-null    object 
 19  gender    757 non-null    object 
 20  airline   757 non-null    object 
 21

In [23]:
## 3. Delete dupplicate
immigration_transformed = immigration_droped_na.drop_duplicates()
immigration_transformed.info()

<class 'pandas.core.frame.DataFrame'>
Index: 757 entries, 0 to 999
Data columns (total 24 columns):
 #   Column    Non-Null Count  Dtype  
---  ------    --------------  -----  
 0   cicid     757 non-null    float64
 1   i94yr     757 non-null    float64
 2   i94mon    757 non-null    float64
 3   i94cit    757 non-null    float64
 4   i94res    757 non-null    float64
 5   i94port   757 non-null    object 
 6   arrdate   757 non-null    float64
 7   i94mode   757 non-null    float64
 8   i94addr   757 non-null    object 
 9   depdate   757 non-null    float64
 10  i94bir    757 non-null    float64
 11  i94visa   757 non-null    float64
 12  count     757 non-null    float64
 13  dtadfile  757 non-null    int64  
 14  entdepa   757 non-null    object 
 15  entdepd   757 non-null    object 
 16  matflag   757 non-null    object 
 17  biryear   757 non-null    float64
 18  dtaddto   757 non-null    object 
 19  gender    757 non-null    object 
 20  airline   757 non-null    object 
 21

# III. Extract and transform demographics data
## 1. Handle null value 
## 2. Delete dupplicate

In [24]:
demographics_data.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [25]:
demographics_data.describe().transpose()

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
Median Age,2891.0,35.494881,4.401617,22.9,32.8,35.3,38.0,70.5
Male Population,2888.0,97328.426247,216299.936929,29281.0,39289.0,52341.0,86641.75,4081698.0
Female Population,2888.0,101769.630886,231564.572571,27348.0,41227.0,53809.0,89604.0,4468707.0
Total Population,2891.0,198966.779315,447555.929634,63215.0,80429.0,106782.0,175232.0,8550405.0
Number of Veterans,2878.0,9367.832523,13211.219924,416.0,3739.0,5397.0,9368.0,156961.0
Foreign-born,2878.0,40653.59868,155749.103665,861.0,9224.0,18822.0,33971.75,3212500.0
Average Household Size,2875.0,2.742543,0.433291,2.0,2.43,2.65,2.95,4.98
Count,2891.0,48963.774473,144385.588565,98.0,3435.0,13780.0,54447.0,3835726.0


In [26]:
demographics_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   City                    2891 non-null   object 
 1   State                   2891 non-null   object 
 2   Median Age              2891 non-null   float64
 3   Male Population         2888 non-null   float64
 4   Female Population       2888 non-null   float64
 5   Total Population        2891 non-null   int64  
 6   Number of Veterans      2878 non-null   float64
 7   Foreign-born            2878 non-null   float64
 8   Average Household Size  2875 non-null   float64
 9   State Code              2891 non-null   object 
 10  Race                    2891 non-null   object 
 11  Count                   2891 non-null   int64  
dtypes: float64(6), int64(2), object(4)
memory usage: 271.2+ KB


In [27]:
demographics_drop_na = demographics_data.dropna()
demographics_drop_na.info()

<class 'pandas.core.frame.DataFrame'>
Index: 2875 entries, 0 to 2890
Data columns (total 12 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   City                    2875 non-null   object 
 1   State                   2875 non-null   object 
 2   Median Age              2875 non-null   float64
 3   Male Population         2875 non-null   float64
 4   Female Population       2875 non-null   float64
 5   Total Population        2875 non-null   int64  
 6   Number of Veterans      2875 non-null   float64
 7   Foreign-born            2875 non-null   float64
 8   Average Household Size  2875 non-null   float64
 9   State Code              2875 non-null   object 
 10  Race                    2875 non-null   object 
 11  Count                   2875 non-null   int64  
dtypes: float64(6), int64(2), object(4)
memory usage: 292.0+ KB


In [28]:
## 2. Delete dupplicate
demographics_transformed = demographics_drop_na.drop_duplicates()
demographics_transformed.info()

<class 'pandas.core.frame.DataFrame'>
Index: 2875 entries, 0 to 2890
Data columns (total 12 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   City                    2875 non-null   object 
 1   State                   2875 non-null   object 
 2   Median Age              2875 non-null   float64
 3   Male Population         2875 non-null   float64
 4   Female Population       2875 non-null   float64
 5   Total Population        2875 non-null   int64  
 6   Number of Veterans      2875 non-null   float64
 7   Foreign-born            2875 non-null   float64
 8   Average Household Size  2875 non-null   float64
 9   State Code              2875 non-null   object 
 10  Race                    2875 non-null   object 
 11  Count                   2875 non-null   int64  
dtypes: float64(6), int64(2), object(4)
memory usage: 292.0+ KB


# IV. Design Data warehouse schema by star schema

In [29]:
ident_airport = pd.Series(airport_transformed['ident']).unique()

In [30]:
ident_immigration = pd.Series(immigration_transformed['i94port']).unique()

In [31]:
def find_common_elements(array1, array2):
    set1 = set(array1)
    set2 = set(array2)
    common_elements = list(set1.intersection(set2))
    return common_elements

In [32]:
common = find_common_elements(ident_airport, ident_immigration)
common

['DLR', 'SFR']

In [33]:
state_code_demographic = pd.Series(demographics_transformed['State Code']).unique()

In [34]:
state_code_immigration = pd.Series(immigration_transformed['i94addr']).unique()

In [35]:
common2 = find_common_elements(state_code_demographic, state_code_immigration)
common2

['CO',
 'IL',
 'IN',
 'RI',
 'MN',
 'NM',
 'MO',
 'TX',
 'OR',
 'CT',
 'SC',
 'NV',
 'NC',
 'PA',
 'ID',
 'NJ',
 'CA',
 'VA',
 'MA',
 'IA',
 'NY',
 'KS',
 'DC',
 'MI',
 'NE',
 'FL',
 'MS',
 'AL',
 'KY',
 'TN',
 'NH',
 'GA',
 'AR',
 'HI',
 'AZ',
 'MD',
 'LA',
 'WI',
 'WA',
 'OH',
 'UT']

In [36]:
import mysql.connector

mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  password="01668539119",
  database="data_engineer_project"  
)

print(mydb)

<mysql.connector.connection_cext.CMySQLConnection object at 0x7fbcce5b6250>


In [37]:
cursor = mydb.cursor()


In [38]:
query = "SHOW tables;"
cursor.execute(query)
results = cursor.fetchall()
print(results)

[('dim_airport',), ('dim_state',), ('dim_time',), ('dim_visa',), ('fact_immigration',)]


![image.png](attachment:5acf8e2d-1b89-44a8-adad-efa6c6d902a5.png)

In [39]:
# with open('create_table.sql','r') as sql_file:
#     sql_script = sql_file.read()
#     cursor.execute(sql_script)


# V. Create fact and dimension tables
## 1. Create dim_airport table and data

In [40]:
airport_transformed.head()

Unnamed: 0,ident,type,name,elevation_ft,iso_country,iso_region,municipality,longitude,latitude
0,00A,heliport,Total Rf Heliport,11.0,US,US-PA,Bensalem,-74.933601,40.070801
1,00AA,small_airport,Aero B Ranch Airport,3435.0,US,US-KS,Leoti,-101.473911,38.704022
2,00AK,small_airport,Lowell Field,450.0,US,US-AK,Anchor Point,-151.695999,59.9492
3,00AL,small_airport,Epps Airpark,820.0,US,US-AL,Harvest,-86.770302,34.864799
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,US,US-AR,Newport,-91.254898,35.6087


In [41]:
data_dim_airport = airport_transformed[['ident','type','name','elevation_ft','iso_country','iso_region','municipality','longitude','latitude']]

In [42]:
data_dim_airport['ident'].head()

0     00A
1    00AA
2    00AK
3    00AL
4    00AR
Name: ident, dtype: object

## 2.Create dim_time table and data

In [43]:
from datetime import datetime, timedelta
start_time = datetime(1960,1,1)
data_dim_time = pd.DataFrame({'arrdate':immigration_transformed['arrdate']})
data_dim_time['arrdate'].head(2)

0    20566.0
1    20567.0
Name: arrdate, dtype: float64

In [44]:
data_dim_time['arrival_date'] = data_dim_time['arrdate'].apply(lambda x: start_time + timedelta(days=int(x))) 

In [45]:
data_dim_time['arrival_date'].head()

0   2016-04-22
1   2016-04-23
2   2016-04-07
3   2016-04-28
5   2016-04-08
Name: arrival_date, dtype: datetime64[ns]

In [46]:
data_dim_time['day'] = data_dim_time['arrival_date'].dt.day
data_dim_time['day'].head()

0    22
1    23
2     7
3    28
5     8
Name: day, dtype: int32

In [47]:
data_dim_time['month'] = data_dim_time['arrival_date'].dt.month
data_dim_time['month'].head()

0    4
1    4
2    4
3    4
5    4
Name: month, dtype: int32

In [48]:
data_dim_time['year'] = data_dim_time['arrival_date'].dt.year
data_dim_time['year'].head()

0    2016
1    2016
2    2016
3    2016
5    2016
Name: year, dtype: int32

In [49]:
data_dim_time['week'] = data_dim_time['arrival_date'].dt.isocalendar().week
data_dim_time['week'].head()

0    16
1    16
2    14
3    17
5    14
Name: week, dtype: UInt32

In [50]:
data_dim_time['weekday'] = data_dim_time['arrival_date'].dt.weekday
data_dim_time['weekday'].head()

0    4
1    5
2    3
3    3
5    4
Name: weekday, dtype: int32

In [51]:
from datetime import datetime, timedelta
def create_dim_time_table(df):
    start_time = datetime(1960,1,1)
    data_dim_time = pd.DataFrame({'arrdate': df})
    
    data_dim_time['arrival_date'] = data_dim_time['arrdate'].apply(lambda x: start_time + timedelta(days=int(x))) 
    data_dim_time['day'] = data_dim_time['arrival_date'].dt.day
    data_dim_time['month'] = data_dim_time['arrival_date'].dt.month
    data_dim_time['year'] = data_dim_time['arrival_date'].dt.year
    data_dim_time['week'] = data_dim_time['arrival_date'].dt.isocalendar().week
    data_dim_time['weekday'] = data_dim_time['arrival_date'].dt.weekday
    
    return data_dim_time
data_dim_time = create_dim_time_table(immigration_transformed['arrdate']) 

In [52]:
data_dim_time.head()

Unnamed: 0,arrdate,arrival_date,day,month,year,week,weekday
0,20566.0,2016-04-22,22,4,2016,16,4
1,20567.0,2016-04-23,23,4,2016,16,5
2,20551.0,2016-04-07,7,4,2016,14,3
3,20572.0,2016-04-28,28,4,2016,17,3
5,20552.0,2016-04-08,8,4,2016,14,4


## 3. Create dim_state table and data

In [53]:
demographics_transformed.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [54]:
data_dim_state = demographics_transformed[['State Code','State','Median Age','Male Population','Female Population','Total Population','Number of Veterans','Foreign-born','Average Household Size','Race','Count']]

In [55]:
new_columns_name_dim_state = {
    'State Code':'state_code',
    'State':'state',
    'Median Age':'median_age',
    'Male Population':'male_population',
    'Female Population':'female_population',
    'Total Population':'total_population',
    'Number of Veterans':'number_of_veterans',
    'Foreign-born':'foreign_born',
    'Average Household Size':'average_household_size',
    'Race':'race',
    'Count':'count'
}
data_dim_state = data_dim_state.rename(columns=new_columns_name_dim_state)

In [56]:
data_dim_state['state'].head()

0         Maryland
1    Massachusetts
2          Alabama
3       California
4       New Jersey
Name: state, dtype: object

## 4. Create dim_visa and data


In [57]:
immigration_transformed.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepa,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,...,G,O,M,1955.0,7202016,F,JL,56582670000.0,00782,WT
1,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,...,G,R,M,1990.0,10222016,M,*GA,94362000000.0,XBLNG,B2
2,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,...,G,O,M,1940.0,7052016,M,LH,55780470000.0,00464,WT
3,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,...,G,O,M,1991.0,10272016,M,QR,94789700000.0,00739,B2
5,1481650.0,2016.0,4.0,577.0,577.0,ATL,20552.0,1.0,GA,20606.0,...,T,N,M,1965.0,10072016,M,DL,736852600.0,910,B2


In [58]:
immigration_transformed[['i94visa','visatype']].count()

i94visa     757
visatype    757
dtype: int64

In [59]:
distinct_rows = immigration_transformed.drop_duplicates(subset=['i94visa','visatype'])
data_dim_visa = distinct_rows[['i94visa','visatype']]
data_dim_visa

Unnamed: 0,i94visa,visatype
0,2.0,WT
1,2.0,B2
30,1.0,B1
35,2.0,GMT
43,1.0,WB
70,3.0,F1
120,1.0,E2
752,2.0,CP


In [60]:
data_dim_visa = data_dim_visa.reset_index().rename(columns={'index':'visa_id1'})
data_dim_visa = data_dim_visa.reset_index().rename(columns={'index':'visa_id'})
data_dim_visa = data_dim_visa.drop(columns=['visa_id1'])
data_dim_visa

Unnamed: 0,visa_id,i94visa,visatype
0,0,2.0,WT
1,1,2.0,B2
2,2,1.0,B1
3,3,2.0,GMT
4,4,1.0,WB
5,5,3.0,F1
6,6,1.0,E2
7,7,2.0,CP


In [61]:
immigration_join = immigration_transformed.merge(data_dim_visa, on=['i94visa','visatype'])

In [62]:
immigration_join['visa_id'].unique()

array([0, 1, 2, 3, 4, 5, 6, 7])

## 5. Create data for fact table

In [63]:
immigration_join = immigration_join.rename(columns={'i94port':'ident',
                                                   'i94addr':'state_code'})

In [64]:
immigration_join.head()

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,ident,arrdate,i94mode,state_code,depdate,...,entdepd,matflag,biryear,dtaddto,gender,airline,admnum,fltno,visatype,visa_id
0,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,...,O,M,1955.0,7202016,F,JL,56582670000.0,782,WT,0
1,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,...,O,M,1940.0,7052016,M,LH,55780470000.0,464,WT,0
2,232708.0,2016.0,4.0,113.0,135.0,NYC,20546.0,1.0,NY,20554.0,...,O,M,1983.0,6302016,F,BA,55474490000.0,117,WT,0
3,1230572.0,2016.0,4.0,438.0,438.0,LOS,20551.0,1.0,CA,20565.0,...,O,M,2012.0,7052016,F,QF,55743810000.0,15,WT,0
4,5056736.0,2016.0,4.0,209.0,209.0,PHI,20571.0,1.0,HI,20575.0,...,O,M,1944.0,7252016,M,DL,59336620000.0,598,WT,0


In [65]:
data_fact_immi = immigration_join[['cicid','ident','visa_id','arrdate','state_code','depdate','i94res','i94mode','i94cit','airline','fltno']]

In [66]:
import numpy as np
i = 0
for chunk in np.array_split(data_dim_airport,5):
    print("chunk: "+str(i), len(chunk))
    i+=1
    break

chunk: 0 9851


In [67]:
data_dim_airport.to_csv('data_dim_airport.csv', index=False)

In [68]:
data_dim_state.to_csv('data_dim_state.csv', index=False)

In [69]:
data_dim_time.to_csv('data_dim_time.csv', index=False)

In [70]:
data_dim_visa.to_csv('data_dim_visa.csv', index=False)

In [71]:
data_fact_immi.to_csv('data_fact_immi.csv', index=False)

# VI. Load data to database

In [72]:
from sqlalchemy import create_engine
connection_string = 'mysql+mysqlconnector://root:01668539119@localhost:3306/data_engineer_project'
engine = create_engine(connection_string)


In [73]:
data_dim_airport.head()

Unnamed: 0,ident,type,name,elevation_ft,iso_country,iso_region,municipality,longitude,latitude
0,00A,heliport,Total Rf Heliport,11.0,US,US-PA,Bensalem,-74.933601,40.070801
1,00AA,small_airport,Aero B Ranch Airport,3435.0,US,US-KS,Leoti,-101.473911,38.704022
2,00AK,small_airport,Lowell Field,450.0,US,US-AK,Anchor Point,-151.695999,59.9492
3,00AL,small_airport,Epps Airpark,820.0,US,US-AL,Harvest,-86.770302,34.864799
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,US,US-AR,Newport,-91.254898,35.6087


In [None]:
data_dim_airport.to_sql(name='dim_airport', con=engine, if_exists='append', index=False, chunksize=1000)

In [None]:
data_dim_state.head()

In [None]:
data_dim_state.count()

In [None]:
a = data_dim_state['state_code'].drop_duplicates()

In [None]:
data_dim_state['race'].unique()

In [None]:
dict_func = {'median_age': 'mean', 
             'average_household_size': 'mean', 
             'total_population':'sum',
             'male_population':'sum',
             'female_population':'sum',
             'foreign_born':'sum'}

In [None]:
data_dim_state_table = data_dim_state.groupby(['state_code','state']).agg(dict_func)

In [None]:
data_dim_state_table.head()

In [None]:
data_dim_state_table = data_dim_state_table.reset_index()
data_dim_state_table.head()

In [None]:
data_dim_state_table.to_sql(name='dim_state', con=engine, if_exists='append', index=False, chunksize=1000)

In [None]:
data_dim_time.head()

In [None]:
data_dim_time.count()

In [None]:
len(data_dim_time['arrdate'].unique())

In [None]:
data_dim_time = data_dim_time.drop_duplicates()

In [None]:
data_dim_time

In [None]:
data_dim_time.to_sql(name='dim_time', con=engine, if_exists='append', index=False, chunksize=1000)

In [106]:
# Perform the upsert using SQL's INSERT INTO ... ON DUPLICATE KEY UPDATE syntax
# query = """
#     INSERT INTO dim_time (arrdate, arrival_date, day, month, year, week, weekday)
#     VALUES (%s, %s, %s, %s, %s, %s, %s)
#     ON DUPLICATE KEY UPDATE
#     arrival_date = VALUES(arrival_date),
#     day = VALUES(day),
#     month = VALUES(month),
#     year = VALUES(year),
#     week = VALUES(week),
#     weekday = VALUES(weekday)
# """
# data = [row for row in data_dim_time.itertuples(index=False, name=None)]
# cursor.executemany(query, data)
# # Commit the changes
# mydb.commit()

In [None]:
data_dim_visa

In [None]:
data_dim_visa.to_sql(name='dim_visa', con=engine, if_exists='append', index=False, chunksize=1000)

In [None]:
data_fact_immi.head()

In [None]:
pd.Series(data_dim_airport['ident']).unique()

In [None]:
pd.Series(data_fact_immi['ident']).unique()

In [None]:
ident_airport = pd.Series(data_dim_airport['ident']).unique()
ident_immigration = pd.Series(data_fact_immi['ident']).unique()
common = find_common_elements(ident_airport, ident_immigration)
common

In [None]:
data_fact_immi_table = data_fact_immi[(data_fact_immi['ident'] == 'DLR') | (data_fact_immi['ident'] == 'SFR')]

In [None]:
data_fact_immi_table = data_fact_immi_table.reset_index(drop=True)

In [None]:
stateCODE_airport = pd.Series(data_dim_state['state_code']).unique()
stateCODE_immigration = pd.Series(data_fact_immi['state_code']).unique()
common = find_common_elements(stateCODE_airport, stateCODE_immigration)
common

In [None]:
data_fact_immi_table = data_fact_immi_table[data_fact_immi_table['state_code'].isin(common)]

In [None]:
data_fact_immi_table.to_sql(name='fact_immigration', con=engine, if_exists='append', index=False, chunksize=1000)

# VII. Data quality check and generate report

In [None]:
mydb

In [None]:
import mytests
tests = {key:value for key,value in mytests.__dict__.items() if key.startswith('test')}
tests

In [88]:
import os
import pandas as pd
from tabulate import tabulate

import mytests
# import the data quality checks
from dataqualitychecks import check_for_nulls
from dataqualitychecks import check_for_min_max
from dataqualitychecks import check_for_valid_values
from dataqualitychecks import check_for_duplicates
from dataqualitychecks import run_data_quality_check
from dataqualitychecks import check_records
# connect to database

print("Connected to data warehouse")

#Start of data quality checks
results = []
tests = {key:value for key,value in mytests.__dict__.items() if key.startswith('test')}
for testname,test in tests.items():
    test['conn'] = mydb
    results.append(run_data_quality_check(**test))

#print(results)
df=pd.DataFrame(results)
df.index+=1
df.columns = ['Test Name', 'Table','Column','Test Passed']
print(tabulate(df,headers='keys',tablefmt='psql'))
#End of data quality checks
mydb.close()
print("Disconnected from data warehouse")


Connected to data warehouse
**************************************************
Sun Jul 23 10:10:48 2023
Starting test Check for nulls
Finished test Check for nulls
Test Passed True
Test Parameters
column = arrdate
table = dim_time

Duration :  0.001950979232788086
Sun Jul 23 10:10:48 2023
**************************************************
**************************************************
Sun Jul 23 10:10:48 2023
Starting test Check for min and max
Finished test Check for min and max
Test Passed True
Test Parameters
column = month
table = dim_time
minimum = 1
maximum = 12

Duration :  0.0015070438385009766
Sun Jul 23 10:10:48 2023
**************************************************
**************************************************
Sun Jul 23 10:10:48 2023
Starting test Check for valid values
{'5', '1', '4', '3', '0', '2', '6'}
Finished test Check for valid values
Test Passed True
Test Parameters
column = weekday
table = dim_time
valid_values = {'5', '0', '1', '2', '6', '4', '3'}

Durat

# VIII. Querying the Data Warehouse (Cubes, Rollups, Grouping Sets and Materialized Views)
https://learnsql.com/blog/sql-grouping-sets-clause/

In [107]:
query_grouping_set = """
select year,state, count(cicid) as totalFlight
from fact_immigration
left join dim_state
on fact_immigration.state_code = dim_state.state_code
left join dim_time
on fact_immigration.arrdate=dim_time.arrdate
group by grouping sets((dim_time.year,dim_state.state),(dim_time.year),(dim_state.state),())
order by dim_time.year, dim_state.state
"""

In [114]:
query_cube = """
select year,state, count(cicid) as totalFlight
from fact_immigration
left join dim_state
on fact_immigration.state_code = dim_state.state_code
left join dim_time
on fact_immigration.arrdate=dim_time.arrdate
group by cube(dim_time.year,dim_state.state)
order by dim_time.year, dim_state.state
"""

In [None]:
query_rollup = """
select year,state, count(cicid) as totalFlight
from fact_immigration
left join dim_state
on fact_immigration.state_code = dim_state.state_code
left join dim_time
on fact_immigration.arrdate=dim_time.arrdate
group by rollup(dim_time.year,dim_state.state)
order by dim_time.year, dim_state.state
"""

In [89]:
import mysql.connector

mydb = mysql.connector.connect(
  host="localhost",
  user="root",
  password="01668539119",
  database="data_engineer_project"  
)

print(mydb)
cursor = mydb.cursor()


<mysql.connector.connection_cext.CMySQLConnection object at 0x7fbcc9204dd0>


In [110]:
query_grouping_set2 = """
select weekday,state, count(cicid) as totalFlight
from fact_immigration
left join dim_state
on fact_immigration.state_code = dim_state.state_code
left join dim_time
on fact_immigration.arrdate=dim_time.arrdate
group by weekday,state

UNION ALL

select NULL,state, count(cicid) as totalFlight
from fact_immigration
left join dim_state
on fact_immigration.state_code = dim_state.state_code
left join dim_time
on fact_immigration.arrdate=dim_time.arrdate
group by state

UNION ALL

select weekday,NULL, count(cicid) as totalFlight
from fact_immigration
left join dim_state
on fact_immigration.state_code = dim_state.state_code
left join dim_time
on fact_immigration.arrdate=dim_time.arrdate
group by weekday

UNION ALL

select NULL,NULL, count(cicid) as totalFlight
from fact_immigration
left join dim_state
on fact_immigration.state_code = dim_state.state_code
left join dim_time
on fact_immigration.arrdate=dim_time.arrdate

ORDER BY weekday, state
"""

In [111]:
cursor.execute(query_grouping_set2)
results = cursor.fetchall()
print(results)

[(None, None, 46), (None, 'California', 36), (None, 'Colorado', 1), (None, 'Florida', 1), (None, 'Hawaii', 1), (None, 'Illinois', 1), (None, 'Massachusetts', 1), (None, 'Missouri', 1), (None, 'Nevada', 1), (None, 'Texas', 3), ('0', None, 3), ('0', 'California', 2), ('0', 'Missouri', 1), ('1', None, 10), ('1', 'California', 7), ('1', 'Florida', 1), ('1', 'Illinois', 1), ('1', 'Massachusetts', 1), ('2', None, 3), ('2', 'California', 2), ('2', 'Hawaii', 1), ('3', None, 5), ('3', 'California', 4), ('3', 'Texas', 1), ('4', None, 11), ('4', 'California', 11), ('5', None, 6), ('5', 'California', 5), ('5', 'Colorado', 1), ('6', None, 8), ('6', 'California', 5), ('6', 'Nevada', 1), ('6', 'Texas', 2)]
