In [25]:
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, countDistinct, when, isnan, count, first
from pyspark.sql.types import IntegerType, StringType, DoubleType, FloatType
from pyspark.sql import functions as F
import time
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, Imputer, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from xgboost.spark import SparkXGBClassifier

import os
import sys


os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['HADOOP_HOME'] = r'C:\Users\timj3\hadoop'
os.environ['hadoop.home.dir'] = r'C:\Users\timj3\hadoop'

In [26]:
## Define functions

def initialize_spark():
    """
    Initialize and return a Spark session.
    """
    spark = SparkSession.builder \
        .appName("MortageGroupProject") \
        .master("local[*]") \
        .config("spark.eventLog.enabled", "true") \
        .config("spark.eventLog.dir", r"\Users\timj3\SparkLogs")  \
        .config("spark.eventLog.gcMetrics.youngGenerationGarbageCollectors", "G1 Young Generation") \
        .config("spark.eventLog.gcMetrics.oldGenerationGarbageCollectors", "G1 Old Generation") \
        .config("spark.sql.codegen.wholeStage", False) \
        .getOrCreate()
    
#         %%configure -f
#         {
#         "driverMemory": "1000M",
#         "executorMemory": "2000M",
#         "executorCores": 1,
#         "numExecutors": 10
#         }
    
    #spark = SparkSession.builder \
#    .appName("MortageGroupProject") \
#    .master("local[*]") \
#    .config("spark.driver.memory", "4g") \
#    .config("spark.sql.shuffle.partitions", "8") \
#    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \
#    .config("spark.memory.fraction", "0.6") \
#    .config("spark.memory.offHeap.enabled", "true") \
#    .config("spark.memory.offHeap.size", "1g") \
#    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
#    .config("spark.eventLog.enabled", "true") \
#    .config("spark.eventLog.dir", "/Users/mattcurtis/Documents/CS 5644/SparkLogs") \
#    .config("spark.sql.codegen.wholeStage", False) \
#    .getOrCreate()

    # Hide all the warnings
    spark.sparkContext.setLogLevel("ERROR")

    return spark

    #spark = SparkSession.builder \
#    .appName("MortageGroupProject") \
#    .master("local[*]") \
#    .config("spark.driver.memory", "4g") \
#    .config("spark.sql.shuffle.partitions", "8") \
#    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \
#    .config("spark.memory.fraction", "0.6") \
#    .config("spark.memory.offHeap.enabled", "true") \
#    .config("spark.memory.offHeap.size", "1g") \
#    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
#    .config("spark.eventLog.enabled", "true") \
#    .config("spark.eventLog.dir", "/Users/mattcurtis/Documents/CS 5644/SparkLogs") \
#    .config("spark.sql.codegen.wholeStage", False) \
#    .getOrCreate()

    # Hide all the warnings
    spark.sparkContext.setLogLevel("ERROR")

    return spark

def print_execution_time(start_time, task_name):
    """
    Prints the execution time for a given task in HH:MM:SS format.
    
    :param start_time: The start time in seconds (usually obtained using time.time()).
    :param task_name: A string representing the name of the task.
    """
    end_time = time.time()  # Record the current time as the end time
    execution_time_seconds = end_time - start_time  # Calculate the execution time in seconds

    # Convert seconds to HH:MM:SS format
    minutes, seconds = divmod(execution_time_seconds, 60)
    hours, minutes = divmod(minutes, 60)
    print(f"{task_name} took: {int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}")
    
def print_column_names_and_types(df: DataFrame):
    """
    Print the names and data types of all columns in a Spark DataFrame.

    :param df: The DataFrame whose column names and data types are to be printed.
    """
    for column, datatype in df.dtypes:
        print(f"Column: {column}, Type: {datatype}")
        
def print_row_count(df: DataFrame):
    """
    Print the total number of rows in a Spark DataFrame.

    :param df: The DataFrame for which to count and print the number of rows.
    """
    row_count = df.count()
    print(f"Total number of rows in the DataFrame: {row_count:,}")
    
def show_unique_values(data: DataFrame, column_name: str):
    """
    Displays unique values of a specified column in a Spark DataFrame.

    Parameters:
    data (DataFrame): The Spark DataFrame to analyze.
    column_name (str): The name of the column for which to show unique values.
    """
    unique_values = data.select(column_name).distinct()
    unique_values.show()
    
def print_dataframe_info(data: DataFrame):
    """
    prints list of columns in dataframe

    Parameters:
    data (DataFrame): The Spark DataFrame to analyze.
    """
    # Printing the columns
    print("Columns left:")
    print(sorted(data.columns))
    print("Number of Columns", len(data.columns))
    
def drop_columns(df: DataFrame, columns_to_drop: list) -> DataFrame:
    """
    Drop specified columns from a PySpark DataFrame.

    :param df: The DataFrame from which to drop columns.
    :param columns_to_drop: A list of column names to drop.
    :return: A DataFrame with the specified columns removed.
    """
    for col in columns_to_drop:
        if col in df.columns:
            df = df.drop(col)
        else:
            print(f"Column '{col}' not found in DataFrame. drop_columns")
    return df

def remove_na_rows(df: DataFrame, columns_to_drop: list) -> DataFrame:
    """
    Drops rows with the nulls values.

    :param df: The DataFrame from which to drop columns.
    :param columns_to_drop: A list of column names to drop.
    :return: A DataFrame with the specified columns removed.
    """
    data_columns = data.columns
    for value in columns_to_drop[:]:
        if value not in data_columns:
            print(f"Column {value} not found in DataFrame. remove_na_rows")
            columns_to_drop.remove(value)
    return df.dropna(subset=columns_to_drop)

def remove_exempt(df: DataFrame, columns_to_drop: list) -> DataFrame:
    """
    Drops rows that are 'Exempt'.

    :param df: The DataFrame from which to drop columns.
    :param columns_to_drop: A list of column names to drop.
    :return: A DataFrame with the specified columns removed.
    """
    for column in columns_to_drop:
        if column in df.columns:
            df = df.filter(df[column] != "Exempt")
        else:
            print(f"Column '{column}' not found in DataFrame. remove_exempt")
    return df

def balance_classes(df: DataFrame, column: str, target_count: int) -> DataFrame:
    """
    Balance the number of rows for two classes in a PySpark DataFrame.

    :param df: The DataFrame to balance.
    :param column: The column name for the class labels.
    :param target_count: The target number of rows for each class.
    :return: A balanced DataFrame.
    """
    # Separate the DataFrame into two based on the class
    df_class_0 = df.filter(F.col(column) == 0)
    df_class_1 = df.filter(F.col(column) == 1)
    
    # Calculate the fraction to sample from the larger class
    fraction = target_count / df_class_1.count()
    fraction = min(1.0, fraction)  # Ensure the fraction is not more than 1

    # Sample from the larger class to match the count of the smaller class
    df_class_1_sampled = df_class_1.sample(withReplacement=False, fraction=fraction)

    # Combine the DataFrames
    return df_class_0.unionAll(df_class_1_sampled)

def count_unique_values(df: DataFrame, column_name: str) -> DataFrame:
    """
    Count the occurrences of each unique value in a specified column of a PySpark DataFrame.

    :param df: The DataFrame to analyze.
    :param column_name: The name of the column to count unique values for.
    :return: A DataFrame with counts of each unique value in the specified column.
    """
    # Group by the specified column and count occurrences
    value_counts = df.groupBy(column_name).count()

    # Sort by the specified column for better readability
    value_counts = value_counts.orderBy(column_name)

    return value_counts

def impute_and_replace_columns(df: DataFrame, columns_to_impute: list) -> DataFrame:
    """
    Impute missing values in specified columns of a DataFrame and replace the original columns with the imputed ones.

    :param df: The DataFrame to impute.
    :param columns_to_impute: List of column names to impute.
    :return: A DataFrame with imputed values, where specified columns are replaced by their imputed versions.
    """
    data_columns = data.columns
    for value in columns_to_impute[:]:
        if value not in data_columns:
            print(f"Column not in DataFrame: {value}. Imputer")
            one_hot_encoded_columns.remove(value)
    
    
    # Create the imputer for specified columns
    imputer = Imputer(
        inputCols=columns_to_impute, 
        outputCols=[c + "_imputed" for c in columns_to_impute]
    )    
    # Fit the imputer model and transform the DataFrame
    imputed_df = imputer.fit(df).transform(df)

    # Loop through all columns in the imputed DataFrame
    for col in imputed_df.columns:
        # Check if the column name ends with "_imputed"
        if col.endswith("_imputed"):
            # Extract the original column name
            original_col_name = col.replace("_imputed", "")

            # Drop the original column from the DataFrame and rename the imputed column
            imputed_df = imputed_df.drop(original_col_name).withColumnRenamed(col, original_col_name)

    return imputed_df

def convert_columns_to_numeric(data: DataFrame) -> DataFrame:
    """
    Convert columns in a DataFrame to numeric types where possible. Non-convertible columns are logged.

    :param data: The Spark DataFrame to analyze and convert.
    :return: The modified DataFrame with columns converted to numeric types where possible.
    """
    non_numerical = []
    for column in data.columns:
        original_data = data.select(column)
        try:
            # First try to cast the column to IntegerType
            data = data.withColumn(column, col(column).cast(IntegerType()))
        except:
            try:
                # If integer conversion fails, try to cast to FloatType
                data = data.withColumn(column, col(column).cast(FloatType()))
            except:
                # If both conversions fail, add the column to the non_numerical list
                non_numerical.append(column)

    # Log non-convertible columns
    if non_numerical:
        print("Could not convert the following columns to integer or float:", non_numerical)

    return data

def drop_all_null_columns(df: DataFrame) -> DataFrame:
    """
    Check for columns with all null values in the given DataFrame and drop them.

    :param df: The DataFrame to check for null values.
    :return: A DataFrame with columns containing all nulls dropped.
    """
    # Initialize a list for droppable columns
    droppable_columns = []

    for column in df.columns:
        # Check if there is at least one non-null value in the column
        first_non_null = df.filter(col(column).isNotNull()).select(column).first()
        if first_non_null is not None:
            # Skip to the next iteration as this column has at least one non-null value
            print(f"Skipping: {column}")
            continue
        # If the first non-null value is not found, add the column to the droppable list
        print(f"All null values: {column}")
        droppable_columns.append(column)
        
    print("Final list of columns containing all nulls")
    print(droppable_columns)

    # Drop columns that are entirely null
    for column in droppable_columns:
        df = df.drop(column)

    return df

def transform_and_filter_action_taken(df: DataFrame) -> DataFrame:
    """
    Transform the 'action_taken' column based on specific conditions, filter out certain rows,
    and rename the transformed column.

    :param df: The DataFrame containing the 'action_taken' column.
    :return: A DataFrame with the transformed 'action_taken' column and filtered rows.
    """
    # Transform the 'action_taken' column
    transformed_df = df.withColumn("action_taken_transformed",
                                   when(col("action_taken").isin([1, 2, 6, 8]), 1)
                                   .when(col("action_taken").isin([3, 7]), 0))
    
    # Drop rows with 'action_taken' as 4 or 5
    filtered_df = transformed_df.filter(~col("action_taken").isin([4, 5]))

    # Drop the original 'action_taken' column and rename the transformed column
    final_df = filtered_df.drop("action_taken").withColumnRenamed("action_taken_transformed", "action_taken")

    return final_df

def find_columns_for_one_hot_encoding(data: DataFrame, potential_columns: list) -> list:
    """
    Identify columns in a DataFrame that have more than one non-null value and are suitable for one-hot encoding.

    :param data: The Spark DataFrame to analyze.
    :param potential_columns: A list of column names to check for one-hot encoding suitability.
    :return: A list of column names that have more than one non-null value and are suitable for one-hot encoding.
    """
    one_hot_encodable_columns = []
    start_time = time.time()

    # Iterate through the list and check the count of distinct non-null values
    for column in potential_columns:
        distinct_count = data.agg(countDistinct(col(column)).alias("distinct_count")).collect()[0]["distinct_count"]
        if distinct_count > 1:
            one_hot_encodable_columns.append(column)

    # Print valid columns for one-hot encoding
    print("Columns valid for one-hot encoding:")
    print(one_hot_encodable_columns)
    print_execution_time(start_time, "Checking one hot encoded columns")

    return one_hot_encodable_columns

def create_preprocessing_pipeline(data, one_hot_encoded_columns, run_encoder):
    stages = []

    if run_encoder == 1:
        # Remove columns not in DataFrame's columns
        data_columns = data.columns
        for value in one_hot_encoded_columns[:]:
            if value not in data_columns:
                print(f"Column not in DataFrame: {value}. preprocessing pipeline")
                one_hot_encoded_columns.remove(value)

        print(f"Encoding {len(one_hot_encoded_columns)} columns")
        
        # Add StringIndexer and OneHotEncoder stages
        for column in one_hot_encoded_columns:
            string_indexer = StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="skip")
            encoder = OneHotEncoder(inputCols=[string_indexer.getOutputCol()], outputCols=[column + "_vec"])
            stages += [string_indexer, encoder]

        assembled_columns = [col + "_vec" for col in one_hot_encoded_columns] + \
                            [col for col in data.columns if col not in one_hot_encoded_columns and col != 'action_taken']
    else:
        # Include original columns excluding the target column
        assembled_columns = [col for col in data.columns if col != 'action_taken']

    # Add the VectorAssembler to the pipeline stages
    assembler = VectorAssembler(inputCols=assembled_columns, outputCol="featuresVec", handleInvalid="keep")
    scaler = MinMaxScaler(inputCol="featuresVec", outputCol="features")
    stages.append(assembler)
    stages.append(scaler)


    # Create the pipeline with all stages
    preprocessing_pipeline = Pipeline(stages=stages)
    return preprocessing_pipeline, assembler

def train_random_forest(data, assembler):
    print("START RANDOM FOREST")
    
    # Initialize RandomForestClassifier
    rf_classifier = RandomForestClassifier(featuresCol="features", labelCol="action_taken")    

    # Add model to pipeline
    pipeline = Pipeline(stages=[rf_classifier])

    # Initialize variable to store the index, set to -1 as default
    rf_classifier_index = -1

    # Iterate through the pipeline stages
    for index, stage in enumerate(pipeline.getStages()):
        if stage.__class__.__name__ == "RandomForestClassifier":
            rf_classifier_index = index
            break
        
    # Split the data
    (train_data, test_data) = data.randomSplit([0.8, 0.2], seed=42)

    # Train the model
    start_time = time.time()
    model = pipeline.fit(train_data)
    print_execution_time(start_time, "Fitting model")
   
    # Make predictions
    predictions = model.transform(test_data)

    # Evaluate the model
    evaluator = MulticlassClassificationEvaluator(labelCol="action_taken", predictionCol="prediction")
    accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
    precision = evaluator.evaluate(predictions, {evaluator.metricName: "precisionByLabel"})
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "recallByLabel"})
    f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"F1 Score: {f1}")
        
    # Extract RandomForest model from the pipeline
    rf_model = model.stages[rf_classifier_index]

    # Get feature importances
    importances = rf_model.featureImportances

    # Should be feature names
    feature_names = assembler.getInputCols()

    # Zip feature names with their importances
    feature_importance_list = [(feature, importance) for feature, importance in zip(feature_names, importances)]

    # Sort the features by importance
    sorted_features = sorted(feature_importance_list, key=lambda x: x[1], reverse=True)

    # Display the sorted feature importances
    for feature, importance in sorted_features:
        print(f"Feature: {feature}, Importance: {importance}")
         
    print_execution_time(start_time, "Final Time")
    print("END RANDOM FOREST")
    
def train_random_forest_with_cv(data):
    print("START RANDOM FOREST WITH CROSS VALIDATION")
    
    # Initialize RandomForestClassifier
    rf_classifier = RandomForestClassifier(featuresCol="features", labelCol="action_taken")
    
    # Add model to pipeline
    pipeline = Pipeline(stages=[rf_classifier])

    # Define parameter grid for RandomForestClassifier
    paramGrid = (ParamGridBuilder()
                 .addGrid(rf_classifier.numTrees, [10, 20, 50])
                 .addGrid(rf_classifier.maxDepth, [5, 10, 20])
                 .build())

    # Initialize an evaluator for multiclass classification
    evaluator = MulticlassClassificationEvaluator(labelCol="action_taken", predictionCol="prediction")

    # Initialize CrossValidator
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=3)

    # Split the data
    (train_data, test_data) = data.randomSplit([0.8, 0.2], seed=42)

    # Train the model using CrossValidator
    start_time = time.time()
    cvModel = crossval.fit(train_data)
    print_execution_time(start_time, "Fitting model")
    
    # Best model from CrossValidator
    best_model = cvModel.bestModel

    # Make predictions
    predictions = best_model.transform(test_data)
    
    rf_model = best_model.stages[-1]  # Assuming RandomForestClassifier is the last stage
    
    # Print the parameters of the best RandomForest model
    print("Best Model Parameters:")
    print("-" * 30)
    for param, value in rf_model.extractParamMap().items():
        print(f"{param.name}: {value}")

    print_execution_time(start_time, "Final Time")
    print("END RANDOM FOREST WITH CROSS VALIDATION")
    
def train_logistic_regression(data):
    print("START LOGISTIC REGRESSION")
    
    # Initialize LogisticRegression
    lr_classifier = LogisticRegression(featuresCol="features", 
                              labelCol="action_taken", 
                              regParam=0.01, 
                              maxIter=10, 
                              elasticNetParam=0.0)

    # Define the pipeline with local stages
    pipeline = Pipeline(stages=[lr_classifier])

    # Split the data
    (train_data, test_data) = data.randomSplit([0.8, 0.2], seed=42)

    # Train the model
    start_time = time.time()
    model = pipeline.fit(train_data)
    print_execution_time(start_time, "Fitting model")

    # Make predictions
    predictions = model.transform(test_data)

    # Evaluate the model
    evaluator = MulticlassClassificationEvaluator(labelCol="action_taken", predictionCol="prediction")
    accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
    precision = evaluator.evaluate(predictions, {evaluator.metricName: "precisionByLabel"})
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "recallByLabel"})
    f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"F1 Score: {f1}")
    print_execution_time(start_time, "Final time")

    print("END LOGISTIC REGRESSION")
    
def train_logistic_regression_with_cv(data):
    print("START LOGISTIC REGRESSION")
    
    # Initialize LogisticRegression
    lr = LogisticRegression(featuresCol="features", labelCol="action_taken")

    # Define the parameter grid
    paramGrid = ParamGridBuilder() \
        .addGrid(lr.regParam, [0.1, 0.01]) \
        .addGrid(lr.maxIter, [10, 100]) \
        .addGrid(lr.elasticNetParam, [0.0, 0.5]) \
        .build()
    
    # Supposedly best metric for logistic regression
    roc_evaluator = BinaryClassificationEvaluator(labelCol="action_taken", metricName="areaUnderROC")

    # Create the CrossValidator
    cv = CrossValidator(estimator=lr,
                        estimatorParamMaps=paramGrid,
                        evaluator=roc_evaluator,
                        numFolds=3)

    # Split the data
    (train_data, test_data) = data.randomSplit([0.8, 0.2], seed=42)

    # Train the model using CrossValidator
    start_time = time.time()
    cvModel = cv.fit(train_data)
    print_execution_time(start_time, "Fitting model")

    # Get the best model
    bestModel = cvModel.bestModel

    # Make predictions on test data
    predictions = bestModel.transform(test_data)

    # Define a new evaluator for other metrics
    multi_evaluator = MulticlassClassificationEvaluator(labelCol="action_taken", predictionCol="prediction")

    # Evaluate the best model
    accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
    precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
    recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})
    f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})

    print(f"Best Model Parameters: regParam = {bestModel._java_obj.getRegParam()}, maxIter = {bestModel._java_obj.getMaxIter()}, elasticNetParam = {bestModel._java_obj.getElasticNetParam()}")
    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"F1 Score: {f1}")
    
    print_execution_time(start_time, "Final time")

    print("END LOGISTIC REGRESSION WITH CROSS-VALIDATION")

    return bestModel

def train_xgboost_classifier(data):
    print("START XGBOOST CLASSIFICATION")
    
    # Initialize the XGBoost classifier
    xgb_classifier = SparkXGBClassifier(features_col="features", label_col="action_taken", gamma=1.0, max_depth=5,
    min_child_weight=5,)

    # Define the pipeline with local stages
    pipeline = Pipeline(stages=[xgb_classifier])

    # Split the data
    (train_data, test_data) = data.randomSplit([0.8, 0.2], seed=42)

    # Train the model
    start_time = time.time()
    model = pipeline.fit(train_data)
    print_execution_time(start_time, "Fitting model")

    # Make predictions
    predictions = model.transform(test_data)

    # Evaluate the model
    evaluator = MulticlassClassificationEvaluator(labelCol="action_taken", predictionCol="prediction")
    accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
    precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
    f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"F1 Score: {f1}")
    print_execution_time(start_time, "Final time")

    print("END XGBOOST CLASSIFICATION")
    
def train_xgboost_with_cv(data):
    print("START XGBOOST CLASSIFICATION WITH CROSS-VALIDATION")
    
    # Initialize the XGBoost classifier
    xgb_classifier = SparkXGBClassifier(features_col="features", label_col="action_taken")

    # Define the pipeline with local stages
    pipeline = Pipeline(stages=[xgb_classifier])

    # Create a parameter grid for tuning the classifier
    paramGrid = ParamGridBuilder() \
        .addGrid(xgb_classifier.max_depth, [3, 5, 7]) \
        .addGrid(xgb_classifier.min_child_weight, [1, 5, 10]) \
        .addGrid(xgb_classifier.gamma, [0.1, 0.5, 1.0]) \
        .build()


    # Create an evaluator for the classifier
    evaluator = MulticlassClassificationEvaluator(labelCol="action_taken", predictionCol="prediction", metricName="accuracy")

    # Set up cross-validation
    crossval = CrossValidator(estimator=pipeline,
                              estimatorParamMaps=paramGrid,
                              evaluator=evaluator,
                              numFolds=3)

    # Train the model using cross-validation
    start_time = time.time()
    cvModel = crossval.fit(data)
    print_execution_time(start_time, "Fitting model")


    # Fetch best model
    bestModel = cvModel.bestModel
    
    try:
        bestModel = cvModel.bestModel
        bestXGBModel = bestModel.stages[-1]  # Assuming XGBClassifier is the last stage in the pipeline
        
        # Extract parameter values from the best model
        paramMap = bestXGBModel.extractParamMap()
        params = {param.name: value for param, value in paramMap.items()}
        
        print("Best Model Parameters:")
        for paramName, paramValue in params.items():
            print(f"{paramName}: {paramValue}")
    except Exception as e:
        print("Error retrieving model parameters:", e)
        
    bestModelPredictions = bestModel.transform(data)
    accuracy = evaluator.evaluate(bestModelPredictions, {evaluator.metricName: "accuracy"})
    precision = evaluator.evaluate(bestModelPredictions, {evaluator.metricName: "weightedPrecision"})
    recall = evaluator.evaluate(bestModelPredictions, {evaluator.metricName: "weightedRecall"})
    f1 = evaluator.evaluate(bestModelPredictions, {evaluator.metricName: "f1"})

    print(f"Best Model Metrics:")
    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"F1 Score: {f1}")
    
    print_execution_time(start_time, "Cross-validation training")

    print("END XGBOOST CLASSIFICATION WITH CROSS-VALIDATION")
    return bestModel

def train_multilayer_perceptron_with_cv(data):
    print("START MULTILAYER PERCEPTRON WITH CROSS-VALIDATION")

    # Initialize MultilayerPerceptronClassifier
    mlp = MultilayerPerceptronClassifier(featuresCol="features", labelCol="action_taken")
    
    # num_features is first layer, 2 is last last representing 0,1 values in action_taken
    num_features = len(data.select("features").first()[0])
    print("Number of features:", num_features)

    # Define the parameter grid
    paramGrid = ParamGridBuilder() \
        .addGrid(mlp.maxIter, [100, 200]) \
        .addGrid(mlp.layers, [[num_features, 5, 2], [num_features, 4, 3, 2]]) \
        .build()

    # Evaluator
    evaluator = MulticlassClassificationEvaluator(labelCol="action_taken", predictionCol="prediction")

    # CrossValidator
    cv = CrossValidator(estimator=mlp,
                        estimatorParamMaps=paramGrid,
                        evaluator=evaluator,
                        numFolds=3)

    # Split the data
    (train_data, test_data) = data.randomSplit([0.8, 0.2], seed=42)

    # Train model using CrossValidator
    start_time = time.time()
    cvModel = cv.fit(train_data)
    print_execution_time(start_time, "Fitting model")


    # Best model
    bestModel = cvModel.bestModel

    # Make predictions and evaluate
    predictions = bestModel.transform(test_data)
    accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
    f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})


    # Print best model params and metrics
    print("Best Model Parameters:")
    print(f"MaxIter: {bestModel.getMaxIter()}")

    # Converting layers to a list for a readable format
    layers = bestModel.getOrDefault(bestModel.layers)
    print(f"Layers: {layers}")

    print(f"Accuracy: {accuracy}")
    print(f"F1 Score: {f1}")

    print_execution_time(start_time, "Total Time")
    print("END MULTILAYER PERCEPTRON WITH CROSS-VALIDATION")

    return bestModel

def train_multilayer_perceptron(data):
    print("START MULTILAYER PERCEPTRON")
    
    num_features = len(data.select("features").first()[0])
    print("Number of features:", num_features)

    # Define the classifier
    mlp_classifier = MultilayerPerceptronClassifier(featuresCol="features", 
                                                    labelCol="action_taken", 
                                                    layers=[num_features, 4, 3, 2], 
                                                    maxIter=100)

    # Split the data
    (train_data, test_data) = data.randomSplit([0.8, 0.2], seed=42)

    # Train the model
    start_time = time.time()
    model = mlp_classifier.fit(train_data)
    print_execution_time(start_time, "Fitting model")
    
    # Make predictions
    predictions = model.transform(test_data)

    # Evaluate the model
    evaluator = MulticlassClassificationEvaluator(labelCol="action_taken", predictionCol="prediction")
    accuracy = evaluator.evaluate(predictions, {evaluator.metricName: "accuracy"})
    precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
    recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
    f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"F1 Score: {f1}")
    print_execution_time(start_time, "Final time")

    print("END MULTILAYER PERCEPTRON")

    return model


def columns_exceeding_null_threshold(dataframe, column_names, threshold_percentage):
    """
    Identify columns in the Spark DataFrame where the percentage of null values exceeds the given threshold.

    :param dataframe: The Spark DataFrame to check.
    :param column_names: A list of column names to check for null values.
    :param threshold_percentage: The percentage threshold for considering high null values.
    :return: A list of column names where the null percentage exceeds the threshold.
    """
    exceeding_columns = []
    total_rows = dataframe.count()

    for column_name in column_names:
        if column_name not in dataframe.columns:
            raise ValueError(f"Column '{column_name}' not found in DataFrame")
        
        null_count = dataframe.filter(col(column_name).isNull()).count()
        null_percentage = (null_count / total_rows) * 100
        if null_percentage >= threshold_percentage:
            exceeding_columns.append(column_name)
    
    return exceeding_columns

In [27]:
# Set local variables

columns_to_drop = [
        'activity_year','derived_msa_md','county_code','census_tract','derived_ethnicity','derived_race','derived_sex',
        'lei','applicant_ethnicity_2','applicant_ethnicity_3','applicant_ethnicity_4','applicant_ethnicity_5',
        'co_applicant_ethnicity_2','co_applicant_ethnicity_3','co_applicant_ethnicity_4','co_applicant_ethnicity_5',
        'applicant_ethnicity_observed','co_applicant_ethnicity_observed','applicant_race_2','applicant_race_3',
        'applicant_race_4','applicant_race_5','co_applicant_race_2','co_applicant_race_3','co_applicant_race_4',
        'co_applicant_race_5','applicant_race_observed','co_applicant_race_observed','applicant_sex_observed',
        'co_applicant_sex_observed','applicant_age_above_62','co_applicant_age_above_62','submission_of_application',
        'initially_payable_to_institution','aus_2','aus_3','aus_4','aus_5','denial_reason_1','denial_reason_2',
        'denial_reason_3','denial_reason_4','tract_population'
]

# Columns exhibit a high prevalence of NaN values or unexpected levels of importance in the model
columns_to_drop_test = [
    'combined_loan_to_value_ratio', 'discount_points', 'interest_rate', 'intro_rate_period',
    'lender_credits', 'multifamily_affordable_units', 'open_end_line_of_credit', 'origination_charges',
    'other_nonamortizing_features', 'preapproval', 'prepayment_penalty_term', 'rate_spread',
    'reverse_mortgage', 'total_loan_costs', 'total_points_and_fees'
]

columns_with_all_nulls = [
    'conforming_loan_limit',
    'derived_loan_product_type',
    'derived_dwelling_category',
    'derived_ethnicity',
    'derived_race',
    'derived_sex',
    'lei',
    'state_code',
    'co_applicant_age_above_62'
]

# Columns to remove 'Exempt'
columns_to_replace = ['combined_loan_to_value_ratio', 'interest_rate',
                      'debt_to_income_ratio', 'rate_spread', 'property_value']

# Columns to drop NaN values
columns_to_drop_na = ['applicant_ethnicity_1', 'income', 
                      'combined_loan_to_value_ratio', 'interest_rate', 
                      'property_value', 'loan_term', 'debt_to_income_ratio', 'action_taken']

# Columns to one hot encode
one_hot_encoded_columns = [
    'derived_loan_product_type', 'derived_dwelling_category', 'purchaser_type', 'preapproval', 
    'loan_type', 'loan_purpose', 'lien_status', 'reverse_mortgage', 'open_end_line_of_credit', 
    'business_or_commercial_purpose', 'hoepa_status', 'negative_amortization', 
    'interest_only_payment', 'balloon_payment', 'other_nonamortizing_features', 
    'construction_method', 'occupancy_type', 'manufactured_home_secured_property_type', 
    'manufactured_home_land_property_interest', 'total_units', 'ageapplicant', 
    'debt_to_income_ratio', 'applicant_credit_score_type', 'co_applicant_credit_score_type', 
    'applicant_ethnicity_1', 'applicant_race_1', 'co_applicant_race_1', 'applicant_sex', 
    'co_applicant_sex', 'applicant_age', 'co_applicant_age', 'aus_1'
]

columns_to_impute = [
    'business_or_commercial_purpose', 'debt_to_income_ratio', 'ffiec_msa_md_median_family_income', 
    'income', 'loan_amount', 'loan_purpose', 'loan_term', 'loan_type', 'negative_amortization', 
    'property_value', 'tract_median_age_of_housing_units', 'tract_minority_population_percent', 
    'tract_one_to_four_family_homes', 'tract_owner_occupied_units', 'tract_to_msa_income_percentage',
    'applicant_age', 'co_applicant_age'
]


In [28]:
# Simple spark test to test config.
spark = SparkSession.builder \
    .appName("Simple Spark Test") \
    .getOrCreate()
data = [("Java", 20000), ("Python", 100000), ("Scala", 3000)]
columns = ["Language", "Users"]
df = spark.createDataFrame(data, columns)
df.show()
spark.stop()

+--------+------+
|Language| Users|
+--------+------+
|    Java| 20000|
|  Python|100000|
|   Scala|  3000|
+--------+------+



In [29]:
# Initialize a Spark session
start_session = time.time()
spark = initialize_spark()

In [30]:
# Read the data in Spark
data = spark.read.csv("2021_public_lar_one_year_csv.csv", header=True)

# Preprocessing step time
start_time = time.time()

# Drop columns for various reasons
data = drop_columns(data, columns_to_drop)
data = drop_columns(data, columns_to_drop_test)
data = drop_columns(data, columns_with_all_nulls)

# Replace 'except'
data = remove_exempt(data, columns_to_replace)

# Drop NA rows
data = remove_na_rows(data, columns_to_drop_na)

# Find and drop all columns with all nulls values, if necessary
#data = drop_all_null_columns(data)

# Convert columns
data = convert_columns_to_numeric(data)

# Convert action_values
data = transform_and_filter_action_taken(data)

# Create new columns
data = data.withColumn("downpayment", col('property_value') - col('loan_amount'))
data = data.withColumn('downpayment_to_income_ratio', col('downpayment') / col('income'))
data = data.withColumn('loan_to_income_ratio', col('loan_amount') / col('income'))
data = data.withColumn('downpayment_percentage', col('downpayment') / col('property_value'))
data = data.filter(col("loan_to_income_ratio") <= 5000000)
             
# Impute data if necessary
data = impute_and_replace_columns(data, columns_to_impute)

# Imputed columns won't have nulls, don't check
#check_for_null = [col for col in data.columns if col not in columns_to_impute]
#threshold = 40
# Find columns that have greater than threshold% of nulls
#exceeding_columns = columns_exceeding_null_threshold(data, check_for_null, threshold)
#print(f"Columns exceeding {threshold}% null values: {exceeding_columns}")

for column in data.columns:
    data=data.dropna(subset=column)

#data = data.fillna(0)
#print_row_count(data)


print_execution_time(start_time, "Preprocessing")

# Use this to verify OHE columns have atleast 2 non-null values. Expensive function
#one_hot_encoded_columns = find_columns_for_one_hot_encoding(data,one_hot_encoded_columns)

print("Final list of columns and rows after pre-processing")
print_dataframe_info(data)

# Assemble the data
preprocessing_pipeline, assembler = create_preprocessing_pipeline(data, one_hot_encoded_columns, 1)
preprocessed_data = preprocessing_pipeline.fit(data).transform(data)
 
# Drop all columns except for label and vector
preprocessed_data = preprocessed_data.select("features", "action_taken")

# Balance the classes
count_class_0 = preprocessed_data.filter(preprocessed_data["action_taken"] == 0).count()
print(f"action_taken == 0: {count_class_0}")
preprocessed_data = balance_classes(preprocessed_data, "action_taken", count_class_0)

unique_value_counts = count_unique_values(preprocessed_data, "action_taken")
unique_value_counts.show()
print_row_count(preprocessed_data)
  
# Smaller DF if needed
sampled_data = preprocessed_data.sample(withReplacement=False, fraction=0.1)


Column 'derived_ethnicity' not found in DataFrame. drop_columns
Column 'derived_race' not found in DataFrame. drop_columns
Column 'derived_sex' not found in DataFrame. drop_columns
Column 'lei' not found in DataFrame. drop_columns
Column 'co_applicant_age_above_62' not found in DataFrame. drop_columns
Column 'combined_loan_to_value_ratio' not found in DataFrame. remove_exempt
Column 'interest_rate' not found in DataFrame. remove_exempt
Column 'rate_spread' not found in DataFrame. remove_exempt
Column combined_loan_to_value_ratio not found in DataFrame. remove_na_rows
Column interest_rate not found in DataFrame. remove_na_rows
Preprocessing took: 00:01:13
Final list of columns and rows after pre-processing
Columns left:
['action_taken', 'applicant_age', 'applicant_credit_score_type', 'applicant_ethnicity_1', 'applicant_race_1', 'applicant_sex', 'aus_1', 'balloon_payment', 'business_or_commercial_purpose', 'co_applicant_age', 'co_applicant_credit_score_type', 'co_applicant_ethnicity_1', 

In [7]:
train_random_forest(preprocessed_data, assembler)
#train_random_forest_with_cv(sampled_data)

START RANDOM FOREST
Fitting model took: 01:01:50
Total number of features: 151




Overall Precision: 0.9653334854025195
Overall Recall: 0.9627537950640318
Overall F1 Score: 0.962700593154407
Class 0.0 Precision: 0.9307395117724717
Class 0.0 Recall: 1.0
Class 0.0 F1 Score: 0.9641274818248552
Class 1.0 Precision: 1.0
Class 1.0 Recall: 0.9254294876305552
Class 1.0 F1 Score: 0.9612707124054635
Feature: applicant_age_vec, Importance: 0.3193672966778928
Feature: co_applicant_sex_vec, Importance: 0.21663356468817985
Feature: purchaser_type_vec, Importance: 0.1408917665019696
Feature: loan_type_vec, Importance: 0.08747930178835626
Feature: loan_purpose_vec, Importance: 0.08432806468116692
Feature: lien_status_vec, Importance: 0.017558836331046997
Feature: business_or_commercial_purpose_vec, Importance: 0.009146862453765147
Feature: ffiec_msa_md_median_family_income, Importance: 0.005094156463632877
Feature: downpayment_percentage, Importance: 0.004805949225583769
Feature: applicant_race_1_vec, Importance: 0.0038960134669265164
Feature: hoepa_status_vec, Importance: 0.003849

In [8]:
train_logistic_regression(sampled_data)
#train_logistic_regression_with_cv(sampled_data)

START LOGISTIC REGRESSION
Fitting model took: 00:32:22
Accuracy: 0.9678823187301333
Precision: 0.9401553134542164
Recall: 0.9994389564590138
F1 Score: 0.9678494174813996
Final time took: 01:35:25
END LOGISTIC REGRESSION


In [9]:
# train_xgboost_classifier(preprocessed_data)
# #train_xgboost_with_cv(sampled_data)

In [31]:
train_multilayer_perceptron(sampled_data)
#train_multilayer_perceptron_with_cv(sampled_data)

START MULTILAYER PERCEPTRON
Number of features: 151
Fitting model took: 00:23:21
Accuracy: 0.9690317128139085
Precision: 0.9705402827811467
Recall: 0.9690317128139085
F1 Score: 0.9690067986869753
Final time took: 01:35:16
END MULTILAYER PERCEPTRON


MultilayerPerceptronClassificationModel: uid=MultilayerPerceptronClassifier_5836deaf98a0, numLayers=4, numClasses=2, numFeatures=151

In [32]:
# Stop the Spark session
print_execution_time(start_session, "Total Session runtime")
spark.stop()

Total Session runtime took: 02:47:24
