In [1]:
from pyspark.sql import SparkSession
import seaborn as sns
from pyspark.sql.functions import col, count, sum,DataFrame,when
from pyspark.sql.types import NumericType,StringType
from pyspark.sql import functions as F
import matplotlib.pyplot as plt
from pyspark.sql.window import Window


In [2]:
spark = SparkSession.builder.master('local[*]').appName('credit-risk-prediction')\
.config("spark.executor.instances", "1")\
.config("spark.executor.memory", "4g") \
.config("spark.driver.memory", "4g") \
.config("spark.executor.memoryOverhead", "4g") \
.getOrCreate()

24/09/25 16:28:11 WARN Utils: Your hostname, langchain resolves to a loopback address: 127.0.1.1; using 192.168.0.103 instead (on interface wlp3s0)
24/09/25 16:28:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/09/25 16:28:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/25 16:28:12 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
csv_file_path = '../Data/credit_risk_dataset.csv'
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

In [4]:
(training_data, validation_data) = df.randomSplit([0.8, 0.2], seed=42)

In [5]:
def drop_null_values(df: DataFrame) -> DataFrame:
    return df.dropna()
training_data = drop_null_values(training_data)
validation_data = drop_null_values(validation_data)


In [6]:
def drop_age_above_threshold(df, age_column, threshold):
    return df.filter(F.col(age_column) <= threshold)

training_data = drop_age_above_threshold(training_data, 'person_age', 80)

validation_data = drop_age_above_threshold(validation_data, 'person_age', 80)

In [7]:
dt = training_data.toPandas()
dt.isnull().sum()

person_age                    0
person_income                 0
person_home_ownership         0
person_emp_length             0
loan_intent                   0
loan_grade                    0
loan_amnt                     0
loan_int_rate                 0
loan_status                   0
loan_percent_income           0
cb_person_default_on_file     0
cb_person_cred_hist_length    0
dtype: int64

In [8]:
def categorize_age_groups(df, age_column, bins, labels, output_column):

    def assign_age_group(age):
        for i in range(len(bins) - 1):
            if bins[i] <= age <= bins[i + 1]:
                return labels[i]
        return labels[-1] 

    age_group_udf = F.udf(assign_age_group, StringType())
    return df.withColumn(output_column, age_group_udf(F.col(age_column)))

bins = [20, 26, 36, 46, 56, 66, 80]  
labels = ['20-25', '26-35', '36-45', '46-55', '56-65', '66-80']
training_data = categorize_age_groups(training_data, 'person_age', bins, labels, 'age_group')
validation_data = categorize_age_groups(validation_data, 'person_age', bins, labels, 'age_group')




In [9]:

training_data.select('age_group').distinct().show()

                                                                                

+---------+
|age_group|
+---------+
|    26-35|
|    20-25|
|    46-55|
|    36-45|
|    66-80|
|    56-65|
+---------+



In [10]:

training_data.select('age_group').show()

+---------+
|age_group|
+---------+
|    20-25|
|    20-25|
|    20-25|
|    20-25|
|    20-25|
|    20-25|
|    20-25|
|    20-25|
|    20-25|
|    20-25|
|    20-25|
|    20-25|
|    20-25|
|    20-25|
|    20-25|
|    20-25|
|    20-25|
|    20-25|
|    20-25|
|    20-25|
+---------+
only showing top 20 rows



In [11]:
dt = training_data.toPandas()
dt.isnull().sum()

person_age                    0
person_income                 0
person_home_ownership         0
person_emp_length             0
loan_intent                   0
loan_grade                    0
loan_amnt                     0
loan_int_rate                 0
loan_status                   0
loan_percent_income           0
cb_person_default_on_file     0
cb_person_cred_hist_length    0
age_group                     0
dtype: int64

In [12]:
def assign_income_group(df: DataFrame, column: str) -> DataFrame:

    df = df.withColumn(
        'income_group',
        F.when(F.col(column).between(0, 25000), 'low')
        .when(F.col(column).between(25001, 50000), 'low-middle')
        .when(F.col(column).between(50001, 75000), 'middle')
        .when(F.col(column).between(75001, 100000), 'high-middle')
        .otherwise('high')
    )
    
    return df
training_data = assign_income_group(training_data, 'person_income')

In [13]:
validation_data = assign_income_group(validation_data, 'person_income')

In [14]:
def assign_loan_amount_group(df: DataFrame, column: str) -> DataFrame:
    df = df.withColumn(
        'loan_amount_group',
        F.when(F.col(column).between(0, 5000), 'small')
        .when(F.col(column).between(5001, 10000), 'medium')
        .when(F.col(column).between(10001, 15000), 'high')
        .otherwise('very high')
    )
    
    return df
training_data = assign_loan_amount_group(training_data, 'loan_amnt')

validation_data = assign_loan_amount_group(validation_data, 'loan_amnt')

In [15]:
training_data.select('loan_amount_group').show()

+-----------------+
|loan_amount_group|
+-----------------+
|           medium|
|           medium|
|        very high|
|            small|
|            small|
|        very high|
|            small|
|           medium|
|             high|
|            small|
|            small|
|            small|
|            small|
|            small|
|            small|
|            small|
|            small|
|            small|
|            small|
|            small|
+-----------------+
only showing top 20 rows



In [16]:
def drop_rows_by_emp_length(df: DataFrame, column: str, threshold: int) -> DataFrame:
    filtered_df = df.filter(F.col(column) <= threshold)
    return filtered_df
training_data = drop_rows_by_emp_length(training_data, 'person_emp_length', 60)
validation_data_data = drop_rows_by_emp_length(validation_data, 'person_emp_length', 60)

In [17]:
def drop_and_reset_index(df: DataFrame, column_to_drop: str) -> DataFrame:
    df = df.drop(column_to_drop)
    window_spec = Window.orderBy(F.monotonically_increasing_id())
    df_with_index = df.withColumn("index", F.row_number().over(window_spec) - 1)
    df_final = df_with_index.drop("index")
    return df_final

training_data = drop_and_reset_index(training_data, 'index')
validation_data = drop_and_reset_index(validation_data, 'index')

In [18]:
count_df = training_data.groupBy(col('person_home_ownership')).count()

count_df.show()

+---------------------+-----+
|person_home_ownership|count|
+---------------------+-----+
|                  OWN| 1766|
|                 RENT|11635|
|             MORTGAGE| 9418|
|                OTHER|   72|
+---------------------+-----+



In [19]:
def create_ratios(df):
    df = df.withColumn('loan_to_income_ratio', col('loan_amnt') / col('person_income'))
    df = df.withColumn('loan_to_emp_length_ratio', col('person_emp_length') / col('loan_amnt'))
    df = df.withColumn('int_rate_to_loan_amt_ratio', col('loan_int_rate') / col('loan_amnt'))
    
    return df

In [20]:
training_data = create_ratios(training_data)

In [21]:
validation_data = create_ratios(validation_data)

In [22]:
training_data.show()

+----------+-------------+---------------------+-----------------+-----------------+----------+---------+-------------+-----------+-------------------+-------------------------+--------------------------+---------+------------+-----------------+--------------------+------------------------+--------------------------+
|person_age|person_income|person_home_ownership|person_emp_length|      loan_intent|loan_grade|loan_amnt|loan_int_rate|loan_status|loan_percent_income|cb_person_default_on_file|cb_person_cred_hist_length|age_group|income_group|loan_amount_group|loan_to_income_ratio|loan_to_emp_length_ratio|int_rate_to_loan_amt_ratio|
+----------+-------------+---------------------+-----------------+-----------------+----------+---------+-------------+-----------+-------------------+-------------------------+--------------------------+---------+------------+-----------------+--------------------+------------------------+--------------------------+
|        20|        32652|             MORT

In [23]:
col_list = ['person_age',
 'person_income',
 'person_home_ownership',
 'person_emp_length',
 'loan_intent', 
 'loan_grade',
 'loan_amnt',
 'loan_int_rate',
 'loan_status',
 'loan_percent_income',
 'cb_person_default_on_file',
 'cb_person_cred_hist_length',
'age_group',
'income_group',
'loan_amount_group']


scale_cols = ['person_income','person_age','person_emp_length', 'loan_amnt','loan_int_rate','cb_person_cred_hist_length','loan_percent_income','loan_to_income_ratio', 'loan_to_emp_length_ratio',
       'int_rate_to_loan_amt_ratio']
encoding_colums = ['cb_person_default_on_file','loan_grade', 'person_home_ownership','loan_intent','income_group','age_group','loan_amount_group']


In [24]:
def drop_duplicates(df, subset=None):
    return df.dropDuplicates(subset)

training_data  = drop_duplicates(training_data)
validation_data = drop_duplicates(validation_data)

In [25]:
def shuffle_dataframe(df, seed=43):
   
    df_shuffled = df.withColumn("random", F.rand(seed))
    df_shuffled = df_shuffled.orderBy("random").drop("random")

    return df_shuffled
training_data = shuffle_dataframe(training_data)


In [26]:
training_data.select('loan_amount_group').show()

+-----------------+
|loan_amount_group|
+-----------------+
|           medium|
|            small|
|        very high|
|           medium|
|           medium|
|           medium|
|           medium|
|           medium|
|            small|
|             high|
|            small|
|             high|
|             high|
|        very high|
|            small|
|        very high|
|             high|
|            small|
|        very high|
|           medium|
+-----------------+
only showing top 20 rows



In [27]:
training_data.groupBy(col('loan_status')).count().show()

                                                                                

+-----------+-----+
|loan_status|count|
+-----------+-----+
|          1| 4994|
|          0|17812|
+-----------+-----+



first we will encode our categorical feature then appply smote data oversampling

In [28]:
training_data.select([count(when(col(c).isNull(), c)).alias(c) for c in training_data.columns]).show()

+----------+-------------+---------------------+-----------------+-----------+----------+---------+-------------+-----------+-------------------+-------------------------+--------------------------+---------+------------+-----------------+--------------------+------------------------+--------------------------+
|person_age|person_income|person_home_ownership|person_emp_length|loan_intent|loan_grade|loan_amnt|loan_int_rate|loan_status|loan_percent_income|cb_person_default_on_file|cb_person_cred_hist_length|age_group|income_group|loan_amount_group|loan_to_income_ratio|loan_to_emp_length_ratio|int_rate_to_loan_amt_ratio|
+----------+-------------+---------------------+-----------------+-----------+----------+---------+-------------+-----------+-------------------+-------------------------+--------------------------+---------+------------+-----------------+--------------------+------------------------+--------------------------+
|         0|            0|                    0|             

In [29]:
# from pyspark.ml import Transformer
# from pyspark.ml.param.shared import Param, Params
# from pyspark.ml.pipeline import Pipeline
# from pyspark.sql import DataFrame
# import pyspark.sql.functions as F
# from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

# class OneHotEncoderCustom(Transformer, DefaultParamsReadable, DefaultParamsWritable):
#     def __init__(self, encoding_columns=None):
#         super(OneHotEncoderCustom, self).__init__()
#         self.encoding_columns = Param(self, "encoding_columns", "columns to be one-hot encoded")
#         if encoding_columns is not None:
#             self._set(encoding_columns=encoding_columns)
    
#     def _transform(self, df: DataFrame) -> DataFrame:
#         encoding_columns = self.getOrDefault(self.encoding_columns)
#         for column in encoding_columns:
#             distinct_values = df.select(column).distinct().rdd.map(lambda x: x[0]).collect()
#             for value in distinct_values:
#                 df = df.withColumn(f"{column}_{value}", F.expr(f"CASE WHEN {column} = '{value}' THEN '1' ELSE '0' END"))
#             df = df.drop(column)
#         return df

#     def getEncodingColumns(self):
#         return self.getOrDefault(self.encoding_columns)

# # columns to encode
# encoding_columns = ['cb_person_default_on_file', 'loan_grade', 'person_home_ownership', 'loan_intent', 'income_group', 'age_group', 'loan_amount_group']
# one_hot_encoder = OneHotEncoderCustom(encoding_columns=encoding_columns)
# pipeline = Pipeline(stages=[one_hot_encoder])
# pipeline_model = pipeline.fit(training_data)
# # Saving the pipeline model
# pipeline_model.save("./encoding_pipeline")

# # Loading the pipeline model
# from pyspark.ml.pipeline import PipelineModel
# loaded_pipeline_model = PipelineModel.load("./encoding_pipeline")

In [30]:
# transformed_train_df = pipeline_model.transform(training_data)
# transformed_train_df.show()

In [31]:
out

NameError: name 'out' is not defined

In [32]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql import DataFrame

class OneHotEncoderPipeline:
    def __init__(self, input_col: str, output_col: str):
        self.input_col = input_col
        self.output_col = output_col
        self.pipeline = None
        self.pipeline_model = None

    def fit(self, df: DataFrame) -> None:
        """Fit the pipeline on the provided DataFrame."""
        indexer = StringIndexer(inputCol=self.input_col, outputCol=f"{self.input_col}_index")
        encoder = OneHotEncoder(inputCols=[f"{self.input_col}_index"], outputCols=[self.output_col])
        
        self.pipeline = Pipeline(stages=[indexer, encoder])
        self.pipeline_model = self.pipeline.fit(df)

    def save(self, path: str) -> None:
        """Save the fitted pipeline model to the specified path."""
        if self.pipeline_model is not None:
            self.pipeline_model.save(path)

    @classmethod
    def load(cls, path: str) -> 'OneHotEncoderPipeline':
        """Load a pipeline model from the specified path."""
        instance = cls(input_col="", output_col="")
        instance.pipeline_model = PipelineModel.load(path)
        return instance

    def transform(self, df: DataFrame) -> DataFrame:
        """Transform the DataFrame using the fitted pipeline model."""
        if self.pipeline_model is None:
            raise Exception("Model must be fitted before calling transform.")
        return self.pipeline_model.transform(df)


In [85]:

one_hot_encoder_pipeline = OneHotEncoderPipeline(input_col='person_home_ownership', output_col='person_home_ownership_ohe')
one_hot_encoder_pipeline.fit(training_data)

one_hot_encoder_pipeline.save("one_hot_encoder_pipeline")


loaded_pipeline = OneHotEncoderPipeline.load("one_hot_encoder_pipeline")

transformed_new_df = loaded_pipeline.transform(training_data)

transformed_new_df.show()




+----------+-------------+---------------------+-----------------+-----------------+----------+---------+-------------+-----------+-------------------+-------------------------+--------------------------+---------+------------+-----------------+--------------------+------------------------+--------------------------+---------------------------+-------------------------+
|person_age|person_income|person_home_ownership|person_emp_length|      loan_intent|loan_grade|loan_amnt|loan_int_rate|loan_status|loan_percent_income|cb_person_default_on_file|cb_person_cred_hist_length|age_group|income_group|loan_amount_group|loan_to_income_ratio|loan_to_emp_length_ratio|int_rate_to_loan_amt_ratio|person_home_ownership_index|person_home_ownership_ohe|
+----------+-------------+---------------------+-----------------+-----------------+----------+---------+-------------+-----------+-------------------+-------------------------+--------------------------+---------+------------+-----------------+---------

                                                                                

In [86]:
data_ = spark.read.csv("./test_data.csv",header=True, inferSchema=True)
data_.show()

+----------+-------------+---------------------+-----------------+-----------------+----------+---------+-------------+-------------------+-------------------------+--------------------------+
|person_age|person_income|person_home_ownership|person_emp_length|      loan_intent|loan_grade|loan_amnt|loan_int_rate|loan_percent_income|cb_person_default_on_file|cb_person_cred_hist_length|
+----------+-------------+---------------------+-----------------+-----------------+----------+---------+-------------+-------------------+-------------------------+--------------------------+
|        22|        59000|                 RENT|              123|         PERSONAL|         D|    35000|        16.02|               0.59|                        Y|                         3|
|        21|         9600|                  OWN|                5|        EDUCATION|         B|     1000|        11.14|                0.1|                        N|                         2|
|        25|         9600|         

In [44]:
data_.columns

['person_age',
 'person_income',
 'person_home_ownership',
 'person_emp_length',
 'loan_intent',
 'loan_grade',
 'loan_amnt',
 'loan_int_rate',
 'loan_percent_income',
 'cb_person_default_on_file',
 'cb_person_cred_hist_length']

In [87]:
transformed_test_df = loaded_pipeline.transform(data_)
transformed_test_df.show()

+----------+-------------+---------------------+-----------------+-----------------+----------+---------+-------------+-------------------+-------------------------+--------------------------+---------------------------+-------------------------+
|person_age|person_income|person_home_ownership|person_emp_length|      loan_intent|loan_grade|loan_amnt|loan_int_rate|loan_percent_income|cb_person_default_on_file|cb_person_cred_hist_length|person_home_ownership_index|person_home_ownership_ohe|
+----------+-------------+---------------------+-----------------+-----------------+----------+---------+-------------+-------------------+-------------------------+--------------------------+---------------------------+-------------------------+
|        22|        59000|                 RENT|              123|         PERSONAL|         D|    35000|        16.02|               0.59|                        Y|                         3|                        0.0|            (3,[0],[1.0])|
|        21|

In [None]:
from pyspark.ml.pipeline import PipelineModel

loaded_pipeline_model = PipelineModel.load("./encoding_pipeline")

# Apply the loaded pipeline model to transform data
transformed_training_data = loaded_pipeline_model.transform(training_data)
transformed_training_data.show(truncate=False)

AttributeError: module '__main__' has no attribute 'OneHotEncoderCustom'

In [None]:
# applying to validation data
transformed_test_df = loaded_pipeline.transform(validation_data)
transformed_test_df.show()

+----------+-------------+---------------------+-----------------+-----------------+----------+---------+-------------+-----------+-------------------+-------------------------+--------------------------+---------+------------+-----------------+--------------------+------------------------+--------------------------+---------------------------+-------------------------+
|person_age|person_income|person_home_ownership|person_emp_length|      loan_intent|loan_grade|loan_amnt|loan_int_rate|loan_status|loan_percent_income|cb_person_default_on_file|cb_person_cred_hist_length|age_group|income_group|loan_amount_group|loan_to_income_ratio|loan_to_emp_length_ratio|int_rate_to_loan_amt_ratio|person_home_ownership_index|person_home_ownership_ohe|
+----------+-------------+---------------------+-----------------+-----------------+----------+---------+-------------+-----------+-------------------+-------------------------+--------------------------+---------+------------+-----------------+---------

In [None]:
test_data = spark.read.csv("./loan_data.csv")

In [None]:
transformed_training_data.select('loan_to_emp_length_ratio').show()

+------------------------+
|loan_to_emp_length_ratio|
+------------------------+
|                  2.0E-4|
|    0.002666666666666...|
|    7.647058823529412E-4|
|    0.001285714285714...|
|    4.878048780487805E-4|
|    0.001111111111111...|
|                  9.0E-4|
|    0.001166666666666...|
|                  0.0012|
|                     0.0|
|    0.003888888888888...|
|    9.821428571428572E-4|
|                  2.5E-4|
|                     0.0|
|                   0.001|
|                  3.5E-4|
|    8.695652173913044E-5|
|                  0.0035|
|                  2.5E-4|
|    0.001081081081081081|
+------------------------+
only showing top 20 rows



In [None]:
transformed_training_data.groupBy(col('loan_status')).count().show()

+-----------+-----+
|loan_status|count|
+-----------+-----+
|          1| 4994|
|          0|17812|
+-----------+-----+



In [None]:
majority_class = transformed_training_data.filter(F.col('loan_status') == 0)
minority_class = transformed_training_data.filter(F.col('loan_status') == 1)

majority_count = majority_class.count()
minority_count = minority_class.count()
upsample_ratio = majority_count // minority_count

upsampled_minority_class = minority_class.withColumn(
    "dummy", F.explode(F.array([F.lit(x) for x in range(upsample_ratio)]))
).drop("dummy")

remaining_count = majority_count - upsampled_minority_class.count()
extra_minority_rows = minority_class.limit(remaining_count)

final_minority_class = upsampled_minority_class.unionAll(extra_minority_rows)

upsampled_train_df = majority_class.unionAll(final_minority_class)

upsampled_train_df.groupBy("loan_status").count().show()

                                                                                

+-----------+-----+
|loan_status|count|
+-----------+-----+
|          1|17812|
|          0|17812|
+-----------+-----+



In [None]:
len(transformed_training_data.columns)

45

In [None]:
# from pyspark.ml.feature import VectorAssembler, StandardScaler
# from pyspark.ml import Pipeline
# from pyspark.sql.functions import udf
# from pyspark.sql.types import DoubleType

# def extract_scalar(vector):
#     # Assumes the vector is a list with a single float value
#     return float(vector[0]) if vector else None

# # Register the UDF
# extract_scalar_udf = udf(extract_scalar, DoubleType())

# def scale_columns_individually(df, columns_to_scale, pipeline_path, save_model=True):
#     stages = []
#     for col_name in columns_to_scale:
#         # Step 1: Vectorize each column individually
#         assembler = VectorAssembler(inputCols=[col_name], outputCol=f"{col_name}_vec")
        
#         # Step 2: Apply Standard Scaling to the vectorized column
#         scaler = StandardScaler(inputCol=f"{col_name}_vec", outputCol=f"scaled_{col_name}", withMean=True, withStd=True)
        
#         # Add stages to the pipeline
#         stages.extend([assembler, scaler])
    
#     # Step 3: Define the pipeline with all stages
#     pipeline = Pipeline(stages=stages)
    
#     # Step 4: Fit the pipeline to the data
#     scaled_pipeline_model = pipeline.fit(df)
    
#     # Optionally save the pipeline model if requested
#     if save_model:
#         scaled_pipeline_model.save(pipeline_path)
    
#     # Transform the DataFrame to get scaled features
#     scaled_df = scaled_pipeline_model.transform(df)
    
#     # Drop the original columns and vector columns
#     vector_columns = [f"{col}_vec" for col in columns_to_scale]
#     scaled_columns = [f"scaled_{col}" for col in columns_to_scale]
    
#     for col in scaled_columns:
#         # Apply UDF to convert vectors to scalar values
#         scaled_df = scaled_df.withColumn(col, extract_scalar_udf(F.col(col)))
    
#     # Drop the vector columns and original columns
#     columns_to_drop = vector_columns + columns_to_scale
#     scaled_df = scaled_df.drop(*columns_to_drop)
    
#     return scaled_df

# # Example usage of the function
# columns_to_scale = ['person_age', 'person_income', 'person_emp_length', 'loan_amnt', 
#                     'loan_int_rate', 'cb_person_cred_hist_length', 
#                     'loan_to_emp_length_ratio', 'int_rate_to_loan_amt_ratio']

# # Call the function to scale and drop original columns
# scaled_train_df = scale_columns_individually(upsampled_train_df, columns_to_scale, "./scaled_pipeline_model", save_model=True)

# # Show the resulting DataFrame
# scaled_train_df.show(truncate=False)


In [None]:
# scaled_train_df.select('scaled_loan_to_emp_length_ratio').show()

In [None]:

# from pyspark.ml.pipeline import PipelineModel
# scaled_pipeline_model = PipelineModel.load("./scaled_pipeline_model")

# # applying to validation data
# scaled_test_df = scaled_pipeline_model.transform(transformed_test_df)
# scaled_test_df.show()

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
from pyspark.sql import functions as F

# def extract_scalar(vector):
#     return float(vector[0]) if vector else None

# extract_scalar_udf = udf(extract_scalar, DoubleType())

def min_max_scale(df, columns_to_scale, min_val=-3, max_val=3):
    for col_name in columns_to_scale:
        min_col, max_col = df.select(F.min(col_name), F.max(col_name)).first()
        scaled_col = (F.col(col_name) - min_col) / (max_col - min_col) * (max_val - min_val) + min_val
        df = df.withColumn(f"scaled_{col_name}", scaled_col)
    
    return df

def scale_columns_individually(df, columns_to_scale, pipeline_path=None, save_model=False):
    scaled_df = min_max_scale(df, columns_to_scale, min_val=-3, max_val=3)
    
    scaled_df = scaled_df.drop(*columns_to_scale)
    
    return scaled_df


columns_to_scale = ['person_age', 'person_income', 'person_emp_length', 'loan_amnt', 
                    'loan_int_rate', 'cb_person_cred_hist_length', 
                    'loan_to_emp_length_ratio', 'int_rate_to_loan_amt_ratio']

scaled_train_df = scale_columns_individually(upsampled_train_df, columns_to_scale, save_model=False)

scaled_train_df.show(truncate=False)




+-----------+-------------------+--------------------+---------------------------+---------------------------+------------+------------+------------+------------+------------+------------+------------+-------------------------+--------------------------+------------------------------+---------------------------+-----------------------------+-------------------+--------------------+---------------------+---------------------------+-------------------+-----------------------+----------------+-----------------+-------------------+------------------------+---------------+---------------+---------------+---------------+---------------+---------------+----------------------+------------------------+---------------------------+-----------------------+-----------------+--------------------+------------------------+--------------------+---------------------+---------------------------------+-------------------------------+---------------------------------+
|loan_status|loan_percent_income|loan_

                                                                                

In [None]:
scaled_test_df = scale_columns_individually(transformed_test_df, columns_to_scale, save_model=False)
scaled_test_df.show(truncate=False)

+-----------+-------------------+--------------------+---------------------------+---------------------------+------------+------------+------------+------------+------------+------------+------------+-------------------------+--------------------------+------------------------------+---------------------------+-----------------------------+-------------------+--------------------+---------------------+---------------------------+-------------------+-----------------------+----------------+-----------------+-------------------+------------------------+---------------+---------------+---------------+---------------+---------------+---------------+----------------------+------------------------+---------------------------+-----------------------+-------------------+--------------------+------------------------+-------------------+--------------------+---------------------------------+-------------------------------+---------------------------------+
|loan_status|loan_percent_income|loan_

In [None]:
all_dtypes = scaled_test_df.dtypes

# Filter for string columns
string_columns = [col for col, dtype in all_dtypes if dtype == 'string']

# Print the string columns
print("String columns:", string_columns)

String columns: []


In [None]:
features = [feature for feature in scaled_train_df.columns if feature not in 'loan_status' ]

In [None]:
features

['loan_percent_income',
 'loan_to_income_ratio',
 'cb_person_default_on_file_Y',
 'cb_person_default_on_file_N',
 'loan_grade_F',
 'loan_grade_E',
 'loan_grade_B',
 'loan_grade_D',
 'loan_grade_C',
 'loan_grade_A',
 'loan_grade_G',
 'person_home_ownership_OWN',
 'person_home_ownership_RENT',
 'person_home_ownership_MORTGAGE',
 'person_home_ownership_OTHER',
 'loan_intent_DEBTCONSOLIDATION',
 'loan_intent_VENTURE',
 'loan_intent_PERSONAL',
 'loan_intent_EDUCATION',
 'loan_intent_HOMEIMPROVEMENT',
 'loan_intent_MEDICAL',
 'income_group_low-middle',
 'income_group_low',
 'income_group_high',
 'income_group_middle',
 'income_group_high-middle',
 'age_group_26-35',
 'age_group_20-25',
 'age_group_46-55',
 'age_group_36-45',
 'age_group_66-80',
 'age_group_56-65',
 'loan_amount_group_high',
 'loan_amount_group_medium',
 'loan_amount_group_very high',
 'loan_amount_group_small',
 'scaled_person_age',
 'scaled_person_income',
 'scaled_person_emp_length',
 'scaled_loan_amnt',
 'scaled_loan_int_

In [None]:
features = [col for col in scaled_train_df.columns if col != 'loan_status']

assembler = VectorAssembler(inputCols=features, outputCol="features")
train_assembled = assembler.transform(scaled_train_df)
train_assembled.show(truncate=False)

                                                                                

+-----------+-------------------+--------------------+---------------------------+---------------------------+------------+------------+------------+------------+------------+------------+------------+-------------------------+--------------------------+------------------------------+---------------------------+-----------------------------+-------------------+--------------------+---------------------+---------------------------+-------------------+-----------------------+----------------+-----------------+-------------------+------------------------+---------------+---------------+---------------+---------------+---------------+---------------+----------------------+------------------------+---------------------------+-----------------------+-----------------+--------------------+------------------------+--------------------+---------------------+---------------------------------+-------------------------------+---------------------------------+---------------------------------------

In [None]:
features_train = [col for col in scaled_test_df.columns if col != 'loan_status']

assembler = VectorAssembler(inputCols=features_train, outputCol="features")
test_assembled = assembler.transform(scaled_test_df)
test_assembled.show(truncate=False)

+-----------+-------------------+--------------------+---------------------------+---------------------------+------------+------------+------------+------------+------------+------------+------------+-------------------------+--------------------------+------------------------------+---------------------------+-----------------------------+-------------------+--------------------+---------------------+---------------------------+-------------------+-----------------------+----------------+-----------------+-------------------+------------------------+---------------+---------------+---------------+---------------+---------------+---------------+----------------------+------------------------+---------------------------+-----------------------+-------------------+--------------------+------------------------+-------------------+--------------------+---------------------------------+-------------------------------+---------------------------------+---------------------------------------

In [None]:
out

NameError: name 'out' is not defined

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

rf = RandomForestClassifier(labelCol='loan_status', featuresCol='features', numTrees=100, maxDepth=10, seed=42)
rf_model = rf.fit(train_assembled)
rf_predictions = rf_model.transform(test_assembled)
binary_evaluator = BinaryClassificationEvaluator(labelCol='loan_status', metricName="areaUnderROC")
multi_evaluator = MulticlassClassificationEvaluator(labelCol='loan_status', predictionCol="prediction")
roc_auc = binary_evaluator.evaluate(rf_predictions)
print(f"Random Forest ROC AUC: {roc_auc}")
accuracy = multi_evaluator.setMetricName("accuracy").evaluate(rf_predictions)
print(f"Random Forest Accuracy: {accuracy}")
precision = multi_evaluator.setMetricName("weightedPrecision").evaluate(rf_predictions)
print(f"Random Forest Precision: {precision}")
recall = multi_evaluator.setMetricName("weightedRecall").evaluate(rf_predictions)
print(f"Random Forest Recall: {recall}")
f1_score = multi_evaluator.setMetricName("f1").evaluate(rf_predictions)
print(f"Random Forest F1-Score: {f1_score}")


24/09/15 18:20:23 WARN DAGScheduler: Broadcasting large task binary with size 1155.6 KiB
24/09/15 18:20:24 WARN DAGScheduler: Broadcasting large task binary with size 1875.5 KiB
24/09/15 18:20:26 WARN DAGScheduler: Broadcasting large task binary with size 3.0 MiB
24/09/15 18:20:27 WARN DAGScheduler: Broadcasting large task binary with size 4.7 MiB
24/09/15 18:20:29 WARN DAGScheduler: Broadcasting large task binary with size 7.1 MiB
24/09/15 18:20:31 WARN DAGScheduler: Broadcasting large task binary with size 1164.4 KiB
24/09/15 18:20:33 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB
                                                                                

Random Forest ROC AUC: 0.9129104369300424


24/09/15 18:20:37 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB


Random Forest Accuracy: 0.8886561954624782


24/09/15 18:20:38 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB


Random Forest Precision: 0.8895054782978165


24/09/15 18:20:39 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB


Random Forest Recall: 0.8886561954624781


24/09/15 18:20:39 WARN DAGScheduler: Broadcasting large task binary with size 4.5 MiB


Random Forest F1-Score: 0.8890605119910593


In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Initialize the RandomForestClassifier
rf = RandomForestClassifier(labelCol='loan_status', featuresCol='features', seed=42)

paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [30,50, 100, 200])     
             .addGrid(rf.maxDepth, [5, 10, 20])         
             .addGrid(rf.maxBins, [32, 64])            
             .build())

# Define the evaluator
evaluator = BinaryClassificationEvaluator(labelCol="loan_status", metricName="areaUnderROC")


crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)  


cv_model = crossval.fit(train_assembled)

# Get the best model from cross-validation
best_rf_model = cv_model.bestModel

# Make predictions on the test data
predictions = best_rf_model.transform(test_assembled)

# Evaluate the best model
roc_auc = evaluator.evaluate(predictions)
accuracy_evaluator = BinaryClassificationEvaluator(labelCol="loan_status", metricName="accuracy")
accuracy = accuracy_evaluator.evaluate(predictions)

print(f"Best Random Forest ROC AUC: {roc_auc}")
print(f"Best Random Forest Accuracy: {accuracy}")

# Print the best hyperparameters
print(f"Best Number of Trees: {best_rf_model.getNumTrees}")
print(f"Best Max Depth: {best_rf_model.getMaxDepth()}")
print(f"Best Max Bins: {best_rf_model.getMaxBins()}")


                                                                                

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from xgboost.spark import SparkXGBClassifier
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

classifier = SparkXGBClassifier(
    num_workers=1, 
    use_gpu=True,  
    tree_method='gpu_hist',  
    gpu_memory_limit=2048,  
    max_depth=6,  
    features_col='features',  
    label_col='loan_status'
)

model = classifier.fit(train_assembled)

predictions = model.transform(test_assembled)

predictions.select("loan_status", "prediction", "probability").show()



In [None]:

# Create parameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(classifier.max_depth, [4, 6, 8]) \
    .addGrid(classifier.learning_rate, [0.1, 0.2, 0.3]) \
    .addGrid(classifier.n_estimators, [100, 200, 300]) \
    .addGrid(classifier.gamma, [0, 0.1, 0.2]) \
    .build()

# Define evaluator
evaluator = BinaryClassificationEvaluator(
    labelCol='loan_status', 
    rawPredictionCol='prediction', 
    metricName='areaUnderROC'
)

# Cross validator
crossval = CrossValidator(
    estimator=classifier,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,  
    parallelism=4  
)

# Fit cross-validator model
cvModel = crossval.fit(train_assembled)

# Evaluate best model
predictions = cvModel.transform(test_assembled)
predictions.select("loan_status", "prediction", "probability").show()

# Get the best model hyperparameters
best_model = cvModel.bestModel
print(f"Best Max Depth: {best_model.getOrDefault('max_depth')}")
print(f"Best Learning Rate: {best_model.getOrDefault('eta')}")
print(f"Best Number of Estimators: {best_model.getOrDefault('n_estimators')}")


2024-09-18 12:12:47,303 INFO XGBoost-PySpark: _fit Running xgboost-2.0.0 on 1 workers with
	booster params: {'device': 'cpu', 'gamma': 0.1, 'learning_rate': 0.1, 'max_depth': 4, 'objective': 'binary:logistic', 'tree_method': 'gpu_hist', 'gpu_memory_limit': 2048, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-09-18 12:12:47,567 INFO XGBoost-PySpark: _fit Running xgboost-2.0.0 on 1 workers with
	booster params: {'device': 'cpu', 'gamma': 0, 'learning_rate': 0.1, 'max_depth': 4, 'objective': 'binary:logistic', 'tree_method': 'gpu_hist', 'gpu_memory_limit': 2048, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-09-18 12:12:48,185 INFO XGBoost-PySpark: _fit Running xgboost-2.0.0 on 1 workers with
	booster params: {'device': 'cpu', 'gamma': 0.2, 'learning_rate': 0.1, 'max_depth': 4, 'objective': 'binary:logis

In [None]:
model = classifier.fit(train_assembled)

predictions = model.transform(test_assembled)

predictions.select("loan_status", "prediction", "probability").show()

2024-09-18 12:07:04,437 INFO XGBoost-PySpark: _fit Running xgboost-2.0.0 on 1 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cpu', 'max_depth': 6, 'tree_method': 'gpu_hist', 'gpu_memory_limit': 2048, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-09-18 12:07:06,719 INFO XGBoost-PySpark: _train_booster Leveraging cuda:0 to train with QDM: off
INFO:XGBoost-PySpark:Leveraging cuda:0 to train with QDM: off
[12:07:07] task 0 got new rank 0                                    (0 + 1) / 1]

    E.g. tree_method = "hist", device = "cuda"

Parameters: { "gpu_memory_limit" } are not used.


    E.g. tree_method = "hist", device = "cuda"

2024-09-18 12:07:09,230 INFO XGBoost-PySpark: _fit Finished xgboost training!   

    E.g. tree_method = "hist", device = "cuda"

2024-09-18 12:07:10,799 INFO XGBoost-PySpark: predict_udf Do the inference on the CPUs

    E.g. tree_method = "hi

+-----------+----------+--------------------+
|loan_status|prediction|         probability|
+-----------+----------+--------------------+
|          1|       1.0|[0.02195566892623...|
|          0|       0.0|[0.69172662496566...|
|          0|       0.0|[0.62312602996826...|
|          0|       0.0|[0.99933755397796...|
|          1|       1.0|[9.11355018615722...|
|          0|       0.0|[0.96366149187088...|
|          0|       0.0|[0.85166758298873...|
|          0|       0.0|[0.83862352371215...|
|          0|       0.0|[0.94276672601699...|
|          1|       0.0|[0.79952681064605...|
|          0|       0.0|[0.95439666509628...|
|          0|       0.0|[0.83728677034378...|
|          0|       0.0|[0.86421978473663...|
|          0|       0.0|[0.88907843828201...|
|          0|       0.0|[0.92349255084991...|
|          0|       0.0|[0.97977650165557...|
|          0|       0.0|[0.94796627759933...|
|          1|       1.0|[0.00174254179000...|
|          0|       0.0|[0.8544288

Potential solutions:
- Use a data structure that matches the device ordinal in the booster.
- Set the device for booster before call to inplace_predict.


                                                                                

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from xgboost.spark import SparkXGBClassifier
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
# Train the model using the classifier
classifier = SparkXGBClassifier(
    num_workers=1,
    use_gpu=True,
    tree_method='gpu_hist',
    gpu_memory_limit=2048,
    max_depth=4,         # Maximum depth of the tree
    eta=0.1,             # Learning rate (0.1 is a good default)
    subsample=0.8,       # Use 80% of data for training each tree
    colsample_bytree=0.8, # Use 80% of features for each tree
    n_estimators=20,    # Number of boosting rounds
    features_col='features',
    label_col='loan_status'
)

# Fit the model to the training data
model = classifier.fit(train_assembled)

# Make predictions on the test data
predictions = model.transform(test_assembled)

# Evaluate the predictions
evaluator = BinaryClassificationEvaluator(
    labelCol="loan_status",  # Actual label
    rawPredictionCol="probability",  # Use predicted probabilities for AUC
    metricName="areaUnderROC"  # Use ROC for evaluation
)

# Calculate AUC
auc = evaluator.evaluate(predictions)
print(f"Area under ROC: {auc:.4f}")

# You can also evaluate accuracy using a different evaluator
evaluator_acc = BinaryClassificationEvaluator(
    labelCol="loan_status", 
    rawPredictionCol="prediction", 
    metricName="accuracy"
)

# Calculate accuracy
accuracy = evaluator_acc.evaluate(predictions)
print(f"Accuracy: {accuracy:.4f}")


2024-09-18 11:20:13,083 INFO XGBoost-PySpark: _fit Running xgboost-2.0.0 on 1 workers with
	booster params: {'objective': 'binary:logistic', 'colsample_bytree': 0.8, 'device': 'cpu', 'max_depth': 4, 'subsample': 0.8, 'tree_method': 'gpu_hist', 'gpu_memory_limit': 2048, 'eta': 0.1, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 20}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-09-18 11:20:16,774 INFO XGBoost-PySpark: _train_booster Leveraging cuda:0 to train with QDM: off
[11:20:17] task 0 got new rank 0

    E.g. tree_method = "hist", device = "cuda"

Parameters: { "gpu_memory_limit" } are not used.


    E.g. tree_method = "hist", device = "cuda"

2024-09-18 11:20:19,722 INFO XGBoost-PySpark: _fit Finished xgboost training!   

    E.g. tree_method = "hist", device = "cuda"



In [None]:
import xgboost as xgb
print(xgb.__version__)
print(xgb.get_config())


2.0.0
{'use_rmm': False, 'verbosity': 1}


In [None]:
#https://repo1.maven.org/maven2/ai/rapids/cudf/24.04.0/
#https://repo1.maven.org/maven2/ai/rapids/xgboost4j_2.11/1.0.0-Beta2/
#https://repo1.maven.org/maven2/ai/rapids/xgboost4j-spark_2.11/1.0.0-Beta2/

In [None]:
cd jar_file

/media/lang_chain/Storage/Documents/credit-risk-prediction/notebook/jar_file


In [None]:
ls

cudf-24.04.0-cuda11.jar         xgboost4j-spark_2.11-1.0.0-Beta2.jar
xgboost4j_2.11-1.0.0-Beta2.jar


In [None]:
import os
jarfile_path = os.listdir('/media/lang_chain/Storage/Documents/credit-risk-prediction/notebook/jar_file')
for file in jarfile_path:
    print(os.path.join('jar_file',file))

jar_file/cudf-24.04.0-cuda11.jar
jar_file/xgboost4j_2.11-1.0.0-Beta2.jar
jar_file/xgboost4j-spark_2.11-1.0.0-Beta2.jar


In [None]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars jar_file/cudf-24.04.0-cuda11.jar,jar_file/xgboost4j_2.11-1.0.0-Beta2.jar,jar_file/xgboost4j-spark_2.11-1.0.0-Beta2.jar'


In [None]:
import findspark

In [None]:

findspark.init()

In [None]:
import os
os.path.exists("jar_file/xgboost4j-spark_2.11-1.0.0-Beta2.jar")

True

In [None]:
spark.sparkContext.addPyFile("jar_file/xgboost4j-spark_2.11-1.0.0-Beta2.jar")

In [None]:
from sparkxgb.xgboost import XGBoostClassificationModel, XGBoostClassifier

In [None]:
from xgboost.spark import SparkXGBClassifier

In [None]:
classifier_ = SparkXGBClassifier(
  features_col='features',
  label_col='loan_status',
  num_workers=2,
  device="cuda",
)

In [None]:
model = classifier_.fit(train_assembled)


2024-09-18 12:03:04,704 INFO XGBoost-PySpark: _fit Running xgboost-2.0.0 on 2 workers with
	booster params: {'objective': 'binary:logistic', 'device': 'cuda', 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
2024-09-18 12:03:08,369 INFO XGBoost-PySpark: _train_booster Leveraging cuda:0 to train with QDM: off
2024-09-18 12:03:08,400 INFO XGBoost-PySpark: _train_booster Leveraging cuda:1 to train with QDM: off
[12:03:09] task 1 got new rank 0
[12:03:09] task 0 got new rank 1
24/09/18 12:03:10 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/media/lang_chain/Storage/Documents/credit-risk-prediction/.conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 584, in main
    eval_type = read_int(infile)
  File "/media/lang_chain/Storage/Documents/credit-risk-prediction/.

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(309, 0) finished unsuccessfully.
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/media/lang_chain/Storage/Documents/credit-risk-prediction/.conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/media/lang_chain/Storage/Documents/credit-risk-prediction/.conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/media/lang_chain/Storage/Documents/credit-risk-prediction/.conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 273, in dump_stream
    return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream)
  File "/media/lang_chain/Storage/Documents/credit-risk-prediction/.conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 81, in dump_stream
    for batch in iterator:
  File "/media/lang_chain/Storage/Documents/credit-risk-prediction/.conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 266, in init_stream_yield_batches
    for series in iterator:
  File "/media/lang_chain/Storage/Documents/credit-risk-prediction/.conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 356, in func
    for result_batch, result_type in result_iter:
  File "/home/lang_chain/.local/lib/python3.10/site-packages/xgboost/spark/core.py", line 965, in _train_booster
    with CommunicatorContext(context, **_rabit_args):
  File "/home/lang_chain/.local/lib/python3.10/site-packages/xgboost/core.py", line 729, in inner_f
    return func(**kwargs)
  File "/home/lang_chain/.local/lib/python3.10/site-packages/xgboost/training.py", line 181, in train
    bst.update(dtrain, i, obj)
  File "/home/lang_chain/.local/lib/python3.10/site-packages/xgboost/core.py", line 2049, in update
    _check_call(
  File "/home/lang_chain/.local/lib/python3.10/site-packages/xgboost/core.py", line 281, in _check_call
    raise XGBoostError(py_str(_LIB.XGBGetLastError()))
xgboost.core.XGBoostError: [12:03:10] /workspace/src/collective/nccl_device_communicator.cu:40: Check failed: n_uniques == world_size_ (1 vs. 2) : Multiple processes within communication group running on same CUDA device is not supported. b96c1eb44939eb0b2cf09472af1b7aa8

Stack trace:
  [bt] (0) /home/lang_chain/.local/lib/python3.10/site-packages/xgboost/lib/libxgboost.so(+0x1ba0be) [0x7f62ea5ba0be]
  [bt] (1) /home/lang_chain/.local/lib/python3.10/site-packages/xgboost/lib/libxgboost.so(+0x78cc34) [0x7f62eab8cc34]
  [bt] (2) /home/lang_chain/.local/lib/python3.10/site-packages/xgboost/lib/libxgboost.so(+0x7869b5) [0x7f62eab869b5]
  [bt] (3) /home/lang_chain/.local/lib/python3.10/site-packages/xgboost/lib/libxgboost.so(+0xaea3f7) [0x7f62eaeea3f7]
  [bt] (4) /home/lang_chain/.local/lib/python3.10/site-packages/xgboost/lib/libxgboost.so(+0x63c86b) [0x7f62eaa3c86b]
  [bt] (5) /home/lang_chain/.local/lib/python3.10/site-packages/xgboost/lib/libxgboost.so(+0x55f3bd) [0x7f62ea95f3bd]
  [bt] (6) /home/lang_chain/.local/lib/python3.10/site-packages/xgboost/lib/libxgboost.so(+0x48fa61) [0x7f62ea88fa61]
  [bt] (7) /home/lang_chain/.local/lib/python3.10/site-packages/xgboost/lib/libxgboost.so(+0x4c00a6) [0x7f62ea8c00a6]
  [bt] (8) /home/lang_chain/.local/lib/python3.10/site-packages/xgboost/lib/libxgboost.so(+0x4c4c04) [0x7f62ea8c4c04]



	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:99)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:85)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:80)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:397)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1996)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:232)

	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1963)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2437)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [46]:
from pyspark.ml import Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.ml.param.shared import Param
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

In [78]:
class OneHotEncoderCustom(Transformer, DefaultParamsWritable, DefaultParamsReadable):
    def __init__(self, encoding_columns=None, predefined_columns=None):
        super(OneHotEncoderCustom, self).__init__()
        self.encoding_columns = Param(self, "encoding_columns", "column to be one-hot encoded")
        self.predefined_columns = predefined_columns

        if encoding_columns is not None:
            self._set(encoding_columns=encoding_columns)

    def _transform(self, df: DataFrame) -> DataFrame:
        # Get the column name to encode
        encoding_columns = self.getOrDefault(self.encoding_columns)

        # Check if the encoding column exists in the DataFrame
        if encoding_columns not in df.columns:
            raise ValueError(f"Column {encoding_columns} does not exist in DataFrame. Available columns: {df.columns}")

        # Get distinct values from the specified column (these are the categories for one-hot encoding)
        distinct_values = df.select(encoding_columns).distinct().rdd.map(lambda x: x[0]).collect()

        # Create one-hot encoded columns based on distinct values
        for value in distinct_values:
            if value is not None:
                # Create new columns like encoding_column_value with 1 if matches, else 0
                df = df.withColumn(f"{encoding_columns}_{value}", F.when(F.col(encoding_columns) == value, 1).otherwise(0))

        # Ensure all predefined columns are present, even if they were not in the test data
        if self.predefined_columns:
            for predefined_column in self.predefined_columns:
                if predefined_column not in df.columns:
                    # Add the missing predefined column with a default value of 0
                    df = df.withColumn(predefined_column, F.lit(0))

        # Optionally drop the original column after one-hot encoding
        df = df.drop(encoding_columns)

        return df

In [79]:
from pyspark.ml import Pipeline

# Create your OneHotEncoderCustom instance
person_home_ownership_default = [
    'person_home_ownership_MORTGAGE',
    'person_home_ownership_RENT',
    'person_home_ownership_OTHER',
    'person_home_ownership_OWN'
]

# Initialize the custom OneHotEncoder
person_home_ownership_encoder = OneHotEncoderCustom(
    encoding_columns='person_home_ownership',
    predefined_columns=person_home_ownership_default
)

pipeline = Pipeline(stages=[person_home_ownership_encoder])

# Fit the pipeline to training data
transformed_pipeline = pipeline.fit(training_data)

# Transform the training data
transformed_trained_dataframe = transformed_pipeline.transform(training_data)

# Save the pipeline
transformed_pipeline.save('./encoder_pipeline')

                                                                                

In [80]:
data_ = spark.read.csv("./test_data.csv",header=True, inferSchema=True)
data_.show()

+----------+-------------+---------------------+-----------------+-----------------+----------+---------+-------------+-------------------+-------------------------+--------------------------+
|person_age|person_income|person_home_ownership|person_emp_length|      loan_intent|loan_grade|loan_amnt|loan_int_rate|loan_percent_income|cb_person_default_on_file|cb_person_cred_hist_length|
+----------+-------------+---------------------+-----------------+-----------------+----------+---------+-------------+-------------------+-------------------------+--------------------------+
|        22|        59000|                 RENT|              123|         PERSONAL|         D|    35000|        16.02|               0.59|                        Y|                         3|
|        21|         9600|                  OWN|                5|        EDUCATION|         B|     1000|        11.14|                0.1|                        N|                         2|
|        25|         9600|         

In [81]:
loaded_pipeline = PipelineModel.load('./encoder_pipeline')

transformed_train_dataframe = loaded_pipeline.transform(training_data)
transformed_train_dataframe.show()

                                                                                

+----------+-------------+-----------------+-----------------+----------+---------+-------------+-----------+-------------------+-------------------------+--------------------------+---------+------------+-----------------+--------------------+------------------------+--------------------------+-------------------------+--------------------------+------------------------------+---------------------------+
|person_age|person_income|person_emp_length|      loan_intent|loan_grade|loan_amnt|loan_int_rate|loan_status|loan_percent_income|cb_person_default_on_file|cb_person_cred_hist_length|age_group|income_group|loan_amount_group|loan_to_income_ratio|loan_to_emp_length_ratio|int_rate_to_loan_amt_ratio|person_home_ownership_OWN|person_home_ownership_RENT|person_home_ownership_MORTGAGE|person_home_ownership_OTHER|
+----------+-------------+-----------------+-----------------+----------+---------+-------------+-----------+-------------------+-------------------------+--------------------------+

In [82]:
loaded_pipeline = PipelineModel.load('./encoder_pipeline')

transformed_test_dataframe = loaded_pipeline.transform(data_)

transformed_test_dataframe.show()



+----------+-------------+-----------------+-----------------+----------+---------+-------------+-------------------+-------------------------+--------------------------+-------------------------+--------------------------+------------------------------+
|person_age|person_income|person_emp_length|      loan_intent|loan_grade|loan_amnt|loan_int_rate|loan_percent_income|cb_person_default_on_file|cb_person_cred_hist_length|person_home_ownership_OWN|person_home_ownership_RENT|person_home_ownership_MORTGAGE|
+----------+-------------+-----------------+-----------------+----------+---------+-------------+-------------------+-------------------------+--------------------------+-------------------------+--------------------------+------------------------------+
|        22|        59000|              123|         PERSONAL|         D|    35000|        16.02|               0.59|                        Y|                         3|                        0|                         1|            

                                                                                

In [83]:
transformed_test_dataframe.columns

['person_age',
 'person_income',
 'person_emp_length',
 'loan_intent',
 'loan_grade',
 'loan_amnt',
 'loan_int_rate',
 'loan_percent_income',
 'cb_person_default_on_file',
 'cb_person_cred_hist_length',
 'person_home_ownership_OWN',
 'person_home_ownership_RENT',
 'person_home_ownership_MORTGAGE']

In [84]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.sql import DataFrame

class OneHotEncoderPipeline:
    def __init__(self, input_col: str, output_col: str):
        self.input_col = input_col
        self.output_col = output_col
        self.pipeline = None
        self.pipeline_model = None

    def fit(self, df: DataFrame) -> None:
        """Fit the pipeline on the provided DataFrame."""
        indexer = StringIndexer(inputCol=self.input_col, outputCol=f"{self.input_col}_index")
        encoder = OneHotEncoder(inputCols=[f"{self.input_col}_index"], outputCols=[self.output_col])
        
        self.pipeline = Pipeline(stages=[indexer, encoder])
        self.pipeline_model = self.pipeline.fit(df)

    def save(self, path: str) -> None:
        """Save the fitted pipeline model to the specified path."""
        if self.pipeline_model is not None:
            self.pipeline_model.save(path)

    @classmethod
    def load(cls, path: str) -> 'OneHotEncoderPipeline':
        """Load a pipeline model from the specified path."""
        instance = cls(input_col="", output_col="")
        instance.pipeline_model = PipelineModel.load(path)
        return instance

    def transform(self, df: DataFrame) -> DataFrame:
        """Transform the DataFrame using the fitted pipeline model."""
        if self.pipeline_model is None:
            raise Exception("Model must be fitted before calling transform.")
        return self.pipeline_model.transform(df)
