In [1]:
#! /usr/bin/env python

from pyspark import SparkConf, SparkContext
# getOrCreate reuses an already-running context, so re-running this cell in the
# same kernel does not fail because a SparkContext already exists.
# NOTE: if a context is already running, the configuration passed here is not
# applied to it.
sc = SparkContext.getOrCreate(conf=SparkConf().setAppName("MyApp").setMaster("local[2]"))

import re

In [2]:
n = 150  # number of partitions passed to partitionBy() and join()
start, end = 12, 34  # vertex ids: the path search begins at 'start' and stops at 'end'

def parse_edge(s):
    """Turn one raw 'user<TAB>follower' line into an (int, int) tuple.

    The line must contain exactly two tab-separated fields; anything else
    raises ValueError, as does a field that is not a valid integer.
    """
    fields = s.split("\t")
    user, follower = fields
    return int(user), int(follower)

def step(item):
    """Extend a path by one move along the graph.

    `item` has the shape (current vertex, (path so far, next vertex)).
    The current vertex itself is not needed here; it is read by nobody.

    Returns (next vertex, path so far + [next vertex]). A new list is built,
    so the incoming path list is left unmodified.
    """
    path_so_far, next_v = item[1][0], item[1][1]
    return (next_v, path_so_far + [next_v])

# Read the edge list, parse every line into an (int, int) tuple with parse_edge,
# and mark the parsed RDD for caching.
edges = sc.textFile("/data/twitter/twitter_sample_small.txt").map(parse_edge).cache()

In [None]:
# We want to find a path from 'start' to 'end'. 'edges' holds (user, follower)
# pairs, so each one is flipped to (follower, user): joining the frontier on its
# current vertex then yields the vertex one hop further.
forward_edges = edges.map(lambda e: (e[1], e[0])).partitionBy(n).persist()

# Breadth-first frontier: (current vertex, path from 'start' to that vertex).
# It is persisted because every iteration reads it twice (found() and the join).
paths = sc.parallelize([(start, [start])]).partitionBy(n).persist()

# Vertices reached so far, keyed like 'paths' so subtractByKey can prune them.
visited = paths.mapValues(lambda _path: None)

def found():
    """Return how many paths in the current frontier end at 'end' (0 = none yet)."""
    return paths.filter(lambda x: x[0] == end).count()

while not found():
    paths = (paths
             .join(forward_edges, n)
             .map(step)
             # All candidates in one iteration have the same length, so keeping
             # a single path per vertex loses nothing and stops the blow-up.
             .reduceByKey(lambda kept, _duplicate: kept, n)
             # A vertex reached earlier already has a path that is no longer;
             # dropping it also prevents walking around cycles forever.
             .subtractByKey(visited, n)
             .persist())
    if paths.isEmpty():
        # Frontier exhausted: 'end' is not reachable from 'start'.
        break
    visited = visited.union(paths.mapValues(lambda _path: None))

final_paths = (paths
               .filter(lambda value: value[0] == end)
               .map(lambda value: value[1])
              ).cache()

# take(1) returns an empty list when no path exists; fall back to an empty
# path so the print below emits an empty line instead of raising IndexError.
matches = final_paths.take(1)
result = matches[0] if matches else []

print(','.join(map(str,result)))