## Data Engineering Capstone Project


### Step 2. Explore and Assess the Data

The purpose of this notebook is to read in the relevant data and assess the following attributes of each data source:

* Data schema.
* Size of each data source.
* Quality of each data source.

As described in the README file, we will read each data source into a data frame using pandas and then analyse these attributes. Pandas was chosen for reading the data because it is easy to use with Airflow.

In [1]:
import pandas as pd
import os
import datetime

#### Immigration Dataset

The immigration dataset is stored as a series of parquet files in `data/immigration-data/`. We are going to read them in using pandas and analyse the schema.

In [35]:
## read in the parquet files from the directory
data_directory = 'data/immigration-data'
data_files = [os.path.join(data_directory, f) for f in os.listdir(data_directory)]

dfs = []

for f in data_files:
    _df = pd.read_parquet(f)
    dfs.append(_df)

immigration_data = pd.concat(dfs)
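
Directories written by Spark or similar tools often contain non-data files next to the parquet parts (for example `_SUCCESS` markers or `.crc` checksums), and `os.listdir` returns those too. The following is a minimal sketch of a more defensive file listing; the helper name `list_parquet_files` and the assumption that data files end in `.parquet` are illustrative, not taken from the project.

```python
import os


def list_parquet_files(data_directory):
    """Return sorted paths of the *.parquet files directly inside data_directory.

    Assumption: data files use the '.parquet' extension; anything else
    (marker files, checksums, sub-directories) is skipped.
    """
    paths = []
    for name in sorted(os.listdir(data_directory)):
        path = os.path.join(data_directory, name)
        if os.path.isfile(path) and name.endswith('.parquet'):
            paths.append(path)
    return paths
```

The returned list could be used in place of `data_files` in the loop above.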

In [36]:
## get the immigration data columns
immigration_data.columns

Index(['cicid', 'i94yr', 'i94mon', 'i94cit', 'i94res', 'i94port', 'arrdate',
       'i94mode', 'i94addr', 'depdate', 'i94bir', 'i94visa', 'count',
       'dtadfile', 'visapost', 'occup', 'entdepa', 'entdepd', 'entdepu',
       'matflag', 'biryear', 'dtaddto', 'gender', 'insnum', 'airline',
       'admnum', 'fltno', 'visatype'],
      dtype='object')

In [37]:
## get the first 10 rows
immigration_data.head(10)

Unnamed: 0,cicid,i94yr,i94mon,i94cit,i94res,i94port,arrdate,i94mode,i94addr,depdate,...,entdepu,matflag,biryear,dtaddto,gender,insnum,airline,admnum,fltno,visatype
0,3545479.0,2016.0,4.0,260.0,260.0,LOS,20563.0,1.0,CA,20583.0,...,,M,1968.0,10182016,M,,PR,93995870000.0,102,B2
1,3545480.0,2016.0,4.0,260.0,260.0,LOS,20563.0,1.0,CA,20583.0,...,,M,1979.0,10182016,M,,PR,93994370000.0,152,B2
2,3545481.0,2016.0,4.0,260.0,260.0,LOS,20563.0,1.0,CA,20583.0,...,,M,1980.0,10182016,F,,PR,93994430000.0,152,B2
3,3545482.0,2016.0,4.0,260.0,260.0,LOS,20563.0,1.0,CA,20583.0,...,,M,1998.0,10182016,F,,PR,93995980000.0,102,B2
4,3545483.0,2016.0,4.0,260.0,260.0,LOS,20563.0,1.0,CA,20583.0,...,,M,1999.0,10182016,M,,PR,93996030000.0,102,B2
5,3545484.0,2016.0,4.0,260.0,260.0,LOS,20563.0,1.0,CA,20583.0,...,,M,2007.0,10182016,F,,PR,93995900000.0,102,B2
6,3545485.0,2016.0,4.0,260.0,260.0,LOS,20563.0,1.0,CA,20584.0,...,,M,1962.0,10182016,F,,PR,93996430000.0,102,B2
7,3545486.0,2016.0,4.0,260.0,260.0,LOS,20563.0,1.0,CA,20584.0,...,,M,1970.0,10182016,F,,CZ,93996710000.0,327,B2
8,3545487.0,2016.0,4.0,260.0,260.0,LOS,20563.0,1.0,CA,20585.0,...,,M,1957.0,10182016,M,,PR,93994260000.0,152,B2
9,3545488.0,2016.0,4.0,260.0,260.0,LOS,20563.0,1.0,CA,20585.0,...,,M,1962.0,10172016,M,,BR,93923220000.0,16,B2


In [38]:
## analyse the timestamp rows
def show_ts_columns(df):
    print(df[['arrdate', 'depdate', 'dtaddto']].head(50))
    
show_ts_columns(immigration_data)

    arrdate  depdate   dtaddto
0   20563.0  20583.0  10182016
1   20563.0  20583.0  10182016
2   20563.0  20583.0  10182016
3   20563.0  20583.0  10182016
4   20563.0  20583.0  10182016
5   20563.0  20583.0  10182016
6   20563.0  20584.0  10182016
7   20563.0  20584.0  10182016
8   20563.0  20585.0  10182016
9   20563.0  20585.0  10172016
10  20563.0  20585.0  10172016
11  20563.0  20585.0  10172016
12  20563.0  20585.0  10172016
13  20563.0  20586.0  10182016
14  20563.0  20586.0  10182016
15  20563.0  20586.0  10182016
16  20563.0  20586.0  10182016
17  20563.0  20586.0  10182016
18  20563.0  20586.0  10182016
19  20563.0  20586.0  10182016
20  20563.0  20586.0  10182016
21  20563.0  20587.0  10182016
22  20563.0  20587.0  10182016
23  20563.0  20587.0  10182016
24  20563.0  20587.0  10182016
25  20563.0  20587.0  10182016
26  20563.0  20588.0  10182016
27  20563.0  20588.0  10182016
28  20563.0  20588.0  10182016
29  20563.0  20589.0  10182016
30  20563.0  20589.0  10182016
31  2056

In [39]:
## convert sas timestamp to date
def convert_sas_timestamp(column_name, df):
    df[column_name] = pd.to_timedelta(df[column_name], unit='D') + pd.Timestamp('1960-1-1')
    return df
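
SAS stores dates as the number of days since 1 January 1960, which is why the conversion above adds a day-based timedelta to that epoch. As a quick sanity check, the arithmetic can be reproduced with the standard library alone; the helper name `sas_days_to_date` is hypothetical, and treating missing values as `None` is an assumption made for this sketch.

```python
import math
from datetime import date, timedelta

SAS_EPOCH = date(1960, 1, 1)  # day 0 of the SAS date scale


def sas_days_to_date(days):
    """Convert a SAS day count (e.g. 20563.0) to a datetime.date.

    Returns None for missing values (None or NaN).
    """
    if days is None or (isinstance(days, float) and math.isnan(days)):
        return None
    return SAS_EPOCH + timedelta(days=int(days))


print(sas_days_to_date(20563.0))  # 2016-04-19
print(sas_days_to_date(20583.0))  # 2016-05-09
```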

In [40]:
## convert arrival date and departure dates 
immigration_data = convert_sas_timestamp('arrdate', immigration_data)
immigration_data = convert_sas_timestamp('depdate', immigration_data)

show_ts_columns(immigration_data)

      arrdate    depdate   dtaddto
0  2016-04-19 2016-05-09  10182016
1  2016-04-19 2016-05-09  10182016
2  2016-04-19 2016-05-09  10182016
3  2016-04-19 2016-05-09  10182016
4  2016-04-19 2016-05-09  10182016
5  2016-04-19 2016-05-09  10182016
6  2016-04-19 2016-05-10  10182016
7  2016-04-19 2016-05-10  10182016
8  2016-04-19 2016-05-11  10182016
9  2016-04-19 2016-05-11  10172016
10 2016-04-19 2016-05-11  10172016
11 2016-04-19 2016-05-11  10172016
12 2016-04-19 2016-05-11  10172016
13 2016-04-19 2016-05-12  10182016
14 2016-04-19 2016-05-12  10182016
15 2016-04-19 2016-05-12  10182016
16 2016-04-19 2016-05-12  10182016
17 2016-04-19 2016-05-12  10182016
18 2016-04-19 2016-05-12  10182016
19 2016-04-19 2016-05-12  10182016
20 2016-04-19 2016-05-12  10182016
21 2016-04-19 2016-05-13  10182016
22 2016-04-19 2016-05-13  10182016
23 2016-04-19 2016-05-13  10182016
24 2016-04-19 2016-05-13  10182016
25 2016-04-19 2016-05-13  10182016
26 2016-04-19 2016-05-14  10182016
27 2016-04-19 2016-0

In [41]:
## keep only rows where dtaddto (the admitted-until date) is 8 characters long, as expected for MMDDYYYY
immigration_data = immigration_data[immigration_data['dtaddto'].str.len() == 8].copy()

In [42]:
# convert the datetime column
immigration_data['dtaddto'] = pd.to_datetime(immigration_data['dtaddto'], format="%m%d%Y", errors='coerce')
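
With `errors='coerce'`, any `dtaddto` value that does not match `%m%d%Y` becomes `NaT` rather than raising an error. The same rule can be sketched for a single value with the standard library; `parse_mmddyyyy` is a hypothetical helper, and the non-date input `'D/S'` used below is an illustrative example rather than a value confirmed from this dataset.

```python
from datetime import datetime


def parse_mmddyyyy(value):
    """Parse an 8-character MMDDYYYY string into a date, or return None."""
    if not isinstance(value, str) or len(value) != 8:
        return None
    try:
        return datetime.strptime(value, '%m%d%Y').date()
    except ValueError:
        # Right length, but not a real calendar date.
        return None


print(parse_mmddyyyy('10182016'))  # 2016-10-18
print(parse_mmddyyyy('D/S'))       # None
print(parse_mmddyyyy('02302016'))  # None (30 February does not exist)
```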

In [43]:
show_ts_columns(immigration_data)

      arrdate    depdate    dtaddto
0  2016-04-19 2016-05-09 2016-10-18
1  2016-04-19 2016-05-09 2016-10-18
2  2016-04-19 2016-05-09 2016-10-18
3  2016-04-19 2016-05-09 2016-10-18
4  2016-04-19 2016-05-09 2016-10-18
5  2016-04-19 2016-05-09 2016-10-18
6  2016-04-19 2016-05-10 2016-10-18
7  2016-04-19 2016-05-10 2016-10-18
8  2016-04-19 2016-05-11 2016-10-18
9  2016-04-19 2016-05-11 2016-10-17
10 2016-04-19 2016-05-11 2016-10-17
11 2016-04-19 2016-05-11 2016-10-17
12 2016-04-19 2016-05-11 2016-10-17
13 2016-04-19 2016-05-12 2016-10-18
14 2016-04-19 2016-05-12 2016-10-18
15 2016-04-19 2016-05-12 2016-10-18
16 2016-04-19 2016-05-12 2016-10-18
17 2016-04-19 2016-05-12 2016-10-18
18 2016-04-19 2016-05-12 2016-10-18
19 2016-04-19 2016-05-12 2016-10-18
20 2016-04-19 2016-05-12 2016-10-18
21 2016-04-19 2016-05-13 2016-10-18
22 2016-04-19 2016-05-13 2016-10-18
23 2016-04-19 2016-05-13 2016-10-18
24 2016-04-19 2016-05-13 2016-10-18
25 2016-04-19 2016-05-13 2016-10-18
26 2016-04-19 2016-05-14 201

#### Temperature Data

The temperature data is divided into five csv files:

* GlobalTemperatures.csv
* GlobalLandTemperaturesByCity.csv
* GlobalLandTemperaturesByCountry.csv
* GlobalLandTemperaturesByMajorCity.csv
* GlobalLandTemperaturesByState.csv

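For each csv file, we read it in using pandas, get the schema, print the first 10 rows of the data, and display the row count. In the cell below, `GlobalLandTemperaturesByCity` is commented out of the file list, and the detailed output is printed only for `GlobalLandTemperaturesByCountry`.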

In [28]:
## base path for the csv files
base_path = './data/climate-change'

## list of the files
import os
import pandas as pd

file_names = ['GlobalTemperatures', 
#               'GlobalLandTemperaturesByCity', 
              'GlobalLandTemperaturesByCountry',
              'GlobalLandTemperaturesByMajorCity',
              'GlobalLandTemperaturesByState']

for data_source in file_names:
    data_dest = os.path.join(base_path, f'{data_source}.csv')
    print(f'== Analysing Data Source:: {data_source} :: File Path :: {data_dest} ==')
          
    data_df = pd.read_csv(data_dest)
    
    if data_source == 'GlobalLandTemperaturesByCountry':
        
        ## print the schema
        print('\n** SCHEMA **\n')
        print(list(data_df))
        print()
        
        print(f'Before::{data_df.columns}')
        
        data_df.drop(['AverageTemperatureUncertainty'], inplace=True, axis=1)

        print(f'After::{data_df.columns}')
        
        data_df.columns = ['ts', 
                           'average_temperature',
                           'country_code']
        
        ## get the first 10 rows
        print('\n** FIRST 10 ROWS **\n')
        print(data_df.head(10))
        print()

        ## get the count
        print('\n** NUMBER OF ROWS **\n')
        print(len(data_df))
        print()

    

== Analysing Data Source:: GlobalTemperatures :: File Path :: ./data/climate-change/GlobalTemperatures.csv ==
== Analysing Data Source:: GlobalLandTemperaturesByCountry :: File Path :: ./data/climate-change/GlobalLandTemperaturesByCountry.csv ==

** SCHEMA **

['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'Country']

Before::Index(['dt', 'AverageTemperature', 'AverageTemperatureUncertainty', 'Country'], dtype='object')
After::Index(['dt', 'AverageTemperature', 'Country'], dtype='object')

** FIRST 10 ROWS **

           ts  average_temperature country_code
0  1743-11-01                4.384        Åland
1  1743-12-01                  NaN        Åland
2  1744-01-01                  NaN        Åland
3  1744-02-01                  NaN        Åland
4  1744-03-01                  NaN        Åland
5  1744-04-01                1.530        Åland
6  1744-05-01                6.702        Åland
7  1744-06-01               11.609        Åland
8  1744-07-01               15.342   
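
The first rows of `GlobalLandTemperaturesByCountry` already show gaps in `average_temperature`, so a per-column count of missing values is one simple way to quantify data quality. Below is a minimal sketch using only the standard library and a small inline sample copied from the rows above; the function name `count_missing` is illustrative. On a data frame, `data_df.isna().sum()` gives the equivalent counts.

```python
import csv
import io


def count_missing(csv_text):
    """Count empty cells per column in CSV text with a header row."""
    reader = csv.DictReader(io.StringIO(csv_text))
    missing = {name: 0 for name in reader.fieldnames}
    for row in reader:
        for name in reader.fieldnames:
            # Treat absent or whitespace-only cells as missing.
            if (row[name] or '').strip() == '':
                missing[name] += 1
    return missing


# Sample shaped like the renamed country-level data.
sample = """ts,average_temperature,country_code
1743-11-01,4.384,Åland
1743-12-01,,Åland
1744-01-01,,Åland
1744-04-01,1.530,Åland
"""

print(count_missing(sample))
```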

#### Demographics

The demographics dataset contains demographic information about US cities. We will read in the semicolon-delimited csv file using pandas and get the schema, the first 10 rows, and the row count.

In [36]:
file_path = './data/demographics/us-cities-demographics.csv'

demographics_df = pd.read_csv(file_path, delimiter=";")

## get the schema
print('\n** SCHEMA **\n')
print(list(demographics_df))
print()

## rename the columns to snake_case

print(f'Before::{demographics_df.columns}')

df_cols = ['city',
           'state',
           'median_age',
           'male_population',
           'female_population',
           'total_population',
           'number_of_veterans',
           'foreign_born',
           'average_household_size',
           'state_code',
           'race',
           'count']

print(f'NewColumns::{df_cols}::{len(df_cols)}')

demographics_df.columns = df_cols

print(f'After::{demographics_df.columns}')

## get the first 10 rows
print('\n** FIRST 10 ROWS **\n')
print(demographics_df.head(10))
print()

## get the row count
print('\n** ROW COUNT **\n')
print(len(demographics_df))
print()


** SCHEMA **

['City', 'State', 'Median Age', 'Male Population', 'Female Population', 'Total Population', 'Number of Veterans', 'Foreign-born', 'Average Household Size', 'State Code', 'Race', 'Count']

Before::Index(['City', 'State', 'Median Age', 'Male Population', 'Female Population',
       'Total Population', 'Number of Veterans', 'Foreign-born',
       'Average Household Size', 'State Code', 'Race', 'Count'],
      dtype='object')
NewColumns::['city', 'state', 'median_age', 'male_population', 'female_population', 'total_population', 'number_of_veterans', 'foreign_born', 'average_household_size', 'state_code', 'race', 'count']::12
After::Index(['city', 'state', 'median_age', 'male_population', 'female_population',
       'total_population', 'number_of_veterans', 'foreign_born',
       'average_household_size', 'state_code', 'race', 'count'],
      dtype='object')

** FIRST 10 ROWS **

             city           state  median_age  male_population  \
0     Los Angeles      Californ
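
The new column names above are the original headers written in snake_case, so the list could also be derived rather than typed by hand. A minimal sketch, assuming that lower-casing and replacing runs of non-alphanumeric characters with underscores is the intended rule; `to_snake_case` is a hypothetical helper.

```python
import re


def to_snake_case(name):
    """Lower-case a header and join its alphanumeric parts with underscores."""
    return re.sub(r'[^0-9a-z]+', '_', name.strip().lower()).strip('_')


original = ['City', 'State', 'Median Age', 'Male Population',
            'Female Population', 'Total Population', 'Number of Veterans',
            'Foreign-born', 'Average Household Size', 'State Code',
            'Race', 'Count']

print([to_snake_case(c) for c in original])
```

With pandas this could be applied as `demographics_df.columns = [to_snake_case(c) for c in demographics_df.columns]`.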

#### Airport Codes

The airport codes dataset contains airport codes and their corresponding cities.

We will read in the `.csv` file using pandas and get the schema, the first 10 rows, and the row count.

In [None]:
file_path = './data/airport-codes/airport-codes_csv.csv'

airport_codes_df = pd.read_csv(file_path)

print('\n** SCHEMA **\n')
print(list(airport_codes_df))
print()

print('\n** FIRST 10 ROWS **\n')
print(airport_codes_df.head(10))
print()

print('\n** ROW COUNT **\n')
print(len(airport_codes_df))
print()