# Global Functions

In [0]:
# Import Modules

from delta.tables import DeltaTable
from pyspark.sql.functions import current_timestamp

In [0]:
# Add Ingestion Date

def add_ingestion_date(input_df):
    """Return ``input_df`` with an added ``ingestion_date`` column.

    The column is populated with ``current_timestamp()``. The result of
    ``withColumn`` is returned; ``input_df`` itself is not reassigned.
    """
    return input_df.withColumn('ingestion_date', current_timestamp())

In [0]:
# Re-Arrange Partition Column

def re_arrange_partition_column(input_df, partition_column):
    """Return ``input_df`` re-selected with ``partition_column`` as the last column.

    Every other column listed in ``input_df.schema.names`` keeps its original
    relative order. ``partition_column`` is always appended to the selection,
    whether or not it appears in the schema.
    """
    leading_columns = [
        name for name in input_df.schema.names if name != partition_column
    ]
    return input_df.select(leading_columns + [partition_column])

In [0]:
# Overwrite Partition

def overwrite_partition(input_df, db_name, table_name, partition_column):
    """Write ``input_df`` to ``db_name.table_name`` in overwrite mode.

    The session setting ``spark.sql.sources.partitionOverwriteMode`` is set to
    ``dynamic`` before writing. If the table already exists, the data is
    inserted into it with ``insertInto``; otherwise a parquet table
    partitioned by ``partition_column`` is created with ``saveAsTable``.

    Relies on a ``spark`` session being available in the enclosing scope.
    """
    qualified_name = f'{db_name}.{table_name}'

    # insertInto matches columns by position, so the partition column is
    # moved to the end of the column list before writing.
    reordered_df = re_arrange_partition_column(input_df, partition_column)

    spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')

    table_exists = spark._jsparkSession.catalog().tableExists(qualified_name)
    writer = reordered_df.write.mode('overwrite')

    if not table_exists:
        # First load: create the partitioned parquet table.
        writer.partitionBy(partition_column).format('parquet').saveAsTable(qualified_name)
        return

    writer.insertInto(qualified_name)

In [0]:
# Merge Non-Partitioned Delta Data

def merge_delta_data(input_df, db_name, table_name, folder_path, merge_condition):
    """Merge ``input_df`` into the Delta table ``db_name.table_name``.

    If the table does not exist yet, it is created from ``input_df`` as an
    unpartitioned Delta table. Otherwise the Delta table located at
    ``{folder_path}/{table_name}`` is merged with ``input_df`` using
    ``merge_condition``: matched rows are updated with all source columns and
    unmatched rows are inserted. The target is aliased ``tgt`` and the source
    ``src``, so ``merge_condition`` can refer to those aliases.

    Relies on a ``spark`` session being available in the enclosing scope.
    """
    from delta.tables import DeltaTable

    spark.conf.set('spark.databricks.optimizer.dynamicPartitionPruning', 'true')

    qualified_name = f'{db_name}.{table_name}'

    if not spark._jsparkSession.catalog().tableExists(qualified_name):
        # First load: create the Delta table from the incoming data.
        input_df.write.mode('overwrite').format('delta').saveAsTable(qualified_name)
        return

    target = DeltaTable.forPath(spark, f'{folder_path}/{table_name}')
    merge_builder = target.alias('tgt').merge(input_df.alias('src'), merge_condition)
    (
        merge_builder
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
# Merge Partitioned Delta Data

def merge_partitioned_delta_data(input_df, db_name, table_name, folder_path, partition_column, merge_condition):
    """Merge ``input_df`` into the partitioned Delta table ``db_name.table_name``.

    If the table does not exist yet, it is created from ``input_df`` as a
    Delta table partitioned by ``partition_column``. Otherwise the Delta table
    located at ``{folder_path}/{table_name}`` is merged with ``input_df``
    using ``merge_condition``: matched rows are updated with all source
    columns and unmatched rows are inserted. The target is aliased ``tgt`` and
    the source ``src``, so ``merge_condition`` can refer to those aliases.

    Relies on a ``spark`` session being available in the enclosing scope.
    """
    from delta.tables import DeltaTable

    spark.conf.set('spark.databricks.optimizer.dynamicPartitionPruning', 'true')

    qualified_name = f'{db_name}.{table_name}'

    if not spark._jsparkSession.catalog().tableExists(qualified_name):
        # First load: create the partitioned Delta table from the incoming data.
        initial_writer = input_df.write.mode('overwrite').partitionBy(partition_column)
        initial_writer.format('delta').saveAsTable(qualified_name)
        return

    target = DeltaTable.forPath(spark, f'{folder_path}/{table_name}')
    merge_builder = target.alias('tgt').merge(input_df.alias('src'), merge_condition)
    (
        merge_builder
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )