In [1]:
from pyspark import SparkContext
# Reuse the context that is already running when this cell is executed again;
# constructing a second SparkContext in the same kernel raises an error.
sc = SparkContext.getOrCreate()

In [2]:
# Plain integer lists; the values 1 and 2 appear in both of them.
arr1 = [
    1, 2, 3,
]
arr2 = [
    4, 5, 6,
    1, 2,
]

# (key, value) pairs; 'a' is the only key the two lists have in common.
art1 = [
    ("a", 1),
    ("b", 2),
    ("c", 3),
]
art2 = [
    ("a", 10),
    ("f", 6),
    ("e", 5),
    ("d", 4),
]

In [3]:
# Turn the two integer lists into RDDs.
rdd1 = sc.parallelize(arr1)
rdd2 = sc.parallelize(arr2)

# distinct(): every value once.
print(rdd2.distinct().collect())
# filter(): keep the even values only.
print(rdd2.filter(lambda n: not n % 2).collect())
# map(): one [n, 10 * n] list per element, so the result is a list of lists.
print(rdd2.map(lambda n: [n, 10 * n]).collect())
# flatMap(): same lists, but spliced into a single flat sequence.
print(rdd2.flatMap(lambda n: [n, 10 * n]).collect())

[1, 2, 4, 5, 6]
[4, 6, 2]
[[4, 40], [5, 50], [6, 60], [1, 10], [2, 20]]
[4, 40, 5, 50, 6, 60, 1, 10, 2, 20]


In [4]:
# Combine rdd1 with rdd2 three ways, printing each collected result in turn:
# union (the recorded output keeps the repeated 1 and 2), intersection
# (values present in both) and cartesian (every (rdd1, rdd2) element pair).
for combine in (rdd1.union, rdd1.intersection, rdd1.cartesian):
    print(combine(rdd2).collect())

[1, 2, 3, 4, 5, 6, 1, 2]
[1, 2]
[(1, 4), (1, 5), (1, 6), (1, 1), (1, 2), (2, 4), (2, 5), (2, 6), (2, 1), (2, 2), (3, 4), (3, 5), (3, 6), (3, 1), (3, 2)]


In [5]:
# NOTE: rdd1 / rdd2 are rebound here from the integer RDDs to the
# (key, value) pair RDDs; cells further up that use these names will see the
# pair data if they are re-run after this cell.
rdd1 = sc.parallelize(art1)
rdd2 = sc.parallelize(art2)

# Inner, left-outer, right-outer and full-outer join of rdd1 with rdd2,
# each sorted by key before printing.
for join_with in (
    rdd1.join,
    rdd1.leftOuterJoin,
    rdd1.rightOuterJoin,
    rdd1.fullOuterJoin,
):
    print(join_with(rdd2).sortByKey().collect())

[('a', (1, 10))]
[('a', (1, 10)), ('b', (2, None)), ('c', (3, None))]
[('a', (1, 10)), ('d', (None, 4)), ('e', (None, 5)), ('f', (None, 6))]
[('a', (1, 10)), ('b', (2, None)), ('c', (3, None)), ('d', (None, 4)), ('e', (None, 5)), ('f', (None, 6))]


In [6]:
# Total of the values in the (key, value) pairs of rdd1.
# values().sum() replaces a reduce() that built throwaway ('', n) tuples:
# it reads as what it is, and it returns 0 for an empty RDD where reduce()
# raises. x keeps its (placeholder, total) tuple shape so that anything
# reading x[1] continues to work.
x = ('', rdd1.values().sum())
print(x[1])

6


In [7]:
# fullOuterJoin yields (key, (left, right)) with None for a missing side;
# the flatMap splits each of those into one (key, value) pair per side.
bigRdd1 = (
    rdd1.fullOuterJoin(rdd2)
    .flatMap(lambda joined: [(joined[0], side) for side in joined[1]])
)
# Same thing with the join direction swapped.
bigRdd2 = (
    rdd2.fullOuterJoin(rdd1)
    .flatMap(lambda joined: [(joined[0], side) for side in joined[1]])
)
bigRdd = bigRdd1.union(bigRdd2)

# Materialised copy of bigRdd (not read again in the cells visible here).
bigRddArr = bigRdd.collect()

# Shared state used by mapyo: a counter starting at 0 and a read-only factor.
acc = sc.accumulator(0)
broad = sc.broadcast(10)

def mapyo(x):
    """Scale the value of a (key, value) pair by the broadcast factor.

    A pair whose value is None is returned with None unchanged, and the
    global accumulator ``acc`` is incremented by one for it. Any other value
    is multiplied by ``broad.value``.
    """
    key, val = x[0], x[1]

    if val is not None:
        val *= broad.value
        return (key, val)

    # Missing value: count it, pass it through untouched.
    acc.add(1)
    return (key, None)

# map() only describes the transformation; mapyo runs when collect() below
# triggers the job.
res = bigRdd.map(mapyo)
print(res.collect())
# Must come after the collect() above: acc is only updated while mapyo runs.
# NOTE(review): res is not cached, so a second action on res would presumably
# run mapyo again and add to acc a second time -- confirm against the Spark
# accumulator documentation before reusing res.
print(acc.value)

[('b', 20), ('b', None), ('c', 30), ('c', None), ('f', None), ('f', 60), ('a', 10), ('a', 100), ('e', None), ('e', 50), ('d', None), ('d', 40), ('b', None), ('b', 20), ('c', None), ('c', 30), ('f', 60), ('f', None), ('a', 100), ('a', 10), ('e', 50), ('e', None), ('d', 40), ('d', None)]
10


In [20]:
# Friendship lists as (person, [friends of that person]) pairs.
friends = [
    ("a", ["b", "c", "d"]),
    ("b", ["a", "c", "d", "e"]),
    ("c", ["a", "b", "d"]),
    ("d", ["a", "b", "c"]),
    ("e", ["b"]),
]

# Distributed version of the list above, input for the mutual-friends job.
rdd = sc.parallelize(friends)

def map_stage(x):
    """Emit one (pair_key, other_friends) record per friend of a person.

    ``x`` is a (person, friends) pair. For every friend in ``friends`` the
    result contains a tuple whose first item is the two names concatenated
    with the smaller one first, and whose second item is the person's friend
    list without that friend.
    """
    person, circle = x[0], x[1]

    def pair_key(other):
        # Smaller name first, so both people of a pair produce the same key.
        return person + other if person < other else other + person

    return [
        (pair_key(other), [name for name in circle if name != other])
        for other in circle
    ]

# Mutual friends per pair of people:
#   1. map_stage emits (pair_key, friends-of-one-side) records;
#   2. reduceByKey intersects the lists that share a pair_key;
#   3. mapValues(sorted) turns every value into a sorted list, so the result
#      does not depend on set iteration order, and keys that only received a
#      single record (never passed through the reducer) are normalised too.
mutuals = (
    rdd.flatMap(map_stage)
    .reduceByKey(lambda a, b: set(a) & set(b))
    .mapValues(sorted)
    .sortByKey()
)
display(mutuals.collect())

[('ab', ['c', 'd']),
 ('ac', ['b', 'd']),
 ('ad', ['b', 'c']),
 ('bc', ['a', 'd']),
 ('bd', ['a', 'c']),
 ('be', []),
 ('cd', ['a', 'b'])]