In [0]:
pip install azure-storage-blob azure-storage-file-datalake

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, trim, input_file_name
from azure.storage.blob import BlobServiceClient
import csv
from io import StringIO

# ----------------------------------------------------------------------------------
# 1. SPARK CONFIGURATION & AZURE STORAGE AUTHENTICATION
# ----------------------------------------------------------------------------------

# Azure Storage account details (authenticated with a SAS token)
import os

STORAGE_ACCOUNT_NAME = "cdmo"
# SECURITY: the SAS token is taken from the environment instead of being
# embedded in the notebook, so the credential does not end up in version
# control or in exported copies of this file. Any token that was previously
# committed here should be treated as exposed and rotated.
# A leading "?" is dropped so the value can be pasted with or without it.
SAS_TOKEN = os.environ.get("AZURE_STORAGE_SAS_TOKEN", "").lstrip("?")
if not SAS_TOKEN:
    raise RuntimeError(
        "AZURE_STORAGE_SAS_TOKEN is not set; export a valid SAS token before running this cell."
    )
CONTAINER_NAME = "config"
METADATA_FILE_NAME = "metadata_config_20250130.csv"

# Initialize Spark Session (reuses the active session when one already exists)
spark = SparkSession.builder \
    .appName("SilverLayerProcessing") \
    .getOrCreate()

# Register the SAS token with Spark for every container this cell touches,
# so wasbs:// paths in those containers can be read and written.
containers = ["01-bronze", "02-silver", "config"]
for container in containers:
    spark.conf.set(f"fs.azure.sas.{container}.{STORAGE_ACCOUNT_NAME}.blob.core.windows.net", SAS_TOKEN)

# Initialize BlobServiceClient using the same SAS token
blob_service_client = BlobServiceClient(
    account_url=f"https://{STORAGE_ACCOUNT_NAME}.blob.core.windows.net",
    credential=SAS_TOKEN
)

# ----------------------------------------------------------------------------------
# 2. LOAD METADATA FILE FROM BLOB STORAGE (wasbs://)
# ----------------------------------------------------------------------------------

def load_metadata():
    """
    Read the metadata CSV from Blob Storage and keep only the 'Silver' rows.

    The file is read with Spark (header row enabled), null cells are replaced
    by empty strings, and every row whose ``Layer`` value equals "silver"
    (ignoring case and surrounding whitespace) is converted into a plain dict.

    Returns:
        list: One dict per Silver entry with the keys SourceContainer,
        SourcePath, SourceFormat, TargetContainer, TargetPath, TargetFormat,
        UniqueKey and AddTimestamp (a bool, True only for the text "true").

    Raises:
        Exception: Any error raised while reading or converting the file is
        printed and then re-raised unchanged.
    """
    metadata_path = f"wasbs://{CONTAINER_NAME}@{STORAGE_ACCOUNT_NAME}.blob.core.windows.net/{METADATA_FILE_NAME}"

    # Columns copied through as-is, defaulting to "" when the column is absent.
    passthrough_columns = (
        "SourceContainer",
        "SourcePath",
        "SourceFormat",
        "TargetContainer",
        "TargetPath",
        "TargetFormat",
    )

    try:
        print(f"🔄 Attempting to read metadata file from {metadata_path}...")
        csv_reader = spark.read.format("csv").option("header", "true")

        # Empty strings instead of nulls keep the string methods below safe.
        collected_rows = csv_reader.load(metadata_path).fillna("").collect()
        records = [spark_row.asDict() for spark_row in collected_rows]

        silver_entries = []
        for record in records:
            if record.get("Layer", "").strip().lower() != "silver":
                continue  # not a Silver-layer dataset

            entry = {column: record.get(column, "") for column in passthrough_columns}
            entry["UniqueKey"] = record.get("UniqueKey", None)
            # A missing AddTimestamp column is treated as "false".
            entry["AddTimestamp"] = record.get("AddTimestamp", "false").lower() == "true"
            silver_entries.append(entry)

        print(f"📌 Filtered metadata for 'Silver' layer: {len(silver_entries)} entries found.")
        return silver_entries

    except Exception as e:
        print(f"❌ Error loading metadata file: {e}")
        raise

# ----------------------------------------------------------------------------------
# 3. APPLY TRANSFORMATIONS SPECIFIC TO THE SILVER LAYER
# ----------------------------------------------------------------------------------

def transform_silver_layer(source_df):
    """
    Apply the Silver-layer clean-up to a DataFrame.

    Every column whose Spark dtype is ``string`` is trimmed of leading and
    trailing whitespace, and a ``ProcessedTimestamp`` column holding the
    current timestamp is appended.

    Args:
        source_df (DataFrame): DataFrame to clean.

    Returns:
        DataFrame: The trimmed DataFrame with the extra timestamp column.

    Raises:
        Exception: Any failure is printed and re-raised unchanged.
    """
    try:
        cleaned_df = source_df
        # The dtype list is taken from the input once; trimming does not
        # change a column's type, so it stays valid throughout the loop.
        for name, dtype in source_df.dtypes:
            if dtype != "string":
                continue
            cleaned_df = cleaned_df.withColumn(name, trim(cleaned_df[name]))

        stamped_df = cleaned_df.withColumn("ProcessedTimestamp", current_timestamp())

        print("✅ Silver layer transformations applied successfully.")
        return stamped_df
    except Exception as e:
        print(f"❌ Error during Silver layer transformations: {e}")
        raise

# ----------------------------------------------------------------------------------
# 4. PERFORM DELTA MERGE INTO SILVER LAYER
# ----------------------------------------------------------------------------------

def merge_into_silver_layer(source_df, target_path, unique_key):
    """
    Upsert a DataFrame into a Silver-layer Delta table stored at a path.

    If the target path cannot be loaded as a Delta table, the data is written
    there as a new Delta table. Otherwise a ``MERGE INTO`` is run: rows whose
    ``unique_key`` matches an existing row are updated, all others inserted.

    Args:
        source_df (DataFrame): Transformed DataFrame to merge.
        target_path (str): Path of the target Silver Layer Delta table.
        unique_key (str): Column used as the primary key for merging. It is
            interpolated into the SQL text, so it must come from a trusted
            configuration source.

    Raises:
        Exception: Any failure during the write or merge is printed and
        re-raised unchanged.
    """
    # Imported here so the function is self-contained within the notebook cell.
    from pyspark.sql.utils import AnalysisException

    try:
        # Probe the target by loading it instead of asking the catalog:
        # a path-based Delta table is not registered as a catalog table, so a
        # catalog lookup is not a reliable existence test and can send every
        # run down the overwrite branch.
        try:
            spark.read.format("delta").load(target_path)
            target_exists = True
        except AnalysisException:
            # Path is missing or does not contain a Delta table.
            target_exists = False

        if not target_exists:
            print(f"⚠️ Target path {target_path} does not exist. Writing as new table.")
            source_df.write.format("delta").mode("overwrite").save(target_path)
            return

        # Expose the incoming data to SQL; the target is addressed by path.
        source_df.createOrReplaceTempView("source_temp_view")

        # Merge query for Delta Lake
        merge_query = f"""
        MERGE INTO delta.`{target_path}` AS target
        USING source_temp_view AS source
        ON target.{unique_key} = source.{unique_key}
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
        """
        spark.sql(merge_query)

        print(f"✅ Data successfully merged into Silver Layer at {target_path}")
    except Exception as e:
        print(f"❌ Error during Delta Merge: {e}")
        raise

# ----------------------------------------------------------------------------------
# 5. PROCESS DATA BASED ON METADATA CONFIGURATION
# ----------------------------------------------------------------------------------

def process_data(metadata_list):
    """
    Run the Bronze-to-Silver pipeline for every metadata entry.

    For each entry the source is read as a Delta table, passed through
    ``transform_silver_layer`` and merged into the target with
    ``merge_into_silver_layer``. Entries with any empty required field are
    skipped. A failure in one entry is printed and does not stop the others.

    Args:
        metadata_list (list): Metadata dicts, one per dataset.
    """
    # Keys that must be present on every entry (a missing key raises KeyError,
    # which is reported by the handler below).
    mandatory_keys = (
        "SourceContainer",
        "SourcePath",
        "SourceFormat",
        "TargetContainer",
        "TargetPath",
    )

    for metadata in metadata_list:
        try:
            src_container, src_path, src_format, tgt_container, tgt_path = (
                metadata[key] for key in mandatory_keys
            )
            merge_key = metadata.get("UniqueKey")

            # Every field, including the merge key, must be non-empty.
            if not (
                src_container
                and src_path
                and src_format
                and tgt_container
                and tgt_path
                and merge_key
            ):
                print(f"⚠️ Skipping entry due to missing fields: {metadata}")
                continue

            endpoint = f"{STORAGE_ACCOUNT_NAME}.blob.core.windows.net"
            source_path_wasbs = f"wasbs://{src_container}@{endpoint}/{src_path}"
            target_path_wasbs = f"wasbs://{tgt_container}@{endpoint}/{tgt_path}"

            print(f"🔄 Processing {source_path_wasbs} ➝ {target_path_wasbs}")

            # NOTE(review): the source is always read as Delta; SourceFormat is
            # only validated, and AddTimestamp is not consulted — confirm intent.
            bronze_df = spark.read.format("delta").load(source_path_wasbs)

            merge_into_silver_layer(
                transform_silver_layer(bronze_df),
                target_path_wasbs,
                merge_key,
            )

        except Exception as e:
            print(f"❌ Error processing data for Source Path '{metadata.get('SourcePath', 'Unknown')}': {e}")

# ----------------------------------------------------------------------------------
# 6. MAIN EXECUTION: LOAD METADATA & PROCESS DATA
# ----------------------------------------------------------------------------------

if __name__ == "__main__":
    # Load the Silver-layer metadata entries; load_metadata re-raises on failure,
    # so processing is not attempted when the configuration cannot be read.
    metadata_list = load_metadata()
    # Run each entry through read -> transform -> merge.
    process_data(metadata_list)


🔄 Attempting to read metadata file from wasbs://config@cdmo.blob.core.windows.net/metadata_config_20250130.csv...
📌 Filtered metadata for 'Silver' layer: 5 entries found.
🔄 Processing wasbs://01-bronze@cdmo.blob.core.windows.net/customerfeedback/ ➝ wasbs://02-silver@cdmo.blob.core.windows.net/customerfeedback_transformed/
✅ Silver layer transformations applied successfully.
⚠️ Target path wasbs://02-silver@cdmo.blob.core.windows.net/customerfeedback_transformed/ does not exist. Writing as new table.
🔄 Processing wasbs://01-bronze@cdmo.blob.core.windows.net/manufacturebatch/ ➝ wasbs://02-silver@cdmo.blob.core.windows.net/manufacturebatch_transformed/
✅ Silver layer transformations applied successfully.
⚠️ Target path wasbs://02-silver@cdmo.blob.core.windows.net/manufacturebatch_transformed/ does not exist. Writing as new table.
🔄 Processing wasbs://01-bronze@cdmo.blob.core.windows.net/productformula/ ➝ wasbs://02-silver@cdmo.blob.core.windows.net/productformula_transformed/
✅ Silver lay

In [0]:
from pyspark.sql import SparkSession
from azure.storage.blob import BlobServiceClient

# ----------------------------------------------------------------------------------
# 1. SPARK CONFIGURATION & AZURE STORAGE AUTHENTICATION
# ----------------------------------------------------------------------------------

# Define Azure Storage account details (authenticated with a SAS token)
import os

STORAGE_ACCOUNT_NAME = "cdmo"  # Replace with actual storage account name
# SECURITY: the SAS token is taken from the environment instead of being
# embedded in the notebook, so the credential does not end up in version
# control or in exported copies of this file. Any token that was previously
# committed here should be treated as exposed and rotated.
# A leading "?" is dropped so the value can be pasted with or without it.
SAS_TOKEN = os.environ.get("AZURE_STORAGE_SAS_TOKEN", "").lstrip("?")
if not SAS_TOKEN:
    raise RuntimeError(
        "AZURE_STORAGE_SAS_TOKEN is not set; export a valid SAS token before running this cell."
    )
CONTAINER_NAME = "02-silver"  # Target container where Silver tables are stored

# Initialize Spark session (reuses the active session when one already exists)
spark = SparkSession.builder.appName("SilverLayerValidation").getOrCreate()

# Register the SAS token with Spark so wasbs:// paths in the Silver container can be read
spark.conf.set(f"fs.azure.sas.{CONTAINER_NAME}.{STORAGE_ACCOUNT_NAME}.blob.core.windows.net", SAS_TOKEN)

# Initialize BlobServiceClient to list tables dynamically
blob_service_client = BlobServiceClient(
    account_url=f"https://{STORAGE_ACCOUNT_NAME}.blob.core.windows.net",
    credential=SAS_TOKEN
)

# ----------------------------------------------------------------------------------
# 2. LIST AND VALIDATE TABLES IN THE 02-SILVER CONTAINER
# ----------------------------------------------------------------------------------

def list_tables(container_name):
    """
    List the top-level folder names found in a Blob Storage container.

    Each blob whose name contains a "/" contributes the part before the first
    "/"; those prefixes are treated as table names. Blobs stored directly at
    the container root are ignored.

    Args:
        container_name (str): The Azure Blob Storage container name.

    Returns:
        list: The distinct table names (in no particular order), or an empty
        list when listing fails for any reason.
    """
    try:
        client = blob_service_client.get_container_client(container_name)

        tables = set()
        for blob in client.list_blobs():
            folder, separator, _ = blob.name.partition('/')
            if separator:  # only blobs that live inside a folder
                tables.add(folder)

        print(f"📌 Found {len(tables)} tables in the container: {tables}")
        return list(tables)

    except Exception as e:
        # Best-effort: report the problem and let the caller see "no tables".
        print(f"❌ Error listing tables in container '{container_name}': {e}")
        return []

def validate_silver_tables(container_name, tables):
    """
    Print the record count of each Silver-layer table.

    Each table is loaded as a Delta table from its wasbs:// path and counted.
    A table that cannot be read is reported and the loop moves on.

    Args:
        container_name (str): The Azure Blob Storage container name.
        tables (list): Table (folder) names to validate.
    """
    for table in tables:
        # Full wasbs:// location of this table inside the container
        table_path = f"wasbs://{container_name}@{STORAGE_ACCOUNT_NAME}.blob.core.windows.net/{table}/"

        print(f"🔍 Validating table: {table} at {table_path}")

        try:
            delta_reader = spark.read.format("delta").option("mergeSchema", "true")
            record_count = delta_reader.load(table_path).count()

            if record_count <= 0:
                print(f"⚠️ Table: {table} exists but is empty.")
                continue

            print(f"✅ Table: {table}, Record Count: {record_count}")

        except Exception as read_error:
            print(f"⚠️ Table '{table}' does not exist or is not accessible: {read_error}")

# ----------------------------------------------------------------------------------
# 3. MAIN EXECUTION
# ----------------------------------------------------------------------------------

if __name__ == "__main__":
    # Discover the table folders first; validation only runs when some exist.
    tables = list_tables(CONTAINER_NAME)
    if not tables:
        print("⚠️ No tables found in the Silver container.")
    else:
        validate_silver_tables(CONTAINER_NAME, tables)


📌 Found 5 tables in the container: {'customerfeedback_transformed', 'supplier_transformed', 'sales_transformed', 'manufacturebatch_transformed', 'productformula_transformed'}
🔍 Validating table: customerfeedback_transformed at wasbs://02-silver@cdmo.blob.core.windows.net/customerfeedback_transformed/
✅ Table: customerfeedback_transformed, Record Count: 100
🔍 Validating table: supplier_transformed at wasbs://02-silver@cdmo.blob.core.windows.net/supplier_transformed/
✅ Table: supplier_transformed, Record Count: 40
🔍 Validating table: sales_transformed at wasbs://02-silver@cdmo.blob.core.windows.net/sales_transformed/
✅ Table: sales_transformed, Record Count: 100
🔍 Validating table: manufacturebatch_transformed at wasbs://02-silver@cdmo.blob.core.windows.net/manufacturebatch_transformed/
✅ Table: manufacturebatch_transformed, Record Count: 100
🔍 Validating table: productformula_transformed at wasbs://02-silver@cdmo.blob.core.windows.net/productformula_transformed/
✅ Table: productformula_

In [0]:
from pyspark.sql import SparkSession
from azure.storage.blob import BlobServiceClient

# ----------------------------------------------------------------------------------
# 1. SPARK CONFIGURATION & AZURE STORAGE AUTHENTICATION
# ----------------------------------------------------------------------------------

# Define Azure Storage account details (authenticated with a SAS token)
import os

STORAGE_ACCOUNT_NAME = "cdmo"  # Replace with actual storage account name
# SECURITY: the SAS token is taken from the environment instead of being
# embedded in the notebook, so the credential does not end up in version
# control or in exported copies of this file. Any token that was previously
# committed here should be treated as exposed and rotated.
# A leading "?" is dropped so the value can be pasted with or without it.
SAS_TOKEN = os.environ.get("AZURE_STORAGE_SAS_TOKEN", "").lstrip("?")
if not SAS_TOKEN:
    raise RuntimeError(
        "AZURE_STORAGE_SAS_TOKEN is not set; export a valid SAS token before running this cell."
    )
CONTAINER_NAME = "02-silver"  # Target container where Silver tables are stored

# Initialize Spark session (reuses the active session when one already exists)
spark = SparkSession.builder.appName("SilverLayerValidation").getOrCreate()

# Register the SAS token with Spark so wasbs:// paths in the Silver container can be read
spark.conf.set(f"fs.azure.sas.{CONTAINER_NAME}.{STORAGE_ACCOUNT_NAME}.blob.core.windows.net", SAS_TOKEN)

# Initialize BlobServiceClient to list tables dynamically
blob_service_client = BlobServiceClient(
    account_url=f"https://{STORAGE_ACCOUNT_NAME}.blob.core.windows.net",
    credential=SAS_TOKEN
)

# ----------------------------------------------------------------------------------
# 2. LIST AND VALIDATE TABLES IN THE 02-SILVER CONTAINER
# ----------------------------------------------------------------------------------

def list_tables(container_name):
    """
    Collect the distinct top-level folder names of a Blob Storage container.

    The name of every blob that contains a "/" is cut at the first "/" and the
    leading part is taken as a table name. Blobs at the container root are
    skipped.

    Args:
        container_name (str): The Azure Blob Storage container name.

    Returns:
        list: The distinct table names (unordered); an empty list if the
        container could not be listed.
    """
    try:
        container = blob_service_client.get_container_client(container_name)

        # Set comprehension de-duplicates the folder prefixes.
        tables = {
            blob.name.split('/', 1)[0]
            for blob in container.list_blobs()
            if '/' in blob.name
        }

        print(f"📌 Found {len(tables)} tables in the container: {tables}")
        return list(tables)

    except Exception as e:
        # Best-effort: report the problem and let the caller see "no tables".
        print(f"❌ Error listing tables in container '{container_name}': {e}")
        return []

def validate_silver_tables(container_name, tables):
    """
    Print the record count and a five-row preview of each Silver-layer table.

    Each table is loaded as a Delta table from its wasbs:// path. Non-empty
    tables get their count and first five rows printed (untruncated); empty
    or unreadable tables are reported and the loop continues.

    Args:
        container_name (str): The Azure Blob Storage container name.
        tables (list): Table (folder) names to validate.
    """
    for table in tables:
        # Full wasbs:// location of this table inside the container
        table_path = f"wasbs://{container_name}@{STORAGE_ACCOUNT_NAME}.blob.core.windows.net/{table}/"

        print(f"🔍 Validating table: {table} at {table_path}")

        try:
            delta_reader = spark.read.format("delta").option("mergeSchema", "true")
            table_df = delta_reader.load(table_path)
            record_count = table_df.count()

            if record_count <= 0:
                print(f"⚠️ Table: {table} exists but is empty.")
                continue

            print(f"✅ Table: {table}, Record Count: {record_count}")

            # Show a small sample so the contents can be checked by eye.
            print(f"📊 Preview of the first 5 rows in {table}:")
            table_df.show(5, truncate=False)

        except Exception as read_error:
            print(f"⚠️ Table '{table}' does not exist or is not accessible: {read_error}")

# ----------------------------------------------------------------------------------
# 3. MAIN EXECUTION
# ----------------------------------------------------------------------------------

if __name__ == "__main__":
    # Discover the table folders first; validation only runs when some exist.
    tables = list_tables(CONTAINER_NAME)
    if not tables:
        print("⚠️ No tables found in the Silver container.")
    else:
        validate_silver_tables(CONTAINER_NAME, tables)


📌 Found 5 tables in the container: {'customerfeedback_transformed', 'supplier_transformed', 'sales_transformed', 'manufacturebatch_transformed', 'productformula_transformed'}
🔍 Validating table: customerfeedback_transformed at wasbs://02-silver@cdmo.blob.core.windows.net/customerfeedback_transformed/
✅ Table: customerfeedback_transformed, Record Count: 100
📊 Preview of the first 5 rows in customerfeedback_transformed:
+---+------------------------------------+------------------------------------+------------------------------------+------+--------+------------+-----------------------------------------------------------------------------------------------------------------+-----------------------+----------------------+
|id |FeedbackID                          |ProductID                           |CustomerID                          |Rating|Comments|FeedbackDate|Filename                                                                                                         |LoadTimestam