In [None]:
# Stop a Spark session left over from a previous run of this notebook.
# On a fresh kernel `spark` does not exist yet, so the call is guarded to keep
# "Restart Kernel -> Run All" from failing with a NameError on the first cell.
if "spark" in globals():
    spark.stop()

# Start Spark session

In [1]:
import os
import uuid
import pyspark
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from datetime import datetime, timezone, timedelta, date
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col 
from pyspark.sql.functions import max
from typing import List
import textwrap

In [2]:
# Load variables from a local .env file if one is present. `load_dotenv` was
# imported but never called, so a .env file was silently ignored.
load_dotenv()

AWS_S3_ENDPOINT = os.getenv("AWS_S3_ENDPOINT")
NESSIE_URI = os.getenv("NESSIE_URI")
WAREHOUSE = os.getenv("WAREHOUSE")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")

# Fail fast on missing settings: SparkConf.set() stringifies its value, so an
# unset variable would otherwise be configured as the literal text "None" and
# only surface later as an obscure connection/authentication error.
_missing_settings = [
    name
    for name, value in {
        "AWS_S3_ENDPOINT": AWS_S3_ENDPOINT,
        "NESSIE_URI": NESSIE_URI,
        "WAREHOUSE": WAREHOUSE,
        "AWS_ACCESS_KEY_ID": AWS_ACCESS_KEY_ID,
        "AWS_SECRET_ACCESS_KEY": AWS_SECRET_ACCESS_KEY,
    }.items()
    if not value
]
if _missing_settings:
    raise RuntimeError(
        "Missing required environment variables: " + ", ".join(_missing_settings)
    )

In [3]:
# Maven coordinates passed to spark.jars.packages.
spark_packages = [
    'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.2',
    'org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.104.3',
    'org.apache.iceberg:iceberg-aws-bundle:1.9.2',
    'org.postgresql:postgresql:42.5.0',
]

# SQL extensions (Iceberg and Nessie) registered with the session.
sql_extensions = [
    'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions',
    'org.projectnessie.spark.extensions.NessieSparkSessionExtensions',
]

# Settings are applied in this order, one (key, value) pair at a time.
spark_settings = [
    ('spark.jars.packages', ",".join(spark_packages)),
    ('spark.sql.extensions', ",".join(sql_extensions)),
    # Catalog "nessie": Iceberg SparkCatalog using the Nessie catalog implementation.
    ('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog'),
    ('spark.sql.catalog.nessie.uri', NESSIE_URI),
    ('spark.sql.catalog.nessie.client-api-version', '2'),
    ('spark.sql.catalog.nessie.ref', 'main'),
    ('spark.sql.catalog.nessie.authentication.type', 'NONE'),
    ('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog'),
    # S3 endpoint, warehouse location and credentials come from the environment cell.
    ('spark.sql.catalog.nessie.s3.endpoint', AWS_S3_ENDPOINT),
    ('spark.sql.catalog.nessie.warehouse', WAREHOUSE),
    ('spark.sql.catalog.nessie.s3.path-style-access', 'true'),
    ('spark.sql.catalog.nessie.s3.access-key-id', AWS_ACCESS_KEY_ID),
    ('spark.sql.catalog.nessie.s3.secret-access-key', AWS_SECRET_ACCESS_KEY),
    ('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO'),
    ("spark.sql.iceberg.merge-schema", "true"),
    # Resource sizing.
    ("spark.executor.memory", "2g"),
    ("spark.driver.memory", "2g"),
    ("spark.executor.cores", "2"),
]

conf = pyspark.SparkConf().setAppName('app_name').setAll(spark_settings)

# Start (or reuse) the Spark session against the standalone master.
spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .config(conf=conf)
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

print("Spark Running")

/usr/local/lib/python3.8/site-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found


:: loading settings :: url = jar:file:/usr/local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12 added as a dependency
org.apache.iceberg#iceberg-aws-bundle added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-ff1f7cf0-6815-4ddb-9213-241315657b8c;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.9.2 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12;0.104.3 in central
	found org.apache.iceberg#iceberg-aws-bundle;1.9.2 in central
	found org.postgresql#postgresql;42.5.0 in central
	found org.checkerframework#checker-qual;3.5.0 in central
downloading https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.9.2/iceberg-spark-runtime-3.5_2.12-1.9.2.jar ...
	[SUCCESSFUL ] org

Spark Running


# DDL

In [4]:
# Work on the main reference and make sure the three layer namespaces exist.
# IF NOT EXISTS keeps the cell re-runnable: a plain CREATE DATABASE fails as
# soon as the namespace already exists.
spark.sql("USE REFERENCE main IN nessie")
for layer_name in ("bronze", "silver", "gold"):
    spark.sql(f"CREATE DATABASE IF NOT EXISTS nessie.{layer_name}")

DataFrame[]

## Bronze

In [5]:
# Columns shared by every bronze table: the source change-tracking dates
# followed by the audit columns that ingest() appends to each batch.
bronze_common_columns = [
    "created_at DATE",
    "updated_at DATE",
    "_ingested_at TIMESTAMP",
    "_batch_id STRING",
    "_is_deleted BOOLEAN",
]

# Table-specific columns, in the order they appear in each table.
bronze_source_columns = {
    "customer": [
        "id BIGINT",
        "name STRING",
        "sex STRING",
        "mail STRING",
        "birthdate DATE",
        "login_username STRING",
        "login_password STRING",
    ],
    "location": [
        "id STRING",
        "street_address STRING",
        "city STRING",
        "state STRING",
        "zipcode INT",
        "country STRING",
    ],
    "customer_location": [
        "id BIGINT",
        "customer_id BIGINT",
        "location_id STRING",
    ],
    "phone_number": [
        "id STRING",
        "phone_number STRING",
    ],
    "customer_phone": [
        "id BIGINT",
        "customer_id BIGINT",
        "phone_id STRING",
    ],
    "shadow_product": [
        "id STRING",
        "product_id STRING",
        "product_title STRING",
        "currency STRING",
        "price DECIMAL(10, 2)",
    ],
    "category": [
        "id BIGINT",
        "category_name STRING",
    ],
    "product_category": [
        "id BIGINT",
        "product_id STRING",
        "category_id BIGINT",
    ],
    "review": [
        "id STRING",
        "customer_id BIGINT",
        "product_id STRING",
        "star_rating STRING",
        "helpful_votes INT",
        "total_votes INT",
        "marketplace STRING",
        "verified_purchase STRING",
        "review_headline STRING",
        "review_body STRING",
    ],
}

# One CREATE TABLE per entry. IF NOT EXISTS makes the cell safe to re-run;
# the previous plain CREATE TABLE failed once the tables existed.
for table_name, source_columns in bronze_source_columns.items():
    column_ddl = ",\n        ".join(source_columns + bronze_common_columns)
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS nessie.bronze.{table_name} (
        {column_ddl}
    )
    USING iceberg
    TBLPROPERTIES ('write.spark.accept-any-schema'='true')
""")

DataFrame[]

## Silver

In [6]:
# Column definitions of the silver (cleaned) tables, in table order.
silver_table_columns = {
    "customer": [
        "customer_id BIGINT",
        "name STRING",
        "sex STRING",
        "mail STRING",
        "birthdate DATE",
        "signup_date DATE",
    ],
    "location": [
        "location_id STRING",
        "street_address STRING",
        "city STRING",
        "state STRING",
        "country STRING",
        "zipcode STRING",
    ],
    "customer_location": [
        "customer_location_id BIGINT",
        "customer_id BIGINT",
        "location_id STRING",
        "source_created_at DATE",
        "source_updated_at DATE",
    ],
    "product": [
        "product_id STRING",
        "product_title STRING",
        "currency STRING",
        "price DECIMAL(10, 2)",
        "source_created_at DATE",
        "source_updated_at DATE",
    ],
    "category": [
        "category_id BIGINT",
        "category_name STRING",
    ],
    "product_category": [
        "product_category_id BIGINT",
        "product_id STRING",
        "category_id BIGINT",
    ],
    "review": [
        "review_id STRING",
        "customer_id BIGINT",
        "product_id STRING",
        "star_rating INT",
        "helpful_votes INT",
        "total_votes INT",
        "marketplace STRING",
        "verified_purchase BOOLEAN",
        "review_headline STRING",
        "review_body STRING",
        "modified_date DATE",
    ],
}

# One CREATE TABLE per entry. IF NOT EXISTS makes the cell safe to re-run;
# the previous plain CREATE TABLE failed once the tables existed.
for table_name, table_columns in silver_table_columns.items():
    column_ddl = ",\n        ".join(table_columns)
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS nessie.silver.{table_name} (
        {column_ddl}
    )
""")

DataFrame[]

## Gold

In [7]:
# Column definitions of the gold (dimensional model) tables, in table order.
# Surrogate-key references (product_key, customer_key, location_key,
# category_key, date_key) are BIGINT everywhere so they match the BIGINT keys
# of the dimensions they point to; they were previously declared INT in the
# bridge and fact tables while the dimension keys were BIGINT.
gold_table_columns = {
    "dim_customer": [
        "customer_key BIGINT",
        "customer_id BIGINT",
        "name STRING",
        "sex STRING",
        "mail STRING",
        "birthdate DATE",
        "signup_date DATE",
    ],
    "dim_date": [
        "date_key BIGINT",
        "full_date DATE",
        "day INT",
        "month INT",
        "month_name STRING",
        "quarter INT",
        "quarter_name STRING",
        "year INT",
        "day_of_week INT",
        "day_name STRING",
        "week_of_year INT",
        "is_weekend BOOLEAN",
        "is_holiday BOOLEAN",
    ],
    "dim_location": [
        "location_key BIGINT",
        "location_id STRING",
        "street_address STRING",
        "city STRING",
        "state STRING",
        "country STRING",
        "zipcode STRING",
    ],
    "bridge_customer_location": [
        "customer_location_key BIGINT",
        "customer_location_id BIGINT",
        "location_key BIGINT",
        "customer_key BIGINT",
        "valid_from DATE",
        "valid_to DATE",
        "is_current BOOLEAN",
    ],
    "dim_product": [
        "product_key BIGINT",
        "product_id STRING",
        "product_title STRING",
        "currency STRING",
        "price DECIMAL(10, 2)",
        "valid_from DATE",
        "valid_to DATE",
        "is_current BOOLEAN",
    ],
    "dim_category": [
        "category_key BIGINT",
        "category_id BIGINT",
        "category_name STRING",
    ],
    "bridge_product_category": [
        "product_category_key BIGINT",
        "product_category_id BIGINT",
        "product_key BIGINT",
        "category_key BIGINT",
    ],
    "fact_review": [
        "review_key BIGINT",
        "review_id STRING",
        "product_key BIGINT",
        "customer_key BIGINT",
        "date_key BIGINT",
        "star_rating INT",
        "helpful_votes INT",
        "total_votes INT",
        "marketplace STRING",
        "verified_purchase BOOLEAN",
        "review_headline STRING",
        "review_body STRING",
        "is_current BOOLEAN",
    ],
}

# One CREATE TABLE per entry. IF NOT EXISTS makes the cell safe to re-run;
# the previous plain CREATE TABLE failed once the tables existed.
for table_name, table_columns in gold_table_columns.items():
    column_ddl = ",\n        ".join(table_columns)
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS nessie.gold.{table_name} (
        {column_ddl}
    )
""")

DataFrame[]

# ETL processing

In [8]:
# Wall-clock run timestamp: UTC+7, made naive and truncated to whole seconds.
ts = datetime.now(timezone(timedelta(hours=7))).replace(tzinfo=None, microsecond=0)
# NOTE: the value above is immediately replaced by this fixed timestamp, so
# every run of the notebook behaves as if it started on 2012-01-01 01:01:01.
ts = datetime(2012, 1, 1, 1, 1, 1)

# ISO date string (YYYY-MM-DD) used as the upper bound of the incremental read.
today = ts.date().isoformat()

# Umbrella branch for this run; the per-layer branches are created from it.
etl_processing_branch_name = "feat/etl-processing-" + ts.strftime('%Y-%m-%d-%H-%M-%S')
create_etl_branch_sql = f"""
    CREATE BRANCH
    IF NOT EXISTS {etl_processing_branch_name}
    IN nessie
    FROM main
"""
spark.sql(create_etl_branch_sql)

DataFrame[refType: string, name: string, hash: string]

## Bronze Layer

Create a new branch for the ETL run:

    - If the run succeeds, merge the branch
    - If it fails, nothing changes

In [9]:
# Dedicated branch for the bronze load, created from the run's ETL branch.
bronze_layer_branch_name = "feat/bronze-layer-" + ts.strftime('%Y-%m-%d-%H-%M-%S')
create_bronze_branch_sql = f"""
    CREATE BRANCH
    IF NOT EXISTS {bronze_layer_branch_name}
    IN nessie
    FROM {etl_processing_branch_name}
"""
spark.sql(create_bronze_branch_sql)

# Switch the session to the new branch, then list all references.
spark.sql(f"USE REFERENCE {bronze_layer_branch_name} IN nessie;")
references = spark.sql("LIST REFERENCES IN nessie")
references.show(truncate=False)

+-------+---------------------------------------+----------------------------------------------------------------+
|refType|name                                   |hash                                                            |
+-------+---------------------------------------+----------------------------------------------------------------+
|Branch |feat/bronze-layer-2012-01-01-01-01-01  |2316eefd35c51de5b350867775a59b513a423f7a5531e9244403ed652f058633|
|Branch |feat/etl-processing-2012-01-01-01-01-01|2316eefd35c51de5b350867775a59b513a423f7a5531e9244403ed652f058633|
|Branch |main                                   |2316eefd35c51de5b350867775a59b513a423f7a5531e9244403ed652f058633|
+-------+---------------------------------------+----------------------------------------------------------------+



In [10]:
def read(
    table_name: str,
    today: date,
    jdbc_url: str = "jdbc:postgresql://postgres:5432/oltp",
    jdbc_user: str = "postgres",
    jdbc_password: str = "postgres",
    num_partitions: int = 8,
    fetch_size: int = 10000,
) -> DataFrame:
    """Incrementally read new rows of a source table over JDBC.

    Only rows whose ``updated_at`` is greater than the latest ``updated_at``
    already stored in ``nessie.bronze.<table_name>`` and not later than
    ``today`` are selected. The read is partitioned on ``updated_at``.

    Args:
        table_name: Name of the table, used both for the source table and for
            the bronze table ``nessie.bronze.<table_name>``.
        today: Inclusive upper bound for ``updated_at``. It is interpolated
            into the SQL text, so a ``date`` or a ``YYYY-MM-DD`` string both
            work (the notebook passes a string).
        jdbc_url: JDBC URL of the source database.
        jdbc_user: Source database user. The default keeps the previous
            hard-coded value; pass real credentials from the environment
            rather than relying on it.
        jdbc_password: Source database password (same remark as ``jdbc_user``).
        num_partitions: Number of JDBC partitions for the data read.
        fetch_size: JDBC fetch size for the data read.

    Returns:
        A DataFrame with the selected source rows, or a DataFrame with no
        columns and no rows when there is nothing new to read.
    """
    # Imported locally so the function does not depend on StructType leaking
    # in through `from pyspark.sql.functions import *`.
    from pyspark.sql.types import StructType

    connection_options = {
        "url": jdbc_url,
        "user": jdbc_user,
        "password": jdbc_password,
        "driver": "org.postgresql.Driver",
    }

    # Latest updated_at already present in the bronze layer.
    max_updated_at = spark.sql(
        f"SELECT max(updated_at) as max_updated_at FROM nessie.bronze.{table_name}"
    ).collect()[0]["max_updated_at"]
    # An empty bronze table yields NULL: fall back to a date older than any data.
    max_updated_at = "1999-01-01" if max_updated_at is None else str(max_updated_at)
    print(f"max_updated_at: {max_updated_at}")

    # First pass: min/max updated_at of the new rows, used as partition bounds.
    bounds_query = f"""
    (
        SELECT 
            min(updated_at) AS min_date, 
            max(updated_at) AS max_date 
        FROM {table_name} 
        WHERE updated_at > '{max_updated_at}' AND updated_at <= '{today}'
    ) tmp
    """
    bounds = (
        spark.read
        .format("jdbc")
        .options(**connection_options)
        .option("dbtable", bounds_query)
        .load()
        .collect()[0]
    )

    lower, upper = bounds["min_date"], bounds["max_date"]
    if lower is None or upper is None:
        # No row matched the filter: nothing new in the source.
        print(f"Không có bản ghi mới cho {table_name}, skip ingest.")
        df_empty = spark.createDataFrame([], StructType([]))
        return df_empty
    print(f"lower: {lower}, upper: {upper}")

    # Second pass: fetch only source rows newer than what bronze already holds.
    data_query = f"""
    (
        SELECT * 
        FROM {table_name} 
        WHERE updated_at > '{max_updated_at}' AND updated_at <= '{today}'
    ) AS {table_name}
    """
    # The partitioning options were previously set twice ("id", then
    # "updated_at"); the later set overrode the first, so only the effective
    # "updated_at" partitioning is kept. The bounds are updated_at values.
    df = (
        spark.read
        .format("jdbc")
        .options(**connection_options)
        .option("dbtable", data_query)
        .option("partitionColumn", "updated_at")
        .option("lowerBound", lower)
        .option("upperBound", upper)
        .option("numPartitions", str(num_partitions))
        .option("fetchsize", str(fetch_size))
        .load()
    )
    return df

def ingest(
    table_name: str, 
    df: DataFrame,
    ts: date
) -> bool:
    """Append a batch of source rows to ``nessie.bronze.<table_name>``.

    Three audit columns are added before writing: ``_ingested_at`` (the given
    ``ts``), ``_batch_id`` (a fresh UUID4 string shared by the whole batch)
    and ``_is_deleted`` (always ``False``).

    Args:
        table_name: Bronze table name (without catalog/namespace).
        df: Rows to ingest, as returned by ``read``.
        ts: Value stored in ``_ingested_at`` (the notebook passes a datetime).

    Returns:
        ``True`` when the batch was appended, ``False`` when ``df`` has no
        columns and the write was skipped.
    """
    # read() returns a column-less DataFrame when the source has nothing new.
    # Appending it would write a frame holding only the audit columns, so skip.
    if not df.columns:
        print(f"No new records for {table_name}, skipping ingest.")
        return False

    # Add the tracking columns.
    batch_id = str(uuid.uuid4())
    df_bronze = (df
      .withColumn("_ingested_at", F.lit(ts))
      .withColumn("_batch_id", F.lit(batch_id))
      .withColumn("_is_deleted", F.lit(False))
    )
    
    # Write to Iceberg.
    df_bronze.writeTo(f"nessie.bronze.{table_name}").append()
    return True

In [11]:
# Source tables, read one after another in this order.
source_table_names = (
    "category",
    "customer",
    "customer_location",
    "customer_phone",
    "location",
    "phone_number",
    "product_category",
    "review",
    "shadow_product",
)

# Note: the frame read from "shadow_product" is bound to the name `product`.
(
    category,
    customer,
    customer_location,
    customer_phone,
    location,
    phone_number,
    product_category,
    review,
    product,
) = [read(table_name=source_table, today=today) for source_table in source_table_names]

                                                                                

max_updated_at: 1999-01-01


                                                                                

lower: 2009-12-13, upper: 2009-12-13
max_updated_at: 1999-01-01


                                                                                

lower: 2009-12-13, upper: 2012-01-01
max_updated_at: 1999-01-01


                                                                                

lower: 2009-12-13, upper: 2012-01-01


                                                                                

max_updated_at: 1999-01-01


                                                                                

lower: 2009-12-13, upper: 2012-01-01
max_updated_at: 1999-01-01


                                                                                

lower: 2009-12-13, upper: 2012-01-01
max_updated_at: 1999-01-01


                                                                                

lower: 2009-12-13, upper: 2012-01-01
max_updated_at: 1999-01-01


                                                                                

lower: 2009-12-13, upper: 2012-01-01
max_updated_at: 1999-01-01


                                                                                

lower: 2009-12-13, upper: 2012-01-01
max_updated_at: 1999-01-01


[Stage 35:>                                                         (0 + 1) / 1]

lower: 2009-12-13, upper: 2012-01-01


                                                                                

In [None]:
# Scratch cell, fully commented out: builds an empty `category` DataFrame by hand.
# from pyspark.sql.types import (
#     StructType, StructField,
#     LongType, StringType, DateType, TimestampType, BooleanType
# )

# # Define a schema identical to the category table
# category_schema = StructType([
#     StructField("id", LongType(), True),
#     StructField("category_name", StringType(), True),
#     StructField("created_at", DateType(), True),
#     StructField("updated_at", DateType(), True)
# ])

# # Create an empty DataFrame with the schema above
# category = spark.createDataFrame([], schema=category_schema)

# # Check
# category.printSchema()
# category.show()

In [None]:
# Frames to append to bronze, keyed by bronze table name, in ingest order.
# `phone_number` was read and has a bronze table but was never ingested;
# it is included here so nessie.bronze.phone_number is populated too.
bronze_batches = {
    "category": category,
    "customer": customer,
    "customer_location": customer_location,
    "location": location,
    "phone_number": phone_number,
    "customer_phone": customer_phone,
    "product_category": product_category,
    "review": review,
    "shadow_product": product,
}

for bronze_table, source_frame in bronze_batches.items():
    ingest(table_name=bronze_table, df=source_frame, ts=ts)

In [None]:
# Leave the bronze branch, merge it into the run's ETL branch, then drop it.
bronze_merge_statements = (
    "USE REFERENCE main IN nessie",
    f"MERGE BRANCH {bronze_layer_branch_name} INTO {etl_processing_branch_name} IN nessie",
    f"DROP BRANCH {bronze_layer_branch_name} IN nessie",
)
for statement in bronze_merge_statements:
    spark.sql(statement)

## Silver Layer

In [None]:
# Dedicated branch for the silver load, created from the run's ETL branch.
silver_layer_branch_name = "feat/silver-layer-" + ts.strftime('%Y-%m-%d-%H-%M-%S')
create_silver_branch_sql = f"""
    CREATE BRANCH
    IF NOT EXISTS {silver_layer_branch_name}
    IN nessie
    FROM {etl_processing_branch_name};
"""
spark.sql(create_silver_branch_sql)

# Switch the session to the new branch, then list all references.
spark.sql(f"USE REFERENCE {silver_layer_branch_name} IN nessie;")
references = spark.sql("LIST REFERENCES IN nessie")
references.show(truncate=False)

### Schema evolution

In [None]:
# Note cell. The message says: Iceberg's schema merging already covers schema
# evolution; only column renames are not handled.
print("Iceberg có chức năng merge schema cho việc evolution rồi. Nếu đổi tên cột thì mới không được thôi")

### Schema enforcement

#### Schema Validation

In [None]:
# Note cell. The message says: this depends on the case; it was already done
# in the Bronze layer.
print("Các này thì tuỳ trường hợp, cái này đã thực hiện ở Bronze Layer rồi")

In [None]:
# Đổi tên cột, chỉ select những cột cần thiết
customer = customer.drop("login_username", "login_password", "updated_at")
customer = customer.withColumnRenamed("created_at", "signup_date").withColumnRenamed("id", "customer_id")

location = location.drop("created_at", "updated_at")
location = location.withColumnRenamed("id", "location_id")

customer_location = customer_location.withColumnRenamed("created_at", "source_created_at").withColumnRenamed("updated_at", "source_updated_at").withColumnRenamed("id", "customer_location_id")

product = product.drop("id")
product = product.withColumnRenamed("created_at", "source_created_at").withColumnRenamed("updated_at", "source_updated_at")

review = review.drop("created_at")
review = review.withColumnRenamed("updated_at", "modified_date").withColumnRenamed("id", "review_id")

category = category.drop("created_at", "updated_at")
category = category.withColumnRenamed("id", "category_id")

product_category = product_category.drop("created_at", "updated_at")
product_category = product_category.withColumnRenamed("id", "product_category_id")

#### Data Type Validation

In [None]:
# Note cell. The message says: this depends on the case; it was already done
# in the Bronze layer.
print("Các này thì tuỳ trường hợp, cái này đã thực hiện ở Bronze Layer rồi")

### Handling of null and missing values

#### customer

In [None]:
# Replacement values for null customer attributes, per column.
customer_null_defaults = {
    "name": "Unknown",
    "sex": "Other",
    "mail": "unknown@gmail.com",
    "birthdate": "1900-01-01",
}
customer = customer.na.fill(customer_null_defaults)

In [None]:
# Drop customers without a business key, reporting the row count before/after.
rows_before = customer.count()
print(f"Num of records before drop na: {rows_before}")
customer = customer.dropna(how="any", subset=["customer_id"])
rows_after = customer.count()
print(f"Num of records after drop na: {rows_after}")

#### location

In [None]:
# Drop location rows containing any null, reporting the row count before/after.
rows_before = location.count()
print(f"Num of records before drop na: {rows_before}")
location = location.dropna()
rows_after = location.count()
print(f"Num of records after drop na: {rows_after}")

#### customer_location

In [None]:
# Drop customer_location rows containing any null, reporting the count before/after.
rows_before = customer_location.count()
print(f"Num of records before drop na: {rows_before}")
customer_location = customer_location.dropna()
rows_after = customer_location.count()
print(f"Num of records after drop na: {rows_after}")

#### product

In [None]:
# Drop product rows containing any null, reporting the row count before/after.
rows_before = product.count()
print(f"Num of records before drop na: {rows_before}")
product = product.dropna()
rows_after = product.count()
print(f"Num of records after drop na: {rows_after}")

#### category

In [None]:
# Drop category rows containing any null, reporting the row count before/after.
rows_before = category.count()
print(f"Num of records before drop na: {rows_before}")
category = category.dropna()
rows_after = category.count()
print(f"Num of records after drop na: {rows_after}")

#### review

In [None]:
# Fill null các giá trị không ảnh hưởng đến báo cáo
review = review.fillna(
    {
        "helpful_votes": 0,
        "total_votes": 0,
        "marketplace": "Unknown",
        "verified_purchase": "N",
        "review_headline": "",
        "review_body": ""
    }
)

In [None]:
# Drop reviews missing any of the key columns, reporting the count before/after.
rows_before = review.count()
print(f"Num of records before drop na: {rows_before}")
review = review.dropna(
    how="any",
    subset=["review_id", "customer_id", "product_id", "star_rating"],
)
rows_after = review.count()
print(f"Num of records after drop na: {rows_after}")

#### product_category

In [None]:
# Drop product_category rows containing any null, reporting the count before/after.
rows_before = product_category.count()
print(f"Num of records before drop na: {rows_before}")
product_category = product_category.dropna()
rows_after = product_category.count()
print(f"Num of records after drop na: {rows_after}")

### Deduplicate

#### customer

In [None]:
# Remove fully identical customer rows, reporting the row count before/after.
rows_before = customer.count()
print(f"Num of records before deduplicate: {rows_before}")
customer = customer.dropDuplicates()
rows_after = customer.count()
print(f"Num of records after deduplicate: {rows_after}")

#### location

In [None]:
# Remove fully identical location rows, reporting the row count before/after.
rows_before = location.count()
print(f"Num of records before deduplicate: {rows_before}")
location = location.dropDuplicates()
rows_after = location.count()
print(f"Num of records after deduplicate: {rows_after}")

#### customer_location

In [None]:
# Remove fully identical customer_location rows, reporting the count before/after.
rows_before = customer_location.count()
print(f"Num of records before deduplicate: {rows_before}")
customer_location = customer_location.dropDuplicates()
rows_after = customer_location.count()
print(f"Num of records after deduplicate: {rows_after}")

#### product

In [None]:
# Remove fully identical product rows, reporting the row count before/after.
rows_before = product.count()
print(f"Num of records before deduplicate: {rows_before}")
product = product.dropDuplicates()
rows_after = product.count()
print(f"Num of records after deduplicate: {rows_after}")

#### category

In [None]:
# Remove fully identical category rows, reporting the row count before/after.
rows_before = category.count()
print(f"Num of records before deduplicate: {rows_before}")
category = category.dropDuplicates()
rows_after = category.count()
print(f"Num of records after deduplicate: {rows_after}")

#### review

In [None]:
# Remove fully identical review rows, reporting the row count before/after.
rows_before = review.count()
print(f"Num of records before deduplicate: {rows_before}")
review = review.dropDuplicates()
rows_after = review.count()
print(f"Num of records after deduplicate: {rows_after}")

#### product_category

In [None]:
# Remove fully identical product_category rows, reporting the count before/after.
rows_before = product_category.count()
print(f"Num of records before deduplicate: {rows_before}")
product_category = product_category.dropDuplicates()
rows_after = product_category.count()
print(f"Num of records after deduplicate: {rows_after}")

### Resolution of out-of-order and late-arriving data issues

In [None]:
# TODO: not yet investigated in depth

### Data quality checks and enforcement

#### Sửa invalid values

#### Data Standardization

##### Loại bỏ khoảng trắng không cần thiết.

In [None]:
from pyspark.sql.functions import trim

def clean_whitespace(df):
    """Return ``df`` with every string-typed column trimmed.

    Columns whose Spark type name is ``"string"`` are replaced by their
    trimmed value; all other columns are left as they are.
    """
    string_column_names = [
        field.name
        for field in df.schema.fields
        if field.dataType.typeName() == "string"
    ]
    for column_name in string_column_names:
        df = df.withColumn(column_name, F.trim(df[column_name]))
    return df

In [None]:
# Trim string columns of every frame. `category` was previously cleaned twice;
# the redundant second call is removed.
customer = clean_whitespace(df=customer)
location = clean_whitespace(df=location)
customer_location = clean_whitespace(df=customer_location)
product = clean_whitespace(df=product)
category = clean_whitespace(df=category)
review = clean_whitespace(df=review)
product_category = clean_whitespace(df=product_category)

##### Mapping

###### customer

In [None]:
# Chuẩn hoá cột "sex" cho trường hợp tổng hợp từ nhiều nguồn.
customer = customer.withColumn("sex", lower(col("sex")))
customer = customer.withColumn(
    "sex",
    when(col("sex").isin("1", "m", "male"), "1")
    .when(col("sex").isin("2", "f", "female"), "2")
    .when(col("sex").isin("3", "o", "other"), "3")
    .otherwise("3")
)

###### category

In [None]:
# Category names accepted as-is; every other value is mapped to "Other".
valid_category_name = [
    'Personal_Care_Appliances', 'Toys', 'Beauty', 'Video Games',
    'Digital_Ebook_Purchase', 'Watches', 'Pet Products', 'Grocery',
    'Other', 'Mobile_Apps', 'Office Products', 'Camera',
    'Wireless', 'Apparel', 'Automotive', 'Outdoors',
    'Major Appliances', 'Furniture', 'Tools', 'Books',
    'Musical Instruments', 'Baby', 'Health & Personal Care', 'Sports',
    'Electronics', 'Mobile_Electronics', 'Shoes',
]

category_name = F.col("category_name")
category = category.withColumn(
    "category_name",
    F.when(category_name.isin(valid_category_name), category_name).otherwise("Other"),
)

###### review

In [None]:
# Chuẩn hoá cột "star_rating" cho trường hợp tổng hợp từ nhiều nguồn.

# Accepted spellings (compared after lower-casing) for each star rating.
star_rating_lowercase = F.lower(F.col("star_rating"))
star_rating_code = (
    F.when(star_rating_lowercase.isin("one", "1", "1 star", "1*", "*"), "1")
    .when(star_rating_lowercase.isin("two", "2", "2 stars", "2*", "**"), "2")
    .when(star_rating_lowercase.isin("three", "3", "3 stars", "3*", "***"), "3")
    .when(star_rating_lowercase.isin("four", "4", "4 stars", "4*", "****"), "4")
    .when(star_rating_lowercase.isin("five", "5", "5 stars", "5*", "*****"), "5")
    .otherwise("9")  # "9" marks an unrecognised rating
)
review = review.withColumn("star_rating", star_rating_code)

In [None]:
# Chuẩn hoá cột "verified_purchase" cho trường hợp tổng hợp từ nhiều nguồn.

# Map the lower-cased flag to the strings "True" / "False"; anything that is
# not a recognised positive spelling ends up as "False".
verified_lowercase = F.lower(F.col("verified_purchase"))
verified_flag = (
    F.when(
        verified_lowercase.isin("yes", "y", "true", "t", "1", "verified", "purchased", "confirmed"),
        "True",
    )
    .when(
        verified_lowercase.isin("no", "n", "false", "f", "0", "not verified", "unverified", "not purchased"),
        "False",
    )
    .otherwise("False")
)
review = review.withColumn("verified_purchase", verified_flag)

##### Filtering

###### review

In [None]:
# Loại bỏ những giá trị không xác định của cột star_rating
# Keep only reviews whose star_rating is one of the recognised values.
valid_star_rating = ["1", "2", "3", "4", "5"]
review = review.where(F.col("star_rating").isin(*valid_star_rating))

#### Type Casting

##### review

In [None]:
# Đổi kiểu dữ liệu "star_rating" thành int
# Convert "star_rating" to an integer column.
star_rating_as_int = F.col("star_rating").cast("int")
review = review.withColumn("star_rating", star_rating_as_int)

In [None]:
# Đổi giá trị Y/N của cột "verified_purchase" thành boolean
# Convert "verified_purchase" to a boolean column.
verified_as_boolean = F.col("verified_purchase").cast("boolean")
review = review.withColumn("verified_purchase", verified_as_boolean)

##### location

In [None]:
# Đối datatype cột zipcode int->str. Dùng lpad cho thành 6 số.
# Turn the integer zipcode into a string, left-padded with "0" to 6 characters.
zipcode_padded = F.lpad(F.col("zipcode").cast("string"), 6, "0")
location = location.withColumn("zipcode", zipcode_padded)

### Merge branch

In [None]:
# (silver table name, cleaned frame) pairs, written in this order.
silver_outputs = [
    ("customer", customer),
    ("location", location),
    ("customer_location", customer_location),
    ("product", product),
    ("category", category),
    ("review", review),
    ("product_category", product_category),
]

# Each silver table is emptied and then loaded with the current batch.
for silver_table, cleaned_frame in silver_outputs:
    spark.sql(f"TRUNCATE TABLE nessie.silver.{silver_table}")
    cleaned_frame.writeTo(f"nessie.silver.{silver_table}").append()

In [None]:
# Leave the silver branch, merge it into the run's ETL branch, then drop it.
silver_merge_statements = (
    "USE REFERENCE main IN nessie",
    f"MERGE BRANCH {silver_layer_branch_name} INTO {etl_processing_branch_name} IN nessie",
    f"DROP BRANCH {silver_layer_branch_name} IN nessie",
)
for statement in silver_merge_statements:
    spark.sql(statement)

## Gold Layer

In [None]:
# Dedicated branch for the gold load, created from the run's ETL branch.
gold_layer_branch_name = "feat/gold-layer-" + ts.strftime('%Y-%m-%d-%H-%M-%S')
create_gold_branch_sql = f"""
    CREATE BRANCH
    IF NOT EXISTS {gold_layer_branch_name}
    IN nessie
    FROM {etl_processing_branch_name};
"""
spark.sql(create_gold_branch_sql)

# Switch the session to the new branch, then list all references.
spark.sql(f"USE REFERENCE {gold_layer_branch_name} IN nessie;")
references = spark.sql("LIST REFERENCES IN nessie")
references.show(truncate=False)

### Slowly Changing Dimension (SCD)

In [None]:
def generate_surrogate_key(
    source: DataFrame,
    target: DataFrame,
    sk_name: str,
    sort_cols: List[str]
) -> DataFrame:
    """Add a surrogate-key column to ``source`` that continues ``target``'s keys.

    The new keys start right after the largest ``sk_name`` value found in
    ``target`` (or at 1 when ``target`` has none) and are numbered in the
    order given by ``sort_cols``.

    Args:
        source: Rows that need a surrogate key.
        target: Existing table whose current maximum key is the offset.
        sk_name: Name of the surrogate-key column (read from ``target``,
            written to ``source``).
        sort_cols: Columns defining the numbering order of ``source``.

    Returns:
        ``source`` with ``sk_name`` added (or replaced) as a long column.
    """
    # Highest key already used in the target table; None when there is none.
    highest_existing_key = target.agg(F.max(sk_name)).first()[0]
    key_offset = 0 if highest_existing_key is None else highest_existing_key

    # Number the source rows over a single, unpartitioned ordered window.
    numbering_window = Window.orderBy(*sort_cols)
    surrogate_key = (F.row_number().over(numbering_window) + key_offset).cast("long")

    return source.withColumn(sk_name, surrogate_key)

In [None]:
def switch_bk_sk(
    bridge_table: DataFrame,
    dim_table: DataFrame,
    bk_name: str,
    sk_name: str
) -> DataFrame:
    """Replace a business key in ``bridge_table`` with the dimension's surrogate key.

    ``bridge_table`` is inner-joined to ``dim_table`` on ``bk_name``, so rows
    without a match in the dimension are dropped; the result carries
    ``sk_name`` and no longer contains ``bk_name``.
    """
    key_lookup = dim_table.select(bk_name, sk_name)
    return bridge_table.join(other=key_lookup, on=bk_name, how="inner").drop(bk_name)

In [None]:
# Ở trường hợp SCD1 (nghĩa là dữ liệu mới sẽ ghi đè lên dữ liệu cũ) thì ở OLTP cũng ghi đè thôi chứ không có bật CDC lên
# -> Do vậy thì mỗi lần incremental load dựa trên cột updated_at thì mỗi dim_ID sẽ chỉ xuất hiện 1 lần
# -> Xài MERGE INTO vô tư mà không cần phải xử lý deduplicate
# Còn nếu SCD1 nhưng ở OLTP lỡ bật CDC thì chỉ cần xử lý deduplicate bằng cách chỉ lấy dim_ID có updated_at lớn nhất
# Lưu ý: schema của Dim này chỉ hơn cái schema ở silver mỗi cái key thôi
def SCD1(
    source: DataFrame,
    source_name: str,
    target: DataFrame,
    target_name: str,
    sk_name: str,
    bk_name: str
) -> bool:
    """Apply a Type 1 slowly changing dimension load (overwrite in place).

    Rows of ``source`` are merged into ``nessie.gold.<target_name>`` on the
    business key: matching rows have every column except the surrogate key
    overwritten, non-matching rows are inserted with a newly generated
    surrogate key.

    Args:
        source: Incoming rows; expected to hold each business key at most once
            (assumption stated by the notebook author, not checked here).
        source_name: Name of the temporary view used to expose ``source`` to SQL.
        target: DataFrame of the target dimension; only its column list and
            current maximum surrogate key are used.
        target_name: Target table name inside ``nessie.gold``.
        sk_name: Surrogate-key column name.
        bk_name: Business-key column name.

    Returns:
        ``True`` once the merge has been executed.
    """
    # Generate surrogate keys continuing after max(sk_name) of the target table.
    source = generate_surrogate_key(source=source, target=target, sk_name=sk_name, sort_cols=[bk_name])

    update_assignments = ", ".join(
        f"g.{column} = s.{column}" for column in target.columns if column != sk_name
    )
    insert_columns = sorted(target.columns)

    # SCD1: overwrite on match, insert otherwise.
    merge_expr = textwrap.dedent(f"""
        MERGE INTO nessie.gold.{target_name} g
        USING {source_name} s
        ON s.{bk_name} = g.{bk_name}
        WHEN MATCHED THEN 
            UPDATE SET {update_assignments}
        WHEN NOT MATCHED THEN 
            INSERT ({", ".join(insert_columns)})
            VALUES ({", ".join(f"s.{column}" for column in insert_columns)})
    """)
    print(merge_expr)
    
    # Run the merge; the temp view is dropped even when the statement fails so
    # a failed run does not leave it behind in the session.
    source.createOrReplaceTempView(source_name)
    try:
        spark.sql(merge_expr)
    finally:
        spark.catalog.dropTempView(source_name)
    
    return True

In [None]:
def scd2(
    source: DataFrame,
    source_name: str,
    target: DataFrame,
    target_name: str,
    bk_name: str,
    sk_name: str,
) -> bool:
    """Load ``source`` into ``nessie.gold.{target_name}`` as a Type 2 dimension.

    Two steps are performed:

    1. The currently active target row of every business key present in
       ``source`` is closed: ``is_current`` becomes False and ``valid_to`` is set
       to the earliest ``source_updated_at`` of that key in ``source``.
    2. Every source row is appended as a new version. ``valid_from`` is the row's
       ``source_updated_at``, ``valid_to`` is the ``valid_from`` of the next newer
       version of the same key, and the newest version (null ``valid_to``) is
       flagged ``is_current``.

    Args:
        source: Incoming rows; must contain ``bk_name`` and ``source_updated_at``.
        source_name: Name of the temporary view used by the MERGE statement. The
            view is dropped again once the MERGE has run.
        target: Frame of the gold table, passed on to ``generate_surrogate_key``.
        target_name: Table name under ``nessie.gold``.
        bk_name: Business-key column.
        sk_name: Surrogate-key column added to every appended row.

    Returns:
        ``True`` once both steps have completed.
    """
    source = generate_surrogate_key(source=source, target=target, sk_name=sk_name, sort_cols=[bk_name])

    # Step 1: one row per business key (its oldest incoming version) closes the
    # version that is currently active in the dimension.
    oldest_first = Window.partitionBy(bk_name).orderBy(F.col("source_updated_at").asc())
    earliest_record_in_source = (
        source
        .withColumn("rn", F.row_number().over(oldest_first))
        .filter(F.col("rn") == 1)
        .drop("rn")
    )

    expr = textwrap.dedent(f"""
        MERGE INTO nessie.gold.{target_name} g
        USING {source_name} s
        ON s.{bk_name} = g.{bk_name}
        WHEN MATCHED AND g.is_current = True THEN
            UPDATE SET g.is_current = False, g.valid_to = s.source_updated_at
    """)

    earliest_record_in_source.createOrReplaceTempView(source_name)
    try:
        spark.sql(expr)
    finally:
        # Drop the view even when the MERGE raises, so it cannot leak into the session.
        spark.catalog.dropTempView(source_name)

    # Step 2: append every incoming version. With newest-first ordering, lag()
    # yields the valid_from of the next newer version, i.e. this row's valid_to.
    newest_first = Window.partitionBy(bk_name).orderBy(F.col("source_updated_at").desc())
    versions = (
        source
        .withColumn("valid_from", F.col("source_updated_at"))
        .withColumn("valid_to", F.lag("valid_from").over(newest_first))
        .drop("source_created_at", "source_updated_at")
        .withColumn("is_current", F.col("valid_to").isNull())
    )
    versions.writeTo(f"nessie.gold.{target_name}").append()
    return True

In [None]:
from datetime import datetime

def generate_dimDate(
    start_date: datetime,
    end_date: datetime
) -> None:
    """Build one row per calendar day and append it to ``nessie.gold.dim_date``.

    Args:
        start_date: First day of the generated range.
        end_date: Last day of the generated range.

    Columns written: ``full_date``, ``date_key`` (yyyyMMdd as long), ``day``,
    ``month``, ``month_name``, ``quarter``, ``quarter_name``, ``year``,
    ``day_of_week`` (1 = Monday ... 7 = Sunday), ``day_name``, ``week_of_year``,
    ``is_weekend`` and ``is_holiday`` (always False).

    NOTE(review): rows are appended unconditionally, so calling this twice for
    overlapping ranges writes the same dates twice.
    """
    start_str = start_date.strftime("%Y-%m-%d")
    end_str = end_date.strftime("%Y-%m-%d")

    # Generate a continuous run of dates, one row per day.
    df_date = (
        spark.sql(f"""
            SELECT explode(sequence(to_date('{start_str}'), to_date('{end_str}'), interval 1 day)) as full_date
        """)
        .withColumn("date_key", F.date_format("full_date", "yyyyMMdd").cast("long"))
        .withColumn("day", F.dayofmonth("full_date"))
        .withColumn("month", F.month("full_date"))
        .withColumn("month_name", F.date_format("full_date", "MMMM"))
        .withColumn("quarter", F.quarter("full_date"))
        .withColumn("quarter_name", F.concat(F.lit("Q"), F.quarter("full_date")))
        .withColumn("year", F.year("full_date"))
        # Spark's dayofweek is 1 = Sunday ... 7 = Saturday; remap it to
        # 1 = Monday ... 7 = Sunday.
        .withColumn("day_of_week", ((F.dayofweek("full_date") + 5) % 7) + 1)
        .withColumn("day_name", F.date_format("full_date", "EEEE"))
        .withColumn("week_of_year", F.weekofyear("full_date"))
        # After the remap above Saturday is 6 and Sunday is 7. Testing for (1, 7)
        # would flag Mondays as weekend and miss Saturdays.
        .withColumn("is_weekend", F.col("day_of_week").isin(6, 7))
        .withColumn("is_holiday", F.lit(False))
    )

    df_date.writeTo("nessie.gold.dim_date").append()

In [None]:
def fact(
    source: DataFrame,
    target: DataFrame,
    source_name: str,
    target_name: str,
    bk_name: str,
    sk_name: str,
    has_cur_col: bool
) -> bool:
    """Append ``source`` to the fact table ``nessie.gold.{target_name}``.

    When ``has_cur_col`` is true, target rows that share a business key with the
    incoming rows and are still flagged ``is_current`` are first switched to
    ``is_current = False``. When it is false no MERGE is issued at all, so the
    target does not need an ``is_current`` column.

    Args:
        source: Incoming fact rows.
        target: Frame of the gold table; used for surrogate-key generation and to
            check whether an ``is_current`` column exists.
        source_name: Name of the temporary view used by the MERGE statement. The
            view is dropped again once the MERGE has run.
        target_name: Table name under ``nessie.gold``.
        bk_name: Business-key column used in the MERGE ``ON`` condition.
        sk_name: Surrogate-key column added to every appended row.
        has_cur_col: Whether superseded rows should be un-flagged before appending.

    Returns:
        ``True`` once the rows have been appended.
    """
    source = generate_surrogate_key(source=source, target=target, sk_name=sk_name, sort_cols=[bk_name])

    # Retire the versions that are about to be superseded. The statement is only
    # built when requested: it references g.is_current, which would fail to
    # resolve on a target that has no such column.
    if has_cur_col:
        expr = textwrap.dedent(f"""
            MERGE INTO nessie.gold.{target_name} g
            USING {source_name} s
            ON s.{bk_name} = g.{bk_name}
            WHEN MATCHED AND g.is_current = True THEN
                UPDATE SET g.is_current = False
        """)
        print(expr)

        source.createOrReplaceTempView(source_name)
        try:
            spark.sql(expr)
        finally:
            # Drop the view even when the MERGE raises, so it cannot leak into the session.
            spark.catalog.dropTempView(source_name)

    # Append the incoming rows, flagged as current when the target tracks that.
    if "is_current" in target.columns:
        source = source.withColumn("is_current", F.lit(True))
    source.writeTo(f"nessie.gold.{target_name}").append()

    return True

In [None]:
# dim_category: SCD Type 1 upsert of silver.category into gold.dim_category.
category = spark.table("nessie.silver.category")
dim_category = spark.table("nessie.gold.dim_category")
SCD1(
    source=category, source_name="category",
    target=dim_category, target_name="dim_category",
    sk_name="category_key", bk_name="category_id",
)

# dim_customer: SCD Type 1 upsert of silver.customer into gold.dim_customer.
customer = spark.table("nessie.silver.customer")
dim_customer = spark.table("nessie.gold.dim_customer")
SCD1(
    source=customer, source_name="customer",
    target=dim_customer, target_name="dim_customer",
    sk_name="customer_key", bk_name="customer_id",
)

# dim_location: SCD Type 1 upsert of silver.location into gold.dim_location.
location = spark.table("nessie.silver.location")
dim_location = spark.table("nessie.gold.dim_location")
SCD1(
    source=location, source_name="location",
    target=dim_location, target_name="dim_location",
    sk_name="location_key", bk_name="location_id",
)

# dim_product: SCD Type 2 load of silver.product into gold.dim_product.
product = spark.table("nessie.silver.product")
dim_product = spark.table("nessie.gold.dim_product")
scd2(
    source=product, source_name="product",
    target=dim_product, target_name="dim_product",
    bk_name="product_id", sk_name="product_key",
)

# bridge_product_category: swap both business keys for surrogate keys, then
# upsert (SCD Type 1) into the gold bridge table.
product_category = spark.table("nessie.silver.product_category")
dim_category = spark.table("nessie.gold.dim_category")
# Only the active product versions may hand out surrogate keys.
dim_product = spark.table("nessie.gold.dim_product").filter(F.col("is_current") == True)
bridge_product_category = spark.table("nessie.gold.bridge_product_category")

# category_id -> category_key
product_category = switch_bk_sk(
    bridge_table=product_category, dim_table=dim_category,
    bk_name="category_id", sk_name="category_key",
)
# product_id -> product_key
product_category = switch_bk_sk(
    bridge_table=product_category, dim_table=dim_product,
    bk_name="product_id", sk_name="product_key",
)
SCD1(
    source=product_category, source_name="product_category",
    target=bridge_product_category, target_name="bridge_product_category",
    sk_name="product_category_key", bk_name="product_category_id",
)

# bridge_customer_location: swap both business keys for surrogate keys, then
# load the gold bridge table with SCD Type 2 versioning.
customer_location = spark.table("nessie.silver.customer_location")
dim_customer = spark.table("nessie.gold.dim_customer")
dim_location = spark.table("nessie.gold.dim_location")
bridge_customer_location = spark.table("nessie.gold.bridge_customer_location")

# customer_id -> customer_key
customer_location = switch_bk_sk(
    bridge_table=customer_location, dim_table=dim_customer,
    bk_name="customer_id", sk_name="customer_key",
)
# location_id -> location_key
customer_location = switch_bk_sk(
    bridge_table=customer_location, dim_table=dim_location,
    bk_name="location_id", sk_name="location_key",
)
scd2(
    source=customer_location, source_name="customer_location",
    target=bridge_customer_location, target_name="bridge_customer_location",
    bk_name="customer_location_id", sk_name="customer_location_key",
)

In [None]:
# dim_date: populate the calendar dimension for 2000-01-01 through 2015-01-01.
generate_dimDate(start_date=datetime(2000, 1, 1), end_date=datetime(2015, 1, 1))

In [None]:
# fact_review: swap every business key for its surrogate key, then load the fact table.
# modified_date is renamed to full_date so the review rows can be joined to
# dim_date on that column.
review = spark.table("nessie.silver.review").withColumnRenamed("modified_date", "full_date")

dim_customer = spark.table("nessie.gold.dim_customer")
# Only the active product versions may hand out surrogate keys.
dim_product = spark.table("nessie.gold.dim_product").filter(F.col("is_current") == True)
dim_date = spark.table("nessie.gold.dim_date")
fact_review = spark.table("nessie.gold.fact_review")

# customer_id -> customer_key
review = switch_bk_sk(
    bridge_table=review, dim_table=dim_customer,
    bk_name="customer_id", sk_name="customer_key",
)
# product_id -> product_key
review = switch_bk_sk(
    bridge_table=review, dim_table=dim_product,
    bk_name="product_id", sk_name="product_key",
)
# full_date -> date_key
review = switch_bk_sk(
    bridge_table=review, dim_table=dim_date,
    bk_name="full_date", sk_name="date_key",
)

fact(
    source=review, source_name="review",
    target=fact_review, target_name="fact_review",
    bk_name="review_id", sk_name="review_key",
    has_cur_col=True,
)

In [None]:
# Switch the session to the main reference, merge the gold-layer branch into the
# ETL-processing branch, then drop the gold-layer branch.
# Both branch-name variables are defined outside this section of the notebook.
spark.sql("USE REFERENCE main IN nessie")
spark.sql(f"MERGE BRANCH {gold_layer_branch_name} INTO {etl_processing_branch_name} IN nessie")
spark.sql(f"DROP BRANCH {gold_layer_branch_name} IN nessie")

In [None]:
# Switch the session to the main reference, merge the ETL-processing branch into
# main, then drop the ETL-processing branch.
# The branch-name variable is defined outside this section of the notebook.
spark.sql("USE REFERENCE main IN nessie")
spark.sql(f"MERGE BRANCH {etl_processing_branch_name} INTO main IN nessie")
spark.sql(f"DROP BRANCH {etl_processing_branch_name} IN nessie")

In [None]:
# dim_category = spark.table("nessie.gold.dim_category")
# bridge_product_category = spark.table("nessie.gold.bridge_product_category")
# dim_product = spark.table("nessie.gold.dim_product")
# fact_review = spark.table("nessie.gold.fact_review")
# dim_customer = spark.table("nessie.gold.dim_customer")
# bridge_customer_location = spark.table("nessie.gold.bridge_customer_location")
# dim_location = spark.table("nessie.gold.dim_location")

In [None]:
# Stop the Spark session.
spark.stop()

### Data Enrichment & Derived Columns

#### customer

#### location

#### customer_location

#### product

#### category

#### review

#### product_category