In [1]:
import pyspark
from delta import *

# Describe a Spark session named "Delta CRUD" with the Delta Lake SQL
# extension and the Delta catalog registered as the default catalog.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("Delta CRUD")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# Let the delta helper finish configuring the builder (the output below shows
# the delta-core package being resolved), then start or reuse the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

:: loading settings :: url = jar:file:/Users/sharattadimalla/github/deltalake/myenv/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/sharattadimalla/.ivy2/cache
The jars for the packages stored in: /Users/sharattadimalla/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-88f88112-de04-42aa-a82d-a379dd16cfdd;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.1.1 in central
	found io.delta#delta-storage;2.1.1 in central
	found org.antlr#antlr4-runtime;4.8 in local-m2-cache
	found org.codehaus.jackson#jackson-core-asl;1.9.13 in local-m2-cache
:: resolution report :: resolve 189ms :: artifacts dl 9ms
	:: modules in use:
	io.delta#delta-core_2.12;2.1.1 from central in [default]
	io.delta#delta-storage;2.1.1 from central in [default]
	org.antlr#antlr4-runtime;4.8 from local-m2-cache in [default]
	org.codehaus.jackson#jackson-core-asl;1.9.13 from local-m2-cache in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |


22/11/23 23:38:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
# Seed data: a single `id` column holding 0 through 4 (the end bound is exclusive).
df = spark.range(start=0, end=5)
df.show()

[Stage 0:>                                                          (0 + 0) / 8]

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



[Stage 0:>                                                          (0 + 8) / 8]                                                                                

## Insert Records

In [5]:
%%sh
# Remove any table left behind by a previous run so the append in the next
# cell starts from an empty path. -f keeps this quiet when the directory
# does not exist yet.
rm -rf output/append-delta-tbl/

In [6]:
# Append the rows of `df` to the Delta table stored at this path.
(
    df.write
    .format("delta")
    .mode("append")
    .save("output/append-delta-tbl/")
)



## Read inserted records

In [7]:
# Load the table back from disk and print it; the rows appear in no
# particular order in the output below.
(
    spark.read
    .format("delta")
    .load("output/append-delta-tbl/")
    .show()
)

+---+
| id|
+---+
|  2|
|  0|
|  3|
|  1|
|  4|
+---+



## Update records by overwriting (replaces the entire table)

In [8]:
# Replacement data: ids 3 through 7 (the end bound is exclusive).
update_df = spark.range(start=3, end=8)
update_df.show()

+---+
| id|
+---+
|  3|
|  4|
|  5|
|  6|
|  7|
+---+



In [9]:
# Overwrite the table at this path with the rows of `update_df`; the
# previously written rows are no longer returned by a read.
(
    update_df.write
    .format("delta")
    .mode("overwrite")
    .save("output/append-delta-tbl/")
)

## Read updated records

In [10]:
# Re-read the table to confirm it now holds only the overwritten data.
(
    spark.read
    .format("delta")
    .load("output/append-delta-tbl/")
    .show()
)

+---+
| id|
+---+
|  6|
|  7|
|  5|
|  3|
|  4|
+---+



## Conditional update

In [11]:
# NOTE(review): wildcard imports. `from pyspark.sql.functions import *` brings
# in functions that share names with Python builtins (e.g. sum, min, max) and
# shadows them for the rest of the notebook. The cells visible here only use
# DeltaTable, col and expr - consider importing just those, after confirming
# no other cell relies on additional names.
from delta.tables import *
from pyspark.sql.functions import *

# Handle to the Delta table at this path; the update, delete and merge cells
# below operate through it.
deltaTable = DeltaTable.forPath(spark, "output/append-delta-tbl/")

## Replace each even value with its square

In [12]:
# Rows whose id is even get id replaced by id squared; odd ids are untouched.
deltaTable.update(
    condition=expr("id % 2 == 0"),
    set={"id": expr("id * id")},
)



## Read conditionally updated records

In [13]:
# Re-read the table to see the effect of the conditional update.
(
    spark.read
    .format("delta")
    .load("output/append-delta-tbl/")
    .show()
)

+---+
| id|
+---+
|  7|
|  5|
|  3|
| 16|
| 36|
+---+



## Delete records - all odd numbers

In [14]:
# Delete every row whose id is odd (`<>` is the SQL "not equal" operator).
deltaTable.delete(
    condition=expr("id % 2 <> 0"),
)

## Read after conditional deletion

In [15]:
# Re-read the table to confirm that only the even ids remain.
(
    spark.read
    .format("delta")
    .load("output/append-delta-tbl/")
    .show()
)

+---+
| id|
+---+
| 16|
| 36|
+---+



## Upsert or Merge

In [16]:
# Source rows for the merge: ids 250 through 256 (the end bound is exclusive).
upsert_df = spark.range(start=250, end=257)

# Merge on id. A matching target row takes the source id; a source row with
# no match is inserted. None of these ids are in the table yet (it holds 16
# and 36), so in this run only the insert branch applies.
(
    deltaTable.alias("oldData")
    .merge(upsert_df.alias("newData"), "oldData.id = newData.id")
    .whenMatchedUpdate(set={"id": col("newData.id")})
    .whenNotMatchedInsert(values={"id": col("newData.id")})
    .execute()
)

# Show the table after the merge through the same DeltaTable handle.
deltaTable.toDF().show()

+---+
| id|
+---+
|250|
|251|
|252|
|253|
|254|
|255|
|256|
| 16|
| 36|
+---+

