### 🔍 Why Use checkpoint()?

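- **Fault tolerance:** If executors or nodes are lost, Spark can read the affected partitions back from the checkpoint files instead of recomputing them from the original source.

- **Performance:** Long transformation chains produce long lineages (large DAGs and query plans). Checkpointing truncates the lineage, which reduces recomputation and planning overhead.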
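Both effects come from the same idea, which can be shown without Spark. The block below is a toy model, not Spark's implementation: the hypothetical `Node` class records each transformation lazily and replays its whole chain on demand, while its `checkpoint()` method materializes the result once and returns a node with no parent link.

```python
from typing import Callable, List, Optional


class Node:
    """Toy lazily evaluated dataset with lineage (hypothetical; not a Spark API)."""

    calls = 0  # total number of transformation steps executed

    def __init__(self, parent: Optional["Node"], fn: Callable[[List[int]], List[int]],
                 data: Optional[List[int]] = None) -> None:
        self.parent = parent  # lineage link; None for sources and checkpoints
        self.fn = fn          # transformation applied to the parent's output
        self.data = data      # materialized values (sources and checkpoints only)

    def map(self, f: Callable[[int], int]) -> "Node":
        # Lazy: record the step and the parent link, compute nothing yet
        return Node(self, lambda xs: [f(x) for x in xs])

    def compute(self) -> List[int]:
        if self.data is not None:
            return list(self.data)  # materialized: no lineage to replay
        Node.calls += 1
        return self.fn(self.parent.compute())  # replay the chain back to a materialized node

    def lineage_length(self) -> int:
        return 0 if self.parent is None else 1 + self.parent.lineage_length()

    def checkpoint(self) -> "Node":
        # Materialize once and drop the parent link (lineage truncated)
        return Node(None, lambda xs: xs, data=self.compute())


def source(values: List[int]) -> Node:
    return Node(None, lambda xs: xs, data=list(values))


chain = source([1, 2, 3])
for _ in range(50):
    chain = chain.map(lambda x: x + 1)

lineage_before = chain.lineage_length()
Node.calls = 0
chain.compute()
calls_before = Node.calls  # all 50 steps were replayed from the source

stable = chain.checkpoint()
lineage_after = stable.lineage_length()
Node.calls = 0
doubled = stable.map(lambda x: x * 2).compute()
calls_after = Node.calls  # only the one new step ran

print(lineage_before, calls_before, lineage_after, doubled, calls_after)
```

In this model, losing intermediate results means replaying every step back to `source`, whereas anything built on `stable` only replays the steps added after the checkpoint.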

```python
# ✅ Example: Using checkpoint() in PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum

# In a Databricks notebook `spark` already exists; getOrCreate() returns that session
spark = SparkSession.builder.getOrCreate()

# Set checkpoint directory (should be HDFS, DBFS, or similar persistent storage in production)
spark.sparkContext.setCheckpointDir("/tmp/spark_checkpoints")

# Create a sample DataFrame
data = [
    (1, "A", 100),
    (2, "B", 200),
    (3, "A", 150),
    (4, "B", 50),
    (5, "C", 300)
]

columns = ["id", "category", "amount"]

df = spark.createDataFrame(data, columns)

print("Initial DataFrame:")
df.display()  # Databricks notebook helper; use df.show() in plain PySpark
```

Initial DataFrame:

| id | category | amount |
| -- | -------- | ------ |
| 1  | A        | 100    |
| 2  | B        | 200    |
| 3  | A        | 150    |
| 4  | B        | 50     |
| 5  | C        | 300    |


```python
# Perform a few transformations
df_transformed = df.withColumn("amount_with_tax", col("amount") * 1.18)

df_aggregated = df_transformed.groupBy("category") \
    .agg(spark_sum("amount_with_tax").alias("total_amount"))

print("Before checkpoint:")
df_aggregated.display()
```

Before checkpoint:

| category | total_amount |
| -------- | ------------ |
| A        | 295.0        |
| B        | 295.0        |
| C        | 354.0        |


```python
# Checkpoint to truncate lineage; checkpoint() returns a new DataFrame
df_checkpointed = df_aggregated.checkpoint(eager=True)  # eager=True (the default) writes the checkpoint immediately

print("After checkpoint (lineage truncated):")
df_checkpointed.display()
```

After checkpoint (lineage truncated):

| category | total_amount |
| -------- | ------------ |
| A        | 295.0        |
| B        | 295.0        |
| C        | 354.0        |


```python
# Further transformations start from the checkpointed data, not from the original lineage
df_final = df_checkpointed.withColumn("discounted_amount", col("total_amount") * 0.9)

print("Final output:")
df_final.display()
```

Final output:

| category | total_amount | discounted_amount |
| -------- | ------------ | ----------------- |
| A        | 295.0        | 265.5             |
| B        | 295.0        | 265.5             |
| C        | 354.0        | 318.6             |


### ⚙️ Key Points

| API / concept                         | Description                                                                                                                                                   |
| ------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `sparkContext.setCheckpointDir(path)` | Sets where checkpoint data is stored. On a cluster this should be a **reliable**, shared filesystem (HDFS, DBFS, ADLS, S3, etc.).                           |
| `.checkpoint(eager=True)`             | Writes the checkpoint immediately by running a job (`eager=True` is the default). With `eager=False`, the data is written by the first action on the result. |
| `.persist()` vs `.checkpoint()`       | `persist()` caches data in memory and/or on disk but keeps the lineage; `checkpoint()` **cuts the lineage** and saves the data to the checkpoint directory.  |

### 🧠 When to Use Checkpoint

Use `checkpoint()` when:

- You run iterative algorithms (e.g., PageRank or other graph algorithms) whose lineage grows with every iteration.
- You have very long lineage chains (many transformations).
- You need to recover from a stable intermediate point instead of recomputing from the source.