# Project Title
### Data Engineering Capstone Project

#### Project Summary
### ***This project takes immigration data and performs ETL so that the data can be analysed further. Apache Spark extracts and transforms the source data into fact and dimension tables and writes them to Amazon S3. Apache Airflow then loads the star schema from Amazon S3 into Amazon Redshift and runs data quality checks, so that the users (data scientists) can query the data efficiently and perform their analytics.*** ###



The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [None]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql import functions as F


### Step 1: Scope the Project and Gather Data
### I decided to use the datasets provided as part of the 'Udacity Provided Project'. They consist of data related to immigration in the United States. ###

The data collected was as follows:

* I94 Immigration Data - This data was from the US National Tourism and Trade Office. The data contains international visitor arrival statistics by world regions, and select countries. The data contains the type of visa, the mode of transportation, the age groups, states visited, and the top ports of entry for immigration into the United States.

* US Cities: Demographics - This dataset contains information about the demographics of all US cities and census-designated places with a population greater than or equal to 65,000. The dataset can be accessed here 

### First, we aim to understand the schema of the collected data. The goal is to develop a data pipeline that transforms, cleans, and loads the provided data into a data warehouse from which relevant insights can be extracted easily. The process should satisfy the following outcomes: ###

* The data must be stored in fact and dimension tables
* The data must be cleaned so that it is queryable
* The data must be stored so that database joins can easily correlate the data sources

# The users want to perform the following analytics:
* find the state that has the most immigrants
* find the state that has the fewest immigrants
* find the state that has the highest proportion of white people
* find the state that has the lowest proportion of white people
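
Once the star schema is loaded, each of these questions reduces to a short SQL query. The sketch below uses Python's built-in `sqlite3` as a stand-in for Redshift. The table and column names follow the data dictionary later in this notebook, while the rows, the helper `top_state`, and the state names are made up purely for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE fact_table (
        fact_id INTEGER, state_code TEXT, immigration_count INTEGER,
        white INTEGER, asian INTEGER, black INTEGER, hispanic INTEGER, indian INTEGER
    );
    CREATE TABLE dim_state_table (state_code TEXT, state TEXT);
    -- Illustrative rows only, not real figures.
    INSERT INTO fact_table VALUES
        (1, 'AA', 500, 60, 10, 10, 15, 5),
        (2, 'BB', 100, 20, 30, 20, 25, 5),
        (3, 'CC', 900, 45, 5, 20, 25, 5);
    INSERT INTO dim_state_table VALUES
        ('AA', 'State A'), ('BB', 'State B'), ('CC', 'State C');
""")

# Share of white people among the five race counts of a state.
WHITE_RATIO = "1.0 * f.white / (f.white + f.asian + f.black + f.hispanic + f.indian)"


def top_state(order_by):
    """Return the full name of the state that ranks first under the given ORDER BY clause."""
    row = conn.execute(
        "SELECT s.state FROM fact_table f "
        "JOIN dim_state_table s ON s.state_code = f.state_code "
        f"ORDER BY {order_by} LIMIT 1"
    ).fetchone()
    return row[0]


most_immigrants = top_state("f.immigration_count DESC")
fewest_immigrants = top_state("f.immigration_count ASC")
highest_white_ratio = top_state(f"{WHITE_RATIO} DESC")
lowest_white_ratio = top_state(f"{WHITE_RATIO} ASC")
```

The join to `dim_state_table` is what turns a state code into a readable state name, which is the main reason for keeping that dimension separate.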


In [None]:
# Read in the data here

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [None]:
def create_spark_session():
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark


In [None]:
spark = create_spark_session()

In [None]:
df_immigration=spark.read.parquet("sas_data")

In [None]:
df_immigration.count()

In [None]:
df_immigration.show(10)

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data
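
As a small illustration of the two checks named above (missing values and duplicate rows), here is a sketch on a toy pandas DataFrame. The rows are made up, and pandas is used only because it is easy to run locally; on the full Spark DataFrames the same questions would be asked with Spark functions instead.

```python
import pandas as pd

# Toy rows standing in for the immigration data; the values are made up.
sample = pd.DataFrame({
    "cicid": [1, 2, 2, 4],
    "i94addr": ["CA", "TX", "TX", None],
    "gender": ["F", "M", "M", None],
})

# Number of missing values in each column.
null_counts = sample.isnull().sum().to_dict()

# Number of rows that exactly repeat an earlier row.
duplicate_rows = int(sample.duplicated().sum())

print(null_counts)
print(duplicate_rows)
```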

In [None]:
# Performing cleaning tasks here
df_immigration.printSchema()

In [None]:
df_immigration = df_immigration \
        .withColumn("cicid", df_immigration["cicid"].cast(IntegerType())) \
        .withColumn("i94yr", df_immigration["i94yr"].cast(IntegerType())) \
        .withColumn("i94mon", df_immigration["i94mon"].cast(IntegerType())) \
        .withColumn("i94cit", df_immigration["i94cit"].cast(IntegerType())) \
        .withColumn("i94res", df_immigration["i94res"].cast(IntegerType())) \
        .withColumn("arrdate", df_immigration["arrdate"].cast(StringType())) \
        .withColumn("i94mode", df_immigration["i94mode"].cast(IntegerType())) \
        .withColumn("depdate", df_immigration["depdate"].cast(StringType())) \
        .withColumn("i94bir", df_immigration["i94bir"].cast(IntegerType())) \
        .withColumn("i94visa", df_immigration["i94visa"].cast(IntegerType())) \
        .withColumn("count", df_immigration["count"].cast(IntegerType()))

In [None]:
df_immigration = df_immigration.select("cicid","i94yr","i94mon","i94cit","i94res","i94port","arrdate","i94mode","i94addr","depdate","i94bir","i94visa","count","gender","airline","fltno","visatype")


In [None]:
df_immigration.show()
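
The cast above keeps `arrdate` and `depdate` as strings. In the I94 files these columns appear to be SAS numeric dates, that is, day counts starting from 1960-01-01. Assuming that holds for this extract, a small helper (the name `sas_to_date` is hypothetical and not part of the project) could turn them into calendar dates:

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS counts dates as days since this day


def sas_to_date(days):
    """Convert a SAS numeric date (days since 1960-01-01) to a date, or None if missing."""
    if days is None:
        return None
    # Accept ints, floats, and strings such as "20545.0".
    return SAS_EPOCH + timedelta(days=int(float(days)))


print(sas_to_date(20545))  # 2016-04-01
```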

In [None]:
df_demographics = spark.read.format("csv").option("delimiter", ";").option("header","true").load("us-cities-demographics.csv").dropDuplicates()

In [None]:
df_demographics.count()

In [None]:
df_demographics.show(10)

In [None]:
df_demographics.printSchema()

In [None]:
df_demographics = df_demographics \
        .withColumn("Total Population", df_demographics["Total Population"].cast(IntegerType())) \
        .withColumn("Male Population", df_demographics["Male Population"].cast(IntegerType())) \
        .withColumn("Female Population", df_demographics["Female Population"].cast(IntegerType())) \
        .withColumn("Number of Veterans", df_demographics["Number of Veterans"].cast(IntegerType())) \
        .withColumn("Foreign-born", df_demographics["Foreign-born"].cast(IntegerType())) \
        .withColumn("Count", df_demographics["Count"].cast(IntegerType()))

In [None]:
df_demographics = df_demographics \
        .withColumnRenamed("City", "city") \
        .withColumnRenamed("State", "state") \
        .withColumnRenamed("Median Age", "median_age") \
        .withColumnRenamed("Male Population", "male_population") \
        .withColumnRenamed("Female Population", "female_population") \
        .withColumnRenamed("Total Population", "total_population") \
        .withColumnRenamed("Number of Veterans", "num_of_veterans") \
        .withColumnRenamed("Foreign-born", "foreign_born") \
        .withColumnRenamed("Average Household Size", "avg_household_size") \
        .withColumnRenamed("State Code", "state_code") \
        .withColumnRenamed("Race", "race") \
        .withColumnRenamed("Count", "count")

In [None]:
df_demographics.show()

In [None]:
pivot_df_demographics = df_demographics.groupBy("state_code").pivot("race").sum("count").sort("state_code")

In [None]:
pivot_df_demographics = pivot_df_demographics \
        .withColumnRenamed("American Indian and Alaska Native", "indian") \
        .withColumnRenamed("Asian", "asian") \
        .withColumnRenamed("Black or African-American", "black") \
        .withColumnRenamed("Hispanic or Latino", "hispanic") \
        .withColumnRenamed("White", "white")

In [None]:
pivot_df_demographics.show(10)

In [None]:
# Collect the distinct state codes once instead of re-running the query for every index
state_list_demographics = [
    str(row["state_code"])
    for row in pivot_df_demographics.select("state_code").dropDuplicates().collect()
]

In [None]:
len(state_list_demographics)

In [None]:
df_immigration_filtered = df_immigration.filter(F.col("i94addr").isin(state_list_demographics))

In [None]:
df_immigration_filtered.show(10)

In [None]:
df_immigration_filtered.count()

## Extracting fact table

In [None]:
fact_source_df_immigration = df_immigration_filtered.groupBy("i94addr").agg(F.sum(df_immigration_filtered["count"]).alias("immigration_count"))

In [None]:
fact_source_df_demographics = pivot_df_demographics

In [None]:
fact_source_df_demographics.show(10)

In [None]:
fact_table = fact_source_df_immigration.join(fact_source_df_demographics, fact_source_df_immigration.i94addr == fact_source_df_demographics.state_code).withColumn("fact_id", F.monotonically_increasing_id())

In [None]:
fact_table.show(10)

In [None]:
fact_table = fact_table.select("fact_id","state_code","immigration_count","white","asian","black","hispanic","indian")

In [None]:
fact_table.printSchema()

In [None]:
fact_table = fact_table \
        .withColumn("immigration_count", fact_table["immigration_count"].cast(IntegerType())) \
        .withColumn("white", fact_table["white"].cast(IntegerType())) \
        .withColumn("asian", fact_table["asian"].cast(IntegerType())) \
        .withColumn("black", fact_table["black"].cast(IntegerType())) \
        .withColumn("hispanic", fact_table["hispanic"].cast(IntegerType())) \
        .withColumn("indian", fact_table["indian"].cast(IntegerType()))

## Extracting dimension tables

### dim_state_table

In [None]:
# One row per distinct (state_code, state) pair
dim_state_table = df_demographics.select("state_code","state").dropDuplicates()

In [None]:
dim_state_table.show(10)

### dim_visa_table

In [None]:
pivot_df_immigration_visa = df_immigration_filtered.groupBy("i94addr").pivot("visatype").sum("count")

In [None]:
pivot_df_immigration_visa.show(10)

In [None]:
dim_visa_table = pivot_df_immigration_visa

In [None]:
dim_visa_table.printSchema()

In [None]:
dim_visa_table = dim_visa_table \
        .withColumnRenamed("i94addr", "state_code") \
        .withColumn("B1", dim_visa_table["B1"].cast(IntegerType())) \
        .withColumn("B2", dim_visa_table["B2"].cast(IntegerType())) \
        .withColumn("CP", dim_visa_table["CP"].cast(IntegerType())) \
        .withColumn("CPL", dim_visa_table["CPL"].cast(IntegerType())) \
        .withColumn("E1", dim_visa_table["E1"].cast(IntegerType())) \
        .withColumn("E2", dim_visa_table["E2"].cast(IntegerType())) \
        .withColumn("F1", dim_visa_table["F1"].cast(IntegerType())) \
        .withColumn("F2", dim_visa_table["F2"].cast(IntegerType())) \
        .withColumn("GMT", dim_visa_table["GMT"].cast(IntegerType())) \
        .withColumn("I", dim_visa_table["I"].cast(IntegerType())) \
        .withColumn("I1", dim_visa_table["I1"].cast(IntegerType())) \
        .withColumn("M1", dim_visa_table["M1"].cast(IntegerType())) \
        .withColumn("M2", dim_visa_table["M2"].cast(IntegerType())) \
        .withColumn("SBP", dim_visa_table["SBP"].cast(IntegerType())) \
        .withColumn("WB", dim_visa_table["WB"].cast(IntegerType())) \
        .withColumn("WT", dim_visa_table["WT"].cast(IntegerType()))

In [None]:
dim_visa_table.show(10)

### dim_foreign_table

In [None]:
dim_foreign_table = df_demographics.groupBy("state_code").agg(F.sum(df_demographics["foreign_born"]).alias("state_foreign_born"))

In [None]:
dim_foreign_table.printSchema()

In [None]:
dim_foreign_table = dim_foreign_table \
        .withColumn("state_foreign_born", dim_foreign_table["state_foreign_born"].cast(IntegerType()))

In [None]:
dim_foreign_table.show(10)

## ETL - Loading data

### Loading parquet data to S3 bucket

In [None]:
fact_table.write.mode("overwrite").parquet("s3a://my-bucket/fact_table")
dim_state_table.write.mode("overwrite").parquet("s3a://my-bucket/dim_state_table")
dim_visa_table.write.mode("overwrite").parquet("s3a://my-bucket/dim_visa_table")
dim_foreign_table.write.mode("overwrite").parquet("s3a://my-bucket/dim_foreign_table")

## Some analytics

In [None]:
import pandas as pd

In [None]:
# the state that has the most immigrants
fact_pdf = fact_table.select("*").toPandas()
fact_pdf[fact_pdf.immigration_count == fact_pdf.immigration_count.max()]
str(fact_pdf[fact_pdf.immigration_count == fact_pdf.immigration_count.max()]['state_code'].values[0])


In [None]:
# the state that has the fewest immigrants
fact_pdf[fact_pdf.immigration_count == fact_pdf.immigration_count.min()]
str(fact_pdf[fact_pdf.immigration_count == fact_pdf.immigration_count.min()]['state_code'].values[0])

In [None]:
# the state that has the highest proportion of white people
fact_pdf['white_percentage'] = (fact_pdf['white']/(fact_pdf['white']+fact_pdf['asian']+fact_pdf['black']+fact_pdf['hispanic']+fact_pdf['indian']) * 100).round(2)
fact_pdf.head(10)


In [None]:
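# Drop the old 'white_percentage[%]' column if present; errors='ignore' avoids a KeyError when it is absent
fact_pdf = fact_pdf.drop(columns=['white_percentage[%]'], errors='ignore')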

In [None]:
fact_pdf.info(verbose=True)

In [None]:
fact_pdf["white_percentage"].max()

In [None]:
fact_pdf[fact_pdf.white_percentage == fact_pdf.white_percentage.max()]


In [None]:
fact_pdf[fact_pdf.white_percentage == fact_pdf.white_percentage.max()]['white']

In [None]:
fact_pdf[fact_pdf.white_percentage == fact_pdf.white_percentage.max()]['white'].values[0]


In [None]:
str(fact_pdf[fact_pdf.white_percentage == fact_pdf.white_percentage.max()]['state_code'].values[0])


In [None]:
# the state that has the lowest proportion of white people
fact_pdf["white_percentage"].min()

In [None]:
# Exclude states whose white percentage is 0 before taking the minimum, leaving the other columns untouched
fact_pdf_2 = fact_pdf[fact_pdf.white_percentage > 0]

In [None]:
fact_pdf_2["white_percentage"].min()

In [None]:
fact_pdf_2[fact_pdf_2.white_percentage == fact_pdf_2.white_percentage.min()]

In [None]:
str(fact_pdf_2[fact_pdf_2.white_percentage == fact_pdf_2.white_percentage.min()]['state_code'].values[0])

#### Data dictionary
Four tables are produced by extracting and transforming the source data. The first is the fact table; the other three are dimension tables.

### fact_table

Columns | Description
------------ | -------------
fact_id | ID that uniquely identifies one record in the fact table
state_code | Code of the state
immigration_count | Count of immigrants by state
white | Count of White people by state
asian | Count of Asian people by state
black | Count of Black or African-American people by state
hispanic | Count of Hispanic or Latino people by state
indian | Count of American Indian and Alaska Native people by state

### dim_state_table

Columns | Description
------------ | -------------
state_code | Code of the state
state | Full name of the state

### dim_visa_table

Columns | Description
------------ | -------------
state_code | Code of the state
B1 | Count of people who have a B1 visa by state
B2 | Count of people who have a B2 visa by state
CP | Count of people who have a CP visa by state
CPL | Count of people who have a CPL visa by state
E1 | Count of people who have an E1 visa by state
E2 | Count of people who have an E2 visa by state
F1 | Count of people who have an F1 visa by state
F2 | Count of people who have an F2 visa by state
GMT | Count of people who have a GMT visa by state
I | Count of people who have an I visa by state
I1 | Count of people who have an I1 visa by state
M1 | Count of people who have an M1 visa by state
M2 | Count of people who have an M2 visa by state
SBP | Count of people who have an SBP visa by state
WB | Count of people who have a WB visa by state
WT | Count of people who have a WT visa by state

### dim_foreign_table

Columns | Description
------------ | -------------
state_code | Code of the state
state_foreign_born | Count of people who were foreign born by state
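
The data dictionary above maps directly onto table definitions. As a rough sketch, the DDL could look like the following. The column types and primary keys are assumptions and are not taken from the project's `create_tables.py`; `sqlite3` is used here only to check that the statements parse.

```python
import sqlite3

# Visa columns of dim_visa_table, as listed in the data dictionary.
VISA_TYPES = ["B1", "B2", "CP", "CPL", "E1", "E2", "F1", "F2",
              "GMT", "I", "I1", "M1", "M2", "SBP", "WB", "WT"]

DDL = {
    # fact_id comes from monotonically_increasing_id(), which yields 64-bit integers.
    "fact_table": """
        CREATE TABLE fact_table (
            fact_id BIGINT PRIMARY KEY,
            state_code VARCHAR(2) NOT NULL,
            immigration_count INTEGER,
            white INTEGER, asian INTEGER, black INTEGER,
            hispanic INTEGER, indian INTEGER
        )""",
    "dim_state_table": """
        CREATE TABLE dim_state_table (
            state_code VARCHAR(2) PRIMARY KEY,
            state VARCHAR(64)
        )""",
    "dim_foreign_table": """
        CREATE TABLE dim_foreign_table (
            state_code VARCHAR(2) PRIMARY KEY,
            state_foreign_born INTEGER
        )""",
    # Column names are quoted so that short names such as "I" are safe identifiers.
    "dim_visa_table": (
        "CREATE TABLE dim_visa_table (state_code VARCHAR(2) PRIMARY KEY, "
        + ", ".join(f'"{visa}" INTEGER' for visa in VISA_TYPES)
        + ")"
    ),
}

conn = sqlite3.connect(":memory:")
for statement in DDL.values():
    conn.execute(statement)

tables = sorted(
    row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
)
print(tables)
```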

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

Tools & Technologies
* Python : Python is the base programming language used to perform the ETL of the data.

* Spark : PySpark is used to process the data at scale and load it to Amazon S3.

* Amazon S3 : Amazon S3 is used to store the processed outputs.

* Airflow : Airflow is used to load the star schema into Amazon Redshift and to perform data quality checks.

* Amazon Redshift : Amazon Redshift is used as the data warehouse.

* Pandas : Pandas is used to perform the data analytics.
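
The data quality check mentioned in the Airflow item is not shown in this notebook. Below is a minimal sketch of the kind of check such a task might run, assuming a DB-API style connection. `sqlite3` stands in for Redshift, and the helper `check_table` is hypothetical rather than the project's actual operator.

```python
import sqlite3


def check_table(conn, table, key_column):
    """Raise ValueError if the table is empty or has NULLs in its key column."""
    # Table and column names are expected to come from trusted pipeline code.
    total = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    if total == 0:
        raise ValueError(f"Data quality check failed: {table} is empty")
    nulls = conn.execute(
        f"SELECT COUNT(*) FROM {table} WHERE {key_column} IS NULL"
    ).fetchone()[0]
    if nulls > 0:
        raise ValueError(
            f"Data quality check failed: {nulls} NULL values in {table}.{key_column}"
        )
    return total


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_state_table (state_code TEXT, state TEXT)")
conn.execute("INSERT INTO dim_state_table VALUES ('AA', 'State A')")  # made-up row
rows_checked = check_table(conn, "dim_state_table", "state_code")
print(rows_checked)
```

Raising an exception is a deliberate choice here: inside an Airflow task, an uncaught exception marks the task as failed, which makes a bad load visible.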
### Alternative approaches for different scenarios
* The data was increased by 100x
A Redshift cluster of dc2.large nodes stores 160 GB per node and can have at most 32 nodes (about 5.1 TB of compressed storage in total). Redshift can therefore absorb a 100x increase as long as the grown dataset stays within that limit.
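
Whether the 100x case fits depends on the current compressed size of the data, which this notebook does not record. The sketch below only does the arithmetic from the figures quoted above; the helper `nodes_needed` and the example sizes are hypothetical.

```python
import math

NODE_STORAGE_GB = 160  # dc2.large storage per node, as quoted above
MAX_NODES = 32         # maximum number of dc2.large nodes, as quoted above

# Total storage of a fully sized dc2.large cluster.
MAX_CLUSTER_GB = NODE_STORAGE_GB * MAX_NODES


def nodes_needed(current_size_gb, growth_factor=100):
    """Return the dc2.large nodes needed after growth, or None if the limit is exceeded."""
    required_gb = current_size_gb * growth_factor
    nodes = max(1, math.ceil(required_gb / NODE_STORAGE_GB))
    return nodes if nodes <= MAX_NODES else None


print(MAX_CLUSTER_GB)     # 5120
print(nodes_needed(10))   # 10 GB today (hypothetical) grows to 1000 GB
print(nodes_needed(100))  # 100 GB today (hypothetical) grows to 10000 GB
```

A result of `None` means the grown dataset would no longer fit on dc2.large nodes and a different node type would have to be considered.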

* The pipelines would be run on a daily basis by 7 am every day
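To run the pipeline daily at 7 am, set schedule_interval='0 7 * * *' on the DAG.
```python
from datetime import datetime

from airflow import DAG

default_args = {
    'owner': 'ezzat',
    # Placeholder date: a fixed start_date is used instead of datetime.now(),
    # which changes on every parse and can keep scheduled runs from triggering
    'start_date': datetime(2019, 1, 1),
    'depends_on_past': False,
    'email_on_retry': False,
    'email_on_failure': False
}

dag_name = 'capstone_project_dag'
dag = DAG(dag_name,
          default_args=default_args,
          description='Extract Load and Transform data from S3 to Redshift',
          schedule_interval='0 7 * * *',
          # catchup is a DAG argument, not a task default
          catchup=False
        )
```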
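
The cron expression `0 7 * * *` reads as minute 0, hour 7, every day of the month, every month, every day of the week. To illustrate the resulting cadence, here is a small standalone sketch that computes the next 07:00 after a given moment. The helper `next_daily_run` is hypothetical and does not use Airflow.

```python
from datetime import datetime, timedelta


def next_daily_run(now, hour=7, minute=0):
    """Return the first hour:minute moment strictly after `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed, so the next one is tomorrow.
        candidate += timedelta(days=1)
    return candidate


print(next_daily_run(datetime(2024, 1, 1, 6, 30)))  # 2024-01-01 07:00:00
print(next_daily_run(datetime(2024, 1, 1, 8, 0)))   # 2024-01-02 07:00:00
```

Airflow evaluates cron schedules in UTC unless a time zone is configured, so "7 am" may need an offset to match local time.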

* The database needed to be accessed by 100+ people
Redshift allows up to 500 concurrent connections, and up to 50 queries can run in parallel at any point in time, so 100+ people can connect to Redshift.

## How to run

* Run all the cells of the CapstoneProjectTemplate.ipynb file to create the star-schema tables and perform the ETL process.

* To create the tables in the Redshift cluster, run the create_tables.py file. The command should look like this: python create_tables.py

* Finally, turn the capstone_project_dag DAG on and trigger it in Airflow.
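
The DAG's load step is not shown in this notebook. One common way to load Parquet files from S3 into Redshift is the `COPY` command, and the sketch below builds such a statement as a string. The helper `build_copy_statement`, the bucket path, and the IAM role ARN are placeholders, and the project's own operator may work differently.

```python
def build_copy_statement(table, s3_path, iam_role_arn):
    """Build a Redshift COPY statement that loads Parquet files from S3 into a table."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS PARQUET;"
    )


# Placeholder values: Redshift expects an s3:// path rather than Spark's s3a:// scheme.
copy_sql = build_copy_statement(
    "fact_table",
    "s3://my-bucket/fact_table",
    "arn:aws:iam::123456789012:role/example-redshift-role",
)
print(copy_sql)
```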
## Hint: I used Airflow in the data pipeline project of the previous course. I have added three files: one for testing data (data_quality.py), one for operators (load_to_redshift.py), and one for DAGs (load_to_redshift.py)