## Learning Pyspark from Basics

#### Installing PySpark and findspark

In [1]:
!pip install pyspark



In [2]:
# Mandatory step so that jupyter notebook finds your pyspark
! pip install findspark pyspark



In [3]:
import os
os.chdir('F:\CDAC\Big Data\pyspark')

In [4]:
import pyspark

# mandatory step
import findspark
findspark.init()

## Understanding SparkContext

In [5]:
# importing SparkContext - Entry point for spark

from pyspark import SparkContext

In [44]:
# mandaatory step
# we are setting spark context to local mode and appName as - 'understanding Spark Context'

sc = SparkContext(master = 'local[4]', appName = 'Understanding Spark Context')

In [45]:
sc

In [7]:
# prints start time of the SparkContext Session

sc.startTime

1712388061787

In [8]:
# prints the current version of SparkContext

sc.version

'3.5.1'

In [9]:
# prints the python version of SparkContext 

sc.pythonVer

'3.11'

In [10]:
# prints where our Spark is running

sc.master

'local'

In [11]:
# fetching the configurations of SparkContext

sc.getConf

<bound method SparkContext.getConf of <SparkContext master=local appName=Understanding Spark Context>>

# RDD - RESILIENT DISTRUBUTED DATASET

In [12]:
# Loading the data in Pyspark from local data structure

# Parallelize method is used to create RDD from local data structure(here-list)

numRDD = sc.parallelize([1,2,3,4,5])
print("Parallelize returns: ", type(numRDD))

Parallelize returns:  <class 'pyspark.rdd.RDD'>


In [13]:
# Loading the data in Pyspark from text file

# if full file:/// with path is provided if loads file from *LOCAL FILE SYSTEM*
# if only file is specified it  finds the file in *HADOOP FILE SYSTEM*
# (here-local)

fileRDD = sc.textFile('file:///F:\CDAC\Big Data\pyspark\test.txt')
print("Text File returns: ", type(fileRDD))

Text File returns:  <class 'pyspark.rdd.RDD'>


##### Understanding partitoning in PySpark

In [14]:
# for parallelize method
# create RDD from 0-9 with partitions(numSlices) -> 6

numRDD = sc.parallelize(range(10), numSlices = 6)
type(numRDD)

pyspark.rdd.PipelinedRDD

In [15]:
# for text file method
# create RDD from given file in HDFS with minimum partitions -> 3 

fileRDD = sc.textFile('test.txt', minPartitions = 3)
type(fileRDD)

pyspark.rdd.RDD

#### Get Number of partitions of RDD

In [16]:
# this method return number of partitions 

print("Number of partitions of numRDD1 are: ", numRDD.getNumPartitions() )

Number of partitions of numRDD1 are:  6


#### Display contents of RDD

In [17]:
# display contents of RDD
# returns list and show O/P without partitions

print("Output without partitions: ", numRDD.collect())

Output without partitions:  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


In [18]:
# display contents of RDD 
# *BUT*
# returns list, with *partitions* it was stored with

print("Output with partitions: ", numRDD.glom().collect())

Output with partitions:  [[0], [1, 2], [3, 4], [5], [6, 7], [8, 9]]


## TRANSFORMATION - ACTION on RDDD

### Basic RDD Transformations

### MAP

In [19]:
# map will iterate through each element in the list and perform the operation for each element
# will return O/P in collection of collection

rdd = sc.parallelize(["hello world", "how are you"])
rdd_map = rdd.map(lambda x: x.split(" "))

rdd_map

PythonRDD[9] at RDD at PythonRDD.scala:53

### FILTER

In [20]:
# filter will iterate through each element and return elements which give True for given condition

rdd = sc.parallelize([1,2,3,4,5,6,7,8])
rdd_filter = rdd.filter(lambda x : x%2 ==0)

rdd_filter

PythonRDD[11] at RDD at PythonRDD.scala:53

### FLATMAP

In [21]:
# same as map but will return O/P in single collection

rdd = sc.parallelize(["hello world", "how are you"])
rdd_flatmap = rdd.flatMap(lambda x: x.split(" "))

rdd_flatmap

PythonRDD[13] at RDD at PythonRDD.scala:53

### UNION

In [22]:
# combines 2 or more RDD

error_rdd = sc.parallelize(['error1', 'error2'])
warning_rdd = sc.parallelize(['warning1', 'warning2'])

rdd_union = error_rdd.union(warning_rdd)

### Basic RDD Actions

### COLLECT

In [23]:
# collect return all elements of dataset in ARRAY

print("Map vs Flatmap O/P: ")
print("Map output:", rdd_map.collect())
print("Flatmap output:", rdd_flatmap.collect() ,"\n")
print("Filter output:", rdd_filter.collect())
print("Union output:", rdd_union.collect())

Map vs Flatmap O/P: 
Map output: [['hello', 'world'], ['how', 'are', 'you']]
Flatmap output: ['hello', 'world', 'how', 'are', 'you'] 

Filter output: [2, 4, 6, 8]


### TAKE

In [24]:
# take returns specified first n number of elements in array

print("Map vs Flatmap O/P: ")
print("Map output: ", rdd_map.take(1))
print("FLatmap output: ", rdd_flatmap.take(1), "\n")
print("Filter output: ", rdd_filter.take(3))
print("Union output: ", rdd_union.take(1))

Map vs Flatmap O/P: 
Map output:  [['hello', 'world']]
FLatmap output:  ['hello'] 

Filter output:  [2, 4, 6]
Union output:  ['error1']


### FIRST

In [25]:
# first prints the first element of collection

print("Map vs Flatmap O/P: ")
print("Map output: ", rdd_map.first())
print("FLatmap output: ", rdd_flatmap.first(), "\n")
print("Filter output: ", rdd_filter.first())
print("Union output: ", rdd_union.first())

Map vs Flatmap O/P: 
Map output:  ['hello', 'world']
FLatmap output:  hello 

Filter output:  2
Union output:  error1


### COUNT

In [26]:
# count returns the count of elements in collection

print("Map vs Flatmap O/P: ")
print("Map output: ", rdd_map.count())
print("FLatmap output: ", rdd_flatmap.count(), "\n")
print("Filter output: ", rdd_filter.count())
print("Union output: ", rdd_union.count())

Map vs Flatmap O/P: 
Map output:  2
FLatmap output:  5 

Filter output:  4
Union output:  4


### REDUCE

In [27]:
# aggregates the elements from regular RDD

rdd = sc.parallelize([2,3,7,8])
# passing a function as parameter is compulsory
rdd.reduce(lambda x,y: x+y)

20

### SAVE AS TEXT FILE

In [28]:
# by default saveAsTextFile saves into text file in partitions it was created with

# rdd_flatmap.saveAsTextFile("Basic_rdd_operations.txt")

# if used coalesce you can give number of partitions you want for the text file
# coalesce is TRANSFORMATION

# rdd_flatmap.coalesce(1).saveAsTextFile("Basic_rdd_operations_using_coalesce.txt")

## PAIRED RDD - Key, Value pair

In [29]:
# creating key value pair using tuple in list

paired_rdd_tuple = sc.parallelize( [('Sahil', 42), ('Omkar', 34), ('Swarada', 34)] )
paired_rdd_tuple.collect()

[('Sahil', 42), ('Omkar', 34), ('Swarada', 34)]

In [30]:
# another way of creating paired RDD
# extract tuple from list using map

lst = ['Sahil 42', 'Omkar 34', 'Swarada 34']
rdd_list = sc.parallelize(lst)
# using map to split string in tuple 0th index- key, 1st index- value 
paired_rdd_list = rdd_list.map(lambda x : (x.split(' ')[0], x.split(' ')[1]) )
paired_rdd_list.collect()

[('Sahil', '42'), ('Omkar', '34'), ('Swarada', '34')]

### TRANSFORMATIONS ON PAIRED RDD
All basic RDD transformations along with additional below

In [31]:
pairedRDD = sc.parallelize( [ ('A',1), ('B',2), ('C',1), ('A',9), ('A', 11), ('C', 7) ] )

### REDUCE BY KEY

In [32]:
# reduceByKey reduces the K,V based on keys and perform operation for values that match a key
# the below lambda function sums up the values for same keys

pairedRDD_reduceByKey = pairedRDD.reduceByKey(lambda x,y : x + y)
pairedRDD_reduceByKey.collect()

[('A', 21), ('B', 2), ('C', 8)]

### GROUP BY KEY

In [33]:
# groupByKey groups the values based on key

pairedRDD_groupByKey = pairedRDD.groupByKey().collect()
for k,v in pairedRDD_groupByKey:
    print( (k, list(v)) )

('A', [1, 9, 11])
('B', [2])
('C', [1, 7])


### SORT BY KEY

In [34]:
# sortByKey sorts the collection in ascending order of key by default

pairedRDD_sortByKey = pairedRDD_reduceByKey.sortByKey()
print("Ascending sort: ", pairedRDD_sortByKey.collect())

# sorting in descending order

pairedRDD_sortByKey_desc = pairedRDD_reduceByKey.sortByKey(ascending = False)
print("Descending sort:", pairedRDD_sortByKey_desc.collect())

Ascending sort:  [('A', 21), ('B', 2), ('C', 8)]
Descending sort: [('C', 8), ('B', 2), ('A', 21)]


### JOIN

In [35]:
# joins 2 RDD's on the basis of key

rdd1 = sc.parallelize( [('A',12), ('B',20), ('C',1)] )
rdd2 = sc.parallelize( [('B',1), ('A',99), ('C',4)] )
pairedRDD_join = rdd1.join(rdd2)
pairedRDD_join.collect()

[('A', (12, 99)), ('B', (20, 1)), ('C', (1, 4))]

### ACTIONS ON PAIRED RDD
All basic RDD actions along with additional below

### COUNT BY KEY

In [38]:
# countByKey returns count of elements for each key

pairedRDD.countByKey()

defaultdict(int, {'A': 3, 'B': 1, 'C': 2})

### COLLECT AS MAP

In [40]:
# collectAsMap returns paired RDD as 'Dictionary'

rdd1.collectAsMap()

{'A': 12, 'B': 20, 'C': 1}