# Breadth-first search in Spark SQL

In the first course of the specialization you've already implemented BFS (Breadth-first search) using the RDD API. In this assignment you will implement the same algorithm using the Dataframe API.

The point of this assignment is to see in practice how fast Spark SQL is and why is this the default API in Spark now.

Your goal is to compute the length of the shortest path between two vertices. But now your implementation will be tested against the dataset of the greater size. Notice, that the answer will change because the graph is more dense now.

It is instructive to remember the implementation of the algorithm in Spark Core:

    def parse_edge(s):
      user, follower = s.split("\t")
      return (int(user), int(follower))
    
    def step(item):
      prev_v, prev_d, next_v = item[0], item[1][0], item[1][1]
      return (next_v, prev_d + 1)
    
    def complete(item):
      v, old_d, new_d = item[0], item[1][0], item[1][1]
      return (v, old_d if old_d is not None else new_d)
    
    n = 400  # number of partitions
    edges = sc.textFile("/data/twitter/twitter_sample.txt").map(parse_edge).cache()
    forward_edges = edges.map(lambda e: (e[1], e[0])).partitionBy(n).persist()
    
    x = 12
    d = 0
    distances = sc.parallelize([(x, d)]).partitionBy(n)
    while True:
      candidates = distances.join(forward_edges, n).map(step)
      new_distances = distances.fullOuterJoin(candidates, n).map(complete, True).persist()
      count = new_distances.filter(lambda i: i[1] == d + 1).count()
      if count > 0:
        d += 1
        distances = new_distances
        print("d = ", d, "count = ", count)
      else:
        break

Your goal is to implement the same algorithm, using Spark SQL. Keep in mind that you should avoid using UDFs, if you are stuck, take a look at pyspark.sql.functions module. You will definitely need it.

Your task is to find the shortest path between vertices 12 and 34. In case of multiple shortest paths, the first one will suffice. Output format is a single number which is the length of the shortest path.

    12

The result on the sample dataset:

    8

In [1]:
import os
os.environ['PYSPARK_PYTHON'] = '/opt/conda/bin/python'
os.environ['PYTHONHASHSEED'] = '42'

In [2]:
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.enableHiveSupport().master("local[2]").getOrCreate()

from pyspark.sql.types import StructType, StructField, IntegerType
import pyspark.sql.functions as f

In [3]:
graph_schema = StructType([
    StructField("to", IntegerType(), False),
    StructField("from", IntegerType(), False)
])
dist_schema = StructType([
    StructField("vertex", IntegerType(), False),
    StructField("distance", IntegerType(), False)
])

In [4]:
dataset_path = '/data/twitter/twitter_sample2.txt'
edges = sparkSession.read.csv(dataset_path, sep="\t", schema=graph_schema)
edges.createOrReplaceTempView("edges")

In [5]:
FROM = 12
TO = 34

distances = sparkSession\
    .createDataFrame([{ "vertex": FROM, "distance": 0}], dist_schema)
distances.createOrReplaceTempView("distances")
# distances.toPandas()

## Make the first step manually

In [6]:
sql_candidates = """SELECT
      edges.to AS vertex,
      distances.distance + 1 AS distance
    FROM distances LEFT JOIN edges
    ON distances.vertex = edges.from
    """
candidates = sparkSession.sql(sql_candidates.replace("\n", " "))
candidates.createOrReplaceTempView("candidates")

In [7]:
# candidates.toPandas()

In [8]:
sql_new_distances = """SELECT
      IF (di.vertex IS NOT NULL, di.vertex, ca.vertex) vertex,
      IF (IF (di.distance IS NOT NULL, di.distance, 9999)
           < IF (ca.distance IS NOT NULL, ca.distance, 9999),
        di.distance,
        ca.distance
      ) AS distance
    FROM distances AS di FULL OUTER JOIN candidates AS ca
    ON di.vertex = ca.vertex
    """  
new_distances = sparkSession.sql(sql_new_distances.replace("\n", " "))
new_distances.createOrReplaceTempView("new_distances")

In [9]:
# new_distances.toPandas()

In [10]:
sql_count = """SELECT COUNT(*) FROM new_distances WHERE distance=%s"""
count = sparkSession.sql(sql_count.replace("\n", " ") % 1)
new_count = count.collect()[0][0]
print(new_count)

4


In [11]:
def get_distance(vertex):
    sql = "SELECT distance FROM distances WHERE vertex=%s" % vertex
    dist = sparkSession.sql(sql).collect()
    if not len(dist):
        return None
    return dist[0][0]

In [12]:
print(get_distance(FROM))
print(get_distance(TO))

0
None


## Calculation

In [13]:
distance = 0
while True:
    distance += 1
    candidates = sparkSession.sql(sql_candidates.replace("\n", " "))
    candidates.createOrReplaceTempView("candidates")
    new_distances = sparkSession.sql(sql_new_distances.replace("\n", " "))
    new_distances.createOrReplaceTempView("new_distances")
    count = sparkSession.sql(sql_count.replace("\n", " ") % distance)
    new_count = count.collect()[0][0]
    print("dist, cnt:", distance, new_count)
    if not count:
        break
    new_distances.createOrReplaceTempView("distances")
    new_distances.persist()
    d = get_distance(TO)
    if d is not None:
        print(d)
        break

dist, cnt: 1 4
dist, cnt: 2 4
dist, cnt: 3 11
dist, cnt: 4 19
dist, cnt: 5 31
dist, cnt: 6 39
dist, cnt: 7 33


Py4JJavaError: An error occurred while calling o99.persist.
: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:215)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:125)
	at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:124)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:123)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83)
	at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
	at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:218)
	at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:246)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
	at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:218)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
	at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38)
	at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:46)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
	at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:36)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:313)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:354)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
	at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:85)
	at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:121)
	at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:112)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
	at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:112)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:235)
	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:368)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
	at org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:111)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:235)
	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:42)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:368)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
	at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryRelation.scala:90)
	at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryRelation.scala:85)
	at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryRelation.scala:41)
	at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:100)
	at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:68)
	at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:92)
	at org.apache.spark.sql.Dataset.persist(Dataset.scala:2524)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
	at org.apache.spark.util.ThreadUtils$.awaitResultInForkJoinSafely(ThreadUtils.scala:212)
	... 99 more
