## Breadth First Search

In [1]:
import findspark
findspark.init()

from pyspark.sql.types import *
from graphframes import *
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook, running locally
# with the "local[*]" master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("GraphAlgorithms")
    .getOrCreate()
)

In [None]:
def create_transport_graph():
    """Build a bidirectional transport GraphFrame from two CSV files.

    Vertices are read from ``transport-nodes.csv`` and edges from
    ``transport-relationships.csv`` (both resolved relative to the current
    working directory) through the module-level ``spark`` session. Every
    edge is added a second time with its endpoints swapped, so the graph
    can be traversed in both directions.

    Returns:
        GraphFrame: vertices with the columns ``id``, ``latitude``,
        ``longitude`` and ``population``; edges with the columns ``src``,
        ``dst``, ``relationship`` and ``cost``.
    """
    # Numeric columns are declared with numeric types so that sorting and
    # range filters on them compare numbers, not strings (as strings,
    # "90000" would sort after "100000").
    # NOTE(review): values that cannot be parsed as the declared type are
    # read as null by Spark's default CSV mode - confirm the data is clean.
    node_fields = [
        StructField("id", StringType(), True),
        StructField("latitude", DoubleType(), True),
        StructField("longitude", DoubleType(), True),
        StructField("population", IntegerType(), True),
    ]

    nodes = spark.read.csv("transport-nodes.csv", header=True, schema=StructType(node_fields))
    rels = spark.read.csv("transport-relationships.csv", header=True)

    # Same edges with source and destination swapped. The column order
    # (src, dst, relationship, cost) matters because union() below matches
    # columns by position; this presumes the relationships CSV uses the
    # same column order - TODO confirm against the file header.
    reversed_rels = rels.select(
        rels.dst.alias("src"),
        rels.src.alias("dst"),
        "relationship",
        "cost",
    )
    relationships = rels.union(reversed_rels)
    return GraphFrame(nodes, relationships)

In [None]:
# Build the transport graph once and keep it in `g` for the queries below.
g = create_transport_graph()

In [None]:
# Display the vertices whose population lies strictly between 100,000 and
# 300,000, ordered by the population column.
(
    g.vertices
    .where("population > 100000 and population < 300000")
    .orderBy("population")
    .show()
)

In [None]:
# Start vertex: the one whose id is 'Den Haag'.
from_expr = "id='Den Haag'"
# Target vertices: any other vertex with a population strictly between
# 100,000 and 300,000.
to_expr = "population > 100000 and population < 300000 and id <> 'Den Haag'"

# Breadth-first search from the start vertex to the target vertices.
result = g.bfs(fromExpr=from_expr, toExpr=to_expr)

In [None]:
# Drop every result column whose name starts with "e" before displaying.
# NOTE(review): presumably these are the edge columns (e0, e1, ...) of the
# GraphFrames BFS output, leaving only the vertices - confirm against the docs.
columns = [name for name in result.columns if not name.startswith("e")]
result.select(*columns).show()

## Depth First Search