In [0]:
from pyspark.sql.functions import col, explode, from_json, to_date, year, month
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType
from delta.tables import DeltaTable

# Fully qualified (catalog.schema.table) names of the tables this notebook touches.
bronze_table = "oliv_mitai_uc.bronze.olive_mitai_sales"
silver_table = "oliv_mitai_uc.silver.sales_silver"
invalid_table = "oliv_mitai_uc.silver.invalid_sales"

# Shape of one sales bill: bill/store/cashier header fields plus an `items`
# array holding one struct per line item. Every field is nullable.
# NOTE(review): within this notebook the schema is only referenced by a
# commented-out from_json() call — confirm whether it is still needed.
schema = (
    StructType()
    .add("bill_datetime", StringType(), True)
    .add("bill_no", StringType(), True)
    .add("store_code", StringType(), True)
    .add("store_name", StringType(), True)
    .add("address_line1", StringType(), True)
    .add("city", StringType(), True)
    .add("state", StringType(), True)
    .add("postal_code", StringType(), True)
    .add("gstin", StringType(), True)
    .add("phone", StringType(), True)
    .add("fssai_no", StringType(), True)
    .add("cashier_code", StringType(), True)
    .add("cashier_name", StringType(), True)
    .add("counter_name", StringType(), True)
    .add("payment_method", StringType(), True)
    .add("total_amount", DoubleType(), True)
    .add(
        "items",
        ArrayType(
            StructType()
            .add("sku", StringType(), True)
            .add("product_name", StringType(), True)
            .add("category", StringType(), True)
            .add("uom", StringType(), True)
            .add("hsn_code", StringType(), True)
            .add("quantity", DoubleType(), True)
            .add("rate", DoubleType(), True)
            .add("amount", DoubleType(), True)
            .add("tax_percent", DoubleType(), True)
            .add("tax_amount", DoubleType(), True)
        ),
        True,
    )
)

# Read bronze through the shared constant so the source table is named once.
df_bronze = spark.table(bronze_table)

# NOTE(review): `items` is exploded directly, so this assumes the bronze table
# already exposes parsed columns (including an `items` array) rather than a raw
# JSON `value` column — confirm against the bronze writer.

# Flatten to one row per bill line item: bill-level columns are repeated and
# the item struct's fields are promoted to top-level columns.
# NOTE(review): explode() emits no row for a bill whose `items` is NULL or
# empty, so such bills reach neither the silver nor the invalid table — confirm
# that this is intended (explode_outer would keep them).
df_flat = df_bronze.withColumn("item", explode("items")).select(
    "bill_no", "bill_datetime",
    "store_code", "store_name", "address_line1", "city", "state", "postal_code",
    "gstin", "phone", "fssai_no",
    "cashier_code", "cashier_name", "counter_name",
    "payment_method", "total_amount",
    col("item.sku").alias("sku"),
    col("item.product_name").alias("product_name"),
    col("item.category").alias("category"),
    col("item.uom").alias("uom"),
    col("item.hsn_code").alias("hsn_code"),
    col("item.quantity").alias("quantity"),
    col("item.rate").alias("rate"),
    col("item.amount").alias("amount"),
    col("item.tax_percent").alias("tax_percent"),
    col("item.tax_amount").alias("tax_amount"),
)

# Derive the calendar columns that are later used as partition columns.
# NOTE(review): if `bill_datetime` cannot be parsed, `bill_date`, `year` and
# `month` come out NULL and the row is NOT rejected by the check below.
df_flat = (
    df_flat
    .withColumn("bill_date", to_date("bill_datetime"))
    .withColumn("year", year("bill_date"))
    .withColumn("month", month("bill_date"))
)

# A line item is valid when it has a bill number and store code and both the
# bill total and the item quantity are strictly positive.
is_valid = (
    col("bill_no").isNotNull()
    & col("store_code").isNotNull()
    & (col("total_amount") > 0)
    & (col("quantity") > 0)
)

df_valid = df_flat.filter(is_valid)

# Everything that is not valid is rejected. The predicate evaluates to NULL
# (not False) when total_amount or quantity is NULL, so NULL must be counted
# as "invalid" explicitly. This replaces df_flat.subtract(df_valid), which
# produced the same rows via a full-row EXCEPT DISTINCT shuffle.
df_invalid = df_flat.filter(is_valid.isNull() | ~is_valid)

# Keep a single row per (bill_no, product_name), the key used for the merge.
# Which of several duplicates survives is not deterministic.
df_valid = df_valid.dropDuplicates(["bill_no", "product_name"])
df_invalid = df_invalid.dropDuplicates(["bill_no", "product_name"])

def upsert_or_create(table_name, df, merge_keys, partition_cols=None):
    """Create a Delta table from ``df``, or merge ``df`` into it if it exists.

    On first run the table is created with ``saveAsTable`` (optionally
    partitioned). On later runs ``df`` is merged into the existing table:
    target rows whose key columns match a source row are updated with all
    source columns, and source rows without a match are inserted.

    Relies on the notebook-provided ``spark`` session.

    Args:
        table_name: Fully qualified name of the target table.
        df: DataFrame holding the rows to write. It should contain at most
            one row per combination of ``merge_keys``.
        merge_keys: Non-empty sequence of column names identifying a row.
        partition_cols: Optional list of partition columns, used only when
            the table is created.

    Raises:
        ValueError: If ``merge_keys`` is empty or ``None``.
    """
    # Fail fast: without keys the first run would succeed (create path) and
    # the second would die on an empty merge condition.
    if not merge_keys:
        raise ValueError("merge_keys must contain at least one column name")

    # Ask the catalog directly instead of listing every table in the schema
    # and comparing names case-sensitively.
    if not spark.catalog.tableExists(table_name):
        writer = df.write.format("delta").mode("overwrite")
        if partition_cols:
            writer = writer.partitionBy(partition_cols)
        writer.option("mergeSchema", "true").saveAsTable(table_name)
        print(f"Created table and loaded initial data (partitioned by {partition_cols}): {table_name}")
        return

    # Null-safe equality (<=>): with plain "=", a NULL key never matches, so
    # rows with a NULL key would be inserted again on every run.
    merge_condition = " AND ".join(f"t.{key} <=> s.{key}" for key in merge_keys)
    (
        DeltaTable.forName(spark, table_name)
        .alias("t")
        .merge(df.alias("s"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
    print(f"Merge completed for table: {table_name}")

# Valid and rejected rows use the same merge key and partition layout, so the
# two lists are defined once and shared by both loads.
_merge_keys = ["bill_no", "product_name"]
_partition_cols = ["year", "month", "store_code"]

upsert_or_create(silver_table, df_valid, _merge_keys, partition_cols=_partition_cols)
upsert_or_create(invalid_table, df_invalid, _merge_keys, partition_cols=_partition_cols)

# The arrow was previously stored as mis-encoded bytes and printed as garbage.
print("Bronze → Silver ETL completed successfully with partitioning.")


In [0]:
%sql
-- Show every row of the silver sales table.
SELECT *
FROM oliv_mitai_uc.silver.sales_silver;

In [0]:
%sql
-- Show every row of the invalid sales table.
SELECT *
FROM oliv_mitai_uc.silver.invalid_sales;