<a href="https://colab.research.google.com/github/samas-it-services/open-course-delta-lake/blob/feature%2Fblockchain-via-delta-lake/3_Deltalake_Lakehouse_architecture.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install -q pyspark
# !pip install delta-spark==4.0.0rc1
!pip install delta-spark


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
Collecting delta-spark
  Downloading delta_spark-3.2.0-py3-none-any.whl (21 kB)
Installing collected packages: delta-spark
Successfully installed delta-spark-3.2.0


In [2]:
# Install a headless OpenJDK 8 through apt; -qq asks apt for minimal output.
# NOTE(review): the `java -version` check later in this notebook reports OpenJDK 11
# as the default JVM, so confirm whether this JDK 8 install is actually used.
!apt-get install openjdk-8-jdk-headless -qq


Selecting previously unselected package libxtst6:amd64.
(Reading database ... 121925 files and directories currently installed.)
Preparing to unpack .../libxtst6_2%3a1.2.3-1build4_amd64.deb ...
Unpacking libxtst6:amd64 (2:1.2.3-1build4) ...
Selecting previously unselected package openjdk-8-jre-headless:amd64.
Preparing to unpack .../openjdk-8-jre-headless_8u412-ga-1~22.04.1_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u412-ga-1~22.04.1) ...
Selecting previously unselected package openjdk-8-jdk-headless:amd64.
Preparing to unpack .../openjdk-8-jdk-headless_8u412-ga-1~22.04.1_amd64.deb ...
Unpacking openjdk-8-jdk-headless:amd64 (8u412-ga-1~22.04.1) ...
Setting up libxtst6:amd64 (2:1.2.3-1build4) ...
Setting up openjdk-8-jre-headless:amd64 (8u412-ga-1~22.04.1) ...
update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/orbd to provide /usr/bin/orbd (orbd) in auto mode
update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/servertool to provide /usr/bin

In [3]:
def _create_delta_spark():
  """Build (or reuse) a SparkSession configured for Delta Lake.

  Registers the Delta SQL extension and the Delta catalog, then lets
  ``configure_spark_with_delta_pip`` attach the Delta Maven artifact before
  calling ``getOrCreate()``.

  Returns:
      The SparkSession returned by ``getOrCreate()``.
  """
  from pyspark.sql import SparkSession
  from delta import configure_spark_with_delta_pip

  builder = (
      SparkSession.builder.appName("DeltaLakeApp")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  )
  # spark.jars.packages is deliberately not set here: configure_spark_with_delta_pip
  # sets it to the Delta artifact matching the pip-installed delta-spark version.
  # A hand-written pin (previously delta-core 2.0.0) is replaced by that call and
  # only misleads readers about which Delta version is loaded.
  return configure_spark_with_delta_pip(builder).getOrCreate()

# Create the session once; later cells use this module-level `spark` directly.
spark = _create_delta_spark()
# Enable Delta Lake features
# NOTE(review): this key looks Databricks-specific; confirm it has any effect on
# the open-source delta-spark package installed above.
spark.conf.set("spark.databricks.delta.preview.enabled", "true")


In [4]:
def _enable_sparkui(port=4040):
    """Expose a kernel port through Colab, opening the Spark UI jobs page.

    Args:
        port: Kernel port to serve; defaults to 4040.

    Returns:
        Whatever ``serve_kernel_port_as_window`` returns.
    """
    from google.colab import output as colab_output

    jobs_page = '/jobs/index.html'
    return colab_output.serve_kernel_port_as_window(port, path=jobs_page)

# Serve the Spark UI on the default port (4040) in a separate window.
_enable_sparkui()

<IPython.core.display.Javascript object>

In [5]:
!java -version
!which java

# Once inside the file, set the variable with your Java path, then save and close the file
%env JAVA_HOME=/usr/bin/java

# Test if it was set successfully
!echo "JAVA_HOME=${JAVA_HOME}"

openjdk version "11.0.23" 2024-04-16
OpenJDK Runtime Environment (build 11.0.23+9-post-Ubuntu-1ubuntu122.04.1)
OpenJDK 64-Bit Server VM (build 11.0.23+9-post-Ubuntu-1ubuntu122.04.1, mixed mode, sharing)
/usr/bin/java
env: JAVA_HOME=/usr/bin/java
JAVA_HOME=/usr/bin/java


In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from delta import *
from delta.tables import *
import os

class StorageInterface:
    """Minimal contract shared by the storage backends below.

    Both methods raise ``NotImplementedError`` here; subclasses override them.
    """

    def save_df(self, df, path):
        """Persist ``df`` at ``path``. Must be overridden by a subclass."""
        raise NotImplementedError()

    def read_df(self, path):
        """Load and return the data stored at ``path``. Must be overridden by a subclass."""
        raise NotImplementedError()

class FileSystemStorage(StorageInterface):
    """Storage backend that keeps data as Delta tables at plain paths."""

    def save_df(self, df, path):
        """Write ``df`` in Delta format to ``path``, overwriting what is there."""
        delta_writer = df.write.format("delta").mode('overwrite')
        delta_writer.save(path)

    def read_df(self, path):
        """Load the Delta table at ``path`` using the module-level ``spark`` session."""
        delta_reader = spark.read.format("delta")
        return delta_reader.load(path)

class S3Storage(StorageInterface):
    """Storage backend that keeps Delta tables under ``s3a://<bucket_name>/``."""

    def __init__(self, bucket_name):
        """Remember the bucket that every path is resolved against."""
        self.bucket_name = bucket_name

    def _s3_uri(self, path):
        """Return the ``s3a://`` URI for ``path`` inside this bucket."""
        return f"s3a://{self.bucket_name}/{path}"

    def save_df(self, df, path):
        """Write ``df`` in Delta format to the bucket-relative ``path``, overwriting."""
        delta_writer = df.write.format("delta").mode('overwrite')
        delta_writer.save(self._s3_uri(path))

    def read_df(self, path):
        """Load the Delta table at the bucket-relative ``path`` via the module-level ``spark``."""
        delta_reader = spark.read.format("delta")
        return delta_reader.load(self._s3_uri(path))

class DatabaseStorage(StorageInterface):
    """Storage backend that reads and writes a single table over JDBC."""

    def __init__(self, jdbc_url, table, properties):
        """Store the JDBC URL, table name and connection properties."""
        self.jdbc_url, self.table, self.properties = jdbc_url, table, properties

    def save_df(self, df, path):
        """Overwrite the configured table with ``df``.

        ``path`` is accepted to match the interface but is not used.
        """
        df.write.jdbc(
            url=self.jdbc_url,
            table=self.table,
            mode='overwrite',
            properties=self.properties,
        )

    def read_df(self, path):
        """Read the configured table via the module-level ``spark``.

        ``path`` is accepted to match the interface but is not used.
        """
        jdbc_reader = spark.read
        return jdbc_reader.jdbc(self.jdbc_url, self.table, properties=self.properties)


# Bronze -> silver -> gold walk-through on local Delta tables.
# Paths are relative to the working directory; every stage overwrites its table.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

print("Define storage layers")
bronze_layer = FileSystemStorage()
silver_layer = FileSystemStorage()
gold_layer = FileSystemStorage()

print("Sample data for bronze layer")
data = [("Mr. Bilgrami", 48), ("Mr. AZ", 51), ("Mr. Shyam", 44), ("Mr. Taimur", 25)]
columns = ["Name", "Age"]
df_bronze = spark.createDataFrame(data, columns)

print("Save bronze data")
bronze_layer.save_df(df_bronze, "bronze/people")

print("Read bronze data")
df_bronze = bronze_layer.read_df("bronze/people")

print("Showing the bronze data")
df_bronze.show()

print("Transform bronze to silver (e.g., filter data)")
df_silver = df_bronze.filter(col("Age") > 30)

print("Save silver data as Delta format")
silver_layer.save_df(df_silver, "silver/people")

print("Read silver data")
df_silver = silver_layer.read_df("silver/people")

print("Showing the silver data where Age > 30")
df_silver.show()

print("Transform silver to gold (e.g., select specific columns)")
df_gold = df_silver.select("Name")

print("Save gold data as Delta format with advanced features")
gold_table_path = "gold/people"
# overwriteSchema resets the table to the Name-only schema on every run. A plain
# overwrite appears to keep columns that earlier runs left on disk, which made the
# steps below succeed or fail depending on how often the cell had been executed.
(
    df_gold.write.format("delta")
    .mode('overwrite')
    .option("overwriteSchema", "true")
    .save(gold_table_path)
)

print("Create DeltaTable object")
gold_table = DeltaTable.forPath(spark, gold_table_path)

print("Coordinated commits")
# NOTE(review): the operation below is a MERGE upsert; the printed label does not
# describe it.
new_data = [("David", 35)]
schema = StructType([StructField("Name", StringType(), True), StructField("Age", IntegerType(), True)])
df_new_data = spark.createDataFrame(new_data, schema)

# The gold table has no Age column at this point. MERGE may only assign to a column
# missing from the target when automatic schema evolution is enabled.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# Age enters the table as a string so the free-text value appended further down
# ("thirty") is type-compatible; an integer column cannot be evolved to string.
df_updates = df_new_data.withColumn("Age", col("Age").cast("string"))

# Upsert keyed on Name: update Age for existing names, insert new names.
gold_table.alias("gold").merge(
    df_updates.alias("updates"),
    "gold.Name = updates.Name"
).whenMatchedUpdate(set={"Age": "updates.Age"}) \
 .whenNotMatchedInsert(values={"Name": "updates.Name", "Age": "updates.Age"}) \
 .execute()

print("Schema evolution using type widening")
# NOTE(review): this is an append with mergeSchema, not type widening. Age is already
# a string column after the merge, so the append matches the table schema.
new_data_variant = [("Emma", "thirty")]
schema_variant = StructType([StructField("Name", StringType(), True), StructField("Age", StringType(), True)])
df_new_data_variant = spark.createDataFrame(new_data_variant, schema_variant)

df_new_data_variant.write.format("delta").mode("append").option("mergeSchema", "true").save(gold_table_path)

print("Liquid clustering")
# NOTE(review): executeCompaction() runs OPTIMIZE file compaction; it does not turn
# on liquid clustering for this table.
gold_table.optimize().executeCompaction()

print("Showing the gold data")
df_gold = gold_layer.read_df(gold_table_path)
df_gold.show()


Define storage layers
Sample data for bronze layer
Save bronze data
Read bronze data
Showing the bronze data
+------------+---+
|        Name|Age|
+------------+---+
|   Mr. Shyam| 44|
|  Mr. Taimur| 25|
|Mr. Bilgrami| 48|
|      Mr. AZ| 51|
+------------+---+

Transform bronze to silver (e.g., filter data)
Save silver data as Delta format
Read silver data
Showing the silver data where Age > 30
+------------+---+
|        Name|Age|
+------------+---+
|   Mr. Shyam| 44|
|Mr. Bilgrami| 48|
|      Mr. AZ| 51|
+------------+---+

Transform silver to gold (e.g., select specific columns)
Save gold data as Delta format with advanced features
Create DeltaTable object
Coordinated commits
Schema evolution using type widening
Liquid clustering
Showing the gold data
+------------+------+
|        Name|   Age|
+------------+------+
|        Emma|thirty|
|       David|    35|
|   Mr. Shyam|  NULL|
|Mr. Bilgrami|  NULL|
|      Mr. AZ|  NULL|
+------------+------+

