In [None]:
!pip install delta-spark==3.3.0

In [None]:
from pyspark.sql import SparkSession

# Connect to the standalone Spark cluster with Delta Lake enabled.
# Delta needs BOTH the SQL session extension (used by DeltaTable update/merge/vacuum
# and Delta SQL commands) AND the Delta catalog; setting only the catalog is not enough.
spark = (
    SparkSession.builder
    .appName("Jupyter-Spark-Delta")
    .master("spark://spark-master:7077")
    # Executor sizing: 2 cores / 2 GB per executor, at most 4 cores for this application.
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "2g")
    .config("spark.cores.max", "4")
    # Unqualified paths such as "/delta_tables/people" resolve against this HDFS namenode.
    .config("spark.hadoop.fs.defaultFS", "hdfs://hdfs-namenode:8020")
    # Must match the delta-spark version installed with pip (Scala 2.12 build).
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.3.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

spark.version

In [None]:
# Keep the session alive: every cell below still uses `spark`, and stopping it here
# makes the next read fail with "Cannot call methods on a stopped SparkContext".
# Call `spark.stop()` only once you are finished with the notebook.
spark


# Reading from and Writing to a Delta Table

In [None]:
df = spark.read.csv("/sample_data.csv", header=True, inferSchema=True)

In [None]:
df.write.format("delta").mode("append").save("/delta_tables/sample")

In [None]:
df = spark.read.format("delta").load("/delta_tables/sample")

In [None]:
df.show()

# Upserting: Update and Merge (TODO)

In [None]:
# New data with some updates
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)
df.write.format("delta").mode("overwrite").save("/delta_tables/people")

In [None]:
df.show()

In [None]:
from delta.tables import DeltaTable

# Incoming batch to upsert: Alice is an existing name with a new age,
# Eve is a name not present in the seed rows.
upsert_rows = [("Alice", 26), ("Eve", 28)]
df_new = spark.createDataFrame(upsert_rows, columns)

# Handle on the existing Delta table written by the seed cell.
people_path = "/delta_tables/people"
delta_table = DeltaTable.forPath(spark, people_path)

# Current table contents first, then the incoming batch.
delta_table.toDF().show()
df_new.show()


In [None]:
# In-place UPDATE of matching rows. Both `condition` and the `set` values are SQL
# expression strings, so the new age must be a numeric literal: "'28'" would be a
# string literal assigned to the integer `age` column, relying on an implicit cast
# that strict (ANSI) store-assignment rules can reject.
delta_table.update(
    condition="name = 'Alice'",
    set={"age": "28"},
)

delta_table.toDF().show()

In [None]:
# Merge data
delta_table.alias("old") \
    .merge(df_new.alias("new"), "old.name = new.name") \
    .whenMatchedUpdate(set={"age": "new.age"}) \
    .whenNotMatchedInsert(values={"name": "new.name", "age": "new.age"}) \
    .execute()

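# Time Travel

Delta keeps a versioned commit log, so earlier snapshots of a table can be read either by commit timestamp or by version number.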

In [None]:
from pyspark.sql import functions as F

# Time travel on the people table written above.
people_path = "/delta_tables/people"

# Commit log: one row per table version with its commit timestamp and operation.
people_history = delta_table.history().select("version", "timestamp", "operation")
people_history.orderBy("version").show(truncate=False)

# Read a previous version by timestamp. A fixed calendar date that predates the
# table's first commit is rejected by Delta, so take the earliest commit time from
# the history instead. Formatting it in Spark (session time zone) keeps it
# consistent with how the timestampAsOf string is parsed back.
first_commit_ts = (
    people_history
    .agg(F.date_format(F.min("timestamp"), "yyyy-MM-dd HH:mm:ss.SSS").alias("ts"))
    .first()["ts"]
)
people_by_timestamp = (
    spark.read.format("delta")
    .option("timestampAsOf", first_commit_ts)
    .load(people_path)
)
people_by_timestamp.show()

# Read a previous version by version number (versions are numbered from 0).
people_v1 = spark.read.format("delta").option("versionAsOf", 1).load(people_path)
people_v1.show()


# Vacuum

In [None]:
# VACUUM permanently deletes data files that are no longer referenced by the current
# table version AND are older than the retention window. Versions whose files have
# been removed can no longer be read with time travel.
VACUUM_RETENTION_HOURS = 168  # 7 days, Delta's default deleted-file retention
delta_table.vacuum(retentionHours=VACUUM_RETENTION_HOURS)

# Last cell: release the cluster resources held by this notebook's Spark application.
spark.stop()
