# Personal Note
This notebook contains exercises and brief excerpts from *Graph Algorithms* by Mark Needham and Amy E. Hodler (available at https://neo4j.com/graph-algorithms-book/; errata at http://oreilly.com/catalog/errata.csp?isbn=9781492047681), run on bare metal.

To Do:
- Figure out how to visualize graph data in Jupyter
- Set up Neo4j or write equivalent Spark functions
- Move all functions to one script
- Split notebooks by chapter
- Do something original

In [58]:
!spark-shell --version
!scala --version
!python --version
!conda --version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.2
      /_/
                        
Using Scala version 2.12.10, OpenJDK 64-Bit Server VM, 1.8.0_292
Branch HEAD
Compiled by user centos on 2021-02-16T04:53:13Z
Revision 648457905c4ea7d00e3d88048c63f360045f0714
Url https://gitbox.apache.org/repos/asf/spark.git
Type --help for more information.
Scala code runner version 2.13.4 -- Copyright 2002-2020, LAMP/EPFL and Lightbend, Inc.
Python 3.9.4
conda 4.10.1


# Chapter 4 | Pathfinding and Graph Search Algorithms

!pyspark --jars graphframes-0.8.1-spark3.0-s_2.12.jar

In [16]:
import os.path as op
import pandas as pd
pd.set_option('display.max_colwidth', None)
from pyspark.sql import SparkSession

from graphframes import *
from graphframes.lib import AggregateMessages as AM

from pyspark.sql import functions as F
from pyspark.sql.types import *

from operator import itemgetter

# from pyspark import SparkContext -> https://spark.apache.org/docs/latest/api/java/org/apache/spark/SparkContext.html


spark = SparkSession.builder.getOrCreate() 

In [11]:
## Pull
!wget -P ../../data/ https://resources.oreilly.com/examples/0636920233145/raw/master/data/transport-nodes.csv
!wget -P ../../data/ https://resources.oreilly.com/examples/0636920233145/raw/master/data/transport-relationships.csv

--2021-05-23 10:11:30--  https://resources.oreilly.com/examples/0636920233145/raw/master/data/transport-nodes.csv
Resolving resources.oreilly.com (resources.oreilly.com)... 199.27.144.213, 199.27.144.212
Connecting to resources.oreilly.com (resources.oreilly.com)|199.27.144.213|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 465 [text/plain]
Saving to: ‘../../data/transport-nodes.csv’


2021-05-23 10:11:31 (26.3 MB/s) - ‘../../data/transport-nodes.csv’ saved [465/465]

--2021-05-23 10:11:31--  https://resources.oreilly.com/examples/0636920233145/raw/master/data/transport-relationships.csv
Resolving resources.oreilly.com (resources.oreilly.com)... 199.27.144.212, 199.27.144.213
Connecting to resources.oreilly.com (resources.oreilly.com)|199.27.144.212|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 550 [text/plain]
Saving to: ‘../../data/transport-relationships.csv’


2021-05-23 10:11:31 (27.1 MB/s) - ‘../../data/transport-relationship

In [6]:
data_path = ("../../data/")
rels_fname = op.join(data_path, 'transport-relationships.csv')
node_fname = op.join(data_path, 'transport-nodes.csv')

In [8]:
def create_transport_graph():
    node_fields = [
        StructField("id", StringType(), True),
        StructField("latitude", FloatType(), True),
        StructField("longitude", FloatType(), True),
        StructField("population", IntegerType(), True)
    ]    
    nodes = spark.read.csv(node_fname, header=True, schema=StructType(node_fields))
    
    rels = spark.read.csv(rels_fname, header=True)
    reversed_rels = (rels.withColumn("newSrc", rels.dst)
        .withColumn("newDst", rels.src)
        .drop("dst", "src")
        .withColumnRenamed("newSrc", "src")
        .withColumnRenamed("newDst", "dst")
        .select("src", "dst", "relationship", "cost"))
    relationships = rels.union(reversed_rels)
    return GraphFrame(nodes, relationships)


In [4]:
g = create_transport_graph()

In [48]:
g.vertices.toPandas()

Unnamed: 0,id,latitude,longitude,population
0,Amsterdam,52.379189,4.899431,821752
1,Utrecht,52.092876,5.10448,334176
2,Den Haag,52.078663,4.288788,514861
3,Immingham,53.612389,-0.22219,9642
4,Doncaster,53.52285,-1.13116,302400
5,Hoek van Holland,51.977501,4.13333,9382
6,Felixstowe,51.963749,1.3511,23689
7,Ipswich,52.05917,1.15545,133384
8,Colchester,51.88921,0.90421,104390
9,London,51.509865,-0.118092,8787892


In [47]:
g.edges.toPandas()

Unnamed: 0,src,dst,relationship,cost
0,Amsterdam,Utrecht,EROAD,46
1,Amsterdam,Den Haag,EROAD,59
2,Den Haag,Rotterdam,EROAD,26
3,Amsterdam,Immingham,EROAD,369
4,Immingham,Doncaster,EROAD,74
5,Doncaster,London,EROAD,277
6,Hoek van Holland,Den Haag,EROAD,27
7,Felixstowe,Hoek van Holland,EROAD,207
8,Ipswich,Felixstowe,EROAD,22
9,Colchester,Ipswich,EROAD,32


### Breadth First Search with Apache Spark

BFS is most commonly used as the basis for other more goal-oriented algorithms. For example, Shortest Path, Connected Components, and Closeness Centrality all use the BFS algorithm. It can also be used to find the shortest path between nodes.

**Find the first medium-sized (by European standards) city that has a population of between 100,000 and 300,000 people:**

In [5]:
(g.vertices
    .filter("population > 100000 and population < 300000")
    .sort("population")
    .toPandas())

Unnamed: 0,id,latitude,longitude,population
0,Colchester,51.88921,0.90421,104390
1,Ipswich,52.05917,1.15545,133384


**Find the shortest path from Den Haag to a medium-sized city**

In [6]:
from_expr = "id='Den Haag'"
to_expr = "population > 100000 and population < 300000 and id <> 'Den Haag'"
result = g.bfs(from_expr, to_expr)

print(result.columns)

['from', 'e0', 'v1', 'e1', 'v2', 'e2', 'to']


Columns beginning with e represent relationships (edges) and columns beginning with v represent nodes (vertices). We’re only interested in the nodes, so let’s filter out any columns that begin with e from the resulting DataFrame:

In [7]:
columns = [column for column in result.columns if not column.startswith("e")]
result.select(columns).toPandas()

Unnamed: 0,from,v1,v2,to
0,"(Den Haag, 52.07866287231445, 4.288787841796875, 514861)","(Hoek van Holland, 51.977500915527344, 4.13332986831665, 9382)","(Felixstowe, 51.963748931884766, 1.351099967956543, 23689)","(Ipswich, 52.05916976928711, 1.1554499864578247, 133384)"
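To make the traversal concrete, here is a minimal plain-Python sketch of breadth-first search. It is not how GraphFrames implements `bfs`; it only mirrors the query above. The edges and populations are copied from the rows displayed in the vertex and edge tables earlier (cities missing from the vertex table are treated as non-matching), edges are treated as undirected as in `create_transport_graph`, and `build_adjacency`, `bfs_path`, and `is_medium` are hypothetical helper names.

```python
from collections import deque

# Undirected edges copied from the rows displayed in the edge table.
EDGES = [
    ("Amsterdam", "Utrecht"), ("Amsterdam", "Den Haag"),
    ("Den Haag", "Rotterdam"), ("Amsterdam", "Immingham"),
    ("Immingham", "Doncaster"), ("Doncaster", "London"),
    ("Hoek van Holland", "Den Haag"), ("Felixstowe", "Hoek van Holland"),
    ("Ipswich", "Felixstowe"), ("Colchester", "Ipswich"),
]

# Populations copied from the rows displayed in the vertex table.
POPULATION = {
    "Amsterdam": 821752, "Utrecht": 334176, "Den Haag": 514861,
    "Immingham": 9642, "Doncaster": 302400, "Hoek van Holland": 9382,
    "Felixstowe": 23689, "Ipswich": 133384, "Colchester": 104390,
    "London": 8787892,
}


def build_adjacency(edges):
    """Build an undirected adjacency list from (src, dst) pairs."""
    adjacency = {}
    for src, dst in edges:
        adjacency.setdefault(src, []).append(dst)
        adjacency.setdefault(dst, []).append(src)
    return adjacency


def bfs_path(adjacency, start, is_target):
    """Return the fewest-hops path from start to the first node matching is_target."""
    queue = deque([[start]])
    seen = {start}
    while queue:
        path = queue.popleft()
        node = path[-1]
        if node != start and is_target(node):
            return path
        for neighbour in adjacency.get(node, []):
            if neighbour not in seen:
                seen.add(neighbour)
                queue.append(path + [neighbour])
    return None


def is_medium(city):
    """Medium-sized: population strictly between 100,000 and 300,000."""
    return 100000 < POPULATION.get(city, 0) < 300000


bfs_result = bfs_path(build_adjacency(EDGES), "Den Haag", is_medium)
print(bfs_result)
```

Because the queue is processed level by level, the first matching city found is also one that is fewest hops away from the start.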


## Shortest Path

The Shortest Path algorithm calculates the shortest (weighted) path between a pair of nodes. It’s useful for user interactions and dynamic workflows because it works in real time.

### Shortest Path (Weighted) with Apache Spark

Use Shortest Path to find optimal routes between a pair of nodes, based on either the number of hops or any weighted relationship value. For example, it can provide real-time answers about degrees of separation, the shortest distance between points, or the least expensive route. You can also use this algorithm to simply explore the connections between particular nodes.
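As a point of comparison for the Spark function in this section, the following is a minimal single-machine sketch of weighted shortest path using the classic heap-based Dijkstra algorithm. It is an illustration under stated assumptions, not the notebook's method: the weighted edges are copied from the rows displayed in the `g.edges` output earlier and treated as undirected, and `dijkstra_path` is a hypothetical helper name.

```python
import heapq

# (src, dst, cost) rows copied from the displayed edge table.
WEIGHTED_EDGES = [
    ("Amsterdam", "Utrecht", 46), ("Amsterdam", "Den Haag", 59),
    ("Den Haag", "Rotterdam", 26), ("Amsterdam", "Immingham", 369),
    ("Immingham", "Doncaster", 74), ("Doncaster", "London", 277),
    ("Hoek van Holland", "Den Haag", 27),
    ("Felixstowe", "Hoek van Holland", 207),
    ("Ipswich", "Felixstowe", 22), ("Colchester", "Ipswich", 32),
]


def dijkstra_path(edges, origin, destination):
    """Return (total_cost, path) for the cheapest route, or None if unreachable."""
    adjacency = {}
    for src, dst, cost in edges:
        adjacency.setdefault(src, []).append((dst, cost))
        adjacency.setdefault(dst, []).append((src, cost))

    best = {origin: 0}      # cheapest known cost to each node
    previous = {}           # predecessor on the cheapest known route
    visited = set()
    heap = [(0, origin)]
    while heap:
        distance, node = heapq.heappop(heap)
        if node in visited:
            continue
        visited.add(node)
        if node == destination:
            # Walk the predecessor links back to the origin.
            path = [node]
            while path[-1] != origin:
                path.append(previous[path[-1]])
            return distance, path[::-1]
        for neighbour, cost in adjacency.get(node, []):
            candidate = distance + cost
            if candidate < best.get(neighbour, float("inf")):
                best[neighbour] = candidate
                previous[neighbour] = node
                heapq.heappush(heap, (candidate, neighbour))
    return None


print(dijkstra_path(WEIGHTED_EDGES, "Amsterdam", "Colchester"))
```

On these ten edges the cheapest Amsterdam to Colchester route costs 59 + 27 + 207 + 22 + 32 = 347.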

In [27]:
add_path_udf = F.udf(lambda path, id: path + [id], ArrayType(StringType()))

In [28]:
def shortest_path(g, origin, destination, column_name="cost"):
    if g.vertices.filter(g.vertices.id == destination).count() == 0:
        return (spark.createDataFrame(spark.sparkContext.emptyRDD(), g.vertices.schema)
                .withColumn("path", F.array()))

    vertices = (g.vertices.withColumn("visited", F.lit(False))
                .withColumn("distance", F.when(g.vertices["id"] == origin, 0)
                            .otherwise(float("inf")))
                .withColumn("path", F.array()))
    cached_vertices = AM.getCachedDataFrame(vertices)
    g2 = GraphFrame(cached_vertices, g.edges)

    while g2.vertices.filter('visited == False').first():
        current_node_id = g2.vertices.filter('visited == False').sort("distance").first().id

        msg_distance = AM.edge[column_name] + AM.src['distance']
        msg_path = add_path_udf(AM.src["path"], AM.src["id"])
        msg_for_dst = F.when(AM.src['id'] == current_node_id, F.struct(msg_distance, msg_path))
        new_distances = g2.aggregateMessages(F.min(AM.msg).alias("aggMess"),
                                             sendToDst=msg_for_dst)

        new_visited_col = F.when(
            g2.vertices.visited | (g2.vertices.id == current_node_id), True).otherwise(False)
        new_distance_col = F.when(new_distances["aggMess"].isNotNull() &
                                  (new_distances.aggMess["col1"] < g2.vertices.distance),
                                  new_distances.aggMess["col1"]) \
            .otherwise(g2.vertices.distance)
        new_path_col = F.when(new_distances["aggMess"].isNotNull() &
                              (new_distances.aggMess["col1"] < g2.vertices.distance),
                              new_distances.aggMess["col2"].cast("array<string>")) \
            .otherwise(g2.vertices.path)

        new_vertices = (g2.vertices.join(new_distances, on="id", how="left_outer")
                        .drop(new_distances["id"])
                        .withColumn("visited", new_visited_col)
                        .withColumn("newDistance", new_distance_col)
                        .withColumn("newPath", new_path_col)
                        .drop("aggMess", "distance", "path")
                        .withColumnRenamed('newDistance', 'distance')
                        .withColumnRenamed('newPath', 'path'))
        cached_new_vertices = AM.getCachedDataFrame(new_vertices)
        g2 = GraphFrame(cached_new_vertices, g2.edges)
        if g2.vertices.filter(g2.vertices.id == destination).first().visited:
            return (g2.vertices.filter(g2.vertices.id == destination)
                    .withColumn("newPath", add_path_udf("path", "id"))
                    .drop("visited", "path")
                    .withColumnRenamed("newPath", "path"))
    return (spark.createDataFrame(spark.sparkContext.emptyRDD(), g.vertices.schema)
            .withColumn("path", F.array()))

In [None]:
result = shortest_path(g, "Amsterdam", "Colchester", "cost")
result.select("id", "distance", "path").toPandas()

## All Pairs Shortest Path
The All Pairs Shortest Path (APSP) algorithm calculates the shortest (weighted) path between all pairs of nodes. It’s more efficient than running the Single Source Shortest Path algorithm for every pair of nodes in the graph. APSP optimizes operations by keeping track of the distances calculated so far and running on nodes in parallel. Those known distances can then be reused when calculating the shortest path to an unseen node.

### All Pairs Shortest Path with Apache Spark

All Pairs Shortest Path is commonly used for understanding alternate routing when the shortest route is blocked or becomes suboptimal. For example, this algorithm is used in logical route planning to ensure the best multiple paths for diversity routing. Use All Pairs Shortest Path when you need to consider all possible routes between all or most of your nodes.
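One textbook way to reuse already-known distances across all pairs is the Floyd-Warshall algorithm, sketched below in plain Python. This is a swapped-in classic technique for illustration, not a description of how GraphFrames' `shortestPaths` works internally. The four-city subgraph uses costs copied from the edge table earlier, and `floyd_warshall` is a hypothetical helper name.

```python
from itertools import product

NODES = ["Amsterdam", "Utrecht", "Den Haag", "Rotterdam"]
# (src, dst, cost) rows copied from the displayed edge table, undirected.
COSTS = [
    ("Amsterdam", "Utrecht", 46),
    ("Amsterdam", "Den Haag", 59),
    ("Den Haag", "Rotterdam", 26),
]


def floyd_warshall(nodes, edges):
    """Return a dict mapping (a, b) to the cheapest cost between a and b."""
    inf = float("inf")
    dist = {(a, b): (0 if a == b else inf) for a, b in product(nodes, repeat=2)}
    for src, dst, cost in edges:
        dist[src, dst] = min(dist[src, dst], cost)
        dist[dst, src] = min(dist[dst, src], cost)
    # Allow each node in turn as an intermediate stop and keep any improvement.
    for mid in nodes:
        for a in nodes:
            for b in nodes:
                via = dist[a, mid] + dist[mid, b]
                if via < dist[a, b]:
                    dist[a, b] = via
    return dist


all_pairs = floyd_warshall(NODES, COSTS)
print(all_pairs["Utrecht", "Rotterdam"])
```

The triple loop is cubic in the number of nodes, so this sketch is only practical for small graphs.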

In [45]:
result = g.shortestPaths(["Colchester", "Immingham", "Hoek van Holland"])
result.sort(["id"]).select("id", "distances").show(truncate = False)

+----------------+--------------------------------------------------------+
|id              |distances                                               |
+----------------+--------------------------------------------------------+
|Amsterdam       |[Immingham -> 1, Hoek van Holland -> 2, Colchester -> 4]|
|Colchester      |[Hoek van Holland -> 3, Immingham -> 3, Colchester -> 0]|
|Den Haag        |[Hoek van Holland -> 1, Immingham -> 2, Colchester -> 4]|
|Doncaster       |[Hoek van Holland -> 4, Immingham -> 1, Colchester -> 2]|
|Felixstowe      |[Immingham -> 4, Hoek van Holland -> 1, Colchester -> 2]|
|Gouda           |[Hoek van Holland -> 2, Immingham -> 3, Colchester -> 5]|
|Hoek van Holland|[Immingham -> 3, Hoek van Holland -> 0, Colchester -> 3]|
|Immingham       |[Hoek van Holland -> 3, Immingham -> 0, Colchester -> 3]|
|Ipswich         |[Immingham -> 4, Hoek van Holland -> 2, Colchester -> 1]|
|London          |[Hoek van Holland -> 4, Immingham -> 2, Colchester -> 1]|
|Rotterdam  

## Single Source Shortest Path
The SSSP algorithm calculates the shortest (weighted) path from a root node to all other nodes in the graph.

### Single Source Shortest Path with Apache Spark

Use Single Source Shortest Path when you need to evaluate the optimal route from a fixed start point to all other individual nodes. Because the route is chosen based on the total path weight from the root, it’s useful for finding the best path to each node, but not necessarily when all nodes need to be visited in a single trip.

In [8]:
#User Defined Function to Construct Paths
add_path_udf = F.udf(lambda path, id: path + [id], ArrayType(StringType()))

In [9]:
def sssp(g, origin, column_name="cost"):
    vertices = g.vertices \
        .withColumn("visited", F.lit(False)) \
        .withColumn("distance",
            F.when(g.vertices["id"] == origin, 0).otherwise(float("inf"))) \
        .withColumn("path", F.array())
    cached_vertices = AM.getCachedDataFrame(vertices)
    g2 = GraphFrame(cached_vertices, g.edges)

    while g2.vertices.filter('visited == False').first():
        current_node_id = g2.vertices.filter('visited == False').sort("distance").first().id

        msg_distance = AM.edge[column_name] + AM.src['distance']
        msg_path = add_path_udf(AM.src["path"], AM.src["id"])
        msg_for_dst = F.when(AM.src['id'] == current_node_id, F.struct(msg_distance, msg_path))
        new_distances = g2.aggregateMessages(
            F.min(AM.msg).alias("aggMess"), sendToDst=msg_for_dst)

        new_visited_col = F.when(
            g2.vertices.visited | (g2.vertices.id == current_node_id), True).otherwise(False)
        new_distance_col = F.when(new_distances["aggMess"].isNotNull() &
                                  (new_distances.aggMess["col1"] < g2.vertices.distance),
                                  new_distances.aggMess["col1"]) \
                            .otherwise(g2.vertices.distance)
        new_path_col = F.when(new_distances["aggMess"].isNotNull() &
                              (new_distances.aggMess["col1"] < g2.vertices.distance),
                              new_distances.aggMess["col2"].cast("array<string>")) \
                        .otherwise(g2.vertices.path)

        new_vertices = g2.vertices.join(new_distances, on="id", how="left_outer") \
            .drop(new_distances["id"]) \
            .withColumn("visited", new_visited_col) \
            .withColumn("newDistance", new_distance_col) \
            .withColumn("newPath", new_path_col) \
            .drop("aggMess", "distance", "path") \
            .withColumnRenamed('newDistance', 'distance') \
            .withColumnRenamed('newPath', 'path')
        cached_new_vertices = AM.getCachedDataFrame(new_vertices)
        g2 = GraphFrame(cached_new_vertices, g2.edges)

    return g2.vertices \
                .withColumn("newPath", add_path_udf("path", "id")) \
                .drop("visited", "path") \
                .withColumnRenamed("newPath", "path")


In [10]:
via_udf = F.udf(lambda path: path[1:-1], ArrayType(StringType()))

In [13]:
result = sssp(g, "Amsterdam", "cost")
(result
 .withColumn("via", via_udf("path"))
 .select("id", "distance", "via")
 .sort("distance")
 .toPandas())

Unnamed: 0,id,distance,via
0,Amsterdam,0.0,[]
1,Utrecht,46.0,[]
2,Den Haag,59.0,[]
3,Gouda,81.0,[Utrecht]
4,Rotterdam,85.0,[Den Haag]
5,Hoek van Holland,86.0,[Den Haag]
6,Felixstowe,293.0,"[Den Haag, Hoek van Holland]"
7,Ipswich,315.0,"[Den Haag, Hoek van Holland, Felixstowe]"
8,Colchester,347.0,"[Den Haag, Hoek van Holland, Felixstowe, Ipswich]"
9,Immingham,369.0,[]


## Minimum Spanning Tree
The Minimum (Weight) Spanning Tree algorithm starts from a given node and finds all its reachable nodes and the set of relationships that connect the nodes together with the minimum possible weight. It traverses to the next unvisited node with the lowest weight from any visited node, avoiding cycles.

### When Should I Use Minimum Spanning Tree?
Use Minimum Spanning Tree when you need the best route to visit all nodes. Because the route is chosen based on the cost of each next step, it’s useful when you must visit all nodes in a single walk. You can use this algorithm for optimizing paths for connected systems like water pipes and circuit design. It’s also employed to approximate some problems with unknown compute times, such as the Traveling Salesman Problem and certain types of rounding problems. Although it may not always find the absolute optimal solution, this algorithm makes potentially complicated and compute-intensive analysis much more approachable.
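The traversal described in this section (repeatedly stepping to the cheapest unvisited node reachable from any visited node) matches Prim's algorithm. Below is a minimal plain-Python sketch of it. The four-node graph and the name `prim_mst` are hypothetical and chosen only for illustration.

```python
import heapq

# Hypothetical undirected weighted graph: (node_a, node_b, weight).
TOY_EDGES = [("A", "B", 1), ("B", "C", 2), ("A", "C", 4), ("C", "D", 3)]


def prim_mst(edges, start):
    """Return the spanning-tree edges reachable from start as (src, dst, weight)."""
    adjacency = {}
    for a, b, weight in edges:
        adjacency.setdefault(a, []).append((b, weight))
        adjacency.setdefault(b, []).append((a, weight))

    visited = {start}
    # Frontier holds candidate edges leaving the visited set, cheapest first.
    frontier = [(weight, start, nbr) for nbr, weight in adjacency.get(start, [])]
    heapq.heapify(frontier)
    tree = []
    while frontier:
        weight, src, dst = heapq.heappop(frontier)
        if dst in visited:
            continue  # taking this edge would create a cycle
        visited.add(dst)
        tree.append((src, dst, weight))
        for nbr, nbr_weight in adjacency[dst]:
            if nbr not in visited:
                heapq.heappush(frontier, (nbr_weight, dst, nbr))
    return tree


mst = prim_mst(TOY_EDGES, "A")
print(mst, sum(weight for _, _, weight in mst))
```

The A to C edge (weight 4) is skipped because C is already reached more cheaply through B.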

## Random Walk
The Random Walk algorithm provides a set of nodes on a random path in a graph. The term was first mentioned by Karl Pearson in 1905 in a letter to Nature magazine titled “The Problem of the Random Walk”. Although the concept goes back even further, it’s only more recently that random walks have been applied to network science.

### When Should I Use Random Walk?
Use the Random Walk algorithm as part of other algorithms or data pipelines when you need to generate a mostly random set of connected nodes.
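A random walk is simple enough to sketch in a few lines of plain Python. The adjacency list, the `random_walk` name, and the `seed` parameter are assumptions made for this illustration; the seed only makes a run repeatable.

```python
import random

# Hypothetical undirected graph as an adjacency list.
WALK_GRAPH = {
    "A": ["B", "C"],
    "B": ["A", "C"],
    "C": ["A", "B", "D"],
    "D": ["C"],
}


def random_walk(adjacency, start, steps, seed=None):
    """Follow up to `steps` uniformly random hops starting at `start`."""
    rng = random.Random(seed)
    walk = [start]
    for _ in range(steps):
        neighbours = adjacency.get(walk[-1], [])
        if not neighbours:
            break  # dead end: nowhere left to go
        walk.append(rng.choice(neighbours))
    return walk


example_walk = random_walk(WALK_GRAPH, "A", 5, seed=42)
print(example_walk)
```

Each consecutive pair in the returned list is connected by a relationship, which is what makes the result a "mostly random set of connected nodes".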

# Chapter 5 | Centrality Algorithms
Centrality algorithms are used to understand the roles of particular nodes in a graph and their impact on that network. They’re useful because they identify the most important nodes and help us understand group dynamics such as credibility, accessibility, the speed at which things spread, and bridges between groups.

| Algorithm type | What it does | Examples |
|:---------------|:-------------|:---------|
| Degree Centrality | Measures the number of relationships a node has | Estimating a person’s popularity by looking at their in-degree and using their out-degree to estimate gregariousness |
| Closeness Centrality (variations: Wasserman and Faust, Harmonic Centrality) | Calculates which nodes have the shortest paths to all other nodes | Finding the optimal location of new public services for maximum accessibility |
| Betweenness Centrality (variation: Randomized-Approximate Brandes) | Measures the number of shortest paths that pass through a node | Improving drug targeting by finding the control genes for specific diseases |
| PageRank (variation: Personalized PageRank) | Estimates a current node’s importance from its linked neighbors and their neighbors (popularized by Google) | Finding the most influential features for extraction in machine learning and ranking text for entity relevance in natural language processing |

[Insert Image]

## The Social Graph : Importing the Data into Apache Spark

In [80]:
# Pull Social Data
!wget -P ../../data/ https://resources.oreilly.com/examples/0636920233145/raw/master/data/social-nodes.csv;
!wget -P ../../data/ https://resources.oreilly.com/examples/0636920233145/raw/master/data/social-relationships.csv;

--2021-05-25 17:27:47--  https://resources.oreilly.com/examples/0636920233145/raw/master/data/social-nodes.csv
Resolving resources.oreilly.com (resources.oreilly.com)... 199.27.144.213, 199.27.144.212
Connecting to resources.oreilly.com (resources.oreilly.com)|199.27.144.213|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 76 [text/plain]
Saving to: ‘../../data/social-nodes.csv’


2021-05-25 17:27:48 (3.73 MB/s) - ‘../../data/social-nodes.csv’ saved [76/76]

--2021-05-25 17:27:48--  https://resources.oreilly.com/examples/0636920233145/raw/master/data/social-relationships.csv
Resolving resources.oreilly.com (resources.oreilly.com)... 199.27.144.213, 199.27.144.212
Connecting to resources.oreilly.com (resources.oreilly.com)|199.27.144.213|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 449 [text/plain]
Saving to: ‘../../data/social-relationships.csv’


2021-05-25 17:27:48 (21.4 MB/s) - ‘../../data/social-relationships.csv’ saved [449/449

In [11]:
social_rels_fname = op.join(data_path, 'social-relationships.csv')
social_node_fname = op.join(data_path, 'social-nodes.csv')

v = spark.read.csv(social_node_fname, header=True)
e = spark.read.csv(social_rels_fname, header=True)
g = GraphFrame(v, e)

## Degree Centrality
Degree Centrality counts the number of incoming and outgoing relationships of a node, and is used to find popular nodes in a graph.

### Reach
Understanding the reach of a node is a fair measure of importance. How many other nodes can it touch right now? The degree of a node is the number of direct relationships it has, calculated for in-degree and out-degree. You can think of this as the immediate reach of a node. For example, a person with a high degree in an active social network would have a lot of immediate contacts and be more likely to catch a cold circulating in their network.

### When Should I Use Degree Centrality?
Use Degree Centrality if you’re attempting to analyze influence by looking at the number of incoming and outgoing relationships, or find the “popularity” of individual nodes. It works well when you’re concerned with immediate connectedness or near-term probabilities. However, Degree Centrality is also applied to global analysis when you want to evaluate the minimum degree, maximum degree, mean degree, and standard deviation across the entire graph.

### Degree Centrality with Apache Spark

In [12]:
total_degree = g.degrees
in_degree = g.inDegrees
out_degree = g.outDegrees

(total_degree.join(in_degree, "id", how="left")
 .join(out_degree, "id", how="left")
 .fillna(0)
 .sort("inDegree", ascending=False)
 .toPandas())

Unnamed: 0,id,degree,inDegree,outDegree
0,Doug,6,5,1
1,Alice,7,3,4
2,Michael,5,2,3
3,Bridget,5,2,3
4,Amy,1,1,0
5,Charles,2,1,1
6,Mark,3,1,2
7,David,2,1,1
8,James,1,0,1


## Closeness Centrality
Closeness Centrality is a way of detecting nodes that are able to spread information efficiently through a subgraph.
A node’s closeness is the inverse of its farness, the sum of its shortest-path distances to all other nodes. Nodes with a high closeness score have the shortest distances to all other nodes.

\begin{equation*}
C(u) = {\frac{1}{\sum_{v=1}^{n-1} d(u,v)}}
\end{equation*}

\begin{equation*}
C_{norm}(u) = {\frac{n-1}{\sum_{v=1}^{n-1} d(u,v)}}
\end{equation*}

- *u* is a node
- *N* is the total node count
- *n* is the number of nodes in the same component as *u*
- *d(u,v)* is the shortest-path distance between another node *v* and *u*

### When Should I Use Closeness Centrality?
Apply Closeness Centrality when you need to know which nodes disseminate things the fastest. Using weighted relationships can be especially helpful in evaluating interaction speeds in communication and behavioral analyses.

### Closeness Centrality with Apache Spark

In [13]:
def collect_paths(paths):
    return F.collect_set(paths)


collect_paths_udf = F.udf(collect_paths, ArrayType(StringType()))

paths_type = ArrayType(
    StructType([StructField("id", StringType()), StructField("distance", IntegerType())]))


def flatten(ids):
    flat_list = [item for sublist in ids for item in sublist]
    return list(dict(sorted(flat_list, key=itemgetter(0))).items())


flatten_udf = F.udf(flatten, paths_type)


def new_paths(paths, id):
    paths = [{"id": col1, "distance": col2 + 1} for col1, col2 in paths if col1 != id]
    paths.append({"id": id, "distance": 1})
    return paths


new_paths_udf = F.udf(new_paths, paths_type)


def merge_paths(ids, new_ids, id):
    joined_ids = ids + (new_ids if new_ids else [])
    merged_ids = [(col1, col2) for col1, col2 in joined_ids if col1 != id]
    best_ids = dict(sorted(merged_ids, key=itemgetter(1), reverse=True))
    return [{"id": col1, "distance": col2} for col1, col2 in best_ids.items()]


merge_paths_udf = F.udf(merge_paths, paths_type)


def calculate_closeness(ids):
    nodes = len(ids)
    total_distance = sum([col2 for col1, col2 in ids])
    return 0 if total_distance == 0 else nodes * 1.0 / total_distance


closeness_udf = F.udf(calculate_closeness, DoubleType())

In [14]:
vertices = g.vertices.withColumn("ids", F.array())
cached_vertices = AM.getCachedDataFrame(vertices)
g2 = GraphFrame(cached_vertices, g.edges)

for i in range(0, g2.vertices.count()):
    msg_dst = new_paths_udf(AM.src["ids"], AM.src["id"])
    msg_src = new_paths_udf(AM.dst["ids"], AM.dst["id"])
    agg = g2.aggregateMessages(F.collect_set(AM.msg).alias("agg"),
                               sendToSrc=msg_src, sendToDst=msg_dst)
    res = agg.withColumn("newIds", flatten_udf("agg")).drop("agg")
    new_vertices = (g2.vertices.join(res, on="id", how="left_outer")
                    .withColumn("mergedIds", merge_paths_udf("ids", "newIds", "id"))
                    .drop("ids", "newIds")
                    .withColumnRenamed("mergedIds", "ids"))
    cached_new_vertices = AM.getCachedDataFrame(new_vertices)
    g2 = GraphFrame(cached_new_vertices, g2.edges)

In [15]:
(g2.vertices
 .withColumn("closeness", closeness_udf("ids"))
 .sort("closeness", ascending=False)
 .toPandas())

Unnamed: 0,id,ids,closeness
0,Doug,"[(Charles, 1), (Mark, 1), (Alice, 1), (Bridget, 1), (Michael, 1)]",1.0
1,Alice,"[(Charles, 1), (Mark, 1), (Bridget, 1), (Doug, 1), (Michael, 1)]",1.0
2,David,"[(James, 1), (Amy, 1)]",1.0
3,Michael,"[(Charles, 2), (Mark, 2), (Alice, 1), (Doug, 1), (Bridget, 1)]",0.714286
4,Bridget,"[(Charles, 2), (Mark, 2), (Alice, 1), (Doug, 1), (Michael, 1)]",0.714286
5,James,"[(Amy, 2), (David, 1)]",0.666667
6,Amy,"[(James, 2), (David, 1)]",0.666667
7,Charles,"[(Bridget, 2), (Mark, 2), (Michael, 2), (Doug, 1), (Alice, 1)]",0.625
8,Mark,"[(Bridget, 2), (Charles, 2), (Michael, 2), (Doug, 1), (Alice, 1)]",0.625


_This score represents the closeness of each user to others within their subgraph but not the entire graph._
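The scores in the table can be checked by hand against the closeness formulas. The sketch below applies both the raw and the normalized formula to distance lists copied from the `ids` column above. The function names are hypothetical.

```python
# Shortest-path distances copied from the `ids` column of the result above.
DISTANCES = {
    "Michael": [2, 2, 1, 1, 1],
    "Charles": [2, 2, 2, 1, 1],
    "David": [1, 1],
}


def closeness(distances):
    """C(u) = 1 / sum of distances to the other nodes in the component."""
    total = sum(distances)
    return 0.0 if total == 0 else 1.0 / total


def normalized_closeness(distances):
    """C_norm(u) = (n - 1) / sum of distances, with n - 1 = len(distances)."""
    total = sum(distances)
    return 0.0 if total == 0 else len(distances) / total


for person, dists in DISTANCES.items():
    print(person, closeness(dists), round(normalized_closeness(dists), 6))
```

Michael reaches five nodes at a total distance of 7, giving 5/7, while David reaches only two nodes at a total distance of 2, giving 2/2. David's perfect score comes from sitting in a tiny component, which is the limitation noted above.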

### Closeness Centrality Variation: Wasserman and Faust
Stanley Wasserman and Katherine Faust came up with an improved formula for calculating closeness for graphs with multiple subgraphs without connections between those groups. Details on their formula are in their book, Social Network Analysis: Methods and Applications. The result of this formula is a ratio of the fraction of nodes in the group that are reachable to the average distance from the reachable nodes.

[Do Neo4j Example or write up your own function for Spark]

\begin{equation*}
C_{WF}(u) = \frac{n-1}{N-1}\left(\frac{n-1}{\sum_{v=1}^{n-1} d(u,v)}\right)
\end{equation*}
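As a starting point for a Spark version, here is a plain-Python sketch of the Wasserman and Faust formula. The distance lists are copied from the `ids` column of the closeness result earlier in this chapter, and *N* = 9 is taken from the nine rows of that table. The name `wasserman_faust` is hypothetical.

```python
TOTAL_NODES = 9  # N: number of rows in the closeness result table

# Shortest-path distances copied from the `ids` column of that result.
REACHABLE_DISTANCES = {
    "Michael": [2, 2, 1, 1, 1],
    "David": [1, 1],
}


def wasserman_faust(distances, total_nodes):
    """C_WF(u) = (n-1)/(N-1) * (n-1)/sum(d), with n-1 = len(distances)."""
    reachable = len(distances)
    total_distance = sum(distances)
    if total_distance == 0 or total_nodes <= 1:
        return 0.0
    return (reachable / (total_nodes - 1)) * (reachable / total_distance)


for person, dists in REACHABLE_DISTANCES.items():
    print(person, round(wasserman_faust(dists, TOTAL_NODES), 6))
```

The extra factor (n-1)/(N-1) penalizes nodes in small components: David's normalized closeness of 1.0 shrinks to 2/8 = 0.25, while Michael's 5/7 becomes 25/56.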


### Closeness Centrality Variation: Harmonic Centrality
Harmonic Centrality (also known as Valued Centrality) is a variant of Closeness Centrality, invented to solve the original problem with unconnected graphs.

\begin{equation*}
H(u) = {\sum_{v=1}^{n-1}}{\frac{1}{d(u,v)}}
\end{equation*}

\begin{equation*}
H_{norm}(u) = \frac{{\sum_{v=1}^{n-1}}{\frac{1}{d(u,v)}}}{n-1}
\end{equation*}
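A short sketch of both harmonic formulas follows. It reuses distances copied from the `ids` column of the closeness result earlier in this chapter. One assumption to flag: for the normalized form, *n* is taken to be the total node count (9), so that unreachable nodes simply contribute 1/∞ = 0; if *n* is read as the component size instead, the divisor changes. The function names are hypothetical.

```python
NODE_COUNT = 9  # assumption: n counts every node in the graph

# Shortest-path distances to reachable nodes, copied from the `ids` column.
HARMONIC_INPUT = {
    "Michael": [2, 2, 1, 1, 1],
    "David": [1, 1],
}


def harmonic(distances):
    """H(u) = sum of 1/d(u, v) over reachable nodes; unreachable ones add 0."""
    return sum(1.0 / d for d in distances)


def normalized_harmonic(distances, n):
    """H_norm(u) = H(u) / (n - 1)."""
    return harmonic(distances) / (n - 1) if n > 1 else 0.0


for person, dists in HARMONIC_INPUT.items():
    print(person, harmonic(dists), normalized_harmonic(dists, NODE_COUNT))
```

Summing inverse distances, rather than inverting a sum, is what keeps the score well defined when some nodes cannot be reached.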

## Betweenness Centrality
Betweenness Centrality is a way of detecting the amount of influence a node has over the flow of information or resources in a graph. It is typically used to find nodes that serve as a bridge from one part of a graph to another.
The Betweenness Centrality algorithm first calculates the shortest (weighted) path between every pair of nodes in a connected graph. Each node receives a score, based on the number of these shortest paths that pass through the node. The more shortest paths that a node lies on, the higher its score.

#### Bridges and control points
A bridge in a network can be a node or a relationship. In a very simple graph, you can find them by looking for the node or relationship that, if removed, would cause a section of the graph to become disconnected.

A node is considered *pivotal* for two other nodes if it lies on every shortest path between those nodes. If you remove a pivotal node, the new shortest path for the original node pairs will be longer or more costly. This can be a consideration for evaluating single points of vulnerability.

#### Calculating betweenness centrality

\begin{equation*}
B(u) = {\sum_{s\neq{u}\neq{t}}}{\frac{p(u)}{p}}
\end{equation*}

- *u* is a node
- *p* is the total number of shortest paths between nodes *s* and *t*
- *p(u)* is the number of shortest paths between nodes *s* and *t* that pass through node *u*
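The formula can be evaluated directly on a small graph. The sketch below is a brute-force illustration, not the Brandes algorithm: it counts shortest paths with one BFS per node and then sums *p(u)/p* over node pairs. The assumptions are an unweighted, undirected graph with each unordered pair (*s*, *t*) counted once. The five-node graph and the function names are hypothetical.

```python
from collections import deque
from itertools import combinations

# Hypothetical graph: a square A-B-D-C-A with a tail D-E.
SQUARE_TAIL = {
    "A": ["B", "C"],
    "B": ["A", "D"],
    "C": ["A", "D"],
    "D": ["B", "C", "E"],
    "E": ["D"],
}


def bfs_counts(adjacency, source):
    """Return (hops, number of shortest paths) from source to each reachable node."""
    dist = {source: 0}
    sigma = {source: 1}
    queue = deque([source])
    while queue:
        node = queue.popleft()
        for nbr in adjacency[node]:
            if nbr not in dist:
                dist[nbr] = dist[node] + 1
                sigma[nbr] = 0
                queue.append(nbr)
            if dist[nbr] == dist[node] + 1:
                sigma[nbr] += sigma[node]
    return dist, sigma


def betweenness(adjacency):
    """B(u) = sum over pairs s, t (both different from u) of p(u) / p."""
    info = {node: bfs_counts(adjacency, node) for node in adjacency}
    scores = {node: 0.0 for node in adjacency}
    for s, t in combinations(adjacency, 2):
        dist_s, sigma_s = info[s]
        if t not in dist_s:
            continue  # s and t are not connected
        for u in adjacency:
            if u in (s, t):
                continue
            dist_u, sigma_u = info[u]
            # u lies on a shortest s-t path iff the two legs add up exactly.
            if u in dist_s and t in dist_u and dist_s[u] + dist_u[t] == dist_s[t]:
                scores[u] += sigma_s[u] * sigma_u[t] / sigma_s[t]
    return scores


bc = betweenness(SQUARE_TAIL)
print(bc)
```

Node D scores highest because every shortest path to E passes through it, which makes it the bridge in this toy graph.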

### When Should I Use Betweenness Centrality?
Betweenness Centrality applies to a wide range of problems in real-world networks.
We use it to find bottlenecks, control points, and vulnerabilities.

### Betweenness Centrality Variation: Randomized-Approximate Brandes

The Randomized-Approximate Brandes (RA-Brandes for short) algorithm is the best-known algorithm for calculating an approximate score for betweenness centrality. Rather than calculating the shortest path between every pair of nodes, the RA-Brandes algorithm considers only a subset of nodes. Two common strategies for selecting the subset of nodes are:

#### Random

Nodes are selected uniformly, at random, with a defined probability of selection. The default probability is: $\frac{\log_{10}(N)}{e^2}$

#### Degree
Nodes are selected randomly, but those whose degree is lower than the mean are automatically excluded (i.e., only nodes with a lot of relationships have a chance of being visited).
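The two selection strategies can be sketched as follows. This covers only the node-selection step, not the betweenness calculation itself, and it is a guess at the mechanics based on the descriptions above rather than any library's actual code. The degrees are copied from the Degree Centrality result earlier in this chapter, and all function names are hypothetical.

```python
import math
import random

# Total degree per person, copied from the Degree Centrality result.
NODE_DEGREES = {
    "Doug": 6, "Alice": 7, "Michael": 5, "Bridget": 5, "Amy": 1,
    "Charles": 2, "Mark": 3, "David": 2, "James": 1,
}


def default_selection_probability(node_count):
    """Default probability from the text: log10(N) / e^2."""
    return math.log10(node_count) / math.e ** 2


def select_random(nodes, probability, rng):
    """Random strategy: keep each node independently with the given probability."""
    return [node for node in nodes if rng.random() < probability]


def select_by_degree(degrees, probability, rng):
    """Degree strategy: drop below-mean-degree nodes, then sample the rest."""
    mean_degree = sum(degrees.values()) / len(degrees)
    candidates = [node for node, deg in degrees.items() if deg >= mean_degree]
    return [node for node in candidates if rng.random() < probability]


p = default_selection_probability(len(NODE_DEGREES))
rng = random.Random(0)  # seeded only so that a run is repeatable
print(round(p, 4))
print(select_random(list(NODE_DEGREES), p, rng))
print(select_by_degree(NODE_DEGREES, p, rng))
```

With nine nodes the default probability is about 0.13, so on a graph this small most runs select only one or two nodes, or none at all.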

## PageRank
PageRank measures the transitive (or directional) influence of nodes. All the other centrality algorithms discussed measure the direct influence of a node, whereas PageRank considers the influence of a node’s neighbors, and their neighbors.

The basic assumption is that a page with more incoming and more influential incoming links is more likely a credible source. PageRank measures the number and quality of incoming relationships to a node to determine an estimation of how important that node is. Nodes with more sway over a network are presumed to have more incoming relationships from other influential nodes.

### Influence
Relationships to more important nodes contribute more to the influence of the node in question than equivalent connections to less important nodes. Measuring influence usually involves scoring nodes, often with weighted relationships, and then updating the scores over many iterations. Sometimes all nodes are scored, and sometimes a random selection is used as a representative distribution.

### The PageRank Formula

\begin{equation*}
PR(u) = (1-d)+d(\frac{PR(T1)}{C(T1)}+...+\frac{PR(Tn)}{C(Tn)})
\end{equation*}

- Assume a page *u* has citations from pages *T1* to *Tn*
- *d* is a damping factor set between 0 and 1 (usually 0.85)
- *1-d* is the probability that a node is reached directly without following any relationships
- *C(Tn)* is defined as the out-degree of node *Tn*.
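The formula can be applied repeatedly until the scores settle. The sketch below does exactly that in plain Python for a fixed number of rounds. The three-page graph and the `pagerank` helper are hypothetical, every page is assumed to have at least one outgoing link (no dangling nodes), and this is not meant to reproduce the internals of GraphFrames' `pageRank`.

```python
# Hypothetical link graph: page -> pages it links to.
LINKS = {
    "A": ["B", "C"],
    "B": ["C"],
    "C": ["A"],
}


def pagerank(out_links, damping=0.85, iterations=100):
    """Iterate PR(u) = (1 - d) + d * sum(PR(T) / C(T)) over pages T linking to u."""
    ranks = {page: 1.0 for page in out_links}
    for _ in range(iterations):
        new_ranks = {}
        for page in out_links:
            # Each linking page T shares its rank equally among its C(T) targets.
            incoming = sum(
                ranks[src] / len(targets)
                for src, targets in out_links.items()
                if page in targets
            )
            new_ranks[page] = (1 - damping) + damping * incoming
        ranks = new_ranks
    return ranks


ranks = pagerank(LINKS)
print({page: round(score, 4) for page, score in ranks.items()})
```

Page C ends up ranked highest because it receives links from both A and B, and with this form of the formula the scores sum to the number of pages when there are no dangling nodes.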

### Iteration, Random Surfers, and Rank Sinks
PageRank is an iterative algorithm that runs either until scores converge or until a set number of iterations is reached.

A node, or group of nodes, without outgoing relationships (also called a dangling node) can monopolize the PageRank score by refusing to share. This is known as a *rank sink*. *Teleportation* is used to overcome dead ends.

Circular references cause an increase in their ranks as the surfer bounces back and forth among the nodes. A damping factor is used to introduce random node visits.

If you see unexpected results from PageRank, it is worth doing some exploratory analysis of the graph to see if any of these problems are the cause.

### When Should I Use PageRank?
Use this algorithm whenever you’re looking for broad influence over a network. For instance, if you’re looking to target a gene that has the highest overall impact to a biological function, it may not be the most connected one. It may, in fact, be the gene with the most relationships with other, more significant functions.

### PageRank with Apache Spark

#### PageRank with a fixed number of iterations

In [24]:
results = g.pageRank(resetProbability=0.15, maxIter=20)
results.vertices.sort("pagerank", ascending=False).toPandas()

Unnamed: 0,id,pagerank
0,Doug,2.286537
1,Mark,2.142448
2,Alice,1.520331
3,Michael,0.727443
4,Bridget,0.727443
5,Charles,0.521385
6,Amy,0.509714
7,David,0.366558
8,James,0.19814


#### PageRank until convergence

In [25]:
results = g.pageRank(resetProbability=0.15, tol=0.01)
results.vertices.sort("pagerank", ascending=False).toPandas()

Unnamed: 0,id,pagerank
0,Doug,2.223319
1,Mark,2.090451
2,Alice,1.505629
3,Michael,0.733739
4,Bridget,0.733739
5,Amy,0.559447
6,Charles,0.533881
7,David,0.402323
8,James,0.217472


#### PageRank Variation: Personalized PageRank
Personalized PageRank (PPR) is a variant of the PageRank algorithm that calculates the importance of nodes in a graph from the perspective of a specific node. For PPR, random jumps refer back to a given set of starting nodes.

This creates bias and localization towards the start nodes, making PPR useful for highly targeted recommendations.
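The only change from plain PageRank is where the random jump lands. A minimal plain-Python sketch under stated assumptions: there is a single start node, the jump share (1 - *d*) goes entirely to it, and every page has at least one outgoing link. The graph and the name `personalized_pagerank` are hypothetical, and the scores are not expected to match Spark's scaling.

```python
# Hypothetical link graph: page -> pages it links to.
PPR_LINKS = {
    "A": ["B", "C"],
    "B": ["C"],
    "C": ["A"],
    "D": ["A"],  # D links in, but nothing links to D
}


def personalized_pagerank(out_links, source, damping=0.85, iterations=100):
    """PageRank where every random jump returns to `source`."""
    ranks = {page: (1.0 if page == source else 0.0) for page in out_links}
    for _ in range(iterations):
        new_ranks = {}
        for page in out_links:
            incoming = sum(
                ranks[src] / len(targets)
                for src, targets in out_links.items()
                if page in targets
            )
            # Only the source receives the jump share (1 - d).
            jump = (1 - damping) if page == source else 0.0
            new_ranks[page] = jump + damping * incoming
        ranks = new_ranks
    return ranks


ppr = personalized_pagerank(PPR_LINKS, "A")
print({page: round(score, 4) for page, score in ppr.items()})
```

Page D scores exactly 0 because it cannot be reached from the start node.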

#### Personalized PageRank with Apache Spark

In [26]:
me = "Doug"
results = g.pageRank(resetProbability=0.15, maxIter=20, sourceId=me)
people_to_follow = results.vertices.sort("pagerank", ascending=False)

already_follows = list(g.edges.filter(f"src = '{me}'").toPandas()["dst"])
people_to_exclude = already_follows + [me]

people_to_follow[~people_to_follow.id.isin(people_to_exclude)].toPandas()

Unnamed: 0,id,pagerank
0,Alice,0.165018
1,Michael,0.048842
2,Bridget,0.048842
3,Charles,0.034978
4,David,0.0
5,James,0.0
6,Amy,0.0
