In [1]:
import random

from pyspark.sql import SparkSession
from pyspark import StorageLevel
from pyspark.sql.functions import broadcast, when

In [2]:
# Obtain the session for this notebook (application name "chap3");
# getOrCreate() returns an already-running session when there is one.
spark = (
    SparkSession.builder
    .appName("chap3")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/24 23:56:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Spark Architecture

## Working With Partitions

In [3]:
# Seed Python's RNG so the random x/y values drawn when building `data` are reproducible.
random.seed(0)

In [4]:
# 20 rows of (id, x, y): x comes from randrange(100), y from randint(-50, 50).
# For each row x is drawn before y, which fixes the values for a given seed.
data = [
    (row_id, random.randrange(100), random.randint(-50, 50))
    for row_id in range(20)
]
df1 = spark.createDataFrame(data, schema=["id", "x", "y"])
df1.head(3)

                                                                                

[Row(id=0, x=49, y=47), Row(id=1, x=53, y=-45), Row(id=2, x=33, y=15)]

Number of partitions in the DataFrame's underlying resilient distributed dataset (RDD):

In [5]:
# `df1.rdd` is the RDD view of the DataFrame; ask it how many partitions it has.
df1.rdd.getNumPartitions()

8

In [6]:
def print_partitions(df):
    """Print the contents of every partition of ``df``.

    The partitions are fetched with ``df.rdd.glom().collect()``, which brings
    all rows back to the caller as one list per partition, so this helper is
    meant for small, illustrative DataFrames.

    For each partition it prints a 1-based ``Partition #k`` header, then the
    partition's rows on a single space-separated line, then an empty line.
    """
    rows_by_partition = df.rdd.glom().collect()
    for number, rows in enumerate(rows_by_partition, start=1):
        print(f"Partition #{number}")
        print(*rows)
        print()

In [7]:
# Show how the 20 rows of df1 were laid out across its partitions at creation time.
print_partitions(df1)

Partition #1
Row(id=0, x=49, y=47) Row(id=1, x=53, y=-45)

Partition #2
Row(id=2, x=33, y=15) Row(id=3, x=62, y=1)

Partition #3
Row(id=4, x=38, y=11) Row(id=5, x=45, y=24)

Partition #4
Row(id=6, x=27, y=14) Row(id=7, x=17, y=-14) Row(id=8, x=17, y=46) Row(id=9, x=12, y=29)

Partition #5
Row(id=10, x=32, y=18) Row(id=11, x=90, y=27)

Partition #6
Row(id=12, x=18, y=-11) Row(id=13, x=12, y=43)

Partition #7
Row(id=14, x=9, y=37) Row(id=15, x=42, y=10)

Partition #8
Row(id=16, x=71, y=-38) Row(id=17, x=45, y=5) Row(id=18, x=40, y=28) Row(id=19, x=81, y=-24)



Reduce the number of partitions without a full shuffle: `coalesce()` merges existing partitions, so it is typically used to decrease the partition count:

In [8]:
# coalesce(5) merges the existing partitions down to 5; in the output, neighbouring
# partitions are combined and the rows keep their original order.
# NOTE: despite the "rdd" prefix, rdd1a is a DataFrame (print_partitions reads its .rdd).
rdd1a = df1.coalesce(5)
print_partitions(rdd1a)

Partition #1
Row(id=0, x=49, y=47) Row(id=1, x=53, y=-45)

Partition #2
Row(id=2, x=33, y=15) Row(id=3, x=62, y=1) Row(id=4, x=38, y=11) Row(id=5, x=45, y=24)

Partition #3
Row(id=6, x=27, y=14) Row(id=7, x=17, y=-14) Row(id=8, x=17, y=46) Row(id=9, x=12, y=29)

Partition #4
Row(id=10, x=32, y=18) Row(id=11, x=90, y=27) Row(id=12, x=18, y=-11) Row(id=13, x=12, y=43)

Partition #5
Row(id=14, x=9, y=37) Row(id=15, x=42, y=10) Row(id=16, x=71, y=-38) Row(id=17, x=45, y=5) Row(id=18, x=40, y=28) Row(id=19, x=81, y=-24)



Repartition with a shuffle:

In [9]:
# repartition(5) redistributes the rows with a shuffle; in the output, rows from
# different original partitions end up mixed together.
# NOTE: despite the "rdd" prefix, rdd1b is a DataFrame (print_partitions reads its .rdd).
rdd1b = df1.repartition(5)
print_partitions(rdd1b)

Partition #1
Row(id=0, x=49, y=47) Row(id=2, x=33, y=15) Row(id=5, x=45, y=24) Row(id=7, x=17, y=-14) Row(id=11, x=90, y=27)

Partition #2
Row(id=8, x=17, y=46) Row(id=12, x=18, y=-11) Row(id=14, x=9, y=37) Row(id=19, x=81, y=-24)

Partition #3
Row(id=13, x=12, y=43) Row(id=15, x=42, y=10) Row(id=16, x=71, y=-38)

Partition #4
Row(id=6, x=27, y=14) Row(id=18, x=40, y=28)

Partition #5
Row(id=1, x=53, y=-45) Row(id=3, x=62, y=1) Row(id=4, x=38, y=11) Row(id=9, x=12, y=29) Row(id=10, x=32, y=18) Row(id=17, x=45, y=5)



Repartition by columns:

In [10]:
# Partition into 5 by the values of columns x and y; placement depends on the column
# values rather than row position, so partition sizes can be uneven (see output).
print_partitions(df1.repartition(5, "x", "y"))

Partition #1
Row(id=0, x=49, y=47) Row(id=13, x=12, y=43) Row(id=14, x=9, y=37) Row(id=16, x=71, y=-38)

Partition #2
Row(id=3, x=62, y=1)

Partition #3
Row(id=4, x=38, y=11) Row(id=11, x=90, y=27) Row(id=15, x=42, y=10)

Partition #4
Row(id=2, x=33, y=15) Row(id=6, x=27, y=14) Row(id=7, x=17, y=-14) Row(id=8, x=17, y=46) Row(id=10, x=32, y=18) Row(id=12, x=18, y=-11) Row(id=17, x=45, y=5) Row(id=19, x=81, y=-24)

Partition #5
Row(id=1, x=53, y=-45) Row(id=5, x=45, y=24) Row(id=9, x=12, y=29) Row(id=18, x=40, y=28)



Set the default number of partitions to use when shuffling data (200 by default):

In [11]:
# Session-level setting: number of partitions Spark SQL uses when it shuffles data.
spark.conf.set("spark.sql.shuffle.partitions", 100)

With Adaptive Query Execution (AQE), we rarely need to tune this number by hand, since AQE uses runtime statistics to optimize the query plan automatically (for example, by coalescing shuffle partitions).

## Caching

Cache a DataFrame with `cache()`. The default storage level is MEMORY_AND_DISK: partitions are kept in memory, and those that do not fit are spilled to disk.

In [12]:
# Mark df1 for caching at the default storage level; the call returns the DataFrame itself.
df1.cache()

DataFrame[id: bigint, x: bigint, y: bigint]

Validate that the data has been cached and stored correctly:

In [13]:
# Flag indicating whether df1 is currently marked as cached.
df1.is_cached

True

In [14]:
# Does df1's current storage level use memory?
df1.storageLevel.useMemory

True

In [15]:
# Does df1's current storage level use disk?
df1.storageLevel.useDisk

True

Remove data from cache:

In [16]:
# Drop df1 from the cache, then confirm the cached flag has been cleared.
df1.unpersist()
df1.is_cached

False

Specify a storage level when caching:

In [17]:
# persist() takes an explicit storage level, unlike cache(); MEMORY_ONLY uses no disk.
df1.persist(StorageLevel.MEMORY_ONLY)
# Inspect the (memory, disk) flags of the level that is now in effect.
df1.storageLevel.useMemory, df1.storageLevel.useDisk

(True, False)

## Broadcasting

In [18]:
# Small three-row lookup table (id -> z) used as the broadcast side of the join.
df2 = spark.createDataFrame(
    data=[(1, "a"), (3, "b"), (5, "c")],
    schema=["id", "z"],
)

Broadcast join: send a full copy of the smaller dataset to every node, then join it locally with each node's portion of the larger dataset.

In [19]:
# broadcast() marks df2 as the side to be broadcast in the join.
# Joining on the expression df1.id == df2.id keeps both id columns, which is why
# the result shows two "id" columns.
df1.join(broadcast(df2), df1.id == df2.id).show()

+---+---+---+---+---+
| id|  x|  y| id|  z|
+---+---+---+---+---+
|  1| 53|-45|  1|  a|
|  3| 62|  1|  3|  b|
|  5| 45| 24|  5|  c|
+---+---+---+---+---+



# Delta Lake

## Grouping Tables With Databases

Create a database using the default location, capture its description, and then drop it again:

In [20]:
# Name of the demo database that the following cell creates, describes, and drops.
db = "simple_database"

In [21]:
# Create the database; no LOCATION clause is given, so the default location is used.
spark.sql(
    f"""
    CREATE DATABASE IF NOT EXISTS {db}
    COMMENT 'Create a managed database'"""
)
# Capture the database description before dropping it; db_info is displayed in a later cell.
db_info = spark.sql(f"DESCRIBE DATABASE {db}")
# Clean up so the notebook can be re-run; CASCADE also removes anything inside the database.
spark.sql(f"DROP DATABASE IF EXISTS {db} CASCADE")

DataFrame[]

In [22]:
# Column names of the DESCRIBE DATABASE result.
db_info.columns

['info_name', 'info_value']

In [23]:
# Display the database description, masking the rows whose values are
# machine-specific (Location and Owner); all other values are shown unchanged.
db_info.withColumn(
    "info_value",
    when(db_info.info_name.isin(["Location", "Owner"]), "[REDACTED]")
    .otherwise(db_info.info_value),
).show(truncate=False)

+--------------+-------------------------+
|info_name     |info_value               |
+--------------+-------------------------+
|Catalog Name  |spark_catalog            |
|Namespace Name|simple_database          |
|Comment       |Create a managed database|
|Location      |[REDACTED]               |
|Owner         |[REDACTED]               |
+--------------+-------------------------+

