In [None]:
from pyspark import SparkConf, SparkContext
# Build the Spark configuration step by step (each setter mutates `conf` in place).
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("My app")
conf.set("spark.executor.memory", "4g")

# Reuse an already-running SparkContext if one exists; otherwise start one with `conf`.
sc = SparkContext.getOrCreate(conf)

In [None]:
def parse_edge(s):
  """Parse one tab-separated edge line into a ``(user, follower)`` pair of ints.

  Raises ``ValueError`` if the line does not contain exactly two
  tab-separated fields or if a field is not an integer.
  """
  fields = s.split("\t")
  user_id, follower_id = fields
  return int(user_id), int(follower_id)

def step(item):
  """Advance one hop along an edge.

  ``item`` is a joined record ``(vertex, (distance, neighbour))``.
  Returns ``(neighbour, distance + 1)``; the source vertex is dropped.
  """
  reached = item[1]
  neighbour = reached[1]
  hops = reached[0] + 1
  return (neighbour, hops)

def complete(item):
  """Merge an outer-joined record ``(vertex, (old_d, new_d))`` into ``(vertex, d)``.

  The already-known distance ``old_d`` wins whenever it is present
  (including 0); ``new_d`` is used only when ``old_d`` is ``None``.
  """
  vertex = item[0]
  known, proposed = item[1][0], item[1][1]
  if known is None:
    return (vertex, proposed)
  return (vertex, known)

n = 4  # number of partitions

# Each input line is "user<TAB>follower" (see parse_edge); cached because it is reused.
edges = sc.textFile("/data/twitter/twitter_sample_small.txt").map(parse_edge).cache()
# Re-key every edge by its second field so that join() on a vertex yields the
# first field as the next vertex to visit.
forward_edges = edges.map(lambda e: (e[1], e[0])).partitionBy(n).persist()

x = 12  # start vertex of the breadth-first search
d = 0   # distance of the most recently discovered level
distances = sc.parallelize([(x, d)]).partitionBy(n)
while True:
  # Only vertices discovered in the previous round can reach new vertices,
  # so expand just that frontier instead of every known vertex.
  # `level=d` freezes the current value of d inside the lambda.
  frontier = distances.filter(lambda i, level=d: i[1] == level)
  # A vertex may be reachable from several frontier vertices; keep a single
  # candidate row per vertex. Without this, fullOuterJoin would emit one row
  # per duplicate and `distances` would accumulate repeated vertices.
  candidates = frontier.join(forward_edges, n).map(step).reduceByKey(min, n)
  new_distances = distances.fullOuterJoin(candidates, n).map(complete, True).persist()
  # Count the vertices first reached at distance d + 1.
  count = new_distances.filter(lambda i, level=d + 1: i[1] == level).count()
  if count == 0:
    # Nothing new was discovered: `distances` already holds the final result.
    break
  d += 1
  distances = new_distances

In [None]:
# Show up to 10 (vertex, distance) pairs from the BFS result.
distances.take(10)

In [None]:
def step_2(item):
    """Advance one hop along an edge while recording the path taken.

    ``item`` is a joined record ``(vertex, ((distance, path), neighbour))``.
    Returns ``(neighbour, (distance + 1, path + (vertex,)))`` so that the
    path lists every vertex visited before ``neighbour``.
    """
    vertex = item[0]
    state, neighbour = item[1][0], item[1][1]
    hops, trail = state[0], state[1]
    return (neighbour, (hops + 1, trail + (vertex,)))

def complete_2(item):
    """Merge an outer-joined record ``(vertex, (old_tuple, new_tuple))``.

    The previously stored tuple is kept whenever it is not ``None``;
    otherwise the newly proposed tuple is used.
    """
    vertex, (known, proposed) = item[0], (item[1][0], item[1][1])
    return (vertex, proposed if known is None else known)

def shortest_path(from_v, to_v):
    """Breadth-first search for a shortest path between two vertices.

    The search starts at ``to_v`` and repeatedly follows ``forward_edges``
    until ``from_v`` shows up among the newly reachable vertices.

    Args:
        from_v: vertex at which the search stops (last element of the result).
        to_v: vertex at which the search starts (first element of the result).

    Returns:
        A tuple of vertices beginning with ``to_v`` and ending with
        ``from_v``, or ``None`` if ``from_v`` cannot be reached.

    Relies on the module-level ``sc``, ``n`` and ``forward_edges``.
    """
    d = 0
    distances_with_path = sc.parallelize([(to_v, (0, ()))]).partitionBy(n)
    while True:
        # Only vertices discovered in the previous round can lead to new
        # vertices; `level=d` freezes the current value of d in the lambda.
        frontier = distances_with_path.filter(lambda i, level=d: i[1][0] == level)
        # Several frontier vertices may reach the same neighbour. Keep exactly
        # one (distance, path) per vertex so the outer join below cannot emit
        # duplicate rows; min() makes the choice deterministic.
        # Persisted because it feeds both the lookup and the join.
        candidates = (frontier.join(forward_edges, n)
                      .map(step_2)
                      .reduceByKey(min, n)
                      .persist())
        # collect() returns a (possibly empty) list, never None.
        found = candidates.filter(lambda i: i[0] == from_v).collect()
        if found:
            return found[0][1][1] + (from_v,)
        new_distances = distances_with_path.fullOuterJoin(candidates, n).map(complete_2, True).persist()
        count = new_distances.filter(lambda i, level=d + 1: i[1][0] == level).count()
        if count == 0:
            # No new vertex was discovered, so from_v is unreachable.
            return None
        d += 1
        distances_with_path = new_distances

In [None]:
# Run the path search with from_v=34 and to_v=12.
shortest_path(34, 12)