#### Importando Bibliotecas Necessárias


Iciando a resposta dos itens extras, optei por fazer essa primeira etapa de extração utilizando Spark. 

Optei pelo uso desse framework pois ele escala bem e lida muito bem com uma variadade de dados, 
simplificando o processo de extração e tratamento em poucas linhas de código.
Ele ainda é eficiente dos pontos de vista de velocidade de processamento e custo. 

Outras Bibliotecas utilizadas foram SQLite3, Pandas, Numpy e Requests para simular uma operação produtiva.

A database utilizada com todas as tabelas e visões também vai em anaxo.

Todos os arquivos adicionais estão disponíveis no .zip e no repositório. [link]

In [17]:
from pyspark.sql.functions import *
from pyspark.sql.session import SparkSession
import pandas as pd
import numpy as np
from utils import normalize_string
import requests
import sqlite3

#### Spark Session e SQLite Cursor

In [18]:
spark = SparkSession \
    .builder \
    .master("local[1]") \
    .appName("case_eleflow") \
    .config("spark.sql.parquet.compression.codec", "gzip")\
    .getOrCreate()

In [19]:
cnx = sqlite3.connect('database.db')
cur = cnx.cursor()

#### Tratamento VRA - RAW STAGE

Para a normalização dos cabeçalhos foi criada a funcao 'normalize_string' que se encontra no arquivo 'utils.py'

O conceito da função se baseia no uso do biblioteca 'unicodedata' para remover acentos segundo a regra canônica dos caracters ASCII.

Em seguida a função usa uma Regra Regex (Regular Expression) para normalizar qualquer string fornecida como snake case.

In [20]:
vra = spark.read.format("json").options(header=True).load("VRA/*.json")

In [21]:
vra.show(1,vertical=True)

-RECORD 0-----------------------------------
 ChegadaPrevista      | 2021-11-12 08:30:00 
 ChegadaReal          | 2021-11-12 08:24:00 
 CódigoAutorização    | 0                   
 CódigoJustificativa  | N/A                 
 CódigoTipoLinha      | X                   
 ICAOAeródromoDestino | KORD                
 ICAOAeródromoOrigem  | SBGR                
 ICAOEmpresaAérea     | UAL                 
 NúmeroVoo            | 0844                
 PartidaPrevista      | 2021-11-11 22:00:00 
 PartidaReal          | 2021-11-11 22:14:00 
 SituaçãoVoo          | REALIZADO           
only showing top 1 row



In [22]:
for column in vra.columns:
    vra = vra.withColumnRenamed(column,normalize_string(column))

In [23]:
vra.columns

['chegada_prevista',
 'chegada_real',
 'codigo_autorizacao',
 'codigo_justificativa',
 'codigo_tipo_linha',
 'icao_aerodromo_destino',
 'icao_aerodromo_origem',
 'icao_empresa_aerea',
 'numero_voo',
 'partida_prevista',
 'partida_real',
 'situacao_voo']

O passo a seguir descreve a criação da camada de verdade (SSOT - Single Source of Truth) para os dados VRA. 

Como exemplo deixei comentado algumas possibilidades como salvar como parquet em uma cloud ou escrever com JDBC diretamente em um banco.

Para a resolução do case, decidi simplificar essa escrita usando Pandas para não haver a necessidade de baixar nenhum Jar por parte dos avaliadores.

Utilizo então o pandas para criar uma tabela numa database utilizando o SQLite3

In [24]:
#vra.write.parquet('bucket...') #Poderiamos escrever no Bucket
#vra.write.format('jdbc').mode('append').option('url','database.db').option('dbtable','vra').save() #Poderiamos escrever diretamento no Banco
df = vra.toPandas()
df.to_sql(name='vra',con=cnx, if_exists='replace', index=False)

535803

#### Tratamento AIR_CIA - RAW STAGE

In [25]:
air_cia = spark.read.format("csv").option("header","True").option("delimiter",";").load("AIR_CIA/*.csv")

In [26]:
air_cia.show(1,vertical=True)

-RECORD 0----------------------------------------
 Razão Social             | ABSA - AEROLINHAS... 
 ICAO IATA                | LTG M3               
 CNPJ                     | 00.074.635/0001-33   
 Atividades Aéreas        | TRANSPORTE AÉREO ... 
 Endereço Sede            | AEROPORTO INTERNA... 
 Telefone                 | (11) 5582-8055       
 E-Mail                   | gar@tam.com.br       
 Decisão Operacional      | DECISÃO Nº 41        
 Data Decisão Operacional | 22/04/2015           
 Validade Operacional     | 23/04/2025           
only showing top 1 row



In [27]:
for column in air_cia.columns:
    air_cia = air_cia.withColumnRenamed(column,normalize_string(column))

In [28]:
air_cia.columns

['razao_social',
 'icao_iata',
 'cnpj',
 'atividades_aereas',
 'endereco_sede',
 'telefone',
 'e_mail',
 'decisao_operacional',
 'data_decisao_operacional',
 'validade_operacional']

In [29]:
splitted = split(air_cia['icao_iata'], ' ')
air_cia = air_cia.withColumn('icao',splitted.getItem(0))\
            .withColumn('iata',splitted.getItem(1))\

air_cia = air_cia.drop('icao_iata')   
air_cia.show(1, vertical=True)

-RECORD 0----------------------------------------
 razao_social             | ABSA - AEROLINHAS... 
 cnpj                     | 00.074.635/0001-33   
 atividades_aereas        | TRANSPORTE AÉREO ... 
 endereco_sede            | AEROPORTO INTERNA... 
 telefone                 | (11) 5582-8055       
 e_mail                   | gar@tam.com.br       
 decisao_operacional      | DECISÃO Nº 41        
 data_decisao_operacional | 22/04/2015           
 validade_operacional     | 23/04/2025           
 icao                     | LTG                  
 iata                     | M3                   
only showing top 1 row



In [30]:
#air_cia.write.parquet('bucket') #Poderiamos escrever no Bucket
#air_cia.write.format('jdbc').mode('append').option('url','database.db').option('dbtable','air_cia').save() #Poderiamos escrever diretamento no Banco
df = air_cia.toPandas()
df.to_sql(name='air_cia',con=cnx, if_exists='replace', index=False)


20

### Criacao Tabela Aerodromos - RAW STAGE

O primeiro passo consiste na obtenção das chaves únicas para cada um dos aeródromos. 



In [31]:
df = pd.read_sql('SELECT * FROM vra', con=cnx)
destino = df.icao_aerodromo_destino.unique()
origem = df.icao_aerodromo_origem.unique()
icao = np.concatenate([origem,destino])
icao = np.unique([icao])

Parametros do Endpoint Rapid Api

Para o teste existe a necessidade de substituir a "X-RapidAPI-Key", por se tratar de credencial. 

In [41]:
url = "https://airport-info.p.rapidapi.com/airport"

headers = {
	"X-RapidAPI-Key": "31a41afedemsheb80e711e7f6f38p14eb9bjsnc79ea29a2c93",
	"X-RapidAPI-Host": "airport-info.p.rapidapi.com"
}

O script a seguir utiliza Python puro, pandas e SQLite3 para criar a base aeródromos.

Num cenário produtivo poderia ser utilizado spark ou outro framework para escrever diretamento no Banco.

No caso de Teste primeiro execute o drop table da tabela aerodromos para recriá-la com base na extração da API. Do contrário a mesma já se encontra no arquivo 'database,db'

In [42]:
#cur.execute('drop table aerodromos')

<sqlite3.Cursor at 0x1a6cbaf9040>

In [43]:
errors=[]
for code in icao:
    querystring = {"icao":f"{code}"}
    try:
        response = requests.request("GET", url, headers=headers, params=querystring).json()
        aero = pd.DataFrame([response])
        aero.to_sql(name='aerodromos', con=cnx, if_exists='append', index=False)
    except:
        errors.append(code)
        continue
    

### Criacao da Visao Best Routes - SSOT to VIEW
Com os dados referentes as rotas mais movimentadas por compania aerea

O passo a seguir, executa a modelagem da base via os script "route.sql" que está anexado.

Esse script é executado e então usamos Pandas para gravar essa visão no banco.

In [44]:
query = open('route.sql', 'r')
best_routes = pd.read_sql_query(query.read(), con=cnx)
query.close() 

In [45]:
best_routes.to_sql(name='best_routes',con=cnx, if_exists='replace', index=False)
best_routes.head()

Unnamed: 0,razao_social,aeroporto_origem,icao_aerodromo_origem,estado_origem,aeroporto_destino,icao_aerodromo_destino,estado_destino
0,AZUL CONECTA LTDA. (EX TWO TAXI AEREO LTDA),Coari Airport,SWKO,State of Amazonas,Eduardo Gomes International Airport,SBEG,Amazonas
1,AEROSUL TÁXI AÉREO LTDA (EX.: AUSTEN TÁXI AÉRE...,Hercílio Luz International Airport,SBFL,Santa Catarina,Caçador Airport,SBCD,Santa Catarina
2,AZUL LINHAS AÉREAS BRASILEIRAS S/A,Viracopos/Campinas International Airport,SBKP,São Paulo,Santos Dumont Airport,SBRJ,Rio de Janeiro
3,GOL LINHAS AÉREAS S.A. (EX- VRG LINHAS AÉREAS ...,Santos Dumont Airport,SBRJ,Rio de Janeiro,São Paulo–Congonhas Airport,SBSP,São Paulo
4,ABSA - AEROLINHAS BRASILEIRAS S.A.,Mariscal Sucre International Airport,SEQM,Pichincha,Miami International Airport,KMIA,Florida


### Criacao da Visao Significant Air Companies
Com os dados referentes as pricipais companias aereas em cada aeroporto

Como no caso anterior executamos a modelagem da base via os script "significant.sql" que está anexado.

Esse script é executado e então usamos Pandas para gravar essa visão no banco.

In [46]:
query = open('significant.sql', 'r')
best_routes = pd.read_sql_query(query.read(), con=cnx)
query.close() 

In [47]:
best_routes.to_sql(name='air_cia_per_airport',con=cnx, if_exists='replace', index=False)
best_routes.head()

Unnamed: 0,nome_aeroporto,icao_aeroporto,cia_area,n_destinos,n_origens,total_voos
0,Viracopos/Campinas International Airport,SBKP,AZUL LINHAS AÉREAS BRASILEIRAS S/A,69,70,83423
1,São Paulo–Guarulhos International Airport,SBGR,TAM LINHAS AÉREAS S.A.,65,66,66354
2,Tancredo Neves International Airport (Confins ...,SBCF,AZUL LINHAS AÉREAS BRASILEIRAS S/A,45,45,41939
3,Recife/Guararapes–Gilberto Freyre Internationa...,SBRF,AZUL LINHAS AÉREAS BRASILEIRAS S/A,34,35,40991
4,Brasília International Airport (Presidente J. ...,SBBR,TAM LINHAS AÉREAS S.A.,37,37,32344
