# Exemplo 03a: Análise de Enlace usando NetworkX
## Identificação de Aeroportos com mais conectividade

Muitos problemas em ciência de dados podem ser modelados com um grafo. Um exemplo é a análise da rede aérea em uma região, onde os vertices são os aeroportos e as areastas são as linhas aéreas. Usando algoritmos de análise de enlace podemos extrair informações como aeroportos mais movimentados e menor caminho entre duas localidades.

Porém essa análise não é tão trivial. A maneira mais simples de determinar o aeroporto mais movimentado é contar o número de voos realizados de e para esta cidade. No entanto, como a maioria das companhias aéreas utiliza um sistema de *hub-and-spoke*, a simples contagem dos voos de entrada e saída não transmite a importância do aeroporto para o tráfego aéreo geral. Isso ocorre porque determinados aeroportos centrais podem ser pontos de passagem para os vôos em outros aeroportos e, como resultado, esses aeroportos centrais podem ser considerados mais importantes, mesmo que tenham contagens total de voos seja igual ou até menores.

O algoritmo Pagerank foi originalmente criado para medir a importância relativa das páginas da web, avaliando os links de ligação da página. Qualquer página da web é considerada mais importante se outras páginas importantes tiverem links para essa página. Podemos aplicar esse mesmo conceito de importância aos aeroportos. Se você substituir “página da web” por “aeroporto” e substituir “link da web” por “voo da companhia aérea”, poderá ver que o PageRank pode ser usado para avaliar a importância de um aeroporto. O PageRank acaba sendo uma boa maneira de medir a importância do aeroporto, dado o uso do molelo *hub-and-spoke* usado pelas companhias hub e spoke das companhias aéreas. Um aeroporto importante acabaria sendo um aeroporto que por si só é um *hub* no qual outros aeroportos possuem muitos vôos ou um “hub de hubs”.

Para este exemplo vamos usar uma base de dados de aeroportos e voos nos Estados Unidos e Canada. Então, para identificar os aeroportos mais importantes da região, considerando como um determinado aeroporto influencia os vôos para outros aeroportos, podemos usar o algoritmo Pagerank. Este exemplo mostra os aeroportos com mais voos e os aeroportos mais conectados calculados pelo Pagerank.

In [1]:
import pyspark
from pyspark.sql import SparkSession

import pandas as pd
import networkx as nx

import time
start_time = time.time()

data_path='./data/'

In [2]:
# Cria Spark Session
sparkSession = SparkSession.builder \
       .master("local[*]") \
       .appName("LinkAnalisysAirlines") \
       .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/19 15:46:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Reading datasets
### Airports

| Field | Description |
| --------- | :-------------: |
| node_id | Unique identifier for the airport. |
| name | Name of airport or city and state.|
| metro_pop | City/region population.|
| latitude | Airport latitude |
| longitude | Airport longitude |


In [3]:
# Le arquivo de dados Airport para Dataframe Spark

airport = sparkSession.read.format("csv").options(sep=',',header='true',inferschema='true').\
          load(data_path+"reachability-meta.csv.gz")

#Exibe campos da tabela e os tipos de dados
#airport.dtypes
airport.show()

+-------+--------------------+---------+---------+-----------+
|node_id|                name|metro_pop| latitude|  longitude|
+-------+--------------------+---------+---------+-----------+
|      0|      Abbotsford, BC| 133497.0|49.051575|-122.328849|
|      1|        Aberdeen, SD|  40878.0| 45.45909| -98.487324|
|      2|         Abilene, TX| 166416.0|32.449175| -99.741424|
|      3|    Akron/Canton, OH| 701456.0| 40.79781| -81.371567|
|      4|         Alamosa, CO|   9433.0| 37.46818|-105.873599|
|      5|          Albany, GA| 157688.0| 31.58076| -84.155989|
|      6|          Albany, NY| 871478.0|42.651455| -73.755274|
|      7|     Albuquerque, NM| 898642.0| 35.08418|-106.648639|
|      8|      Alexandria, LA| 154505.0|31.312685| -92.445649|
|      9|Allentown/Bethleh...| 824916.0|40.651428| -75.434219|
|     10|        Alliance, NE|   8499.0| 42.09712|-102.871454|
|     11|          Alpena, MI|  29386.0|45.061565| -83.445154|
|     12|         Altoona, PA| 127099.0| 40.50719| -78.

### Flights

| Field | Description |
| --------- | :-------------: |
| FromNodeId | Origin airport (node_id).|
| ToNodeId | Destination airport (node_id).|
| Weight | Distance |


In [4]:
# Le arquivo de dados Linhas Aereas para Dataframe Spark

routes = sparkSession.read.format("csv").options(sep=' ',header='true',inferschema='true').\
         load(data_path+"reachability.txt.gz")

#routes.dtypes
routes.show()

+----------+--------+------+
|FromNodeId|ToNodeId|Weight|
+----------+--------+------+
|        27|       0|  -757|
|        57|       0|   -84|
|        70|       0| -1290|
|        74|       0|  -465|
|        86|       0|  -700|
|        94|       0|  -526|
|       100|       0|  -448|
|       113|       0|   -90|
|       138|       0|  -256|
|       154|       0|  -270|
|       166|       0|  -515|
|       178|       0|  -400|
|       230|       0|  -486|
|       235|       0|  -170|
|       242|       0|  -469|
|       246|       0|  -325|
|       262|       0|  -200|
|       269|       0|  -525|
|       275|       0|  -585|
|       280|       0|  -405|
+----------+--------+------+
only showing top 20 rows



## Building Graph

In [5]:
# Extrair campos relevantes para definir os vertices do grafo
vertice = airport.select("node_id","name")

# Troca nome "node_id" por "id"
vertice = vertice.withColumnRenamed("node_id", "id")

#caso precise converter algum campo
#vertice = vertice.withColumn("id", vertice["id"].cast("string"))

# Converte Spark Dataframe para Pandas Dataframe
vertice_pd = vertice.toPandas()

# Salva lista com nomes das cidades ccom aeroportos
airport_name = airport.select("node_id", "name").withColumnRenamed("node_id", "id")

#vertice.dtypes
vertice.show(5)

+---+----------------+
| id|            name|
+---+----------------+
|  0|  Abbotsford, BC|
|  1|    Aberdeen, SD|
|  2|     Abilene, TX|
|  3|Akron/Canton, OH|
|  4|     Alamosa, CO|
+---+----------------+
only showing top 5 rows



In [6]:
# Extrair campos relevantes para definir os arestas do grafo
edge = routes.select("FromNodeId","ToNodeId","Weight")

# Graphframe exige que colunas com identificação das arestas possua nome 'src' para origem e 'dst' para destino
# Troca nome "FromNodeId" por "src" e "ToNodeId" por "dst"
edge = edge.withColumnRenamed("FromNodeId", "src") \
           .withColumnRenamed("ToNodeId", "dst")

# Converte Spark Dataframe para Pandas Dataframe
edge_pd = edge.toPandas()

edge.show(5)

+---+---+------+
|src|dst|Weight|
+---+---+------+
| 27|  0|  -757|
| 57|  0|   -84|
| 70|  0| -1290|
| 74|  0|  -465|
| 86|  0|  -700|
+---+---+------+
only showing top 5 rows



In [7]:
# Cria um Digraph com as rotas das linhas aéreas 
# edge_pd é o gdataframe com as linhas aereas  no formato Pandas
# sorce e target ao aeroportos de origem e destino das linhas aereas
# edge_attr é o atributo da aresta do grafo, neste caso o valor negativo da distância (quanto mais distante pior) 

G = nx.from_pandas_edgelist(edge_pd, source='src', target='dst', edge_attr=['Weight'], create_using=nx.DiGraph())

print(G)

DiGraph with 456 nodes and 71959 edges


In [8]:
# imprime caracteristica do grafo

G_nodes = G.number_of_nodes()
G_edges = G.number_of_edges()

print("Airports = ", G_nodes, "      Flights = ",G_edges)

Airports =  456       Flights =  71959


## In-degree of each vertex in the graph

In [9]:
# Aeroportos com mais chegadas

airport_degree = nx.degree(G)

print(airport_degree)

#rairport_degree_df = sparkSession.createDataFrame(data=airport_degree), schema = ["name","degree"])
#airport_degree_df.printSchema()
#airport_degree_df.show(truncate=False)

#airport_degree_df.select("id", "degree").join(airport_name, on=['id'], how='inner').select("name", "degree").orderBy("degree",ascending=False).show()

[(27, 753), (0, 70), (57, 669), (70, 171), (74, 859), (86, 729), (94, 859), (100, 856), (113, 607), (138, 94), (154, 89), (166, 406), (178, 800), (230, 861), (235, 72), (242, 314), (246, 886), (262, 70), (269, 817), (275, 215), (280, 637), (294, 856), (304, 642), (308, 805), (309, 519), (324, 847), (347, 315), (364, 713), (367, 799), (378, 344), (401, 308), (413, 301), (416, 813), (419, 658), (451, 533), (6, 583), (1, 224), (7, 648), (14, 625), (15, 429), (16, 506), (19, 770), (23, 663), (32, 441), (45, 638), (46, 803), (49, 256), (52, 598), (63, 486), (68, 764), (76, 652), (78, 742), (95, 631), (101, 597), (102, 786), (110, 365), (112, 258), (115, 590), (127, 615), (129, 455), (134, 242), (136, 801), (139, 586), (153, 595), (157, 505), (159, 531), (167, 242), (170, 697), (173, 249), (176, 693), (184, 732), (185, 245), (193, 662), (195, 222), (201, 442), (202, 325), (204, 669), (217, 601), (221, 325), (227, 442), (241, 620), (247, 559), (251, 596), (252, 539), (256, 354), (257, 248), (

## Triangle Count for each vertice

In [10]:
# Count the number of triagles in each vertice

triang = G.in_degree

print(triang)

#triang_df = sparkSession.createDataFrame(data=airport_degree), schema = ["name","count"])
#triang_df.printSchema()
#triang_df.show(truncate=False)

#triang_df.select("id", "degree").join(airport_name, on=['id'], how='inner').select("name", "count").orderBy("count",ascending=False).show()

[(27, 377), (0, 34), (57, 327), (70, 82), (74, 429), (86, 370), (94, 432), (100, 427), (113, 317), (138, 46), (154, 43), (166, 193), (178, 400), (230, 429), (235, 34), (242, 163), (246, 443), (262, 33), (269, 411), (275, 123), (280, 312), (294, 428), (304, 320), (308, 407), (309, 260), (324, 423), (347, 157), (364, 359), (367, 397), (378, 164), (401, 148), (413, 151), (416, 404), (419, 329), (451, 258), (6, 304), (1, 113), (7, 309), (14, 314), (15, 216), (16, 258), (19, 389), (23, 336), (32, 211), (45, 320), (46, 399), (49, 129), (52, 314), (63, 239), (68, 382), (76, 323), (78, 373), (95, 309), (101, 298), (102, 393), (110, 186), (112, 130), (115, 282), (127, 309), (129, 226), (134, 122), (136, 400), (139, 285), (153, 273), (157, 257), (159, 259), (167, 117), (170, 356), (173, 126), (176, 345), (184, 373), (185, 124), (193, 322), (195, 112), (201, 228), (202, 165), (204, 341), (217, 297), (221, 162), (227, 219), (241, 313), (247, 277), (251, 301), (252, 277), (256, 179), (257, 125), (2

## Page Rank

In [11]:
# Run PageRank algorithm, and show results.

result_pg = nx.pagerank(G, max_iter=20, weight='Weight')
#result_pg = nx.pagerank(G, tol=0.001, weight='weight')

print(result_pg)

#result_pg_df = sparkSession.createDataFrame(data=result), schema = ["name","pagerank"])
#result_pg_df.printSchema()
#result_pg_df.show(truncate=False)

#result_pg_df.select("name", "pagerank").orderBy("pagerank",ascending=False).show()

{27: 0.0046893362805071745, 0: 0.0009021308560587239, 57: 0.004658809097161969, 70: 0.0021374080709735625, 74: 0.004339118720974536, 86: 0.004017355312852984, 94: 0.005106744140501227, 100: 0.004625797503467854, 113: 0.004900661335047314, 138: 0.000975530872534099, 154: 0.001027895945498401, 166: 0.0035666190101212403, 178: 0.00472433882686759, 230: 0.005764698625665773, 235: 0.0007353721701661279, 242: 0.0021977345776955743, 246: 0.0061864370879562034, 262: 0.0007756381439608225, 269: 0.004362641293018228, 275: 0.0029420583419757077, 280: 0.0038703108622261512, 294: 0.005392278484662518, 304: 0.003741152135648531, 308: 0.005412397245067099, 309: 0.003655116636340416, 324: 0.005341121969895439, 347: 0.0024035538195724773, 364: 0.004532788260107675, 367: 0.005717626667160882, 378: 0.002667198183961245, 401: 0.003015519859419969, 413: 0.00205974539127725, 416: 0.00481460726058821, 419: 0.004532160610127738, 451: 0.003438494561430279, 6: 0.004594532591314983, 1: 0.0014486228142582443, 7: 

## HITS

In [12]:
# Run HITS algorithm, and show results.

result_h, result_a = nx.hits(G, max_iter=20, normalized=True)
#result_h, result_a = nx.hits(G, tol=0.001, normalized=True)

print("===== List of Hubs =====")
print(result_h)

print("===== List of Authorities =====")
print(result_a)

#resulth_df = sparkSession.createDataFrame(data=result_ht), schema = ["name","pagerank"])
#resulth_df.printSchema()
#resulth_df.show(truncate=False)

#resulth_df.select("name", "pagerank").orderBy("pagerank",ascending=False).show()

===== List of Hubs =====
{27: 0.004282281748745433, 0: 0.0005521017265008731, 57: 0.003909884297876859, 70: 0.0013217588187616266, 74: 0.004521919704514936, 86: 0.0042077633120628025, 94: 0.0045037884086596034, 100: 0.0045177128802813635, 113: 0.0034178932105466513, 138: 0.0006628985451452959, 154: 0.0006708097324027768, 166: 0.0028935127702813808, 178: 0.004420662123868966, 230: 0.004531866919168021, 235: 0.0006045931945662495, 242: 0.00227959131869405, 246: 0.004571513680199409, 262: 0.0005869937995135645, 269: 0.004443298495611365, 275: 0.001347122635783357, 280: 0.0038139162359919375, 294: 0.004465592970779326, 304: 0.003950372783192954, 308: 0.004350368333566799, 309: 0.0032479269374983298, 324: 0.004520311566615979, 347: 0.0022039915668808003, 364: 0.004192450899688902, 367: 0.0044011895875192865, 378: 0.0025132531710242603, 401: 0.002240571955360024, 413: 0.0022478334935501555, 416: 0.004371701942490906, 419: 0.004054507913233606, 451: 0.0032695307338986767, 6: 0.003484264246682

In [13]:
sparkSession.stop()
print("--- Execution time: %s seconds ---" % (time.time() - start_time))

--- Execution time: 21.4187331199646 seconds ---
