# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [13]:
#import libraries
import configparser
import re
import boto3
import pandas as pd
#import psycopg2
from pyspark.sql import SparkSession

<h3>2 - Questions</h3>

<ol> 
    <li>Which cities do immigrants tend to move to, and where do they come from?</li>
    <li>Does temperature play a role in where people on temporary visas go?</li>
</ol>

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

The data sets used for this project are airport codes, immigration data, US cities demographics data, and temperature data.
<h4>Airport Codes</h4>
Airport codes may refer to either the IATA airport code, a three-letter code used in passenger reservation, ticketing and baggage-handling systems, or the ICAO airport code, a four-letter code used by ATC systems and for airports that do not have an IATA airport code. The data was provided by Udacity, which obtained it from <a href="https://datahub.io/core/airport-codes#data">Data Hub</a>.
<h4>Immigration Data</h4>
The data comes from the US National Tourism and Trade Office and was provided by Udacity. A data dictionary is provided in the file I94_SAS_Labels_Descriptions.SAS.
The data set was taken from <a href="https://travel.trade.gov/research/reports/i94/historical/2016.html">this link</a>.
A sample of the dataset can be previewed in the immigration_data_sample.csv file. The full dataset consists of several SAS files located in the
SAS_data folder. 
<h4>Temperature Data</h4>
The dataset is made available under the <a href="https://creativecommons.org/licenses/by-nc-sa/4.0/">Creative Commons BY-NC-SA 4.0</a> license.
<h4>US Cities Demographics</h4>
The data comes from <a href="https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/">OpenDataSoft</a>. The dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. This data comes from the US Census Bureau's 2015 American Community Survey and is referenced <a href="https://www.census.gov/data/developers/about/terms-of-service.html">in this link</a>.

In [14]:
#Get AWS credentials
config = configparser.ConfigParser()
config.read('dwh.cfg')
AWS_ACCESS_KEY_ID = config.get('AWS', 'AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')
s3 = boto3.resource('s3', aws_access_key_id = AWS_ACCESS_KEY_ID, aws_secret_access_key = AWS_SECRET_ACCESS_KEY)

In [15]:
#Initialize spark session
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [16]:
#make connection to redshift
# HOST=my-udacity-db.crd2lwlmrmzb.us-west-2.redshift.amazonaws.com
# DB_NAME=my_udacity_db
# DB_USER=dbuser
# DB_PASSWORD=DB_Password2
# DB_PORT=5439

In [17]:
# Show all the columns for the datasets
pd.set_option('display.max_columns', 30)

<h4>Airport Codes</h4>

In [18]:
#Read airport codes csv and preview the data
data_airport_codes = pd.read_csv('airport-codes_csv.csv')
data_airport_codes.head()

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [19]:
#get shape of dataset
data_airport_codes.shape

(55075, 12)

<h4>Immigration Data</h4>

In [20]:
#Read immigration data csv and preview the data
data_immigration = pd.read_csv('immigration_data_sample.csv')
data_immigration.head()

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [21]:
#Read in July and December immigration data into spark dataframes
df_spark_imm_jul =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat')
df_spark_imm_dec =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat')
# df_spark =spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

<h4>Combine Immigration Data Sets</h4>

In [22]:
spark_imm_comb = df_spark_imm_jul.union(df_spark_imm_dec)

In [23]:
spark_imm_comb.show()

+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|cicid| i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear| dtaddto|gender|insnum|airline|         admnum|fltno|visatype|
+-----+------+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+--------+------+------+-------+---------------+-----+--------+
|  1.0|2016.0|   7.0| 254.0| 276.0|    LOS|20636.0|    1.0|     CA|20640.0|  38.0|    2.0|  1.0|20160701|    null| null|      G|      O|   null|      M| 1978.0|09282016|     M|  null|     OZ|6.3092898033E10|00202|      WT|
|  2.0|2016.0|   7.0| 140.0| 140.0|    NYC|20636.0|    1.0|     NY|20657.0|  45.0|    2.0|  1.0|20160701|   

<h4>Read in Data Dictionary</h4>

In [24]:
#read in immigration data dictionary

with open('I94_SAS_Labels_Descriptions.SAS') as f:
    txt = f.read()
    f.seek(0)
    lines = f.readlines()
comment_lines = [line for line in lines if line.startswith('/*') and line.endswith('*/\n')]

In [25]:
clpatt = re.compile(r'^/\*\s+(?P<code>.+?)\s+-\s+(?P<description>.+)\s+\*/$')
matches = [clpatt.match(line) for line in comment_lines]
if not all(m is not None for m in matches):
    for i, m in enumerate(matches):
        if m is None:
            print(i)
print(f'CODE{"":16}', 'DESCRIPTION')
for m in matches:
    print(f'{m.group("code"):20}', m.group('description'))

CODE                 DESCRIPTION
I94YR                4 digit year
I94MON               Numeric month
I94CIT & I94RES      This format shows all the valid and invalid codes for processing
I94PORT              This format shows all the valid and invalid codes for processing
I94MODE              There are missing values as well as not reported (9)
I94BIR               Age of Respondent in Years
COUNT                Used for summary statistics
DTADFILE             Character Date Field - Date added to I-94 Files - CIC does not use
VISAPOST             Department of State where where Visa was issued - CIC does not use
OCCUP                Occupation that will be performed in U.S. - CIC does not use
ENTDEPA              Arrival Flag - admitted or paroled into the U.S. - CIC does not use
ENTDEPD              Departure Flag - Departed, lost I-94 or is deceased - CIC does not use
ENTDEPU              Update Flag - Either apprehended, overstayed, adjusted to perm residence - CIC does not use
MAT

In [26]:
#Create data dictionary for i94cit i94res columns
i94cit_valid = {}
with open('i94cit.txt') as f:
    for line in f:
        # split on the first '=' only, in case a description contains '='
        (key, val) = line.split('=', 1)
        # strip surrounding whitespace (including the newline), then the quotes
        i94cit_valid[int(key.strip())] = val.strip().strip("'")

In [27]:
print(i94cit_valid)

{582: 'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)', 236: 'AFGHANISTAN', 101: 'ALBANIA', 316: 'ALGERIA', 102: 'ANDORRA', 324: 'ANGOLA', 529: 'ANGUILLA', 518: 'ANTIGUA-BARBUDA', 687: 'ARGENTINA ', 151: 'ARMENIA', 532: 'ARUBA', 438: 'AUSTRALIA', 103: 'AUSTRIA', 152: 'AZERBAIJAN', 512: 'BAHAMAS', 298: 'BAHRAIN', 274: 'BANGLADESH', 513: 'BARBADOS', 104: 'BELGIUM', 581: 'BELIZE', 386: 'BENIN', 509: 'BERMUDA', 153: 'BELARUS', 242: 'BHUTAN', 688: 'BOLIVIA', 717: 'BONAIRE, ST EUSTATIUS, SABA', 164: 'BOSNIA-HERZEGOVINA', 336: 'BOTSWANA', 689: 'BRAZIL', 525: 'BRITISH VIRGIN ISLANDS', 217: 'BRUNEI', 105: 'BULGARIA', 393: 'BURKINA FASO', 243: 'BURMA', 375: 'BURUNDI', 310: 'CAMEROON', 326: 'CAPE VERDE', 526: 'CAYMAN ISLANDS', 383: 'CENTRAL AFRICAN REPUBLIC', 384: 'CHAD', 690: 'CHILE', 245: 'CHINA, PRC', 721: 'CURACAO', 270: 'CHRISTMAS ISLAND', 271: 'COCOS ISLANDS', 691: 'COLOMBIA', 317: 'COMOROS', 385: 'CONGO', 467: 'COOK ISLANDS', 575: 'COSTA RICA', 165: 'CROATIA', 584: 'CUBA', 218: 

In [28]:
#Create data dictionary for i94addr columns
i94addr_valid = {}
with open('i94addr.txt') as f:
     for line in f:
        (key,val) = line.split('=')
        i94addr_valid[key.strip().strip("''")] = val.strip().strip("''").rstrip('\n')

In [29]:
print(i94addr_valid)

{'AL': 'ALABAMA', 'AK': 'ALASKA', 'AZ': 'ARIZONA', 'AR': 'ARKANSAS', 'CA': 'CALIFORNIA', 'CO': 'COLORADO', 'CT': 'CONNECTICUT', 'DE': 'DELAWARE', 'DC': 'DIST. OF COLUMBIA', 'FL': 'FLORIDA', 'GA': 'GEORGIA', 'GU': 'GUAM', 'HI': 'HAWAII', 'ID': 'IDAHO', 'IL': 'ILLINOIS', 'IN': 'INDIANA', 'IA': 'IOWA', 'KS': 'KANSAS', 'KY': 'KENTUCKY', 'LA': 'LOUISIANA', 'ME': 'MAINE', 'MD': 'MARYLAND', 'MA': 'MASSACHUSETTS', 'MI': 'MICHIGAN', 'MN': 'MINNESOTA', 'MS': 'MISSISSIPPI', 'MO': 'MISSOURI', 'MT': 'MONTANA', 'NC': 'N. CAROLINA', 'ND': 'N. DAKOTA', 'NE': 'NEBRASKA', 'NV': 'NEVADA', 'NH': 'NEW HAMPSHIRE', 'NJ': 'NEW JERSEY', 'NM': 'NEW MEXICO', 'NY': 'NEW YORK', 'OH': 'OHIO', 'OK': 'OKLAHOMA', 'OR': 'OREGON', 'PA': 'PENNSYLVANIA', 'PR': 'PUERTO RICO', 'RI': 'RHODE ISLAND', 'SC': 'S. CAROLINA', 'SD': 'S. DAKOTA', 'TN': 'TENNESSEE', 'TX': 'TEXAS', 'UT': 'UTAH', 'VT': 'VERMONT', 'VI': 'VIRGIN ISLANDS', 'VA': 'VIRGINIA', 'WV': 'W. VIRGINIA', 'WA': 'WASHINGTON', 'WI': 'WISCONSON', 'WY': 'WYOMING', '99':
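The two cells above build lookup dictionaries by splitting each line on `=` and stripping quotes. A minimal sketch of the same idea as a reusable helper follows. The names `parse_label_line` and `build_lookup` are hypothetical, and the sketch assumes every mapping line has the form `code = 'description'`, as the printed dictionaries suggest.

```python
import re

# Matches lines such as:   582 =  'MEXICO Air Sea, ...'   or   'AL'='ALABAMA'
# Assumption: each mapping line has the form  <code> = '<description>'.
LABEL_LINE = re.compile(r"^\s*'?(?P<code>[^'=\s]+)'?\s*=\s*'(?P<desc>.*?)'\s*;?\s*$")


def parse_label_line(line):
    """Return (code, description) for a mapping line, or None if it does not match."""
    m = LABEL_LINE.match(line)
    if m is None:
        return None
    return m.group("code"), m.group("desc").strip()


def build_lookup(lines, numeric_keys=False):
    """Build a code -> description dict, skipping lines that are not mappings."""
    lookup = {}
    for line in lines:
        parsed = parse_label_line(line)
        if parsed is None:
            continue
        code, desc = parsed
        lookup[int(code) if numeric_keys else code] = desc
    return lookup


sample = [
    "   582 =  'MEXICO Air Sea, and Not Reported (I-94, no land arrivals)'\n",
    "   236 =  'AFGHANISTAN'\n",
    "/* not a mapping line */\n",
]
countries = build_lookup(sample, numeric_keys=True)
```

Unlike the plain `split('=')` approach, lines that do not look like a mapping are skipped instead of raising an error, which may or may not be the behavior you want when validating the source file.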

In [30]:
#Read in SAS files
# imm_fname = '../../data/18-83510-I94-Data-2016/i94_dec16_sub.sas7bdat'
# data_imm_dec = pd.read_sas(imm_fname, 'sas7bdat', encoding="ISO-8859-1")

In [31]:
#preview the data
#data_imm_dec.head()

In [32]:
#get shape of dataset
#data_imm_dec.shape

In [33]:
#Read in SAS files
#imm_fname = '../../data/18-83510-I94-Data-2016/i94_jul16_sub.sas7bdat'
#data_imm_july = pd.read_sas(imm_fname, 'sas7bdat', encoding="ISO-8859-1")

In [34]:
#concatenate dataframes
#data_imm = pd.concat([data_imm_dec, data_imm_july])

In [35]:
#reset indices
#data_imm = data_imm.reset_index(drop=True)

In [36]:
#data_imm.head()

In [37]:
# #how many different types of visas are listed within the dataset?
# data_imm['visatype'].nunique()

In [38]:
#DELETE THIS IF I DON'T NEED IT
# re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
# i94port_valid = {}
# with open('i94port_valid.txt') as f:
#      for line in f:
#         match = re_obj.search(line)
#         i94port_valid[match[1]]=[match[2]]


In [39]:
# print(i94port_valid)

<h4>Temperature Data</h4>

In [40]:
#Read temperature dataset from link provided by Udacity into spark
spark_df_temp = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

In [41]:

# fname = '../../data2/GlobalLandTemperaturesByCity.csv'
# data_temp = pd.read_csv(fname)
# data_temp.head()

In [42]:
# #get shape of dataset
# data_temp.shape

<h4>US Cities Demographics</h4> - probably delete

In [43]:
#Read US cities demographics csv and preview the data
data_demo = pd.read_csv('us-cities-demographics.csv', sep=';')
data_demo.head()

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Silver Spring,Maryland,33.8,40601.0,41862.0,82463,1562.0,30908.0,2.6,MD,Hispanic or Latino,25924
1,Quincy,Massachusetts,41.0,44129.0,49500.0,93629,4147.0,32935.0,2.39,MA,White,58723
2,Hoover,Alabama,38.5,38040.0,46799.0,84839,4819.0,8229.0,2.58,AL,Asian,4759
3,Rancho Cucamonga,California,34.5,88127.0,87105.0,175232,5821.0,33878.0,3.18,CA,Black or African-American,24437
4,Newark,New Jersey,34.6,138040.0,143873.0,281913,5829.0,86253.0,2.73,NJ,White,76402


In [44]:
#get shape of dataset
data_demo.shape

(2891, 12)

In [45]:
# #write to parquet
# df_spark.write.parquet("sas_data")
# df_spark=spark.read.parquet("sas_data")

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [46]:
# Performing cleaning tasks here

<h4>Clean Combined Immigration Data</h4>

<h4>Drop Unnecessary Columns</h4>
The combined immigration dataset contains several columns that hold NaN values. The first step is to identify those columns and, if they are not needed, remove them.
<ul>
    <li>occup - occupation that will be performed in the US. This column does not seem relevant to my project, so I removed it.</li>
    <li>entdepu - Update Flag - either apprehended, overstayed, or adjusted to permanent residence. This column does not seem relevant to my project, so I removed it.</li>
    <li>insnum - INS number </li>
    </ul>
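One way to identify such columns, rather than picking them by eye, is to measure the share of missing values per column. The sketch below uses pandas on a tiny made-up frame. The helper name `mostly_null_columns` and the 0.9 threshold are assumptions for illustration, not part of the original notebook.

```python
import pandas as pd


def mostly_null_columns(df, threshold=0.9):
    """Return the names of columns whose share of missing values is at least `threshold`."""
    null_share = df.isna().mean()  # fraction of NaN/None per column
    return null_share[null_share >= threshold].index.tolist()


# Tiny illustrative frame: the values are made up, only the column names come from the dataset.
demo = pd.DataFrame({
    "cicid": [1.0, 2.0, 3.0],
    "occup": [None, None, None],
    "insnum": [None, None, "123"],
})
to_drop = mostly_null_columns(demo, threshold=0.9)
cleaned = demo.drop(columns=to_drop)
```

On the full Spark dataframe the same idea would need a Spark aggregation instead of `isna().mean()`; running it on the pandas sample first is a cheap way to shortlist candidates.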

In [47]:
def clean_i94_data(file):
    '''
    Reads an I94 SAS file into Spark and keeps only rows whose i94port code is valid.
    Relies on the i94port_valid dictionary, which must be defined before calling
    this function (the cell that builds it is currently commented out above).
    '''
    
    # Read I94 data into Spark
    df_immigration = spark.read.format('com.github.saurfang.sas.spark').load(file)

    # Keep only rows where i94port is a valid code
    df_immigration = df_immigration.filter(df_immigration.i94port.isin(list(i94port_valid.keys())))

    return df_immigration

In [48]:
# #columns to be dropped
# col_drop = ['occup', 'entdepu', 'insnum']

In [49]:
# data_imm.drop(axis=1, columns=col_drop)

<h4>Clean Airport Codes</h4> - probably delete this

<h4>Drop Unnecessary Columns</h4>
The airport codes dataset contains columns with many NaNs. The first step is to identify those columns and, if they are not needed, remove them.
<ul>
    <li>local_code - local airport code. Column is not needed for my dataset</li>
    <li>gps_code - GPS codes</li>
    </ul>

In [50]:
#columns to be dropped
col_drop = ['local_code', 'gps_code']

In [51]:
data_airport_codes.drop(axis=1, columns=col_drop)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,iata_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,"-91.254898, 35.6087"
5,00AS,small_airport,Fulton Airport,1100.0,,US,US-OK,Alex,,"-97.8180194, 34.9428028"
6,00AZ,small_airport,Cordes Airport,3810.0,,US,US-AZ,Cordes,,"-112.16500091552734, 34.305599212646484"
7,00CA,small_airport,Goldstone /Gts/ Airport,3038.0,,US,US-CA,Barstow,,"-116.888000488, 35.350498199499995"
8,00CL,small_airport,Williams Ag Airport,87.0,,US,US-CA,Biggs,,"-121.763427, 39.427188"
9,00CN,heliport,Kitchen Creek Helibase Heliport,3350.0,,US,US-CA,Pine Valley,,"-116.4597417, 32.7273736"


<h4>Clean Temperature Data</h4>

Previewing the data shows many missing values (nulls) in the temperature columns. Those rows will be removed: the temperatures are listed by month, so a missing value cannot be reliably inferred from the surrounding rows.

In [52]:
spark_df_temp.show()

+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1743-12-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-01-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-02-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-03-01|               null|                         null|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|             10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01| 14.050999999999998|        

In [53]:
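# Filter out entries with a missing average temperature
# (empty CSV fields are read as null; literal 'NaN' strings are excluded as well)
temp_col = spark_df_temp.AverageTemperature
spark_df_temp = spark_df_temp.filter(temp_col.isNotNull() & (temp_col != 'NaN'))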

In [54]:
spark_df_temp.show()

+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|        dt| AverageTemperature|AverageTemperatureUncertainty| City|Country|Latitude|Longitude|
+----------+-------------------+-----------------------------+-----+-------+--------+---------+
|1743-11-01|              6.068|           1.7369999999999999|Århus|Denmark|  57.05N|   10.33E|
|1744-04-01| 5.7879999999999985|           3.6239999999999997|Århus|Denmark|  57.05N|   10.33E|
|1744-05-01|             10.644|           1.2830000000000001|Århus|Denmark|  57.05N|   10.33E|
|1744-06-01| 14.050999999999998|                        1.347|Århus|Denmark|  57.05N|   10.33E|
|1744-07-01|             16.082|                        1.396|Århus|Denmark|  57.05N|   10.33E|
|1744-09-01| 12.780999999999999|                        1.454|Århus|Denmark|  57.05N|   10.33E|
|1744-10-01|               7.95|                         1.63|Århus|Denmark|  57.05N|   10.33E|
|1744-11-01|  4.638999999999999|        

In [55]:
#Remove duplicate locations
spark_df_temp = spark_df_temp.dropDuplicates(['City', 'Country'])

In [56]:
spark_df_temp.show()

+----------+--------------------+-----------------------------+------------+------------------+--------+---------+
|        dt|  AverageTemperature|AverageTemperatureUncertainty|        City|           Country|Latitude|Longitude|
+----------+--------------------+-----------------------------+------------+------------------+--------+---------+
|1743-11-01|               3.264|                        1.665|   Allentown|     United States|  40.99N|   74.56W|
|1779-11-01|0.011999999999999985|                        2.714|      Atyrau|        Kazakhstan|  47.42N|   50.92E|
|1825-01-01|  26.069000000000003|                         2.16|     Bintulu|          Malaysia|   2.41N|  113.30E|
|1825-01-01|              26.517|           2.5839999999999996| Butterworth|          Malaysia|   5.63N|  100.09E|
|1845-01-01|              24.995|                        1.871|      Cainta|       Philippines|  15.27N|  120.83E|
|1825-01-01|              24.753|           2.1519999999999997|      Ciamis|    

In [57]:
# #drop dates earlier than 2000-01-01
# data_temp[data_temp.dt >= '2000-01-01']

In [58]:
# #drop rows with NaNs in AverageTemperature column
# data_temp.dropna(subset=['AverageTemperature'])


<h4>Clean US Cities Demographics</h4>

In [None]:
data_demo

First, we want to ensure that the number of males plus the number of females adds up to the total population.
Next, we want to ensure that the number of foreign-born residents and the number of veterans each do not exceed the total population.

In [None]:
i=0
for index, row in data_demo.iterrows():
    if(row['Male Population'] + row['Female Population'] != row['Total Population']):
       print("Issue with number of males or females at row: ", row)
       print(i)
    elif(row['Number of Veterans'] > row['Total Population'] or row['Foreign-born'] > row['Total Population']):
       print("Issue with number of foreign born or number of veterans at row: ", row)
       print(i)
    i+=1
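The row-by-row loop above can also be expressed with vectorized pandas comparisons, which is usually faster and returns the offending rows as a dataframe. This is a sketch of an alternative, not the notebook's original method. The function name `population_issues` is hypothetical, and the second sample row is made up.

```python
import pandas as pd


def population_issues(df):
    """Return rows where the population columns are inconsistent or missing."""
    total = df["Total Population"]
    # NaN in either addend makes the sum NaN, and NaN != total is True,
    # so rows with missing male/female counts are flagged as well.
    sex_mismatch = (df["Male Population"] + df["Female Population"]) != total
    too_many = (df["Number of Veterans"] > total) | (df["Foreign-born"] > total)
    return df[sex_mismatch | too_many]


# Illustrative rows: the first is taken from the preview above, the second is made up.
demo = pd.DataFrame({
    "City": ["Silver Spring", "Example City"],
    "Male Population": [40601.0, None],
    "Female Population": [41862.0, None],
    "Total Population": [82463, 1000],
    "Number of Veterans": [1562.0, 10.0],
    "Foreign-born": [30908.0, 20.0],
})
bad_rows = population_issues(demo)
```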

It seems like there are three rows in which the number of males and females is listed as NaN. Since so few rows have this issue, I decided that it is appropriate to remove them.

In [None]:
drop_cols = ['Male Population', 'Female Population']

In [None]:
#drop those values in the Male Population and Female Population columns which contain NaNs
data_demo = data_demo.dropna(subset=drop_cols)

In [None]:
#run loop again to ensure drop worked
i=0
for index, row in data_demo.iterrows():
    if(row['Male Population'] + row['Female Population'] != row['Total Population']):
       print("Issue with number of males or females at row: ", row)
       print(i)
    elif(row['Number of Veterans'] > row['Total Population'] or row['Foreign-born'] > row['Total Population']):
       print("Issue with number of foreign born or number of veterans at row: ", row)
       print(i)
    i+=1

In [59]:
#convert data_demo data frame to spark data frame
data_demo_spark = spark.createDataFrame(data_demo)

In [60]:
data_demo_spark.show()

+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|            City|         State|Median Age|Male Population|Female Population|Total Population|Number of Veterans|Foreign-born|Average Household Size|State Code|                Race| Count|
+----------------+--------------+----------+---------------+-----------------+----------------+------------------+------------+----------------------+----------+--------------------+------+
|   Silver Spring|      Maryland|      33.8|        40601.0|          41862.0|           82463|            1562.0|     30908.0|                   2.6|        MD|  Hispanic or Latino| 25924|
|          Quincy| Massachusetts|      41.0|        44129.0|          49500.0|           93629|            4147.0|     32935.0|                  2.39|        MA|               White| 58723|
|          Hoover|       Alabama|      38.5|      

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model



I chose a star schema, with the immigration dataset as the fact table and the temperature, airport codes, and city demographics datasets as dimension tables.
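To illustrate why a star schema suits the first project question, the sketch below joins a cut-down fact table to one dimension table and counts arrivals per destination state and origin country. It uses `sqlite3` purely as a local stand-in for the warehouse. The fact rows are made up, and only a few of the columns from the model are included.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # local stand-in for the warehouse
conn.executescript("""
CREATE TABLE dim_country_origin (country_code INTEGER PRIMARY KEY, country_name VARCHAR);
CREATE TABLE imm_fact (cicid INTEGER PRIMARY KEY, i94cit INTEGER, i94addr CHAR(3));
""")
conn.executemany(
    "INSERT INTO dim_country_origin VALUES (?, ?)",
    [(582, "MEXICO"), (236, "AFGHANISTAN")],
)
# Made-up fact rows, for illustration only.
conn.executemany(
    "INSERT INTO imm_fact VALUES (?, ?, ?)",
    [(1, 582, "TX"), (2, 582, "TX"), (3, 236, "NY")],
)

# One join from the fact table to a dimension answers "where from, where to".
query = """
SELECT f.i94addr AS state, d.country_name, COUNT(*) AS arrivals
FROM imm_fact AS f
JOIN dim_country_origin AS d ON d.country_code = f.i94cit
GROUP BY f.i94addr, d.country_name
ORDER BY arrivals DESC, state
"""
result = conn.execute(query).fetchall()
```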

<h4>Fact Table</h4>

The fact table imm_fact is shown below <br>
cicid                 INTEGER <br>
i94yr                 INTEGER<br>
i94mon                INTEGER<br>
i94cit                INTEGER<br>
i94res                INTEGER<br>
i94port               CHAR(3)<br>
arrdate               INTEGER<br>
i94mode               INTEGER<br>
i94addr               CHAR(3)<br>
depdate               INTEGER<br>
i94bir                INTEGER<br>
i94visa               INTEGER<br>
count                 INTEGER<br>
dtadfile              VARCHAR<br>
visapost              CHAR(3)<br>
entdepa               CHAR(1)<br>
entdepd               CHAR(1)<br>
matflag               CHAR(1)<br>
biryear               INTEGER<br>
dtaddto               INTEGER<br>
gender                CHAR(1)<br>
airline               CHAR(2)<br>
admnum                BIGINT<br>
fltno                 VARCHAR<br>
visatype              VARCHAR<br>
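
The arrdate and depdate columns are stored as integers. Assuming they are SAS date values, which count days from 1 January 1960, they can be converted to calendar dates as sketched below. This assumption is consistent with the first sample row, where arrdate is 20566 and dtadfile is 20160422. The helper name `sas_date_to_date` is hypothetical.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from this day


def sas_date_to_date(sas_days):
    """Convert a SAS date number (days since 1960-01-01) to a datetime.date, or None."""
    # Treat None and NaN (NaN is the only value not equal to itself) as missing.
    if sas_days is None or sas_days != sas_days:
        return None
    return SAS_EPOCH + timedelta(days=int(sas_days))


arrival = sas_date_to_date(20566.0)    # arrdate of the first sample row
departure = sas_date_to_date(20573.0)  # depdate of the first sample row
stay_days = (departure - arrival).days
```

Storing the converted dates, or a derived length of stay, would make questions about visit duration easier to answer than working with the raw integers.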

<h4>Dimension Tables</h4>

The table <b>dim_country_origin</b> is shown below using the i94cit and i94res columns from the immigration dataset <br>
country_code  INTEGER<br>
country_name  VARCHAR

The dimension table <b>dim_destination</b> is shown below using the i94addr column from the immigration dataset <br>
state_abb  CHAR(2)<br>
#city VARCHAR - maybe not use this one


The dimension table <b>dim_visa_status</b> is shown below using the VISATYPE column<br>
visa_type VARCHAR  <br>

The dimension table <b>dim_temp</b> is shown below using the city column from the temperature dataset<br>
city VARCHAR  <br>
Country  VARCHAR <br>
monthly_avg_temp INTEGER

The dimension table <b>dim_demo_city</b> is shown below using the columns<br>
city VARCHAR <br>
state VARCHAR <br>
number_of_veterans INTEGER <br>
foreign_born INTEGER <br>
race VARCHAR <br>
count INTEGER

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

The table <b>dim_visa_status</b> can be created with insert statements, as there is not much variety: there are 17 different visa types in the dataset.
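
A minimal sketch of populating dim_visa_status with insert statements is shown below. It uses `sqlite3` as a local stand-in, since the notebook's Redshift connection is commented out. The `INSERT OR IGNORE` form is SQLite syntax and would need a different de-duplication step on Redshift.

```python
import sqlite3

# sqlite3 is only a local stand-in here; the notebook targets Redshift via psycopg2.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_visa_status (visa_type VARCHAR PRIMARY KEY)")

# Visa types seen in the sample preview; the notebook reports 17 in the full dataset.
visa_types = ["WT", "B2", "WT", "B2", "WT"]

# INSERT OR IGNORE skips values that are already present in the table.
conn.executemany(
    "INSERT OR IGNORE INTO dim_visa_status (visa_type) VALUES (?)",
    [(v,) for v in visa_types],
)
conn.commit()

rows = [r[0] for r in conn.execute("SELECT visa_type FROM dim_visa_status ORDER BY visa_type")]
```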

The table <b>dim_country_origin</b> can be created by reading the code values from the i94cit and i94res columns of the immigration dataset. The description for each code also has to be parsed into the country_name column.

The table <b>dim_destination</b> can be created by reading the i94addr column.

The table <b>dim_temp</b> can be created by reading the City, Country and AverageTemperature columns respectively from the temperature dataset.
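
Since dim_temp holds a monthly_avg_temp column, one possible way to derive it is to average the readings per city, country and calendar month. The sketch below does this in plain Python on a few rows taken from the preview output. The function name `monthly_average` is hypothetical, and this aggregation is an assumption about the intended pipeline: the cleaning step above keeps a single row per city instead.

```python
from collections import defaultdict


def monthly_average(rows):
    """Average temperature per (city, country, month) from (dt, temp, city, country) rows."""
    sums = defaultdict(lambda: [0.0, 0])  # key -> [running total, reading count]
    for dt, temp, city, country in rows:
        if temp is None:
            continue  # skip missing readings
        month = int(dt[5:7])  # dt is formatted as 'YYYY-MM-DD'
        acc = sums[(city, country, month)]
        acc[0] += float(temp)
        acc[1] += 1
    return {key: total / n for key, (total, n) in sums.items()}


rows = [
    ("1743-11-01", "6.068", "Århus", "Denmark"),
    ("1743-12-01", None, "Århus", "Denmark"),
    ("1744-11-01", "4.638999999999999", "Århus", "Denmark"),
]
averages = monthly_average(rows)
```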

The table <b>dim_demo_city</b> can be created from the US cities demographics dataset using the columns:<br>
City <br>State<br> Number of Veterans<br> Foreign-born <br>Race<br> Count

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# #Build database connection
# conn = psycopg2.connect(CONN_STRING)
# conn.set_session(autocommit=True)
# cur = conn.cursor()

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
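
Two of the checks listed above, a count check and a uniqueness check, could be sketched as follows. The helpers `check_row_count` and `check_unique` are hypothetical, and `sqlite3` again stands in for the real database connection. Table and column names are interpolated directly into the SQL, so they are assumed to come from trusted code rather than user input.

```python
import sqlite3


def check_row_count(conn, table, minimum=1):
    """Return True when `table` holds at least `minimum` rows."""
    (count,) = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()
    return count >= minimum


def check_unique(conn, table, column):
    """Return True when the non-null values of `column` contain no duplicates."""
    query = f"SELECT COUNT({column}) - COUNT(DISTINCT {column}) FROM {table}"
    (duplicates,) = conn.execute(query).fetchone()
    return duplicates == 0


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_destination (state_abb CHAR(2))")
conn.executemany("INSERT INTO dim_destination VALUES (?)", [("AL",), ("AK",), ("AZ",)])

count_ok = check_row_count(conn, "dim_destination")
unique_ok = check_unique(conn, "dim_destination", "state_abb")
```

Running such checks after each load, and failing loudly when one returns False, helps confirm that the pipeline produced complete and consistent tables.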

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.