### Init

In [None]:
from pyspark.sql import SparkSession
from pathlib import Path
from pyspark.sql.functions import broadcast


In [None]:
# Alternative configuration, kept for reference: a single local core, 6 GB of
# driver memory and adaptive query execution switched on explicitly.
# spark = (
#     SparkSession.builder
#     .config("spark.driver.memory", "6g")
#     .config("spark.sql.adaptive.enabled", "True")
#     .master("local")
#     .getOrCreate()
# )

# Start (or reuse) a local session with master "local[*]" and keep a handle on
# its SparkContext for the RDD examples further down.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext

In [None]:

# Default parallelism of the context (presumably the local core count when the
# master is "local[*]" -- confirm against the Spark configuration docs).
num_cores = sc.defaultParallelism

# Configured executor count; get() yields None when the key is not set.
num_executors = sc.getConf().get("spark.executor.instances")

# One value per line: parallelism first, then the executor setting.
print(num_cores, num_executors, sep="\n")

### Basic rdd example

In [None]:
# Distribute the whole numbers 0..100 (inclusive) as an RDD and add them up.
rdd = sc.parallelize(range(0, 101))
rdd.sum()
# Expected result: 5050

### Join Example

In [None]:
# Two small DataFrames that share a "Name" column.
data1 = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
data2 = [("Alice", "Engineer"), ("Bob", "Doctor"), ("David", "Lawyer")]
df1 = spark.createDataFrame(data1, schema=["Name", "ID"])
df2 = spark.createDataFrame(data2, schema=["Name", "Profession"])

# Inner join on the shared column: only names present in both inputs
# (Alice and Bob) appear in the result.
joined_df = df1.join(df2, on="Name", how="inner")

# Print the joined rows.
joined_df.show()

In [None]:
# A broadcast join is useful when one DataFrame (typically smaller) can fit entirely into 
# memory on each executor node, allowing it to be efficiently broadcasted to all executor nodes for join operations. Here's an example using a broadcast join in PySpark:

# Two DataFrames (employees_df and departments_df) are created from dummy data.
# We perform a broadcast join using the join method on the employees_df. The broadcast function is applied to the departments_df DataFrame, indicating that it should be broadcast to all executor nodes.
# The join operation is performed based on the "dept_id" column, and the result is stored in the DataFrame broadcast_join_df.
# Finally, we show the result of the join operation.

In [None]:
# Sample rows: (employee name, department id) and (department id, department name).
employees_data = [("Alice", 1), ("Bob", 2), ("Charlie", 1), ("David", 3)]
departments_data = [(1, "HR"), (2, "Engineering"), (3, "Finance")]

# Turn the sample rows into DataFrames with named columns.
employees_df = spark.createDataFrame(employees_data, schema=["name", "dept_id"])
departments_df = spark.createDataFrame(departments_data, schema=["dept_id", "dept_name"])

# Inner join on "dept_id"; wrapping the department table in broadcast() marks
# it as the side to be broadcast for the join.
broadcast_join_df = employees_df.join(broadcast(departments_df), "dept_id", "inner")

# Print the joined rows.
broadcast_join_df.show()

In [None]:
# In a shuffle hash join, both DataFrames are partitioned based on the join key, 
# and then shuffled across the network so that rows with the same join key end up in the same partition. This allows Spark to efficiently join the corresponding rows from both DataFrames.

# Two DataFrames (employees_df and departments_df) are created from dummy data.
# We perform a shuffle hash join using the join method on the employees_df. The join operation is performed based on the "dept_id" column, and we specify how="inner" to perform an inner join.
# Spark partitions both DataFrames based on the join key ("dept_id") and shuffles the data across the network so that rows with the same join key end up in the same partition.
# Spark then efficiently joins the corresponding rows from both DataFrames based on the join key.
# Finally, we show the result of the join operation.

# If a broadcast hash join can be used (by the broadcast hint or by total size of a relation), Spark SQL chooses it over other joins (see JoinSelection execution planning strategy).

# Shuffle Hash Join:

# Size of Datasets: Shuffle hash joins are typically more efficient when one side of the join is much smaller than the other, so that each shuffled partition of the smaller side fits in memory as a hash table on the executor that processes it. If that side is small enough to be broadcast in its entirety, a broadcast hash join is more efficient still, because it avoids the shuffle altogether.
# Data Distribution: Shuffle hash joins work well when the data is evenly distributed across partitions and the join keys have high cardinality. In this case, the shuffle operation redistributes the data across partitions based on the hash of the join key, which can lead to more balanced partition sizes.
# Memory Usage: Shuffle hash joins avoid the cost of sorting the data, which saves CPU time, but they are more memory-intensive than sort merge joins because the hash table built for each partition must be held in memory. This can be a disadvantage when memory resources are limited.
# Sort Merge Join:

# Size of Datasets: Sort merge joins are generally more suitable for joining large datasets that cannot fit entirely in memory on each executor node. They involve sorting the data, which costs CPU time, but the sort can spill to disk, so they can handle larger datasets more robustly than shuffle hash joins in many cases.
# Data Distribution: Sort merge joins tolerate skewed data distributions and low cardinality join keys better than shuffle hash joins. They still shuffle both sides by the join key, but an oversized partition is sorted (spilling to disk if necessary) instead of being loaded into an in-memory hash table, so skew tends to slow the join down rather than exhaust memory.
# Memory Usage: Sort merge joins place less pressure on executor memory than shuffle hash joins because sorted data that does not fit in memory is spilled to disk. Spilling is slow, however, so it's still important to ensure that sufficient memory and local disk space are available on the cluster.


In [None]:
# Turn off the planner's preference for sort merge joins for this demo.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

# Sample rows: (employee name, department id) and (department id, department name).
employees_data = [("Alice", 1), ("Bob", 2), ("Charlie", 1), ("David", 3)]
departments_data = [(1, "HR"), (2, "Engineering"), (3, "Finance")]

# Turn the sample rows into DataFrames with named columns.
employees_df = spark.createDataFrame(employees_data, schema=["name", "dept_id"])
departments_df = spark.createDataFrame(departments_data, schema=["dept_id", "dept_name"])

# Redistribute both sides by the join key before joining.
employees_repartitioned = employees_df.repartition("dept_id")
departments_repartitioned = departments_df.repartition("dept_id")

# Inner join on "dept_id"; the SHUFFLE_HASH hint on the department side asks
# the planner for a shuffle hash join.
shuffle_hash_join_df = employees_repartitioned.join(
    departments_repartitioned.hint("SHUFFLE_HASH"), "dept_id", "inner"
)

# Print the joined rows.
shuffle_hash_join_df.show()

In [None]:
### Sort merge join is typically used when joining large datasets that cannot fit into memory on each executor node. 
### It efficiently handles large datasets by sorting them and then merging them together based on the join key.

# Two DataFrames (employees_df and departments_df) are created from dummy data.
# We perform a sort merge join using the join method on the employees_df. The join operation is performed based on the "dept_id" column, and we specify how="inner" to perform an inner join.
# Spark sorts both DataFrames based on the join key ("dept_id") and then merges them together.
# Finally, we show the result of the join operation.

In [None]:
# Restore the planner's preference for sort merge joins (the previous cell
# switched it off for the shuffle hash join demo).
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")

# Dummy data for employees and their departments
employees_data = [("Alice", 1), ("Bob", 2), ("Charlie", 1), ("David", 3)]
departments_data = [(1, "HR"), (2, "Engineering"), (3, "Finance")]

# Create DataFrames from the dummy data
employees_df = spark.createDataFrame(employees_data, ["name", "dept_id"])
departments_df = spark.createDataFrame(departments_data, ["dept_id", "dept_name"])

# Perform sort merge join.
# The MERGE hint requests the strategy explicitly, mirroring the SHUFFLE_HASH
# hint used in the shuffle hash join demo. Without it, the preference setting
# alone does not guarantee a sort merge join: with inputs this small Spark may
# pick a broadcast hash join instead (NOTE(review): confirm the selected
# strategy with sort_merge_join_df.explain() on your Spark version).
sort_merge_join_df = employees_df.join(
    departments_df.hint("MERGE"),
    on="dept_id",
    how="inner",
)

# Show the result
sort_merge_join_df.show()

### Demo - The importance of partitioning

In [None]:
# Dummy graph data: each node paired with the list of nodes it links to.
# The link table is joined against the ranks on every iteration below, so it
# is hash-partitioned once into 4 partitions and cached; without cache() the
# lineage of `links`, including the partitionBy() shuffle, would be
# re-evaluated for each join.
links = spark.sparkContext.parallelize([
    ("A", ["B", "C"]),
    ("B", ["C"]),
    ("C", ["A"]),
    ("D", ["A", "C"]),
]).partitionBy(4).cache()

# Every node starts with rank 1.0. mapValues() preserves the partitioner of
# `links`, whereas map() would discard it.
ranks = links.mapValues(lambda _: 1.0)


def _contributions(entry):
    """Split a node's rank evenly among the nodes it links to.

    `entry` is one record of links.join(ranks): (node, (neighbours, rank)).
    Returns one (neighbour, share) pair per outgoing link.
    """
    _, (neighbours, rank) = entry
    share = rank / len(neighbours)
    return [(dest, share) for dest in neighbours]


# Perform 10 iterations of PageRank
for _ in range(10):
    # Compute contributions of each node to its neighbors
    contributions = links.join(ranks).flatMap(_contributions)

    # Sum the contributions per node and apply the damping formula
    # 0.15 + 0.85 * sum to obtain the new ranks.
    ranks = contributions.reduceByKey(lambda a, b: a + b).mapValues(lambda rank: 0.15 + 0.85 * rank)

# Collect the final ranks on the driver and print them
final_ranks = ranks.collect()
print(final_ranks)

In [None]:
# Build 100 sample rows (ID 1..100); "Department" is ID modulo 5 as a string,
# giving five distinct values.
data = [(i, f"Name_{i}", str(i % 5)) for i in range(1, 101)]

# Turn the rows into a DataFrame with named columns.
df = spark.createDataFrame(data, schema=["ID", "Name", "Department"])

# Partition count before and after repartitioning to 20 partitions.
print(df.rdd.getNumPartitions())
df = df.repartition(20)
print(df.rdd.getNumPartitions())

# Print the first rows of the repartitioned DataFrame.
df.show()

In [None]:
# Build 100 sample rows (ID 1..100); "Department" is ID modulo 5 as a string,
# giving five distinct values.
data = [(i, f"Name_{i}", str(i % 5)) for i in range(1, 101)]

# Turn the rows into a DataFrame with named columns.
df = spark.createDataFrame(data, schema=["ID", "Name", "Department"])

# Partition count before and after repartitioning down to a single partition.
print(df.rdd.getNumPartitions())
df = df.repartition(1)
print(df.rdd.getNumPartitions())

# Print the first rows of the repartitioned DataFrame.
df.show()

In [None]:
# Build 100 sample rows (ID 1..100); "Department" is ID modulo 5 as a string,
# giving five distinct values.
data = [(i, f"Name_{i}", str(i % 5)) for i in range(1, 101)]

# Turn the rows into a DataFrame with named columns.
df = spark.createDataFrame(data, schema=["ID", "Name", "Department"])

# Partition count before and after repartitioning to 1000 partitions, which
# is far more partitions than there are rows.
print(df.rdd.getNumPartitions())
df = df.repartition(1000)
print(df.rdd.getNumPartitions())

# Print the first rows of the repartitioned DataFrame.
df.show()

### Coalesce and Repartition

In [None]:
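### The output will show the number of partitions before and after applying coalesce(1): the first number is the DataFrame's initial partition count (typically the session's default parallelism), and the second is 1.
### Note that coalesce() is a narrow transformation, meaning it does not trigger a full shuffle of the data, but it can still cause data movement when the partitions being merged live on different executors. Because it only merges existing partitions, it can reduce the partition count but not increase it. It's typically used to cut down an excessive number of partitions for better resource utilization.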

In [None]:
# Build 100 sample rows (ID 1..100); "Department" is ID modulo 5 as a string,
# giving five distinct values.
data = [(i, f"Name_{i}", str(i % 5)) for i in range(1, 101)]

# Turn the rows into a DataFrame with named columns.
df = spark.createDataFrame(data, schema=["ID", "Name", "Department"])

# Partition count before and after coalescing down to a single partition.
print(df.rdd.getNumPartitions())
df = df.coalesce(1)
print(df.rdd.getNumPartitions())

# Print the first rows of the coalesced DataFrame.
df.show()

In [None]:
# repartition() triggers a full shuffle of the data across the cluster, 
# so it's more expensive compared to coalesce(). It's typically used when you want to either increase or decrease the number of partitions to a specific value, regardless of the current number of partitions.

In [None]:
# Build 100 sample rows (ID 1..100); "Department" is ID modulo 5 as a string,
# giving five distinct values.
data = [(i, f"Name_{i}", str(i % 5)) for i in range(1, 101)]

# Turn the rows into a DataFrame with named columns.
df = spark.createDataFrame(data, schema=["ID", "Name", "Department"])

# Partition count before and after repartition(1), for comparison with the
# coalesce(1) variant of this demo.
print(df.rdd.getNumPartitions())
df = df.repartition(1)
print(df.rdd.getNumPartitions())

# Print the first rows of the repartitioned DataFrame.
df.show()

In [None]:
# Use repartition() when you want to explicitly change the number of partitions to a specific value, regardless of the current number of partitions.
# It triggers a full shuffle of the data across the cluster, which can be computationally expensive.
# It's useful when you need to increase or decrease the number of partitions to distribute the data more evenly or to prepare for operations that benefit from a specific number of partitions (e.g., join operations).
# Use repartition() when you want to increase the number of partitions, for example, to prepare for parallel operations that require more parallelism.

# Use coalesce() when you want to reduce the number of partitions, ideally to a smaller number than the current number of partitions.
# It's a narrow transformation that minimizes data movement and avoids a full shuffle whenever possible.
# It's useful when you want to optimize resource utilization and reduce the overhead of managing many partitions, without triggering a full shuffle.
# Use coalesce() when you want to decrease the number of partitions, for example, to reduce memory overhead or to optimize performance when the current number of partitions is higher than necessary.


# coalesce may run faster than repartition, but unequal sized partitions are generally slower to work with than equal sized partitions. You'll usually need to repartition datasets after filtering a large data set. 
# I've found repartition to be faster overall because Spark is built to work with equal sized partitions.

# If you need to filter your data before writing, then repartition is much more suitable than coalesce, since coalesce will be pushed-down right before the loading operation.

# For instance: load().map(…).filter(…).coalesce(1).save()

# translates to: load().coalesce(1).map(…).filter(…).save()

# This means that all your data will collapse into a single partition, where it will be filtered, losing all parallelism. This happens even for very simple filters like column='value'.

# This does not happen with repartition: load().map(…).filter(…).repartition(1).save()

# In such case, filtering happens in parallel on the original partitions.

### Caching

In [None]:
# Create a DataFrame
data = [("Alice", 34), ("Bob", 45), ("Charlie", 29), ("David", 55)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Cache the DataFrame
df.cache()

# Perform some transformations and actions
filtered_df = df.filter(df["Age"] > 30)
count1 = filtered_df.count()

# Perform more transformations and actions
selected_df = df.select("Name")
count2 = selected_df.count()

# Show the counts
print("Count 1:", count1)
print("Count 2:", count2)


### Resources

https://w.amazon.com/bin/view/BDT/Products/Cradle/Docs/Coalescence/
https://mungingdata.com/apache-spark/filter-where/
https://stackoverflow.com/questions/31610971/spark-repartition-vs-coalesce
https://medium.com/airbnb-engineering/on-spark-hive-and-small-files-an-in-depth-look-at-spark-partitioning-strategies-a9a364f908