In [1]:
from pyspark import SparkConf, SparkContext
sc = SparkContext(conf=SparkConf().setAppName("MyApp").setMaster("local"))

In [2]:
def step(item):
    prev_v, prev_d, next_v = item[0], item[1][0], item[1][1]
    return (next_v, (prev_v, prev_d + 1))

def complete(item):
    vertex = item[0]
    prev_d = item[1][0]
    if prev_d is not None:
        return (vertex, (None, prev_d))
    else:
        prev_vertex =  item[1][1][0]
        new_d = item[1][1][1]
        return (vertex, (prev_vertex, new_d))
    
def strip_vertex_from(item):
    to_vertex = item[0]
    from_vertex = item[1][0]
    distance = item[1][1]
    return (to_vertex, distance)

In [3]:
def parse_edge(s):
    user, follower = s.split("\t")
    return (int(user), int(follower))

edges = sc.textFile("/data/twitter/twitter_sample.txt").map(parse_edge).cache()

In [4]:
n = 1 # number of partitions

x = 12 # source vertex
y = 34 # destination vertex
d = 0

forward_edges = edges.map(lambda e: (e[1], e[0])).partitionBy(n).persist()
distances = sc.parallelize([(x, 0)]).partitionBy(n)

In [5]:
path_to = {}
destination_found = False

while not destination_found:
    # produces: [(126, (12, 1)), (380, (12, 1)), (422, (12, 1)), (648, (12, 1))]
    candidates = distances.join(forward_edges, n).map(step)

    # produces: [(648, (12, 1)), (380, (12, 1)), (12, (None, 0)), (126, (12, 1)), (422, (12, 1))]
    new_distances = distances.fullOuterJoin(candidates, n).map(complete, True).persist()
    
    print('Before filtering and saving paths')

    filtered_new_distances = new_distances.filter(lambda i: i[1][1] == d + 1)
    for entry in filtered_new_distances.collect():
        to_vertex = entry[0]
        from_vertex = entry[1][0]
        if from_vertex:
            path_to[to_vertex] = from_vertex
        if to_vertex == y:
            destination_found = True
            break
            
    print('One loop complete %s' % path_to)

    d += 1
    # produces: [(422, 1), (648, 1), (12, 0), (690, 2), (53, 2), (380, 1), (126, 1), (31, 2)]
    distances = new_distances.map(strip_vertex_from).distinct()
    print('distances size %s' % distances.count())

path_to

Before filtering and saving paths
One loop complete {648: 12, 380: 12, 422: 12, 126: 12}
distances size 5
Before filtering and saving paths
One loop complete {422: 12, 648: 12, 690: 380, 53: 422, 380: 12, 126: 12, 31: 380}
distances size 8
Before filtering and saving paths
One loop complete {292: 53, 422: 12, 648: 12, 652: 53, 13: 31, 15: 31, 690: 380, 52: 53, 53: 422, 150: 53, 87: 31, 57: 53, 187: 53, 380: 12, 126: 12, 31: 380}
distances size 17
Before filtering and saving paths
One loop complete {150: 53, 648: 12, 652: 53, 13: 31, 14: 292, 15: 31, 662: 87, 24: 15, 541: 13, 31: 380, 292: 53, 293: 187, 422: 12, 681: 87, 690: 380, 52: 53, 53: 422, 57: 53, 187: 53, 64: 292, 66: 292, 335: 87, 341: 15, 87: 31, 89: 150, 615: 87, 107: 52, 246: 87, 633: 150, 380: 12, 126: 12}
distances size 32
Before filtering and saving paths
One loop complete {448: 24, 235: 662, 150: 53, 648: 12, 652: 53, 13: 31, 14: 292, 15: 31, 16: 64, 18: 681, 20: 107, 21: 89, 662: 87, 535: 541, 24: 15, 409: 246, 541: 13

{13: 31,
 14: 292,
 15: 31,
 16: 64,
 17: 38,
 18: 681,
 20: 107,
 21: 89,
 22: 18,
 23: 20,
 24: 15,
 31: 380,
 34: 274,
 38: 16,
 47: 246,
 52: 53,
 53: 422,
 57: 53,
 64: 292,
 66: 292,
 69: 14,
 87: 31,
 89: 150,
 94: 299,
 107: 52,
 126: 12,
 136: 38,
 150: 53,
 187: 53,
 209: 409,
 235: 662,
 236: 47,
 246: 87,
 257: 16,
 259: 236,
 265: 259,
 274: 23,
 291: 14,
 292: 53,
 293: 187,
 295: 20,
 299: 662,
 314: 21,
 317: 14,
 335: 87,
 339: 299,
 341: 15,
 380: 12,
 409: 246,
 414: 450,
 418: 291,
 422: 12,
 448: 24,
 450: 21,
 535: 541,
 541: 13,
 582: 14,
 586: 107,
 607: 295,
 615: 87,
 633: 150,
 648: 12,
 652: 53,
 662: 87,
 666: 299,
 681: 87,
 690: 380,
 722: 586}

In [6]:
path = []
vertex = y

while vertex in path_to:
    path.insert(0, vertex)
    vertex = path_to[vertex]
    
path.insert(0, x)

print(','.join(str(el) for el in path))

12,422,53,52,107,20,23,274,34
