<a href="https://colab.research.google.com/github/jalorenzo/SparkNotebook25-26/blob/main/BDF_06_Persistence_and_Partitioning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 00 - Configuration of Apache Spark on Colaboratory


### Installing Java, Spark, and Findspark


---


This code installs Apache Spark (the version set in `SPARK_VERSION`, here 4.1.1), Java 17, and [Findspark](https://github.com/minrk/findspark), a library that makes it easy for Python to find Spark.

In [None]:
import os

os.environ["SPARK_VERSION"] = "spark-4.1.1"
!apt-get update
!apt-get install -y openjdk-17-jdk-headless
!wget https://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!echo $SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!rm $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

### Set Environment Variables
Set the locations where Spark and Java are installed.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark/"
os.environ["DRIVE_DATA"] = "/content/gdrive/My Drive/Enseignement/2025-2026/ING3/HPDA/BigDataFrameworks/data/"

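!rm -f /content/spark
!ln -s /content/$SPARK_VERSION-bin-hadoop3 /content/spark
# "!export" only affects a throw-away subshell, so PATH is updated from Python instead
os.environ["PATH"] += ":" + os.environ["SPARK_HOME"] + "bin:" + os.environ["SPARK_HOME"] + "sbin"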
!echo $SPARK_HOME
!env |grep  "DRIVE_DATA"

### Start a SparkSession
This will start a local Spark session.

In [None]:
!python -V

import findspark
findspark.init()

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# Example: shows the PySpark version
print("PySpark version {0}".format(sc.version))

# Example: parallelise an array and show its first 2 elements
sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)

In [None]:
from pyspark.sql import SparkSession
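# We create a SparkSession object (or we retrieve it if it is already created)
# "spark.some.config.option" is only a placeholder. Since a SparkContext already exists,
# getOrCreate() reuses it, so .master("local[4]") does not change its master here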
spark = SparkSession \
.builder \
.appName("My application") \
.config("spark.some.config.option", "some-value") \
.master("local[4]") \
.getOrCreate()
# We get the SparkContext
sc = spark.sparkContext

In [None]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/gdrive')


---


# 06 - Persistence and Partitioning

We show here two important aspects of Apache Spark:

- `Persistence`: how to store DataFrames and RDDs so that they do not need to be recomputed every time they are used
- `Partitioning`: how to specify and change the partitions of a DataFrame or RDD

## Persistence

Issues when reusing an RDD several times:

-   Spark recomputes the RDD, as well as its dependencies, every time an action is executed
-   This is very costly (particularly in iterative algorithms)

Solution:

-   Keep the RDD in memory and/or on disk
-   Use `cache()` or `persist()` methods

### Persistence levels (as defined in [`pyspark.StorageLevel`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html#pyspark.StorageLevel) and [`org.apache.spark.storage.StorageLevel`](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.storage.StorageLevel))
 Level                | Space used  | CPU time     | Memory/Disk   | Comments
 :------------------: | :------:    | :-----:      | :-------------: | ------------------
 MEMORY_ONLY          |   High      |   Low        |     Memory   | Stores the RDD in the JVM as a non-serialised Java object. If the RDD does not fit in memory, some partitions will not be cached in memory and will be recalculated on the fly every time they are required. Default level in Java and Scala.
 MEMORY_ONLY_SER      |   Low       |   High       |     Memory   | Stores the RDD as a serialised Java object (a *byte array* per partition). In Python, data are always stored serialised with [`pickle`](https://docs.python.org/3/library/pickle.html), so PySpark's default MEMORY_ONLY level behaves like this one.
 MEMORY_AND_DISK      |   High      |   Medium     |     Both     | Stores the RDD in the JVM as a non-serialised Java object. If the RDD does not fit in memory, the partitions that do not fit will be spilled to disk and read from there every time they are required.
 MEMORY_AND_DISK_SER  |   Low       |   High       |     Both     | Similar to MEMORY_AND_DISK but using serialised objects.
 DISK_ONLY            |   Low       |   High       |     Disk     | Stores the RDD partitions only on disk.
 OFF_HEAP             |   Low       |   High       |   Memory     | Stores the serialised RDD using *off-heap* memory (outside the JVM's heap) which can reduce the overhead of the garbage collector.
   


    
### Persistence levels in Scala/Java and in Python

-   In Scala and Java, the default level is MEMORY\_ONLY

-   In Python, data are always serialised (by default as *pickled* objects)

    - The MEMORY_ONLY and MEMORY_AND_DISK levels are therefore equivalent to MEMORY_ONLY_SER and MEMORY_AND_DISK_SER
    - When creating the SparkContext, it is possible to request [`marshal`](https://docs.python.org/3/library/marshal.html) serialisation instead
    
```python
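from pyspark import SparkContext, MarshalSerializer

# Only one SparkContext can be active: call sc.stop() first if one already exists
sc = SparkContext(master="local", appName="My app", serializer=MarshalSerializer())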
```
    
### Fault tolerance

-   If a node with stored data fails, the lost partitions are recomputed from their lineage

    -   Appending `_2` to the persistence level (e.g. `MEMORY_ONLY_2`) stores two replicas of each partition on different nodes
        
### Cache management

-   Spark manages the cache memory with an LRU (least recently used) policy

    -   For *memory-only* levels, evicted partitions are discarded and recomputed when they are needed again
    -   For *memory and disk* levels, partitions that do not fit in memory are spilled to disk


### Persistence with DataFrames

In [None]:
dfFlightsData = (spark
    .read
    .option("inferSchema", "true")
    .option("header", "true")
    .csv(os.environ["DRIVE_DATA"] + "2015-summary.csv"))
print("Cached: {0}".format(dfFlightsData.is_cached))
print("Level without persistence: {0}".format(dfFlightsData.storageLevel))

In [None]:
dfFlightsData.cache()
print("Cached: {0}".format(dfFlightsData.is_cached))
print("Persistence level by default: {0}".format(dfFlightsData.storageLevel))

In [None]:
# To change the persistence level, we first need to remove the DataFrame from the cache
dfFlightsData.unpersist()

from pyspark import StorageLevel
dfFlightsData.persist(StorageLevel.MEMORY_ONLY_2)
print("Cached: {0}".format(dfFlightsData.is_cached))
print("New persistence level: {0}".format(dfFlightsData.storageLevel))

In [None]:
# Persistence is not inherited in transformations
dfData2 = dfFlightsData.select("DEST_COUNTRY_NAME")
print("Cached: {0}".format(dfData2.is_cached))


### Persistence with RDDs

In [None]:
rdd = sc.parallelize(range(1000), 10)
print("Cached: {0}".format(rdd.is_cached))
print("Level without persistence: {0}".format(rdd.getStorageLevel()))

In [None]:
rdd.cache()
print("Cached: {0}".format(rdd.is_cached))
print("Default persistence level: {0}".format(rdd.getStorageLevel()))

In [None]:
# Take rdd out of the cache memory
rdd.unpersist()

from pyspark import StorageLevel
rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
print("Cached: {0}".format(rdd.is_cached))
print("New persistence level: {0}".format(rdd.getStorageLevel()))

In [None]:
# Persistence is not inherited in transformations
rdd2 = rdd.map(lambda x: x*x)
print("Cached: {0}".format(rdd2.is_cached))

### Checkpointing with RDDs
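RDDs can be checkpointed, which saves them to disk (in the directory set with `setCheckpointDir`) and truncates their lineage.

- It is a *lazy* operation: data are not stored on disk until an action is executed
- `checkpoint()` must be called before any job has been executed on the RDD, and persisting the RDD first is recommended to avoid computing it twice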
- Future references to those RDDs will load them from disk instead of recomputing them


In [None]:
!mkdir -p "$DRIVE_DATA"/CP
!ls "$DRIVE_DATA"
# Just in case, to empty the folder:
# !rm -rf "$DRIVE_DATA"/CP/*

In [None]:
rdd = sc.parallelize(range(100000))
spark.sparkContext.setCheckpointDir(os.environ["DRIVE_DATA"] + "CP")
rdd.checkpoint()

In [None]:
!ls -lR "$DRIVE_DATA"CP/

In [None]:
rdd.count()
!ls -lR "$DRIVE_DATA"CP/

In [None]:
!rm -rf "$DRIVE_DATA"CP/
!ls -lR "$DRIVE_DATA"CP/

## Partitioning

By default, the number of partitions depends on the cluster size (number of available cores) or, for HDFS files, on the number of blocks of the file

-   It can be adjusted when creating or operating on an RDD
    
    - RDDs offer finer control over their partitioning

-   For DataFrames, the number of partitions can be modified after creation.

-   By default, RDDs derived from other RDDs inherit the parallelism of their parents.

-   Useful properties:
    -    `spark.default.parallelism` For RDDs, default number of partitions of the RDDs returned by `parallelize` and by transformations like `join` and `reduceByKey` when no number is given
        - Fixed value for a SparkContext
        - The property `sc.defaultParallelism` indicates its value
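    -    `spark.sql.shuffle.partitions` For DataFrames, number of partitions used when shuffling data in *wide* transformations (200 by default; Adaptive Query Execution, enabled by default since Spark 3.2, may coalesce them at runtime)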
        - It can be modified using `spark.conf.set`

- Useful functions:
    -   `rdd.getNumPartitions()` returns the number of partitions of the RDD
    -   `rdd.glom()` returns a new RDD in which the elements of each partition are gathered into a list

    - `repartition(n)` returns a new DataFrame or RDD with exactly `n` partitions
    - `coalesce(n)` optimised version of `repartition` that avoids a full shuffle (data movement)
        - But it can only decrease the number of partitions (for RDDs, unless `shuffle=True` is passed)
    - `partitionBy(n,[partitionFunc])` Partitioning by key, using a partitioning function (by default, a hash of the key)
        - Only for key/value RDDs
        - Ensures that pairs with the same key go to the same partition



### Partitions and RDDs

In [None]:
print("Number of partitions by default for RDDs: {0}"
       .format(sc.defaultParallelism))
rdd = sc.parallelize([1, 2, 3, 4, 2, 4, 1], 2)
pairs = rdd.map(lambda x: (x, x*x))

print("RDD pairs = {0}".format(pairs.collect()))
print("Pairs partitioning: {0}".format(pairs.glom().collect()))
print("Number of pair partitions = {0}".format(pairs.getNumPartitions()))

In [None]:
# Reduction keeping the number of partitions
from operator import add
print("Reduction keeping partitions: {0}".format(
        pairs.reduceByKey(add).glom().collect()))

In [None]:
# Reduction modifying the number of partitions
print("Reduction with 3 partitions: {0}".format(
       pairs.reduceByKey(add, 3).glom().collect()))

In [None]:
# Repartitions example
pairs4 = pairs.repartition(4)
print("pairs4 with {0} partitions: {1}".format(
        pairs4.getNumPartitions(),
        pairs4.glom().collect()))

In [None]:
# Coalesce example
pairs2 = pairs4.coalesce(2)
print("pairs2 with {0} partitions: {1}".format(
        pairs2.getNumPartitions(),
        pairs2.glom().collect()))

In [None]:
# Partitioning by key
pairs_key = pairs2.partitionBy(4)
print("Partitions by key ({0} partitions): {1}".format(
        pairs_key.getNumPartitions(),
        pairs_key.glom().collect()))

In [None]:
# Using a partitioning function
def partitionEvenOdd(key):
    if key%2:
        return 0  # Odd keys go to partition 0
    else:
        return 1  # Even keys go to partition 1

pairs_evenodd = pairs2.partitionBy(2, partitionEvenOdd)
print("Partition by key ({0} partitions): {1}".format(
        pairs_evenodd.getNumPartitions(),
        pairs_evenodd.glom().collect()))

### Partitions and DataFrames

In [None]:
# Convert the RDD to a DataFrame
dfPairs = pairs.toDF()
dfPairs.show()

In [None]:
# The DataFrame inherits the number of partitions from the RDD
print("Number of partitions of the DataFrame: {0}"
      .format(dfPairs.rdd.getNumPartitions()))

In [None]:
# A narrow transformation keeps the number of partitions
print("Number of partitions after a narrow transformation: {0}"
      .format(dfPairs.replace(1, 2).rdd.getNumPartitions()))

In [None]:
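# A wide transformation does not keep the number of partitions: it uses spark.sql.shuffle.partitions
# (with little data, as here, the result may have fewer partitions than that value)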
print("Number of partitions after a wide transformation: {0}"
      .format(dfPairs.sort("_1").rdd.getNumPartitions()))

In [None]:
# It is possible to specify the number of partitions to use in wide transformations
spark.conf.set("spark.sql.shuffle.partitions", 2)
print("Number of partitions after a wide transformation: {0}"
      .format(dfPairs.sort("_1").rdd.getNumPartitions()))

## Working at partition level

`map` applies a function to each element of an RDD (and `foreach` to each element of an RDD or each row of a DataFrame)

-  This may imply redundant operations (e.g. opening a connection to a database for every element)

-  It may not be very efficient

Instead, the function can be called once per partition:

-   Methods `mapPartitions()`, `mapPartitionsWithIndex()` and `foreachPartition()`



In [None]:
nums = sc.parallelize([1,2,3,4,5,6,7,8,9], 4)
print(nums.glom().collect())

def addAndCount(iterator):
    addCount = [0,0]
    for i in iterator:
        addCount[0] += i
        addCount[1] += 1
    return addCount

# Call the addAndCount function once per partition
# The iterator includes the values of the partition
print(nums.mapPartitions(addAndCount).glom().collect())

In [None]:
def addAndCountIndex(index, iterator):
    return "Partition "+str(index), addAndCount(iterator)

# index is the partition number
print(nums.mapPartitionsWithIndex(addAndCountIndex).glom().collect())

In [None]:
import os
import tempfile

tempdir = "/tmp/foreachPartition"
# Create the output directory if it does not exist yet
os.makedirs(tempdir, exist_ok=True)

def f(iterator):
    # One temporary file is created per partition
    tempfich, tempname = tempfile.mkstemp(dir=tempdir, text=True)
    for x in iterator:
        # Write each value of the partition, tab-separated
        os.write(tempfich, (str(x) + '\t').encode())
    os.close(tempfich)

# For each partition of the RDD, a temporary file is created
# and the values of the partition are written to that file.
nums.foreachPartition(f)

In [None]:
!ls -al /tmp/foreachPartition