# Imports & Configuration

In [0]:
import polars as pl
from pyspark.sql import SparkSession, functions as F, types as T
from datetime import datetime, timedelta
import uuid

import sys
import os
from pathlib import Path
# The notebook is expected to run from a directory one level below the
# repository root; put <repo>/src at the front of the import path so the
# local `databricks_scaffold` import that follows can be resolved
# (presumably that is where the package lives — verify against the repo layout).
repo_root = Path.cwd().parent
src_path = str(repo_root.joinpath("src"))
if src_path not in sys.path:
    # Prepend (rather than append) so the checkout wins over other entries.
    sys.path.insert(0, src_path)

from databricks_scaffold import VolumeSpiller, frame_shape

# --- Notebook configuration ---
# IS_DEV: in Dev, we preserve volumes; in Prod, we wipe them.
IS_DEV = True
CATALOG, SCHEMA = "main", "default"
# A random 6-hex-character suffix gives every kernel run its own volume name.
VOL_NAME = "showcase_vol_" + uuid.uuid4().hex[:6]

# Reuse the active Spark session when one exists, otherwise create it.
spark = (
    SparkSession.builder
    .appName("VolumeSpillerShowcase")
    .getOrCreate()
)

# One VolumeSpiller instance is shared by every cell below.
spill = VolumeSpiller(spark, CATALOG, SCHEMA, VOL_NAME, is_dev=IS_DEV)

# Generate data

In [0]:
# Build 100 synthetic transactions (ids 1..100): five rotating users, category
# alternating by id parity, amount proportional to the id, and one row per day
# counted forward from 2024-01-01.
base_day = datetime(2024, 1, 1)
data = [
    (
        i,
        f"User_{i % 5}",
        "Electronics" if i % 2 == 0 else "Books",
        i * 15.5,
        base_day + timedelta(days=i),
    )
    for i in range(1, 101)
]

# Explicit schema so Spark does not have to infer types; only `id` is non-nullable.
schema = T.StructType([
    T.StructField("id", T.IntegerType(), False),
    T.StructField("user", T.StringType(), True),
    T.StructField("category", T.StringType(), True),
    T.StructField("amount", T.DoubleType(), True),
    T.StructField("timestamp", T.TimestampType(), True)
])

df_spark = spark.createDataFrame(data, schema)
# The emoji below was stored as mis-decoded bytes; restored to the intended character.
print(f"📊 Initial Spark DataFrame Count: {df_spark.count()}")

# Workflow

## 1. Persistent Spark Checkpoints

Use this when you want to save intermediate Spark results without creating a formal managed table.

In [0]:
# Persist the Spark frame under a named checkpoint via VolumeSpiller.
spill.save_checkpoint_spark(df_spark, "raw_transactions_spark")

# Read the same checkpoint back by name and report how many rows came back.
df_reloaded = spill.load_checkpoint_spark("raw_transactions_spark")
# The emoji below was stored as mis-decoded bytes; restored to the intended character.
print(f"✅ Reloaded {df_reloaded.count()} rows from Spark checkpoint.")

## 2. The Conversion Bridge (Spark ➡️ Polars)

When the data is small enough for a single node, flip to Polars for high-performance local processing.

In [0]:
# Convert the Spark frame to Polars, then cast the `timestamp` column to
# millisecond precision before any local processing.
# NOTE(review): `cleanup=True` presumably removes the intermediate files used
# for the hand-off — confirm against VolumeSpiller.spark_to_polars.
df_polars = (
    spill.spark_to_polars(df_spark, cleanup=True)
    .with_columns(pl.col("timestamp").dt.cast_time_unit("ms"))
)

# The emoji below was stored as mis-decoded bytes; restored to the intended character.
print(f"⚡ Data converted to Polars. Shape: {df_polars.shape}")
print(df_polars.head(3))

## 3. Hybrid Storage Options

VolumeSpiller supports two tiers of storage for Polars:

 - Volume: Persistent, accessible by other users/jobs.
 - Local: Ephemeral, fast, sits on the driver's /tmp directory.

In [0]:
# Write the same Polars frame to both storage tiers: once to the volume and
# once to driver-local scratch space.
spill.save_checkpoint_pl(df_polars, "processed_gold_vol", storage="volume")
spill.save_checkpoint_pl(df_polars, "temp_scratchpad", storage="local")

# List what each tier now holds.
# The emoji below was stored as mis-decoded bytes; restored to the intended character.
print("📂 Current Checkpoints:")
print(f"  Volume: {spill.list_checkpoints('volume')}")
print(f"  Local:  {spill.list_checkpoints('local')}")

## 4. Closing the Loop (Polars ➡️ Spark)

After doing your complex local logic in Polars, move the result back to Spark to join with massive tables or write to a final Delta table.

In [0]:
# Hand the locally processed Polars frame back to Spark.
df_final_spark = spill.polars_to_spark(df_polars, cleanup=True)

# Preview the first five rows of the two columns of interest.
preview_cols = ["user", "amount"]
df_final_spark.select(*preview_cols).show(5)

## 5. Extra: Datetime conversion

In [0]:
# `datetime` is already imported in the notebook's first cell, so it is not
# re-imported here.

# Polars stores Python datetimes at microsecond precision by default, so cast
# explicitly to nanoseconds to build the problem case for the next cell.
df_ns = pl.DataFrame({
    "id": [1, 2, 3],
    "event_time": [datetime(2024, 1, 1), datetime(2024, 1, 2), datetime(2024, 1, 3)]
}).with_columns(pl.col("event_time").dt.cast_time_unit("ns"))

print(f"Original Precision: {df_ns.schema['event_time']}")
# Output: Datetime(time_unit='ns', time_zone=None)

Watch how VolumeSpiller detects the datetime precision issue and fixes it on the fly during conversion or saving.

In [0]:
# Send the nanosecond-precision Polars frame through the Polars -> Spark bridge.
# NOTE(review): the notebook text says VolumeSpiller detects and fixes the
# datetime precision during this call; that logic lives in databricks_scaffold
# and is not visible here — confirm there.
df_spark_time = spill.polars_to_spark(df_ns, cleanup=True)

# Display the resulting Spark rows so the converted timestamps can be inspected.
df_spark_time.show()

## 6. Cleanup & Safety

The teardown() method behaves differently based on your IS_DEV flag.

 - In Dev: It leaves the Volume intact so you can go to the Catalog Explorer and inspect the Parquet files manually.
 - In Prod: It cleans up after itself to avoid storage costs and clutter.

In [0]:
# Report which cleanup path applies to this run, then let VolumeSpiller tear down.
if IS_DEV:
    cleanup_mode = "Preserving data"
else:
    cleanup_mode = "Dropping Volume"
print(f"Cleanup mode: {cleanup_mode}")

spill.teardown()