# Extração e Transformação
Este trabalho faz parte do Projeto Final de Ciência de Dados da Academia QADS.

Este notebook comporta as etapas de coleta, transformação e disponibilização (estrutura de datalake) das bases para posterior análise.

Destacamos a importância do trabalho de preparação da base final para um menor uso de recursos computacionais e também a criação de novas features, como feriados, valor do dolar mensal, preço médio da passagem e o LF (Load Factor) ou Fator de Ocupação.  

# Importacao de Bibliotecas

In [None]:
import os
import re
import zipfile
import datetime
import requests
import urllib.request

import holidays
import pandas as pd
from bs4 import BeautifulSoup

# Conectar ao Google Drive

In [None]:
##################################################
from google.colab import drive

Drive='/content/gdrive/'
drive.mount(Drive)

path=Drive+'MyDrive/Projeto_DS/Ambiente_Producao/'

Mounted at /content/gdrive/


# Estrutura Datalake

```
|
|__datalake
   |
   |___tmp
   |   |________tarifa
   |   |        |______201601.csv
   |   |        |______...
   |   |        |______202108.csv
   |   |
   |   |________basica
   |            |______basica2016-01.txt
   |            |______...
   |            |______basica2021-10.txt
   |
   |___raw
   |   |________tarifa
   |   |        |______tarifa_2016.csv
   |   |        |______...
   |   |        |______tarifa_2021.csv
   |   |
   |   |________basica
   |            |______basica_2016.csv
   |            |______...
   |            |______basica_2021.csv
   |
   |___trusted
   |   |________tarifa
   |   |        |______tarifa_2016.csv
   |   |        |______...
   |   |        |______tarifa_2021.csv
   |   |
   |   |________basica
   |            |______basica_2016.csv
   |            |______...
   |            |______basica_2021.csv
   |
   |___business
   |   |________basica_tarifa_dolar_2016.csv
   |   |________...
   |   |________basica_tarifa_dolar_2021.csv
   |   |________basica_tarifa_dolar_completa.csv
   |
   |___resource
       |_______cotacao_mensal.csv
```

# Funções Úteis

In [None]:
##################################################
# Criar diretorios, caso nao existam:
def check_folder(path: str) -> None:
    if not os.path.isdir(path):
        os.makedirs(path)


# Descompactar todos os arquivos .zip do diretorio informado:
def extract_all_zip(path_dir: str, remover=True) -> None:
    for file in os.listdir(path_dir):
        file = path_dir+file
        if file.endswith('.zip'):
            try:
                with zipfile.ZipFile(file) as z:
                    z.extractall(path=path_dir)
                # remover arquivos .zip
                if remover == True:
                    os.remove(file)
            except Exception as e:
                print(f'Falha ao descompactar arquivo {file}!Erro: {e}')


def merge_all(dfs: list) -> pd.DataFrame:
    if len(dfs) == 0:
        return None
    df = dfs[0]
    for df2 in dfs[1:]:
        #df = df.union(df2)         #pyspark
        df = pd.concat([df, df2])   #pandas
    return df


def ler_bases(padrao: str, path_dir: str) -> pd.DataFrame:
    arquivos = [path_dir+arq for arq in os.listdir(path_dir) if padrao in arq]
    dfs = []
    for arq in arquivos:
        df = pd.read_csv(arq, sep=';', encoding='latin-1', low_memory=False)
        dfs.append(df)
    return merge_all(dfs)


In [None]:
path_datalake = path+'datalake/'
check_folder(path_datalake)

In [None]:
def listar_anos(base: str) -> list:
    path_tmp = path_datalake+f'tmp/{base}/'
    list_anos = []
    for arq in os.listdir(path_tmp):
        if base == 'tarifa':
            ano = arq[:4]
        elif base == 'combinada' or base == 'basica':
            ano = arq.replace(base,'').split('-')[0]
        else:
            raise Exception(f'Base {base} inexistente!')
        list_anos.append(ano)
    return sorted(list(set(list_anos)))


meses = {1:'janeiro',
         2:'fevereiro',
         3:'marco',
         4:'abril',
         5:'maio',
         6:'junho',
         7:'julho',
         8:'agosto',
         9:'setembro',
         10:'outubro',
         11:'novembro',
         12:'dezembro'}

# Meses abreviados:
meses_abrv = dict(zip(meses.keys(),[mes[:3].capitalize() for mes in list( meses.values() )]))

# Get key by value in dictionary:
def get_key_by_value(dictionary: dict, value):
    return list(dictionary.keys())[list(dictionary.values()).index(value)]

In [None]:
# Feriados
def Feriados(Serie, ano: int):
    feriados = holidays.Brazil()
    feriados = [str(x) for x in feriados[f'{ano}-01-01': f'{ano}-12-31']]

    feriados_quinta = [x for x in feriados if pd.to_datetime(x).dayofweek == 3]
    feriados_sexta = [x for x in feriados if pd.to_datetime(x).dayofweek == 4]
    feriados_segunda = [x for x in feriados if pd.to_datetime(x).dayofweek == 0]
    feriados_terca = [x for x in feriados if pd.to_datetime(x).dayofweek == 1]

    feriadao = []

    for f in feriados_quinta:
        feriadao.append(pd.to_datetime(f)+datetime.timedelta(days=-1))
        feriadao.append(pd.to_datetime(f)+datetime.timedelta(days=0))
        
    for f in feriados_sexta:
        feriadao.append(pd.to_datetime(f)+datetime.timedelta(days=-1))
        feriadao.append(pd.to_datetime(f)+datetime.timedelta(days=0))

    for f in feriados_segunda:
        feriadao.append(pd.to_datetime(f)+datetime.timedelta(days=-3))
        feriadao.append(pd.to_datetime(f)+datetime.timedelta(days=0))

    for f in feriados_terca:
        feriadao.append(pd.to_datetime(f)+datetime.timedelta(days=-4))
        feriadao.append(pd.to_datetime(f)+datetime.timedelta(days=0))

    feriadao = feriadao + feriados_quinta + feriados_sexta

    feriadao = [str(x)[0:10] for x in feriadao]

    def if_feriado(x):
        if x in feriadao: return 1
        else: return 0

    return Serie.apply(if_feriado)


# Observar os períodos em que há maior fluxo de passageiros em uma série temporal
# e criar uma flag para sinalizar esse aumento de fluxo. Testar resultados.

# Extração dos dados

In [None]:
##################################################
def download_file(File: str, servidor: str, path_dir: str) -> None:
    try:
        urllib.request.urlretrieve(servidor+File, path_dir+File)
    except Exception as e:
        try:
            r = requests.get(servidor+File)
            if r.status_code == 200:
                with open(path_dir+File, 'wb') as f:
                    f.write(r.content)
                print(f'Arquivo baixado: {File} -- requests')
            else:
                print(f"Falha no download do arquivo {File}!\nErro: status_code={r.status_code}")
        except Exception as e2:
            print(f"Falha no download do arquivo {File}!\nErro: {e}\nErro: {e2}")


def extract(base: str, ano_min: int, ano_max: int) -> None:
    path_base = path_datalake+f'tmp/{base}/'
    check_folder(path_base)

    ####################
    if base == 'tarifa':
        # Obter lista de links:
        url = 'https://sistemas.anac.gov.br/sas/tarifadomestica/'
        site = requests.get(url).content
        soup = BeautifulSoup(site, 'html.parser')
        tag_a_list = soup.find_all('a', href=True)
        anos = [a.get('href').split('/')[-2] for a in tag_a_list if re.search('\d{4}', a.get('href'))!=None]
        csv_files = []
        for ano in anos:
            site = requests.get(url+ano).content
            soup = BeautifulSoup(site, 'html.parser')
            tag_a_list = soup.find_all('a', href=True)
            csv_files += [a.get('href').split('/')[-1] for a in tag_a_list if 'csv' in a.get('href')]

        # Download dos dados:
        for arq in csv_files:
            ano = arq[:4]
            if str(ano_min) <= ano <= str(ano_max):
                link = url+f'{ano}/'
                download_file(File=arq, servidor=link, path_dir=path_base)

    ####################
    elif base == 'combinada' or base == 'basica':
        # Obter lista de links:
        url = 'https://www.gov.br/anac/pt-br/assuntos/regulados/empresas-aereas/envio-de-informacoes/microdados/'
        site = requests.get(url).content
        soup = BeautifulSoup(site, 'html.parser')
        tag_a_list = soup.find('table').find_all('a', href=True)
        links = [a.get('href') for a in tag_a_list if ('zip' in a.get('href')) and (base in a.get('href'))]

        # Download dos dados:
        for link in links:
            arq = link.split('/')[-1]
            ano = arq.replace(base,'').split('-')[0]
            link = link.replace(arq, '')
            if str(ano_min) <= ano <= str(ano_max):
                download_file(File=arq, servidor=link, path_dir=path_base)
        extract_all_zip(path_base)

    ####################
    else:
        print(f'Base {base} inexistente!')


In [None]:
%%time
extract(base='tarifa', ano_min=2016, ano_max=2021)

CPU times: user 9.87 s, sys: 4.08 s, total: 14 s
Wall time: 19min 45s


In [None]:
%%time
extract(base='basica', ano_min=2016, ano_max=2021)

CPU times: user 28.1 s, sys: 8.87 s, total: 37 s
Wall time: 16min 5s


In [None]:
%%time
extract(base='combinada', ano_min=2016, ano_max=2021)

CPU times: user 44.1 s, sys: 16.1 s, total: 1min
Wall time: 17min 19s


# Transfomacao Raw

In [None]:
##################################################
## Concatenar dados (Anos) e Salvar dataframe

def tranform_raw(base: str) -> None:

    path_tmp = path_datalake+f'tmp/{base}/'
    path_raw = path_datalake+f'raw/{base}/'
    check_folder(path_raw)

    list_anos = listar_anos(base)

    if base == 'tarifa':
        colunas = ['ANO', 'MES', 'EMPRESA', 'ORIGEM', 'DESTINO', 'TARIFA', 'ASSENTOS']
        for ano in list_anos:
            df = pd.DataFrame({col:[] for col in colunas})
            arquivos = [path_tmp+arq for arq in os.listdir(path_tmp) if ano==arq[:4]]
            for arq in arquivos:
                aux = pd.read_csv(arq, sep=';', encoding='latin-1', decimal=',', low_memory=False)
                aux.columns = colunas
                df = pd.concat([df, aux])
            print(ano, df.shape)
            df.to_csv(path_raw+f'{base}_{ano}.csv', sep=';', encoding='latin-1', index=False)
    else:
        for ano in list_anos:
            df = ler_bases(f'{ano}', path_tmp)
            print(ano, df.shape)
            df.to_csv(path_raw+f'{base}_{ano}.csv', sep=';', encoding='latin-1', index=False)


In [None]:
%%time
tranform_raw(base='tarifa')

2016 (4634293, 7)
2017 (4754065, 7)
2018 (5113126, 7)
2019 (4335164, 7)
2020 (3472250, 7)
2021 (2661330, 7)
CPU times: user 2min 25s, sys: 5.12 s, total: 2min 31s
Wall time: 3min 7s


In [None]:
%%time
tranform_raw(base='basica')

2016 (971239, 110)
2017 (947583, 110)
2018 (974995, 110)
2019 (956515, 110)
2020 (470845, 110)
2021 (469285, 110)
CPU times: user 8min 32s, sys: 24 s, total: 8min 56s
Wall time: 11min 4s


In [None]:
%%time
tranform_raw(base='combinada')

2016 (2625277, 90)
2017 (2530830, 90)
2018 (2405185, 90)
2019 (2325077, 90)
2020 (1111681, 90)
2021 (1054295, 90)
CPU times: user 16min 26s, sys: 50.6 s, total: 17min 17s
Wall time: 19min 52s


# Transfomacao Trusted

In [None]:
def replace_virgula_ponto(v):
    if type(v)==str and ',' in v:
        return float(v.replace('.','').replace(',','.'))
    elif type(v)==str and ',' not in v:
        return float(v)
    else:
        return v

In [None]:
##################################################
def tranform_trs(base: str) -> None:
    path_raw = path_datalake+f'raw/{base}/'
    path_trs = path_datalake+f'trusted/{base}/'
    check_folder(path_trs)

    ######################
    if base == 'tarifa':
        for ano in listar_anos(base):
            df_passagens = ler_bases(ano, path_raw)
            #df_passagens = pd.read_csv(path_raw+f'{base}_{ano}.csv', sep=';', encoding='latin-1', low_memory=False)
            
            # Definir tipos do ano, mes e assentos como inteiro:
            df_passagens[['ANO', 'MES', 'ASSENTOS']] = df_passagens[['ANO', 'MES', 'ASSENTOS']].astype('int')

            print(ano, df_passagens.shape)
            df_passagens['TOTAL'] = df_passagens.TARIFA * df_passagens.ASSENTOS
            temp = df_passagens[['ANO', 'MES', 'EMPRESA', 'ORIGEM', 'DESTINO', 'ASSENTOS', 'TOTAL']]

            temp = temp.groupby(['ANO', 'MES', 'EMPRESA', 'ORIGEM', 'DESTINO'],as_index=False).agg({'ASSENTOS': 'sum', 'TOTAL': 'sum'})
            temp['TICKET_MEDIO'] = round(temp['TOTAL']/temp['ASSENTOS'])
            passagens_refined = temp[['ANO','MES','EMPRESA','ORIGEM','DESTINO','TICKET_MEDIO']]
            passagens_refined.columns = [cl.lower() for cl in passagens_refined.columns]
            passagens_refined.to_csv(path_trs+f'{base}_{ano}.csv', sep=';', encoding='latin-1', index=False)

    ######################
    elif base == 'basica':
        for ano in listar_anos(base):
            #for ano in ['2019','2020','2021']:
            #df = ler_bases(ano, path_raw)
            df = pd.read_csv(path_raw+f'{base}_{ano}.csv', sep=';', encoding='latin-1', low_memory=False)

            selecao = [
                # features essenciais:
                'hr_partida_real', # hora da partida
                'hr_chegada_real', # hora da chegada
                'ds_grupo_di',
                'ds_natureza_tipo_linha',
                'ds_natureza_etapa',
                'nr_assentos_ofertados',
                'dt_referencia', #data
                'nr_ano_referencia', # ano
                'nr_mes_referencia', # mes
                'nr_dia_referencia', # dia
                'nm_dia_semana_referencia', # dia da semana
                'sg_empresa_icao', # sigla da empresa
                'sg_icao_origem', # aeroporto de origem
                'sg_icao_destino', # aeroporto de destino
                'sg_uf_origem', # uf de origem
                'sg_uf_destino', # uf de destino
                'nm_regiao_origem',
                'nm_regiao_destino',
                'nr_horas_voadas',
                'nr_passag_gratis',
                'kg_peso',
                'kg_carga_paga',
                'kg_carga_gratis',
                'kg_correio',
                'km_distancia', # distancia
                'sg_equipamento_icao', # aviao
                'lt_combustivel', # combustivel
                'kg_payload', # peso disponível
                'nr_ask', # assentos disponíveis
                'nr_rpk',
                'nr_atk',
                'nr_rtk',
                # target:
                'nr_passag_pagos']

            # Sugestão:
            # Criar uma flag para cidades turísticas;
            # E talvez uma variável com o n de habitantes da cidade;
            
            # Selecionar campos de interesse:
            df = df[selecao]
            print(ano, df.shape, end=' ')

            df['nr_horas_voadas'] = df['nr_horas_voadas'].apply(replace_virgula_ponto)
            
            # Sugestão:
            # Criar uma variável com categorias no seguinte formato:
            # QUA-10 (10 horas), QUI-12 (12 horas)
            # Ou QUA-M (manhã), QUI-T (tarte), DOM-N (noite)

            # Criar feature Load Factor:
            df['lf_passag'] = df['nr_rpk'].astype(float)/df['nr_ask'].astype(float)
            df['lf_peso'] = df['nr_rtk'].astype(float)/df['nr_atk'].astype(float)

            # Criar feature rendimento do combustível:
            df['rend_combustivel'] = round(df['km_distancia'].astype(float)/df['lt_combustivel'].astype(float),4)

            # Reduzir nome do dia da semana:
            def reduzir(s):
                return re.sub('SÁB','SAB', s[0:3])
            df.nm_dia_semana_referencia = df.nm_dia_semana_referencia.apply(reduzir)

            def get_hora(h):
                try:
                    return h.split(':')[0]
                except:
                    return None

            df['hora'] = df['hr_partida_real'].apply(get_hora)

            # Aplicar varios filtros:
            df = df[ (df.ds_grupo_di=='REGULAR') &
                     (df.ds_natureza_tipo_linha=='DOMÉSTICA') &
                     (df.ds_natureza_etapa=='DOMÉSTICA') &
                     (df.nr_assentos_ofertados > 0) ]

            # Adicionar flag de feriados:
            df['feriadao'] = Feriados(df['dt_referencia'], ano)

            # Converter colunas para int:
            colunas_int = ['kg_peso', 'kg_carga_paga', 'kg_carga_gratis', 'kg_correio',
                           'nr_passag_gratis', 'nr_passag_pagos', 'km_distancia', 'lt_combustivel']
  
            def convert_int(x):
                try:
                    return int(x)
                except:
                    return x

            for c in colunas_int:
                #df[c] = df[c].astype(int)
                df[c] = df[c].apply(convert_int)

            print(df.shape)
            df.to_csv(path_trs+f'{base}_{ano}.csv', sep=';', encoding='latin-1', index=False)

    ######################
    else:
        print(f'Base {base} inexistente!')


In [None]:
%%time
tranform_trs('basica')

2016 (971239, 33) (776189, 38)
2017 (947583, 33) (752746, 38)
2018 (974995, 33) (778618, 38)
2019 (956515, 33) (758789, 38)
2020 (470845, 33) (369526, 38)
2021 (469285, 33) (394358, 38)
CPU times: user 3min 58s, sys: 20.8 s, total: 4min 18s
Wall time: 4min 40s


In [None]:
%%time
tranform_trs('tarifa')

2016 (4634293, 7)
2017 (4754065, 7)
2018 (5113126, 7)
2019 (4335164, 7)
2020 (3472250, 7)
2021 (2661330, 7)
CPU times: user 25.4 s, sys: 2.25 s, total: 27.7 s
Wall time: 29.5 s


# Transfomacao Business

In [None]:
def editar_mes_ano(m_a) -> str:
    m,a = m_a.split()
    m = get_key_by_value(dictionary=meses_abrv, value=m)
    m = str(m).zfill(2)
    a = str(2000+int(a))
    return f'{a}-{m}'

In [None]:
def tranform_bis() -> None:
    path_trs = path_datalake+'trusted/'
    path_bis = path_datalake+'business/'
    check_folder(path_bis)

    for ano in listar_anos('basica'):
        df_basica = pd.read_csv(path_trs+f'basica/basica_{ano}.csv', sep=';', encoding='latin-1', low_memory=False)
        df_tarifa = pd.read_csv(path_trs+f'tarifa/tarifa_{ano}.csv', sep=';', encoding='latin-1', low_memory=False)

        df_basica_tarifa = pd.merge(df_basica, df_tarifa,
                                    left_on=['nr_ano_referencia', # ano
                                             'nr_mes_referencia', # mes
                                             'sg_empresa_icao', # empresa
                                             'sg_icao_origem', # origem
                                             'sg_icao_destino' # destino
                                             ],
                                    right_on=['ano', 'mes', 'empresa', 'origem', 'destino'])

        print(ano, df_basica_tarifa.shape, end=' ')

        #####################################
        cotacoes = pd.read_csv(path_datalake+'resource/'+'cotacao_mensal.csv', decimal=',')[['Data','Último']]
        cotacoes.columns = ['mes_abrv_ano_abrv','dolar']

        cotacoes['ano_mes'] = cotacoes['mes_abrv_ano_abrv'].apply(editar_mes_ano)
        cotacoes.drop(columns='mes_abrv_ano_abrv', inplace=True)
        #####################################

        df_basica_tarifa['ano_mes'] = df_basica_tarifa['dt_referencia'].apply(lambda x: str(x)[0:7])
        df_final = pd.merge(df_basica_tarifa, cotacoes, left_on='ano_mes', right_on='ano_mes')

        df_final.columns = [re.sub('_referencia', '', cl) for cl in df_final.columns]

        # Selecionar campos de interesse:
        selecao = ['dt', 'nr_ano', 'nr_mes', 'nr_dia', 'nm_dia_semana', 'sg_empresa_icao',
                   'hr_partida_real', 'hr_chegada_real', 'sg_icao_origem',
                   'sg_icao_destino', 'nm_regiao_origem', 'nm_regiao_destino',
                   'sg_uf_origem', 'sg_uf_destino', 'sg_equipamento_icao', 'km_distancia',
                   'nr_horas_voadas', 'lt_combustivel', 'nr_assentos_ofertados',
                   'nr_passag_pagos', 'nr_passag_gratis', 'kg_payload', 'kg_peso',
                   'kg_carga_paga', 'kg_carga_gratis', 'kg_correio', 'nr_ask', 'nr_rpk',
                   'nr_atk', 'nr_rtk', 'lf_passag', 'lf_peso', 'rend_combustivel',
                   'dolar', 'ticket_medio']
        df_final = df_final[selecao]

        print(df_final.shape)
        df_final.to_csv(path_bis+f'basica_passagem_dolar_{ano}.csv', sep=';', encoding='latin-1', index=False)


In [None]:
tranform_bis()

2016 (774905, 44) (774905, 35)
2017 (751364, 44) (751364, 35)
2018 (776216, 44) (776216, 35)
2019 (746763, 44) (746763, 35)
2020 (353811, 44) (353811, 35)
2021 (283202, 44) (283202, 35)


In [None]:
%%time

# Concatenar bases (todos os anos):
path_bis = path_datalake+f'business/'

df = ler_bases('basica', path_bis)
df.to_csv(path_bis+f'basica_passagem_dolar_completa_2016-2021.csv', sep=';', encoding='latin-1', index=False)

df.shape

CPU times: user 1min 36s, sys: 5.06 s, total: 1min 41s
Wall time: 1min 47s


In [None]:
df.sample(5)

Unnamed: 0,dt,nr_ano,nr_mes,nr_dia,nm_dia_semana,sg_empresa_icao,hr_partida_real,hr_chegada_real,sg_icao_origem,sg_icao_destino,nm_regiao_origem,nm_regiao_destino,sg_uf_origem,sg_uf_destino,sg_equipamento_icao,km_distancia,nr_horas_voadas,lt_combustivel,nr_assentos_ofertados,nr_passag_pagos,nr_passag_gratis,kg_payload,kg_peso,kg_carga_paga,kg_carga_gratis,kg_correio,nr_ask,nr_rpk,nr_atk,nr_rtk,lf_passag,lf_peso,rend_combustivel,dolar,ticket_medio
324084,2016-05-22,2016,5,22,DOM,TAM,21:42:00,23:04:00,SBSP,SBUL,SUDESTE,SUDESTE,SP,MG,A320,552.0,1.37,3824,174,114,3,16900,10200,0,0,0,96048.0,62928.0,9328.0,5506.0,0.655172,0.590266,0.1444,3.6105,231.0
383251,2018-07-31,2018,7,31,TER,AZU,11:14:00,12:46:00,SBFN,SBRF,NORDESTE,NORDESTE,PE,PE,AT72,549.0,1.53,1216,70,69,1,6970,5817,12,0,0,38430.0,37881.0,3826.0,3152.0,0.985714,0.823837,0.4515,3.7557,384.0
225472,2019-04-13,2019,4,13,SAB,AZU,06:24:00,08:39:00,SBCF,SBFI,SUDESTE,SUL,MG,PR,E195,1267.0,2.25,5809,118,107,7,10745,9385,0,0,0,149506.0,135569.0,13613.0,11225.0,0.90678,0.824579,0.2181,3.9207,450.0
382719,2018-07-06,2018,7,6,SEX,AZU,18:55:00,20:09:00,SBRJ,SBKP,SUDESTE,SUDESTE,RJ,SP,E190,407.0,1.23,2665,106,51,7,11330,4706,24,0,0,43142.0,20757.0,4611.0,1701.0,0.481132,0.3689,0.1527,3.7557,420.0
196622,2017-04-19,2017,4,19,QUA,AZU,05:53:00,07:01:00,SBGR,SBCF,SUDESTE,SUDESTE,SP,MG,E195,497.0,1.13,2575,118,109,1,12548,9510,484,0,0,58646.0,54173.0,6236.0,4689.0,0.923729,0.751924,0.193,3.1758,214.0


_______________