In [0]:
%load_ext autoreload
%autoreload 2
# Mode 2 reloads imported Python modules before each cell runs, so edits to workspace modules take effect without a restart.
# Learn more at https://docs.databricks.com/en/files/workspace-modules.html#autoreload-for-python-modules
# To disable autoreload, run %autoreload 0

# Imports 

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Row
import os
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime

from logger import log_message

In [0]:
# Root of the workspace tree that holds the raw data, the medallion layers and the metadata tables.
# It can be overridden with the MEDALLION_BASE_PATH environment variable so the notebook is not
# tied to one user's workspace; when the variable is unset or empty the original location is used.
base_path = os.environ.get("MEDALLION_BASE_PATH") or "file:/Workspace/Users/masa.cirkovic@abo.fi/"
# Every path below is built by plain concatenation, so guarantee exactly one trailing slash.
base_path = base_path.rstrip("/") + "/"

raw_data_path = base_path + "rawdata/"
bronze_path = base_path + "medallion/bronze/"
silver_path = base_path + "medallion/silver/"
# The golden layer is a single table, so this is the table path itself (no trailing slash).
golden_path = base_path + "medallion/gold/golden_data"
metadata_path = base_path + "metadata/"

In [0]:
def _read_delta(table_path):
    """Return the Delta table stored at `table_path` as a Spark DataFrame."""
    return spark.read.format("delta").load(table_path)


# Bronze layer tables
bronze_car_sales_path = f"{bronze_path}car_sales_data"
bronze_car_sales_df = _read_delta(bronze_car_sales_path)
bronze_companies_path = f"{bronze_path}companies_data"
bronze_companies_df = _read_delta(bronze_companies_path)
bronze_customers_path = f"{bronze_path}customers_data"
bronze_customers_df = _read_delta(bronze_customers_path)

# Silver layer tables
silver_car_sales_path = f"{silver_path}car_sales_data"
silver_car_sales_df = _read_delta(silver_car_sales_path)
silver_companies_path = f"{silver_path}companies_data"
silver_companies_df = _read_delta(silver_companies_path)
silver_customers_path = f"{silver_path}customers_data"
silver_customers_df = _read_delta(silver_customers_path)

# Golden layer: golden_path already points at the table itself
golden_df = _read_delta(golden_path)

# Metadata Tracking

In [0]:
# Summary of the current golden table; written below as a single-row Delta table.
golden_metadata = {
    # datetime.now() without a tz argument is naive local time, formatted to second precision.
    "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    "total_records": golden_df.count(),
    "source_datasets": ["car_sales_data", "companies_data", "customers_data"],
    "transformation_steps": "Bronze → Silver → Golden",
    "key_aggregations": ["Total Sales Per Day", "Most Popular Models", "Income Analysis"]
}

# Convert metadata to DataFrame (one dict -> one row)
metadata_df = spark.createDataFrame([golden_metadata])

# Save metadata table.
# The path is derived from base_path rather than metadata_path, because a later cell rebinds
# metadata_path to the lineage table location. Re-running this cell after that would otherwise
# write to ".../metadata/metadata_tablegolden_metadata".
golden_metadata_path = base_path + "metadata/" + "golden_metadata"

try:
    metadata_df.write.format("delta").mode("overwrite").save(golden_metadata_path)
    log_message("info", "metadata_tracking_and_data_lineage", "saving golden metadata", "Metadata for golden layer successfully saved")
except Exception as e:
    # Best effort: a failed write is logged and the notebook keeps running.
    log_message("error", "metadata_tracking_and_data_lineage", "saving golden metadata", str(e))

In [0]:
# Render the golden-layer metadata DataFrame built in the cell above.
# NOTE: the name metadata_df is rebound to the lineage table by a later cell, so this shows the
# golden metadata only when the notebook is executed top to bottom.
display(metadata_df)

key_aggregations,last_updated,source_datasets,total_records,transformation_steps
"List(Total Sales Per Day, Most Popular Models, Income Analysis)",2025-03-20 11:25:17,"List(car_sales_data, companies_data, customers_data)",23388,Bronze → Silver → Golden


# Data Lineage

In [0]:
# Lineage entries for every dataset in the Bronze, Silver and Golden layers.
# Each file_path reuses the path variable its DataFrame was loaded from, so the recorded
# location always matches the location that was actually read.
# NOTE(review): the "source" labels say bronze_company_df / silver_company_df while the loaded
# DataFrames are named bronze_companies_df / silver_companies_df. The labels are stored data and
# are left unchanged here; confirm which spelling is intended.
metadata_entries = [
    # Bronze Layer
    Row(
        layer="Bronze",
        dataset_name="car_sales",
        record_count=bronze_car_sales_df.count(),
        transformation_steps="Raw data ingestion",
        source="car_sales_data.csv",
        file_path=bronze_car_sales_path
    ),
    Row(
        layer="Bronze",
        dataset_name="company",
        record_count=bronze_companies_df.count(),
        transformation_steps="Raw data ingestion",
        source="companies_data.json",
        file_path=bronze_companies_path
    ),
    Row(
        layer="Bronze",
        dataset_name="customers",
        record_count=bronze_customers_df.count(),
        transformation_steps="Raw data ingestion",
        source="customers_data.csv",
        file_path=bronze_customers_path
    ),

    # Silver Layer
    Row(
        layer="Silver",
        dataset_name="car_sales",
        record_count=silver_car_sales_df.count(),
        transformation_steps="Renamed Columns → Dropped Duplicates → Fixed Price Format → Dropped Missing Rows → Dropped Anomalies",
        source="bronze_car_sales_df",
        file_path=silver_car_sales_path
    ),
    Row(
        layer="Silver",
        dataset_name="company",
        record_count=silver_companies_df.count(),
        transformation_steps="Standardized Country Names",
        source="bronze_company_df",
        file_path=silver_companies_path
    ),
    Row(
        layer="Silver",
        dataset_name="customers",
        record_count=silver_customers_df.count(),
        transformation_steps="Renamed Columns → Dropped Duplicates → Fixed Income Format → Dropped Missing Rows",
        source="bronze_customers_df",
        file_path=silver_customers_path
    ),

    # Golden Layer
    Row(
        layer="Golden",
        dataset_name="golden_data",
        record_count=golden_df.count(),
        transformation_steps="Joined Car Sales ↔ Company ↔ Customers → Aggregated Sales",
        source="silver_car_sales_df, silver_company_df, silver_customers_df",
        file_path=golden_path
    )
]

# Convert list to DataFrame
metadata_df = spark.createDataFrame(metadata_entries)

# Derive the table path from base_path instead of appending to metadata_path, so re-running
# this cell always yields ".../metadata/metadata_table" rather than growing the path each time.
# The name metadata_path is still rebound because the following cell loads the table from it.
metadata_path = base_path + "metadata/" + "metadata_table"

try:
    # Write Metadata to Delta Table
    metadata_df.write.format("delta") \
        .mode("overwrite") \
        .save(metadata_path)

    log_message("info", "metadata_tracking_and_data_lineage", "saving metadata table", "Metadata for Data Lineage successfully saved")
except Exception as e:
    # Best effort: log the failure and keep the notebook running, but do not claim success.
    log_message("error", "metadata_tracking_and_data_lineage", "saving metadata table", str(e))
    print(f"Metadata save failed: {e}")
else:
    # Printed only when the write (and its info log) completed without raising.
    print("Metadata successfully saved.")

Metadata successfully saved.


In [0]:
# Read the Delta table at metadata_path back into a DataFrame
delta_reader = spark.read.format("delta")
loaded_metadata_df = delta_reader.load(metadata_path)

# Render the reloaded rows
display(loaded_metadata_df)

layer,dataset_name,record_count,transformation_steps,source,file_path
Silver,customers,23479,Renamed Columns → Dropped Duplicates → Fixed Income Format → Dropped Missing Rows,bronze_customers_df,file:/Workspace/Users/masa.cirkovic@abo.fi/medallion/silver/customers_data
Golden,golden_data,23388,Joined Car Sales ↔ Company ↔ Customers → Aggregated Sales,"silver_car_sales_df, silver_company_df, silver_customers_df",file:/Workspace/Users/masa.cirkovic@abo.fi/medallion/gold/golden_data
Silver,car_sales,23813,Renamed Columns → Dropped Duplicates → Fixed Price Format → Dropped Missing Rows → Dropped Anomalies,bronze_car_sales_df,file:/Workspace/Users/masa.cirkovic@abo.fi/medallion/silver/car_sales_data
Silver,company,30,Standardized Country Names,bronze_company_df,file:/Workspace/Users/masa.cirkovic@abo.fi/medallion/silver/companies_data
Bronze,car_sales,24183,Raw data ingestion,car_sales_data.csv,file:/Workspace/Users/masa.cirkovic@abo.fi/medallion/bronze/car_sales_data
Bronze,company,30,Raw data ingestion,companies_data.json,file:/Workspace/Users/masa.cirkovic@abo.fi/medallion/bronze/companies_data
Bronze,customers,23906,Raw data ingestion,customers_data.csv,file:/Workspace/Users/masa.cirkovic@abo.fi/medallion/bronze/customers_data
