# Project Title
### Data Engineering Capstone Project

#### Project Summary
My project simulates a fictional tour company. Its main goal is to deliver a report as a single CSV file called "result" that holds all the relevant data and information and is distributed across the whole company. Marketing, finance and other areas can use it to answer their questions and to improve how the company works.


The project follows these steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up

### Step 1: Scope the Project and Gather Data

#### Scope 
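Explain what you plan to do in the project in more detail. What data do you use? What does your end solution look like? What tools did you use?
For this scope, we use the I94 immigration data and the global land temperature data, supported by airport codes and U.S. city demographics. The end solution should be clean data that is easy to send to our tour company. The tools are pandas for the first exploration and PySpark (including Spark SQL) for cleaning and modeling.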

#### Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 

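All datasets come from reliable sources, mostly government agencies:
* I94 immigration data for April 2016 (i94_apr16_sub.sas7bdat, SAS format)
* Global land temperatures by city (GlobalLandTemperaturesByCity.csv)
* Airport codes (airport-codes_csv.csv)
* U.S. city demographics (us-cities-demographics.csv, semicolon-separated)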

In [1]:
# Do all imports and installs here
import re
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

In [2]:
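# The file is very large, so read_sas is called with chunksize: it returns a reader
# instead of a DataFrame, and only the first chunk is loaded for exploration
immigration = '../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat'
reader = pd.read_sas(immigration, format='sas7bdat', chunksize=10000, encoding="ISO-8859-1")
df_im = reader.read(10000)  # first 10,000 rows; reader.read() with no argument would load the whole file
reader.close()
df_im.head()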

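| | cicid | i94yr | i94mon | i94cit | i94res | i94port | arrdate | i94mode | i94addr | depdate | ... | entdepu | matflag | biryear | dtaddto | gender | insnum | airline | admnum | fltno | visatype |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6.0 | 2016.0 | 4.0 | 692.0 | 692.0 | XXX | 20573.0 | | | | ... | U | | 1979.0 | 10282016 | | | | 1897628000.0 | | B2 |
| 1 | 7.0 | 2016.0 | 4.0 | 254.0 | 276.0 | ATL | 20551.0 | 1.0 | AL | | ... | Y | | 1991.0 | D/S | M | | | 3736796000.0 | 296.0 | F1 |
| 2 | 15.0 | 2016.0 | 4.0 | 101.0 | 101.0 | WAS | 20545.0 | 1.0 | MI | 20691.0 | ... | | M | 1961.0 | 09302016 | M | | OS | 666643200.0 | 93.0 | B2 |
| 3 | 16.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 1988.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |
| 4 | 17.0 | 2016.0 | 4.0 | 101.0 | 101.0 | NYC | 20545.0 | 1.0 | MA | 20567.0 | ... | | M | 2012.0 | 09302016 | | | AA | 92468460000.0 | 199.0 | B2 |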
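When `chunksize` is set, `pd.read_sas` returns an iterator of DataFrames instead of a single DataFrame. A full pass over the file can therefore keep only a running aggregate in memory. The minimal sketch below shows that pattern. To run without the SAS file, it uses `pd.read_csv`, which has the same `chunksize` behavior, on a few made-up I94-like rows. The helper name `count_by_port` and the per-port count are illustrative assumptions, not part of the original notebook.

```python
import io

import pandas as pd

# Made-up I94-like rows, only to demonstrate chunked processing
csv_text = """cicid,i94port,i94mon
1,NYC,4
2,ATL,4
3,NYC,4
4,WAS,4
5,NYC,4
"""

def count_by_port(reader):
    """Count records per i94port, processing one chunk at a time."""
    totals = pd.Series(dtype="int64")
    for chunk in reader:
        # Add this chunk's counts to the running totals
        totals = totals.add(chunk["i94port"].value_counts(), fill_value=0)
    return totals.astype("int64").sort_index()

reader = pd.read_csv(io.StringIO(csv_text), chunksize=2)
port_counts = count_by_port(reader)
print(port_counts.to_dict())  # {'ATL': 1, 'NYC': 3, 'WAS': 1}
```

The same loop could consume the reader returned by `pd.read_sas(immigration, format='sas7bdat', chunksize=10000, encoding="ISO-8859-1")`, because that object is iterable as well.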


In [3]:
# import the temperature data
fname = '../../data2/GlobalLandTemperaturesByCity.csv'
df = pd.read_csv(fname, sep=',')

In [4]:
# first rows
df.head()

| | dt | AverageTemperature | AverageTemperatureUncertainty | City | Country | Latitude | Longitude |
|---|---|---|---|---|---|---|---|
| 0 | 1743-11-01 | 6.068 | 1.737 | Århus | Denmark | 57.05N | 10.33E |
| 1 | 1743-12-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 2 | 1744-01-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 3 | 1744-02-01 | | | Århus | Denmark | 57.05N | 10.33E |
| 4 | 1744-03-01 | | | Århus | Denmark | 57.05N | 10.33E |


In [5]:
# import the airport codes data
airport = pd.read_csv("airport-codes_csv.csv")
airport.head()

| | ident | type | name | elevation_ft | continent | iso_country | iso_region | municipality | gps_code | iata_code | local_code | coordinates |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 00A | heliport | Total Rf Heliport | 11.0 | | US | US-PA | Bensalem | 00A | | 00A | -74.93360137939453, 40.07080078125 |
| 1 | 00AA | small_airport | Aero B Ranch Airport | 3435.0 | | US | US-KS | Leoti | 00AA | | 00AA | -101.473911, 38.704022 |
| 2 | 00AK | small_airport | Lowell Field | 450.0 | | US | US-AK | Anchor Point | 00AK | | 00AK | -151.695999146, 59.94919968 |
| 3 | 00AL | small_airport | Epps Airpark | 820.0 | | US | US-AL | Harvest | 00AL | | 00AL | -86.77030181884766, 34.86479949951172 |
| 4 | 00AR | closed | Newport Hospital & Clinic Heliport | 237.0 | | US | US-AR | Newport | | | | -91.254898, 35.6087 |


In [6]:
# import the U.S. city demographics data into pandas
us_cities = pd.read_csv("us-cities-demographics.csv", sep=";")
us_cities.head()

| | City | State | Median Age | Male Population | Female Population | Total Population | Number of Veterans | Foreign-born | Average Household Size | State Code | Race | Count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Silver Spring | Maryland | 33.8 | 40601.0 | 41862.0 | 82463 | 1562.0 | 30908.0 | 2.6 | MD | Hispanic or Latino | 25924 |
| 1 | Quincy | Massachusetts | 41.0 | 44129.0 | 49500.0 | 93629 | 4147.0 | 32935.0 | 2.39 | MA | White | 58723 |
| 2 | Hoover | Alabama | 38.5 | 38040.0 | 46799.0 | 84839 | 4819.0 | 8229.0 | 2.58 | AL | Asian | 4759 |
| 3 | Rancho Cucamonga | California | 34.5 | 88127.0 | 87105.0 | 175232 | 5821.0 | 33878.0 | 3.18 | CA | Black or African-American | 24437 |
| 4 | Newark | New Jersey | 34.6 | 138040.0 | 143873.0 | 281913 | 5829.0 | 86253.0 | 2.73 | NJ | White | 76402 |


In [7]:
# list the column names of the sample file, skipping the first (index) column
imm_sample = pd.read_csv('Sem_Ttulo_13.csv')
imm_header = imm_sample.columns.tolist()[1:]
print(imm_header)

[]



#### Data dictionary
Based on the code above and the documentation on trade.gov, each column can be described as follows:

* 'cicid': unique record ID
* 'i94yr': 4-digit year
* 'i94mon': numeric month
* 'i94cit': 3-digit code of the country of citizenship
* 'i94res': 3-digit code of the country of residence
* 'i94port': port of entry code
* 'arrdate': arrival date (SAS date: days since 1960-01-01)
* 'i94mode': mode of transport code
* 'i94addr': U.S. state of arrival
* 'depdate': departure date (SAS date)
* 'i94bir': age of the respondent in years
* 'i94visa': visa category code
* 'count': field used for summary statistics
* 'dtadfile': date the record was added to the I-94 files (character field)
* 'visapost': Department of State office where the visa was issued
* 'occup': occupation
* 'entdepa': arrival flag
* 'entdepd': departure flag
* 'entdepu': update flag
* 'matflag': match flag (arrival and departure records matched)
* 'biryear': 4-digit year of birth
* 'dtaddto': date until which the traveler is admitted to the U.S. (character field, MMDDYYYY)
* 'gender': gender
* 'insnum': INS number
* 'airline': airline used to arrive
* 'admnum': admission number
* 'fltno': flight number
* 'visatype': class of visa
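The arrdate and depdate values are SAS numeric dates, which count days since 1960-01-01. For example, 20545 in the sample rows above is 1 April 2016. That matches i94mon = 4 in this April file. The sketch below converts them in two ways: a scalar helper, `sas_to_date` (a hypothetical name), and the vectorised `pd.to_datetime` call with `origin`, which suits a whole column such as `df_im['arrdate']`.

```python
from datetime import date, timedelta

import pandas as pd

# SAS stores dates as the number of days since 1960-01-01
SAS_EPOCH = date(1960, 1, 1)

def sas_to_date(days):
    """Convert a SAS numeric date (int or float) to datetime.date; None for missing values."""
    if pd.isna(days):
        return None
    return SAS_EPOCH + timedelta(days=int(days))

print(sas_to_date(20545.0))  # 2016-04-01
print(sas_to_date(20573.0))  # 2016-04-29

# Vectorised version for a pandas column; missing values become NaT
arrdate = pd.Series([20545.0, 20551.0, None])
print(pd.to_datetime(arrdate, unit="D", origin="1960-01-01"))
```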

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DoubleType

# Spark session with the spark-sas7bdat package, which provides the SAS file reader.
# getOrCreate() returns this same session if the builder is called again later.
spark = (SparkSession.builder
         .config("spark.jars.repositories", "https://repos.spark-packages.org/")
         .config("spark.jars.packages", "saurfang:spark-sas7bdat:2.0.0-s_2.11")
         .enableHiveSupport()
         .getOrCreate())

# Load the April 2016 I94 data into a Spark DataFrame
df_spark = spark.read.format('com.github.saurfang.sas.spark').load('../../data/18-83510-I94-Data-2016/i94_apr16_sub.sas7bdat')

### Step 2: Explore and Assess the Data
#### Cleaning Steps
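The cleaning steps remove invalid and null records from the temperature and immigration data. Immigration rows with an invalid i94port code are dropped. Temperature rows are removed when AverageTemperatureUncertainty is missing, when they repeat a City/Country pair, or when the city has no matching port code.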

In [9]:
# Build a dictionary of valid i94port codes from cods.txt.
# Using the port as the common key is a choice: with the file "marcus cod.txt",
# i94cit could be used instead by replacing "port" with "cit" in the code below.

re_obj = re.compile(r'\'(.*)\'.*\'(.*)\'')
i94port_valid = {}
with open('cods.txt') as f:
    for line in f:
        match = re_obj.search(line)
        if match:  # skip lines without a 'code' ... 'description' pair
            i94port_valid[match[1]] = [match[2]]

def clean_i94_data(file):
    '''
    Input: path to an I94 immigration SAS file.
    Output: Spark DataFrame containing only rows whose i94port is a valid code.
    '''
    df_immi = spark.read.format('com.github.saurfang.sas.spark').load(file)

    # Filter out entries where i94port is invalid
    df_immi = df_immi.filter(df_immi.i94port.isin(list(i94port_valid.keys())))

    return df_immi
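The regular expression in the cell above expects each line of cods.txt to contain a quoted code followed by a quoted description. The layout of cods.txt is not shown here. The sample lines below are therefore an assumption modelled on the SAS label format (`'CODE' = 'DESCRIPTION'`), and `parse_port_codes` is a hypothetical helper name. Lines without a match are skipped instead of raising a `TypeError` on `None`.

```python
import re

# Hypothetical sample lines, assuming cods.txt uses the 'CODE' = 'DESCRIPTION' layout
sample_lines = [
    "'ALC'\t=\t'ALCAN, AK             '",
    "'ANC'\t=\t'ANCHORAGE, AK         '",
    "/* comment line without codes */",
]

re_obj = re.compile(r"'(.*)'.*'(.*)'")

def parse_port_codes(lines):
    """Return {code: description} for every line that matches the pattern."""
    codes = {}
    for line in lines:
        match = re_obj.search(line)
        if match:  # lines without two quoted values are ignored
            codes[match[1]] = match[2].strip()  # drop the trailing padding
    return codes

ports = parse_port_codes(sample_lines)
print(ports)  # {'ALC': 'ALCAN, AK', 'ANC': 'ANCHORAGE, AK'}
```

Unlike the notebook, this sketch stores each description as a stripped string rather than a one-element list. Stripping removes the trailing spaces shown in the sample lines before any substring matching.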

In [10]:
df_temp = spark.read.format("csv").option("header", "true").load("../../data2/GlobalLandTemperaturesByCity.csv")

# Remove rows with a missing AverageTemperatureUncertainty
# (the Spark CSV reader loads empty fields as null, not as the string 'NaN')
df_temp = df_temp.filter(df_temp.AverageTemperatureUncertainty.isNotNull())

# Keep a single row per City/Country pair
df_temp = df_temp.dropDuplicates(['City', 'Country'])

@udf()
def get_i94port(city):
    '''
    Input: a city name from the temperature data.
    Output: the first i94port code whose description contains that city, or None.
    '''
    if city is None:
        return None
    for key in i94port_valid:
        if city.lower() in i94port_valid[key][0].lower():
            return key
    return None

# Add the matching port code to each city
df_temp = df_temp.withColumn("i94port", get_i94port(df_temp.City))

# Drop cities without a matching port (the UDF returns null for them)
df_temp = df_temp.filter(df_temp.i94port.isNotNull())
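The body of the `get_i94port` UDF is plain Python, so its matching logic can be checked without Spark. The sketch below copies that logic into a hypothetical `lookup_port` function. The function takes the port dictionary as a parameter, so running it does not overwrite `i94port_valid` in the notebook. The port entries are made-up examples in the same `{code: [description]}` shape.

```python
def lookup_port(city, ports):
    """Return the first port code whose description contains `city` (case-insensitive), else None."""
    if city is None:  # a null City would otherwise raise AttributeError on .lower()
        return None
    for key in ports:
        if city.lower() in ports[key][0].lower():
            return key
    return None

# Hypothetical entries in the same {code: [description]} shape as i94port_valid
sample_ports = {
    "ATL": ["ATLANTA, GA"],
    "NYC": ["NEW YORK, NY"],
}

print(lookup_port("Atlanta", sample_ports))   # ATL
print(lookup_port("New York", sample_ports))  # NYC
print(lookup_port("Århus", sample_ports))     # None
```

The match is a case-insensitive substring test, and the first hit wins. Short or ambiguous city names can therefore map to an unexpected port, so a sample of the resulting i94port values is worth checking by hand.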


### Step 3: Define the Data Model
#### 3.1 Conceptual Data Model
The first dimension table contains events from the I94 immigration data, such as year, port code, arrival date and departure date. The second dimension table contains temperature, city, latitude and longitude.
The fact table joins the I94 table and the world temperature table on the port code "i94port", which the cleaning step attaches to each temperature city.
Both datasets are cleaned beforehand so that only the required records and columns are kept, which keeps the code fast and simple.
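The cleaning step attaches i94port to each temperature city, so this sketch joins the two tables on that column. It uses pandas and tiny illustrative frames. The cicid and i94port values follow the sample output, and the temperatures are placeholders rather than real measurements. An inner join keeps only immigration records whose port has temperature data.

```python
import pandas as pd

# Illustrative rows: cicid/i94port follow the sample output, temperatures are placeholders
immigration = pd.DataFrame({
    "cicid": [6, 7, 15, 16],
    "i94yr": [2016, 2016, 2016, 2016],
    "i94port": ["XXX", "ATL", "WAS", "NYC"],
})
temperature = pd.DataFrame({
    "i94port": ["ATL", "NYC"],
    "City": ["Atlanta", "New York"],
    "AverageTemperature": [10.0, 20.0],
})

# Fact table: one row per immigration record whose port has temperature data
fact = immigration.merge(temperature, on="i94port", how="inner")
print(fact)
```

In Spark the equivalent would be `df_immi.join(df_temp, on="i94port", how="inner")`.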

#### 3.2 Mapping Out Data Pipelines
The pipeline has two parts:
* Temperature data model: select the required columns and write the result to a folder called results.
* Immigration data model: select the required columns and write the result to the same results folder.
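Writing a selected set of columns to the results folder can be sketched in pandas as below. The function name `write_result` and the placeholder row are hypothetical. The file name "result.csv" follows the project summary, and a temporary directory stands in for the real folder. For Spark DataFrames, `df.coalesce(1).write.mode("overwrite").csv(path, header=True)` writes a folder that contains a single part file, not a file with a chosen name.

```python
import os
import tempfile

import pandas as pd

def write_result(df, columns, folder, name="result.csv"):
    """Keep only `columns` and write them as a single CSV file inside `folder`."""
    os.makedirs(folder, exist_ok=True)
    path = os.path.join(folder, name)
    df[columns].to_csv(path, index=False)
    return path

# Placeholder row; in the notebook this would be the cleaned temperature data
temperature = pd.DataFrame({
    "dt": ["2013-08-01"],
    "AverageTemperature": [20.0],
    "City": ["Atlanta"],
    "Country": ["United States"],
    "i94port": ["ATL"],
})

# A temporary directory stands in for the project's results folder
out_dir = os.path.join(tempfile.mkdtemp(), "results")
path = write_result(temperature, ["City", "Country", "AverageTemperature", "i94port"], out_dir)
print(pd.read_csv(path))
```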

### Step 4: Run Pipelines to Model the Data 
#### 4.1 Create the data model
Build the data pipelines to create the data model.

In [11]:
# getOrCreate() returns the session created earlier if it is still active
spark = SparkSession.builder.\
config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11")\
.enableHiveSupport().getOrCreate()

In [12]:
i94_schema = StructType([
        StructField("cicid", LongType()),
        StructField("i94res", LongType()),
        StructField("i94addr", StringType()),
        StructField("visapost", StringType()),
        StructField("i94yr", LongType()),
        StructField("i94mon", LongType()),
        StructField("arrdate", LongType()),
        StructField("depdate", LongType()),
        StructField("dtadfile", StringType()),
        StructField("dtaddto", StringType()),
        StructField("i94port", StringType()),
        StructField("traveler_id", LongType()),
        StructField("birth_year", StringType()),
        StructField("age", LongType()),
        StructField("gender", StringType()),
        StructField("airline", StringType()),
        StructField("flight_no", StringType()),
        StructField("visa_type", StringType())
    ])

In [13]:
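# Parquet files carry their own schema, so the source column names (biryear, i94bir,
# fltno, visatype, ...) are kept; the parquet reader does not use schema/header options
df_spark = spark.read.parquet("sas_data")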

In [15]:
airports_dim_schema = StructType([
        StructField("airport_id", StringType()),
        StructField("iata_code", StringType()),
        StructField("airport_type", StringType()),
        StructField("airport_name", StringType()),
        StructField("municipality", StringType()),
        StructField("iso_region", StringType()),
        StructField("coordinates", StringType()),        
    ])

In [16]:
# The CSV header provides the column names used below (ident, type, name, ...)
airports_spark = spark.read.options(delimiter=',', header='True').csv('airport-codes_csv.csv')

In [17]:
airports_stg = airports_spark.select('ident','iata_code','type','name','municipality','iso_region','coordinates')

In [18]:
# createOrReplaceTempView returns None, so its result is not assigned
airports_stg.createOrReplaceTempView("airports")

In [19]:
airports_dim = spark.sql("""SELECT 
                ident as airport_id, 
                iata_code, 
                type as airport_type, 
                name as airport_name, 
                municipality, 
                iso_region,
                coordinates 
            FROM airports WHERE type in ('small_airport','medium_airport','large_airport','closed')
            AND iata_code is not null""")

AnalysisException: 'java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient;'
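The cell above failed with a Hive metastore error before the query ran. One workaround worth trying is to build the session without `enableHiveSupport()`, because temporary views do not require a Hive metastore. The selection logic can also be checked independently of the Spark setup. The sketch below mirrors the same filter and column renames in pandas, under the assumption that it is applied to the `airport` DataFrame loaded in In [5]. The function name `build_airports_dim` and the sample rows are hypothetical.

```python
import pandas as pd

AIRPORT_TYPES = ["small_airport", "medium_airport", "large_airport", "closed"]

def build_airports_dim(airports):
    """pandas mirror of the airports_dim SQL: same filter, same column renames."""
    mask = airports["type"].isin(AIRPORT_TYPES) & airports["iata_code"].notna()
    return (
        airports.loc[mask, ["ident", "iata_code", "type", "name",
                            "municipality", "iso_region", "coordinates"]]
        .rename(columns={"ident": "airport_id", "type": "airport_type",
                         "name": "airport_name"})
        .reset_index(drop=True)
    )

# Hypothetical rows: only the second one is an airport type with an IATA code
sample = pd.DataFrame({
    "ident": ["00A", "EX01", "EX02"],
    "type": ["heliport", "large_airport", "heliport"],
    "name": ["Total Rf Heliport", "Example Airport", "Example Heliport"],
    "municipality": ["Bensalem", "Example City", "Example Town"],
    "iso_region": ["US-PA", "US-XX", "US-XX"],
    "iata_code": [None, "EXA", "EXH"],
    "coordinates": ["-74.93360137939453, 40.07080078125", "0, 0", "0, 0"],
})
print(build_airports_dim(sample))
```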

In [None]:
airports_dim.show(10)

In [None]:
travelers_spark = df_spark.select('cicid','biryear','i94bir','gender','airline','fltno','visatype','visapost')

In [None]:
# createOrReplaceTempView returns None, so its result is not assigned
travelers_spark.createOrReplaceTempView("travelers")

In [None]:
travelers_dim = spark.sql("""SELECT 
                cast(cicid as bigint) as traveler_id, 
                cast(biryear as int) as birth_year, 
                cast(i94bir as int) as age, 
                gender, 
                airline, 
                fltno as flight_no, 
                visatype as visa_type 
              FROM travelers
              WHERE airline is not null
              OR fltno is not null""")

In [None]:
travelers_dim.show(10)

#### 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
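The unique-key and source/count checks listed above can be sketched in pandas as follows. The helper names `check_unique_key` and `check_counts` and the sample table are hypothetical. The count check relies only on the fact that filters can drop rows but never add them. For the Spark tables, an equivalent key check compares `df.count()` with `df.select(key).distinct().count()` and counts nulls with `df.filter(df[key].isNull()).count()`.

```python
import pandas as pd

def check_unique_key(df, key):
    """Integrity check: the key column must have no nulls and no duplicates."""
    nulls = int(df[key].isna().sum())
    duplicates = int(df[key].dropna().duplicated().sum())
    return {"passed": nulls == 0 and duplicates == 0,
            "nulls": nulls, "duplicates": duplicates}

def check_counts(source_rows, target_rows):
    """Completeness check: the result is not empty and has no more rows than its source."""
    return 0 < target_rows <= source_rows

# Illustrative travelers table with one repeated traveler_id
travelers = pd.DataFrame({"traveler_id": [6, 7, 15, 15],
                          "gender": [None, "M", "M", None]})
print(check_unique_key(travelers, "traveler_id"))
print(check_counts(source_rows=5, target_rows=len(travelers)))
```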
 
Run Quality Checks

In [None]:
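from pyspark.sql import functions as F

def quality_check(df, description):
    '''
    Input: a Spark DataFrame and a short description used in the messages.
    Output: True if the table has at least one record, False otherwise.
    Also reports how many null values the table contains.
    '''
    # Count check: an empty table means the pipeline step did not work
    result = df.count()
    if result == 0:
        print("Quality check failed: {} has zero records".format(description))
        return False
    print("Quality check passed: {} has {} records".format(description, result))

    # NA check, computed in Spark instead of converting the whole table with toPandas()
    na_values = df.select(
        [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]
    ).first().asDict()
    total_na = sum(na_values.values())
    if total_na == 0:
        print(f'NA quality check passed. There are no NA values in {description}.')
    else:
        print(f'NA quality check: found {total_na} NA values in {description}: {na_values}')
    return True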
# Perform data quality check
#quality_check(df_immi, "immigration table")
#quality_check(df_temp, "temperature table")

#### 4.3 Data dictionary 
The source columns are described in the data dictionary in Step 1. The model tables built in 4.1 use the following columns.

airports_dim:
* 'airport_id': airport identifier (source: ident)
* 'iata_code': IATA code
* 'airport_type': small_airport, medium_airport, large_airport or closed (source: type)
* 'airport_name': airport name (source: name)
* 'municipality': municipality
* 'iso_region': ISO region code, e.g. US-PA
* 'coordinates': longitude and latitude as a text pair

travelers_dim:
* 'traveler_id': unique record ID (source: cicid, cast to bigint)
* 'birth_year': 4-digit year of birth (source: biryear, cast to int)
* 'age': age in years (source: i94bir, cast to int)
* 'gender': gender
* 'airline': airline used to arrive
* 'flight_no': flight number (source: fltno)
* 'visa_type': class of visa (source: visatype)

### Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.

Spark SQL was chosen to transform the large input files into DataFrames, and Spark itself because the volume of data requires fast, distributed processing. The results are delivered as a spreadsheet-friendly CSV file that opens in Excel, so employees without Python knowledge can easily work with them.

* Propose how often the data should be updated and why.

The report serves every sector of our company, and all the data is grouped by year, so I suggest a yearly update. That way marketing, finance, sales and the other teams use exactly the same data and can schedule their own internal updates in advance.
Since we are simulating a real company, we selected these main variables: year, city code, port code, arrival and departure dates, visa code, "AverageTemperature", "City", "Country", "Latitude" and "Longitude".

For example, the marketing team will know exactly when to invest in advertising to increase company leads.

The sales team will know the right moment to call a client.

The finance team will be able to see and forecast cash flow more accurately.

* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.

If the data increased by 100x, we would keep processing it with Spark, running on a larger cluster with more worker nodes so that the final report is still delivered without long delays.

* The data populates a dashboard that must be updated on a daily basis by 7am every day.

If the dashboard must be updated every day by 7 am, I would suggest a D-1 report built from the previous day's data. This leaves enough time to run the update and to recover from any system, network or processing problem. I would also add a column with the reference day.
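The D-1 approach can be sketched as follows, assuming arrival dates have already been converted to `datetime.date` values. The function name `build_d1_report`, the `arrival_date` and `report_day` column names, and the sample rows are hypothetical. The report produced on a given run date keeps only the previous day's records and tags them with that reference day.

```python
from datetime import date, timedelta

import pandas as pd

def build_d1_report(df, run_date, date_col="arrival_date"):
    """Keep the records of the day before `run_date` and add a report_day column."""
    report_day = run_date - timedelta(days=1)
    out = df[df[date_col] == report_day].copy()
    out["report_day"] = report_day
    return out

# Illustrative arrivals: two on 2016-04-01 and one on 2016-04-07
arrivals = pd.DataFrame({
    "cicid": [15, 16, 7],
    "arrival_date": [date(2016, 4, 1), date(2016, 4, 1), date(2016, 4, 7)],
})
report = build_d1_report(arrivals, run_date=date(2016, 4, 2))
print(report)
```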

* The database needed to be accessed by 100+ people.

If the database must be accessed by more than 100 people, we should export the report to PDF or create a summary version of the data that can easily be sent by e-mail.
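A summary version could condense the record-level data into one row per year and port. The sketch below uses pandas named aggregation on the five sample rows shown in the In [2] output. The function name `summarize_arrivals` and the `arrivals` column name are hypothetical. Sorting by port code as a secondary key gives ties a stable order.

```python
import pandas as pd

def summarize_arrivals(df):
    """One row per year and port with the number of arrival records, largest first."""
    summary = (df.groupby(["i94yr", "i94port"], as_index=False)
                 .agg(arrivals=("cicid", "count")))
    # Sort by arrivals (descending), then by port code so ties keep a fixed order
    return summary.sort_values(["arrivals", "i94port"], ascending=[False, True],
                               ignore_index=True)

# The five sample rows shown in the In [2] output
records = pd.DataFrame({
    "cicid": [6.0, 7.0, 15.0, 16.0, 17.0],
    "i94yr": [2016.0] * 5,
    "i94port": ["XXX", "ATL", "WAS", "NYC", "NYC"],
})
summary = summarize_arrivals(records)
print(summary)
```

The resulting table is small enough to export with `summary.to_csv(...)` and attach to an e-mail.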