# RDD Operations

### Actions and Transformations

#### collect

In [2]:
# Bring every element of the RDD back to the driver as a plain Python list.
animals = ["Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"]
c = sc.parallelize(animals, numSlices=2)
c.collect()

['Gnu', 'Cat', 'Rat', 'Dog', 'Gnu', 'Rat']

#### collectAsMap

In [5]:
# Pair each element with itself, then fetch the pairs as a dict.
# Keys are unique in a dict, so the duplicated 1 yields a single entry.
a = sc.parallelize([1, 2, 1, 3], numSlices=1)
b = a.zip(a)
b.collectAsMap()

{1: 1, 2: 2, 3: 3}

#### count

In [7]:
# Total number of elements held by the RDD.
c = sc.parallelize(["Gnu", "Cat", "Rat", "Dog"], numSlices=2)
c.count()

4

#### reduce

In [12]:
# Combine the elements pairwise with addition.
# Note: `a` ends up holding the reduced Python value, not an RDD.
numbers = sc.parallelize([1, 2, 3, 4, 5])
a = numbers.reduce(lambda left, right: left + right)
a

15

#### take

In [14]:
# Fetch only the first two elements instead of the whole RDD.
b = sc.parallelize(["dog", "cat", "ape", "salmon", "gnu"], numSlices=2)
b.take(2)

['dog', 'cat']

In [15]:
# take() also works when the requested elements span several partitions:
# 99 numbers in 5 partitions, first 30 requested.
b = sc.parallelize(range(1, 100), numSlices=5)
b.take(30)

[1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30]

#### first

In [17]:
# Return just the first element of the RDD.
c = sc.parallelize(["Gnu", "Cat", "Rat", "Dog"], numSlices=2)
c.first()

'Gnu'

#### countByValue

In [18]:
# Histogram of the distinct values: {value: number of occurrences}.
values = [1, 2, 3, 4, 5, 6, 7, 8, 2, 4, 2, 1, 1, 1, 1, 1]
b = sc.parallelize(values)
b.countByValue()

defaultdict(int, {1: 6, 2: 3, 3: 1, 4: 2, 5: 1, 6: 1, 7: 1, 8: 1})

#### lookup

In [19]:
# Key every word by its length, then fetch all values stored under key 5.
a = sc.parallelize(
    ["dog", "tiger", "lion", "cat", "panther", "eagle"], numSlices=2)
b = a.map(lambda word: (len(word), word))
b.lookup(5)

['tiger', 'eagle']

In [20]:
# `b` is the keyed RDD produced by map() in the lookup cell; inspect its class.
type(b)

pyspark.rdd.PipelinedRDD

#### max

In [22]:
# Largest element of the RDD (range(10, 30) stops at 29).
y = sc.parallelize(range(10, 30))
y.max()

29

In [23]:
# Tuples compare element by element, so the pair with the largest
# first item is returned.
pairs = [(10, "dog"), (3, "tiger"), (9, "lion"), (18, "cat")]
a = sc.parallelize(pairs)
a.max()

(18, 'cat')

#### min

In [25]:
# Smallest element of the RDD.
y = sc.parallelize(range(10, 30))
y.min()

10

In [26]:
# Tuples compare element by element, so the pair with the smallest
# first item is returned.
pairs = [(10, "dog"), (3, "tiger"), (9, "lion"), (18, "cat")]
a = sc.parallelize(pairs)
a.min()

(3, 'tiger')

#### mean

In [27]:
# Arithmetic mean of the elements, computed over 3 partitions.
samples = [9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1,
           7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5]
a = sc.parallelize(samples, numSlices=3)
a.mean()

5.3

#### variance

In [29]:
# Variance of the elements, computed over 3 partitions.
samples = [9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1,
           7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5]
a = sc.parallelize(samples, numSlices=3)
a.variance()

10.605333333333332

#### countByKey

In [30]:
# Count how many pairs share each key; the values themselves are ignored.
pairs = [(3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")]
c = sc.parallelize(pairs, numSlices=2)
c.countByKey()

defaultdict(int, {3: 3, 5: 1})

#### groupByKey

In [32]:
# Group the values of each key together.
# groupByKey() yields (key, iterable) pairs; each iterable is converted to a
# list so the result prints readably. A list comprehension replaces the
# original `map(lambda (x, y): ...)`, whose tuple-parameter syntax is
# Python-2-only and whose parameters shadowed the RDD named `x`.
x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
[(key, list(values)) for key, values in sorted(x.groupByKey().collect())]

[('a', [1, 1]), ('b', [1])]

#### reduceByKey

In [33]:
# Every word here has length 3, so all of them fall under one key and are
# merged by string concatenation.
a = sc.parallelize(["dog", "cat", "owl", "gnu", "ant"], numSlices=2)
b = a.map(lambda word: (len(word), word))
b.reduceByKey(lambda left, right: left + right).collect()

[(3, 'dogcatowlgnuant')]

#### foldByKey

In [34]:
from operator import add

# Merge the values of each key with `add`, starting from the zero value 0.
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rdd.foldByKey(0, add).collect()

[('a', 2), ('b', 1)]

#### cogroup

In [36]:
# For each key, collect the values from `x` and the values from `y` side by
# side; a key missing from one RDD gets an empty list on that side.
# A list comprehension replaces the original `map(lambda (x, y): ...)`, whose
# tuple-parameter syntax is Python-2-only and whose parameters shadowed the
# RDDs `x` and `y`. collect() already returns a list, so the extra list()
# wrapper is dropped.
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
[(key, (list(left), list(right)))
 for key, (left, right) in sorted(x.cogroup(y).collect())]

[('a', ([1], [2])), ('b', ([4], []))]

#### cogroup (larger example)

In [38]:
# cogroup with keys that exist on only one side and a key with several
# values: each key maps to (values from x, values from y).
# A list comprehension replaces the original `map(lambda (x, y): ...)`, whose
# tuple-parameter syntax is Python-2-only and whose parameters shadowed the
# RDDs `x` and `y`. collect() already returns a list, so the extra list()
# wrapper is dropped.
x = sc.parallelize([(1, "apple"), (2, "banana"), (3, "orange"), (4, "kiwi")], 2)
y = sc.parallelize([(5, "computer"), (1, "laptop"), (1, "desktop"), (4, "iPad")], 2)
[(key, (list(left), list(right)))
 for key, (left, right) in sorted(x.cogroup(y).collect())]

[(1, (['apple'], ['laptop', 'desktop'])),
 (2, (['banana'], [])),
 (3, (['orange'], [])),
 (4, (['kiwi'], ['iPad'])),
 (5, ([], ['computer']))]

#### join

In [41]:
# Inner join on word length: only lengths present in both RDDs appear,
# and every matching (left, right) combination is emitted.
a = sc.parallelize(
    ["dog", "salmon", "salmon", "rat", "elephant"], numSlices=3)
b = a.keyBy(len)
c = sc.parallelize(
    ["dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee"],
    numSlices=3)
d = c.keyBy(len)
b.join(d).collect()

[(6, ('salmon', 'salmon')),
 (6, ('salmon', 'rabbit')),
 (6, ('salmon', 'turkey')),
 (6, ('salmon', 'salmon')),
 (6, ('salmon', 'rabbit')),
 (6, ('salmon', 'turkey')),
 (3, ('dog', 'dog')),
 (3, ('dog', 'cat')),
 (3, ('dog', 'gnu')),
 (3, ('dog', 'bee')),
 (3, ('rat', 'dog')),
 (3, ('rat', 'cat')),
 (3, ('rat', 'gnu')),
 (3, ('rat', 'bee'))]

#### leftOuterJoin

# Left outer join on word length: keeps the keys of the left RDD `b`.
# NOTE(review): no output was recorded for this cell; per the PySpark docs,
# left keys without a match in `d` are paired with None.
a = sc.parallelize(
    ["dog", "salmon", "salmon", "rat", "elephant"], numSlices=3)
b = a.keyBy(len)
c = sc.parallelize(
    ["dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee"],
    numSlices=3)
d = c.keyBy(len)
b.leftOuterJoin(d).collect()

#### rightOuterJoin

In [45]:
# Right outer join on word length: every key of the right RDD `d` is kept;
# keys with no match in `b` get None on the left side.
a = sc.parallelize(
    ["dog", "salmon", "salmon", "rat", "elephant"], numSlices=3)
b = a.keyBy(len)
c = sc.parallelize(
    ["dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee"],
    numSlices=3)
d = c.keyBy(len)
b.rightOuterJoin(d).collect()

[(6, ('salmon', 'salmon')),
 (6, ('salmon', 'rabbit')),
 (6, ('salmon', 'turkey')),
 (6, ('salmon', 'salmon')),
 (6, ('salmon', 'rabbit')),
 (6, ('salmon', 'turkey')),
 (3, ('dog', 'dog')),
 (3, ('dog', 'cat')),
 (3, ('dog', 'gnu')),
 (3, ('dog', 'bee')),
 (3, ('rat', 'dog')),
 (3, ('rat', 'cat')),
 (3, ('rat', 'gnu')),
 (3, ('rat', 'bee')),
 (4, (None, 'wolf')),
 (4, (None, 'bear'))]

#### keyBy

In [47]:
# Turn each element into a (key, element) pair, with the key computed by
# the supplied function (here: the word's length).
words = ["dog", "salmon", "salmon", "rat", "elephant"]
a = sc.parallelize(words, numSlices=3)
b = a.keyBy(lambda word: len(word))
b.collect()

[(3, 'dog'), (6, 'salmon'), (6, 'salmon'), (3, 'rat'), (8, 'elephant')]

In [48]:
#### mapValues

In [49]:
# Transform only the value of each (key, value) pair; keys stay untouched.
a = sc.parallelize(
    ["dog", "tiger", "lion", "cat", "panther", "eagle"], numSlices=2)
b = a.map(lambda word: (len(word), word))
b.mapValues(lambda value: "x" + value + "x").collect()

[(3, 'xdogx'),
 (5, 'xtigerx'),
 (4, 'xlionx'),
 (3, 'xcatx'),
 (7, 'xpantherx'),
 (5, 'xeaglex')]

In [None]:
#### cache
- `cache()` persists the RDD with the default storage level, `StorageLevel.MEMORY_ONLY`.

**Storage Levels**
- MEMORY_ONLY
- MEMORY_ONLY_SER
- MEMORY_AND_DISK
- MEMORY_AND_DISK_SER
- DISK_ONLY

In [50]:
# A freshly created RDD is not persisted: every storage flag is False.
animals = ["Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"]
c = sc.parallelize(animals, numSlices=2)
c.getStorageLevel()

StorageLevel(False, False, False, False, 1)

In [51]:
# Mark `c` (created in the previous cell) for caching, then re-read its
# storage level. cache() returns the RDD itself, so the calls can be chained.
c.cache().getStorageLevel()

StorageLevel(False, True, False, False, 1)

#### repartition

In [52]:
# Redistribute an RDD created with 3 partitions across 5 partitions.
rdd = sc.parallelize([1, 2, 10, 4, 5, 2, 1, 1, 1], numSlices=3)
rdd.getNumPartitions()  # not displayed: only a cell's last expression is echoed
rdd2 = rdd.repartition(numPartitions=5)
rdd2.getNumPartitions()

5