In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, substring, to_date, lit
from dateutil.relativedelta import relativedelta
from datetime import datetime
import calendar
import pandas as pd

# Function to load reference tables as temporary views
def load_reference_tables(db_path: str = "/user/hive/warehouse/delta_nip.db/"):
    """
    Register each reference Delta table as a temporary view.

    Parameters:
    - db_path: Base path of the Delta database directory. Each table's
      directory name is appended to it as-is (no separator is inserted).

    Outputs:
    - None. A temporary view is created (or replaced) for every table and a
      confirmation line is printed for each one.
    """

    # (Delta table directory, temporary view name) pairs, processed in order
    view_sources = (
        ("ref_organization", "delta_ref_org"),
        ("ref_lob", "delta_ref_lob"),
        ("ref_cov_desc", "delta_ref_cov_desc"),
    )

    for source_table, temp_view in view_sources:
        table_path = f"{db_path}{source_table}"
        # `spark` is not defined in this function; it must already exist as a
        # global (presumably the notebook's SparkSession - confirm).
        spark.read.format("delta").load(table_path).createOrReplaceTempView(temp_view)
        print(f"View '{temp_view}' created from '{source_table}'")

# Function to compute the as-of date and the matching formatted date suffix used in target file names
def calculate_as_of_date(time_machine=1, reference_date=None):
    """
    Calculate the 'as of' date: the last day of the month that lies a given
    number of months before a reference date.

    Parameters:
    - time_machine (int): Number of months to go back from the reference date.
      Default is 1. Must be a whole number.
    - reference_date (date or datetime, optional): Date to count back from.
      Only its year and month are used. Defaults to the current local date.

    Returns:
    - tuple[str, str]: (date_suffix, as_of), where date_suffix is formatted as
      'YYYYMMDD' and as_of as 'YYYY-MM-DD'. Both describe the same day.

    Raises:
    - ValueError: If time_machine is not a whole number of months.
    """

    months_back = int(time_machine)
    if months_back != time_machine:
        raise ValueError("time_machine must be a whole number of months")

    if reference_date is None:
        reference_date = datetime.now()

    # Place the month on a single zero-based axis so that crossing a year
    # boundary (in either direction) is resolved by divmod.
    month_index = reference_date.year * 12 + (reference_date.month - 1) - months_back
    target_year, zero_based_month = divmod(month_index, 12)
    target_month = zero_based_month + 1

    # monthrange returns (weekday of the 1st, number of days in the month)
    last_day = calendar.monthrange(target_year, target_month)[1]

    # Both strings are built from the same components, so they always agree
    date_suffix = f"{target_year:04d}{target_month:02d}{last_day:02d}"
    as_of = f"{target_year:04d}-{target_month:02d}-{last_day:02d}"

    return date_suffix, as_of

# Function to get the file path based on the table name, date suffix, and book type
def get_file_path(table_name, date_suffix, book="default"):
    """
    Resolve the /mnt/sas path of the extract file for a table, date and book.

    Parameters:
    - table_name (str): One of "claims", "exposures", "organization",
      "policy" or "premium".
    - date_suffix (str): The date suffix embedded in the file name
      (format 'YYYYMMDD').
    - book (str): "jif" or "program". Any other value selects the "default"
      naming (files prior to the JIF / Programs split). Default is "default".

    Returns:
    - str: "/mnt/sas/<file name>" when a file with the expected name is listed
      in /mnt/sas, otherwise the literal string "File not found" (also
      returned for an unknown table_name).
    """

    # Anything other than the two split books falls back to the legacy naming
    book_key = "default" if book not in ["jif", "program"] else book

    # Expected file name for every (book, table) combination
    expected_names = {
        # Files prior to split between JIF and Programs
        ("default", "claims"): f"I2I_CLAIM_NIP_{date_suffix}.CSV",
        ("default", "exposures"): f"I2I_EXPOSURE_NIP_{date_suffix}.CSV",
        ("default", "organization"): f"I2I_ORGANIZATION_NIP_{date_suffix}.CSV",
        ("default", "policy"): f"I2I_POLICY_NIP_{date_suffix}.CSV",
        ("default", "premium"): f"I2I_PREMIUM_NIP_{date_suffix}.CSV",
        # JIF files
        ("jif", "claims"): f"I2I_JIF_CLAIM_NIP_{date_suffix}.CSV",
        ("jif", "exposures"): f"I2I_JIF_EXPOSURE_NIP_{date_suffix}.CSV",
        ("jif", "organization"): f"I2I_JIF_ORGANIZATION_NIP_{date_suffix}.CSV",
        ("jif", "policy"): f"I2I_JIF_POLICY_NIP_{date_suffix}.CSV",
        ("jif", "premium"): f"I2I_JIF_PREMIUM_NIP_{date_suffix}.CSV",
        # Programs files
        ("program", "claims"): f"I2I_Programs_CLAIM_NIP_{date_suffix}.CSV",
        ("program", "exposures"): f"I2I_Programs_EXPOSURE_NIP_{date_suffix}.CSV",
        ("program", "organization"): f"I2I_Programs_ORGANIZATION_NIP_{date_suffix}.CSV",
        ("program", "policy"): f"I2I_Programs_POLICY_NIP_{date_suffix}.CSV",
        ("program", "premium"): f"I2I_Programs_PREMIUM_NIP_{date_suffix}.CSV",
    }
    expected_name = expected_names.get((book_key, table_name))

    # `dbutils` is not defined in this function; it must already exist as a
    # global. The directory is listed even when the table name is unknown.
    for entry in dbutils.fs.ls("/mnt/sas"):
        if entry.name == expected_name:
            return f"/mnt/sas/{expected_name}"

    return "File not found"

# Function to convert SQL types to Spark types with precision/scale
def sql_to_spark_type(row):
    """
    Map a SQL Server column data type to a Spark type.

    Parameters:
    - row: A row from the SQL Server metadata DataFrame. Only its "DATA_TYPE"
      entry is read; the comparison is case-insensitive.

    Returns:
    - A Spark type object: FloatType for "float", DateType for "date", and
      StringType for every other type (including "nvarchar" and "char").
    """

    sql_type = row["DATA_TYPE"].lower()

    # Character lengths are deliberately not consulted: StringType takes no
    # length, so every character type simply uses the StringType fallback.
    # NOTE(review): SQL Server `float` defaults to 8-byte precision while
    # Spark's FloatType is 4-byte - confirm whether DoubleType is wanted here.
    type_map = {
        "float": FloatType,
        "date": DateType,
    }

    return type_map.get(sql_type, StringType)()

# Function that builds Spark schema based on SQL Server metadata
def build_spark_schema(schema_pd):
    """
    Translate SQL Server column metadata into Spark schema fields.

    Parameters:
    - schema_pd: A pandas DataFrame containing the SQL Server metadata. Each
      row must provide "COLUMN_NAME" and "IS_NULLABLE", plus whatever
      sql_to_spark_type reads from the row.

    Returns:
    - A list of StructField objects, one per metadata row, in row order.
      A field is nullable only when IS_NULLABLE equals "YES".
    """

    def _to_field(column_meta):
        # Convert a single metadata row into its StructField
        nullable = column_meta["IS_NULLABLE"] == "YES"
        return StructField(
            column_meta["COLUMN_NAME"],
            sql_to_spark_type(column_meta),
            nullable=nullable,
        )

    return [_to_field(column_meta) for _, column_meta in schema_pd.iterrows()]

# Function to align the schema of the import_01 DataFrame to the target import_02 delta table
def align_schema(df, target_df):
    """ Aligns the schema of the input DataFrame to match the target DataFrame's schema.

    The result has exactly the target's columns, in the target's order:
    columns present in the input are cast to the target type, columns missing
    from the input are filled with typed nulls, and input columns that the
    target does not have are dropped.

    Parameters:
    - df (DataFrame): The input DataFrame to align.
    - target_df (DataFrame): The target DataFrame whose schema to match.

    Returns:
    - DataFrame: A new DataFrame with the aligned schema.
    """
    # Names are matched exactly (case-sensitively) against the input columns
    available_columns = set(df.columns)

    # One expression per target field: the existing column when available,
    # otherwise a null literal - either way cast to the target type and
    # named after the target field.
    select_expressions = [
        (col(field.name) if field.name in available_columns else lit(None))
        .cast(field.dataType)
        .alias(field.name)
        for field in target_df.schema.fields
    ]

    # Apply the select expressions to create a DataFrame with matching schema
    return df.select(*select_expressions)