In [0]:
import dlt
from pyspark.sql.functions import col, explode, lit, regexp_replace, when

def _mask_or_redact(column, pattern, replacement):
    """Mask ``column`` with ``regexp_replace`` wherever ``pattern`` matches.

    ``regexp_replace`` returns its input unchanged when the pattern does not
    match. A value in an unexpected format (e.g. an email without ``@`` or a
    phone number that does not end in two digits) would otherwise pass through
    unmasked. Such non-null values are fully redacted to ``"****"`` instead;
    nulls stay null (a ``when`` chain with no ``otherwise`` yields null).

    Args:
        column: Column holding the sensitive value; it is cast to string first.
        pattern: Java regex used both to test and to rewrite the value.
        replacement: Replacement text; may reference groups as ``$1``, ``$2``.

    Returns:
        A Column expression producing the masked value.
    """
    as_text = column.cast("string")
    return when(
        as_text.rlike(pattern), regexp_replace(as_text, pattern, replacement)
    ).when(as_text.isNotNull(), lit("****"))


@dlt.table(
    comment="Flattened user data from JSON source with masked sensitive info"
)
def users_bronze():
    """Define the ``users_bronze`` table: one row per element of ``rows``.

    Reads the landing-zone ``users.json`` file, explodes its ``rows`` array and
    projects the user fields. ``email`` keeps its first two characters and its
    domain; ``phone`` keeps its last two digits. Values that do not fit those
    formats are fully redacted rather than exposed.

    Returns:
        The flattened, masked user DataFrame.
    """
    # No try/except here on purpose. Returning an error DataFrame from this
    # function would replace the table's contents with an error row of a
    # different schema, and DLT datasets (e.g. an "exception_log" table) cannot
    # be declared from inside a dataset-definition function. Letting the
    # exception propagate makes the pipeline update fail visibly instead.
    # If an exception log is needed, declare it as its own @dlt.table.

    # NOTE(review): spark.read.json expects JSON Lines by default; if users.json
    # is a single pretty-printed object, multiLine=True is required — confirm.
    df_users = spark.read.json(
        "/Volumes/dlt_manas/my_schema/delta_live_tables/Landing/live_data/users.json"
    )

    # One output row per element of the top-level "rows" array.
    df_users_exploded = df_users.withColumn("user", explode(col("rows")))

    return df_users_exploded.select(
        col("user.id").alias("id"),
        col("user.name").alias("name"),
        col("user.username").alias("username"),
        # Mask email: keep first 2 chars and domain; redact anything else.
        _mask_or_redact(
            col("user.email"), r"(^..)[^@]*(@.*$)", r"$1****$2"
        ).alias("email"),
        # Mask phone: keep last 2 digits; redact anything else.
        _mask_or_redact(
            col("user.phone"), r".*(\d{2})$", r"****$1"
        ).alias("phone"),
        col("user.gender").alias("gender"),
        col("user.food_preference").alias("food_preference"),
        col("user.locality").alias("locality"),
        col("user.address").alias("address"),
        col("user.role").alias("role"),
        col("user.is_active").alias("is_active"),
        col("user.created_at").alias("created_at"),
        col("user.updated_at").alias("updated_at"),
    )

In [0]:
import dlt
from pyspark.sql.functions import explode, col

@dlt.table(
    comment="Flattened activity logs from JSON source"
)
def raw_dataBronze():
    """Define the flattened activity-log table: one row per activity log entry.

    Loads the landing-zone ``raw-data.json`` file, explodes the
    ``data.activityLogs`` array, and lifts selected nested fields (order
    details plus the entry's timestamp and user id) to top-level columns.

    Returns:
        The flattened activity-log DataFrame.
    """
    # (source path inside the exploded struct, output column name), in output order.
    # Replace 'data.activityLogs' below with the actual column if different.
    field_map = (
        ("activityLog.details.amount", "amount"),
        ("activityLog.details.order_id", "order_id"),
        ("activityLog.details.restaurant_name", "restaurant_name"),
        ("activityLog.details.role", "role"),
        ("activityLog.details.success", "success"),
        ("activityLog.details.total_amount", "total_amount"),
        ("activityLog.details.items_count", "items_count"),
        ("activityLog.timestamp", "timestamp"),
        ("activityLog.user_id", "user_id"),
    )
    projection = [col(path).alias(name) for path, name in field_map]

    source = spark.read.json(
        "/Volumes/dlt_manas/my_schema/delta_live_tables/Landing/live_data/raw-data.json"
    )
    per_entry = source.withColumn("activityLog", explode(col("data.activityLogs")))
    return per_entry.select(*projection)