## Concept
- When Spark reads files (for example, Parquet), it splits the data into partitions so they can be processed in parallel.
- `spark.sql.files.maxPartitionBytes` defines the **maximum size of each of these partitions**. A higher value produces fewer partitions and therefore fewer tasks; a smaller value produces more partitions and therefore more tasks.
- By default, this value is **128 MB**.

In [0]:
sc.setJobDescription("Step A-1: Basic initialization")
spark.conf.set("spark.databricks.io.cache.enabled", "false")               

In [0]:
defaultMaxPartitionBytes = int(spark.conf.get("spark.sql.files.maxPartitionBytes").replace("b",""))
openCostInBytes = int(spark.conf.get("spark.sql.files.openCostInBytes").replace("b",""))
displayHTML(f"""<table>
  <tr><td>Max Partition Bytes:</td><td><b>{defaultMaxPartitionBytes/1024/1024}</b> MB</td></tr>
  <tr><td>Open Cost In Bytes: </td><td><b>{openCostInBytes/1024/1024}</b> MB</td></tr>
</table>""")


| Setting | Value |
|---|---|
| Max Partition Bytes: | **128.0** MB |
| Open Cost In Bytes: | **4.0** MB |


1. Read two Spark properties:
	1. `spark.sql.files.maxPartitionBytes` → default **128 MB**.
	2. `spark.sql.files.openCostInBytes` → default **4 MB**; the cost Spark adds for each file, which helps balance many tiny files.
		1. It is the **estimated cost (in bytes)** Spark assumes just for opening a file, before reading any of its data.
2. To adapt this for local development, I read the data exported from Responsys.
3. First I define the table schema and predict the number of partitions as `Padded_Bytes / Target_Size`.
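The `Padded_Bytes / Target_Size` estimate can be tried without a cluster. The following is a minimal sketch of that arithmetic, not Spark's own implementation: `estimate_partitions` and its parameters are hypothetical names, and the file sizes and core counts are made-up illustration values.

```python
import math

MIB = 1024 * 1024

def estimate_partitions(file_sizes, open_cost, max_partition_bytes, cores):
    """Estimate the partition count as Padded_Bytes / Target_Size (hypothetical helper)."""
    actual_bytes = sum(file_sizes)
    # Every file is charged an extra open cost on top of its real size.
    padded_bytes = actual_bytes + len(file_sizes) * open_cost
    bytes_per_core = padded_bytes // cores
    # The target size is capped by maxPartitionBytes and never drops below the open cost.
    target_size = min(max_partition_bytes, max(open_cost, bytes_per_core))
    return math.ceil(padded_bytes / target_size)

# Illustrative input: 100 files of 1 MiB each, so padding (400 MiB) outweighs data (100 MiB).
sizes = [1 * MIB] * 100
for cores in (4, 8):
    n = estimate_partitions(sizes, open_cost=4 * MIB, max_partition_bytes=128 * MIB, cores=cores)
    print(f"{cores} cores -> {n} partitions")
```

In this toy input the bytes-per-core value is below 128 MiB, so it, rather than `maxPartitionBytes`, decides the target size.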

In [0]:
sc.setJobDescription("Step A-2: Utility Function")
def predict_num_partitions(files):
    import math
    open_cost = int(spark.conf.get("spark.sql.files.openCostInBytes").replace("b", ""))
    max_partition_bytes = int(spark.conf.get("spark.sql.files.maxPartitionBytes").replace("b", ""))

    actual_bytes = sum(f.size for f in files)
    padded_bytes = actual_bytes + (len(files) * open_cost)

    bytes_per_core = padded_bytes // sc.defaultParallelism
    max_of_cost_bpc = max(open_cost, bytes_per_core)
    target_size = min(max_partition_bytes, max_of_cost_bpc)
    partitions = padded_bytes / target_size

    def row(label, value, extra=""):
        return f'<tr><td>{label}:</td><td style="text-align:right; font-weight:bold">{value:,}</td><td style="padding-left:1em">{extra}</td></tr>'

    html = "<table>" + \
        row("File Count", len(files)) + \
        row("Actual Bytes", actual_bytes) + \
        row("Padded Bytes", padded_bytes, "Actual_Bytes + (File_Count * Open_Cost)") + \
        row("Average Size", padded_bytes // len(files)) + \
        '<tr><td colspan="2" style="border-top:1px solid black">&nbsp;</td></tr>' + \
        row("Open Cost", open_cost, "spark.sql.files.openCostInBytes") + \
        row("Bytes-Per-Core", bytes_per_core) + \
        row("Max Cost", max_of_cost_bpc, "(max of Open_Cost & Bytes-Per-Core)") + \
        '<tr><td colspan="2" style="border-top:1px solid black">&nbsp;</td></tr>' + \
        row("Max Partition Bytes", max_partition_bytes, "spark.sql.files.maxPartitionBytes") + \
        row("Target Size", target_size, "(min of Max_Cost & Max_Partition_Bytes)") + \
        '<tr><td colspan="2" style="border-top:1px solid black">&nbsp;</td></tr>' + \
        row("Number of Partitions", math.ceil(partitions), f"({partitions} from Padded_Bytes / Target_Size)") + \
        "</table>"
    displayHTML(html)


In [0]:
from pyspark.sql.functions import window, col
from pyspark.sql.types import StructType, StructField, DecimalType, StringType


trxPath = "s3a://tks-dados-responsys/EXPORT_FILES/files/STATUS_OPT/"
trxFiles = [f for f in dbutils.fs.ls(trxPath) if f.name.endswith(".parquet")]
trxSchema = StructType([
    StructField("_SDC_SOURCE_LINENO", DecimalType(38, 0)),
    StructField("EMAIL_ADDRESS_", StringType()),
    StructField("EMAIL_PERMISSION_STATUS_", StringType()),
    StructField("_SDC_SOURCE_FILE", StringType()),
    StructField("_SDC_SEQUENCE", DecimalType(38, 0)),
    StructField("_SDC_RECEIVED_AT", StringType()),
    StructField("_SDC_BATCHED_AT", StringType()),
    StructField("_SDC_TABLE_VERSION", DecimalType(38, 0)),
])

sc.setJobDescription("Step C: Read at 1x")
maxPartitionBytesConf = 1 * int(defaultMaxPartitionBytes)
spark.conf.set("spark.sql.files.maxPartitionBytes", f"{maxPartitionBytesConf}b")
spark.read.schema(trxSchema).parquet(trxPath).write.format("noop").mode("overwrite").save()


In [0]:
predict_num_partitions(trxFiles)

| Metric | Value | Notes |
|---|---:|---|
| File Count | 1,466 | |
| Actual Bytes | 654,624,309 | |
| Padded Bytes | 6,803,473,973 | `Actual_Bytes + (File_Count * Open_Cost)` |
| Average Size | 4,640,841 | |
| Open Cost | 4,194,304 | `spark.sql.files.openCostInBytes` |
| Bytes-Per-Core | 3,401,736,986 | |
| Max Cost | 3,401,736,986 | max of Open_Cost & Bytes-Per-Core |
| Max Partition Bytes | 134,217,728 | `spark.sql.files.maxPartitionBytes` |


From the Spark UI we have the following:
![](sparkUi_x1.png)

**Number of tasks** → **51/51**
- This matches the number of partitions Spark created for the read.
- Since `maxPartitionBytes` is ~128 MB, Spark split the dataset into 51 chunks (file sizes plus open cost): 6,803,473,973 padded bytes ÷ 134,217,728 ≈ 50.7, rounded up to 51.
- The `predict_num_partitions()` output shows 51 as well.

**Input size** → **773.9 MiB** total
- This is the total amount of data Spark read from the Parquet files.
- Dividing roughly: 773.9 MiB ÷ 51 ≈ 15 MiB per task. This is far below 128 MB because:
    - The files are tiny (about 0.43 MiB of data each on average), so the 4 MB open cost charged per file fills most of each partition's 128 MB budget.
    - File boundaries prevent perfect packing.
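A quick back-of-the-envelope check of these figures, using only values reported above (the variable names are mine, and this is a sketch of the arithmetic rather than anything Spark computes in this form):

```python
MIB = 1024 * 1024

file_count = 1466
actual_bytes = 654_624_309   # "Actual Bytes" from predict_num_partitions()
open_cost = 4 * MIB          # spark.sql.files.openCostInBytes
input_mib = 773.9            # "Input size" reported by the Spark UI
tasks = 51

padded_bytes = actual_bytes + file_count * open_cost
# Share of the planned (padded) size that is open-cost padding rather than data.
padding_share = file_count * open_cost / padded_bytes
# Average real data per file, and average input per task.
avg_file_mib = actual_bytes / file_count / MIB
mib_per_task = input_mib / tasks

print(f"padding share: {padding_share:.1%}")
print(f"average file:  {avg_file_mib:.2f} MiB")
print(f"per task:      {mib_per_task:.1f} MiB")
```

Roughly nine tenths of the size Spark plans with is padding, which is why each 128 MB partition ends up holding only a small amount of real data.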

In [0]:
sc.setJobDescription("Step D: Read at 2x")
maxPartitionBytesConf = 2 * int(defaultMaxPartitionBytes)
spark.conf.set("spark.sql.files.maxPartitionBytes", f"{maxPartitionBytesConf}b")
predict_num_partitions(trxFiles)
spark.read.schema(trxSchema).parquet(trxPath).write.format("noop").mode("overwrite").save()

| Metric | Value | Notes |
|---|---:|---|
| File Count | 1,466 | |
| Actual Bytes | 654,624,309 | |
| Padded Bytes | 6,803,473,973 | `Actual_Bytes + (File_Count * Open_Cost)` |
| Average Size | 4,640,841 | |
| Open Cost | 4,194,304 | `spark.sql.files.openCostInBytes` |
| Bytes-Per-Core | 3,401,736,986 | |
| Max Cost | 3,401,736,986 | max of Open_Cost & Bytes-Per-Core |
| Max Partition Bytes | 268,435,456 | `spark.sql.files.maxPartitionBytes` |


**Tasks → 26/26**
- We doubled maxPartitionBytes from ~128 MB to ~256 MB.
- That let Spark put more data in each partition, so it needed about half as many partitions as before (51 → 26).
- This matches exactly what we expected from the theory.

**Duration → 12 seconds**
- A large improvement over the 2.3 minutes of Step C.
- Likely contributors: less scheduling overhead (fewer tasks to start and finish).
- Each task processes more data at once.

In [0]:
sc.setJobDescription("Step E: Read at 4x")
maxPartitionBytesConf = 4 * int(defaultMaxPartitionBytes)
spark.conf.set("spark.sql.files.maxPartitionBytes", f"{maxPartitionBytesConf}b")
predict_num_partitions(trxFiles)
spark.read.schema(trxSchema).parquet(trxPath).write.format("noop").mode("overwrite").save()

| Metric | Value | Notes |
|---|---:|---|
| File Count | 1,466 | |
| Actual Bytes | 654,624,309 | |
| Padded Bytes | 6,803,473,973 | `Actual_Bytes + (File_Count * Open_Cost)` |
| Average Size | 4,640,841 | |
| Open Cost | 4,194,304 | `spark.sql.files.openCostInBytes` |
| Bytes-Per-Core | 3,401,736,986 | |
| Max Cost | 3,401,736,986 | max of Open_Cost & Bytes-Per-Core |
| Max Partition Bytes | 536,870,912 | `spark.sql.files.maxPartitionBytes` |


In [0]:
sc.setJobDescription("Step F: Read at 8x")
maxPartitionBytesConf = 8 * int(defaultMaxPartitionBytes)
spark.conf.set("spark.sql.files.maxPartitionBytes", f"{maxPartitionBytesConf}b")
predict_num_partitions(trxFiles)
spark.read.schema(trxSchema).parquet(trxPath).write.format("noop").mode("overwrite").save()

| Metric | Value | Notes |
|---|---:|---|
| File Count | 1,466 | |
| Actual Bytes | 654,624,309 | |
| Padded Bytes | 6,803,473,973 | `Actual_Bytes + (File_Count * Open_Cost)` |
| Average Size | 4,640,841 | |
| Open Cost | 4,194,304 | `spark.sql.files.openCostInBytes` |
| Bytes-Per-Core | 3,401,736,986 | |
| Max Cost | 3,401,736,986 | max of Open_Cost & Bytes-Per-Core |
| Max Partition Bytes | 1,073,741,824 | `spark.sql.files.maxPartitionBytes` |


In [0]:
sc.setJobDescription("Step G: Read at 16x")
maxPartitionBytesConf = 16 * int(defaultMaxPartitionBytes)
spark.conf.set("spark.sql.files.maxPartitionBytes", f"{maxPartitionBytesConf}b")
predict_num_partitions(trxFiles)
spark.read.schema(trxSchema).parquet(trxPath).write.format("noop").mode("overwrite").save()

| Metric | Value | Notes |
|---|---:|---|
| File Count | 1,466 | |
| Actual Bytes | 654,624,309 | |
| Padded Bytes | 6,803,473,973 | `Actual_Bytes + (File_Count * Open_Cost)` |
| Average Size | 4,640,841 | |
| Open Cost | 4,194,304 | `spark.sql.files.openCostInBytes` |
| Bytes-Per-Core | 3,401,736,986 | |
| Max Cost | 3,401,736,986 | max of Open_Cost & Bytes-Per-Core |
| Max Partition Bytes | 2,147,483,648 | `spark.sql.files.maxPartitionBytes` |


In [0]:
sc.setJobDescription("Step H: Read at 32x")
maxPartitionBytesConf = 32 * int(defaultMaxPartitionBytes)
spark.conf.set("spark.sql.files.maxPartitionBytes", f"{maxPartitionBytesConf}b")
predict_num_partitions(trxFiles)
spark.read.schema(trxSchema).parquet(trxPath).write.format("noop").mode("overwrite").save()

| Metric | Value | Notes |
|---|---:|---|
| File Count | 1,466 | |
| Actual Bytes | 654,624,309 | |
| Padded Bytes | 6,803,473,973 | `Actual_Bytes + (File_Count * Open_Cost)` |
| Average Size | 4,640,841 | |
| Open Cost | 4,194,304 | `spark.sql.files.openCostInBytes` |
| Bytes-Per-Core | 3,401,736,986 | |
| Max Cost | 3,401,736,986 | max of Open_Cost & Bytes-Per-Core |
| Max Partition Bytes | 4,294,967,296 | `spark.sql.files.maxPartitionBytes` |


**Increasing `maxPartitionBytes` is not always worth it. The first step (51 → 26 tasks) worked really well, but the following steps did not change anything significantly.**

- In theory, the next step would be to check how this performs with files of around 128 MB, but I didn't find a bucket with that layout. The expectation, not verified here, is:
- **Number of tasks ≈ number of files in the dataset.**
- Input per task: roughly the size of a single file.
- A modest increase in `maxPartitionBytes` won't merge these files into bigger partitions, because a partition only takes a second file when both files plus the open cost fit within the limit.

Basically, **if our files are already large, raising `maxPartitionBytes` won't reduce the number of partitions until the limit is big enough to hold more than one file; until then, the file size itself is the limit.**
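Since no bucket with large files was available, the large-file case can at least be explored offline. The sketch below is a simplified model of how file splits are packed into partitions, based on my understanding of Spark's behaviour rather than on its actual code. `pack_files` is a hypothetical helper, the ten 128 MiB files are invented input, and the limit is taken as given (the bytes-per-core cap is ignored).

```python
def pack_files(file_sizes, max_split_bytes, open_cost):
    """Simplified packing model (assumption: approximates Spark, not its real code)."""
    # Files larger than the limit are cut into chunks of at most max_split_bytes.
    splits = []
    for size in file_sizes:
        offset = 0
        while offset < size:
            chunk = min(max_split_bytes, size - offset)
            splits.append(chunk)
            offset += chunk
    # Largest splits first; start a new partition when the next split does not fit.
    splits.sort(reverse=True)
    partitions, current, current_size = [], [], 0
    for split in splits:
        if current and current_size + split > max_split_bytes:
            partitions.append(current)
            current, current_size = [], 0
        current.append(split)
        current_size += split + open_cost  # each split is charged the open cost
    if current:
        partitions.append(current)
    return partitions

MIB = 1024 * 1024
large_files = [128 * MIB] * 10  # invented input: ten files of 128 MiB
for limit_mib in (128, 256, 512):
    parts = pack_files(large_files, limit_mib * MIB, open_cost=4 * MIB)
    print(f"{limit_mib} MiB limit -> {len(parts)} partitions")
```

In this model, doubling the limit to 256 MiB still yields one partition per file, because two files plus the open cost do not fit. Only at 512 MiB do several files share a partition.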

In [0]:
def auto_tune_max_partition_bytes(format, path, schema, max_steps, starting_bytes=134217728):
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    sc.setJobDescription("Step L-1: Autotune maxPartitionBytes Function")

    cores = sc.defaultParallelism
    max_partition_bytes = starting_bytes
    original_value = spark.conf.get("spark.sql.files.maxPartitionBytes")

    for step in range(max_steps + 1):
        max_partition_bytes = starting_bytes + (step * 1024 * 1024)
        max_partition_mb = max_partition_bytes // (1024 * 1024)

        spark.conf.set("spark.sql.files.maxPartitionBytes", f"{max_partition_bytes}b")
        df = spark.read.format(format).schema(schema).load(path)
        partitions = df.rdd.getNumPartitions()

        print(f"{max_partition_mb:,} MB with {partitions:,} partitions, "
              f"iterations: {partitions / cores:.2f}")

        if partitions % cores == 0:
            print("*** Found it! ***")
            print(f"{max_partition_mb:,} MB with {partitions:,} partitions, "
                  f"iterations: {partitions / cores:.2f}")
            return max_partition_bytes
    spark.conf.set("spark.sql.files.maxPartitionBytes", original_value)
    raise ValueError("An appropriate maxPartitionBytes was not found")


# auto_tune_max_partition_bytes

## Purpose

The `auto_tune_max_partition_bytes` function searches for a suitable value of the Spark configuration `spark.sql.files.maxPartitionBytes`, which controls the maximum number of bytes packed into a single partition when Spark reads files.

By finding a value that results in a number of partitions **evenly divisible by the number of available cores**, this function aims to achieve better **parallelism and performance** in distributed data processing.
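Why divisibility matters can be seen by counting scheduling "waves". The sketch below assumes one task per partition and tasks of equal duration, which real jobs only approximate. `wave_stats` is a hypothetical helper and the core count is illustrative.

```python
import math

def wave_stats(partitions, cores):
    """Return (waves, idle_cores_in_last_wave), assuming one task per partition."""
    waves = math.ceil(partitions / cores)
    # Tasks left for the final wave after all full waves have run.
    busy_in_last_wave = partitions - (waves - 1) * cores
    return waves, cores - busy_in_last_wave

for partitions in (51, 52):
    waves, idle = wave_stats(partitions, cores=4)
    print(f"{partitions} partitions on 4 cores: {waves} waves, {idle} idle in the last wave")
```

When the partition count is a multiple of the core count, every wave keeps all cores busy; otherwise the last wave leaves some idle.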

---

## Parameters

| Parameter             | Type    | Description |
|-----------------------|---------|-------------|
| `format`              | `str`   | The file format (e.g. `"parquet"`, `"csv"`). |
| `path`                | `str`   | Path to the input data files (e.g. S3, DBFS). |
| `schema`              | `StructType` | Spark schema to apply when reading the data. |
| `max_steps`           | `int`   | Number of 1 MB increments to try beyond `starting_bytes` (the loop runs `max_steps + 1` times). |
| `starting_bytes`      | `int`   | Initial value for `maxPartitionBytes` in bytes (default: 134,217,728 bytes = 128 MB). |

---

## Behavior

The function follows these steps:

1. Retrieves the number of default parallel tasks (`sc.defaultParallelism`) — typically based on the number of executor cores.
2. Starts with a given `starting_bytes` value.
3. Iteratively increases the `maxPartitionBytes` value by **1 MB per step** (`step * 1024 * 1024`).
4. In each iteration:
   - Updates the Spark configuration `spark.sql.files.maxPartitionBytes`.
   - Loads the data using the provided schema and format.
   - Retrieves the number of partitions created.
   - Checks whether the number of partitions is evenly divisible by the number of cores.
   - If it is, returns the current `maxPartitionBytes`, leaving the Spark configuration set to that value.
5. If no such value is found after all steps, it restores the original configuration and raises a `ValueError`.
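The steps above can be dry-run without Spark. In this sketch the partition count is estimated as `ceil(padded_bytes / max_partition_bytes)` instead of being read from `df.rdd.getNumPartitions()`, so it only approximates what the real function would find. `find_even_partitioning` is a hypothetical name, the padded byte count is the one reported by `predict_num_partitions()` earlier, and the 2 cores are an assumption.

```python
import math

MIB = 1024 * 1024

def find_even_partitioning(padded_bytes, cores, starting_bytes=128 * MIB, max_steps=64):
    """Offline stand-in for the search loop (estimates partitions, does not call Spark)."""
    for step in range(max_steps + 1):
        # Grow the candidate value by 1 MiB per step, as the real function does.
        max_partition_bytes = starting_bytes + step * MIB
        partitions = math.ceil(padded_bytes / max_partition_bytes)
        if partitions % cores == 0:
            return max_partition_bytes, partitions
    raise ValueError("An appropriate maxPartitionBytes was not found")

# Padded bytes from the earlier output; cores=2 is an assumption.
value, partitions = find_even_partitioning(6_803_473_973, cores=2)
print(f"{value // MIB} MiB -> {partitions} partitions")
```

Under these assumptions the search stops after two increments, at 130 MiB with 50 partitions. A run against a real cluster may land on a different value, because Spark's actual packing differs from this estimate.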
