# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [77]:
#importing libraries
import pandas as pd
import numpy as np
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
from datetime import datetime, timedelta

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

#### Describe and Gather Data 
Describe the data sets you're using. Where did they come from? What type of information is included?

In [2]:
#launching a spark session
spark = SparkSession.builder.\
config("spark.jars.repositories", "https://repos.spark-packages.org/").\
config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11").\
enableHiveSupport().getOrCreate()

Writing immigration data (run once to convert the SAS file to Parquet):
* df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')
* df_spark.write.parquet("sas_data")

In [3]:
# Reading in the data
c = pd.read_csv("/Users/tatianatikhonova/Documents/udacity/Capstone/us-cities-demographics.csv", sep = ';')
a = pd.read_csv("/Users/tatianatikhonova/Documents/udacity/Capstone/airport-codes.csv", sep = ',')
immigration = spark.read.parquet("sas_data") 

#immigration sample
i = pd.read_csv("/Users/tatianatikhonova/Documents/udacity/Capstone/immigration_data_sample.csv", sep = ',')


---

### Reading in the data dictionary for Immigration

In [4]:

with open('./I94_SAS_Labels_Descriptions.SAS') as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '')

def code_mapper(file, idx):
    # Slice the label block that starts at `idx` and ends at the next ';'
    block = file[file.index(idx):]
    block = block[:block.index(';')].split('\n')
    block = [line.replace("'", "") for line in block]
    # Each remaining line looks like "code = description"
    pairs = [line.split('=') for line in block[1:]]
    return {pair[0].strip(): pair[1].strip() for pair in pairs if len(pair) == 2}

i94cit_res = code_mapper(f_content, "i94cntyl")
i94port = code_mapper(f_content, "i94prtl")
i94mode = code_mapper(f_content, "i94model")
i94addr = code_mapper(f_content, "i94addrl")
i94visa = {'1':'Business',
'2': 'Pleasure',
'3' : 'Student'}
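
To check the parsing logic in isolation, the following minimal sketch applies the same slicing approach to a small inline string. The sample text and the names `SAMPLE_LABELS` and `parse_label_block` are illustrative assumptions; they are not part of the SAS labels file or of this notebook.

```python
# Hypothetical excerpt that mimics the layout of one SAS label block.
SAMPLE_LABELS = """value i94model
    1 = 'Air'
    2 = 'Sea'
    3 = 'Land'
    9 = 'Not reported' ;
"""


def parse_label_block(text, marker):
    """Return {code: description} for the block that starts at `marker`."""
    block = text[text.index(marker):]
    block = block[:block.index(";")]            # a block ends at the first ';'
    mapping = {}
    for line in block.split("\n")[1:]:          # skip the line holding the marker
        parts = line.replace("'", "").split("=")
        if len(parts) == 2:                     # ignore lines without "code = label"
            mapping[parts[0].strip()] = parts[1].strip()
    return mapping


modes = parse_label_block(SAMPLE_LABELS, "i94model")
print(modes)
```

Lines that do not contain exactly one `=` are skipped, so headers and blank lines never end up in the mapping.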

In [5]:
#turning the dict of ports into a dataframe with distinct columns
i94port_df = pd.DataFrame.from_dict(i94port, orient='index')
i94port_df.reset_index(level=0, inplace=True)
i94port_df.columns = ['Code','City_State']
i94port_df

Unnamed: 0,Code,City_State
0,ALC,"ALCAN, AK"
1,ANC,"ANCHORAGE, AK"
2,BAR,"BAKER AAF - BAKER ISLAND, AK"
3,DAC,"DALTONS CACHE, AK"
4,PIZ,"DEW STATION PT LAY DEW, AK"
...,...,...
655,ADU,No PORT Code (ADU)
656,AKT,No PORT Code (AKT)
657,LIT,No PORT Code (LIT)
658,A2A,No PORT Code (A2A)


In [6]:
#splitting City_State column in two
i94port_df[['City', 'State','Else']] = i94port_df['City_State'].astype("string").str.split(', ', expand=True)
i94port_df.drop(['City_State', 'Else'], axis=1, inplace=True)
i94port_df

Unnamed: 0,Code,City,State
0,ALC,ALCAN,AK
1,ANC,ANCHORAGE,AK
2,BAR,BAKER AAF - BAKER ISLAND,AK
3,DAC,DALTONS CACHE,AK
4,PIZ,DEW STATION PT LAY DEW,AK
...,...,...,...
655,ADU,No PORT Code (ADU),
656,AKT,No PORT Code (AKT),
657,LIT,No PORT Code (LIT),
658,A2A,No PORT Code (A2A),
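
Not every port label follows the `CITY, ST` pattern: some carry a suffix after the state and others have no comma at all. The sketch below shows one way such labels could be handled in plain Python. The helper name `split_city_state` is hypothetical, and keeping only the first token after the comma is an assumption that differs from the pandas split used above.

```python
def split_city_state(label):
    """Split 'CITY, ST' into (city, state); state is None when no comma exists."""
    city, sep, rest = label.partition(", ")
    if not sep:
        # No ", " separator: the state cannot be recovered from the label.
        return label.strip(), None
    tokens = rest.split()
    # Assumption: the state is the first token after the comma.
    state = tokens[0] if tokens else None
    return city.strip(), state


# Labels taken from the port dictionary in this notebook.
for label in ["ALCAN, AK", "LITTLE ROCK, AR (BPS)", "MARIPOSA AZ", "No PORT Code (ADU)"]:
    print(label, "->", split_city_state(label))
```

Labels that return `None` for the state are the ones that would need manual review or exclusion.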


In [7]:
#Counting % of missing ports out of total. Turns out to be 9%
i94port_df.query('City.str.contains("No PORT")', engine='python').count() / i94port_df.shape[0]

Code     0.090909
City     0.090909
State    0.000000
dtype: float64

In [8]:
missing_ports_list = list(i94port_df.query('City.str.contains("No PORT")', engine='python').Code)

---

### Step 2: Explore and Assess the Data

What I will be looking out for:

* **Completeness** (do we have all the records that we need? any missing / NaN values?)
* **Validity** (records that don’t conform to a defined schema, e.g. negative height not possible but present or duplicate key identifier)
* **Accuracy** (adheres to the defined schema, but is incorrect; e.g. overestimated values or out-of-date information)
* **Consistency** (data valid and accurate, but fields are represented in an inconsistent manner, e.g. state as NY and New York)
* **Tidiness** (structure of tidy data: variable = column, observation = row, observational unit = table)
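
As a rough illustration of the first dimensions in the list above, the sketch below runs three simple checks on a handful of records. The records are hypothetical (two values were deliberately broken to trigger the checks), and the function names are assumptions made for this example only.

```python
# Hypothetical records; the field names follow the cities dataset.
records = [
    {"City": "Wichita", "State Code": "KS", "Median Age": 34.6},
    {"City": "Allen", "State Code": "Pennsylvania", "Median Age": 33.5},
    {"City": "Danbury", "State Code": "CT", "Median Age": None},
    {"City": "Stamford", "State Code": "CT", "Median Age": -35.4},
]


def incomplete(rows):
    """Completeness: rows with at least one missing value."""
    return [r for r in rows if any(v is None for v in r.values())]


def invalid_age(rows):
    """Validity: ages that are present but negative."""
    return [r for r in rows if r["Median Age"] is not None and r["Median Age"] < 0]


def inconsistent_state(rows):
    """Consistency: state codes that are not two uppercase letters."""
    return [r for r in rows
            if not (len(r["State Code"]) == 2 and r["State Code"].isupper())]


print([r["City"] for r in incomplete(records)])
print([r["City"] for r in invalid_age(records)])
print([r["City"] for r in inconsistent_state(records)])
```

Accuracy and tidiness are harder to test mechanically and usually need a comparison against a trusted source or a look at the table structure.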

In [9]:
c.head(5)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
0,Wichita,Kansas,34.6,192354.0,197601.0,389955,23978.0,40270.0,2.56,KS,American Indian and Alaska Native,8791
1,Allen,Pennsylvania,33.5,60626.0,59581.0,120207,5691.0,19652.0,2.67,PA,Black or African-American,22304
2,Danbury,Connecticut,37.3,43435.0,41227.0,84662,3752.0,25675.0,2.74,CT,Black or African-American,8454
3,Nashville,Tennessee,34.1,314231.0,340365.0,654596,27942.0,88193.0,2.39,TN,Hispanic or Latino,67526
4,Stamford,Connecticut,35.4,64941.0,63936.0,128877,2269.0,44003.0,2.7,CT,Asian,11013


In [10]:
a.head(5)

Unnamed: 0,ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
0,00A,heliport,Total Rf Heliport,11.0,,US,US-PA,Bensalem,00A,,00A,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,,US,US-KS,Leoti,00AA,,00AA,"-101.473911, 38.704022"
2,00AK,small_airport,Lowell Field,450.0,,US,US-AK,Anchor Point,00AK,,00AK,"-151.695999146, 59.94919968"
3,00AL,small_airport,Epps Airpark,820.0,,US,US-AL,Harvest,00AL,,00AL,"-86.77030181884766, 34.86479949951172"
4,00AR,closed,Newport Hospital & Clinic Heliport,237.0,,US,US-AR,Newport,,,,"-91.254898, 35.6087"


In [11]:
i.head(5)

Unnamed: 0.1,Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,i94bir,i94visa,count,dtadfile,visapost,occup,entdepa,entdepd,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,2027561,4084316.0,2016.0,4.0,209.0,209.0,HHW,20566.0,1.0,HI,20573.0,61.0,2.0,1.0,20160422,,,G,O,,M,1955.0,7202016,F,,JL,56582670000.0,00782,WT
1,2171295,4422636.0,2016.0,4.0,582.0,582.0,MCA,20567.0,1.0,TX,20568.0,26.0,2.0,1.0,20160423,MTR,,G,R,,M,1990.0,10222016,M,,*GA,94362000000.0,XBLNG,B2
2,589494,1195600.0,2016.0,4.0,148.0,112.0,OGG,20551.0,1.0,FL,20571.0,76.0,2.0,1.0,20160407,,,G,O,,M,1940.0,7052016,M,,LH,55780470000.0,00464,WT
3,2631158,5291768.0,2016.0,4.0,297.0,297.0,LOS,20572.0,1.0,CA,20581.0,25.0,2.0,1.0,20160428,DOH,,G,O,,M,1991.0,10272016,M,,QR,94789700000.0,00739,B2
4,3032257,985523.0,2016.0,4.0,111.0,111.0,CHM,20550.0,3.0,NY,20553.0,19.0,2.0,1.0,20160406,,,Z,K,,M,1997.0,7042016,F,,,42322570000.0,LAND,WT


In [12]:
i94port

{'ALC': 'ALCAN, AK',
 'ANC': 'ANCHORAGE, AK',
 'BAR': 'BAKER AAF - BAKER ISLAND, AK',
 'DAC': 'DALTONS CACHE, AK',
 'PIZ': 'DEW STATION PT LAY DEW, AK',
 'DTH': 'DUTCH HARBOR, AK',
 'EGL': 'EAGLE, AK',
 'FRB': 'FAIRBANKS, AK',
 'HOM': 'HOMER, AK',
 'HYD': 'HYDER, AK',
 'JUN': 'JUNEAU, AK',
 '5KE': 'KETCHIKAN, AK',
 'KET': 'KETCHIKAN, AK',
 'MOS': 'MOSES POINT INTERMEDIATE, AK',
 'NIK': 'NIKISKI, AK',
 'NOM': 'NOM, AK',
 'PKC': 'POKER CREEK, AK',
 'ORI': 'PORT LIONS SPB, AK',
 'SKA': 'SKAGWAY, AK',
 'SNP': 'ST. PAUL ISLAND, AK',
 'TKI': 'TOKEEN, AK',
 'WRA': 'WRANGELL, AK',
 'HSV': 'MADISON COUNTY - HUNTSVILLE, AL',
 'MOB': 'MOBILE, AL',
 'LIA': 'LITTLE ROCK, AR (BPS)',
 'ROG': 'ROGERS ARPT, AR',
 'DOU': 'DOUGLAS, AZ',
 'LUK': 'LUKEVILLE, AZ',
 'MAP': 'MARIPOSA AZ',
 'NAC': 'NACO, AZ',
 'NOG': 'NOGALES, AZ',
 'PHO': 'PHOENIX, AZ',
 'POR': 'PORTAL, AZ',
 'SLU': 'SAN LUIS, AZ',
 'SAS': 'SASABE, AZ',
 'TUC': 'TUCSON, AZ',
 'YUI': 'YUMA, AZ',
 'AND': 'ANDRADE, CA',
 'BUR': 'BURBANK, CA',
 '

In [13]:
immigration.head(1)

[Row(cicid=5748517.0, i94yr=2016.0, i94mon=4.0, i94cit=245.0, i94res=438.0, i94port='LOS', arrdate=20574.0, i94mode=1.0, i94addr='CA', depdate=20582.0, i94bir=40.0, i94visa=1.0, count=1.0, dtadfile='20160430', visapost='SYD', occup=None, entdepa='G', entdepd='O', entdepu=None, matflag='M', biryear=1976.0, dtaddto='10292016', gender='F', insnum=None, airline='QF', admnum=94953870030.0, fltno='00011', visatype='B1')]

In [14]:
#understand number of rows and columns
print(f'count of rows and columns for cities: {c.shape}')
print(f'count of rows and columns for airports: {a.shape}')
print(f'count of rows and columns for immigration: {immigration.count(), len(immigration.columns)}')

count of rows and columns for cities: (2891, 12)
count of rows and columns for airports: (55075, 12)
count of rows and columns for immigration: (3096313, 28)


In [15]:
#understand columns and data types
c.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2891 entries, 0 to 2890
Data columns (total 12 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   City                    2891 non-null   object 
 1   State                   2891 non-null   object 
 2   Median Age              2891 non-null   float64
 3   Male Population         2888 non-null   float64
 4   Female Population       2888 non-null   float64
 5   Total Population        2891 non-null   int64  
 6   Number of Veterans      2878 non-null   float64
 7   Foreign-born            2878 non-null   float64
 8   Average Household Size  2875 non-null   float64
 9   State Code              2891 non-null   object 
 10  Race                    2891 non-null   object 
 11  Count                   2891 non-null   int64  
dtypes: float64(6), int64(2), object(4)
memory usage: 271.2+ KB


In [16]:
a.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 55075 entries, 0 to 55074
Data columns (total 12 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   ident         55075 non-null  object 
 1   type          55075 non-null  object 
 2   name          55075 non-null  object 
 3   elevation_ft  48069 non-null  float64
 4   continent     27356 non-null  object 
 5   iso_country   54828 non-null  object 
 6   iso_region    55075 non-null  object 
 7   municipality  49399 non-null  object 
 8   gps_code      41030 non-null  object 
 9   iata_code     9189 non-null   object 
 10  local_code    28686 non-null  object 
 11  coordinates   55075 non-null  object 
dtypes: float64(1), object(11)
memory usage: 5.0+ MB


In [17]:
i.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 29 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   Unnamed: 0  1000 non-null   int64  
 1   cicid       1000 non-null   float64
 2   i94yr       1000 non-null   float64
 3   i94mon      1000 non-null   float64
 4   i94cit      1000 non-null   float64
 5   i94res      1000 non-null   float64
 6   i94port     1000 non-null   object 
 7   arrdate     1000 non-null   float64
 8   i94mode     1000 non-null   float64
 9   i94addr     941 non-null    object 
 10  depdate     951 non-null    float64
 11  i94bir      1000 non-null   float64
 12  i94visa     1000 non-null   float64
 13  count       1000 non-null   float64
 14  dtadfile    1000 non-null   int64  
 15  visapost    382 non-null    object 
 16  occup       4 non-null      object 
 17  entdepa     1000 non-null   object 
 18  entdepd     954 non-null    object 
 19  entdepu     0 non-null      

In [18]:
c.columns.to_series().groupby(c.dtypes).groups

{int64: ['Total Population', 'Count'], float64: ['Median Age', 'Male Population', 'Female Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size'], object: ['City', 'State', 'State Code', 'Race']}

In [19]:
c.describe()

Unnamed: 0,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,Count
count,2891.0,2888.0,2888.0,2891.0,2878.0,2878.0,2875.0,2891.0
mean,35.494881,97328.43,101769.6,198966.8,9367.832523,40653.6,2.742543,48963.77
std,4.401617,216299.9,231564.6,447555.9,13211.219924,155749.1,0.433291,144385.6
min,22.9,29281.0,27348.0,63215.0,416.0,861.0,2.0,98.0
25%,32.8,39289.0,41227.0,80429.0,3739.0,9224.0,2.43,3435.0
50%,35.3,52341.0,53809.0,106782.0,5397.0,18822.0,2.65,13780.0
75%,38.0,86641.75,89604.0,175232.0,9368.0,33971.75,2.95,54447.0
max,70.5,4081698.0,4468707.0,8550405.0,156961.0,3212500.0,4.98,3835726.0


#### Missing values in Cities

In [20]:
def find_missing_data(df):
    '''
    INPUT:
        df - (dataframe), dataframe to check for missing values in its columns
    OUTPUT:
        df_null: (dataframe), with count & percentage of missing values in input dataframe columns
    '''
    null_data = df.isnull().sum()[df.isnull().sum() > 0]
    
    data_dict = {'count': null_data.values, 
                 'pct': np.round(null_data.values *100/df.shape[0],2)}
    
    df_null = pd.DataFrame(data=data_dict, index=null_data.index)
    df_null.sort_values(by='count', ascending=False, inplace=True)
    return df_null



In [21]:
c.isnull().sum().sum() #count of all missing values in Cities

48

In [22]:
find_missing_data(c)

Unnamed: 0,count,pct
Average Household Size,16,0.55
Number of Veterans,13,0.45
Foreign-born,13,0.45
Male Population,3,0.1
Female Population,3,0.1


In [23]:
def nans(df): 
    return df[df.isnull().any(axis=1)]

nans(c)

Unnamed: 0,City,State,Median Age,Male Population,Female Population,Total Population,Number of Veterans,Foreign-born,Average Household Size,State Code,Race,Count
5,San Juan,Puerto Rico,41.4,155408.0,186829.0,342237,,,,PR,Hispanic or Latino,335559
9,Caguas,Puerto Rico,40.4,34743.0,42265.0,77008,,,,PR,Hispanic or Latino,76349
280,Bayamón,Puerto Rico,39.4,80128.0,90131.0,170259,,,,PR,Hispanic or Latino,169155
740,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,White,72211
806,San Juan,Puerto Rico,41.4,155408.0,186829.0,342237,,,,PR,American Indian and Alaska Native,4031
908,The Villages,Florida,70.5,,,72590,15231.0,4034.0,,FL,Black or African-American,331
1121,Guaynabo,Puerto Rico,42.2,33066.0,37426.0,70492,,,,PR,American Indian and Alaska Native,589
1196,Mayagüez,Puerto Rico,38.1,30799.0,35782.0,66581,,,,PR,Asian,235
1443,Mayagüez,Puerto Rico,38.1,30799.0,35782.0,66581,,,,PR,Hispanic or Latino,65521
1530,Caguas,Puerto Rico,40.4,34743.0,42265.0,77008,,,,PR,American Indian and Alaska Native,624


In [24]:
#missing values comprise less than 1% of Cities data, so it's safe to drop them
c2 = c.dropna(axis=0)
c2.isnull().sum().sum() 

0

#### Missing values in Airport

In [25]:
a.isnull().sum().sum() #count of all missing values in Airports

126968

In [26]:
find_missing_data(a)

Unnamed: 0,count,pct
iata_code,45886,83.32
continent,27719,50.33
local_code,26389,47.91
gps_code,14045,25.5
elevation_ft,7006,12.72
municipality,5676,10.31
iso_country,247,0.45


In [27]:
#dropping columns that are missing over 25% of data
cols = a.columns[a.isnull().sum()/len(a) > .25]
a2 = a.drop(cols,axis=1)
a2.head(2)

Unnamed: 0,ident,type,name,elevation_ft,iso_country,iso_region,municipality,coordinates
0,00A,heliport,Total Rf Heliport,11.0,US,US-PA,Bensalem,"-74.93360137939453, 40.07080078125"
1,00AA,small_airport,Aero B Ranch Airport,3435.0,US,US-KS,Leoti,"-101.473911, 38.704022"


In [28]:
#dropping the remaining rows with null values
a2.dropna(axis=0, inplace=True)
a2.isnull().sum().sum() #count of all missing values in Airports

0

#### Missing values in Immigration

In [29]:
type(immigration)

pyspark.sql.dataframe.DataFrame

In [46]:
i2 = immigration[~immigration["i94port"].isin(missing_ports_list)] #filtering out missing ports derived from the SAS file

total_rows = i2.count() #counting rows once instead of once per column
{name: i2.filter(i2[name].isNull()).count() / total_rows for name in i2.columns} #checking the fraction of null values in each column

{'cicid': 0.0,
 'i94yr': 0.0,
 'i94mon': 0.0,
 'i94cit': 0.0,
 'i94res': 0.0,
 'i94port': 0.0,
 'arrdate': 0.0,
 'i94mode': 7.729379857947582e-05,
 'i94addr': 0.04910743449916529,
 'depdate': 0.045907341875969,
 'i94bir': 0.0002590474169964859,
 'i94visa': 0.0,
 'count': 0.0,
 'dtadfile': 3.2340501497688625e-07,
 'visapost': 0.6080088664718906,
 'occup': 0.9973755683034625,
 'entdepa': 7.697039356449892e-05,
 'entdepd': 0.0446046664756421,
 'entdepu': 0.999873548639144,
 'matflag': 0.0446046664756421,
 'biryear': 0.0002590474169964859,
 'dtaddto': 0.00015426419214397473,
 'gender': 0.13397279128927997,
 'insnum': 0.9632262625570083,
 'airline': 0.027045391187472068,
 'admnum': 0.0,
 'fltno': 0.0063222446377831495,
 'visatype': 0.0}

In [53]:
i2 = i2.drop("insnum", "entdepu", "occup", "visapost") #dropping columns that are more than 60% null
i2 = i2.dropna(how='any') #dropping rows that still contain null values
{name: i2.filter(i2[name].isNull()).count() / i2.count() for name in i2.columns}

{'cicid': 0.0,
 'i94yr': 0.0,
 'i94mon': 0.0,
 'i94cit': 0.0,
 'i94res': 0.0,
 'i94port': 0.0,
 'arrdate': 0.0,
 'i94mode': 0.0,
 'i94addr': 0.0,
 'depdate': 0.0,
 'i94bir': 0.0,
 'i94visa': 0.0,
 'count': 0.0,
 'dtadfile': 0.0,
 'entdepa': 0.0,
 'entdepd': 0.0,
 'matflag': 0.0,
 'biryear': 0.0,
 'dtaddto': 0.0,
 'gender': 0.0,
 'airline': 0.0,
 'admnum': 0.0,
 'fltno': 0.0,
 'visatype': 0.0}

In [72]:
# Create udf to convert a SAS date (days since 1960-01-01) to an ISO date string
@udf(StringType())
def convert_datetime(x):
    if x is not None:  # `if x:` would also reject a valid offset of 0
        return (datetime(1960, 1, 1).date() + timedelta(days=x)).isoformat()
    return None
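
The date arithmetic inside the UDF can be verified without Spark. The sketch below uses only the standard library; the names `SAS_EPOCH` and `sas_to_iso` are hypothetical. The first row of the immigration sample has `arrdate` 20566.0 and `dtadfile` 20160422, which makes a convenient sanity check.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS stores dates as days since this day


def sas_to_iso(days):
    """Convert a SAS date number to an ISO 8601 string; None stays None."""
    if days is None:
        return None
    return (SAS_EPOCH + timedelta(days=days)).isoformat()


print(sas_to_iso(20566.0))  # 2016-04-22
print(sas_to_iso(0))        # 1960-01-01
print(sas_to_iso(None))     # None
```

Testing against `None` explicitly matters because an offset of 0 is a valid date that a plain truthiness check would discard.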

In [78]:
i2 = i2.withColumn("arrdate", convert_datetime(i2.arrdate)) #converting arrival date from SAS format to an ISO date string
i2 = i2.filter(i2.i94addr != 'other')

i3 = i2.select(col("cicid").alias("id"),
               col("arrdate").alias("arrival_date"),
               col("i94port").alias("port_code"),
               col("i94addr").alias("state_code"),
               col("i94bir").alias("age"),
               col("gender").alias("gender"),
               col("i94visa").alias("visa_type"),
               "count").drop_duplicates()

i3.head()

Py4JJavaError: An error occurred while calling o1423.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 717.0 failed 1 times, most recent failure: Lost task 3.0 in stage 717.0 (TID 1196) (192.168.1.14 executor driver): org.apache.spark.SparkException: 
Error from python worker:
  dyld[24154]: dyld cache '/System/Library/dyld/dyld_shared_cache_x86_64h' not loaded: syscall to map cache into shared region failed
  dyld[24154]: Library not loaded: /System/Library/Frameworks/CoreFoundation.framework/Versions/A/CoreFoundation
    Referenced from: /Library/Frameworks/Python.framework/Versions/3.6/Resources/Python.app/Contents/MacOS/Python
    Reason: tried: '/System/Library/Frameworks/CoreFoundation.framework/Versions/A/CoreFoundation' (no such file), '/Library/Frameworks/CoreFoundation.framework/Versions/A/CoreFoundation' (no such file)
PYTHONPATH was:
  /Users/tatianatikhonova/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip:/Users/tatianatikhonova/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/py4j-0.10.9.3-src.zip:/Users/tatianatikhonova/opt/anaconda3/lib/python3.8/site-packages/pyspark/jars/spark-core_2.12-3.2.1.jar
org.apache.spark.SparkException: EOFException occurred while reading the port number from pyspark.daemon's stdout and terminated with code: 134.
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:227)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:133)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:106)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:130)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: 
Error from python worker:
  dyld[24154]: dyld cache '/System/Library/dyld/dyld_shared_cache_x86_64h' not loaded: syscall to map cache into shared region failed
  dyld[24154]: Library not loaded: /System/Library/Frameworks/CoreFoundation.framework/Versions/A/CoreFoundation
    Referenced from: /Library/Frameworks/Python.framework/Versions/3.6/Resources/Python.app/Contents/MacOS/Python
    Reason: tried: '/System/Library/Frameworks/CoreFoundation.framework/Versions/A/CoreFoundation' (no such file), '/Library/Frameworks/CoreFoundation.framework/Versions/A/CoreFoundation' (no such file)
PYTHONPATH was:
  /Users/tatianatikhonova/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip:/Users/tatianatikhonova/opt/anaconda3/lib/python3.8/site-packages/pyspark/python/lib/py4j-0.10.9.3-src.zip:/Users/tatianatikhonova/opt/anaconda3/lib/python3.8/site-packages/pyspark/jars/spark-core_2.12-3.2.1.jar
org.apache.spark.SparkException: EOFException occurred while reading the port number from pyspark.daemon's stdout and terminated with code: 134.
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:227)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:133)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:106)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
	at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
	at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:130)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
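
The traceback shows the Python worker starting from `/Library/Frameworks/Python.framework/Versions/3.6/...` while `PYTHONPATH` points at an Anaconda Python 3.8 installation, which suggests that the driver and the workers use different interpreters. One possible remedy, offered here as an assumption rather than a confirmed fix for this machine, is to point PySpark at the notebook's interpreter through the `PYSPARK_PYTHON` and `PYSPARK_DRIVER_PYTHON` environment variables before the Spark session is created.

```python
import os
import sys

# Assumption: the interpreter running this notebook is the one the workers should use.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

print(os.environ["PYSPARK_PYTHON"])
```

These variables are read when the Python workers are launched, so the kernel would likely need a restart and the session would have to be created again for the change to take effect.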


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model


* Immigration = FACT
* Airports = DIM
* Cities = DIM
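
To illustrate how a fact table and a dimension table work together, the sketch below resolves each fact row's `port_code` against a small dimension lookup and aggregates arrivals by state. All rows are hypothetical, and the plain dictionaries only stand in for the real tables.

```python
# Hypothetical dimension: one entry per port code.
dim_ports = {
    "ANC": {"city": "ANCHORAGE", "state_code": "AK"},
    "PHO": {"city": "PHOENIX", "state_code": "AZ"},
}

# Hypothetical fact rows: one entry per arrival, referencing the dimension by key.
fact_immigrations = [
    {"id": 1, "port_code": "ANC", "visa_type": 2},
    {"id": 2, "port_code": "PHO", "visa_type": 1},
    {"id": 3, "port_code": "PHO", "visa_type": 2},
]


def arrivals_by_state(facts, ports):
    """Count fact rows per state by looking up each row's port in the dimension."""
    counts = {}
    for row in facts:
        state = ports[row["port_code"]]["state_code"]
        counts[state] = counts.get(state, 0) + 1
    return counts


print(arrivals_by_state(fact_immigrations, dim_ports))
```

The fact rows stay narrow because descriptive attributes such as city and state live only in the dimension.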


#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [None]:
# Write code here

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# Perform quality checks here
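
A minimal sketch of two of the checks listed above (a source/count check and a unique-key check), written in plain Python so it runs without Spark. The function names and the sample rows are assumptions made for illustration; in the actual pipeline the same ideas would be applied to the Spark tables.

```python
def check_not_empty(rows, table_name):
    """Source/count check: the table must contain at least one row."""
    if not rows:
        raise ValueError(f"{table_name}: no rows loaded")
    return len(rows)


def check_unique_key(rows, key, table_name):
    """Integrity check: `key` must be present and unique in every row."""
    values = [row.get(key) for row in rows]
    if any(value is None for value in values):
        raise ValueError(f"{table_name}: null value in key column {key!r}")
    if len(values) != len(set(values)):
        raise ValueError(f"{table_name}: duplicate values in key column {key!r}")
    return True


# Hypothetical sample standing in for the fact table.
sample_fact = [
    {"id": 1, "port_code": "ANC"},
    {"id": 2, "port_code": "PHO"},
]

print(check_not_empty(sample_fact, "fact_immigrations"))
print(check_unique_key(sample_fact, "id", "fact_immigrations"))
```

Raising an exception instead of printing a warning makes a failed check stop the pipeline run.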

#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

Immigration columns:

* cicid: primary key, ID from the SAS file
* i94yr: entry year, 4-digit year
* i94mon: entry month, numeric month
* i94cit: I-94 citizenship country code as per the SAS Labels Descriptions file
* i94res: I-94 residence country code as per the SAS Labels Descriptions file
* i94port: I-94 port code as per the SAS Labels Descriptions file
* arrdate: date of arrival in the U.S.
* i94mode: code for travel mode of arrival as per the SAS Labels Descriptions file
* i94addr: U.S. state of address (two-letter code)
* depdate: departure date from the U.S.
* i94bir: age of the immigrant
* i94visa: visa category code as per the SAS Labels Descriptions file
* dtadfile: character date field, date added to I-94 files (CIC does not use)
* visapost: Department of State post where the visa was issued (CIC does not use)
* occup: occupation of the immigrant
* entdepa: arrival flag, admitted or paroled into the U.S. (CIC does not use)
* entdepd: departure flag, departed, lost I-94 or is deceased (CIC does not use)
* entdepu: update flag, either apprehended, overstayed, or adjusted to permanent residence (CIC does not use)
* matflag: match flag, match of arrival and departure records
* biryear: birth year of the immigrant
* count: used for summary statistics
* dtaddto: character date field, date to which admitted to the U.S., i.e. allowed to stay until (CIC does not use)
* gender: gender of the immigrant
* insnum: INS number
* airline: airline code used to arrive in the U.S.
* admnum: admission number
* fltno: flight number
* visatype: visa type

fact_immigrations:

* cicid: ID from the SAS file
* entry_year: 4-digit year
* entry_month: numeric month
* origin_country_code: I-94 country code as per the SAS Labels Descriptions file
* port_code: I-94 port code as per the SAS Labels Descriptions file
* arrival_date: date of arrival in the U.S.
* travel_mode_code: code for travel mode of arrival as per the SAS Labels Descriptions file
* us_state_code: two-letter U.S. state code
* departure_date: departure date from the U.S.
* age: age of the immigrant
* visa_category_code: visa category code as per the SAS Labels Descriptions file
* occupation: occupation of the immigrant
* gender: gender of the immigrant
* birth_year: birth year of the immigrant
* entry_date: date to which admitted to the U.S. (allowed to stay until)
* airline: airline code used to arrive in the U.S.
* admission_number: admission number
* flight_number: flight number
* visa_type: visa type

dim_city_demographics:

* port_code: I-94 port code
* city: U.S. city name
* state_code: two-letter U.S. state code
* male_population: total male population
* female_population: total female population
* total_population: total population
* number_of_veterans: number of veterans
* num_foreign_born: number of foreign-born residents

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
  * The data was increased by 100x.
  * The data populates a dashboard that must be updated on a daily basis by 7am every day.
  * The database needed to be accessed by 100+ people.