# **A study of graphs applied to the PageRank algorithm**
**This notebook is a way to practice my data science knowledge**

The notebook walks through a workflow for ranking the nodes of a small graph of phone calls with the PageRank algorithm.

Its main purpose is to serve as a step-by-step guide that I can review later and reuse as a reference for future cases.

## Workflow stages
The solution workflow goes through four stages.
1.   Load the Data.
2.   Define and create the Graph.
3.   Explore the Graph.
4.   Apply the PageRank algorithm.

In [0]:
#Import the class used to create the Spark session
from pyspark.sql import SparkSession

In [0]:
#Start (or reuse) the Spark session
spark = SparkSession.builder.appName("pageRankgraphs").getOrCreate()

#1) Load the Data

In [0]:
#Create the nodes for the graph
nodes = spark.createDataFrame([
    ("A", "ANA", 350),
    ("B", "BERNARDO", 360),
    ("C", "CLARA", 195),
    ("D", "DANIEL", 90),
    ("E", "ERIC", 90),
    ("F", "FERNANDA", 215),
    ("G", "GUSTAVO", 30),
    ("H", "HENRIQUE", 25),
    ("I", "IOLANDA", 25),
    ("J", "JENNIFER", 20)
], ["id", "name", "total_seconds"])

In [0]:
nodes.show()

+---+--------+-------------+
| id|    name|total_seconds|
+---+--------+-------------+
|  A|     ANA|          350|
|  B|BERNARDO|          360|
|  C|   CLARA|          195|
|  D|  DANIEL|           90|
|  E|    ERIC|           90|
|  F|FERNANDA|          215|
|  G| GUSTAVO|           30|
|  H|HENRIQUE|           25|
|  I| IOLANDA|           25|
|  J|JENNIFER|           20|
+---+--------+-------------+



In [0]:
#Create the relationships (edges) for the graph: one row per call
relationships = spark.createDataFrame([
    ("A", "B", 60),
    ("B", "A", 50),
    ("A", "C", 50),
    ("C", "A", 100),
    ("A", "D", 90),
    ("C", "I", 25),
    ("C", "J", 20),
    ("B", "F", 50),
    ("F", "B", 110),
    ("F", "G", 30),
    ("F", "H", 25),
    ("B", "E", 90)
], ["src", "dst", "call_duration"])

In [0]:
#The source (src) is the person who made the call and the destination (dst) is the person who received it
relationships.show()

+---+---+-------------+
|src|dst|call_duration|
+---+---+-------------+
|  A|  B|           60|
|  B|  A|           50|
|  A|  C|           50|
|  C|  A|          100|
|  A|  D|           90|
|  C|  I|           25|
|  C|  J|           20|
|  B|  F|           50|
|  F|  B|          110|
|  F|  G|           30|
|  F|  H|           25|
|  B|  E|           90|
+---+---+-------------+
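The notebook does not say how `total_seconds` was obtained. In this sample data it appears to equal the sum of `call_duration` over every call a person made or received. The plain-Python sketch below checks that assumption without Spark; the names `NODE_TOTALS`, `EDGES`, and `mismatches` are illustrative only.

```python
from collections import defaultdict

# Node id -> total_seconds, copied from the nodes table above.
NODE_TOTALS = {
    "A": 350, "B": 360, "C": 195, "D": 90, "E": 90,
    "F": 215, "G": 30, "H": 25, "I": 25, "J": 20,
}

# (src, dst, call_duration), copied from the relationships table above.
EDGES = [
    ("A", "B", 60), ("B", "A", 50), ("A", "C", 50), ("C", "A", 100),
    ("A", "D", 90), ("C", "I", 25), ("C", "J", 20), ("B", "F", 50),
    ("F", "B", 110), ("F", "G", 30), ("F", "H", 25), ("B", "E", 90),
]

# Assumption being tested: a call counts toward both the caller and the receiver.
computed = defaultdict(int)
for src, dst, duration in EDGES:
    computed[src] += duration
    computed[dst] += duration

# Collect every node whose stored total differs from the recomputed one.
mismatches = {
    node: (stored, computed[node])
    for node, stored in NODE_TOTALS.items()
    if stored != computed[node]
}
print("mismatches:", mismatches)
```

An empty `mismatches` dictionary means the assumption holds for every node in this data set.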



#2) Define and create the Graph

In [0]:
from pyspark.sql.types import *
from graphframes import GraphFrame  #Library for graph processing
#Note: graphframes is not bundled with Databricks, so it must be installed on each cluster. To do so, go to New -> "Library" -> "Source" -> "Maven Coordinate", search for "graphframes", and add the library to the cluster

In [0]:
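#Build the graph from the dataframes
#The vertices dataframe needs an "id" column and the edges dataframe needs "src" and "dst" columns
graph = GraphFrame(nodes, relationships)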
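Before building a graph it can help to confirm that every edge endpoint refers to a known vertex id. The sketch below does this in plain Python on copies of the notebook's data; `NODE_IDS`, `EDGE_PAIRS`, `unknown_endpoints`, and `isolated_nodes` are hypothetical names, not part of the graphframes API.

```python
# Vertex ids and (src, dst) pairs copied from the dataframes above.
NODE_IDS = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J"]
EDGE_PAIRS = [
    ("A", "B"), ("B", "A"), ("A", "C"), ("C", "A"), ("A", "D"), ("C", "I"),
    ("C", "J"), ("B", "F"), ("F", "B"), ("F", "G"), ("F", "H"), ("B", "E"),
]

node_set = set(NODE_IDS)
endpoints = {node for pair in EDGE_PAIRS for node in pair}

# Endpoints that appear in an edge but have no matching vertex row.
unknown_endpoints = sorted(endpoints - node_set)

# Vertices that never appear in any edge (they would have degree 0).
isolated_nodes = sorted(node_set - endpoints)

print("unknown endpoints:", unknown_endpoints)
print("isolated nodes:", isolated_nodes)
```

Both lists being empty means the edge list and the vertex list agree with each other.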



#3) Explore the Graph

In [0]:
#Find the people (vertices) whose total call time is greater than 150 seconds
from pyspark.sql.functions import col

graph.vertices\
.filter("total_seconds > 150")\
.sort(col("total_seconds").desc())\
.show()

+---+--------+-------------+
| id|    name|total_seconds|
+---+--------+-------------+
|  B|BERNARDO|          360|
|  A|     ANA|          350|
|  F|FERNANDA|          215|
|  C|   CLARA|          195|
+---+--------+-------------+



In [0]:
#Compute summary statistics for total_seconds across all nodes in the graph
graph.vertices\
.describe(['total_seconds'])\
.show()

+-------+-----------------+
|summary|    total_seconds|
+-------+-----------------+
|  count|               10|
|   mean|            140.0|
| stddev|132.9578045011942|
|    min|               20|
|    max|              360|
+-------+-----------------+



In [0]:
#Compute summary statistics for call_duration across all links (relationships)
graph.edges\
.describe(['call_duration'])\
.show()

+-------+------------------+
|summary|     call_duration|
+-------+------------------+
|  count|                12|
|   mean|58.333333333333336|
| stddev| 31.79003083682148|
|    min|                20|
|    max|               110|
+-------+------------------+
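The two summaries above can be cross-checked without Spark. The sketch below recomputes them with Python's `statistics` module, assuming the `stddev` row is the sample standard deviation (dividing by n - 1), which is what the printed values are consistent with. The helper name `describe` is illustrative only.

```python
import statistics

# Values copied from the nodes and relationships tables.
total_seconds = [350, 360, 195, 90, 90, 215, 30, 25, 25, 20]
call_duration = [60, 50, 50, 100, 90, 25, 20, 50, 110, 30, 25, 90]


def describe(values):
    """Return count, mean, sample standard deviation, min and max."""
    return {
        "count": len(values),
        "mean": statistics.fmean(values),
        "stddev": statistics.stdev(values),  # sample stddev (n - 1)
        "min": min(values),
        "max": max(values),
    }


node_stats = describe(total_seconds)
edge_stats = describe(call_duration)
print(node_stats)
print(edge_stats)
```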



In [0]:
#Show the number of incoming edges per node (in-degree) -> calls received
display(graph.inDegrees)



id,inDegree
B,2
C,1
A,2
D,1
I,1
J,1
F,1
G,1
E,1
H,1


In [0]:
#Show the number of outgoing edges per node (out-degree) -> calls made. Nodes that made no calls do not appear in the result
display(graph.outDegrees)

id,outDegree
A,3
B,3
C,3
F,3


In [0]:
#Which node is the most "important" (has the most edges leading to it), i.e. who received the most calls?
#degrees counts both incoming and outgoing edges for each node
total_degree = graph.degrees
in_degree = graph.inDegrees
out_degree = graph.outDegrees


In [0]:
total_degree.show()

+---+------+
| id|degree|
+---+------+
|  B|     5|
|  A|     5|
|  C|     4|
|  D|     1|
|  I|     1|
|  J|     1|
|  F|     4|
|  G|     1|
|  E|     1|
|  H|     1|
+---+------+



In [0]:
#Join the total, in-degree and out-degree dataframes on "id"
#Nodes that never made a call are missing from out_degree, so the left join produces nulls that must be filled with 0
#Sort in descending order of inDegree so the most-called nodes come first
total_degree.join(in_degree, "id", how="left")\
.join(out_degree, "id", how="left")\
.fillna(0)\
.sort("inDegree", ascending=False)\
.show()

+---+------+--------+---------+
| id|degree|inDegree|outDegree|
+---+------+--------+---------+
|  B|     5|       2|        3|
|  A|     5|       2|        3|
|  C|     4|       1|        3|
|  D|     1|       1|        0|
|  I|     1|       1|        0|
|  J|     1|       1|        0|
|  F|     4|       1|        3|
|  G|     1|       1|        0|
|  E|     1|       1|        0|
|  H|     1|       1|        0|
+---+------+--------+---------+
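The same degree table can be rebuilt in plain Python as a cross-check. In this sketch a `Counter` returns 0 for ids it has never seen, which plays the role of the left join followed by `fillna(0)`. The names `EDGE_PAIRS` and `degree_table` are illustrative only.

```python
from collections import Counter

# (src, dst) pairs copied from the relationships table.
EDGE_PAIRS = [
    ("A", "B"), ("B", "A"), ("A", "C"), ("C", "A"), ("A", "D"), ("C", "I"),
    ("C", "J"), ("B", "F"), ("F", "B"), ("F", "G"), ("F", "H"), ("B", "E"),
]

in_degree = Counter(dst for _, dst in EDGE_PAIRS)   # calls received
out_degree = Counter(src for src, _ in EDGE_PAIRS)  # calls made

# Missing keys in a Counter read as 0, so no explicit fill step is needed.
degree_table = {
    node: {
        "degree": in_degree[node] + out_degree[node],
        "inDegree": in_degree[node],
        "outDegree": out_degree[node],
    }
    for node in sorted(set(in_degree) | set(out_degree))
}

# Print rows in descending order of in-degree, as in the notebook.
for node, row in sorted(degree_table.items(), key=lambda kv: -kv[1]["inDegree"]):
    print(node, row)
```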



#4) Apply the PageRank algorithm

In [0]:
#resetProbability: probability (between 0 and 1) of jumping to a random node instead of following a direct link, which ensures that every node can be visited
#tol: tolerance -> the stopping criterion; the iteration stops once the rank updates fall below this value
pageRank = graph.pageRank(resetProbability=0.15, tol=0.001)
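To make the roles of `resetProbability` and `tol` more concrete, here is a minimal plain-Python power-iteration sketch of PageRank on the same edges. It is not the GraphFrames implementation: it assumes ranks that sum to 1, spreads the rank of nodes without outgoing edges evenly over all nodes, and stops when the total absolute change drops below `tol`. Its scores are therefore on a different scale from the GraphFrames output, and the function name `pagerank` and the `max_iter` parameter are hypothetical.

```python
NODE_IDS = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J"]
EDGE_PAIRS = [
    ("A", "B"), ("B", "A"), ("A", "C"), ("C", "A"), ("A", "D"), ("C", "I"),
    ("C", "J"), ("B", "F"), ("F", "B"), ("F", "G"), ("F", "H"), ("B", "E"),
]


def pagerank(nodes, edges, reset_probability=0.15, tol=1e-3, max_iter=100):
    """Power-iteration PageRank sketch; returned ranks sum to 1."""
    out_links = {node: [] for node in nodes}
    for src, dst in edges:
        out_links[src].append(dst)

    n = len(nodes)
    rank = {node: 1.0 / n for node in nodes}
    follow = 1.0 - reset_probability  # probability of following a link

    for _ in range(max_iter):
        # Rank held by nodes with no outgoing edges is shared by everyone.
        dangling = sum(rank[v] for v in nodes if not out_links[v])
        new_rank = {v: reset_probability / n + follow * dangling / n for v in nodes}

        # Each node splits its rank equally among the nodes it points to.
        for src, targets in out_links.items():
            for dst in targets:
                new_rank[dst] += follow * rank[src] / len(targets)

        delta = sum(abs(new_rank[v] - rank[v]) for v in nodes)
        rank = new_rank
        if delta < tol:  # stopping criterion
            break
    return rank


ranks = pagerank(NODE_IDS, EDGE_PAIRS)
for node, score in sorted(ranks.items(), key=lambda kv: -kv[1]):
    print(node, round(score, 4))
```

A smaller `tol` makes the loop run for more iterations before stopping, and a larger `reset_probability` pulls all scores closer to the uniform value.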

In [0]:
#pageRank returns a new graph whose vertices have an extra "pagerank" column
#Sorting by that column shows which nodes are the most "important" in this graph
pageRank.vertices.sort(['pagerank'],ascending=False).show()

+---+--------+-------------+------------------+
| id|    name|total_seconds|          pagerank|
+---+--------+-------------+------------------+
|  B|BERNARDO|          360|1.2519236648772267|
|  A|     ANA|          350|1.2519236648772267|
|  F|FERNANDA|          215|0.9759477302972548|
|  E|    ERIC|           90|0.9759477302972548|
|  D|  DANIEL|           90|0.9759477302972548|
|  C|   CLARA|          195|0.9759477302972548|
|  J|JENNIFER|           20|0.8980904372641323|
|  G| GUSTAVO|           30|0.8980904372641323|
|  I| IOLANDA|           25|0.8980904372641323|
|  H|HENRIQUE|           25|0.8980904372641323|
+---+--------+-------------+------------------+



In [0]:
#Show the weight assigned to each connection
#PageRank does not take call_duration into account, only the structure of the relationships. The "weight" column it adds is normalized: every caller here has 3 outgoing edges, and each edge's weight matches 1 / (out-degree of its source)
pageRank.edges.show()

+---+---+-------------+------------------+
|src|dst|call_duration|            weight|
+---+---+-------------+------------------+
|  F|  H|           25|0.3333333333333333|
|  F|  G|           30|0.3333333333333333|
|  F|  B|          110|0.3333333333333333|
|  B|  F|           50|0.3333333333333333|
|  B|  A|           50|0.3333333333333333|
|  B|  E|           90|0.3333333333333333|
|  C|  J|           20|0.3333333333333333|
|  C|  I|           25|0.3333333333333333|
|  C|  A|          100|0.3333333333333333|
|  A|  B|           60|0.3333333333333333|
|  A|  D|           90|0.3333333333333333|
|  A|  C|           50|0.3333333333333333|
+---+---+-------------+------------------+
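The `weight` column above is consistent with each edge receiving 1 divided by the out-degree of its source node. The sketch below recomputes the weights under that assumption in plain Python; the names `EDGE_PAIRS` and `weights` are illustrative only.

```python
from collections import Counter

# (src, dst) pairs copied from the relationships table.
EDGE_PAIRS = [
    ("A", "B"), ("B", "A"), ("A", "C"), ("C", "A"), ("A", "D"), ("C", "I"),
    ("C", "J"), ("B", "F"), ("F", "B"), ("F", "G"), ("F", "H"), ("B", "E"),
]

out_degree = Counter(src for src, _ in EDGE_PAIRS)

# Assumed rule: a source splits its influence equally among its outgoing edges.
weights = {(src, dst): 1.0 / out_degree[src] for src, dst in EDGE_PAIRS}

for (src, dst), weight in weights.items():
    print(src, dst, weight)
```

Because A, B, C and F each have three outgoing edges, every weight comes out as one third, regardless of how long the calls were.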

