In [1]:
# Run this cell first every time the kernel starts: it provides the
# SparkContext that every later cell refers to as `sc`.
# SparkContext() raises "Cannot run multiple SparkContexts at once" if a context
# already exists in this kernel, so re-running the cell would fail.
# getOrCreate() returns the active context instead, making the cell idempotent.
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
sc.master

'local[*]'

In [2]:
# map: apply a function to each element, producing exactly one output per input.
x = sc.parallelize(["b", "a", "c"])
y = x.map(lambda letter: (letter, 1))  # pair every letter with a count of 1
print(x.collect())
print(y.collect())


['b', 'a', 'c']
[('b', 1), ('a', 1), ('c', 1)]


In [3]:
# filter: keep only the elements for which the predicate returns True.
x = sc.parallelize([1, 2, 3])
y = x.filter(lambda n: n % 2 == 1)  # keep odd values
print(x.collect())
print(y.collect())

[1, 2, 3]
[1, 3]


In [4]:
# flatMap: each input may produce any number of outputs, and they are flattened
# into a single RDD (here: three outputs per input element).
x = sc.parallelize([1, 2, 3])
y = x.flatMap(lambda n: (n, n * 100, 42))
print(x.collect())
print(y.collect())


[1, 2, 3]
[1, 100, 42, 2, 200, 42, 3, 300, 42]


In [11]:
# groupBy: group elements by the result of a key function (here, the first letter).
x = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
y = x.groupBy(lambda w: w[0])
# Collect once and reuse the result. Calling y.collect() twice would run the job twice.
groups = y.collect()
# Each group's values arrive as a ResultIterable, whose default repr is only an
# object address, so this first print does not show the members...
print(groups)
print("-------------------------")
# ...so turn each group into a list to display its members.
print([(k, list(v)) for (k, v) in groups])


[('J', <pyspark.resultiterable.ResultIterable object at 0x7febd53a0278>), ('F', <pyspark.resultiterable.ResultIterable object at 0x7febd53a0780>), ('A', <pyspark.resultiterable.ResultIterable object at 0x7febd53a0128>)]
-------------------------
[('J', ['John', 'James']), ('F', ['Fred']), ('A', ['Anna'])]


In [7]:
# groupByKey: gather all values that share a key into one iterable per key.
x = sc.parallelize([('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)])
y = x.groupByKey()
print(x.collect())
print([(key, list(values)) for key, values in y.collect()])

[('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)]
[('B', [5, 4]), ('A', [3, 2, 1])]


In [8]:
# mapPartitions: call a function once per partition, passing an iterator over
# that partition's elements; whatever the function yields becomes the output.
x = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)  # 2 partitions


def f(iterator):
    """Yield a single value: the sum of all elements in one partition."""
    yield sum(iterator)


y = x.mapPartitions(f)

# glom() gathers the elements of each partition into a list (one inner list per
# partition), so the output shows where the partition boundaries are.
print(x.glom().collect())
print(y.glom().collect())


[[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
[[15], [40]]


In [10]:
# Example of sample(withReplacement, fraction, seed)
#   withReplacement - whether an element can be sampled more than once
#                     (it is "put back" after being drawn).
#   fraction        - without replacement: probability that each element is
#                     chosen; must be in [0, 1].
#                     With replacement: expected number of times each element
#                     is chosen; must be >= 0.
#   seed            - seed for the random number generator.
# The sample size is itself random: fraction=0.4 of 5 elements does not
# guarantee exactly 2 elements.
x = sc.parallelize([1, 2, 3, 4, 5])
y = x.sample(withReplacement=False, fraction=0.4, seed=42)
print(x.collect())
print(y.collect())


[1, 2, 3, 4, 5]
[1, 2]


In [5]:
# union keeps duplicates (like UNION ALL in SQL). As the output shows, the
# result keeps the partitions of both inputs (2 + 1 -> 3 partitions).
x = sc.parallelize([1, 2, 3], 2)
print(x.glom().collect())
y = sc.parallelize([3, 4], 1)
print(y.glom().collect())
z = x.union(y)
print(z.glom().collect())

[[1], [2, 3]]
[[3, 4]]
[[1], [2, 3], [3, 4]]


In [12]:
# join works only on pair RDDs of (key, value) tuples. Each matching pair of
# values becomes (key, (left_value, right_value)). numPartitions sets the number
# of partitions in the joined result.
x = sc.parallelize([("a", 1), ("b", 2)])
y = sc.parallelize([("a", 3), ("a", 4), ("b", 5)])
z = x.join(y, numPartitions=2)
print(x.glom().collect())
print(y.glom().collect())
print(z.glom().collect())


[[('a', 1)], [('b', 2)]]
[[('a', 3)], [('a', 4), ('b', 5)]]
[[('b', (2, 5))], [('a', (1, 3)), ('a', (1, 4))]]


In [13]:
# distinct: remove duplicate elements. The result is redistributed across
# partitions, so its order may differ from the input order.
x = sc.parallelize([1, 2, 3, 3, 4])
y = x.distinct()

print(x.glom().collect())
print(y.glom().collect())


[[1, 2], [3, 3, 4]]
[[2, 4], [1, 3]]


In [17]:
# coalesce: reduce the number of partitions.
# With shuffle=False (the default), existing partitions are merged without a
# shuffle. With shuffle=True, elements move from one partition to another,
# which adds a shuffle (a new stage).
x = sc.parallelize([1, 2, 3, 4, 5], 3)
y = x.coalesce(2, shuffle=True)
print(x.glom().collect())
print(y.glom().collect())


[[1], [2, 3], [4, 5]]
[[1, 2, 4], [3, 5]]


In [3]:
# keyBy: turn each element into a (key, element) pair, where the key is
# computed from the element (here, its first letter).
x = sc.parallelize(['John', 'Fred', 'Anna', 'James'])
y = x.keyBy(lambda name: name[0])
print(y.collect())

[('J', 'John'), ('F', 'Fred'), ('A', 'Anna'), ('J', 'James')]


In [2]:
# partitionBy: repartition a pair RDD into 2 partitions using a custom function
# of the key. Keys whose first character sorts before 'H' go to partition 0;
# all others go to partition 1.
x = sc.parallelize([('J', 'James'), ('F', 'Fred'),
                    ('A', 'Anna'), ('J', 'John')], 3)

y = x.partitionBy(2, partitionFunc=lambda key: 0 if key[0] < 'H' else 1)
print(x.glom().collect())
print(y.glom().collect())


[[('J', 'James')], [('F', 'Fred')], [('A', 'Anna'), ('J', 'John')]]
[[('F', 'Fred'), ('A', 'Anna')], [('J', 'James'), ('J', 'John')]]


In [4]:
# zip: pair the i-th element of one RDD with the i-th element of another.
# A typical ETL use is attaching a derived value to each original record.
# Both RDDs must have the same number of partitions and the same number of
# elements in each partition. y is derived from x via map, which satisfies this.
x = sc.parallelize([1, 2, 3])
y = x.map(lambda n: n * n)
z = x.zip(y)

print(z.collect())


[(1, 1), (2, 4), (3, 9)]


In [5]:
# reduce is an action, not a transformation: it combines the elements with a
# binary function and returns the result to the driver (here, their sum).
x = sc.parallelize([1, 2, 3, 4])
y = x.reduce(lambda acc, n: acc + n)

print(x.collect())
print(y)


[1, 2, 3, 4]
10
