# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [3]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
import os
import glob
import configparser
import boto3
from pyspark.sql.types import IntegerType,StringType
from pyspark.sql.functions import col, countDistinct, udf, monotonically_increasing_id

In [None]:
%load_ext sql
%load_ext autoreload
%autoreload 2

In [None]:
# set pandas dataframe display parameters
# (full option names avoid ambiguous matches in newer pandas versions)
pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 100)
pd.set_option('display.max_rows', None)

In [None]:
config = configparser.ConfigParser()
config.read('decp.cfg')
os.environ['AWS_ACCESS_KEY_ID']=config.get('AWS','AWS_ACCESS_KEY_ID')
os.environ['AWS_SECRET_ACCESS_KEY']=config.get('AWS','AWS_SECRET_ACCESS_KEY')
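
A minimal sketch of the layout that `decp.cfg` is expected to have, inferred from the two `config.get('AWS', ...)` calls: an `[AWS]` section holding both keys. The values are placeholders rather than real credentials, `read_string` stands in for reading the file from disk, and `load_aws_credentials` is a hypothetical helper used only for this illustration.

```python
import configparser

# Placeholder contents for decp.cfg. The section and key names match the
# config.get(...) calls in the notebook; the values are dummies.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = EXAMPLE_KEY_ID
AWS_SECRET_ACCESS_KEY = EXAMPLE_SECRET
"""


def load_aws_credentials(cfg_text):
    """Parse the config text and return the two AWS settings as a dict."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return {
        "AWS_ACCESS_KEY_ID": parser.get("AWS", "AWS_ACCESS_KEY_ID"),
        "AWS_SECRET_ACCESS_KEY": parser.get("AWS", "AWS_SECRET_ACCESS_KEY"),
    }


credentials = load_aws_credentials(SAMPLE_CFG)
print(sorted(credentials))
```

If the section or either key is missing, `parser.get` raises an error, so a malformed config file fails early instead of silently leaving the environment variables unset.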

In [None]:
spark = SparkSession.builder\
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.0.0") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.multiobjectdelete.enable","false") \
        .config("spark.hadoop.fs.s3a.fast.upload","true") \
        .config("dfs.client.read.shortcircuit.skip.checksum", "true")\
        .getOrCreate()

### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

### 1.1 Scope
**What Data**

The main dataset includes data on immigration to the United States. Supplementary datasets cover airport codes, U.S. city demographics, and world temperatures.

**What We Plan to Do**

We will use these data to build a data model for exploring how immigration volume varies with the month of arrival and the reason for immigration.

**What the Result Looks Like**

![immigration_reason](./image/immigration_reason.png)

![immigration_reason_month](./image/immigration_reason_month.png)

**What Tools**

Pandas, Numpy, Pyspark, Matplotlib, etc.

### 1.2 Describe and Gather Data

### 1.2.1 Describe
#### 1.  I94 Immigration Data & Descriptions
This data comes from the [US National Travel and Tourism Office](https://www.trade.gov/national-travel-and-tourism-office). A data dictionary is included in the workspace. There is a sample file, so you can look at the data in CSV format before reading it all in. You do not have to use the entire dataset; use only what you need to accomplish the goal you set at the beginning of the project.

Files:
* Sample : *immigration_data_sample.csv*
* Descriptions : *I94_SAS_Labels_Descriptions.SAS*

#### 2. World Temperature Data
This dataset came from Kaggle. You can read more about it [here](https://www.kaggle.com/berkeleyearth/climate-change-earth-surface-temperature-data).

File : *GlobalLandTemperaturesByCity.csv*

#### 3. U.S. City Demographic Data
This data comes from OpenDataSoft. You can read more about it [here](https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/).

File : *us-cities-demographics.csv*

#### 4. Airport Code Table
This is a simple table of airport codes and corresponding cities. It comes from [here](https://datahub.io/core/airport-codes#data).

File : *airport-codes_csv.csv*

### 1.2.2 Gather Data
#### 1. I94 Immigration Data

In [5]:
# Read in the data here
# read the path of all immigration data
fpath="./data/18-83510-I94-Data-2016/"
immigration_files=glob.glob(fpath +'*.sas7bdat')

In [None]:
# read the immigration files and save the data to parquet
for i, f in enumerate(immigration_files):
    # skip the first file in the list
    if i > 0:
        print(f)
        df_immigration = pd.read_sas(f, 'sas7bdat', encoding='ISO-8859-1')
        # write into the same folder that is read back with Spark below
        df_immigration.to_parquet('./immig_parq_temp/df_immigration_' + f[-18:] + '.parquet')

**Converting the immigration data takes a long time, so we read the previously stored Parquet files directly.**

In [None]:
# Converting the immigration data takes a long time,
# so we read the previously stored parquet files directly.
df_immigration = spark.read.parquet('./immig_parq_temp')

#### 2. I94 Descriptions

In [None]:
# get file path
bucket_name = './source/'
city_files = os.path.join(bucket_name,'I94_SAS_Labels_Descriptions.SAS')

In [None]:
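# read file

def code_mapper(f_content, idx):
    """Parse the label block that starts at `idx` into a {code: description} dict."""
    # keep the text from the block name up to the terminating ';'
    f_content2 = f_content[f_content.index(idx):]
    f_content2 = f_content2[:f_content2.index(';')].split('\n')
    f_content2 = [i.replace("'", "") for i in f_content2]
    # skip the block name, then split each "code = description" line
    dic = [i.split('=') for i in f_content2[1:]]
    dic = dict([i[0].strip(), i[1].strip()] for i in dic if len(i) == 2)
    return dic
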
with open(city_files) as f:
    f_content = f.read()
    f_content = f_content.replace('\t', '') 
    
i94cit_res = code_mapper(f_content, "i94cntyl")
i94port = code_mapper(f_content, "i94prtl")
i94mode = code_mapper(f_content, "i94model")
i94addr = code_mapper(f_content, "i94addrl")
i94visa = {'1':'Business',
            '2': 'Pleasure',
            '3' : 'Student'}

df_i94cit_res= pd.DataFrame(list(i94cit_res.items()),columns = ['i94cit_res','origin_travelled_country'])

df_i94port= pd.DataFrame(list(i94port.items()),columns = ['i94port','destination_city'])

df_i94mode= pd.DataFrame(list(i94mode.items()),columns = ['i94mode','transportation'])

df_i94addr= pd.DataFrame(list(i94addr.items()),columns = ['i94addr','state'])

df_i94visa= pd.DataFrame(list(i94visa.items()),columns = ['i94visa','immigration_reason'])
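
To show what `code_mapper` returns, here is a small self-contained run. The `SAMPLE` text is a hand-written excerpt in the style of the SAS label file, not a copy of the real `I94_SAS_Labels_Descriptions.SAS`, and the function body is restated with clearer variable names so that the block runs on its own.

```python
def code_mapper(f_content, idx):
    """Return {code: label} for the block that starts at `idx` and ends at ';'."""
    block = f_content[f_content.index(idx):]
    lines = block[:block.index(';')].split('\n')
    lines = [line.replace("'", "") for line in lines]
    # The first line is the block name; the rest are "code = label" pairs.
    pairs = [line.split('=') for line in lines[1:]]
    return {p[0].strip(): p[1].strip() for p in pairs if len(p) == 2}


# Illustrative excerpt only; the layout is assumed from the parsing logic.
SAMPLE = """value $i94prtl
   'ALC' = 'ALCAN, AK             '
   'ANC' = 'ANCHORAGE, AK         '
;
value i94model
   1 = 'Air'
   2 = 'Sea'
;
"""

ports = code_mapper(SAMPLE, "i94prtl")
modes = code_mapper(SAMPLE, "i94model")
print(ports)
print(modes)
```

Every code comes back as a string, including numeric ones such as the `i94model` keys, so they need a cast before they are joined against the integer columns of the immigration data.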

#### 3. Airport Code Table, World Temperature Data, U.S. City Demographic Data

In [None]:
# get file path
airport_files = 'airport-codes_csv.csv'
temperature_files = 'GlobalLandTemperaturesByCity.csv'
demographic_files = 'us-cities-demographics.csv'

# bucket_name = 's3://lcw-udacity-capstone/source/'
bucket_name = './source/'
airport_files = os.path.join(bucket_name, airport_files)
temperature_files = os.path.join(bucket_name, temperature_files)
demographic_files = os.path.join(bucket_name, demographic_files)

In [None]:
# read file
df_airport = pd.read_csv(airport_files, sep=',')
df_temperature = pd.read_csv(temperature_files, sep=',')
df_demographic = pd.read_csv(demographic_files,sep=';')

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

#### 1. I94 Immigration Data

In [None]:
# Performing cleaning tasks here
# Convert data format
bigint_columns = ['cicid', 'admnum']
int_columns = ['i94yr', 'i94mon', 'i94cit', 'i94res', 'arrdate', 'i94mode', 'depdate',
               'i94bir', 'i94visa', 'count', 'dtadfile', 'biryear']
for c in bigint_columns:
    df_immigration = df_immigration.withColumn(c, col(c).cast('bigint'))
for c in int_columns:
    df_immigration = df_immigration.withColumn(c, col(c).cast(IntegerType()))

# transform the date data and extract columns to create table
df_immigration = df_immigration.selectExpr("*", "date_add('1960-01-01',arrdate) as arrival_date",\
                     "date_add('1960-01-01',depdate) as departure_date").drop('arrdate','depdate')

df_immigration = df_immigration.sort('i94yr','i94mon','cicid')

# replace "cicid" with a unique, increasing id (not necessarily consecutive) so it can serve as the primary key
df_immigration = df_immigration.withColumn("cicid", monotonically_increasing_id())

# extract columns to create table
immigration_table = df_immigration.select('cicid', 'i94yr',   'i94mon',  'i94cit',  'i94res',  'i94port',  'arrival_date',  
                                        'departure_date', 'i94mode',  'i94addr',   'i94visa','matflag', 'airline',  'admnum',  'fltno')
immigrant_table = df_immigration.select('admnum', 'i94bir', 'gender', 'biryear', 'visatype').dropDuplicates()

# Find admission numbers that still appear more than once in immigrant_table after de-duplication,
# i.e. immigrants whose recorded details are inconsistent across records
admnum_error = immigrant_table.groupby('admnum').count().orderBy(col("count").desc()).where('count > 1')
pd_admnum_error = admnum_error.toPandas()
rows = pd_admnum_error['admnum'].tolist()

# Remove the rows with an inconsistent admission number from immigration_table
immigration_table =  immigration_table.filter(~col('admnum').isin(rows))

# Remove the rows with an inconsistent admission number from immigrant_table
immigrant_table = immigrant_table.filter(~col('admnum').isin(rows))

immigration_table = immigration_table.fillna(0)
immigrant_table = immigrant_table.fillna(0)
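
The `date_add('1960-01-01', ...)` expressions in this cell rely on `arrdate` and `depdate` being SAS dates, that is, day counts from 1 January 1960. A plain-Python sketch of the same conversion makes it easy to spot-check a value; `sas_days_to_date` is a hypothetical helper for illustration and is not part of the pipeline.

```python
from datetime import date, timedelta

# SAS stores dates as the number of days since this epoch.
SAS_EPOCH = date(1960, 1, 1)


def sas_days_to_date(days):
    """Convert a SAS day count to a calendar date; None stays None."""
    if days is None:
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_days_to_date(20545))  # 2016-04-01
```

Returning `None` for a missing value mirrors Spark, where `date_add` yields null for a null input, so a missing `depdate` stays missing in `departure_date`.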


#### 2. I94 Descriptions

In [None]:
# only df_i94port needs cleaning: split "CITY, REGION" into two columns
port_parts = df_i94port['destination_city'].str.split(',', expand=True)
df_i94port['city'] = port_parts[0].str.strip()
# strip the space that follows the comma
df_i94port['region'] = port_parts[1].str.strip()
df_i94port.drop(['destination_city'], axis=1, inplace=True)

#### 3. Airport Code Table, World Temperature Data, U.S. City Demographic Data

In [None]:
# Filter out the closed airports
airport_table = df_airport[~df_airport['type'].isin(['closed'])]

# Keep only the airports in the US
airport_table = airport_table[airport_table['iso_country']=='US']

# Drop the airports without a local code (copy so the columns added below do not modify a slice)
airport_table = airport_table[~airport_table['local_code'].isna()].copy()

airport_table['longitude'] = airport_table['coordinates'].str.split(',', expand=True)[0].astype('float').round(2)
airport_table['latitude'] = airport_table['coordinates'].str.split(',', expand=True)[1].astype('float').round(2)
airport_table['iso_region'] = airport_table['iso_region'].str.split('-', expand=True)[1]

airport_table.drop(['coordinates','continent','iata_code'], axis=1, inplace=True)

# drop duplicates of local_code, and keep the first
airport_table = airport_table.drop_duplicates(subset=['local_code'], keep='first', inplace=False)

airport_table = airport_table.reset_index()
airport_table.rename(columns = {'index':'airport_id'}, inplace = True)

In [None]:
# Filter out the rows that have a null temperature (copy to avoid modifying a slice of df_temperature)
temperature_table = df_temperature[~df_temperature['AverageTemperature'].isna()].copy()

# convert the type of 'dt' to <datetime>
temperature_table['dt'] = pd.to_datetime(temperature_table['dt'])

# In view of the impact of human activities on climate in recent years, we keep only the data after 1985
# to calculate the average temperature
temperature_table = temperature_table[(temperature_table['dt']>'1985-12-31')].copy()

temperature_table['month'] = temperature_table['dt'].dt.month

# Convert coordinates such as '57.05N' or '10.33E' to signed floats:
# southern latitudes and western longitudes become negative
def to_signed_degrees(value):
    sign = -1 if value[-1] in ('S', 'W') else 1
    return sign * float(value[:-1])

temperature_table['Latitude'] = temperature_table['Latitude'].map(to_signed_degrees)
temperature_table['Longitude'] = temperature_table['Longitude'].map(to_signed_degrees)

# calculate each city's mean temperature for each month
temperature_table_month=temperature_table.groupby(['Country','City','month','Latitude','Longitude'],as_index=False)['AverageTemperature'].mean().round(3)

temperature_table_month = temperature_table_month.reset_index()
temperature_table_month.rename(columns = {'index':'temperature_id'}, inplace = True)


In [None]:
# Keep the first nine columns (dropping 'State Code', 'Race' and 'Count'), then remove the duplicate rows
demographic_table = df_demographic.iloc[:,0:9].drop_duplicates()
demographic_table = demographic_table.fillna(0)
demographic_table['Male Population'] = demographic_table['Male Population'].astype('int')
demographic_table['Female Population'] = demographic_table['Female Population'].astype('int')
demographic_table['Number of Veterans'] = demographic_table['Number of Veterans'].astype('int')
demographic_table['Foreign-born'] = demographic_table['Foreign-born'].astype('int')

demographic_table = demographic_table.reset_index()
demographic_table.rename(columns = {'index':'demographic_id'}, inplace = True)

#### 4. Write the processed data to Parquet and CSV files for later use

In [None]:
# write table to parquet files 
output_data = './transformed/'
immigration_table.repartition(4).write.parquet(output_data +'immigration.parq', mode="overwrite")
immigrant_table.repartition(1).write.parquet(output_data +'immigrant.parq', mode="overwrite")

# write table to csv files
s3_output_data = './transformed/'
df_i94cit_res.to_csv(s3_output_data + 'df_i94cit_res.csv',index=False)
df_i94port.to_csv(s3_output_data + 'df_i94port.csv',index=False)
df_i94mode.to_csv(s3_output_data + 'df_i94mode.csv',index=False)
df_i94addr.to_csv(s3_output_data + 'df_i94addr.csv',index=False)
df_i94visa.to_csv(s3_output_data + 'df_i94visa.csv',index=False)

airport_transformed = os.path.join(s3_output_data, 'us_airport.csv')
airport_table.to_csv(airport_transformed, index=False)
temperature_table_month.to_csv(s3_output_data + 'temperature_table_month.csv',index=False)
demographic_table.to_csv(s3_output_data + 'demographic_table.csv',index=False)

### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### 3.1 Schema for Immigration Analysis
![schema](image/immigration.png)

- This model leaves out the tables shown in violet (*i94cit_res, i94mode, temperature, demographic*). Including them would greatly affect the analysis based on the fact table.

- To explore the relationship between temperature, urban population, and immigration data, we can build another model.

#### Fact Table
immigration - immigration records for arrivals by air
- *cicid, year, month, airport_id, state_id, city, visa_id, admnum, longitude, latitude*

#### Dimension Table
i94port - code of destination city

i94addr - state where the immigrant resides in the USA

i94visa - reason for immigration

airport - information of airports in USA
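
As a rough sketch of how the fact table and one dimension table fit together, the block below builds a cut-down version of the schema and runs the kind of query the analysis is after: arrivals per month and immigration reason. It uses an in-memory SQLite database as a stand-in for the project's PostgreSQL database. The sample rows are made up, only a few of the fact table columns are included, and the join key (`visa_id` to `i94visa`) is an assumption based on the column names listed here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
-- Dimension table: reason for immigration
CREATE TABLE i94visa (
    i94visa INTEGER PRIMARY KEY,
    immigration_reason TEXT NOT NULL
);
-- Fact table, reduced to the columns needed for this query
CREATE TABLE immigration (
    cicid INTEGER PRIMARY KEY,
    year INTEGER,
    month INTEGER,
    visa_id INTEGER REFERENCES i94visa (i94visa)
);
INSERT INTO i94visa VALUES (1, 'Business'), (2, 'Pleasure'), (3, 'Student');
-- Made-up sample rows
INSERT INTO immigration VALUES
    (1, 2016, 4, 2), (2, 2016, 4, 2), (3, 2016, 4, 1), (4, 2016, 5, 3);
""")

# Count arrivals by month and immigration reason
rows = conn.execute("""
    SELECT f.month, v.immigration_reason, COUNT(*) AS arrivals
    FROM immigration AS f
    JOIN i94visa AS v ON v.i94visa = f.visa_id
    GROUP BY f.month, v.immigration_reason
    ORDER BY f.month, v.immigration_reason
""").fetchall()

for row in rows:
    print(row)
```

Keeping the descriptive text in the dimension table means the fact table stores only integer keys, and the readable labels are attached at query time.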

### 3.2 Steps
1. Create tables (**create_tables.py**)
2. Read data, explore and clean data, save data (**etl.py**)
3. Copy data to PostgreSQL (**airflow/dags/dag.py**)
4. Insert to fact table (**airflow/dags/dag.py**)
5. Analyse the fact table (**immigration_analysis.ipynb**)

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

### **"airflow/dags/dag.py"**

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

In [None]:
# **"airflow/dags/dag.py"**

run_quality_checks = DataQualityOperator(
    task_id='Run_data_quality_checks',
    dag=dag,
    dq_checks=[
        {'check_sql': "SELECT COUNT(*) FROM immigration WHERE cicid is null", 'expected_result':0},
        {'check_sql': "SELECT COUNT(*) FROM immigrant WHERE admnum is null", 'expected_result':0},
        {'check_sql': "SELECT COUNT(*) FROM i94cit_res WHERE i94cit_res is null", 'expected_result':0},
        {'check_sql': "SELECT COUNT(*) FROM i94port WHERE i94port is null", 'expected_result':0},
        {'check_sql': "SELECT COUNT(*) FROM i94mode WHERE i94mode is null", 'expected_result':0},
        {'check_sql': "SELECT COUNT(*) FROM i94addr WHERE i94addr is null", 'expected_result':0},
        {'check_sql': "SELECT COUNT(*) FROM i94visa WHERE i94visa is null", 'expected_result':0},
        {'check_sql': "SELECT COUNT(*) FROM airport WHERE local_code is null", 'expected_result':0},
        {'check_sql': "SELECT COUNT(*) FROM temperature WHERE averagetemperature is null", 'expected_result':0},
        {'check_sql': "SELECT COUNT(*) FROM demographic WHERE total_population is null", 'expected_result':0}
    ],
    table=('immigration', 'immigrant', 'i94cit_res', 'i94port', 'i94mode', 'i94addr', 'i94visa', 'airport', 'temperature', 'demographic'),
    redshift_conn_id="redshift"
)

### **"airflow/plugins/operators/data_quality.py"**
The operator's main functionality is to receive one or more SQL based test cases along with the expected results and execute the tests. 
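
The operator's source is not reproduced in this notebook, so the following is only a sketch of the logic that the description implies: run each `check_sql`, compare the first value returned with `expected_result`, and collect the mismatches. SQLite stands in for the database connection that the Airflow hook would provide, and `run_quality_checks` is a hypothetical name.

```python
import sqlite3


def run_quality_checks(conn, dq_checks):
    """Run each check and return the failures as (sql, expected, actual) tuples."""
    failures = []
    for check in dq_checks:
        actual = conn.execute(check["check_sql"]).fetchone()[0]
        if actual != check["expected_result"]:
            failures.append((check["check_sql"], check["expected_result"], actual))
    return failures


# Made-up table with one deliberately bad row (null key)
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE i94visa (i94visa INTEGER, immigration_reason TEXT)")
conn.executemany(
    "INSERT INTO i94visa VALUES (?, ?)",
    [(1, "Business"), (2, "Pleasure"), (None, "Unknown")],
)

checks = [
    {"check_sql": "SELECT COUNT(*) FROM i94visa WHERE i94visa is null",
     "expected_result": 0},
    {"check_sql": "SELECT COUNT(*) FROM i94visa WHERE immigration_reason is null",
     "expected_result": 0},
]

failures = run_quality_checks(conn, checks)
print(failures)
```

In a real operator, a non-empty failure list would typically be turned into a raised exception so that the Airflow task is marked as failed.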


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

**"Data_Dictionary.xlsx"**

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

### 1. Tools and technologies

Pandas - We use it to read files into dataframes and to write dataframes back to files.

Numpy, Matplotlib - We use them to analyse and plot the immigration data.

Pyspark - Pandas is no longer practical once the amount of data gets large. At that point, we use Pyspark.

### 2. Update frequency

Monthly, because most of our analysis is done on a monthly basis.

### 3. Alternative scenarios

- The data was increased by 100x.

While the amount of data is small, we can process it in the traditional way on a personal computer. Because Redshift scales well, we can also store the data there directly, so that even if the data grows by 100x we do not have to worry about running out of storage space. The computation and analysis can then run on a Hadoop cluster.

- The data populates a dashboard that must be updated on a daily basis by 7am every day.

Airflow is a programmable platform for scheduling and monitoring workflows. It provides rich command-line tools for system control, and its web interface makes it easy to control and schedule tasks and to monitor their status in real time. We can use Airflow to schedule the pipeline so that the dashboard data is refreshed before 7am every day.

- The database needed to be accessed by 100+ people.

A traditional relational database can handle this number of users. To improve access and read efficiency, however, we can choose a NoSQL database such as Cassandra.
