In [1]:
from pyspark.sql.functions import date_format, col, count, lit
from pyspark.sql.types import StringType
from datetime import datetime, date

In [2]:
# Session-level Spark configuration set once for this notebook, in this order.
# NOTE(review): OSS Spark 3 exposes dynamic partition pruning as
# "spark.sql.optimizer.dynamicPartitionPruning.enabled" (enabled by default); confirm the
# "spark.databricks..." key below is actually honored by this runtime.
session_settings = {
    "spark.databricks.delta.optimizeWrite.enabled": "true",
    "spark.databricks.delta.properties.defaults.autoOptimize.autoCompact": "false",
    "spark.databricks.optimizer.dynamicPartitionPruning": "true",
}
for setting_key, setting_value in session_settings.items():
    spark.conf.set(setting_key, setting_value)

In [28]:
def data_loader(Location, Format, options=None):
  """Read a file-based dataset into a Spark DataFrame.

  Parameters
  ----------
  Location : str or list of str
      Path (or paths) handed to ``DataFrameReader.load``. Must not be None,
      empty, or whitespace-only.
  Format : str
      Source format name handed to ``DataFrameReader.format`` (e.g. 'csv').
      Must not be None, empty, or whitespace-only.
  options : dict, optional
      Extra reader options. They are layered on top of the defaults
      ``{'header': 'true', 'inferSchema': 'true'}`` (the settings this loader
      has always used); keys given here override the defaults.

  Returns
  -------
  pyspark.sql.DataFrame
      The loaded DataFrame.

  Raises
  ------
  ValueError
      If Location or Format is missing. ValueError subclasses Exception, so
      existing ``except Exception`` handlers still catch it.
  """
  def _is_blank(value):
    # None and empty / whitespace-only strings count as "not supplied";
    # non-string values (e.g. a list of paths) are left for Spark to validate.
    return value is None or (isinstance(value, str) and not value.strip())

  # Validate before touching Spark so bad arguments fail fast with a clear message.
  if _is_blank(Location):
    raise ValueError("A Location must be included")
  if _is_blank(Format):
    raise ValueError("A Format must be included")

  reader_options = {'header': 'true', 'inferSchema': 'true'}
  if options:
    reader_options.update(options)

  return spark.read.format(Format).options(**reader_options).load(Location)

In [29]:
# Run parameter: the business date to load, written as a "YYYY-MM-DD" string.
DateToProcess = '2021-01-23'

In [30]:
# Normalise the run date to a `date` and derive the path / file-name fragments.
# The parameter cell supplies a "YYYY-MM-DD" string, but this cell rebinds
# DateToProcess to a date object, so re-running it on its own would otherwise fail
# inside strptime. Accepting an already-parsed value makes the cell idempotent.
if isinstance(DateToProcess, datetime):
    DateToProcess = DateToProcess.date()
elif not isinstance(DateToProcess, date):
    DateToProcess = datetime.strptime(DateToProcess, "%Y-%m-%d").date()

DateToProcess_path = DateToProcess.strftime('%Y/%m/%d')  # e.g. 2021/01/23 -> folder layout
DateToProcess_str = DateToProcess.strftime('%Y%m%d')     # e.g. 20210123 -> file-name suffix

In [31]:
# Raw input: one CSV per day, stored under a yyyy/MM/dd folder.
raw_transaction = f"/Example/Raw/Transaction/{DateToProcess_path}/Transaction_{DateToProcess_str}.csv"
# Prepared output: root folder of the Delta table (also used as its LOCATION).
delta_transaction = "/Example/Prepared/Transaction/DELTA/"

In [32]:
# Load the day's raw CSV and derive string partition columns from its `date` column.
# date_format already yields strings; the explicit cast pins the type to match the
# STRING partition columns of the target table.
# NOTE(review): rows whose `date` cannot be parsed would get null partition values —
# confirm the raw date format.
df_transaction = (
    data_loader(raw_transaction, 'csv')
    .withColumn('year', date_format(col('date'), 'yyyy').cast(StringType()))
    .withColumn('month', date_format(col('date'), 'MM').cast(StringType()))
    .withColumn('day', date_format(col('date'), 'dd').cast(StringType()))
)

# Expose the prepared frame to %%sql cells.
df_transaction.createOrReplaceTempView('vw_Transaction')

In [36]:
%%sql

-- Remove the metastore registration so the next cell can re-create the table.
-- NOTE(review): the next cell re-creates it with an explicit LOCATION (an external /
-- unmanaged table in Spark), and dropping an external table leaves the Delta files in
-- place. If this name was ever registered as a managed table, DROP would delete its
-- data as well — confirm before running against a shared metastore.
DROP TABLE IF EXISTS Example_Prepared_Transaction_DELTA

In [38]:
%%sql

-- Register the prepared-layer Delta table at a fixed storage LOCATION.
-- year/month/day are STRING partition columns; the Python cell that loads the raw
-- file derives them from `date` with date_format.
-- NOTE(review): `date` is declared STRING, while the raw CSV is read with
-- inferSchema='true', which may infer a date/timestamp type — confirm the types line up.
-- NOTE(review): the LOCATION literal duplicates the Python `delta_transaction`
-- variable; keep the two in sync.
CREATE TABLE Example_Prepared_Transaction_DELTA (
  `date` STRING,
  `transaction_id` STRING,
  `product_id` STRING,
  `product_name` STRING,
  `customer_id` STRING,
  `cost` DOUBLE,
  `currency` STRING,
  `quantity` INTEGER,
  `year` STRING,
  `month` STRING,
  `day` STRING
)
USING delta
PARTITIONED BY (year, month, day)
LOCATION '/Example/Prepared/Transaction/DELTA/'

In [39]:
%%sql
-- Show the registered column names and types of the prepared table.
DESCRIBE TABLE Example_Prepared_Transaction_DELTA

In [40]:
%%sql
-- Spot-check the current contents of the prepared table.
-- LIMIT keeps the rendered result small as the table grows; raise or remove it
-- when a fuller look is needed.
SELECT * FROM Example_Prepared_Transaction_DELTA
LIMIT 100

In [43]:
# Upsert the day's transactions (df_transaction) into the prepared Delta table.
import delta

# Columns of Example_Prepared_Transaction_DELTA, in table order.
target_columns = [
    "date", "transaction_id", "product_id", "product_name", "customer_id",
    "cost", "currency", "quantity", "year", "month", "day",
]
# A source row matches a target row when all of these are equal. The partition
# columns are part of the key, presumably so Delta can narrow the files it has to
# search — confirm in the query plan.
merge_keys = ["date", "transaction_id", "year", "month", "day"]
# Every non-key column is overwritten when a match is found.
update_columns = [c for c in target_columns if c not in merge_keys]

merge_condition = " AND ".join(f"target.{c} = source.{c}" for c in merge_keys)

# Reuse the path defined alongside the raw location instead of repeating the literal.
target = delta.DeltaTable.forPath(spark, delta_transaction)

# NOTE(review): Delta MERGE fails when several source rows match the same target row.
# If the raw file can contain duplicate (date, transaction_id) pairs, deduplicate
# df_transaction before merging — confirm against the data.
(
    target.alias("target")
    .merge(df_transaction.alias("source"), merge_condition)
    .whenMatchedUpdate(set={c: f"source.{c}" for c in update_columns})
    .whenNotMatchedInsert(values={c: f"source.{c}" for c in target_columns})
    .execute()
)