In [1]:
# For Google Colaboratory
!pip install pyspark==3.5.1 delta-spark

Collecting delta-spark
  Downloading delta_spark-4.0.0-py3-none-any.whl.metadata (1.9 kB)
INFO: pip is looking at multiple versions of delta-spark to determine which version is compatible with other requirements. This could take a while.
  Downloading delta_spark-3.3.2-py3-none-any.whl.metadata (2.2 kB)
  Downloading delta_spark-3.3.1-py3-none-any.whl.metadata (1.9 kB)
  Downloading delta_spark-3.3.0-py3-none-any.whl.metadata (2.0 kB)
  Downloading delta_spark-3.2.1-py3-none-any.whl.metadata (1.9 kB)
  Downloading delta_spark-3.2.0-py3-none-any.whl.metadata (2.0 kB)
Downloading delta_spark-3.2.0-py3-none-any.whl (21 kB)
Installing collected packages: delta-spark
Successfully installed delta-spark-3.2.0


In [2]:
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

def _create_delta_spark():
  """Build (or reuse) a SparkSession configured for Delta Lake.

  The Delta SQL extension and the Delta catalog are registered on the builder;
  ``configure_spark_with_delta_pip`` then adds the Delta Lake Maven artifact
  matching the pip-installed ``delta-spark`` package.

  Returns:
    The SparkSession produced by ``getOrCreate()``.
  """
  from pyspark.sql import SparkSession
  from delta import configure_spark_with_delta_pip

  # "spark.jars.packages" is deliberately not set here:
  # configure_spark_with_delta_pip sets that key itself, so a hand-written
  # value (previously the Delta 2.0.0 delta-core artifact) is overwritten and
  # only misleads the reader about which Delta version is loaded.
  builder = (
    SparkSession.builder.appName("DeltaLakeApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  )
  return configure_spark_with_delta_pip(builder).getOrCreate()

# Create the Delta-enabled SparkSession used by the cells below.
spark = _create_delta_spark()

In [3]:
def _enable_sparkui(port=4040):
    """Serve a kernel port through Colab, opened on the Spark UI jobs page.

    Args:
        port: Kernel port to serve; defaults to 4040.

    Returns:
        Whatever ``serve_kernel_port_as_window`` returns.
    """
    from google.colab import output as colab_output

    jobs_page = '/jobs/index.html'
    return colab_output.serve_kernel_port_as_window(port, path=jobs_page)

# Serve the default port (4040) so the Spark jobs page can be opened from Colab.
_enable_sparkui()

Try `serve_kernel_port_as_iframe` instead. [0m


<IPython.core.display.Javascript object>

In [4]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [5]:
# For Google Colaboratory: mount Google Drive and work from the practicals folder.
import sys, os
# Only runs when the google.colab module is loaded, i.e. inside a Colab kernel.
if 'google.colab' in sys.modules:
    # Mount Google Drive at /content/gdrive.
    from google.colab import drive
    drive.mount('/content/gdrive')
    # NOTE: the Delta table paths in later cells hard-code this same folder,
    # so adjust them too if you change it.
    path_to_file = '/content/gdrive/MyDrive/Big_Data/Practicals' # Please adjust the path accordingly
    # Make that folder the working directory, then print it to confirm.
    os.chdir(path_to_file)
    !pwd

Mounted at /content/gdrive
/content/gdrive/MyDrive/Big_Data/Practicals


## Practical 5a: Delta Lake

This notebook walks through writing, appending to, and updating a Delta table, inspecting the transaction log after each commit.

Source: Bennie Haelen, “Delta Lake: Up & Running”

In [6]:
# Initial batch: four patients as (patientID, name) rows.
df = spark.createDataFrame(
    [
        (1, 'P1'),
        (2, 'P2'),
        (3, 'P3'),
        (4, 'P4'),
    ],
    ["patientID", "name"],
)

In [7]:
# Write the batch as a Delta table in at most two data files (coalesce(2)),
# replacing whatever the table currently holds.
# NOTE(review): if deltaPath already holds a Delta table from an earlier run,
# this overwrite presumably lands as a later version rather than version 0 —
# confirm the folder is empty before a fresh run, since the next cell reads
# the version-0 log file.
(
    df.coalesce(2)
    .write.format("delta")
    .mode("overwrite")
    .save("file:/content/gdrive/MyDrive/Big_Data/Practicals/deltaPath")
)

In [8]:
# Load the first transaction-log file (commit 0) as JSON so each logged
# entry becomes one DataFrame row.
delta_log0 = (
    spark.read.format("json")
    .load("file:/content/gdrive/MyDrive/Big_Data/Practicals/deltaPath/_delta_log/00000000000000000000.json")
)
delta_log0.show()

+--------------------+--------------------+--------------------+--------+
|                 add|          commitInfo|            metaData|protocol|
+--------------------+--------------------+--------------------+--------+
|                NULL|{Apache-Spark/3.5...|                NULL|    NULL|
|                NULL|                NULL|{1756864399484, {...|    NULL|
|                NULL|                NULL|                NULL|  {1, 2}|
|{true, 1756864407...|                NULL|                NULL|    NULL|
|{true, 1756864407...|                NULL|                NULL|    NULL|
+--------------------+--------------------+--------------------+--------+



In [9]:
# Full (untruncated) view of the "add" column of commit 0.
delta_log0.select("add").collect()

[Row(add=None),
 Row(add=None),
 Row(add=None),
 Row(add=Row(dataChange=True, modificationTime=1756864407000, path='part-00000-f06a85a3-86bc-400f-82bc-3a769ecc40b5-c000.snappy.parquet', size=725, stats='{"numRecords":2,"minValues":{"patientID":1,"name":"P1"},"maxValues":{"patientID":2,"name":"P2"},"nullCount":{"patientID":0,"name":0}}')),
 Row(add=Row(dataChange=True, modificationTime=1756864407000, path='part-00001-95d8bb8e-4310-4eaa-86b6-35707dda6b3c-c000.snappy.parquet', size=725, stats='{"numRecords":2,"minValues":{"patientID":3,"name":"P3"},"maxValues":{"patientID":4,"name":"P4"},"nullCount":{"patientID":0,"name":0}}'))]

In [10]:
# Full (untruncated) view of the "commitInfo" column of commit 0.
delta_log0.select("commitInfo").collect()

[Row(commitInfo=Row(engineInfo='Apache-Spark/3.5.1 Delta-Lake/3.2.0', isBlindAppend=False, isolationLevel='Serializable', operation='WRITE', operationMetrics=Row(numFiles='2', numOutputBytes='1450', numOutputRows='4'), operationParameters=Row(mode='Overwrite', partitionBy='[]'), timestamp=1756864411185, txnId='53165400-e9bc-44ed-9ae0-9f982f269977')),
 Row(commitInfo=None),
 Row(commitInfo=None),
 Row(commitInfo=None),
 Row(commitInfo=None)]

In [11]:
# Full (untruncated) view of the "metaData" column of commit 0.
delta_log0.select("metaData").collect()

[Row(metaData=None),
 Row(metaData=Row(createdTime=1756864399484, format=Row(provider='parquet'), id='7ea77476-c36e-4efd-87a7-0eb0563a4d63', partitionColumns=[], schemaString='{"type":"struct","fields":[{"name":"patientID","type":"long","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}}]}')),
 Row(metaData=None),
 Row(metaData=None),
 Row(metaData=None)]

In [12]:
# Second batch: two more patients, appended to the same Delta table
# from a single partition (coalesce(1)).
df = spark.createDataFrame(
    [(5, 'P5'), (6, 'P6')],
    ["patientID", "name"],
)
(
    df.coalesce(1)
    .write.format("delta")
    .mode("append")
    .save("file:/content/gdrive/MyDrive/Big_Data/Practicals/deltaPath")
)

In [13]:
# Load the second transaction-log file (commit 1), written by the append.
delta_log1 = (
    spark.read.format("json")
    .load("file:/content/gdrive/MyDrive/Big_Data/Practicals/deltaPath/_delta_log/00000000000000000001.json")
)
delta_log1.show()

+--------------------+--------------------+
|                 add|          commitInfo|
+--------------------+--------------------+
|                NULL|{Apache-Spark/3.5...|
|{true, 1756864481...|                NULL|
+--------------------+--------------------+



In [14]:
# Read the Delta table back and display its current rows.
(
    spark.read.format("delta")
    .load("file:/content/gdrive/MyDrive/Big_Data/Practicals/deltaPath")
    .show()
)


+---------+----+
|patientID|name|
+---------+----+
|        3|  P3|
|        4|  P4|
|        5|  P5|
|        6|  P6|
|        1|  P1|
|        2|  P2|
+---------+----+



In [15]:
from delta.tables import *

# Obtain a DeltaTable handle for the table stored at deltaPath.
deltaTable = DeltaTable.forPath(
    spark,
    "file:/content/gdrive/MyDrive/Big_Data/Practicals/deltaPath",
)

In [16]:
from pyspark.sql.functions import *
# Set name to "P11" for the row whose patientID is 1.
# The predicate uses the column's exact name, "patientID", so the update does
# not depend on Spark's case-insensitive column resolution (the default) to
# match a differently-cased "PatientID".
deltaTable.update(col("patientID") == 1, {"name": lit("P11")})

In [17]:
# Read the Delta table back and display its rows after the update.
(
    spark.read.format("delta")
    .load("file:/content/gdrive/MyDrive/Big_Data/Practicals/deltaPath")
    .show()
)

+---------+----+
|patientID|name|
+---------+----+
|        1| P11|
|        2|  P2|
|        3|  P3|
|        4|  P4|
|        5|  P5|
|        6|  P6|
+---------+----+



In [18]:
# Load the third transaction-log file (commit 2), written by the update.
delta_log2 = (
    spark.read.format("json")
    .load("file:/content/gdrive/MyDrive/Big_Data/Practicals/deltaPath/_delta_log/00000000000000000002.json")
)
delta_log2.show()

+--------------------+--------------------+--------------------+
|                 add|          commitInfo|              remove|
+--------------------+--------------------+--------------------+
|                NULL|{Apache-Spark/3.5...|                NULL|
|{true, 1756864562...|                NULL|                NULL|
|                NULL|                NULL|{true, 1756864562...|
+--------------------+--------------------+--------------------+

