In [1]:
!pip install delta-spark

[0m

In [2]:
from delta import configure_spark_with_delta_pip
from linkml_runtime.utils.schemaview import SchemaView
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from pyspark.sql import SparkSession
import os


In [3]:

# Parse the LinkML schema
sv = SchemaView("schema.yaml")
product_class = sv.get_class("Product")
slots = sv.class_slots("Product")

def map_range_to_spark_type(slot_range):
    if slot_range == "string":
        return StringType()
    elif slot_range == "float":
        return FloatType()
    else:
        return StringType()

fields = [
    StructField(slot.name, map_range_to_spark_type(slot.range), True)
    for slot in [sv.induced_slot(s, "Product") for s in slots]
]
product_schema = StructType(fields)

# Initialize Spark


builder = SparkSession.builder \
    .appName("LinkML to Delta") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Sample data
data = [("p001", "Widget", 19.99)]
df = spark.createDataFrame(data, schema=product_schema)

# Write Delta table
output_path = "/tmp/delta/products"
os.makedirs(output_path, exist_ok=True)
df.write.format("delta").mode("overwrite").save(output_path)


:: loading settings :: url = jar:file:/opt/spark-3.4.1-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cf3a09fb-f1da-42e8-9a92-72eb8cfe7487;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 303ms :: artifacts dl 27ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0   |   0   |   0 

In [4]:
df_read = spark.read.format("delta").load("/tmp/delta/products")
df_read.printSchema()


root
 |-- product_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price: float (nullable = true)



In [5]:
df_read.show()


                                                                                

+----------+------+-----+
|product_id|  name|price|
+----------+------+-----+
|      p001|Widget|19.99|
+----------+------+-----+



In [6]:
new_data = [("p002", "Gadget", 29.99)]
df_new = spark.createDataFrame(new_data, schema=product_schema)

# Append to Delta table
df_new.write.format("delta").mode("append").save("/tmp/delta/products")


                                                                                

In [7]:
# Show version 0 (initial write)
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta/products")
df_v0.show()




+----------+------+-----+
|product_id|  name|price|
+----------+------+-----+
|      p001|Widget|19.99|
+----------+------+-----+



                                                                                

In [8]:
df_all = spark.read.format("delta").load("/tmp/delta/products")
df_all.show()




+----------+------+-----+
|product_id|  name|price|
+----------+------+-----+
|      p001|Widget|19.99|
|      p002|Gadget|29.99|
+----------+------+-----+



                                                                                

In [19]:
# Parse the LinkML schema
sv = SchemaView("cluster-schema.yaml")
cluster_class = sv.get_class("Cluster")
slots = sv.class_slots("Cluster")

def map_range_to_spark_type(slot_range):
    if slot_range == "string":
        return StringType()
    elif slot_range == "float":
        return FloatType()
    else:
        return StringType()

fields = [
    StructField(slot.name, map_range_to_spark_type(slot.range), True)
    for slot in [sv.induced_slot(s, "Cluster") for s in slots]
]
cluster_schema = StructType(fields)

# Initialize Spark


builder = SparkSession.builder \
    .appName("LinkML to Delta") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Sample data
data = [("550e8400-e29b-41d4-a716-446655440000", "First cluster for test dataset", "prot-001")]

# Create DataFrame
df = spark.createDataFrame(data, schema=cluster_schema)

# Write Delta table
output_path = "/tmp/delta/cluster"
os.makedirs(output_path, exist_ok=True)
df.write.format("delta").option("overwriteSchema", "true").mode("overwrite").save(output_path)


                                                                                

In [20]:
df_read = spark.read.format("delta").load("/tmp/delta/cluster")
df_read.printSchema()

root
 |-- cluster_id: string (nullable = true)
 |-- description: string (nullable = true)
 |-- protocol_id: string (nullable = true)



In [21]:
df_all = spark.read.format("delta").load("/tmp/delta/cluster")
df_all.show()

+--------------------+--------------------+-----------+
|          cluster_id|         description|protocol_id|
+--------------------+--------------------+-----------+
|550e8400-e29b-41d...|First cluster for...|   prot-001|
+--------------------+--------------------+-----------+



In [22]:
# New record to insert
new_data = [("123e4567-e89b-12d3-a456-426614174000", "Second test cluster", "prot-002")]

# Create DataFrame from the new data
new_df = spark.createDataFrame(new_data, schema=cluster_schema)

# Append to existing Delta table
new_df.write.format("delta").mode("append").save(output_path)


                                                                                

In [23]:
df_all = spark.read.format("delta").load("/tmp/delta/cluster")
df_all.show()

+--------------------+--------------------+-----------+
|          cluster_id|         description|protocol_id|
+--------------------+--------------------+-----------+
|550e8400-e29b-41d...|First cluster for...|   prot-001|
|123e4567-e89b-12d...| Second test cluster|   prot-002|
+--------------------+--------------------+-----------+



In [24]:
df_v_0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta/cluster")
df_v_0.show(truncate=False)




+------------------------------------+------------------------------+-----------+
|cluster_id                          |description                   |protocol_id|
+------------------------------------+------------------------------+-----------+
|550e8400-e29b-41d4-a716-446655440000|First cluster for test dataset|prot-001   |
+------------------------------------+------------------------------+-----------+



                                                                                

In [25]:
df = spark.read.format("delta").load("/tmp/delta/cluster")
df.show(truncate=False)
df.printSchema()


+------------------------------------+------------------------------+-----------+
|cluster_id                          |description                   |protocol_id|
+------------------------------------+------------------------------+-----------+
|550e8400-e29b-41d4-a716-446655440000|First cluster for test dataset|prot-001   |
|123e4567-e89b-12d3-a456-426614174000|Second test cluster           |prot-002   |
+------------------------------------+------------------------------+-----------+

root
 |-- cluster_id: string (nullable = true)
 |-- description: string (nullable = true)
 |-- protocol_id: string (nullable = true)



In [26]:
from delta.tables import DeltaTable

history_df = DeltaTable.forPath(spark, "/tmp/delta/cluster").history()
history_df.select("version", "timestamp", "operation", "operationParameters").show(truncate=False)


+-------+-----------------------+---------+--------------------------------------+
|version|timestamp              |operation|operationParameters                   |
+-------+-----------------------+---------+--------------------------------------+
|1      |2025-05-18 17:21:13.957|WRITE    |{mode -> Append, partitionBy -> []}   |
|0      |2025-05-18 17:20:59.196|WRITE    |{mode -> Overwrite, partitionBy -> []}|
+-------+-----------------------+---------+--------------------------------------+



In [27]:
df_v_0 = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta/cluster")
df_v_0.show(truncate=False)


+------------------------------------+------------------------------+-----------+
|cluster_id                          |description                   |protocol_id|
+------------------------------------+------------------------------+-----------+
|550e8400-e29b-41d4-a716-446655440000|First cluster for test dataset|prot-001   |
|123e4567-e89b-12d3-a456-426614174000|Second test cluster           |prot-002   |
+------------------------------------+------------------------------+-----------+



In [28]:
df_v_0 = spark.read.format("delta").option("versionAsOf", 2).load("/tmp/delta/cluster")
df_v_0.show(truncate=False)


AnalysisException: Cannot time travel Delta table to version 2. Available versions: [0, 1].