In [None]:
import dlt
from pyspark.sql.functions import col, count
from pyspark.sql.types import (
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
)

# Explicit schema for the raw CSV input. The bronze table reads the files with
# it, and the silver table only filters rows, so both share these columns.
# Every column is a nullable string.
bronze_schema = StructType([
    StructField("id", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("ip_address", StringType(), True),
])

# Schema of the gold aggregate (one row per gender).
# Spark's count() produces a 64-bit integer, so activity_count is LongType
# ("bigint"). IntegerType did not match the column the gold table produces.
gold_schema = StructType([
    StructField("gender", StringType(), True),
    StructField("activity_count", LongType(), True),
])

# Bronze Table: Raw data ingestion
@dlt.table(
    name="bronze",
    comment="Raw data ingested from source",
    table_properties={"quality": "bronze"},
    path="/mnt/dataprocessing/bronze_layer"
)
@dlt.expect("valid_id", "id IS NOT NULL")
def bronze_table():
    """Incrementally ingest raw CSV files into the streaming ``bronze`` table.

    Reads ``/mnt/dataprocessing/raw_data`` with Auto Loader (``cloudFiles``),
    treating the first line as a header and applying ``bronze_schema``.
    Rows with a NULL ``id`` are kept; the ``valid_id`` expectation only
    records them (``dlt.expect`` does not drop).

    Returns:
        A streaming DataFrame. Downstream tables read ``bronze`` with
        ``dlt.read_stream``, so this must always be a stream.
    """
    # Intentionally no try/except. The former fallback returned an empty
    # *batch* DataFrame, which hid ingestion failures behind an "empty but
    # successful" table. It is also not a stream, so a downstream
    # read_stream("bronze") could not consume it. Building the reader is lazy
    # anyway, so actual read errors happen at execution time, where the
    # pipeline reports them.
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(bronze_schema)
        .load("/mnt/dataprocessing/raw_data")
    )


Name,Type
id,string
first_name,string
last_name,string
email,string
gender,string
ip_address,string
_rescued_data,string


In [None]:
# Silver Table: Preprocessed data
@dlt.table(
    name="silver",
    comment="Preprocessed data",
    table_properties={"quality": "silver"},
    path="/mnt/dataprocessing/silver_layer"
)
@dlt.expect_or_drop("valid_id", "id IS NOT NULL")
def silver_table():
    """Stream ``bronze`` into ``silver``, dropping rows whose ``id`` is NULL.

    The drop is done by ``expect_or_drop`` rather than a ``.filter``. The
    resulting rows are the same, but the expectation now actually sees the
    invalid rows and counts them. A plain ``expect`` placed after a filter
    could never fail.

    Returns:
        A streaming DataFrame with the same columns as ``bronze``.
    """
    return dlt.read_stream("bronze")
    

Name,Type
id,string
first_name,string
last_name,string
email,string
gender,string
ip_address,string
_rescued_data,string


In [None]:
# Gold Table: Aggregated data
@dlt.table(
    name="gold",
    comment="Aggregated data for analytics",
    table_properties={"quality": "gold"},
    path="/mnt/dataprocessing/gold_layer"
)
@dlt.expect("valid_activity_count", "activity_count > 0")
def gold_table():
    """Aggregate ``silver`` into one row per ``gender`` with its row count.

    ``silver`` is read with ``dlt.read`` (a batch read, not ``read_stream``).
    Rows with a NULL ``gender`` form their own group. ``count`` returns a
    64-bit integer, so ``activity_count`` is a ``bigint`` column.

    Returns:
        A DataFrame with columns ``gender`` and ``activity_count``.
    """
    return (
        dlt.read("silver")
        .groupBy("gender")
        .agg(count("*").alias("activity_count"))
    )

Name,Type
gender,string
activity_count,bigint
