# RDD programs

1.	Create RDDs in three different ways.

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
Spark = SparkSession.builder.appName('RDD_programs').getOrCreate()

(i) From local collection using parallelise method of spark context

In [5]:
myCollection = "This is just my collection of words".split(" ")
#Spark.sparkContext.broadcast()
words = Spark.sparkContext.parallelize(myCollection, 2)
words.collect()

['This', 'is', 'just', 'my', 'collection', 'of', 'words']

(ii) From data source

In [6]:
rdd_datasource = Spark.sparkContext.textFile("C:/Users/Dell/Downloads/milk_production.csv",minPartitions=4).map(lambda ele : ele.split(","))
rdd_datasource.take(2)

[['Category',
  'State/ UT Name',
  'Cow Milk-2010-11',
  'Cow Milk-2011-12',
  'Cow Milk-2013-14',
  'Cow Milk-2014-15',
  'Cow Milk-2015-16',
  'Boffalo Milk-2010-11',
  'Boffalo Milk-2011-12',
  'Boffalo Milk-2013-14',
  'Boffalo Milk-2014-15',
  'Boffalo Milk-2015-16',
  'Goat Milk-2010-11',
  'Goat Milk-2011-12',
  'Goat Milk-2013-14',
  'Goat Milk-2014-15',
  'Goat Milk-2015-16'],
 ['State',
  'Andhra Pradesh',
  '3102',
  '3377',
  '3799',
  '3079',
  '3369',
  '8101',
  '8710',
  '9207',
  '6574',
  '7445',
  '1',
  '1',
  '1',
  '3',
  '3']]

(iii) RDD from another RDD through transformation

In [22]:
# first rdd
rdd1 = Spark.sparkContext.textFile("C:/Users/Dell/Downloads/test.txt").flatMap(lambda line: line.split(" "))
rdd2 = rdd1.filter(lambda x: x.startswith('T'))

In [24]:
rdd2.collect()

['This', 'The', 'The', 'The', 'The', 'There']

2.	Read a text file and count the number of words in the file using RDD operations.

In [25]:
# getting a text file
rdd1 = Spark.sparkContext.textFile("C:/Users/Dell/Downloads/test.txt").flatMap(lambda line: line.split(" "))
rdd1.count()

173

3.	Write a program to find the word frequency in a given file.

In [31]:
# getting a text file
rdd1 = Spark.sparkContext.textFile("C:/Users/Dell/Downloads/test.txt").flatMap(lambda line: line.split(" "))

#Mapping words
rdd_map = rdd1.map(lambda x: (x,1))
rdd_map.reduceByKey(lambda a,b: a+b).take(10)

[('is', 7),
 ('just', 1),
 ('file.', 1),
 ('The', 4),
 ('in', 5),
 ('English', 3),
 ('are', 3),
 ('indefinite', 2),
 ('an.', 1),
 ('used', 3)]

4.	Write a program to convert all words in a file to uppercase.

In [113]:
# getting a text file
rdd1 = Spark.sparkContext.textFile("C:/Users/Dell/Downloads/test.txt").flatMap(lambda line: line.split(" "))
upper_rdd = rdd1.map(lambda word: word.upper())

In [114]:
upper_rdd.collect()

['THIS',
 'IS',
 'JUST',
 'A',
 'TESTING',
 'TEXT',
 'FILE.',
 'THE',
 'ARTICLES',
 'IN',
 'ENGLISH',
 'ARE',
 'THE',
 'DEFINITE',
 'ARTICLE',
 'THE',
 'AND',
 'THE',
 'INDEFINITE',
 'ARTICLES',
 'A',
 'AND',
 'AN.',
 'THE',
 'DEFINITE',
 'ARTICLE',
 'IS',
 'USED',
 'WHEN',
 'THE',
 'SPEAKER',
 'BELIEVES',
 'THAT',
 'THE',
 'LISTENER',
 'KNOWS',
 'THE',
 'IDENTITY',
 'OF',
 'THE',
 "NOUN'S",
 'REFERENT',
 '(BECAUSE',
 'IT',
 'IS',
 'OBVIOUS,',
 'BECAUSE',
 'IT',
 'IS',
 'COMMON',
 'KNOWLEDGE,',
 'OR',
 'BECAUSE',
 'IT',
 'WAS',
 'MENTIONED',
 'IN',
 'THE',
 'SAME',
 'SENTENCE',
 'OR',
 'AN',
 'EARLIER',
 'SENTENCE).',
 'THE',
 'INDEFINITE',
 'ARTICLE',
 'IS',
 'USED',
 'WHEN',
 'THE',
 'SPEAKER',
 'BELIEVES',
 'THAT',
 'THE',
 'LISTENER',
 'DOES',
 'NOT',
 'HAVE',
 'TO',
 'BE',
 'TOLD',
 'THE',
 'IDENTITY',
 'OF',
 'THE',
 'REFERENT.',
 'NO',
 'ARTICLE',
 'IS',
 'USED',
 'IN',
 'SOME',
 'NOUN',
 'PHRASES.',
 '',
 'ENGLISH',
 'GRAMMAR',
 'REQUIRES',
 'THAT',
 'IN',
 'MOST',
 'CASES',
 '

In [115]:
upper_rdd.saveAsTextFile("C:/Users/Dell/Downloads/upper_test.txt")

In [116]:
rdd = Spark.sparkContext.textFile("C:/Users/Dell/Downloads/upper_test.txt")
rdd.collect()

['THIS',
 'IS',
 'JUST',
 'A',
 'TESTING',
 'TEXT',
 'FILE.',
 'THE',
 'ARTICLES',
 'IN',
 'ENGLISH',
 'ARE',
 'THE',
 'DEFINITE',
 'ARTICLE',
 'THE',
 'AND',
 'THE',
 'INDEFINITE',
 'ARTICLES',
 'A',
 'AND',
 'AN.',
 'THE',
 'DEFINITE',
 'ARTICLE',
 'IS',
 'USED',
 'WHEN',
 'THE',
 'SPEAKER',
 'BELIEVES',
 'THAT',
 'THE',
 'LISTENER',
 'KNOWS',
 'THE',
 'IDENTITY',
 'OF',
 'THE',
 "NOUN'S",
 'REFERENT',
 '(BECAUSE',
 'IT',
 'IS',
 'OBVIOUS,',
 'BECAUSE',
 'IT',
 'IS',
 'COMMON',
 'KNOWLEDGE,',
 'OR',
 'BECAUSE',
 'IT',
 'WAS',
 'MENTIONED',
 'IN',
 'THE',
 'SAME',
 'SENTENCE',
 'OR',
 'AN',
 'EARLIER',
 'SENTENCE).',
 'THE',
 'INDEFINITE',
 'ARTICLE',
 'IS',
 'USED',
 'WHEN',
 'THE',
 'SPEAKER',
 'BELIEVES',
 'THAT',
 'THE',
 'LISTENER',
 'DOES',
 'NOT',
 'HAVE',
 'TO',
 'BE',
 'TOLD',
 'THE',
 'IDENTITY',
 'OF',
 'THE',
 'REFERENT.',
 'NO',
 'ARTICLE',
 'IS',
 'USED',
 'IN',
 'SOME',
 'NOUN',
 'PHRASES.',
 '',
 'ENGLISH',
 'GRAMMAR',
 'REQUIRES',
 'THAT',
 'IN',
 'MOST',
 'CASES',
 '

In [117]:
# Creating a new text file that has lowercase letters
file_upper = open("C:/Users/Dell/Downloads/upper_text.txt","wt")
content = upper_rdd.reduce(lambda x,y: x+" "+y)
file_upper.write(content)
file_upper.close()

5.	Write a program to convert all words in a file to lowercase.

In [110]:
rdd1 = Spark.sparkContext.textFile("C:/Users/Dell/Downloads/test.txt").flatMap(lambda line: line.split(" "))
lower_rdd = rdd1.map(lambda word: word.lower())
lower_rdd.collect()
lower_rdd.saveAsTextFile("C:/Users/Dell/Downloads/lower_test.txt")

In [111]:
rdd = Spark.sparkContext.textFile("C:/Users/Dell/Downloads/lower_test.txt")
rdd.collect()

['this',
 'is',
 'just',
 'a',
 'testing',
 'text',
 'file.',
 'the',
 'articles',
 'in',
 'english',
 'are',
 'the',
 'definite',
 'article',
 'the',
 'and',
 'the',
 'indefinite',
 'articles',
 'a',
 'and',
 'an.',
 'the',
 'definite',
 'article',
 'is',
 'used',
 'when',
 'the',
 'speaker',
 'believes',
 'that',
 'the',
 'listener',
 'knows',
 'the',
 'identity',
 'of',
 'the',
 "noun's",
 'referent',
 '(because',
 'it',
 'is',
 'obvious,',
 'because',
 'it',
 'is',
 'common',
 'knowledge,',
 'or',
 'because',
 'it',
 'was',
 'mentioned',
 'in',
 'the',
 'same',
 'sentence',
 'or',
 'an',
 'earlier',
 'sentence).',
 'the',
 'indefinite',
 'article',
 'is',
 'used',
 'when',
 'the',
 'speaker',
 'believes',
 'that',
 'the',
 'listener',
 'does',
 'not',
 'have',
 'to',
 'be',
 'told',
 'the',
 'identity',
 'of',
 'the',
 'referent.',
 'no',
 'article',
 'is',
 'used',
 'in',
 'some',
 'noun',
 'phrases.',
 '',
 'english',
 'grammar',
 'requires',
 'that',
 'in',
 'most',
 'cases',
 '

In [112]:
# Creating a new text file that has lowercase letters
file_lower = open("C:/Users/Dell/Downloads/lower_text.txt","wt")
content = lower_rdd.reduce(lambda x,y: x+" "+y)
file_lower.write(content)
file_lower.close()
#lower_rdd.reduce(lambda x,y: x+" "+y)

6.	Write a program to capitalize first letter of each words in file (use string capitalize() method).

In [171]:
rdd1 = Spark.sparkContext.textFile("C:/Users/Dell/Downloads/test.txt").flatMap(lambda line: line.split(" "))
caps_rdd = rdd1.map(lambda x: x.capitalize())

In [172]:
caps_rdd.collect()

['This',
 'Is',
 'Just',
 'A',
 'Testing',
 'Text',
 'File.',
 'The',
 'Articles',
 'In',
 'English',
 'Are',
 'The',
 'Definite',
 'Article',
 'The',
 'And',
 'The',
 'Indefinite',
 'Articles',
 'A',
 'And',
 'An.',
 'The',
 'Definite',
 'Article',
 'Is',
 'Used',
 'When',
 'The',
 'Speaker',
 'Believes',
 'That',
 'The',
 'Listener',
 'Knows',
 'The',
 'Identity',
 'Of',
 'The',
 "Noun's",
 'Referent',
 '(because',
 'It',
 'Is',
 'Obvious,',
 'Because',
 'It',
 'Is',
 'Common',
 'Knowledge,',
 'Or',
 'Because',
 'It',
 'Was',
 'Mentioned',
 'In',
 'The',
 'Same',
 'Sentence',
 'Or',
 'An',
 'Earlier',
 'Sentence).',
 'The',
 'Indefinite',
 'Article',
 'Is',
 'Used',
 'When',
 'The',
 'Speaker',
 'Believes',
 'That',
 'The',
 'Listener',
 'Does',
 'Not',
 'Have',
 'To',
 'Be',
 'Told',
 'The',
 'Identity',
 'Of',
 'The',
 'Referent.',
 'No',
 'Article',
 'Is',
 'Used',
 'In',
 'Some',
 'Noun',
 'Phrases.',
 '',
 'English',
 'Grammar',
 'Requires',
 'That',
 'In',
 'Most',
 'Cases',
 '

In [173]:
# Saving rdd into a file
caps_rdd.saveAsTextFile("C:/Users/Dell/Downloads/caps.txt")
rdd = Spark.sparkContext.textFile("C:/Users/Dell/Downloads/caps.txt")
rdd.collect()

['This',
 'Is',
 'Just',
 'A',
 'Testing',
 'Text',
 'File.',
 'The',
 'Articles',
 'In',
 'English',
 'Are',
 'The',
 'Definite',
 'Article',
 'The',
 'And',
 'The',
 'Indefinite',
 'Articles',
 'A',
 'And',
 'An.',
 'The',
 'Definite',
 'Article',
 'Is',
 'Used',
 'When',
 'The',
 'Speaker',
 'Believes',
 'That',
 'The',
 'Listener',
 'Knows',
 'The',
 'Identity',
 'Of',
 'The',
 "Noun's",
 'Referent',
 '(because',
 'It',
 'Is',
 'Obvious,',
 'Because',
 'It',
 'Is',
 'Common',
 'Knowledge,',
 'Or',
 'Because',
 'It',
 'Was',
 'Mentioned',
 'In',
 'The',
 'Same',
 'Sentence',
 'Or',
 'An',
 'Earlier',
 'Sentence).',
 'The',
 'Indefinite',
 'Article',
 'Is',
 'Used',
 'When',
 'The',
 'Speaker',
 'Believes',
 'That',
 'The',
 'Listener',
 'Does',
 'Not',
 'Have',
 'To',
 'Be',
 'Told',
 'The',
 'Identity',
 'Of',
 'The',
 'Referent.',
 'No',
 'Article',
 'Is',
 'Used',
 'In',
 'Some',
 'Noun',
 'Phrases.',
 '',
 'English',
 'Grammar',
 'Requires',
 'That',
 'In',
 'Most',
 'Cases',
 '

In [174]:
file= open("C:/Users/Dell/Downloads/caps_view.txt","wt")
content = caps_rdd.reduce(lambda x,y: x+" "+y)
file.write(content)
file.close()


[]

7.	Find the longest length of word from given set of words.

In [93]:
rdd1 = Spark.sparkContext.textFile("C:/Users/Dell/Downloads/test.txt").flatMap(lambda line: line.split(" "))
rdd1.take(5)

['This', 'is', 'just', 'a', 'testing']

In [94]:
def longest(l,r):
    if len(l)>len(r):
        return l
    else:
        return r

In [95]:
rdd1.reduce(lambda x,y: longest(x,y))

'determiner.[1]'

8.	Map the Registration numbers to corresponding branch. 6000 series BDA, 9000 series HAD, 1000 series MS, 2000 series VLSI, 3000 series ES, 4000 series MSc, 5000 series CC.

In [96]:
rollnum_rdd = Spark.sparkContext.parallelize((1427,3457,9876,2314,1771,1345,2985,2765,2323,4111,4561,5681,5966,6723,6887,6001))

In [97]:
def map_regnum(regnum):
    if regnum>=9000:
        return 'HAD'
    elif regnum>=6000:
        return 'BDA'
    elif regnum>=5000:
        return 'CC'
    elif regnum>4000:
        return 'MSc'
    elif regnum>=3000:
        return 'ES'
    elif regnum>=2000:
        return 'VLSI'
    elif regnum>=1000:
        return 'MS'
    else:
        return 'Invalid'

In [99]:
branch_rdd = rollnum_rdd.map(lambda rollnum:map_regnum(rollnum))
branch_rdd.collect()

['MS',
 'ES',
 'HAD',
 'VLSI',
 'MS',
 'MS',
 'VLSI',
 'VLSI',
 'VLSI',
 'MSc',
 'MSc',
 'CC',
 'CC',
 'BDA',
 'BDA',
 'BDA']

9.	Given registration number, generate a key-value pair of Registration Number and Corresponding Branch.

In [100]:
branch_rdd = rollnum_rdd.map(lambda rollnum: (rollnum,map_regnum(rollnum)))
branch_rdd.collect()

[(1427, 'MS'),
 (3457, 'ES'),
 (9876, 'HAD'),
 (2314, 'VLSI'),
 (1771, 'MS'),
 (1345, 'MS'),
 (2985, 'VLSI'),
 (2765, 'VLSI'),
 (2323, 'VLSI'),
 (4111, 'MSc'),
 (4561, 'MSc'),
 (5681, 'CC'),
 (5966, 'CC'),
 (6723, 'BDA'),
 (6887, 'BDA'),
 (6001, 'BDA')]

10.	A text file contains data about citizens of country. Fields(information in file) are Name, dob, Phone, email and state name. Another file contains mapping of state names to state code like Karnataka is codes as KA, TamilNadu as TN, Kerala KL etc. Compress the file will by changing full state name to state code.

In [242]:
# Reading citizendata
citizen_rdd = Spark.sparkContext.textFile("C:/Users/Dell/Downloads/citizens_data.txt", minPartitions=4).map(lambda ele:ele.split(","))
citizen_rdd.take(4)

[['personA', '03-09-2010', '2345671872', 'personA@gmail.com', 'Karnataka'],
 ['personB', '23-12-1980', '6782081736', 'personB@gmail.com', 'Maharashtra'],
 ['personC', '07-11-1999', '6391376182', 'personC@gmail.com', 'Jharkhand'],
 ['personD',
  '29-03-1955',
  '9871367281',
  'personD@gmail.com',
  'Andhra Pradesh']]

In [240]:
# Reading states and state codes
states_rdd = Spark.sparkContext.textFile("C:/Users/Dell/Downloads/state_code.txt", minPartitions=4).map(lambda ele:ele.split("\t"))
states_rdd.take(2)

[['Andaman and Nicobar Islands', 'AN'], ['Andhra Pradesh', 'AP']]

In [245]:
#citizen_rdd.map(lambda x:(x[4])).leftOuterJoin(states_rdd.map(lambda y:(y[0],y[1]))).collect()
citizen_rdd.leftOuterJoin(states_rdd).collect()
#newRdd.collect()


[('personA', ('03-09-2010', None)),
 ('personB', ('23-12-1980', None)),
 ('personC', ('07-11-1999', None)),
 ('personD', ('29-03-1955', None))]

In [235]:
states_rdd.map(lambda x:(x[0],(x[1]))).collect()

[('Andaman and Nicobar Islands', 'AN'),
 ('Andhra Pradesh', 'AP'),
 ('Arunachal Pradesh', 'AR'),
 ('Assam', 'AS'),
 ('Bihar', 'BR'),
 ('Chandigarh', 'CH'),
 ('Chhattisgarh', 'CT'),
 ('Dadra and Nagar Haveli', 'DN'),
 ('Daman and Diu', 'DD'),
 ('Delhi', 'DL'),
 ('Goa', 'GA'),
 ('Gujarat', 'GJ'),
 ('Haryana', 'HR'),
 ('Himachal Pradesh', 'HP'),
 ('Jammu and Kashmir', 'JK'),
 ('Jharkhand', 'JH'),
 ('Karnataka', 'KA'),
 ('Kerala', 'KL'),
 ('Lakshadweep', 'LD'),
 ('Madhya Pradesh', 'MP'),
 ('Maharashtra', 'MH'),
 ('Manipur', 'MN'),
 ('Meghalaya', 'ML'),
 ('Mizoram', 'MZ'),
 ('Nagaland', 'NL'),
 ('Odisha', 'OR'),
 ('Puducherry', 'PY'),
 ('Punjab', 'PB'),
 ('Rajasthan', 'RJ'),
 ('Sikkim', 'SK'),
 ('Tamil Nadu', 'TN'),
 ('Telangana', 'TG'),
 ('Tripura', 'TR'),
 ('Uttar Pradesh', 'UP'),
 ('Uttarakhand', 'UT'),
 ('West Bengal', 'WB')]

11.	Text file contain numbers. Numbers are separated by one white space. There is no order to store the numbers. One line may contain one or more numbers. Find the maximum, minimum, sum and mean of numbers.

In [208]:
# reading the file
nums_rdd = Spark.sparkContext.textFile("C:/Users/Dell/Downloads/numbers.txt", minPartitions=4).flatMap(lambda ele:ele.split(" "))
nums_rdd_converted=nums_rdd.map(lambda x: int(x))

In [212]:
# Getting sum
nums_sum=nums_rdd_converted.reduce(lambda x,y: x+y)

In [209]:
# Maximum of numbers
nums_rdd_converted.max()

845

In [210]:
# Minimum of the numbers
nums_rdd_converted.min()

2

In [214]:
# Getting average
nums_count = nums_rdd_converted.count()
nums_sum/nums_count

123.5909090909091