#### Author:- Sailesh Chauhan
#### Modified:- 03/07/2021
#### Title:- PySpark Session implementing RDD creation, RDD Transformation and 
####            Action, Set Operations.


In [1]:
import findspark

In [2]:
findspark.init()

In [3]:
!pip install findspark



In [4]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spark_RDD').getOrCreate()
sc=spark.sparkContext

In [5]:

myRDD = sc.parallelize([('JK', 22), ('V', 24), ('Jimin',24), ('RM', 25), ('J-Hope', 25), ('Suga', 26), ('Jin', 27)],3)
print(str(myRDD.getNumPartitions()))

3


In [6]:
rdd=sc.parallelize([1,2,3,4,5])
rddCollect = rdd.collect()
print("Number of Partitions: "+str(rdd.getNumPartitions()))
print("Action: First two element: "+str(rdd.take(2)))
print(rddCollect)

Number of Partitions: 4
Action: First two element: [1, 2]
[1, 2, 3, 4, 5]


In [8]:
rdd.count()

5

In [20]:
new_RDD = sc.textFile("EBook.txt")
print("Number of Partitions ",new_RDD.getNumPartitions())

Number of Partitions  2


In [10]:
def Func(lines):
    lines = lines.lower()
    lines=lines.split()
    return lines
Split_rdd = new_RDD.flatMap(Func)
Split_rdd.take(2)

['there', 'stood']

In [11]:
print(Split_rdd.count())
RDD=Split_rdd.distinct()
RDD.count()

50194


9939

In [12]:
countWord=Split_rdd.countByValue()

In [13]:
countOf50Words=dict(list(countWord.items())[0:50])
print(countOf50Words)

{'there': 108, 'stood': 48, 'on': 311, 'campden': 2, 'hill': 3, 'a': 1112, 'large,': 1, 'dun-coloured': 2, 'house,': 7, 'enclosed': 1, 'by': 174, 'walled-in': 1, 'garden': 7, 'of': 1466, 'several': 7, 'acres': 1, 'in': 824, 'extent.': 1, 'it': 402, 'belonged': 3, 'to': 1306, 'no': 130, 'particular': 6, 'order': 7, 'architecture,': 1, 'and': 1274, 'was': 550, 'more': 78, 'suggestive': 1, 'comfort': 3, 'than': 81, 'splendour,': 1, 'with': 532, 'its': 44, 'great': 62, 'windows,': 4, 'rambling,': 1, 'nondescript': 2, 'proportions.': 1, 'one': 193, 'side,': 2, 'built': 3, 'out': 104, 'from': 258, 'the': 2591, 'house': 25, 'itself,': 6, 'big': 10, 'glass': 5, 'structure,': 2}


In [14]:
stopwords = ['a','all','the','as','is','am','an','and','be','been','from','had','I','Iâ€™d','why','with']
RDD = new_RDD.flatMap(Func)
RDD1 = RDD.filter(lambda x: x not in stopwords)
RDD.take(10)

['there',
 'stood',
 'on',
 'campden',
 'hill',
 'a',
 'large,',
 'dun-coloured',
 'house,',
 'enclosed']

In [15]:
import re
filteredRDD = RDD.filter(lambda x: x.startswith('c'))
filteredRDD.distinct().take(20)

['campden',
 'certain',
 'charge',
 'cut',
 'conservatory,',
 'canvas;',
 'creatures',
 'clinging',
 'calamity',
 'complexion',
 'coarse',
 'colour',
 'creature',
 'child',
 'charms.',
 'cast',
 'chair,',
 'clear',
 'cannot',
 'consenting']

##  RDD With key Value pair

In [17]:
a = sc.parallelize([('a',2),('b',3)])
b = sc.parallelize([('a',9),('b',7),('c',10)])

### Rdd joins transformation and collect action.

In [19]:
c = a.join(b)
c.collect()

[('a', (2, 9)), ('b', (3, 7))]

### Use of saveAsTextFile

In [14]:
RDD.saveAsTextFile("C:/Users/chauh/OneDrive/Desktop/rdd__desktop.txt")

In [15]:
rdd_a = sc.parallelize([1,2,3,4])
rdd_b = sc.parallelize([3,4,5,6])

In [16]:
rdd_a.intersection(rdd_b).collect()

[3, 4]

In [17]:
rdd_a.subtract(rdd_b).collect()

[1, 2]

In [18]:
rdd_a.cartesian(rdd_b).collect()

[(1, 3),
 (1, 4),
 (1, 5),
 (1, 6),
 (2, 3),
 (2, 4),
 (2, 5),
 (2, 6),
 (3, 3),
 (3, 4),
 (3, 5),
 (3, 6),
 (4, 3),
 (4, 4),
 (4, 5),
 (4, 6)]

In [19]:
rdd_a.union(rdd_b).collect()

[1, 2, 3, 4, 3, 4, 5, 6]

In [20]:
rdd_a.count()

4