# In [1]:
from pyspark import SparkContext, SparkConf
# Build the Spark configuration first, then create the context from it.
spark_conf = SparkConf().setAppName("App").setMaster('local')
sc = SparkContext(conf=spark_conf)

# line input (from /data/twitter/twitter_sample_small.txt):
#    v1<tab>v2
# output:
#    (v1, v2)

def parse_edge(s):
    """Parse one tab-separated input line into an ``(int, int)`` pair.

    The line must contain exactly two tab-separated fields; both are
    converted with ``int``. A ``ValueError`` is raised otherwise.
    """
    user, follower = map(int, s.split("\t"))
    return user, follower

# input (result of join of distances to edges)
#    distances: (v1, (d1, p1))
#    edges: (v1, v2)
#    result: (v1, ((d1, p1), v2))
# output
#    (v2, (d1 + 1, p1 + v1))

def step(item):
    """Advance one hop along an edge from an already-reached vertex.

    ``item`` has the shape ``(v1, ((d1, p1), v2))``. The result is
    ``(v2, (d1 + 1, p1 + (v1,)))``: the target vertex, the distance
    incremented by one, and the path extended with the source vertex.
    The path is always returned as a tuple.
    """
    source, joined = item[0], item[1]
    reached, target = joined[0], joined[1]
    hops, trail = reached[0], reached[1]
    return (target, (hops + 1, tuple(trail) + (source,)))


# input (result of full outer join of distances and candidates (result of step function))
#    distances: (v, (d1, p1))
#    candidates: (v, (d2, p2))
#    result: (v, ((d1, p1), (d2, p2))), where either side may be None
# output (v, (d, p))

def complete(item):
    """Merge a vertex's known distance with a newly proposed candidate.

    ``item`` has the shape ``(v, (old, new))`` where ``old`` and ``new``
    are each either ``None`` or a ``(distance, path)`` pair.

    Returns ``(v, (distance, path))``:

    * if ``old`` is present, it is kept unchanged -- both its distance
      and its path. A candidate never overrides an existing entry, so the
      path always belongs to the distance it is stored with;
    * otherwise the candidate's distance and path are used;
    * if both sides are ``None``, ``(v, (None, ()))`` is returned.
    """
    v = item[0]
    old, new = item[1][0], item[1][1]

    if old is not None and old[0] is not None:
        # Keep the recorded pair together; taking the candidate's path here
        # would attach a path to a distance it was not computed for.
        return (v, (old[0], old[1]))
    if new is not None:
        return (v, (new[0], new[1]))
    return (v, (None, ()))


n = 200  # number of partitions

# parse_edge yields (user, follower); re-key each edge by the follower so
# that joining on a reached vertex yields the user on the other end.
edges = sc.textFile("/data/twitter/twitter_sample_small.txt") \
        .map(parse_edge) \
        .cache()
forward_edges = edges.map(lambda e: (e[1], e[0])) \
        .partitionBy(n) \
        .persist()

start = 12
end = 34
d = 0
path = ()
path_result = "Path not found"

# distances holds (vertex, (distance, path)) for every vertex reached so far;
# it is seeded with the start vertex at distance 0 and an empty path.
distances = sc.parallelize([(start, (d, path))]) \
        .partitionBy(n)

while True:
    # Stop as soon as the target vertex has been reached. A single take(1)
    # both tests for presence and fetches the record.
    found = distances.filter(lambda c: c[0] == end).take(1)
    if found:
        p = list(found[0][1][1])
        p.append(end)
        path_result = ','.join(map(str, p))
        break

    # Extend every reached vertex by one edge and keep, per target vertex,
    # the candidate with the smallest distance.
    candidates = distances.join(forward_edges, n) \
            .map(step) \
            .reduceByKey(lambda v1, v2: v1 if v1[0] < v2[0] else v2)

    # Keys are unchanged by complete, so the partitioning is preserved.
    new_distances = distances.fullOuterJoin(candidates, n) \
            .map(complete, True) \
            .persist()

    # If no vertex was reached at distance d + 1 the search is exhausted.
    count = new_distances.filter(lambda i: i[1][0] == d + 1).count()
    if count > 0:
        d += 1
        previous, distances = distances, new_distances
        # new_distances was materialised by count() above, so the cached
        # copy of the previous iteration can be released.
        previous.unpersist()
    else:
        break

# Parenthesised form prints the same text on Python 2 and also parses on 3.
print(path_result)


# Recorded output: 12, 422, 53, 52, 107, 20, 23, 274, 34
