<a href="https://colab.research.google.com/github/rao-tejaswi/Text_Editor/blob/main/Spark1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark



In [2]:
from pyspark import SparkContext, SparkConf

In [3]:
#initializing spark
conf = SparkConf().setAppName("testApp1").setMaster("local[*]")
sc = SparkContext(conf=conf)

In [4]:
print(sc)

<SparkContext master=local[*] appName=testApp1>


In [6]:
sc.defaultParallelism
#tells the number of cores we have

2

In [8]:
# Generate Random Data
import random
randomList = random.sample(range(0,40),10)
print(randomList)

[6, 19, 28, 9, 27, 5, 33, 8, 21, 13]


In [11]:
#Create RDD (transformation)
# 4 is the number of partitions that we give
rdd1 = sc.parallelize(randomList,4)
#print RDD (action)
rdd1.collect()

[6, 19, 28, 9, 27, 5, 33, 8, 21, 13]

In [15]:
#Data distribution in partition
print(rdd1.getNumPartitions())
# glom shows you how the data is divided into partitions
print(rdd1.glom().collect())
# take(n) prints the first n partitions
print(rdd1.glom().take(2))

4
[[6, 19], [28, 9], [27, 5], [33, 8, 21, 13]]
[[6, 19], [28, 9]]


In [16]:
#print last partition
rdd1.glom().collect()[3]

[33, 8, 21, 13]

In [18]:
#count()
print(rdd1.count())
#first()
print(rdd1.first())
#top()
print(rdd1.top(2))
#distinct() removes duplicates and returns original values
rdd1.distinct()
#distinct() is a part of user transformation so it's not readable, hence we use
# an action like collect() or take()
print(rdd1.distinct().collect())

10
6
[33, 28]
[28, 8, 9, 5, 33, 21, 13, 6, 19, 27]


In [23]:
#map() returns a new rdd after transformation

#first define a func that we use to make changes in rdd and call it in map
def myFunc(item):
  return (item*2)+1

rdd_map = rdd1.map(lambda i: (i*2)+1)
rdd_map.collect()

[13, 39, 57, 19, 55, 11, 67, 17, 43, 27]

In [24]:
#it has same number of partitions post transformation
rdd_map.glom().collect()

[[13, 39], [57, 19], [55, 11], [67, 17, 43, 27]]

In [26]:
#filter() returns a new dataset for which the func returns true
rdd_filter = rdd1.filter(lambda x: x%3==0)
rdd_filter.collect()

rdd_filter1 = rdd_map.filter(lambda x: x%3==0)
rdd_filter1.collect()

[39, 57, 27]

In [27]:
rdd_filter1.glom().collect()

[[39], [57], [], [27]]

In [30]:
#flatMap() combines all the elements into a list and then sends it
rdd_flatmap = rdd1.flatMap(lambda x: [x+2, x+5])
rdd_flatmap.glom().collect()

[[8, 11, 21, 24],
 [30, 33, 11, 14],
 [29, 32, 7, 10],
 [35, 38, 10, 13, 23, 26, 15, 18]]

In [32]:
#reduce(func) aggregates the elements of dataset using a function func
rdd_flatmap.reduce(lambda x,y: x+y)

408

In [33]:
#Descriptive statistics
print([rdd1.max(), rdd1.min(), rdd1.mean(), rdd1.stdev(), rdd1.sum()])

[33, 5, 16.9, 9.606768447297979, 169]


In [34]:
#mapPartitions()
def myfunc(partition):
  sum = 0
  for i in partition:
    sum = sum + i
  yield sum

rdd1.mapPartitions(myfunc).collect()

[25, 37, 32, 75]