In [0]:
from typing import Optional

import pyspark.sql.functions as f

In [0]:
def file_copy(
    source_file_path: str,
    target_file_path: str,
):
    """
    Copy a single file and print a confirmation message.

    The copy itself is delegated to ``dbutils.fs.cp``. ``dbutils`` is not
    defined in this notebook; it is expected to be supplied by the runtime
    environment.

    Args:
        source_file_path (str): Location of the file to copy.
        target_file_path (str): Location the file is copied to.
    """
    dbutils.fs.cp(source_file_path, target_file_path)

    confirmation = f"File Copied from {source_file_path} to : {target_file_path}"
    print(confirmation)

In [0]:
def read_data(
    source_file_path: str,
    source_file_format: str,
    read_options: Optional[dict] = None,
    lower_case: bool = True,
    audit_columns: bool = True):
    """
    Reads data from the specified source file path and loads it into a Spark DataFrame.

    Relies on a ``spark`` session object that is not defined in this notebook
    and is expected to be provided by the runtime environment.

    Args:
        source_file_path (str): The path to the source data file.
        source_file_format (str): The file format of source data to read.
            Only 'parquet' and 'csv' are supported.
        read_options (dict, optional): Reader options applied when loading the
            data. When omitted (or None), defaults to
            ``{"header": "true", "inferSchema": "true"}``.
        lower_case (bool, optional): If True, renames every column to its
            lower-cased name. Defaults to True.
        audit_columns (bool, optional): If True, adds a ``source_filename``
            column (taken from ``_metadata.file_path``) and an
            ``insert_timestamp`` column (current timestamp). Defaults to True.

    Returns:
        DataFrame: Spark DataFrame containing the loaded data.

    Raises:
        ValueError: If ``source_file_format`` is neither 'parquet' nor 'csv'.
            ``ValueError`` is a subclass of ``Exception``, so handlers written
            against the previous generic ``Exception`` keep working.
    """
    # Build the default options per call instead of sharing one dict object
    # across every invocation through the function signature.
    if read_options is None:
        read_options = {"header": "true", "inferSchema": "true"}

    print(f"============== Reading from {source_file_path} ==============")

    if source_file_format == 'parquet':
        df = spark.read.options(**read_options).parquet(source_file_path)

    elif source_file_format == 'csv':
        df = spark.read.options(**read_options).csv(source_file_path)

    else:
        raise ValueError(
            f"Not a valid or supported file format: {source_file_format!r}"
        )

    if lower_case:
        # Back-quote each identifier so names with spaces/dots survive; a literal
        # backtick inside a name must be doubled to stay inside the quotes.
        def _quoted(name):
            return "`" + name.replace("`", "``") + "`"

        df = df.selectExpr(
            [f"{_quoted(c)} as {_quoted(c.lower())}" for c in df.columns]
        )

    if audit_columns:
        # NOTE(review): assumes the source exposes the `_metadata` column
        # for the chosen format/runtime - confirm.
        df = df.withColumn('source_filename', f.expr('_metadata.file_path'))\
               .withColumn('insert_timestamp', f.current_timestamp())

    print(f"============== Read from {source_file_path} ==============")

    return df

In [0]:
def write_data(
    df,
    target_file_format: str,
    target_file_path: str,
    mode_type: str,
    write_options: Optional[dict] = None
    ):
    """
    Writes data from a Spark DataFrame to the specified target.

    Args:
        df (DataFrame): Spark DataFrame containing the data to be written.
        target_file_format (str): The format of the target data to write
            (e.g., 'parquet', 'csv', 'delta').
        target_file_path (str): Where the data is written. For 'delta' this
            value is passed to ``saveAsTable`` (i.e. it is used as a table
            name); for every other format it is passed to ``save``.
        mode_type (str): The save mode passed to ``DataFrameWriter.mode``
            (e.g., 'overwrite', 'append'). Required - it has no default.
        write_options (dict, optional): Writer options applied via
            ``DataFrameWriter.options``. When omitted (or None), defaults to
            ``{"mode": "overwrite"}``.
            NOTE(review): the save mode itself is set by ``mode_type``; this
            default is forwarded only as a writer option - confirm it is
            still wanted.
    """
    # Build the default options per call instead of sharing one dict object
    # across every invocation through the function signature.
    if write_options is None:
        write_options = {"mode": "overwrite"}

    print(f"============== Writing to {target_file_path} ==============")

    writer = df.write.format(target_file_format).mode(mode_type).options(**write_options)

    if target_file_format == 'delta':
        writer.saveAsTable(target_file_path)
    else:
        writer.save(target_file_path)

    print(f"============== Wrote to {target_file_path} ==============")