# Project Title
### Data Engineering Capstone Project

#### Project Summary
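This project builds a data pipeline for the I94 US immigration data and the U.S. city demographics data. The data is explored and assessed, modelled as a star schema, loaded into Hive tables whose data lives in AWS S3, and then queried to study immigration trends.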

The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

In [1]:
# Do all imports and installs here
import pandas as pd
import numpy as np
from datetime import datetime
import os
from os.path import abspath
import configparser
from pyspark.sql import SparkSession
# `sum` is not imported: it is unused and would shadow Python's built-in sum()
from pyspark.sql.functions import isnan, count, col, when, countDistinct


In [2]:
# Read the AWS credentials from the [AWS] section of aws_credentials.cfg
config = configparser.ConfigParser()
with open('aws_credentials.cfg') as cfg_file:  # fails loudly if the file is missing
    config.read_file(cfg_file)
access_id = config.get('AWS', 'AWS_ACCESS_KEY_ID')
access_key = config.get('AWS', 'AWS_SECRET_ACCESS_KEY')
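The cell above expects an `aws_credentials.cfg` file with an `[AWS]` section. The sketch below shows that layout with placeholder values (the values are hypothetical) and parses it with `configparser.ConfigParser.read_string`, so it runs without the file on disk.

```python
import configparser

# Hypothetical contents of aws_credentials.cfg: the section and key names match the
# config.get() calls above, the values are placeholders.
SAMPLE_CFG = """
[AWS]
AWS_ACCESS_KEY_ID = YOUR_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY = YOUR_SECRET_ACCESS_KEY
"""


def load_aws_credentials(text):
    """Parse credentials from config text and return (access_id, access_key)."""
    config = configparser.ConfigParser()
    config.read_string(text)
    # config.get raises NoSectionError / NoOptionError if the file is incomplete
    return (
        config.get("AWS", "AWS_ACCESS_KEY_ID"),
        config.get("AWS", "AWS_SECRET_ACCESS_KEY"),
    )


access_id, access_key = load_aws_credentials(SAMPLE_CFG)
print(access_id)  # YOUR_ACCESS_KEY_ID
```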


### Step 1: Scope the Project and Gather Data

#### Scope 
Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use? etc.

#### Describe and Gather Data 
Describe the datasets you're using. Where did they come from? What type of information is included? 

### Scope

The scope of this project is to explore and describe the provided datasets, implement a data pipeline for them using AWS services that can handle the volume of data, and answer analytical questions as the data scales up.

### Describing and Gathering Data
There are two main datasets: the I94 US immigration data and the U.S. city demographics data.

I94 Immigration Data: This data comes from the US National Travel and Tourism Office. For more information, see: https://www.trade.gov/national-travel-and-tourism-office


U.S. City Demographic Data: This data comes from OpenDataSoft. For more information, see: https://public.opendatasoft.com/explore/dataset/us-cities-demographics/export/

In [3]:
# Read in the data here
# Reading U.S. City Demographic Data
df1 = pd.read_csv("us-cities-demographics.csv", sep=';')
df1.head(10)

|   | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |
| 5 | Peoria | Illinois | 33.1 | 56229.0 | 62432.0 | 118661 | 6634.0 | 7517.0 | 2.4 | IL | American Indian and Alaska Native | 1343 |
| 6 | Avondale | Arizona | 29.1 | 38712.0 | 41971.0 | 80683 | 4815.0 | 8355.0 | 3.18 | AZ | Black or African-American | 11592 |
| 7 | West Covina | California | 39.8 | 51629.0 | 56860.0 | 108489 | 3800.0 | 37038.0 | 3.56 | CA | Asian | 32716 |
| 8 | O'Fallon | Missouri | 36.0 | 41762.0 | 43270.0 | 85032 | 5783.0 | 3269.0 | 2.77 | MO | Hispanic or Latino | 2583 |
| 9 | High Point | North Carolina | 35.5 | 51751.0 | 58077.0 | 109828 | 5204.0 | 16315.0 | 2.65 | NC | Asian | 11060 |


In [4]:
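# Reading I94 Immigration Data
# Set up the environment needed to read the sas_data folder
# (these paths are specific to the author's machine; adjust them for your own installation)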
os.environ["JAVA_HOME"] = "/Library/Internet Plug-Ins/JavaAppletPlugin.plugin/Contents/Home"
os.environ["PATH"] = "/Users/sudhanagrath/anaconda3/bin:/Users/sudhanagrath/anaconda3/condabin:/Users/sudhanagrath/spark-3.1.2-bin-hadoop2.7/bin:/Users/sudhanagrath/scala-2.13.6/bin:/Library/Frameworks/Python.framework/Versions/3.8/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/Applications/Postgres.app/Contents/Versions/latest/bin"
os.environ["SPARK_HOME"] = "/Users/sudhanagrath/spark-3.1.2-bin-hadoop2.7"

In [5]:
spark = SparkSession \
        .builder \
        .appName("CapstoneProject")\
        .config("spark.sql.legacy.createHiveTableByDefault","false")\
        .config("spark.jars.packages","saurfang:spark-sas7bdat:3.0.0-s_2.12,org.apache.hadoop:hadoop-aws:2.7.4")\
        .config("spark.hadoop.fs.s3a.access.key", access_id)\
        .config("spark.hadoop.fs.s3a.secret.key",access_key)\
        .enableHiveSupport().getOrCreate()
sc=spark.sparkContext
hadoop_conf=sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.block.size","32000000")
hadoop_conf.set("fs.s3a.multipart.size","104857600")
hadoop_conf.set("fs.s3a.threads.core","4")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")






In [6]:
# Load the sas_data folder (parquet, Spark's default source format) into a DataFrame
df_spark =spark.read.load('sas_data')

In [9]:
#checking Schema
df_spark.printSchema()

root
 |-- cicid: double (nullable = true)
 |-- i94yr: double (nullable = true)
 |-- i94mon: double (nullable = true)
 |-- i94cit: double (nullable = true)
 |-- i94res: double (nullable = true)
 |-- i94port: string (nullable = true)
 |-- arrdate: double (nullable = true)
 |-- i94mode: double (nullable = true)
 |-- i94addr: string (nullable = true)
 |-- depdate: double (nullable = true)
 |-- i94bir: double (nullable = true)
 |-- i94visa: double (nullable = true)
 |-- count: double (nullable = true)
 |-- dtadfile: string (nullable = true)
 |-- visapost: string (nullable = true)
 |-- occup: string (nullable = true)
 |-- entdepa: string (nullable = true)
 |-- entdepd: string (nullable = true)
 |-- entdepu: string (nullable = true)
 |-- matflag: string (nullable = true)
 |-- biryear: double (nullable = true)
 |-- dtaddto: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- insnum: string (nullable = true)
 |-- airline: string (nullable = true)
 |-- admnum: double (nullable = 

### Step 2: Explore and Assess the Data
#### Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

#### Cleaning Steps
Document steps necessary to clean the data

In [7]:
# counting the total number of records in SAS Dataframe
df_spark.count()

3096313

In [8]:
# Count the distinct values in the first column (cicid) of the DataFrame
df_spark.select("cicid").distinct().count()

3096313

In [None]:
# Identify data quality issues
# Duplicate records
# The two counts above are equal, so each record in the DataFrame is identified by its cicid and there are no duplicate records.
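Selecting a single column keeps every row, so `df_spark.select("cicid").count()` always equals the total row count; duplicate keys only show up in a distinct count. The sketch below shows the difference with a plain Python list standing in for the cicid column (the values are made up). In PySpark the distinct count can also be taken with `countDistinct("cicid")`, which is imported at the top of the notebook.

```python
# Made-up cicid values standing in for the DataFrame column; 3.0 appears twice.
cicids = [1.0, 2.0, 3.0, 3.0]

total_rows = len(cicids)          # analogous to df_spark.count()
selected_rows = len(cicids)       # analogous to df_spark.select("cicid").count(): no rows dropped
distinct_keys = len(set(cicids))  # analogous to df_spark.select("cicid").distinct().count()


def is_unique_key(total_count, distinct_count):
    """A column identifies each record only if every row has a different value."""
    return total_count == distinct_count


print(total_rows, selected_rows, distinct_keys)  # 4 4 3
print(is_unique_key(total_rows, selected_rows))  # True, but misleading
print(is_unique_key(total_rows, distinct_keys))  # False: a duplicate exists
```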


In [9]:
# Identify data quality issues
# Count null or NaN values in every column
# (a new name is used so the demographics DataFrame df1 is not overwritten)
df_nulls = df_spark.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_spark.columns])
df_nulls.show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|  occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender| insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+
|    0|    0|     0|     0|     0|      0|      0|    239| 152592| 142457|   802|      0|    0|       1| 1881250|3088187|    238| 138429|3095921| 138429|    802|    477|414269|2982605|  83627|     0|19549|       0|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-------+-------+-------+-------+-------+-------+-------+------+-------+-------+------+-----+--------+

In [None]:
# The statistics show that visapost, occup, entdepu and insnum have more than 50% null or missing values.
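The 50% observation above can be checked directly from the null-count output. This sketch copies a subset of those counts and the total of 3,096,313 rows into plain Python and flags the columns whose missing share exceeds a threshold; the helper name `mostly_missing` is hypothetical.

```python
# Null/NaN counts copied from the null-count output above (a subset of the columns).
TOTAL_ROWS = 3096313
null_counts = {
    "i94addr": 152592,
    "depdate": 142457,
    "visapost": 1881250,
    "occup": 3088187,
    "entdepu": 3095921,
    "gender": 414269,
    "insnum": 2982605,
    "airline": 83627,
}


def mostly_missing(counts, total, threshold=0.5):
    """Return the columns whose share of null/NaN values exceeds threshold, in input order."""
    return [name for name, n in counts.items() if n / total > threshold]


for name, n in null_counts.items():
    print(f"{name:<10} {n / TOTAL_ROWS:7.1%}")

print(mostly_missing(null_counts, TOTAL_ROWS))  # ['visapost', 'occup', 'entdepu', 'insnum']
```

With these counts, visapost is about 61% missing, insnum about 96%, and occup and entdepu over 99%; gender, at about 13%, stays below the threshold.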



In [10]:
# Identifying columns for dimensions
df_spark.createOrReplaceTempView("sas_data")


In [11]:
#entry_status dimension
i94_dim1=df_spark.select(countDistinct("entdepa","entdepd"))
i94_dim1.show()

+--------------------------------+
|count(DISTINCT entdepa, entdepd)|
+--------------------------------+
|                              84|
+--------------------------------+



In [12]:
#visa_type dimension
i94_dim2=df_spark.select(countDistinct("visatype"))
i94_dims2=spark.sql("select distinct visatype from sas_data")
i94_dim2.show()

+------------------------+
|count(DISTINCT visatype)|
+------------------------+
|                      17|
+------------------------+



In [13]:
#visa_code dimension
i94_dim3=df_spark.select(countDistinct("i94visa"))
i94_dims3=spark.sql("select distinct i94visa from sas_data")
i94_dim3.show()

+-----------------------+
|count(DISTINCT i94visa)|
+-----------------------+
|                      3|
+-----------------------+



In [15]:
#i94_visitor
# This query shows that the number of distinct admnum values is 20,734 fewer than the total number of records in the sas_data table (3,096,313 - 3,075,579)
i94_dim4=df_spark.select(countDistinct("admnum"))

i94_dim4.show()

+----------------------+
|count(DISTINCT admnum)|
+----------------------+
|               3075579|
+----------------------+



In [16]:
# This query shows that the admnum column has no nulls or NaNs in the sas_data table
df_spark.filter(col("admnum").isNull() | isnan(col("admnum"))).show()

+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
|cicid|i94yr|i94mon|i94cit|i94res|i94port|arrdate|i94mode|i94addr|depdate|i94bir|i94visa|count|dtadfile|visapost|occup|entdepa|entdepd|entdepu|matflag|biryear|dtaddto|gender|insnum|airline|admnum|fltno|visatype|
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+
+-----+-----+------+------+------+-------+-------+-------+-------+-------+------+-------+-----+--------+--------+-----+-------+-------+-------+-------+-------+-------+------+------+-------+------+-----+--------+



In [17]:
df2=pd.read_csv("country_lookup.csv", sep='\t')
df2.head(100)

|   | CODE | COUNTRY_CITY |
|---|---|---|
| 0 | 582 | MEXICO Air Sea, and Not Reported (I-94, no lan... |
| 1 | 236 | AFGHANISTAN |
| 2 | 101 | ALBANIA |
| 3 | 316 | ALGERIA |
| 4 | 102 | ANDORRA |
| ... | ... | ... |
| 95 | 759 | INDIAN OCEAN AREAS (FRENCH) |
| 96 | 729 | INDIAN OCEAN TERRITORY |
| 97 | 204 | INDONESIA |
| 98 | 249 | IRAN |


In [18]:
df3=pd.read_csv("port-city-state.csv", sep=';')

In [19]:
df3.head(100)

|   | CODE | CITY_STATE |
|---|---|---|
| 0 | ALC | ALCAN, AK |
| 1 | ANC | ANCHORAGE, AK |
| 2 | BAR | BAKER AAF - BAKER ISLAND, AK |
| 3 | DAC | DALTONS CACHE, AK |
| 4 | PIZ | DEW STATION PT LAY DEW, AK |
| ... | ... | ... |
| 95 | APF | NAPLES, FL #ARPT |
| 96 | OPF | OPA LOCKA, FL |
| 97 | ORL | ORLANDO, FL |
| 98 | PAN | PANAMA CITY, FL |


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

#### 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

### Data Model
This is a Star Schema data model for analysing the I94 Immigration Dataset.

The I94 immigration dataset is the fact table. It has nine associated dimension tables.


Please see the document CapstoneDataModel.docx

### Mapping Pipeline to Data Model
The data provided in this project comes in two formats, parquet and CSV. Using a Hive-enabled Spark session and the SAS module "saurfang:spark-sas7bdat:3.0.0-s_2.12", the pipeline is built in the following steps:
- Ingest the sas_data dataset into the AWS S3 bucket bucket-capstone.
- Create a Hive-enabled Spark session.
- Create an internal Hive table for each of the eleven tables in the data model.
- Load data from bucket-capstone into these tables, which exist in the Hive metastore (an embedded Derby database). This data is available only during one session.
- Create a Hive external table for each of the internal Hive tables. These tables are located in the AWS S3 bucket bucket-hive, which requires creating eleven folders in bucket-hive, one per external table.
- Load data into the external tables.

Note: the Hive metastore stores the metadata for the tables in the data model, but the table data is located in the AWS S3 bucket bucket-hive. The folders in bucket-hive must be created before the external tables are created and loaded.
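The external-table steps above can be sketched as string builders for the Hive DDL and the load statement. This is an illustration under assumptions, not the project's actual sql_queries.py: the internal table name `i94_visa`, the column types and the parquet storage format are hypothetical, while `i94_visa_export` and its two columns appear in the queries later in this notebook. The `s3a://` scheme matches the `fs.s3a` settings used when the Spark session was created.

```python
def external_table_ddl(table, columns, bucket="bucket-hive", file_format="PARQUET"):
    """Build a Hive CREATE EXTERNAL TABLE statement whose data lives in s3a://<bucket>/<table>/."""
    column_list = ",\n    ".join(f"{name} {dtype}" for name, dtype in columns)
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} (\n    {column_list}\n)\n"
        f"STORED AS {file_format}\n"
        f"LOCATION 's3a://{bucket}/{table}/'"
    )


def load_external_sql(external_table, internal_table):
    """Copy rows from an internal (metastore-managed) table into its external S3 counterpart."""
    return f"INSERT OVERWRITE TABLE {external_table} SELECT * FROM {internal_table}"


# Hypothetical visa dimension; the internal name i94_visa and the types are assumptions.
ddl = external_table_ddl("i94_visa_export", [("visa_code", "DOUBLE"), ("visa_description", "STRING")])
load = load_external_sql("i94_visa_export", "i94_visa")
print(ddl)
print(load)
# In the pipeline, each string would be passed to spark.sql() in the Hive-enabled session.
```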



### Pipeline Datasets to Data Model

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

There are three scripts that implement the pipeline:
#### sql_queries.py
   This script is a module imported by the other two scripts, create_tables.py and etl.py.
   
   It defines:
   
   - a list of queries for dropping the existing tables
   - a list of queries for creating the tables in the data model
   - a list of queries for loading the data into the tables in the data model
   - a list of CSV schemas for loading the CSV data.
   
#### create_tables.py
   It creates a Spark session and runs the create-table queries in that session. For details on the types of tables, please see readme.md.
   
#### etl.py
   It creates a Spark session and runs the insert queries in that session.
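A minimal sketch of how the three scripts could fit together, assuming sql_queries.py exposes plain lists of SQL strings. The table names, columns and the `insert_table_queries` SELECT are hypothetical placeholders, and `run_queries` is a hypothetical helper; in create_tables.py and etl.py its `execute` argument would be `spark.sql`, while here a list's `append` stands in so the sketch runs without Spark.

```python
# sql_queries.py (sketch): one list per kind of statement, one entry per table.
# The table names, columns and SELECT below are hypothetical placeholders.
TABLES = ["i94_visa", "i94_time"]

drop_table_queries = [f"DROP TABLE IF EXISTS {table}" for table in TABLES]

create_table_queries = [
    "CREATE TABLE IF NOT EXISTS i94_visa (visa_code DOUBLE, visa_description STRING)",
    "CREATE TABLE IF NOT EXISTS i94_time (sas_date DOUBLE, week INT)",
]

insert_table_queries = [
    "INSERT OVERWRITE TABLE i94_time "
    "SELECT DISTINCT arrdate, weekofyear(date_add('1960-01-01', CAST(arrdate AS INT))) FROM sas_data",
]


def run_queries(execute, queries):
    """Run each statement in order; in create_tables.py and etl.py, execute would be spark.sql."""
    for query in queries:
        execute(query)


# Stand-in for spark.sql that only records the statements.
executed = []
run_queries(executed.append, drop_table_queries + create_table_queries)  # create_tables.py
run_queries(executed.append, insert_table_queries)                       # etl.py
print(len(executed))  # 5
```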
   

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

# Perform quality checks here
- quality_checks.py must be run after etl.py has completed. It counts the number of records loaded into the Hive external tables.
- unit_tests.py tests that every dimension and fact table has a unique key. This test is needed because Spark SQL does not support declaring key constraints when creating tables.
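Both checks reduce to small SQL aggregates: a row count for completeness and a comparison of `COUNT(*)` with `COUNT(DISTINCT key)` for uniqueness. The sketch below runs those queries against an in-memory SQLite database as a stand-in for the Hive tables (an assumption made so the example is self-contained); the rows use the visa codes and descriptions shown in the query2 output later in this notebook. Table and key names are interpolated directly into the SQL, which is acceptable only for trusted, hard-coded names.

```python
import sqlite3


def row_count(conn, table):
    """Completeness check: number of rows loaded into a table."""
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


def has_unique_key(conn, table, key):
    """Unique-key check: COUNT(DISTINCT) skips NULLs, so duplicates and NULL keys both fail."""
    total, distinct = conn.execute(
        f"SELECT COUNT(*), COUNT(DISTINCT {key}) FROM {table}"
    ).fetchone()
    return total == distinct


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE i94_visa (visa_code REAL, visa_description TEXT)")
conn.executemany(
    "INSERT INTO i94_visa VALUES (?, ?)",
    [(1.0, "Business"), (2.0, "Pleasure"), (3.0, "Student")],
)
print(row_count(conn, "i94_visa"), has_unique_key(conn, "i94_visa", "visa_code"))  # 3 True

conn.execute("INSERT INTO i94_visa VALUES (3.0, 'Student')")  # introduce a duplicate key
print(has_unique_key(conn, "i94_visa", "visa_code"))  # False
```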


#### 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

In [None]:
Please see the file data_dictionary.txt

#### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.

#### Step 5.1: The write up includes an outline of the steps taken in the project.
##### Steps taken for this project with the two given datasets (SAS data and US demographics data) and the SAS module:
 * Understood the data semantics with the help of relevant information available on the internet. I also spent considerable time understanding the data types produced by the SAS module.
 * Tried pandas first with the SAS dataset, but could not read it into a DataFrame, so I used a Spark DataFrame instead. Setting up the Spark session to load the dataset required changes to the default configuration, such as defining JAVA_HOME and SPARK_HOME in the environment.
 * Explored the two datasets as described in Step 2 of this document. I explored them thoroughly in my local Spark session before settling on the data model.
 * Documented the data model in the document submitted with this project.
 * Given the Hive support in the Spark session for the SAS module, I read about building data pipelines with this feature.
 * Drawing on my extensive experience with data and databases, and extensive reading on Hive, I mapped the data model to the data pipeline as described in Step 3 of this document.
 * Implemented the data pipeline as I learnt in this data engineering course.
 
 
          
##### The purpose of the final data model is made explicit.
The purpose of the data model is to provide insight into immigration trends, mainly from the SAS dataset. This insight can be extended using the US demographics dataset.
This is a basic data model that can be used by anyone interested in immigration trends, such as US visitors and travel and tourism businesses.

##### Types of queries that can be run on the implemented data pipeline:

*  Query1: List of the top 5 countries visiting the United States
*  Query2: International arrivals to the United States
*  Query3: Count of visitors using different travel modes
*  Query4: List of US cities and states missing ports of entry
*  Query5: Count of visitors with a specific visa type choosing their address in the United States

#### Step 5.2: Propose how often the data should be updated and why
The scripts for the data pipeline created in this project refresh the data each time they run, but they can be modified to update the data monthly. Because we are working with immigration data that is recorded by month (the i94mon field), I think a monthly schedule is appropriate: by then we would have enough data to run the analytical queries.

#### Step 5.3: Write a description of how you would approach the problem differently under the following scenarios:
##### The data was increased by 100x.
In this project we worked with a dataset of more than 3 million rows in a local Spark session. If the data increased a hundredfold, the machine running the Spark session could not handle it, and the pipeline would need to run on a Spark cluster with multiple worker nodes, each with much larger computing capacity. As in the Spark project from this course, we would use AWS EMR to create the Spark cluster.
###### Is partitioning required? If so, how? How can storage be handled?
The Spark framework can determine partitioning automatically, and it also lets users define data partitioning programmatically. Storage can remain in AWS S3, as in this project, since S3 scales with the data volume.

Automatic partitioning depends on:
* Resources available to the Spark job: the number of cores on which tasks can run.
* Data sources: the file format and size of the data being processed.
* RDD partitioning: the RDD is the low-level data structure Spark uses to distribute data between tasks during processing. Partitions are spread across the nodes of the Spark cluster, but a single partition never spans multiple nodes.

Users can define partitioning with the PySpark method partitionBy(), which groups records by the partition column(s) and writes each partition's data into its own sub-directory when a DataFrame is written to a storage system.
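To make the sub-directory layout concrete, the sketch below reproduces in plain Python the Hive-style `column=value` directories that `df.write.partitionBy("i94yr", "i94mon").parquet(path)` would produce. Partitioning by year and month is a hypothetical choice that would suit a monthly update; the rows and the S3 path are made up. Spark encodes partition columns in the directory names rather than in the data files, which is why they are dropped from the grouped rows.

```python
from collections import defaultdict


def partition_layout(rows, partition_cols, base_path):
    """Group rows into Hive-style sub-directories (col=value/...), like partitionBy() writes.

    Partition columns are encoded in the directory names, so they are removed from the
    rows that would go into the data files.
    """
    layout = defaultdict(list)
    for row in rows:
        subdir = "/".join(f"{name}={row[name]}" for name in partition_cols)
        data = {k: v for k, v in row.items() if k not in partition_cols}
        layout[f"{base_path}/{subdir}"].append(data)
    return dict(layout)


# Hypothetical I94 rows and S3 path.
rows = [
    {"cicid": 1, "i94yr": 2016, "i94mon": 4},
    {"cicid": 2, "i94yr": 2016, "i94mon": 4},
    {"cicid": 3, "i94yr": 2016, "i94mon": 5},
]
for path, records in partition_layout(rows, ["i94yr", "i94mon"], "s3a://bucket-hive/i94_table").items():
    print(path, records)
```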

##### The data populates a dashboard that must be updated on a daily basis by 7am every day.
When the data must be updated daily, the pipeline also needs to be monitored and troubleshot daily. The Airflow UI makes it easy to monitor and troubleshoot a pipeline: we would deploy this pipeline as a DAG in Airflow, scheduled to start early enough to finish before 7am, and the DAG page would show the status of each update at the task level.
###### What configuration does the identified tool require?
Airflow is a workflow scheduler for authoring, scheduling and monitoring complex workflows, and it provides an easy way to maintain them. It requires configuration to define and schedule the DAG.
Basic configuration steps are:
* Identify the tasks to run, such as executing a Python script, SQL script or shell script.
* Identify the operator to run each of those tasks, such as PythonOperator or BashOperator.
* Configure the scheduler to define when these tasks should run, in terms of time and dependencies.
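The time-and-dependency part of that configuration can be sketched without Airflow itself. In Airflow, dependencies are declared between operators with `>>` and the start time with a cron string (the DAG's `schedule_interval`, called `schedule` in newer releases); the plain-Python sketch below derives a valid run order from a dependency graph with `graphlib` (Python 3.9+) and works back from the 7am deadline. The task names beyond the project's scripts, the 2-hour runtime and the 30-minute margin are hypothetical.

```python
from datetime import datetime, timedelta
from graphlib import TopologicalSorter  # Python 3.9+

# Hypothetical task graph: each task maps to the set of tasks that must finish first.
dependencies = {
    "create_tables": set(),
    "etl": {"create_tables"},
    "quality_checks": {"etl"},
    "unit_tests": {"etl"},
    "refresh_dashboard": {"quality_checks", "unit_tests"},
}

# A valid execution order, as a scheduler would derive it from the dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)


def latest_start(deadline, estimated_runtime, margin=timedelta(minutes=30)):
    """Latest time the DAG can start and still finish before the deadline, with a safety margin."""
    return deadline - estimated_runtime - margin


# Assumed 2-hour runtime for the whole DAG, with the dashboard due at 7am.
start = latest_start(datetime(2024, 1, 1, 7, 0), timedelta(hours=2))
print(start.strftime("%H:%M"))  # 04:30
```

Under these assumptions the DAG would be scheduled with a cron string such as "30 4 * * *".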

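###### How will this impact the business?
By running the code as a workflow, the business benefits from easier maintenance, versioning, testing and collaboration on its code.

###### What is the operating cost?
Airflow is open-source software under the Apache License 2.0, so the tool itself is free to use. The operating cost comes from the infrastructure it runs on: the machines for its components, or a managed service such as Amazon Managed Workflows for Apache Airflow (MWAA).

###### Does the tool require a separate instance?
Airflow needs several components running before it can be used: a scheduler, a webserver and a metadata database. For small workloads they can share one machine; production setups usually run them on separate instances. I could not set it up on my local machine while working on the Airflow project because of time constraints.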

##### The database needed to be accessed by 100+ people.
We used a Hive-enabled local Spark session in this project. In this embedded mode, the Hive metastore's Derby database can be accessed by only one session at a time. When the number of users increases, the metastore can be configured in network mode on a separate machine, connecting through JDBC to a database such as MySQL, Postgres or Oracle, or it can run in remote mode as a shared metastore service, including managed catalogs in the cloud.
###### Does replication make sense?
Hive supports replication, as described at https://cwiki.apache.org/confluence/display/Hive/Replication.
However, I cannot judge its value yet, since I have worked only with the embedded database (metastore_db).

###### What type of replication?
As described at the link above, the Hive replication system has a low degree of coupling between the source cluster and its replica, with the Thrift server as the integration point between them.

###### How can your tool accommodate a massive influx? What kind of access and what type of configuration?
The Hive metastore can accommodate a massive influx of users when it runs in network or remote mode instead of embedded mode. In network mode, the metastore database is set up on a separate machine running an RDBMS such as Postgres, MySQL or Oracle, which Spark reaches through a JDBC driver. In remote mode, the metastore runs as a standalone Thrift service shared by many clients; on AWS, the AWS Glue Data Catalog can also serve as a Hive-compatible metastore.
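The difference between the modes shows up in the Spark session settings. The sketch below returns the `SparkSession.builder.config()` entries for each mode as a dictionary, so it runs without Spark. The host name, ports and database name are hypothetical. In network (local metastore) mode, a real setup would also need `javax.jdo.option.ConnectionUserName` and `ConnectionPassword` and the JDBC driver jar on the classpath; 9083 is the conventional metastore Thrift port.

```python
def metastore_config(mode, host="metastore-host"):
    """Return SparkSession.builder .config() entries for a Hive metastore deployment mode.

    The host name, port numbers and database name are hypothetical placeholders.
    """
    conf = {"spark.sql.catalogImplementation": "hive"}
    if mode == "embedded":
        pass  # default: a local Derby metastore_db, usable by one session at a time
    elif mode == "network":
        # Metastore client runs inside Spark; metadata is kept in an external RDBMS over JDBC.
        conf["spark.hadoop.javax.jdo.option.ConnectionURL"] = f"jdbc:postgresql://{host}:5432/metastore"
        conf["spark.hadoop.javax.jdo.option.ConnectionDriverName"] = "org.postgresql.Driver"
    elif mode == "remote":
        # Spark connects to a shared, standalone metastore service over Thrift.
        conf["spark.hadoop.hive.metastore.uris"] = f"thrift://{host}:9083"
    else:
        raise ValueError(f"unknown metastore mode: {mode}")
    return conf


for key, value in metastore_config("remote").items():
    print(key, "=", value)
# With PySpark, each entry would be applied via SparkSession.builder.config(key, value)
# before .enableHiveSupport().getOrCreate().
```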

###### Again the business benefits and impacts on cost
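Apache Hive is open-source software under the Apache License 2.0, so there is no licence fee and it is easy to adopt. The costs come from the infrastructure that runs it, such as the metastore database server and the S3 storage.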

#### Step 5.4: Clearly state the rationale for the choice of tools and technologies for the project.
This project template was provided with the SAS module "saurfang:spark-sas7bdat:3.0.0-s_2.12" and two datasets, one in parquet format and the other in CSV format. After reviewing my past projects in this course and reading articles on the subject, I decided to implement the pipeline using a local Spark session with Hive support. Hive can be configured to connect to a local or a remote database; in this project, the Spark session uses the embedded Derby database. This Derby database acts as the metastore for the data model tables, whose data is located in AWS S3.
###### why the chosen schema
In this design, the metadata is stored in the Hive metastore and the data is located on distributed storage (AWS S3), allowing users to run queries on large datasets.

###### is there a trade-off
In traditional databases, the schema is enforced at load time: if the data doesn't conform to the schema, it is rejected. This is called schema on write. Hive instead verifies the data when a query is issued, which is known as schema on read. Hive shines over traditional databases here because loading data is much faster. The trade-off is that malformed values are not caught at load time; they only surface at query time, typically as NULLs.
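The trade-off can be illustrated in a few lines of plain Python. The sketch contrasts validating rows at load time (schema on write) with storing them as-is and applying types only when read (schema on read); the two-column schema and the rows are hypothetical, and turning unparseable values into `None` mimics how Hive typically returns NULL for them.

```python
# Hypothetical two-column schema and raw rows; the second row has a malformed cicid.
SCHEMA = {"cicid": float, "i94port": str}
raw_rows = [{"cicid": "1.0", "i94port": "NYC"}, {"cicid": "abc", "i94port": "LOS"}]


def load_schema_on_write(rows, schema):
    """Traditional RDBMS style: validate at load time and reject rows that do not conform."""
    accepted, rejected = [], []
    for row in rows:
        try:
            accepted.append({name: typ(row[name]) for name, typ in schema.items()})
        except (KeyError, ValueError):
            rejected.append(row)
    return accepted, rejected


def read_schema_on_read(rows, schema):
    """Hive style: store rows as-is and apply types only when queried; bad values become None."""

    def cast(typ, value):
        if value is None:
            return None
        try:
            return typ(value)
        except ValueError:
            return None

    return [{name: cast(typ, row.get(name)) for name, typ in schema.items()} for row in rows]


accepted, rejected = load_schema_on_write(raw_rows, SCHEMA)
print(len(accepted), len(rejected))           # 1 1
print(read_schema_on_read(raw_rows, SCHEMA))  # [{'cicid': 1.0, 'i94port': 'NYC'}, {'cicid': None, 'i94port': 'LOS'}]
```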
###### furnish results of the sample queries (Questions) mentioned in scope

In [20]:
# Top 5 countries of residence of visitors to the US
query1="""select country_city, count(*) from country_lookup_export join i94_table_export on i94_table_export.i94res=country_lookup_export.code group by country_city order by 2 desc;"""
spark.sql(query1).show(5)

+--------------------+--------+
|        country_city|count(1)|
+--------------------+--------+
|     UNITED KINGDOM |  368421|
|              JAPAN |  249167|
|         CHINA, PRC |  185609|
|             FRANCE |  185339|
|MEXICO Air Sea, a...|  179603|
+--------------------+--------+
only showing top 5 rows



In [21]:
# Visitors count for different visa types
query2="""select visa_code, visa_description, count(admnum) as VisitorCount
from i94_table_export
left outer join i94_visa_export on i94_visa_export.visa_code=i94_table_export.i94visa
group by visa_code, visa_description
order by visa_code"""
spark.sql(query2).show()

+---------+----------------+------------+
|visa_code|visa_description|VisitorCount|
+---------+----------------+------------+
|      1.0|        Business|      522079|
|      2.0|        Pleasure|     2530868|
|      3.0|         Student|       43366|
+---------+----------------+------------+



In [22]:
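# Visitor counts by state of address (i94addr), joined to the cities in the demographics data
# Note: i94addr holds only a state code, so each count below is the state's visitor total
# multiplied by the number of demographics rows for that city, not a per-city visitor count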
query3="""select state_code, state, city, count(admnum) from i94_table_export
       left outer join us_cities_export on us_cities_export.state_code=i94_table_export.i94addr
where state_code is not null and state is not null
group by state_code, state, city
order by 2,3"""
spark.sql(query3).show()

+----------+-------+--------------+-------------+
|state_code|  state|          city|count(admnum)|
+----------+-------+--------------+-------------+
|        AL|Alabama|    Birmingham|        40940|
|        AL|Alabama|        Dothan|        40940|
|        AL|Alabama|        Hoover|        32752|
|        AL|Alabama|    Huntsville|        40940|
|        AL|Alabama|        Mobile|        40940|
|        AL|Alabama|    Montgomery|        40940|
|        AL|Alabama|    Tuscaloosa|        40940|
|        AK| Alaska|     Anchorage|         8020|
|        AZ|Arizona|      Avondale|       101090|
|        AZ|Arizona|  Casas Adobes|       101090|
|        AZ|Arizona|      Chandler|       101090|
|        AZ|Arizona|     Flagstaff|       101090|
|        AZ|Arizona|       Gilbert|       101090|
|        AZ|Arizona|      Glendale|       101090|
|        AZ|Arizona|      Goodyear|       101090|
|        AZ|Arizona|          Mesa|       101090|
|        AZ|Arizona|        Peoria|       101090|
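The repeated counts above come from the join: i94 records carry only a state code (i94addr), while the demographics table has one row per city and race, so joining on the state code repeats each visitor for every matching demographics row. In the output, every Alabama city except Hoover shows 40940 and Hoover shows 32752, which are 5 × 8188 and 4 × 8188. The sketch below reproduces the effect with an in-memory SQLite database standing in for the Hive tables (all rows are hypothetical) and shows one way to get a single count per state by joining to the distinct states first.

```python
import sqlite3

# In-memory SQLite stands in for the Hive tables; all rows are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE i94 (admnum REAL, i94addr TEXT)")
conn.execute("CREATE TABLE us_cities (state_code TEXT, state TEXT, city TEXT, race TEXT)")
# Three visitors whose address state is AL.
conn.executemany("INSERT INTO i94 VALUES (?, ?)", [(1.0, "AL"), (2.0, "AL"), (3.0, "AL")])
# Demographics: one row per city and race, so Birmingham appears twice.
conn.executemany(
    "INSERT INTO us_cities VALUES (?, ?, ?, ?)",
    [
        ("AL", "Alabama", "Birmingham", "Asian"),
        ("AL", "Alabama", "Birmingham", "White"),
        ("AL", "Alabama", "Hoover", "Asian"),
    ],
)

# Joining on the state code alone repeats each visitor for every matching demographics row.
fan_out = conn.execute(
    """SELECT c.city, COUNT(i.admnum) FROM i94 i
       JOIN us_cities c ON c.state_code = i.i94addr
       GROUP BY c.city ORDER BY c.city"""
).fetchall()
print(fan_out)  # [('Birmingham', 6), ('Hoover', 3)]

# Reducing the demographics table to distinct states first gives one count per state.
per_state = conn.execute(
    """SELECT s.state_code, s.state, COUNT(i.admnum) FROM i94 i
       JOIN (SELECT DISTINCT state_code, state FROM us_cities) s ON s.state_code = i.i94addr
       GROUP BY s.state_code, s.state"""
).fetchall()
print(per_state)  # [('AL', 'Alabama', 3)]
```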


In [23]:
# Number of visitors at ports that the port lookup marks as "No PORT Code"
spark.sql("""select code, city_state, i94port, count(admnum)
from i94_table_export
left outer join port_city_state_export on trim(port_city_state_export.code)=trim(i94_table_export.i94port)
where trim(city_state) like 'No PORT Code%'
group by code, city_state, i94port
order by 1""").show()

+----+-------------------+-------+-------------+
|code|         city_state|i94port|count(admnum)|
+----+-------------------+-------+-------------+
|5T6 | No PORT Code (5T6)|    5T6|            4|
|ATW | No PORT Code (ATW)|    ATW|            3|
|CPX | No PORT Code (CPX)|    CPX|            1|
|JFA | No PORT Code (JFA)|    JFA|            3|
|JMZ | No PORT Code (JMZ)|    JMZ|            7|
|MTH | No PORT Code (MTH)|    MTH|            2|
|NC8 | No PORT Code (NC8)|    NC8|            1|
|NYL | No PORT Code (NYL)|    NYL|            5|
|PHF | No PORT Code (PHF)|    PHF|            1|
|RYY | No PORT Code (RYY)|    RYY|            2|
|SCH | No PORT Code (SCH)|    SCH|            1|
|W55 | No PORT Code (W55)|    W55|           33|
|X44 | No PORT Code (X44)|    X44|           11|
|X96 | No PORT Code (X96)|    X96|         2378|
|YGF | No PORT Code (YGF)|    YGF|         1763|
+----+-------------------+-------+-------------+



In [24]:
# Weekly international arrivals to the US
query5="""select week,  count(admnum)  from i94_table_export
left outer join i94_time_export on i94_time_export.sas_date=i94_table_export.arrdate
group by 1
order by 1;"""
spark.sql(query5).show()

+----+-------------+
|week|count(admnum)|
+----+-------------+
|  13|       623150|
|  14|      1382374|
|  15|      1438628|
|  16|      1421666|
|  17|      1326808|
+----+-------------+
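Query 5 joins arrdate to the sas_date column of i94_time_export, but how the week column was derived is not shown here. Assuming arrdate holds a SAS date value (days since 1 January 1960) and week is an ISO-8601 week number, the conversion can be sketched in plain Python as below; the sample values 20545 and 20574 are illustrative, not read from the data. In Spark SQL, a comparable expression would be `weekofyear(date_add('1960-01-01', cast(arrdate as int)))`, since Spark's `weekofyear` also starts weeks on Monday.

```python
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # SAS date values count days from 1 January 1960


def sas_to_date(sas_days):
    """Convert a SAS numeric date (stored as a double, like arrdate) to a datetime.date."""
    return SAS_EPOCH + timedelta(days=int(sas_days))


def iso_week(sas_days):
    """ISO-8601 week number (weeks start on Monday) of a SAS date."""
    return sas_to_date(sas_days).isocalendar()[1]


for sas_days in (20545.0, 20574.0):
    print(sas_days, sas_to_date(sas_days), iso_week(sas_days))
# 20545.0 2016-04-01 13
# 20574.0 2016-04-30 17
```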

