### Import the required libraries, then create the SparkContext

**Preparing for Google Colab**

In [3]:
# Show the working directory the archive will be saved into.
!pwd
# -nc (no-clobber): skip the download when the archive is already present, so
# re-running this cell does not fetch it again or leave ".tgz.1" duplicates.
!wget -nc https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz

/content
--2021-10-08 12:51:24--  https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 224062525 (214M) [application/x-gzip]
Saving to: ‘spark-3.0.1-bin-hadoop3.2.tgz’


2021-10-08 12:51:41 (12.8 MB/s) - ‘spark-3.0.1-bin-hadoop3.2.tgz’ saved [224062525/224062525]



In [None]:
# Unpack the Spark distribution into the current directory. The verbose flag
# is omitted so the cell output is not flooded with one line per extracted file.
!tar -xzf spark-3.0.1-bin-hadoop3.2.tgz

In [7]:
# Pinned to the version recorded in this cell's output so a fresh run installs
# the same findspark release.
!pip install findspark==1.4.2

Collecting findspark
  Downloading findspark-1.4.2-py2.py3-none-any.whl (4.2 kB)
Installing collected packages: findspark
Successfully installed findspark-1.4.2


In [8]:
import os
import findspark

# Location of the Spark distribution unpacked by the earlier tar cell.
SPARK_HOME = "/content/spark-3.0.1-bin-hadoop3.2"

# Export the location before initialising findspark, which runs with no
# explicit path argument.
os.environ["SPARK_HOME"] = SPARK_HOME
findspark.init()

**Importing libraries**

In [9]:
from pyspark.sql import SparkSession

**Initialize the Spark session**

In [10]:
# Obtain a SparkSession through the builder with default settings.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

# The underlying SparkContext is what the RDD cells below use.
sc = spark.sparkContext

### Create and display an RDD from the following list

In [11]:
list = [('JK', 22), ('V', 24), ('Jimin',24), ('RM', 25), ('J-Hope', 25), ('Suga', 26), ('Jin', 27)]

In [13]:
list_rdd = sc.parallelize(list)
list_rdd.collect()

[('JK', 22),
 ('V', 24),
 ('Jimin', 24),
 ('RM', 25),
 ('J-Hope', 25),
 ('Suga', 26),
 ('Jin', 27)]

### Read the sample1.txt file into an RDD and display the first 4 elements

In [14]:
# Load the text file as an RDD (one element per line) and preview four of them.
sample_path = 'sample1.txt'
file_rdd = sc.textFile(sample_path)
file_rdd.take(4)

['Utilitatis causa amicitia est quaesita.',
 'Lorem ipsum dolor sit amet, consectetur adipiscing elit. ',
 'Collatio igitur ista te nihil iuvat. Honesta oratio, Socratica, Platonis etiam. Primum in nostrane potestate est, quid meminerimus? ',
 'Duo Reges: constructio interrete. ']

### Count the total number of rows in the RDD

In [16]:
# count() returns the number of elements without first materialising the whole
# RDD as a Python list on the driver, which collect() would do.
file_rdd.count()

7

### Create a function to convert the data to lower case and split it into words

In [17]:
def line_lowercase_split_by_space(line):
  """Lower-case ``line`` and break it into whitespace-separated tokens.

  Args:
    line: a string of text.

  Returns:
    A list of lower-cased tokens; empty for blank or whitespace-only input.
    Punctuation stays attached to the token it touches.
  """
  return line.lower().split()

In [19]:
# Tokenise every line: each RDD element becomes a list of lower-cased words
# (a blank line becomes an empty list).
filtered_file_rdd = file_rdd.map(
    line_lowercase_split_by_space
)
filtered_file_rdd.collect()

[['utilitatis', 'causa', 'amicitia', 'est', 'quaesita.'],
 ['lorem',
  'ipsum',
  'dolor',
  'sit',
  'amet,',
  'consectetur',
  'adipiscing',
  'elit.'],
 ['collatio',
  'igitur',
  'ista',
  'te',
  'nihil',
  'iuvat.',
  'honesta',
  'oratio,',
  'socratica,',
  'platonis',
  'etiam.',
  'primum',
  'in',
  'nostrane',
  'potestate',
  'est,',
  'quid',
  'meminerimus?'],
 ['duo', 'reges:', 'constructio', 'interrete.'],
 ['quid,',
  'si',
  'etiam',
  'iucunda',
  'memoria',
  'est',
  'praeteritorum',
  'malorum?',
  'si',
  'quidem,',
  'inquit,',
  'tollerem,',
  'sed',
  'relinquo.',
  'an',
  'nisi',
  'populari',
  'fama?'],
 [],
 ['quamquam',
  'id',
  'quidem',
  'licebit',
  'iis',
  'existimare,',
  'qui',
  'legerint.',
  'summum',
  'a',
  'vobis',
  'bonum',
  'voluptas',
  'dicitur.',
  'at',
  'hoc',
  'in',
  'eo',
  'm.',
  'refert',
  'tamen,',
  'quo',
  'modo.',
  'quid',
  'sequatur,',
  'quid',
  'repugnet,',
  'vident.',
  'iam',
  'id',
  'ipsum',
  'absurdu

### Filter the stopwords from the previous text

In [20]:
# Words to drop from the tokenised text in the next step.
stopwords = [
    'a', 'all', 'the', 'as',
    'is', 'am', 'an', 'and',
    'be', 'been', 'from', 'had',
    'I', 'I’d', 'why', 'with',
]
# Hint: you may need use flatMap

In [49]:
# The tokens are lower-cased, so compare against lower-cased stopwords;
# otherwise entries such as 'I' could never match. A set gives constant-time
# membership checks inside the filter.
stopword_set = {stopword.lower() for stopword in stopwords}

# Build the flat word RDD directly from `file_rdd` instead of from
# `filtered_file_rdd` itself. Re-running this cell is then idempotent: the
# self-referential form would flat-map the words into single characters on a
# second run.
filtered_file_rdd = (
    file_rdd
    .flatMap(line_lowercase_split_by_space)
    .filter(lambda word: word not in stopword_set)
)

In [50]:
# Preview the first ten words that remain after the stopword filter.
filtered_file_rdd.take(10)

['utilitatis',
 'causa',
 'amicitia',
 'est',
 'quaesita.',
 'lorem',
 'ipsum',
 'dolor',
 'sit',
 'amet,']

### Filter the words starting with ‘c’

In [53]:
# startswith() is safe on an empty string, whereas indexing word[0] would raise
# IndexError if an empty token ever reached this filter.
filtered_file_rdd.filter(lambda word: word.startswith('c')).collect()

['causa', 'consectetur', 'collatio', 'constructio']

### Reduce the data by key and sum it (use the data from the following list)

In [57]:
list = [('JK', 22), ('V', 24), ('Jimin',24), ('RM', 25)
        , ('J-Hope', 25), ('Suga', 26), ('Jin', 27)
       , ('J-Hope', 12), ('Suga', 25), ('Jin', 34)
       , ('JK', 32), ('V', 44), ('Jimin',14), ('RM', 35)]
# Hint: use reduceByKey

In [58]:
list2_rdd = sc.parallelize(list)
list2_rdd.reduceByKey(lambda a,b: a+b).collect()

[('Suga', 51),
 ('Jin', 61),
 ('JK', 54),
 ('V', 68),
 ('Jimin', 38),
 ('RM', 60),
 ('J-Hope', 37)]

### Create some key-value pair RDDs

In [59]:
# Two pair RDDs that share the keys 'a' and 'b'; only rdd2 contains 'c'.
rdd1 = sc.parallelize([
    ('a', 2),
    ('b', 3),
])
rdd2 = sc.parallelize([
    ('a', 9),
    ('b', 7),
    ('c', 10),
])

### Perform a join operation on the RDDs (rdd1, rdd2)

In [61]:
# Join on key: each result is (key, (value_from_rdd1, value_from_rdd2)), and
# keys that appear in only one RDD (here 'c') are left out.
joined_rdd = rdd1.join(rdd2)
joined_rdd.collect()

[('b', (3, 7)), ('a', (2, 9))]