### GitHub link for PySpark examples: https://github.com/spark-examples/pyspark-examples

In [1]:
!pip install findspark



In [3]:
import findspark
findspark.init()

### RDD-Operations

A PySpark RDD (Resilient Distributed Dataset) is the fundamental data structure of PySpark: a fault-tolerant, immutable, distributed collection of objects. Immutable means that once you create an RDD you cannot change it; transformations return a new RDD instead. Each RDD is divided into logical partitions, which can be computed on different nodes of the cluster.

### SparkSession Creation

In [4]:
# Import SparkSession
from pyspark.sql import SparkSession

# Create SparkSession 
spark = SparkSession.builder.appName("Rddapp").getOrCreate()

print("Spark Session Created")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/22 05:44:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Session Created


### Spark Context

A SparkSession internally creates a SparkContext, available as its sparkContext attribute. You can create multiple SparkSession objects, but only one SparkContext per JVM. If you want to create a new SparkContext, you must first stop the existing one (using stop()).


- The Spark context is the main entry point to Spark; it exposes all the basic functions
- The Spark driver contains the backend scheduler, DAG scheduler, task scheduler, and block manager, which are responsible for translating user-written code into jobs that are actually executed on the cluster

In [5]:
spark

#### As you can see, the SparkContext has already been created internally

### RDD Creation

An RDD can be created in two main ways:

- parallelize() ---> creates an RDD from a Python list
- textFile() ---> creates an RDD from text or CSV files, i.e., from external sources

Some other ways to create an RDD are:

- wholeTextFiles() ---> returns a PairRDD with the key being the file path and the value being the file content.
- emptyRDD() ---> creates an RDD with no data and no partitions.

### RDD operations

- Transformations
- Actions

In [6]:
rdd = spark.sparkContext.textFile('test.txt')

In [7]:
num_partitions = rdd.getNumPartitions()  # avoid shadowing the built-in len()
print("Number of partitions:" + str(num_partitions))

Number of partitions:2


In [8]:
rdd.collect()

['Project Gutenberg’s',
 'Alice’s Adventures in Wonderland',
 'by Lewis Carroll',
 'This eBook is for the use',
 'of anyone anywhere',
 'at no cost and with',
 'Alice’s Adventures in Wonderland',
 'by Lewis Carroll',
 'This eBook is for the use',
 'of anyone anywhere',
 'at no cost and with',
 'This eBook is for the use',
 'of anyone anywhere',
 'at no cost and with',
 'Project Gutenberg’s',
 'Alice’s Adventures in Wonderland',
 'by Lewis Carroll',
 'This eBook is for the use',
 'of anyone anywhere',
 'at no cost and with',
 'Alice’s Adventures in Wonderland',
 'by Lewis Carroll',
 'This eBook is for the use',
 'of anyone anywhere',
 'at no cost and with',
 'This eBook is for the use',
 'of anyone anywhere',
 'at no cost and with',
 'Project Gutenberg’s',
 'Alice’s Adventures in Wonderland',
 'by Lewis Carroll',
 'This eBook is for the use',
 'of anyone anywhere',
 'at no cost and with',
 'Alice’s Adventures in Wonderland',
 'by Lewis Carroll',
 'This eBook is for the use',
 'of anyone anywhere',
 ...]

### RDD Transformations


#### FlatMap()

- The flatMap() transformation applies a function to each record and flattens the results into a new RDD. In the example below, it first splits each line on spaces and then flattens the resulting lists, so each record of the resulting RDD is a single word.

In [9]:
rdd1 = rdd.flatMap(lambda x: x.split(" "))

In [10]:
rdd1.collect()

['Project',
 'Gutenberg’s',
 'Alice’s',
 'Adventures',
 'in',
 'Wonderland',
 'by',
 'Lewis',
 'Carroll',
 'This',
 'eBook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'at',
 'no',
 'cost',
 'and',
 'with',
 'Alice’s',
 'Adventures',
 'in',
 'Wonderland',
 'by',
 'Lewis',
 'Carroll',
 'This',
 'eBook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'at',
 'no',
 'cost',
 'and',
 'with',
 'This',
 'eBook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'at',
 'no',
 'cost',
 'and',
 'with',
 'Project',
 'Gutenberg’s',
 'Alice’s',
 'Adventures',
 'in',
 'Wonderland',
 'by',
 'Lewis',
 'Carroll',
 'This',
 'eBook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'at',
 'no',
 'cost',
 'and',
 'with',
 'Alice’s',
 'Adventures',
 'in',
 'Wonderland',
 'by',
 'Lewis',
 'Carroll',
 'This',
 'eBook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'at',
 'no',
 'cost',
 'and',
 'with',
 'This',
 'eBook',
 'is',
 ...]

#### Map()

The RDD map() transformation applies a function to every record, for example to add a field, update a field, or otherwise transform the data. The output of a map() transformation always has the same number of records as the input.

- Note 1: A DataFrame doesn’t have a map() transformation, so you need to convert the DataFrame to an RDD first (via df.rdd).
- Note 2: If you have a heavy initialization step, use the PySpark mapPartitions() transformation instead of map(); with mapPartitions() the heavy initialization executes only once per partition instead of once per record.

In [11]:
# Pair every line with the value 1
rdd2 = rdd.map(lambda x: (x, 1))

In [14]:
# foreach() is an action: it calls the function on every element (output order may vary)
rdd2.foreach(lambda x: print(x))

('by Lewis Carroll', 1)
('This eBook is for the use', 1)
('of anyone anywhere', 1)
('at no cost and with', 1)
('This eBook is for the use', 1)
('of anyone anywhere', 1)
('at no cost and with', 1)
('Project Gutenberg’s', 1)
('Alice’s Adventures in Wonderland', 1)
('by Lewis Carroll', 1)
('This eBook is for the use', 1)
('of anyone anywhere', 1)
('at no cost and with', 1)
('Alice’s Adventures in Wonderland', 1)
('by Lewis Carroll', 1)
('This eBook is for the use', 1)
('of anyone anywhere', 1)
('at no cost and with', 1)
('This eBook is for the use', 1)
('of anyone anywhere', 1)
('at no cost and with', 1)
('Project Gutenberg’s', 1)
('Alice’s Adventures in Wonderland', 1)
('by Lewis Carroll', 1)
('This eBook is for the use', 1)
('of anyone anywhere', 1)
('at no cost and with', 1)
('Alice’s Adventures in Wonderland', 1)
('by Lewis Carroll', 1)
('This eBook is for the use', 1)
('of anyone anywhere', 1)
('at no cost and with', 1)
('This eBook is for the use', 1)
('of anyone anywhere', 1)
('at no cost and with', 1)
...

#### Filter()

The filter() transformation keeps only the records of an RDD that satisfy a condition. In our example we keep all lines that start with “a” or “A”.

- NOTE: you can also convert the RDD to a DataFrame and use its filter operation, which is often more convenient

##### LINK:- https://sparkbyexamples.com/pyspark/pyspark-where-filter/

In [31]:
# List of starting values you want to filter with
starting_values = ["a", "A"]

# Custom function to check if an element starts with any of the starting values
def starts_with_any(value):
    return any(value.startswith(prefix) for prefix in starting_values)

filtered_rdd = rdd.filter(starts_with_any)

In [35]:
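# top(5) returns the 5 largest elements in descending order, not the first 5 (use take(5) for that)
filtered_rdd_top5 = filtered_rdd.top(5)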

print(filtered_rdd_top5)

['at no cost and with', 'at no cost and with', 'at no cost and with', 'at no cost and with', 'at no cost and with']


### RDD Actions