# <font color=red> PySpark Introduction</font>

#### What is PySpark?

- PySpark is the Python API for Apache Spark. It lets you write Spark applications in Python and run them in parallel across a distributed cluster (multiple nodes).

- Apache Spark is an analytical processing engine for large-scale distributed data processing and machine learning applications.

# <font color=red>PySpark Features </font>

#### What are PySpark's features?

- In-memory computation
- Distributed processing using parallelize
- Can be used with many cluster managers (Spark standalone, YARN, Mesos, etc.)
- Fault-tolerant
- Immutable
- Lazy evaluation
- Cache & persistence
- Built-in optimization when using DataFrames
- Supports ANSI SQL

# <font color=red>PySpark Architecture </font>

<img src = "../database/01.png">

# <font color=red> Cluster Managers </font>

#### Which cluster managers can PySpark run on?

- Standalone Cluster: <font color=blue> Spark's own built-in cluster manager; it has to be set up manually.</font>
- Apache Mesos: <font color=blue> Mesos is a cluster manager that can also run Hadoop MapReduce and PySpark applications.</font>
- Hadoop YARN: <font color=blue> The most commonly used cluster manager; it is integrated into the Hadoop ecosystem.</font>
- Kubernetes: <font color=blue> An open-source system for automating deployment, scaling, and management of containerized applications.</font>

# <font color=red> PySpark Modules </font>

- pyspark.RDD
- pyspark.sql
- pyspark.streaming
- pyspark.mllib
- pyspark.ml
- GraphFrames (provided by the separate `graphframes` package, not bundled with PySpark)
- pyspark.resource

# <font color=red> PySpark Installation </font>

- Follow the below link to install PySpark




# <font color=red>1. Spark Session</font>

#### A Spark application can be deployed in two modes
  - Client mode
  - Cluster mode

#### SparkSession includes the APIs that were previously spread across separate contexts:

- SparkContext
- SQLContext
- StreamingContext
- HiveContext

<font color=red> Note:</font>
- Before Spark 2.0, SparkContext was the entry point to a Spark application.
- Spark 2.0 introduced SparkSession as the unified entry point.
- The contexts listed above are integrated into SparkSession.
- Creating a SparkContext explicitly is not recommended, because the SparkSession already provides it (as `spark.sparkContext`).
- Always stop the SparkContext when you are finished with it, because only one SparkContext can be active per JVM.


In [1]:
# Client mode
# Create a SparkSession from the builder
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]") \
                    .appName('My Application') \
                    .getOrCreate()


In [2]:
# Create a new SparkSession with the newSession() method.
# newSession() is an instance method, so call it on an existing session.
spark2 = spark.newSession()
print(spark2)


<pyspark.sql.session.SparkSession object at 0x...>


In [3]:
# Get the existing SparkSession (getOrCreate must be called, not just referenced)
spark3 = SparkSession.builder.getOrCreate()
print(spark3)


<pyspark.sql.session.SparkSession object at 0x...>


# <font color=red> Core Spark Session </font>

### <font color=red >pyspark.sql.SparkSession </font>

- The entry point to programming Spark with the Dataset and DataFrame API.
- A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over 
  tables, cache tables, and read Parquet files. To create a SparkSession, use the following builder pattern.

<font color=red>SparkSession(sparkContext[, jsparkSession, …])</font>

- spark = (
    SparkSession.builder
        .master("local")
        .appName("Word Count")
        .config("spark.some.config.option", "some-value")
        .getOrCreate()
    )

- spark = (
    SparkSession.builder
        .remote("sc://localhost")
        .config("spark.some.config.option", "some-value")
        .getOrCreate()
    )

<img src= "../database/s1.png">

<img src = "../database/s2.png">

### <font color=red> pyspark.sql.SparkSession.builder.appName(name) </font>

Sets a name for the application, which will be shown in the Spark web UI.

- syntax: SparkSession.builder.appName("My app")

### <font color=red>pyspark.sql.SparkSession.builder.config</font>

Sets a config option. Options set using this method are automatically propagated to both SparkConf and SparkSession’s own configuration.

- from pyspark.conf import SparkConf
- syntax: SparkSession.builder.config(conf=SparkConf())
- syntax: SparkSession.builder.config("spark.some.config.option", "some-value")
- syntax: SparkSession.builder.config(map={"spark.some.config.number": 123, "spark.some.config.float": 0.123})


### <font color=red> pyspark.sql.SparkSession.builder.enableHiveSupport </font>

Enables Hive support, including connectivity to a persistent Hive metastore, support for Hive SerDes, and Hive user-defined functions.

- syntax: SparkSession.builder.enableHiveSupport()


### <font color=red> pyspark.sql.SparkSession.builder.getOrCreate </font>

- Gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder.

This method first checks whether there is a valid global default SparkSession, and if yes, return that one. If no valid global default SparkSession exists, the method creates a new SparkSession and assigns the newly created SparkSession as the global default.

- syntax: s1 = SparkSession.builder.config("k1", "v1").getOrCreate()

The configuration of the SparkSession can be changed afterwards

- s1.conf.set("k1", "v1_new")
- s1.conf.get("k1") == "v1_new"

In case an existing SparkSession is returned, the config options specified in this builder will be applied to the existing SparkSession.
- s2 = SparkSession.builder.config("k2", "v2").getOrCreate()
- s1.conf.get("k1") == s2.conf.get("k1") == "v1_new"
- s1.conf.get("k2") == s2.conf.get("k2") == "v2"

### <font color=red>pyspark.sql.SparkSession.builder.master</font>
 Sets the Spark master URL to connect to, such as “local” to run locally, “local[4]” to run locally with 4 cores, or “spark://master:7077” to run on a Spark standalone cluster.
 
 - syntax: SparkSession.builder.master("local")

# <font color=red>2. Spark Context</font>

In PySpark, initializing a SparkContext can be done in a few different ways. 

<img src="../database/pys.png"> </img>

<font color=red>1. Using SparkSession (Recommended):</font>

- Starting from Spark 2.0, the recommended way to initialize Spark is by using the SparkSession,
  which encapsulates both the SparkContext and SQLContext.

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .getOrCreate()

# Use spark to create RDDs, DataFrames, etc.

# Stop the SparkSession when done
spark.stop()

<font color=red>2. Creating SparkContext Directly (Legacy):</font>

- If you're working with an older version of Spark or have specific requirements, 
  you can create a SparkContext directly.

In [5]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("My Application").setMaster("local[*]")
sc = SparkContext(conf=conf)

# Use sc to create RDDs and perform operations

# Stop the SparkContext when done
sc.stop()


<font color=red>3. Creating SparkContext with Additional Configuration:</font>

- If you need to set additional configuration options for your SparkContext, you can do so by passing them in the SparkConf

In [6]:
from pyspark import SparkContext, SparkConf

conf = SparkConf() \
    .setAppName("My Application") \
    .setMaster("local[*]") \
    .set("spark.some.config.option", "config-value")

sc = SparkContext(conf=conf)

# Use sc to create RDDs and perform operations

# Stop the SparkContext when done
sc.stop()


<font color=red>Note</font>

- Only one SparkContext can be active per JVM. To create another, first stop the existing one with the stop() method.

# <font color=red> SparkContext Commonly Used Methods </font>

- accumulator(value[, accum_param]) – Creates a PySpark accumulator variable with the specified initial value. Tasks can only add to an accumulator; only the driver can read its value.

- broadcast(value) – Creates a read-only PySpark broadcast variable. The value is sent to every node in the cluster once instead of being shipped with each task.

- emptyRDD() – Creates an empty RDD.

- getOrCreate() – Returns the active SparkContext, or creates one if none exists.

- hadoopFile() – Returns an RDD of a Hadoop file.

- newAPIHadoopFile() – Creates an RDD for a Hadoop file with a new API InputFormat.

- sequenceFile() – Gets an RDD for a Hadoop SequenceFile with the given key and value types.

- setLogLevel() – Changes the log level, for example to DEBUG, INFO, WARN, ERROR, or FATAL.

- textFile() – Reads a text file from HDFS, the local file system, or any Hadoop-supported file system and returns an RDD of its lines.

- union() – Builds the union of a list of RDDs.

- wholeTextFiles() – Reads a directory of text files from HDFS, the local file system, or any Hadoop-supported file system and returns an RDD of pairs. The first element of each pair is the file name and the second element is the content of that file.

# <font color=red> 3.RDD (Resilient Distributed Dataset) </font>

# <font color=red>What is an RDD?</font>

- An RDD (Resilient Distributed Dataset) is the fundamental building block of PySpark: a fault-tolerant, immutable, distributed collection of objects. Immutable means that once you create an RDD you cannot change it. Each RDD is divided into logical partitions, which can be computed on different nodes of the cluster.

- In other words, an RDD is a collection of objects similar to a list in Python. The difference is that an RDD is computed by several processes scattered across multiple physical servers (the nodes of a cluster), while a Python collection lives and is processed in a single process.

<img src = "../database/rdd1.png">

# <font color=red>RDD Benefits</font>

- In-Memory Processing
<font color=blue> PySpark loads data from disk, processes it in memory, and keeps it in memory. This is the main difference between PySpark and MapReduce, which is I/O intensive. Between transformations, we can also cache/persist an RDD in memory to reuse previous computations.</font>

- Immutability
<font color=blue>PySpark RDDs are immutable: once an RDD is created, it cannot be modified. When we apply a transformation to an RDD, PySpark creates a new RDD and maintains the RDD lineage.</font>

- Fault Tolerance
<font color=blue>PySpark operates on fault-tolerant data stores such as HDFS and S3, and it records the lineage of every RDD. If an RDD operation fails or a partition is lost, PySpark recomputes the lost data from that lineage. In addition, when a PySpark application runs on a cluster, failed tasks are retried automatically a configurable number of times so that the application can finish.</font>

- Lazy Evaluation
<font color=blue>PySpark does not evaluate RDD transformations as the driver encounters them. Instead, it records them in a DAG and evaluates them all when it reaches the first RDD action.
</font>

- Partitioning
<font color=blue> When you create an RDD from data, PySpark partitions its elements automatically. By default, the number of partitions equals the number of available cores.</font>

#### <font color=red> What types of data structures can an RDD hold? </font>

- RDD (Resilient Distributed Dataset) in Spark is designed to store and manage any type of data. It is a fundamental abstraction that can hold a wide range of data structures, including:

1. **Basic Types**: RDDs can store simple types such as integers, floats, strings, and booleans.

2. **Tuples**: RDDs can store tuples, which are ordered collections of elements. Tuples can contain a mix of different data types.

3. **Lists and Arrays**: RDDs can store lists or arrays of elements. Each element in the RDD can be a list or array of arbitrary length.

4. **Dictionaries/Maps**: RDDs can store dictionaries or maps, which are key-value pairs. Each element in the RDD can be a dictionary with key-value pairs.

5. **Custom Objects**: RDDs can store instances of custom classes or objects that you define. However, the objects need to be serializable since RDDs are distributed across the cluster.

6. **Structured Data**: RDDs can store structured data, such as rows or records with named fields.

7. **Nested Structures**: RDDs can store complex nested structures, where elements can be combinations of lists, tuples, dictionaries, or custom objects.

Keep in mind that while RDDs can store a variety of data structures, the actual data type and structure of the RDD is determined by how you create and transform it. The operations you perform on the RDD, such as `map`, `filter`, and `reduce`, influence the structure and type of the resulting RDD.

It's also important to note that while RDDs offer a lot of flexibility in terms of data types, they don't provide built-in optimizations for complex data types as compared to higher-level abstractions like DataFrames or Datasets in Spark. These higher-level abstractions provide schema information and optimizations that make working with structured and semi-structured data more efficient.

# <font color=red>Creating RDDs</font>

- RDDs are created primarily in two different ways:
    - parallelizing a collection
      (creating an RDD from an existing in-memory collection)
    - referencing a dataset in an external storage system (HDFS, S3 and many more).

<img src = "../database/rdd2.jpg" width = 700 height=300>

## <font color=red>First Method: Parallelized Collections</font>

#### <font color=red>Create RDD using sparkContext.parallelize()</font>

In [7]:
# Firstly, you need to create a spark session which contains sparkContext

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("My Application").getOrCreate()    

# This is the basic method of creating an RDD.
# Use it when you already have data in memory, loaded from a file or a database.
# Initialising an RDD using parallelize()

data = [1,2,3,4,5,6,7,8]

rdd = spark.sparkContext.parallelize(data)

print(rdd.collect())

### Creating an empty RDD

rdd1 = spark.sparkContext.emptyRDD()  # emptyRDD is a method and must be called

print(rdd1.isEmpty())

# Create an empty RDD with partitions
rdd2 = spark.sparkContext.parallelize([],10) # This creates 10 partitions

print(rdd2.collect())

spark.stop()


[1, 2, 3, 4, 5, 6, 7, 8]
True
[]


## <font color=red>Second Method: External Datasets</font>

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd

# Create a Spark session
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .getOrCreate()

# Load the CSV with pandas on the driver, then distribute the selected rows as an RDD
data = pd.read_csv("../input/car_data.csv")

print(data.columns)

df = data[["Selling_Price","Car_Name"]]

df1 = df.values.tolist()

rdd_df = spark.sparkContext.parallelize(df1)

print(rdd_df.collect())

spark.stop()


Index(['Car_Name', 'Year', 'Selling_Price', 'Present_Price', 'Kms_Driven',
       'Fuel_Type', 'Seller_Type', 'Transmission', 'Owner'],
      dtype='object')
[[3.35, 'ritz'], [4.75, 'sx4'], [7.25, 'ciaz'], [2.85, 'wagon r'], [4.6, 'swift'], [9.25, 'vitara brezza'], [6.75, 'ciaz'], [6.5, 's cross'], [8.75, 'ciaz'], [7.45, 'ciaz'], [2.85, 'alto 800'], [6.85, 'ciaz'], [7.5, 'ciaz'], [6.1, 'ertiga'], [2.25, 'dzire'], [7.75, 'ertiga'], [7.25, 'ertiga'], [7.75, 'ertiga'], [3.25, 'wagon r'], [2.65, 'sx4'], [2.85, 'alto k10'], [4.9, 'ignis'], [4.4, 'sx4'], [2.5, 'alto k10'], [2.9, 'wagon r'], [3.0, 'swift'], [4.15, 'swift'], [6.0, 'swift'], [1.95, 'alto k10'], [7.45, 'ciaz'], [3.1, 'ritz'], [2.35, 'ritz'], [4.95, 'swift'], [6.0, 'ertiga'], [5.5, 'dzire'], [2.95, 'sx4'], [4.65, 'dzire'], [0.35, '800'], [3.0, 'alto k10'], [2.25, 'sx4'], [5.85, 'baleno'], [2.55, 'alto k10'], [1.95, 'sx4'], [5.5, 'dzire'], [1.25, 'omni'], [7.5, 'ciaz'], [2.65, 'ritz'], [1.05, 'wagon r'], [5.8, 'ertiga'], [7.75, 'c

In [9]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd

# Create a Spark session
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .getOrCreate()

# data = pd.read_csv("../input/car_data.csv")

data = spark.read.csv("../input/car_data.csv", header=True, inferSchema=True)

data.show()

spark.stop()


+-------------+----+-------------+-------------+----------+---------+-----------+------------+-----+
|     Car_Name|Year|Selling_Price|Present_Price|Kms_Driven|Fuel_Type|Seller_Type|Transmission|Owner|
+-------------+----+-------------+-------------+----------+---------+-----------+------------+-----+
|         ritz|2014|         3.35|         5.59|     27000|   Petrol|     Dealer|      Manual|    0|
|          sx4|2013|         4.75|         9.54|     43000|   Diesel|     Dealer|      Manual|    0|
|         ciaz|2017|         7.25|         9.85|      6900|   Petrol|     Dealer|      Manual|    0|
|      wagon r|2011|         2.85|         4.15|      5200|   Petrol|     Dealer|      Manual|    0|
|        swift|2014|          4.6|         6.87|     42450|   Diesel|     Dealer|      Manual|    0|
|vitara brezza|2018|         9.25|         9.83|      2071|   Diesel|     Dealer|      Manual|    0|
|         ciaz|2015|         6.75|         8.12|     18796|   Petrol|     Dealer|      Manu

### <font color=red> Conversions of data columns from one structure to another </font >

In [11]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .getOrCreate()

data = pd.read_csv("../input/car_data.csv")

df = data[["Car_Name", "Selling_Price"]]

my_list = df.values.tolist()

# print(my_list[0:5])

rdd = spark.sparkContext.parallelize(my_list)

print("list elements:", rdd.collect())

rdd_df = rdd.collect()

# Converting to a dictionary (for a repeated car name, only the last price is kept)
data_dict = dict(rdd_df)

print("dictionaries:", data_dict)

spark.stop()


list elements: [['ritz', 3.35], ['sx4', 4.75], ['ciaz', 7.25], ['wagon r', 2.85], ['swift', 4.6], ['vitara brezza', 9.25], ['ciaz', 6.75], ['s cross', 6.5], ['ciaz', 8.75], ['ciaz', 7.45], ['alto 800', 2.85], ['ciaz', 6.85], ['ciaz', 7.5], ['ertiga', 6.1], ['dzire', 2.25], ['ertiga', 7.75], ['ertiga', 7.25], ['ertiga', 7.75], ['wagon r', 3.25], ['sx4', 2.65], ['alto k10', 2.85], ['ignis', 4.9], ['sx4', 4.4], ['alto k10', 2.5], ['wagon r', 2.9], ['swift', 3.0], ['swift', 4.15], ['swift', 6.0], ['alto k10', 1.95], ['ciaz', 7.45], ['ritz', 3.1], ['ritz', 2.35], ['swift', 4.95], ['ertiga', 6.0], ['dzire', 5.5], ['sx4', 2.95], ['dzire', 4.65], ['800', 0.35], ['alto k10', 3.0], ['sx4', 2.25], ['baleno', 5.85], ['alto k10', 2.55], ['sx4', 1.95], ['dzire', 5.5], ['omni', 1.25], ['ciaz', 7.5], ['ritz', 2.65], ['wagon r', 1.05], ['ertiga', 5.8], ['ciaz', 7.75], ['fortuner', 14.9], ['fortuner', 23.0], ['innova', 18.0], ['fortuner', 16.0], ['innova', 2.75], ['corolla altis', 3.6], ['etios cross', 

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .getOrCreate()

data = pd.read_csv("../input/car_data.csv")

df = data[["Car_Name", "Selling_Price"]]

my_list = df.values.tolist()

print(my_list[0:5])

rdd = spark.sparkContext.parallelize(my_list)

rdd_df = rdd.collect()

# set(df) iterates over the DataFrame's column labels, not its rows,
# so this produces the set of column names
rdd_set = set(df)

print(rdd_set)

spark.stop()

[['ritz', 3.35], ['sx4', 4.75], ['ciaz', 7.25], ['wagon r', 2.85], ['swift', 4.6]]
{'Selling_Price', 'Car_Name'}
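To build a set of the rows themselves rather than of the column names, each row first has to become hashable. A minimal pure-Python sketch follows; the three sample rows reuse values from the output above, with one duplicate added for illustration.

```python
# Rows in the shape returned by rdd.collect(): a list of [name, price] lists.
# The third row is an illustrative duplicate.
rows = [["ritz", 3.35], ["sx4", 4.75], ["ritz", 3.35]]

# Lists are not hashable, so convert each row to a tuple before building the set.
unique_rows = set(tuple(row) for row in rows)

# A set of a single column, here the car names.
unique_names = {name for name, _price in rows}

print(unique_rows)
print(unique_names)
```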


### <font color=red>Getting partitions used by resource </font>

- ###  <font color=red> getNumPartitions() </font>
 This is an RDD function that returns the number of partitions the dataset is split into.

<font color=blue> You can set the number of partitions manually through the second argument of parallelize(). </font>

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .getOrCreate()

data = pd.read_csv("../input/car_data.csv")

df = data["Selling_Price"]

start_time = time.time()

rdd_df = spark.sparkContext.parallelize(df)

print("initial partition count:"+str(rdd_df.getNumPartitions()))

print(rdd_df.collect())

end_time = time.time()

execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

spark.stop()



initial partition count:16
[3.35, 4.75, 7.25, 2.85, 4.6, 9.25, 6.75, 6.5, 8.75, 7.45, 2.85, 6.85, 7.5, 6.1, 2.25, 7.75, 7.25, 7.75, 3.25, 2.65, 2.85, 4.9, 4.4, 2.5, 2.9, 3.0, 4.15, 6.0, 1.95, 7.45, 3.1, 2.35, 4.95, 6.0, 5.5, 2.95, 4.65, 0.35, 3.0, 2.25, 5.85, 2.55, 1.95, 5.5, 1.25, 7.5, 2.65, 1.05, 5.8, 7.75, 14.9, 23.0, 18.0, 16.0, 2.75, 3.6, 4.5, 4.75, 4.1, 19.99, 6.95, 4.5, 18.75, 23.5, 33.0, 4.75, 19.75, 9.25, 4.35, 14.25, 3.95, 4.5, 7.45, 2.65, 4.9, 3.95, 5.5, 1.5, 5.25, 14.5, 14.73, 4.75, 23.0, 12.5, 3.49, 2.5, 35.0, 5.9, 3.45, 4.75, 3.8, 11.25, 3.51, 23.0, 4.0, 5.85, 20.75, 17.0, 7.05, 9.65, 1.75, 1.7, 1.65, 1.45, 1.35, 1.35, 1.35, 1.25, 1.2, 1.2, 1.2, 1.15, 1.15, 1.15, 1.15, 1.11, 1.1, 1.1, 1.1, 1.05, 1.05, 1.05, 1.05, 1.0, 0.95, 0.9, 0.9, 0.75, 0.8, 0.78, 0.75, 0.75, 0.75, 0.72, 0.65, 0.65, 0.65, 0.65, 0.6, 0.6, 0.6, 0.6, 0.6, 0.6, 0.6, 0.6, 0.55, 0.55, 0.52, 0.51, 0.5, 0.5, 0.5, 0.5, 0.5, 0.48, 0.48, 0.48, 0.48, 0.45, 0.45, 0.45, 0.45, 0.45, 0.45, 0.45, 0.45, 0.42, 0.42, 0.4,

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .getOrCreate()

data = pd.read_csv("../input/car_data.csv")

df = data["Selling_Price"]

start_time = time.time()

rdd_gnp = spark.sparkContext.parallelize(df,20)

print("partition count is:"+str(rdd_gnp.getNumPartitions()))

print(rdd_gnp.collect())

end_time = time.time()

execution_time = end_time - start_time

print(f"Execution time: {execution_time} seconds")

spark.stop()

partition count is:20
[3.35, 4.75, 7.25, 2.85, 4.6, 9.25, 6.75, 6.5, 8.75, 7.45, 2.85, 6.85, 7.5, 6.1, 2.25, 7.75, 7.25, 7.75, 3.25, 2.65, 2.85, 4.9, 4.4, 2.5, 2.9, 3.0, 4.15, 6.0, 1.95, 7.45, 3.1, 2.35, 4.95, 6.0, 5.5, 2.95, 4.65, 0.35, 3.0, 2.25, 5.85, 2.55, 1.95, 5.5, 1.25, 7.5, 2.65, 1.05, 5.8, 7.75, 14.9, 23.0, 18.0, 16.0, 2.75, 3.6, 4.5, 4.75, 4.1, 19.99, 6.95, 4.5, 18.75, 23.5, 33.0, 4.75, 19.75, 9.25, 4.35, 14.25, 3.95, 4.5, 7.45, 2.65, 4.9, 3.95, 5.5, 1.5, 5.25, 14.5, 14.73, 4.75, 23.0, 12.5, 3.49, 2.5, 35.0, 5.9, 3.45, 4.75, 3.8, 11.25, 3.51, 23.0, 4.0, 5.85, 20.75, 17.0, 7.05, 9.65, 1.75, 1.7, 1.65, 1.45, 1.35, 1.35, 1.35, 1.25, 1.2, 1.2, 1.2, 1.15, 1.15, 1.15, 1.15, 1.11, 1.1, 1.1, 1.1, 1.05, 1.05, 1.05, 1.05, 1.0, 0.95, 0.9, 0.9, 0.75, 0.8, 0.78, 0.75, 0.75, 0.75, 0.72, 0.65, 0.65, 0.65, 0.65, 0.6, 0.6, 0.6, 0.6, 0.6, 0.6, 0.6, 0.6, 0.55, 0.55, 0.52, 0.51, 0.5, 0.5, 0.5, 0.5, 0.5, 0.48, 0.48, 0.48, 0.48, 0.45, 0.45, 0.45, 0.45, 0.45, 0.45, 0.45, 0.45, 0.42, 0.42, 0.4, 0.4,

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .getOrCreate()

data = pd.read_csv("../input/car_data.csv")

df = data["Selling_Price"]

start_time = time.time()

rdd_gnp = spark.sparkContext.parallelize(df,5)

print("partition count is:"+str(rdd_gnp.getNumPartitions()))

print(rdd_gnp.collect())

end_time = time.time()

execution_time = end_time - start_time

print(f"Execution time: {execution_time} seconds")

spark.stop()

partition count is:5
[3.35, 4.75, 7.25, 2.85, 4.6, 9.25, 6.75, 6.5, 8.75, 7.45, 2.85, 6.85, 7.5, 6.1, 2.25, 7.75, 7.25, 7.75, 3.25, 2.65, 2.85, 4.9, 4.4, 2.5, 2.9, 3.0, 4.15, 6.0, 1.95, 7.45, 3.1, 2.35, 4.95, 6.0, 5.5, 2.95, 4.65, 0.35, 3.0, 2.25, 5.85, 2.55, 1.95, 5.5, 1.25, 7.5, 2.65, 1.05, 5.8, 7.75, 14.9, 23.0, 18.0, 16.0, 2.75, 3.6, 4.5, 4.75, 4.1, 19.99, 6.95, 4.5, 18.75, 23.5, 33.0, 4.75, 19.75, 9.25, 4.35, 14.25, 3.95, 4.5, 7.45, 2.65, 4.9, 3.95, 5.5, 1.5, 5.25, 14.5, 14.73, 4.75, 23.0, 12.5, 3.49, 2.5, 35.0, 5.9, 3.45, 4.75, 3.8, 11.25, 3.51, 23.0, 4.0, 5.85, 20.75, 17.0, 7.05, 9.65, 1.75, 1.7, 1.65, 1.45, 1.35, 1.35, 1.35, 1.25, 1.2, 1.2, 1.2, 1.15, 1.15, 1.15, 1.15, 1.11, 1.1, 1.1, 1.1, 1.05, 1.05, 1.05, 1.05, 1.0, 0.95, 0.9, 0.9, 0.75, 0.8, 0.78, 0.75, 0.75, 0.75, 0.72, 0.65, 0.65, 0.65, 0.65, 0.6, 0.6, 0.6, 0.6, 0.6, 0.6, 0.6, 0.6, 0.55, 0.55, 0.52, 0.51, 0.5, 0.5, 0.5, 0.5, 0.5, 0.48, 0.48, 0.48, 0.48, 0.45, 0.45, 0.45, 0.45, 0.45, 0.45, 0.45, 0.45, 0.42, 0.42, 0.4, 0.4, 

# <font color=red> RDD repartition() vs coalesce()</font>

In [16]:

from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .getOrCreate()
 
# Without a partition count, parallelize() uses the default parallelism (here, the local cores)
rdd = spark.sparkContext.parallelize(range(0,1000))
print("From local cores : "+str(rdd.getNumPartitions()))

# Use parallelize with 6 partitions
rdd1 = spark.sparkContext.parallelize(range(0,25), 6)
print("parallelize : "+str(rdd1.getNumPartitions()))

spark.stop()


From local cores : 16
parallelize : 6


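For example, `parallelize(range(0, 20), 6)` distributes its 20 elements across the 6 partitions as follows:

- Partition 1 : 0 1 2
- Partition 2 : 3 4 5
- Partition 3 : 6 7 8 9
- Partition 4 : 10 11 12
- Partition 5 : 13 14 15
- Partition 6 : 16 17 18 19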

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .getOrCreate()
 
data = pd.read_csv("../input/car_data.csv")

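# Iterating a pandas DataFrame yields only its column labels,
# so convert the rows to a list before parallelizing them.
rddFromFile = spark.sparkContext.parallelize(data.values.tolist(), 30)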
print("Partitions : "+str(rddFromFile.getNumPartitions()))

spark.stop()

Partitions : 30


# <font color=red>RDD repartition()</font>

In [18]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .getOrCreate()
 
data = pd.read_csv("../input/car_data.csv")

# Parallelize the rows (not the column labels) into 10 partitions
rddFromFile = spark.sparkContext.parallelize(data.values.tolist(), 10)
print("Initial : "+str(rddFromFile.getNumPartitions()))

# repartition() can decrease or increase the partition count; it always shuffles
rdd_rePartition = rddFromFile.repartition(6)
print("repartition(6) : "+str(rdd_rePartition.getNumPartitions()))

rdd_rePartition1 = rddFromFile.repartition(20)
print("repartition(20) : "+str(rdd_rePartition1.getNumPartitions()))

spark.stop()

Initial : 10
repartition(6) : 6
repartition(20) : 20


# <font color=red>RDD coalesce()</font>

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .getOrCreate()
 
data = pd.read_csv("../input/car_data.csv")

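# Parallelize the rows (not the column labels) with the default partition count
rdd = spark.sparkContext.parallelize(data.values.tolist())

print("initial count:", rdd.getNumPartitions())

rddFromFile = spark.sparkContext.parallelize(data.values.tolist(), 10)
print("Before coalesce : "+str(rddFromFile.getNumPartitions()))

# coalesce() reduces the partition count without a full shuffle
rdd_coalesce = rddFromFile.coalesce(4)
print("coalesce(4) : "+str(rdd_coalesce.getNumPartitions()))

# Without a shuffle, coalesce() cannot increase the count, so it stays at 10
rdd_coalesce1 = rddFromFile.coalesce(11)
print("coalesce(11) : "+str(rdd_coalesce1.getNumPartitions()))

spark.stop()

initial count: 16
Before coalesce : 10
coalesce(4) : 4
coalesce(11) : 10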


In [20]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .getOrCreate()
 
data = spark.read.csv("../input/car_data.csv", header=True, inferSchema=True)

print("Initial Count:", data.rdd.getNumPartitions())

rdd = data.rdd.repartition(6) 

print("After Partitioning : "+str(rdd.getNumPartitions()))

data.show()

spark.stop()


Initial Count: 1
After Partitioning : 6
+-------------+----+-------------+-------------+----------+---------+-----------+------------+-----+
|     Car_Name|Year|Selling_Price|Present_Price|Kms_Driven|Fuel_Type|Seller_Type|Transmission|Owner|
+-------------+----+-------------+-------------+----------+---------+-----------+------------+-----+
|         ritz|2014|         3.35|         5.59|     27000|   Petrol|     Dealer|      Manual|    0|
|          sx4|2013|         4.75|         9.54|     43000|   Diesel|     Dealer|      Manual|    0|
|         ciaz|2017|         7.25|         9.85|      6900|   Petrol|     Dealer|      Manual|    0|
|      wagon r|2011|         2.85|         4.15|      5200|   Petrol|     Dealer|      Manual|    0|
|        swift|2014|          4.6|         6.87|     42450|   Diesel|     Dealer|      Manual|    0|
|vitara brezza|2018|         9.25|         9.83|      2071|   Diesel|     Dealer|      Manual|    0|
|         ciaz|2015|         6.75|         8.12|     

# <font color=red> RDD Operations </font>

- RDDs support two kinds of operations:

    - RDD transformations – lazy operations that return a new RDD instead of updating the existing one.
    - RDD actions – operations that trigger computation and return values to the driver.

Transformations on a PySpark RDD return another RDD and are lazy, meaning they don't execute until you call an action on the RDD. Some RDD transformations are flatMap(), map(), reduceByKey(), filter(), and sortByKey(); each returns a new RDD instead of updating the current one.

<font color=red> flatMap </font> 
- The flatMap() transformation applies a function to each element of the RDD and then flattens the results into a new RDD. In the example below, it splits each record on spaces and flattens the result, so the resulting RDD holds a single word per record.
- This is particularly useful when you want to transform each element into multiple elements and then merge those elements into a single RDD, essentially "flattening" the structure.

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .getOrCreate()
 
    
rdd = spark.sparkContext.textFile("../input/test.txt")

rdd2 = rdd.flatMap(lambda x: x.split(" "))

spark.stop()

<font color=red> map </font>
- The map() transformation applies a function to every element of an RDD, for example to add or update a field. The output of map() always has the same number of records as the input.

- Key points

- In the RDD API, both map() and flatMap() return a new RDD.
- Both transformations are narrow, meaning they do not cause a Spark data shuffle.
- flatMap() can repeat data: values shared by one input record are duplicated across the records it produces.
- One use case of flatMap() is flattening elements that contain arrays, lists, or other nested collections, so that each output record holds a single value.
- map() always returns the same number of records as its input, whereas flatMap() can return zero or more records for each input record (one-to-many).

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .getOrCreate()
 
    
rdd = spark.sparkContext.textFile("../input/test.txt")

rdd3 = rdd.map(lambda x: (x,1))

spark.stop()

<font color=red> reduceByKey </font>
- reduceByKey() merges the values for each key using the specified function. In our example, it sums the counts for each word, so the resulting RDD contains the unique words and their counts.

In [23]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import time

# Create a Spark session
spark = SparkSession.builder \
    .appName("My Application") \
    .master("local[*]") \
    .getOrCreate()
 
    
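rdd = spark.sparkContext.textFile("../input/test.txt")

# reduceByKey() needs (key, value) pairs, so build (word, 1) pairs first
pairs = rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1))

rdd4 = pairs.reduceByKey(lambda a,b: a+b)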

spark.stop()


<font color=red> sortByKey </font> 
- The sortByKey() transformation sorts the elements of a pair RDD by key. In our example, we first convert each (word, count) pair to (count, word) using a map transformation and then apply sortByKey(), which sorts on the integer count. Finally, collecting the RDD and printing each element shows every word and its count as a key-value pair.

In [26]:
rdd = spark.sparkContext.textFile("../input/test.txt")

<font color=red> filter </font> 
- The filter() transformation keeps only the records of an RDD that satisfy a condition. In our example, we keep all words that start with "a".