In [None]:
import dlt
from pyspark.sql.types import (
    StructType,
    StructField,
    IntegerType,
    StringType,
    BooleanType,
    DateType,
    FloatType,
)
from pyspark.sql import functions as F


# Landing-zone volume that the pipeline reads JSON files from.
inbound_folder = "/Volumes/satya_test/dlt_test_incr_load/full_load/data/"

# Explicit schema applied to the incoming JSON records; every field is nullable.
schema = (
    StructType()
    .add("userId", IntegerType(), True)
    .add("id", IntegerType(), True)
    .add("title", StringType(), True)
    .add("body", StringType(), True)
)


@dlt.view(comment="new users data incrementally ingested from landing zone")
def inbound_users():
    """Stream JSON files from ``inbound_folder`` and add lineage columns.

    The files are read with Auto Loader (``cloudFiles``) using the explicit
    module-level ``schema``.

    Returns:
        A streaming DataFrame holding the ``schema`` columns plus
        ``_lh_ingestion_timestamp``, ``_lh_applicable_date``,
        ``_lh_input_file_path``, ``_lh_file_mod_time`` and ``Operation``.
    """
    landed = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        # The documented glob option is "pathGlobFilter"; the previous key
        # "globPathFilter" is not a documented option name, so the *.json
        # restriction was not being applied.
        .option("pathGlobFilter", "*.json")
        .schema(schema)
        # NOTE(review): an explicit schema is supplied above, so type
        # inference is presumably redundant here; kept to avoid changing
        # reader configuration beyond the option-name fix.
        .option("cloudFiles.inferColumnTypes", "true")
        .load(inbound_folder)
    )
    return (
        landed.withColumn("_lh_ingestion_timestamp", F.current_timestamp())
        .withColumn("_lh_applicable_date", F.current_date())
        # File-level metadata exposed through the hidden _metadata column.
        .withColumn("_lh_input_file_path", F.col("_metadata.file_path"))
        .withColumn("_lh_file_mod_time", F.col("_metadata.file_modification_time"))
        # Placeholder operation column, always the empty string.
        .withColumn("Operation", F.lit(""))
    )


# Data quality checks: expectation name -> SQL predicate a valid row must satisfy.
rules = {
    "valid_id": "userId IS NOT NULL",
    "valid_user": "id IS NOT NULL AND title IS NOT NULL",
}
# Negation of "all rules hold". Each rule is wrapped in parentheses so that a
# rule containing OR cannot change the meaning of the combined predicate.
quarantine_rules = "NOT({0})".format(
    " AND ".join("({0})".format(rule) for rule in rules.values())
)


@dlt.table(name="_raw_users_quantine", comment="Raw data from landing zone")
@dlt.expect_all_or_drop(rules)
def data_check():
    """Stream ``inbound_users`` into the ``_raw_users_quantine`` table.

    The ``expect_all_or_drop(rules)`` decorator is applied to this table.

    NOTE(review): the table name suggests a quarantine table, but the
    expectations used here are ``rules`` rather than ``quarantine_rules``;
    confirm which set of rows this table is meant to hold.
    """
    users_stream = dlt.readStream("inbound_users")
    return users_stream


# Target table for the SCD type 2 history of users.
# The data quality rules are attached here through `expect_all_or_drop`.
# The previous bare `dlt.expect_all_or_drop(rules)` statement only built a
# decorator and discarded it, so nothing was enforced on this table.
dlt.create_streaming_table(
    name="_raw_users",
    table_properties={
        "delta.enableChangeDataFeed": "true",  # enabled CDF
        "delta.enableRowTracking": "true",  # enables row level tracking
    },
    expect_all_or_drop=rules,
)

# Primary key used to match rows in both apply_changes flows of this pipeline.
pk = "id"

# Implemented the _raw table as an SCD type 2 table: changes from
# inbound_users are matched on `pk` and ordered by file modification time.
dlt.apply_changes(
    target="_raw_users",
    source="inbound_users",
    keys=[pk],
    stored_as_scd_type="2",
    sequence_by="_lh_file_mod_time",
    # Changes limited to these ingestion-metadata columns do not open a new
    # history record.
    track_history_except_column_list=[
        "_lh_ingestion_timestamp",
        "_lh_input_file_path",
        "_lh_file_mod_time",
        "_lh_applicable_date",
    ],
)

@dlt.view
def _raw_users_view():
    """Stream the change data feed of ``live._raw_users`` as a CDC source.

    Rows whose ``_change_type`` is ``update_preimage`` are removed, and the
    result is projected to the user columns plus the SCD validity bounds
    (renamed ``Start_date`` / ``End_date``) and ``commit_timestamp``.
    """
    change_feed = (
        spark.readStream.format("delta")
        .option("readChangeFeed", True)
        .table("live._raw_users")
    )
    without_preimages = change_feed.filter(
        F.col("_change_type") != "update_preimage"
    )
    return without_preimages.select(
        "userId",
        "id",
        "title",
        "body",
        F.col("__START_AT").alias("Start_date"),
        F.col("__END_AT").alias("End_date"),
        F.col("_commit_timestamp").alias("commit_timestamp"),
    )


# Target table for the merged (SCD type 1) view of users.
dlt.create_streaming_table(
    name="users",
    comment="Clean, merged data from the CDC",
    table_properties={
        "delta.enableChangeDataFeed": "true",  # enabled CDF
        "delta.enableRowTracking": "true",  # enables row level tracking
    },
)

# Upsert the change rows coming from _raw_users_view into `users`.
# NOTE(review): rows written by the same commit presumably share one
# commit_timestamp; confirm that two rows for the same key can never tie on
# the sequencing column.
dlt.apply_changes(
    target="users",  # the table being materialized
    source="_raw_users_view",  # the incoming CDC
    keys=[pk],  # primary key used to match the rows to upsert/delete
    sequence_by=F.col("commit_timestamp"),
    stored_as_scd_type="1",
)