<a href="https://colab.research.google.com/github/tianygoulart/Machine-Leaning/blob/main/pagerank.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#programa utilizado para o exemplo de grafos para aplicação do PageRank

In [None]:
from pyspark.sql import SparkSession #importa a biblioteca que cria a seção do spark

In [None]:
#inicia a seção para a utilização do spark
spark = SparkSession.builder.appName("pageRankGrafos").getOrCreate() #cria a seção caso não exista ou obtém a já criada

Criando o nosso grafo

In [None]:
#criando os nós para o nosso grafo
nos = spark.createDataFrame([
    ("A", "ANA"  ,350 ),
    ("B", "BERNARDO"  ,360 ),
    ("C", "CLARA" ,195 ),
    ("D", "DANIEL",90),
    ("E", "ERIC"  ,90),
    ("F", "FERNANDA" ,215 ),
    ("G", "GUSTAVO",30 ),
    ("H", "HENRIQUE" ,25 ),
    ("I", "IOLANDA"  ,25 ),
    ("J", "JENNIFER"   ,20 )
], ["id", "nome", "total_segundos"])

In [None]:
nos.show()

In [None]:
relacionamentos=spark.createDataFrame([
    ("A", "B", 60),
    ("B", "A", 50),
    ("A", "C", 50),
    ("C", "A", 100),
    ("A", "D", 90),
    ("C", "I", 25),
    ("C", "J", 20),
    ("B", "F", 50),
    ("F", "B", 110),
    ("F", "G", 30),
    ("F", "H", 25),
    ("B", "E", 90)
],["src","dst","duracao_chamada"])

In [None]:
relacionamentos.show() # a fonte (src) indica quem ligou e (dst) indica quem recebeu a ligação

Definindo e criando o grafo

In [None]:
#importando as funções para utilizar os grafos
from pyspark.sql.types import *
from graphframes import *  #contém os métodos para serem utilizados no processamento através dos grafos
#obs: a biblioteca graphframes deve ser adicionada ao databricks, pois não é nativa. Para isso acesse New->"Library"-> "Source"-> "Maven Coordinate"-> pesquise por "graphframes" e adicione a biblioteca para cada cluster

In [None]:
#controi o grafo a partir dos dataframes
grafo = GraphFrame(nos,relacionamentos)


Explorando o nosso grafo

In [None]:
#encontrando chamadas com duração total maior do que 150 min
from pyspark.sql.functions import col

grafo.vertices\
.filter("total_segundos > 150")\
.sort(col("total_segundos").desc())\
.show()

In [None]:
#encontrando as estatísticas para o total de segundos existentes no grafo (para cada um dos nós) 
grafo.vertices\
.describe(['total_segundos'])\
.show()

In [None]:
#encontrando as estatísticas para cada uma das ligações (relacionamentos)
grafo.edges\
.describe(['duracao_chamada'])\
.show()

In [None]:
#mostrando a quantidade de caminhos diretos (chegando) ->  ligações recebidas
display(grafo.inDegrees)

id,inDegree
F,1
E,1
B,2
D,1
C,1
J,1
A,2
G,1
I,1
H,1


In [None]:
#mostrando a quantidade de caminhos inversos (saindo) -> ligações efetuadas
display(grafo.outDegrees)

id,outDegree
F,3
B,3
C,3
A,3


In [None]:
#qual é o nó mais "importante" (tem mais caminhos que levam até ele) -> quem mais recebeu ligações
total_degree = grafo.degrees
in_degree = grafo.inDegrees
out_degree = grafo.outDegrees


In [None]:
total_degree.show()

In [None]:
#realizando a união dos dois dataframes (in e out degree)
#podem existir nós que não ligaram ou receberam ligação, assim é necessário preencher o Nan
#fica mais interessante mostrar em ordem decrescente os dados
total_degree.join(in_degree, "id", how="left")\
.join(out_degree, "id", how="left")\
.fillna(0)\
.sort("inDegree", ascending=False)\
.show()

Aplicando o algoritmo PageRank

In [None]:
pageRank = grafo.pageRank(resetProbability=0.15, tol=0.001) # resetProbability= probabilidade de sair de uma página e visitar outra sem link direto (garante que todas possam ser visitadas)-> entre 0 e 1
#tol=tolerância->indica o critério de parada(se não melhorou o anterior em tol-valor)

In [None]:
#page rank gera ou outro grafo 
#indicando quais são os nós mais "importantes" em nosso grafo 
pageRank.vertices.sort(['pagerank'],ascending=False).show()

In [None]:
#identificando os pesos existentes entre cada conexão
pageRank.edges.show() #page rank não leva em consideração os pesos, apenas os tipos de relacionamentos. Assim, realiza a normalização dos pesos encontrados.