Murilo Amaral  
Email: murilogmamaral@gmail.com  
Linkedin: https://www.linkedin.com/in/murilogmamaral/  

___
# Forecasting passenger flow at Brazilian airports
## 2 - ETL and feature engineering
This notebook extracts, transforms and enriches the ANAC flight data and makes it available in the refined zone.
___

### Libraries and Configuration

In [2]:
import glob
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
class color:
    """ANSI escape sequences used to colour the progress messages printed by this notebook."""

    RED = '\033[91m'    # failures
    GREEN = '\033[92m'  # files successfully moved
    GRAY = '\033[90m'   # files skipped because they were already processed
    END = '\033[0m'     # back to the terminal's default colour
# Reuse the active Spark session if there is one; otherwise start one named "ETL".
spark = (
    SparkSession.builder
    .appName("ETL")
    .getOrCreate()
)

### From the download zone to the raw zone (partitioned by arrival airport)

In [3]:
# Where the downloaded ANAC files land, and the root of the raw zone.
download_path, raw_path = (
    '/Volumes/datalake/download/anac/',
    '/Volumes/datalake/raw/anac/',
)

In [4]:
# Columns kept from the ANAC files:
#   sg_empresa_icao       company
#   nm_pais_origem        leaving from (country)
#   nm_municipio_origem   leaving from (city)
#   nm_municipio_destino  going to (city)
#   sg_icao_origem        airport of departure
#   sg_icao_destino       airport of arrival
#   dt_referencia         departure date (scheduled)
#   dt_chegada_real       arrival date (real)
#   hr_partida_real       departure time (real)
#   hr_chegada_real       arrival time (real)
#   nr_passag_pagos       number of passengers (paid tickets)
#   nr_passag_gratis      number of passengers (free tickets)
usecols = [
    'sg_empresa_icao', 'nm_pais_origem', 'nm_municipio_origem',
    'nm_municipio_destino', 'sg_icao_origem', 'sg_icao_destino',
    'dt_referencia', 'dt_chegada_real', 'hr_partida_real',
    'hr_chegada_real', 'nr_passag_pagos', 'nr_passag_gratis',
]

# Read only so toRaw can keep regular flights that offered seats; dropped afterwards.
tempcols = ['ds_grupo_di', 'nr_assentos_ofertados']

In [5]:
# Function to make data available in raw zone
def toRaw(file):
    """Load one downloaded ANAC file into the raw zone, skipping files already loaded.

    The file is read from ``download_path`` and filtered to rows whose
    ``ds_grupo_di`` is ``'REGULAR'`` with at least one seat offered. It is then
    reduced to ``usecols`` and appended to the parquet dataset under ``raw_path``,
    partitioned by ``sg_icao_destino`` (the arrival airport). Names of processed
    files are recorded in ``raw_path + 'history.csv'`` so that re-running the
    notebook does not append the same file twice.

    Parameters
    ----------
    file : str
        Name of the file inside ``download_path`` (e.g. ``'basica2014-01.zip'``).

    Errors (``Exception`` subclasses) are printed in red instead of raised, so a
    loop over many files keeps going when one of them fails.
    """
    history_file = raw_path + 'history.csv'
    try:
        try:
            history = pd.read_csv(history_file)
        except FileNotFoundError:
            # First run: nothing has been loaded into the raw zone yet. Any other
            # read error is reported below rather than silently treated as "empty
            # history", which would re-append every file (duplicates).
            history = pd.DataFrame({'data': []})

        if file in history['data'].tolist():
            # Report the skip using the file name. The previous message referenced
            # undefined names and ended up printing a false failure.
            print(f"{color.GRAY}>>>> Data from '{file}' was already in raw zone.{color.END}")
            return

        # Loading data
        df = pd.read_csv(download_path + file,
                         sep=';',
                         encoding='latin-1',
                         low_memory=False,
                         usecols=usecols + tempcols)

        # Keep regular flights that offered seats, then drop the helper columns.
        regular = (df['ds_grupo_di'] == 'REGULAR') & (df['nr_assentos_ofertados'] > 0)
        df = df.loc[regular, usecols].copy()

        # Passenger counts: missing values count as zero, stored as integers.
        for col in ['nr_passag_pagos', 'nr_passag_gratis']:
            df[col] = df[col].fillna(0).astype(int)

        # Moving data to raw zone (partitioned by arrival airport).
        # With partition_cols, pandas adds new files to the dataset (it appends).
        df.to_parquet(raw_path, index=False, partition_cols=['sg_icao_destino'])

        # Only record the file once its data has been written.
        history = pd.concat([history, pd.DataFrame({'data': [file]})])
        history.to_csv(history_file, index=False)
        print(f"{color.GREEN}>>>> Data from '{file}' were moved to raw zone.{color.END}")
    except Exception as err:
        print(f"{color.RED}---- Data from '{file}' could not be moved to raw zone ({err}).{color.END}")

In [6]:
# Load every downloaded file into the raw zone in sorted-name order
# (names such as 'basica2014-01.zip' therefore go oldest month first).
files = sorted(path.rpartition('/')[2] for path in glob.glob(download_path + '*'))
for name in files:
    toRaw(name)

[92m>>>> Data from 'basica2014-01.zip' were moved to raw zone.[0m
[92m>>>> Data from 'basica2014-02.zip' were moved to raw zone.[0m
[92m>>>> Data from 'basica2014-03.zip' were moved to raw zone.[0m
[92m>>>> Data from 'basica2014-04.zip' were moved to raw zone.[0m
[92m>>>> Data from 'basica2014-05.zip' were moved to raw zone.[0m
[92m>>>> Data from 'basica2014-06.zip' were moved to raw zone.[0m
[92m>>>> Data from 'basica2014-07.zip' were moved to raw zone.[0m
[92m>>>> Data from 'basica2014-08.zip' were moved to raw zone.[0m
[92m>>>> Data from 'basica2014-09.zip' were moved to raw zone.[0m
[92m>>>> Data from 'basica2014-10.zip' were moved to raw zone.[0m
[92m>>>> Data from 'basica2014-11.zip' were moved to raw zone.[0m
[92m>>>> Data from 'basica2014-12.zip' were moved to raw zone.[0m
[92m>>>> Data from 'basica2015-01.zip' were moved to raw zone.[0m
[92m>>>> Data from 'basica2015-02.zip' were moved to raw zone.[0m
[92m>>>> Data from 'basica2015-03.zip' were mov


### Feature engineering

In [7]:
# Dollar-Real exchange rate (monthly)
# Data source: IPEA - The Institute for Applied Economic Research (Brazil)
url = 'http://ipeadata.gov.br/exibeserie.aspx?serid=38389'
# NOTE(review): [2] assumes the series is the third HTML table on the page.
dollar = pd.read_html(url, header=0, decimal=',', thousands='.')[2]
dollar.columns = ['year', 'dollar']
# The period column is sliced as YYYYMM: the first 4 characters are the year and
# the next 2 the month (presumably the '.' of "YYYY.MM" is consumed as a
# thousands separator -- TODO confirm against the page).
dollar = dollar.assign(
    month=dollar['year'].map(lambda period: int(str(period)[4:6])),
    year=dollar['year'].map(lambda period: int(str(period)[0:4])),
)
dollar = dollar.loc[dollar['year'] >= 2012]
dollar = dollar.assign(dollar=dollar['dollar'].map(float))[['year', 'month', 'dollar']]
dollar_spark = spark.createDataFrame(dollar)

In [8]:
# Unemployment rate in Brazil (monthly)
# Data source: IPEA - The Institute for Applied Economic Research (Brazil)
url = 'http://www.ipeadata.gov.br/ExibeSerie.aspx?serid=1347352645'
# NOTE(review): [2] assumes the series is the third HTML table on the page.
unemployment_rate = pd.read_html(url, header=0, decimal=',', thousands='.')[2]
unemployment_rate.columns = ['year', 'unemployment_rate']
# Split the period code (sliced as YYYYMM) into numeric year and month,
# and make the rate a float.
unemployment_rate = unemployment_rate.assign(
    month=unemployment_rate['year'].map(lambda period: int(str(period)[4:6])),
    year=unemployment_rate['year'].map(lambda period: int(str(period)[0:4])),
    unemployment_rate=unemployment_rate['unemployment_rate'].map(float),
)[['year', 'month', 'unemployment_rate']]
# Keep 2012 onwards. NOTE(review): .round() rounds the rate to 0 decimal
# places -- confirm that this loss of precision is intended.
unemployment_rate = unemployment_rate.loc[unemployment_rate['year'] >= 2012].round()
unemployment_rate_spark = spark.createDataFrame(unemployment_rate)

### From the raw zone to the refined zone

In [9]:
# Root of the refined zone: one sub-folder per airport is written under this path.
refined_path = (
    '/Volumes/datalake/refined/anac/'
)

In [10]:
def toRefined(departure_airport):
    """Build the refined dataset for one airport from its raw-zone partition.

    NOTE: despite its name, ``departure_airport`` selects the raw-zone partition
    ``sg_icao_destino=<value>``. That is the *arrival* airport whose incoming
    flights are refined (e.g. ``'SBSV'``). The parameter name is kept so existing
    callers keep working.

    Steps:
      * extract: read ``raw_path + 'sg_icao_destino=<airport>'`` and drop
        duplicate rows;
      * transform: rename the ANAC columns, add free passengers into ``pax``,
        drop rows without an arrival date, derive year/month/day/weekday from
        the arrival date, and left-join the monthly ``dollar_spark`` and
        ``unemployment_rate_spark`` features;
      * load: overwrite ``refined_path + '<airport>'`` with a single parquet file.

    Errors (``Exception`` subclasses) are printed as ``[<airport>] Error: ...``
    instead of raised, so a loop over many airports keeps going.
    """
    # Raw-zone (ANAC) column -> refined column name.
    renames = {
        'sg_empresa_icao': 'company',
        'nm_pais_origem': 'departure_country',
        'nm_municipio_origem': 'departure_city',
        'nm_municipio_destino': 'arrival_city',
        'sg_icao_origem': 'departure_airport',
        'dt_referencia': 'departure_date',
        'dt_chegada_real': 'arrival_date',
        'hr_partida_real': 'departure_hour',
        'hr_chegada_real': 'arrival_hour',
        'nr_passag_pagos': 'pax',
        'nr_passag_gratis': 'free_pax',
    }

    try:
        ### EXTRACT ###########################
        extract_path = raw_path + f'sg_icao_destino={departure_airport}'
        # distinct() presumably guards against a file appended to the raw zone twice.
        df = spark.read.parquet(extract_path).distinct()

        ### TRANSFORM ###########################
        # Rename by column name, not by position, so the result does not depend
        # on the column order stored in the parquet files. A missing column now
        # fails loudly instead of silently shifting every name.
        df = df.select([F.col(raw).alias(new) for raw, new in renames.items()])

        # Unifying number of passengers: paid + free tickets.
        df = df.withColumn('pax', F.col('pax') + F.col('free_pax'))

        # Dropping rows without an arrival date (all date features derive from it).
        df = df.where(F.col('arrival_date').isNotNull())

        # Calendar features from the arrival date
        # (Spark's dayofweek: 1 = Sunday ... 7 = Saturday).
        arrival = F.col('arrival_date')
        df = (df.withColumn('year', F.year(arrival))
                .withColumn('month', F.month(arrival))
                .withColumn('day', F.dayofmonth(arrival))
                .withColumn('weekday', F.dayofweek(arrival)))

        # Monthly macroeconomic features. Left joins keep flights whose month has no value.
        df = (df.join(dollar_spark, on=['year', 'month'], how='left')
                .join(unemployment_rate_spark, on=['year', 'month'], how='left'))

        # Selecting columns
        df = df.select('company', 'departure_country', 'departure_city', 'departure_airport',
                       'arrival_date', 'arrival_hour', 'year', 'month', 'day', 'weekday',
                       'pax', 'dollar', 'unemployment_rate')

        ### LOAD ###########################
        # coalesce(1) writes a single file; 'overwrite' replaces any previous run.
        # (The CSV-only "header" option was dropped: it has no effect on parquet.)
        (
            df
            .coalesce(1)
            .write
            .format('parquet')
            .mode('overwrite')
            .save(refined_path + f'{departure_airport}')
        )

        print(f'[{departure_airport}] ETL completed.')

    except Exception as err:
        print(f'[{departure_airport}] Error: {err}')

In [12]:
# Arrival airports to refine -- SBSV is Salvador/BA.
airports = ['SBSV']
for airport in airports:
    toRefined(airport)

[SBSV] ETL completed.
