In [0]:
from pyspark.sql.functions import current_timestamp
def add_ingestion_date(input_df):
    """
    Return a new DataFrame equal to ``input_df`` plus an ``ingest_date`` column.

    The added column is built from Spark's ``current_timestamp()``. Spark evaluates
    it when the resulting query runs, not when this function is called. Every row
    produced by that query gets the same value. Despite its name, the column holds
    a timestamp rather than a date.

    Args:
    input_df (DataFrame): The DataFrame to extend; it is not modified.

    Returns:
    DataFrame: ``input_df`` with the extra ``ingest_date`` timestamp column.
    """
    return input_df.withColumn("ingest_date", current_timestamp())

In [0]:
def re_arrange_partition_column(input_df, partition_column):
  """
  Return a projection of input_df whose last column is partition_column.

  Every other column keeps its original relative order. partition_column is always
  appended, even when it is not one of input_df's columns. In that case the select
  refers to a column the DataFrame does not have.

  Parameters:
  input_df (DataFrame): The DataFrame whose columns are reordered.
  partition_column (str): Name of the column to place last.

  Returns:
  DataFrame: The result of input_df.select(...) with the reordered column list.
  """
  leading_columns = [name for name in input_df.schema.names if name != partition_column]
  return input_df.select(leading_columns + [partition_column])


In [0]:
def overwrite_partition(input_df, db_name, table_name, partition_column):
  """
  Write input_df into the table db_name.table_name, overwriting only the partitions it contains.

  If the table already exists, the data is written with insertInto in "overwrite" mode. The session's
  spark.sql.sources.partitionOverwriteMode is set to "dynamic" first, so only partitions present in
  input_df are replaced. If the table does not exist, it is created as a Parquet table partitioned by
  partition_column via saveAsTable.

  Side effect: the partitionOverwriteMode setting stays "dynamic" for the rest of the Spark session;
  it is not restored.

  Parameters:
  - input_df (DataFrame): The data to write.
  - db_name (str): Database that holds (or will hold) the table.
  - table_name (str): Name of the target table within db_name.
  - partition_column (str): Column the table is partitioned by; moved to the last position before writing.

  Returns:
  None
  """
  full_table_name = f"{db_name}.{table_name}"
  # insertInto matches columns by position, not by name, so the partition column is moved
  # to the end to line up with where Spark places partition columns in a partitioned table.
  output_df = re_arrange_partition_column(input_df, partition_column)
  # With "dynamic", an overwrite replaces only the partitions present in output_df
  # instead of truncating the whole table. This setting is session-wide.
  spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
  # NOTE(review): _jsparkSession is a private handle; spark.catalog.tableExists is the
  # public equivalent on PySpark >= 3.3 if the runtime supports it.
  if spark._jsparkSession.catalog().tableExists(full_table_name):
    output_df.write.mode("overwrite").insertInto(full_table_name)
  else:
    (output_df.write
      .mode("overwrite")
      .partitionBy(partition_column)
      .format("parquet")
      .saveAsTable(full_table_name))

In [0]:
from delta.tables import DeltaTable
def merge_delta_data(input_df, db_name, table_name, folder_path, merge_condition, partition_column):
  """
  Upsert input_df into the Delta table db_name.table_name, creating the table if it does not exist.

  If the table is registered in the catalog, a Delta MERGE is executed with the existing table aliased
  as "tgt" and input_df aliased as "src". Rows satisfying merge_condition are updated with all source
  columns, and source rows with no match are inserted. If the table is not registered, it is created
  from input_df with saveAsTable in Delta format, partitioned by partition_column.

  Side effect: spark.databricks.optimizer.dynamicPartitionPruning is set to "true" for the Spark
  session and is not restored.

  Parameters:
  - input_df (DataFrame): The source data to merge into (or create) the table.
  - db_name (str): Database that holds (or will hold) the Delta table.
  - table_name (str): Name of the Delta table within db_name.
  - folder_path (str): Base folder where the table's data is expected to live. The merge target is
    resolved by catalog name instead, so this argument is not used; it is kept for backward compatibility.
  - merge_condition (str): SQL join condition using the aliases "tgt" and "src", e.g. "tgt.id = src.id".
  - partition_column (str): Column to partition by; only used when the table is created.

  Returns:
  None
  """
  full_table_name = f"{db_name}.{table_name}"
  spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true")
  # NOTE(review): _jsparkSession is a private handle; spark.catalog.tableExists is the
  # public equivalent on PySpark >= 3.3 if the runtime supports it.
  if spark._jsparkSession.catalog().tableExists(full_table_name):
    # Resolve the target by name so the merge hits the same catalog table the existence check found.
    # The create branch below makes a managed table whose location need not be folder_path/table_name,
    # so loading by that path can fail or point at different data.
    delta_table = DeltaTable.forName(spark, full_table_name)
    (delta_table.alias("tgt")
      .merge(input_df.alias("src"), merge_condition)
      .whenMatchedUpdateAll()
      .whenNotMatchedInsertAll()
      .execute())
  else:
    (input_df.write
      .mode("overwrite")
      .partitionBy(partition_column)
      .format("delta")
      .saveAsTable(full_table_name))

In [0]:
def get_race_year_list(input_df,column_name):
    """
    Return the distinct values of ``column_name`` in ``input_df`` as a Python list.

    Despite its name, this works for any column, not only race years. All distinct
    values are collected to the driver.

    Parameters:
    input_df (DataFrame): The DataFrame from which to extract distinct values.
    column_name (str): Name of the column whose distinct values are returned.

    Returns:
    list: The distinct values, in the order collect() returns them. distinct()
    does not promise any particular ordering.
    """
    distinct_rows = input_df.select(column_name).distinct().collect()
    # Index each Row by the name held in ``column_name``. Attribute access
    # (row.column_name) would look up a field literally called "column_name".
    return [row[column_name] for row in distinct_rows]