In [1]:
from pyspark import SparkConf, SparkContext
# Spark context for this notebook: application name "MyApp", master "local[8]".
sc = SparkContext(
    conf=SparkConf()
    .setAppName("MyApp")
    .setMaster("local[8]")
)

In [2]:
# RDD over the text of the Twitter sample file; parsed into edges further down.
raw_data = sc.textFile(
    '/data/twitter/twitter_sample.txt'
)

Top 5 users by number of followers

In [3]:
# Edge record: (user, follower)
def parse_edge(s):
    """Turn one tab-separated line into a pair of integer ids.

    Returns ``(user, follower)`` in file order. Going by those field names,
    the second id is a follower of the first.
    NOTE(review): confirm the edge direction against the dataset description.

    Raises ValueError if the line does not hold exactly two tab-separated
    integers.
    """
    fields = s.split('\t')
    user_id, follower_id = fields
    return int(user_id), int(follower_id)

In [4]:
# Parse each raw line into an (int, int) edge and mark the RDD as cached.
edges = (
    raw_data
    .map(parse_edge)
    .cache()
)

# Peek at the first two parsed edges.
edges.take(2)

[(12, 18), (12, 41)]

In [5]:
import operator

# Count edges per key: start each key at 0, add 1 for every value seen,
# then sum the partial tallies.
# Equivalent formulation: edges.mapValues(lambda v: 1).reduceByKey(operator.add)
follower_counts = edges.aggregateByKey(
    0,
    lambda tally, _value: tally + 1,
    operator.add,
)

# The five (key, count) pairs with the largest counts.
follower_counts.top(5, key=operator.itemgetter(1))

[(20, 121425), (13, 102854), (12, 100113), (586, 92777), (291, 92026)]

Shortest path

In [6]:
def parse_edge(s):
    """Turn one tab-separated line into an ``(int, int)`` pair, in file order.

    Same definition as the one in the follower-count section; repeated so
    this section can run on its own.

    Raises ValueError if the line does not hold exactly two tab-separated
    integers.
    """
    fields = s.split('\t')
    user_id, follower_id = fields
    return int(user_id), int(follower_id)


def step(item):
    """Advance one hop along a joined edge.

    ``item`` is ``(vertex, (distance, neighbour))``. The result is
    ``(neighbour, distance + 1)``; the source vertex itself is dropped.
    """
    joined = item[1]
    hops_so_far = joined[0]
    neighbour = joined[1]
    return (neighbour, hops_so_far + 1)


def complete(item):
    """Resolve one row of a full outer join between old and new distances.

    ``item`` is ``(vertex, (old_distance, new_distance))``. The old distance
    wins whenever it is present (including 0); the new one is used only when
    the old side is None.
    """
    vertex = item[0]
    known = item[1][0]
    proposed = item[1][1]
    if known is None:
        return (vertex, proposed)
    return (vertex, known)


def update(item):
    """Extend a stored path by one vertex.

    ``item`` is ``(vertex, (path, next_vertex))`` where ``path`` is a tuple.
    The result is keyed by ``next_vertex`` and carries the path with
    ``next_vertex`` appended.
    """
    joined = item[1]
    next_vertex = joined[1]
    extended_path = joined[0] + (next_vertex,)
    return (next_vertex, extended_path)

In [7]:
# Search for a path from v_from to v_to by expanding the edge list one hop per loop pass.
# Each pass grows the set of known distances by one hop and extends the stored paths by
# one edge; the loop stops once v_to has a distance or no new vertex was discovered.
v_from=12  # start vertex
v_to=34  # target vertex
dataset_path='/data/twitter/twitter_sample.txt'  # tab-separated edge list read by parse_edge
numPartitions=400  # partition count passed to every partitionBy/join below


# Parse every line into an (int, int) edge; within this cell only forward_edges reads it.
edges = sc.textFile(dataset_path).map(parse_edge).cache()

# forward_edges = (follower, id): each parsed edge with its two fields swapped, so a join
# keyed on a vertex yields the ids found in the first field of that vertex's edges.
# Partitioned and persisted because it is joined twice on every loop pass.
forward_edges = edges \
    .map(lambda e: (e[1], e[0])) \
    .partitionBy(numPartitions).persist()

# Current search depth: the largest distance already present in `distances`.
d = 0

# distances = (id, d): vertices reached so far with their hop count, seeded with v_from at 0.
distances = sc.parallelize([(v_from, d)]).partitionBy(numPartitions)

# paths = (id, (v1, v2, ...)): last vertex of a path mapped to the whole path tuple,
# seeded with the one-vertex path (v_from,).
paths = sc.parallelize([(v_from, (v_from, ))]).partitionBy(numPartitions)


while True:

    # All candidates at loop d = (id, distance): one hop out from every vertex in
    # `distances` (not only the newest level); step() tags each with source distance + 1.
    candidates = distances \
        .join(forward_edges, numPartitions) \
        .map(step)

    # All distances discovered = (id, distance): the full outer join leaves None on the
    # old side for vertices seen for the first time, and complete() keeps the old distance
    # whenever there is one. The second argument to map() is preservesPartitioning
    # (complete() does not change the key). Persisted because both counts below read it.
    all_distances = distances \
        .fullOuterJoin(candidates, numPartitions) \
        .map(complete, True).persist()

    # Number of new vertices discovered: rows whose distance equals d + 1.
    # NOTE(review): a vertex reached through several edges may be counted more than once;
    # the value is only compared with 0 below, so that would not change the outcome.
    count = all_distances \
        .filter(lambda i: i[1] == d + 1).count()

    # Check if v_to reached: number of rows keyed by v_to.
    found = all_distances \
        .filter(lambda i: i[0] == v_to).count()

    # update paths: extend every stored path by one edge (update() appends the joined
    # vertex) and keep the previous paths through the union.
    # NOTE(review): paths that were already extended get extended again on later passes,
    # so duplicates accumulate -- the d = 1 debug output lists (126, (12, 126)) twice.
    paths = paths \
        .join(forward_edges, numPartitions) \
        .map(update) \
        .union(paths).persist()
    
    # debug: for the first three passes (d <= 2) collect every RDD to the driver and
    # print its contents (Python 2 print statements).
    if d <= 2:
        print '\n\n\nd =', d
        print 'candidates = '
        for cand in candidates.collect():
            print '\t', cand
        print 'distances = '
        for dist in distances.collect():
            print '\t', dist
        print 'paths = '
        for path in  paths.collect():
            print '\t', path
        print 'all distances = '
        for ad in all_distances.collect():
            print '\t', ad
        print 'count = ', count

    # Stop conditions: no new vertices discovered or found v_to
    if count == 0 or found > 0:
        break
    else:
        # Move on to the next level; distinct() drops repeated (id, distance) rows
        # before they are joined again.
        d += 1
        distances = all_distances.distinct()

# Keep only the paths that start at v_from and end at v_to, collected to the driver.
results = paths.filter(lambda x: x[1][0] == v_from and x[1][-1] == v_to).collect()




d = 0
candidates = 
	(126, 1)
	(380, 1)
	(422, 1)
	(648, 1)
distances = 
	(12, 0)
paths = 
	(126, (12, 126))
	(380, (12, 380))
	(422, (12, 422))
	(648, (12, 648))
	(12, (12,))
all distances = 
	(12, 0)
	(422, 1)
	(126, 1)
	(648, 1)
	(380, 1)
count =  4



d = 1
candidates = 
	(126, 1)
	(380, 1)
	(422, 1)
	(648, 1)
	(53, 2)
	(12, 2)
	(31, 2)
	(690, 2)
distances = 
	(12, 0)
	(422, 1)
	(380, 1)
	(648, 1)
	(126, 1)
paths = 
	(126, (12, 126))
	(380, (12, 380))
	(422, (12, 422))
	(648, (12, 648))
	(53, (12, 422, 53))
	(12, (12, 648, 12))
	(31, (12, 380, 31))
	(690, (12, 380, 690))
	(126, (12, 126))
	(380, (12, 380))
	(422, (12, 422))
	(648, (12, 648))
	(12, (12,))
all distances = 
	(12, 0)
	(422, 1)
	(31, 2)
	(53, 2)
	(126, 1)
	(648, 1)
	(690, 2)
	(380, 1)
count =  3



d = 2
candidates = 
	(126, 1)
	(380, 1)
	(422, 1)
	(648, 1)
	(53, 2)
	(13, 3)
	(15, 3)
	(87, 3)
	(380, 3)
	(31, 3)
	(52, 3)
	(57, 3)
	(150, 3)
	(187, 3)
	(292, 3)
	(652, 3)
	(12, 2)
	(31, 2)
	(690, 2)
distances = 
	(12, 0)

In [8]:
# Report the shortest collected path as comma-separated vertex ids.
# `results` holds (end_vertex, path_tuple) pairs; it is empty when the search
# stopped without reaching v_to, which used to raise IndexError on `[0]`.
if results:
    # min() returns the first of several equally short paths, the same element
    # a stable sort by length would put first.
    shortest_path = min(results, key=lambda pair: len(pair[1]))[1]
    result = ','.join(str(vertex) for vertex in shortest_path)
    print(result)
else:
    # Keep `result` a string so later cells can still rely on its type.
    result = ''
    print('No path found from %s to %s' % (v_from, v_to))

12,422,53,52,107,20,23,274,34
