# Module 2: Pathfinding & Graph Search 



<img src="images/Pathfinding-Algo-Icon.png" alt="Pathfinding" width="120" style="float:right"/>

Graph search algorithms explore a graph either for general discovery or explicit search. These algorithms carve paths through the graph, but there is no expectation that those paths are computationally optimal.

Pathfinding algorithms build on top of graph search algorithms and explore routes between nodes, starting at one node and traversing through relationships until the destination has been reached.

In this notebook we'll learn how to use these algorithms in Spark and Neo4j. Before we get started let's import those libraries:

In [1]:
from pyspark.sql.types import *
from graphframes import *
from neo4j import GraphDatabase
from pyspark.sql import functions as F
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

import pandas as pd

## Connect to Spark and Neo4j

Let's create connections to Spark and Neo4j. The following code will create a SparkContext that we'll use to connect to Spark:

In [2]:
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

And let's create a connection to the database. We'll need to update the password to use the one that we chose when creating our database:

In [3]:
user = "neo4j"
password = "neo"
driver = GraphDatabase.driver("bolt://graph-algorithms-in-practice-neo4j", auth=(user, password))

## The Transport Graph

The transport graph is a subset of the data from the [International E-road Network](http://www.elbruz.org/e-roads/).

### Importing the Data into Apache Spark

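First let's import the data into Apache Spark. The function below creates a GraphFrame from the `transport-nodes.csv` and `transport-relationships.csv` files, adding a reversed copy of every relationship so that each road can be traversed in both directions: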

In [7]:
def create_transport_graph():
    node_fields = [
        StructField("id", StringType(), True),
        StructField("latitude", FloatType(), True),
        StructField("longitude", FloatType(), True),
        StructField("population", IntegerType(), True)
    ]
    nodes = spark.read.csv("../data/transport-nodes.csv", header=True,
                           schema = StructType(node_fields))
    
    rels = spark.read.csv("../data/transport-relationships.csv", header=True)
    reversed_rels = (rels.withColumn("newSrc", rels.dst)
                     .withColumn("newDst", rels.src)
                     .drop("dst", "src")
                     .withColumnRenamed("newSrc", "src")
                     .withColumnRenamed("newDst", "dst")
                     .select("src", "dst", "relationship", "cost"))
    relationships = rels.union(reversed_rels)
    
    return GraphFrame(nodes, relationships)

And now let's call that function, and assign the GraphFrame to a variable:

In [8]:
g = create_transport_graph()

### Importing the Data into Neo4j

Now we'll import the data into Neo4j. 



In [9]:
with driver.session() as session:
    session.run("""
    WITH "file:///transport-nodes.csv" AS uri
    LOAD CSV WITH HEADERS FROM uri AS row
    MERGE (place:Place {id:row.id})
    SET place.latitude = toFloat(row.latitude),
        place.longitude = toFloat(row.longitude),
        place.population = toInteger(row.population)
    """)
    
    session.run("""
    WITH "file:///transport-relationships.csv" AS uri
    LOAD CSV WITH HEADERS FROM uri AS row
    MATCH (origin:Place {id: row.src})
    MATCH (destination:Place {id: row.dst})
    MERGE (origin)-[:EROAD {distance: toInteger(row.cost)}]->(destination)
    """)

## Breadth First Search

Spark’s implementation of the Breadth First Search algorithm finds the shortest path between two nodes by the number of relationships (i.e., hops) between them. You can explicitly name your target node or add criteria to be met.

The diagram below shows the order in which we would visit the nodes of our transport graph if we were performing a breadth first search that started from the Dutch city, Den Haag (in English, The Hague). The numbers next to the city name indicate the order in which each node is visited.

![](images/traversing_bfs.svg)

For example, we can use the bfs function to find the first medium-sized (by European standards) city that has a population of between 100,000 and 300,000 people.

Let’s first check which places have a population matching those criteria:

In [10]:
(g.vertices
 .filter("population > 100000 and population < 300000")
 .sort("population")
 .show())

+----------+--------+---------+----------+
|        id|latitude|longitude|population|
+----------+--------+---------+----------+
|Colchester|51.88921|  0.90421|    104390|
|   Ipswich|52.05917|  1.15545|    133384|
+----------+--------+---------+----------+



There are only two places matching our criteria, and we’d expect to reach Ipswich first based on a breadth first search.

The following code finds the shortest path from Den Haag to a medium-sized city:

In [12]:
from_expr = "id='Den Haag'"
to_expr = "population > 100000 and population < 300000 and id <> 'Den Haag'"
result = g.bfs(from_expr, to_expr)
result.show(truncate=False)

+---------------------------------------+---------------------------------------+------------------------------------------+------------------------------------------+-------------------------------------+--------------------------------+------------------------------------+
|from                                   |e0                                     |v1                                        |e1                                        |v2                                   |e2                              |to                                  |
+---------------------------------------+---------------------------------------+------------------------------------------+------------------------------------------+-------------------------------------+--------------------------------+------------------------------------+
|[Den Haag, 52.078663, 4.288788, 514861]|[Den Haag, Hoek van Holland, EROAD, 27]|[Hoek van Holland, 51.9775, 4.13333, 9382]|[Hoek van Holland, Felixstowe, EROAD, 207]|[Feli

By default the result has fields for nodes and relationships. We're only interested in the nodes, so let's remove the relationship columns. All those columns start with 'e', so the following code will remove them:

In [13]:
columns = [F.col(column + ".id").alias(column) 
           for column in result.columns 
           if not column.startswith("e")]

result.select(columns).show()

+--------+----------------+----------+-------+
|    from|              v1|        v2|     to|
+--------+----------------+----------+-------+
|Den Haag|Hoek van Holland|Felixstowe|Ipswich|
+--------+----------------+----------+-------+



The image below shows the route that the breadth first search took to get from Den Haag to Ipswich:

![](images/traversing_bfs_highlighted.svg)

Colchester was the only other node that satisfied our criteria but, as we can see, it comes after Ipswich in the breadth first search.
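To see why Ipswich is reached before Colchester, here is a minimal pure-Python sketch of breadth first search. It is not GraphFrames' implementation. The edge list is an assumption: a subset of the transport graph copied by hand from the outputs shown in this notebook, not the full CSV data. The names `bfs_path` and `build_adjacency` are illustrative.

```python
from collections import deque

# Assumption: hand-copied subset of the transport graph, treated as undirected.
EDGES = [
    ("Amsterdam", "Utrecht"), ("Amsterdam", "Den Haag"),
    ("Amsterdam", "Immingham"), ("Utrecht", "Gouda"),
    ("Gouda", "Rotterdam"), ("Rotterdam", "Den Haag"),
    ("Den Haag", "Hoek van Holland"), ("Hoek van Holland", "Felixstowe"),
    ("Felixstowe", "Ipswich"), ("Ipswich", "Colchester"),
    ("Colchester", "London"), ("Immingham", "Doncaster"),
    ("Doncaster", "London"),
]


def build_adjacency(edges):
    """Map each place to the list of places directly connected to it."""
    adjacency = {}
    for src, dst in edges:
        adjacency.setdefault(src, []).append(dst)
        adjacency.setdefault(dst, []).append(src)  # roads run both ways
    return adjacency


def bfs_path(adjacency, start, is_target):
    """Return the fewest-hops path from start to the first node matching is_target."""
    queue = deque([[start]])
    seen = {start}
    while queue:
        path = queue.popleft()
        node = path[-1]
        if node != start and is_target(node):
            return path
        for neighbour in adjacency.get(node, []):
            if neighbour not in seen:
                seen.add(neighbour)
                queue.append(path + [neighbour])
    return None


medium_sized = {"Colchester", "Ipswich"}  # the two places matched by the filter
bfs_route = bfs_path(build_adjacency(EDGES), "Den Haag", lambda n: n in medium_sized)
print(bfs_route)
```

Because the queue is processed level by level, every place two hops away is examined before any place three hops away, so Ipswich (three hops) is found before Colchester (four hops).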

## Shortest Path


The Shortest Path algorithm calculates the shortest (weighted) path between a pair of nodes. It’s useful for user interactions and dynamic workflows because it works in real time.

<img src="images/Edsger_Wybe_Dijkstra.jpg" alt="drawing" width="120" style="float:right"/>

Dijkstra’s Shortest Path algorithm operates by first finding the lowest-weight relationship from the start node to directly connected nodes. It keeps track of those weights and moves to the “closest” node. 

It then performs the same calculation, but now as a cumulative total from the start node. The algorithm continues to do this, evaluating a “wave” of cumulative weights and always choosing the lowest weighted cumulative path to advance along, until it reaches the destination node.
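The "wave" of cumulative weights can be sketched with a priority queue. This is a minimal illustration of Dijkstra's algorithm in plain Python, not the Neo4j procedure. The edge list is an assumption: a hand-copied subset of the transport graph with distances taken from the outputs in this notebook. The function names are illustrative.

```python
import heapq

# Assumption: hand-copied subset of the transport graph (distances in km).
EDGES = [
    ("Amsterdam", "Utrecht", 46), ("Amsterdam", "Den Haag", 59),
    ("Amsterdam", "Immingham", 369), ("Utrecht", "Gouda", 35),
    ("Gouda", "Rotterdam", 25), ("Rotterdam", "Den Haag", 26),
    ("Den Haag", "Hoek van Holland", 27),
    ("Hoek van Holland", "Felixstowe", 207), ("Felixstowe", "Ipswich", 22),
    ("Ipswich", "Colchester", 32), ("Colchester", "London", 106),
    ("Immingham", "Doncaster", 74), ("Doncaster", "London", 277),
]


def build_graph(edges):
    """Build an undirected adjacency map: place -> [(neighbour, km), ...]."""
    graph = {}
    for src, dst, km in edges:
        graph.setdefault(src, []).append((dst, km))
        graph.setdefault(dst, []).append((src, km))
    return graph


def dijkstra(graph, source, destination):
    """Return (total_cost, path) of the lowest-cost route, or (inf, []) if none."""
    frontier = [(0, source, [source])]  # (cumulative cost, node, path so far)
    settled = set()
    while frontier:
        # Always advance along the lowest cumulative cost seen so far.
        cost, node, path = heapq.heappop(frontier)
        if node in settled:
            continue
        settled.add(node)
        if node == destination:
            return cost, path
        for neighbour, km in graph.get(node, []):
            if neighbour not in settled:
                heapq.heappush(frontier, (cost + km, neighbour, path + [neighbour]))
    return float("inf"), []


total_km, route = dijkstra(build_graph(EDGES), "Amsterdam", "London")
print(total_km, route)
```

The heap keeps the frontier ordered by cumulative distance from the start node, which is what lets the search stop as soon as the destination is popped.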

We'll explore this algorithm using Neo4j. We'll start by computing unweighted shortest paths, which we can do by passing `null` as the third parameter to the procedure:

In [14]:
query = """
MATCH (source:Place {id: $source}),
      (destination:Place {id: $destination})
CALL algo.shortestPath.stream(source, destination, null)
YIELD nodeId, cost
RETURN algo.getNodeById(nodeId).id AS place, cost
"""

params = {
    "source": "Amsterdam",
    "destination": "London"
}

with driver.session() as session:
    rows = session.run(query, params)
    df = pd.DataFrame([dict(record) for record in rows])

display(df)

|   | cost | place     |
|---|------|-----------|
| 0 | 0.0  | Amsterdam |
| 1 | 1.0  | Immingham |
| 2 | 2.0  | Doncaster |
| 3 | 3.0  | London    |


The cost here is the cumulative number of relationships. If we want to find the cost in terms of distance, we can write some post processing code in Cypher:

In [15]:
query = """
MATCH (source:Place {id: $source}),
      (destination:Place {id: $destination})
CALL algo.shortestPath.stream(source, destination, null)
YIELD nodeId, cost

// collect all the nodes in the path
WITH collect(algo.getNodeById(nodeId)) AS path

// iterate over pairs of nodes, 
// find the EROAD relationship between them, 
// and extract the distance
UNWIND range(0, size(path)-1) AS index
WITH path[index] AS current, path[index+1] AS next
WITH current, next, [(current)-[r:EROAD]-(next) | r.distance][0] AS distance

// create a list of places and the distance of that part of the journey
WITH collect({current: current, next:next, distance: distance}) AS stops

// iterate over that list, computing the cumulative distance 
// at each stage of the journey
UNWIND range(0, size(stops)-1) AS index
WITH stops[index] AS location, stops, index
RETURN location.current.id AS place,
reduce(acc=0.0,
       distance in [stop in stops[0..index] | stop.distance] |
       acc + distance) AS cost
"""

params = {
    "source": "Amsterdam",
    "destination": "London"
}

with driver.session() as session:
    rows = session.run(query, params)
    df = pd.DataFrame([dict(record) for record in rows])

display(df)

|   | cost  | place     |
|---|-------|-----------|
| 0 | 0.0   | Amsterdam |
| 1 | 369.0 | Immingham |
| 2 | 443.0 | Doncaster |
| 3 | 720.0 | London    |


The unweighted shortest path from Amsterdam to London routes us through the fewest cities and has a total cost of 720 km.

![](images/unweighted_shortest.svg)
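The running total that the Cypher post-processing computes can also be reproduced in a few lines of plain Python. This sketch assumes we already have the ordered path and the distance of each leg; the leg distances below are the differences between consecutive rows of the table above.

```python
from itertools import accumulate

# The unweighted shortest path and the length of each leg in km
# (assumption: derived by subtracting consecutive cumulative costs above).
path = ["Amsterdam", "Immingham", "Doncaster", "London"]
leg_km = [369, 74, 277]

# The start node has cost 0; every later node adds the leg that reaches it.
cumulative_km = [0] + list(accumulate(leg_km))

for place, km in zip(path, cumulative_km):
    print(place, km)
```

Doing this client-side keeps the Cypher query short, at the price of a second lookup for the leg distances.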

Now let's have a look at how to find the shortest path while taking into account the distance of each road.

## Weighted Shortest Path

To do this we need to pass the name of the property that contains the relationship weight. 

In [None]:
query = """
MATCH (source:Place {id: $source}),
      (destination:Place {id: $destination})
CALL algo.shortestPath.stream(source, destination, "distance")
YIELD nodeId, cost
RETURN algo.getNodeById(nodeId).id AS place, cost
"""

params = {
    "source": "Amsterdam",
    "destination": "London"
}

with driver.session() as session:
    rows = session.run(query, params)
    df = pd.DataFrame([dict(record) for record in rows])

display(df)

|   | cost  | place            |
|---|-------|------------------|
| 0 | 0.0   | Amsterdam        |
| 1 | 59.0  | Den Haag         |
| 2 | 86.0  | Hoek van Holland |
| 3 | 293.0 | Felixstowe       |
| 4 | 315.0 | Ipswich          |
| 5 | 347.0 | Colchester       |
| 6 | 453.0 | London           |


We can compute weighted shortest paths with Spark as well, but we'll need to write our own function using the [aggregateMessages framework](http://graphframes.github.io/graphframes/docs/_site/user-guide.html#message-passing-via-aggregatemessages).

In [13]:
from pyspark.sql.types import *
from graphframes import *

from graphframes.lib import AggregateMessages as AM
from pyspark.sql import functions as F

add_path_udf = F.udf(lambda path, id: path + [id], ArrayType(StringType()))

def shortest_path(g, origin, destination, column_name="cost"):
    if g.vertices.filter(g.vertices.id == destination).count() == 0:
        return (spark.createDataFrame(sc.emptyRDD(), g.vertices.schema)
                .withColumn("path", F.array()))

    vertices = (g.vertices.withColumn("visited", F.lit(False))
                .withColumn("distance", F.when(g.vertices["id"] == origin, 0)
                            .otherwise(float("inf")))
                .withColumn("path", F.array()))
    cached_vertices = AM.getCachedDataFrame(vertices)
    g2 = GraphFrame(cached_vertices, g.edges)

    while g2.vertices.filter('visited == False').first():
        current_node_id = g2.vertices.filter('visited == False').sort("distance").first().id

        msg_distance = AM.edge[column_name] + AM.src['distance']
        msg_path = add_path_udf(AM.src["path"], AM.src["id"])
        msg_for_dst = F.when(AM.src['id'] == current_node_id, F.struct(msg_distance, msg_path))
        new_distances = g2.aggregateMessages(F.min(AM.msg).alias("aggMess"),
                                             sendToDst=msg_for_dst)

        new_visited_col = F.when(
            g2.vertices.visited | (g2.vertices.id == current_node_id), True).otherwise(False)
        new_distance_col = F.when(new_distances["aggMess"].isNotNull() &
                                  (new_distances.aggMess["col1"] < g2.vertices.distance),
                                  new_distances.aggMess["col1"]) \
            .otherwise(g2.vertices.distance)
        new_path_col = F.when(new_distances["aggMess"].isNotNull() &
                              (new_distances.aggMess["col1"] < g2.vertices.distance),
                              new_distances.aggMess["col2"].cast("array<string>")) \
            .otherwise(g2.vertices.path)

        new_vertices = (g2.vertices.join(new_distances, on="id", how="left_outer")
                        .drop(new_distances["id"])
                        .withColumn("visited", new_visited_col)
                        .withColumn("newDistance", new_distance_col)
                        .withColumn("newPath", new_path_col)
                        .drop("aggMess", "distance", "path")
                        .withColumnRenamed('newDistance', 'distance')
                        .withColumnRenamed('newPath', 'path'))
        cached_new_vertices = AM.getCachedDataFrame(new_vertices)
        g2 = GraphFrame(cached_new_vertices, g2.edges)
        if g2.vertices.filter(g2.vertices.id == destination).first().visited:
            return (g2.vertices.filter(g2.vertices.id == destination)
                    .withColumn("newPath", add_path_udf("path", "id"))
                    .drop("visited", "path")
                    .withColumnRenamed("newPath", "path"))
    return (spark.createDataFrame(sc.emptyRDD(), g.vertices.schema)
            .withColumn("path", F.array()))

In [14]:
result = shortest_path(g, "Amsterdam", "Colchester", "cost")
result.select("id", "distance", "path").show(truncate=False)

+----------+--------+------------------------------------------------------------------------+
|id        |distance|path                                                                    |
+----------+--------+------------------------------------------------------------------------+
|Colchester|347.0   |[Amsterdam, Den Haag, Hoek van Holland, Felixstowe, Ipswich, Colchester]|
+----------+--------+------------------------------------------------------------------------+



## A*

The A* Shortest Path algorithm improves on Dijkstra’s by finding shortest paths more quickly. It does this by allowing the inclusion of extra information that the algorithm can use, as part of a heuristic function, when determining which paths to explore next.

In Neo4j’s implementation, geospatial distance is used as the heuristic. In our example transportation dataset we use the latitude and longitude of each location as part of the heuristic function.
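For A* to return a true shortest path, the heuristic must never overestimate the remaining distance. Straight-line (great-circle) distance is a natural choice for road networks because no road can be shorter than it. The sketch below uses the haversine formula as one common way to compute that distance; whether the Neo4j procedure uses this exact formula is an assumption. The coordinates and road distances are the ones that appear in the outputs earlier in this notebook.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6371.0  # mean Earth radius


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two (latitude, longitude) points."""
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    h = (sin((lat2 - lat1) / 2) ** 2
         + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * asin(sqrt(h))


# Coordinates copied from the BFS outputs shown earlier.
COORDS = {
    "Den Haag": (52.078663, 4.288788),
    "Hoek van Holland": (51.9775, 4.13333),
    "Ipswich": (52.05917, 1.15545),
    "Colchester": (51.88921, 0.90421),
}

# Road distances for the same pairs, taken from the weighted path output.
ROAD_KM = {("Den Haag", "Hoek van Holland"): 27, ("Ipswich", "Colchester"): 32}

for (src, dst), road in ROAD_KM.items():
    straight = haversine_km(*COORDS[src], *COORDS[dst])
    # The heuristic is usable for A* only if it does not exceed the road distance.
    print(f"{src} -> {dst}: straight line {straight:.1f} km, road {road} km")
```

For both pairs the straight-line estimate stays below the road distance, which is the property A* relies on.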

In [15]:
query = """
MATCH (source:Place {id: $source}),
      (destination:Place {id: $destination})
CALL algo.shortestPath.astar.stream(source, destination, "distance", "latitude", "longitude")
YIELD nodeId, cost
RETURN algo.getNodeById(nodeId).id AS place, cost
"""

params = {
    "source": "Amsterdam",
    "destination": "London"
}

with driver.session() as session:
    rows = session.run(query, params)
    df = pd.DataFrame([dict(record) for record in rows])

display(df)

|   | cost  | place            |
|---|-------|------------------|
| 0 | 0.0   | Amsterdam        |
| 1 | 59.0  | Den Haag         |
| 2 | 86.0  | Hoek van Holland |
| 3 | 293.0 | Felixstowe       |
| 4 | 315.0 | Ipswich          |
| 5 | 347.0 | Colchester       |
| 6 | 453.0 | London           |


We get the same result as with the Shortest Path algorithm, but on more complex datasets the A* algorithm will be faster because it evaluates fewer paths.

## Yen's k-Shortest Paths

Yen’s k-Shortest Paths algorithm is similar to the Shortest Path algorithm, but rather than finding just the shortest path between a pair of nodes, it also calculates the second shortest path, third shortest path, and so on, up to k-1 deviations from the shortest path.

In [None]:
query = """
MATCH (start:Place {id:$source}),
      (end:Place {id:$destination})
CALL algo.kShortestPaths.stream(start, end, 5, "distance")
YIELD index, nodeIds, path, costs
RETURN index,
       [node in algo.getNodesById(nodeIds[1..-1]) | node.id] AS via,
       reduce(acc=0.0, cost in costs | acc + cost) AS totalCost
"""

params = {
    "source": "Gouda",
    "destination": "London"
}

with driver.session() as session:
    rows = session.run(query, params)
    df = pd.DataFrame([dict(record) for record in rows])
pd.set_option('max_colwidth', 100)
display(df)
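To make "second shortest" and "third shortest" concrete, the sketch below ranks routes by brute force: it enumerates every simple path (no repeated places) and sorts them by total distance. This is deliberately not Yen's algorithm, which avoids full enumeration, and it is only practical on tiny graphs. The edge list is an assumption: a hand-copied subset of the transport graph, so the ranking may differ from what the procedure returns on the full dataset.

```python
# Assumption: hand-copied subset of the transport graph (distances in km).
EDGES = [
    ("Amsterdam", "Utrecht", 46), ("Amsterdam", "Den Haag", 59),
    ("Amsterdam", "Immingham", 369), ("Utrecht", "Gouda", 35),
    ("Gouda", "Rotterdam", 25), ("Rotterdam", "Den Haag", 26),
    ("Den Haag", "Hoek van Holland", 27),
    ("Hoek van Holland", "Felixstowe", 207), ("Felixstowe", "Ipswich", 22),
    ("Ipswich", "Colchester", 32), ("Colchester", "London", 106),
    ("Immingham", "Doncaster", 74), ("Doncaster", "London", 277),
]


def build_graph(edges):
    """Build an undirected adjacency map: place -> [(neighbour, km), ...]."""
    graph = {}
    for src, dst, km in edges:
        graph.setdefault(src, []).append((dst, km))
        graph.setdefault(dst, []).append((src, km))
    return graph


def simple_paths(graph, node, destination, visited, cost):
    """Yield (cost, path) for every simple path from node to destination."""
    if node == destination:
        yield cost, list(visited)
        return
    for neighbour, km in graph[node]:
        if neighbour not in visited:
            visited.append(neighbour)
            yield from simple_paths(graph, neighbour, destination, visited, cost + km)
            visited.pop()  # backtrack before trying the next neighbour


def k_cheapest_paths(graph, source, destination, k):
    """Return the k lowest-cost simple paths, cheapest first."""
    return sorted(simple_paths(graph, source, destination, [source], 0))[:k]


top_paths = k_cheapest_paths(build_graph(EDGES), "Gouda", "London", 3)
for cost, path in top_paths:
    print(cost, " -> ".join(path))
```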

## All Pairs Shortest Path

The All Pairs Shortest Path (APSP) algorithm calculates the shortest (weighted) path between all pairs of nodes. It’s more efficient than running the Single Source Shortest Path algorithm for every pair of nodes in the graph.

Spark’s `shortestPaths` function is designed for finding the shortest paths from all nodes to a set of nodes called landmarks. If we wanted to find the shortest path from every location to Colchester, Immingham, and Hoek van Holland, we would write the following query:

In [None]:
result = g.shortestPaths(["Colchester", "Immingham", "Hoek van Holland"])
result.sort(["id"]).select("id", "distances").show(truncate=False)

We can also run this algorithm using Neo4j. The first parameter to this procedure is the property to use to work out the shortest weighted path. If we set this to `null`, the algorithm will calculate the unweighted shortest paths between all pairs of nodes.

In [None]:
query = """
CALL algo.allShortestPaths.stream(null)
YIELD sourceNodeId, targetNodeId, distance
WHERE sourceNodeId < targetNodeId
RETURN algo.getNodeById(sourceNodeId).id AS source,
       algo.getNodeById(targetNodeId).id AS target,
       distance
ORDER BY distance DESC
LIMIT 10
"""

with driver.session() as session:
    rows = session.run(query)
    df = pd.DataFrame([dict(record) for record in rows])

display(df)

The output shows the 10 pairs of locations that have the most hops between them. If we want to calculate shortest weighted paths, we can pass in the property name that contains the `cost` to be used in the calculation. The following query does this:

In [None]:
query = """
CALL algo.allShortestPaths.stream("distance")
YIELD sourceNodeId, targetNodeId, distance
WHERE sourceNodeId < targetNodeId
RETURN algo.getNodeById(sourceNodeId).id AS source,
       algo.getNodeById(targetNodeId).id AS target,
       distance
ORDER BY distance DESC
LIMIT 10
"""

with driver.session() as session:
    rows = session.run(query)
    df = pd.DataFrame([dict(record) for record in rows])

display(df)

Now we're seeing the 10 pairs of locations furthest from each other in terms of the total distance between them. 
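For intuition about what an all-pairs computation produces, here is a minimal sketch using the Floyd-Warshall algorithm. Floyd-Warshall is a classic all-pairs method chosen here for brevity; it is not necessarily what Spark or Neo4j use internally. The edge list is an assumption: a hand-copied subset of the transport graph with distances taken from outputs in this notebook.

```python
from itertools import product

# Assumption: hand-copied subset of the transport graph (distances in km).
EDGES = [
    ("Amsterdam", "Utrecht", 46), ("Amsterdam", "Den Haag", 59),
    ("Amsterdam", "Immingham", 369), ("Utrecht", "Gouda", 35),
    ("Gouda", "Rotterdam", 25), ("Rotterdam", "Den Haag", 26),
    ("Den Haag", "Hoek van Holland", 27),
    ("Hoek van Holland", "Felixstowe", 207), ("Felixstowe", "Ipswich", 22),
    ("Ipswich", "Colchester", 32), ("Colchester", "London", 106),
    ("Immingham", "Doncaster", 74), ("Doncaster", "London", 277),
]


def floyd_warshall(edges):
    """Return {(a, b): shortest distance} for every ordered pair of places."""
    nodes = sorted({place for src, dst, _ in edges for place in (src, dst)})
    dist = {(a, b): (0 if a == b else float("inf"))
            for a, b in product(nodes, repeat=2)}
    for src, dst, km in edges:
        # Undirected roads: record the distance in both directions.
        dist[src, dst] = dist[dst, src] = min(dist[src, dst], km)
    # The intermediate node must be the outermost loop, which is the first
    # position in product() because the rightmost element advances fastest.
    for mid, a, b in product(nodes, repeat=3):
        via = dist[a, mid] + dist[mid, b]
        if via < dist[a, b]:
            dist[a, b] = via
    return dist


all_pairs = floyd_warshall(EDGES)

# Keep each unordered pair once (a < b) and list the three furthest apart.
furthest = sorted(((km, a, b) for (a, b), km in all_pairs.items() if a < b),
                  reverse=True)[:3]
print(furthest)
```

The `a < b` filter plays the same role as `sourceNodeId < targetNodeId` in the Cypher queries: it drops the mirrored duplicate of each pair.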

## Single Source Shortest Path

The Single Source Shortest Path (SSSP) algorithm calculates the shortest (weighted) path from a root node to all other nodes in the graph. It came into prominence at around the same time as Dijkstra’s Shortest Path algorithm, which acts as an implementation for both problems.

Neo4j implements a variation of SSSP called the Delta-Stepping algorithm, which divides Dijkstra’s algorithm into a number of phases that can be executed in parallel. The following query executes the Delta-Stepping algorithm:

In [6]:
query = """
MATCH (n:Place {id:$place})
CALL algo.shortestPath.deltaStepping.stream(n, "distance", 1.0)
YIELD nodeId, distance
WHERE algo.isFinite(distance)
RETURN algo.getNodeById(nodeId).id AS destination, distance
ORDER BY distance
"""

params = {
    "place": "Utrecht"
}

with driver.session() as session:
    rows = session.run(query, params)
    df = pd.DataFrame([dict(record) for record in rows])

display(df)

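|   | destination      | distance |
|---|------------------|----------|
| 0 | Utrecht          | 0.0      |
| 1 | Gouda            | 35.0     |
| 2 | Amsterdam        | 46.0     |
| 3 | Rotterdam        | 60.0     |
| 4 | Den Haag         | 67.0     |
| 5 | Hoek van Holland | 93.0     |
| 6 | Felixstowe       | 300.0    |
| 7 | Ipswich          | 322.0    |
| 8 | Colchester       | 354.0    |
| 9 | Immingham        | 415.0    |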


## Minimum Spanning Tree

The Minimum (Weight) Spanning Tree algorithm starts from a given node and finds all its reachable nodes and the set of relationships that connect the nodes together with the minimum possible weight. It traverses to the next unvisited node with the lowest weight from any visited node, avoiding cycles.
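The "next unvisited node with the lowest weight from any visited node" rule is Prim's algorithm, and it can be sketched in plain Python with a heap of candidate relationships. This is an illustration, not the Neo4j procedure. The edge list is an assumption: a hand-copied subset of the transport graph with distances taken from outputs in this notebook.

```python
import heapq

# Assumption: hand-copied subset of the transport graph (distances in km).
EDGES = [
    ("Amsterdam", "Utrecht", 46), ("Amsterdam", "Den Haag", 59),
    ("Amsterdam", "Immingham", 369), ("Utrecht", "Gouda", 35),
    ("Gouda", "Rotterdam", 25), ("Rotterdam", "Den Haag", 26),
    ("Den Haag", "Hoek van Holland", 27),
    ("Hoek van Holland", "Felixstowe", 207), ("Felixstowe", "Ipswich", 22),
    ("Ipswich", "Colchester", 32), ("Colchester", "London", 106),
    ("Immingham", "Doncaster", 74), ("Doncaster", "London", 277),
]


def build_graph(edges):
    """Build an undirected adjacency map: place -> [(neighbour, km), ...]."""
    graph = {}
    for src, dst, km in edges:
        graph.setdefault(src, []).append((dst, km))
        graph.setdefault(dst, []).append((src, km))
    return graph


def prim_mst(graph, start):
    """Return the spanning tree as [(source, destination, km), ...] in pick order."""
    visited = {start}
    candidates = [(km, start, dst) for dst, km in graph[start]]
    heapq.heapify(candidates)
    tree = []
    while candidates:
        km, src, dst = heapq.heappop(candidates)  # cheapest edge leaving the tree
        if dst in visited:
            continue  # both ends already in the tree: this edge would form a cycle
        visited.add(dst)
        tree.append((src, dst, km))
        for nxt, weight in graph[dst]:
            if nxt not in visited:
                heapq.heappush(candidates, (weight, dst, nxt))
    return tree


mst = prim_mst(build_graph(EDGES), "Amsterdam")
for src, dst, km in mst:
    print(src, "->", dst, km)
print("total km:", sum(km for _, _, km in mst))
```

A spanning tree over n reachable places always has n - 1 relationships, which is a quick sanity check on any result.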

In [4]:
query = """
MATCH (n:Place {id:$place})
CALL algo.spanningTree.minimum("Place", "EROAD", "distance", id(n),
{write:true, writeProperty:"MINST"})
YIELD loadMillis, computeMillis, writeMillis, effectiveNodeCount
RETURN loadMillis, computeMillis, writeMillis, effectiveNodeCount
"""

params = {
    "place": "Amsterdam"
}

with driver.session() as session:
    rows = session.run(query, params)
    df = pd.DataFrame([dict(record) for record in rows])

display(df)

|   | computeMillis | effectiveNodeCount | loadMillis | writeMillis |
|---|---------------|--------------------|------------|-------------|
| 0 | 2             | 12                 | 28         | 116         |


In [5]:
query = """
MATCH path = (n:Place {id:$place})-[:MINST*]->(end)
WHERE not((end)-[:MINST]->())
WITH relationships(path) AS rels
UNWIND rels AS rel
RETURN startNode(rel).id AS source, endNode(rel).id AS destination, rel.distance AS cost
"""

params = {
    "place": "Amsterdam"
}

with driver.session() as session:
    rows = session.run(query, params)
    df = pd.DataFrame([dict(record) for record in rows])

display(df)

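|   | cost  | destination      | source           |
|---|-------|------------------|------------------|
| 0 | 46.0  | Utrecht          | Amsterdam        |
| 1 | 35.0  | Gouda            | Utrecht          |
| 2 | 25.0  | Rotterdam        | Gouda            |
| 3 | 26.0  | Den Haag         | Rotterdam        |
| 4 | 27.0  | Hoek van Holland | Den Haag         |
| 5 | 207.0 | Felixstowe       | Hoek van Holland |
| 6 | 22.0  | Ipswich          | Felixstowe       |
| 7 | 32.0  | Colchester       | Ipswich          |
| 8 | 106.0 | London           | Colchester       |
| 9 | 277.0 | Doncaster        | London           |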


If we were in Amsterdam and wanted to visit every other place in our dataset during the same trip, this would be the shortest continuous route to do so.

## Random Walk

The Random Walk algorithm provides a set of nodes on a random path in a graph.

In [None]:
query = """
MATCH (source:Place {id: $place})
CALL algo.randomWalk.stream(id(source), 5, 1)
YIELD nodeIds
UNWIND algo.getNodesById(nodeIds) AS place
RETURN place.id AS place
"""

params = {
    "place": "Den Haag"
}

with driver.session() as session:
    rows = session.run(query, params)
    df = pd.DataFrame([dict(record) for record in rows])

display(df)

At each stage of the random walk the next relationship is chosen randomly. This means that if we rerun the algorithm, even with the same parameters, we likely won’t get the same result.
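The same idea can be sketched in plain Python, which also shows how a seeded random number generator makes a walk repeatable when that is needed for testing. This is an illustration, not the Neo4j procedure. The edge list is an assumption: a hand-copied subset of the transport graph. The sketch assumes the `5` in the query above is the number of steps.

```python
import random

# Assumption: hand-copied subset of the transport graph, treated as undirected.
EDGES = [
    ("Amsterdam", "Utrecht"), ("Amsterdam", "Den Haag"),
    ("Amsterdam", "Immingham"), ("Utrecht", "Gouda"),
    ("Gouda", "Rotterdam"), ("Rotterdam", "Den Haag"),
    ("Den Haag", "Hoek van Holland"), ("Hoek van Holland", "Felixstowe"),
    ("Felixstowe", "Ipswich"), ("Ipswich", "Colchester"),
    ("Colchester", "London"), ("Immingham", "Doncaster"),
    ("Doncaster", "London"),
]

adjacency = {}
for src, dst in EDGES:
    adjacency.setdefault(src, []).append(dst)
    adjacency.setdefault(dst, []).append(src)


def random_walk(adjacency, start, steps, rng):
    """Follow `steps` randomly chosen relationships, returning the visited places."""
    walk = [start]
    for _ in range(steps):
        # Pick uniformly among the neighbours of the current place.
        walk.append(rng.choice(adjacency[walk[-1]]))
    return walk


# A fixed seed makes this particular walk reproducible; omit it for a fresh walk.
walk = random_walk(adjacency, "Den Haag", 5, random.Random(42))
print(walk)
```

A walk may revisit places, including stepping straight back to where it came from, because each step only looks at the current node's neighbours.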