In [0]:
import dlt
from pyspark.sql.functions import col, current_date
from pyspark.sql.functions import input_file_name, current_date
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Explicit schema for the incoming streaming JSON files (no inference).
# Every field is nullable. Only the coordinates (lat/lon) are typed as doubles;
# everything else -- including price and mileage -- is kept as a raw string.
# The dict's insertion order fixes the field order, which is also the column
# order of the DataFrame produced when the files are read with this schema.
json_schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in {
            "address": StringType(),
            "certified": StringType(),
            "city": StringType(),
            "country": StringType(),
            "dealer_notes": StringType(),
            "drive": StringType(),
            "email": StringType(),
            "engine": StringType(),
            "exterior_color": StringType(),
            "heading": StringType(),
            "interior_color": StringType(),
            "lat": DoubleType(),
            "lon": DoubleType(),
            "mileage": StringType(),
            "name": StringType(),
            "optional_features": StringType(),
            "packages": StringType(),
            "phone": StringType(),
            "postal_code": StringType(),
            "price": StringType(),
            "standard_features": StringType(),
            "state_province": StringType(),
            "stock_number": StringType(),
            "stock_type": StringType(),
            "transmission": StringType(),
            "url": StringType(),
            "vin": StringType(),
        }.items()
    ]
)

# Bronze layer: raw JSON files landed under /FileStore/tables/streaming/.
@dlt.table(
    name="dev_team_offshore.bronze.bronze_table",
    comment="Bronze Table for Streaming Data",
    # Tag the medallion layer the same way the silver table does, so both
    # tables in this pipeline carry a consistent "quality" property.
    table_properties={"quality": "bronze"},
    partition_cols=["load_date"],
)
def bronze_table():
    """Ingest raw JSON files from the streaming landing folder as a stream.

    Files matching ``/FileStore/tables/streaming/*`` are read with the explicit
    ``json_schema`` (no schema inference). ``multiLine=True`` makes Spark parse
    each file as a whole JSON document, so a record may span several lines.

    Returns:
        A streaming DataFrame with the ``json_schema`` columns plus:
          - ``File_name``: path of the source file, taken from the file
            source's hidden ``_metadata.file_path`` column.
          - ``load_date``: the current date cast to a string; this is the
            table's partition column.
    """
    return (
        spark.readStream
        .schema(json_schema)
        .json("/FileStore/tables/streaming/*", multiLine=True)
        # _metadata.file_path is used instead of input_file_name().
        .withColumn("File_name", col("_metadata.file_path"))
        # String-typed date so the partition values are plain "YYYY-MM-DD" text.
        .withColumn("load_date", current_date().cast("string"))
    )


In [0]:
@dlt.table(
    name="dev_team_offshore.silver.silver_table",  # fully qualified catalog.schema.table
    comment="Silver Table for Cleaned Streaming Data",
    table_properties={"quality": "silver"},
    partition_cols=["load_date"],
)
def silver_table():
    """Stream rows from the bronze table into silver and stamp a processing date.

    Every bronze column is passed through unchanged, including the
    ``load_date`` partition column. One column is appended:
      - ``processed_date``: the current date cast to a string.

    NOTE(review): despite the table comment, no cleaning is applied yet. The
    drafted steps (deduplicate on ``vin``; drop rows whose ``price`` or
    ``mileage`` is null) are disabled. If they are re-enabled, be aware that a
    streaming ``dropDuplicates`` without a watermark keeps deduplication state
    for every key it has seen. The null checks may fit better as DLT
    expectations (e.g. ``@dlt.expect_or_drop``) so dropped rows are tracked.
    """
    bronze_rows = dlt.read_stream("dev_team_offshore.bronze.bronze_table")
    stamped = bronze_rows.withColumn(
        "processed_date", current_date().cast("string")
    )
    return stamped