# Delta Lake Basic by Bordin
## Set Up Your Spark Session for Delta Lake
#### Before you use Delta Lake, make sure your SparkSession is set up correctly

In [1]:
%manage_spark

MagicsControllerWidget(children=(Tab(children=(ManageSessionWidget(children=(HTML(value='<br/>'), HTML(value='…

In [3]:
# Import necessary libraries
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import
java_import(spark._sc._jvm, "org.apache.spark.sql.api.python.*")
from delta.tables import DeltaTable

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Define the folder path
delta_path = "s3a://truepoc-bkt-raw/test/"

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Connects to EzPresto over SSL, executes a SQL query, and loads the result into a Spark DataFrame
df = spark.read.format("jdbc").\
      option("driver", "com.facebook.presto.jdbc.PrestoDriver").\
      option("url", "jdbc:presto://ezpresto.truepoc.ezapac.com:443").\
      option("SSL", "true").\
      option("IgnoreSSLChecks", "true").\
      option("query", "select * from nyctaxi.default.test").\
      load()

df.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+---------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+
|       id|vendor_id|    pickup_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|
+---------+---------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+
|id3686174|        2|2016-04-04 14:36:10|              2|-73.98367309570312| 40.75807189941406|-73.99024963378906|40.748538970947266|                 N|
|id0672137|        2|2016-04-04 14:36:03|              1|-73.97254180908203|40.756038665771484|-73.97496795654297|40.751468658447266|                 N|
|id2263472|        2|2016-04-04 14:36:00|              1|-73.98406219482422|40.743316650390625|-73.97007751464844| 40.76292037963867|                 N|
|id1710714|        1|2016-04-04 14:35:59|              2|-73.97333526611328| 40.76

In [7]:
# Save the DataFrame as a Delta table in the S3 bucket ---Might takes some time---
df.write.format("delta").mode("overwrite").save(delta_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Read Data from Delta
df = spark.read.format("delta").load(delta_path)
df.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+---------+-------------------+---------------+------------------+-----------------+------------------+------------------+------------------+
|       id|vendor_id|    pickup_datetime|passenger_count|  pickup_longitude|  pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|
+---------+---------+-------------------+---------------+------------------+-----------------+------------------+------------------+------------------+
|id3004672|        1|2016-06-30 23:59:58|              1|-73.98812866210938|40.73202896118164|-73.99017333984375| 40.75667953491211|                 N|
|id3505355|        1|2016-06-30 23:59:53|              1|-73.96420288085938|40.67999267578125|-73.95980834960938| 40.65540313720703|                 N|
|id1217141|        1|2016-06-30 23:59:47|              1| -73.9974365234375|40.73758316040039|-73.98616027832031|40.729522705078125|                 N|
|id2150126|        2|2016-06-30 23:59:41|              1|-73.95606994628906|40.771900177

# Perform ACID Operations
#### One of the main advantages of Delta Lake is the ability to perform ACID operations. For instance, you can update, delete, or merge data.

### Update

In [29]:
# Load the Delta table
deltaTable = DeltaTable.forPath(spark, delta_path)

# Update the email address for customer with c_customer_sk = 1
deltaTable.update(
  condition="c_customer_sk = 1",
  set={"c_email_address": "'newemail@ezmeral.sg'"}
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `c_email_address` cannot be resolved. Did you mean one of the following? [`vendor_id`, `id`, `dropoff_latitude`, `pickup_datetime`, `pickup_latitude`].;
'DeltaUpdateTable ['c_email_address], [newemail@ezmeral.sg], ('c_customer_sk = 1)
+- Relation [id#1072,vendor_id#1073L,pickup_datetime#1074,passenger_count#1075L,pickup_longitude#1076,pickup_latitude#1077,dropoff_longitude#1078,dropoff_latitude#1079,store_and_fwd_flag#1080] parquet

Traceback (most recent call last):
  File "/usr/lib/python3.9/site-packages/delta/tables.py", line 153, in update
    self._jdt.update(jcolumn, jmap)
  File "/opt/mapr/spark/spark-3.4.0/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/opt/mapr/spark/spark-3.4.0/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 175, in deco
    raise converted from None
pyspark.err

In [None]:
# Display the updated record
df_updated = spark.read.format("delta").load(delta_path)
df_updated.filter("c_customer_sk = 1").show()

### Delete Operation

In [None]:
# Delete the records
deltaTable.delete(condition="c_customer_sk = 1 AND c_email_address = 'newemail@ezapac.com'")

In [None]:
# Display the records after delete operation
df_after_delete = spark.read.format("delta").load(delta_path)
df_after_delete.filter("c_customer_sk = 1").show()

# Time Travel (Data Versioning)
#### Delta Lake has built-in support for versioning. You can query an older snapshot of your Delta table by using a version number or a timestamp.

In [None]:
# Determine the Version You Want to Restore
deltaTable.history().show(truncate=False)

In [None]:
# Restore to Previous Version Using Time Travel
df_restored = spark.read.format("delta").option("versionAsOf", 1).load(delta_path)

# Overwrite the Current Version with the Restored Version
df_restored.write.format("delta").mode("overwrite").save(delta_path)

In [None]:
# Verify the Restoration
df_after_restore = spark.read.format("delta").load(delta_path)
df_after_restore.filter("c_customer_sk = 1").show()

### Insert Operation

In [None]:
from pyspark.sql.types import DecimalType, Row
from decimal import Decimal

# Create a new DataFrame with the new customer data
new_customer = spark.createDataFrame([
    Row(c_customer_sk='1', c_customer_id="BBBBBBBBBB", c_email_address="newBBcustomer@example.com")
])

# Cast the c_customer_sk column to DecimalType(38,0)
new_customer = new_customer.withColumn("c_customer_sk", col("c_customer_sk").cast(DecimalType(38,0)))

# Append the new customer data to the Delta table
new_customer.write.format("delta").mode("append").save(delta_path)

In [None]:
# Check result after insert
df_after_insert = spark.read.format("delta").load(delta_path)
df_after_insert.filter(col("c_customer_sk") == Decimal('1')).show()