# Module 3: Centrality Algorithms

In [3]:
import os

from pyspark.sql.types import *
from graphframes import *
from neo4j import GraphDatabase
import pandas as pd

## The Social Graph

### Importing the Data into Apache Spark

In [1]:
def create_social_graph():
    """Load the social network CSV files and wrap them in a GraphFrame.

    Reads ``data/social-nodes.csv`` as the vertex table and
    ``data/social-relationships.csv`` as the edge table, both with a header
    row, through the global ``spark`` object (which is not defined inside
    this function).

    Returns:
        The ``GraphFrame`` built from the vertex and edge DataFrames.
    """
    csv_paths = ("data/social-nodes.csv", "data/social-relationships.csv")
    # Vertices are read first, then edges, matching the order of `csv_paths`.
    vertices, edges = (spark.read.csv(path, header=True) for path in csv_paths)
    return GraphFrame(vertices, edges)

In [4]:
# Build the GraphFrame once; later cells query it through the global `g`.
g = create_social_graph()

### Importing the Data into Neo4j

In [6]:
# Neo4j connection settings. Each value can be overridden with an environment
# variable so that real credentials never have to be saved in the notebook;
# the fallbacks are the original local-development values.
user = os.environ.get("NEO4J_USER", "neo4j")
password = os.environ.get("NEO4J_PASSWORD", "neo")
neo4j_uri = os.environ.get("NEO4J_URI", "bolt://localhost")

# Shared driver used by every `driver.session()` call in the cells below.
driver = GraphDatabase.driver(neo4j_uri, auth=(user, password))

In [9]:
# Load the social graph into Neo4j: first the User nodes, then the FOLLOWS
# relationships (which MATCH on the nodes created by the first statement).
# Both statements use MERGE, so re-running the cell does not create duplicates.
with driver.session() as session:
    # consume() makes each statement run to completion and raises any
    # server-side error here, instead of leaving the outcome to whatever the
    # driver does with pending results when the session closes.
    session.run("""
    WITH "https://github.com/neo4j-graph-analytics/book/raw/master/data/social-nodes.csv"
    AS uri
    LOAD CSV WITH HEADERS FROM uri AS row
    MERGE (:User {id: row.id})
    """).consume()

    session.run("""
    WITH "https://github.com/neo4j-graph-analytics/book/raw/master/data/social-relationships.csv"
    AS uri
    LOAD CSV WITH HEADERS FROM uri AS row
    MATCH (source:User {id: row.src})
    MATCH (destination:User {id: row.dst})
    MERGE (source)-[:FOLLOWS]->(destination)
    """).consume()

## Degree Centrality

In [11]:
# Per-vertex degree tables provided by GraphFrames.
total_degree, in_degree, out_degree = g.degrees, g.inDegrees, g.outDegrees

# Left-join on "id" so every vertex present in `total_degree` is kept; a vertex
# missing from the in- or out-degree table gets 0 instead of null.
degree_summary = (
    total_degree
    .join(in_degree, "id", how="left")
    .join(out_degree, "id", how="left")
    .fillna(0)
)
degree_summary.sort("inDegree", ascending=False).show()

+-------+------+--------+---------+
|     id|degree|inDegree|outDegree|
+-------+------+--------+---------+
|   Doug|     6|       5|        1|
|  Alice|     7|       3|        4|
|Michael|     5|       2|        3|
|Bridget|     5|       2|        3|
|    Amy|     1|       1|        0|
|   Mark|     3|       1|        2|
|Charles|     2|       1|        1|
|  David|     2|       1|        1|
|  James|     1|       0|        1|
+-------+------+--------+---------+



In [14]:
# Breadth-first search starting at the vertex with id 'Den Haag' and stopping
# at any other vertex whose population is strictly between 100,000 and 300,000.
# NOTE(review): these filters reference 'Den Haag' and a `population` column,
# but the only loader visible in this notebook builds the social graph —
# confirm that `g` holds the transport graph when this cell runs.
from_expr = "id='Den Haag'"
to_expr = "population > 100000 and population < 300000 and id <> 'Den Haag'"
result = g.bfs(from_expr, to_expr)

In [17]:
# Show the BFS result without the columns whose name begins with "e"
# (presumably the edge columns e0, e1, ...), leaving from / v1 / v2 / to.
columns = [name for name in result.columns if name[:1] != "e"]
result.select(columns).show()

+--------------------+--------------------+--------------------+--------------------+
|                from|                  v1|                  v2|                  to|
+--------------------+--------------------+--------------------+--------------------+
|[Den Haag, 52.078...|[Hoek van Holland...|[Felixstowe, 51.9...|[Ipswich, 52.0591...|
+--------------------+--------------------+--------------------+--------------------+



## Shortest Path

In [24]:
# Shortest path between two places with `null` passed as the weight property;
# in the output below `cost` grows by 1 per stop, i.e. it counts hops.
query = """
MATCH (source:Place {id: $source}),
      (destination:Place {id: $destination})
CALL algo.shortestPath.stream(source, destination, null)
YIELD nodeId, cost
RETURN algo.getNodeById(nodeId).id AS place, cost
"""

params = dict(source="Amsterdam", destination="London")

with driver.session() as session:
    rows = session.run(query, params)
    # Materialise the records while the session is still open.
    df = pd.DataFrame(list(map(dict, rows)))

display(df)

Unnamed: 0,cost,place
0,0.0,Amsterdam
1,1.0,Immingham
2,2.0,Doncaster
3,3.0,London


In [25]:
# Same unweighted (null weight) shortest path as before, but the query then
# looks up the EROAD distance of every consecutive pair of stops and returns
# the running total of those distances up to each place.
query = """
MATCH (source:Place {id: $source}),
      (destination:Place {id: $destination})
CALL algo.shortestPath.stream(source, destination, null)
YIELD nodeId, cost

WITH collect(algo.getNodeById(nodeId)) AS path
UNWIND range(0, size(path)-1) AS index
WITH path[index] AS current, path[index+1] AS next
WITH current, next, [(current)-[r:EROAD]-(next) | r.distance][0] AS distance

WITH collect({current: current, next:next, distance: distance}) AS stops
UNWIND range(0, size(stops)-1) AS index
WITH stops[index] AS location, stops, index
RETURN location.current.id AS place,
reduce(acc=0.0,
       distance in [stop in stops[0..index] | stop.distance] |
       acc + distance) AS cost
"""

params = dict(source="Amsterdam", destination="London")

with driver.session() as session:
    rows = session.run(query, params)
    # Materialise the records while the session is still open.
    df = pd.DataFrame(list(map(dict, rows)))

display(df)

Unnamed: 0,cost,place
0,0.0,Amsterdam
1,369.0,Immingham
2,443.0,Doncaster
3,720.0,London


## Weighted Shortest Path

In [26]:
# Shortest path weighted by the "distance" relationship property, so `cost`
# is the accumulated distance along the route rather than a hop count.
query = """
MATCH (source:Place {id: $source}),
      (destination:Place {id: $destination})
CALL algo.shortestPath.stream(source, destination, "distance")
YIELD nodeId, cost
RETURN algo.getNodeById(nodeId).id AS place, cost
"""

params = dict(source="Amsterdam", destination="London")

with driver.session() as session:
    rows = session.run(query, params)
    # Materialise the records while the session is still open.
    df = pd.DataFrame(list(map(dict, rows)))

display(df)

Unnamed: 0,cost,place
0,0.0,Amsterdam
1,59.0,Den Haag
2,86.0,Hoek van Holland
3,293.0,Felixstowe
4,315.0,Ipswich
5,347.0,Colchester
6,453.0,London


## All Pairs Shortest Path

In [27]:
result = g.shortestPaths(["Colchester", "Immingham", "Hoek van Holland"])
result.sort(["id"]).select("id", "distances").show(truncate=False)

+----------------+--------------------------------------------------------+
|id              |distances                                               |
+----------------+--------------------------------------------------------+
|Amsterdam       |[Immingham -> 1, Hoek van Holland -> 2, Colchester -> 4]|
|Colchester      |[Colchester -> 0, Immingham -> 3, Hoek van Holland -> 3]|
|Den Haag        |[Hoek van Holland -> 1, Immingham -> 2, Colchester -> 4]|
|Doncaster       |[Immingham -> 1, Colchester -> 2, Hoek van Holland -> 4]|
|Felixstowe      |[Hoek van Holland -> 1, Colchester -> 2, Immingham -> 4]|
|Gouda           |[Hoek van Holland -> 2, Immingham -> 3, Colchester -> 5]|
|Hoek van Holland|[Hoek van Holland -> 0, Immingham -> 3, Colchester -> 3]|
|Immingham       |[Immingham -> 0, Colchester -> 3, Hoek van Holland -> 3]|
|Ipswich         |[Colchester -> 1, Hoek van Holland -> 2, Immingham -> 4]|
|London          |[Colchester -> 1, Immingham -> 2, Hoek van Holland -> 4]|
|Rotterdam  

In [28]:
# All-pairs shortest paths with a null weight property. The
# `sourceNodeId < targetNodeId` filter keeps each pair in one direction only,
# and the ten largest distances are returned.
query = """
CALL algo.allShortestPaths.stream(null)
YIELD sourceNodeId, targetNodeId, distance
WHERE sourceNodeId < targetNodeId
RETURN algo.getNodeById(sourceNodeId).id AS source,
algo.getNodeById(targetNodeId).id AS target,
distance
ORDER BY distance DESC
LIMIT 10
"""

with driver.session() as session:
    rows = session.run(query)
    # Materialise the records while the session is still open.
    df = pd.DataFrame(list(map(dict, rows)))

display(df)

Unnamed: 0,distance,source,target
0,5.0,London,Gouda
1,5.0,Utrecht,Ipswich
2,5.0,London,Rotterdam
3,5.0,Colchester,Gouda
4,5.0,Utrecht,Colchester
5,4.0,Amsterdam,Colchester
6,4.0,Immingham,Ipswich
7,4.0,Den Haag,Colchester
8,4.0,Doncaster,Felixstowe
9,4.0,Utrecht,Felixstowe


In [29]:
# All-pairs shortest paths weighted by the "distance" property. Each pair is
# kept in one direction only (`sourceNodeId < targetNodeId`) and the ten
# largest distances are returned.
query = """
CALL algo.allShortestPaths.stream("distance")
YIELD sourceNodeId, targetNodeId, distance
WHERE sourceNodeId < targetNodeId
RETURN algo.getNodeById(sourceNodeId).id AS source,
algo.getNodeById(targetNodeId).id AS target,
distance
ORDER BY distance DESC
LIMIT 10
"""

with driver.session() as session:
    rows = session.run(query)
    # Materialise the records while the session is still open.
    df = pd.DataFrame(list(map(dict, rows)))

display(df)

Unnamed: 0,distance,source,target
0,529.0,Doncaster,Hoek van Holland
1,528.0,Doncaster,Rotterdam
2,524.0,Doncaster,Gouda
3,511.0,Immingham,Felixstowe
4,502.0,Den Haag,Doncaster
5,489.0,Immingham,Ipswich
6,489.0,Utrecht,Doncaster
7,460.0,Utrecht,London
8,457.0,Immingham,Colchester
9,455.0,Immingham,Hoek van Holland


## Single Source Shortest Path

In [33]:
# Single-source shortest paths (delta-stepping procedure) from one place,
# weighted by "distance". Rows with a non-finite distance are filtered out and
# the rest are ordered from nearest to farthest.
query = """
MATCH (n:Place {id:$place})
CALL algo.shortestPath.deltaStepping.stream(n, "distance", 1.0)
YIELD nodeId, distance
WHERE algo.isFinite(distance)
RETURN algo.getNodeById(nodeId).id AS destination, distance
ORDER BY distance
"""

params = dict(place="London")

with driver.session() as session:
    rows = session.run(query, params)
    # Materialise the records while the session is still open.
    df = pd.DataFrame(list(map(dict, rows)))

display(df)

Unnamed: 0,destination,distance
0,London,0.0
1,Colchester,106.0
2,Ipswich,138.0
3,Felixstowe,160.0
4,Doncaster,277.0
5,Immingham,351.0
6,Hoek van Holland,367.0
7,Den Haag,394.0
8,Rotterdam,400.0
9,Gouda,425.0


## Minimum Spanning Tree

In [35]:
# Minimum spanning tree over Place nodes / EROAD relationships, weighted by
# "distance" and started from the given place. With write:true the result is
# written back under the name "MINST"; the query itself only returns the
# timing figures and the node count.
query = """
MATCH (n:Place {id:$place})
CALL algo.spanningTree.minimum("Place", "EROAD", "distance", id(n),
{write:true, writeProperty:"MINST"})
YIELD loadMillis, computeMillis, writeMillis, effectiveNodeCount
RETURN loadMillis, computeMillis, writeMillis, effectiveNodeCount
"""

params = dict(place="Amsterdam")

with driver.session() as session:
    rows = session.run(query, params)
    # Materialise the records while the session is still open.
    df = pd.DataFrame(list(map(dict, rows)))

display(df)

Unnamed: 0,computeMillis,effectiveNodeCount,loadMillis,writeMillis
0,8,12,32,15


In [46]:
# Read the spanning tree back: follow MINST relationships from the start place
# to every node that has no outgoing MINST relationship, then list each
# relationship on those paths with its distance.
# NOTE(review): if the tree branches, a relationship shared by several
# root-to-leaf paths would be returned once per path — consider RETURN DISTINCT.
query = """
MATCH path = (n:Place {id:$place})-[:MINST*]->(end)
WHERE not((end)-[:MINST]->())
WITH relationships(path) AS rels
UNWIND rels AS rel
RETURN startNode(rel).id AS source, endNode(rel).id AS destination, rel.distance AS cost
"""

params = dict(place="Amsterdam")

with driver.session() as session:
    rows = session.run(query, params)
    # Materialise the records while the session is still open.
    df = pd.DataFrame(list(map(dict, rows)))

display(df)

Unnamed: 0,cost,destination,source
0,46.0,Utrecht,Amsterdam
1,35.0,Gouda,Utrecht
2,25.0,Rotterdam,Gouda
3,26.0,Den Haag,Rotterdam
4,27.0,Hoek van Holland,Den Haag
5,207.0,Felixstowe,Hoek van Holland
6,22.0,Ipswich,Felixstowe
7,32.0,Colchester,Ipswich
8,106.0,London,Colchester
9,277.0,Doncaster,London
