In [1]:
#importing findspark
import findspark
findspark.init()

In [2]:
#importing pyspark
import pyspark
findspark.init()

In [3]:
#importing SparkSession
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession

In [4]:
#creating conf object and creating a spark session
conf=pyspark.SparkConf().setAppName('SparkApp').setMaster('local')
sc=pyspark.SparkContext(conf=conf)
spark=SparkSession(sc)

#### Q1: Convert all words in a rdd to lowercase and split the lines of a document using space. 

In [5]:
text_file=sc.textFile('spark_file.txt')
rdd=text_file.map(lambda x : x.lower())

In [6]:
rdd.take(5)

['apache spark is written in scala programming language. to support python with spark, apache spark community released a tool, pyspark. ',
 'using pyspark, you can work with rdds in python programming language also.',
 '',
 'apache spark is a lightning fast real-time processing framework. it does in-memory computations to analyze data in real-time. it came into picture as apache hadoop mapreduce was performing batch processing only and lacked a real-time processing feature. hence, apache spark was introduced as it can perform stream processing in real-time and can also take care of batch processing.',
 '']

In [7]:
rdd1=rdd.flatMap(lambda x:x.split(' '))

In [8]:
rdd1.take(5)

['apache', 'spark', 'is', 'written', 'in']

#### Q2: Next, I want to remove the words, which are not necessary to analyze this text. We call these words as “stop words”; Stop words do not add much value in a text. For example, “is”, “am”, “are” and “the” are few examples of stop words. 

In [9]:
stop_words=['is','in','am','are','the','a','and','but','how','or','what']
rdd2=rdd1.filter(lambda x:x not in stop_words)
rdd2.take(5)

['apache', 'spark', 'written', 'scala', 'programming']

#### Q3: After getting the results into rdd3, we want to group the words in rdd3 based on which letters they start with. For example, suppose I want to group each word of rdd3 based on first 3 characters. 

In [10]:
rdd3=rdd2.groupBy(lambda x: x[0:3])
print((k,list(v)) for (k,v) in rdd3.collect())

<generator object <genexpr> at 0x000002393340BDD0>


#### Q4: What if we want to calculate how many times each word is coming in corpus ?

In [11]:
rdd4=rdd3.map(lambda x: (x,1))
rdd5=rdd4.groupByKey()

In [12]:
print(list((j[0],list(j[1])) for j in rdd5.take(5)))

[(('apa', <pyspark.resultiterable.ResultIterable object at 0x0000023933430730>), [1]), (('spa', <pyspark.resultiterable.ResultIterable object at 0x00000239334B0040>), [1]), (('wri', <pyspark.resultiterable.ResultIterable object at 0x00000239334B0100>), [1]), (('sca', <pyspark.resultiterable.ResultIterable object at 0x00000239334B01F0>), [1]), (('pro', <pyspark.resultiterable.ResultIterable object at 0x00000239334B0280>), [1])]


#### Q5: How do I perform a task (say count the words ‘spark’ and ‘apache’ in rdd3) separately on each partition and get the output of the task performed in these partition ?

In [13]:
rdd6=rdd1.filter(lambda x: x=='spark'or x=='python').collect()
print(rdd6)
print(len(rdd6))

['spark', 'python', 'spark', 'python', 'spark', 'spark', 'spark', 'spark', 'spark']
9


#### Q6: What if I want to work with samples instead of full data ? 

In [14]:
rdd_sample1=rdd1.sample(False,0.3,21)
print(len(rdd_sample1.collect()),len(rdd1.collect()))

48 149


In [15]:
rdd_sample1.take(5)

['apache', 'written', 'a', 'pyspark.', 'using']

#### Q7: What if I want to create a RDD which contains all the elements (a.k.a. union) of two RDDs ? 

In [16]:
rdd_sample2=rdd1.sample(False,0.3,36)
print(len(rdd_sample1.collect()))

48


In [17]:
rdd7=rdd_sample1.union(rdd_sample2)
print(rdd7.take(10))
print(len(rdd7.collect()))

['apache', 'written', 'a', 'pyspark.', 'using', 'pyspark,', 'work', 'python', 'programming', 'a']
98


#### Q8: If we want to join the two pair RDDs based on their key. 

In [18]:
rdd_sample3=rdd4.sample(False,0.4,10)

In [19]:
rdd_sample4=rdd4.sample(False,0.4,20)

In [20]:
len(rdd_sample3.collect())

31

In [21]:
len(rdd_sample4.collect())

31

In [22]:
rdd_sample3.join(rdd_sample4)

PythonRDD[31] at RDD at PythonRDD.scala:53

#### Q9: How to calculate distinct elements in a RDD ? 

In [23]:
rdd8=rdd1.distinct()
len(rdd8.collect())

92

#### Q10: What if I want to reduce the number of partition of a RDD and get the result in a new RDD? 

In [24]:
rdd9=sc.parallelize([],10)

In [25]:
rdd9.getNumPartitions()

10

In [26]:
rdd10=rdd9.coalesce(5)
rdd10.getNumPartitions()

5

#### Q11: How do I find out number of parition in RDD ? 

In [27]:
rdd9.getNumPartitions()

10

#### Q13: Count the number of elements in RDD. 

In [28]:
rdd1.count()

149

#### Q14: Find the maximum, minimum, sum, variance and standard deviation of “num_rdd”. 

In [29]:
data=[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]

In [36]:
num_rdd=sc.parallelize(data)

In [37]:
num_rdd.max()

15

In [38]:
num_rdd.min()

1

In [39]:
num_rdd.sum()

120

In [40]:
num_rdd.variance()

18.666666666666668

In [41]:
num_rdd.stdev()

4.320493798938574