## Setup Data Loading Job
---

In [1]:
import logging as log
import hashlib

from pyspark.sql import SparkSession, functions as F, DataFrame
from datetime import datetime, UTC
from delta.pip_utils import configure_spark_with_delta_pip
from delta.tables import DeltaTable

# Define Job Parameters
# ----------------------------------------
DAG_ID = "Load CRM Data"
JOB_DATE = f"{datetime.now(UTC):%Y-%m-%d}"
JOB_NAME = f"{DAG_ID} {JOB_DATE}"
DELTA_LAKE = "/s3-datalake/lakehouse"

source_customer_data = "/s3-datalake/source/customer_data.parquet"
source_sale_data = "/s3-datalake/source/sales_data.csv"

# Setup Logging
# ----------------------------------------
log.basicConfig(level=log.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

In [2]:
# 2. Create SparkSession with Delta Config
# ----------------------------------------

builder = (
    SparkSession.builder.appName(JOB_NAME)
    .master("spark://spark:7077")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .config("spark.sql.warehouse.dir", DELTA_LAKE)
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()
log.info(f"Started session `{JOB_NAME}`")

:: loading settings :: url = jar:file:/prj/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2.5.2/cache
The jars for the packages stored in: /root/.ivy2.5.2/jars
io.delta#delta-spark_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a26627e9-80fd-4ac3-8807-505abdce7f49;1.0
	confs: [default]
	found io.delta#delta-spark_2.13;4.0.0 in central
	found io.delta#delta-storage;4.0.0 in central
	found org.antlr#antlr4-runtime;4.13.1 in central
:: resolution report :: resolve 126ms :: artifacts dl 5ms
	:: modules in use:
	io.delta#delta-spark_2.13;4.0.0 from central in [default]
	io.delta#delta-storage;4.0.0 from central in [default]
	org.antlr#antlr4-runtime;4.13.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number|

In [3]:
## Setup initial SCHEMAs if required
spark.sql(
    f"CREATE SCHEMA IF NOT EXISTS bronze LOCATION '{DELTA_LAKE}/bronze'"
).collect()
spark.sql(
    f"CREATE SCHEMA IF NOT EXISTS silver LOCATION '{DELTA_LAKE}/silver'"
).collect()
spark.sql(f"CREATE SCHEMA IF NOT EXISTS gold LOCATION '{DELTA_LAKE}/gold'").collect()

[]

# Bronze Layer
---
This layer serves as the **raw ingestion zone**, where data is landed from external systems into our Lakehouse with **minimal transformation**. It preserves the original fidelity of the data, making it ideal for auditability and replay.

My typical practice in the Bronze layer includes the following steps:

1. **Ingest data from external sources**, such as:
   - Application databases (full or incremental extracts)
   - Change Data Capture (CDC) systems
   - Messaging systems (e.g., Kafka, Kinesis, Pub/Sub)
   - External APIs, SFTP, blob/cloud storage, etc.

2. **Apply incremental loading logic** where applicable  
   - For example, using `kafka_timestamp` or `offset` from Kafka topics

3. **Standardize column names** to align with internal naming conventions used in the Lakehouse or Warehouse

4. **Add system-level metadata** for observability and debugging:
   - Fields like `_system`, `_source`, `_ingestion_date`, etc.
   - These are useful for tracking data lineage and pipeline behavior

In [4]:
# Helper Functions
# -------------------------------


def _add_metadata(df: DataFrame, meta: dict) -> DataFrame:
    """
    Add static metadata columns to a Spark DataFrame.

    Parameters:
        df (DataFrame): The input Spark DataFrame.
        meta (dict): A dictionary of column names and their corresponding literal values.

    Returns:
        DataFrame: DataFrame with added metadata columns.
    """
    meta_columns = {col: F.lit(val) for col, val in meta.items()}
    df_with_meta = df.withColumns(meta_columns).withColumn(
        "_ingestion_timestamp", F.current_timestamp()
    )

    return df_with_meta


def load_data(
    data: DataFrame, table: str, metadata: dict, schema: str = "bronze"
) -> None:
    """
    Load a DataFrame into a Delta Lake bronze table, partitioned by _ingestion_date.
    Creates the table if it does not exist; otherwise, overwrites the partition for the current ingestion date.

    Parameters:
        data (DataFrame): The source Spark DataFrame to load.
        table (str): The target table name (used in the path and metastore).
        metadata (dict): Static metadata to add to the DataFrame before loading.
    """
    full_table_name = f"{schema}.{table.lower()}"

    # Add metadata
    data_with_metadata = _add_metadata(data, metadata)

    # Create table if it doesn't exist
    if not spark.catalog.tableExists(full_table_name):
        log.info(f"Creating managed table {full_table_name} in schema {schema}")

        data_with_metadata.write.format("delta").partitionBy("_job_date").mode(
            "overwrite"
        ).saveAsTable(full_table_name)

        spark.sql(f"""
            CREATE TABLE IF NOT EXISTS bronze.{table}
            USING DELTA
        """)
    else:
        log.info(
            f"Overwriting existing Delta partition for {JOB_DATE} in table {full_table_name}"
        )

        spark.sql(f"""
            DELETE FROM {full_table_name}
            WHERE _job_date = '{JOB_DATE}'
        """)

        data_with_metadata.write.format("delta").mode("append").partitionBy(
            "_job_date"
        ).saveAsTable(full_table_name)


# Job: Load customer_data to bronze layer
# ---------------------------------------------

# Metadata for ingestion
metadata_customer = {
    "_system": "CRM",
    "_source": source_customer_data,
    "_type": "Parquet",
    "_job_date": JOB_DATE,
}

# Read source data
customer_df = spark.read.parquet(source_customer_data)

# Load to Delta Lake
load_data(data=customer_df, table="customer", metadata=metadata_customer)

# Job: Load sale_data to bronze layer
# ---------------------------------------------

# Metadata for ingestion
metadata_sale = {
    "_system": "CRM",
    "_source": source_sale_data,
    "_type": "CSV",
    "_job_date": JOB_DATE,
}

# Read source data with header, and assuming all STRING data
sale_df = (
    spark.read.option("header", True).option("inferSchema", False).csv(source_sale_data)
)

# Load to Delta Lake
load_data(data=sale_df, table="sale", metadata=metadata_sale)

2025-07-21 04:50:14,568 - INFO - Creating managed table bronze.customer in schema bronze
2025-07-21 04:50:21,878 - INFO - Creating managed table bronze.sale in schema bronze
                                                                                

# Silver Layer
---
In this layer, we transform raw Bronze data into **cleaned, structured, and enriched datasets**, ready for modeling and analytics.

My typical actions in the Silver layer include:

1. Load data incrementally from the Bronze layer using `_job_date`
   - This approach simplifies handling late-arriving data, as late `event_timestamps` are still processed
   - More complex watermark or merge strategies can be added if needed

2. Clean and deduplicate records based on business rules

3. Cast fields to expected data types (e.g., `timestamp`, `int`, `boolean`) per Lakehouse standards

4. Generate surrogate keys for primary entities where applicable

5. Add or enrich metadata columns (e.g., `_processed_at`, flags)

6. Validate schema and content to ensure Silver layer standards are met


In [5]:
# Job: Query Definitions for Silver layer
# ---------------------------------------------

sql_silver_customer = """
        SELECT
            * EXCEPT(gender, payment_method, age),
            COALESCE(payment_method, 'unknown') AS payment_method,
            CASE
                WHEN LOWER(gender) IN ('male', 'female')
                    THEN lower(gender)
                ELSE 'unknown'
            END AS gender,
            TRY_TO_NUMBER(age, "999") AS age
        FROM bronze.customer c
        WHERE customer_id IS NOT NULL
        """


invoice_date_format = "dd-MM-yyyy"
sql_silver_sale = f"""
        WITH valid_sales AS (
            SELECT *,
                RANK() OVER (
                    PARTITION BY invoice_no, customer_id
                    ORDER BY TO_DATE(invoice_date, '{invoice_date_format}')
                ) AS rank
            FROM bronze.sale
            WHERE invoice_no IS NOT NULL
              AND customer_id IS NOT NULL
        )
        SELECT * EXCEPT(invoice_date, shopping_mall, price, quantity),
            TO_DATE(invoice_date, '{invoice_date_format}') AS invoice_date,
            LOWER(shopping_mall) AS shopping_mall,
            CAST(price AS DOUBLE) AS price,
            CAST(quantity AS INT) AS quantity
        FROM valid_sales
        WHERE rank = 1
          AND CAST(price AS DOUBLE) > 0
          AND CAST(quantity AS INT) > 0
          """

In [6]:
# Config Section
# -------------------------------
DEFAULT_LOOKBACK_DAYS = 1

silver_configs = {
    "customer": {
        "bronze_table": "bronze.customer",
        "load_type": "full",  # can also be "scd"
        "sql": sql_silver_customer,
        "surrogate_key_columns": ["customer_id"],
    },
    "sale": {
        "bronze_table": "bronze.sale",
        "load_type": "incremental",
        "incremental_column": "invoice_date",
        "loopback": 1,
        "sql": sql_silver_sale,
        "surrogate_key_columns": ["invoice_no", "customer_id"],
    },
}

# Helper Functions
# -------------------------------


def _add_surrogate_key(
    df: DataFrame, columns: list, output_col: str = "surrogate_key"
) -> DataFrame:
    concat_expr = F.concat_ws("||", *[F.col(col).cast("string") for col in columns])
    return df.withColumn(output_col, F.sha1(concat_expr))


def _add_silver_metadata(df: DataFrame, key_columns: list) -> DataFrame:
    return (
        _add_surrogate_key(df, key_columns)
        .withColumn("_updated_at", F.current_timestamp())
        .withColumn("_created_at", F.current_timestamp())
    )


def merge_into_silver(
    new_data: DataFrame,
    table_name: str,
    key_column: str = "surrogate_key",
    partition_column: str = None,
) -> None:
    if spark.catalog.tableExists(table_name):
        new_data.createOrReplaceTempView("incoming")

        set_expr = ",\n".join(
            [
                f"{col} = s.{col}"
                for col in new_data.columns
                if col not in ["_created_at"]
            ]
        )

        spark.sql(f"""
            MERGE INTO {table_name} AS t
            USING incoming AS s
            ON t.{key_column} = s.{key_column}
            WHEN MATCHED THEN
                UPDATE SET {set_expr}
            WHEN NOT MATCHED THEN
                INSERT *
        """)
    else:
        writer = new_data.write.format("delta").mode("overwrite")
        if partition_column and partition_column in new_data.columns:
            writer = writer.partitionBy(partition_column)
        writer.saveAsTable(table_name)


# Main Processing Function
# -------------------------------


def process_table(table_name: str, config: dict) -> None:
    bronze_table = config["bronze_table"]
    sql_logic = config["sql"]
    sk_cols = config["surrogate_key_columns"]
    load_type = config.get("load_type", "full")
    incr_col = config.get("incremental_column")
    loopback = config.get("loopback", DEFAULT_LOOKBACK_DAYS)

    full_table_name = f"silver.{table_name}"

    # Custom transformation logic
    log.info(f"Populating {full_table_name} from {bronze_table}")
    transformed_df = spark.sql(sql_logic)

    table_exists = spark.catalog.tableExists(full_table_name)

    # Only apply incremental filter if the Silver table exists
    if load_type == "incremental" and incr_col and table_exists:
        transformed_df = transformed_df.filter(
            F.col(incr_col) >= F.date_sub(F.current_date(), loopback)
        )
        log.info(
            f"Applying incremental filter: {incr_col} >= current_date - {loopback} days"
        )
    elif load_type == "incremental":
        log.info("Destination table doesn't exist yet, running initial increment")

    # Add metadata
    tagged_df = _add_silver_metadata(transformed_df, sk_cols)

    # Merge
    partition_col = incr_col if load_type == "incremental" else None
    merge_into_silver(
        tagged_df,
        full_table_name,
        key_column="surrogate_key",
        partition_column=partition_col,
    )


# Run All Tables
# -------------------------------

for table_name, config in silver_configs.items():
    process_table(table_name, config)

2025-07-21 04:50:26,402 - INFO - Populating silver.customer from bronze.customer
25/07/21 04:50:27 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
2025-07-21 04:50:37,833 - INFO - Populating silver.sale from bronze.sale        
2025-07-21 04:50:38,309 - INFO - Destination table doesn't exist yet, running initial increment
                                                                                

In [7]:
spark.sql("show tables in silver").show()
spark.sql("select * from silver.sale").show(n=3)

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|   silver| customer|      false|
|   silver|     sale|      false|
+---------+---------+-----------+



                                                                                

+----------+-----------+----------+-------+--------------------+-----+----------+--------------------+----+------------+----------------+------+--------+--------------------+--------------------+--------------------+
|invoice_no|customer_id|  category|_system|             _source|_type| _job_date|_ingestion_timestamp|rank|invoice_date|   shopping_mall| price|quantity|       surrogate_key|         _updated_at|         _created_at|
+----------+-----------+----------+-------+--------------------+-----+----------+--------------------+----+------------+----------------+------+--------+--------------------+--------------------+--------------------+
|   I109831|    C112571|  Clothing|    CRM|/s3-datalake/sour...|  CSV|2025-07-21|2025-07-21 04:50:...|   1|  2021-01-11|       metrocity|300.08|       1|efe6782159ce986d1...|2025-07-21 04:50:...|2025-07-21 04:50:...|
|   I118183|    C223931|Technology|    CRM|/s3-datalake/sour...|  CSV|2025-07-21|2025-07-21 04:50:...|   1|  2021-01-11|mall of ista

# Gold Layer
---

In this layer, we create clean, analytics-ready **dimension** and **fact** tables following the **Star Schema** pattern, optimized for consumption by BI tools and data analysts.

This is where raw and semi-structured data is transformed into a **semantic, consistent, and high-performance model** that enables:
- Self-serve exploration
- Performance benchmarking
- Time-series trend analysis
- Operational and strategic reporting

Some usual techniques include:

1. Applying slowly-changing dimension logic (type 1.5 or 2) for dimensional data  
   - Decision depends on the level of historical info required
2. Create FAT Table (or One Big Table) for very large event streams
   - An OBT or fat table is a fact table that is already poplulated with data from dimension tables
2. Include additional details from the Date dimension for better analysis of cyclical behaviour
3. Generate analytical columns like `age_band` or `is_weekend`
4. Include surrogate key from dimensions instead of the normal key
5. Support Incremental Loading for Large Fact Tables
   - Apply incremental filters to avoid full reloads  
   - Maintain partitions (e.g., by `invoice_date`) for efficient pruning of data in analytical queries
6. De-duplication and Consistency Enforcement
   - Ensure data quality by enforcing unique constraints on surrogate keys  
   - Join facts only to the current version of each dimension row
7. Materialize Common Business Metrics
   - Derive fields like `gross_amount`, `avg_basket_size`, `customer_tenure`, etc.  
   - This reduces repetitive logic in dashboards or downstream aggregates

In [8]:
# Gold layer: Dimension queries
# ---------------------------------------------

sql_dim_customer = """
SELECT
    surrogate_key                      AS customer_sk,
    customer_id,
    gender,
    age,
    CASE                                -- derive age_band once
        WHEN age < 18  THEN 'under 18'
        WHEN age BETWEEN 18 AND 24          THEN '18‑24'
        WHEN age BETWEEN 25 AND 34          THEN '25‑34'
        WHEN age BETWEEN 35 AND 44          THEN '35‑44'
        WHEN age BETWEEN 45 AND 54          THEN '45‑54'
        WHEN age BETWEEN 55 AND 64          THEN '55‑64'
        WHEN age >= 65 THEN '65+'
        ELSE 'unknown'
    END                                   AS age_band,
    payment_method,
    current_date()                        AS _created_at,
    current_date()                        AS _updated_at,
    TRUE                                   AS _is_current
FROM silver.customer
"""

scd_dim_customer = """
MERGE INTO gold.dim_customer AS t
USING (
    SELECT *
    FROM {source}
) AS s
ON  t.{key}  = s.{key} AND t._is_current  = TRUE
WHEN MATCHED THEN UPDATE
    SET *
WHEN NOT MATCHED THEN
    INSERT *
WHEN NOT MATCHED BY SOURCE THEN
    UPDATE SET _is_current = FALSE
"""

In [9]:
# Gold layer: Fact table queries
# ---------------------------------------------

sql_fact_sale = """
SELECT
    s.surrogate_key                                 AS sale_sk,
    s.invoice_no,
    COALESCE(dc.customer_sk, 'unknown')             AS customer_sk,
    s.shopping_mall,
    s.invoice_date,
    s.category,
    s.quantity,
    s.price                                         AS unit_price,
    s.quantity * s.price                            AS gross_amount,
    CASE
        WHEN price < 50 THEN 'low'
        WHEN price BETWEEN 50 AND 199.99 THEN 'medium'
        WHEN price BETWEEN 200 AND 999.99 THEN 'high'
        WHEN price >= 1000 THEN 'premium'
        ELSE 'unknown'
    END                                             AS price_bucket,    
    dayofweek(s.invoice_date) IN (1,7)    -- Sun=1,Sat=7
                            AS is_weekend,
    dayofweek(s.invoice_date)                       AS week_day,
    dayofmonth(s.invoice_date)                      AS month_day,
    date_trunc('week',  s.invoice_date)             AS week_start_date,
    date_trunc('month', s.invoice_date)             AS month_start_date,
    concat(year(s.invoice_date), '-Q', quarter(s.invoice_date))
                                                   AS fiscal_quarter,
    current_date()                        AS _created_at,
    current_date()                        AS _updated_at
FROM silver.sale        s
LEFT JOIN gold.dim_customer   dc
       ON  dc.customer_id = s.customer_id
       AND dc._is_current = TRUE
"""

In [10]:
sql_report_sales_by_segment = """
SELECT
    SHA1(CONCAT_WS('|', LOWER(dc.gender), LOWER(dc.age_band))) AS segment_sk,
    dc.gender,
    dc.age_band,

    COUNT(DISTINCT fs.invoice_no)             AS total_invoices,
    SUM(fs.quantity)                          AS total_quantity,
    SUM(fs.gross_amount)                      AS total_spent,
    AVG(fs.gross_amount)                      AS avg_order_value,
    MAX(fs.invoice_date)                      AS last_purchase_date,
    MIN(fs.invoice_date)                      AS first_purchase_date,
    DATE_TRUNC('month', MAX(fs.invoice_date)) AS last_active_month,
    DATE_TRUNC('week', MAX(fs.invoice_date))  AS last_active_week,

    CURRENT_DATE()                            AS _created_at,
    CURRENT_DATE()                            AS _updated_at
FROM gold.fact_sale fs
LEFT JOIN gold.dim_customer dc
  ON fs.customer_sk = dc.customer_sk
WHERE dc._is_current = TRUE
GROUP BY
    dc.gender,
    dc.age_band
"""

In [12]:
gold_layer_config = {
    "dimensions": [
        {
            "table_name": "gold.dim_customer",
            "sql_query": sql_dim_customer,
            "scd_query": scd_dim_customer,
            "key": "customer_sk",
        },
    ],
    "facts": [
        {
            "table_name": "gold.fact_sale",
            "sql_query": sql_fact_sale,
            "partition_column": "invoice_date",
            "incremental_column": "invoice_date",
            "lookback_days": 1,
        },
    ],
    "reports": [
        {
            "table_name": "gold.report_sales_by_segment",
            "sql_query": sql_report_sales_by_segment,
        },
    ],
}


def load_dim_table(table_name: str, sql_query: str, scd_query: str, key: str) -> None:
    """
    Loads a dimension table, optionally apply SCD logic if it exists, otherwise creates it.

    Args:
        table_name (str): Name of the dimension table in metastore.
        sql_query (str): SQL to select the latest dim version from Silver.
        scd_query (str): SQL to apply type 1.5 or 2 SCD logic.
        key (str): Join key used in the MERGE condition.
    """
    staged_df = spark.sql(sql_query)
    if spark.catalog.tableExists(table_name):
        stage_name = table_name.split(".")[-1] + "_staged"
        staged_df.createOrReplaceTempView(stage_name)
        spark.sql(scd_query.format(source=stage_name, key=key))
    else:
        staged_df.write.format("delta").mode("overwrite").saveAsTable(table_name)


def load_fact_table(
    table_name: str,
    sql_query: str,
    partition_column: str = None,
    incremental_column: str = None,
    lookback_days: int = 1,
) -> None:
    """
    Loads a fact table. Supports incremental load and optional partitioning.

    Args:
        table_name (str): Name of the fact table in metastore.
        sql_query (str): SQL to select fact data from Silver layer.
        partition_column (str, optional): Column to partition the fact table by.
        incremental_column (str, optional): Column to filter for incremental load.
        lookback_days (int): How far back to look for late data.
    """
    table_exists = spark.catalog.tableExists(table_name)
    df = spark.sql(sql_query)

    if incremental_column and incremental_column in df.columns and table_exists:
        log.info(
            f"Applying incremental filter: {incremental_column} >= current_date - {lookback_days} days"
        )
        df = df.filter(
            F.col(incremental_column) >= F.date_sub(F.current_date(), lookback_days)
        )
    elif incremental_column and not table_exists:
        log.info("Destination table doesn't exist yet, running initial increment")

    writer = df.write.format("delta").mode("append")

    if partition_column and partition_column in df.columns:
        writer = writer.partitionBy(partition_column)

    writer.saveAsTable(table_name)


# Load dimensions
for dim in gold_layer_config["dimensions"]:
    load_dim_table(
        table_name=dim["table_name"],
        sql_query=dim["sql_query"],
        scd_query=dim["scd_query"],
        key=dim["key"],
    )

# Load facts
for fact in gold_layer_config["facts"]:
    load_fact_table(
        table_name=fact["table_name"],
        sql_query=fact["sql_query"],
        partition_column=fact.get("partition_column"),
        incremental_column=fact.get("incremental_column"),
        lookback_days=fact.get("lookback_days", 1),
    )


# Load aggregated reports
for fact in gold_layer_config["reports"]:
    load_fact_table(
        table_name=fact["table_name"],
        sql_query=fact["sql_query"],
        partition_column=fact.get("partition_column"),
        incremental_column=fact.get("incremental_column"),
        lookback_days=fact.get("lookback_days", 1),
    )

# Load aggregated reports
for report in gold_layer_config["reports"]:
    load_fact_table(
        table_name=report["table_name"],
        sql_query=report["sql_query"],
        partition_column=report.get("partition_column"),
        incremental_column=report.get("incremental_column"),
        lookback_days=report.get("lookback_days", 1),
    )

2025-07-21 04:52:08,950 - INFO - Destination table doesn't exist yet, running initial increment
                                                                                

In [13]:
# Review all created tables
# ---------------------------------------------

spark.sql("SHOW TABLES IN bronze").show()
spark.sql("SHOW TABLES IN silver").show()
spark.sql("SHOW TABLES IN gold").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|   bronze| customer|      false|
|   bronze|     sale|      false|
+---------+---------+-----------+

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|   silver| customer|      false|
|   silver|     sale|      false|
+---------+---------+-----------+

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|     gold|        dim_customer|      false|
|     gold|           fact_sale|      false|
|     gold|report_sales_by_s...|      false|
+---------+--------------------+-----------+



In [15]:
# Review all table data
# ---------------------------------------------

from IPython.display import display

report_df = spark.sql("SELECT * FROM gold.report_sales_by_segment")
print("## gold.report_sales_by_segment")
report_df.printSchema()
display(report_df.toPandas().head(10))


fact_df = spark.sql("SELECT * FROM gold.fact_sale")
print("## gold.fact_sale")
fact_df.printSchema()
display(fact_df.toPandas().head(10))


dim_df = spark.sql("SELECT * FROM gold.dim_customer")
print("## gold.dim_customer")
dim_df.printSchema()
display(dim_df.toPandas().head(10))

## gold.report_sales_by_segment
root
 |-- segment_sk: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age_band: string (nullable = true)
 |-- total_invoices: long (nullable = true)
 |-- total_quantity: long (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- avg_order_value: double (nullable = true)
 |-- last_purchase_date: date (nullable = true)
 |-- first_purchase_date: date (nullable = true)
 |-- last_active_month: timestamp (nullable = true)
 |-- last_active_week: timestamp (nullable = true)
 |-- _created_at: date (nullable = true)
 |-- _updated_at: date (nullable = true)



Unnamed: 0,segment_sk,gender,age_band,total_invoices,total_quantity,total_spent,avg_order_value,last_purchase_date,first_purchase_date,last_active_month,last_active_week,_created_at,_updated_at
0,1d1ce738adbb27155293bfac1bdfc62fef4d11d0,female,25‑34,11456,34400,28499398.29,2487.726806,2023-03-08,2021-01-01,2023-03-01,2023-03-06,2025-07-21,2025-07-21
1,bf16c9b01c02ac268011e76b6e3822e40c58ecb0,female,unknown,70,204,134828.85,1926.126429,2023-02-20,2021-01-14,2023-02-01,2023-02-20,2025-07-21,2025-07-21
2,69258257c40aebafa6ef717543de1fdc6bf322df,male,65+,3785,11387,9906508.43,2617.307379,2023-03-08,2021-01-01,2023-03-01,2023-03-06,2025-07-21,2025-07-21
3,35b57c7ed736c4fb710049bea5021e2827436fa0,male,18‑24,5553,16474,13155744.07,2369.12373,2023-03-08,2021-01-01,2023-03-01,2023-03-06,2025-07-21,2025-07-21
4,656299a98b08addaf363f562d3aaf6640da45b89,female,45‑54,11348,33856,28616962.38,2521.762635,2023-03-08,2021-01-01,2023-03-01,2023-03-06,2025-07-21,2025-07-21
5,d9843ac7a72c928dec57c1d777e0aed62d80fafc,female,35‑44,11676,35133,29772859.25,2549.919429,2023-03-08,2021-01-01,2023-03-01,2023-03-06,2025-07-21,2025-07-21
6,264a9f1947363cce5126f03a17aa1bf0bfb8c8a4,female,18‑24,7931,24013,20275196.16,2556.448892,2023-03-08,2021-01-01,2023-03-01,2023-03-06,2025-07-21,2025-07-21
7,7a985e241a7b653fe3c2320855c2084ad7911768,male,25‑34,7597,22705,19233668.29,2531.745201,2023-03-08,2021-01-01,2023-03-01,2023-03-06,2025-07-21,2025-07-21
8,f26296b150ab62e8eb602d1c99bc117c6e3434b3,female,65+,5625,16958,13972109.14,2483.930514,2023-03-08,2021-01-01,2023-03-01,2023-03-06,2025-07-21,2025-07-21
9,02f5f04b2fdd266d27c2636306a508c821f48345,male,35‑44,7703,23272,20254176.83,2629.388138,2023-03-08,2021-01-01,2023-03-01,2023-03-06,2025-07-21,2025-07-21


## gold.fact_sale
root
 |-- sale_sk: string (nullable = true)
 |-- invoice_no: string (nullable = true)
 |-- customer_sk: string (nullable = true)
 |-- shopping_mall: string (nullable = true)
 |-- invoice_date: date (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- gross_amount: double (nullable = true)
 |-- price_bucket: string (nullable = true)
 |-- is_weekend: boolean (nullable = true)
 |-- week_day: integer (nullable = true)
 |-- month_day: integer (nullable = true)
 |-- week_start_date: timestamp (nullable = true)
 |-- month_start_date: timestamp (nullable = true)
 |-- fiscal_quarter: string (nullable = true)
 |-- _created_at: date (nullable = true)
 |-- _updated_at: date (nullable = true)



                                                                                

Unnamed: 0,sale_sk,invoice_no,customer_sk,shopping_mall,invoice_date,category,quantity,unit_price,gross_amount,price_bucket,is_weekend,week_day,month_day,week_start_date,month_start_date,fiscal_quarter,_created_at,_updated_at
0,db226f8e01503a5c23335d17ef19daaf3b9efc5e,I100231,333269ee242c2020b55a159a67c6f2f7c2aeb301,metropol avm,2021-07-19,Technology,1,1050.0,1050.0,premium,False,2,19,2021-07-19,2021-07-01,2021-Q3,2025-07-21,2025-07-21
1,fd091a2263e69b835c47494a5a4b8b2a3f5c2d24,I100964,8d0a8dd8c8de2b74623e68942f817bd2ba2b6dce,mall of istanbul,2021-07-19,Cosmetics,4,162.64,650.56,medium,False,2,19,2021-07-19,2021-07-01,2021-Q3,2025-07-21,2025-07-21
2,91c2954b7272b11cf362fb9225b1f38cb0d8f182,I102694,e8b4768d72bcb5df2130dfb4dd697640efaafd69,mall of istanbul,2021-07-19,Shoes,5,3000.85,15004.25,premium,False,2,19,2021-07-19,2021-07-01,2021-Q3,2025-07-21,2025-07-21
3,6f978094d08855fcfe03f00d05ac4910d3ade425,I103853,84c413e240bf86fb4ae37c81f41fbd0cd69bb7a1,emaar square mall,2021-07-19,Shoes,4,2400.68,9602.72,premium,False,2,19,2021-07-19,2021-07-01,2021-Q3,2025-07-21,2025-07-21
4,00d0f2a037c6c65b34b9640fb935bf8adacbf670,I104465,2ad82953b80c8fa23119ee72061681356ab8a849,cevahir avm,2021-07-19,Cosmetics,1,40.66,40.66,low,False,2,19,2021-07-19,2021-07-01,2021-Q3,2025-07-21,2025-07-21
5,4d2c8da23d2221cb82c24a797f78832700a57e61,I113088,904af6865f0fa0ea3e0098042b0999110a94aecf,metropol avm,2021-07-19,Food & Beverage,5,26.15,130.75,low,False,2,19,2021-07-19,2021-07-01,2021-Q3,2025-07-21,2025-07-21
6,16fb86e04557ebb1ecc0373f08477c50be6a1cd4,I114725,fa6d8b20304ba21d420ed169466963b1c6ee50d1,kanyon,2021-07-19,Toys,5,179.2,896.0,medium,False,2,19,2021-07-19,2021-07-01,2021-Q3,2025-07-21,2025-07-21
7,1c06a51ac3596798a203d4b5ac5bd0ecf6af2b53,I116331,c1a5430035bb84c781871c44f74c9cdcc38e5d9d,mall of istanbul,2021-07-19,Food & Beverage,2,10.46,20.92,low,False,2,19,2021-07-19,2021-07-01,2021-Q3,2025-07-21,2025-07-21
8,193b26e368117c011290bf3933962c497fb00eab,I119564,40f295decfdd43c0cce1e5c9c799cc9ca77fa1eb,metrocity,2021-07-19,Food & Beverage,1,5.23,5.23,low,False,2,19,2021-07-19,2021-07-01,2021-Q3,2025-07-21,2025-07-21
9,44874466742864bdaf66332eb46fce076db273ec,I123859,82c6d5a4b810c4d26ecc694b8a456ac6211b82ac,metropol avm,2021-07-19,Cosmetics,1,40.66,40.66,low,False,2,19,2021-07-19,2021-07-01,2021-Q3,2025-07-21,2025-07-21


## gold.dim_customer
root
 |-- customer_sk: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: decimal(3,0) (nullable = true)
 |-- age_band: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- _created_at: date (nullable = true)
 |-- _updated_at: date (nullable = true)
 |-- _is_current: boolean (nullable = true)



Unnamed: 0,customer_sk,customer_id,gender,age,age_band,payment_method,_created_at,_updated_at,_is_current
0,8b687283db5662770fab89cb1d063f0b567500fb,C241288,female,28,25‑34,Credit Card,2025-07-21,2025-07-21,True
1,1934f07403d2e4a74cf6d9eb60a7bff5a8b71ef2,C111565,male,21,18‑24,Debit Card,2025-07-21,2025-07-21,True
2,390f8635c228b30217c5ffa80066782d61e3aadc,C266599,male,20,18‑24,Cash,2025-07-21,2025-07-21,True
3,d236689e0b06e2d34261a7f7d205969bea88b544,C988172,female,66,65+,Credit Card,2025-07-21,2025-07-21,True
4,974c815d58acf79eac60d39dd85c32841bed2dd9,C189076,female,53,45‑54,Cash,2025-07-21,2025-07-21,True
5,b151e0e3cae85ef2839cf9411d66373938556451,C657758,female,28,25‑34,Credit Card,2025-07-21,2025-07-21,True
6,196d8b9956165925cbe3fb5eba7ff3d792579671,C151197,female,49,45‑54,Cash,2025-07-21,2025-07-21,True
7,ffec33d90f607d48c94b5077bfcb797cec5c8355,C176086,female,32,25‑34,Credit Card,2025-07-21,2025-07-21,True
8,5336f67c010c0f429ff1edfc5078e2aa08fff476,C159642,male,69,65+,Credit Card,2025-07-21,2025-07-21,True
9,2b94b719c381043d639d8119247354b9b63f611f,C283361,female,60,55‑64,Credit Card,2025-07-21,2025-07-21,True
