In [None]:
def create_container_path_dict(relative_path: str, container_type: str, filename: str) -> dict:
    """
        Build the path-parameter dict consumed by get_adls_path_and_filename().

        :param relative_path: common folder, stored under the "folder_path" key
        :param container_type: container name (silver / bronze), stored under "container"
        :param filename: file name, stored under the "filename" key
        :return: dict with the keys "account", "folder_path", "filename", "date", "container"
    """
    # Today's date (as returned by date.today()) formatted as YYYY/MM/DD
    formatted_date = date.today().strftime("%Y/%m/%d")

    # Account suffix: characters 8..14 of the workspace name followed by "2".
    # The previously computed "...1" metadata account name was never used, so the
    # extra workspace-name lookup it required has been dropped.
    account_name_maindata = f"{mssparkutils.env.getWorkspaceName()[8:15]}2"

    # Every value is rendered through an f-string so the dict only holds strings
    return {
        "account": f"adlsba{account_name_maindata}",
        "folder_path": f"{relative_path}",
        "filename": f"{filename}",
        "date": f"{formatted_date}",
        "container": f"{container_type}",
    }

In [None]:
def generate_dates(start_date_str, end_date_str, number_of_days_between):
    """
        Build the list of dates from start date to end date (inclusive) at a fixed day interval.

        :param start_date_str: first date, formatted "YYYY-MM-DD"
        :param end_date_str: last date allowed in the result, formatted "YYYY-MM-DD"
        :param number_of_days_between: interval in days between consecutive dates; must be positive
        :return: list of "YYYY-MM-DD" strings; empty when the start date is after the end date
        :raises ValueError: if a date string does not match the format, or the interval is not positive
    """
    step = timedelta(number_of_days_between)
    # A zero or negative step would never move past end_date and loop forever
    if step <= timedelta(0):
        raise ValueError("number_of_days_between must be a positive number of days")

    current_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    end_date = datetime.strptime(end_date_str, "%Y-%m-%d")

    dates_list = []
    while current_date <= end_date:
        dates_list.append(current_date.strftime("%Y-%m-%d"))
        # Advance by the requested number of days
        current_date += step

    return dates_list

In [None]:
def list_files_in_directory(directory_path):
    """
    Collect the details of every entry listed under a directory.

    Parameters:
        directory_path (str): Path to the directory.

    Returns:
        dict: Column-oriented details; each key maps to a list with one
        item per entry returned by mssparkutils.fs.ls, in listing order.
    """
    listing = {
        "File Name": [],
        "Is Directory": [],
        "Is File": [],
        "Path": [],
        "Size": [],
    }

    # One pass over the listing, filling all five columns in step
    for entry in mssparkutils.fs.ls(directory_path):
        listing["File Name"].append(entry.name)
        listing["Is Directory"].append(entry.isDir)
        listing["Is File"].append(entry.isFile)
        listing["Path"].append(entry.path)
        listing["Size"].append(entry.size)

    return listing

In [None]:
def df_shape(df):
    """
    Print the shape of a DataFrame (row count and column count).

    The shape is only printed; the function itself returns None.
    """
    # Row count first (df.count()), then the number of column names
    row_total = df.count()
    column_total = len(df.columns)
    message = f"\n Dataframe Shape= Number of rows:{row_total}, Number of columns:{column_total}"
    print(message)
    return None

In [None]:
def make_unique(column_list):
    """
    Ensure unique column names.

    The first occurrence of a name is kept as-is; each repeat gets a numeric
    suffix ("name_1", "name_2", ...). Suffixes that would clash with a name
    already present in the input or already generated are skipped, so the
    returned list never contains duplicates.

    :param column_list: iterable of column names (hashable values)
    :return: list of the same length with unique names, in the original order
    """
    names = list(column_list)
    taken = set(names)      # every name that is, or will be, in the output
    seen = set()            # original names already emitted once
    last_suffix = {}        # last numeric suffix handed out per original name
    result = []

    for name in names:
        if name not in seen:
            seen.add(name)
            result.append(name)
            continue

        # Repeat: advance the suffix until the candidate is free
        suffix = last_suffix.get(name, 0) + 1
        candidate = f"{name}_{suffix}"
        while candidate in taken:
            suffix += 1
            candidate = f"{name}_{suffix}"

        last_suffix[name] = suffix
        taken.add(candidate)
        result.append(candidate)

    return result

#### Alteryx to PySpark

In [None]:
def join_and_union(left_df, right_df, left_col: str, right_col: str):
    """
    Left-join and inner-join two DataFrames on one key pair, then union both results.

    :param left_df: left DataFrame
    :param right_df: right DataFrame
    :param left_col: name of the join key column in left_df
    :param right_col: name of the join key column in right_df
    :return: left-join rows followed by inner-join rows (positional union,
             so rows that matched appear in both parts)
    """
    # DataFrame.join takes (other, on, how). The key pair is expressed as a single
    # join condition; passing the two column names as separate positional
    # arguments raises a TypeError.
    join_condition = left_df[left_col] == right_df[right_col]

    # Both joins share the condition, so their output schemas line up for union()
    left_join_df = left_df.join(right_df, join_condition, "left")
    inner_join_df = left_df.join(right_df, join_condition, "inner")

    return left_join_df.union(inner_join_df)

In [None]:
def join_df_keep_columns(DF_L, DF_R, left_key, right_key, left_key_2, right_key_2,  type_of_join, keep_columns):
    """
    Join two DataFrames on one or two key pairs and keep the columns of one side only.

    :param DF_L: left DataFrame
    :param DF_R: right DataFrame
    :param left_key: first join key column name in DF_L
    :param right_key: first join key column name in DF_R
    :param left_key_2: optional second join key in DF_L (None to join on one pair)
    :param right_key_2: optional second join key in DF_R (None to join on one pair)
    :param type_of_join: join type passed to DataFrame.join (e.g. "inner", "left")
    :param keep_columns: 'left' keeps DF_L's columns; any other value keeps DF_R's
    :return: DataFrame holding only the chosen side's columns
    :raises ValueError: if exactly one of left_key_2 / right_key_2 is None
    """
    # A half-specified second key pair would otherwise index a DataFrame with None
    if (left_key_2 is None) != (right_key_2 is None):
        raise ValueError("left_key_2 and right_key_2 must both be provided or both be None.")

    # Alias both sides so their columns can be addressed as "left.*" / "right.*"
    df1_aliased = DF_L.alias("left")
    df2_aliased = DF_R.alias("right")

    join_condition = df1_aliased[left_key] == df2_aliased[right_key]
    if left_key_2 is not None:
        join_condition = join_condition & (df1_aliased[left_key_2] == df2_aliased[right_key_2])

    joined_df = df1_aliased.join(df2_aliased, join_condition, type_of_join)

    side = "left" if keep_columns == 'left' else "right"
    result_df = joined_df.select(f"{side}.*")
    # Re-select every column under an explicit alias, stripping any "<side>." text
    # from the name (kept from the original implementation).
    # NOTE(review): select("<side>.*") appears to yield unqualified names already,
    # in which case this is a no-op rename - confirm before removing.
    result_df = result_df.select([F.col(x).alias(x.replace(f"{side}.", '')) for x in result_df.columns])
    return result_df

In [None]:
def join_df_select_columns(DF_L, DF_R, left_key, right_key, type_of_join, keep_columns_left, keep_columns_right):
    """
    Join two DataFrames on one or more key pairs and keep only the listed columns of each side.

    :param DF_L: left DataFrame
    :param DF_R: right DataFrame
    :param left_key: key column name, or list of key column names, in DF_L
    :param right_key: key column name, or list of key column names, in DF_R
                      (paired positionally with left_key)
    :param type_of_join: join type passed to DataFrame.join (e.g. "inner", "left")
    :param keep_columns_left: column names to keep from DF_L (may be empty / None)
    :param keep_columns_right: column names to keep from DF_R (may be empty / None)
    :return: DataFrame with the left columns followed by the right columns
    :raises ValueError: if no column is requested from either side, or the key
                        lists differ in length
    """
    # Validate before touching Spark so bad calls fail fast
    if not keep_columns_left and not keep_columns_right:
        raise ValueError("At least one of keep_columns_left or keep_columns_right should be provided.")

    # Accept a bare column name as well as a list; zipping a string would pair
    # up its individual characters instead of column names.
    left_keys = [left_key] if isinstance(left_key, str) else list(left_key)
    right_keys = [right_key] if isinstance(right_key, str) else list(right_key)
    if len(left_keys) != len(right_keys):
        raise ValueError("left_key and right_key must contain the same number of columns.")

    # Alias both sides so columns can be addressed as "left.<name>" / "right.<name>"
    df1_aliased = DF_L.alias("left")
    df2_aliased = DF_R.alias("right")

    # AND together one equality per key pair
    join_condition = None
    for left_name, right_name in zip(left_keys, right_keys):
        condition = df1_aliased[left_name] == df2_aliased[right_name]
        join_condition = condition if join_condition is None else join_condition & condition

    joined_df = df1_aliased.join(df2_aliased, join_condition, type_of_join)

    # Requested left columns first, then requested right columns, unqualified
    selected = [col(f"left.{column}").alias(column) for column in (keep_columns_left or [])]
    selected += [col(f"right.{column}").alias(column) for column in (keep_columns_right or [])]

    return joined_df.select(selected)

In [None]:
def create_column(spark_df, new_column_name, starting_element_column, comparison_column):
    """
    Add a column derived from comparing one column against a reference value.

    The reference is the value of starting_element_column in the first row
    returned by the DataFrame. Rows whose comparison_column is below that
    reference receive their own starting_element_column value; all other rows
    receive date_add(starting_element_column, 1).
    """
    # Reference value taken from the first row (this triggers a Spark action)
    first_row = spark_df.select(starting_element_column).first()
    reference_value = first_row[starting_element_column]

    is_below_reference = col(comparison_column) < lit(reference_value)
    next_day = expr(f"date_add(`{starting_element_column}`, 1)")
    derived = when(is_below_reference, col(starting_element_column)).otherwise(next_day)

    return spark_df.withColumn(new_column_name, derived)

In [None]:
def create_alteryx_transpose(spark_df, key_columns):
    """
    Replicate the Alteryx Transpose tool (wide to long).

    Every column not listed in key_columns becomes one output row per input
    row, carrying the column's name in "Name" and its value in "Value"; the
    key columns are repeated on each of those rows.
    """
    # Columns to unpivot: everything that is not a key
    unpivot_columns = [name for name in spark_df.columns if name not in key_columns]

    # One (Name, Value) struct per unpivoted column, exploded into separate rows
    name_value_pairs = [
        struct(lit(name).alias("Name"), col(name).alias("Value"))
        for name in unpivot_columns
    ]
    exploded_pairs = explode(array(name_value_pairs)).alias("Agg_Value")
    long_df = spark_df.select(key_columns + [exploded_pairs])

    # Flatten the struct back into plain "Name" / "Value" columns
    key_projection = [col(key).alias(key) for key in key_columns]
    pair = col("Agg_Value")
    return long_df.select(
        *key_projection,
        pair.getItem("Name").alias("Name"),
        pair.getItem("Value").alias("Value"),
    )

In [None]:
def create_alteryx_sample(spark_df, group_columns, select_method='first'):
    """
    Replicate the Alteryx Sample tool: keep one row per group.

    :param spark_df: input DataFrame
    :param group_columns: column names that define a group
    :param select_method: 'first' keeps the row numbered 1 in each group,
                          'last' keeps the highest-numbered row
    :return: DataFrame with one row per group and the input's columns
    :raises ValueError: if select_method is neither 'first' nor 'last'

    Rows are numbered with a constant ordering (orderBy(lit(1))), so which row
    counts as first/last within a group follows whatever order Spark produces.
    """
    # Fail fast, before any Spark work is set up
    if select_method not in ('first', 'last'):
        raise ValueError("Invalid select_method. Use 'first' or 'last'.")

    from pyspark.sql.window import Window
    import pyspark.sql.functions as F

    # Temp column names chosen to be unlikely to clash with real columns
    # (plain 'row' / 'max_row' would overwrite, then drop, same-named input columns).
    row_col = "__sample_row_number"
    max_col = "__sample_max_row_number"

    group_window = Window.partitionBy(*group_columns)
    numbered_df = spark_df.withColumn(
        row_col, F.row_number().over(group_window.orderBy(F.lit(1)))
    )

    if select_method == 'first':
        return numbered_df.filter(F.col(row_col) == 1).drop(row_col)

    # 'last': the row whose number equals the group's maximum. The Window API is
    # used instead of a string-built SQL expression so empty group lists and
    # unusual column names do not break the partition clause.
    with_max_df = numbered_df.withColumn(max_col, F.max(row_col).over(group_window))
    return with_max_df.filter(F.col(row_col) == F.col(max_col)).drop(row_col, max_col)

In [None]:
def create_alteryx_cross_tab(spark_df, group_columns, pivot_column, value_column, method_agg):
    """
    Replicate the Alteryx Cross Tab tool (long to wide).

    Groups by group_columns, pivots on pivot_column and fills each cell with the
    first or last value_column value, depending on method_agg ("first" / "last").
    Raises ValueError for any other method_agg.
    """
    # Reject unsupported aggregations before building the pivot
    if method_agg != "first" and method_agg != "last":
        raise ValueError("Invalid select_method. Use 'first' or 'last'.")

    aggregate = first if method_agg == "first" else last
    pivoted = spark_df.groupBy(*group_columns).pivot(pivot_column)
    return pivoted.agg(aggregate(value_column))

In [None]:
def assert_columns_names(df_1, df_2):
    """
    Align the columns of two DataFrames.

    Columns present in only one DataFrame are added to the other as all-null
    columns, then df_2's columns are reordered to match df_1.

    :param df_1: first DataFrame; its column order is the reference
    :param df_2: second DataFrame
    :return: tuple (df_1, df_2) with identical column names in identical order
    """
    # Snapshot names and schemas once; every .columns access is rebuilt from the schema
    schema_1, schema_2 = df_1.schema, df_2.schema
    names_1, names_2 = df_1.columns, df_2.columns
    name_set_1, name_set_2 = set(names_1), set(names_2)

    # Columns present in df_1 but not in df_2, and vice versa (original order kept)
    missing_in_df_2 = [name for name in names_1 if name not in name_set_2]
    missing_in_df_1 = [name for name in names_2 if name not in name_set_1]

    # Null columns are cast to the type the column has in the other DataFrame;
    # an uncast lit(None) would produce an untyped (NullType) column.
    for name in missing_in_df_2:
        df_2 = df_2.withColumn(name, lit(None).cast(schema_1[name].dataType))
    for name in missing_in_df_1:
        df_1 = df_1.withColumn(name, lit(None).cast(schema_2[name].dataType))

    # Arrange columns in df_2 to match the order of df_1
    df_2 = df_2.select(*df_1.columns)

    return df_1, df_2

In [None]:
def create_alteryx_unique(spark_df, column_name_list):
    """
    Replicate the Alteryx Unique tool: keep one row per distinct key combination.

    Rows are tagged with a monotonically increasing id; within each group of
    equal column_name_list values, only the row with the lowest id is kept.
    """
    # Rank rows inside each key group by the generated id
    lowest_id_first = Window.partitionBy(*column_name_list).orderBy('row_id')

    return (
        spark_df
        .withColumn('row_id', F.monotonically_increasing_id())
        .withColumn('row_num', F.row_number().over(lowest_id_first))
        # rank 1 = the row with the lowest generated id in its group
        .filter('row_num = 1')
        .drop('row_id', 'row_num')
    )

In [None]:
def assert_sql_type_column_name(spark_df):
    """
    Rename every column to a SQL-friendly name.

    Each name is stripped of surrounding whitespace, every character outside
    a-z, A-Z, 0-9 and underscore is replaced by an underscore, and runs of
    underscores are collapsed into one.

    :param spark_df: input DataFrame
    :return: DataFrame with the same columns, in the same order, under cleaned names
    """
    import re

    def clean_column_name(col_name):
        # Replace any character that is not a-z, A-Z, 0-9, or underscore with an underscore
        clean_name = re.sub(r'[^a-zA-Z0-9_]', '_', col_name.strip())
        # Replace multiple underscores with a single underscore
        return re.sub(r'_+', '_', clean_name)

    # toDF renames positionally, so it also works for names that cannot be looked
    # up by name (e.g. names containing dots, which col(name) treats as nested
    # field access).
    cleaned_names = [clean_column_name(name) for name in spark_df.columns]
    return spark_df.toDF(*cleaned_names)

### SCD 2 Merge in Delta Lake (Change tracking)

In [None]:
# import base64  # Ensure base64 is correctly imported

def create_sql_hash_key(uid_str: str, encode=False, decode=False, encode_std=None):
    """
    Encode a string to base64, or decode a base64 string back to text.

    Note that base64 is a reversible encoding, not a cryptographic hash.

    Parameters:
    uid_str: The string to encode or decode.
    encode: If True, the function will encode the string to base64.
            Takes precedence when both encode and decode are True.
    decode: If True, the function will decode the base64 string back to its original form.
    encode_std: Text codec of the original string (utf-8 / ascii etc.).
                Defaults to utf-8 when None.

    Returns:
    The encoded or decoded string based on the flag provided, or None (after
    printing the error) when uid_str is None, no flag is set, or the
    conversion fails.
    """
    # Imported locally so the function does not depend on a notebook-level import
    import base64

    # None cannot be passed to str.encode / bytes.decode, so fall back to utf-8
    codec = "utf-8" if encode_std is None else encode_std

    try:
        if uid_str is not None and encode:
            payload_bytes = uid_str.encode(codec)
            # The base64 alphabet is plain ASCII, whatever codec the payload uses
            return base64.b64encode(payload_bytes).decode("ascii")
        if uid_str is not None and decode:
            payload_bytes = base64.b64decode(uid_str.encode("ascii"))
            return payload_bytes.decode(codec)
        raise ValueError("Check uid_str, encode, decode values or import python lib: base64")
    except (AttributeError, LookupError, TypeError, ValueError) as e:
        # Best-effort contract kept from the original: report the problem and return None
        print(f"Error:\n{e}")
        return None

In [None]:
def scd2_merge_df(bronze_spark_df, silver_delta_table, id_col, hash_col):
    """
    Merge the bronze DataFrame into the silver Delta table using SCD2 type for change tracking.

    Steps performed:
      1. Add null-valued tracking columns (start_date, end_date, changed_column,
         record_flag, old_hash_col) to the bronze DataFrame.
      2. Merge into the silver table: bronze rows with no silver row sharing both
         id_col and hash_col are inserted. This merge is executed inside the
         function, so the silver table is modified as a side effect.
      3. Read the table back as a DataFrame and recompute start_date, old_hash_col,
         end_date and record_flag per id_col.

    :param bronze_spark_df: incoming DataFrame; presumably carries id_col and hash_col - verify against caller
    :param silver_delta_table: target table; assumed to be a delta.tables.DeltaTable - TODO confirm
    :param id_col: name of the identifier column
    :param hash_col: name of the column holding the row hash
    :return: DataFrame of the silver table with recomputed tracking columns.
             The recomputed values are not written back to the table here.

    NOTE(review): changed_column is created but never populated in this function.
    """

    # Add change tracking columns to the bronze DataFrame (all null, typed)
    bronze_spark_df = bronze_spark_df.withColumn("start_date", lit(None).cast("timestamp")) \
                                     .withColumn("end_date", lit(None).cast("timestamp")) \
                                     .withColumn("changed_column", lit(None).cast("string")) \
                                     .withColumn("record_flag", lit(None).cast("string"))\
                                     .withColumn("old_hash_col", lit(None).cast('string'))
    
    # Define merge condition: a bronze row matches when both id and hash are equal
    merge_condition = f"silver.{id_col} = bronze.{id_col} AND silver.{hash_col} = bronze.{hash_col}"
    
    # Merge operation with SCD2 logic: only unmatched bronze rows are inserted;
    # matched rows are left untouched
    silver_delta_table.alias("silver").merge(
        bronze_spark_df.alias("bronze"),
        merge_condition
    ).whenNotMatchedInsertAll().execute()
    
    # Convert the updated Delta table back to a Spark DataFrame
    silver_spark_df = silver_delta_table.toDF()

    # Drop old hash column (it is recomputed below from the window)
    silver_spark_df = silver_spark_df.drop("old_hash_col")

    # Change start date
    # Keep start_date where record_flag is "Current"; every other row gets the current timestamp.
    # NOTE(review): "every other row" includes rows flagged "Historical" as well as
    # newly inserted rows (null flag) - confirm that resetting historical start dates is intended.
    silver_spark_df = silver_spark_df.withColumn( "start_date", 
                                                 when(col("record_flag") == "Current", col("start_date")
                                                      ).otherwise(current_timestamp())
                                                )
    
    # Define window for partitioning by ID with ordering by start_date
    custom_window = Window.partitionBy(id_col).orderBy("start_date")
    
    # Track changes using hash column: previous row's hash within the same ID (null for the first row)
    silver_spark_df = silver_spark_df.withColumn("old_hash_col", lag(col(hash_col)).over(custom_window))

    # Update end date: the next row's start_date within the same ID (null for the last row)
    silver_spark_df = silver_spark_df.withColumn( "end_date", lead(col("start_date")).over(custom_window))

    # Update record_flag: rows without an end_date are "Current", all others "Historical"
    silver_spark_df = silver_spark_df.withColumn( "record_flag", 
                                                    when(col("end_date").isNull(), 
                                                    lit("Current")).otherwise(lit("Historical"))
                                                )

    return silver_spark_df

In [None]:
def check_changed_columns_for_SCD2(hash_key_1: str, hash_key_2: str, delimiter: str, columns_list: list):
    """
    Report which columns differ between two delimited hash-key strings.

    Both strings are split on the delimiter and compared position by position;
    columns_list supplies the column name for each position.

    :return: list of column names whose value differs between the two keys
    :raises ValueError: if the two keys split into a different number of values,
                        or that number differs from the length of columns_list
    """
    old_values = hash_key_1.split(delimiter)
    new_values = hash_key_2.split(delimiter)

    # Guard clauses: both keys and the column list must line up one-to-one
    if len(old_values) != len(new_values):
        raise ValueError("The two hashed column values don't match in length")
    if len(old_values) != len(columns_list):
        raise ValueError("Hashed column values and column list don't match in length")

    # Keyed by column name, so a repeated name keeps only its last value pair
    value_pairs = {
        name: (old, new)
        for old, new, name in zip(old_values, new_values, columns_list)
    }

    return [name for name, (old, new) in value_pairs.items() if old != new]