In [0]:
from pyspark import SparkConf, SparkContext

In [0]:
conf = SparkConf().setAppName('Read File')
sc = SparkContext.getOrCreate(conf=conf)
rdd = sc.textFile('/FileStore/tables/text.txt')

In [0]:
rdd.collect()

Out[6]: ['Hi how are you?', 'hope you are doing', 'great']

In [0]:
def mapper(text):
    return [len(s) for s in text.split(" ")]

In [0]:
#map function
rdd_new = rdd.map(mapper)

In [0]:
rdd_new.collect()

Out[10]: [[2, 3, 3, 4], [4, 3, 3, 5], [5]]

In [0]:
#Flat map 
rdd = sc.textFile('/FileStore/tables/text.txt')
rdd.collect()

Out[11]: ['Hi how are you?', 'hope you are doing', 'great']

In [0]:
rdd.flatMap(mapper).collect() #flattened it as a single list of elements

Out[13]: [2, 3, 3, 4, 4, 3, 3, 5, 5]

In [0]:
#filter -> is used to remove elements from the RDD
#it will create new rdd 

rdd = sc.textFile('/FileStore/tables/samples.txt')
rdd.collect()

Out[16]: ['1 2 3 4 5', '3 4 5 5 66 77', '12 43 6 7 8', '12 12 13']

In [0]:
rdd2 = rdd.filter(lambda x: x!='12 12 13')
rdd2.collect()

Out[17]: ['1 2 3 4 5', '3 4 5 5 66 77', '12 43 6 7 8']

In [0]:
rdd = sc.textFile('/FileStore/tables/filter.txt')
rdd.collect()

Out[18]: ['this mango company animal',
 'cat dog ant mic laptop',
 'chair switch mobile am charger cover',
 'amanda any alarm ant']

In [0]:
rdd2 = rdd.flatMap(lambda x : x.split(" "))
def start(word):
    if word[0] == 'a' or word[0] == 'c':
        return False
    else:
        return True
rdd2.filter(start).collect()

Out[25]: ['this', 'mango', 'dog', 'mic', 'laptop', 'switch', 'mobile']

In [0]:
rdd.filter(start).collect()

Out[23]: ['this mango company animal',
 'cat dog ant mic laptop',
 'chair switch mobile am charger cover',
 'amanda any alarm ant']

In [0]:
## Distinct
# all the function mentioned are all transformation functions
# it is used to get the distinct elements in RDD
# it will create a new rdd

rdd = sc.textFile('/FileStore/tables/samples.txt')
rdd2 = rdd.flatMap(lambda x: x.split(' '))
rdd3 = rdd2.distinct()
rdd3.collect()

Out[28]: ['1', '4', '66', '77', '12', '8', '2', '3', '5', '43', '6', '7', '13']

In [0]:
#groupByKey()
#used to create groups  based on the keys in RDDs
#data must be in the format of (k, v) , (k1, v2)
rdd = sc.textFile('/FileStore/tables/filter.txt')
rdd.collect()

Out[29]: ['this mango company animal',
 'cat dog ant mic laptop',
 'chair switch mobile am charger cover',
 'amanda any alarm ant']

In [0]:
# converting the data into key value pairs
rdd2 = rdd.flatMap(lambda x: x.split(" "))
rdd3 = rdd2.map(lambda x: (x,len(x)))
rdd3.collect()

Out[40]: [('this', 4),
 ('mango', 5),
 ('company', 7),
 ('animal', 6),
 ('cat', 3),
 ('dog', 3),
 ('ant', 3),
 ('mic', 3),
 ('laptop', 6),
 ('chair', 5),
 ('switch', 6),
 ('mobile', 6),
 ('am', 2),
 ('charger', 7),
 ('cover', 5),
 ('amanda', 6),
 ('any', 3),
 ('alarm', 5),
 ('ant', 3)]

In [0]:
rdd3.groupByKey().collect()


Out[42]: [('this', <pyspark.resultiterable.ResultIterable at 0x7f833c73f970>),
 ('mango', <pyspark.resultiterable.ResultIterable at 0x7f833c73f1f0>),
 ('cat', <pyspark.resultiterable.ResultIterable at 0x7f833c73f280>),
 ('ant', <pyspark.resultiterable.ResultIterable at 0x7f833c73f9a0>),
 ('laptop', <pyspark.resultiterable.ResultIterable at 0x7f833c73f550>),
 ('chair', <pyspark.resultiterable.ResultIterable at 0x7f833c72d430>),
 ('switch', <pyspark.resultiterable.ResultIterable at 0x7f833c72ddc0>),
 ('mobile', <pyspark.resultiterable.ResultIterable at 0x7f833c72db50>),
 ('am', <pyspark.resultiterable.ResultIterable at 0x7f833c72d7c0>),
 ('company', <pyspark.resultiterable.ResultIterable at 0x7f833c72d400>),
 ('animal', <pyspark.resultiterable.ResultIterable at 0x7f833c72df40>),
 ('dog', <pyspark.resultiterable.ResultIterable at 0x7f833c72dca0>),
 ('mic', <pyspark.resultiterable.ResultIterable at 0x7f833c72d940>),
 ('charger', <pyspark.resultiterable.ResultIterable at 0x7f833c72daf0>),
 

In [0]:
rdd3.groupByKey().mapValues(list).collect()

Out[43]: [('this', [4]),
 ('mango', [5]),
 ('cat', [3]),
 ('ant', [3, 3]),
 ('laptop', [6]),
 ('chair', [5]),
 ('switch', [6]),
 ('mobile', [6]),
 ('am', [2]),
 ('company', [7]),
 ('animal', [6]),
 ('dog', [3]),
 ('mic', [3]),
 ('charger', [7]),
 ('cover', [5]),
 ('amanda', [6]),
 ('any', [3]),
 ('alarm', [5])]

In [0]:
## Reducedbykey
#used to combine the data based on the keys in RDD
# transformation function so will create new rdds

rdd = sc.textFile('/FileStore/tables/samples.txt')
rdd.collect()


Out[4]: ['1 2 3 4 5', '3 4 5 5 66 77', '12 43 6 7 8', '12 12 13']

In [0]:
rdd2 =  rdd.flatMap(lambda x: x.split())
rdd3 = rdd2.map(lambda x: (x, 1))
rdd3.collect()

Out[7]: [('1', 1),
 ('2', 1),
 ('3', 1),
 ('4', 1),
 ('5', 1),
 ('3', 1),
 ('4', 1),
 ('5', 1),
 ('5', 1),
 ('66', 1),
 ('77', 1),
 ('12', 1),
 ('43', 1),
 ('6', 1),
 ('7', 1),
 ('8', 1),
 ('12', 1),
 ('12', 1),
 ('13', 1)]

In [0]:
rdd3.groupByKey().mapValues(list).collect()

Out[8]: [('1', [1]),
 ('4', [1, 1]),
 ('66', [1]),
 ('77', [1]),
 ('12', [1, 1, 1]),
 ('8', [1]),
 ('2', [1]),
 ('3', [1, 1]),
 ('5', [1, 1, 1]),
 ('43', [1]),
 ('6', [1]),
 ('7', [1]),
 ('13', [1])]

In [0]:
rdd3.reduceByKey(lambda x,y: x+y).collect() #perform some function on the values for keys

Out[11]: [('1', 1),
 ('4', 2),
 ('66', 1),
 ('77', 1),
 ('12', 3),
 ('8', 1),
 ('2', 1),
 ('3', 2),
 ('5', 3),
 ('43', 1),
 ('6', 1),
 ('7', 1),
 ('13', 1)]

In [0]:
# example 2
# write a transformation flow that will return the word count of each word present in the file as (key, value) pair.
rdd = sc.textFile('/FileStore/tables/filter.txt')
rdd.collect()

Out[12]: ['this mango company animal',
 'cat dog ant mic laptop',
 'chair switch mobile am charger cover',
 'amanda any alarm ant']

In [0]:
rdd2 = rdd.flatMap(lambda x : x.split(" ")).map(lambda x: (x, len(x)))
rdd2.collect()

Out[16]: [('this', 4),
 ('mango', 5),
 ('company', 7),
 ('animal', 6),
 ('cat', 3),
 ('dog', 3),
 ('ant', 3),
 ('mic', 3),
 ('laptop', 6),
 ('chair', 5),
 ('switch', 6),
 ('mobile', 6),
 ('am', 2),
 ('charger', 7),
 ('cover', 5),
 ('amanda', 6),
 ('any', 3),
 ('alarm', 5),
 ('ant', 3)]

In [0]:
rdd2.groupByKey().mapValues(list).collect()

Out[21]: [('this', [4]),
 ('mango', [5]),
 ('cat', [3]),
 ('ant', [3, 3]),
 ('laptop', [6]),
 ('chair', [5]),
 ('switch', [6]),
 ('mobile', [6]),
 ('am', [2]),
 ('company', [7]),
 ('animal', [6]),
 ('dog', [3]),
 ('mic', [3]),
 ('charger', [7]),
 ('cover', [5]),
 ('amanda', [6]),
 ('any', [3]),
 ('alarm', [5])]

In [0]:
rdd2.reduceByKey(lambda x,y: x+y).collect()

Out[23]: [('this', 4),
 ('mango', 5),
 ('cat', 3),
 ('ant', 6),
 ('laptop', 6),
 ('chair', 5),
 ('switch', 6),
 ('mobile', 6),
 ('am', 2),
 ('company', 7),
 ('animal', 6),
 ('dog', 3),
 ('mic', 3),
 ('charger', 7),
 ('cover', 5),
 ('amanda', 6),
 ('any', 3),
 ('alarm', 5)]

In [0]:
#repartition to change the number of partitions in RDD
# it will create a new RDD

#Coalesce() --> to decrease the number of partitions in RDD

from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName('partition')
sc = SparkContext.getOrCreate(conf=conf)

rdd = sc.textFile('/FileStore/tables/filter.txt')
rdd.getNumPartitions()


Out[25]: 2

In [0]:
rdd = rdd.repartition(5)
#more number of partition is sometime usefull as you can do more number of parallisation and processing
rdd.getNumPartitions()




Out[27]: 5

In [0]:
#applying flatmap transformation on the individual 5 partitions in rdd
rdd2 = rdd.flatMap(lambda x: x.split(' '))
#applying map transformation on the individual 5 partitions in rdd2
rdd3 = rdd2.map(lambda x: (x,1))
rdd3.collect()

Out[28]: [('amanda', 1),
 ('any', 1),
 ('alarm', 1),
 ('ant', 1),
 ('this', 1),
 ('mango', 1),
 ('company', 1),
 ('animal', 1),
 ('cat', 1),
 ('dog', 1),
 ('ant', 1),
 ('mic', 1),
 ('laptop', 1),
 ('chair', 1),
 ('switch', 1),
 ('mobile', 1),
 ('am', 1),
 ('charger', 1),
 ('cover', 1)]

In [0]:
rdd3.saveAsTextFile('FileStore/tables/output/5partition_output')

In [0]:
rdd = sc.textFile('/FileStore/tables/filter.txt')


rdd = rdd.repartition(5)
#more number of partition is sometime usefull as you can do more number of parallisation and processing
rdd.getNumPartitions()

Out[31]: 5

In [0]:
rdd2 = rdd.flatMap(lambda x: x.split(' '))
#applying map transformation on the individual 5 partitions in rdd2
rdd3 = rdd2.map(lambda x: (x,1))
rdd3 = rdd3.coalesce(3)

In [0]:
rdd3.saveAsTextFile('FileStore/tables/output/2partition_output')

In [0]:
rdd = sc.textFile('/FileStore/tables/output/5partition_output')
rdd.collect()

Out[36]: ["('amanda', 1)",
 "('any', 1)",
 "('alarm', 1)",
 "('ant', 1)",
 "('this', 1)",
 "('mango', 1)",
 "('company', 1)",
 "('animal', 1)",
 "('cat', 1)",
 "('dog', 1)",
 "('ant', 1)",
 "('mic', 1)",
 "('laptop', 1)",
 "('chair', 1)",
 "('switch', 1)",
 "('mobile', 1)",
 "('am', 1)",
 "('charger', 1)",
 "('cover', 1)"]