In [1]:
# Start (or reuse) the SparkContext.
import findspark
findspark.init()  # run before the pyspark import so the pyspark package can be found

from pyspark import SparkConf, SparkContext

# getOrCreate() returns the already-running context when this cell is executed
# again; calling the SparkContext constructor a second time in the same kernel
# fails because only one SparkContext may be active at a time.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[4]"))

In [2]:
# A pair RDD built directly from a list of (key, value) tuples.
key_value_pairs = [(1, 2), (3, 4)]
pair_rdd = sc.parallelize(key_value_pairs)
collected_pairs = pair_rdd.collect()
print(collected_pairs)

[(1, 2), (3, 4)]


In [3]:
# Turn a plain RDD into a pair RDD: every element n becomes (n, n squared).
numbers = [1, 2, 3, 4, 2, 5, 6]
regular_rdd = sc.parallelize(numbers)
pair_rdd = regular_rdd.map(lambda n: (n, n * n))
squared_pairs = pair_rdd.collect()
print(squared_pairs)

[(1, 1), (2, 4), (3, 9), (4, 16), (2, 4), (5, 25), (6, 36)]


In [4]:
# reduceByKey: merge all values that share a key, here by adding them together.
rdd = sc.parallelize(
    [(1, 2), (2, 4), (2, 6)]
)
print("Original RDD :", rdd.collect())
summed_by_key = rdd.reduceByKey(lambda left, right: left + right)
print("After transformation : ", summed_by_key.collect())

Original RDD : [(1, 2), (2, 4), (2, 6)]
After transformation :  [(1, 2), (2, 10)]


In [5]:
# sortByKey: order the pairs by their key (ascending in this example).
rdd = sc.parallelize(
    [(2, 2), (1, 4), (3, 6)]
)
print("Original RDD :", rdd.collect())
ordered_by_key = rdd.sortByKey()
print("After transformation : ", ordered_by_key.collect())

Original RDD : [(2, 2), (1, 4), (3, 6)]
After transformation :  [(1, 4), (2, 2), (3, 6)]


In [6]:
# mapValues: apply a function to the value of every pair, leaving keys untouched.
rdd = sc.parallelize(
    [(1, 2), (2, 4), (2, 6)]
)
print("Original RDD :", rdd.collect())
doubled_values = rdd.mapValues(lambda value: value * 2)
print("After transformation : ", doubled_values.collect())

Original RDD : [(1, 2), (2, 4), (2, 6)]
After transformation :  [(1, 4), (2, 8), (2, 12)]


In [7]:
# groupByKey: gather all values that share a key into one iterable per key.
rdd = sc.parallelize(
    [(1, 2), (2, 4), (2, 6)]
)
print("Original RDD :", rdd.collect())
# The grouped values come back as an iterable; materialise each one as a list
# so the printed result shows the actual values.
grouped_as_lists = rdd.groupByKey().mapValues(list)
print("After transformation : ", grouped_as_lists.collect())

Original RDD : [(1, 2), (2, 4), (2, 6)]
After transformation :  [(1, [2]), (2, [4, 6])]


In [8]:
# countByKey: count how many pairs exist for each key and return the counts
# to the driver as a dictionary-like object.
pairs = [(1, 2), (2, 4), (2, 6)]
rdd = sc.parallelize(pairs)
rdd_contents = rdd.collect()
print("RDD: ", rdd_contents)
result = rdd.countByKey()
print("Result:", result)

RDD:  [(1, 2), (2, 4), (2, 6)]
Result: defaultdict(<class 'int'>, {1: 1, 2: 2})


In [9]:
# Build the input here instead of relying on whatever `rdd` was bound to by the
# last cell that happened to run: the name `rdd` is reassigned by many cells in
# this notebook, so the result would otherwise depend on execution order.
rdd = sc.parallelize([(1, 2), (2, 4), (2, 6)])
print("RDD: ", rdd.collect())
# collectAsMap returns a plain dict, so each key can hold only one value: of
# the two pairs with key 2 only one survives and the other is silently dropped.
result = rdd.collectAsMap()
print("Result:", result)

RDD:  [(1, 2), (2, 4), (2, 6)]
Result: {1: 2, 2: 6}


In [10]:
# Build the input here instead of relying on whatever `rdd` was bound to by the
# last cell that happened to run: the name `rdd` is reassigned by many cells in
# this notebook, so the result would otherwise depend on execution order.
rdd = sc.parallelize([(1, 2), (2, 4), (2, 6)])
print("RDD: ", rdd.collect())
# lookup(key) returns the list of all values stored under that key.
result = rdd.lookup(2)
print("Result:", result)

RDD:  [(1, 2), (2, 4), (2, 6)]
Result: [4, 6]


In [15]:
# Build the input here instead of relying on whatever `rdd` was bound to by the
# last cell that happened to run: the name `rdd` is reassigned by many cells in
# this notebook, so the result would otherwise depend on execution order.
rdd = sc.parallelize([(1, 2), (2, 4), (2, 6)])
print(rdd.keys().collect())    # first element of every pair
print(rdd.values().collect())  # second element of every pair

[1, 2, 2]
[2, 4, 6]


In [20]:
# Inner join: only keys that occur in both RDDs appear in the result; each
# value is a (value_from_rdd1, value_from_rdd2) tuple.
numbered = [('a', 1), ('b', 2), ('c', 3)]
coloured = [('b', 'Red'), ('c', 'Green'), ('d', 'Blue')]
rdd1 = sc.parallelize(numbered)
rdd2 = sc.parallelize(coloured)
rdd = rdd1.join(rdd2)
joined_rows = rdd.collect()
print(joined_rows)

[('b', (2, 'Red')), ('c', (3, 'Green'))]


In [21]:
# Inner join where the right side repeats a key: the result contains one row
# per matching combination, so 'c' shows up twice.
numbered = [('a', 1), ('b', 2), ('c', 3)]
coloured = [('b', 'Red'), ('c', 'Green'), ('c', 'Blue')]
rdd1 = sc.parallelize(numbered)
rdd2 = sc.parallelize(coloured)
rdd = rdd1.join(rdd2)
joined_rows = rdd.collect()
print(joined_rows)

[('b', (2, 'Red')), ('c', (3, 'Green')), ('c', (3, 'Blue'))]


In [24]:
# Left outer join: every key of rdd1 is kept; a key with no partner in rdd2
# ('a') gets None on the right, and keys found only in rdd2 ('d') are dropped.
numbered = [('a', 1), ('b', 2), ('c', 3)]
coloured = [('b', 'Red'), ('c', 'Green'), ('c', 'Blue'), ('d', 'Grey')]
rdd1 = sc.parallelize(numbered)
rdd2 = sc.parallelize(coloured)
rdd = rdd1.leftOuterJoin(rdd2)
joined_rows = rdd.collect()
print(joined_rows)

[('a', (1, None)), ('b', (2, 'Red')), ('c', (3, 'Green')), ('c', (3, 'Blue'))]


In [25]:
# Right outer join: every key of rdd2 is kept; a key with no partner in rdd1
# ('d') gets None on the left, and keys found only in rdd1 ('a') are dropped.
numbered = [('a', 1), ('b', 2), ('c', 3)]
coloured = [('b', 'Red'), ('c', 'Green'), ('c', 'Blue'), ('d', 'Grey')]
rdd1 = sc.parallelize(numbered)
rdd2 = sc.parallelize(coloured)
rdd = rdd1.rightOuterJoin(rdd2)
joined_rows = rdd.collect()
print(joined_rows)

[('b', (2, 'Red')), ('c', (3, 'Green')), ('c', (3, 'Blue')), ('d', (None, 'Grey'))]


In [27]:
# Full outer join: keys from both RDDs are kept; whichever side has no match
# for a key is filled with None.
numbered = [('a', 1), ('b', 2), ('c', 3)]
coloured = [('b', 'Red'), ('c', 'Green'), ('c', 'Blue'), ('d', 'Grey')]
rdd1 = sc.parallelize(numbered)
rdd2 = sc.parallelize(coloured)
rdd = rdd1.fullOuterJoin(rdd2)
joined_rows = rdd.collect()
print(joined_rows)

[('a', (1, None)), ('b', (2, 'Red')), ('c', (3, 'Green')), ('c', (3, 'Blue')), ('d', (None, 'Grey'))]


In [28]:
# subtractByKey: keep only the pairs of rdd1 whose key does not occur in rdd2.
numbered = [('a', 1), ('b', 2), ('c', 3)]
coloured = [('b', 'Red'), ('c', 'Green'), ('c', 'Blue'), ('d', 'Grey')]
rdd1 = sc.parallelize(numbered)
rdd2 = sc.parallelize(coloured)
rdd = rdd1.subtractByKey(rdd2)
remaining_rows = rdd.collect()
print(remaining_rows)

[('a', 1)]


In [5]:
# Group colour names by their key and show each group as a plain list.
colour_pairs = [
    ('b', 'Red'), ('c', 'Green'), ('c', 'Blue'),
    ('d', 'Grey'), ('b', 'Grey'), ('b', 'Yellow'),
]
rdd1 = sc.parallelize(colour_pairs)
# groupByKey yields an iterable per key; list() materialises it for printing.
rdd2 = rdd1.groupByKey().mapValues(list)
grouped_rows = rdd2.collect()
print(grouped_rows)

[('b', ['Red', 'Grey', 'Yellow']), ('c', ['Green', 'Blue']), ('d', ['Grey'])]
