Install Dependencies:


1.   Java 8
2.   Apache Spark with Hadoop
3.   Findspark (used to locate Spark on the system)

> If you run into issues with your Spark version, download a newer release from [here](https://archive.apache.org/dist/spark/).

In [6]:
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://downloads.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3-scala2.13.tgz
# # If the download fails, check which Spark versions are available at the link above
# !tar xf spark-3.5.5-bin-hadoop3-scala2.13.tgz
# # Change the file name to match the version you downloaded
# !pip install -q findspark

In [1]:
import os
os.environ["JAVA_HOME"] = "/usr/local/opt/openjdk@8"
os.environ["SPARK_HOME"] = "/Users/saikeerthan/spark"  # Change both paths to match your own installation



# Initialize a SparkSession

Import `findspark` and call `findspark.init()`. This helps locate Spark on your system.

Next, create a SparkSession with the builder pattern:

`master("local[*]")` configures Spark to run locally using all available cores.

`appName("YourAppName")` sets the name of your Spark application, which can be helpful for identification.

`getOrCreate()` returns the existing SparkSession if there is one and creates a new one otherwise, so the application ends up with a single session.



In [12]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
#Create a SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("YourAppName") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/23 11:51:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# What is an RDD?
RDD stands for Resilient Distributed Dataset.
It is a fundamental data structure in Spark that represents an immutable, partitioned collection of elements that can be operated on in parallel.



*   Resilient: RDDs are fault-tolerant. If a partition is lost, Spark can recompute it from the source data using the recorded sequence of transformations (the lineage).
*   Distributed: RDDs are distributed across multiple nodes in a cluster, enabling parallel processing.
*   Dataset: RDDs store a collection of elements, which can be of any type.


## Parallelizing Data

Parallelizing data means distributing the data across multiple nodes in a cluster to perform operations concurrently.

This allows for faster processing of large datasets, as each node can work on a subset of the data simultaneously.


In [13]:
# Create RDD from parallelize
data = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd=spark.sparkContext.parallelize(data)

## What are partitions?
A partition in Spark is an atomic chunk of data (a logical division of the data) stored on a node in the cluster.  

Imagine you are given 100 kg of rice stored in a silo and must move it from point A to point B. Moving it all in one go on your own (a single-threaded resource) is not possible because you do not have the capacity (not enough CPU or RAM). Instead, you split it into 5 kg bags (partition it into smaller chunks) and get more people to carry them (add more workers to your cluster).
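
To make the idea concrete without a cluster, here is a minimal pure-Python sketch that cuts a list into contiguous chunks. The helper `split_into_partitions` is hypothetical and is not part of Spark; it only illustrates the concept, and Spark's own slicing may place the boundaries differently.

```python
def split_into_partitions(items, num_partitions):
    """Cut `items` into `num_partitions` contiguous chunks of near-equal size."""
    n = len(items)
    return [
        items[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]


data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
chunks = split_into_partitions(data, 4)

# Each chunk could now be handed to a different worker.
for index, chunk in enumerate(chunks):
    print(f"Partition {index}: {chunk}")
```

Every element ends up in exactly one chunk, so concatenating the chunks in order gives back the original list.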

In [14]:
print("initial partition count:"+str(rdd.getNumPartitions()))

initial partition count:8


In [15]:
# Specify the number of partitions explicitly (6)
rdd=spark.sparkContext.parallelize([1,2,3,4,56,7,8,9,12,3],6)
print("initial partition count:"+str(rdd.getNumPartitions()))

initial partition count:6


In [16]:
print("initial partition count:"+str(rdd.getNumPartitions()))

initial partition count:6


In [17]:
#Create empty RDD with partition
rdd2 = spark.sparkContext.parallelize([],11) #This creates 11 partitions

In [18]:
# Create an empty RDD with no partitions
rdd = spark.sparkContext.emptyRDD()
# (The typed form emptyRDD[String] is Scala syntax and does not apply in PySpark.)

In [19]:
# Recreate the RDD with 6 partitions before repartitioning it
rdd = spark.sparkContext.parallelize([1,2,3,4,56,7,8,9,12,3], 6)

In [20]:
reparRdd = rdd.repartition(4)
print("re-partition count:"+str(reparRdd.getNumPartitions()))

re-partition count:4


# PySpark Dataframe
In PySpark, a DataFrame is a distributed collection of data organized into named columns. It resembles a table in a relational database or a spreadsheet in which data is arranged in rows and columns. Each column can have a different data type, and DataFrames are highly optimized for parallel processing, making them suitable for handling large-scale datasets.

In [21]:
from pyspark.sql import Row

spark = SparkSession.builder \
    .appName("example") \
    .getOrCreate()

# Create a list of tuples
data = [("Alice", 34), ("Bob", 45), ("Charlie", 29)]

# Create a DataFrame from the list with column names
df = spark.createDataFrame(data, ["Name", "Age"])

# Show the DataFrame
df.show()

25/04/23 11:52:30 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 34|
|    Bob| 45|
|Charlie| 29|
+-------+---+



In [22]:
# Filter the DataFrame to select rows where Age is greater than 45
# (no row matches, because the highest Age in the data is exactly 45)
filtered_df = df.filter(df['Age'] > 45)
filtered_df.show()

+----+---+
|Name|Age|
+----+---+
+----+---+



In [23]:
df.createOrReplaceTempView("people")

# Run a SQL query
result = spark.sql("SELECT * FROM people WHERE Age = 34")
result.show()

+-----+---+
| Name|Age|
+-----+---+
|Alice| 34|
+-----+---+



In [24]:
data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]

columns = ["firstname","middlename","lastname","dob","gender","salary"]
df = spark.createDataFrame(data=data, schema = columns)

In [25]:
df.show()

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



# Exercise

With the data here: 4,6,68,23,7,7,3,1,8,9,20,1,5,7
How do you create an RDD with 5 partitions?

In [2]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("SimpleRDDExample") \
    .master("local[*]") \
    .getOrCreate()


data = [4, 6, 68, 23, 7, 7, 3, 1, 8, 9, 20, 1, 5, 7]
rdd = spark.sparkContext.parallelize(data, 5)  # Create an RDD with 5 partitions

print("Number of partitions:", rdd.getNumPartitions())

25/04/28 10:39:19 WARN Utils: Your hostname, MacBook-Air-8.local resolves to a loopback address: 127.0.0.1; using 172.27.159.167 instead (on interface en0)
25/04/28 10:39:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/28 10:39:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Number of partitions: 5


How do you print the data stored in each partition?

(Hint: look up the functions .glom() and .collect().)

In [None]:
def show_partition(index, iterator):
    yield f"Partition {index}: {list(iterator)}"


partition_data = rdd.mapPartitionsWithIndex(show_partition).collect()


for pdata in partition_data:
    print(pdata)


Partition 0: [4, 6]
Partition 1: [68, 23]
Partition 2: [7, 7, 3, 1]
Partition 3: [8, 9]
Partition 4: [20, 1, 5, 7]


                                                                                

Create a new SparkSession called "Datframe_partition".

In [4]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Datframe_partition") \
    .master("local[*]") \
    .getOrCreate()


25/04/28 10:42:36 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Create a dataframe to store the following information

*   id:1,2,3,4,5,6
*   Name: La,Bu,Bu,Little,John,Pop
*   Age: 23,45,33,44,11,1

Show that the DataFrame has been created properly.



In [6]:
data = [(1, "La", 23), (2, "Bu", 45), (3, "Bu", 33), (4, "Little", 44), (5, "John", 11), (6, "Pop", 1 )]

df2 = spark.createDataFrame(data, ["id", "name", "age"])

df2.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  1|    La| 23|
|  2|    Bu| 45|
|  3|    Bu| 33|
|  4|Little| 44|
|  5|  John| 11|
|  6|   Pop|  1|
+---+------+---+



Partition the data into an RDD with as many partitions as there are ids (6).
Then print the contents of each partition.

In [7]:
rdd2 = spark.sparkContext.parallelize(data, 6)  # Create an RDD with 6 partitions
print("Number of partitions:", rdd2.getNumPartitions())

# The contents of each partition are printed in the next cell.


Number of partitions: 6


In [8]:
def show_partition(index, iterator):
    yield f"Partition {index}: {list(iterator)}"

# Apply the function
partition_data = rdd2.mapPartitionsWithIndex(show_partition).collect()

# Print each partition's data
for pdata in partition_data:
    print(pdata)

Partition 0: [(1, 'La', 23)]
Partition 1: [(2, 'Bu', 45)]
Partition 2: [(3, 'Bu', 33)]
Partition 3: [(4, 'Little', 44)]
Partition 4: [(5, 'John', 11)]
Partition 5: [(6, 'Pop', 1)]


# Further into Data Partitioning

Data partitioning refers to the process of dividing a large dataset into smaller chunks or partitions, which can be processed concurrently. This is an important aspect of distributed computing, as it allows large datasets to be processed more efficiently by dividing the workload among multiple machines or processors.

Advantages of data partitioning:

* Improved performance: By dividing data into smaller partitions, it can be processed in parallel across multiple machines, leading to faster processing times.
* Scalability: Partitioning allows for horizontal scalability, meaning that as the amount of data grows, more machines can be added to the cluster to handle the increased load, without having to make changes to the data processing code.
* Improved fault tolerance: Partitioning also allows for data to be distributed across multiple machines, which can help to prevent data loss in the event of a single machine failure.
* Data organization: Partitioning allows for data to be organized in a more meaningful way, such as by time period or geographic location, which can make it easier to analyze and query the data.

Methods of data partitioning in PySpark:
1. Hash Partitioning
2. Range Partitioning
3. Using partitionBy

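So far we have relied on Spark's default behaviour: `parallelize` cuts the input list into contiguous slices, and `repartition` shuffles the elements into the requested number of partitions. Hash partitioning is the default for key-based operations and is covered in the last example below.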


# Range Partitioning

Range partitioning divides the data into partitions based on ranges of values in a specified column.

For example, we could partition a dataset based on a range of dates, with each partition containing records from a specific time period.

In the example below, we use the `repartitionByRange()` function to range-partition a DataFrame on its `age` column.

In [9]:
spark = SparkSession.builder.appName("range_partitioning").getOrCreate()

# Create a sample DataFrame
df = spark.createDataFrame([
    (1, "Du", 45),
    (2, "Bu", 33),
    (3, "Be", 25),
    (4, "Hangry", 26),
    (5, "Eve", 43),
    (6, "Boby", 30)
], ["id", "name", "age"])

# Perform range partitioning on the
# DataFrame based on the "age" column
df = df.repartitionByRange(3, "age")

# Print the elements in each partition
print(df.rdd.glom().collect())

25/04/28 10:50:33 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.

[[Row(id=3, name='Be', age=25), Row(id=4, name='Hangry', age=26)], [Row(id=2, name='Bu', age=33), Row(id=6, name='Boby', age=30)], [Row(id=1, name='Du', age=45), Row(id=5, name='Eve', age=43)]]


                                                                                

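From the output above, you can see that the DataFrame is split into three partitions (the three inner lists), as requested by the first argument of `repartitionByRange()`. Each partition covers a contiguous range of ages: 25 to 26, 30 to 33, and 43 to 45.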
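
The grouping in that output can be modelled in plain Python. The sketch below is only an illustration under stated assumptions: it computes the range boundaries from the fully sorted ages, whereas Spark estimates its boundaries by sampling the data, so real boundaries can differ on larger datasets. The variable names are hypothetical.

```python
from bisect import bisect_left

rows = [
    (1, "Du", 45),
    (2, "Bu", 33),
    (3, "Be", 25),
    (4, "Hangry", 26),
    (5, "Eve", 43),
    (6, "Boby", 30),
]
num_partitions = 3

# Pick upper bounds from the sorted ages so that each range
# holds roughly the same number of rows.
ages = sorted(age for _, _, age in rows)
bounds = [ages[len(ages) * i // num_partitions - 1] for i in range(1, num_partitions)]

# A row goes to the first range whose upper bound is >= its age;
# ages above every bound fall into the last partition.
partitions = [[] for _ in range(num_partitions)]
for row in rows:
    partitions[bisect_left(bounds, row[2])].append(row)

print("Upper bounds:", bounds)
for index, part in enumerate(partitions):
    print(f"Partition {index}: {part}")
```

With this data the bounds come out as 26 and 33, which gives the same three groups as the `repartitionByRange(3, "age")` call above.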

# Using partitionBy

In PySpark, `partitionBy()` is a method of the DataFrame writer (`df.write`). It takes one or more column names and controls how the output is laid out on disk: rows are grouped by the values in those columns, and each distinct value (or combination of values) is written to its own subdirectory, such as `country=USA`. It does not change how the DataFrame is partitioned in memory.

In [10]:

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("PartitionByExample").getOrCreate()

# Sample data
data = [
    ("USA", 2022, "Laptop", 1000),
    ("India", 2022, "Laptop", 1200),
    ("USA", 2023, "Mobile", 700),
    ("India", 2023, "Mobile", 800),
    ("UK", 2023, "Laptop", 1100)
]

# Create DataFrame
columns = ["country", "year", "product", "sales_amount"]
df = spark.createDataFrame(data, columns)

# Show the DataFrame
df.show()

# Write the DataFrame as CSV under the "countr" directory,
# with one subdirectory per distinct 'country' value
df.write.option("header", True).partitionBy("country").mode("overwrite").csv("countr")



25/04/28 10:50:54 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+-------+----+-------+------------+
|country|year|product|sales_amount|
+-------+----+-------+------------+
|    USA|2022| Laptop|        1000|
|  India|2022| Laptop|        1200|
|    USA|2023| Mobile|         700|
|  India|2023| Mobile|         800|
|     UK|2023| Laptop|        1100|
+-------+----+-------+------------+
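
The write call above produces one subdirectory per country. As a rough picture of that layout, here is a pure-Python sketch that groups the same rows by `country` and writes each group to a `country=<value>` folder inside a temporary directory. The helper `write_partitioned` and the file name `part-00000.csv` are made up for this illustration; Spark chooses its own part-file names and also writes extra marker files.

```python
import csv
import os
import tempfile
from collections import defaultdict

columns = ["country", "year", "product", "sales_amount"]
data = [
    ("USA", 2022, "Laptop", 1000),
    ("India", 2022, "Laptop", 1200),
    ("USA", 2023, "Mobile", 700),
    ("India", 2023, "Mobile", 800),
    ("UK", 2023, "Laptop", 1100),
]


def write_partitioned(rows, columns, partition_col, out_dir):
    """Write one CSV per distinct value of `partition_col` (hypothetical helper)."""
    idx = columns.index(partition_col)
    kept_columns = [c for c in columns if c != partition_col]
    groups = defaultdict(list)
    for row in rows:
        # The partition value lives in the directory name, not in the file.
        groups[row[idx]].append([v for i, v in enumerate(row) if i != idx])
    for value, group in groups.items():
        part_dir = os.path.join(out_dir, f"{partition_col}={value}")
        os.makedirs(part_dir, exist_ok=True)
        with open(os.path.join(part_dir, "part-00000.csv"), "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(kept_columns)
            writer.writerows(group)


out_dir = tempfile.mkdtemp()
write_partitioned(data, columns, "country", out_dir)
created = sorted(os.listdir(out_dir))
print(created)
```

Because the country is encoded in the folder name, a reader that only needs one country can skip the other folders entirely.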



                                                                                

# Hash Partitioning

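Hash partitioning assigns each record to a partition based on the hash of its key, so records with the same key always land in the same partition. On a pair (key-value) RDD, `partitionBy(n)` applies it with `n` partitions.  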

In [11]:
# Create a pair RDD (key-value RDD)
data = [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")]
rdd = spark.sparkContext.parallelize(data)

# Partition the RDD by key using hash partitioning (3 partitions)
partitioned_rdd = rdd.partitionBy(3)

# Check the number of partitions
print("Number of partitions: ", partitioned_rdd.getNumPartitions())

# Collect data from the partitioned RDD
print("Partitioned data: ", partitioned_rdd.glom().collect())

Number of partitions:  3




Partitioned data:  [[(3, 'c')], [(1, 'a'), (4, 'd')], [(2, 'b'), (5, 'e')]]
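
The placement in that output follows from taking the hash of each key modulo the number of partitions. The sketch below models this in plain Python; it uses Python's built-in `hash`, which returns the value itself for small integers, while PySpark applies its own `portable_hash` function by default. The helper `hash_partition` is hypothetical.

```python
def hash_partition(pairs, num_partitions):
    """Place each (key, value) pair in partition hash(key) % num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions


data = [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")]
result = hash_partition(data, 3)

for index, part in enumerate(result):
    print(f"Partition {index}: {part}")
```

Keys 1 and 4 share a partition because both leave a remainder of 1 when divided by 3, and likewise keys 2 and 5 with a remainder of 2.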


                                                                                