In [0]:
# Create the silver click table on first run only.
# NOTE: there is deliberately no `DROP TABLE` here. An unconditional drop made the
# existence check below dead code and wiped every previously merged silver row on
# each run (rows already flagged `is_loaded` in bronze would never be re-inserted).
silver_table = "workspace.silver_click.click_data"

if spark.catalog.tableExists(silver_table):
    print("table already exists..")
else:
    # IF NOT EXISTS rather than OR REPLACE: never replace a table (and its data)
    # that appeared between the existence check and this statement.
    spark.sql(f"""
              CREATE TABLE IF NOT EXISTS {silver_table}
              (
                  UID STRING,
                  session STRING,
                  page STRING,
                  user_id integer,
                  device_type STRING,
                  latency_ms integer,
                  date date,
                  hour integer,
                  minute integer,
                  month STRING,
                  weekday STRING,
                  file STRING
              )
              USING DELTA
              TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported')
              """)
    print("table created...")

In [0]:
from pyspark.sql import functions as F

# Build the silver-shaped batch from bronze rows that have not been loaded yet.
bronze_table = "workspace.bronze_click.bronze_raw_data"
df = spark.read.table(bronze_table)

df_new = (
    # The transform must start from the filtered frame; building it from `df`
    # discarded this filter and reprocessed the whole bronze table every run.
    df.filter(F.col("is_loaded") == 0)
    # Deterministic row key: md5 of "user_id|ts" (concat_ws skips NULL inputs).
    .withColumn("UID", F.md5(F.concat_ws("|", "user_id", "ts")))
    # Keep only the text after the last "_".
    .withColumn("session", F.substring_index(F.col("session_id"), "_", -1))
    .withColumn("page", F.substring_index(F.col("page"), "_", -1))
    # NOTE(review): the "MM/dd/yyyy" pattern only matters if `ts` is a string;
    # hour/minute/date_format below treat `ts` as a timestamp — confirm bronze type.
    .withColumn("date", F.to_date(F.col("ts"), "MM/dd/yyyy"))
    .withColumn("hour", F.hour(F.col("ts")))
    .withColumn("minute", F.minute(F.col("ts")))
    .withColumn("month", F.date_format(F.col("ts"), "MMMM"))
    .withColumn("weekday", F.date_format(F.col("ts"), "EEEE"))
    # Text before the first "." of the source file name.
    .withColumn("file", F.substring_index(F.col("file_name"), ".", 1))
    # Column order matches the silver table definition.
    .select(
        "UID", "session", "page", "user_id", "device_type", "latency_ms",
        "date", "hour", "minute", "month", "weekday", "file",
    )
)

df_new.limit(3).display()

In [0]:
from delta import DeltaTable

# Append new click rows to silver, skipping any UID that silver already holds.
silver_table = "workspace.silver_click.click_data"
silver_dlt = DeltaTable.forName(spark, silver_table)

# Deduplicate the batch on UID first. An insert-only MERGE only checks the
# target, so a UID repeated inside the batch would be inserted once per copy.
bronze_batch = df_new.dropDuplicates(["UID"])

(
    silver_dlt.alias("silver")
    .merge(bronze_batch.alias("bronze"), "silver.UID = bronze.UID")
    .whenNotMatchedInsertAll()
    .execute()
)
print("Insert completed on silver...")


In [0]:
# Spot-check the first five rows of the bronze source, then the silver target.
for preview_name in (bronze_table, silver_table):
    spark.sql(f"select * from {preview_name} limit 5").display()

In [0]:
from delta import DeltaTable
from pyspark.sql import functions as F

# Flag the bronze rows of this batch as loaded so the next run skips them.
bronze_table = "workspace.bronze_click.bronze_raw_data"
bronze_dlt = DeltaTable.forName(spark, bronze_table)

# One source row per UID: a MERGE update fails when several source rows
# match the same target row.
# NOTE(review): unless `df_new` is cached it is re-evaluated here against the
# current bronze snapshot, so rows ingested after the silver merge would also be
# flagged without having reached silver — consider persisting the batch.
loaded_uids = df_new.select("UID").distinct()

(
    bronze_dlt.alias("bronze")
    .merge(
        loaded_uids.alias("silver"),
        # Same derivation used to build UID for silver, so the match does not
        # depend on bronze storing its own UID column.
        "md5(concat_ws('|', bronze.user_id, bronze.ts)) = silver.UID",
    )
    .whenMatchedUpdate(
        # Leave rows that are already flagged untouched (no needless rewrites).
        condition="bronze.is_loaded = 0",
        set={"is_loaded": F.lit(1)},
    )
    .execute()
)

print("Bronze table has been updated... ")