### Import the required libraries, then create a SparkContext

In [1]:
# Import the required libraries
from pyspark import SparkContext



### Create and display an RDD from the following list

In [2]:
# Reuse the active SparkContext if there is one. A plain SparkContext() call
# fails when this cell is re-run, because only one context may be active at a time.
sc = SparkContext.getOrCreate()

# (name, number) pairs used to build the first RDD.
# NOTE: `list` shadows the Python built-in; the name is kept because the next cell reads it.
list = [('JK', 22), ('V', 24), ('Jimin',24), ('RM', 25), ('J-Hope', 25), ('Suga', 26), ('Jin', 27)]

In [4]:
# Distribute the local collection of pairs as an RDD.
rdd = sc.parallelize(list)

# Pull every element back to the driver and show them.
collected_pairs = rdd.collect()
print(collected_pairs)

                                                                                

[('JK', 22), ('V', 24), ('Jimin', 24), ('RM', 25), ('J-Hope', 25), ('Suga', 26), ('Jin', 27)]


### Create a sample1.txt file to contain the text shown below.

In [13]:
# Write the sample text inside a `with` block so the handle is flushed and
# closed before Spark reads the file back; a handle that is left open may still
# be holding the text in its buffer.
with open("sample1.txt", "w") as file:
    # Write the text to the file
    file.write('''
Utilitatis causa amicitia est quaesita.
Lorem ipsum dolor sit amet, consectetur adipiscing elit. 
Collatio igitur ista tenihil iuvat. 
Honesta oratio, Socratica, Platonis etiam. 
Primum in nostranepotestate est, quid meminerimus? 
Duo Reges: constructio interrete.
Quid, sietiam iucunda memoria est praeteritorum malorum? 
Si quidem, inquit, tollerem,''')

352

### Read the sample1.txt file into an RDD and display the first 4 elements

In [18]:

# Load the text file as an RDD of lines.
rdd = sc.textFile("sample1.txt")

# Fetch only the first four elements instead of collecting the whole RDD.
first_four = rdd.take(4)
print(first_four)


[Stage 5:>                                                          (0 + 1) / 1]

['Utilitatis causa amicitia est quaesita.', 'Lorem ipsum dolor sit amet, consectetur adipiscing elit. ', 'Collatio igitur ista tenihil iuvat. ', 'Honesta oratio, Socratica, Platonis etiam. '] /n


                                                                                

### Count the total number of rows in the RDD

In [19]:
# Number of elements in the RDD; for the text-file RDD above, each element is one line.
rdd.count()

                                                                                

8

### Create a function to convert the data to lower case and split it into words

In [24]:

def lower_and_split(rdd):
    """Lower-case a string and split it on whitespace.

    Despite the parameter name, ``rdd`` is a plain string (for example one
    line of text); the return value is the list of its whitespace-separated
    tokens in lower case. Punctuation stays attached to the tokens.
    """
    return rdd.lower().split()

# The sample text is a plain Python string, so it gets its own name instead of
# rebinding `rdd`, which holds the Spark RDD loaded from sample1.txt.
text = '''
Utilitatis causa amicitia est quaesita.
Lorem ipsum dolor sit amet, consectetur adipiscing elit. 
Collatio igitur ista tenihil iuvat. 
Honesta oratio, Socratica, Platonis etiam. 
Primum in nostranepotestate est, quid meminerimus? 
Duo Reges: constructio interrete.
Quid, sietiam iucunda memoria est praeteritorum malorum? 
Si quidem, inquit, tollerem
'''
# Lower-case the whole text and split it into a flat list of words.
result = lower_and_split(text)
print(result,"\n")


['utilitatis', 'causa', 'amicitia', 'est', 'quaesita.', 'lorem', 'ipsum', 'dolor', 'sit', 'amet,', 'consectetur', 'adipiscing', 'elit.', 'collatio', 'igitur', 'ista', 'tenihil', 'iuvat.', 'honesta', 'oratio,', 'socratica,', 'platonis', 'etiam.', 'primum', 'in', 'nostranepotestate', 'est,', 'quid', 'meminerimus?', 'duo', 'reges:', 'constructio', 'interrete.', 'quid,', 'sietiam', 'iucunda', 'memoria', 'est', 'praeteritorum', 'malorum?', 'si', 'quidem,', 'inquit,', 'tollerem'] 



### Remove the stopwords from the previous text, i.e. filter them out.

In [3]:
# Words to drop from the text, built by splitting one space-separated string.
stopwords = "a all the as is am an and be been from had I I’d why with".split()
# Hint: you may need to use flatMap

In [1]:
def filter_stopwords(words):
    """Return the items of ``words`` that are not in the module-level ``stopwords``.

    The comparison is an exact, case-sensitive membership test, and the
    original order of the remaining words is preserved.
    """
    return [word for word in words if word not in stopwords]

# The sample text is a plain Python string. A str has no .map()/.flatMap(), so
# it must be turned into an RDD (one element per line) before the Spark
# transformations below can run.
text = '''
Utilitatis causa amicitia est quaesita.
Lorem ipsum dolor sit amet, consectetur adipiscing elit. 
Collatio igitur ista tenihil iuvat. 
Honesta oratio, Socratica, Platonis etiam. 
Primum in nostranepotestate est, quid meminerimus? 
Duo Reges: constructio interrete.
Quid, sietiam iucunda memoria est praeteritorum malorum? 
Si quidem, inquit, tollerem
'''
lines_rdd = sc.parallelize(text.splitlines())

# map: each line -> list of lower-case words.
# flatMap: drop the stopwords from each list and flatten the lists into one RDD of words.
rdd = lines_rdd.map(lower_and_split).flatMap(filter_stopwords)

print(rdd.collect())

### Find the words starting with ‘c’

In [28]:


def filter_c(words):
    """Return the items of ``words`` that start with a lower-case ``'c'``.

    The prefix check is case-sensitive and the original order is preserved.
    """
    return [word for word in words if word.startswith('c')]

# Start from the text file explicitly rather than from whatever `rdd` the
# previous cell left behind, so this cell gives the same result on every run.
lines_rdd = sc.textFile("sample1.txt")

# map: each line -> list of lower-case words.
# flatMap: keep only the words starting with 'c' and flatten into one RDD of words.
rdd = lines_rdd.map(lower_and_split).flatMap(filter_c)

print(rdd.collect())


[Stage 8:>                                                          (0 + 2) / 2]

['causa', 'consectetur', 'collatio', 'constructio']


                                                                                

### Reduce the rdd by key and sum it (use the rdd from the following list)

In [29]:
# (key, value) pairs in which every key appears twice, so there is something to reduce.
# NOTE: `list` shadows the Python built-in; the name is kept because the next cell reads it.
list = [
    ('JK', 22), ('V', 24), ('Jimin', 24), ('RM', 25),
    ('J-Hope', 25), ('Suga', 26), ('Jin', 27),
    ('J-Hope', 12), ('Suga', 25), ('Jin', 34),
    ('JK', 32), ('V', 44), ('Jimin', 14), ('RM', 35),
]
# Hint: use reduceByKey

In [30]:
# Build the pair RDD and add up the values that share a key.
pairs_rdd = sc.parallelize(list)
rdd = pairs_rdd.reduceByKey(lambda left, right: left + right)

# Display the RDD using collect method
totals = rdd.collect()
print(totals)




[('Suga', 51), ('Jin', 61), ('JK', 54), ('V', 68), ('Jimin', 38), ('RM', 60), ('J-Hope', 37)]


                                                                                

### Create some key-value pair RDDs

In [31]:
# Two small pair RDDs that share the keys 'a' and 'b'; only the second has 'c'.
left_pairs = [('a',2),('b',3)]
right_pairs = [('a',9),('b',7),('c',10)]
rdd1 = sc.parallelize(left_pairs)
rdd2 = sc.parallelize(right_pairs)

In [32]:

# Join the two pair RDDs on their keys; each result element is (key, (value_from_rdd1, value_from_rdd2)).
rdd3 = rdd1.join(rdd2)

# Bring the joined pairs back to the driver and print them.
print(rdd3.collect())




[('b', (3, 7)), ('a', (2, 9))]


                                                                                

### Perform Join operation on the RDDs (rdd1,rdd2)

In [33]:

# Join on the key: only keys present in both rdd1 and rdd2 appear in the result,
# each paired with a (value_from_rdd1, value_from_rdd2) tuple.
rdd3 = rdd1.join(rdd2)

joined_pairs = rdd3.collect()
print(joined_pairs)

[('b', (3, 7)), ('a', (2, 9))]