In [0]:
from pyspark.sql.functions import current_timestamp

# Function to add an ingestion date column to the DataFrame
def add_ingestion_date(input_df):
  """Attach an ``ingestion_date`` column to ``input_df`` and return the result.

  Parameters:
    input_df: DataFrame to extend; it must expose ``withColumn``.

  Returns:
    Whatever ``input_df.withColumn`` returns, with the column
    ``ingestion_date`` populated from ``current_timestamp()``.
  """
  # Build the timestamp expression first, then attach it under a fixed name.
  ingestion_ts = current_timestamp()
  return input_df.withColumn("ingestion_date", ingestion_ts)

In [0]:
def re_arrange_partition_column(input_df, partition_column):
  """Reorder the columns of ``input_df`` so ``partition_column`` comes last.

  Parameters:
    input_df: DataFrame whose ``schema.names`` lists its column names.
    partition_column: Name of the column to move to the final position.

  Returns:
    The result of ``input_df.select`` called with the reordered name list.
    All other columns keep their original relative order.
  """
  # Keep every non-partition column where it is; the partition column is then
  # appended unconditionally, even if it was not present in the input schema.
  leading_columns = [name for name in input_df.schema.names if name != partition_column]
  ordered_columns = leading_columns + [partition_column]
  return input_df.select(ordered_columns)

In [0]:
def overwrite_partition(input_df, db_name, table_name, partition_column):
  """Write ``input_df`` to the table ``db_name.table_name`` in overwrite mode.

  If the table already exists the data is inserted into it; otherwise a new
  parquet table partitioned by ``partition_column`` is created.

  Parameters:
    input_df: DataFrame to persist.
    db_name: Database that holds the target table.
    table_name: Name of the target table.
    partition_column: Column the table is partitioned by.

  Returns:
    None.

  Side effects:
    Sets ``spark.sql.sources.partitionOverwriteMode`` to ``dynamic`` on the
    notebook-level ``spark`` session and writes to the target table.
  """
  # The partition column is moved to the last position before writing.
  # NOTE(review): insertInto resolves columns by position in Spark, which is
  # presumably why this is needed - confirm the remaining columns already
  # match the existing table's column order.
  reordered_df = re_arrange_partition_column(input_df, partition_column)

  # Switch the session to dynamic partition overwrite mode before writing.
  spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

  qualified_name = f"{db_name}.{table_name}"
  table_exists = spark._jsparkSession.catalog().tableExists(qualified_name)

  if not table_exists:
    # First load: create the table, partitioned, in parquet format.
    reordered_df.write.mode("overwrite").partitionBy(partition_column).format("parquet").saveAsTable(qualified_name)
    return

  # Table already present: insert into it in overwrite mode.
  reordered_df.write.mode("overwrite").insertInto(qualified_name)

In [0]:
def df_column_to_list(input_df, column_name):
  """Collect the distinct values of one DataFrame column into a Python list.

  Parameters:
    input_df: DataFrame to read from.
    column_name: Name of the column whose distinct values are wanted; it is
      also used as the key to read each collected row.

  Returns:
    A list with one entry per distinct row, in the order ``collect()``
    returned them.
  """
  # select -> distinct -> collect brings the distinct rows to the driver.
  single_column_df = input_df.select(column_name)
  distinct_rows = single_column_df.distinct().collect()

  # Unwrap each row down to the bare column value.
  values = []
  for row in distinct_rows:
    values.append(row[column_name])
  return values

In [0]:
def merge_delta_data(input_df, db_name, table_name, folder_path, merge_condition, partition_column):
  """Upsert ``input_df`` into the Delta table ``db_name.table_name``.

  If the table exists, ``input_df`` is merged into it: rows matching
  ``merge_condition`` are updated and the remaining rows are inserted.
  Otherwise a new Delta table partitioned by ``partition_column`` is created
  from ``input_df``.

  Parameters:
    input_df: Source DataFrame; aliased as ``src`` in the merge.
    db_name: Database that holds the target table.
    table_name: Name of the target table.
    folder_path: Folder containing the table; the existing table is opened
      from ``{folder_path}/{table_name}``.
    merge_condition: Join condition for the merge; the target is aliased as
      ``tgt`` and the source as ``src``.
    partition_column: Partition column used only when the table is created.

  Returns:
    None.

  Side effects:
    Sets ``spark.databricks.optimizer.dynamicPartitionPruning`` to ``true``
    on the notebook-level ``spark`` session and writes to the target table.
  """
  # Turn on dynamic partition pruning for the session before merging.
  spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true")

  # Imported here so the dependency is only needed when this function runs.
  from delta.tables import DeltaTable

  qualified_name = f"{db_name}.{table_name}"
  table_exists = spark._jsparkSession.catalog().tableExists(qualified_name)

  if not table_exists:
    # First load: create the partitioned Delta table from the incoming data.
    input_df.write.mode("overwrite").partitionBy(partition_column).format("delta").saveAsTable(qualified_name)
    return

  # NOTE(review): existence is checked by catalog name but the table is opened
  # by path - this assumes the table is stored at {folder_path}/{table_name};
  # confirm against how the database location is configured.
  target_table = DeltaTable.forPath(spark, f"{folder_path}/{table_name}")

  # Update every column of matched rows, insert every unmatched row.
  merge_builder = target_table.alias("tgt").merge(input_df.alias("src"), merge_condition)
  merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()