# Recommend friends using pyspark

In [1]:
from pyspark import SparkContext

# Obtain the SparkContext used by every cell below; getOrCreate() hands back
# an already existing context instead of constructing a second one.
sc = SparkContext.getOrCreate()

### get rdd of friend list lines

In [2]:
# Read the friend list as an RDD of text lines. Judging by the sample output,
# each line has the form "<user> <friend>,<friend>,...".
lines = sc.textFile("data/friend_list")
# Peek at (up to) the first 10 lines to sanity-check the input.
lines.take(10)

['0 1,2,3',
 '1 0,2,4,5,6',
 '2 0,1,4,6,7,8',
 '3 0,4,7',
 '4 1,2,3,5,8',
 '5 1,4',
 '6 1,2',
 '7 2,3',
 '8 2,4']

### map: generate pairs of users that are connected via one mutual friend

In [3]:
def generate_pairs(row):
    """
    Emit the candidate pairs contributed by one line of the friend list.

    A line looks like ``"<user> <friend>,<friend>,..."``. The result is a
    list of ``(key, flag)`` tuples, where ``key`` is the two ids of a pair
    sorted and concatenated into one string:

    * ``(key, 0)`` for ``user`` paired with each of their friends. These
      pairs are already friends, and the zero lets a later step discard
      them.
    * ``(key, 1)`` for every ordered combination of two distinct friends
      of ``user``: they have ``user`` as a mutual friend. Because both
      orderings are visited, each unordered pair appears twice per line.

    Blank lines and users without a friend list yield ``[]`` instead of
    raising, and stray whitespace or empty entries (``"1,,2"``) are
    ignored.

    NOTE: the key is a plain concatenation of the two ids, so it is only
    unambiguous while every id is a single character ('1' + '23' and
    '12' + '3' would both give '123'). The format is kept as is because
    later steps display these keys.
    """
    # Split on the first run of whitespace only: this tolerates leading or
    # trailing blanks and returns fewer than two parts when the line is
    # empty or carries no friend list.
    parts = row.split(None, 1)
    if len(parts) < 2:
        return []

    first, friends = parts
    seconds = [sec.strip() for sec in friends.split(",")]
    seconds = [sec for sec in seconds if sec]  # drop empty entries

    # existing friendships: flagged with zero so they can be ignored later
    result = [(''.join(sorted([first, sec])), 0) for sec in seconds]

    # every two distinct friends of `first` share `first` as a mutual
    # friend; they might be friends themselves, which the zeros will reveal
    result.extend(
        (''.join(sorted([sec1, sec2])), 1)
        for sec1 in seconds
        for sec2 in seconds
        if sec1 != sec2
    )

    return result

In [4]:
# flatMap applies generate_pairs to every line and flattens the returned
# lists into one RDD of (pair_key, flag) tuples.
mutual_pairs = lines.flatMap(generate_pairs)
# Peek at (up to) the first 10 emitted tuples.
mutual_pairs.take(10)

[('01', 0),
 ('02', 0),
 ('03', 0),
 ('12', 1),
 ('13', 1),
 ('12', 1),
 ('23', 1),
 ('13', 1),
 ('23', 1),
 ('01', 0)]

### shuffle

In [5]:
# Collect all flags emitted for the same pair key into one iterable per key.
# NOTE: this rebinds mutual_pairs to the grouped RDD, so running this
# statement a second time would group the already grouped data again.
mutual_pairs = mutual_pairs.groupByKey()
# The values show up as ResultIterable objects rather than their contents.
mutual_pairs.take(10)

[('03', <pyspark.resultiterable.ResultIterable at 0x28c02457630>),
 ('12', <pyspark.resultiterable.ResultIterable at 0x28c02457fd0>),
 ('14', <pyspark.resultiterable.ResultIterable at 0x28c03b7d4e0>),
 ('16', <pyspark.resultiterable.ResultIterable at 0x28c03ba4a20>),
 ('05', <pyspark.resultiterable.ResultIterable at 0x28c03b7da58>),
 ('06', <pyspark.resultiterable.ResultIterable at 0x28c03b7d2e8>),
 ('24', <pyspark.resultiterable.ResultIterable at 0x28c03b7d438>),
 ('26', <pyspark.resultiterable.ResultIterable at 0x28c03b7d860>),
 ('45', <pyspark.resultiterable.ResultIterable at 0x28c03b7d390>),
 ('56', <pyspark.resultiterable.ResultIterable at 0x28c03b7d3c8>)]

### reducer: count num of mutual friends

In [6]:
def sum_mutual_friends(values):
    """
    Turn the flags collected for one pair into a mutual-friend count.

    ``values`` holds the flags emitted for a single pair key: a 0 marks
    the pair as already being friends, and every 1 stands for one
    emission through a mutual friend.

    Returns 0 when any flag is 0 (the pair is not a candidate), otherwise
    half the sum of the flags, since each mutual friend emits the pair
    twice.

    ``values`` may be any iterable, including a one-shot iterator or
    generator: it is materialised once, so the membership test does not
    exhaust it before the sum is taken.
    """
    flags = list(values)  # single pass over the input
    if 0 in flags:
        return 0
    return sum(flags) // 2

In [7]:
# Reduce every pair to its mutual-friend count and keep only the pairs with a
# positive count (pairs that are already friends come out as 0 and are
# dropped). The 10 rows taken are not sorted by count.
# Original author's note: "Try sortByKey(_[1], False) in spark below java 11"
# -- presumably a hint to sort by count in descending order; TODO confirm
# the intended call and the Java version constraint.
mutual_pairs.mapValues(sum_mutual_friends).filter(lambda x: x[1] > 0).take(10)

[('05', 1),
 ('06', 2),
 ('56', 1),
 ('07', 2),
 ('17', 1),
 ('68', 1),
 ('13', 2),
 ('23', 3),
 ('04', 3),
 ('25', 2)]