<a href="https://colab.research.google.com/github/marcelpinheiro/projetas/blob/main/Projetas.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


## Instalando o PySpark no Google Colab

Instalar o PySpark não é um processo direto como de praxe em Python. Não basta usar um pip install apenas. Na verdade, antes de tudo é necessário instalar dependências como o Java 8, Apache Spark 2.3.2 junto com o Hadoop 2.7.

In [1]:
#instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install -q inflection
!pip install unidecode

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting unidecode
  Downloading Unidecode-1.3.4-py3-none-any.whl (235 kB)
[K     |████████████████████████████████| 235 kB 5.2 MB/s 
[?25hInstalling collected packages: unidecode
Successfully installed unidecode-1.3.4


A próxima etapa é configurar as variáveis de ambiente, pois isso habilita o ambiente do Colab a identificar corretamente onde as dependências estão rodando.

Para conseguir “manipular” o terminal e interagir como ele, você pode usar a biblioteca os.

In [2]:
# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# tornar o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

# iniciar uma sessão local e importar dados do Airbnb
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.functions import split


import inflection
import re
import unidecode

sc = SparkSession.builder.master('local[*]').getOrCreate()

In [3]:
# download do http para arquivo local
!wget --quiet --show-progress https://github.com/marcelpinheiro/projetas/raw/main/AIR_CIA.rar
!wget --quiet --show-progress https://github.com/marcelpinheiro/projetas/raw/main/VRA.rar

#Descompactando arquivos
!unrar x /content/VRA.rar
!unrar x /content/AIR_CIA.rar






UNRAR 5.50 freeware      Copyright (c) 1993-2017 Alexander Roshal


Extracting from /content/VRA.rar

Creating    VRA                                                       OK
Extracting  VRA/VRA_20211.json                                            10%  OK 
Extracting  VRA/VRA_202110.json                                           21%  OK 
Extracting  VRA/VRA_202111.json                                           34%  OK 
Extracting  VRA/VRA_20212.json                                            42%  OK 
Extracting  VRA/VRA_20213.json                                            49%  OK 
Extracting  VRA/VRA_20214.json                                            55%  OK 
Extracting  VRA/VRA_20215.json                                            62%  OK 
Extracting  VRA/VRA_20216.json                                            69%  OK 
Extracting  VRA/VRA_20217.json                                            79%  

# Views

In [4]:
df_air_cia= sc.read.option("recursiveFileLookup","true").option("header","true").option("delimiter", ";").csv("/content/AIR_CIA")


Com tudo pronto, vamos rodar uma sessão local para testar se a instalação funcionou corretamente.

## VRA


In [5]:
df_vra = sc.read.json("/content/VRA/*.json")



for col in df_vra.columns:    
  df_vra = df_vra.withColumnRenamed(col,unidecode.unidecode(col))
  df_vra = df_vra.withColumnRenamed(col,inflection.underscore(col))

df_vra.registerTempTable("vra")
df_vra = sc.sql("select * from vra")
df_vra.show()

+-------------------+-------------------+-----------------+-------------------+---------------+--------------------+-------------------+----------------+---------+-------------------+-------------------+-----------+
|   chegada_prevista|       chegada_real|CodigoAutorizacao|CodigoJustificativa|CodigoTipoLinha|ICAOAerodromoDestino|ICAOAerodromoOrigem|ICAOEmpresaAerea|NumeroVoo|   partida_prevista|       partida_real|SituacaoVoo|
+-------------------+-------------------+-----------------+-------------------+---------------+--------------------+-------------------+----------------+---------+-------------------+-------------------+-----------+
|2021-11-12 08:30:00|2021-11-12 08:24:00|                0|                N/A|              X|                KORD|               SBGR|             UAL|     0844|2021-11-11 22:00:00|2021-11-11 22:14:00|  REALIZADO|
|2021-11-15 08:30:00|2021-11-15 08:05:00|                0|                N/A|              X|                KORD|               SBGR|

## AIR_CIA

In [6]:
df_air_cia = sc.read.option("delimiter",";").option("header","true").csv("/content/AIR_CIA/*.csv")

# Dividindo a coluna ICAO IATA
df_air_cia = df_air_cia.withColumn('icao', split(df_air_cia['ICAO IATA'], ' ').getItem(0)) \
       .withColumn('iata', split(df_air_cia['ICAO IATA'], ' ').getItem(1)) 
df_air_cia = df_air_cia.drop('ICAO IATA')


for col in df_air_cia.columns:
    df_air_cia = df_air_cia.withColumnRenamed(col,unidecode.unidecode(col))    
    df_air_cia = df_air_cia.withColumnRenamed(col, re.sub(r'(?<!^)(\s)(?=[A-Z])', '_', col).lower())    

for col in df_air_cia.columns:    
    
    df_air_cia = df_air_cia.withColumnRenamed(col, re.sub(r' ', '', col))


df_air_cia.registerTempTable("air_cia")
df_air_cia = sc.sql("select * from air_cia")


In [7]:
df_air_cia.show(50)

+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+--------------------+----+----+
|         RazaoSocial|              cnpj|    AtividadesAereas|        EnderecoSede|            telefone|              e-mail|  DecisaoOperacional|DataDecisaoOperacional|validade_operacional|icao|iata|
+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+--------------------+----+----+
|ABSA - AEROLINHAS...|00.074.635/0001-33|TRANSPORTE AÉREO ...|AEROPORTO INTERNA...|      (11) 5582-8055|      gar@tam.com.br|       DECISÃO Nº 41|            22/04/2015|          23/04/2025| LTG|  M3|
|AEROSUL TÁXI AÉRE...|27.315.694/0001-02|SERVIÇOS AÉREOS P...|RODOVIA PR 218, K...|      (43) 3176-4030|operacoes@aerosul...|      DECISÃO Nº 282|            10/02/2021|                null| ASO| 

In [8]:
import requests

def consultaAerodromos(icao):
  url = "https://airport-info.p.rapidapi.com/airport"

  querystring = {"icao":icao}

  headers = {
    "X-RapidAPI-Host": "airport-info.p.rapidapi.com",
    "X-RapidAPI-Key": "31cc9d652dmsh7cd96c1805671fap19380djsnb46510a1d18c"
  }

  response = requests.request("GET", url, headers=headers, params=querystring)
  return response 

## API

In [None]:
import json

icao_list = df_vra.select("ICAOAerodromoOrigem").distinct().rdd.flatMap(lambda x: x).collect()
icao_list2 = df_vra.select("ICAOAerodromoDestino").distinct().rdd.flatMap(lambda x: x).collect()
icao_list_final = icao_list + icao_list2

# icao_list_final = ['KORD']

listay = []

for i in icao_list_final:     
  listay.append(json.loads(consultaAerodromos(i).text))

schema = StructType([ \
  StructField("city",StringType(),True), \
  StructField("country",StringType(),True), \
  StructField("country_iso",StringType(),True), \
  StructField("county",StringType(),True), \
  StructField("iata",StringType(),True), \
  StructField("icao",StringType(),True), \
  StructField("id",StringType(),True), \
  StructField("latitude",StringType(),True), \
  StructField("location",StringType(),True), \
  StructField("longitude",StringType(),True), \
  StructField("name",StringType(),True), \
  StructField("phone",StringType(),True), \
  StructField("postal_code",StringType(),True), \
  StructField("state",StringType(),True), \
  StructField("street",StringType(),True), \
  StructField("street_number",StringType(),True), \
  StructField("uct",StringType(),True), \
  StructField("website",StringType(),True), \
  ])


dataframe = sc.createDataFrame(data=listay,schema=schema)
# dataframe.show()

dataframe.registerTempTable("aerodromos")
dataframe = sc.sql("select * from aerodromos")
dataframe.show()


Criar as seguintes views (Priorize o uso de SQL para esta parte):
Para cada companhia aérea trazer a rota(origem-destino) mais utilizada com as seguintes informações:
	Razão social da companhia aérea
	Nome Aeroporto de Origem
	ICAO do aeroporto de origem
	Estado/UF do aeroporto de origem
	Nome do Aeroporto de Destino
	ICAO do Aeroporto de destino
	Estado/UF do aeroporto de destino

In [None]:
df_vra = sc.sql(
"""
Select  ICAOEmpresaAerea,RazaoSocial, AeroportoOrigem, ICAOAerodromoOrigem, AeroportoOrigemEstado, AeroportoDestino, ICAOAerodromoDestino, AeroportoDestinoEstado from (
select
    a.RazaoSocial,
    ae.name AeroportoOrigem,
    ICAOEmpresaAerea,
    ICAOAerodromoOrigem,
    ae.state AeroportoOrigemEstado,
    ae2.name AeroportoDestino,
    ICAOAerodromoDestino,
    ae2.state AeroportoDestinoEstado,
    ROW_NUMBER() OVER(PARTITION BY ICAOEmpresaAerea order by count(concat(ICAOAerodromoOrigem,ICAOAerodromoDestino)) desc) NrLinha,
    MAX(count(concat(ICAOAerodromoOrigem,ICAOAerodromoDestino))) OVER(PARTITION BY ICAOEmpresaAerea, ICAOAerodromoOrigem, ICAOAerodromoDestino) origem_destino_counter
    from vra v
    Left join air_cia a on a.icao = v.ICAOEmpresaAerea
    Left join aerodromos ae on ae.icao = v.ICAOAerodromoOrigem
    Left join aerodromos ae2 on ae2.icao = v.ICAOAerodromoDestino

    group by ICAOEmpresaAerea, ICAOAerodromoOrigem, ICAOAerodromoDestino, a.RazaoSocial,ae.name, ae.state, AeroportoDestino, AeroportoOrigemEstado, AeroportoDestinoEstado
    order by ICAOEmpresaAerea) t
  
  where NrLinha = 1
   """)
df_vra.show(50)



In [None]:
df = sc.sql("""
  select * from
  (select 
  ICAOAerodromoOrigem Aeroporto,
  ICAOEmpresaAerea CiaArea, 
  count(ICAOEmpresaAerea) as TotalPousoDecolagem,
  count(NumeroVoo) OVER(PARTITION BY ICAOAerodromoOrigem) QtdRotasAPartirDoAeroporto,
  count(NumeroVoo) OVER(PARTITION BY ICAOAerodromoDestino) QtdRotasDestinoAeroporto,
  ROW_NUMBER() OVER(PARTITION BY ICAOAerodromoOrigem order by count(ICAOEmpresaAerea) desc ) NrLinha
  from vra 
  where CAST(partida_real as date) between '2021-01-01' and '2021-12-31'
  group by ICAOAerodromoOrigem, ICAOEmpresaAerea, NumeroVoo,ICAOAerodromoDestino)t
  where NrLinha = 1 """)
  
df.show()