## Init

In [3]:
import os
import sys

import findspark
from dotenv import load_dotenv
from IPython.core.magic import register_cell_magic, register_line_cell_magic

# Load variables from the project's .env file first, so anything defined there
# (including SPARK_HOME, which findspark consults) is visible to findspark.
env = load_dotenv()

# Locate the Spark installation and make pyspark importable.
findspark.init()

# Make the project root importable for the `src.` package imports.
# Fail fast instead of appending None to sys.path, which would only surface
# later as an opaque ModuleNotFoundError.
project_folder = os.getenv("PROJECT_FOLDER")
if not project_folder:
    raise RuntimeError("PROJECT_FOLDER is not set; define it in the .env file.")
# Guard against duplicate entries when the cell is re-run.
if project_folder not in sys.path:
    sys.path.append(project_folder)

In [4]:
from src.transform.etl import *
from src.transform.common import *

from IPython.core.magic import register_cell_magic

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType

# Initialize a local Spark session wired to a Unity Catalog server through the
# Delta Lake and Unity Catalog Spark connectors (versions pinned below).
# The UC server URI and token may be overridden with the UC_URI / UC_TOKEN
# environment variables; the defaults reproduce the local, token-less setup.
spark = (
    SparkSession.builder
    .appName("Delta-Unity-Catalog")
    .master("local[*]")
    .config(
        "spark.jars.packages",
        "io.delta:delta-spark_2.12:3.2.1,io.unitycatalog:unitycatalog-spark_2.12:0.2.0",
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "io.unitycatalog.spark.UCSingleCatalog")
    .config("spark.sql.catalog.unity", "io.unitycatalog.spark.UCSingleCatalog")
    .config("spark.sql.catalog.unity.uri", os.getenv("UC_URI", "http://localhost:8080"))
    # Read the token from the environment instead of hard-coding it here.
    .config("spark.sql.catalog.unity.token", os.getenv("UC_TOKEN", ""))
    # Unqualified table names resolve against the `unity` catalog.
    .config("spark.sql.defaultCatalog", "unity")
    .getOrCreate()
)

24/12/16 18:43:52 WARN Utils: Your hostname, khoa-le-MS-7B19 resolves to a loopback address: 127.0.1.1; using 192.168.1.7 instead (on interface eno1)
24/12/16 18:43:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/khoa-le/.ivy2/cache
The jars for the packages stored in: /home/khoa-le/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
io.unitycatalog#unitycatalog-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-59408991-f204-4b50-8d1a-b1f841c0c977;1.0
	confs: [default]


:: loading settings :: url = jar:file:/home/khoa-le/data/app/spark-3.5.3-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found io.delta#delta-spark_2.12;3.2.1 in central
	found io.delta#delta-storage;3.2.1 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
	found io.unitycatalog#unitycatalog-spark_2.12;0.2.0 in central
	found io.unitycatalog#unitycatalog-client;0.2.0 in central
	found org.slf4j#slf4j-api;2.0.13 in central
	found org.apache.logging.log4j#log4j-slf4j2-impl;2.23.1 in central
	found org.apache.logging.log4j#log4j-api;2.23.1 in central
	found org.apache.logging.log4j#log4j-core;2.23.1 in central
	found com.fasterxml.jackson.datatype#jackson-datatype-jsr310;2.17.0 in central
	found org.openapitools#jackson-databind-nullable;0.2.6 in central
	found com.google.code.findbugs#jsr305;3.0.2 in central
	found com.fasterxml.jackson.core#jackson-databind;2.15.0 in central
	found com.fasterxml.jackson.core#jackson-annotations;2.15.0 in central
	found com.fasterxml.jackson.core#jackson-core;2.15.0 in central
	found com.fasterxml.jackson.module#jackson-module-scala_2.12;2.15.0 in central
	found 

In [6]:
# Register a `%sql` line magic and a `%%sql` cell magic that run Spark SQL on
# the `spark` session and print the result table.
@register_line_cell_magic
def sql(line, cell=None):
    """Run a Spark SQL statement with `spark.sql` and print the result.

    Parameters
    ----------
    line : str
        Text after the magic name. It is the query for `%sql ...`, and the
        fallback query for `%%sql` when the cell body is empty.
    cell : str or None
        Cell body for `%%sql`; None when invoked as the line magic `%sql`.

    Returns
    -------
    None
        Output is printed by `DataFrame.show()`; nothing is returned, so
        Jupyter does not render an extra `Out[...]` value.
    """
    query = cell or line
    spark.sql(query).show()

In [10]:
# Settings that locate the raw-layer data, read from the environment populated
# by load_dotenv() in the init cell. They are used to build both the source
# file paths and the Unity Catalog table paths.
_required_env_vars = ("CATALOG", "RAW_SCHEMA", "STORAGE_FOLDER")
_missing_env_vars = [name for name in _required_env_vars if not os.getenv(name)]
if _missing_env_vars:
    # Fail fast: os.getenv() would return None and every path built from these
    # values would silently contain "None" (e.g. ".../uc_None/None/...").
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_env_vars)
    )

catalog = os.getenv("CATALOG")
schema = os.getenv("RAW_SCHEMA")
storage_folder = os.getenv("STORAGE_FOLDER")

## Load datasets

In [11]:
# Customer data: CSV with a header row; Spark infers the column types.
customer_df = spark.read.load(
    f"{storage_folder}/uc_{catalog}/{schema}/_volumes/txt_files/customer_data.csv",
    format="csv",
    header="true",
    inferSchema="true",
)

In [12]:
# Staff data: CSV with a header row; Spark infers the column types.
staff_df = spark.read.load(
    f"{storage_folder}/uc_{catalog}/{schema}/_volumes/txt_files/staff_data.csv",
    format="csv",
    header="true",
    inferSchema="true",
)

In [13]:
# Store data: CSV with a header row; Spark infers the column types.
store_df = spark.read.load(
    f"{storage_folder}/uc_{catalog}/{schema}/_volumes/txt_files/store_data.csv",
    format="csv",
    header="true",
    inferSchema="true",
)

In [14]:
# Product data lives outside the UC volume, as a parquet file.
product_df = spark.read.load(
    f"{storage_folder}/other_sources/product_data.parquet",
    format="parquet",
)

In [15]:
# Explicit schema for the transaction JSON: top-level transaction fields plus a
# nested array of line items (item id, quantity, position in the order).
transaction_schema = (
    StructType()
    .add("transaction_id", StringType(), True)
    .add("store", StringType(), True)
    .add("customer_id", StringType(), True)
    .add("staff_id", StringType(), True)
    .add(
        "transaction_items",
        ArrayType(
            StructType()
            .add("item_id", StringType(), True)
            .add("quantity", IntegerType(), True)
            .add("item_order", IntegerType(), True)
        ),
        True,
    )
    .add("utc_dt", StringType(), True)
)

# The file is multi-line JSON, so records may span several lines.
transaction_df = (
    spark.read
    .schema(transaction_schema)
    .json(
        f"{storage_folder}/uc_{catalog}/{schema}/_volumes/json_files/transaction_data.json",
        multiLine=True,
    )
)

## Raw

In [16]:
# Stamp customer rows via the project helper add_processing_ts.
customer_df = customer_df.transform(add_processing_ts)

In [17]:
# Inspect the CSV-inferred column types after add_processing_ts was applied.
customer_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- yob: date (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- job: string (nullable = true)
 |-- address: string (nullable = true)
 |-- first_transaction: date (nullable = true)
 |-- membership: string (nullable = true)
 |-- last_processed_ts: timestamp (nullable = false)



In [18]:
# Stamp staff rows via the project helper add_processing_ts.
staff_df = staff_df.transform(add_processing_ts)

In [19]:
# Inspect the CSV-inferred column types after add_processing_ts was applied.
staff_df.printSchema()

root
 |-- staff_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- last_processed_ts: timestamp (nullable = false)



In [20]:
# Stamp store rows via the project helper add_processing_ts.
store_df = store_df.transform(add_processing_ts)

In [22]:
# Inspect the CSV-inferred column types after add_processing_ts was applied.
store_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- email: string (nullable = true)
 |-- last_processed_ts: timestamp (nullable = false)



In [21]:
# Stamp product rows via the project helper add_processing_ts.
product_df = product_df.transform(add_processing_ts)

In [23]:
# Inspect the parquet column types after add_processing_ts was applied.
product_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- unit_price: long (nullable = true)
 |-- last_processed_ts: timestamp (nullable = false)



In [24]:
# Inspect the explicit JSON schema, before process_transaction is applied.
transaction_df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- store: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- staff_id: string (nullable = true)
 |-- transaction_items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item_id: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- item_order: integer (nullable = true)
 |-- utc_dt: string (nullable = true)



In [25]:
# Run the project's transaction processing (which also receives the Spark
# session), then stamp rows via add_processing_ts.
transaction_df = transaction_df.transform(process_transaction, spark).transform(add_processing_ts)

## Merge into Unity Catalog tables

In [26]:
# Upsert each prepared DataFrame into its raw-layer Unity Catalog table via the
# project helper merge_table, matching rows on the listed key columns.
# Tables are merged in the same order as before.
raw_merge_targets = [
    # (source DataFrame, target table name, merge key columns)
    (customer_df, "customers", ["customer_id"]),
    (product_df, "products", ["product_id"]),
    (staff_df, "staffs", ["staff_id"]),
    # NOTE(review): stores are matched on `name`, although store_df also has an
    # `id` column and staff_df refers to stores via `store_id`; confirm `name`
    # is really the unique key (a renamed store would be inserted as new).
    (store_df, "stores", ["name"]),
    # NOTE(review): `item_order` is only nested inside `transaction_items` in the
    # schema read from JSON; presumably process_transaction flattens it to a
    # top-level column — verify.
    (transaction_df, "transactions", ["transaction_id", "item_order"]),
]

for _source_df, _table_name, _merge_keys in raw_merge_targets:
    merge_table(
        df=_source_df,
        uc_path=uc_path(catalog, schema, _table_name),
        merge_columns=_merge_keys,
        spark=spark,
    )

Query: 
    MERGE INTO unity.raw.customers as target
    USING source as source
    ON  target.customer_id = source.customer_id  
    WHEN MATCHED THEN
        UPDATE SET *
    WHEN NOT MATCHED THEN
        INSERT *
    


24/12/16 18:47:37 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|              100|               0|               0|              100|
+-----------------+----------------+----------------+-----------------+

Query: 
    MERGE INTO unity.raw.products as target
    USING source as source
    ON  target.product_id = source.product_id  
    WHEN MATCHED THEN
        UPDATE SET *
    WHEN NOT MATCHED THEN
        INSERT *
    
+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|               35|               0|               0|               35|
+-----------------+----------------+----------------+-----------------+

Query: 
    MERGE INTO unity.raw.staffs as target
    USING 