#### Use only if autocomplete does not work
```
%config Completer.use_jedi = False
%config IPCompleter.greedy = True
```

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Row
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# Local Spark session for the notebook. The settings below pull in the Delta
# Lake package and register its SQL extension and catalog implementation.
session_conf = {
    "spark.ui.enabled": "true",
    "spark.jars.packages": "io.delta:delta-spark_2.13:4.0.0",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

builder = SparkSession.builder.appName("DeltaExample").master("local[*]")
for conf_key, conf_value in session_conf.items():
    builder = builder.config(conf_key, conf_value)

spark = builder.getOrCreate()

In [None]:
# Show the Spark UI URL (useful for monitoring and debugging)
spark.sparkContext.uiWebUrl

# Lego warm-up
Let's load the Lego data set that can be found on Kaggle

In [None]:
# Explicit schema for the CSV file. Every field is declared non-nullable, and
# the two numeric columns are read as doubles before being narrowed to
# integers below.
schema = T.StructType(
    [
        T.StructField(column_name, column_type, False)
        for column_name, column_type in [
            ("set_number", T.StringType()),
            ("set_name", T.StringType()),
            ("year_released", T.DoubleType()),
            ("number_of_parts", T.DoubleType()),
            ("image_url", T.StringType()),
            ("theme_name", T.StringType()),
        ]
    ]
)

# Read the file, cast the numeric columns to integers, then keep only rows
# that have a set number and more than 10 parts.
df = (
    spark.read.csv("data/lego_sets_and_themes.csv", schema=schema, header="true")
    .withColumn("year_released", F.col("year_released").cast("int"))
    .withColumn("number_of_parts", F.col("number_of_parts").cast("int"))
    .dropna(subset=["set_number"])
    .filter("number_of_parts > 10")
)

df.show(n=10, truncate=False)

## Popular theme per decade

In [None]:
# Per (decade, theme) aggregates: average number of parts and number of sets.
# The decade is the release year with its last digit dropped (e.g. 1987 -> 1980).
df_decade = (
    df.select(
        "*",
        ((F.col("year_released") / 10).cast("int") * 10).alias("decade"),
    )
    .groupBy("decade", "theme_name")
    .agg(
        F.avg("number_of_parts").alias("avg_parts"),
        F.count("*").alias("set_count"),
    )
)

### By average number of parts

In [None]:
# Rank the themes of each decade by average part count. theme_name is a
# secondary sort key: row_number() needs a total order, otherwise two themes
# with the same average would be ranked arbitrarily and the winner could
# change from one run to the next.
w_parts = Window.partitionBy("decade").orderBy(F.desc("avg_parts"), F.asc("theme_name"))

# Keep the top-ranked theme per decade and show the 10 most recent decades.
(df_decade
    .withColumn("rank", F.row_number().over(w_parts))
    .where("rank=1")
    .drop("rank")
    .orderBy(F.desc("decade"))
    .limit(10)
    .show(truncate=False)
)

### By number of sets

In [None]:
# Rank the themes of each decade by how many sets they contain. Set counts
# are integers, so ties are likely; theme_name is a secondary sort key so
# that row_number() always picks the same winner for a given data set.
w_sets = Window.partitionBy("decade").orderBy(F.desc("set_count"), F.asc("theme_name"))

# Keep the top-ranked theme per decade and show the 10 most recent decades.
(df_decade
    .withColumn("rank", F.row_number().over(w_sets))
    .where("rank=1")
    .drop("rank")
    .orderBy(F.desc("decade"))
    .limit(10)
    .show(truncate=False)
)

## Top 10 sets by number of parts (largest set of each theme)

In [None]:
# Rank the sets of each theme by part count. set_number breaks ties so that
# row_number() always selects the same set when two sets of a theme have the
# same number of parts.
w = Window.partitionBy("theme_name").orderBy(F.desc("number_of_parts"), F.asc("set_number"))

# Take the largest set of every theme, then keep the 10 largest of those.
# theme_name is a secondary sort key so the cut-off at 10 rows is
# deterministic when several themes share the same part count.
top10_sets = (
    df.withColumn("rank", F.row_number().over(w))
        .filter("rank = 1")
        .select("theme_name", "set_name", "number_of_parts", "image_url")
        .orderBy(F.desc("number_of_parts"), F.asc("theme_name"))
        .limit(10)
)

top10_sets.show(truncate=False)

#### ...and just to show off

In [None]:
%%bash
# Add the `requests` package to the project with uv; the next cell imports it.
# NOTE(review): no version is pinned, and this presumably relies on the kernel
# running inside the uv-managed environment - confirm.
uv add requests

In [None]:
import requests
from IPython.display import Image, display

# Download and render the picture of each of the top-10 sets.
# A failed download (timeout, invalid or missing URL, HTTP error status) is
# reported and skipped, so one bad URL neither aborts the remaining images
# nor gets rendered as a broken picture.
for row in top10_sets.select("image_url").collect():
    url = row["image_url"]
    try:
        resp = requests.get(url, timeout=5)
        resp.raise_for_status()
    except requests.RequestException as exc:
        print(f"Skipping {url}: {exc}")
        continue
    display(Image(data=resp.content))


# Delta format
* Create (or load from csv) a data frame
* Save it as delta
* Update a couple of records
* Insert a few more
* Show time travel

In [None]:
# Load the sparksql_magic IPython extension (presumably the provider of the
# %%sparksql cell magic used in the following cells - confirm).
%load_ext sparksql_magic

In [None]:
%%sparksql
-- Create the lego database unless it already exists.
CREATE DATABASE IF NOT EXISTS lego;

In [None]:
%%sparksql
-- Delta table for the Lego sets, partitioned by release year.
CREATE TABLE IF NOT EXISTS lego.lego_sets (
  set_number      STRING,
  set_name        STRING,
  year_released   INT,
  number_of_parts INT,
  image_url       STRING,
  theme_name      STRING
)
USING delta
PARTITIONED BY (year_released)

In [None]:
%%sparksql
-- Show the columns of the table.
DESCRIBE TABLE lego.lego_sets

In [None]:
# 1. Create a DataFrame
# NOTE: this rebinds `df`, which the Lego cells earlier in the notebook use
# for the Lego data; re-run the Lego load cell before going back to them.
data = [
    (1, "Alice", 30),
    (2, "Bob", 25),
    (3, "Carol", 40),
]
df = spark.createDataFrame(data, schema=["id", "name", "age"])

In [None]:
# 2. Save as Delta (local path)
# Overwrite mode replaces whatever data is already stored at the path.
path = "./delta/people"
df.write.save(path, format="delta", mode="overwrite")

# Load as DeltaTable
deltaTable = DeltaTable.forPath(spark, path)

In [None]:
# 3. Update records
# Add one year to the age of the row with id 1 (Alice: 30 -> 31 on the
# freshly written table).
deltaTable.update("id = 1", {"age": F.col("age") + F.lit(1)})

In [None]:
# 4. Insert new records
new_data = [
    (4, "David", 22),
    (5, "Eva", 29),
]
new_df = spark.createDataFrame(new_data, schema=["id", "name", "age"])

# Merge on id: source rows without a matching id in the table are inserted;
# no action is defined for rows that do match.
(
    deltaTable.alias("t")
    .merge(source=new_df.alias("s"), condition="t.id = s.id")
    .whenNotMatchedInsertAll()
    .execute()
)


In [None]:
# 5. Show history (for time travel)
# Lists the commits recorded for the table; the versions shown here are the
# ones time travel can read back.
deltaTable.history().show(truncate=False)

In [None]:
# Current data: read the latest state of the Delta table at `path`
print("Current data:")
spark.read.load(path, format="delta").show()

In [None]:
# 6. Time travel: load version 0 (original data)
print("Version 0:")
spark.read.load(path, format="delta", versionAsOf=0).show()

In [None]:
# Time travel by timestamp. The timestamp comes from the table's own history
# rather than a hard-coded literal: a fixed literal only works for the one
# table it was written for and fails once it falls outside the range of
# commit timestamps in the Delta log.
# Picks the commit just before the latest one (or the only commit when the
# table has a single version).
previous_commit = (
    deltaTable.history()
    .orderBy(F.desc("version"))
    .limit(2)
    .select(
        "version",
        # Format inside Spark so the string is rendered in the session time
        # zone, which is presumably also how timestampAsOf is interpreted.
        F.date_format("timestamp", "yyyy-MM-dd HH:mm:ss.SSS").alias("commit_ts"),
    )
    .collect()[-1]
)

print(f"Version {previous_commit['version']} (as of {previous_commit['commit_ts']}):")
(
    spark.read.format("delta")
    .option("timestampAsOf", previous_commit["commit_ts"])
    .load(path)
    .show()
)