### Carga e visualização dos dados

In [2]:
# importo biblioteca Pandas 
import pandas as pd

In [4]:
# carrego os meus dados num DataFrame (df)
raw_data = pd.read_csv(
    "/Users/anamariarodrigues/Desktop/BIG_DATA/Projeto_Final/Air_Traffic_Passenger_Statistics.csv")

raw_data.head()

Unnamed: 0,Activity Period,Operating Airline,Operating Airline IATA Code,Published Airline,Published Airline IATA Code,GEO Summary,GEO Region,Activity Type Code,Price Category Code,Terminal,Boarding Area,Passenger Count,Adjusted Activity Type Code,Adjusted Passenger Count,Year,Month
0,200507,ATA Airlines,TZ,ATA Airlines,TZ,Domestic,US,Deplaned,Low Fare,Terminal 1,B,27271,Deplaned,27271,2005,July
1,200507,ATA Airlines,TZ,ATA Airlines,TZ,Domestic,US,Enplaned,Low Fare,Terminal 1,B,29131,Enplaned,29131,2005,July
2,200507,ATA Airlines,TZ,ATA Airlines,TZ,Domestic,US,Thru / Transit,Low Fare,Terminal 1,B,5415,Thru / Transit * 2,10830,2005,July
3,200507,Air Canada,AC,Air Canada,AC,International,Canada,Deplaned,Other,Terminal 1,B,35156,Deplaned,35156,2005,July
4,200507,Air Canada,AC,Air Canada,AC,International,Canada,Enplaned,Other,Terminal 1,B,34090,Enplaned,34090,2005,July


In [None]:
# dados estatísticos das colunas numéricas
raw_data.describe()

In [None]:
# informação geral do df
raw_data.info()

In [None]:
raw_data["GEO Summary"].unique()

In [None]:
raw_data["Price Category Code"].unique()

In [None]:
raw_data["Boarding Area"].unique()

In [None]:
# visualização valores nulos
raw_data.isnull().sum()

In [None]:
# visualizo os registos que tem valor nulo na coluna referida
raw_data[raw_data["Operating Airline IATA Code"].isnull()]

In [None]:
# valores NC existentes 
raw_data[raw_data["Operating Airline IATA Code"] == "NC"]

In [6]:
# preencho valores nulos
df = raw_data.fillna("NC")
df.isnull().sum()

Activity Period                0
Operating Airline              0
Operating Airline IATA Code    0
Published Airline              0
Published Airline IATA Code    0
GEO Summary                    0
GEO Region                     0
Activity Type Code             0
Price Category Code            0
Terminal                       0
Boarding Area                  0
Passenger Count                0
Adjusted Activity Type Code    0
Adjusted Passenger Count       0
Year                           0
Month                          0
dtype: int64

In [None]:
df["Month"].unique()

In [8]:
df["Month Number"] = df["Month"].replace({"January": 1, "February" : 2,
                                        "March": 3, "April":4,
                                        "May":5, "June": 6, "July": 7,
                                        "August": 8, "September": 9,
                                        "October": 10, "November": 11,
                                        "December": 12                                                
                                          })
df["Month Number"] = df["Month Number"].astype("int")

  df["Month Number"] = df["Month"].replace({"January": 1, "February" : 2,


In [10]:
# Apago coluna duplicada
df.drop(columns = ["Month"], inplace = True)
df.rename(columns = {"Month Number" : "Month"}, inplace = True)

In [None]:
df["Month"].head()

### Conexão com MongoDB

In [12]:
# importo bibliotecas
from pymongo import MongoClient
from json import loads

#### Transfromação para JSON

In [14]:
# transformo o DF para o formato JSON
df_json = df.to_json(orient = "records")
parsed = loads(df_json)

In [24]:
parsed[0]

{'Activity Period': 200507,
 'Operating Airline': 'ATA Airlines',
 'Operating Airline IATA Code': 'TZ',
 'Published Airline': 'ATA Airlines',
 'Published Airline IATA Code': 'TZ',
 'GEO Summary': 'Domestic',
 'GEO Region': 'US',
 'Activity Type Code': 'Deplaned',
 'Price Category Code': 'Low Fare',
 'Terminal': 'Terminal 1',
 'Boarding Area': 'B',
 'Passenger Count': 27271,
 'Adjusted Activity Type Code': 'Deplaned',
 'Adjusted Passenger Count': 27271,
 'Year': 2005,
 'Month': 7,
 '_id': ObjectId('67f8fe255435822a73f3ec8c')}

#### Função e inserção de objetos

In [20]:
# função para inserir vários documentos
def insert_many(uri, db, collection, docs):
    client = MongoClient(uri)
    try:
        database = client.get_database(db)
        collection = database.get_collection(collection)
        collection.insert_many(docs)
        client.close()
        print("Dados inseridos com sucesso")
    except Exception as e:
        print(f"Erro durante a conexão: {e}")

In [22]:
# aplico função para inserir
uri = "mongodb://localhost:27017/"
db = "tokiodb"
collection = "sf_aeroporto"

insert_many(uri, db, collection, parsed)

Dados inseridos com sucesso


In [58]:
from pymongo import MongoClient

uri = "mongodb://localhost:27017/"
client = MongoClient(uri)

try:
    database = client.get_database("mydb")
    users = database.get_collection("users")

    users.insert_many(parsed)

    client.close()
    print("Insersão com sucesso")
except Exception as e:
    print(f"Erro durante a conexão: {e}")

Insersão com sucesso


In [None]:
import psycopg2

# faço conexão com a minha Base de Dados
conexao = psycopg2.connect(
     dbname = "tokio_viagens",
     user = "postgres",
     host = "localhost",
     port = 5432)

# para operar na DB abro um 'cursor' 
cursor = conexao.cursor()

# crio loop for para inserir dados na Base de Dados
for _, row in df.iterrows():
    cursor.execute("""INSERT INTO sf_aeroporto (
        periodo, empresa_op, iata_op ,empresa_com, iata_com, 
        geo_info, geo_regiao, estado, categoria, terminal, 
        porta, passageiros, estado_atual, pass_atual, ano, mes) 
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """, tuple(row)
                  )

# gero o comit e finalizo as conexões com a minha DB
conexao.commit()
cursor.close()
conexao.close()

print("Dados inseridos com sucesso")

### Trabalhar dados com PySpark

In [None]:
# importação bibliotecas
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

In [None]:
# crio sessão Spark
spark = SparkSession.builder.appName("Tokio_Airline").getOrCreate()

In [None]:
# crio df do spark, partindo do df criado de Pandas
df_sp = spark.createDataFrame(df)

### Exercício 3

In [None]:
# número de companhias existentes no df
df_sp.select(f.count_distinct("Operating Airline")\
            .alias("Companhias operacionais"),
            f.count_distinct("Published Airline")\
            .alias("Companhias comerciais"))\
            .show()

In [None]:
# média de passageiros por período e companhias
df_avg = df_sp.groupBy(["Activity Period", "Operating Airline"])\
                        .agg(f.round(f.avg("Adjusted Passenger Count"))\
                        .alias("Avg Passenger")
                            )\
                        .orderBy("Activity Period", ascending = False)
df_avg.show(5)

In [None]:
# salvar resultados num ficheiro .csv
df_avg.write.mode("overwrite")\
        .option("header","true")\
        .csv("media_passageiros.csv")

In [None]:
# apagando duplicados pela região
geo_regiao = df_sp.orderBy("Adjusted Passenger Count", ascending = False)\
                .dropDuplicates(["GEO Region"])

In [None]:
geo_regiao.select("Operating Airline", "GEO Region", "Adjusted Passenger Count")\
        .orderBy("Adjusted Passenger Count", ascending = False)\
        .show()

In [None]:
geo_regiao.write.mode("overwrite")\
        .option("header","true")\
        .csv("geo_regiao_unique.csv")

### Análise descritiva com PySpark

In [None]:
df_sp.printSchema()

#### Médias e Desvio padrão

In [None]:
# média de passageiros agrupados por companhias
df_sp.groupBy("Operating Airline")\
    .agg(
        f.round(f.avg("Adjusted Passenger Count")).alias("Media de passageiros"),
        f.round(f.stddev("Adjusted Passenger Count")).alias("Desvio padrão"),
        f.max("Adjusted Passenger Count").alias("Max"),
        f.min("Adjusted Passenger Count").alias("Min")
    )\
    .orderBy("Media de passageiros", ascending = False)\
    .show(15)

In [None]:
# resutlados agrupados por região
df_sp.groupBy("GEO Region")\
    .agg(
        f.round(f.avg("Adjusted Passenger Count")).alias("Media passageiros"),
        f.round(f.stddev("Adjusted Passenger Count")).alias("Desvio padrão"),
        f.max("Adjusted Passenger Count").alias("Max"),
        f.min("Adjusted Passenger Count").alias("Min"),
        f.median("Adjusted Passenger Count").alias("Mediana")
    )\
    .orderBy("Media passageiros", ascending = False)\
    .show()

In [None]:
# cálculo de valores onde o destino é 'US'
df_sp.filter(df_sp["GEO Region"] == "US")\
    .groupBy("Month")\
    .agg(
        f.round(f.avg("Adjusted Passenger Count")).alias("Media passageiros"),
        f.round(f.stddev("Adjusted Passenger Count")).alias("Desvio padrão"),
        f.max("Adjusted Passenger Count").alias("Max"),
        f.min("Adjusted Passenger Count").alias("Min"),
    )\
    .orderBy("Month", ascending = True)\
    .show()

In [None]:
# análise descritiva agrupado por tipo de voo
df_sp.groupBy("GEO Summary")\
    .agg(
        f.round(f.avg("Adjusted Passenger Count")).alias("Media passageiros"),
        f.round(f.stddev("Adjusted Passenger Count")).alias("Desvio padrão"),
        f.max("Adjusted Passenger Count").alias("Max"),
        f.min("Adjusted Passenger Count").alias("Min"),
        f.median("Adjusted Passenger Count").alias("Mediana")
    )\
    .orderBy("Media passageiros", ascending = False)\
    .show()

In [None]:
# gráfico auxiliar para compreensão no tipo de voos
avg_pass = df_sp.groupby("GEO Summary")\
        .agg(f.round(f.avg("Adjusted Passenger Count"))\
             .alias("Media Passageiros")).toPandas()

avg_pass.plot(kind = "bar",
              figsize = (5,4),
              x = "GEO Summary",
              xlabel = "Tipo de voo",
              y = "Media Passageiros",
              legend = False,
              title = "Media de passageiros por tipo de voo")

In [None]:
# crio tabela pivot para dividir os dados pelo tipo de voo
pivot = df_sp.groupBy("Month")\
        .pivot("GEO Summary")\
        .agg(f.count("GEO Summary")\
        .alias("Contagem"))\
        .orderBy("Month")

pivot.show()

In [None]:
import matplotlib.pyplot as plt
# gráfico comparativo na frequência de voos
pivot_pd = pivot.toPandas()

y_values = pivot_pd["International"].values.tolist()
y_values2 = pivot_pd["Domestic"].values.tolist()

plt.figure(figsize = (6,4))
plt.plot(y_values, "red", label = "Internacionais")
plt.plot(y_values2, "blue", label ="Nacionais")
plt.title("Frequência de voos")
plt.legend()
plt.show()

In [None]:
# gráfico auxiliar da influéncia sazonal nos voos internacionais
pandas_df = df_sp.filter(f.col("GEO Summary") == "International")\
        .groupBy("Month")\
        .agg(f.round(f.avg("Adjusted Passenger Count"))\
             .alias("Media Passageiros"))\
        .orderBy("Month").toPandas()

y = pandas_df["Media Passageiros"].tolist()

plt.figure(figsize = (8,5))
plt.plot(y)
plt.title("Média de Passageiros por Mês")
plt.xlabel("Month")
plt.show()

#### Distribuição dos dados

In [None]:
# distribuição de tipo de voos
dis_geo = df_sp.groupBy("GEO Summary")\
        .agg(f.count("GEO Summary").alias("Freq Absoluta"),\
             f.round(f.count("GEO Summary") * 100 / 15007 ,2)\
             .alias("Freq Relativa"))

dis_geo.show()

In [None]:
# grafico de barras
pandas_df = dis_geo.toPandas()
pandas_df.plot(kind = "bar",
                x= "GEO Summary", 
                xlabel = "Tipo de voo",
                y= "Freq Relativa",
                figsize = (5,4),
                title = "Frequencia de tipo de voos",
                legend = False,
                ylim = (0, 75))

In [None]:
dis_reg = df_sp.filter(df_sp["GEO Summary"] == "International")\
    .groupBy("GEO Region")\
    .agg(f.count("GEO Region").alias("Freq Absoluta"),\
        f.round(f.count("GEO Region") * 100 / 15007, 2)\
        .alias("Freq Relativa"))\
    .orderBy("Freq Absoluta", ascending = False)

dis_reg.show()

In [None]:
import seaborn as sns
# gráfico da distribuição de regiões internacionais
reg_pd = dis_reg.toPandas()

plt.figure()
sns.barplot(x = "GEO Region", y = "Freq Absoluta", data = reg_pd)
plt.title("Distribuição voos Internacionais")
plt.ylabel("Frequência")
plt.xticks(rotation = 35, fontsize = "small")
plt.show()

### Análise de correlação entre variáveis

In [None]:
from pyspark.ml.feature import StringIndexer

In [None]:
#Defino colunas que vou utilizar na matriz de correlação
vars = ["Operating Airline", "Published Airline",
        "Activity Period", "GEO Summary", "GEO Region", 
        "Activity Type Code", "Price Category Code",
        "Terminal", "Boarding Area", "Adjusted Passenger Count", 
        "Year", "Month"]

# crio copia do df com as variáveis selecionadas
corr = df_sp.select(vars)
corr.printSchema()

In [None]:
input = ["Operating Airline", "Published Airline", "GEO Summary", "GEO Region", 
        "Activity Type Code", "Price Category Code",
        "Terminal", "Boarding Area"]
output = ["operator_ind", "published_ind ", "geo_summary_index", "region_index", 
        "activity_index", "price_index",
        "terminal_index", "boarding_index"]

# defino o objeto StringIndexer e treino ele com os meus dados
indexer = StringIndexer(inputCols = input, outputCols = output)
si_model = indexer.fit(corr)

# transformo o df para valores numéricos
corr = si_model.transform(corr)

In [None]:
# visualizo frequência de variáveis segundo StringIndexer 
for i in si_model.labelsArray:
    print (i)

In [None]:
import numpy as np
# apago colunas com valores em formato string
corr = corr.drop("Operating Airline", "Published Airline",
                 "Activity Period" ,"GEO Summary", "GEO Region", 
                 "Activity Type Code", "Price Category Code",
                 "Terminal", "Boarding Area")

corr_pd = corr.toPandas()

# calculo matriz de correlação
matriz = np.corrcoef(corr_pd, rowvar = False)

In [None]:
# crio mapa de calor para visualizar correalações
plt.figure(figsize=(11,6))
sns.heatmap(matriz, cmap= "coolwarm", annot = True, fmt = ".2f", linewidths = 0.5)
plt.title("Mapa calor de correlação entre variáveis")
plt.xlabel("Passageiros - Ano - Mês - Operacão - Publicidade - Tipo - Região - Estado - Categoria - Terminal - Sala")
plt.show()

### Modelo de Machine Learning Agrupamentos Hierárquicos

In [None]:
# importo as bibliotecas necessárias
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler

In [None]:
inputs = ["Operating Airline", "Published Airline", "GEO Summary", "GEO Region", 
         "Activity Type Code", "Price Category Code",
         "Terminal", "Boarding Area"] 
outputs = ["operator_ind", "published_ind ", "geo_summary_index", "region_index", 
        "activity_index", "price_index",
        "terminal_index", "boarding_index"]

indexer = StringIndexer(inputCols = inputs, outputCols = outputs)
ind_modelo = indexer.fit(df_sp)

indexer_df = ind_modelo.transform(df_sp)

In [None]:
features = ["Activity Period", "operator_ind", "published_ind ", 
            "geo_summary_index", "region_index", "activity_index", "price_index",
            "terminal_index", "boarding_index", "Month", "Adjusted Passenger Count"]

assembler = VectorAssembler(inputCols = features, outputCol = "features")
va_df = assembler.transform(indexer_df)

#### Voos internacionais

In [None]:
inter = va_df.filter(f.col("GEO Summary") == "International")

In [None]:
# objeto para calcular o Silhouette
evaluator = ClusteringEvaluator(predictionCol = "prediction",
                                featuresCol = "features",
                                metricName = "silhouette")

In [None]:
# loop para conhecer eficácia de diferentes K
for k in range(3, 7):
    bkm = BisectingKMeans().setK(k).setSeed(1)
    modelo = bkm.fit(inter)
    prediction = modelo.transform(inter)
    silhouette = evaluator.evaluate(prediction)
    print(f"K={k}, Silhouette Score={silhouette}")

In [None]:
bkm = BisectingKMeans().setK(3).setSeed(1)
modelo = bkm.fit(inter)
prediction_inter = modelo.transform(inter)
silhouette = evaluator.evaluate(prediction_inter)
print(f"K=3, Silhouette Score={silhouette}")

print("Cluster Centers: ")
centers = modelo.clusterCenters()
for center in centers:
    print(center)

In [None]:
# tabela com centroides dos clusters
summary_df = prediction_inter.groupBy("prediction")\
    .agg(f.round(f.avg("Activity Period")).alias("Periodo"),
         f.round(f.avg("operator_ind"),2).alias("Operador"),
         f.round(f.avg("region_index"),2).alias("Regiao"),
         f.round(f.avg("activity_index"),2).alias("Estado"),
         f.round(f.avg("price_index")).alias("Categoria"),
         f.round(f.avg("terminal_index"),2).alias("Terminal"),
         f.round(f.avg("boarding_index"),2).alias("Embarque"),
         f.round(f.avg("Month")).alias("Mês"),
         f.round(f.avg("Adjusted Passenger Count")).alias("Media Passageiros"),
        ).orderBy("prediction").toPandas()

summary_df

In [None]:
# gráfico dendograma
from scipy.cluster.hierarchy import dendrogram, linkage

linkage = linkage(modelo.clusterCenters(), "single")

plt.figure(figsize =(5,3))
dendrogram(linkage)
plt.title("Agrupamento Hierárquico Bisecting KMeans")
plt.xlabel("Clusters")
plt.ylabel("Distância")
plt.show()

#### Exploração das propriedades

In [None]:
clusters = prediction_inter.groupBy("prediction")\
                .count()\
                .orderBy("prediction")\

clusters.show()

In [None]:
df_pandas = clusters.toPandas()

plt.figure(figsize = (5,3))
sns.barplot(x= "prediction", y= "count", data = df_pandas)
plt.title("Tamanho dos Clusters")
plt.xlabel("Clusters")
plt.ylabel("Registos")
plt.show()

In [None]:
prediction_inter.groupBy("prediction")\
    .agg(f.round(f.avg("Adjusted Passenger Count"))\
         .alias("média passageiros"),
         f.round(f.stddev("Adjusted Passenger Count"),2)\
         .alias("desvio"))\
    .orderBy("prediction").show()

In [None]:
df_pandas = prediction_inter.select("prediction", "Adjusted Passenger Count").toPandas()

plt.figure(figsize = (10,5))
sns.boxplot(x = "prediction", 
            y="Adjusted Passenger Count", 
            data = df_pandas,
            hue = "prediction",
            palette = "Set2")
plt.xlabel("Agrupamentos")
plt.ylabel("Numero de Passageiros")
plt.title("Número de Passageiros por Agrupamento")
plt.grid(True)
plt.show()

In [None]:
sazonal =prediction_inter.groupBy("prediction", "Month")\
            .agg(f.count("Month").alias("Frequencia"))\
            .orderBy("prediction", "Month")
sazonal.show(5)

In [None]:
df_pandas = sazonal.toPandas()

plt.figure(figsize = (6,5))
sns.barplot(x= "Month", 
            y= "Frequencia", 
            hue="prediction", 
            palette = "Set2",
            data= df_pandas)
plt.title("Frequência de voos mensais")
plt.xlabel("Mês")
plt.ylabel("Frequência")
plt.legend(title = "Clusters")
plt.show()

In [None]:
regiao = prediction_inter.groupBy("prediction", "GEO Region")\
            .agg(f.count("GEO Region").alias("Contagem"))\
            .orderBy("prediction")

regiao.show(5)

In [None]:
df_pandas = regiao.toPandas()

plt.figure(figsize = (6,5))
sns.barplot(x= "GEO Region", 
            y= "Contagem", 
            data = df_pandas, 
            hue="prediction",
            palette = "Set2")
plt.title("Regiões por cluster")
plt.xlabel("Região")
plt.xticks(rotation = 35, fontsize = "small")
plt.legend(title = "Cluster")
plt.show()